fix throttling

The old implementation will wait for all of the last max_concurrent ones
to complete.
This commit is contained in:
lilydjwg 2018-04-11 16:40:33 +08:00
parent 79c36641a2
commit 9f1f769738

View file

@ -122,40 +122,43 @@ class Source:
self.oldvers = {}
self.curvers = self.oldvers.copy()
futures = set()
token_q = asyncio.Queue(maxsize=self.max_concurrent)
async def worker(name, conf):
await token_q.get()
ret = await get_version(name, conf)
return name, ret
async def token_filler(n):
for _ in range(n):
await token_q.put(True)
config = self.config
futures = []
for name in config.sections():
if name == '__config__':
continue
conf = config[name]
conf['oldver'] = self.oldvers.get(name, None)
fu = asyncio.ensure_future(get_version(name, conf))
fu.name = name
futures.add(fu)
fu = asyncio.ensure_future(worker(name, conf))
futures.append(fu)
if len(futures) >= self.max_concurrent:
(done, futures) = await asyncio.wait(
futures, return_when = asyncio.FIRST_COMPLETED)
for fu in done:
self.future_done(fu)
filler_fu = asyncio.ensure_future(token_filler(len(futures)))
if futures:
(done, _) = await asyncio.wait(futures)
for fu in done:
self.future_done(fu)
for fu in asyncio.as_completed(futures):
try:
name, version = await fu
if version is not None:
self.print_version_update(name, version)
except Exception:
logger.exception('unexpected error happened', name=name)
await filler_fu
if self.newver:
write_verfile(self.newver, self.curvers)
def future_done(self, fu):
name = fu.name
try:
version = fu.result()
if version is not None:
self.print_version_update(name, version)
except Exception:
logger.exception('unexpected error happened', name=name)
def print_version_update(self, name, version):
oldver = self.oldvers.get(name, None)
if not oldver or oldver != version: