#!/usr/bin/env python3
# vim:fileencoding=utf-8
"""Watch package directories with inotify and keep repo databases updated.

Package files appearing in or disappearing from the ``any``, ``i686`` and
``x86_64`` sub-directories of the configured repository path are turned
into batched ``repo-add`` / ``repo-remove`` invocations.
"""

import os
import sys
import configparser
from functools import partial
import queue
import logging

import pyinotify
from tornado.ioloop import IOLoop
import tornado.process

import archpkg
from myutils import enable_pretty_logging
enable_pretty_logging(logging.DEBUG)


class ActionInfo:
    """A pending 'add' or 'remove' action for one package file.

    Attributes not found on the instance are looked up on ``pkginfo``
    (the result of ``archpkg.parsePkgName``); this file relies on it
    providing at least ``name`` and ``arch``.
    """

    def __init__(self, path, action):
        self.pkginfo = archpkg.parsePkgName(path)
        self.action = action
        self.path = path

    def __getattr__(self, name):
        # Only called when normal lookup fails.  Guard 'pkginfo' itself so
        # that an instance without it (e.g. created via __new__) raises
        # AttributeError instead of recursing forever.
        if name == 'pkginfo':
            raise AttributeError(name)
        return getattr(self.pkginfo, name)

    def __repr__(self):
        # The format string must contain two placeholders; an empty one
        # makes every '%r' logging of an action raise TypeError.
        return '<ActionInfo: %s %s>' % (self.action, self.path)


class RepoMan:
    """Collect actions for one repo directory and run the repo commands.

    Actions are batched: each new action (re)starts a ``wait-time`` second
    timer, and when it fires the collected actions are merged and turned
    into commands.  The command queue and the running flag are class
    attributes, so all RepoMan instances share them and only one command
    runs at a time.
    """

    _timeout = None
    _cmd_queue = queue.Queue()
    _cmd_running = False

    def __init__(self, config, base, ioloop=None):
        """Set up a manager for the database living in directory *base*.

        *config* is the configuration section; it is read for 'path',
        'name', 'command-add', 'command-remove' and 'wait-time'.
        """
        self.action = []
        self._ioloop = ioloop or IOLoop.instance()
        self._base = base

        self._repo_dir = config.get('path')
        name = config.get('name')
        self._db_name = os.path.join(base, name + '.db.tar.gz')
        # Build the files db path from the name directly: joining *base*
        # onto the already-joined db path doubles a relative base, and a
        # str.replace() could also touch directory components.
        self._files_name = os.path.join(base, name + '.files.tar.gz')
        self._command_add = config.get('command-add', 'repo-add')
        self._command_remove = config.get('command-remove', 'repo-remove')
        self._wait_time = config.getint('wait-time', 10)

    def queue_command(self, cmd):
        """Enqueue *cmd* (an argv list) and start the runner if idle."""
        self._cmd_queue.put(cmd)
        if not self._cmd_running:
            self.run_command()

    def run_command(self, status=0):
        """Run the next queued command, if any.

        Also used as the subprocess exit callback, in which case *status*
        is the value passed by Tornado for the command that just finished.
        """
        if status != 0:
            logging.warning('Previous cmd exited with status %r', status)

        self.__class__._cmd_running = True
        try:
            cmd = self._cmd_queue.get_nowait()
        except queue.Empty:
            self.__class__._cmd_running = False
            return

        logging.info('Running cmd: %r', cmd)
        try:
            # have to specify io_loop or we'll get error tracebacks
            # NOTE(review): the io_loop argument was removed in Tornado 5.0;
            # confirm the Tornado version this is deployed with.
            p = tornado.process.Subprocess(cmd, io_loop=self._ioloop)
        except Exception:
            # Without this the flag stays True forever and no further
            # command would ever be started.
            self.__class__._cmd_running = False
            raise
        p.set_exit_callback(self.run_command)

    def _do_cmd(self, cmd, items):
        """Queue *cmd* twice: for the db, and with '-f' for the files db."""
        self.queue_command([cmd, self._db_name] + list(items))
        self.queue_command([cmd, '-f', self._files_name] + list(items))

    def _do_add(self, toadd):
        """Queue the add command for the given package paths, if any."""
        if toadd:
            self._do_cmd(self._command_add, toadd)

    def _do_remove(self, toremove):
        """Queue the remove command for the given package names, if any."""
        if toremove:
            self._do_cmd(self._command_remove, toremove)

    def add_action(self, action):
        """Record *action* (an ActionInfo) to be processed after the wait."""
        logging.info('Adding action %r to db %r', action, self._db_name)
        self._action_pending(action)

    def _action_pending(self, act):
        """Store *act* and restart the batching timer."""
        self.action.append(act)
        if self._timeout:
            self._ioloop.remove_timeout(self._timeout)
        self._timeout = self._ioloop.add_timeout(
            self._ioloop.time() + self._wait_time,
            self.run,
        )

    def run(self):
        """Merge the pending actions per package name and queue commands.

        For one package name, two actions carrying equal ``pkginfo`` cancel
        each other out; otherwise the later action wins.
        """
        self._timeout = None
        actions = self.action
        self.action = []
        actiondict = {}
        for act in actions:
            if act.name not in actiondict:
                actiondict[act.name] = act
            else:
                oldact = actiondict[act.name]
                if oldact.pkginfo == act.pkginfo:
                    # same package, do nothing
                    del actiondict[act.name]
                else:
                    # take the later action
                    actiondict[act.name] = act

        # Additions are referenced by file path, removals by package name.
        toadd = [x.path for x in actiondict.values() if x.action == 'add']
        toremove = [x.name for x in actiondict.values()
                    if x.action == 'remove']
        self._do_add(toadd)
        self._do_remove(toremove)


class EventHandler(pyinotify.ProcessEvent):
    """Translate inotify events on the repo directories into actions."""

    def my_init(self, config, wm, ioloop=None):
        """Watch the arch directories and create one RepoMan for each."""
        self.moved_away = {}
        self.files = files = set()
        self.repomans = {}
        self._ioloop = ioloop or IOLoop.instance()

        base = config.get('path')
        dirs = [os.path.join(base, x) for x in ('any', 'i686', 'x86_64')]
        for d in dirs:
            files.update(os.path.join(d, f) for f in os.listdir(d))
            wm.add_watch(d, pyinotify.ALL_EVENTS)
            self.repomans[d] = RepoMan(config, d, self._ioloop)

    def process_IN_CLOSE_WRITE(self, event):
        """A file was completely written: add it."""
        logging.debug('Writing done: %s', event.pathname)
        self.dispatch(event.pathname, 'add')
        self.files.add(event.pathname)

    def process_IN_DELETE(self, event):
        """A file was deleted: remove it."""
        logging.debug('Removing: %s', event.pathname)
        self.dispatch(event.pathname, 'remove')
        # symlinks haven't been added, so the path may be unknown
        self.files.discard(event.pathname)

    def movedOut(self, event):
        """Handle a move-away that was not matched by a move-in in time."""
        # The cookie is spent; forget it so the dict does not grow forever
        # and a late IN_MOVED_TO is treated as a fresh arrival.
        self.moved_away.pop(event.cookie, None)
        logging.debug('Moved away: %s', event.pathname)
        self.dispatch(event.pathname, 'remove')

    def process_IN_MOVED_FROM(self, event):
        """A file left a watched directory.

        The removal is delayed by 0.1s so that a matching IN_MOVED_TO
        (same cookie) can cancel it.
        """
        self.moved_away[event.cookie] = self._ioloop.add_timeout(
            self._ioloop.time() + 0.1,
            partial(self.movedOut, event),
        )
        # discard, not remove: untracked paths (e.g. our symlinks) must not
        # raise KeyError inside the event handler
        self.files.discard(event.pathname)

    def process_IN_MOVED_TO(self, event):
        """A file arrived in a watched directory.

        If it came from another watched directory (its cookie is pending),
        only the delayed removal is cancelled; otherwise it is added.
        """
        if event.pathname in self.files:
            logging.warning('Overwritten: %s', event.pathname)
        self.files.add(event.pathname)

        timeout = self.moved_away.pop(event.cookie, None)
        if timeout is not None:
            self._ioloop.remove_timeout(timeout)
        else:
            logging.debug('Moved here: %s', event.pathname)
            self.dispatch(event.pathname, 'add')

    def dispatch(self, path, action):
        """Create an ActionInfo for *path* and hand it to the right RepoMan.

        May rename the file into the directory matching its architecture
        and maintains the per-arch symlinks for 'any' packages.
        """
        act = ActionInfo(path, action)
        d, file = os.path.split(path)
        base, arch = os.path.split(d)

        # rename if a packager has added to a wrong directory
        # but not for a link that has arch=any, as that's what we created
        if action == 'add' and act.arch != arch and not (
                os.path.islink(path) and act.arch == 'any'):
            newd = os.path.join(base, act.arch)
            newpath = os.path.join(newd, file)
            os.rename(path, newpath)

            act.path = newpath
            path = newpath
            arch = act.arch
            d = newd

        if arch == 'any':
            for newarch in ('i686', 'x86_64'):
                newd = os.path.join(base, newarch)
                newpath = os.path.join(newd, file)
                if action == 'add':
                    try:
                        os.symlink(os.path.join('..', arch, file), newpath)
                    except FileExistsError:
                        # The package was overwritten and the link is
                        # already there; still refresh the database.
                        pass
                    self.repomans[newd].add_action(
                        ActionInfo(newpath, action))
                else:
                    try:
                        os.unlink(newpath)
                        # this will be detected and handled later
                    except FileNotFoundError:
                        # someone deleted the file for us
                        pass

        self.repomans[d].add_action(act)

    # def process_default(self, event):
    #     print(event)


def filterPkg(event):
    """Return True (skip the event) unless the path is a .pkg.tar.xz file."""
    return not event.pathname.endswith('.pkg.tar.xz')


def main(conffile):
    """Read *conffile*, set up the watches and run the IOLoop."""
    config = configparser.ConfigParser()
    config.read(conffile)

    wm = pyinotify.WatchManager()
    ioloop = IOLoop.instance()

    handler = EventHandler(
        filterPkg,
        config=config['repository'],
        wm=wm,
        ioloop=ioloop,
    )
    notifier = pyinotify.TornadoAsyncNotifier(
        wm,
        ioloop,
        default_proc_fun=handler,
    )

    try:
        ioloop.start()
    except KeyboardInterrupt:
        ioloop.close()
        notifier.stop()


if __name__ == '__main__':
    if len(sys.argv) < 2:
        sys.exit('usage: %s CONFIG_FILE' % sys.argv[0])
    main(sys.argv[1])