move repository monitor function into a module

This commit is contained in:
lilydjwg 2013-01-07 23:25:13 +08:00
parent c8784cc886
commit f1a77615d8
2 changed files with 308 additions and 293 deletions

View file

@ -1,314 +1,24 @@
#!/usr/bin/env python3
# vim:fileencoding=utf-8
import os
import sys
import re
import pwd
import configparser
from functools import partial
from itertools import filterfalse
import queue
import logging
import sqlite3
import pyinotify
Event = pyinotify.Event
from tornado.ioloop import IOLoop
import tornado.process
import archpkg
from myutils import enable_pretty_logging
enable_pretty_logging(logging.DEBUG)
# handles only x86_64, i686 and any arch packages
_pkgfile_pat = re.compile(r'(?:^|/)[a-z0-9_-]+-[a-z0-9_.]+-\d+-(?:x86_64|i686|any)\.pkg\.tar\.xz$')
class ActionInfo(archpkg.PkgNameInfo):
def __new__(cls, path, action, four=None, five=None):
if four is not None:
return super().__new__(cls, path, action, four, five)
file = os.path.split(path)[1]
self = cls.parseFilename(file)
self.action = action
self.path = path
return self
def __repr__(self):
return '<ActionInfo: %s %s>' % (self.action, self.path)
class RepoMan:
_timeout = None
_cmd_queue = queue.Queue()
_cmd_running = False
def __init__(self, config, base, ioloop=None):
self.action = []
self._ioloop = ioloop or IOLoop.instance()
self._base = base
self._repo_dir = config.get('path')
self._db_name = os.path.join(base, config.get('name') + '.db.tar.gz')
self._files_name = os.path.join(base, self._db_name.replace('.db.tar.gz', '.files.tar.gz'))
self._command_add = config.get('command-add', 'repo-add')
self._command_remove = config.get('command-remove', 'repo-remove')
self._wait_time = config.getint('wait-time', 10)
def queue_command(self, cmd, callbacks=None):
self._cmd_queue.put((cmd, callbacks))
if not self._cmd_running:
self.run_command()
def run_command(self):
self.__class__._cmd_running = True
try:
cmd, callbacks = self._cmd_queue.get_nowait()
except queue.Empty:
self.__class__._cmd_running = False
return
logging.info('Running cmd: %r', cmd)
# have to specify io_loop or we'll get error tracebacks
p = tornado.process.Subprocess(cmd, io_loop=self._ioloop)
p.set_exit_callback(partial(self.command_done, callbacks))
def command_done(self, callbacks, status):
if status == 0:
if callbacks:
for cb in callbacks:
cb()
logging.info('previous command done.')
else:
logging.warn('previous command failed with status code %d.', status)
self.run_command()
def _do_cmd(self, cmd, items, callbacks):
cmd1 = [cmd, self._db_name]
cmd1.extend(items)
cmd2 = [cmd, '-f', self._files_name]
cmd2.extend(items)
self.queue_command(cmd1)
self.queue_command(cmd2, callbacks)
def _do_add(self, toadd):
if toadd:
files, callbacks = zip(*toadd)
self._do_cmd(self._command_add, files, callbacks)
def _do_remove(self, toremove):
if toremove:
files, callbacks = zip(*toremove)
self._do_cmd(self._command_remove, files, callbacks)
def add_action(self, action):
logging.info('Adding action %r to db %r', action, self._db_name)
self._action_pending(action)
def _action_pending(self, act):
self.action.append(act)
if self._timeout:
self._ioloop.remove_timeout(self._timeout)
self._timeout = self._ioloop.add_timeout(
self._ioloop.time() + self._wait_time,
self.run,
)
def run(self):
self._timeout = None
actions = self.action
self.action = []
actiondict = {}
for act in actions:
if act.name not in actiondict:
actiondict[act.name] = act
else:
oldact = actiondict[act.name]
if oldact == act and oldact.action != act.action:
# same package, opposite actions, do nothing
del actiondict[act.name]
else:
# take the later action
actiondict[act.name] = act
toadd = [(x.path, x.callback) for x in actiondict.values() if x.action == 'add']
toremove = [(x.name, x.callback) for x in actiondict.values() if x.action == 'remove']
self._do_add(toadd)
self._do_remove(toremove)
class EventHandler(pyinotify.ProcessEvent):
def my_init(self, config, wm, ioloop=None):
self.moved_away = {}
self.repomans = {}
self._ioloop = ioloop or IOLoop.instance()
base = config.get('path')
dbname = config.get('info-db', os.path.join(base, 'pkginfo.db'))
self._db = sqlite3.connect(dbname, isolation_level=None) # isolation_level=None means autocommit
self._db.execute('''create table if not exists pkginfo
(filename text unique,
pkgname text,
pkgarch text,
pkgver text,
forarch text,
owner text,
mtime int,
state int)''')
dirs = [os.path.join(base, x) for x in ('any', 'i686', 'x86_64')]
self.files = files = set()
for d in dirs:
files.update(os.path.join(d, f) for f in os.listdir(d))
wm.add_watch(d, pyinotify.ALL_EVENTS)
self.repomans[d] = RepoMan(config, d, self._ioloop)
self._initial_update(files)
def _initial_update(self, files):
oldfiles = {f[0] for f in self._db.execute('select filename from pkginfo')}
for f in sorted(filterfalse(filterPkg, files - oldfiles), key=pkgsortkey):
self.dispatch(f, 'add')
for f in sorted(filterfalse(filterPkg, oldfiles - files), key=pkgsortkey):
self.dispatch(f, 'remove')
def process_IN_CLOSE_WRITE(self, event):
logging.debug('Writing done: %s', event.pathname)
self.dispatch(event.pathname, 'add')
self.files.add(event.pathname)
def process_IN_DELETE(self, event):
logging.debug('Removing: %s', event.pathname)
self.dispatch(event.pathname, 'remove')
try:
self.files.remove(event.pathname)
except KeyError:
# symlinks haven't been added
pass
def movedOut(self, event):
logging.debug('Moved away: %s', event.pathname)
self.dispatch(event.pathname, 'remove')
def process_IN_MOVED_FROM(self, event):
self.moved_away[event.cookie] = self._ioloop.add_timeout(
self._ioloop.time() + 0.1,
partial(self.movedOut, event),
)
self.files.remove(event.pathname)
def process_IN_MOVED_TO(self, event):
if event.pathname in self.files:
logging.warn('Overwritten: %s', event.pathname)
self.files.add(event.pathname)
if event.cookie in self.moved_away:
self._ioloop.remove_timeout(self.moved_away[event.cookie])
else:
logging.debug('Moved here: %s', event.pathname)
self.dispatch(event.pathname, 'add')
def dispatch(self, path, action):
act = ActionInfo(path, action)
d, file = os.path.split(path)
base, arch = os.path.split(d)
# rename if a packager has added to a wrong directory
# but not for a link that has arch=any, as that's what we created
if action == 'add' and act.arch != arch and not (os.path.islink(path) and act.arch == 'any'):
newd = os.path.join(base, act.arch)
newpath = os.path.join(newd, file)
os.rename(path, newpath)
act.path = newpath
path = newpath
arch = act.arch
d = newd
if arch == 'any':
for newarch in ('i686', 'x86_64'):
newd = os.path.join(base, newarch)
newpath = os.path.join(newd, file)
if action == 'add':
try:
os.symlink(os.path.join('..', arch, file), newpath)
except FileExistsError:
pass
else:
# XXX: this should be removed as soon as symlinks are supported
self._real_dispatch(newd, ActionInfo(newpath, action))
else:
try:
os.unlink(newpath)
# this will be detected and handled later
except FileNotFoundError:
# someone deleted the file for us
pass
self._real_dispatch(d, act)
def _real_dispatch(self, d, act):
if act.action == 'add':
arch = os.path.split(d)[1]
def callback():
self._db.execute('update pkginfo set state = 0 where pkgname = ? and forarch = ?', (act.name, arch))
stat = os.stat(act.path)
mtime = int(stat.st_mtime)
try:
owner = pwd.getpwuid(stat.st_uid).pw_name
except KeyError:
owner = 'uid_%d' % stat.st_uid
self._db.execute(
'''insert or replace into pkginfo
(filename, pkgname, pkgarch, pkgver, forarch, state, owner, mtime) values
(?, ?, ?, ?, ?, ?, ?, ?)''',
(act.path, act.name, act.arch, act.fullversion, arch, 1, owner, mtime))
else:
res = self._db.execute('select state from pkginfo where filename = ? and state = 1 limit 1', (act.path,))
if tuple(res) == ():
# the file isn't in repo database, just delete from our info database
logging.debug('deleting entry for not-in-database package: %s', act.path)
self._db.execute('delete from pkginfo where filename = ?', (act.path,))
return
def callback():
self._db.execute('delete from pkginfo where filename = ?', (act.path,))
act.callback = callback
self.repomans[d].add_action(act)
# def process_default(self, event):
# print(event)
def filterPkg(path):
if isinstance(path, Event):
path = path.pathname
return not _pkgfile_pat.search(path)
def pkgsortkey(path):
pkg = archpkg.PkgNameInfo.parseFilename(os.path.split(path)[1])
return (pkg.name, pkg.arch, pkg)
from repomon import repomon
def main(conffile):
config = configparser.ConfigParser()
config.read(conffile)
wm = pyinotify.WatchManager()
notifier = repomon(config['repository'])
ioloop = IOLoop.instance()
handler = EventHandler(
filterPkg,
config=config['repository'],
wm=wm,
ioloop=ioloop,
)
notifier = pyinotify.TornadoAsyncNotifier(
wm,
ioloop,
default_proc_fun=handler,
)
try:
ioloop.start()
except KeyboardInterrupt:

305
repomon.py Executable file
View file

@ -0,0 +1,305 @@
#!/usr/bin/env python3
# vim:fileencoding=utf-8
import os
import re
import pwd
from functools import partial
from itertools import filterfalse
import queue
import logging
import sqlite3
import pyinotify
Event = pyinotify.Event
from tornado.ioloop import IOLoop
import tornado.process
import archpkg
logger = logging.getLogger(__name__)
# handles only x86_64, i686 and any arch packages
_pkgfile_pat = re.compile(r'(?:^|/)[a-z0-9_-]+-[a-z0-9_.]+-\d+-(?:x86_64|i686|any)\.pkg\.tar\.xz$')
class ActionInfo(archpkg.PkgNameInfo):
def __new__(cls, path, action, four=None, five=None):
if four is not None:
return super().__new__(cls, path, action, four, five)
file = os.path.split(path)[1]
self = cls.parseFilename(file)
self.action = action
self.path = path
return self
def __repr__(self):
return '<ActionInfo: %s %s>' % (self.action, self.path)
class RepoMan:
_timeout = None
_cmd_queue = queue.Queue()
_cmd_running = False
def __init__(self, config, base, ioloop=None):
self.action = []
self._ioloop = ioloop or IOLoop.instance()
self._base = base
self._repo_dir = config.get('path')
self._db_name = os.path.join(base, config.get('name') + '.db.tar.gz')
self._files_name = os.path.join(base, self._db_name.replace('.db.tar.gz', '.files.tar.gz'))
self._command_add = config.get('command-add', 'repo-add')
self._command_remove = config.get('command-remove', 'repo-remove')
self._wait_time = config.getint('wait-time', 10)
def queue_command(self, cmd, callbacks=None):
self._cmd_queue.put((cmd, callbacks))
if not self._cmd_running:
self.run_command()
def run_command(self):
self.__class__._cmd_running = True
try:
cmd, callbacks = self._cmd_queue.get_nowait()
except queue.Empty:
self.__class__._cmd_running = False
return
logger.info('Running cmd: %r', cmd)
# have to specify io_loop or we'll get error tracebacks
p = tornado.process.Subprocess(cmd, io_loop=self._ioloop)
p.set_exit_callback(partial(self.command_done, callbacks))
def command_done(self, callbacks, status):
if status == 0:
if callbacks:
for cb in callbacks:
cb()
logger.info('previous command done.')
else:
logger.warn('previous command failed with status code %d.', status)
self.run_command()
def _do_cmd(self, cmd, items, callbacks):
cmd1 = [cmd, self._db_name]
cmd1.extend(items)
cmd2 = [cmd, '-f', self._files_name]
cmd2.extend(items)
self.queue_command(cmd1)
self.queue_command(cmd2, callbacks)
def _do_add(self, toadd):
if toadd:
files, callbacks = zip(*toadd)
self._do_cmd(self._command_add, files, callbacks)
def _do_remove(self, toremove):
if toremove:
files, callbacks = zip(*toremove)
self._do_cmd(self._command_remove, files, callbacks)
def add_action(self, action):
logger.info('Adding action %r to db %r', action, self._db_name)
self._action_pending(action)
def _action_pending(self, act):
self.action.append(act)
if self._timeout:
self._ioloop.remove_timeout(self._timeout)
self._timeout = self._ioloop.add_timeout(
self._ioloop.time() + self._wait_time,
self.run,
)
def run(self):
self._timeout = None
actions = self.action
self.action = []
actiondict = {}
for act in actions:
if act.name not in actiondict:
actiondict[act.name] = act
else:
oldact = actiondict[act.name]
if oldact == act and oldact.action != act.action:
# same package, opposite actions, do nothing
del actiondict[act.name]
else:
# take the later action
actiondict[act.name] = act
toadd = [(x.path, x.callback) for x in actiondict.values() if x.action == 'add']
toremove = [(x.name, x.callback) for x in actiondict.values() if x.action == 'remove']
self._do_add(toadd)
self._do_remove(toremove)
class EventHandler(pyinotify.ProcessEvent):
def my_init(self, config, wm, ioloop=None):
self.moved_away = {}
self.repomans = {}
self._ioloop = ioloop or IOLoop.instance()
base = config.get('path')
dbname = config.get('info-db', os.path.join(base, 'pkginfo.db'))
self._db = sqlite3.connect(dbname, isolation_level=None) # isolation_level=None means autocommit
self._db.execute('''create table if not exists pkginfo
(filename text unique,
pkgname text,
pkgarch text,
pkgver text,
forarch text,
owner text,
mtime int,
state int)''')
dirs = [os.path.join(base, x) for x in ('any', 'i686', 'x86_64')]
self.files = files = set()
for d in dirs:
files.update(os.path.join(d, f) for f in os.listdir(d))
wm.add_watch(d, pyinotify.ALL_EVENTS)
self.repomans[d] = RepoMan(config, d, self._ioloop)
self._initial_update(files)
def _initial_update(self, files):
oldfiles = {f[0] for f in self._db.execute('select filename from pkginfo')}
for f in sorted(filterfalse(filterPkg, files - oldfiles), key=pkgsortkey):
self.dispatch(f, 'add')
for f in sorted(filterfalse(filterPkg, oldfiles - files), key=pkgsortkey):
self.dispatch(f, 'remove')
def process_IN_CLOSE_WRITE(self, event):
logger.debug('Writing done: %s', event.pathname)
self.dispatch(event.pathname, 'add')
self.files.add(event.pathname)
def process_IN_DELETE(self, event):
logger.debug('Removing: %s', event.pathname)
self.dispatch(event.pathname, 'remove')
try:
self.files.remove(event.pathname)
except KeyError:
# symlinks haven't been added
pass
def movedOut(self, event):
logger.debug('Moved away: %s', event.pathname)
self.dispatch(event.pathname, 'remove')
def process_IN_MOVED_FROM(self, event):
self.moved_away[event.cookie] = self._ioloop.add_timeout(
self._ioloop.time() + 0.1,
partial(self.movedOut, event),
)
self.files.remove(event.pathname)
def process_IN_MOVED_TO(self, event):
if event.pathname in self.files:
logger.warn('Overwritten: %s', event.pathname)
self.files.add(event.pathname)
if event.cookie in self.moved_away:
self._ioloop.remove_timeout(self.moved_away[event.cookie])
else:
logger.debug('Moved here: %s', event.pathname)
self.dispatch(event.pathname, 'add')
def dispatch(self, path, action):
act = ActionInfo(path, action)
d, file = os.path.split(path)
base, arch = os.path.split(d)
# rename if a packager has added to a wrong directory
# but not for a link that has arch=any, as that's what we created
if action == 'add' and act.arch != arch and not (os.path.islink(path) and act.arch == 'any'):
newd = os.path.join(base, act.arch)
newpath = os.path.join(newd, file)
os.rename(path, newpath)
act.path = newpath
path = newpath
arch = act.arch
d = newd
if arch == 'any':
for newarch in ('i686', 'x86_64'):
newd = os.path.join(base, newarch)
newpath = os.path.join(newd, file)
if action == 'add':
try:
os.symlink(os.path.join('..', arch, file), newpath)
except FileExistsError:
pass
else:
# XXX: this should be removed as soon as symlinks are supported
self._real_dispatch(newd, ActionInfo(newpath, action))
else:
try:
os.unlink(newpath)
# this will be detected and handled later
except FileNotFoundError:
# someone deleted the file for us
pass
self._real_dispatch(d, act)
def _real_dispatch(self, d, act):
if act.action == 'add':
arch = os.path.split(d)[1]
def callback():
self._db.execute('update pkginfo set state = 0 where pkgname = ? and forarch = ?', (act.name, arch))
stat = os.stat(act.path)
mtime = int(stat.st_mtime)
try:
owner = pwd.getpwuid(stat.st_uid).pw_name
except KeyError:
owner = 'uid_%d' % stat.st_uid
self._db.execute(
'''insert or replace into pkginfo
(filename, pkgname, pkgarch, pkgver, forarch, state, owner, mtime) values
(?, ?, ?, ?, ?, ?, ?, ?)''',
(act.path, act.name, act.arch, act.fullversion, arch, 1, owner, mtime))
else:
res = self._db.execute('select state from pkginfo where filename = ? and state = 1 limit 1', (act.path,))
if tuple(res) == ():
# the file isn't in repo database, just delete from our info database
logger.debug('deleting entry for not-in-database package: %s', act.path)
self._db.execute('delete from pkginfo where filename = ?', (act.path,))
return
def callback():
self._db.execute('delete from pkginfo where filename = ?', (act.path,))
act.callback = callback
self.repomans[d].add_action(act)
# def process_default(self, event):
# print(event)
def filterPkg(path):
if isinstance(path, Event):
path = path.pathname
return not _pkgfile_pat.search(path)
def pkgsortkey(path):
pkg = archpkg.PkgNameInfo.parseFilename(os.path.split(path)[1])
return (pkg.name, pkg.arch, pkg)
def repomon(config):
wm = pyinotify.WatchManager()
ioloop = IOLoop.instance()
handler = EventHandler(
filterPkg,
config=config,
wm=wm,
ioloop=ioloop,
)
return pyinotify.TornadoAsyncNotifier(
wm,
ioloop,
default_proc_fun=handler,
)