From 8520a62271d1300a49c67b910528440279c0fe79 Mon Sep 17 00:00:00 2001 From: lilydjwg Date: Sat, 15 Aug 2020 16:58:13 +0800 Subject: [PATCH] update httpclient implementations --- NEW | 2 +- nvchecker/api.py | 1 + nvchecker/core.py | 16 +++-- nvchecker/ctxvars.py | 12 ++++ nvchecker/httpclient/__init__.py | 10 ++- nvchecker/httpclient/aiohttp_httpclient.py | 73 +++++++++++++--------- nvchecker/httpclient/base.py | 65 +++++++++++++++++++ nvchecker/httpclient/httpclient.py | 6 -- nvchecker/httpclient/httpx_httpclient.py | 66 +++++++++++++++++++ nvchecker/httpclient/tornado_httpclient.py | 71 +++++++++------------ nvchecker/slogconf.py | 8 +-- nvchecker/util.py | 34 +++++++--- nvchecker_source/archpkg.py | 4 +- nvchecker_source/aur.py | 4 +- 14 files changed, 268 insertions(+), 104 deletions(-) create mode 100644 nvchecker/ctxvars.py create mode 100644 nvchecker/httpclient/base.py delete mode 100644 nvchecker/httpclient/httpclient.py create mode 100644 nvchecker/httpclient/httpx_httpclient.py diff --git a/NEW b/NEW index a029b18..0b8312c 100644 --- a/NEW +++ b/NEW @@ -1,5 +1,5 @@ TODO: -* use contextvars for `tries` and `proxy` (passing to `httpclient`) * update tests * update README +* set keyfile via command line? * create source plugin documentation diff --git a/nvchecker/api.py b/nvchecker/api.py index 0a57541..8258126 100644 --- a/nvchecker/api.py +++ b/nvchecker/api.py @@ -7,3 +7,4 @@ from .util import ( Entry, BaseWorker, RawResult, VersionResult, ) from .sortversion import sort_version_keys +from .ctxvars import tries, proxy diff --git a/nvchecker/core.py b/nvchecker/core.py index d7c4baf..5346d1a 100644 --- a/nvchecker/core.py +++ b/nvchecker/core.py @@ -17,6 +17,7 @@ import types from pathlib import Path from importlib import import_module import re +import contextvars import structlog import toml @@ -29,6 +30,7 @@ from .util import ( ) from . import __version__ from .sortversion import sort_version_keys +from .ctxvars import tries as ctx_tries logger = structlog.get_logger(logger_name=__name__) @@ -182,6 +184,9 @@ def dispatch( tries: int, ) -> List[asyncio.Future]: mods: Dict[str, Tuple[types.ModuleType, List]] = {} + ctx_tries.set(tries) + root_ctx = contextvars.copy_context() + for name, entry in entries.items(): source = entry.get('source', 'none') if source not in mods: @@ -199,15 +204,16 @@ def dispatch( else: worker_cls = FunctionWorker - worker = worker_cls( - token_q, result_q, tasks, - tries, keymanager, + ctx = root_ctx.copy() + worker = ctx.run( + worker_cls, + token_q, result_q, tasks, keymanager, ) if worker_cls is FunctionWorker: func = mod.get_version # type: ignore - worker.initialize(func) + ctx.run(worker.initialize, func) - ret.append(worker.run()) + ret.append(ctx.run(worker.run)) return ret diff --git a/nvchecker/ctxvars.py b/nvchecker/ctxvars.py new file mode 100644 index 0000000..1eb66fe --- /dev/null +++ b/nvchecker/ctxvars.py @@ -0,0 +1,12 @@ +# MIT licensed +# Copyright (c) 2020 lilydjwg , et al. + +from contextvars import ContextVar + +from . import __version__ + +DEFAULT_USER_AGENT = 'lilydjwg/nvchecker %s' % __version__ + +tries = ContextVar('tries', default=1) +proxy = ContextVar('proxy', default=None) +user_agent = ContextVar('user_agent', default=DEFAULT_USER_AGENT) diff --git a/nvchecker/httpclient/__init__.py b/nvchecker/httpclient/__init__.py index 024d9b0..dd1f6dd 100644 --- a/nvchecker/httpclient/__init__.py +++ b/nvchecker/httpclient/__init__.py @@ -11,9 +11,13 @@ except ImportError: which = 'aiohttp' # connection reuse except ImportError: - import tornado - which = 'tornado' - # fallback + try: + import httpx + which = 'httpx' + except ImportError: + import tornado + which = 'tornado' + # fallback m = __import__('%s_httpclient' % which, globals(), locals(), level=1) __all__ = m.__all__ diff --git a/nvchecker/httpclient/aiohttp_httpclient.py b/nvchecker/httpclient/aiohttp_httpclient.py index de5c743..0737801 100644 --- a/nvchecker/httpclient/aiohttp_httpclient.py +++ b/nvchecker/httpclient/aiohttp_httpclient.py @@ -1,46 +1,61 @@ # MIT licensed -# Copyright (c) 2013-2017 lilydjwg , et al. +# Copyright (c) 2013-2020 lilydjwg , et al. import atexit import asyncio +from typing import Optional, Dict + import aiohttp -from .httpclient import DEFAULT_USER_AGENT + +from .base import BaseSession, TemporaryError, Response + +__all__ = ['session'] connector = aiohttp.TCPConnector(limit=20) -__all__ = ['session', 'HTTPError', 'NetworkErrors'] +class AiohttpSession(BaseSession): + def __init__(self): + self.session = aiohttp.ClientSession( + connector = aiohttp.TCPConnector(limit=20), + timeout = aiohttp.ClientTimeout(total=20), + trust_env = True, + ) -class HTTPError(Exception): - def __init__(self, code, message, response): - self.code = code - self.message = message - self.response = response + async def request_impl( + self, url: str, *, + method: str, + proxy: Optional[str] = None, + headers: Dict[str, str] = {}, + params = (), + json = None, + ) -> Response: + kwargs = { + 'method': method, + 'headers': headers, + } -class BetterClientSession(aiohttp.ClientSession): - async def _request(self, *args, **kwargs): - if hasattr(self, "nv_config") and self.nv_config.get("proxy"): - kwargs.setdefault("proxy", self.nv_config.get("proxy")) + if proxy is not None: + kwargs['proxy'] = proxy - kwargs.setdefault("headers", {}).setdefault('User-Agent', DEFAULT_USER_AGENT) + try: + res = await self.session.request( + url, **kwargs) + except ( + asyncio.TimeoutError, aiohttp.ClientConnectorError, + ) as e: + raise TemporaryError(599, repr(e), e) - res = await super(BetterClientSession, self)._request( - *args, **kwargs) - if res.status >= 400: - raise HTTPError(res.status, res.reason, res) - return res + if res.status >= 500: + raise TemporaryError(res.status, res.reason, res) + else: + res.raise_for_status() -session = BetterClientSession( - connector = connector, - timeout = aiohttp.ClientTimeout(total=20), - trust_env = True, -) + body = await res.content + return Response(body) @atexit.register def cleanup(): - loop = asyncio.get_event_loop() - loop.run_until_complete(session.close()) + loop = asyncio.get_event_loop() + loop.run_until_complete(session.close()) -NetworkErrors = ( - asyncio.TimeoutError, - aiohttp.ClientConnectorError, -) +session = AiohttpSession() diff --git a/nvchecker/httpclient/base.py b/nvchecker/httpclient/base.py new file mode 100644 index 0000000..a56c12c --- /dev/null +++ b/nvchecker/httpclient/base.py @@ -0,0 +1,65 @@ +# MIT licensed +# Copyright (c) 2019-2020 lilydjwg , et al. + +import structlog +from typing import Optional, Dict +import json as _json + +from ..ctxvars import tries, proxy, user_agent + +logger = structlog.get_logger(logger_name=__name__) + +class Response: + def __init__(self, body): + self.body = body + + def json(self): + return _json.loads(self.body.decode('utf-8')) + +class BaseSession: + async def get(self, *args, **kwargs): + return await self.request( + method='GET', *args, **kwargs) + + async def post(self, *args, **kwargs): + return await self.request( + method='POST', *args, **kwargs) + + async def request(self, *args, **kwargs): + t = tries.get() + p = proxy.get() + ua = user_agent.get() + + headers = kwargs.setdefault('headers', {}) + headers['User-Agent'] = ua + + for i in range(1, t+1): + try: + return await self.request_impl( + proxy = p, + *args, **kwargs, + ) + except TemporaryError as e: + if i == t: + raise + else: + logger.warning('temporary error, retrying', + tries = i, exc_info = e) + continue + + async def request_impl( + self, url: str, *, + method: str, + proxy: Optional[str] = None, + headers: Dict[str, str] = {}, + params = (), + json = None, + ) -> Response: + raise NotImplementedError + +class TemporaryError(Exception): + def __init__(self, code, message, response): + self.code = code + self.message = message + self.response = response + diff --git a/nvchecker/httpclient/httpclient.py b/nvchecker/httpclient/httpclient.py deleted file mode 100644 index 6776a1e..0000000 --- a/nvchecker/httpclient/httpclient.py +++ /dev/null @@ -1,6 +0,0 @@ -# MIT licensed -# Copyright (c) 2019 lilydjwg , et al. - -from .. import __version__ - -DEFAULT_USER_AGENT = 'lilydjwg/nvchecker %s' % __version__ diff --git a/nvchecker/httpclient/httpx_httpclient.py b/nvchecker/httpclient/httpx_httpclient.py new file mode 100644 index 0000000..b1161a4 --- /dev/null +++ b/nvchecker/httpclient/httpx_httpclient.py @@ -0,0 +1,66 @@ +# MIT licensed +# Copyright (c) 2020 lilydjwg , et al. + +import atexit +from typing import Dict, Optional + +import httpx + +from .base import BaseSession, TemporaryError, Response + +__all__ = ['session'] + +class HttpxSession(BaseSession): + def __init__(self): + self.clients = {} + + async def request_impl( + self, url: str, *, + method: str, + proxy: Optional[str] = None, + headers: Dict[str, str] = {}, + params = (), + json = None, + ) -> Response: + client = self.clients.get(proxy) + if not client: + client = httpx.AsyncClient( + timeout = httpx.Timeout(20, pool_timeout=None), + http2 = True, + proxies = {'all://': proxy}, + ) + self.clients[proxy] = client + + try: + r = await client.request( + method, url, json = json, + headers = headers, + params = params, + ) + if r.status_code >= 500: + raise TemporaryError( + r.status_code, + r.reason_phrase, + r, + ) + else: + r.raise_for_status() + + except httpx.TransportError as e: + raise TemporaryError(599, repr(e), e) + + body = await r.aread() + return Response(body) + + async def aclose(self): + for client in self.clients.values(): + await client.aclose() + del self.clients + +@atexit.register +def cleanup(): + import asyncio + loop = asyncio.get_event_loop() + loop.run_until_complete(session.aclose()) + +session = HttpxSession() diff --git a/nvchecker/httpclient/tornado_httpclient.py b/nvchecker/httpclient/tornado_httpclient.py index 124a3d7..8b51080 100644 --- a/nvchecker/httpclient/tornado_httpclient.py +++ b/nvchecker/httpclient/tornado_httpclient.py @@ -1,11 +1,11 @@ # MIT licensed # Copyright (c) 2013-2020 lilydjwg , et al. -import json +import json as _json from urllib.parse import urlencode +from typing import Optional, Dict, Any -from tornado.httpclient import AsyncHTTPClient, HTTPRequest, HTTPResponse -from tornado.httpclient import HTTPError +from tornado.httpclient import AsyncHTTPClient, HTTPRequest try: import pycurl @@ -13,9 +13,9 @@ try: except ImportError: pycurl = None # type: ignore -from .httpclient import DEFAULT_USER_AGENT +from .base import BaseSession, TemporaryError, Response -__all__ = ['session', 'HTTPError', 'NetworkErrors'] +__all__ = ['session'] HTTP2_AVAILABLE = None if pycurl else False @@ -30,54 +30,43 @@ def try_use_http2(curl): elif HTTP2_AVAILABLE: curl.setopt(pycurl.HTTP_VERSION, 4) -class Session: - def post(self, url, **kwargs): - j = kwargs.pop('json', None) - if j: - kwargs['body'] = json.dumps(j) - return self.get(url, method='POST', **kwargs) +class TornadoSession(BaseSession): + async def request_impl( + self, url: str, *, + method: str, + proxy: Optional[str] = None, + headers: Dict[str, str] = {}, + params = (), + json = None, + ) -> Response: + kwargs: Dict[str, Any] = { + 'method': method, + 'headers': headers, + } - def get(self, url, **kwargs): + if json: + kwargs['body'] = _json.dumps(json) kwargs['prepare_curl_callback'] = try_use_http2 - proxy = kwargs.get('proxy') - if proxy: - del kwargs['proxy'] - elif hasattr(self, 'nv_config') and self.nv_config.get('proxy'): - proxy = self.nv_config.get('proxy') if proxy: host, port = proxy.rsplit(':', 1) kwargs['proxy_host'] = host kwargs['proxy_port'] = int(port) - params = kwargs.get('params') if params: - del kwargs['params'] q = urlencode(params) url += '?' + q - kwargs.setdefault("headers", {}).setdefault('User-Agent', DEFAULT_USER_AGENT) r = HTTPRequest(url, **kwargs) - return ResponseManager(r) + res = await AsyncHTTPClient().fetch( + r, raise_error=False) + if res.code >= 500: + raise TemporaryError( + res.code, res.reason, res + ) + else: + res.rethrow() -class ResponseManager: - def __init__(self, req): - self.req = req + return Response(res.body) - async def __aenter__(self): - return await AsyncHTTPClient().fetch(self.req) - - async def __aexit__(self, exc_type, exc, tb): - pass - -async def json_response(self, **kwargs): - return json.loads(self.body.decode('utf-8')) - -async def read(self): - return self.body - -HTTPResponse.json = json_response # type: ignore -HTTPResponse.read = read # type: ignore -session = Session() - -NetworkErrors = () +session = TornadoSession() diff --git a/nvchecker/slogconf.py b/nvchecker/slogconf.py index 73cd092..c0db526 100644 --- a/nvchecker/slogconf.py +++ b/nvchecker/slogconf.py @@ -10,7 +10,7 @@ import sys import structlog -from .httpclient import HTTPError, NetworkErrors # type: ignore +from .httpclient.base import TemporaryError def _console_msg(event): evt = event['event'] @@ -49,11 +49,9 @@ def filter_exc(logger, level, event): else: exc = exc_info - if isinstance(exc, HTTPError): - if exc.code == 599: # tornado timeout + if isinstance(exc, TemporaryError): + if exc.code == 599: # network issues del event['exc_info'] - elif isinstance(exc, NetworkErrors): - del event['exc_info'] event['error'] = exc return event diff --git a/nvchecker/util.py b/nvchecker/util.py index 6ad26c2..377acd6 100644 --- a/nvchecker/util.py +++ b/nvchecker/util.py @@ -12,11 +12,15 @@ from typing import ( TYPE_CHECKING, ) from pathlib import Path +import contextvars import toml import structlog from .httpclient import session # type: ignore +from .ctxvars import tries as ctx_tries +from .ctxvars import proxy as ctx_proxy +from .ctxvars import user_agent as ctx_ua logger = structlog.get_logger(logger_name=__name__) @@ -55,12 +59,10 @@ class BaseWorker: token_q: Queue[bool], result_q: Queue[RawResult], tasks: List[Tuple[str, Entry]], - tries: int, keymanager: KeyManager, ) -> None: self.token_q = token_q self.result_q = result_q - self.tries = tries self.keymanager = keymanager self.tasks = tasks @@ -85,8 +87,8 @@ class AsyncCache: async def _get_json(self, key: Tuple[str, str]) -> Any: url = key[1] - async with session.get(url) as res: - return await res.json(content_type=None) + res = await session.get(url) + return res.json() async def get_json(self, url: str) -> Any: return await self.get( @@ -136,18 +138,30 @@ class FunctionWorker(BaseWorker): self.cache = AsyncCache() async def run(self) -> None: - futures = [ - self.run_one(name, entry) - for name, entry in self.tasks - ] - for fu in asyncio.as_completed(futures): - await fu + futures = [] + for name, entry in self.tasks: + ctx = contextvars.copy_context() + fu = ctx.run(self.run_one, name, entry) + futures.append(fu) + + for fu2 in asyncio.as_completed(futures): + await fu2 async def run_one( self, name: str, entry: Entry, ) -> None: assert self.func is not None + tries = entry.get('tries', None) + if tries is not None: + ctx_tries.set(tries) + proxy = entry.get('proxy', None) + if tries is not None: + ctx_proxy.set(proxy) + ua = entry.get('user_agent', None) + if ua is not None: + ctx_ua.set(ua) + try: async with self.acquire_token(): version = await self.func( diff --git a/nvchecker_source/archpkg.py b/nvchecker_source/archpkg.py index 2f6efec..230752a 100644 --- a/nvchecker_source/archpkg.py +++ b/nvchecker_source/archpkg.py @@ -6,8 +6,8 @@ from nvchecker.api import session, GetVersionError URL = 'https://www.archlinux.org/packages/search/json/' async def request(pkg): - async with session.get(URL, params={"name": pkg}) as res: - return await res.json() + res = await session.get(URL, params={"name": pkg}) + return res.json() async def get_version(name, conf, *, cache, **kwargs): pkg = conf.get('archpkg') or name diff --git a/nvchecker_source/aur.py b/nvchecker_source/aur.py index 4d141b4..29c87da 100644 --- a/nvchecker_source/aur.py +++ b/nvchecker_source/aur.py @@ -25,8 +25,8 @@ class AurResults: params = [('v', '5'), ('type', 'info')] params.extend(('arg[]', name) for name in aurnames if name not in self.cache) - async with session.get(AUR_URL, params=params) as res: - data = await res.json() + res = await session.get(AUR_URL, params=params) + data = res.json() new_results = {r['Name']: r for r in data['results']} cache = self.cache