From 6fd3ba95ba442b866e12f803c4914771319c51cb Mon Sep 17 00:00:00 2001 From: lilydjwg Date: Fri, 25 Jun 2021 15:22:40 +0800 Subject: [PATCH] apt: handle multiple verions correctly closes #191 --- nvchecker_source/apt.py | 74 +++++++++++++++++++++++++++++++++++++---- tests/test_apt.py | 11 ++++++ 2 files changed, 79 insertions(+), 6 deletions(-) diff --git a/nvchecker_source/apt.py b/nvchecker_source/apt.py index e04ef59..dddb634 100644 --- a/nvchecker_source/apt.py +++ b/nvchecker_source/apt.py @@ -3,9 +3,12 @@ from __future__ import annotations +import re import asyncio -from io import StringIO from typing import Dict, Tuple +import itertools +import functools +from collections import defaultdict from nvchecker.api import ( session, GetVersionError, @@ -17,6 +20,60 @@ APT_PACKAGES_PATH = "%s/binary-%s/Packages%s" APT_PACKAGES_URL = "%s/dists/%s/%s" APT_PACKAGES_SUFFIX_PREFER = (".xz", ".gz", "") +DpkgVersion = Tuple[int, str, str] + +def parse_version(s: str) -> DpkgVersion: + try: + epoch_str, rest = s.split(':', 1) + except ValueError: + epoch = 0 + rest = s + else: + epoch = int(epoch_str) + + try: + ver, rev = rest.split('-', 1) + except ValueError: + ver = rest + rev = '' + + return epoch, ver, rev + +def _compare_part(a: str, b: str) -> int: + sa = re.split(r'(\d+)', a) + sb = re.split(r'(\d+)', b) + for idx, (pa, pb) in enumerate(itertools.zip_longest(sa, sb)): + if pa is None: + return -1 + elif pb is None: + return 1 + + if idx % 2 == 1: + ret = int(pa) - int(pb) + if ret != 0: + return ret + else: + if pa < pb: + return -1 + elif pa > pb: + return 1 + + return 0 + +def compare_version_parsed(a: DpkgVersion, b: DpkgVersion) -> int: + ret = a[0] - b[0] + if ret != 0: + return ret + ret = _compare_part(a[1], b[1]) + if ret != 0: + return ret + return _compare_part(a[2], b[2]) + +def compare_version(a: str, b: str) -> int: + va = parse_version(a) + vb = parse_version(b) + return compare_version_parsed(va, vb) + def _decompress_data(url: str, data: bytes) -> str: if url.endswith(".xz"): import lzma @@ -39,8 +96,8 @@ async def parse_packages(key: Tuple[AsyncCache, str]) -> Tuple[Dict[str, str], D cache, url = key apt_packages = await cache.get(url, get_url) # type: ignore - pkg_map = {} - srcpkg_map = {} + pkg_map = defaultdict(list) + srcpkg_map = defaultdict(list) pkg = None srcpkg = None @@ -52,12 +109,17 @@ async def parse_packages(key: Tuple[AsyncCache, str]) -> Tuple[Dict[str, str], D elif line.startswith("Version: "): version = line[9:] if pkg is not None: - pkg_map[pkg] = version + pkg_map[pkg].append(version) if srcpkg is not None: - srcpkg_map[srcpkg] = version + srcpkg_map[srcpkg].append(version) pkg = srcpkg = None - return pkg_map, srcpkg_map + pkg_map_max = {pkg: max(vs, key=functools.cmp_to_key(compare_version)) + for pkg, vs in pkg_map.items()} + srcpkg_map_max = {pkg: max(vs, key=functools.cmp_to_key(compare_version)) + for pkg, vs in srcpkg_map.items()} + + return pkg_map_max, srcpkg_map_max async def get_version( name: str, conf: Entry, *, diff --git a/tests/test_apt.py b/tests/test_apt.py index adcc07b..aef8a2e 100644 --- a/tests/test_apt.py +++ b/tests/test_apt.py @@ -39,3 +39,14 @@ async def test_apt_deepin(get_version): "mirror": "https://community-packages.deepin.com/deepin", "suite": "apricot", }) == "0.1.6-1" + +@flaky(max_runs=10) +async def test_apt_multiversions(get_version): + assert await get_version("ms-teams", { + "source": "apt", + "mirror": "https://packages.microsoft.com/repos/ms-teams", + "pkg": "teams", + "suite": "stable", + "repo": "main", + "arch": "amd64", + }) == "1.4.00.13653"