archrepo2/archreposrv

307 lines
9.2 KiB
Python
Executable file

#!/usr/bin/env python3
# vim:fileencoding=utf-8
import os
import sys
import re
import pwd
import configparser
from functools import partial
import queue
import logging
import sqlite3
import pyinotify
Event = pyinotify.Event
from tornado.ioloop import IOLoop
import tornado.process
import archpkg
from myutils import enable_pretty_logging
enable_pretty_logging(logging.DEBUG)
# handles only x86_64, i686 and any arch packages
_pkgfile_pat = re.compile(r'(?:^|/)[a-z0-9_-]+-[a-z0-9_.]+-\d+-(?:x86_64|i686|any)\.pkg\.tar\.xz$')
class ActionInfo(archpkg.PkgNameInfo):
    """A parsed package file name together with a pending repo action.

    ``ActionInfo(path, action)`` parses the file-name component of *path* via
    ``parseFilename`` and attaches ``action`` and ``path`` attributes to the
    result.  When a third positional argument is given, the four arguments are
    passed straight to the base class instead (presumably the route taken when
    ``parseFilename`` instantiates ``cls`` itself — confirm against archpkg).
    """

    def __new__(cls, path, action, four=None, five=None):
        if four is None:
            info = cls.parseFilename(os.path.basename(path))
            info.action = action
            info.path = path
            return info
        # Field-wise construction: forward the raw values to the base tuple.
        return super().__new__(cls, path, action, four, five)

    def __repr__(self):
        return '<ActionInfo: {} {}>'.format(self.action, self.path)
class RepoMan:
    """Batch package add/remove actions for one repository directory.

    Actions are buffered until ``wait-time`` seconds pass without a new one,
    collapsed per package name, and turned into ``command-add`` /
    ``command-remove`` invocations (``repo-add`` / ``repo-remove`` by default)
    against both the package database and the files database.
    """

    # Pending IOLoop timeout handle for the next batch run, or None.
    _timeout = None
    # NOTE: the command queue and running flag are class attributes, so every
    # RepoMan instance shares one queue and commands run one at a time.
    _cmd_queue = queue.Queue()
    _cmd_running = False

    def __init__(self, config, base, ioloop=None):
        """Read settings from the *config* section for directory *base*.

        :param config: config section providing ``name`` and optionally
            ``command-add``, ``command-remove`` and ``wait-time``.
        :param base: directory holding the packages and the repo databases.
        :param ioloop: Tornado IOLoop; defaults to ``IOLoop.instance()``.
        """
        # pending ActionInfo objects, flushed by run()
        self.action = []
        self._ioloop = ioloop or IOLoop.instance()
        self._base = base
        self._repo_dir = config.get('path')

        name = config.get('name')
        self._db_name = os.path.join(base, name + '.db.tar.gz')
        # Join onto base directly: _db_name already contains base, so joining
        # it onto base again duplicated a relative base directory.
        self._files_name = os.path.join(base, name + '.files.tar.gz')

        self._command_add = config.get('command-add', 'repo-add')
        self._command_remove = config.get('command-remove', 'repo-remove')
        self._wait_time = config.getint('wait-time', 10)

    def queue_command(self, cmd, callbacks=None):
        """Enqueue *cmd* and start it right away if nothing is running.

        *callbacks* are called (in order) after *cmd* exits with status 0.
        """
        self._cmd_queue.put((cmd, callbacks))
        if not self._cmd_running:
            self.run_command()

    def run_command(self):
        """Start the next queued command, or mark the queue idle if empty."""
        self.__class__._cmd_running = True
        try:
            cmd, callbacks = self._cmd_queue.get_nowait()
        except queue.Empty:
            self.__class__._cmd_running = False
            return

        logging.info('Running cmd: %r', cmd)
        # have to specify io_loop or we'll get error tracebacks
        p = tornado.process.Subprocess(cmd, io_loop=self._ioloop)
        p.set_exit_callback(partial(self.command_done, callbacks, cmd=cmd))

    def command_done(self, callbacks, status, cmd=None):
        """Handle a finished command, then start the next queued one.

        :param callbacks: callables run only when *status* is 0.
        :param status: exit status of the command.
        :param cmd: the command that finished, used for error reporting.
        """
        if status != 0:
            logging.error('Command %r exited with status %d', cmd, status)
        elif callbacks:
            for cb in callbacks:
                cb()
        self.run_command()

    def _do_cmd(self, cmd, items, callbacks):
        """Queue *cmd* on the package db, then on the files db (``-f``).

        *callbacks* are attached to the files-db command only, so they run
        after that command exits with status 0.
        """
        cmd1 = [cmd, self._db_name]
        cmd1.extend(items)
        cmd2 = [cmd, '-f', self._files_name]
        cmd2.extend(items)
        self.queue_command(cmd1)
        self.queue_command(cmd2, callbacks)

    def _do_add(self, toadd):
        """Queue the add command for a list of ``(path, callback)`` pairs."""
        if toadd:
            files, callbacks = zip(*toadd)
            self._do_cmd(self._command_add, files, callbacks)

    def _do_remove(self, toremove):
        """Queue the remove command for a list of ``(name, callback)`` pairs."""
        if toremove:
            files, callbacks = zip(*toremove)
            self._do_cmd(self._command_remove, files, callbacks)

    def add_action(self, action):
        """Buffer *action* (an ActionInfo with a ``callback`` attribute)."""
        logging.info('Adding action %r to db %r', action, self._db_name)
        self._action_pending(action)

    def _action_pending(self, act):
        # Each new action restarts the wait timer, so a burst of actions is
        # processed as one batch once the burst is over.
        self.action.append(act)
        if self._timeout:
            self._ioloop.remove_timeout(self._timeout)
        self._timeout = self._ioloop.add_timeout(
            self._ioloop.time() + self._wait_time,
            self.run,
        )

    def run(self):
        """Collapse the buffered actions per package name and queue commands.

        Two actions on an equal package (per PkgNameInfo equality) with
        opposite kinds cancel out; otherwise the later action wins.
        """
        self._timeout = None
        actions = self.action
        self.action = []

        actiondict = {}
        for act in actions:
            oldact = actiondict.get(act.name)
            if oldact is not None and oldact == act and oldact.action != act.action:
                # same package, opposite actions, do nothing
                del actiondict[act.name]
            else:
                # take the later action
                actiondict[act.name] = act

        toadd = [(x.path, x.callback) for x in actiondict.values() if x.action == 'add']
        toremove = [(x.name, x.callback) for x in actiondict.values() if x.action == 'remove']
        self._do_add(toadd)
        self._do_remove(toremove)
class EventHandler(pyinotify.ProcessEvent):
    """Turn inotify events on the repository directories into RepoMan actions.

    The ``any``, ``i686`` and ``x86_64`` subdirectories of the configured
    ``path`` are watched.  Packages whose arch does not match their directory
    are moved to the right one, ``any`` packages are symlinked into the
    ``i686`` and ``x86_64`` directories, and every change is recorded in the
    ``pkginfo`` SQLite table once the corresponding repo command succeeds.
    """

    def my_init(self, config, wm, ioloop=None):
        """Initialise state, open the info database and register watches.

        pyinotify calls this with the keyword arguments given to the
        constructor.

        :param config: repository config section (reads ``path`` and,
            optionally, ``info-db``; also passed on to each RepoMan).
        :param wm: the ``pyinotify.WatchManager`` to register directories with.
        :param ioloop: Tornado IOLoop; defaults to ``IOLoop.instance()``.
        """
        # cookie -> IOLoop timeout for an IN_MOVED_FROM still waiting for its
        # matching IN_MOVED_TO
        self.moved_away = {}
        # watched directory -> RepoMan responsible for it
        self.repomans = {}
        self._ioloop = ioloop or IOLoop.instance()

        base = config.get('path')
        dbname = config.get('info-db', os.path.join(base, 'pkginfo.db'))
        # isolation_level=None means autocommit
        self._db = sqlite3.connect(dbname, isolation_level=None)
        self._db.execute('''create table if not exists pkginfo
                        (filename text unique,
                        pkgname text,
                        pkgarch text,
                        pkgver text,
                        owner text,
                        mtime int,
                        state int)''')

        dirs = [os.path.join(base, x) for x in ('any', 'i686', 'x86_64')]
        # every file currently present in the watched directories
        self.files = files = set()
        for d in dirs:
            files.update(os.path.join(d, f) for f in os.listdir(d))
            wm.add_watch(d, pyinotify.ALL_EVENTS)
            self.repomans[d] = RepoMan(config, d, self._ioloop)

        self._initial_update(files)

    def _initial_update(self, files):
        """Dispatch adds/removes for package files that changed while down.

        *files* (paths found on disk) is compared with the filenames recorded
        in the pkginfo table.
        """
        oldfiles = {row[0] for row in self._db.execute('select filename from pkginfo')}

        # Filter before sorting so pkgsortkey only ever parses package file
        # names; the watched directories also hold e.g. the RepoMan db files.
        for f in sorted((f for f in files - oldfiles if not filterPkg(f)), key=pkgsortkey):
            self.dispatch(f, 'add')
        for f in sorted((f for f in oldfiles - files if not filterPkg(f)), key=pkgsortkey):
            self.dispatch(f, 'remove')

    def process_IN_CLOSE_WRITE(self, event):
        logging.debug('Writing done: %s', event.pathname)
        self.dispatch(event.pathname, 'add')
        self.files.add(event.pathname)

    def process_IN_DELETE(self, event):
        logging.debug('Removing: %s', event.pathname)
        self.dispatch(event.pathname, 'remove')
        # discard(): symlinks created by dispatch() are never added to files
        self.files.discard(event.pathname)

    def movedOut(self, event):
        """Timeout callback: a moved-away file got no matching IN_MOVED_TO."""
        self.moved_away.pop(event.cookie, None)
        logging.debug('Moved away: %s', event.pathname)
        self.dispatch(event.pathname, 'remove')

    def process_IN_MOVED_FROM(self, event):
        # Give a matching IN_MOVED_TO (same cookie) 0.1s to arrive; if none
        # does, the file left the watched directories and is removed.
        self.moved_away[event.cookie] = self._ioloop.add_timeout(
            self._ioloop.time() + 0.1,
            partial(self.movedOut, event),
        )
        # discard(): moved symlinks are not tracked in files either
        self.files.discard(event.pathname)

    def process_IN_MOVED_TO(self, event):
        if event.pathname in self.files:
            logging.warning('Overwritten: %s', event.pathname)
        self.files.add(event.pathname)

        # pop() so the cookie entry does not accumulate in moved_away
        timeout = self.moved_away.pop(event.cookie, None)
        if timeout is not None:
            # Move between watched directories (e.g. dispatch()'s own rename):
            # cancel the pending removal, nothing is dispatched.
            self._ioloop.remove_timeout(timeout)
        else:
            logging.debug('Moved here: %s', event.pathname)
            self.dispatch(event.pathname, 'add')

    def dispatch(self, path, action):
        """Queue *action* ('add' or 'remove') for the package file at *path*.

        On add, a package in the wrong arch directory is first renamed into
        the directory matching its arch.  For ``any`` packages the links in
        ``i686`` and ``x86_64`` are created (add) or unlinked (remove) too.
        """
        act = ActionInfo(path, action)
        d, filename = os.path.split(path)
        base, arch = os.path.split(d)

        # rename if a packager has added to a wrong directory
        # but not for a link that has arch=any, as that's what we created
        if action == 'add' and act.arch != arch and not (os.path.islink(path) and act.arch == 'any'):
            newd = os.path.join(base, act.arch)
            newpath = os.path.join(newd, filename)
            os.rename(path, newpath)

            act.path = newpath
            arch = act.arch
            d = newd

        if arch == 'any':
            for newarch in ('i686', 'x86_64'):
                newd = os.path.join(base, newarch)
                newpath = os.path.join(newd, filename)
                if action == 'add':
                    try:
                        os.symlink(os.path.join('..', arch, filename), newpath)
                    except FileExistsError:
                        # e.g. an 'any' package overwritten under the same
                        # file name: the link from the earlier upload remains
                        logging.debug('Symlink already exists: %s', newpath)
                    self._real_dispatch(newd, ActionInfo(newpath, action))
                else:
                    try:
                        # the resulting delete event is detected and handled later
                        os.unlink(newpath)
                    except FileNotFoundError:
                        # someone deleted the file for us
                        pass

        self._real_dispatch(d, act)

    def _real_dispatch(self, d, act):
        """Attach the pkginfo-updating callback to *act*, hand it to d's RepoMan.

        The callback runs only after the repo command for *act* succeeds.
        Removals of files without a state=1 row in pkginfo are ignored.
        """
        if act.action == 'add':
            def callback():
                # clear state on all existing rows for this name/arch before
                # inserting the new file with state 1
                self._db.execute(
                    'update pkginfo set state = 0 where pkgname = ? and pkgarch = ?',
                    (act.name, act.arch))
                stat = os.stat(act.path)
                mtime = int(stat.st_mtime)
                try:
                    owner = pwd.getpwuid(stat.st_uid).pw_name
                except KeyError:
                    # uid has no passwd entry
                    owner = 'uid_%d' % stat.st_uid
                self._db.execute(
                    '''insert or replace into pkginfo
                    (filename, pkgname, pkgarch, pkgver, state, owner, mtime) values
                    (?, ?, ?, ?, ?, ?, ?)''',
                    (act.path, act.name, act.arch, act.fullversion, 1, owner, mtime))
        else:
            res = self._db.execute(
                'select state from pkginfo where filename = ? and state = 1 limit 1',
                (act.path,))
            if res.fetchone() is None:
                # the file isn't in repo database, ignoring
                return

            def callback():
                self._db.execute('delete from pkginfo where filename = ?', (act.path,))

        act.callback = callback
        self.repomans[d].add_action(act)
def filterPkg(path):
    """Return True when *path* is NOT a package file this daemon handles.

    *path* is either a path string or a pyinotify ``Event`` (its ``pathname``
    is used).  main() also passes this function to the ProcessEvent
    constructor as the event filter, where a true result stops processing.
    """
    pathname = path.pathname if isinstance(path, Event) else path
    return _pkgfile_pat.search(pathname) is None
def pkgsortkey(path):
    """Sort key for package file paths: ``(name, arch, parsed info)``."""
    info = archpkg.PkgNameInfo.parseFilename(os.path.basename(path))
    return info.name, info.arch, info
def main(conffile):
    """Watch the repository configured in *conffile* until interrupted.

    The ``[repository]`` section of the INI file configures the EventHandler
    and its RepoMan instances.  Exits with an error message when the config
    file cannot be read.
    """
    config = configparser.ConfigParser()
    # read() silently skips unreadable files and returns the ones it parsed
    if not config.read(conffile):
        sys.exit('cannot read config file: %s' % conffile)

    wm = pyinotify.WatchManager()
    ioloop = IOLoop.instance()
    handler = EventHandler(
        filterPkg,
        config=config['repository'],
        wm=wm,
        ioloop=ioloop,
    )
    notifier = pyinotify.TornadoAsyncNotifier(
        wm,
        ioloop,
        default_proc_fun=handler,
    )

    try:
        ioloop.start()
    except KeyboardInterrupt:
        pass
    finally:
        # stop the notifier (registered on the loop) before closing the loop
        notifier.stop()
        ioloop.close()
if __name__ == '__main__':
    # Expect the config file path as the first argument.
    if len(sys.argv) < 2:
        sys.exit('usage: %s CONFIG_FILE' % sys.argv[0])
    main(sys.argv[1])