save paths being processing so that we can delay later update

But this doesn't seem to solve any problems...
This commit is contained in:
lilydjwg 2014-02-13 21:52:17 +08:00
parent 686227b6f2
commit 8e84b8906e

View file

@ -13,6 +13,7 @@ import socket
import time import time
import hashlib import hashlib
import pickle import pickle
import collections
import pyinotify import pyinotify
Event = pyinotify.Event Event = pyinotify.Event
@ -41,6 +42,21 @@ class ActionInfo(archpkg.PkgNameInfo):
def __repr__(self): def __repr__(self):
return '<ActionInfo: %s %s>' % (self.action, self.path) return '<ActionInfo: %s %s>' % (self.action, self.path)
class RSet(collections.Counter):
'''a multiset that supports (only) add() and remove()'''
def add(self, thing):
self[thing] += 1
def remove(self, thing):
self[thing] -= 1
def __setitem__(self, thing, value):
if value <= 0 and thing in self:
del self[thing]
else:
super().__setitem__(thing, value)
class RepoMan: class RepoMan:
_timeout = None _timeout = None
_cmd_queue = queue.Queue() _cmd_queue = queue.Queue()
@ -50,6 +66,7 @@ class RepoMan:
self.action = [] self.action = []
self._ioloop = ioloop or IOLoop.instance() self._ioloop = ioloop or IOLoop.instance()
self._base = base self._base = base
self._processing_paths = RSet()
self._repo_dir = config.get('path') self._repo_dir = config.get('path')
self.name = config.get('name') self.name = config.get('name')
@ -96,8 +113,7 @@ class RepoMan:
def command_done(self, callbacks, status): def command_done(self, callbacks, status):
if status == 0: if status == 0:
if callbacks: if callbacks:
for cb in callbacks: self._do_callbacks(callbacks)
cb()
logger.info('previous command done.') logger.info('previous command done.')
else: else:
logger.warn('previous command failed with status code %d.', status) logger.warn('previous command failed with status code %d.', status)
@ -113,7 +129,8 @@ class RepoMan:
def _do_add(self, toadd): def _do_add(self, toadd):
if toadd: if toadd:
files, callbacks = zip(*toadd) files = [x.path for x in toadd]
callbacks = [x.callback for x in toadd]
if self._without_db: if self._without_db:
self._do_callbacks(callbacks) self._do_callbacks(callbacks)
else: else:
@ -121,15 +138,34 @@ class RepoMan:
def _do_remove(self, toremove): def _do_remove(self, toremove):
if toremove: if toremove:
files, callbacks = zip(*toremove) indb_remove = []
now_callbacks = []
for x in toremove:
res = self._db.execute(
'''select state from pkginfo
where filename = ? and state = 1 and pkgrepo = ? limit 1''',
(x.path, x.name)
)
if tuple(res) == ():
# the file isn't in repo database, just delete from our info database
logger.debug(
'deleting entry for not-in-database package: %s', x.path)
now_callbacks.append(x.callback)
else:
indb_remove.append(x)
self._do_callbacks(now_callbacks)
names = [x.name for x in indb_remove]
callbacks = [x.callback for x in indb_remove]
if self._without_db: if self._without_db:
self._do_callbacks(callbacks) self._do_callbacks(callbacks)
else: else:
self._do_cmd(self._command_remove, files, callbacks) self._do_cmd(self._command_remove, names, callbacks)
def _do_callbacks(self, callbacks): def _do_callbacks(self, callbacks):
for cb in callbacks: for cb in callbacks:
cb() cb(self._processing_paths)
def add_action(self, action): def add_action(self, action):
logger.info('Adding action %r to db %r', action, self._db_file) logger.info('Adding action %r to db %r', action, self._db_file)
@ -184,6 +220,8 @@ class RepoMan:
actions = self.action actions = self.action
self.action = [] self.action = []
actiondict = {} actiondict = {}
# reduce
for act in actions: for act in actions:
if act.name not in actiondict: if act.name not in actiondict:
actiondict[act.name] = act actiondict[act.name] = act
@ -195,15 +233,39 @@ class RepoMan:
else: else:
# take the later action, but record the former # take the later action, but record the former
try: try:
actiondict[act.name].callback(state=0) actiondict[act.name].callback(self._processing_paths, state=0)
except: except:
logger.exception('failed to run action %r.', actiondict[act.name]) logger.exception('failed to run action %r.', actiondict[act.name])
actiondict[act.name] = act actiondict[act.name] = act
toadd = [(x.path, x.callback) for x in actiondict.values() if x.action == 'add']
toremove = [(x.name, x.callback) for x in actiondict.values() if x.action == 'remove'] processing = self._processing_paths
delayed = []
toadd = []
toremove = []
toprocess = RSet()
logger.debug('Still processing: %r', processing)
for x in actiondict.values():
if x.path in processing:
delayed.append(x)
continue
if x.action == 'add':
toadd.append(x)
elif x.action == 'remove':
toremove.append(x)
else:
raise ValueError('unknown action: %r', x)
toprocess.add(x.path)
self._processing_paths = toprocess
self._do_add(toadd) self._do_add(toadd)
self._do_remove(toremove) self._do_remove(toremove)
# try to run at next chance
for action in delayed:
self._action_pending(action)
class EventHandler(pyinotify.ProcessEvent): class EventHandler(pyinotify.ProcessEvent):
def my_init(self, config, wm, ioloop=None): def my_init(self, config, wm, ioloop=None):
self.moved_away = {} self.moved_away = {}
@ -354,7 +416,7 @@ class EventHandler(pyinotify.ProcessEvent):
def _real_dispatch(self, d, act): def _real_dispatch(self, d, act):
if act.action == 'add': if act.action == 'add':
arch = os.path.split(d)[1] arch = os.path.split(d)[1]
def callback(stat, state=1): def callback(stat, rset, state=1):
self._db.execute( self._db.execute(
'update pkginfo set state = 0 where pkgname = ? and forarch = ? and pkgrepo = ?', 'update pkginfo set state = 0 where pkgname = ? and forarch = ? and pkgrepo = ?',
(act.name, arch, self.name) (act.name, arch, self.name)
@ -377,23 +439,19 @@ class EventHandler(pyinotify.ProcessEvent):
(filename, pkgrepo, pkgname, pkgarch, pkgver, forarch, state, owner, mtime, info) values (filename, pkgrepo, pkgname, pkgarch, pkgver, forarch, state, owner, mtime, info) values
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''', (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''',
(act.path, self.name, act.name, act.arch, act.fullversion, arch, state, owner, mtime, info)) (act.path, self.name, act.name, act.arch, act.fullversion, arch, state, owner, mtime, info))
# FIXME: this can be never called when the command failed
rset.remove(act.path)
logger.info('Action %r done.', act) logger.info('Action %r done.', act)
# stat path here, so that it is more unlikely to have disappeared since # stat path here, so that it is more unlikely to have disappeared since
callback = partial(callback, os.stat(act.path)) callback = partial(callback, os.stat(act.path))
else: else:
res = self._db.execute( def callback(rset, state=any):
'select state from pkginfo where filename = ? and state = 1 and pkgrepo = ? limit 1',
(act.path, self.name)
)
if tuple(res) == ():
# the file isn't in repo database, just delete from our info database
logger.debug('deleting entry for not-in-database package: %s', act.path)
self._db.execute('delete from pkginfo where filename = ? and pkgrepo = ?', (act.path, self.name))
return
def callback(state=any):
'''``state`` is not used''' '''``state`` is not used'''
self._db.execute('delete from pkginfo where filename = ? and pkgrepo = ?', (act.path, self.name)) self._db.execute('delete from pkginfo where filename = ? and pkgrepo = ?', (act.path, self.name))
# FIXME: this can be never called when the command failed
rset.remove(act.path)
logger.info('Action %r done.', act)
act.callback = callback act.callback = callback
self.repomans[d].add_action(act) self.repomans[d].add_action(act)