option to not verify HTTPS certificate

fixes #190.
This commit is contained in:
lilydjwg 2021-06-23 16:00:17 +08:00
parent 3eee6480ab
commit ffaca8c949
11 changed files with 51 additions and 6 deletions

View file

@ -19,5 +19,10 @@ jobs:
${{ runner.os }}-cache-pip-
- name: Install deps
run: pip3 install -U tornado pytest pytest-asyncio pytest-httpbin flaky structlog toml aiohttp httpx mypy
- name: Run mypy for --install-types
run: PATH=$HOME/.local/bin:$PATH mypy nvchecker nvchecker_source tests
continue-on-error: true
- name: Install types
run: PATH=$HOME/.local/bin:$PATH yes | mypy --install-types
- name: Run mypy
run: PATH=$HOME/.local/bin:$PATH mypy nvchecker nvchecker_source tests

View file

@ -18,6 +18,7 @@
.. autodata:: nvchecker.api.proxy
.. autodata:: nvchecker.api.user_agent
.. autodata:: nvchecker.api.tries
.. autodata:: nvchecker.api.verify_cert
.. py:data:: nvchecker.api.entry_waiter
:type: contextvars.ContextVar

View file

@ -185,6 +185,9 @@ httptoken
In the keyfile add ``httptoken_{name}`` token.
verify_cert
Whether to verify the HTTPS certificate or not. Default is ``true``.
If both ``prefix`` and ``from_pattern``/``to_pattern`` are used,
``from_pattern``/``to_pattern`` are ignored. If you want to strip the prefix
and then do something special, just use ``from_pattern```/``to_pattern``. For

View file

@ -18,3 +18,4 @@ proxy: ContextVar[Optional[str]] = ContextVar('proxy', default=None)
user_agent = ContextVar('user_agent', default=DEFAULT_USER_AGENT)
httptoken = ContextVar('httptoken', default=None)
entry_waiter: ContextVar[EntryWaiter] = ContextVar('entry_waiter')
verify_cert = ContextVar('verify_cert', default=True)

View file

@ -35,12 +35,15 @@ class AiohttpSession(BaseSession):
follow_redirects: bool = True,
params = (),
json = None,
verify_cert: bool = True,
) -> Response:
kwargs = {
'headers': headers,
'params': params,
'allow_redirects': follow_redirects,
}
if not verify_cert:
kwargs['ssl'] = False
if proxy is not None:
kwargs['proxy'] = proxy

View file

@ -5,7 +5,7 @@ import structlog
from typing import Optional, Dict, Mapping
import json as _json
from ..ctxvars import tries, proxy, user_agent, httptoken
from ..ctxvars import tries, proxy, user_agent, httptoken, verify_cert
logger = structlog.get_logger(logger_name=__name__)
@ -66,6 +66,7 @@ class BaseSession:
p = proxy.get()
ua = user_agent.get()
httpt = httptoken.get()
verify = verify_cert.get()
headers = headers.copy()
headers.setdefault('User-Agent', ua)
@ -82,6 +83,7 @@ class BaseSession:
follow_redirects = follow_redirects,
json = json,
proxy = p or None,
verify_cert = verify,
)
except TemporaryError as e:
if i == t:
@ -101,6 +103,7 @@ class BaseSession:
follow_redirects: bool = True,
params = (),
json = None,
verify_cert: bool = True,
) -> Response:
''':meta private:'''
raise NotImplementedError

View file

@ -2,7 +2,7 @@
# Copyright (c) 2020 lilydjwg <lilydjwg@gmail.com>, et al.
import atexit
from typing import Dict, Optional
from typing import Dict, Optional, Tuple
import httpx
@ -16,7 +16,7 @@ class HttpxSession(BaseSession):
concurreny: int = 20,
timeout: int = 20,
) -> None:
self.clients: Dict[Optional[str], httpx.AsyncClient] = {}
self.clients: Dict[Tuple[Optional[str], bool], httpx.AsyncClient] = {}
self.timeout = timeout
async def request_impl(
@ -27,15 +27,17 @@ class HttpxSession(BaseSession):
follow_redirects: bool = True,
params = (),
json = None,
verify_cert: bool = True,
) -> Response:
client = self.clients.get(proxy)
client = self.clients.get((proxy, verify_cert))
if not client:
client = httpx.AsyncClient(
timeout = httpx.Timeout(self.timeout, pool=None),
http2 = True,
proxies = {'all://': proxy},
verify = verify_cert,
)
self.clients[proxy] = client
self.clients[(proxy, verify_cert)] = client
try:
r = await client.request(

View file

@ -52,12 +52,14 @@ class TornadoSession(BaseSession):
follow_redirects: bool = True,
params = (),
json = None,
verify_cert: bool = True,
) -> Response:
kwargs: Dict[str, Any] = {
'method': method,
'headers': headers,
'request_timeout': self.timeout,
'follow_redirects': follow_redirects,
'validate_cert': verify_cert,
}
if json:

View file

@ -22,6 +22,7 @@ from .ctxvars import tries as ctx_tries
from .ctxvars import proxy as ctx_proxy
from .ctxvars import user_agent as ctx_ua
from .ctxvars import httptoken as ctx_httpt
from .ctxvars import verify_cert as ctx_verify_cert
logger = structlog.get_logger(logger_name=__name__)
@ -252,6 +253,9 @@ class FunctionWorker(BaseWorker):
httpt = self.keymanager.get_key('httptoken_'+name)
if httpt is not None:
ctx_httpt.set(httpt)
verify_cert = entry.get('verify_cert', None)
if verify_cert is not None:
ctx_verify_cert.set(verify_cert)
try:
async with self.task_sem:

View file

@ -9,4 +9,4 @@ async def test_anitya(get_version):
assert await get_version("shutter", {
"source": "anitya",
"anitya": "fedora/shutter",
}) == "0.96"
}) == "0.97"

View file

@ -58,3 +58,24 @@ async def test_regex_with_tokenBearer(get_version, httpbin):
"httptoken": "Bearer username:password",
"regex": r'"token":"([a-w]+):.*"',
}) == "username"
async def test_regex_no_verify_ssl(get_version, httpbin_secure):
assert await get_version("example", {
"source": "regex",
"url": httpbin_secure.url + "/base64/" + base64_encode("version 1.12 released"),
"regex": r'version ([0-9.]+)',
"verify_cert": False,
}) == "1.12"
async def test_regex_bad_ssl(get_version, httpbin_secure):
try:
await get_version("example", {
"source": "regex",
"url": httpbin_secure.url + "/base64/" + base64_encode("version 1.12 released"),
"regex": r'version ([0-9.]+)',
})
except Exception:
pass
else:
assert False, 'certificate should not be trusted'