From 79494a5722cda58019e6faf7f49ff8ba077e51f6 Mon Sep 17 00:00:00 2001 From: lilydjwg Date: Tue, 26 Feb 2019 15:44:46 +0800 Subject: [PATCH] repomon: move send_notification to EventHandler --- archrepo2/repomon.py | 137 ++++++++++++++++++++++++------------------- 1 file changed, 78 insertions(+), 59 deletions(-) diff --git a/archrepo2/repomon.py b/archrepo2/repomon.py index b788815..bdb8b82 100755 --- a/archrepo2/repomon.py +++ b/archrepo2/repomon.py @@ -50,10 +50,11 @@ class RepoMan: _cmd_queue = queue.Queue() _cmd_running = False - def __init__(self, config, base): + def __init__(self, config, base, siteman): self.action = [] self._ioloop = IOLoop.current() self._base = base + self._siteman = siteman self._repo_dir = config.get('path') self.name = config.get('name') @@ -66,27 +67,20 @@ class RepoMan: self._auto_rename = config.getboolean('auto-rename', True) self._symlink_any = config.getboolean('symlink-any', True) - notification_type = config.get('notification-type', 'null') - if notification_type != 'null': - self._notification_addrs = config.get('notification-addresses') - self._notification_secret = config.get('notification-secret') - self.send_notification = getattr( - self, - 'send_notification_' + notification_type.replace('-', '_'), - ) - def queue_command(self, cmd, callbacks=None): self._cmd_queue.put((cmd, callbacks)) if not self._cmd_running: self.run_command() def run_command(self): + if not self._cmd_running: + self._siteman.inc_running() self.__class__._cmd_running = True try: cmd, callbacks = self._cmd_queue.get_nowait() except queue.Empty: - self.send_notification() self.__class__._cmd_running = False + self._siteman.dec_running() return logger.info('Running cmd: %r', cmd) @@ -148,52 +142,6 @@ class RepoMan: self.run, ) - def send_notification_simple_udp(self): - msg = self._new_notification_msg() - - socks = {} - for address, port in self._parse_notification_address_inet(): - try: - af, socktype, proto, canonname, sockaddr = socket.getaddrinfo( - address, port, 0, socket.SOCK_DGRAM, 0, 0)[0] - except: - logger.exception('failed to create socket to %r for notification', - (address, port)) - continue - - info = af, socktype, proto - if info not in socks: - sock = socket.socket(*info) - socks[info] = sock - else: - sock = socks[info] - sock.sendto(msg, sockaddr) - logger.info('simple udp notification sent to %s.', (address, port)) - - def _new_notification_msg(self): - s = 'update' - t = str(int(time.time())) - data = s + '|' + t - hashing = data + self._notification_secret - sig = hashlib.sha1(hashing.encode('utf-8')).hexdigest() - msg = data + '|' + sig - logger.info('new notification msg: %s.', msg) - return msg.encode('utf-8') - - def _parse_notification_address_inet(self): - cached = self._notification_addrs - if isinstance(cached, str): - addresses = [] - for addr in cached.split(): - host, port = addr.rsplit(':', 1) - port = int(port) - addresses.append((host, port)) - cached = self._notification_addrs = tuple(addresses) - return cached - - def send_notification_null(self): - logger.info('null notification sent.') - def run(self): self._timeout = None actions = self.action @@ -218,7 +166,24 @@ class RepoMan: self._do_remove(toremove) class EventHandler(pyinotify.ProcessEvent): - def my_init(self, filter_pkg, supported_archs, config, wm): + _n_running = 0 + def my_init( + self, filter_pkg, supported_archs, config, wm, + ): + + notification_type = config.get( + 'notification-type', 'null') + if notification_type != 'null': + self._notification_addrs = config.get( + 'notification-addresses') + self._notification_secret = config.get( + 'notification-secret') + self.send_notification = getattr( + self, + 'send_notification_' + + notification_type.replace('-', '_'), + ) + self.filter_pkg = filter_pkg self.moved_away = {} self.repomans = {} @@ -260,7 +225,7 @@ class EventHandler(pyinotify.ProcessEvent): if os.path.exists(p): # filter broken symlinks files.add(p) wm.add_watch(d, pyinotify.ALL_EVENTS) - self.repomans[d] = RepoMan(config, d) + self.repomans[d] = RepoMan(config, d, self) self.name = self.repomans[d].name self._auto_rename = self.repomans[d]._auto_rename self._symlink_any = self.repomans[d]._symlink_any @@ -455,6 +420,60 @@ class EventHandler(pyinotify.ProcessEvent): self._db.execute('''delete from sigfiles where filename = ? and pkgrepo = ?''', (rpath, self.name)) + def dec_running(self): + self._n_running -= 1 + if self._n_running == 0: + self.send_notification() + + def inc_running(self): + self._n_running += 1 + + def send_notification_simple_udp(self): + msg = self._new_notification_msg() + + socks = {} + for address, port in self._parse_notification_address_inet(): + try: + af, socktype, proto, canonname, sockaddr = socket.getaddrinfo( + address, port, 0, socket.SOCK_DGRAM, 0, 0)[0] + except: + logger.exception('failed to create socket to %r for notification', + (address, port)) + continue + + info = af, socktype, proto + if info not in socks: + sock = socket.socket(*info) + socks[info] = sock + else: + sock = socks[info] + sock.sendto(msg, sockaddr) + logger.info('simple udp notification sent to %s.', (address, port)) + + def _new_notification_msg(self): + s = 'update' + t = str(int(time.time())) + data = s + '|' + t + hashing = data + self._notification_secret + sig = hashlib.sha1(hashing.encode('utf-8')).hexdigest() + msg = data + '|' + sig + logger.info('new notification msg: %s.', msg) + return msg.encode('utf-8') + + def _parse_notification_address_inet(self): + cached = self._notification_addrs + if isinstance(cached, str): + addresses = [] + for addr in cached.split(): + host, port = addr.rsplit(':', 1) + port = int(port) + addresses.append((host, port)) + cached = self._notification_addrs = tuple(addresses) + return cached + + def send_notification_null(self): + logger.info('null notification sent.') + def filter_pkg(regex, path): if isinstance(path, Event): path = path.pathname