Added asyncio tasks with timeouts, non-blocking work with queue

This commit is contained in:
Soxoj
2021-04-11 17:56:27 +03:00
parent dabba859f3
commit 2fee65fe4e
+30 -13
View File
@@ -46,6 +46,14 @@ QueryDraft = Tuple[Callable, Any, Any]
QueriesDraft = Iterable[QueryDraft] QueriesDraft = Iterable[QueryDraft]
def create_task_func():
    """Return the callable used to schedule a coroutine as an asyncio task.

    ``asyncio.create_task`` is available from Python 3.7 onwards; on older
    interpreters the ``create_task`` method of the current event loop is
    returned instead.
    """
    # Compare the full version tuple: looking only at ``minor`` would
    # misclassify any interpreter whose major version is not 3.
    if sys.version_info >= (3, 7):
        return asyncio.create_task
    return asyncio.get_event_loop().create_task
class AsyncExecutor: class AsyncExecutor:
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.logger = kwargs['logger'] self.logger = kwargs['logger']
@@ -109,29 +117,38 @@ class AsyncioProgressbarQueueExecutor(AsyncExecutor):
self.workers_count = kwargs.get('in_parallel', 10) self.workers_count = kwargs.get('in_parallel', 10)
self.progress_func = kwargs.get('progress_func', tqdm.tqdm) self.progress_func = kwargs.get('progress_func', tqdm.tqdm)
self.queue = asyncio.Queue(self.workers_count) self.queue = asyncio.Queue(self.workers_count)
self.timeout = kwargs.get('timeout')
async def worker(self):
    """Drain ``self.queue``, running every queued query under a timeout.

    Query drafts ``(f, args, kwargs)`` are taken from the queue without
    blocking; the worker returns as soon as it finds the queue empty.
    Each query is scheduled as a task and awaited through
    ``asyncio.wait_for`` with ``self.timeout``. A query that times out
    contributes ``None`` to ``self.results``.
    """
    # The scheduling callable does not change between iterations.
    create_task = create_task_func()

    while True:
        try:
            f, args, kwargs = self.queue.get_nowait()
        except asyncio.QueueEmpty:
            return

        try:
            query_task = create_task(f(*args, **kwargs))
            try:
                result = await asyncio.wait_for(
                    query_task, timeout=self.timeout
                )
            except asyncio.TimeoutError:
                result = None

            self.results.append(result)
            self.progress.update(1)
        finally:
            # Always balance the successful get_nowait(): if the query
            # raises anything other than a timeout, a missing task_done()
            # would leave queue.join() waiting forever.
            self.queue.task_done()
async def _run(self, tasks: QueriesDraft): async def _run(self, queries: QueriesDraft):
self.results = [] self.results = []
if sys.version_info.minor > 6: queries_list = list(queries)
create_task = asyncio.create_task
else:
loop = asyncio.get_event_loop()
create_task = loop.create_task
workers = [create_task(self.worker()) min_workers = min(len(queries_list), self.workers_count)
for _ in range(self.workers_count)]
task_list = list(tasks) workers = [create_task_func()(self.worker())
self.progress = self.progress_func(total=len(task_list)) for _ in range(min_workers)]
for t in task_list:
self.progress = self.progress_func(total=len(queries_list))
for t in queries_list:
await self.queue.put(t) await self.queue.put(t)
await self.queue.join() await self.queue.join()
for w in workers: for w in workers: