feat : add token option for regex and httpheader. add htmlparser with xpath source

This commit is contained in:
Maud LAURENT 2021-05-07 14:17:53 +02:00
parent 9d2d47ed15
commit 588d824077
4 changed files with 176 additions and 46 deletions

View file

@ -233,6 +233,17 @@ regex
When multiple version strings are found, the maximum of those is chosen.
token
(*Optional*) A personal authorization token used to call the url. The token type depends the authorization required.
- For Bearer token set : ``Bearer Your_bearer_token``
- For Basic token set : ``Basic Your_base64_encoded_token``
To set an authorization token, you can set:
- a key named ``regex_<app_name>`` in the keyfile
- the token option
This source supports :ref:`list options`.
Search in an HTTP header
@ -262,6 +273,41 @@ method
follow_redirects
(*Optional*) Whether to follow 3xx HTTP redirects. Default is ``false``. If you are looking at a ``Location`` header, you shouldn't change this.
token
(*Optional*) A personal authorization token used to call the url. The token type depends the authorization required.
- For Bearer token set : ``Bearer Your_bearer_token``
- For Basic token set : ``Basic Your_base64_encoded_token``
To set an authorization token, you can set:
- a key named ``httpheader_<app_name>`` in the keyfile
- the token option
Search with an HTML Parser
~~~~~~~~~~~~~~~~~~~~~~~~~~
::
source = "htmlparser"
Send an HTTP request and search through the body a specific xpath.
url
The URL of the HTTP request.
xpath
A xpath expression used to find the version string
token
(*Optional*) A personal authorization token used to call the url. The token type depends the authorization required.
- For Bearer token set : ``Bearer Your_bearer_token``
- For Basic token set : ``Basic Your_base64_encoded_token``
To set an authorization token, you can set:
- a key named ``htmlparser_<app_name>`` in the keyfile
- the token option
Find with a Command
~~~~~~~~~~~~~~~~~~~

View file

@ -0,0 +1,48 @@
# MIT licensed
# Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.
from lxml import html, etree
from nvchecker.api import (
VersionResult, Entry, KeyManager,
TemporaryError, session
)
async def get_version(name, conf, **kwargs):
try:
return await get_version_real(name, conf, **kwargs)
except TemporaryError as e:
check_ratelimit(e, name)
async def get_version_real(
name: str, conf: Entry, *, keymanager: KeyManager,
**kwargs,
) -> VersionResult:
encoding = conf.get('encoding', 'latin1')
# Load token from config
token = conf.get('token')
# Load token from keyman
if token is None:
key_name = 'htmlparser_' + name
token = keymanager.get_key(key_name)
# Set private token if token exists.
headers = {}
if token:
headers["Authorization"] = token
data = await session.get(conf.get('url'), headers = headers)
body = html.fromstring(data.body.decode(encoding))
try:
checkxpath = body.xpath(conf.get('xpath'))
except etree.XPathEvalError as e:
raise GetVersionError('bad xpath', exc_info=e)
try:
version = body.xpath(conf.get('xpath'))
except ValueError:
if not conf.get('missing_ok', False):
raise GetVersionError('version string not found.')
return version

View file

@ -3,37 +3,53 @@
import re
import sre_constants
from nvchecker.api import (
VersionResult, Entry, KeyManager,
TemporaryError,session, GetVersionError
)
from nvchecker.api import session, GetVersionError
async def get_version(name, conf, **kwargs):
return await get_version_real(name, conf, **kwargs)
async def get_version(name, conf, *, cache, **kwargs):
key = tuple(sorted(conf.items()))
return await cache.get(key, get_version_impl)
async def get_version_real(
name: str, conf: Entry, *, keymanager: KeyManager,
**kwargs,
) -> VersionResult:
async def get_version_impl(info):
conf = dict(info)
url = conf['url']
header = conf.get('header', 'Location')
follow_redirects = conf.get('follow_redirects', False)
method = conf.get('method', 'HEAD')
url = conf.get('url')
header = conf.get('header', 'Location')
follow_redirects = conf.get('follow_redirects', False)
method = conf.get('method', 'HEAD')
try:
regex = re.compile(conf['regex'])
except sre_constants.error as e:
raise GetVersionError('bad regex', exc_info=e)
# Load token from config
token = conf.get('token')
# Load token from keyman
if token is None:
key_name = 'httpheader_' + name
token = keymanager.get_key(key_name)
res = await session.request(
url,
method = method,
follow_redirects = follow_redirects,
)
# Set private token if token exists.
headers = {}
if token:
headers["Authorization"] = token
header_value = res.headers.get(header)
if not header_value:
raise GetVersionError('header %s not found or is empty' % header)
try:
regex = re.compile(conf['regex'])
except sre_constants.error as e:
raise GetVersionError('bad regex', exc_info=e)
try:
version = regex.findall(header_value)
except ValueError:
raise GetVersionError('version string not found.')
return version
res = await session.request(
url,
method=method,
headers=headers,
follow_redirects=follow_redirects,
)
header_value = res.headers.get(header)
if not header_value:
raise GetVersionError('header %s not found or is empty' % header)
try:
version = regex.findall(header_value)
except ValueError:
raise GetVersionError('version string not found.')
return version

View file

@ -3,28 +3,48 @@
import re
import sre_constants
from nvchecker.api import (
VersionResult, Entry, KeyManager,
TemporaryError, session, GetVersionError
)
from nvchecker.api import session, GetVersionError
async def get_version(name, conf, *, cache, **kwargs):
key = tuple(sorted(conf.items()))
return await cache.get(key, get_version_impl)
async def get_version(name, conf, **kwargs):
try:
return await get_version_real(name, conf, **kwargs)
except TemporaryError as e:
check_ratelimit(e, name)
async def get_version_impl(info):
conf = dict(info)
try:
regex = re.compile(conf['regex'])
except sre_constants.error as e:
raise GetVersionError('bad regex', exc_info=e)
async def get_version_real(
name: str, conf: Entry, *, keymanager: KeyManager,
**kwargs,
) -> VersionResult:
encoding = conf.get('encoding', 'latin1')
# Load token from config
token = conf.get('token')
# Load token from keyman
if token is None:
key_name = 'regex_' + name
token = keymanager.get_key(key_name)
res = await session.get(conf['url'])
body = res.body.decode(encoding)
try:
version = regex.findall(body)
except ValueError:
if not conf.get('missing_ok', False):
raise GetVersionError('version string not found.')
return version
# Set private token if token exists.
headers = {}
if token:
headers["Authorization"] = token
try:
regex = re.compile(conf['regex'])
except sre_constants.error as e:
raise GetVersionError('bad regex', exc_info=e)
encoding = conf.get('encoding', 'latin1')
res = await session.get(conf.get('url'), headers=headers)
body = res.body.decode(encoding)
try:
version = regex.findall(body)
except ValueError:
if not conf.get('missing_ok', False):
raise GetVersionError('version string not found.')
return version