diff --git a/maigret/checking.py b/maigret/checking.py index 783c65e..e9fa106 100644 --- a/maigret/checking.py +++ b/maigret/checking.py @@ -46,6 +46,14 @@ QueryDraft = Tuple[Callable, Any, Any] QueriesDraft = Iterable[QueryDraft] +def create_task_func(): + if sys.version_info.minor > 6: + create_asyncio_task = asyncio.create_task + else: + loop = asyncio.get_event_loop() + create_asyncio_task = loop.create_task + return create_asyncio_task + class AsyncExecutor: def __init__(self, *args, **kwargs): self.logger = kwargs['logger'] @@ -109,29 +117,38 @@ class AsyncioProgressbarQueueExecutor(AsyncExecutor): self.workers_count = kwargs.get('in_parallel', 10) self.progress_func = kwargs.get('progress_func', tqdm.tqdm) self.queue = asyncio.Queue(self.workers_count) + self.timeout = kwargs.get('timeout') async def worker(self): while True: - f, args, kwargs = await self.queue.get() - result = await f(*args, **kwargs) + try: + f, args, kwargs = self.queue.get_nowait() + except asyncio.QueueEmpty: + return + + query_future = f(*args, **kwargs) + query_task = create_task_func()(query_future) + try: + result = await asyncio.wait_for(query_task, timeout=self.timeout) + except asyncio.TimeoutError: + result = None + self.results.append(result) self.progress.update(1) self.queue.task_done() - async def _run(self, tasks: QueriesDraft): + async def _run(self, queries: QueriesDraft): self.results = [] - if sys.version_info.minor > 6: - create_task = asyncio.create_task - else: - loop = asyncio.get_event_loop() - create_task = loop.create_task + queries_list = list(queries) - workers = [create_task(self.worker()) - for _ in range(self.workers_count)] - task_list = list(tasks) - self.progress = self.progress_func(total=len(task_list)) - for t in task_list: + min_workers = min(len(queries_list), self.workers_count) + + workers = [create_task_func()(self.worker()) + for _ in range(min_workers)] + + self.progress = self.progress_func(total=len(queries_list)) + for t in queries_list: await self.queue.put(t) await self.queue.join() for w in workers: