mirror of
https://github.com/lilydjwg/archrepo2.git
synced 2025-03-10 12:02:43 +00:00
repomon: move send_notification to EventHandler
This commit is contained in:
parent
b3d473d79b
commit
79494a5722
1 changed files with 78 additions and 59 deletions
|
@ -50,10 +50,11 @@ class RepoMan:
|
||||||
_cmd_queue = queue.Queue()
|
_cmd_queue = queue.Queue()
|
||||||
_cmd_running = False
|
_cmd_running = False
|
||||||
|
|
||||||
def __init__(self, config, base):
|
def __init__(self, config, base, siteman):
|
||||||
self.action = []
|
self.action = []
|
||||||
self._ioloop = IOLoop.current()
|
self._ioloop = IOLoop.current()
|
||||||
self._base = base
|
self._base = base
|
||||||
|
self._siteman = siteman
|
||||||
|
|
||||||
self._repo_dir = config.get('path')
|
self._repo_dir = config.get('path')
|
||||||
self.name = config.get('name')
|
self.name = config.get('name')
|
||||||
|
@ -66,27 +67,20 @@ class RepoMan:
|
||||||
self._auto_rename = config.getboolean('auto-rename', True)
|
self._auto_rename = config.getboolean('auto-rename', True)
|
||||||
self._symlink_any = config.getboolean('symlink-any', True)
|
self._symlink_any = config.getboolean('symlink-any', True)
|
||||||
|
|
||||||
notification_type = config.get('notification-type', 'null')
|
|
||||||
if notification_type != 'null':
|
|
||||||
self._notification_addrs = config.get('notification-addresses')
|
|
||||||
self._notification_secret = config.get('notification-secret')
|
|
||||||
self.send_notification = getattr(
|
|
||||||
self,
|
|
||||||
'send_notification_' + notification_type.replace('-', '_'),
|
|
||||||
)
|
|
||||||
|
|
||||||
def queue_command(self, cmd, callbacks=None):
|
def queue_command(self, cmd, callbacks=None):
|
||||||
self._cmd_queue.put((cmd, callbacks))
|
self._cmd_queue.put((cmd, callbacks))
|
||||||
if not self._cmd_running:
|
if not self._cmd_running:
|
||||||
self.run_command()
|
self.run_command()
|
||||||
|
|
||||||
def run_command(self):
|
def run_command(self):
|
||||||
|
if not self._cmd_running:
|
||||||
|
self._siteman.inc_running()
|
||||||
self.__class__._cmd_running = True
|
self.__class__._cmd_running = True
|
||||||
try:
|
try:
|
||||||
cmd, callbacks = self._cmd_queue.get_nowait()
|
cmd, callbacks = self._cmd_queue.get_nowait()
|
||||||
except queue.Empty:
|
except queue.Empty:
|
||||||
self.send_notification()
|
|
||||||
self.__class__._cmd_running = False
|
self.__class__._cmd_running = False
|
||||||
|
self._siteman.dec_running()
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.info('Running cmd: %r', cmd)
|
logger.info('Running cmd: %r', cmd)
|
||||||
|
@ -148,52 +142,6 @@ class RepoMan:
|
||||||
self.run,
|
self.run,
|
||||||
)
|
)
|
||||||
|
|
||||||
def send_notification_simple_udp(self):
|
|
||||||
msg = self._new_notification_msg()
|
|
||||||
|
|
||||||
socks = {}
|
|
||||||
for address, port in self._parse_notification_address_inet():
|
|
||||||
try:
|
|
||||||
af, socktype, proto, canonname, sockaddr = socket.getaddrinfo(
|
|
||||||
address, port, 0, socket.SOCK_DGRAM, 0, 0)[0]
|
|
||||||
except:
|
|
||||||
logger.exception('failed to create socket to %r for notification',
|
|
||||||
(address, port))
|
|
||||||
continue
|
|
||||||
|
|
||||||
info = af, socktype, proto
|
|
||||||
if info not in socks:
|
|
||||||
sock = socket.socket(*info)
|
|
||||||
socks[info] = sock
|
|
||||||
else:
|
|
||||||
sock = socks[info]
|
|
||||||
sock.sendto(msg, sockaddr)
|
|
||||||
logger.info('simple udp notification sent to %s.', (address, port))
|
|
||||||
|
|
||||||
def _new_notification_msg(self):
|
|
||||||
s = 'update'
|
|
||||||
t = str(int(time.time()))
|
|
||||||
data = s + '|' + t
|
|
||||||
hashing = data + self._notification_secret
|
|
||||||
sig = hashlib.sha1(hashing.encode('utf-8')).hexdigest()
|
|
||||||
msg = data + '|' + sig
|
|
||||||
logger.info('new notification msg: %s.', msg)
|
|
||||||
return msg.encode('utf-8')
|
|
||||||
|
|
||||||
def _parse_notification_address_inet(self):
|
|
||||||
cached = self._notification_addrs
|
|
||||||
if isinstance(cached, str):
|
|
||||||
addresses = []
|
|
||||||
for addr in cached.split():
|
|
||||||
host, port = addr.rsplit(':', 1)
|
|
||||||
port = int(port)
|
|
||||||
addresses.append((host, port))
|
|
||||||
cached = self._notification_addrs = tuple(addresses)
|
|
||||||
return cached
|
|
||||||
|
|
||||||
def send_notification_null(self):
|
|
||||||
logger.info('null notification sent.')
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self._timeout = None
|
self._timeout = None
|
||||||
actions = self.action
|
actions = self.action
|
||||||
|
@ -218,7 +166,24 @@ class RepoMan:
|
||||||
self._do_remove(toremove)
|
self._do_remove(toremove)
|
||||||
|
|
||||||
class EventHandler(pyinotify.ProcessEvent):
|
class EventHandler(pyinotify.ProcessEvent):
|
||||||
def my_init(self, filter_pkg, supported_archs, config, wm):
|
_n_running = 0
|
||||||
|
def my_init(
|
||||||
|
self, filter_pkg, supported_archs, config, wm,
|
||||||
|
):
|
||||||
|
|
||||||
|
notification_type = config.get(
|
||||||
|
'notification-type', 'null')
|
||||||
|
if notification_type != 'null':
|
||||||
|
self._notification_addrs = config.get(
|
||||||
|
'notification-addresses')
|
||||||
|
self._notification_secret = config.get(
|
||||||
|
'notification-secret')
|
||||||
|
self.send_notification = getattr(
|
||||||
|
self,
|
||||||
|
'send_notification_' +
|
||||||
|
notification_type.replace('-', '_'),
|
||||||
|
)
|
||||||
|
|
||||||
self.filter_pkg = filter_pkg
|
self.filter_pkg = filter_pkg
|
||||||
self.moved_away = {}
|
self.moved_away = {}
|
||||||
self.repomans = {}
|
self.repomans = {}
|
||||||
|
@ -260,7 +225,7 @@ class EventHandler(pyinotify.ProcessEvent):
|
||||||
if os.path.exists(p): # filter broken symlinks
|
if os.path.exists(p): # filter broken symlinks
|
||||||
files.add(p)
|
files.add(p)
|
||||||
wm.add_watch(d, pyinotify.ALL_EVENTS)
|
wm.add_watch(d, pyinotify.ALL_EVENTS)
|
||||||
self.repomans[d] = RepoMan(config, d)
|
self.repomans[d] = RepoMan(config, d, self)
|
||||||
self.name = self.repomans[d].name
|
self.name = self.repomans[d].name
|
||||||
self._auto_rename = self.repomans[d]._auto_rename
|
self._auto_rename = self.repomans[d]._auto_rename
|
||||||
self._symlink_any = self.repomans[d]._symlink_any
|
self._symlink_any = self.repomans[d]._symlink_any
|
||||||
|
@ -455,6 +420,60 @@ class EventHandler(pyinotify.ProcessEvent):
|
||||||
self._db.execute('''delete from sigfiles where filename = ? and pkgrepo = ?''',
|
self._db.execute('''delete from sigfiles where filename = ? and pkgrepo = ?''',
|
||||||
(rpath, self.name))
|
(rpath, self.name))
|
||||||
|
|
||||||
|
def dec_running(self):
|
||||||
|
self._n_running -= 1
|
||||||
|
if self._n_running == 0:
|
||||||
|
self.send_notification()
|
||||||
|
|
||||||
|
def inc_running(self):
|
||||||
|
self._n_running += 1
|
||||||
|
|
||||||
|
def send_notification_simple_udp(self):
|
||||||
|
msg = self._new_notification_msg()
|
||||||
|
|
||||||
|
socks = {}
|
||||||
|
for address, port in self._parse_notification_address_inet():
|
||||||
|
try:
|
||||||
|
af, socktype, proto, canonname, sockaddr = socket.getaddrinfo(
|
||||||
|
address, port, 0, socket.SOCK_DGRAM, 0, 0)[0]
|
||||||
|
except:
|
||||||
|
logger.exception('failed to create socket to %r for notification',
|
||||||
|
(address, port))
|
||||||
|
continue
|
||||||
|
|
||||||
|
info = af, socktype, proto
|
||||||
|
if info not in socks:
|
||||||
|
sock = socket.socket(*info)
|
||||||
|
socks[info] = sock
|
||||||
|
else:
|
||||||
|
sock = socks[info]
|
||||||
|
sock.sendto(msg, sockaddr)
|
||||||
|
logger.info('simple udp notification sent to %s.', (address, port))
|
||||||
|
|
||||||
|
def _new_notification_msg(self):
|
||||||
|
s = 'update'
|
||||||
|
t = str(int(time.time()))
|
||||||
|
data = s + '|' + t
|
||||||
|
hashing = data + self._notification_secret
|
||||||
|
sig = hashlib.sha1(hashing.encode('utf-8')).hexdigest()
|
||||||
|
msg = data + '|' + sig
|
||||||
|
logger.info('new notification msg: %s.', msg)
|
||||||
|
return msg.encode('utf-8')
|
||||||
|
|
||||||
|
def _parse_notification_address_inet(self):
|
||||||
|
cached = self._notification_addrs
|
||||||
|
if isinstance(cached, str):
|
||||||
|
addresses = []
|
||||||
|
for addr in cached.split():
|
||||||
|
host, port = addr.rsplit(':', 1)
|
||||||
|
port = int(port)
|
||||||
|
addresses.append((host, port))
|
||||||
|
cached = self._notification_addrs = tuple(addresses)
|
||||||
|
return cached
|
||||||
|
|
||||||
|
def send_notification_null(self):
|
||||||
|
logger.info('null notification sent.')
|
||||||
|
|
||||||
def filter_pkg(regex, path):
|
def filter_pkg(regex, path):
|
||||||
if isinstance(path, Event):
|
if isinstance(path, Event):
|
||||||
path = path.pathname
|
path = path.pathname
|
||||||
|
|
Loading…
Add table
Reference in a new issue