This commit is contained in:
lilydjwg 2020-08-13 20:01:02 +08:00
parent 435edf8589
commit b76bfb5606
5 changed files with 36 additions and 17 deletions

View file

@ -168,7 +168,7 @@ newver
proxy proxy
The HTTP proxy to use. The format is ``proto://host:port``, e.g. ``http://localhost:8087``. The HTTP proxy to use. The format is ``proto://host:port``, e.g. ``http://localhost:8087``.
max_concurrent max_concurrency
Max number of concurrent jobs. Default: 20. Max number of concurrent jobs. Default: 20.
keyfile keyfile

View file

@ -34,7 +34,7 @@ def main() -> None:
file = args.file file = args.file
entries, options = core.load_file(file) entries, options = core.load_file(file)
token_q = core.token_queue(options.max_concurrent) token_q = core.token_queue(options.max_concurrency)
result_q: asyncio.Queue[RawResult] = asyncio.Queue() result_q: asyncio.Queue[RawResult] = asyncio.Queue()
try: try:
futures = core.dispatch( futures = core.dispatch(

View file

@ -26,7 +26,7 @@ from .lib import nicelogger
from . import slogconf from . import slogconf
from .util import ( from .util import (
Entry, Entries, KeyManager, RawResult, Result, VersData, Entry, Entries, KeyManager, RawResult, Result, VersData,
FunctionWorker, FunctionWorker, GetVersionError,
) )
from . import __version__ from . import __version__
from .sortversion import sort_version_keys from .sortversion import sort_version_keys
@ -128,7 +128,7 @@ def write_verfile(file: Path, versions: VersData) -> None:
class Options(NamedTuple): class Options(NamedTuple):
ver_files: Optional[Tuple[Path, Path]] ver_files: Optional[Tuple[Path, Path]]
max_concurrent: int max_concurrency: int
keymanager: KeyManager keymanager: KeyManager
def load_file( def load_file(
@ -156,16 +156,16 @@ def load_file(
os.path.expanduser(c.get('keyfile'))) os.path.expanduser(c.get('keyfile')))
keyfile = d / keyfile_s keyfile = d / keyfile_s
max_concurrent = c.getint( max_concurrency = c.get(
'max_concurrent', 20) 'max_concurrency', 20)
keymanager = KeyManager(keyfile) keymanager = KeyManager(keyfile)
else: else:
max_concurrent = 20 max_concurrency = 20
keymanager = KeyManager(None) keymanager = KeyManager(None)
return cast(Entries, config), Options( return cast(Entries, config), Options(
ver_files, max_concurrent, keymanager) ver_files, max_concurrency, keymanager)
def token_queue(maxsize: int) -> Queue[bool]: def token_queue(maxsize: int) -> Queue[bool]:
token_q: Queue[bool] = Queue(maxsize=maxsize) token_q: Queue[bool] = Queue(maxsize=maxsize)
@ -270,7 +270,12 @@ def _process_result(r: RawResult) -> Optional[Result]:
conf = r.conf conf = r.conf
name = r.name name = r.name
if isinstance(version, Exception): if isinstance(version, GetVersionError):
kw = version.kwargs
kw['name'] = name
logger.error(version.msg, **kw)
return None
elif isinstance(version, Exception):
logger.error('unexpected error happened', logger.error('unexpected error happened',
name=r.name, exc_info=r.version) name=r.name, exc_info=r.version)
return None return None

View file

@ -8,12 +8,14 @@ from asyncio import Queue
import contextlib import contextlib
from typing import ( from typing import (
Dict, Optional, List, AsyncGenerator, NamedTuple, Union, Dict, Optional, List, AsyncGenerator, NamedTuple, Union,
Any, Tuple, Coroutine, Callable, Any, Tuple, Callable, TYPE_CHECKING,
TYPE_CHECKING,
) )
from pathlib import Path from pathlib import Path
import toml import toml
import structlog
logger = structlog.get_logger(logger_name=__name__)
Entry = Dict[str, Any] Entry = Dict[str, Any]
Entries = Dict[str, Entry] Entries = Dict[str, Entry]
@ -62,10 +64,12 @@ class BaseWorker:
@contextlib.asynccontextmanager @contextlib.asynccontextmanager
async def acquire_token(self) -> AsyncGenerator[None, None]: async def acquire_token(self) -> AsyncGenerator[None, None]:
token = await self.token_q.get() token = await self.token_q.get()
logger.debug('got token')
try: try:
yield yield
finally: finally:
await self.token_q.put(token) await self.token_q.put(token)
logger.debug('return token')
if TYPE_CHECKING: if TYPE_CHECKING:
from typing_extensions import Protocol from typing_extensions import Protocol
@ -155,3 +159,8 @@ class FunctionWorker(BaseWorker):
version = await fu version = await fu
self.cache[key] = version self.cache[key] = version
return version return version
class GetVersionError(Exception):
def __init__(self, msg: str, **kwargs: Any) -> None:
self.msg = msg
self.kwargs = kwargs

View file

@ -5,6 +5,8 @@ import asyncio
import structlog import structlog
from nvchecker.util import GetVersionError
logger = structlog.get_logger(logger_name=__name__) logger = structlog.get_logger(logger_name=__name__)
def cacher(name, conf): def cacher(name, conf):
@ -12,6 +14,7 @@ def cacher(name, conf):
async def get_version(name, conf, *, keymanager=None): async def get_version(name, conf, *, keymanager=None):
cmd = conf['cmd'] cmd = conf['cmd']
logger.debug('running cmd', name=name, cmd=cmd)
p = await asyncio.create_subprocess_shell( p = await asyncio.create_subprocess_shell(
cmd, cmd,
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
@ -22,12 +25,14 @@ async def get_version(name, conf, *, keymanager=None):
output = output.strip().decode('latin1') output = output.strip().decode('latin1')
error = error.strip().decode(errors='replace') error = error.strip().decode(errors='replace')
if p.returncode != 0: if p.returncode != 0:
logger.error('command exited with error', raise GetVersionError(
cmd=cmd, error=error, 'command exited with error',
name=name, returncode=p.returncode) cmd=cmd, error=error,
name=name, returncode=p.returncode)
elif not output: elif not output:
logger.error('command exited without output', raise GetVersionError(
cmd=cmd, error=error, 'command exited without output',
name=name, returncode=p.returncode) cmd=cmd, error=error,
name=name, returncode=p.returncode)
else: else:
return output return output