From bd1ac0962f7ac6472c23300c24622abaaa10fd59 Mon Sep 17 00:00:00 2001 From: lilydjwg Date: Thu, 17 Oct 2019 15:50:02 +0800 Subject: [PATCH] update modules from winterpy --- archrepo2/lib/archpkg.py | 53 ++++++++++++++++++++++--------------- archrepo2/lib/nicelogger.py | 15 ++++++++--- 2 files changed, 43 insertions(+), 25 deletions(-) diff --git a/archrepo2/lib/archpkg.py b/archrepo2/lib/archpkg.py index bb2d79d..f2f9aee 100644 --- a/archrepo2/lib/archpkg.py +++ b/archrepo2/lib/archpkg.py @@ -1,36 +1,45 @@ import os -from collections import defaultdict, namedtuple +from collections import namedtuple import subprocess import re +from typing import Tuple, List, Dict -from pkg_resources import parse_version +from pkg_resources import parse_version as _parse_version +from packaging.version import Version # type: ignore + +def parse_arch_version(v: str) -> Tuple[int, Version]: + if ':' in v: + epoch = int(v.split(':', 1)[0]) + else: + epoch = 0 + return epoch, _parse_version(v) class PkgNameInfo(namedtuple('PkgNameInfo', 'name, version, release, arch')): - def __lt__(self, other): + def __lt__(self, other) -> bool: if self.name != other.name or self.arch != other.arch: return NotImplemented if self.version != other.version: - return parse_version(self.version) < parse_version(other.version) + return parse_arch_version(self.version) < parse_arch_version(other.version) return float(self.release) < float(other.release) - def __gt__(self, other): + def __gt__(self, other) -> bool: # No, try the other side please. return NotImplemented @property - def fullversion(self): + def fullversion(self) -> str: return '%s-%s' % (self.version, self.release) @classmethod - def parseFilename(cls, filename): + def parseFilename(cls, filename: str) -> 'PkgNameInfo': return cls(*trimext(filename, 3).rsplit('-', 3)) -def trimext(name, num=1): +def trimext(name: str, num: int = 1) -> str: for i in range(num): name = os.path.splitext(name)[0] return name -def get_pkgname_with_bash(PKGBUILD): +def get_pkgname_with_bash(PKGBUILD: str) -> List[str]: script = '''\ . '%s' echo ${pkgname[*]}''' % PKGBUILD @@ -44,7 +53,7 @@ echo ${pkgname[*]}''' % PKGBUILD ret, ['bash'], output) return output.split() -def _run_bash(script): +def _run_bash(script: str) -> None: p = subprocess.Popen(['bash'], stdin=subprocess.PIPE) p.communicate(script.encode('latin1')) ret = p.wait() @@ -52,7 +61,7 @@ def _run_bash(script): raise subprocess.CalledProcessError( ret, ['bash']) -def get_aur_pkgbuild_with_bash(name): +def get_aur_pkgbuild_with_bash(name: str) -> None: script = '''\ . /usr/lib/yaourt/util.sh . /usr/lib/yaourt/aur.sh @@ -60,7 +69,7 @@ init_color aur_get_pkgbuild '%s' ''' % name _run_bash(script) -def get_abs_pkgbuild_with_bash(name): +def get_abs_pkgbuild_with_bash(name: str) -> None: script = '''\ . /usr/lib/yaourt/util.sh . /usr/lib/yaourt/abs.sh @@ -71,23 +80,23 @@ RSYNCOPT="$RSYNCOPT -O" abs_get_pkgbuild "$arg" ''' % name _run_bash(script) -pkgfile_pat = re.compile(r'(?:^|/).+-[^-]+-[\d.]+-(?:\w+)\.pkg\.tar\.xz$') +pkgfile_pat = re.compile(r'(?:^|/).+-[^-]+-[\d.]+-(?:\w+)\.pkg\.tar\.(?:xz|zst)$') -def _strip_ver(s): +def _strip_ver(s: str) -> str: return re.sub(r'[<>=].*', '', s) -def get_package_dependencies(name): - out = subprocess.check_output(["package-query", "-Sii", "-f", "%D", name]) - out = out.decode('latin1') +def get_package_dependencies(name: str) -> List[str]: + outb = subprocess.check_output(["package-query", "-Sii", "-f", "%D", name]) + out = outb.decode('latin1') return [_strip_ver(x) for x in out.split() if x != '-'] -def get_package_info(name, local=False): +def get_package_info(name: str, local: bool = False) -> Dict[str, str]: old_lang = os.environ['LANG'] os.environ['LANG'] = 'C' args = '-Qi' if local else '-Si' try: - out = subprocess.check_output(["pacman", args, name]) - out = out.decode('latin1') + outb = subprocess.check_output(["pacman", args, name]) + out = outb.decode('latin1') finally: os.environ['LANG'] = old_lang @@ -104,7 +113,7 @@ def get_package_info(name, local=False): ret[key] += ' ' + l.strip() return ret -def get_package_repository(name): +def get_package_repository(name: str) -> str: try: out = subprocess.check_output(["package-query", "-Sii", "-f", "%r", name]) repo = out.strip().decode('latin1') @@ -112,6 +121,6 @@ def get_package_repository(name): repo = 'local' return repo -def is_official(name): +def is_official(name: str) -> bool: repo = get_package_repository(name) return repo in ('core', 'extra', 'community', 'multilib', 'testing') diff --git a/archrepo2/lib/nicelogger.py b/archrepo2/lib/nicelogger.py index a173563..98603d8 100644 --- a/archrepo2/lib/nicelogger.py +++ b/archrepo2/lib/nicelogger.py @@ -10,7 +10,7 @@ import logging class TornadoLogFormatter(logging.Formatter): def __init__(self, color, *args, **kwargs): - super().__init__(self, *args, **kwargs) + super().__init__(*args, **kwargs) self._color = color if color: import curses @@ -41,13 +41,22 @@ class TornadoLogFormatter(logging.Formatter): record.message = "Bad message (%r): %r" % (e, record.__dict__) record.asctime = time.strftime( "%m-%d %H:%M:%S", self.converter(record.created)) - record.asctime += '.%03d' % ((record.created % 1) * 1000) - prefix = '[%(levelname)1.1s %(asctime)s %(module)s:%(lineno)d]' % \ + prefix = '[%(levelname)1.1s %(asctime)s.%(msecs)03d %(module)s:%(lineno)d]' % \ record.__dict__ if self._color: prefix = (self._colors.get(record.levelno, self._normal) + prefix + self._normal) formatted = prefix + " " + record.message + + formatted += ''.join( + ' %s=%s' % (k, v) for k, v in record.__dict__.items() + if k not in { + 'levelname', 'asctime', 'module', 'lineno', 'args', 'message', + 'filename', 'exc_info', 'exc_text', 'created', 'funcName', + 'processName', 'process', 'msecs', 'relativeCreated', 'thread', + 'threadName', 'name', 'levelno', 'msg', 'pathname', 'stack_info', + }) + if record.exc_info: if not record.exc_text: record.exc_text = self.formatException(record.exc_info)