diff --git a/archrepo2/repomon.py b/archrepo2/repomon.py index 4d9d942..e746437 100755 --- a/archrepo2/repomon.py +++ b/archrepo2/repomon.py @@ -13,6 +13,7 @@ import socket import time import hashlib import pickle +import collections import pyinotify Event = pyinotify.Event @@ -41,6 +42,21 @@ class ActionInfo(archpkg.PkgNameInfo): def __repr__(self): return '' % (self.action, self.path) +class RSet(collections.Counter): + '''a multiset that supports (only) add() and remove()''' + + def add(self, thing): + self[thing] += 1 + + def remove(self, thing): + self[thing] -= 1 + + def __setitem__(self, thing, value): + if value <= 0 and thing in self: + del self[thing] + else: + super().__setitem__(thing, value) + class RepoMan: _timeout = None _cmd_queue = queue.Queue() @@ -50,6 +66,7 @@ class RepoMan: self.action = [] self._ioloop = ioloop or IOLoop.instance() self._base = base + self._processing_paths = RSet() self._repo_dir = config.get('path') self.name = config.get('name') @@ -96,8 +113,7 @@ class RepoMan: def command_done(self, callbacks, status): if status == 0: if callbacks: - for cb in callbacks: - cb() + self._do_callbacks(callbacks) logger.info('previous command done.') else: logger.warn('previous command failed with status code %d.', status) @@ -113,7 +129,8 @@ class RepoMan: def _do_add(self, toadd): if toadd: - files, callbacks = zip(*toadd) + files = [x.path for x in toadd] + callbacks = [x.callback for x in toadd] if self._without_db: self._do_callbacks(callbacks) else: @@ -121,15 +138,34 @@ class RepoMan: def _do_remove(self, toremove): if toremove: - files, callbacks = zip(*toremove) + indb_remove = [] + now_callbacks = [] + + for x in toremove: + res = self._db.execute( + '''select state from pkginfo + where filename = ? and state = 1 and pkgrepo = ? limit 1''', + (x.path, x.name) + ) + if tuple(res) == (): + # the file isn't in repo database, just delete from our info database + logger.debug( + 'deleting entry for not-in-database package: %s', x.path) + now_callbacks.append(x.callback) + else: + indb_remove.append(x) + + self._do_callbacks(now_callbacks) + names = [x.name for x in indb_remove] + callbacks = [x.callback for x in indb_remove] if self._without_db: self._do_callbacks(callbacks) else: - self._do_cmd(self._command_remove, files, callbacks) + self._do_cmd(self._command_remove, names, callbacks) def _do_callbacks(self, callbacks): for cb in callbacks: - cb() + cb(self._processing_paths) def add_action(self, action): logger.info('Adding action %r to db %r', action, self._db_file) @@ -184,6 +220,8 @@ class RepoMan: actions = self.action self.action = [] actiondict = {} + + # reduce for act in actions: if act.name not in actiondict: actiondict[act.name] = act @@ -195,15 +233,39 @@ class RepoMan: else: # take the later action, but record the former try: - actiondict[act.name].callback(state=0) + actiondict[act.name].callback(self._processing_paths, state=0) except: logger.exception('failed to run action %r.', actiondict[act.name]) actiondict[act.name] = act - toadd = [(x.path, x.callback) for x in actiondict.values() if x.action == 'add'] - toremove = [(x.name, x.callback) for x in actiondict.values() if x.action == 'remove'] + + processing = self._processing_paths + + delayed = [] + toadd = [] + toremove = [] + toprocess = RSet() + logger.debug('Still processing: %r', processing) + for x in actiondict.values(): + if x.path in processing: + delayed.append(x) + continue + + if x.action == 'add': + toadd.append(x) + elif x.action == 'remove': + toremove.append(x) + else: + raise ValueError('unknown action: %r', x) + toprocess.add(x.path) + + self._processing_paths = toprocess self._do_add(toadd) self._do_remove(toremove) + # try to run at next chance + for action in delayed: + self._action_pending(action) + class EventHandler(pyinotify.ProcessEvent): def my_init(self, config, wm, ioloop=None): self.moved_away = {} @@ -354,7 +416,7 @@ class EventHandler(pyinotify.ProcessEvent): def _real_dispatch(self, d, act): if act.action == 'add': arch = os.path.split(d)[1] - def callback(stat, state=1): + def callback(stat, rset, state=1): self._db.execute( 'update pkginfo set state = 0 where pkgname = ? and forarch = ? and pkgrepo = ?', (act.name, arch, self.name) @@ -377,23 +439,19 @@ class EventHandler(pyinotify.ProcessEvent): (filename, pkgrepo, pkgname, pkgarch, pkgver, forarch, state, owner, mtime, info) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''', (act.path, self.name, act.name, act.arch, act.fullversion, arch, state, owner, mtime, info)) + # FIXME: this can be never called when the command failed + rset.remove(act.path) logger.info('Action %r done.', act) # stat path here, so that it is more unlikely to have disappeared since callback = partial(callback, os.stat(act.path)) else: - res = self._db.execute( - 'select state from pkginfo where filename = ? and state = 1 and pkgrepo = ? limit 1', - (act.path, self.name) - ) - if tuple(res) == (): - # the file isn't in repo database, just delete from our info database - logger.debug('deleting entry for not-in-database package: %s', act.path) - self._db.execute('delete from pkginfo where filename = ? and pkgrepo = ?', (act.path, self.name)) - return - def callback(state=any): + def callback(rset, state=any): '''``state`` is not used''' self._db.execute('delete from pkginfo where filename = ? and pkgrepo = ?', (act.path, self.name)) + # FIXME: this can be never called when the command failed + rset.remove(act.path) + logger.info('Action %r done.', act) act.callback = callback self.repomans[d].add_action(act)