from __future__ import annotations import os, sys import re import datetime import time from functools import lru_cache, wraps import logging import contextlib import signal import hashlib import base64 import fcntl from typing import Tuple, Union, Optional, Dict, Any, Generator logger = logging.getLogger(__name__) def safe_overwrite(fname: str, data: Union[bytes, str], *, method: str = 'write', mode: str = 'w', encoding: Optional[str] = None) -> None: # FIXME: directory has no read perm # FIXME: symlinks and hard links tmpname = fname + '.tmp' # if not using "with", write can fail without exception with open(tmpname, mode, encoding=encoding) as f: getattr(f, method)(data) # see also: https://thunk.org/tytso/blog/2009/03/15/dont-fear-the-fsync/ f.flush() os.fsync(f.fileno()) # if the above write failed (because disk is full etc), the old data should be kept os.rename(tmpname, fname) UNITS = 'KMGTPEZY' def filesize(size: int) -> str: amt, unit = filesize_ex(size) if unit: return '%.1f%siB' % (amt, unit) else: return '%dB' % amt def filesize_ex(size: int) -> Tuple[Union[float, int], str]: left: Union[int, float] = abs(size) unit = -1 n = len(UNITS) while left > 1100 and unit < n: left = left / 1024 unit += 1 if unit == -1: return size, '' else: if size < 0: left = -left return left, UNITS[unit] class FileSize(int): def __str__(self) -> str: return filesize(self).rstrip('iB') def parse_filesize(s: str) -> int: s1 = s.rstrip('iB') if not s1: raise ValueError(s) last = s1[-1] try: idx = UNITS.index(last) except ValueError: return int(float(s1)) v = float(s1[:-1]) * 1024 ** (idx+1) return int(v) def humantime(t: int) -> str: '''seconds -> XhYmZs''' if t < 0: sign = '-' t = -t else: sign = '' m, s = divmod(t, 60) h, m = divmod(m, 60) d, h = divmod(h, 24) ret = '' if d: ret += '%dd' % d if h: ret += '%dh' % h if m: ret += '%dm' % m if s: ret += '%ds' % s if not ret: ret = '0s' return sign + ret def dehumantime(s: str) -> int: '''XhYmZs -> seconds''' m = re.match(r'(?:(?P\d+)d)?(?:(?P\d+)h)?(?:(?P\d+)m)?(?:(?P\d+)s)?$', s) if m: return ( int(m.group('d') or 0) * 3600 * 24 + int(m.group('h') or 0) * 3600 + int(m.group('m') or 0) * 60 + int(m.group('s') or 0) ) else: raise ValueError(s) def _timed_read(file, timeout): from select import select if select([file], [], [], timeout)[0]: return file.read(1) def getchar(prompt, hidden=False, end='\n', timeout=None): '''读取一个字符''' import termios sys.stdout.write(prompt) sys.stdout.flush() fd = sys.stdin.fileno() ch: Optional[str] def _read() -> Optional[str]: ch: Optional[str] if timeout is None: ch = sys.stdin.read(1) else: ch = _timed_read(sys.stdin, timeout) return ch if os.isatty(fd): old = termios.tcgetattr(fd) new = termios.tcgetattr(fd) if hidden: new[3] = new[3] & ~termios.ICANON & ~termios.ECHO else: new[3] = new[3] & ~termios.ICANON new[6][termios.VMIN] = 1 new[6][termios.VTIME] = 0 try: termios.tcsetattr(fd, termios.TCSANOW, new) termios.tcsendbreak(fd, 0) ch = _read() finally: termios.tcsetattr(fd, termios.TCSAFLUSH, old) else: ch = _read() sys.stdout.write(end) return ch def loadso(fname): '''ctypes.CDLL 的 wrapper,从 sys.path 中搜索文件''' from ctypes import CDLL for d in sys.path: p = os.path.join(d, fname) if os.path.exists(p): return CDLL(p) raise ImportError('%s not found' % fname) def dofile(path): G = {} with open(path) as f: exec(f.read(), G) return G def restart_if_failed(func, max_tries, args=(), kwargs={}, secs=60, sleep=None): ''' re-run when some exception happens, until `max_tries` in `secs` ''' import traceback from collections import deque dq = deque(maxlen=max_tries) while True: dq.append(time.time()) try: return func(*args, **kwargs) except Exception: traceback.print_exc() if len(dq) == max_tries and time.time() - dq[0] < secs: break if sleep is not None: time.sleep(sleep) else: break def daterange(start, stop=datetime.date.today(), step=datetime.timedelta(days=1)): d = start while d < stop: yield d d += step @lru_cache() def findfont(fontname): from subprocess import check_output out = check_output(['fc-match', '-v', fontname]).decode() for l in out.split('\n'): if l.lstrip().startswith('file:'): return l.split('"', 2)[1] def debugfunc(logger=logging, *, _id=[0]): def w(func): @wraps(func) def wrapper(*args, **kwargs): myid = _id[0] _id[0] += 1 logger.debug('[func %d] %s(%r, %r)', myid, func.__name__, args, kwargs) ret = func(*args, **kwargs) logger.debug('[func %d] return: %r', myid, ret) return ret return wrapper return w @contextlib.contextmanager def execution_timeout(timeout): def timed_out(signum, sigframe): raise TimeoutError delay, interval = signal.setitimer(signal.ITIMER_REAL, timeout, 0) old_hdl = signal.signal(signal.SIGALRM, timed_out) now = time.time() try: yield finally: # inner timeout must be smaller, or the timer event will be delayed if delay: elapsed = time.time() - now delay = max(delay - elapsed, 0.000001) else: delay = 0 signal.setitimer(signal.ITIMER_REAL, delay, interval) signal.signal(signal.SIGALRM, old_hdl) def find_executables(name, path=None): '''find all matching executables with specific name in path''' if path is None: path = os.environ['PATH'].split(os.pathsep) elif isinstance(path, str): path = path.split(os.pathsep) path = [p for p in path if os.path.isdir(p)] return [os.path.join(p, f) for p in path for f in os.listdir(p) if f == name] # The following three are learnt from makepkg def user_choose(prompt, timeout=None): # XXX: hard-coded term characters are ok? prompt = '\x1b[1;34m::\x1b[1;37m %s\x1b[0m ' % prompt return getchar(prompt, timeout=timeout) def msg(msg): # XXX: hard-coded term characters are ok? print('\x1b[1;32m==>\x1b[1;37m %s\x1b[0m' % msg) def msg2(msg): # XXX: hard-coded term characters are ok? print('\x1b[1;34m ->\x1b[1;37m %s\x1b[0m' % msg) def is_internal_ip(ip): import ipaddress ip = ipaddress.ip_address(ip) return ip.is_loopback or ip.is_private or ip.is_reserved or ip.is_link_local @contextlib.contextmanager def at_dir(d: os.PathLike) -> Generator[None, None, None]: old_dir = os.getcwd() os.chdir(d) try: yield finally: os.chdir(old_dir) def firstExistentPath(paths): for p in paths: if os.path.exists(p): return p def md5sum_of_file(file): with open(file, 'rb') as f: m = hashlib.md5() while True: d = f.read(81920) if not d: break m.update(d) return m.hexdigest() def md5(s, encoding='utf-8'): m = hashlib.md5() m.update(s.encode(encoding)) return m.hexdigest() def base64_encode(s): if isinstance(s, str): s = s.encode() return base64.b64encode(s).decode('ascii') def lock_file(path: os.PathLike) -> None: lock = os.open(path, os.O_WRONLY | os.O_CREAT, 0o600) try: fcntl.flock(lock, fcntl.LOCK_EX|fcntl.LOCK_NB) except BlockingIOError: logger.warning('Waiting for lock to release...') fcntl.flock(lock, fcntl.LOCK_EX) @contextlib.contextmanager def file_lock(file: os.PathLike) -> Generator[None, None, None]: lock = os.open(file, os.O_WRONLY | os.O_CREAT, 0o600) try: fcntl.flock(lock, fcntl.LOCK_EX) yield finally: os.close(lock) def dict_bytes_to_str(d: Dict[Any, Any]) -> Dict[Any, Any]: ret = {} for k, v in d.items(): if isinstance(k, bytes): try: k = k.decode() except UnicodeDecodeError: pass if isinstance(v, bytes): try: v = v.decode() except UnicodeDecodeError: pass elif isinstance(v, dict): v = dict_bytes_to_str(v) elif isinstance(v, list): try: v = [x.decode() for x in v] except UnicodeDecodeError: pass ret[k] = v return ret def xsel(input=None): import subprocess if input is None: return subprocess.getoutput('uniclip') else: p = subprocess.Popen(['uniclip', '-i'], stdin=subprocess.PIPE) p.communicate(input.encode()) return p.wait()