diff --git a/docs/plugin.rst b/docs/plugin.rst index df0de6c..e1abc30 100644 --- a/docs/plugin.rst +++ b/docs/plugin.rst @@ -52,8 +52,8 @@ If you want to send an HTTP request, it's preferred to use :meth: `nvchecker.api.session` object. It will use the auto-selected HTTP backend and handle the ``proxy`` option automatically. -For details about these objects, see :mod:`the API documentation -`, or take existing source plugins as examples. +For details about these objects, see :mod:`the API documentation `, +or take existing source plugins as examples. How to write a more powerful plugin ----------------------------------- diff --git a/docs/usage.rst b/docs/usage.rst index f6265fd..9740e96 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -134,6 +134,9 @@ proxy max_concurrency Max number of concurrent jobs. Default: 20. +http_timeout + Time in seconds to wait for HTTP requests. Default: 20. + keyfile Specify an ini config file containing key (token) information. This file should contain a ``keys`` table, mapping key names to key values. See diff --git a/nvchecker/__main__.py b/nvchecker/__main__.py index 5cb9af9..9401b58 100755 --- a/nvchecker/__main__.py +++ b/nvchecker/__main__.py @@ -46,8 +46,13 @@ def main() -> None: task_sem = asyncio.Semaphore(options.max_concurrency) result_q: asyncio.Queue[RawResult] = asyncio.Queue() + dispatcher = core.setup_httpclient( + options.max_concurrency, + options.httplib, + options.http_timeout, + ) try: - futures = core.dispatch( + futures = dispatcher.dispatch( entries, task_sem, result_q, keymanager, args.tries, options.source_configs, diff --git a/nvchecker/api.py b/nvchecker/api.py index 5fad9c6..a05019c 100644 --- a/nvchecker/api.py +++ b/nvchecker/api.py @@ -1,7 +1,7 @@ # MIT licensed # Copyright (c) 2020 lilydjwg , et al. -from .httpclient import session, TemporaryError # type: ignore +from .httpclient import session, TemporaryError from .util import ( Entry, BaseWorker, RawResult, VersionResult, AsyncCache, KeyManager, GetVersionError, diff --git a/nvchecker/core.py b/nvchecker/core.py index 4634bd2..8bfe81c 100644 --- a/nvchecker/core.py +++ b/nvchecker/core.py @@ -33,6 +33,7 @@ from .util import ( from . import __version__ from .sortversion import sort_version_keys from .ctxvars import tries as ctx_tries +from . import httpclient logger = structlog.get_logger(logger_name=__name__) @@ -140,6 +141,8 @@ class Options(NamedTuple): proxy: Optional[str] keymanager: KeyManager source_configs: Dict[str, Dict[str, Any]] + httplib: Optional[str] + http_timeout: int class FileLoadError(Exception): def __init__(self, kind, exc): @@ -191,59 +194,75 @@ def load_file( max_concurrency = c.get('max_concurrency', 20) proxy = c.get('proxy') + httplib = c.get('httplib', None) + http_timeout = c.get('http_timeout', 20) else: max_concurrency = 20 proxy = None + httplib = None + http_timeout = 20 return cast(Entries, config), Options( ver_files, max_concurrency, proxy, keymanager, - source_configs, + source_configs, httplib, http_timeout, ) -def dispatch( - entries: Entries, - task_sem: asyncio.Semaphore, - result_q: Queue[RawResult], - keymanager: KeyManager, - tries: int, - source_configs: Dict[str, Dict[str, Any]], -) -> List[asyncio.Future]: - mods: Dict[str, Tuple[types.ModuleType, List]] = {} - ctx_tries.set(tries) - root_ctx = contextvars.copy_context() +def setup_httpclient( + max_concurrency: int = 20, + httplib: Optional[str] = None, + http_timeout: int = 20, +) -> Dispatcher: + httplib_ = httplib or httpclient.find_best_httplib() + httpclient.setup( + httplib_, max_concurrency, http_timeout) + return Dispatcher() - for name, entry in entries.items(): - source = entry.get('source', 'none') - if source not in mods: - mod = import_module('nvchecker_source.' + source) - tasks: List[Tuple[str, Entry]] = [] - mods[source] = mod, tasks - config = source_configs.get(source) - if config and getattr(mod, 'configure'): - mod.configure(config) # type: ignore - else: - tasks = mods[source][1] - tasks.append((name, entry)) +class Dispatcher: + def dispatch( + self, + entries: Entries, + task_sem: asyncio.Semaphore, + result_q: Queue[RawResult], + keymanager: KeyManager, + tries: int, + source_configs: Dict[str, Dict[str, Any]], + ) -> List[asyncio.Future]: + mods: Dict[str, Tuple[types.ModuleType, List]] = {} + ctx_tries.set(tries) + root_ctx = contextvars.copy_context() - ret = [] - for mod, tasks in mods.values(): - if hasattr(mod, 'Worker'): - worker_cls = mod.Worker # type: ignore - else: - worker_cls = FunctionWorker + for name, entry in entries.items(): + source = entry.get('source', 'none') + if source not in mods: + mod = import_module('nvchecker_source.' + source) + tasks: List[Tuple[str, Entry]] = [] + mods[source] = mod, tasks + config = source_configs.get(source) + if config and getattr(mod, 'configure'): + mod.configure(config) # type: ignore + else: + tasks = mods[source][1] + tasks.append((name, entry)) - ctx = root_ctx.copy() - worker = ctx.run( - worker_cls, - task_sem, result_q, tasks, keymanager, - ) - if worker_cls is FunctionWorker: - func = mod.get_version # type: ignore - ctx.run(worker.initialize, func) + ret = [] + for mod, tasks in mods.values(): + if hasattr(mod, 'Worker'): + worker_cls = mod.Worker # type: ignore + else: + worker_cls = FunctionWorker - ret.append(ctx.run(worker.run)) + ctx = root_ctx.copy() + worker = ctx.run( + worker_cls, + task_sem, result_q, tasks, keymanager, + ) + if worker_cls is FunctionWorker: + func = mod.get_version # type: ignore + ctx.run(worker.initialize, func) - return ret + ret.append(ctx.run(worker.run)) + + return ret def substitute_version( version: str, conf: Entry, diff --git a/nvchecker/httpclient/__init__.py b/nvchecker/httpclient/__init__.py index 78ae39c..6c5ae14 100644 --- a/nvchecker/httpclient/__init__.py +++ b/nvchecker/httpclient/__init__.py @@ -1,27 +1,55 @@ # MIT licensed -# Copyright (c) 2013-2017 lilydjwg , et al. +# Copyright (c) 2013-2020 lilydjwg , et al. -try: - import tornado, pycurl - # connection reuse, http/2 - which = 'tornado' -except ImportError: - try: - import aiohttp - which = 'aiohttp' - # connection reuse - except ImportError: - try: - import httpx - which = 'httpx' - except ImportError: - import tornado - which = 'tornado' - # fallback - -m = __import__('%s_httpclient' % which, globals(), locals(), level=1) -__all__ = m.__all__ -for x in __all__: - globals()[x] = getattr(m, x) +from typing import Optional from .base import TemporaryError + +class Proxy: + _obj = None + + def set_obj(self, obj): + super().__setattr__('_obj', obj) + + def __getattr__(self, name): + return getattr(self._obj, name) + + def __setattr__(self, name, value): + return setattr(self._obj, name, value) + +session = Proxy() + +def setup( + which: Optional[str] = None, + concurreny: int = 20, + timeout: int = 20, +) -> None: + if which is None: + which = find_best_httplib() + + m = __import__( + '%s_httpclient' % which, globals(), locals(), level=1) + + session.set_obj(m.session) + session.setup(concurreny, timeout) + +def find_best_httplib() -> str: + try: + import tornado, pycurl + # connection reuse, http/2 + which = 'tornado' + except ImportError: + try: + import aiohttp + which = 'aiohttp' + # connection reuse + except ImportError: + try: + import httpx + which = 'httpx' + except ImportError: + import tornado + which = 'tornado' + # fallback + + return which diff --git a/nvchecker/httpclient/aiohttp_httpclient.py b/nvchecker/httpclient/aiohttp_httpclient.py index 3aa8a45..fe538ad 100644 --- a/nvchecker/httpclient/aiohttp_httpclient.py +++ b/nvchecker/httpclient/aiohttp_httpclient.py @@ -16,10 +16,14 @@ logger = structlog.get_logger(logger_name=__name__) connector = aiohttp.TCPConnector(limit=20) class AiohttpSession(BaseSession): - def __init__(self): + def setup( + self, + concurreny: int = 20, + timeout: int = 20, + ) -> None: self.session = aiohttp.ClientSession( - connector = aiohttp.TCPConnector(limit=20), - timeout = aiohttp.ClientTimeout(total=20), + connector = aiohttp.TCPConnector(limit=concurreny), + timeout = aiohttp.ClientTimeout(total=timeout), trust_env = True, ) diff --git a/nvchecker/httpclient/base.py b/nvchecker/httpclient/base.py index 9876a06..8115fd4 100644 --- a/nvchecker/httpclient/base.py +++ b/nvchecker/httpclient/base.py @@ -24,6 +24,13 @@ class Response: class BaseSession: '''The base class for different HTTP backend.''' + def setup( + self, + concurreny: int = 20, + timeout: int = 20, + ) -> None: + pass + async def get(self, *args, **kwargs): '''Shortcut for ``GET`` request.''' return await self.request( diff --git a/nvchecker/httpclient/httpx_httpclient.py b/nvchecker/httpclient/httpx_httpclient.py index 7c78b1d..acfc5e5 100644 --- a/nvchecker/httpclient/httpx_httpclient.py +++ b/nvchecker/httpclient/httpx_httpclient.py @@ -11,8 +11,13 @@ from .base import BaseSession, TemporaryError, Response __all__ = ['session'] class HttpxSession(BaseSession): - def __init__(self): - self.clients = {} + def setup( + self, + concurreny: int = 20, + timeout: int = 20, + ) -> None: + self.clients: Dict[Optional[str], httpx.AsyncClient] = {} + self.timeout = timeout async def request_impl( self, url: str, *, @@ -25,7 +30,7 @@ class HttpxSession(BaseSession): client = self.clients.get(proxy) if not client: client = httpx.AsyncClient( - timeout = httpx.Timeout(20, pool=None), + timeout = httpx.Timeout(self.timeout, pool=None), http2 = True, proxies = {'all://': proxy}, ) diff --git a/nvchecker/httpclient/tornado_httpclient.py b/nvchecker/httpclient/tornado_httpclient.py index 8b51080..fbd77b3 100644 --- a/nvchecker/httpclient/tornado_httpclient.py +++ b/nvchecker/httpclient/tornado_httpclient.py @@ -9,7 +9,6 @@ from tornado.httpclient import AsyncHTTPClient, HTTPRequest try: import pycurl - AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient", max_clients=20) except ImportError: pycurl = None # type: ignore @@ -31,6 +30,20 @@ def try_use_http2(curl): curl.setopt(pycurl.HTTP_VERSION, 4) class TornadoSession(BaseSession): + def setup( + self, + concurreny: int = 20, + timeout: int = 20, + ) -> None: + impl: Optional[str] + if pycurl: + impl = "tornado.curl_httpclient.CurlAsyncHTTPClient" + else: + impl = None + AsyncHTTPClient.configure( + impl, max_clients = concurreny) + self.timeout = timeout + async def request_impl( self, url: str, *, method: str, @@ -42,6 +55,7 @@ class TornadoSession(BaseSession): kwargs: Dict[str, Any] = { 'method': method, 'headers': headers, + 'request_timeout': self.timeout, } if json: diff --git a/nvchecker/slogconf.py b/nvchecker/slogconf.py index fefb86e..fd041f1 100644 --- a/nvchecker/slogconf.py +++ b/nvchecker/slogconf.py @@ -10,7 +10,7 @@ import sys import structlog -from .httpclient.base import TemporaryError +from .httpclient import TemporaryError def _console_msg(event): evt = event['event'] diff --git a/nvchecker/util.py b/nvchecker/util.py index 7c167af..01d47f6 100644 --- a/nvchecker/util.py +++ b/nvchecker/util.py @@ -17,7 +17,7 @@ import abc import toml import structlog -from .httpclient import session # type: ignore +from .httpclient import session from .ctxvars import tries as ctx_tries from .ctxvars import proxy as ctx_proxy from .ctxvars import user_agent as ctx_ua diff --git a/tests/conftest.py b/tests/conftest.py index f02c92c..2f2ba4e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,7 +27,8 @@ async def run( else: keymanager = core.KeyManager(None) - futures = core.dispatch( + dispatcher = core.setup_httpclient() + futures = dispatcher.dispatch( entries, task_sem, result_q, keymanager, 1, {}, )