This commit is contained in:
BioArchLinuxBot 2022-05-12 01:48:42 +00:00
parent 14db8014ed
commit 3fce56a6a9
6 changed files with 905 additions and 0 deletions

202
agithub.py Normal file
View file

@ -0,0 +1,202 @@
from __future__ import annotations
import datetime
import json
import weakref
import asyncio
import logging
import time
from typing import (
AsyncGenerator, Tuple, Any, Dict, Optional, List, Union,
)
from aiohttp.client import ClientResponse
import aiohttputils
logger = logging.getLogger(__name__)
JsonDict = Dict[str, Any]
Json = Union[List[JsonDict], JsonDict]
def parse_datetime(s):
dt = datetime.datetime.strptime(s, '%Y-%m-%dT%H:%M:%SZ')
return dt.replace(tzinfo=datetime.timezone.utc)
class GitHubError(Exception):
def __init__(self, message, documentation, code):
self.message = message
self.documentation = documentation
self.code = code
class GitHub(aiohttputils.ClientBase):
baseurl = 'https://api.github.com/'
def __init__(self, token, session=None):
self.token = f'token {token}'
super().__init__(session = session)
async def api_request(
self, path: str, method: str = 'get',
data: Optional[JsonDict] = None, **kwargs,
) -> Tuple[Json, ClientResponse]:
h = kwargs.get('headers', None)
if not h:
h = kwargs['headers'] = {}
h.setdefault('Accept', 'application/vnd.github.v3+json')
h.setdefault('Authorization', self.token)
if data:
binary_data = json.dumps(data, ensure_ascii=False).encode('utf-8')
if method == 'get':
method = 'post'
h.setdefault('Content-Type', 'application/json')
kwargs['data'] = binary_data
for _ in range(3):
res = await self.request(path, method=method, **kwargs)
j: JsonDict
if res.status == 204:
j = {}
else:
j = await res.json()
if 'message' in j:
if res.status == 403 and int(res.headers.get('X-RateLimit-Remaining', -1)) == 0:
reset = int(res.headers['X-RateLimit-Reset']) - time.time() + 1
logger.warn('rate limited; sleeping for %ds: %s', reset, j['message'])
await asyncio.sleep(reset)
continue
raise GitHubError(j['message'], j['documentation_url'], res.status)
return j, res
raise Exception('unreachable')
async def get_repo_issues(
self, repo: str, *, state: str = 'open', labels: str = '',
) -> AsyncGenerator[Issue, None]:
params = {'state': state}
if labels:
params['labels'] = labels
j, r = await self.api_request(f'/repos/{repo}/issues', params = params)
assert isinstance(j, list)
for x in j:
yield Issue(x, self)
while 'next' in r.links:
url = str(r.links['next']['url'])
j, r = await self.api_request(url)
assert isinstance(j, list)
for x in j:
yield Issue(x, self)
async def get_issue(self, repo: str, issue_nr: int) -> 'Issue':
j, _ = await self.api_request(f'/repos/{repo}/issues/{issue_nr}')
assert isinstance(j, dict)
return Issue(j, self)
async def get_issue_comments(
self, repo: str, issue_nr: int,
) -> AsyncGenerator[Comment, None]:
j, r = await self.api_request(f'/repos/{repo}/issues/{issue_nr}/comments')
assert isinstance(j, list)
for x in j:
yield Comment(x, self)
while 'next' in r.links:
url = str(r.links['next']['url'])
j, r = await self.api_request(url)
assert isinstance(j, list)
for x in j:
yield Comment(x, self)
async def create_issue(
self, repo: str, title: str, body: Optional[str] = None,
labels: List[str] = [],
) -> 'Issue':
data: JsonDict = {
'title': title,
}
if body:
data['body'] = body
if labels:
data['labels'] = labels
issue, _ = await self.api_request(f'/repos/{repo}/issues', data = data)
assert isinstance(issue, dict)
return Issue(issue, self)
async def find_login_by_email(self, email: str) -> str:
j, _ = await self.api_request(f'/search/users?q={email}')
assert isinstance(j, dict)
try:
return j['items'][0]['login']
except IndexError:
raise LookupError(email)
class Issue:
def __init__(self, data: JsonDict, gh: GitHub) -> None:
self.gh = weakref.proxy(gh)
self._data = data
self.body = data['body']
self.number = data['number']
self.title = data['title']
self.labels = [x['name'] for x in data['labels']]
self.updated_at = parse_datetime(data['updated_at'])
self._api_url = f"{data['repository_url']}/issues/{data['number']}"
self.closed = data['state'] == 'closed'
self.author = data['user']['login']
self.closed_by = data.get('closed_by') and data['closed_by']['login'] or None
async def comment(self, comment: str) -> JsonDict:
j, _ = await self.gh.api_request(f'{self._api_url}/comments', data = {'body': comment})
return j
async def add_labels(self, labels: List[str]) -> JsonDict:
j, _ = await self.gh.api_request(f'{self._api_url}/labels', data = labels)
return j
async def assign(self, assignees: List[str]) -> JsonDict:
payload = {'assignees': assignees}
j, _ = await self.gh.api_request(f'{self._api_url}/assignees', data = payload)
return j
async def close(self) -> None:
data, _ = await self.gh.api_request(
f'{self._api_url}', method = 'patch', data = {'state': 'closed'})
self._data = data
self.closed = data['state'] == 'closed'
async def reopen(self) -> None:
data, _ = await self.gh.api_request(
f'{self._api_url}', method = 'patch', data = {'state': 'open'})
self._data = data
self.closed = data['state'] == 'closed'
def __repr__(self):
return f'<Issue {self.number}: {self.title!r}>'
class Comment:
def __init__(self, data: JsonDict, gh: GitHub) -> None:
self.gh = weakref.proxy(gh)
self._update(data)
def _update(self, data: JsonDict) -> None:
self._data = data
self.author = data['user']['login']
self.html_url = data['html_url']
self.url = data['url']
self.body = data['body']
async def delete(self) -> None:
await self.gh.api_request(self.url, method = 'DELETE')
async def edit(self, body: str) -> None:
data, _ = await self.gh.api_request(
self.url, method = 'PATCH',
data = {'body': body},
)
self._update(data)
def __repr__(self) -> str:
return f'<Comment by {self.author}: {self.html_url}>'

87
aiohttputils.py Normal file
View file

@ -0,0 +1,87 @@
import os
from http.cookiejar import MozillaCookieJar
from urllib.parse import urljoin
from typing import Optional
import asyncio
import aiohttp
from aiohttp.client import ClientResponse
class ClientBase:
session = None
userAgent = None
lasturl = None
auto_referer = False
baseurl: Optional[str] = None
cookiefile: Optional[os.PathLike] = None
__our_session: bool = False
def __init__(self, *, baseurl=None, cookiefile=None, session=None):
if baseurl is not None:
self.baseurl = baseurl
self.session = session
self.cookiefile = cookiefile
async def async_init(self) -> None:
if not self.session:
s = aiohttp.ClientSession()
self.__our_session = True
self.session = s
if self.cookiefile:
s.cookies = MozillaCookieJar(self.cookiefile)
if os.path.exists(self.cookiefile):
s.cookies.load() # type: ignore
def __del__(self):
if self.cookiefile:
self.session.cookies.save()
if self.__our_session:
loop = asyncio.get_event_loop()
closer = self.session.close()
if loop.is_running():
asyncio.ensure_future(closer)
else:
asyncio.run(closer)
async def request(
self, url: str, method: Optional[str] = None, **kwargs,
) -> ClientResponse:
if not self.session:
await self.async_init()
if self.baseurl:
url = urljoin(self.baseurl, url)
if self.auto_referer and self.lasturl:
h = kwargs.get('headers', None)
if not h:
h = kwargs['headers'] = {}
h.setdefault('Referer', self.lasturl)
if self.userAgent:
h = kwargs.get('headers', None)
if not h:
h = kwargs['headers'] = {}
h.setdefault('User-Agent', self.userAgent)
if method is None:
if 'data' in kwargs:
method = 'post'
else:
method = 'get'
response = await self.session.request(method, url, **kwargs) # type: ignore
# url may have been changed due to redirection
self.lasturl = str(response.url)
return response
async def test():
client = ClientBase(baseurl='https://www.baidu.com/', cookiefile='test')
res = await client.request('/')
res = await client.request('/404')
print(res, client.lasturl)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(test())

89
archpkg.py Normal file
View file

@ -0,0 +1,89 @@
from __future__ import annotations
import os
from collections import namedtuple
import subprocess
import re
from typing import Tuple, List, Dict
from pkg_resources import parse_version as _parse_version
def parse_arch_version(v: str) -> Tuple[int, Tuple[str, ...]]:
if ':' in v:
epoch = int(v.split(':', 1)[0])
else:
epoch = 0
return epoch, _parse_version(v)
class PkgNameInfo(namedtuple('PkgNameInfo', 'name, version, release, arch')):
def __lt__(self, other) -> bool:
if self.name != other.name or self.arch != other.arch:
return NotImplemented
if self.version != other.version:
return parse_arch_version(self.version) < parse_arch_version(other.version)
return float(self.release) < float(other.release)
def __gt__(self, other) -> bool:
# No, try the other side please.
return NotImplemented
@property
def fullversion(self) -> str:
return '%s-%s' % (self.version, self.release)
@classmethod
def parseFilename(cls, filename: str) -> 'PkgNameInfo':
return cls(*trimext(filename, 3).rsplit('-', 3))
def trimext(name: str, num: int = 1) -> str:
for i in range(num):
name = os.path.splitext(name)[0]
return name
def get_pkgname_with_bash(PKGBUILD: str) -> List[str]:
script = '''\
. '%s'
echo ${pkgname[*]}''' % PKGBUILD
# Python 3.4 has 'input' arg for check_output
p = subprocess.Popen(
['bwrap', '--unshare-all', '--ro-bind', '/', '/', '--tmpfs', '/home',
'--tmpfs', '/run', '--die-with-parent',
'--tmpfs', '/tmp', '--proc', '/proc', '--dev', '/dev', '/bin/bash'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
)
output = p.communicate(script.encode())[0].decode()
ret = p.wait()
if ret != 0:
raise subprocess.CalledProcessError(
ret, ['bash'], output)
return output.split()
pkgfile_pat = re.compile(r'(?:^|/).+-[^-]+-[\d.]+-(?:\w+)\.pkg\.tar\.(?:xz|zst)$')
def _strip_ver(s: str) -> str:
return re.sub(r'[<>=].*', '', s)
def get_package_info(name: str, local: bool = False) -> Dict[str, str]:
old_lang = os.environ['LANG']
os.environ['LANG'] = 'C'
args = '-Qi' if local else '-Si'
try:
outb = subprocess.check_output(["pacman", args, name])
out = outb.decode('latin1')
finally:
os.environ['LANG'] = old_lang
ret = {}
for l in out.splitlines():
if not l:
continue
if l[0] not in ' \t':
key, value = l.split(':', 1)
key = key.strip()
value = value.strip()
ret[key] = value
else:
ret[key] += ' ' + l.strip()
return ret

82
htmlutils.py Normal file
View file

@ -0,0 +1,82 @@
from __future__ import annotations
import re
import copy
from html.entities import entitydefs
from lxml import html # type: ignore
def _br2span_inplace(el):
for br in el.iterchildren(tag='br'):
sp = html.Element('span')
sp.text = '\n'
sp.tail = br.tail
el.replace(br, sp)
def extractText(el):
el = copy.copy(el)
_br2span_inplace(el)
return el.text_content()
def iter_text_and_br(el):
if el.text:
yield el.text
for i in el.iterchildren():
if i.tag == 'br':
yield '\n'
if i.tail:
yield i.tail
def un_jsescape(s):
'''%xx & %uxxxx -> char, opposite of Javascript's escape()'''
return re.sub(
r'%u([0-9a-fA-F]{4})|%([0-9a-fA-F]{2})',
lambda m: chr(int(m.group(1) or m.group(2), 16)),
s
)
def entityunescape(string):
'''HTML entity decode'''
string = re.sub(r'&#[^;]+;', _sharp2uni, string)
string = re.sub(r'&[^;]+;', lambda m: entitydefs[m.group(0)[1:-1]], string)
return string
def entityunescape_loose(string):
'''HTML entity decode. losse version.'''
string = re.sub(r'&#[0-9a-fA-F]+[;]?', _sharp2uni, string)
string = re.sub(r'&\w+[;]?', lambda m: entitydefs[m.group(0)[1:].rstrip(';')], string)
return string
def _sharp2uni(m):
'''&#...; ==> unicode'''
s = m.group(0)[2:].rstrip(';')
if s.startswith('x'):
return chr(int('0'+s, 16))
else:
return chr(int(s))
def parse_document_from_requests(response, session=None, *, encoding=None):
'''
``response``: requests ``Response`` object, or URL
``encoding``: override detected encoding
'''
if isinstance(response, str):
if session is None:
raise ValueError('URL given but no session')
r = session.get(response)
else:
r = response
if encoding:
r.encoding = encoding
# fromstring handles bytes well
# https://stackoverflow.com/a/15305248/296473
parser = html.HTMLParser(encoding=encoding or r.encoding)
doc = html.fromstring(r.content, base_url=r.url, parser=parser)
doc.make_links_absolute()
return doc
def parse_html_with_encoding(data, encoding='utf-8'):
parser = html.HTMLParser(encoding=encoding)
return html.fromstring(data, parser=parser)

350
myutils.py Normal file
View file

@ -0,0 +1,350 @@
from __future__ import annotations
import os, sys
import re
import datetime
import time
from functools import lru_cache, wraps
import logging
import contextlib
import signal
import hashlib
import base64
import fcntl
from typing import Tuple, Union, Optional, Dict, Any, Generator
logger = logging.getLogger(__name__)
def safe_overwrite(fname: str, data: Union[bytes, str], *,
method: str = 'write', mode: str = 'w', encoding: Optional[str] = None) -> None:
# FIXME: directory has no read perm
# FIXME: symlinks and hard links
tmpname = fname + '.tmp'
# if not using "with", write can fail without exception
with open(tmpname, mode, encoding=encoding) as f:
getattr(f, method)(data)
# see also: https://thunk.org/tytso/blog/2009/03/15/dont-fear-the-fsync/
f.flush()
os.fsync(f.fileno())
# if the above write failed (because disk is full etc), the old data should be kept
os.rename(tmpname, fname)
UNITS = 'KMGTPEZY'
def filesize(size: int) -> str:
amt, unit = filesize_ex(size)
if unit:
return '%.1f%siB' % (amt, unit)
else:
return '%dB' % amt
def filesize_ex(size: int) -> Tuple[Union[float, int], str]:
left: Union[int, float] = abs(size)
unit = -1
n = len(UNITS)
while left > 1100 and unit < n:
left = left / 1024
unit += 1
if unit == -1:
return size, ''
else:
if size < 0:
left = -left
return left, UNITS[unit]
class FileSize(int):
def __str__(self) -> str:
return filesize(self).rstrip('iB')
def parse_filesize(s: str) -> int:
s1 = s.rstrip('iB')
if not s1:
raise ValueError(s)
last = s1[-1]
try:
idx = UNITS.index(last)
except ValueError:
return int(float(s1))
v = float(s1[:-1]) * 1024 ** (idx+1)
return int(v)
def humantime(t: int) -> str:
'''seconds -> XhYmZs'''
if t < 0:
sign = '-'
t = -t
else:
sign = ''
m, s = divmod(t, 60)
h, m = divmod(m, 60)
d, h = divmod(h, 24)
ret = ''
if d:
ret += '%dd' % d
if h:
ret += '%dh' % h
if m:
ret += '%dm' % m
if s:
ret += '%ds' % s
if not ret:
ret = '0s'
return sign + ret
def dehumantime(s: str) -> int:
'''XhYmZs -> seconds'''
m = re.match(r'(?:(?P<d>\d+)d)?(?:(?P<h>\d+)h)?(?:(?P<m>\d+)m)?(?:(?P<s>\d+)s)?$', s)
if m:
return (
int(m.group('d') or 0) * 3600 * 24 +
int(m.group('h') or 0) * 3600 +
int(m.group('m') or 0) * 60 +
int(m.group('s') or 0)
)
else:
raise ValueError(s)
def _timed_read(file, timeout):
from select import select
if select([file], [], [], timeout)[0]:
return file.read(1)
def getchar(prompt, hidden=False, end='\n', timeout=None):
'''读取一个字符'''
import termios
sys.stdout.write(prompt)
sys.stdout.flush()
fd = sys.stdin.fileno()
ch: Optional[str]
def _read() -> Optional[str]:
ch: Optional[str]
if timeout is None:
ch = sys.stdin.read(1)
else:
ch = _timed_read(sys.stdin, timeout)
return ch
if os.isatty(fd):
old = termios.tcgetattr(fd)
new = termios.tcgetattr(fd)
if hidden:
new[3] = new[3] & ~termios.ICANON & ~termios.ECHO
else:
new[3] = new[3] & ~termios.ICANON
new[6][termios.VMIN] = 1
new[6][termios.VTIME] = 0
try:
termios.tcsetattr(fd, termios.TCSANOW, new)
termios.tcsendbreak(fd, 0)
ch = _read()
finally:
termios.tcsetattr(fd, termios.TCSAFLUSH, old)
else:
ch = _read()
sys.stdout.write(end)
return ch
def loadso(fname):
'''ctypes.CDLL 的 wrapper从 sys.path 中搜索文件'''
from ctypes import CDLL
for d in sys.path:
p = os.path.join(d, fname)
if os.path.exists(p):
return CDLL(p)
raise ImportError('%s not found' % fname)
def dofile(path):
G = {}
with open(path) as f:
exec(f.read(), G)
return G
def restart_if_failed(func, max_tries, args=(), kwargs={}, secs=60, sleep=None):
'''
re-run when some exception happens, until `max_tries` in `secs`
'''
import traceback
from collections import deque
dq = deque(maxlen=max_tries)
while True:
dq.append(time.time())
try:
return func(*args, **kwargs)
except Exception:
traceback.print_exc()
if len(dq) == max_tries and time.time() - dq[0] < secs:
break
if sleep is not None:
time.sleep(sleep)
else:
break
def daterange(start, stop=datetime.date.today(), step=datetime.timedelta(days=1)):
d = start
while d < stop:
yield d
d += step
@lru_cache()
def findfont(fontname):
from subprocess import check_output
out = check_output(['fc-match', '-v', fontname]).decode()
for l in out.split('\n'):
if l.lstrip().startswith('file:'):
return l.split('"', 2)[1]
def debugfunc(logger=logging, *, _id=[0]):
def w(func):
@wraps(func)
def wrapper(*args, **kwargs):
myid = _id[0]
_id[0] += 1
logger.debug('[func %d] %s(%r, %r)', myid, func.__name__, args, kwargs)
ret = func(*args, **kwargs)
logger.debug('[func %d] return: %r', myid, ret)
return ret
return wrapper
return w
@contextlib.contextmanager
def execution_timeout(timeout):
def timed_out(signum, sigframe):
raise TimeoutError
delay, interval = signal.setitimer(signal.ITIMER_REAL, timeout, 0)
old_hdl = signal.signal(signal.SIGALRM, timed_out)
now = time.time()
try:
yield
finally:
# inner timeout must be smaller, or the timer event will be delayed
if delay:
elapsed = time.time() - now
delay = max(delay - elapsed, 0.000001)
else:
delay = 0
signal.setitimer(signal.ITIMER_REAL, delay, interval)
signal.signal(signal.SIGALRM, old_hdl)
def find_executables(name, path=None):
'''find all matching executables with specific name in path'''
if path is None:
path = os.environ['PATH'].split(os.pathsep)
elif isinstance(path, str):
path = path.split(os.pathsep)
path = [p for p in path if os.path.isdir(p)]
return [os.path.join(p, f) for p in path for f in os.listdir(p) if f == name]
# The following three are learnt from makepkg
def user_choose(prompt, timeout=None):
# XXX: hard-coded term characters are ok?
prompt = '\x1b[1;34m::\x1b[1;37m %s\x1b[0m ' % prompt
return getchar(prompt, timeout=timeout)
def msg(msg):
# XXX: hard-coded term characters are ok?
print('\x1b[1;32m==>\x1b[1;37m %s\x1b[0m' % msg)
def msg2(msg):
# XXX: hard-coded term characters are ok?
print('\x1b[1;34m ->\x1b[1;37m %s\x1b[0m' % msg)
def is_internal_ip(ip):
import ipaddress
ip = ipaddress.ip_address(ip)
return ip.is_loopback or ip.is_private or ip.is_reserved or ip.is_link_local
@contextlib.contextmanager
def at_dir(d: os.PathLike) -> Generator[None, None, None]:
old_dir = os.getcwd()
os.chdir(d)
try:
yield
finally:
os.chdir(old_dir)
def firstExistentPath(paths):
for p in paths:
if os.path.exists(p):
return p
def md5sum_of_file(file):
with open(file, 'rb') as f:
m = hashlib.md5()
while True:
d = f.read(81920)
if not d:
break
m.update(d)
return m.hexdigest()
def md5(s, encoding='utf-8'):
m = hashlib.md5()
m.update(s.encode(encoding))
return m.hexdigest()
def base64_encode(s):
if isinstance(s, str):
s = s.encode()
return base64.b64encode(s).decode('ascii')
def lock_file(path: os.PathLike) -> None:
lock = os.open(path, os.O_WRONLY | os.O_CREAT, 0o600)
try:
fcntl.flock(lock, fcntl.LOCK_EX|fcntl.LOCK_NB)
except BlockingIOError:
logger.warning('Waiting for lock to release...')
fcntl.flock(lock, fcntl.LOCK_EX)
@contextlib.contextmanager
def file_lock(file: os.PathLike) -> Generator[None, None, None]:
lock = os.open(file, os.O_WRONLY | os.O_CREAT, 0o600)
try:
fcntl.flock(lock, fcntl.LOCK_EX)
yield
finally:
os.close(lock)
def dict_bytes_to_str(d: Dict[Any, Any]) -> Dict[Any, Any]:
ret = {}
for k, v in d.items():
if isinstance(k, bytes):
try:
k = k.decode()
except UnicodeDecodeError:
pass
if isinstance(v, bytes):
try:
v = v.decode()
except UnicodeDecodeError:
pass
elif isinstance(v, dict):
v = dict_bytes_to_str(v)
elif isinstance(v, list):
try:
v = [x.decode() for x in v]
except UnicodeDecodeError:
pass
ret[k] = v
return ret
def xsel(input=None):
import subprocess
if input is None:
return subprocess.getoutput('uniclip')
else:
p = subprocess.Popen(['uniclip', '-i'], stdin=subprocess.PIPE)
p.communicate(input.encode())
return p.wait()

95
nicelogger.py Normal file
View file

@ -0,0 +1,95 @@
'''
A Tornado-inspired logging formatter, with displayed time with millisecond accuracy
FYI: pyftpdlib also has a Tornado-style logger.
'''
from __future__ import annotations
import sys
import time
import logging
class TornadoLogFormatter(logging.Formatter):
def __init__(self, color, *args, **kwargs):
super().__init__(*args, **kwargs)
self._color = color
if color:
import curses
curses.setupterm()
if sys.hexversion < 0x30203f0:
fg_color = str(curses.tigetstr("setaf") or
curses.tigetstr("setf") or "", "ascii")
else:
fg_color = curses.tigetstr("setaf") or curses.tigetstr("setf") or b""
self._colors = {
logging.DEBUG: str(curses.tparm(fg_color, 4), # Blue
"ascii"),
logging.INFO: str(curses.tparm(fg_color, 2), # Green
"ascii"),
logging.WARNING: str(curses.tparm(fg_color, 3), # Yellow
"ascii"),
logging.ERROR: str(curses.tparm(fg_color, 1), # Red
"ascii"),
logging.CRITICAL: str(curses.tparm(fg_color, 9), # Bright Red
"ascii"),
}
self._normal = str(curses.tigetstr("sgr0"), "ascii")
def format(self, record):
try:
record.message = record.getMessage()
except Exception as e:
record.message = "Bad message (%r): %r" % (e, record.__dict__)
record.asctime = time.strftime(
"%m-%d %H:%M:%S", self.converter(record.created))
prefix = '[%(levelname)1.1s %(asctime)s.%(msecs)03d %(module)s:%(lineno)d]' % \
record.__dict__
if self._color:
prefix = (self._colors.get(record.levelno, self._normal) +
prefix + self._normal)
formatted = prefix + " " + record.message
formatted += ''.join(
' %s=%s' % (k, v) for k, v in record.__dict__.items()
if k not in {
'levelname', 'asctime', 'module', 'lineno', 'args', 'message',
'filename', 'exc_info', 'exc_text', 'created', 'funcName',
'processName', 'process', 'msecs', 'relativeCreated', 'thread',
'threadName', 'name', 'levelno', 'msg', 'pathname', 'stack_info',
})
if record.exc_info:
if not record.exc_text:
record.exc_text = self.formatException(record.exc_info)
if record.exc_text:
formatted = formatted.rstrip() + "\n" + record.exc_text
return formatted.replace("\n", "\n ")
def enable_pretty_logging(level=logging.DEBUG, handler=None, color=None):
'''
handler: specify a handler instead of default StreamHandler
color: boolean, force color to be on / off. Default to be on only when
``handler`` isn't specified and the term supports color
'''
logger = logging.getLogger()
if handler is None:
h = logging.StreamHandler()
else:
h = handler
if color is None:
color = False
if handler is None and sys.stderr.isatty():
try:
import curses
curses.setupterm()
if curses.tigetnum("colors") > 0:
color = True
except:
import traceback
traceback.print_exc()
formatter = TornadoLogFormatter(color=color)
h.setLevel(level)
h.setFormatter(formatter)
logger.setLevel(level)
logger.addHandler(h)