#!/usr/bin/env python3
# vim:fileencoding=utf-8
"""Keep pacman repository databases in sync with the package directories.

The ``any``, ``i686`` and ``x86_64`` directories under the configured
repository path are watched with inotify.  Package files that appear or
disappear are batched per directory and handed to ``repo-add`` /
``repo-remove`` (or the configured replacements); a SQLite table records
which files are currently registered.
"""

import os
import sys
import re
import pwd
import configparser
from functools import partial
from itertools import filterfalse
import queue
import logging
import sqlite3

import pyinotify
Event = pyinotify.Event
from tornado.ioloop import IOLoop
import tornado.process

import archpkg
from myutils import enable_pretty_logging
enable_pretty_logging(logging.DEBUG)

# handles only x86_64, i686 and any arch packages
_pkgfile_pat = re.compile(r'(?:^|/)[a-z0-9_-]+-[a-z0-9_.]+-\d+-(?:x86_64|i686|any)\.pkg\.tar\.xz$')


class ActionInfo(archpkg.PkgNameInfo):
    """Package name information plus the pending action and the file path.

    ``ActionInfo(path, action)`` parses the file name of *path* with
    ``parseFilename`` and attaches ``action`` (``'add'`` or ``'remove'``)
    and ``path`` as attributes.
    """

    def __new__(cls, path, action, four=None, five=None):
        if four is not None:
            # Four positional values: forward them unchanged to the base
            # class.  Presumably this is the call made by parseFilename()
            # itself with the parsed fields -- confirm against archpkg.
            return super().__new__(cls, path, action, four, five)
        filename = os.path.split(path)[1]
        self = cls.parseFilename(filename)
        self.action = action
        self.path = path
        return self

    def __repr__(self):
        # An empty format string combined with two arguments raises
        # TypeError, so the representation needs both placeholders.
        return '<ActionInfo: %s %s>' % (self.action, self.path)


class RepoMan:
    """Collect actions for one repository directory and run them in batches.

    Actions are gathered until ``wait-time`` seconds pass without a new
    one, then turned into add/remove commands.  The command queue and the
    running flag are class attributes, so all instances share them and at
    most one command is started at a time.
    """

    _timeout = None
    _cmd_queue = queue.Queue()
    _cmd_running = False

    def __init__(self, config, base, ioloop=None):
        """Read settings from *config* for the repository directory *base*.

        *config* must provide ``get`` and ``getint`` (main() passes a
        configparser section).  *ioloop* defaults to ``IOLoop.instance()``.
        """
        self.action = []
        self._ioloop = ioloop or IOLoop.instance()
        self._base = base

        self._repo_dir = config.get('path')
        name = config.get('name')
        self._db_name = os.path.join(base, name + '.db.tar.gz')
        # Build the files database path directly from the name: joining
        # `base` onto the already-joined db path doubles the prefix
        # whenever `base` is a relative path.
        self._files_name = os.path.join(base, name + '.files.tar.gz')
        self._command_add = config.get('command-add', 'repo-add')
        self._command_remove = config.get('command-remove', 'repo-remove')
        self._wait_time = config.getint('wait-time', 10)

    def queue_command(self, cmd, callbacks=None):
        """Queue *cmd* (an argv list) and start it if nothing is running.

        *callbacks* is an optional iterable of callables invoked when the
        command exits with status 0.
        """
        self._cmd_queue.put((cmd, callbacks))
        if not self._cmd_running:
            self.run_command()

    def run_command(self):
        """Start the next queued command, or mark the queue as idle."""
        self.__class__._cmd_running = True
        while True:
            try:
                cmd, callbacks = self._cmd_queue.get_nowait()
            except queue.Empty:
                self.__class__._cmd_running = False
                return

            logging.info('Running cmd: %r', cmd)
            try:
                # have to specify io_loop or we'll get error tracebacks
                p = tornado.process.Subprocess(cmd, io_loop=self._ioloop)
            except OSError:
                # The command could not be started (e.g. missing binary).
                # Move on to the next one; otherwise the running flag
                # would stay set and the shared queue would never drain.
                logging.exception('Failed to start cmd: %r', cmd)
                continue
            p.set_exit_callback(partial(self.command_done, callbacks))
            return

    def command_done(self, callbacks, status):
        """Exit callback: run *callbacks* on success, then continue the queue."""
        if status == 0 and callbacks:
            for cb in callbacks:
                cb()
        self.run_command()

    def _do_cmd(self, cmd, items, callbacks):
        """Queue *cmd* for the package db, then with ``-f`` for the files db.

        The callbacks are attached to the second command only.
        """
        cmd1 = [cmd, self._db_name]
        cmd1.extend(items)
        cmd2 = [cmd, '-f', self._files_name]
        cmd2.extend(items)
        self.queue_command(cmd1)
        self.queue_command(cmd2, callbacks)

    def _do_add(self, toadd):
        """Queue the add command for a list of ``(path, callback)`` pairs."""
        if toadd:
            files, callbacks = zip(*toadd)
            self._do_cmd(self._command_add, files, callbacks)

    def _do_remove(self, toremove):
        """Queue the remove command for a list of ``(name, callback)`` pairs."""
        if toremove:
            files, callbacks = zip(*toremove)
            self._do_cmd(self._command_remove, files, callbacks)

    def add_action(self, action):
        """Register *action* and (re)start the batching timer."""
        logging.info('Adding action %r to db %r', action, self._db_name)
        self._action_pending(action)

    def _action_pending(self, act):
        """Store *act* and push the batch run back by ``wait-time`` seconds."""
        self.action.append(act)
        if self._timeout is not None:
            self._ioloop.remove_timeout(self._timeout)
        self._timeout = self._ioloop.add_timeout(
            self._ioloop.time() + self._wait_time,
            self.run,
        )

    def run(self):
        """Collapse the pending actions per package name and queue commands."""
        self._timeout = None
        actions = self.action
        self.action = []
        actiondict = {}
        for act in actions:
            if act.name not in actiondict:
                actiondict[act.name] = act
            else:
                oldact = actiondict[act.name]
                if oldact == act and oldact.action != act.action:
                    # same package, opposite actions, do nothing
                    del actiondict[act.name]
                else:
                    # take the later action
                    actiondict[act.name] = act
        toadd = [(x.path, x.callback) for x in actiondict.values()
                 if x.action == 'add']
        toremove = [(x.name, x.callback) for x in actiondict.values()
                    if x.action == 'remove']
        self._do_add(toadd)
        self._do_remove(toremove)


class EventHandler(pyinotify.ProcessEvent):
    """Translate inotify events into add/remove actions for the repositories."""

    def my_init(self, config, wm, ioloop=None):
        """Open the info database, watch the arch directories and sync once.

        *config* supplies ``path`` and optionally ``info-db``; *wm* is the
        pyinotify watch manager the directories are added to.
        """
        self.moved_away = {}
        self.repomans = {}
        self._ioloop = ioloop or IOLoop.instance()

        base = config.get('path')
        dbname = config.get('info-db', os.path.join(base, 'pkginfo.db'))
        # isolation_level=None means autocommit
        self._db = sqlite3.connect(dbname, isolation_level=None)
        self._db.execute('''create table if not exists pkginfo
                            (filename text unique,
                             pkgname text,
                             pkgarch text,
                             pkgver text,
                             owner text,
                             mtime int,
                             state int)''')

        dirs = [os.path.join(base, x) for x in ('any', 'i686', 'x86_64')]
        self.files = files = set()
        for d in dirs:
            files.update(os.path.join(d, f) for f in os.listdir(d))
            wm.add_watch(d, pyinotify.ALL_EVENTS)
            self.repomans[d] = RepoMan(config, d, self._ioloop)

        self._initial_update(files)

    def _initial_update(self, files):
        """Dispatch adds/removes for differences between disk and database."""
        oldfiles = {f[0] for f in self._db.execute('select filename from pkginfo')}

        for f in sorted(filterfalse(filterPkg, files - oldfiles), key=pkgsortkey):
            self.dispatch(f, 'add')

        for f in sorted(filterfalse(filterPkg, oldfiles - files), key=pkgsortkey):
            self.dispatch(f, 'remove')

    def process_IN_CLOSE_WRITE(self, event):
        """A file opened for writing was closed: treat it as added."""
        logging.debug('Writing done: %s', event.pathname)
        self.dispatch(event.pathname, 'add')
        self.files.add(event.pathname)

    def process_IN_DELETE(self, event):
        """A file was deleted: treat it as removed."""
        logging.debug('Removing: %s', event.pathname)
        self.dispatch(event.pathname, 'remove')
        # symlinks haven't been added, so the path may be unknown
        self.files.discard(event.pathname)

    def movedOut(self, event):
        """Timer callback: no matching MOVED_TO arrived, so the file left."""
        logging.debug('Moved away: %s', event.pathname)
        # The timer has fired; drop the cookie so the mapping does not
        # grow with every move.
        self.moved_away.pop(event.cookie, None)
        self.dispatch(event.pathname, 'remove')

    def process_IN_MOVED_FROM(self, event):
        """Start a short timer; a MOVED_TO with the same cookie cancels it."""
        self.moved_away[event.cookie] = self._ioloop.add_timeout(
            self._ioloop.time() + 0.1,
            partial(self.movedOut, event),
        )
        # discard rather than remove: untracked paths (e.g. symlinks)
        # must not raise KeyError here
        self.files.discard(event.pathname)

    def process_IN_MOVED_TO(self, event):
        """Handle a file moved into a watched directory."""
        if event.pathname in self.files:
            logging.warning('Overwritten: %s', event.pathname)
        self.files.add(event.pathname)

        if event.cookie in self.moved_away:
            # Move between watched directories: cancel the pending
            # 'remove' and forget the cookie.
            self._ioloop.remove_timeout(self.moved_away.pop(event.cookie))
        else:
            logging.debug('Moved here: %s', event.pathname)
            self.dispatch(event.pathname, 'add')

    def dispatch(self, path, action):
        """Route *action* for *path* to the right directory's RepoMan.

        A package added to the wrong arch directory is renamed into the
        right one.  ``any`` packages are additionally symlinked into (or
        unlinked from) the ``i686`` and ``x86_64`` directories.
        """
        act = ActionInfo(path, action)
        d, filename = os.path.split(path)

        base, arch = os.path.split(d)

        # rename if a packager has added to a wrong directory
        # but not for a link that has arch=any, as that's what we created
        if (action == 'add' and act.arch != arch
                and not (os.path.islink(path) and act.arch == 'any')):
            newd = os.path.join(base, act.arch)
            newpath = os.path.join(newd, filename)
            os.rename(path, newpath)

            act.path = newpath
            path = newpath
            arch = act.arch
            d = newd

        if arch == 'any':
            for newarch in ('i686', 'x86_64'):
                newd = os.path.join(base, newarch)
                newpath = os.path.join(newd, filename)
                if action == 'add':
                    os.symlink(os.path.join('..', arch, filename), newpath)
                    self._real_dispatch(newd, ActionInfo(newpath, action))
                else:
                    try:
                        os.unlink(newpath)
                        # this will be detected and handled later
                    except FileNotFoundError:
                        # someone deleted the file for us
                        pass

        self._real_dispatch(d, act)

    def _real_dispatch(self, d, act):
        """Attach the database callback to *act* and hand it to RepoMan *d*.

        A 'remove' for a file that is not recorded with ``state = 1`` is
        ignored.
        """
        if act.action == 'add':
            def callback():
                # Mark existing rows of this package/arch as state 0
                # before recording the new file with state 1.
                self._db.execute(
                    'update pkginfo set state = 0 where pkgname = ? and pkgarch = ?',
                    (act.name, act.arch))
                st = os.stat(act.path)
                mtime = int(st.st_mtime)
                try:
                    owner = pwd.getpwuid(st.st_uid).pw_name
                except KeyError:
                    # uid without a passwd entry
                    owner = 'uid_%d' % st.st_uid

                self._db.execute(
                    '''insert or replace into pkginfo
                       (filename, pkgname, pkgarch, pkgver, state, owner, mtime) values
                       (?,        ?,       ?,       ?,      ?,     ?,     ?)''',
                    (act.path, act.name, act.arch, act.fullversion, 1, owner, mtime))

        else:
            res = self._db.execute(
                'select state from pkginfo where filename = ? and state = 1 limit 1',
                (act.path,))
            if res.fetchone() is None:
                # the file isn't in repo database, ignoring
                return

            def callback():
                self._db.execute('delete from pkginfo where filename = ?', (act.path,))

        act.callback = callback
        self.repomans[d].add_action(act)

    # def process_default(self, event):
    #     print(event)


def filterPkg(path):
    """Return True when *path* does NOT look like a supported package file.

    Accepts either a path string or a pyinotify ``Event`` (its
    ``pathname`` is used).
    """
    if isinstance(path, Event):
        path = path.pathname
    return not _pkgfile_pat.search(path)


def pkgsortkey(path):
    """Sort key for package paths: ``(name, arch, parsed info)``."""
    pkg = archpkg.PkgNameInfo.parseFilename(os.path.split(path)[1])
    return (pkg.name, pkg.arch, pkg)


def main(conffile):
    """Read *conffile* and run the watcher until interrupted.

    The configuration must contain a ``repository`` section.
    """
    config = configparser.ConfigParser()
    config.read(conffile)

    wm = pyinotify.WatchManager()
    ioloop = IOLoop.instance()

    handler = EventHandler(
        # Passed positionally as pyinotify's chained pre-processor:
        # presumably a true return value stops further handling of the
        # event -- confirm against the pyinotify ProcessEvent docs.
        filterPkg,
        config=config['repository'],
        wm=wm,
        ioloop=ioloop,
    )
    notifier = pyinotify.TornadoAsyncNotifier(
        wm,
        ioloop,
        default_proc_fun=handler,
    )

    try:
        ioloop.start()
    except KeyboardInterrupt:
        ioloop.close()
        notifier.stop()


if __name__ == '__main__':
    if len(sys.argv) != 2:
        sys.exit('usage: %s CONFIG_FILE' % sys.argv[0])
    main(sys.argv[1])