setup httpclient from configuration options

closes #76
This commit is contained in:
lilydjwg 2020-09-28 15:09:51 +08:00
parent 254a229401
commit 4f3a900505
13 changed files with 163 additions and 77 deletions

View file

@ -52,8 +52,8 @@ If you want to send an HTTP request, it's preferred to use :meth:
`nvchecker.api.session` object. It will use the auto-selected HTTP backend and
handle the ``proxy`` option automatically.
For details about these objects, see :mod:`the API documentation <nvchecker.api>
`, or take existing source plugins as examples.
For details about these objects, see :mod:`the API documentation <nvchecker.api>`,
or take existing source plugins as examples.
How to write a more powerful plugin
-----------------------------------

View file

@ -134,6 +134,9 @@ proxy
max_concurrency
Max number of concurrent jobs. Default: 20.
http_timeout
Time in seconds to wait for HTTP requests. Default: 20.
keyfile
Specify an ini config file containing key (token) information. This file
should contain a ``keys`` table, mapping key names to key values. See

View file

@ -46,8 +46,13 @@ def main() -> None:
task_sem = asyncio.Semaphore(options.max_concurrency)
result_q: asyncio.Queue[RawResult] = asyncio.Queue()
dispatcher = core.setup_httpclient(
options.max_concurrency,
options.httplib,
options.http_timeout,
)
try:
futures = core.dispatch(
futures = dispatcher.dispatch(
entries, task_sem, result_q,
keymanager, args.tries,
options.source_configs,

View file

@ -1,7 +1,7 @@
# MIT licensed
# Copyright (c) 2020 lilydjwg <lilydjwg@gmail.com>, et al.
from .httpclient import session, TemporaryError # type: ignore
from .httpclient import session, TemporaryError
from .util import (
Entry, BaseWorker, RawResult, VersionResult,
AsyncCache, KeyManager, GetVersionError,

View file

@ -33,6 +33,7 @@ from .util import (
from . import __version__
from .sortversion import sort_version_keys
from .ctxvars import tries as ctx_tries
from . import httpclient
logger = structlog.get_logger(logger_name=__name__)
@ -140,6 +141,8 @@ class Options(NamedTuple):
proxy: Optional[str]
keymanager: KeyManager
source_configs: Dict[str, Dict[str, Any]]
httplib: Optional[str]
http_timeout: int
class FileLoadError(Exception):
def __init__(self, kind, exc):
@ -191,59 +194,75 @@ def load_file(
max_concurrency = c.get('max_concurrency', 20)
proxy = c.get('proxy')
httplib = c.get('httplib', None)
http_timeout = c.get('http_timeout', 20)
else:
max_concurrency = 20
proxy = None
httplib = None
http_timeout = 20
return cast(Entries, config), Options(
ver_files, max_concurrency, proxy, keymanager,
source_configs,
source_configs, httplib, http_timeout,
)
def dispatch(
entries: Entries,
task_sem: asyncio.Semaphore,
result_q: Queue[RawResult],
keymanager: KeyManager,
tries: int,
source_configs: Dict[str, Dict[str, Any]],
) -> List[asyncio.Future]:
mods: Dict[str, Tuple[types.ModuleType, List]] = {}
ctx_tries.set(tries)
root_ctx = contextvars.copy_context()
def setup_httpclient(
max_concurrency: int = 20,
httplib: Optional[str] = None,
http_timeout: int = 20,
) -> Dispatcher:
httplib_ = httplib or httpclient.find_best_httplib()
httpclient.setup(
httplib_, max_concurrency, http_timeout)
return Dispatcher()
for name, entry in entries.items():
source = entry.get('source', 'none')
if source not in mods:
mod = import_module('nvchecker_source.' + source)
tasks: List[Tuple[str, Entry]] = []
mods[source] = mod, tasks
config = source_configs.get(source)
if config and getattr(mod, 'configure'):
mod.configure(config) # type: ignore
else:
tasks = mods[source][1]
tasks.append((name, entry))
class Dispatcher:
def dispatch(
self,
entries: Entries,
task_sem: asyncio.Semaphore,
result_q: Queue[RawResult],
keymanager: KeyManager,
tries: int,
source_configs: Dict[str, Dict[str, Any]],
) -> List[asyncio.Future]:
mods: Dict[str, Tuple[types.ModuleType, List]] = {}
ctx_tries.set(tries)
root_ctx = contextvars.copy_context()
ret = []
for mod, tasks in mods.values():
if hasattr(mod, 'Worker'):
worker_cls = mod.Worker # type: ignore
else:
worker_cls = FunctionWorker
for name, entry in entries.items():
source = entry.get('source', 'none')
if source not in mods:
mod = import_module('nvchecker_source.' + source)
tasks: List[Tuple[str, Entry]] = []
mods[source] = mod, tasks
config = source_configs.get(source)
if config and getattr(mod, 'configure'):
mod.configure(config) # type: ignore
else:
tasks = mods[source][1]
tasks.append((name, entry))
ctx = root_ctx.copy()
worker = ctx.run(
worker_cls,
task_sem, result_q, tasks, keymanager,
)
if worker_cls is FunctionWorker:
func = mod.get_version # type: ignore
ctx.run(worker.initialize, func)
ret = []
for mod, tasks in mods.values():
if hasattr(mod, 'Worker'):
worker_cls = mod.Worker # type: ignore
else:
worker_cls = FunctionWorker
ret.append(ctx.run(worker.run))
ctx = root_ctx.copy()
worker = ctx.run(
worker_cls,
task_sem, result_q, tasks, keymanager,
)
if worker_cls is FunctionWorker:
func = mod.get_version # type: ignore
ctx.run(worker.initialize, func)
return ret
ret.append(ctx.run(worker.run))
return ret
def substitute_version(
version: str, conf: Entry,

View file

@ -1,27 +1,55 @@
# MIT licensed
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
# Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.
try:
import tornado, pycurl
# connection reuse, http/2
which = 'tornado'
except ImportError:
try:
import aiohttp
which = 'aiohttp'
# connection reuse
except ImportError:
try:
import httpx
which = 'httpx'
except ImportError:
import tornado
which = 'tornado'
# fallback
m = __import__('%s_httpclient' % which, globals(), locals(), level=1)
__all__ = m.__all__
for x in __all__:
globals()[x] = getattr(m, x)
from typing import Optional
from .base import TemporaryError
class Proxy:
_obj = None
def set_obj(self, obj):
super().__setattr__('_obj', obj)
def __getattr__(self, name):
return getattr(self._obj, name)
def __setattr__(self, name, value):
return setattr(self._obj, name, value)
session = Proxy()
def setup(
which: Optional[str] = None,
concurreny: int = 20,
timeout: int = 20,
) -> None:
if which is None:
which = find_best_httplib()
m = __import__(
'%s_httpclient' % which, globals(), locals(), level=1)
session.set_obj(m.session)
session.setup(concurreny, timeout)
def find_best_httplib() -> str:
try:
import tornado, pycurl
# connection reuse, http/2
which = 'tornado'
except ImportError:
try:
import aiohttp
which = 'aiohttp'
# connection reuse
except ImportError:
try:
import httpx
which = 'httpx'
except ImportError:
import tornado
which = 'tornado'
# fallback
return which

View file

@ -16,10 +16,14 @@ logger = structlog.get_logger(logger_name=__name__)
connector = aiohttp.TCPConnector(limit=20)
class AiohttpSession(BaseSession):
def __init__(self):
def setup(
self,
concurreny: int = 20,
timeout: int = 20,
) -> None:
self.session = aiohttp.ClientSession(
connector = aiohttp.TCPConnector(limit=20),
timeout = aiohttp.ClientTimeout(total=20),
connector = aiohttp.TCPConnector(limit=concurreny),
timeout = aiohttp.ClientTimeout(total=timeout),
trust_env = True,
)

View file

@ -24,6 +24,13 @@ class Response:
class BaseSession:
'''The base class for different HTTP backend.'''
def setup(
self,
concurreny: int = 20,
timeout: int = 20,
) -> None:
pass
async def get(self, *args, **kwargs):
'''Shortcut for ``GET`` request.'''
return await self.request(

View file

@ -11,8 +11,13 @@ from .base import BaseSession, TemporaryError, Response
__all__ = ['session']
class HttpxSession(BaseSession):
def __init__(self):
self.clients = {}
def setup(
self,
concurreny: int = 20,
timeout: int = 20,
) -> None:
self.clients: Dict[Optional[str], httpx.AsyncClient] = {}
self.timeout = timeout
async def request_impl(
self, url: str, *,
@ -25,7 +30,7 @@ class HttpxSession(BaseSession):
client = self.clients.get(proxy)
if not client:
client = httpx.AsyncClient(
timeout = httpx.Timeout(20, pool=None),
timeout = httpx.Timeout(self.timeout, pool=None),
http2 = True,
proxies = {'all://': proxy},
)

View file

@ -9,7 +9,6 @@ from tornado.httpclient import AsyncHTTPClient, HTTPRequest
try:
import pycurl
AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient", max_clients=20)
except ImportError:
pycurl = None # type: ignore
@ -31,6 +30,20 @@ def try_use_http2(curl):
curl.setopt(pycurl.HTTP_VERSION, 4)
class TornadoSession(BaseSession):
def setup(
self,
concurreny: int = 20,
timeout: int = 20,
) -> None:
impl: Optional[str]
if pycurl:
impl = "tornado.curl_httpclient.CurlAsyncHTTPClient"
else:
impl = None
AsyncHTTPClient.configure(
impl, max_clients = concurreny)
self.timeout = timeout
async def request_impl(
self, url: str, *,
method: str,
@ -42,6 +55,7 @@ class TornadoSession(BaseSession):
kwargs: Dict[str, Any] = {
'method': method,
'headers': headers,
'request_timeout': self.timeout,
}
if json:

View file

@ -10,7 +10,7 @@ import sys
import structlog
from .httpclient.base import TemporaryError
from .httpclient import TemporaryError
def _console_msg(event):
evt = event['event']

View file

@ -17,7 +17,7 @@ import abc
import toml
import structlog
from .httpclient import session # type: ignore
from .httpclient import session
from .ctxvars import tries as ctx_tries
from .ctxvars import proxy as ctx_proxy
from .ctxvars import user_agent as ctx_ua

View file

@ -27,7 +27,8 @@ async def run(
else:
keymanager = core.KeyManager(None)
futures = core.dispatch(
dispatcher = core.setup_httpclient()
futures = dispatcher.dispatch(
entries, task_sem, result_q,
keymanager, 1, {},
)