Compare commits

No commits in common. "master" and "v2.12" have entirely different histories.

91 changed files with 538 additions and 1995 deletions

.github/workflows/codeql-analysis.yml
@@ -0,0 +1,71 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
name: "CodeQL"
on:
push:
branches: [master]
pull_request:
# The branches below must be a subset of the branches above
branches: [master]
schedule:
- cron: '0 18 * * 5'
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
# Override automatic language detection by changing the below list
# Supported options are ['csharp', 'cpp', 'go', 'java', 'javascript', 'python']
language: ['python']
# Learn more...
# https://docs.github.com/en/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#overriding-automatic-language-detection
steps:
- name: Checkout repository
uses: actions/checkout@v3
with:
# We must fetch at least the immediate parents so that if this is
# a pull request then we can checkout the head.
fetch-depth: 2
# If this run was triggered by a pull request event, then checkout
# the head of the pull request instead of the merge commit.
- run: git checkout HEAD^2
if: ${{ github.event_name == 'pull_request' }}
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v2
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language
#- run: |
# make bootstrap
# make release
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v2

@@ -1,84 +0,0 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "CodeQL"
on:
push:
branches: [ "master" ]
pull_request:
branches: [ "master" ]
schedule:
- cron: '34 14 * * 4'
jobs:
analyze:
name: Analyze
# Runner size impacts CodeQL analysis time. To learn more, please see:
# - https://gh.io/recommended-hardware-resources-for-running-codeql
# - https://gh.io/supported-runners-and-hardware-resources
# - https://gh.io/using-larger-runners
# Consider using larger runners for possible analysis time improvements.
runs-on: ${{ (matrix.language == 'swift' && 'macos-latest') || 'ubuntu-latest' }}
timeout-minutes: ${{ (matrix.language == 'swift' && 120) || 360 }}
permissions:
# required for all workflows
security-events: write
# only required for workflows in private repositories
actions: read
contents: read
strategy:
fail-fast: false
matrix:
language: [ 'python' ]
# CodeQL supports [ 'c-cpp', 'csharp', 'go', 'java-kotlin', 'javascript-typescript', 'python', 'ruby', 'swift' ]
# Use only 'java-kotlin' to analyze code written in Java, Kotlin or both
# Use only 'javascript-typescript' to analyze code written in JavaScript, TypeScript or both
# Learn more about CodeQL language support at https://aka.ms/codeql-docs/language-support
steps:
- name: Checkout repository
uses: actions/checkout@v4
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# For more details on CodeQL's query packs, refer to: https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs
# queries: security-extended,security-and-quality
# Autobuild attempts to build any compiled languages (C/C++, C#, Go, Java, or Swift).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v3
# Command-line programs to run using the OS shell.
# 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun
# If the Autobuild fails above, remove it and uncomment the following three lines,
# and modify them (or add more) to build your code if your project uses a compiled
# language. Refer to the EXAMPLE below for guidance.
# - run: |
# echo "Run, Build Application using script"
# ./location_of_script_within_repo/buildscript.sh
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v3
with:
category: "/language:${{matrix.language}}"

@@ -6,9 +6,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout Code
uses: actions/checkout@v4
uses: actions/checkout@v3
- name: Setup Cache
uses: actions/cache@v4
uses: actions/cache@v3
env:
cache-name: cache-pipmypy
with:
@@ -18,7 +18,7 @@ jobs:
${{ runner.os }}-${{ env.cache-name }}-
${{ runner.os }}-cache-pip-
- name: Install deps
run: pip3 install -U tornado pytest pytest-asyncio pytest-httpbin pytest-rerunfailures structlog tomli platformdirs aiohttp httpx mypy awesomeversion
run: pip3 install -U tornado pytest pytest-asyncio pytest-httpbin flaky structlog tomli platformdirs aiohttp httpx mypy awesomeversion
- name: Run mypy for --install-types
run: PATH=$HOME/.local/bin:$PATH mypy --namespace-packages --explicit-package-bases nvchecker nvchecker_source tests
continue-on-error: true

@@ -7,12 +7,11 @@ jobs:
fail-fast: false
matrix:
python-version:
- "3.7"
- "3.8"
- "3.9"
- "3.10"
- "3.11"
- "3.12"
- "3.13"
# pypy fails in some cases but we don't care much about that
# with github actions we can't mark some jobs to not affect the overall
# conclusion so we have to omit "allow-failure" tests.
@@ -20,20 +19,23 @@ jobs:
# - pypy-3.7
deps:
- tornado pycurl
# timer runs when loop is closed, see https://github.com/lilydjwg/nvchecker/actions/runs/11650699759/job/32439742210
# - aiohttp
- aiohttp
- tornado
- httpx[http2]>=0.14.0
exclude: []
exclude:
# Python 3.7 has a bug with openssl 3.x: https://bugs.python.org/issue43788
# https://github.com/lilydjwg/nvchecker/actions/runs/4524633969/jobs/7968599431
- python-version: "3.7"
deps: tornado
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v3
- name: Setup Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Setup Cache
uses: actions/cache@v4
uses: actions/cache@v3
env:
cache-name: cache-pip
with:
@@ -42,7 +44,6 @@ jobs:
restore-keys: |
${{ runner.os }}-${{ env.cache-name }}-${{ matrix.deps }}-
${{ runner.os }}-${{ env.cache-name }}-
- name: Install pycurl deps
if: ${{ contains(matrix.deps, 'pycurl') }}
run: |
@@ -50,44 +51,10 @@ jobs:
sudo apt install -y libcurl4-openssl-dev
# werkzeug is pinned for httpbin compatibility https://github.com/postmanlabs/httpbin/issues/673
- name: Install Python deps
env:
# use env to avoid `>` being redirection
deps: ${{ matrix.deps }}
run: pip install -U $deps pytest 'pytest-asyncio>=0.24' pytest-httpbin pytest-rerunfailures structlog tomli platformdirs lxml jq 'werkzeug<2.1' awesomeversion
# don't use binary distribution because:
# hardcoded cacert path doesn't work on Ubuntu (should have been resolved?)
# limited compression support (only deflate & gzip)
- name: Install pycurl
if: ${{ contains(matrix.deps, 'pycurl') }}
run: |
pip uninstall -y pycurl
pip install -U pycurl --no-binary :all:
run: pip install -U ${{ matrix.deps }} pytest pytest-asyncio pytest-httpbin flaky structlog tomli platformdirs lxml 'werkzeug<2.1' awesomeversion
- name: Decrypt keys
env:
KEY: ${{ secrets.KEY }}
run: if [[ -n $KEY ]]; then openssl enc -d -aes-256-ctr -pbkdf2 -k $KEY -in keyfile.toml.enc -out keyfile.toml; fi
- name: Setup mitmproxy cache
uses: actions/cache@v4
env:
cache-name: cache-mitm
with:
path: ~/.mitmproxy
key: ${{ env.cache-name }}
restore-keys: |
${{ env.cache-name }}-
- name: Install mitmproxy
run: |
/usr/bin/python -m venv --system-site-packages ~/.mitmproxy/venv
. ~/.mitmproxy/venv/bin/activate
pip install -U mitmproxy
# https://github.com/DevToys-app/DevToys/issues/1373#issuecomment-2599820594
sudo sysctl -w kernel.apparmor_restrict_unprivileged_unconfined=0
sudo sysctl -w kernel.apparmor_restrict_unprivileged_userns=0
# - name: Setup upterm session
# uses: lhotari/action-upterm@v1
- name: Run pytest
env:
mitmdump: /home/runner/.mitmproxy/venv/bin/mitmdump
run: scripts/run_cached_tests
run: if [[ -f keyfile.toml ]]; then KEYFILE=keyfile.toml pytest; else pytest; fi

@@ -1,10 +0,0 @@
version: 2
build:
os: ubuntu-22.04
tools:
python: "3.11"
sphinx:
configuration: docs/conf.py
python:
install:
- requirements: docs/requirements.txt

@@ -23,7 +23,7 @@ This is the version 2.0 branch. For the old version 1.x, please switch to the ``
Dependency
----------
- Python 3.8+
- Python 3.7+
- Python library: structlog, platformdirs, tomli (on Python < 3.11)
- One of these Python library combinations (ordered by preference):

@@ -30,9 +30,8 @@ autodoc_inherit_docstrings = False
# https://bitbucket.org/birkenfeld/sphinx/issue/1337/autoclass_content-both-uses-object__init__
autodoc_docstring_signature = False
intersphinx_mapping = {"python": ("https://docs.python.org/3", None)}
intersphinx_mapping = {"python": ("https://docs.python.org/3.8/", None)}
html_theme = "sphinx_rtd_theme"
on_rtd = os.environ.get("READTHEDOCS", None) == "True"
# On RTD we can't import sphinx_rtd_theme, but it will be applied by
@@ -40,6 +39,8 @@ on_rtd = os.environ.get("READTHEDOCS", None) == "True"
# as on RTD.
if not on_rtd:
import sphinx_rtd_theme
html_theme = "sphinx_rtd_theme"
html_theme_path = [sphinx_rtd_theme.get_html_theme_path()]
html_theme_options = {

@@ -1,3 +1,4 @@
tomli
structlog
platformdirs
tornado>=6

@@ -17,7 +17,7 @@ This is the version 2.0 branch. For the old version 1.x, please switch to the ``
Dependency
----------
- Python 3.8+
- Python 3.7+
- Python library: structlog, platformdirs, tomli (on Python < 3.11)
- One of these Python library combinations (ordered by preference):
@@ -71,8 +71,8 @@ The JSON log is one JSON string per line. The following documented events and
fields are stable, undocumented ones may change without notice.
event=updated
An update is detected. Fields ``name``, ``revision``, ``old_version`` and ``version`` are
available. ``old_version`` may be ``null`` and ``revision`` may be absent.
An update is detected. Fields ``name``, ``old_version`` and ``version`` are
available. ``old_version`` may be ``null``.
event=up-to-date
There is no update. Fields ``name`` and ``version`` are available.
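For illustration, an ``updated`` event might be logged roughly like this (only the fields documented here are shown; real lines carry additional logging metadata)::

  {"event": "updated", "name": "example", "old_version": "1.0.0", "version": "1.1.0"}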
@@ -170,9 +166,6 @@ prefix
Strip the prefix string if the version string starts with it. Otherwise the
version string is returned as-is.
If both ``prefix`` and ``from_pattern``/``to_pattern`` are used, ``prefix``
is applied first.
from_pattern, to_pattern
Both are Python-compatible regular expressions. If ``from_pattern`` is found
in the version string, it will be replaced with ``to_pattern``.
@@ -182,7 +179,7 @@ from_pattern, to_pattern
missing_ok
Suppress warnings and errors if a version checking module finds nothing.
Not all sources support it.
Currently only ``regex`` supports it.
proxy
The HTTP proxy to use. The format is ``proto://host:port``, e.g.
@@ -215,6 +212,13 @@ httptoken
verify_cert
Whether to verify the HTTPS certificate or not. Default is ``true``.
If both ``prefix`` and ``from_pattern``/``to_pattern`` are used,
``from_pattern``/``to_pattern`` are ignored. If you want to strip the prefix
and then do something special, just use ``from_pattern``/``to_pattern``. For
example, the transformation of ``v1_1_0`` => ``1.1.0`` can be achieved with
``from_pattern = 'v(\d+)_(\d+)_(\d+)'`` and ``to_pattern = '\1.\2.\3'``.
(Note that in TOML it's easier to write regexes in single quotes so you don't need to escape ``\``.)
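As a minimal sketch of the ``v1_1_0`` => ``1.1.0`` transformation above, using the ``cmd`` source described later with a hypothetical command that just prints the raw version:

.. code-block:: toml

   [example]
   source = "cmd"
   cmd = "echo v1_1_0"  # hypothetical; prints the raw version string
   from_pattern = 'v(\d+)_(\d+)_(\d+)'
   to_pattern = '\1.\2.\3'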
.. _list options:
List Options
@@ -322,46 +326,10 @@ post_data
post_data_type
(*Optional*) Specifies the ``Content-Type`` of the request body (``post_data``). By default, this is ``application/x-www-form-urlencoded``.
This source can also work with XML to some extent, e.g. it can parse an RSS feed like this:
.. code-block:: toml
[ProxmoxVE]
source = "htmlparser"
url = "https://my.proxmox.com/en/announcements/tag/proxmox-ve/rss"
xpath = "//item/title"
from_pattern = 'Proxmox VE ([\d.]+) released!'
to_pattern = '\1'
.. note::
An additional dependency "lxml" is required.
You can use ``pip install 'nvchecker[htmlparser]'``.
Search with a JSON Parser (jq)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
::
source = "jq"
Send an HTTP request and search through the body with a specific ``jq`` filter.
url
The URL of the HTTP request.
filter
A ``jq`` filter used to find the version string.
post_data
(*Optional*) When present, a ``POST`` request (instead of a ``GET``) will be used. The value should be a string containing the full body of the request. The encoding of the string can be specified using the ``post_data_type`` option.
post_data_type
(*Optional*) Specifies the ``Content-Type`` of the request body (``post_data``). By default, this is ``application/json``.
This source supports :ref:`list options`.
.. note::
An additional dependency "jq" is required.
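A minimal entry might look like this (the URL and filter are hypothetical; any endpoint returning JSON works):

.. code-block:: toml

   [example]
   source = "jq"
   url = "https://example.com/releases.json"
   filter = '.[0].version'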
Find with a Command
~~~~~~~~~~~~~~~~~~~
::
@@ -399,8 +367,8 @@ Check GitHub
source = "github"
Check `GitHub <https://github.com/>`_ for updates. The version returned is in
date format ``%Y%m%d.%H%M%S``, e.g. ``20130701.012212``, unless ``use_latest_release``,
``use_max_tag`` or ``use_max_release`` is used. See below.
date format ``%Y%m%d.%H%M%S``, e.g. ``20130701.012212``, unless ``use_latest_release``
or ``use_max_tag`` is used. See below.
github
The github repository, with author, e.g. ``lilydjwg/nvchecker``.
@@ -411,9 +379,6 @@ branch
path
Only commits containing this file path will be returned.
host
Hostname for self-hosted GitHub instance.
use_latest_release
Set this to ``true`` to check for the latest release on GitHub.
@@ -423,26 +388,7 @@ use_latest_release
small ones like `nvchecker's <https://github.com/lilydjwg/nvchecker/releases>`_
are only git tags that should use ``use_max_tag`` below.
Will return the release's tag name instead of date. (For historical reasons
it doesn't return the release name. See below to change.)
use_max_release
Set this to ``true`` to check for the max release on GitHub.
This option returns the largest one sorted by the
``sort_version_key`` option. Will return the tag name instead of date.
use_release_name
When ``use_latest_release`` or ``use_max_release`` is ``true``,
setting this to ``true`` will cause nvchecker to return the release name
instead of the tag name.
include_prereleases
When ``use_latest_release`` or ``use_max_release`` is ``true``,
set this to ``true`` to take prereleases into account.
This returns the release names (not the tag names).
This requires a token because it's using the v4 GraphQL API.
Will return the release name instead of date.
use_latest_tag
Set this to ``true`` to check for the latest tag on GitHub.
@@ -455,24 +401,22 @@ query
use_max_tag
Set this to ``true`` to check for the max tag on GitHub. Unlike
``use_max_release``, this option includes both annotated tags and
``use_latest_release``, this option includes both annotated tags and
lightweight ones, and returns the largest one sorted by the
``sort_version_key`` option. Will return the tag name instead of date.
token
A personal authorization token used to call the API.
An authorization token may be needed in order to use ``use_latest_tag``,
``include_prereleases`` or to request more frequently than anonymously.
An authorization token may be needed in order to use ``use_latest_tag`` or to
request more frequently than anonymously.
To set an authorization token, you can set:
- a key named ``github`` in the keyfile
- the token option
- an entry in the keyfile for the host (e.g. ``github.com``)
- an entry in your ``netrc`` file for the host
This source supports :ref:`list options` when ``use_max_tag`` or
``use_max_release`` is set.
This source supports :ref:`list options` when ``use_max_tag`` is set.
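For example, a sketch of an entry tracking nvchecker's own tags, using only the keys documented above:

.. code-block:: toml

   [nvchecker]
   source = "github"
   github = "lilydjwg/nvchecker"
   use_max_tag = true
   prefix = "v"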
Check Gitea
~~~~~~~~~~~
@@ -501,21 +445,11 @@ token
To set an authorization token, you can set:
- a key named ``gitea_{host}`` in the keyfile, where ``host`` is all-lowercased host name
- the token option
- an entry in the keyfile for the host (e.g. ``gitea.com``)
- an entry in your ``netrc`` file for the host
This source supports :ref:`list options` when ``use_max_tag`` is set.
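A sketch with a hypothetical repository (keys as documented above):

.. code-block:: toml

   [example]
   source = "gitea"
   gitea = "owner/repo"
   use_max_tag = true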
Check Gogs / Forgejo / Codeberg
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Please use the above "gitea" source. Gitea is a fork of `Gogs <https://gogs.io/>`_.
`Forgejo <https://forgejo.org/>`_ is a fork of Gitea. Codeberg is a code
hosting provider that uses Forgejo. They share the same API endpoints nvchecker uses.
Alternatively, you can try the generic "git" source.
Check BitBucket
~~~~~~~~~~~~~~~
::
@@ -584,9 +518,8 @@ token
To set an authorization token, you can set:
- a key named ``gitlab_{host}`` in the keyfile, where ``host`` is all-lowercased host name
- the token option
- an entry in the keyfile for the host (e.g. ``gitlab.com``)
- an entry in your ``netrc`` file for the host
This source supports :ref:`list options` when ``use_max_tag`` is set.
@@ -596,7 +529,7 @@ Check PyPI
source = "pypi"
Check `PyPI <https://pypi.python.org/>`_ for updates. Yanked releases are ignored.
Check `PyPI <https://pypi.python.org/>`_ for updates.
pypi
The name used on PyPI, e.g. ``PySide``.
@@ -604,8 +537,6 @@ pypi
use_pre_release
Whether to accept pre release. Default is false.
This source supports :ref:`list options`.
.. note::
An additional dependency "packaging" is required.
You can use ``pip install 'nvchecker[pypi]'``.
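A minimal entry, using nvchecker itself as the package to track:

.. code-block:: toml

   [nvchecker]
   source = "pypi"
   pypi = "nvchecker"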
@@ -690,16 +621,11 @@ Check crates.io
source = "cratesio"
Check `crates.io <https://crates.io/>`_ for updates. Yanked releases are ignored.
Check `crates.io <https://crates.io/>`_ for updates.
cratesio
The crate name on crates.io, e.g. ``tokio``.
use_pre_release
Whether to accept pre release. Default is false.
This source supports :ref:`list options`.
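A minimal entry for the ``tokio`` crate mentioned above:

.. code-block:: toml

   [tokio]
   source = "cratesio"
   cratesio = "tokio"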
Check Local Pacman Database
~~~~~~~~~~~~~~~~~~~~~~~~~~~
::
@@ -796,11 +722,6 @@ This enables you to track updates from `Anitya <https://release-monitoring.org/>
anitya
``distro/package``, where ``distro`` can be a lot of things like "fedora", "arch linux", "gentoo", etc. ``package`` is the package name of the chosen distribution.
anitya_id
The identifier of the project/package in anitya.
Note that either anitya or anitya_id needs to be specified; anitya_id is preferred when both are specified.
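A sketch (the project name is illustrative):

.. code-block:: toml

   [example]
   source = "anitya"
   anitya = "fedora/example"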
Check Android SDK
~~~~~~~~~~~~~~~~~
::
@@ -834,9 +755,6 @@ This enables you to track updates of macOS applications which using `Sparkle fra
sparkle
The url of the sparkle appcast.
release_notes_language
The language of release notes to return when localized release notes are available (defaults to ``en`` for English, the unlocalized release notes are used as a fallback)
Check Pagure
~~~~~~~~~~~~
::
@@ -884,29 +802,6 @@ strip_release
Note that either pkg or srcpkg needs to be specified (but not both) or the item name will be used as pkg.
Check RPM repository
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
::
source = "rpmrepo"
This enables you to check the latest package versions in an arbitrary RPM repository in ``repomd`` format used by package managers such as ``dnf`` (Fedora, RHEL, AlmaLinux etc.) or ``zypper`` (openSUSE) without the need for native RPM tools.
pkg
Name of the RPM package (you can also use ``rpmrepo`` as with other sources, but ``pkg`` is preferred for clarity)
repo
URL of the repository (required, ``repodata/repomd.xml`` should be there)
arch
Architecture of the RPM package (``binary``, ``src``, ``any``, ``x86_64``, ``aarch64``, etc, defaults to ``binary``)
This source supports :ref:`list options`.
.. note::
An additional dependency "lxml" is required.
You can use ``pip install 'nvchecker[rpmrepo]'``.
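A sketch with a hypothetical repository URL (``repodata/repomd.xml`` must exist under it):

.. code-block:: toml

   [gcc]
   source = "rpmrepo"
   pkg = "gcc"
   repo = "https://example.com/repo/x86_64/os/"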
Check Git repository
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
::
@@ -926,19 +821,6 @@ branch
When this source returns tags (``use_commit`` is not true) it supports :ref:`list options`.
Check Mercurial repository
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
::
source = "mercurial"
This enables you to check tags of an arbitrary mercurial (hg) repository.
mercurial
URL of the Mercurial repository.
This source returns tags and supports :ref:`list options`.
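A minimal sketch with a hypothetical repository URL:

.. code-block:: toml

   [example]
   source = "mercurial"
   mercurial = "https://hg.example.org/project"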
Check container registry
~~~~~~~~~~~~~~~~~~~~~~~~
::
@@ -948,9 +830,7 @@ Check container registry
This enables you to check tags of images on a container registry like Docker.
container
The path (and tag) for the container image. For official Docker images, use namespace ``library/`` (e.g. ``library/python``).
If no tag is given, it checks latest available tag (sort by tag name), otherwise, it checks the tag's update time.
The path for the container image. For official Docker images, use namespace ``library/`` (e.g. ``library/python``).
registry
The container registry host. Default: ``docker.io``
@@ -961,23 +841,17 @@ container name while this plugin requires the full name. If the host part is
omitted, use ``docker.io``, and if there is no slash in the path, prepend
``library/`` to the path. Here are some examples:
+-----------------------------------------------------+-----------+---------------------------------+
| Pull command | registry | container |
+=====================================================+===========+=================================+
| docker pull quay.io/prometheus/node-exporter | quay.io | prometheus/node-exporter |
+-----------------------------------------------------+-----------+---------------------------------+
| docker pull quay.io/prometheus/node-exporter:master | quay.io | prometheus/node-exporter:master |
+-----------------------------------------------------+-----------+---------------------------------+
| docker pull openeuler/openeuler | docker.io | openeuler/openeuler |
+-----------------------------------------------------+-----------+---------------------------------+
| docker pull openeuler/openeuler:20.03-lts | docker.io | openeuler/openeuler:20.03-lts |
+-----------------------------------------------------+-----------+---------------------------------+
| docker pull python | docker.io | library/python |
+-----------------------------------------------------+-----------+---------------------------------+
| docker pull python:3.11 | docker.io | library/python:3.11 |
+-----------------------------------------------------+-----------+---------------------------------+
+----------------------------------------------+-----------+--------------------------+
| Pull command | registry | container |
+==============================================+===========+==========================+
| docker pull quay.io/prometheus/node-exporter | quay.io | prometheus/node-exporter |
+----------------------------------------------+-----------+--------------------------+
| docker pull nvidia/cuda | docker.io | nvidia/cuda |
+----------------------------------------------+-----------+--------------------------+
| docker pull python | docker.io | library/python |
+----------------------------------------------+-----------+--------------------------+
If no tag is given, this source returns tags and supports :ref:`list options`.
This source returns tags and supports :ref:`list options`.
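For example, a sketch tracking stable tags of the official Python image on Docker Hub, narrowed with the ``include_regex`` list option:

.. code-block:: toml

   [python]
   source = "container"
   container = "library/python"
   include_regex = '3\.\d+\.\d+'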
Check ALPM database
~~~~~~~~~~~~~~~~~~~
@@ -994,7 +868,7 @@ repo
Name of the package repository in which the package resides. If not provided, nvchecker will use ``repos`` value, see below.
repos
An array of possible repositories in which the package may reside; nvchecker will use the first repository that contains the package. If not provided, ``core``, ``extra`` and ``multilib`` will be used, in that order.
An array of possible repositories in which the package may reside; nvchecker will use the first repository that contains the package. If not provided, ``core``, ``extra``, ``community`` and ``multilib`` will be used, in that order.
dbpath
Path to the ALPM database directory. Default: ``/var/lib/pacman``. You need to update the database yourself.
@@ -1053,47 +927,6 @@ Check `Visual Studio Code Marketplace <https://marketplace.visualstudio.com/vsco
vsmarketplace
The extension's Unique Identifier on marketplace.visualstudio.com/vscode, e.g. ``ritwickdey.LiveServer``.
Check Go packages and modules
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
::
source = "go"
Check `Go packages and modules <https://pkg.go.dev/>`_ for updates.
go
The name of Go package or module, e.g. ``github.com/caddyserver/caddy/v2/cmd``.
Check opam repository
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
::
source = "opam"
This enables you to check the latest package versions in an arbitrary `opam repository <https://opam.ocaml.org/doc/Manual.html#Repositories>`_ without the need for the opam command line tool.
pkg
Name of the opam package
repo
URL of the repository (optional, the default ``https://opam.ocaml.org`` repository is used if not specified)
This source supports :ref:`list options`.
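A minimal sketch (the package name is illustrative; the default repository is used when ``repo`` is omitted):

.. code-block:: toml

   [example]
   source = "opam"
   pkg = "example"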
Check Snapcraft
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
::
source = "snapcraft"
This source allows you to check the latest package versions in the `Snapcraft <https://snapcraft.io>`_ store.
snap
Name of the snap package.
channel
Name of the channel.
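A sketch tracking a hypothetical snap on the ``stable`` channel:

.. code-block:: toml

   [example]
   source = "snapcraft"
   snap = "example"
   channel = "stable"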
Combine others' results
~~~~~~~~~~~~~~~~~~~~~~~
::

@@ -23,9 +23,3 @@ ignore_missing_imports = True
[mypy-tomllib]
ignore_missing_imports = True
[mypy-jq]
ignore_missing_imports = True
[mypy-tomli]
ignore_missing_imports = True

@@ -1,4 +1,4 @@
# MIT licensed
# Copyright (c) 2013-2024 lilydjwg <lilydjwg@gmail.com>, et al.
# Copyright (c) 2013-2023 lilydjwg <lilydjwg@gmail.com>, et al.
__version__ = '2.17dev'
__version__ = '2.12'

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
# MIT licensed
# Copyright (c) 2013-2024 lilydjwg <lilydjwg@gmail.com>, et al.
# Copyright (c) 2013-2022 lilydjwg <lilydjwg@gmail.com>, et al.
from __future__ import annotations
@@ -13,7 +13,7 @@ from pathlib import Path
import structlog
from . import core
from .util import ResultData, RawResult, KeyManager, EntryWaiter
from .util import VersData, RawResult, KeyManager, EntryWaiter
from .ctxvars import proxy as ctx_proxy
logger = structlog.get_logger(logger_name=__name__)
@@ -75,33 +75,26 @@ def main() -> None:
oldvers = core.read_verfile(options.ver_files[0])
else:
oldvers = {}
result_coro = core.process_result(oldvers, result_q, entry_waiter, verbose=bool(args.entry))
result_coro = core.process_result(oldvers, result_q, entry_waiter)
runner_coro = core.run_tasks(futures)
if sys.version_info >= (3, 10):
# Python 3.10 has deprecated asyncio.get_event_loop
results, has_failures = asyncio.run(run(result_coro, runner_coro))
newvers, has_failures = asyncio.run(run(result_coro, runner_coro))
else:
# Python < 3.10 will create an eventloop when asyncio.Queue is initialized
results, has_failures = asyncio.get_event_loop().run_until_complete(run(result_coro, runner_coro))
newvers, has_failures = asyncio.get_event_loop().run_until_complete(run(result_coro, runner_coro))
if options.ver_files is not None:
newverf = options.ver_files[1]
if args.entry:
# don't remove other entries when only one entry is specified on cmdline
vers = core.read_verfile(newverf)
else:
vers = {}
vers.update(results)
core.write_verfile(newverf, vers)
core.write_verfile(options.ver_files[1], newvers)
if args.failures and has_failures:
sys.exit(3)
async def run(
result_coro: Coroutine[None, None, Tuple[ResultData, bool]],
result_coro: Coroutine[None, None, Tuple[VersData, bool]],
runner_coro: Coroutine[None, None, None],
) -> Tuple[ResultData, bool]:
) -> Tuple[VersData, bool]:
result_fu = asyncio.create_task(result_coro)
runner_fu = asyncio.create_task(runner_coro)
await runner_fu

@@ -3,7 +3,7 @@
from .httpclient import session, TemporaryError, HTTPError
from .util import (
Entry, BaseWorker, RawResult, VersionResult, RichResult,
Entry, BaseWorker, RawResult, VersionResult,
AsyncCache, KeyManager, GetVersionError, EntryWaiter,
)
from .sortversion import sort_version_keys

@@ -1,5 +1,5 @@
# MIT licensed
# Copyright (c) 2013-2020, 2024 lilydjwg <lilydjwg@gmail.com>, et al.
# Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.
from __future__ import annotations
@@ -20,7 +20,6 @@ from importlib import import_module
import re
import contextvars
import json
import dataclasses
import structlog
@@ -37,7 +36,7 @@ import platformdirs
from .lib import nicelogger
from . import slogconf
from .util import (
Entry, Entries, KeyManager, RawResult, RichResult, ResultData,
Entry, Entries, KeyManager, RawResult, Result, VersData,
FunctionWorker, GetVersionError,
FileLoadError, EntryWaiter,
)
@@ -77,8 +76,6 @@ def process_common_arguments(args: argparse.Namespace) -> bool:
processors = [
slogconf.exc_info,
slogconf.filter_exc,
slogconf.filter_nones,
slogconf.filter_taskname,
]
logger_factory = None
@@ -113,12 +110,11 @@ def process_common_arguments(args: argparse.Namespace) -> bool:
return True
return False
def safe_overwrite(file: Path, data: Union[bytes, str], *,
def safe_overwrite(fname: str, data: Union[bytes, str], *,
method: str = 'write', mode: str = 'w', encoding: Optional[str] = None) -> None:
# FIXME: directory has no read perm
# FIXME: hard links
resolved_path = file.resolve()
tmpname = str(resolved_path) + '.tmp'
# FIXME: symlinks and hard links
tmpname = fname + '.tmp'
# if not using "with", write can fail without exception
with open(tmpname, mode, encoding=encoding) as f:
getattr(f, method)(data)
@@ -126,9 +122,9 @@ def safe_overwrite(file: Path, data: Union[bytes, str], *,
f.flush()
os.fsync(f.fileno())
# if the above write failed (because disk is full etc), the old data should be kept
os.rename(tmpname, resolved_path)
os.rename(tmpname, fname)
def read_verfile(file: Path) -> ResultData:
def read_verfile(file: Path) -> VersData:
try:
with open(file) as f:
data = f.read()
@@ -144,34 +140,16 @@ def read_verfile(file: Path) -> ResultData:
name, ver = l.rstrip().split(None, 1)
v[name] = ver
if v.get('version') is None:
v = {k: RichResult(version=a) for k, a in v.items()}
elif v['version'] == 2:
v = {k: RichResult(**a) for k, a in v['data'].items()}
else:
raise Exception('unknown verfile version', v['version'])
return v
def write_verfile(file: Path, versions: ResultData) -> None:
d = {
'version': 2,
# sort and indent to make it friendly to human and git
'data': dict(sorted(versions.items())),
}
def write_verfile(file: Path, versions: VersData) -> None:
# sort and indent to make it friendly to human and git
data = json.dumps(
d,
indent = 2,
ensure_ascii = False,
default = json_encode,
dict(sorted(versions.items())),
indent=2,
ensure_ascii=False,
) + '\n'
safe_overwrite(file, data)
def json_encode(obj):
if isinstance(obj, RichResult):
d = {k: v for k, v in dataclasses.asdict(obj).items() if v is not None}
return d
raise TypeError(obj)
safe_overwrite(str(file), data)
class Options(NamedTuple):
ver_files: Optional[Tuple[Path, Path]]
@@ -299,12 +277,13 @@ def substitute_version(
) -> str:
'''
Substitute the version string via defined rules in the configuration file.
See usage.rst#global-options for details.
See README.rst#global-options for details.
'''
prefix = conf.get('prefix')
if prefix:
if version.startswith(prefix):
version = version[len(prefix):]
return version
from_pattern = conf.get('from_pattern')
if from_pattern:
@@ -312,54 +291,44 @@ def substitute_version(
if to_pattern is None:
raise ValueError("from_pattern exists but to_pattern doesn't")
version = re.sub(from_pattern, to_pattern, version)
return re.sub(from_pattern, to_pattern, version)
# No substitution rules found. Just return the original version string.
return version
def apply_list_options(
versions: List[Union[str, RichResult]],
conf: Entry,
name: str,
) -> Optional[Union[str, RichResult]]:
versions: List[str], conf: Entry,
) -> Optional[str]:
pattern = conf.get('include_regex')
if versions and pattern:
if pattern:
re_pat = re.compile(pattern)
versions2 = [x for x in versions
if re_pat.fullmatch(str(x))]
if not versions2:
logger.warning('include_regex matched no versions',
name=name, versions=versions, regex=pattern)
return None
versions = versions2
versions = [x for x in versions
if re_pat.fullmatch(x)]
pattern = conf.get('exclude_regex')
if pattern:
re_pat = re.compile(pattern)
versions = [x for x in versions
if not re_pat.fullmatch(str(x))]
if not re_pat.fullmatch(x)]
ignored = set(conf.get('ignored', '').split())
if ignored:
versions = [x for x in versions
if str(x) not in ignored]
versions = [x for x in versions if x not in ignored]
if not versions:
return None
sort_version_key = sort_version_keys[
conf.get("sort_version_key", "parse_version")]
versions.sort(key=lambda version: sort_version_key(str(version))) # type: ignore
versions.sort(key=sort_version_key) # type: ignore
return versions[-1]
def _process_result(r: RawResult) -> Union[RichResult, Exception]:
def _process_result(r: RawResult) -> Union[Result, Exception]:
version = r.version
conf = r.conf
name = r.name
url = None
revision = None
gitref = None
if isinstance(version, GetVersionError):
kw = version.kwargs
kw['name'] = name
@@ -370,17 +339,7 @@ def _process_result(r: RawResult) -> Union[RichResult, Exception]:
name=r.name, exc_info=r.version)
return version
elif isinstance(version, list):
version_str = apply_list_options(version, conf, name)
if isinstance(version_str, RichResult):
url = version_str.url
gitref = version_str.gitref
revision = version_str.revision
version_str = version_str.version
elif isinstance(version, RichResult):
version_str = version.version
url = version.url
gitref = version.gitref
revision = version.revision
version_str = apply_list_options(version, conf)
else:
version_str = version
@@ -389,12 +348,7 @@ def _process_result(r: RawResult) -> Union[RichResult, Exception]:
try:
version_str = substitute_version(version_str, conf)
return RichResult(
version = version_str,
url = url,
gitref = gitref,
revision = revision,
)
return Result(name, version_str, conf)
except (ValueError, re.error) as e:
logger.exception('error occurred in version substitutions', name=name)
return e
@@ -403,35 +357,19 @@ def _process_result(r: RawResult) -> Union[RichResult, Exception]:
return ValueError('no version returned')
def check_version_update(
oldvers: ResultData,
name: str,
r: RichResult,
verbose: bool,
oldvers: VersData, name: str, version: str,
) -> None:
if old_result := oldvers.get(name):
oldver = old_result.version
oldver = oldvers.get(name, None)
if not oldver or oldver != version:
logger.info('updated', name=name, version=version, old_version=oldver)
else:
oldver = None
if not oldver or oldver != r.version:
logger.info(
'updated',
name = name,
version = r.version,
revision = r.revision,
old_version = oldver,
url = r.url,
)
else:
# provide visible user feedback if it was the only entry
level = logging.INFO if verbose else logging.DEBUG
logger.log(level, 'up-to-date', name=name, version=r.version, url=r.url)
logger.debug('up-to-date', name=name, version=version)
async def process_result(
oldvers: ResultData,
oldvers: VersData,
result_q: Queue[RawResult],
entry_waiter: EntryWaiter,
verbose: bool = False,
) -> Tuple[ResultData, bool]:
) -> Tuple[VersData, bool]:
ret = {}
has_failures = False
try:
@@ -444,13 +382,11 @@ async def process_result(
r1 = e
if isinstance(r1, Exception):
entry_waiter.set_exception(r.name, r1)
# no versions are returned from "apply_list_options"?
logger.error('no-result', name=r.name, error=repr(r1))
has_failures = True
continue
check_version_update(oldvers, r.name, r1, verbose)
entry_waiter.set_result(r.name, r1.version)
ret[r.name] = r1
check_version_update(oldvers, r1.name, r1.version)
entry_waiter.set_result(r1.name, r1.version)
ret[r1.name] = r1.version
except asyncio.CancelledError:
return ret, has_failures

@@ -12,6 +12,7 @@ from .base import BaseSession, TemporaryError, Response, HTTPError
__all__ = ['session']
logger = structlog.get_logger(logger_name=__name__)
connector = aiohttp.TCPConnector(limit=20)
class AiohttpSession(BaseSession):
session = None

@@ -121,4 +121,4 @@ class TemporaryError(BaseHTTPError):
'''A temporary error (e.g. network error) happens.'''
class HTTPError(BaseHTTPError):
'''An HTTP 4xx error happens'''
''' An HTTP 4xx error happens '''

@@ -1,5 +1,5 @@
# MIT licensed
# Copyright (c) 2020-2022,2024 lilydjwg <lilydjwg@gmail.com>, et al.
# Copyright (c) 2020 lilydjwg <lilydjwg@gmail.com>, et al.
from typing import Dict, Optional, Tuple
@@ -34,7 +34,7 @@ class HttpxSession(BaseSession):
client = httpx.AsyncClient(
timeout = httpx.Timeout(self.timeout, pool=None),
http2 = True,
proxy = proxy,
proxies = {'all://': proxy},
verify = verify_cert,
)
self.clients[(proxy, verify_cert)] = client
@@ -49,8 +49,7 @@
method, url, json = json, content = body,
headers = headers,
follow_redirects = follow_redirects,
# httpx checks for None but not ()
params = params or None,
params = params,
)
err_cls: Optional[type] = None
if r.status_code >= 500:

@@ -4,7 +4,6 @@
import json as _json
from urllib.parse import urlencode
from typing import Optional, Dict, Any
import os
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
@@ -18,9 +17,8 @@ from .base import BaseSession, TemporaryError, Response, HTTPError
__all__ = ['session']
HTTP2_AVAILABLE = None if pycurl else False
SSL_CERT_FILE = os.environ.get('SSL_CERT_FILE')
def setup_curl(curl):
def try_use_http2(curl):
global HTTP2_AVAILABLE
if HTTP2_AVAILABLE is None:
try:
@@ -31,10 +29,6 @@ def setup_curl(curl):
elif HTTP2_AVAILABLE:
curl.setopt(pycurl.HTTP_VERSION, 4)
if SSL_CERT_FILE:
curl.setopt_string(pycurl.CAINFO, SSL_CERT_FILE)
curl.setopt_string(pycurl.ACCEPT_ENCODING, "")
class TornadoSession(BaseSession):
def setup(
self,
@@ -74,7 +68,7 @@ class TornadoSession(BaseSession):
kwargs['body'] = body
elif json:
kwargs['body'] = _json.dumps(json)
kwargs['prepare_curl_callback'] = setup_curl
kwargs['prepare_curl_callback'] = try_use_http2
if proxy:
host, port = proxy.rsplit(':', 1)

@@ -71,7 +71,6 @@ class TornadoLogFormatter(logging.Formatter):
'filename', 'exc_info', 'exc_text', 'created', 'funcName',
'processName', 'process', 'msecs', 'relativeCreated', 'thread',
'threadName', 'name', 'levelno', 'msg', 'pathname', 'stack_info',
'taskName',
})
if record.exc_info:

@@ -1,6 +1,6 @@
# vim: se sw=2:
# MIT licensed
# Copyright (c) 2018-2020,2023-2024 lilydjwg <lilydjwg@gmail.com>, et al.
# Copyright (c) 2018 lilydjwg <lilydjwg@gmail.com>, et al.
import logging
import os
@@ -26,9 +26,6 @@ def _console_msg(event):
else:
msg = evt
if 'revision' in event and not event['revision']:
del event['revision']
if 'name' in event:
msg = f"{event['name']}: {msg}"
del event['name']
@@ -42,17 +39,6 @@ def exc_info(logger, level, event):
event['exc_info'] = True
return event
def filter_nones(logger, level, event):
if 'url' in event and event['url'] is None:
del event['url']
return event
def filter_taskname(logger, level, event):
# added in Python 3.12, not useful to us, but appears as a normal KV.
if 'taskName' in event:
del event['taskName']
return event
def filter_exc(logger, level, event):
exc_info = event.get('exc_info')
if not exc_info:

@@ -1,16 +1,13 @@
# vim: se sw=2:
# MIT licensed
# Copyright (c) 2013-2024 lilydjwg <lilydjwg@gmail.com>, et al.
# Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.
import sys
import argparse
import shutil
import structlog
import json
import os.path
from . import core
from .util import RichResult
logger = structlog.get_logger(logger_name=__name__)
@@ -45,11 +42,10 @@ def take() -> None:
if args.all:
oldvers.update(newvers)
else:
name: str
for name in args.names:
if "=" in name:
name, newver = name.split("=")
oldvers[name] = RichResult(version=newver)
oldvers[name] = newver
else:
try:
oldvers[name] = newvers[name]
@@ -64,12 +60,9 @@ def take() -> None:
sys.exit(2)
try:
if os.path.islink(oldverf):
shutil.copy(oldverf, oldverf.with_name(oldverf.name + '~'))
else:
oldverf.rename(
oldverf.with_name(oldverf.name + '~'),
)
oldverf.rename(
oldverf.with_name(oldverf.name + '~'),
)
except FileNotFoundError:
pass
core.write_verfile(oldverf, oldvers)
@@ -108,8 +101,8 @@ def cmp() -> None:
oldverf = opt.ver_files[0]
newverf = opt.ver_files[1]
oldvers = {k: v.version for k, v in core.read_verfile(oldverf).items()}
newvers = {k: v.version for k, v in core.read_verfile(newverf).items()}
oldvers = core.read_verfile(oldverf)
newvers = core.read_verfile(newverf)
differences = []

@@ -3,7 +3,6 @@
from __future__ import annotations
import sys
import asyncio
from asyncio import Queue
from typing import (
@@ -14,8 +13,6 @@ from typing import (
from pathlib import Path
import contextvars
import abc
import netrc
from dataclasses import dataclass
if TYPE_CHECKING:
import tomli as tomllib
@@ -39,40 +36,13 @@ logger = structlog.get_logger(logger_name=__name__)
Entry = Dict[str, Any]
Entry.__doc__ = '''The configuration `dict` for an entry.'''
Entries = Dict[str, Entry]
if sys.version_info[:2] >= (3, 11):
from typing import LiteralString
else:
LiteralString = str
if sys.version_info[:2] >= (3, 10):
@dataclass(kw_only=True)
class RichResult:
version: str
gitref: Optional[str] = None
revision: Optional[str] = None
url: Optional[str] = None
def __str__(self):
return self.version
else:
@dataclass
class RichResult:
version: str
gitref: Optional[str] = None
revision: Optional[str] = None
url: Optional[str] = None
def __str__(self):
return self.version
VersionResult = Union[None, str, RichResult, List[Union[str, RichResult]], Exception]
VersData = Dict[str, str]
VersionResult = Union[None, str, List[str], Exception]
VersionResult.__doc__ = '''The result of a `get_version` check.
* `None` - No version found.
* `str` - A single version string is found.
* `RichResult` - A version string with additional information.
* `List[Union[str, RichResult]]` - Multiple version strings with or without additional information are found. :ref:`list options` will be applied.
* `List[str]` - Multiple version strings are found. :ref:`list options` will be applied.
* `Exception` - An error occurred.
'''
@@ -99,18 +69,10 @@ class KeyManager:
else:
keys = {}
self.keys = keys
try:
netrc_file = netrc.netrc()
netrc_hosts = netrc_file.hosts
except (FileNotFoundError, netrc.NetrcParseError):
netrc_hosts = {}
self.netrc = netrc_hosts
def get_key(self, name: str, legacy_name: Optional[str] = None) -> Optional[str]:
def get_key(self, name: str) -> Optional[str]:
'''Get the named key (token) in the keyfile.'''
keyfile_token = self.keys.get(name) or self.keys.get(legacy_name)
netrc_passwd = (e := self.netrc.get(name)) and e[2]
return keyfile_token or netrc_passwd
return self.keys.get(name)
class EntryWaiter:
def __init__(self) -> None:
@@ -144,7 +106,10 @@ RawResult.name.__doc__ = 'The name (table name) of the entry.'
RawResult.version.__doc__ = 'The result from the check.'
RawResult.conf.__doc__ = 'The entry configuration (table content) of the entry.'
ResultData = Dict[str, RichResult]
class Result(NamedTuple):
name: str
version: str
conf: Entry
class BaseWorker:
'''The base class for defining `Worker` classes for source plugins.
@@ -325,6 +290,6 @@ class GetVersionError(Exception):
:param msg: The error message.
:param kwargs: Arbitrary additional context for the error.
'''
def __init__(self, msg: LiteralString, **kwargs: Any) -> None:
def __init__(self, msg: str, **kwargs: Any) -> None:
self.msg = msg
self.kwargs = kwargs

@@ -20,7 +20,7 @@ async def get_version(name, conf, *, cache, **kwargs):
repo = conf.get('repo')
if repo is None:
repos = conf.get('repos') or ['core', 'extra', 'multilib']
repos = conf.get('repos') or ['core', 'extra', 'community', 'multilib']
else:
repos = [repo]

@@ -1,17 +1,10 @@
# MIT licensed
# Copyright (c) 2017-2020 lilydjwg <lilydjwg@gmail.com>, et al.
from nvchecker.api import RichResult
URL = 'https://release-monitoring.org/api/project/{pkg}'
async def get_version(name, conf, *, cache, **kwargs):
pkg = conf.get('anitya_id')
if pkg is None:
pkg = conf.get('anitya')
pkg = conf.get('anitya')
url = URL.format(pkg = pkg)
data = await cache.get_json(url)
return RichResult(
version = data['version'],
url = f'https://release-monitoring.org/project/{data["id"]}/',
)
return data['version']

@@ -11,8 +11,8 @@ import functools
from collections import defaultdict
from nvchecker.api import (
session, GetVersionError, VersionResult,
RichResult, Entry, AsyncCache, KeyManager,
session, GetVersionError,
VersionResult, Entry, AsyncCache, KeyManager,
)
APT_RELEASE_URL = "%s/dists/%s/Release"
@@ -92,13 +92,12 @@ async def get_url(url: str) -> str:
None, _decompress_data,
url, data)
async def parse_packages(key: Tuple[AsyncCache, str]) -> Tuple[Dict[str, str], Dict[str, str], Dict[str, str]]:
async def parse_packages(key: Tuple[AsyncCache, str]) -> Tuple[Dict[str, str], Dict[str, str]]:
cache, url = key
apt_packages = await cache.get(url, get_url) # type: ignore
pkg_map = defaultdict(list)
srcpkg_map = defaultdict(list)
pkg_to_src_map = defaultdict(list)
pkg = None
srcpkg = None
@@ -111,7 +110,6 @@ async def parse_packages(key: Tuple[AsyncCache, str]) -> Tuple[Dict[str, str], D
version = line[9:]
if pkg is not None:
pkg_map[pkg].append(version)
pkg_to_src_map["%s/%s" % (pkg, version)] = srcpkg if srcpkg is not None else pkg
if srcpkg is not None:
srcpkg_map[srcpkg].append(version)
pkg = srcpkg = None
@@ -120,10 +118,8 @@ async def parse_packages(key: Tuple[AsyncCache, str]) -> Tuple[Dict[str, str], D
for pkg, vs in pkg_map.items()}
srcpkg_map_max = {pkg: max(vs, key=functools.cmp_to_key(compare_version))
for pkg, vs in srcpkg_map.items()}
pkg_to_src_map_max = {pkg: pkg_to_src_map["%s/%s" % (pkg, vs)]
for pkg, vs in pkg_map_max.items()}
return pkg_map_max, srcpkg_map_max, pkg_to_src_map_max
return pkg_map_max, srcpkg_map_max
async def get_version(
name: str, conf: Entry, *,
@@ -152,38 +148,16 @@ async def get_version(
else:
raise GetVersionError('Packages file not found in APT repository')
pkg_map, srcpkg_map, pkg_to_src_map = await cache.get(
pkg_map, srcpkg_map = await cache.get(
(cache, APT_PACKAGES_URL % (mirror, suite, packages_path)), parse_packages) # type: ignore
if pkg and pkg in pkg_map:
version = pkg_map[pkg]
changelog_name = pkg_to_src_map[pkg]
elif srcpkg and srcpkg in srcpkg_map:
version = srcpkg_map[srcpkg]
changelog_name = srcpkg
else:
raise GetVersionError('package not found in APT repository')
# Get Changelogs field from the Release file
changelogs_url = None
for line in apt_release.split('\n'):
if line.startswith('Changelogs: '):
changelogs_url = line[12:]
break
# Build the changelog URL (see https://wiki.debian.org/DebianRepository/Format#Changelogs for spec)
changelog = None
if changelogs_url is not None and changelogs_url != 'no':
changelog_section = changelog_name[:4] if changelog_name.startswith('lib') else changelog_name[:1]
changelog = changelogs_url.replace('@CHANGEPATH@', f'{repo}/{changelog_section}/{changelog_name}/{changelog_name}_{version}')
if strip_release:
version = version.split("-")[0]
if changelog is not None:
return RichResult(
version = version,
url = changelog,
)
else:
return version
return version

@@ -1,9 +1,9 @@
# MIT licensed
# Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.
from nvchecker.api import session, RichResult, GetVersionError
from nvchecker.api import session, GetVersionError
URL = 'https://archlinux.org/packages/search/json/'
URL = 'https://www.archlinux.org/packages/search/json/'
async def request(pkg):
res = await session.get(URL, params={"name": pkg})
@@ -31,7 +31,4 @@ async def get_version(name, conf, *, cache, **kwargs):
else:
version = r['pkgver'] + '-' + r['pkgrel']
return RichResult(
version = version,
url = f'https://archlinux.org/packages/{r["repo"]}/{r["arch"]}/{r["pkgname"]}/',
)
return version

@@ -1,12 +1,12 @@
# MIT licensed
# Copyright (c) 2013-2020,2024 lilydjwg <lilydjwg@gmail.com>, et al.
# Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.
from datetime import datetime, timezone
from datetime import datetime
import asyncio
from typing import Iterable, Dict, List, Tuple, Any, Optional
from nvchecker.api import (
session, GetVersionError, VersionResult, RichResult,
session, GetVersionError, VersionResult,
Entry, BaseWorker, RawResult,
)
@@ -95,15 +95,11 @@ async def _run_batch_impl(
version = result['Version']
if use_last_modified:
dt = datetime.fromtimestamp(result['LastModified'], timezone.utc)
version += '-' + dt.strftime('%Y%m%d%H%M%S')
version += '-' + datetime.utcfromtimestamp(result['LastModified']).strftime('%Y%m%d%H%M%S')
if strip_release and '-' in version:
version = version.rsplit('-', 1)[0]
ret[name] = RichResult(
version = version,
url = f'https://aur.archlinux.org/packages/{name}',
)
ret[name] = version
return ret

@@ -1,10 +1,10 @@
# MIT licensed
# Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.
from typing import Any, List, Union
from typing import Any, List
from urllib.parse import urlencode
from nvchecker.api import VersionResult, RichResult, Entry, AsyncCache
from nvchecker.api import VersionResult, Entry, AsyncCache
# doc: https://developer.atlassian.com/cloud/bitbucket/rest/api-group-commits/#api-repositories-workspace-repo-slug-commits-get
BITBUCKET_URL = 'https://bitbucket.org/api/2.0/repositories/%s/commits/%s'
@@ -22,7 +22,7 @@ async def get_version(
use_sorted_tags = conf.get('use_sorted_tags', False)
if use_sorted_tags or use_max_tag:
parameters = {'fields': 'values.name,values.links.html.href,next'}
parameters = {'fields': 'values.name,next'}
if use_sorted_tags:
parameters['sort'] = conf.get('sort', '-target.date')
@@ -33,41 +33,37 @@ async def get_version(
url = BITBUCKET_MAX_TAG % repo
url += '?' + urlencode(parameters)
return await _get_tags(url, max_page=1, cache=cache)
version = await _get_tags(url, max_page=1, cache=cache)
elif use_max_tag:
url = BITBUCKET_MAX_TAG % repo
url += '?' + urlencode(parameters)
max_page = conf.get('max_page', 3)
return await _get_tags(url, max_page=max_page, cache=cache)
version = await _get_tags(url, max_page=max_page, cache=cache)
else:
url = BITBUCKET_URL % (repo, br)
data = await cache.get_json(url)
return RichResult(
version = data['values'][0]['date'].split('T', 1)[0].replace('-', ''),
url = data['values'][0]['links']['html']['href'],
)
version = data['values'][0]['date'].split('T', 1)[0].replace('-', '')
return version
async def _get_tags(
url: str, *,
max_page: int,
cache: AsyncCache,
) -> VersionResult:
ret: List[Union[str, RichResult]] = []
) -> List[str]:
ret: List[str] = []
for _ in range(max_page):
data = await cache.get_json(url)
ret.extend([
RichResult(
version = tag['name'],
url = tag['links']['html']['href'],
) for tag in data['values']
])
ret.extend(x['name'] for x in data['values'])
if 'next' in data:
url = data['next']
else:
break
return ret

@@ -4,7 +4,6 @@
from typing import Dict, List, NamedTuple, Optional, Tuple
from urllib.request import parse_http_list
from urllib.parse import urljoin
import json
from nvchecker.api import session, HTTPError
@@ -58,7 +57,15 @@ async def get_registry_auth_info(registry_host: str) -> AuthInfo:
async def get_container_tags(info: Tuple[str, str, AuthInfo]) -> List[str]:
image_path, registry_host, auth_info = info
token = await get_auth_token(auth_info, image_path)
auth_params = {
'scope': f'repository:{image_path}:pull',
}
if auth_info.service:
auth_params['service'] = auth_info.service
res = await session.get(auth_info.realm, params=auth_params)
token = res.json()['token']
tags = []
url = f'https://{registry_host}/v2/{image_path}/tags/list'
@@ -76,18 +83,6 @@ async def get_container_tags(info: Tuple[str, str, AuthInfo]) -> List[str]:
return tags
async def get_auth_token(auth_info, image_path):
auth_params = {
'scope': f'repository:{image_path}:pull',
}
if auth_info.service:
auth_params['service'] = auth_info.service
res = await session.get(auth_info.realm, params=auth_params)
token = res.json()['token']
return token
def parse_next_link(value: str) -> str:
ending = '>; rel="next"'
if value.endswith(ending):
@@ -95,70 +90,13 @@ def parse_next_link(value: str) -> str:
else:
raise ValueError(value)
async def get_container_tag_update_time(info: Tuple[str, str, str, AuthInfo]):
'''
Find the update time of a container tag.
In fact, it's the creation time of the image ID referred to by the tag. The tag itself does not have an update time.
'''
image_path, image_tag, registry_host, auth_info = info
token = await get_auth_token(auth_info, image_path)
# HTTP headers
headers = {
'Authorization': f'Bearer {token}',
# Prefer Image Manifest Version 2, Schema 2: https://distribution.github.io/distribution/spec/manifest-v2-2/
'Accept': ', '.join([
'application/vnd.oci.image.manifest.v1+json',
'application/vnd.oci.image.index.v1+json',
'application/vnd.docker.distribution.manifest.v2+json',
'application/vnd.docker.distribution.manifest.list.v2+json',
'application/json',
]),
}
# Get tag manifest
url = f'https://{registry_host}/v2/{image_path}/manifests/{image_tag}'
res = await session.get(url, headers=headers)
data = res.json()
# Schema 1 returns the creation time in the response
if data['schemaVersion'] == 1:
return json.loads(data['history'][0]['v1Compatibility'])['created']
# For schema 2, we have to fetch the config's blob
# For multi-arch images, multiple manifests are bounded with the same tag. We should choose one and then request
# the manifest's detail
if data.get('manifests'):
# It's quite hard to find the manifest matching the current CPU architecture and system.
# For now we just choose the first and it should probably work for most cases
image_digest = data['manifests'][0]['digest']
url = f'https://{registry_host}/v2/{image_path}/manifests/{image_digest}'
res = await session.get(url, headers=headers)
data = res.json()
digest = data['config']['digest']
url = f'https://{registry_host}/v2/{image_path}/blobs/{digest}'
res = await session.get(url, headers=headers)
data = res.json()
return data['created']
async def get_version(name, conf, *, cache, **kwargs):
image_path = conf.get('container', name)
image_tag = None
# image tag is optional
if ':' in image_path:
image_path, image_tag = image_path.split(':', 1)
registry_host = conf.get('registry', 'docker.io')
if registry_host == 'docker.io':
registry_host = 'registry-1.docker.io'
auth_info = await cache.get(registry_host, get_registry_auth_info)
# if a tag is given, return the tag's update time, otherwise return the image's tag list
if image_tag:
key = image_path, image_tag, registry_host, auth_info
return await cache.get(key, get_container_tag_update_time)
key = image_path, registry_host, auth_info
return await cache.get(key, get_container_tags)

@@ -1,15 +1,11 @@
# MIT licensed
# Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.
from nvchecker.api import RichResult
# Using metacpan
CPAN_URL = 'https://fastapi.metacpan.org/release/%s'
async def get_version(name, conf, *, cache, **kwargs):
key = conf.get('cpan', name)
data = await cache.get_json(CPAN_URL % key)
return RichResult(
version = str(data['version']),
url = f'https://metacpan.org/release/{data["author"]}/{data["name"]}',
)
return str(data['version'])

@@ -1,7 +1,7 @@
# MIT licensed
# Copyright (c) 2022 Pekka Ristola <pekkarr [at] protonmail [dot] com>, et al.
from nvchecker.api import session, RichResult, GetVersionError
from nvchecker.api import session, GetVersionError
CRAN_URL = 'https://cran.r-project.org/package=%s/DESCRIPTION'
VERSION_FIELD = 'Version: '
@@ -23,7 +23,4 @@ async def get_version(name, conf, *, cache, **kwargs):
else:
raise GetVersionError('Invalid DESCRIPTION file')
return RichResult(
version = version,
url = f'https://cran.r-project.org/web/packages/{package}/',
)
return version

View file

@ -1,40 +1,10 @@
# MIT licensed
# Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.
import re
import structlog
from nvchecker.api import RichResult
logger = structlog.get_logger(logger_name=__name__)
API_URL = 'https://crates.io/api/v1/crates/%s'
# https://semver.org/#is-there-a-suggested-regular-expression-regex-to-check-a-semver-string
VERSION_PATTERN = r'^(?P<major>0|[1-9]\d*)\.(?P<minor>0|[1-9]\d*)\.(?P<patch>0|[1-9]\d*)(?:-(?P<prerelease>(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+(?P<buildmetadata>[0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$'
async def get_version(name, conf, *, cache, **kwargs):
name = conf.get('cratesio') or name
use_pre_release = conf.get('use_pre_release', False)
data = await cache.get_json(API_URL % name)
results = []
for v in data['versions']:
if v['yanked']:
continue
version = v['num']
match = re.fullmatch(VERSION_PATTERN, version)
if match is None:
logger.warning('ignoring invalid version', version=version)
continue
if not use_pre_release and match.group('prerelease'):
continue
results.append(
RichResult(
version=version,
url=f'https://crates.io/crates/{name}/{version}',
)
)
return results
version = [v['num'] for v in data['versions'] if not v['yanked']][0]
return version

View file

@ -2,7 +2,7 @@
# Copyright (c) 2020 lilydjwg <lilydjwg@gmail.com>, et al.
# Copyright (c) 2017 Felix Yan <felixonmars@archlinux.org>, et al.
from nvchecker.api import RichResult, GetVersionError
from nvchecker.api import GetVersionError
URL = 'https://sources.debian.org/api/src/%(pkgname)s/?suite=%(suite)s'
@ -22,7 +22,4 @@ async def get_version(name, conf, *, cache, **kwargs):
else:
version = r['version']
return RichResult(
version = version,
url = f'https://sources.debian.org/src/{data["package"]}/{r["version"]}/',
)
return version

View file

@ -1,16 +1,9 @@
# MIT licensed
# Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.
from nvchecker.api import RichResult
GEMS_URL = 'https://rubygems.org/api/v1/versions/%s.json'
async def get_version(name, conf, *, cache, **kwargs):
key = conf.get('gems', name)
data = await cache.get_json(GEMS_URL % key)
return [
RichResult(
version = item['number'],
url = f'https://rubygems.org/gems/{key}/versions/{item["number"]}',
) for item in data
]
return [item['number'] for item in data]

View file

@ -3,8 +3,6 @@
from .cmd import run_cmd
from nvchecker.api import RichResult
async def get_version(
name, conf, *, cache, keymanager=None
):
@ -15,27 +13,13 @@ async def get_version(
ref = conf.get('branch')
if ref is None:
ref = 'HEAD'
gitref = None
else:
ref = 'refs/heads/' + ref
gitref = ref
cmd = f"git ls-remote {git} {ref}"
data = await cache.get(cmd, run_cmd)
version = data.split(None, 1)[0]
return RichResult(
version = version,
revision = version,
gitref = gitref,
)
return data.split(None, 1)[0]
else:
cmd = f"git ls-remote --tags --refs {git}"
data = await cache.get(cmd, run_cmd)
versions = []
for line in data.splitlines():
revision, version = line.split("\trefs/tags/", 1)
versions.append(RichResult(
version = version,
revision = revision,
gitref = f"refs/tags/{version}",
))
versions = [line.split("refs/tags/")[1] for line in data.splitlines()]
return versions
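For reference, what the split on "\trefs/tags/" above produces on sample ls-remote output (hard-coded sample, not a live call):

sample = 'abc123\trefs/tags/v1.0\ndef456\trefs/tags/v1.1'
for line in sample.splitlines():
  revision, version = line.split('\trefs/tags/', 1)
  # -> ('abc123', 'v1.0'), then ('def456', 'v1.1')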

View file

@ -9,8 +9,7 @@ GITEA_URL = 'https://%s/api/v1/repos/%s/commits'
GITEA_MAX_TAG = 'https://%s/api/v1/repos/%s/tags'
from nvchecker.api import (
VersionResult, RichResult, Entry,
AsyncCache, KeyManager,
VersionResult, Entry, AsyncCache, KeyManager,
)
async def get_version(
@ -33,7 +32,8 @@ async def get_version(
token = conf.get('token')
# Load token from the keymanager
if token is None:
token = keymanager.get_key(host.lower(), 'gitea_' + host.lower())
key_name = 'gitea_' + host.lower()
token = keymanager.get_key(key_name)
# Set private token if token exists.
headers = {}
@ -42,16 +42,7 @@ async def get_version(
data = await cache.get_json(url, headers = headers)
if use_max_tag:
return [
RichResult(
version = tag['name'],
revision = tag['id'],
url = f'https://{host}/{conf["gitea"]}/releases/tag/{tag["name"]}',
) for tag in data
]
version = [tag["name"] for tag in data]
else:
return RichResult(
version = data[0]['commit']['committer']['date'],
revision = data[0]['sha'],
url = data[0]['html_url'],
)
version = data[0]['commit']['committer']['date'].split('T', 1)[0].replace('-', '')
return version
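For reference, a hypothetical keyfile entry matching the lookup above — master asks for both the host and the 'gitea_' + host name, v2.12 only the latter:

[keys]
"gitea_gitea.com" = "example-token"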

View file

@ -1,52 +1,30 @@
# MIT licensed
# Copyright (c) 2013-2020, 2024 lilydjwg <lilydjwg@gmail.com>, et al.
# Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.
import time
from urllib.parse import urlencode
from typing import List, Tuple, Union, Optional
import asyncio
from typing import Tuple
import structlog
from nvchecker.api import (
VersionResult, Entry, AsyncCache, KeyManager,
HTTPError, session, RichResult, GetVersionError,
TemporaryError, session, GetVersionError,
)
logger = structlog.get_logger(logger_name=__name__)
ALLOW_REQUEST = None
RATE_LIMITED_ERROR = False
GITHUB_URL = 'https://api.%s/repos/%s/commits'
GITHUB_LATEST_RELEASE = 'https://api.%s/repos/%s/releases/latest'
GITHUB_URL = 'https://api.github.com/repos/%s/commits'
GITHUB_LATEST_RELEASE = 'https://api.github.com/repos/%s/releases/latest'
# https://developer.github.com/v3/git/refs/#get-all-references
GITHUB_MAX_TAG = 'https://api.%s/repos/%s/git/refs/tags'
GITHUB_MAX_RELEASE = 'https://api.%s/repos/%s/releases'
GITHUB_GRAPHQL_URL = 'https://api.%s/graphql'
GITHUB_MAX_TAG = 'https://api.github.com/repos/%s/git/refs/tags'
GITHUB_GRAPHQL_URL = 'https://api.github.com/graphql'
async def get_version(name, conf, **kwargs):
global RATE_LIMITED_ERROR, ALLOW_REQUEST
if RATE_LIMITED_ERROR:
raise RuntimeError('rate limited')
if ALLOW_REQUEST is None:
ALLOW_REQUEST = asyncio.Event()
ALLOW_REQUEST.set()
for _ in range(2): # retry once
try:
await ALLOW_REQUEST.wait()
return await get_version_real(name, conf, **kwargs)
except HTTPError as e:
if e.code in [403, 429]:
if n := check_ratelimit(e, name):
ALLOW_REQUEST.clear()
await asyncio.sleep(n+1)
ALLOW_REQUEST.set()
continue
RATE_LIMITED_ERROR = True
raise
try:
return await get_version_real(name, conf, **kwargs)
except TemporaryError as e:
check_ratelimit(e, name)
QUERY_LATEST_TAG = '''
{{
@ -57,9 +35,6 @@ QUERY_LATEST_TAG = '''
edges {{
node {{
name
target {{
oid
}}
}}
}}
}}
@ -67,29 +42,8 @@ QUERY_LATEST_TAG = '''
}}
'''
QUERY_LATEST_RELEASE_WITH_PRERELEASES = '''
{{
repository(name: "{name}", owner: "{owner}") {{
releases(first: 1, orderBy: {{field: CREATED_AT, direction: DESC}}) {{
edges {{
node {{
name
url
tag {{
name
}}
tagCommit {{
oid
}}
}}
}}
}}
}}
}}
'''
async def get_latest_tag(key: Tuple[str, str, str, str]) -> RichResult:
host, repo, query, token = key
async def get_latest_tag(key: Tuple[str, str, str]) -> str:
repo, query, token = key
owner, reponame = repo.split('/')
headers = {
'Authorization': f'bearer {token}',
@ -102,7 +56,7 @@ async def get_latest_tag(key: Tuple[str, str, str, str]) -> RichResult:
)
res = await session.post(
GITHUB_GRAPHQL_URL % host,
GITHUB_GRAPHQL_URL,
headers = headers,
json = {'query': q},
)
@ -112,50 +66,7 @@ async def get_latest_tag(key: Tuple[str, str, str, str]) -> RichResult:
if not refs:
raise GetVersionError('no tag found')
version = refs[0]['node']['name']
revision = refs[0]['node']['target']['oid']
return RichResult(
version = version,
gitref = f"refs/tags/{version}",
revision = revision,
url = f'https://github.com/{repo}/releases/tag/{version}',
)
async def get_latest_release_with_prereleases(key: Tuple[str, str, str, str]) -> RichResult:
host, repo, token, use_release_name = key
owner, reponame = repo.split('/')
headers = {
'Authorization': f'bearer {token}',
'Content-Type': 'application/json',
}
q = QUERY_LATEST_RELEASE_WITH_PRERELEASES.format(
owner = owner,
name = reponame,
)
res = await session.post(
GITHUB_GRAPHQL_URL % host,
headers = headers,
json = {'query': q},
)
j = res.json()
refs = j['data']['repository']['releases']['edges']
if not refs:
raise GetVersionError('no release found')
tag_name = refs[0]['node']['tag']['name']
if use_release_name:
version = refs[0]['node']['name']
else:
version = tag_name
return RichResult(
version = version,
gitref = f"refs/tags/{tag_name}",
revision = refs[0]['node']['tagCommit']['oid'],
url = refs[0]['node']['url'],
)
return refs[0]['node']['name']
async def get_version_real(
name: str, conf: Entry, *,
@ -163,13 +74,12 @@ async def get_version_real(
**kwargs,
) -> VersionResult:
repo = conf['github']
host = conf.get('host', "github.com")
# Load token from config
token = conf.get('token')
# Load token from the keymanager
if token is None:
token = keymanager.get_key(host.lower(), 'github')
token = keymanager.get_key('github')
use_latest_tag = conf.get('use_latest_tag', False)
if use_latest_tag:
@ -177,31 +87,18 @@ async def get_version_real(
raise GetVersionError('token not given but it is required')
query = conf.get('query', '')
return await cache.get((host, repo, query, token), get_latest_tag) # type: ignore
use_latest_release = conf.get('use_latest_release', False)
include_prereleases = conf.get('include_prereleases', False)
use_release_name = conf.get('use_release_name', False)
if use_latest_release and include_prereleases:
if not token:
raise GetVersionError('token not given but it is required')
return await cache.get(
(host, repo, token, use_release_name),
get_latest_release_with_prereleases) # type: ignore
return await cache.get((repo, query, token), get_latest_tag) # type: ignore
br = conf.get('branch')
path = conf.get('path')
use_latest_release = conf.get('use_latest_release', False)
use_max_tag = conf.get('use_max_tag', False)
use_max_release = conf.get('use_max_release', False)
if use_latest_release:
url = GITHUB_LATEST_RELEASE % (host, repo)
url = GITHUB_LATEST_RELEASE % repo
elif use_max_tag:
url = GITHUB_MAX_TAG % (host, repo)
elif use_max_release:
url = GITHUB_MAX_RELEASE % (host, repo)
url = GITHUB_MAX_TAG % repo
else:
url = GITHUB_URL % (host, repo)
url = GITHUB_URL % repo
parameters = {}
if br:
parameters['sha'] = br
@ -217,62 +114,27 @@ async def get_version_real(
data = await cache.get_json(url, headers = headers)
if use_max_tag:
tags: List[Union[str, RichResult]] = [
RichResult(
version = ref['ref'].split('/', 2)[-1],
gitref = ref['ref'],
revision = ref['object']['sha'],
url = f'https://github.com/{repo}/releases/tag/{ref["ref"].split("/", 2)[-1]}',
) for ref in data
]
tags = [ref['ref'].split('/', 2)[-1] for ref in data]
if not tags:
raise GetVersionError('No tag found in upstream repository.')
return tags
if use_max_release:
releases: List[Union[str, RichResult]] = [
RichResult(
version = ref['name'] if use_release_name else ref['tag_name'],
gitref = f"refs/tags/{ref['tag_name']}",
url = ref['html_url'],
) for ref in data if include_prereleases or not ref['prerelease']
]
if not releases:
raise GetVersionError('No release found in upstream repository.')
return releases
if use_latest_release:
if 'tag_name' not in data:
raise GetVersionError('No release found in upstream repository.')
if use_release_name:
version = data['name']
else:
version = data['tag_name']
return RichResult(
version = version,
gitref = f"refs/tags/{data['tag_name']}",
url = data['html_url'],
)
version = data['tag_name']
else:
return RichResult(
# YYYYMMDD.HHMMSS
version = data[0]['commit']['committer']['date'].rstrip('Z').replace('-', '').replace(':', '').replace('T', '.'),
revision = data[0]['sha'],
url = data[0]['html_url'],
)
# YYYYMMDD.HHMMSS
version = data[0]['commit']['committer']['date'] \
.rstrip('Z').replace('-', '').replace(':', '').replace('T', '.')
def check_ratelimit(exc: HTTPError, name: str) -> Optional[int]:
return version
def check_ratelimit(exc, name):
res = exc.response
if not res:
raise exc
if v := res.headers.get('retry-after'):
n = int(v)
logger.warning('retry-after', n=n)
return n
raise
# default -1 is used to re-raise the exception
n = int(res.headers.get('X-RateLimit-Remaining', -1))
@ -282,6 +144,5 @@ def check_ratelimit(exc: HTTPError, name: str) -> Optional[int]:
'Or get an API token to increase the allowance if not yet',
name = name,
reset = reset)
return None
raise exc
else:
raise
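The retry logic on master funnels all requests through one module-level asyncio.Event. A stripped-down sketch of that gate pattern, with generic names standing in for the plugin's internals:

import asyncio

gate = asyncio.Event()
gate.set()

async def call_rate_limited(fn, retry_after: int):
  for attempt in range(2):  # retry once, as above
    await gate.wait()       # blocked while another caller backs off
    try:
      return await fn()
    except RuntimeError:    # stand-in for HTTPError 403/429
      if attempt == 0:
        gate.clear()        # pause all other callers
        await asyncio.sleep(retry_after + 1)
        gate.set()
      else:
        raise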

View file

@ -6,8 +6,8 @@ import urllib.parse
import structlog
from nvchecker.api import (
VersionResult, RichResult, Entry,
AsyncCache, KeyManager, TemporaryError,
VersionResult, Entry, AsyncCache, KeyManager,
TemporaryError,
)
GITLAB_URL = 'https://%s/api/v4/projects/%s/repository/commits'
@ -42,7 +42,8 @@ async def get_version_real(
token = conf.get('token')
# Load token from the keymanager
if token is None:
token = keymanager.get_key(host.lower(), 'gitlab_' + host.lower())
key_name = 'gitlab_' + host.lower()
token = keymanager.get_key(key_name)
# Set private token if token exists.
headers = {}
@ -51,19 +52,10 @@ async def get_version_real(
data = await cache.get_json(url, headers = headers)
if use_max_tag:
return [
RichResult(
version = tag['name'],
revision = tag['commit']['id'],
url = f'https://{host}/{conf["gitlab"]}/-/tags/{tag["name"]}',
) for tag in data
]
version = [tag["name"] for tag in data]
else:
return RichResult(
version = data[0]['created_at'].split('T', 1)[0].replace('-', ''),
revision = data[0]['id'],
url = data[0]['web_url'],
)
version = data[0]['created_at'].split('T', 1)[0].replace('-', '')
return version
def check_ratelimit(exc, name):
res = exc.response

View file

@ -1,40 +0,0 @@
# MIT licensed
# Copyright (c) 2024 bgme <i@bgme.me>.
from lxml import html
from nvchecker.api import (
RichResult, Entry, AsyncCache, KeyManager,
session, GetVersionError,
)
GO_PKG_URL = 'https://pkg.go.dev/{pkg}?tab=versions'
GO_PKG_VERSION_URL = 'https://pkg.go.dev/{pkg}@{version}'
async def get_version(
name: str, conf: Entry, *,
cache: AsyncCache, keymanager: KeyManager,
**kwargs,
) -> RichResult:
key = tuple(sorted(conf.items()))
return await cache.get(key, get_version_impl)
async def get_version_impl(info) -> RichResult:
conf = dict(info)
pkg_name = conf.get('go')
url = GO_PKG_URL.format(pkg=pkg_name)
res = await session.get(url)
doc = html.fromstring(res.body.decode())
elements = doc.xpath("//div[@class='Version-tag']/a/text()")
try:
version = elements[0] # type: ignore
return RichResult(
version = version, # type: ignore
url = GO_PKG_VERSION_URL.format(pkg=pkg_name, version=version),
)
except IndexError:
raise GetVersionError("parse error", pkg_name=pkg_name)

View file

@ -1,15 +1,10 @@
# MIT licensed
# Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.
from nvchecker.api import RichResult
HACKAGE_URL = 'https://hackage.haskell.org/package/%s/preferred.json'
async def get_version(name, conf, *, cache, **kwargs):
key = conf.get('hackage', name)
data = await cache.get_json(HACKAGE_URL % key)
version = data['normal-version'][0]
return RichResult(
version = version,
url = f'https://hackage.haskell.org/package/{key}-{version}',
)
return data['normal-version'][0]

View file

@ -29,11 +29,7 @@ async def get_version_impl(info):
header_value = res.headers.get(header)
if not header_value:
raise GetVersionError(
'header not found or is empty',
header = header,
value = header_value,
)
raise GetVersionError('header %s not found or is empty' % header)
try:
version = regex.findall(header_value)

View file

@ -1,42 +0,0 @@
# MIT licensed
# Copyright (c) 2024 Rocket Aaron <i@rocka.me>, et al.
import json
import jq
from nvchecker.api import session, GetVersionError
async def get_version(name, conf, *, cache, **kwargs):
key = tuple(sorted(conf.items()))
return await cache.get(key, get_version_impl)
async def get_version_impl(info):
conf = dict(info)
try:
program = jq.compile(conf.get('filter', '.'))
except ValueError as e:
raise GetVersionError('bad jq filter', exc_info=e)
data = conf.get('post_data')
if data is None:
res = await session.get(conf['url'])
else:
res = await session.post(conf['url'], body = data, headers = {
'Content-Type': conf.get('post_data_type', 'application/json')
})
try:
obj = json.loads(res.body)
except json.decoder.JSONDecodeError as e:
raise GetVersionError('bad json string', exc_info=e)
try:
version = program.input(obj).all()
if version == [None] and not conf.get('missing_ok', False):
raise GetVersionError('version string not found.')
version = [str(v) for v in version]
except ValueError as e:
raise GetVersionError('failed to filter json', exc_info=e)
return version
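A configuration entry exercising this source, mirroring the jq tests further below (the filter value is illustrative):

[nvchecker-git]
source = "jq"
url = "https://aur.archlinux.org/rpc/v5/info?arg[]=nvchecker-git"
filter = ".results[0].PackageBase"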

View file

@ -1,20 +0,0 @@
# MIT Licensed
# Copyright (c) 2024 Bert Peters <bertptrs@archlinux.org>, et al.
from __future__ import annotations
from nvchecker.api import AsyncCache, Entry, RichResult
PROJECT_INFO_URL = "https://api.launchpad.net/1.0/{launchpad}"
async def get_version(name: str, conf: Entry, *, cache: AsyncCache, **kwargs):
launchpad = conf["launchpad"]
project_data = await cache.get_json(PROJECT_INFO_URL.format(launchpad=launchpad))
data = await cache.get_json(project_data['releases_collection_link'])
return [
RichResult(version=entry["version"], url=entry["web_link"])
for entry in data["entries"]
]

View file

@ -1,10 +0,0 @@
# MIT licensed
# Copyright (c) 2020 Felix Yan <felixonmars@archlinux.org>, et al.
async def get_version(name, conf, *, cache, **kwargs):
url = conf['mercurial'] + '/json-tags'
data = await cache.get_json(url)
version = [tag['tag'] for tag in data['tags']]
return version

View file

@ -3,7 +3,7 @@
import json
import re
from nvchecker.api import session, RichResult
from nvchecker.api import session
NPM_URL = 'https://registry.npmjs.org/%s'
@ -26,13 +26,4 @@ async def get_version(name, conf, *, cache, **kwargs):
data = await cache.get(NPM_URL % key, get_first_1k)
dist_tags = json.loads(re.search(b'"dist-tags":({.*?})', data).group(1))
version = dist_tags['latest']
# There is no standardised URL scheme, so we only return a URL for the default registry
if NPM_URL.startswith('https://registry.npmjs.org/'):
return RichResult(
version = version,
url = f'https://www.npmjs.com/package/{key}/v/{version}',
)
else:
return version
return dist_tags['latest']
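A worked example of the dist-tags extraction above, on hard-coded sample bytes rather than a live registry response:

import json, re
data = b'{"name":"example","dist-tags":{"latest":"1.2.3"},"versions":{}}'
dist_tags = json.loads(re.search(b'"dist-tags":({.*?})', data).group(1))
assert dist_tags['latest'] == '1.2.3'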

View file

@ -1,71 +0,0 @@
# MIT licensed
# Copyright (c) 2024 Daniel Peukert <daniel@peukert.cc>, et al.
import asyncio
from io import BytesIO
import tarfile
from typing import List
from nvchecker.api import (
session, VersionResult,
Entry, AsyncCache,
KeyManager, RichResult
)
OPAM_REPO_INDEX_URL = "%s/index.tar.gz"
OPAM_VERSION_PATH_PREFIX = "packages/%s/%s."
OPAM_VERSION_PATH_SUFFIX = "/opam"
OPAM_DEFAULT_REPO = 'https://opam.ocaml.org'
OPAM_DEFAULT_REPO_VERSION_URL = "%s/packages/%s/%s.%s"
def _decompress_and_list_files(data: bytes) -> List[str]:
# Convert the bytes to a file object and get a list of files
archive = tarfile.open(mode='r', fileobj=BytesIO(data))
return archive.getnames()
async def get_files(url: str) -> List[str]:
# Download the file and get its contents
res = await session.get(url)
data = res.body
# Get the file list of the archive
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, _decompress_and_list_files, data)
async def get_package_versions(files: List[str], pkg: str) -> List[str]:
# Prepare the filename prefix based on the package name
prefix = OPAM_VERSION_PATH_PREFIX % (pkg, pkg)
# Only keep opam files that are relevant to the package we're working with
filtered_files = []
for filename in files:
if filename.startswith(prefix) and filename.endswith(OPAM_VERSION_PATH_SUFFIX):
filtered_files.append(filename[len(prefix):-1*len(OPAM_VERSION_PATH_SUFFIX)])
return filtered_files
async def get_version(
name: str, conf: Entry, *,
cache: AsyncCache, keymanager: KeyManager,
**kwargs,
):
pkg = conf.get('pkg', name)
repo = conf.get('repo', OPAM_DEFAULT_REPO).rstrip('/')
# Get the list of files in the repo index (see https://opam.ocaml.org/doc/Manual.html#Repositories for repo structure)
files = await cache.get(OPAM_REPO_INDEX_URL % repo, get_files) # type: ignore
# Parse the version strings from the file names
raw_versions = await get_package_versions(files, pkg)
# Convert the version strings into RichResults
versions = []
for version in raw_versions:
versions.append(RichResult(
version = version,
# There is no standardised URL scheme, so we only return a URL for the default registry
url = OPAM_DEFAULT_REPO_VERSION_URL % (repo, pkg, pkg, version) if repo == OPAM_DEFAULT_REPO else None,
))
return versions
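A worked example of the prefix/suffix stripping in get_package_versions, using the omigrate package from the tests below:

prefix = 'packages/%s/%s.' % ('omigrate', 'omigrate')
filename = 'packages/omigrate/omigrate.0.3.2/opam'
version = filename[len(prefix):-len('/opam')]
assert version == '0.3.2'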

View file

@ -1,8 +1,6 @@
# MIT licensed
# Copyright (c) 2013-2021 Th3Whit3Wolf <the.white.wolf.is.1337@gmail.com>, et al.
from nvchecker.api import RichResult
API_URL = 'https://open-vsx.org/api/%s/%s'
async def get_version(name, conf, *, cache, **kwargs):
@ -12,7 +10,4 @@ async def get_version(name, conf, *, cache, **kwargs):
extension = splitName[1]
data = await cache.get_json(API_URL % (publisher, extension))
version = data['version']
return RichResult(
version = version,
url = f'https://open-vsx.org/extension/{publisher}/{extension}/{version}',
)
return version

View file

@ -1,8 +1,6 @@
# MIT licensed
# Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.
from nvchecker.api import RichResult
PACKAGIST_URL = 'https://packagist.org/packages/%s.json'
async def get_version(name, conf, *, cache, **kwargs):
@ -16,8 +14,4 @@ async def get_version(name, conf, *, cache, **kwargs):
}
if len(versions):
version = max(versions, key=lambda version: versions[version]["time"])
return RichResult(
version = version,
url = f'https://packagist.org/packages/{data["package"]["name"]}#{version}',
)
return max(versions, key=lambda version: versions[version]["time"])

View file

@ -6,10 +6,10 @@ import urllib.parse
import structlog
from nvchecker.api import (
VersionResult, RichResult, Entry, AsyncCache, KeyManager,
VersionResult, Entry, AsyncCache, KeyManager,
)
PAGURE_URL = 'https://%s/api/0/%s/git/tags?with_commits=true'
PAGURE_URL = 'https://%s/api/0/%s/git/tags'
logger = structlog.get_logger(logger_name=__name__)
@ -24,9 +24,5 @@ async def get_version(
url = PAGURE_URL % (host, repo)
data = await cache.get_json(url)
return [
RichResult(
version = version,
url = f'https://{host}/{repo}/tree/{version_hash}',
) for version, version_hash in data["tags"].items()
]
version = data["tags"]
return version

View file

@ -1,16 +1,9 @@
# MIT licensed
# Copyright (c) 2013-2021,2023-2024 lilydjwg <lilydjwg@gmail.com>, et al.
# Copyright (c) 2013-2021 lilydjwg <lilydjwg@gmail.com>, et al.
import structlog
from packaging.version import Version, InvalidVersion
from nvchecker.api import RichResult
logger = structlog.get_logger(logger_name=__name__)
from packaging.version import Version
async def get_version(name, conf, *, cache, **kwargs):
ret = []
package = conf.get('pypi') or name
use_pre_release = conf.get('use_pre_release', False)
@ -18,26 +11,11 @@ async def get_version(name, conf, *, cache, **kwargs):
data = await cache.get_json(url)
for version in data['releases'].keys():
# Skip versions that are marked as yanked.
if (vers := data['releases'][version]) and vers[0]['yanked']:
continue
try:
parsed_version = Version(version)
except InvalidVersion:
if data['releases'][version]:
# emit a warning if there is something under the invalid version
# sympy has an empty "0.5.13-hg" version
logger.warning('ignoring invalid version', version=version)
continue
if not use_pre_release and parsed_version.is_prerelease:
continue
ret.append(RichResult(
version = version,
url = f'https://pypi.org/project/{package}/{version}/',
))
return ret
if use_pre_release:
version = sorted(
data['releases'].keys(),
key = Version,
)[-1]
else:
version = data['info']['version']
return version
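A sketch contrasting the two selection strategies above on sample data; packaging's Version handles both the ordering and the pre-release detection:

from packaging.version import Version

releases = ['0.9', '1.0', '1.0.1a1']
latest_any = sorted(releases, key=Version)[-1]  # the use_pre_release path
latest_stable = sorted(
  (v for v in releases if not Version(v).is_prerelease),
  key=Version)[-1]
assert (latest_any, latest_stable) == ('1.0.1a1', '1.0')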

View file

@ -1,7 +1,7 @@
# MIT licensed
# Copyright (c) 2019 lilydjwg <lilydjwg@gmail.com>, et al.
from nvchecker.api import RichResult, GetVersionError
from nvchecker.api import GetVersionError
API_URL = 'https://repology.org/api/v1/project/{}'
@ -25,9 +25,5 @@ async def get_version(name, conf, *, cache, **kwargs):
raise GetVersionError('package is not found in subrepo',
repo=repo, subrepo=subrepo)
return [
RichResult(
version = pkg['version'],
url = f'https://repology.org/project/{project}/packages',
) for pkg in pkgs
]
versions = [pkg['version'] for pkg in pkgs]
return versions

View file

@ -1,84 +0,0 @@
# MIT licensed
# Copyright (c) 2024 Jakub Ružička <jru@debian.org>, et al.
import asyncio
import gzip
import pathlib
import urllib
from typing import Set
import lxml.etree
from nvchecker.api import session, AsyncCache, Entry, KeyManager, VersionResult
# XML namespaces used in repodata (the URLs are identifiers only and no longer resolve)
NS = {
'common': 'http://linux.duke.edu/metadata/common',
'repo': 'http://linux.duke.edu/metadata/repo',
'rpm': 'http://linux.duke.edu/metadata/rpm'
}
async def get_version(
name: str, conf: Entry, *,
cache: AsyncCache, keymanager: KeyManager,
**kwargs,
) -> VersionResult:
repo = conf['repo']
arch = conf.get('arch', 'binary')
pkg = conf.get('pkg')
if not pkg:
pkg = conf.get('rpmrepo', name)
repo_url = urllib.parse.urlparse(repo)
repo_path = pathlib.PurePosixPath(repo_url.path)
# get the url of repomd.xml
repomd_path = repo_path / 'repodata' / 'repomd.xml'
repomd_url = repo_url._replace(path=str(repomd_path)).geturl()
# download repomd.xml (use cache)
repomd_body = await cache.get(repomd_url, get_file) # type: ignore
# parse repomd.xml
repomd_xml = lxml.etree.fromstring(repomd_body)
# get the url of *primary.xml.gz
primary_element = repomd_xml.find('repo:data[@type="primary"]/repo:location', namespaces=NS)
primary_path = repo_path / primary_element.get('href') # type: ignore
primary_url = repo_url._replace(path=str(primary_path)).geturl()
# download and decompress *primary.xml.gz (use cache)
primary_body = await cache.get(primary_url, get_file_gz) # type: ignore
# parse *primary.xml metadata
metadata = lxml.etree.fromstring(primary_body)
# use a set to eliminate duplicates
versions_set: Set[str] = set()
# iterate over package metadata
for el in metadata.findall(f'common:package[common:name="{pkg}"]', namespaces=NS):
pkg_arch = el.findtext('common:arch', namespaces=NS)
# filter by arch
if arch == 'binary':
if pkg_arch == 'src':
continue
elif arch != 'any':
if pkg_arch != arch:
continue
version_info = el.find('common:version', namespaces=NS)
version = version_info.get('ver') # type: ignore
versions_set.add(version) # type: ignore
versions = list(versions_set)
return versions # type: ignore
async def get_file(url: str) -> bytes:
res = await session.get(url)
return res.body
async def get_file_gz(url: str) -> bytes:
res = await session.get(url)
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
None, gzip.decompress, res.body)
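A hypothetical configuration entry for this source; the repo URL is a placeholder, and any repository laid out with repodata/repomd.xml should work:

[gnome-shell]
source = "rpmrepo"
repo = "https://example.org/fedora/40/Everything/x86_64/os"
arch = "x86_64"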

View file

@ -1,33 +0,0 @@
# MIT licensed
# Copyright (c) 2025 Maxim Slipenko <maxim@slipenko.com>, et al.
from nvchecker.api import (
GetVersionError
)
from nvchecker.httpclient.base import HTTPError
URL="https://api.snapcraft.io/v2/snaps/info/%(snap)s"
async def get_version(
name: str, conf, *,
cache, keymanager,
**kwargs,
):
try:
snap = conf.get("snap")
channel = conf.get("channel")
result = await cache.get_json(
URL % { "snap": snap },
headers={
"Snap-Device-Series": "16",
},
)
except HTTPError:
raise GetVersionError(f"Failed to request snap info for {snap}")
for c in result['channel-map']:
if c['channel']['name'] == channel:
return c['version']
raise GetVersionError(f"Failed to find version for {snap}")

View file

@ -4,25 +4,23 @@
from xml.etree import ElementTree
from nvchecker.api import session, RichResult
from nvchecker.api import session
NAMESPACE = 'http://www.andymatuschak.org/xml-namespaces/sparkle'
XML_NAMESPACE = 'http://www.w3.org/XML/1998/namespace'
SPARKLE_NAMESPACE = 'http://www.andymatuschak.org/xml-namespaces/sparkle'
async def get_version(name, conf, *, cache, **kwargs):
sparkle = conf['sparkle']
release_notes_language = conf.get('release_notes_language', 'en')
return await cache.get((sparkle, release_notes_language), get_version_impl)
return await cache.get(sparkle, get_version_impl)
async def get_version_impl(info):
sparkle, release_notes_language = info
async def get_version_impl(sparkle):
res = await session.get(sparkle)
root = ElementTree.fromstring(res.body).find('./channel/item[1]')
item = root.find('./enclosure')
root = ElementTree.fromstring(res.body)
item = root.find('./channel/item[1]/enclosure')
version_string = item.get(f'{{{SPARKLE_NAMESPACE}}}shortVersionString')
build_number = item.get(f'{{{SPARKLE_NAMESPACE}}}version')
version_string = item.get(f'{{{NAMESPACE}}}shortVersionString')
build_number = item.get(f'{{{NAMESPACE}}}version')
if (version_string and version_string.isdigit()) and (
build_number and not build_number.isdigit()
@ -36,25 +34,4 @@ async def get_version_impl(info):
if build_number and (build_number not in version):
version.append(build_number)
version_str = '-'.join(version) if version else None
release_notes_link = None
for release_notes in root.findall(f'./{{{SPARKLE_NAMESPACE}}}releaseNotesLink'):
language = release_notes.get(f'{{{XML_NAMESPACE}}}lang')
# If the release notes have no language set, store them, but keep looking for our preferred language
if language is None:
release_notes_link = release_notes.text.strip()
# If the release notes match our preferred language, store them and stop looking
if language == release_notes_language:
release_notes_link = release_notes.text.strip()
break
if release_notes_link is not None:
return RichResult(
version = version_str,
url = release_notes_link,
)
else:
return version_str
return '-'.join(version) if version else None
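A self-contained sketch of the xml:lang matching above, run against a minimal hand-written appcast rather than a real feed:

from xml.etree import ElementTree

S = 'http://www.andymatuschak.org/xml-namespaces/sparkle'
X = 'http://www.w3.org/XML/1998/namespace'
doc = ('<rss xmlns:sparkle="%s"><channel><item>'
       '<sparkle:releaseNotesLink>https://example.com/notes.html</sparkle:releaseNotesLink>'
       '<sparkle:releaseNotesLink xml:lang="de">https://example.com/notes.de.html</sparkle:releaseNotesLink>'
       '</item></channel></rss>') % S
item = ElementTree.fromstring(doc).find('./channel/item[1]')
for rn in item.findall('./{%s}releaseNotesLink' % S):
  print(rn.get('{%s}lang' % X), rn.text)  # None first, then 'de'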

View file

@ -2,7 +2,7 @@
# Copyright (c) 2020 lilydjwg <lilydjwg@gmail.com>, et al.
# Copyright (c) 2017 Felix Yan <felixonmars@archlinux.org>, et al.
from nvchecker.api import RichResult, GetVersionError
from nvchecker.api import GetVersionError
URL = 'https://api.launchpad.net/1.0/ubuntu/+archive/primary?ws.op=getPublishedSources&source_name=%s&exact_match=true'
@ -42,7 +42,4 @@ async def get_version(name, conf, *, cache, **kwargs):
else:
version = releases[0]['source_package_version']
return RichResult(
version = version,
url = f'https://packages.ubuntu.com/{releases[0]["distro_series_link"].rsplit("/", 1)[-1]}/{pkg}',
)
return version

View file

@ -3,7 +3,7 @@
from nvchecker.api import (
VersionResult, Entry, AsyncCache, KeyManager,
TemporaryError, session, RichResult, GetVersionError,
TemporaryError, session, GetVersionError,
)
API_URL = 'https://marketplace.visualstudio.com/_apis/public/gallery/extensionquery'
@ -51,7 +51,4 @@ async def get_version(name: str, conf: Entry, *, cache: AsyncCache, **kwargs):
j = res.json()
version = j['results'][0]['extensions'][0]['versions'][0]['version']
return RichResult(
version = version,
url = f'https://marketplace.visualstudio.com/items?itemName={name}',
)
return version

View file

@ -5,7 +5,6 @@ build-backend = "setuptools.build_meta"
[tool.pytest.ini_options]
# addopts = -n auto
asyncio_mode = "strict"
asyncio_default_fixture_loop_scope = "session"
# build and upload
# rm -rf dist && python -m build --no-isolation && twine check dist/* && twine upload dist/*
# rm -rf dist && python -m build --no-isolation && twine check dist/* && twine upload -s dist/*

View file

@ -2,6 +2,11 @@
oldver = "old_ver.json"
newver = "new_ver.json"
[vim]
source = "regex"
regex = "7\\.3\\.\\d+"
url = "http://ftp.vim.org/pub/vim/patches/7.3/"
[google-chrome]
source = "cmd"
cmd = '''wget -qO- http://dl.google.com/linux/chrome/rpm/stable/x86_64/repodata/other.xml.gz | zgrep -A1 "google-chrome-stable" | awk -F\" '/version/ {print $4"-"$6}' '''
@ -20,13 +25,17 @@ github = "lilydjwg/nvchecker"
[ssed]
source = "regex"
regex = "The current version is ([\\d.]+)\\."
url = "https://sed.sourceforge.net/grabbag/ssed/"
url = "http://sed.sourceforge.net/grabbag/ssed/"
proxy = "http://localhost:8087"
[PySide]
source = "pypi"
pypi = "nvchecker"
pypi = "PySide"
[test]
source = "manual"
manual = "0.1"
["Sparkle Test App"]
source = "sparkle"
sparkle = "https://sparkle-project.org/files/sparkletestcast.xml"

View file

@ -18,7 +18,7 @@ _handler_precedence = (
BOOL_KEYS = [
'strip_release', 'use_last_modified',
'use_latest_release', 'use_latest_tag',
'use_max_release', 'use_max_tag', 'use_pre_release',
'use_max_tag', 'use_pre_release',
]
INT_KEYS = [

View file

@ -26,11 +26,6 @@ def get_args():
parser.add_argument('-k', '--keyfile',
metavar='FILE', type=str,
help='use specified keyfile (override the one in configuration file)')
parser.add_argument('-t', '--tries', default=1, type=int, metavar='N',
help='try N times when network errors occur')
parser.add_argument('--failures', action='store_true',
help='exit with code 3 if failures / errors happen during checking')
return parser.parse_args()
def main():
@ -48,10 +43,6 @@ def main():
cmd.extend(['-c', args.file])
if args.keyfile:
cmd.extend(['-k', args.keyfile])
if args.tries:
cmd.extend(['-t', str(args.tries)])
if args.failures:
cmd.append('--failures')
process = subprocess.Popen(cmd, pass_fds=(wfd,))
os.close(wfd)

View file

@ -1,40 +0,0 @@
#!/bin/bash -e
mitmdump=${mitmdump:-mitmdump}
if [[ -f ~/.mitmproxy/nvdump ]]; then
$mitmdump -S ~/.mitmproxy/nvdump -p 7890 --ignore-hosts '127\.0\.0\.1' --server-replay-reuse --server-replay-extra=forward -w newdump >mitmdump_output &
else
$mitmdump -w ~/.mitmproxy/nvdump -p 7890 --ignore-hosts '127\.0\.0\.1' >mitmdump_output &
fi
mitm_pid=$!
on_exit () {
kill -INT $mitm_pid
if [[ -s newdump ]]; then
cat newdump >> ~/.mitmproxy/nvdump
fi
cat mitmdump_output
}
trap on_exit EXIT
if [[ -f keyfile.toml ]]; then
export KEYFILE=keyfile.toml
fi
for _ in {1..10}; do
if [[ -s ~/.mitmproxy/mitmproxy-ca-cert.pem ]]; then
break
fi
sleep 1
done
export SSL_CERT_FILE=$HOME/.mitmproxy/mitmproxy-ca-cert.pem
export GIT_SSL_CAINFO=$SSL_CERT_FILE
export http_proxy=http://localhost:7890 https_proxy=http://localhost:7890
pytest

View file

@ -24,12 +24,10 @@ classifiers =
Programming Language :: Python
Programming Language :: Python :: 3
Programming Language :: Python :: 3 :: Only
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Programming Language :: Python :: 3.10
Programming Language :: Python :: 3.11
Programming Language :: Python :: 3.12
Programming Language :: Python :: 3.13
Topic :: Internet
Topic :: Internet :: WWW/HTTP
Topic :: Software Development
@ -39,10 +37,10 @@ classifiers =
[options]
zip_safe = True
python_requires = >=3.8
packages = find_namespace:
install_requires =
setuptools; python_version<"3.8"
tomli; python_version<"3.11"
structlog
platformdirs
@ -64,10 +62,6 @@ pypi =
packaging
htmlparser =
lxml
rpmrepo =
lxml
jq =
jq
[options.entry_points]
console_scripts =

View file

@ -1,11 +1,11 @@
# MIT licensed
# Copyright (c) 2020, 2024 lilydjwg <lilydjwg@gmail.com>, et al.
# Copyright (c) 2020 lilydjwg <lilydjwg@gmail.com>, et al.
import asyncio
import structlog
import os
from pathlib import Path
from typing import TYPE_CHECKING, Dict
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import tomli as tomllib
@ -20,13 +20,13 @@ import pytest_asyncio
from nvchecker import core
from nvchecker import __main__ as main
from nvchecker.util import Entries, ResultData, RawResult
from nvchecker.util import Entries, VersData, RawResult
use_keyfile = False
async def run(
entries: Entries, max_concurrency: int = 20,
) -> Dict[str, str]:
) -> VersData:
task_sem = asyncio.Semaphore(max_concurrency)
result_q: asyncio.Queue[RawResult] = asyncio.Queue()
keyfile = os.environ.get('KEYFILE')
@ -43,14 +43,14 @@ async def run(
keymanager, entry_waiter, 1, {},
)
oldvers: ResultData = {}
oldvers: VersData = {}
result_coro = core.process_result(oldvers, result_q, entry_waiter)
runner_coro = core.run_tasks(futures)
results, _has_failures = await main.run(result_coro, runner_coro)
return {k: r.version for k, r in results.items()}
vers, _has_failures = await main.run(result_coro, runner_coro)
return vers
@pytest_asyncio.fixture(scope="session")
@pytest_asyncio.fixture(scope="module")
async def get_version():
async def __call__(name, config):
entries = {name: config}
@ -59,7 +59,7 @@ async def get_version():
return __call__
@pytest_asyncio.fixture(scope="session")
@pytest_asyncio.fixture(scope="module")
async def run_str():
async def __call__(str):
entries = tomllib.loads(str)
@ -68,7 +68,7 @@ async def run_str():
return __call__
@pytest_asyncio.fixture(scope="session")
@pytest_asyncio.fixture(scope="module")
async def run_str_multi():
async def __call__(str):
entries = tomllib.loads(str)
@ -77,6 +77,15 @@ async def run_str_multi():
return __call__
loop = asyncio.new_event_loop()
@pytest.fixture(scope="session")
def event_loop(request):
"""Override pytest-asyncio's event_loop fixture,
Don't create an instance of the default event loop for each test case.
We need the same ioloop across tests for the aiohttp support.
"""
yield loop
@pytest.fixture(scope="session", autouse=True)
def raise_on_logger_msg():
def proc(logger, method_name, event_dict):

View file

@ -1,45 +1,20 @@
# MIT licensed
# Copyright (c) 2023 Pekka Ristola <pekkarr [at] protonmail [dot] com>, et al.
import pathlib
import shutil
import subprocess
import tempfile
import pytest
pytestmark = [
pytest.mark.asyncio,
pytest.mark.skipif(shutil.which('pacman') is None, reason='requires pacman command'),
pytest.mark.skipif(shutil.which('fakeroot') is None, reason='requires fakeroot command'),
]
global temp_dir, db_path
def setup_module(module):
global temp_dir, db_path
temp_dir = tempfile.TemporaryDirectory()
temp_path = pathlib.Path(temp_dir.name)
db_path = temp_path / 'test-db'
db_path.mkdir(exist_ok=True)
cmd = ['fakeroot', 'pacman', '-Fy', '--dbpath', db_path]
subprocess.check_call(cmd)
def teardown_module(module):
temp_dir.cleanup()
async def test_alpmfiles(get_version):
assert await get_version('test', {
'source': 'alpmfiles',
'pkgname': 'libuv',
'filename': 'usr/lib/libuv\\.so\\.([^.]+)',
'dbpath': db_path,
}) == '1'
async def test_alpmfiles_strip(get_version):
@ -49,5 +24,5 @@ async def test_alpmfiles_strip(get_version):
'repo': 'core',
'filename': 'libc\\.so\\.[^.]+',
'strip_dir': True,
'dbpath': db_path,
'dbpath': '/var/lib/pacman',
}) == 'libc.so.6'

View file

@ -5,7 +5,6 @@
import pytest
pytestmark = [pytest.mark.asyncio, pytest.mark.needs_net]
@pytest.mark.flaky(reruns=10)
async def test_android_addon(get_version):
assert await get_version("android-google-play-apk-expansion", {
"source": "android_sdk",
@ -14,12 +13,11 @@ async def test_android_addon(get_version):
}) == "1.r03"
async def test_android_package(get_version):
version = await get_version("android-sdk-cmake", {
assert await get_version("android-sdk-cmake", {
"source": "android_sdk",
"android_sdk": "cmake;",
"repo": "package",
})
assert version.startswith("3.")
}) == "3.22.1"
async def test_android_package_channel(get_version):
@ -28,7 +26,7 @@ async def test_android_package_channel(get_version):
"android_sdk": "ndk;",
"repo": "package",
"channel": "beta,dev,canary",
}) == "26.0.10636728"
}) == "25.0.8528842"
async def test_android_list(get_version):
assert await get_version("android-sdk-cmake-older", {
@ -39,7 +37,7 @@ async def test_android_list(get_version):
}) == "3.10.2"
async def test_android_package_os(get_version):
assert await get_version("android-usb-driver", {
await get_version("android-usb-driver", {
"source": "android_sdk",
"android_sdk": "extras;google;usb_driver",
"repo": "addon",
@ -47,7 +45,7 @@ async def test_android_package_os(get_version):
}) == "13"
async def test_android_package_os_missing(get_version):
assert await get_version("android-usb-driver", {
await get_version("android-usb-driver", {
"source": "android_sdk",
"android_sdk": "extras;google;usb_driver",
"repo": "addon",

View file

@ -13,10 +13,3 @@ async def test_anitya(get_version):
"anitya": "fedora/shutter",
})
assert re.match(r"[0-9.]+", version)
async def test_anitya_by_id(get_version):
version = await get_version("shutter", {
"source": "anitya",
"anitya_id": "4813",
})
assert re.match(r"[0-9.]+", version)

View file

@ -2,18 +2,19 @@
# Copyright (c) 2020-2021 lilydjwg <lilydjwg@gmail.com>, et al.
# Copyright (c) 2017 Felix Yan <felixonmars@archlinux.org>, et al.
from flaky import flaky
import pytest
pytestmark = [pytest.mark.asyncio, pytest.mark.needs_net]
@pytest.mark.flaky(reruns=10)
@flaky(max_runs=10)
async def test_apt(get_version):
assert await get_version("sigrok-firmware-fx2lafw", {
"source": "apt",
"mirror": "http://deb.debian.org/debian/",
"suite": "sid",
}) == "0.1.7-3"
}) == "0.1.7-1"
@pytest.mark.flaky(reruns=10)
@flaky(max_runs=10)
async def test_apt_srcpkg(get_version):
ver = await get_version("test", {
"source": "apt",
@ -23,7 +24,7 @@ async def test_apt_srcpkg(get_version):
})
assert ver.startswith("0.0~git20150829.56e4718-")
@pytest.mark.flaky(reruns=10)
@flaky(max_runs=10)
async def test_apt_strip_release(get_version):
assert await get_version("sigrok-firmware-fx2lafw", {
"source": "apt",
@ -32,8 +33,7 @@ async def test_apt_strip_release(get_version):
"strip_release": 1,
}) == "0.1.7"
@pytest.mark.skip
@pytest.mark.flaky(reruns=10)
@flaky(max_runs=10)
async def test_apt_deepin(get_version):
assert await get_version("sigrok-firmware-fx2lafw", {
"source": "apt",

View file

@ -1,34 +1,35 @@
# MIT licensed
# Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.
from flaky import flaky
import pytest
pytestmark = [pytest.mark.asyncio, pytest.mark.needs_net]
@pytest.mark.flaky
@flaky
async def test_archpkg(get_version):
assert await get_version("base", {
"source": "archpkg",
}) == "3-2"
}) == "3-1"
@pytest.mark.flaky
@flaky
async def test_archpkg_strip_release(get_version):
assert await get_version("base", {
"source": "archpkg",
"strip_release": True,
}) == "3"
@pytest.mark.flaky
@flaky
async def test_archpkg_provided(get_version):
assert await get_version("dbus", {
"source": "archpkg",
"provided": "libdbus-1.so",
}) == "3-64"
@pytest.mark.flaky
@flaky
async def test_archpkg_provided_strip(get_version):
int(await get_version("jsoncpp", {
assert await get_version("jsoncpp", {
"source": "archpkg",
"provided": "libjsoncpp.so",
"strip_release": True,
}))
}) == "25"

View file

@ -1,24 +1,29 @@
# MIT licensed
# Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.
import os
from flaky import flaky
import pytest
pytestmark = [pytest.mark.asyncio,
pytest.mark.needs_net]
pytest.mark.needs_net,
pytest.mark.skipif(os.environ.get('TRAVIS') == 'true',
reason="fail too often")]
@pytest.mark.flaky(reruns=10)
@flaky(max_runs=10)
async def test_aur(get_version):
assert await get_version("ssed", {
"source": "aur",
}) == "3.62-2"
@pytest.mark.flaky(reruns=10)
@flaky(max_runs=10)
async def test_aur_strip_release(get_version):
assert await get_version("ssed", {
"source": "aur",
"strip_release": 1,
}) == "3.62"
@pytest.mark.flaky(reruns=10)
@flaky(max_runs=10)
async def test_aur_use_last_modified(get_version):
assert await get_version("ssed", {
"source": "aur",

View file

@ -1,12 +1,8 @@
# MIT licensed
# Copyright (c) 2020 Chih-Hsuan Yen <yan12125 at gmail dot com>
import os
import datetime
import pytest
pytestmark = [pytest.mark.asyncio, pytest.mark.needs_net,
pytest.mark.skipif(bool(os.environ.get('GITHUB_RUN_ID')), reason="400 very often")]
pytestmark = [pytest.mark.asyncio, pytest.mark.needs_net]
async def test_container(get_version):
assert await get_version("hello-world", {
@ -15,31 +11,6 @@ async def test_container(get_version):
"include_regex": "linux",
}) == "linux"
async def test_container_with_tag(get_version):
update_time = await get_version("bitnami/mongodb:5.0", {
"source": "container",
"container": "bitnami/mongodb:5.0",
})
# the update time changes occasionally, so we can't compare against an exact time, otherwise the test would start failing in the future
assert datetime.date.fromisoformat(update_time.split('T')[0]) > datetime.date(2023, 12, 1)
async def test_container_with_tag_and_multi_arch(get_version):
update_time = await get_version("hello-world:linux", {
"source": "container",
"container": "library/hello-world:linux",
})
# the update time changes occasionally, so we can't compare against an exact time, otherwise the test would start failing in the future
assert datetime.date.fromisoformat(update_time.split('T')[0]) > datetime.date(2023, 1, 1)
async def test_container_with_tag_and_registry(get_version):
update_time = await get_version("hello-world-nginx:v1.0", {
"source": "container",
"registry": "quay.io",
"container": "redhattraining/hello-world-nginx:v1.0",
})
# the update time is unlikely to change
assert datetime.date.fromisoformat(update_time.split('T')[0]) == datetime.date(2019, 6, 26)
async def test_container_paging(get_version):
assert await get_version("prometheus-operator", {
"source": "container",

View file

@ -7,4 +7,4 @@ pytestmark = [pytest.mark.asyncio, pytest.mark.needs_net]
async def test_cran(get_version):
assert await get_version("xml2", {
"source": "cran",
}) == "1.3.7"
}) == "1.3.4"

View file

@ -7,24 +7,4 @@ pytestmark = [pytest.mark.asyncio, pytest.mark.needs_net]
async def test_cratesio(get_version):
assert await get_version("example", {
"source": "cratesio",
}) == "1.1.0"
async def test_cratesio_list(get_version):
assert await get_version("example", {
"source": "cratesio",
"include_regex": r"^1\.0.*",
}) == "1.0.2"
async def test_cratesio_skip_prerelease(get_version):
with pytest.raises(RuntimeError, match='include_regex matched no versions'):
await get_version("cargo-lock", {
"source": "cratesio",
"include_regex": r".*-.*",
})
async def test_cratesio_use_prerelease(get_version):
await get_version("cargo-lock", {
"source": "cratesio",
"use_pre_release": "true",
"include_regex": r".*-.*",
})
}) == "0.1.0"

View file

@ -2,23 +2,24 @@
# Copyright (c) 2020 lilydjwg <lilydjwg@gmail.com>, et al.
# Copyright (c) 2017 Felix Yan <felixonmars@archlinux.org>, et al.
from flaky import flaky
import pytest
pytestmark = [pytest.mark.asyncio, pytest.mark.needs_net]
@pytest.mark.flaky(reruns=10)
@flaky(max_runs=10)
async def test_debianpkg(get_version):
assert await get_version("sigrok-firmware-fx2lafw", {
"source": "debianpkg",
}) == "0.1.7-3"
}) == "0.1.7-1"
@pytest.mark.flaky(reruns=10)
@flaky(max_runs=10)
async def test_debianpkg_strip_release(get_version):
assert await get_version("sigrok-firmware-fx2lafw", {
"source": "debianpkg",
"strip_release": 1,
}) == "0.1.7"
@pytest.mark.flaky(reruns=10)
@flaky(max_runs=10)
async def test_debianpkg_suite(get_version):
assert await get_version("sigrok-firmware-fx2lafw", {
"source": "debianpkg",

View file

@ -1,19 +1,20 @@
# MIT licensed
# Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.
from flaky import flaky
import pytest
pytestmark = [pytest.mark.asyncio,
pytest.mark.needs_net]
@pytest.mark.flaky(reruns=10)
@flaky(max_runs=10)
async def test_gitea(get_version):
ver = await get_version("example", {
"source": "gitea",
"gitea": "gitea/tea"})
assert ver.startswith('20')
assert 'T' in ver
assert len(ver) == 8
assert ver.isdigit()
@pytest.mark.flaky(reruns=10)
@flaky(max_runs=10)
async def test_gitea_max_tag_with_include(get_version):
assert await get_version("example", {
"source": "gitea",

View file

@ -24,33 +24,9 @@ async def test_github_default_not_master(get_version):
async def test_github_latest_release(get_version):
assert await get_version("example", {
"source": "github",
"github": "dpeukert/ReleaseTestRepo",
"github": "harry-sanabria/ReleaseTestRepo",
"use_latest_release": True,
}) == "v0.0.0"
async def test_github_latest_release_include_prereleases(get_version):
assert await get_version("example", {
"source": "github",
"github": "dpeukert/ReleaseTestRepo",
"use_latest_release": True,
"include_prereleases": True,
}) == "v0.0.1-pre"
async def test_github_latest_release_with_release_name(get_version):
version = await get_version("example", {
"source": "github",
"github": "mamba-org/mamba",
"use_latest_release": True,
})
assert version.startswith('20') # tag name
version = await get_version("example", {
"source": "github",
"github": "mamba-org/mamba",
"use_latest_release": True,
"use_release_name": True,
})
assert not version.startswith('20') # release name
}) == "release3"
async def test_github_max_tag(get_version):
assert await get_version("example", {
@ -59,20 +35,6 @@ async def test_github_max_tag(get_version):
"use_max_tag": True,
}) == "second_release"
async def test_github_max_release(get_version):
assert await get_version("example", {
"source": "github",
"github": "harry-sanabria/ReleaseTestRepo",
"use_max_release": True,
}) == "second_release"
assert await get_version("example", {
"source": "github",
"github": "harry-sanabria/ReleaseTestRepo",
"use_max_release": True,
"use_release_name": True,
}) == "second_release"
async def test_github_max_tag_with_ignored(get_version):
assert await get_version("example", {
"source": "github",
@ -81,21 +43,6 @@ async def test_github_max_tag_with_ignored(get_version):
"ignored": "second_release release3",
}) == "first_release"
async def test_github_max_release_with_ignored(get_version):
assert await get_version("example", {
"source": "github",
"github": "harry-sanabria/ReleaseTestRepo",
"use_max_release": True,
"ignored": "second_release release3",
}) == "first_release"
assert await get_version("example", {
"source": "github",
"github": "harry-sanabria/ReleaseTestRepo",
"use_max_release": True,
"ignored": "second_release",
"use_release_name": True,
}) == "release #3"
async def test_github_with_path(get_version):
assert await get_version("example", {
"source": "github",
@ -120,16 +67,6 @@ async def test_github_max_tag_with_include(get_version):
})
assert re.match(r'chrome-[\d.]+', version)
async def test_github_max_release_with_include(get_version):
version = await get_version("example", {
"source": "github",
"github": "EFForg/https-everywhere",
"use_max_release": True,
"use_release_name": True,
"include_regex": r"Release \d.*",
})
assert re.match(r'Release [\d.]+', version)
async def test_github_latest_tag(get_version):
assert await get_version("example", {
"source": "github",

View file

@ -1,38 +0,0 @@
# MIT licensed
# Copyright (c) 2024 bgme <i@bgme.me>.
import pytest
from nvchecker.api import HTTPError
try:
import lxml
lxml_available = True
except ImportError:
lxml_available = False
pytestmark = [
pytest.mark.asyncio,
pytest.mark.needs_net,
pytest.mark.skipif(not lxml_available, reason="needs lxml")
]
async def test_go(get_version):
ver = await get_version("one version", {
"source": "go",
"go": "github.com/caddyserver/replace-response",
})
assert ver.startswith("v0.0.0-")
assert await get_version("multiple version", {
"source": "go",
"go": "github.com/corazawaf/coraza-caddy",
}) == "v1.2.2"
with pytest.raises(HTTPError):
await get_version("not found", {
"source": "go",
"go": "github.com/asdas/sadfasdf",
})

View file

@ -1,10 +1,11 @@
# MIT licensed
# Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.
from flaky import flaky
import pytest
pytestmark = [pytest.mark.asyncio, pytest.mark.needs_net]
@pytest.mark.flaky(reruns=10)
@flaky(max_runs=10)
async def test_hackage(get_version):
assert await get_version("sessions", {
"source": "hackage",

View file

@ -1,31 +1,24 @@
# MIT licensed
# Copyright (c) 2021,2024 lilydjwg <lilydjwg@gmail.com>, et al.
# Copyright (c) 2021 lilydjwg <lilydjwg@gmail.com>, et al.
import pytest
import pytest_httpbin
assert pytest_httpbin # for pyflakes
httpbin_available = True
try:
import pytest_httpbin
assert pytest_httpbin # for pyflakes
except ImportError:
httpbin_available = False
pytestmark = [pytest.mark.asyncio, pytest.mark.needs_net]
pytestmark = pytest.mark.asyncio
@pytest.mark.needs_net
async def test_redirection(get_version):
assert await get_version("unifiedremote", {
"source": "httpheader",
"url": "https://www.unifiedremote.com/download/linux-x64-deb",
"regex": r'urserver-([\d.]+).deb',
}) is not None
assert await get_version("unifiedremote", {
"source": "httpheader",
"url": "https://www.unifiedremote.com/download/linux-x64-deb",
"regex": r'urserver-([\d.]+).deb',
}) is not None
@pytest.mark.skipif(not httpbin_available, reason="needs pytest_httpbin")
async def test_get_version_withtoken(get_version, httpbin):
assert await get_version("unifiedremote", {
"source": "httpheader",
"url": httpbin.url + "/basic-auth/username/superpassword",
"httptoken": "Basic dXNlcm5hbWU6c3VwZXJwYXNzd29yZA==",
"header": "server",
"regex": r'([0-9.]+)*',
}) is not None
assert await get_version("unifiedremote", {
"source": "httpheader",
"url": httpbin.url + "/basic-auth/username/superpassword",
"httptoken": "Basic dXNlcm5hbWU6c3VwZXJwYXNzd29yZA==",
"header": "server",
"regex": r'([0-9.]+)*',
}) is not None

View file

@ -1,33 +0,0 @@
# MIT licensed
# Copyright (c) 2024 Rocket Aaron <i@rocka.me>, et al.
import pytest
jq_available = True
try:
import jq
except ImportError:
jq_available = False
pytestmark = [
pytest.mark.asyncio,
pytest.mark.needs_net,
pytest.mark.skipif(not jq_available, reason="needs jq"),
]
async def test_jq(get_version):
ver = await get_version("aur", {
"source": "jq",
"url": "https://aur.archlinux.org/rpc/v5/info?arg[]=nvchecker-git"
})
ver = ver.strip()
assert ver.startswith("{")
assert ver.endswith("}")
async def test_jq_filter(get_version):
ver = await get_version("aur", {
"source": "jq",
"url": "https://aur.archlinux.org/rpc/v5/info?arg[]=nvchecker-git",
"filter": '.results[0].PackageBase',
})
assert ver == "nvchecker-git"

View file

@ -1,16 +0,0 @@
# MIT Licensed
# Copyright (c) 2024 Bert Peters <bertptrs@archlinux.org>, et al.
import pytest
pytestmark = [pytest.mark.asyncio, pytest.mark.needs_net]
@pytest.mark.flaky(reruns=10)
async def test_launchpad(get_version):
version = await get_version(
"sakura",
{
"source": "launchpad",
"launchpad": "sakura",
}
)
assert version == '3.8.8'

View file

@ -1,15 +0,0 @@
# MIT licensed
# Copyright (c) 2020 Felix Yan <felixonmars@archlinux.org>, et al.
import pytest
pytestmark = [
pytest.mark.asyncio,
pytest.mark.needs_net,
]
@pytest.mark.skip
async def test_mercurial(get_version):
assert await get_version("example", {
"source": "mercurial",
"mercurial": "https://repo.mercurial-scm.org/hg-website/json-tags",
}) == "v1.0"

View file

@ -1,25 +0,0 @@
# MIT licensed
# Copyright (c) 2024 Daniel Peukert <daniel@peukert.cc>, et al.
import pytest
pytestmark = [pytest.mark.asyncio, pytest.mark.needs_net]
async def test_opam_official(get_version):
assert await get_version("test", {
"source": "opam",
"pkg": "omigrate",
}) == "0.3.2"
async def test_opam_coq(get_version):
assert await get_version("test", {
"source": "opam",
"repo": "https://coq.inria.fr/opam/released",
"pkg": "coq-abp",
}) == "8.10.0"
async def test_opam_coq_trailing_slash(get_version):
assert await get_version("test", {
"source": "opam",
"repo": "https://coq.inria.fr/opam/released/",
"pkg": "coq-abp",
}) == "8.10.0"

View file

@ -13,7 +13,7 @@ pytestmark = [pytest.mark.asyncio,
async def test_pacman(get_version):
assert await get_version("base", {
"source": "pacman",
}) == "3-2"
}) == "3-1"
async def test_pacman_strip_release(get_version):
assert await get_version("base", {

View file

@ -20,20 +20,3 @@ async def test_pypi_pre_release(get_version):
"source": "pypi",
"use_pre_release": 1,
}) == "1.0.1a1"
async def test_pypi_list(get_version):
assert await get_version("urllib3", {
"source": "pypi",
"include_regex": "^1\\..*",
}) == "1.26.20"
async def test_pypi_invalid_version(get_version):
await get_version("sympy", {
"source": "pypi",
})
async def test_pypi_yanked_version(get_version):
assert await get_version("urllib3", {
"source": "pypi",
"include_regex": "^(1\\..*)|(2\\.0\\.[0,1])",
}) == "1.26.20"

View file

@@ -1,138 +1,130 @@
# MIT licensed
# Copyright (c) 2013-2020,2024 lilydjwg <lilydjwg@gmail.com>, et al.
# Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.

import base64
import pytest
import pytest_httpbin
assert pytest_httpbin # for pyflakes

httpbin_available = True
try:
    import pytest_httpbin
    assert pytest_httpbin # for pyflakes
except ImportError:
    httpbin_available = False

pytestmark = [
    pytest.mark.asyncio,
    pytest.mark.skipif(not httpbin_available, reason="needs pytest_httpbin"),
]
pytestmark = pytest.mark.asyncio

def base64_encode(s):
    return base64.b64encode(s.encode('utf-8')).decode('ascii')

async def test_regex_httpbin_default_user_agent(get_version, httpbin):
    ua = await get_version("example", {
        "source": "regex",
        "url": httpbin.url + "/get",
        "regex": r'"User-Agent":\s*"([^"]+)"',
    })
    assert ua.startswith("lilydjwg/nvchecker")

async def test_regex_httpbin_user_agent(get_version, httpbin):
    assert await get_version("example", {
        "source": "regex",
        "url": httpbin.url + "/get",
        "regex": r'"User-Agent":\s*"(\w+)"',
        "user_agent": "Meow",
    }) == "Meow"

async def test_regex(get_version, httpbin):
    assert await get_version("example", {
        "source": "regex",
        "url": httpbin.url + "/base64/" + base64_encode("version 1.12 released"),
        "regex": r'version ([0-9.]+)',
    }) == "1.12"

async def test_missing_ok(get_version, httpbin):
    assert await get_version("example", {
        "source": "regex",
        "url": httpbin.url + "/base64/" + base64_encode("something not there"),
        "regex": "foobar",
        "missing_ok": True,
    }) is None

async def test_missing(get_version, httpbin):
    with pytest.raises(RuntimeError):
        await get_version("example", {
            "source": "regex",
            "url": httpbin.url + "/base64/" + base64_encode("something not there"),
            "regex": "foobar",
        })

async def test_multi_group(get_version, httpbin):
    with pytest.raises(RuntimeError):
        await get_version("example", {
            "source": "regex",
            "url": httpbin.url + "/base64/" + base64_encode("1.2"),
            "regex": r"(\d+)\.(\d+)",
        })

async def test_regex_with_tokenBasic(get_version, httpbin):
    assert await get_version("example", {
        "source": "regex",
        "url": httpbin.url + "/basic-auth/username/superpassword",
        "httptoken": "Basic dXNlcm5hbWU6c3VwZXJwYXNzd29yZA==",
        "regex": r'"user":\s*"([a-w]+)"',
    }) == "username"
        "regex": r'"user":"([a-w]+)"',
    }) == "username"

async def test_regex_with_tokenBearer(get_version, httpbin):
    assert await get_version("example", {
        "source": "regex",
        "url": httpbin.url + "/bearer",
        "httptoken": "Bearer username:password",
        "regex": r'"token":\s*"([a-w]+):.*"',
    }) == "username"
        "regex": r'"token":"([a-w]+):.*"',
    }) == "username"

async def test_regex_no_verify_ssl(get_version, httpbin_secure):
    assert await get_version("example", {
        "source": "regex",
        "url": httpbin_secure.url + "/base64/" + base64_encode("version 1.12 released"),
        "regex": r'version ([0-9.]+)',
        "verify_cert": False,
    }) == "1.12"

async def test_regex_bad_ssl(get_version, httpbin_secure):
    try:
        await get_version("example", {
            "source": "regex",
            "url": httpbin_secure.url + "/base64/" + base64_encode("version 1.12 released"),
            "regex": r'version ([0-9.]+)',
        })
    except Exception:
        pass
    else:
        assert False, 'certificate should not be trusted'

async def test_regex_post(get_version, httpbin):
    assert await get_version("example", {
        "source": "regex",
        "url": httpbin.url + "/post",
        "regex": r'"ABCDEF":\s*"(\w+)"',
        "post_data": "ABCDEF=234&CDEFG=xyz"
    }) == "234"

async def test_regex_post2(get_version, httpbin):
    assert await get_version("example", {
        "source": "regex",
        "url": httpbin.url + "/post",
        "regex": r'"CDEFG":\s*"(\w+)"',
        "post_data": "ABCDEF=234&CDEFG=xyz"
    }) == "xyz"

async def test_regex_post_json(get_version, httpbin):
    assert await get_version("example", {
        "source": "regex",
        "url": httpbin.url + "/post",
        "regex": r'"ABCDEF":\s*(\w+)',
        "post_data": '{"ABCDEF":234,"CDEFG":"xyz"}',
        "post_data_type": "application/json"
    }) == "234"

async def test_regex_post_json2(get_version, httpbin):
    assert await get_version("example", {
        "source": "regex",
        "url": httpbin.url + "/post",
        "regex": r'"CDEFG":\s*"(\w+)"',
        "post_data": '{"ABCDEF":234,"CDEFG":"xyz"}',
        "post_data_type": "application/json"
    }) == "xyz"

View file

@@ -5,14 +5,12 @@ import pytest
pytestmark = [pytest.mark.asyncio,
              pytest.mark.needs_net]

@pytest.mark.flaky(reruns=10)
async def test_repology(get_version):
    assert await get_version("ssed", {
        "source": "repology",
        "repo": "aur",
    }) == "3.62"

@pytest.mark.flaky(reruns=10)
async def test_repology_subrepo(get_version):
    assert await get_version("asciiquarium", {
        "source": "repology",

View file

@@ -1,19 +0,0 @@
# MIT licensed
# Copyright (c) 2024 Jakub Ružička <jru@debian.org>, et al.

import pytest
pytestmark = [pytest.mark.asyncio, pytest.mark.needs_net]

async def test_rpmrepo_fedora(get_version):
    assert await get_version("knot_fedora-39", {
        "source": "rpmrepo",
        "pkg": "knot",
        "repo": "http://ftp.sh.cvut.cz/fedora/linux/updates/39/Everything/x86_64/",
    }) == "3.3.9"

async def test_rpmrepo_alma(get_version):
    assert await get_version("knot_fedora-39", {
        "source": "rpmrepo",
        "pkg": "tmux",
        "repo": "http://ftp.sh.cvut.cz/almalinux/9.5/BaseOS/x86_64/os/",
    }) == "3.2a"

View file

@@ -1,28 +0,0 @@
# MIT licensed
# Copyright (c) 2025 Maxim Slipenko <maxim@slipenko.com>, et al.

import pytest
pytestmark = [pytest.mark.asyncio, pytest.mark.needs_net]

async def test_snapcraft(get_version):
    assert await get_version("test", {
        "source": "snapcraft",
        "snap": "test-snapd-public",
        "channel": "edge",
    }) == "2.0"

async def test_snapcraft_non_existent_snap(get_version):
    with pytest.raises(RuntimeError, match='Failed to request snap info for not-existent-snap'):
        assert await get_version("test", {
            "source": "snapcraft",
            "snap": "not-existent-snap",
            "channel": "stable",
        })

async def test_snapcraft_non_existent_channel(get_version):
    with pytest.raises(RuntimeError, match='Failed to find version for test-snapd-public'):
        assert await get_version("test", {
            "source": "snapcraft",
            "snap": "test-snapd-public",
            "channel": "non-existent-channel",
        })
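The Snap Store's v2 info endpoint drives this kind of lookup; a sketch assuming the channel-map layout of api.snapcraft.io (the Snap-Device-Series header is part of that public API), with error messages mirroring the two failure tests above:

```python
import json
from urllib.request import Request, urlopen

def snap_version(snap: str, channel: str) -> str:
    req = Request(f"https://api.snapcraft.io/v2/snaps/info/{snap}",
                  headers={"Snap-Device-Series": "16"})
    try:
        with urlopen(req) as resp:
            info = json.load(resp)
    except OSError as e:  # URLError/HTTPError are OSError subclasses
        raise RuntimeError(f'Failed to request snap info for {snap}') from e
    for entry in info["channel-map"]:
        if entry["channel"]["name"] == channel:
            return entry["version"]
    raise RuntimeError(f'Failed to find version for {snap}')
```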

View file

@@ -45,8 +45,8 @@ async def test_substitute_regex_empty_to_pattern(get_version):

async def test_substitute_prefix_has_higher_priority(get_version):
    assert await get_version("example", {
        "source": "manual",
        "manual": "Version 1.2 Beta 3",
        "prefix": "Version ",
        "from_pattern": r" Beta ",
        "to_pattern": r"b",
    }) == "1.2b3"
        "manual": "r15",
        "prefix": "r",
        "from_pattern": r"r(\d+)",
        "to_pattern": r"R\1",
    }) == "15"

View file

@@ -1,32 +1,32 @@
# MIT licensed
# Copyright (c) 2020,2024 lilydjwg <lilydjwg@gmail.com>, et al.
# Copyright (c) 2020 lilydjwg <lilydjwg@gmail.com>, et al.
# Copyright (c) 2017 Felix Yan <felixonmars@archlinux.org>, et al.

from flaky import flaky
import pytest
pytestmark = [pytest.mark.asyncio, pytest.mark.needs_net]

@pytest.mark.flaky
@flaky
async def test_ubuntupkg(get_version):
    v = await get_version("sigrok-firmware-fx2lafw", {
    assert await get_version("sigrok-firmware-fx2lafw", {
        "source": "ubuntupkg",
    })
    assert v.startswith("0.1.7-")
    }) == "0.1.7-1"

@pytest.mark.flaky
@flaky
async def test_ubuntupkg_strip_release(get_version):
    assert await get_version("sigrok-firmware-fx2lafw", {
        "source": "ubuntupkg",
        "strip_release": True,
    }) == "0.1.7"

@pytest.mark.flaky
@flaky
async def test_ubuntupkg_suite(get_version):
    assert await get_version("sigrok-firmware-fx2lafw", {
        "source": "ubuntupkg",
        "suite": "xenial",
    }) == "0.1.2-1"

@pytest.mark.flaky(reruns=10)
@flaky
async def test_ubuntupkg_suite_with_paging(get_version):
    assert await get_version("ffmpeg", {
        "source": "ubuntupkg",

View file

@@ -1,7 +1,7 @@
[tox]
isolated_build = True
# you may find `tox --skip-missing-interpreters=true` helpful.
envlist = py3{8,9,10,11,12}
envlist = py3{7,8,9,10}
[testenv]
usedevelop = false