#!/usr/bin/env python3 # vim:fileencoding=utf-8 import os import sys import re import pwd import configparser from functools import partial import queue import logging import sqlite3 import pyinotify from tornado.ioloop import IOLoop import tornado.process import archpkg from myutils import enable_pretty_logging enable_pretty_logging(logging.DEBUG) # handles only x86_64, i686 and any arch packages _pkgfile_pat = re.compile(r'(?:^|/)[a-z0-9_-]+-[a-z0-9_.]+-\d+-(?:x86_64|i686|any)\.pkg\.tar\.xz$') class ActionInfo(archpkg.PkgNameInfo): def __new__(cls, path, action, four=None, five=None): if four is not None: return super().__new__(cls, path, action, four, five) file = os.path.split(path)[1] self = cls.parseFilename(file) self.action = action self.path = path return self def __repr__(self): return '' % (self.action, self.path) class RepoMan: _timeout = None _cmd_queue = queue.Queue() _cmd_running = False def __init__(self, config, base, ioloop=None): self.action = [] self._ioloop = ioloop or IOLoop.instance() self._base = base self._repo_dir = config.get('path') self._db_name = os.path.join(base, config.get('name') + '.db.tar.gz') self._files_name = os.path.join(base, self._db_name.replace('.db.tar.gz', '.files.tar.gz')) self._command_add = config.get('command-add', 'repo-add') self._command_remove = config.get('command-remove', 'repo-remove') self._wait_time = config.getint('wait-time', 10) def queue_command(self, cmd, callbacks=None): self._cmd_queue.put((cmd, callbacks)) if not self._cmd_running: self.run_command() def run_command(self): self.__class__._cmd_running = True try: cmd, callbacks = self._cmd_queue.get_nowait() except queue.Empty: self.__class__._cmd_running = False return logging.info('Running cmd: %r', cmd) # have to specify io_loop or we'll get error tracebacks p = tornado.process.Subprocess(cmd, io_loop=self._ioloop) p.set_exit_callback(partial(self.command_done, callbacks)) def command_done(self, callbacks, status): if status == 0 and callbacks: for cb in callbacks: cb() self.run_command() def _do_cmd(self, cmd, items, callbacks): cmd1 = [cmd, self._db_name] cmd1.extend(items) cmd2 = [cmd, '-f', self._files_name] cmd2.extend(items) self.queue_command(cmd1) self.queue_command(cmd2, callbacks) def _do_add(self, toadd): if toadd: files, callbacks = zip(*toadd) self._do_cmd(self._command_add, files, callbacks) def _do_remove(self, toremove): if toremove: files, callbacks = zip(*toremove) self._do_cmd(self._command_remove, files, callbacks) def add_action(self, action): logging.info('Adding action %r to db %r', action, self._db_name) self._action_pending(action) def _action_pending(self, act): self.action.append(act) if self._timeout: self._ioloop.remove_timeout(self._timeout) self._timeout = self._ioloop.add_timeout( self._ioloop.time() + self._wait_time, self.run, ) def run(self): self._timeout = None actions = self.action self.action = [] actiondict = {} for act in actions: if act.name not in actiondict: actiondict[act.name] = act else: oldact = actiondict[act.name] if oldact == act and oldact.action != act.action: # same package, opposite actions, do nothing del actiondict[act.name] else: # take the later action actiondict[act.name] = act toadd = [(x.path, x.callback) for x in actiondict.values() if x.action == 'add'] toremove = [(x.name, x.callback) for x in actiondict.values() if x.action == 'remove'] self._do_add(toadd) self._do_remove(toremove) class EventHandler(pyinotify.ProcessEvent): def my_init(self, config, wm, ioloop=None): self.moved_away = {} self.repomans = {} self._ioloop = ioloop or IOLoop.instance() base = config.get('path') dbname = config.get('info-db', os.path.join(base, 'pkginfo.db')) self._db = sqlite3.connect(dbname, isolation_level=None) # isolation_level=None means autocommit self._db.execute('''create table if not exists pkginfo (filename text unique, pkgname text, pkgarch text, pkgver text, owner text, mtime int, state int)''') dirs = [os.path.join(base, x) for x in ('any', 'i686', 'x86_64')] self.files = files = set() for d in dirs: files.update(os.path.join(d, f) for f in os.listdir(d)) wm.add_watch(d, pyinotify.ALL_EVENTS) self.repomans[d] = RepoMan(config, d, self._ioloop) self._initial_update(files) def _initial_update(self, files): oldfiles = {f[0] for f in self._db.execute('select filename from pkginfo')} for f in files - oldfiles: if f.endswith('.pkg.tar.xz'): self.dispatch(f, 'add') for f in oldfiles - files: if f.endswith('.pkg.tar.xz'): self.dispatch(f, 'remove') def process_IN_CLOSE_WRITE(self, event): logging.debug('Writing done: %s', event.pathname) self.dispatch(event.pathname, 'add') self.files.add(event.pathname) def process_IN_DELETE(self, event): logging.debug('Removing: %s', event.pathname) self.dispatch(event.pathname, 'remove') try: self.files.remove(event.pathname) except KeyError: # symlinks haven't been added pass def movedOut(self, event): logging.debug('Moved away: %s', event.pathname) self.dispatch(event.pathname, 'remove') def process_IN_MOVED_FROM(self, event): self.moved_away[event.cookie] = self._ioloop.add_timeout( self._ioloop.time() + 0.1, partial(self.movedOut, event), ) self.files.remove(event.pathname) def process_IN_MOVED_TO(self, event): if event.pathname in self.files: logging.warn('Overwritten: %s', event.pathname) self.files.add(event.pathname) if event.cookie in self.moved_away: self._ioloop.remove_timeout(self.moved_away[event.cookie]) else: logging.debug('Moved here: %s', event.pathname) self.dispatch(event.pathname, 'add') def dispatch(self, path, action): act = ActionInfo(path, action) d, file = os.path.split(path) base, arch = os.path.split(d) # rename if a packager has added to a wrong directory # but not for a link that has arch=any, as that's what we created if action == 'add' and act.arch != arch and not (os.path.islink(path) and act.arch == 'any'): newd = os.path.join(base, act.arch) newpath = os.path.join(newd, file) os.rename(path, newpath) act.path = newpath path = newpath arch = act.arch d = newd if arch == 'any': for newarch in ('i686', 'x86_64'): newd = os.path.join(base, newarch) newpath = os.path.join(newd, file) if action == 'add': os.symlink(os.path.join('..', arch, file), newpath) self._real_dispatch(newd, ActionInfo(newpath, action)) else: try: os.unlink(newpath) # this will be detected and handled later except FileNotFoundError: # someone deleted the file for us pass self._real_dispatch(d, act) def _real_dispatch(self, d, act): if act.action == 'add': self._db.execute('update pkginfo set state = 0 where pkgname = ? and pkgarch = ?', (act.name, act.arch)) def callback(): stat = os.stat(act.path) mtime = int(stat.st_mtime) try: owner = pwd.getpwuid(stat.st_uid).pw_name except KeyError: owner = 'uid_%d' % stat.st_uid self._db.execute( '''insert or replace into pkginfo (filename, pkgname, pkgarch, pkgver, state, owner, mtime) values (?, ?, ?, ?, ?, ?, ?)''', (act.path, act.name, act.arch, act.fullversion, 1, owner, mtime)) else: res = self._db.execute('select state from pkginfo where filename = ? and state = 1 limit 1', (act.path,)) if tuple(res) == (): # the file isn't in repo database, ignoring return def callback(): self._db.execute('delete from pkginfo where filename = ?', (act.path,)) act.callback = callback self.repomans[d].add_action(act) # def process_default(self, event): # print(event) def filterPkg(event): return not _pkgfile_pat.search(event.pathname) def main(conffile): config = configparser.ConfigParser() config.read(conffile) wm = pyinotify.WatchManager() ioloop = IOLoop.instance() handler = EventHandler( filterPkg, config=config['repository'], wm=wm, ioloop=ioloop, ) notifier = pyinotify.TornadoAsyncNotifier( wm, ioloop, default_proc_fun=handler, ) try: ioloop.start() except KeyboardInterrupt: ioloop.close() notifier.stop() if __name__ == '__main__': main(sys.argv[1])