update httpclient implementations

This commit is contained in:
lilydjwg 2020-08-15 16:58:13 +08:00
parent 5e209cc9ad
commit 8520a62271
14 changed files with 268 additions and 104 deletions

2
NEW
View file

@ -1,5 +1,5 @@
TODO: TODO:
* use contextvars for `tries` and `proxy` (passing to `httpclient`)
* update tests * update tests
* update README * update README
* set keyfile via command line?
* create source plugin documentation * create source plugin documentation

View file

@ -7,3 +7,4 @@ from .util import (
Entry, BaseWorker, RawResult, VersionResult, Entry, BaseWorker, RawResult, VersionResult,
) )
from .sortversion import sort_version_keys from .sortversion import sort_version_keys
from .ctxvars import tries, proxy

View file

@ -17,6 +17,7 @@ import types
from pathlib import Path from pathlib import Path
from importlib import import_module from importlib import import_module
import re import re
import contextvars
import structlog import structlog
import toml import toml
@ -29,6 +30,7 @@ from .util import (
) )
from . import __version__ from . import __version__
from .sortversion import sort_version_keys from .sortversion import sort_version_keys
from .ctxvars import tries as ctx_tries
logger = structlog.get_logger(logger_name=__name__) logger = structlog.get_logger(logger_name=__name__)
@ -182,6 +184,9 @@ def dispatch(
tries: int, tries: int,
) -> List[asyncio.Future]: ) -> List[asyncio.Future]:
mods: Dict[str, Tuple[types.ModuleType, List]] = {} mods: Dict[str, Tuple[types.ModuleType, List]] = {}
ctx_tries.set(tries)
root_ctx = contextvars.copy_context()
for name, entry in entries.items(): for name, entry in entries.items():
source = entry.get('source', 'none') source = entry.get('source', 'none')
if source not in mods: if source not in mods:
@ -199,15 +204,16 @@ def dispatch(
else: else:
worker_cls = FunctionWorker worker_cls = FunctionWorker
worker = worker_cls( ctx = root_ctx.copy()
token_q, result_q, tasks, worker = ctx.run(
tries, keymanager, worker_cls,
token_q, result_q, tasks, keymanager,
) )
if worker_cls is FunctionWorker: if worker_cls is FunctionWorker:
func = mod.get_version # type: ignore func = mod.get_version # type: ignore
worker.initialize(func) ctx.run(worker.initialize, func)
ret.append(worker.run()) ret.append(ctx.run(worker.run))
return ret return ret

12
nvchecker/ctxvars.py Normal file
View file

@ -0,0 +1,12 @@
# MIT licensed
# Copyright (c) 2020 lilydjwg <lilydjwg@gmail.com>, et al.
from contextvars import ContextVar
from . import __version__
DEFAULT_USER_AGENT = 'lilydjwg/nvchecker %s' % __version__
tries = ContextVar('tries', default=1)
proxy = ContextVar('proxy', default=None)
user_agent = ContextVar('user_agent', default=DEFAULT_USER_AGENT)

View file

@ -11,9 +11,13 @@ except ImportError:
which = 'aiohttp' which = 'aiohttp'
# connection reuse # connection reuse
except ImportError: except ImportError:
import tornado try:
which = 'tornado' import httpx
# fallback which = 'httpx'
except ImportError:
import tornado
which = 'tornado'
# fallback
m = __import__('%s_httpclient' % which, globals(), locals(), level=1) m = __import__('%s_httpclient' % which, globals(), locals(), level=1)
__all__ = m.__all__ __all__ = m.__all__

View file

@ -1,46 +1,61 @@
# MIT licensed # MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al. # Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.
import atexit import atexit
import asyncio import asyncio
from typing import Optional, Dict
import aiohttp import aiohttp
from .httpclient import DEFAULT_USER_AGENT
from .base import BaseSession, TemporaryError, Response
__all__ = ['session']
connector = aiohttp.TCPConnector(limit=20) connector = aiohttp.TCPConnector(limit=20)
__all__ = ['session', 'HTTPError', 'NetworkErrors'] class AiohttpSession(BaseSession):
def __init__(self):
self.session = aiohttp.ClientSession(
connector = aiohttp.TCPConnector(limit=20),
timeout = aiohttp.ClientTimeout(total=20),
trust_env = True,
)
class HTTPError(Exception): async def request_impl(
def __init__(self, code, message, response): self, url: str, *,
self.code = code method: str,
self.message = message proxy: Optional[str] = None,
self.response = response headers: Dict[str, str] = {},
params = (),
json = None,
) -> Response:
kwargs = {
'method': method,
'headers': headers,
}
class BetterClientSession(aiohttp.ClientSession): if proxy is not None:
async def _request(self, *args, **kwargs): kwargs['proxy'] = proxy
if hasattr(self, "nv_config") and self.nv_config.get("proxy"):
kwargs.setdefault("proxy", self.nv_config.get("proxy"))
kwargs.setdefault("headers", {}).setdefault('User-Agent', DEFAULT_USER_AGENT) try:
res = await self.session.request(
url, **kwargs)
except (
asyncio.TimeoutError, aiohttp.ClientConnectorError,
) as e:
raise TemporaryError(599, repr(e), e)
res = await super(BetterClientSession, self)._request( if res.status >= 500:
*args, **kwargs) raise TemporaryError(res.status, res.reason, res)
if res.status >= 400: else:
raise HTTPError(res.status, res.reason, res) res.raise_for_status()
return res
session = BetterClientSession( body = await res.content
connector = connector, return Response(body)
timeout = aiohttp.ClientTimeout(total=20),
trust_env = True,
)
@atexit.register @atexit.register
def cleanup(): def cleanup():
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
loop.run_until_complete(session.close()) loop.run_until_complete(session.close())
NetworkErrors = ( session = AiohttpSession()
asyncio.TimeoutError,
aiohttp.ClientConnectorError,
)

View file

@ -0,0 +1,65 @@
# MIT licensed
# Copyright (c) 2019-2020 lilydjwg <lilydjwg@gmail.com>, et al.
import structlog
from typing import Optional, Dict
import json as _json
from ..ctxvars import tries, proxy, user_agent
logger = structlog.get_logger(logger_name=__name__)
class Response:
def __init__(self, body):
self.body = body
def json(self):
return _json.loads(self.body.decode('utf-8'))
class BaseSession:
async def get(self, *args, **kwargs):
return await self.request(
method='GET', *args, **kwargs)
async def post(self, *args, **kwargs):
return await self.request(
method='POST', *args, **kwargs)
async def request(self, *args, **kwargs):
t = tries.get()
p = proxy.get()
ua = user_agent.get()
headers = kwargs.setdefault('headers', {})
headers['User-Agent'] = ua
for i in range(1, t+1):
try:
return await self.request_impl(
proxy = p,
*args, **kwargs,
)
except TemporaryError as e:
if i == t:
raise
else:
logger.warning('temporary error, retrying',
tries = i, exc_info = e)
continue
async def request_impl(
self, url: str, *,
method: str,
proxy: Optional[str] = None,
headers: Dict[str, str] = {},
params = (),
json = None,
) -> Response:
raise NotImplementedError
class TemporaryError(Exception):
def __init__(self, code, message, response):
self.code = code
self.message = message
self.response = response

View file

@ -1,6 +0,0 @@
# MIT licensed
# Copyright (c) 2019 lilydjwg <lilydjwg@gmail.com>, et al.
from .. import __version__
DEFAULT_USER_AGENT = 'lilydjwg/nvchecker %s' % __version__

View file

@ -0,0 +1,66 @@
# MIT licensed
# Copyright (c) 2020 lilydjwg <lilydjwg@gmail.com>, et al.
import atexit
from typing import Dict, Optional
import httpx
from .base import BaseSession, TemporaryError, Response
__all__ = ['session']
class HttpxSession(BaseSession):
def __init__(self):
self.clients = {}
async def request_impl(
self, url: str, *,
method: str,
proxy: Optional[str] = None,
headers: Dict[str, str] = {},
params = (),
json = None,
) -> Response:
client = self.clients.get(proxy)
if not client:
client = httpx.AsyncClient(
timeout = httpx.Timeout(20, pool_timeout=None),
http2 = True,
proxies = {'all://': proxy},
)
self.clients[proxy] = client
try:
r = await client.request(
method, url, json = json,
headers = headers,
params = params,
)
if r.status_code >= 500:
raise TemporaryError(
r.status_code,
r.reason_phrase,
r,
)
else:
r.raise_for_status()
except httpx.TransportError as e:
raise TemporaryError(599, repr(e), e)
body = await r.aread()
return Response(body)
async def aclose(self):
for client in self.clients.values():
await client.aclose()
del self.clients
@atexit.register
def cleanup():
import asyncio
loop = asyncio.get_event_loop()
loop.run_until_complete(session.aclose())
session = HttpxSession()

View file

@ -1,11 +1,11 @@
# MIT licensed # MIT licensed
# Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al. # Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.
import json import json as _json
from urllib.parse import urlencode from urllib.parse import urlencode
from typing import Optional, Dict, Any
from tornado.httpclient import AsyncHTTPClient, HTTPRequest, HTTPResponse from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from tornado.httpclient import HTTPError
try: try:
import pycurl import pycurl
@ -13,9 +13,9 @@ try:
except ImportError: except ImportError:
pycurl = None # type: ignore pycurl = None # type: ignore
from .httpclient import DEFAULT_USER_AGENT from .base import BaseSession, TemporaryError, Response
__all__ = ['session', 'HTTPError', 'NetworkErrors'] __all__ = ['session']
HTTP2_AVAILABLE = None if pycurl else False HTTP2_AVAILABLE = None if pycurl else False
@ -30,54 +30,43 @@ def try_use_http2(curl):
elif HTTP2_AVAILABLE: elif HTTP2_AVAILABLE:
curl.setopt(pycurl.HTTP_VERSION, 4) curl.setopt(pycurl.HTTP_VERSION, 4)
class Session: class TornadoSession(BaseSession):
def post(self, url, **kwargs): async def request_impl(
j = kwargs.pop('json', None) self, url: str, *,
if j: method: str,
kwargs['body'] = json.dumps(j) proxy: Optional[str] = None,
return self.get(url, method='POST', **kwargs) headers: Dict[str, str] = {},
params = (),
json = None,
) -> Response:
kwargs: Dict[str, Any] = {
'method': method,
'headers': headers,
}
def get(self, url, **kwargs): if json:
kwargs['body'] = _json.dumps(json)
kwargs['prepare_curl_callback'] = try_use_http2 kwargs['prepare_curl_callback'] = try_use_http2
proxy = kwargs.get('proxy')
if proxy:
del kwargs['proxy']
elif hasattr(self, 'nv_config') and self.nv_config.get('proxy'):
proxy = self.nv_config.get('proxy')
if proxy: if proxy:
host, port = proxy.rsplit(':', 1) host, port = proxy.rsplit(':', 1)
kwargs['proxy_host'] = host kwargs['proxy_host'] = host
kwargs['proxy_port'] = int(port) kwargs['proxy_port'] = int(port)
params = kwargs.get('params')
if params: if params:
del kwargs['params']
q = urlencode(params) q = urlencode(params)
url += '?' + q url += '?' + q
kwargs.setdefault("headers", {}).setdefault('User-Agent', DEFAULT_USER_AGENT)
r = HTTPRequest(url, **kwargs) r = HTTPRequest(url, **kwargs)
return ResponseManager(r) res = await AsyncHTTPClient().fetch(
r, raise_error=False)
if res.code >= 500:
raise TemporaryError(
res.code, res.reason, res
)
else:
res.rethrow()
class ResponseManager: return Response(res.body)
def __init__(self, req):
self.req = req
async def __aenter__(self): session = TornadoSession()
return await AsyncHTTPClient().fetch(self.req)
async def __aexit__(self, exc_type, exc, tb):
pass
async def json_response(self, **kwargs):
return json.loads(self.body.decode('utf-8'))
async def read(self):
return self.body
HTTPResponse.json = json_response # type: ignore
HTTPResponse.read = read # type: ignore
session = Session()
NetworkErrors = ()

View file

@ -10,7 +10,7 @@ import sys
import structlog import structlog
from .httpclient import HTTPError, NetworkErrors # type: ignore from .httpclient.base import TemporaryError
def _console_msg(event): def _console_msg(event):
evt = event['event'] evt = event['event']
@ -49,11 +49,9 @@ def filter_exc(logger, level, event):
else: else:
exc = exc_info exc = exc_info
if isinstance(exc, HTTPError): if isinstance(exc, TemporaryError):
if exc.code == 599: # tornado timeout if exc.code == 599: # network issues
del event['exc_info'] del event['exc_info']
elif isinstance(exc, NetworkErrors):
del event['exc_info']
event['error'] = exc event['error'] = exc
return event return event

View file

@ -12,11 +12,15 @@ from typing import (
TYPE_CHECKING, TYPE_CHECKING,
) )
from pathlib import Path from pathlib import Path
import contextvars
import toml import toml
import structlog import structlog
from .httpclient import session # type: ignore from .httpclient import session # type: ignore
from .ctxvars import tries as ctx_tries
from .ctxvars import proxy as ctx_proxy
from .ctxvars import user_agent as ctx_ua
logger = structlog.get_logger(logger_name=__name__) logger = structlog.get_logger(logger_name=__name__)
@ -55,12 +59,10 @@ class BaseWorker:
token_q: Queue[bool], token_q: Queue[bool],
result_q: Queue[RawResult], result_q: Queue[RawResult],
tasks: List[Tuple[str, Entry]], tasks: List[Tuple[str, Entry]],
tries: int,
keymanager: KeyManager, keymanager: KeyManager,
) -> None: ) -> None:
self.token_q = token_q self.token_q = token_q
self.result_q = result_q self.result_q = result_q
self.tries = tries
self.keymanager = keymanager self.keymanager = keymanager
self.tasks = tasks self.tasks = tasks
@ -85,8 +87,8 @@ class AsyncCache:
async def _get_json(self, key: Tuple[str, str]) -> Any: async def _get_json(self, key: Tuple[str, str]) -> Any:
url = key[1] url = key[1]
async with session.get(url) as res: res = await session.get(url)
return await res.json(content_type=None) return res.json()
async def get_json(self, url: str) -> Any: async def get_json(self, url: str) -> Any:
return await self.get( return await self.get(
@ -136,18 +138,30 @@ class FunctionWorker(BaseWorker):
self.cache = AsyncCache() self.cache = AsyncCache()
async def run(self) -> None: async def run(self) -> None:
futures = [ futures = []
self.run_one(name, entry) for name, entry in self.tasks:
for name, entry in self.tasks ctx = contextvars.copy_context()
] fu = ctx.run(self.run_one, name, entry)
for fu in asyncio.as_completed(futures): futures.append(fu)
await fu
for fu2 in asyncio.as_completed(futures):
await fu2
async def run_one( async def run_one(
self, name: str, entry: Entry, self, name: str, entry: Entry,
) -> None: ) -> None:
assert self.func is not None assert self.func is not None
tries = entry.get('tries', None)
if tries is not None:
ctx_tries.set(tries)
proxy = entry.get('proxy', None)
if tries is not None:
ctx_proxy.set(proxy)
ua = entry.get('user_agent', None)
if ua is not None:
ctx_ua.set(ua)
try: try:
async with self.acquire_token(): async with self.acquire_token():
version = await self.func( version = await self.func(

View file

@ -6,8 +6,8 @@ from nvchecker.api import session, GetVersionError
URL = 'https://www.archlinux.org/packages/search/json/' URL = 'https://www.archlinux.org/packages/search/json/'
async def request(pkg): async def request(pkg):
async with session.get(URL, params={"name": pkg}) as res: res = await session.get(URL, params={"name": pkg})
return await res.json() return res.json()
async def get_version(name, conf, *, cache, **kwargs): async def get_version(name, conf, *, cache, **kwargs):
pkg = conf.get('archpkg') or name pkg = conf.get('archpkg') or name

View file

@ -25,8 +25,8 @@ class AurResults:
params = [('v', '5'), ('type', 'info')] params = [('v', '5'), ('type', 'info')]
params.extend(('arg[]', name) for name in aurnames params.extend(('arg[]', name) for name in aurnames
if name not in self.cache) if name not in self.cache)
async with session.get(AUR_URL, params=params) as res: res = await session.get(AUR_URL, params=params)
data = await res.json() data = res.json()
new_results = {r['Name']: r for r in data['results']} new_results = {r['Name']: r for r in data['results']}
cache = self.cache cache = self.cache