add module

This commit is contained in:
Kuoi 2022-05-11 22:23:21 +01:00
parent f8f6385d47
commit f1328756b2
2 changed files with 89 additions and 106 deletions

View file

@ -1,106 +0,0 @@
#!/usr/bin/python3
import asyncio
import os
from collections import defaultdict
from typing import Dict, List, Set
import pathlib
import logging
import subprocess
import pyalpm
from myutils import lock_file
from agithub import GitHub
from lilac2.const import mydir as lilacdir, OFFICIAL_REPOS, PACMAN_DB_DIR
from lilac2 import pkgbuild, packages
from webhooks.issue import parse_issue_text
repodir = pathlib.Path('~/archgitrepo/archlinuxcn').expanduser()
github_repo = 'archlinuxcn/repo'
logger = logging.getLogger(__name__)
async def file_issues(gh: GitHub, duplicate_info: Dict[str, List[str]]) -> None:
for pkgbase, pkgnames in duplicate_info.items():
body = f'''\
### 问题类型 / Type of issues
* 软件包被官方仓库收录 / available in official repos
### 受影响的软件包 / Affect packages
* {pkgbase}
'''
if [pkgbase] != pkgnames:
body += '----\n'
body += '\n'.join(f'''包 {pkgname} 从属于包基础 {pkgbase}。''' for pkgname in pkgnames)
# print(body)
issue = await gh.create_issue(github_repo, f'{", ".join(pkgnames)} in official repos now', body)
print('Created:', issue)
async def get_open_issue_packages(gh: GitHub) -> Set[str]:
issues = [
issue async for issue in gh.get_repo_issues(
github_repo, labels='in-official-repos')
]
ret: Set[str] = set()
for issue in issues:
_issuetype, packages = parse_issue_text(issue.body)
ret.update(packages)
return ret
def get_official_packages() -> set[str]:
ret: set[str] = set()
H = pyalpm.Handle('/', str(PACMAN_DB_DIR))
for repo in OFFICIAL_REPOS:
db = H.register_syncdb(repo, 0)
ret.update(p.name for p in db.pkgcache)
return ret
def main() -> None:
lock_file(lilacdir / '.lock')
token = os.environ['GITHUB_TOKEN']
gh = GitHub(token)
loop = asyncio.new_event_loop()
pkgbuild.update_pacmandb(PACMAN_DB_DIR, quiet=True)
official = get_official_packages()
subprocess.check_call(
['git', 'pull', '--no-edit', '-q'],
cwd=repodir, stdout=subprocess.DEVNULL,
)
ours = packages.get_all_pkgnames(repodir)
duplicates = official & {x[1] for x in ours}
logger.debug('duplicates: %r', duplicates)
# aiohttp is unhappy with async.run:
# Timeout context manager should be used inside a task
open_packages = loop.run_until_complete(get_open_issue_packages(gh))
logger.debug('open_packages: %r', open_packages)
duplicate_info: Dict[str, List[str]] = defaultdict(list)
for pkgbase, pkgname in ours:
if pkgbase in open_packages:
continue
if pkgname in duplicates:
duplicate_info[pkgbase].append(pkgname)
if duplicate_info:
loop.run_until_complete(file_issues(gh, duplicate_info))
if __name__ == '__main__':
from nicelogger import enable_pretty_logging
enable_pretty_logging('WARNING')
main()

89
webhooks/archpkg.py Normal file
View file

@ -0,0 +1,89 @@
from __future__ import annotations
import os
from collections import namedtuple
import subprocess
import re
from typing import Tuple, List, Dict
from pkg_resources import parse_version as _parse_version
def parse_arch_version(v: str) -> Tuple[int, Tuple[str, ...]]:
if ':' in v:
epoch = int(v.split(':', 1)[0])
else:
epoch = 0
return epoch, _parse_version(v)
class PkgNameInfo(namedtuple('PkgNameInfo', 'name, version, release, arch')):
def __lt__(self, other) -> bool:
if self.name != other.name or self.arch != other.arch:
return NotImplemented
if self.version != other.version:
return parse_arch_version(self.version) < parse_arch_version(other.version)
return float(self.release) < float(other.release)
def __gt__(self, other) -> bool:
# No, try the other side please.
return NotImplemented
@property
def fullversion(self) -> str:
return '%s-%s' % (self.version, self.release)
@classmethod
def parseFilename(cls, filename: str) -> 'PkgNameInfo':
return cls(*trimext(filename, 3).rsplit('-', 3))
def trimext(name: str, num: int = 1) -> str:
for i in range(num):
name = os.path.splitext(name)[0]
return name
def get_pkgname_with_bash(PKGBUILD: str) -> List[str]:
script = '''\
. '%s'
echo ${pkgname[*]}''' % PKGBUILD
# Python 3.4 has 'input' arg for check_output
p = subprocess.Popen(
['bwrap', '--unshare-all', '--ro-bind', '/', '/', '--tmpfs', '/home',
'--tmpfs', '/run', '--die-with-parent',
'--tmpfs', '/tmp', '--proc', '/proc', '--dev', '/dev', '/bin/bash'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
)
output = p.communicate(script.encode())[0].decode()
ret = p.wait()
if ret != 0:
raise subprocess.CalledProcessError(
ret, ['bash'], output)
return output.split()
pkgfile_pat = re.compile(r'(?:^|/).+-[^-]+-[\d.]+-(?:\w+)\.pkg\.tar\.(?:xz|zst)$')
def _strip_ver(s: str) -> str:
return re.sub(r'[<>=].*', '', s)
def get_package_info(name: str, local: bool = False) -> Dict[str, str]:
old_lang = os.environ['LANG']
os.environ['LANG'] = 'C'
args = '-Qi' if local else '-Si'
try:
outb = subprocess.check_output(["pacman", args, name])
out = outb.decode('latin1')
finally:
os.environ['LANG'] = old_lang
ret = {}
for l in out.splitlines():
if not l:
continue
if l[0] not in ' \t':
key, value = l.split(':', 1)
key = key.strip()
value = value.strip()
ret[key] = value
else:
ret[key] += ' ' + l.strip()
return ret