AsyncIO rewrite

This commit is contained in:
Felix Yan 2017-07-04 17:04:29 +08:00
parent c423bdee0a
commit 84df2716b5
46 changed files with 352 additions and 503 deletions

View file

@ -1,17 +1,15 @@
sudo: false
language: python
python:
- "3.4"
- "3.5"
- "3.6"
- "nightly"
# ptr requires Python >= 3.3 but pypy3 reports 3.2
# - "pypy3"
install: pip install -U pytest pytest-runner
script: python setup.py pytest
install: pip install -U aiohttp pytest pytest-asyncio pytest-xdist flaky
script: pytest
env:
global:
- ASYNC_TEST_TIMEOUT=20
# github
- secure: "JNuxbHbO+Qj88r0So+FKp8GBVmobGlBNi0hkZIyOH4cBXtuiM1Jo6FtRYInfTUH5TcgfMQml1a8p9g8n1fbRcTsxPt3kkT0ZleW1fJNudOHJFOmDooM4gC2/A+6aMl3xdnLCQ9cXxqsXjIUBie3GhqC4ufInU7VshxOn7KZADbI3zDuLuw9gdsBQf/OADY4oO3y1URxdnWjssP8pwfDFRSEkuLKNDtsYrhkmp3jRAq5DMtMXTEyHly9CJHow7yMyoBHa6Q/J7+C57pI4JsO8c0nJWy/wQUnqw9EeLE/9gAHY1sHlEpjZtJrV45kRd+KC6x4FtoFjvngxymK2A0zmecBI3DRTWBAZedPPVatAD9nlDmwAacBtwvuZJkt6fMUBWMY1I1NEiwdYxceBiqrnvU48FfNOylXE6KuarCQZik/VWk8olIQjXIukMu8EQ58pnEuLZB7wbwNzMLheomuVMEK1nfLOltKaytztl/7cKlsx6SmxY5rQI/x7QInd+rq9OxDDwCo+jEofPKvAcCbUJj6SqfB7QAUxJwwD/ER4/Bji9KSz3BoCu+x7h/ILcskNqLlg4LDCcpxqMOyxePk7A30sSop1E5YLWo0lmS9s88mEz89tzCWSDVIzwQrdMghNBe6JFMzOoKDRDhEkMrs3MAK+FUJkbteGhHrdC86EidU="
# gitlab

View file

@ -45,9 +45,8 @@ Contents
Dependency
==========
- Python 3
- Tornado
- Optional pycurl
- Python 3.5+
- aiohttp
- All commands used in your version source files
Running
@ -132,7 +131,7 @@ regex
When multiple version strings are found, the maximum of those is chosen.
proxy
The HTTP proxy to use. The format is ``host:port``, e.g. ``localhost:8087``. This requires `pycurl <http://pycurl.sourceforge.net/>`_.
The HTTP proxy to use. The format is ``host:port``, e.g. ``localhost:8087``.
user_agent
The ``User-Agent`` header value to use. Use something more like a tool (e.g. ``curl/7.40.0``) in Europe or the real web page won't get through because cookie policies (SourceForge has this issue).
@ -191,7 +190,7 @@ sort_version_key
``pkg_resources.parse_version``. ``vercmp`` use ``pyalpm.vercmp``.
proxy
The HTTP proxy to use. The format is ``host:port``, e.g. ``localhost:8087``. This requires `pycurl <http://pycurl.sourceforge.net/>`_.
The HTTP proxy to use. The format is ``host:port``, e.g. ``localhost:8087``.
An environment variable ``NVCHECKER_GITHUB_TOKEN`` can be set to a GitHub OAuth token in order to request more frequently than anonymously.
@ -289,7 +288,7 @@ cpan
The name used on CPAN, e.g. ``YAML``.
proxy
The HTTP proxy to use. The format is ``host:port``, e.g. ``localhost:8087``. This requires `pycurl <http://pycurl.sourceforge.net/>`_.
The HTTP proxy to use. The format is ``host:port``, e.g. ``localhost:8087``.
Check Packagist
---------------

View file

@ -1,4 +1,4 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
__version__ = '0.4.5dev'
__version__ = '0.5dev'

View file

@ -6,11 +6,11 @@ import os
import sys
import logging
import configparser
from tornado.stack_context import ExceptionStackContext
import asyncio
from .lib import nicelogger
from .get_version import get_version
from .source import session
from . import __version__
@ -60,9 +60,8 @@ def write_verfile(file, versions):
safe_overwrite(file, data, method='writelines')
class Source:
started = False
tasks = 0
oldver = newver = None
def __init__(self, file):
self.config = config = configparser.ConfigParser(
dict_type=dict, allow_no_value=True
@ -72,62 +71,51 @@ class Source:
if '__config__' in config:
c = config['__config__']
d = os.path.dirname(file.name)
self.oldver = os.path.expandvars(os.path.expanduser(os.path.join(d, c.get('oldver'))))
self.newver = os.path.expandvars(os.path.expanduser(os.path.join(d, c.get('newver'))))
self.oldver = os.path.expandvars(os.path.expanduser(
os.path.join(d, c.get('oldver'))))
self.newver = os.path.expandvars(os.path.expanduser(
os.path.join(d, c.get('newver'))))
def check(self):
self.started = True
session.nv_config = config["__config__"]
async def check(self):
if self.oldver:
self.oldvers = read_verfile(self.oldver)
else:
self.oldvers = {}
self.curvers = self.oldvers.copy()
futures = []
config = self.config
for name in config.sections():
if name == '__config__':
continue
self.task_inc()
conf = config[name]
conf['oldver'] = self.oldvers.get(name, None)
with ExceptionStackContext(self._handle_exception):
get_version(name, conf, self.print_version_update)
futures.append(get_version(name, conf))
def _handle_exception(self, type, value, traceback):
self.task_dec()
raise value.with_traceback(traceback)
for fu in asyncio.as_completed(futures):
try:
name, version = await fu
if version is not None:
self.print_version_update(name, version)
except Exception:
logger.exception('error happened dealing with %s', name)
def task_inc(self):
self.tasks += 1
def task_dec(self):
self.tasks -= 1
if self.tasks == 0 and self.started:
if self.newver:
write_verfile(self.newver, self.curvers)
self.on_finish()
if self.newver:
write_verfile(self.newver, self.curvers)
def print_version_update(self, name, version):
try:
if version is None:
return
oldver = self.oldvers.get(name, None)
if not oldver or oldver != version:
logger.info('%s updated version %s', name, version)
self.curvers[name] = version
self.on_update(name, version, oldver)
else:
logger.debug('%s current version %s', name, version)
finally:
self.task_dec()
oldver = self.oldvers.get(name, None)
if not oldver or oldver != version:
logger.info('%s updated to %s', name, version)
self.curvers[name] = version
self.on_update(name, version, oldver)
else:
logger.debug('%s current %s', name, version)
def on_update(self, name, version, oldver):
pass
def on_finish(self):
pass
def __repr__(self):
return '<Source from %r>' % self.name

View file

@ -11,12 +11,11 @@ handler_precedence = (
'cratesio', 'npm', 'hackage', 'cpan', 'gitlab', 'packagist'
)
def get_version(name, conf, callback):
async def get_version(name, conf):
for key in handler_precedence:
if key in conf:
func = import_module('.source.' + key, __package__).get_version
func(name, conf, callback)
break
return await func(name, conf)
else:
logger.error('%s: no idea to get version info.', name)
callback(name, None)
return name, None

View file

@ -5,7 +5,7 @@
import logging
import argparse
from tornado.ioloop import IOLoop
import asyncio
from .lib import notify
from . import core
@ -21,9 +21,6 @@ class Source(core.Source):
notifications.append(msg)
notify.update('nvchecker', '\n'.join(notifications))
def on_finish(self):
IOLoop.instance().stop()
def main():
global args
@ -37,11 +34,11 @@ def main():
if not args.file:
return
s = Source(args.file)
ioloop = IOLoop.instance()
ioloop.add_callback(s.check)
ioloop.start()
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(s.check())
if __name__ == '__main__':
main()

View file

@ -1,4 +1,18 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
from .base import *
import atexit
import aiohttp
connector = aiohttp.TCPConnector(limit=20)
config = None
class BetterClientSession(aiohttp.ClientSession):
async def _request(self, *args, **kwargs):
if hasattr(self, "nv_config") and self.nv_config.get("proxy"):
kwargs.setdefault("proxy", self.nv_config.get("proxy"))
return await super(BetterClientSession, self)._request(*args, **kwargs)
session = BetterClientSession(connector=connector, read_timeout=10, conn_timeout=5)
atexit.register(session.close)

View file

@ -1,33 +1,22 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
from functools import partial
import logging
import json
from tornado.httpclient import AsyncHTTPClient
from . import session
logger = logging.getLogger(__name__)
URL = 'https://www.archlinux.org/packages/search/json/?name='
URL = 'https://www.archlinux.org/packages/search/json/'
def get_version(name, conf, callback):
async def get_version(name, conf):
pkg = conf.get('archpkg') or name
strip_release = conf.getboolean('strip-release', False)
url = URL + pkg
AsyncHTTPClient().fetch(
url, partial(_pkg_done, name, strip_release, callback))
def _pkg_done(name, strip_release, callback, res):
if res.error:
raise res.error
data = json.loads(res.body.decode('utf-8'))
async with session.get(URL, params={"name": pkg}) as res:
data = await res.json()
if not data['results']:
logger.error('Arch package not found: %s', name)
callback(name, None)
return
return name, None
r = [r for r in data['results'] if r['repo'] != 'testing'][0]
if strip_release:
@ -35,4 +24,4 @@ def _pkg_done(name, strip_release, callback, res):
else:
version = r['pkgver'] + '-' + r['pkgrel']
callback(name, version)
return name, version

View file

@ -1,36 +1,24 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
from functools import partial
import json
import logging
from tornado.httpclient import AsyncHTTPClient
from tornado.escape import url_escape
from . import session
AUR_URL = 'https://aur.archlinux.org/rpc/?v=5&type=info&arg[]='
logger = logging.getLogger(__name__)
def get_version(name, conf, callback):
async def get_version(name, conf):
aurname = conf.get('aur') or name
strip_release = conf.getboolean('strip-release', False)
url = AUR_URL + url_escape(aurname)
AsyncHTTPClient().fetch(
url, partial(_aur_done, name, strip_release, callback))
def _aur_done(name, strip_release, callback, res):
if res.error:
raise res.error
data = json.loads(res.body.decode('utf-8'))
async with session.get(AUR_URL, params={"v": 5, "type": "info", "arg[]": aurname}) as res:
data = await res.json()
if not data['results']:
logger.error('AUR upstream not found for %s', name)
callback(name, None)
return
return name, None
version = data['results'][0]['Version']
if strip_release and '-' in version:
version = version.rsplit('-', 1)[0]
callback(name, version)
return name, version

View file

@ -1,10 +0,0 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
from tornado.httpclient import AsyncHTTPClient
try:
import pycurl
AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient")
except ImportError:
pycurl = None

View file

@ -1,11 +1,7 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
import os
import json
from functools import partial
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from aiohttp import request
from ..sortversion import sort_version_keys
@ -13,7 +9,7 @@ from ..sortversion import sort_version_keys
BITBUCKET_URL = 'https://bitbucket.org/api/2.0/repositories/%s/commits/%s'
BITBUCKET_MAX_TAG = 'https://bitbucket.org/api/1.0/repositories/%s/tags'
def get_version(name, conf, callback):
async def get_version(name, conf):
repo = conf.get('bitbucket')
br = conf.get('branch', '')
use_max_tag = conf.getboolean('use_max_tag', False)
@ -23,16 +19,12 @@ def get_version(name, conf, callback):
url = BITBUCKET_MAX_TAG % repo
else:
url = BITBUCKET_URL % (repo, br)
request = HTTPRequest(url, user_agent='lilydjwg/nvchecker')
AsyncHTTPClient().fetch(request,
callback=partial(_bitbucket_done, name, use_max_tag, ignored_tags, sort_version_key, callback))
def _bitbucket_done(name, use_max_tag, ignored_tags, sort_version_key, callback, res):
data = json.loads(res.body.decode('utf-8'))
async with request("GET", url) as res:
data = await res.json()
if use_max_tag:
data = [tag for tag in data if tag not in ignored_tags]
data.sort(key=sort_version_key)
version = data[-1]
else:
version = data['values'][0]['date'].split('T', 1)[0].replace('-', '')
callback(name, version)
return name, version

View file

@ -1,44 +1,18 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
import queue
import logging
from functools import partial
import tornado.process
from tornado.ioloop import IOLoop
import asyncio
logger = logging.getLogger(__name__)
cmd_q = queue.Queue()
cmd_q.running = False
def get_version(name, conf, callback):
async def get_version(name, conf):
cmd = conf['cmd']
cmd_q.put((name, cmd, callback))
if not cmd_q.running:
_run_command()
p = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE)
def _run_command():
cmd_q.running = True
try:
name, cmd, callback = cmd_q.get_nowait()
except queue.Empty:
cmd_q.running = False
return
p = tornado.process.Subprocess(cmd, shell=True,
stdout=tornado.process.Subprocess.STREAM)
p.set_exit_callback(partial(_command_done, name, callback, p))
def _command_done(name, callback, process, status):
if status != 0:
logger.error('%s: command exited with %d.', name, status)
callback(name, None)
else:
process.stdout.read_until_close(partial(_got_version_from_cmd, callback, name))
_run_command()
def _got_version_from_cmd(callback, name, output):
output = output.strip().decode('latin1')
callback(name, output)
output = (await p.communicate())[0].strip().decode('latin1')
if p.returncode != 0:
logger.error('%s: command exited with %d.', name, p.returncode)
return name, None
return name, output

View file

@ -2,24 +2,13 @@
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
import os
import json
from functools import partial
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from ..sortversion import sort_version_keys
from . import session
API_URL = 'https://crates.io/api/v1/crates/%s'
def get_version(name, conf, callback):
async def get_version(name, conf):
name = conf.get('cratesio') or name
request = HTTPRequest(API_URL % name, user_agent='lilydjwg/nvchecker')
AsyncHTTPClient().fetch(
request,
callback = partial(_cratesio_done, name, callback),
)
def _cratesio_done(name, callback, res):
data = json.loads(res.body.decode('utf-8'))
async with session.get(API_URL % name) as res:
data = await res.json()
version = [v['num'] for v in data['versions'] if not v['yanked']][0]
callback(name, version)
return name, version

View file

@ -1,34 +1,24 @@
# MIT licensed
# Copyright (c) 2017 Felix Yan <felixonmars@archlinux.org>, et al.
from functools import partial
import logging
import json
from tornado.httpclient import AsyncHTTPClient
from . import session
logger = logging.getLogger(__name__)
URL = 'https://sources.debian.net/api/src/%(pkgname)s/?suite=%(suite)s'
def get_version(name, conf, callback):
async def get_version(name, conf):
pkg = conf.get('debianpkg') or name
strip_release = conf.getboolean('strip-release', False)
suite = conf.get('suite') or "sid"
url = URL % {"pkgname": pkg, "suite": suite}
AsyncHTTPClient().fetch(
url, partial(_pkg_done, name, strip_release, callback))
def _pkg_done(name, strip_release, callback, res):
if res.error:
raise res.error
data = json.loads(res.body.decode('utf-8'))
async with session.get(url) as res:
data = await res.json()
if not data.get('versions'):
logger.error('Debian package not found: %s', name)
callback(name, None)
return
return name, None
r = data['versions'][0]
if strip_release:
@ -36,4 +26,4 @@ def _pkg_done(name, strip_release, callback, res):
else:
version = r['version']
callback(name, version)
return name, version

View file

@ -4,9 +4,7 @@
import re
import time
import logging
from functools import partial
from tornado.httpclient import AsyncHTTPClient
from . import session
logger = logging.getLogger(__name__)
@ -14,14 +12,11 @@ GCODE_URL = 'https://code.google.com/p/%s/source/list'
GCODE_HG_RE = re.compile(
r'<a onclick="cancelBubble=true" href="detail\?r=[0-9a-f]+">([^<]+)</a>')
def get_version(name, conf, callback):
async def get_version(name, conf):
repo = conf.get('gcode_hg') or name
url = GCODE_URL % repo
AsyncHTTPClient().fetch(url, user_agent='lilydjwg/nvchecker',
callback=partial(_gcodehg_done, name, callback))
def _gcodehg_done(name, callback, res):
data = res.body.decode('utf-8')
async with session.get(url) as res:
data = await res.text()
m = GCODE_HG_RE.search(data)
if m:
t = time.strptime(m.group(1), '%b %d, %Y')
@ -29,4 +24,4 @@ def _gcodehg_done(name, callback, res):
else:
logger.error('%s: version not found.', name)
version = None
callback(name, version)
return name, version

View file

@ -3,27 +3,22 @@
import re
import logging
from functools import partial
from tornado.httpclient import AsyncHTTPClient
from . import session
logger = logging.getLogger(__name__)
GCODE_URL = 'https://code.google.com/p/%s/source/list'
GCODE_SVN_RE = re.compile(r'<a href="detail\?r=\d+">r(\d+)</a>')
def get_version(name, conf, callback):
async def get_version(name, conf):
repo = conf.get('gcode_svn') or name
url = GCODE_URL % repo
AsyncHTTPClient().fetch(url, user_agent='lilydjwg/nvchecker',
callback=partial(_gcodehg_done, name, callback))
def _gcodehg_done(name, callback, res):
data = res.body.decode('utf-8')
async with session.get(url) as res:
data = await res.text()
m = GCODE_SVN_RE.search(data)
if m:
version = m.group(1)
else:
logger.error('%s: version not found.', name)
version = None
callback(name, version)
return name, version

View file

@ -2,19 +2,15 @@
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
import os
import json
from functools import partial
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from .base import pycurl
from . import session
from ..sortversion import sort_version_keys
GITHUB_URL = 'https://api.github.com/repos/%s/commits?sha=%s'
GITHUB_LATEST_RELEASE = 'https://api.github.com/repos/%s/releases/latest'
GITHUB_MAX_TAG = 'https://api.github.com/repos/%s/tags'
def get_version(name, conf, callback):
async def get_version(name, conf):
repo = conf.get('github')
br = conf.get('branch', 'master')
use_latest_release = conf.getboolean('use_latest_release', False)
@ -33,17 +29,9 @@ def get_version(name, conf, callback):
kwargs = {}
if conf.get('proxy'):
if pycurl:
kwargs['proxy_host'] = "".join(conf['proxy'].split(':')[:-1])
kwargs['proxy_port'] = int(conf['proxy'].split(':')[-1])
else:
logger.warn('%s: proxy set but not used because pycurl is unavailable.', name)
request = HTTPRequest(url, headers=headers, user_agent='lilydjwg/nvchecker', **kwargs)
AsyncHTTPClient().fetch(request,
callback=partial(_github_done, name, use_latest_release, use_max_tag, ignored_tags, sort_version_key, callback))
def _github_done(name, use_latest_release, use_max_tag, ignored_tags, sort_version_key, callback, res):
data = json.loads(res.body.decode('utf-8'))
kwargs["proxy"] = conf.get("proxy")
async with session.get(url, headers=headers, **kwargs) as res:
data = await res.json()
if use_latest_release:
version = data['tag_name']
elif use_max_tag:
@ -54,4 +42,4 @@ def _github_done(name, use_latest_release, use_max_tag, ignored_tags, sort_versi
# YYYYMMDD.HHMMSS
version = data[0]['commit']['committer']['date'] \
.rstrip('Z').replace('-', '').replace(':', '').replace('T', '.')
callback(name, version)
return name, version

View file

@ -2,13 +2,10 @@
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
import os
import json
from functools import partial
import logging
import urllib.parse
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from . import session
from ..sortversion import sort_version_keys
GITLAB_URL = 'https://%s/api/v3/projects/%s/repository/commits?ref_name=%s'
@ -16,7 +13,7 @@ GITLAB_MAX_TAG = 'https://%s/api/v3/projects/%s/repository/tags'
logger = logging.getLogger(__name__)
def get_version(name, conf, callback):
async def get_version(name, conf):
repo = urllib.parse.quote_plus(conf.get('gitlab'))
br = conf.get('branch', 'master')
host = conf.get('host', "gitlab.com")
@ -28,8 +25,7 @@ def get_version(name, conf, callback):
token = conf.get('token', os.environ.get(env_name, None))
if token is None:
logger.error('%s: No gitlab token specified.', name)
callback(name, None)
return
return name, None
if use_max_tag:
url = GITLAB_MAX_TAG % (host, repo)
@ -37,16 +33,12 @@ def get_version(name, conf, callback):
url = GITLAB_URL % (host, repo, br)
headers = {"PRIVATE-TOKEN": token}
request = HTTPRequest(url, headers=headers, user_agent='lilydjwg/nvchecker')
AsyncHTTPClient().fetch(request,
callback=partial(_gitlab_done, name, use_max_tag, ignored_tags, sort_version_key, callback))
def _gitlab_done(name, use_max_tag, ignored_tags, sort_version_key, callback, res):
data = json.loads(res.body.decode('utf-8'))
async with session.get(url, headers=headers) as res:
data = await res.json()
if use_max_tag:
data = [tag["name"] for tag in data if tag["name"] not in ignored_tags]
data.sort(key=sort_version_key)
version = data[-1]
else:
version = data[0]['created_at'].split('T', 1)[0].replace('-', '')
callback(name, version)
return name, version

View file

@ -1,5 +1,5 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
def get_version(name, conf, callback):
callback(name, conf.get('manual'))
async def get_version(name, conf):
return name, conf.get('manual')

View file

@ -3,15 +3,14 @@
from . import cmd
def get_version(name, conf, callback):
async def get_version(name, conf):
referree = conf.get('pacman') or name
c = "LANG=C pacman -Si %s | grep -F Version | awk '{print $3}'" % referree
conf['cmd'] = c
strip_release = conf.getboolean('strip-release', False)
def callback_wrapper(name, version):
strip_release = conf.getboolean('strip-release', False)
if strip_release and '-' in version:
version = version.rsplit('-', 1)[0]
callback(name, version)
_, version = await cmd.get_version(name, conf)
cmd.get_version(name, conf, callback_wrapper)
if strip_release and '-' in version:
version = version.rsplit('-', 1)[0]
return name, version

View file

@ -4,50 +4,36 @@
import re
import sre_constants
import logging
import urllib.parse
from functools import partial
from tornado.httpclient import AsyncHTTPClient
from .base import pycurl
from . import session
from ..sortversion import sort_version_keys
logger = logging.getLogger(__name__)
def get_version(name, conf, callback):
async def get_version(name, conf):
try:
r = re.compile(conf['regex'])
regex = re.compile(conf['regex'])
except sre_constants.error:
logger.warn('%s: bad regex, skipped.', name, exc_info=True)
callback(name, None)
return
return name, None
encoding = conf.get('encoding', 'latin1')
httpclient = AsyncHTTPClient()
kwargs = {}
headers = {}
if conf.get('proxy'):
if pycurl:
host, port = urllib.parse.splitport(conf['proxy'])
kwargs['proxy_host'] = host
kwargs['proxy_port'] = int(port)
else:
logger.warn('%s: proxy set but not used because pycurl is unavailable.', name)
kwargs["proxy"] = conf.get("proxy")
if conf.get('user_agent'):
kwargs['user_agent'] = conf['user_agent']
headers['User-Agent'] = conf['user_agent']
sort_version_key = sort_version_keys[conf.get("sort_version_key", "parse_version")]
httpclient.fetch(conf['url'], partial(
_got_version, name, r, encoding, sort_version_key, callback
), **kwargs)
def _got_version(name, regex, encoding, sort_version_key, callback, res):
version = None
try:
body = res.body.decode(encoding)
async with session.get(conf['url'], headers=headers, **kwargs) as res:
version = None
try:
version = max(regex.findall(body), key=sort_version_key)
except ValueError:
logger.error('%s: version string not found.', name)
finally:
callback(name, version)
body = (await res.read()).decode(encoding)
try:
version = max(regex.findall(body), key=sort_version_key)
except ValueError:
logger.error('%s: version string not found.', name)
finally:
return name, version

View file

@ -1,32 +1,20 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
import json
from functools import partial
from tornado.httpclient import AsyncHTTPClient
from .base import pycurl
from . import session
def simple_json(urlpat, confkey, version_from_json):
def get_version(name, conf, callback):
async def get_version(name, conf):
repo = conf.get(confkey) or name
url = urlpat % repo
kwargs = {}
if conf.get('proxy'):
if pycurl:
kwargs['proxy_host'] = "".join(conf['proxy'].split(':')[:-1])
kwargs['proxy_port'] = int(conf['proxy'].split(':')[-1])
else:
logger.warn('%s: proxy set but not used because pycurl is unavailable.', name)
kwargs["proxy"] = conf.get('proxy')
AsyncHTTPClient().fetch(url, user_agent='lilydjwg/nvchecker',
callback=partial(_json_done, name, callback), **kwargs)
def _json_done(name, callback, res):
data = json.loads(res.body.decode('utf-8'))
async with session.get(url, **kwargs) as res:
data = await res.json()
version = version_from_json(data)
callback(name, version)
return name, version
return get_version

View file

@ -2,11 +2,7 @@
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
import logging
from functools import partial
import tornado.process
from tornado.ioloop import IOLoop
import asyncio
from pkg_resources import parse_version
import os.path as _path
@ -30,7 +26,7 @@ def _parse_oldver(oldver):
return PROT_VER, 0, ver
return PROT_VER, count, ver
def get_version(name, conf, callback):
async def get_version(name, conf):
vcs = conf['vcs']
use_max_tag = conf.getboolean('use_max_tag', False)
ignored_tags = conf.get("ignored_tags", "").split()
@ -38,33 +34,21 @@ def get_version(name, conf, callback):
cmd = _cmd_prefix + [name, vcs]
if use_max_tag:
cmd += ["get_tags"]
p = tornado.process.Subprocess(cmd, io_loop=IOLoop.instance(),
stdout=tornado.process.Subprocess.STREAM)
p.set_exit_callback(partial(_command_done, name, oldver, use_max_tag, ignored_tags, callback, p))
p = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE)
def _command_done(name, oldver, use_max_tag, ignored_tags, callback, process, status):
if status != 0:
logger.error('%s: command exited with %d.', name, status)
callback(name, None)
output = (await p.communicate())[0].strip().decode('latin1')
if p.returncode != 0:
logger.error('%s: command exited with %d.', name, p.returncode)
return name, None
else:
if use_max_tag:
process.stdout.read_until_close(partial(_got_tags_from_cmd,
callback, name, ignored_tags))
data = [tag for tag in output.split("\n") if tag not in ignored_tags]
data.sort(key=parse_version)
version = data[-1]
return name, version
else:
process.stdout.read_until_close(partial(_got_version_from_cmd,
callback, name, oldver))
def _got_tags_from_cmd(callback, name, ignored_tags, output):
output = output.strip().decode('latin1')
data = [tag for tag in output.split("\n") if tag not in ignored_tags]
data.sort(key=parse_version)
version = data[-1]
callback(name, version)
def _got_version_from_cmd(callback, name, oldver_str, output):
output = output.strip().decode('latin1')
oldver = _parse_oldver(oldver_str)
if output == oldver[2]:
callback(name, None)
else:
callback(name, "%d.%d.%s" % (oldver[0], oldver[1] + 1, output))
oldver = _parse_oldver(oldver)
if output == oldver[2]:
return name, None
else:
return name, "%d.%d.%s" % (oldver[0], oldver[1] + 1, output)

5
setup.cfg Normal file
View file

@ -0,0 +1,5 @@
[flake8]
ignore = E111, E302, E501
[tool:pytest]
# addopts = -n auto

View file

@ -18,9 +18,11 @@ setup(
zip_safe = True,
packages = find_packages(exclude=["tests"]),
install_requires = ['tornado>=4.1', 'setuptools'],
install_requires = ['aiohttp', 'setuptools'],
tests_require = [
'pytest',
'pytest-asyncio',
'pytest-xdist',
'flaky',
],
entry_points = {
@ -42,8 +44,8 @@ setup(
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
"Topic :: Internet",
"Topic :: Internet :: WWW/HTTP",
"Topic :: Software Development",

26
tests/conftest.py Normal file
View file

@ -0,0 +1,26 @@
import configparser
import pytest
import asyncio
from nvchecker.get_version import get_version as _get_version
@pytest.fixture(scope="module")
async def get_version():
async def __call__(name, config):
if isinstance(config, dict):
_config = configparser.ConfigParser(dict_type=dict, allow_no_value=True)
_config.read_dict({name: config})
config = _config[name]
return (await _get_version(name, config))[1]
return __call__
@pytest.yield_fixture(scope="module")
def event_loop(request):
"""Override pytest-asyncio's event_loop fixture,
Don't create an instance of the default event loop for each test case.
"""
loop = asyncio.get_event_loop()
yield loop

View file

@ -1,24 +0,0 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
import configparser
from tornado.ioloop import IOLoop
import tornado.testing
from nvchecker.get_version import get_version
class ExternalVersionTestCase(tornado.testing.AsyncTestCase):
def get_new_ioloop(self):
return IOLoop.instance()
def sync_get_version(self, name, config):
def get_version_callback(name, version):
self.stop(version)
if isinstance(config, dict):
_config = configparser.ConfigParser(dict_type=dict, allow_no_value=True)
_config.read_dict({name: config})
config = _config[name]
get_version(name, config, get_version_callback)
return self.wait()

View file

@ -1,17 +1,14 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
import os
from flaky import flaky
import pytest
pytestmark = pytest.mark.asyncio
from tests.helper import ExternalVersionTestCase
@flaky
async def test_archpkg(get_version):
assert await get_version("ipw2100-fw", {"archpkg": None}) == "1.3-8"
@pytest.mark.skipif("TRAVIS" in os.environ,
reason="Travis-CI has issues connecting to the Arch website")
class ArchPKGTest(ExternalVersionTestCase):
def test_archpkg(self):
self.assertEqual(self.sync_get_version("ipw2100-fw", {"archpkg": None}), "1.3-8")
def test_archpkg_strip_release(self):
self.assertEqual(self.sync_get_version("ipw2100-fw", {"archpkg": None, "strip-release": 1}), "1.3")
@flaky
async def test_archpkg_strip_release(get_version):
assert await get_version("ipw2100-fw", {"archpkg": None, "strip-release": 1}) == "1.3"

View file

@ -1,12 +1,14 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
from tests.helper import ExternalVersionTestCase
from flaky import flaky
import pytest
pytestmark = pytest.mark.asyncio
@flaky
async def test_aur(get_version):
assert await get_version("asciidoc-fake", {"aur": None}) == "1.0-1"
class AURTest(ExternalVersionTestCase):
def test_aur(self):
self.assertEqual(self.sync_get_version("asciidoc-fake", {"aur": None}), "1.0-1")
def test_aur_strip_release(self):
self.assertEqual(self.sync_get_version("asciidoc-fake", {"aur": None, "strip-release": 1}), "1.0")
@flaky
async def test_aur_strip_release(get_version):
assert await get_version("asciidoc-fake", {"aur": None, "strip-release": 1}) == "1.0"

View file

@ -1,14 +1,14 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
from tests.helper import ExternalVersionTestCase
import pytest
pytestmark = pytest.mark.asyncio
class BitBucketTest(ExternalVersionTestCase):
def test_bitbucket(self):
self.assertEqual(self.sync_get_version("example", {"bitbucket": "prawee/git-tag"}), "20150303")
async def test_bitbucket(get_version):
assert await get_version("example", {"bitbucket": "prawee/git-tag"}) == "20150303"
def test_bitbucket_max_tag(self):
self.assertEqual(self.sync_get_version("example", {"bitbucket": "prawee/git-tag", "use_max_tag": 1}), "1.7.0")
async def test_bitbucket_max_tag(get_version):
assert await get_version("example", {"bitbucket": "prawee/git-tag", "use_max_tag": 1}) == "1.7.0"
def test_bitbucket_max_tag_with_ignored_tags(self):
self.assertEqual(self.sync_get_version("example", {"bitbucket": "prawee/git-tag", "use_max_tag": 1, "ignored_tags": "1.6.0 1.7.0"}), "v1.5")
async def test_bitbucket_max_tag_with_ignored_tags(get_version):
assert await get_version("example", {"bitbucket": "prawee/git-tag", "use_max_tag": 1, "ignored_tags": "1.6.0 1.7.0"}) == "v1.5"

View file

@ -1,9 +1,11 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
from tests.helper import ExternalVersionTestCase
import pytest
pytestmark = pytest.mark.asyncio
async def test_cmd(get_version):
assert await get_version("example", {"cmd": "echo Meow"}) == "Meow"
class CMDTest(ExternalVersionTestCase):
def test_cmd(self):
self.assertEqual(self.sync_get_version("example", {"cmd": "echo Meow"}), "Meow")
async def test_cmd_complex(get_version):
assert await get_version("example", {"cmd": "echo Meow | sed 's/meow/woof/i'"}) == "woof"

View file

@ -1,9 +1,8 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
from tests.helper import ExternalVersionTestCase
import pytest
pytestmark = pytest.mark.asyncio
class CPANTest(ExternalVersionTestCase):
def test_cpan(self):
self.assertEqual(self.sync_get_version("POE-Component-Server-HTTPServer", {"cpan": None}), "0.9.2")
async def test_cpan(get_version):
assert await get_version("POE-Component-Server-HTTPServer", {"cpan": None}) == "0.9.2"

View file

@ -1,11 +1,8 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
from tests.helper import ExternalVersionTestCase
import pytest
pytestmark = pytest.mark.asyncio
class CratesIOTest(ExternalVersionTestCase):
def test_npm(self):
self.assertEqual(
self.sync_get_version("example", {"cratesio": None}),
"0.1.0")
async def test_cratesio(get_version):
assert await get_version("example", {"cratesio": None}) == "0.1.0"

View file

@ -1,16 +1,14 @@
# MIT licensed
# Copyright (c) 2017 Felix Yan <felixonmars@archlinux.org>, et al.
import os
import pytest
pytestmark = pytest.mark.asyncio
from tests.helper import ExternalVersionTestCase
async def test_debianpkg(get_version):
assert await get_version("sigrok-firmware-fx2lafw", {"debianpkg": None}) == "0.1.3-1"
class DebianPKGTest(ExternalVersionTestCase):
def test_debianpkg(self):
self.assertEqual(self.sync_get_version("sigrok-firmware-fx2lafw", {"debianpkg": None}), "0.1.3-1")
async def test_debianpkg_strip_release(get_version):
assert await get_version("sigrok-firmware-fx2lafw", {"debianpkg": None, "strip-release": 1}) == "0.1.3"
def test_debianpkg_strip_release(self):
self.assertEqual(self.sync_get_version("sigrok-firmware-fx2lafw", {"debianpkg": None, "strip-release": 1}), "0.1.3")
def test_debianpkg_suite(self):
self.assertEqual(self.sync_get_version("sigrok-firmware-fx2lafw", {"debianpkg": None, "suite": "jessie"}), "0.1.2-1")
async def test_debianpkg_suite(get_version):
assert await get_version("sigrok-firmware-fx2lafw", {"debianpkg": None, "suite": "jessie"}) == "0.1.2-1"

View file

@ -1,9 +1,8 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
from tests.helper import ExternalVersionTestCase
import pytest
pytestmark = pytest.mark.asyncio
class RubyGemsTest(ExternalVersionTestCase):
def test_gems(self):
self.assertEqual(self.sync_get_version("example", {"gems": None}), "1.0.2")
async def test_gems(get_version):
assert await get_version("example", {"gems": None}) == "1.0.2"

View file

@ -3,20 +3,18 @@
import os
import pytest
from tests.helper import ExternalVersionTestCase
pytestmark = [pytest.mark.asyncio,
pytest.mark.skipif("NVCHECKER_GITHUB_TOKEN" not in os.environ,
reason="requires NVCHECKER_GITHUB_TOKEN, or it fails too much")]
async def test_github(get_version):
assert await get_version("example", {"github": "harry-sanabria/ReleaseTestRepo"}) == "20140122.012101"
@pytest.mark.skipif("NVCHECKER_GITHUB_TOKEN" not in os.environ,
reason="requires NVCHECKER_GITHUB_TOKEN, or it fails too much")
class GitHubTest(ExternalVersionTestCase):
def test_github(self):
self.assertEqual(self.sync_get_version("example", {"github": "harry-sanabria/ReleaseTestRepo"}), "20140122.012101")
async def test_github_latest_release(get_version):
assert await get_version("example", {"github": "harry-sanabria/ReleaseTestRepo", "use_latest_release": 1}) == "release3"
def test_github_latest_release(self):
self.assertEqual(self.sync_get_version("example", {"github": "harry-sanabria/ReleaseTestRepo", "use_latest_release": 1}), "release3")
async def test_github_max_tag(get_version):
assert await get_version("example", {"github": "harry-sanabria/ReleaseTestRepo", "use_max_tag": 1}) == "second_release"
def test_github_max_tag(self):
self.assertEqual(self.sync_get_version("example", {"github": "harry-sanabria/ReleaseTestRepo", "use_max_tag": 1}), "second_release")
def test_github_max_tag_with_ignored_tags(self):
self.assertEqual(self.sync_get_version("example", {"github": "harry-sanabria/ReleaseTestRepo", "use_max_tag": 1, "ignored_tags": "second_release release3"}), "first_release")
async def test_github_max_tag_with_ignored_tags(get_version):
assert await get_version("example", {"github": "harry-sanabria/ReleaseTestRepo", "use_max_tag": 1, "ignored_tags": "second_release release3"}) == "first_release"

View file

@ -3,20 +3,18 @@
import os
import pytest
from tests.helper import ExternalVersionTestCase
pytestmark = [pytest.mark.asyncio,
pytest.mark.skipif("NVCHECKER_GITLAB_TOKEN_GITLAB_COM" not in os.environ,
reason="requires NVCHECKER_GITLAB_TOKEN_GITLAB_COM")]
async def test_gitlab(get_version):
ver = await get_version("example",
{"gitlab": "gitlab-org/gitlab-test"})
assert len(ver) == 8
assert ver.isdigit()
@pytest.mark.skipif("NVCHECKER_GITLAB_TOKEN_GITLAB_COM" not in os.environ,
reason="requires NVCHECKER_GITLAB_TOKEN_GITLAB_COM")
class GitLabTest(ExternalVersionTestCase):
def test_gitlab(self):
ver = self.sync_get_version("example",
{"gitlab": "gitlab-org/gitlab-test"})
self.assertEqual(len(ver), 8)
self.assertTrue(ver.isdigit())
async def test_gitlab_max_tag(get_version):
assert await get_version("example", {"gitlab": "gitlab-org/gitlab-test", "use_max_tag": 1}) == "v1.1.0"
def test_gitlab_max_tag(self):
self.assertEqual(self.sync_get_version("example", {"gitlab": "gitlab-org/gitlab-test", "use_max_tag": 1}), "v1.1.0")
def test_gitlab_max_tag_with_ignored_tags(self):
self.assertEqual(self.sync_get_version("example", {"gitlab": "gitlab-org/gitlab-test", "use_max_tag": 1, "ignored_tags": "v1.1.0"}), "v1.0.0")
async def test_gitlab_max_tag_with_ignored_tags(get_version):
assert await get_version("example", {"gitlab": "gitlab-org/gitlab-test", "use_max_tag": 1, "ignored_tags": "v1.1.0"}) == "v1.0.0"

View file

@ -1,9 +1,8 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
from tests.helper import ExternalVersionTestCase
import pytest
pytestmark = pytest.mark.asyncio
class HackageTest(ExternalVersionTestCase):
def test_hackage(self):
self.assertEqual(self.sync_get_version("sessions", {"hackage": None}), "2008.7.18")
async def test_hackage(get_version):
assert await get_version("sessions", {"hackage": None}) == "2008.7.18"

View file

@ -1,9 +1,8 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
from tests.helper import ExternalVersionTestCase
import pytest
pytestmark = pytest.mark.asyncio
class ManualTest(ExternalVersionTestCase):
def test_manual(self):
self.assertEqual(self.sync_get_version("example", {"manual": "Meow"}), "Meow")
async def test_manual(get_version):
assert await get_version("example", {"manual": "Meow"}) == "Meow"

View file

@ -1,9 +1,8 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
from tests.helper import ExternalVersionTestCase
import pytest
pytestmark = pytest.mark.asyncio
class NPMTest(ExternalVersionTestCase):
def test_npm(self):
self.assertEqual(self.sync_get_version("example", {"npm": None}), "0.0.0")
async def test_npm(get_version):
assert await get_version("example", {"npm": None}) == "0.0.0"

View file

@ -1,9 +1,8 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
from tests.helper import ExternalVersionTestCase
import pytest
pytestmark = pytest.mark.asyncio
class PackagistTest(ExternalVersionTestCase):
def test_packagist(self):
self.assertEqual(self.sync_get_version("butterfly/example-web-application", {"packagist": None}), "1.2.0")
async def test_packagist(get_version):
assert await get_version("butterfly/example-web-application", {"packagist": None}) == "1.2.0"

View file

@ -3,14 +3,12 @@
import shutil
import pytest
from tests.helper import ExternalVersionTestCase
pytestmark = [pytest.mark.asyncio,
pytest.mark.skipif(shutil.which("pacman") is None,
reason="requires pacman command")]
async def test_pacman(get_version):
assert await get_version("ipw2100-fw", {"pacman": None}) == "1.3-8"
@pytest.mark.skipif(shutil.which("pacman") is None,
reason="requires pacman command")
class PacmanTest(ExternalVersionTestCase):
def test_pacman(self):
self.assertEqual(self.sync_get_version("ipw2100-fw", {"pacman": None}), "1.3-8")
def test_pacman_strip_release(self):
self.assertEqual(self.sync_get_version("ipw2100-fw", {"pacman": None, "strip-release": 1}), "1.3")
async def test_pacman_strip_release(get_version):
assert await get_version("ipw2100-fw", {"pacman": None, "strip-release": 1}) == "1.3"

25
tests/test_proxy.py Normal file
View file

@ -0,0 +1,25 @@
# MIT licensed
# Copyright (c) 2017 Felix Yan <felixonmars@archlinux.org>, et al.
import aiohttp
import pytest
pytestmark = pytest.mark.asyncio
async def test_proxy(get_version, monkeypatch):
from nvchecker.source import session
async def fake_request(*args, proxy, **kwargs):
class fake_response():
async def read():
return proxy.encode("ascii")
def release():
pass
return fake_response
monkeypatch.setattr(session, "nv_config", {"proxy": "255.255.255.255:65535"}, raising=False)
monkeypatch.setattr(aiohttp.ClientSession, "_request", fake_request)
assert await get_version("example", {"regex": "(.+)", "url": "deadbeef"}) == "255.255.255.255:65535"
assert await get_version("example", {"regex": "(.+)", "url": "deadbeef", "proxy": "0.0.0.0:0"}) == "0.0.0.0:0"

View file

@ -1,9 +1,8 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
from tests.helper import ExternalVersionTestCase
import pytest
pytestmark = pytest.mark.asyncio
class PyPITest(ExternalVersionTestCase):
def test_pypi(self):
self.assertEqual(self.sync_get_version("example", {"pypi": None}), "0.1.0")
async def test_pypi(get_version):
assert await get_version("example", {"pypi": None}) == "0.1.0"

View file

@ -1,13 +1,12 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
from tests.helper import ExternalVersionTestCase
import pytest
pytestmark = pytest.mark.asyncio
class RegexTest(ExternalVersionTestCase):
def test_regex(self):
self.assertEqual(self.sync_get_version("example", {
"url": "https://httpbin.org/get",
"regex": '"User-Agent": "(\w+)"',
"user_agent": "Meow",
}), "Meow")
async def test_regex(get_version):
assert await get_version("example", {
"url": "https://httpbin.org/get",
"regex": '"User-Agent": "(\w+)"',
"user_agent": "Meow",
}) == "Meow"

View file

@ -4,30 +4,29 @@
import os
import shutil
import pytest
from tests.helper import ExternalVersionTestCase
pytestmark = pytest.mark.asyncio
class VCSTest(ExternalVersionTestCase):
@pytest.mark.skipif(shutil.which("git") is None,
reason="requires git command")
def test_git(self):
os.path.exists("example") or os.mkdir("example")
self.assertEqual(self.sync_get_version("example", {"vcs": "git+https://github.com/harry-sanabria/ReleaseTestRepo.git"}), "1.1.2b3cdf6134b07ae6ac77f11b586dc1ae6d1521db")
@pytest.mark.skipif(shutil.which("git") is None,
reason="requires git command")
async def test_git(get_version):
os.path.exists("example") or os.mkdir("example")
assert await get_version("example", {"vcs": "git+https://github.com/harry-sanabria/ReleaseTestRepo.git"}) == "1.1.2b3cdf6134b07ae6ac77f11b586dc1ae6d1521db"
@pytest.mark.skipif(shutil.which("hg") is None,
reason="requires hg command")
def test_mercurial(self):
os.path.exists("example") or os.mkdir("example")
self.assertEqual(self.sync_get_version("example", {"vcs": "hg+https://bitbucket.org/pil0t/testrepo"}), "1.1.84679e29c7d9")
@pytest.mark.skipif(shutil.which("hg") is None,
reason="requires hg command")
async def test_mercurial(get_version):
os.path.exists("example") or os.mkdir("example")
assert await get_version("example", {"vcs": "hg+https://bitbucket.org/pil0t/testrepo"}) == "1.1.84679e29c7d9"
@pytest.mark.skipif(shutil.which("git") is None,
reason="requires git command")
def test_git_max_tag(self):
os.path.exists("example") or os.mkdir("example")
self.assertEqual(self.sync_get_version("example", {"vcs": "git+https://github.com/harry-sanabria/ReleaseTestRepo.git", "use_max_tag": 1}), "second_release")
@pytest.mark.skipif(shutil.which("git") is None,
reason="requires git command")
async def test_git_max_tag(get_version):
os.path.exists("example") or os.mkdir("example")
assert await get_version("example", {"vcs": "git+https://github.com/harry-sanabria/ReleaseTestRepo.git", "use_max_tag": 1}) == "second_release"
@pytest.mark.skipif(shutil.which("git") is None,
reason="requires git command")
def test_git_max_tag_with_ignored_tags(self):
os.path.exists("example") or os.mkdir("example")
self.assertEqual(self.sync_get_version("example", {"vcs": "git+https://github.com/harry-sanabria/ReleaseTestRepo.git", "use_max_tag": 1, "ignored_tags": "second_release release3"}), "first_release")
@pytest.mark.skipif(shutil.which("git") is None,
reason="requires git command")
async def test_git_max_tag_with_ignored_tags(get_version):
os.path.exists("example") or os.mkdir("example")
assert await get_version("example", {"vcs": "git+https://github.com/harry-sanabria/ReleaseTestRepo.git", "use_max_tag": 1, "ignored_tags": "second_release release3"}) == "first_release"