diff --git a/agithub.py b/agithub.py new file mode 100644 index 0000000..f1f8fbd --- /dev/null +++ b/agithub.py @@ -0,0 +1,202 @@ +from __future__ import annotations + +import datetime +import json +import weakref +import asyncio +import logging +import time +from typing import ( + AsyncGenerator, Tuple, Any, Dict, Optional, List, Union, +) + +from aiohttp.client import ClientResponse +import aiohttputils + +logger = logging.getLogger(__name__) + +JsonDict = Dict[str, Any] +Json = Union[List[JsonDict], JsonDict] + +def parse_datetime(s): + dt = datetime.datetime.strptime(s, '%Y-%m-%dT%H:%M:%SZ') + return dt.replace(tzinfo=datetime.timezone.utc) + +class GitHubError(Exception): + def __init__(self, message, documentation, code): + self.message = message + self.documentation = documentation + self.code = code + +class GitHub(aiohttputils.ClientBase): + baseurl = 'https://api.github.com/' + + def __init__(self, token, session=None): + self.token = f'token {token}' + super().__init__(session = session) + + async def api_request( + self, path: str, method: str = 'get', + data: Optional[JsonDict] = None, **kwargs, + ) -> Tuple[Json, ClientResponse]: + h = kwargs.get('headers', None) + if not h: + h = kwargs['headers'] = {} + h.setdefault('Accept', 'application/vnd.github.v3+json') + h.setdefault('Authorization', self.token) + + if data: + binary_data = json.dumps(data, ensure_ascii=False).encode('utf-8') + if method == 'get': + method = 'post' + h.setdefault('Content-Type', 'application/json') + kwargs['data'] = binary_data + + for _ in range(3): + res = await self.request(path, method=method, **kwargs) + j: JsonDict + if res.status == 204: + j = {} + else: + j = await res.json() + if 'message' in j: + if res.status == 403 and int(res.headers.get('X-RateLimit-Remaining', -1)) == 0: + reset = int(res.headers['X-RateLimit-Reset']) - time.time() + 1 + logger.warn('rate limited; sleeping for %ds: %s', reset, j['message']) + await asyncio.sleep(reset) + continue + raise GitHubError(j['message'], j['documentation_url'], res.status) + return j, res + + raise Exception('unreachable') + + async def get_repo_issues( + self, repo: str, *, state: str = 'open', labels: str = '', + ) -> AsyncGenerator[Issue, None]: + params = {'state': state} + if labels: + params['labels'] = labels + j, r = await self.api_request(f'/repos/{repo}/issues', params = params) + assert isinstance(j, list) + + for x in j: + yield Issue(x, self) + + while 'next' in r.links: + url = str(r.links['next']['url']) + j, r = await self.api_request(url) + assert isinstance(j, list) + for x in j: + yield Issue(x, self) + + async def get_issue(self, repo: str, issue_nr: int) -> 'Issue': + j, _ = await self.api_request(f'/repos/{repo}/issues/{issue_nr}') + assert isinstance(j, dict) + return Issue(j, self) + + async def get_issue_comments( + self, repo: str, issue_nr: int, + ) -> AsyncGenerator[Comment, None]: + j, r = await self.api_request(f'/repos/{repo}/issues/{issue_nr}/comments') + + assert isinstance(j, list) + for x in j: + yield Comment(x, self) + + while 'next' in r.links: + url = str(r.links['next']['url']) + j, r = await self.api_request(url) + assert isinstance(j, list) + for x in j: + yield Comment(x, self) + + async def create_issue( + self, repo: str, title: str, body: Optional[str] = None, + labels: List[str] = [], + ) -> 'Issue': + data: JsonDict = { + 'title': title, + } + if body: + data['body'] = body + if labels: + data['labels'] = labels + + issue, _ = await self.api_request(f'/repos/{repo}/issues', data = data) + assert isinstance(issue, dict) + return Issue(issue, self) + + async def find_login_by_email(self, email: str) -> str: + j, _ = await self.api_request(f'/search/users?q={email}') + assert isinstance(j, dict) + try: + return j['items'][0]['login'] + except IndexError: + raise LookupError(email) + +class Issue: + def __init__(self, data: JsonDict, gh: GitHub) -> None: + self.gh = weakref.proxy(gh) + self._data = data + self.body = data['body'] + self.number = data['number'] + self.title = data['title'] + self.labels = [x['name'] for x in data['labels']] + self.updated_at = parse_datetime(data['updated_at']) + self._api_url = f"{data['repository_url']}/issues/{data['number']}" + self.closed = data['state'] == 'closed' + self.author = data['user']['login'] + self.closed_by = data.get('closed_by') and data['closed_by']['login'] or None + + async def comment(self, comment: str) -> JsonDict: + j, _ = await self.gh.api_request(f'{self._api_url}/comments', data = {'body': comment}) + return j + + async def add_labels(self, labels: List[str]) -> JsonDict: + j, _ = await self.gh.api_request(f'{self._api_url}/labels', data = labels) + return j + + async def assign(self, assignees: List[str]) -> JsonDict: + payload = {'assignees': assignees} + j, _ = await self.gh.api_request(f'{self._api_url}/assignees', data = payload) + return j + + async def close(self) -> None: + data, _ = await self.gh.api_request( + f'{self._api_url}', method = 'patch', data = {'state': 'closed'}) + self._data = data + self.closed = data['state'] == 'closed' + + async def reopen(self) -> None: + data, _ = await self.gh.api_request( + f'{self._api_url}', method = 'patch', data = {'state': 'open'}) + self._data = data + self.closed = data['state'] == 'closed' + + def __repr__(self): + return f'' + +class Comment: + def __init__(self, data: JsonDict, gh: GitHub) -> None: + self.gh = weakref.proxy(gh) + self._update(data) + + def _update(self, data: JsonDict) -> None: + self._data = data + self.author = data['user']['login'] + self.html_url = data['html_url'] + self.url = data['url'] + self.body = data['body'] + + async def delete(self) -> None: + await self.gh.api_request(self.url, method = 'DELETE') + + async def edit(self, body: str) -> None: + data, _ = await self.gh.api_request( + self.url, method = 'PATCH', + data = {'body': body}, + ) + self._update(data) + + def __repr__(self) -> str: + return f'' diff --git a/aiohttputils.py b/aiohttputils.py new file mode 100644 index 0000000..5f9654b --- /dev/null +++ b/aiohttputils.py @@ -0,0 +1,87 @@ +import os +from http.cookiejar import MozillaCookieJar +from urllib.parse import urljoin +from typing import Optional +import asyncio + +import aiohttp +from aiohttp.client import ClientResponse + +class ClientBase: + session = None + userAgent = None + lasturl = None + auto_referer = False + baseurl: Optional[str] = None + cookiefile: Optional[os.PathLike] = None + __our_session: bool = False + + def __init__(self, *, baseurl=None, cookiefile=None, session=None): + if baseurl is not None: + self.baseurl = baseurl + self.session = session + self.cookiefile = cookiefile + + async def async_init(self) -> None: + if not self.session: + s = aiohttp.ClientSession() + self.__our_session = True + self.session = s + + if self.cookiefile: + s.cookies = MozillaCookieJar(self.cookiefile) + if os.path.exists(self.cookiefile): + s.cookies.load() # type: ignore + + def __del__(self): + if self.cookiefile: + self.session.cookies.save() + if self.__our_session: + loop = asyncio.get_event_loop() + closer = self.session.close() + if loop.is_running(): + asyncio.ensure_future(closer) + else: + asyncio.run(closer) + + async def request( + self, url: str, method: Optional[str] = None, **kwargs, + ) -> ClientResponse: + if not self.session: + await self.async_init() + + if self.baseurl: + url = urljoin(self.baseurl, url) + + if self.auto_referer and self.lasturl: + h = kwargs.get('headers', None) + if not h: + h = kwargs['headers'] = {} + h.setdefault('Referer', self.lasturl) + + if self.userAgent: + h = kwargs.get('headers', None) + if not h: + h = kwargs['headers'] = {} + h.setdefault('User-Agent', self.userAgent) + + if method is None: + if 'data' in kwargs: + method = 'post' + else: + method = 'get' + + response = await self.session.request(method, url, **kwargs) # type: ignore + # url may have been changed due to redirection + self.lasturl = str(response.url) + return response + +async def test(): + client = ClientBase(baseurl='https://www.baidu.com/', cookiefile='test') + res = await client.request('/') + res = await client.request('/404') + print(res, client.lasturl) + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(test()) diff --git a/archpkg.py b/archpkg.py new file mode 100644 index 0000000..9510e59 --- /dev/null +++ b/archpkg.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +import os +from collections import namedtuple +import subprocess +import re +from typing import Tuple, List, Dict + +from pkg_resources import parse_version as _parse_version + +def parse_arch_version(v: str) -> Tuple[int, Tuple[str, ...]]: + if ':' in v: + epoch = int(v.split(':', 1)[0]) + else: + epoch = 0 + return epoch, _parse_version(v) + +class PkgNameInfo(namedtuple('PkgNameInfo', 'name, version, release, arch')): + def __lt__(self, other) -> bool: + if self.name != other.name or self.arch != other.arch: + return NotImplemented + if self.version != other.version: + return parse_arch_version(self.version) < parse_arch_version(other.version) + return float(self.release) < float(other.release) + + def __gt__(self, other) -> bool: + # No, try the other side please. + return NotImplemented + + @property + def fullversion(self) -> str: + return '%s-%s' % (self.version, self.release) + + @classmethod + def parseFilename(cls, filename: str) -> 'PkgNameInfo': + return cls(*trimext(filename, 3).rsplit('-', 3)) + +def trimext(name: str, num: int = 1) -> str: + for i in range(num): + name = os.path.splitext(name)[0] + return name + +def get_pkgname_with_bash(PKGBUILD: str) -> List[str]: + script = '''\ +. '%s' +echo ${pkgname[*]}''' % PKGBUILD + # Python 3.4 has 'input' arg for check_output + p = subprocess.Popen( + ['bwrap', '--unshare-all', '--ro-bind', '/', '/', '--tmpfs', '/home', + '--tmpfs', '/run', '--die-with-parent', + '--tmpfs', '/tmp', '--proc', '/proc', '--dev', '/dev', '/bin/bash'], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + ) + output = p.communicate(script.encode())[0].decode() + ret = p.wait() + if ret != 0: + raise subprocess.CalledProcessError( + ret, ['bash'], output) + return output.split() + +pkgfile_pat = re.compile(r'(?:^|/).+-[^-]+-[\d.]+-(?:\w+)\.pkg\.tar\.(?:xz|zst)$') + +def _strip_ver(s: str) -> str: + return re.sub(r'[<>=].*', '', s) + +def get_package_info(name: str, local: bool = False) -> Dict[str, str]: + old_lang = os.environ['LANG'] + os.environ['LANG'] = 'C' + args = '-Qi' if local else '-Si' + try: + outb = subprocess.check_output(["pacman", args, name]) + out = outb.decode('latin1') + finally: + os.environ['LANG'] = old_lang + + ret = {} + for l in out.splitlines(): + if not l: + continue + if l[0] not in ' \t': + key, value = l.split(':', 1) + key = key.strip() + value = value.strip() + ret[key] = value + else: + ret[key] += ' ' + l.strip() + return ret + + diff --git a/htmlutils.py b/htmlutils.py new file mode 100644 index 0000000..47cc3e3 --- /dev/null +++ b/htmlutils.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +import re +import copy +from html.entities import entitydefs + +from lxml import html # type: ignore + +def _br2span_inplace(el): + for br in el.iterchildren(tag='br'): + sp = html.Element('span') + sp.text = '\n' + sp.tail = br.tail + el.replace(br, sp) + +def extractText(el): + el = copy.copy(el) + _br2span_inplace(el) + return el.text_content() + +def iter_text_and_br(el): + if el.text: + yield el.text + for i in el.iterchildren(): + if i.tag == 'br': + yield '\n' + if i.tail: + yield i.tail + +def un_jsescape(s): + '''%xx & %uxxxx -> char, opposite of Javascript's escape()''' + return re.sub( + r'%u([0-9a-fA-F]{4})|%([0-9a-fA-F]{2})', + lambda m: chr(int(m.group(1) or m.group(2), 16)), + s + ) + +def entityunescape(string): + '''HTML entity decode''' + string = re.sub(r'&#[^;]+;', _sharp2uni, string) + string = re.sub(r'&[^;]+;', lambda m: entitydefs[m.group(0)[1:-1]], string) + return string + +def entityunescape_loose(string): + '''HTML entity decode. losse version.''' + string = re.sub(r'&#[0-9a-fA-F]+[;;]?', _sharp2uni, string) + string = re.sub(r'&\w+[;;]?', lambda m: entitydefs[m.group(0)[1:].rstrip(';;')], string) + return string + +def _sharp2uni(m): + '''&#...; ==> unicode''' + s = m.group(0)[2:].rstrip(';;') + if s.startswith('x'): + return chr(int('0'+s, 16)) + else: + return chr(int(s)) + +def parse_document_from_requests(response, session=None, *, encoding=None): + ''' + ``response``: requests ``Response`` object, or URL + ``encoding``: override detected encoding + ''' + if isinstance(response, str): + if session is None: + raise ValueError('URL given but no session') + r = session.get(response) + else: + r = response + if encoding: + r.encoding = encoding + + # fromstring handles bytes well + # https://stackoverflow.com/a/15305248/296473 + parser = html.HTMLParser(encoding=encoding or r.encoding) + doc = html.fromstring(r.content, base_url=r.url, parser=parser) + doc.make_links_absolute() + + return doc + +def parse_html_with_encoding(data, encoding='utf-8'): + parser = html.HTMLParser(encoding=encoding) + return html.fromstring(data, parser=parser) diff --git a/myutils.py b/myutils.py new file mode 100644 index 0000000..59f786d --- /dev/null +++ b/myutils.py @@ -0,0 +1,350 @@ +from __future__ import annotations + +import os, sys +import re +import datetime +import time +from functools import lru_cache, wraps +import logging +import contextlib +import signal +import hashlib +import base64 +import fcntl +from typing import Tuple, Union, Optional, Dict, Any, Generator + +logger = logging.getLogger(__name__) + +def safe_overwrite(fname: str, data: Union[bytes, str], *, + method: str = 'write', mode: str = 'w', encoding: Optional[str] = None) -> None: + # FIXME: directory has no read perm + # FIXME: symlinks and hard links + tmpname = fname + '.tmp' + # if not using "with", write can fail without exception + with open(tmpname, mode, encoding=encoding) as f: + getattr(f, method)(data) + # see also: https://thunk.org/tytso/blog/2009/03/15/dont-fear-the-fsync/ + f.flush() + os.fsync(f.fileno()) + # if the above write failed (because disk is full etc), the old data should be kept + os.rename(tmpname, fname) + +UNITS = 'KMGTPEZY' + +def filesize(size: int) -> str: + amt, unit = filesize_ex(size) + if unit: + return '%.1f%siB' % (amt, unit) + else: + return '%dB' % amt + +def filesize_ex(size: int) -> Tuple[Union[float, int], str]: + left: Union[int, float] = abs(size) + unit = -1 + n = len(UNITS) + while left > 1100 and unit < n: + left = left / 1024 + unit += 1 + if unit == -1: + return size, '' + else: + if size < 0: + left = -left + return left, UNITS[unit] + +class FileSize(int): + def __str__(self) -> str: + return filesize(self).rstrip('iB') + +def parse_filesize(s: str) -> int: + s1 = s.rstrip('iB') + if not s1: + raise ValueError(s) + + last = s1[-1] + try: + idx = UNITS.index(last) + except ValueError: + return int(float(s1)) + + v = float(s1[:-1]) * 1024 ** (idx+1) + return int(v) + +def humantime(t: int) -> str: + '''seconds -> XhYmZs''' + if t < 0: + sign = '-' + t = -t + else: + sign = '' + + m, s = divmod(t, 60) + h, m = divmod(m, 60) + d, h = divmod(h, 24) + ret = '' + if d: + ret += '%dd' % d + if h: + ret += '%dh' % h + if m: + ret += '%dm' % m + if s: + ret += '%ds' % s + if not ret: + ret = '0s' + return sign + ret + +def dehumantime(s: str) -> int: + '''XhYmZs -> seconds''' + m = re.match(r'(?:(?P\d+)d)?(?:(?P\d+)h)?(?:(?P\d+)m)?(?:(?P\d+)s)?$', s) + if m: + return ( + int(m.group('d') or 0) * 3600 * 24 + + int(m.group('h') or 0) * 3600 + + int(m.group('m') or 0) * 60 + + int(m.group('s') or 0) + ) + else: + raise ValueError(s) + +def _timed_read(file, timeout): + from select import select + if select([file], [], [], timeout)[0]: + return file.read(1) + +def getchar(prompt, hidden=False, end='\n', timeout=None): + '''读取一个字符''' + import termios + sys.stdout.write(prompt) + sys.stdout.flush() + fd = sys.stdin.fileno() + ch: Optional[str] + + def _read() -> Optional[str]: + ch: Optional[str] + if timeout is None: + ch = sys.stdin.read(1) + else: + ch = _timed_read(sys.stdin, timeout) + return ch + + if os.isatty(fd): + old = termios.tcgetattr(fd) + new = termios.tcgetattr(fd) + if hidden: + new[3] = new[3] & ~termios.ICANON & ~termios.ECHO + else: + new[3] = new[3] & ~termios.ICANON + new[6][termios.VMIN] = 1 + new[6][termios.VTIME] = 0 + try: + termios.tcsetattr(fd, termios.TCSANOW, new) + termios.tcsendbreak(fd, 0) + ch = _read() + finally: + termios.tcsetattr(fd, termios.TCSAFLUSH, old) + else: + ch = _read() + + sys.stdout.write(end) + return ch + +def loadso(fname): + '''ctypes.CDLL 的 wrapper,从 sys.path 中搜索文件''' + from ctypes import CDLL + + for d in sys.path: + p = os.path.join(d, fname) + if os.path.exists(p): + return CDLL(p) + raise ImportError('%s not found' % fname) + +def dofile(path): + G = {} + with open(path) as f: + exec(f.read(), G) + return G + +def restart_if_failed(func, max_tries, args=(), kwargs={}, secs=60, sleep=None): + ''' + re-run when some exception happens, until `max_tries` in `secs` + ''' + import traceback + from collections import deque + + dq = deque(maxlen=max_tries) + while True: + dq.append(time.time()) + try: + return func(*args, **kwargs) + except Exception: + traceback.print_exc() + if len(dq) == max_tries and time.time() - dq[0] < secs: + break + if sleep is not None: + time.sleep(sleep) + else: + break + +def daterange(start, stop=datetime.date.today(), step=datetime.timedelta(days=1)): + d = start + while d < stop: + yield d + d += step + +@lru_cache() +def findfont(fontname): + from subprocess import check_output + out = check_output(['fc-match', '-v', fontname]).decode() + for l in out.split('\n'): + if l.lstrip().startswith('file:'): + return l.split('"', 2)[1] + +def debugfunc(logger=logging, *, _id=[0]): + def w(func): + @wraps(func) + def wrapper(*args, **kwargs): + myid = _id[0] + _id[0] += 1 + logger.debug('[func %d] %s(%r, %r)', myid, func.__name__, args, kwargs) + ret = func(*args, **kwargs) + logger.debug('[func %d] return: %r', myid, ret) + return ret + return wrapper + return w + +@contextlib.contextmanager +def execution_timeout(timeout): + def timed_out(signum, sigframe): + raise TimeoutError + + delay, interval = signal.setitimer(signal.ITIMER_REAL, timeout, 0) + old_hdl = signal.signal(signal.SIGALRM, timed_out) + now = time.time() + try: + yield + finally: + # inner timeout must be smaller, or the timer event will be delayed + if delay: + elapsed = time.time() - now + delay = max(delay - elapsed, 0.000001) + else: + delay = 0 + signal.setitimer(signal.ITIMER_REAL, delay, interval) + signal.signal(signal.SIGALRM, old_hdl) + +def find_executables(name, path=None): + '''find all matching executables with specific name in path''' + if path is None: + path = os.environ['PATH'].split(os.pathsep) + elif isinstance(path, str): + path = path.split(os.pathsep) + path = [p for p in path if os.path.isdir(p)] + + return [os.path.join(p, f) for p in path for f in os.listdir(p) if f == name] + +# The following three are learnt from makepkg +def user_choose(prompt, timeout=None): + # XXX: hard-coded term characters are ok? + prompt = '\x1b[1;34m::\x1b[1;37m %s\x1b[0m ' % prompt + return getchar(prompt, timeout=timeout) + +def msg(msg): + # XXX: hard-coded term characters are ok? + print('\x1b[1;32m==>\x1b[1;37m %s\x1b[0m' % msg) + +def msg2(msg): + # XXX: hard-coded term characters are ok? + print('\x1b[1;34m ->\x1b[1;37m %s\x1b[0m' % msg) + +def is_internal_ip(ip): + import ipaddress + ip = ipaddress.ip_address(ip) + return ip.is_loopback or ip.is_private or ip.is_reserved or ip.is_link_local + +@contextlib.contextmanager +def at_dir(d: os.PathLike) -> Generator[None, None, None]: + old_dir = os.getcwd() + os.chdir(d) + try: + yield + finally: + os.chdir(old_dir) + +def firstExistentPath(paths): + for p in paths: + if os.path.exists(p): + return p + +def md5sum_of_file(file): + with open(file, 'rb') as f: + m = hashlib.md5() + while True: + d = f.read(81920) + if not d: + break + m.update(d) + return m.hexdigest() + +def md5(s, encoding='utf-8'): + m = hashlib.md5() + m.update(s.encode(encoding)) + return m.hexdigest() + +def base64_encode(s): + if isinstance(s, str): + s = s.encode() + return base64.b64encode(s).decode('ascii') + +def lock_file(path: os.PathLike) -> None: + lock = os.open(path, os.O_WRONLY | os.O_CREAT, 0o600) + try: + fcntl.flock(lock, fcntl.LOCK_EX|fcntl.LOCK_NB) + except BlockingIOError: + logger.warning('Waiting for lock to release...') + fcntl.flock(lock, fcntl.LOCK_EX) + +@contextlib.contextmanager +def file_lock(file: os.PathLike) -> Generator[None, None, None]: + lock = os.open(file, os.O_WRONLY | os.O_CREAT, 0o600) + try: + fcntl.flock(lock, fcntl.LOCK_EX) + yield + finally: + os.close(lock) + +def dict_bytes_to_str(d: Dict[Any, Any]) -> Dict[Any, Any]: + ret = {} + for k, v in d.items(): + if isinstance(k, bytes): + try: + k = k.decode() + except UnicodeDecodeError: + pass + + if isinstance(v, bytes): + try: + v = v.decode() + except UnicodeDecodeError: + pass + elif isinstance(v, dict): + v = dict_bytes_to_str(v) + elif isinstance(v, list): + try: + v = [x.decode() for x in v] + except UnicodeDecodeError: + pass + + ret[k] = v + + return ret + +def xsel(input=None): + import subprocess + + if input is None: + return subprocess.getoutput('uniclip') + else: + p = subprocess.Popen(['uniclip', '-i'], stdin=subprocess.PIPE) + p.communicate(input.encode()) + return p.wait() diff --git a/nicelogger.py b/nicelogger.py new file mode 100644 index 0000000..326728a --- /dev/null +++ b/nicelogger.py @@ -0,0 +1,95 @@ +''' +A Tornado-inspired logging formatter, with displayed time with millisecond accuracy + +FYI: pyftpdlib also has a Tornado-style logger. +''' + +from __future__ import annotations + +import sys +import time +import logging + +class TornadoLogFormatter(logging.Formatter): + def __init__(self, color, *args, **kwargs): + super().__init__(*args, **kwargs) + self._color = color + if color: + import curses + curses.setupterm() + if sys.hexversion < 0x30203f0: + fg_color = str(curses.tigetstr("setaf") or + curses.tigetstr("setf") or "", "ascii") + else: + fg_color = curses.tigetstr("setaf") or curses.tigetstr("setf") or b"" + self._colors = { + logging.DEBUG: str(curses.tparm(fg_color, 4), # Blue + "ascii"), + logging.INFO: str(curses.tparm(fg_color, 2), # Green + "ascii"), + logging.WARNING: str(curses.tparm(fg_color, 3), # Yellow + "ascii"), + logging.ERROR: str(curses.tparm(fg_color, 1), # Red + "ascii"), + logging.CRITICAL: str(curses.tparm(fg_color, 9), # Bright Red + "ascii"), + } + self._normal = str(curses.tigetstr("sgr0"), "ascii") + + def format(self, record): + try: + record.message = record.getMessage() + except Exception as e: + record.message = "Bad message (%r): %r" % (e, record.__dict__) + record.asctime = time.strftime( + "%m-%d %H:%M:%S", self.converter(record.created)) + prefix = '[%(levelname)1.1s %(asctime)s.%(msecs)03d %(module)s:%(lineno)d]' % \ + record.__dict__ + if self._color: + prefix = (self._colors.get(record.levelno, self._normal) + + prefix + self._normal) + formatted = prefix + " " + record.message + + formatted += ''.join( + ' %s=%s' % (k, v) for k, v in record.__dict__.items() + if k not in { + 'levelname', 'asctime', 'module', 'lineno', 'args', 'message', + 'filename', 'exc_info', 'exc_text', 'created', 'funcName', + 'processName', 'process', 'msecs', 'relativeCreated', 'thread', + 'threadName', 'name', 'levelno', 'msg', 'pathname', 'stack_info', + }) + + if record.exc_info: + if not record.exc_text: + record.exc_text = self.formatException(record.exc_info) + if record.exc_text: + formatted = formatted.rstrip() + "\n" + record.exc_text + return formatted.replace("\n", "\n ") + +def enable_pretty_logging(level=logging.DEBUG, handler=None, color=None): + ''' + handler: specify a handler instead of default StreamHandler + color: boolean, force color to be on / off. Default to be on only when + ``handler`` isn't specified and the term supports color + ''' + logger = logging.getLogger() + if handler is None: + h = logging.StreamHandler() + else: + h = handler + if color is None: + color = False + if handler is None and sys.stderr.isatty(): + try: + import curses + curses.setupterm() + if curses.tigetnum("colors") > 0: + color = True + except: + import traceback + traceback.print_exc() + formatter = TornadoLogFormatter(color=color) + h.setLevel(level) + h.setFormatter(formatter) + logger.setLevel(level) + logger.addHandler(h)