Async generator-executor for site checks (#1978)

This commit is contained in:
Soxoj
2024-12-17 22:48:11 +01:00
committed by GitHub
parent 36ce285572
commit 97e5f600d0
7 changed files with 127 additions and 28 deletions
+10 -15
View File
@@ -26,11 +26,7 @@ except ImportError:
from . import errors from . import errors
from .activation import ParsingActivator, import_aiohttp_cookies from .activation import ParsingActivator, import_aiohttp_cookies
from .errors import CheckError from .errors import CheckError
from .executors import ( from .executors import AsyncioQueueGeneratorExecutor
AsyncExecutor,
AsyncioSimpleExecutor,
AsyncioProgressbarQueueExecutor,
)
from .result import MaigretCheckResult, MaigretCheckStatus from .result import MaigretCheckResult, MaigretCheckStatus
from .sites import MaigretDatabase, MaigretSite from .sites import MaigretDatabase, MaigretSite
from .types import QueryOptions, QueryResultWrapper from .types import QueryOptions, QueryResultWrapper
@@ -670,12 +666,7 @@ async def maigret(
await debug_ip_request(clearweb_checker, logger) await debug_ip_request(clearweb_checker, logger)
# setup parallel executor # setup parallel executor
executor: Optional[AsyncExecutor] = None executor = AsyncioQueueGeneratorExecutor(
if no_progressbar:
# TODO: switch to AsyncioProgressbarQueueExecutor with progress object mock
executor = AsyncioSimpleExecutor(logger=logger)
else:
executor = AsyncioProgressbarQueueExecutor(
logger=logger, logger=logger,
in_parallel=max_connections, in_parallel=max_connections,
timeout=timeout + 0.5, timeout=timeout + 0.5,
@@ -728,13 +719,17 @@ async def maigret(
}, },
) )
cur_results = await executor.run(tasks_dict.values()) cur_results = []
with alive_bar(
# wait for executor timeout errors len(tasks_dict), title="Searching", force_tty=True, disable=no_progressbar
await asyncio.sleep(1) ) as progress:
async for result in executor.run(tasks_dict.values()):
cur_results.append(result)
progress()
all_results.update(cur_results) all_results.update(cur_results)
# rerun for failed sites
sites = get_failed_sites(dict(cur_results)) sites = get_failed_sites(dict(cur_results))
attempts -= 1 attempts -= 1
+69 -1
View File
@@ -1,7 +1,7 @@
import asyncio import asyncio
import sys import sys
import time import time
from typing import Any, Iterable, List from typing import Any, Iterable, List, Callable
import alive_progress import alive_progress
from alive_progress import alive_bar from alive_progress import alive_bar
@@ -19,6 +19,7 @@ def create_task_func():
class AsyncExecutor: class AsyncExecutor:
# Deprecated: will be removed soon, don't use it
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.logger = kwargs['logger'] self.logger = kwargs['logger']
@@ -34,6 +35,7 @@ class AsyncExecutor:
class AsyncioSimpleExecutor(AsyncExecutor): class AsyncioSimpleExecutor(AsyncExecutor):
# Deprecated: will be removed soon, don't use it
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.semaphore = asyncio.Semaphore(kwargs.get('in_parallel', 100)) self.semaphore = asyncio.Semaphore(kwargs.get('in_parallel', 100))
@@ -48,6 +50,7 @@ class AsyncioSimpleExecutor(AsyncExecutor):
class AsyncioProgressbarExecutor(AsyncExecutor): class AsyncioProgressbarExecutor(AsyncExecutor):
# Deprecated: will be removed soon, don't use it
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@@ -71,6 +74,7 @@ class AsyncioProgressbarExecutor(AsyncExecutor):
class AsyncioProgressbarSemaphoreExecutor(AsyncExecutor): class AsyncioProgressbarSemaphoreExecutor(AsyncExecutor):
# Deprecated: will be removed soon, don't use it
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.semaphore = asyncio.Semaphore(kwargs.get('in_parallel', 1)) self.semaphore = asyncio.Semaphore(kwargs.get('in_parallel', 1))
@@ -174,3 +178,67 @@ class AsyncioProgressbarQueueExecutor(AsyncExecutor):
w.cancel() w.cancel()
return self.results return self.results
class AsyncioQueueGeneratorExecutor:
    """Run queued calls on a pool of asyncio workers and stream the results.

    Unlike the list-returning executors, ``run()`` is an async generator:
    every result is yielded as soon as a worker produces it, so the caller
    can update a progress indicator while the remaining calls are running.

    Keyword arguments:
        logger: required; used for error and timing messages.
        in_parallel: number of worker tasks (default 10).
        timeout: per-call timeout in seconds passed to ``asyncio.wait_for``;
            ``None`` (the default) means no timeout.
    """

    def __init__(self, *args, **kwargs):
        self.workers_count = kwargs.get('in_parallel', 10)
        self.queue = asyncio.Queue()
        self.timeout = kwargs.get('timeout')
        self.logger = kwargs['logger']
        self._results = asyncio.Queue()
        # Unique marker: tells a worker to exit when read from ``queue`` and
        # tells ``run()`` that a worker has exited when read from ``_results``.
        self._stop_signal = object()
        # Defined up front so the attribute can be read before any run ends.
        self.execution_time = 0.0

    async def worker(self):
        """Process tasks from the queue and put results into the results queue.

        Each task is a ``(callable, args, kwargs)`` triple. A call that
        exceeds ``self.timeout`` contributes ``kwargs.get('default')`` as its
        result. Any other failure is logged and contributes no result.
        When the worker exits it posts the stop marker to the results queue.
        """
        try:
            while True:
                task = await self.queue.get()
                if task is self._stop_signal:
                    self.queue.task_done()
                    break

                try:
                    f, args, kwargs = task
                    query_future = f(*args, **kwargs)
                    query_task = create_task_func()(query_future)
                    try:
                        result = await asyncio.wait_for(
                            query_task, timeout=self.timeout
                        )
                    except asyncio.TimeoutError:
                        result = kwargs.get('default')
                    await self._results.put(result)
                except Exception as e:
                    self.logger.error(f"Error in worker: {e}")
                finally:
                    self.queue.task_done()
        finally:
            # Always announce the exit (even on cancellation) so the consumer
            # in run() never waits for a worker that is already gone.
            self._results.put_nowait(self._stop_signal)

    async def run(self, queries: Iterable[Callable[..., Any]]):
        """Run workers to process queries in parallel.

        Args:
            queries: iterable of ``(callable, args, kwargs)`` triples, as
                unpacked by ``worker()``.

        Yields:
            Results in the order the workers produce them.

        After the generator finishes (or is closed early) all workers have
        been awaited and ``self.execution_time`` holds the elapsed seconds.
        """
        start_time = time.time()

        # Add tasks to the queue
        for t in queries:
            await self.queue.put(t)

        # Create workers
        workers = [
            asyncio.create_task(self.worker()) for _ in range(self.workers_count)
        ]

        # Add stop signals: one per worker, queued after all real tasks
        for _ in range(self.workers_count):
            await self.queue.put(self._stop_signal)

        finished_workers = 0
        try:
            # Block on the results queue instead of polling with a timeout:
            # the per-worker stop markers tell us exactly when to finish.
            while finished_workers < len(workers):
                item = await self._results.get()
                if item is self._stop_signal:
                    finished_workers += 1
                else:
                    yield item
        finally:
            # Ensure all workers are awaited
            await asyncio.gather(*workers)
            if finished_workers < len(workers):
                # The consumer stopped early: drop unread results and stop
                # markers so a later run() on this instance starts clean.
                while not self._results.empty():
                    self._results.get_nowait()
            self.execution_time = time.time() - start_time
            self.logger.debug(f"Spent time: {self.execution_time}")
+3 -1
View File
@@ -496,7 +496,9 @@ async def main():
if args.web is not None: if args.web is not None:
from maigret.web.app import app from maigret.web.app import app
port = args.web if args.web else 5000 # args.web is either the specified port or 5000 by default port = (
args.web if args.web else 5000
) # args.web is either the specified port or 5000 by default
app.run(port=port) app.run(port=port)
return return
+3 -2
View File
@@ -7218,7 +7218,8 @@
"url": "https://gramho.com/explore-hashtag/{username}", "url": "https://gramho.com/explore-hashtag/{username}",
"source": "Instagram", "source": "Instagram",
"usernameClaimed": "adam", "usernameClaimed": "adam",
"usernameUnclaimed": "noonewouldeverusethis7" "usernameUnclaimed": "noonewouldeverusethis7",
"disabled": true
}, },
"Gravatar": { "Gravatar": {
"tags": [ "tags": [
@@ -17476,7 +17477,7 @@
"method": "vimeo" "method": "vimeo"
}, },
"headers": { "headers": {
"Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MzM5Njc3MjAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbCwianRpIjoiNGJkNDE4NzktM2VhOS00ZWRiLWIzZDUtNjAyNjQ3YjMyNTVhIn0.kPbKREujSfYsisyF0pS_HskTapRlHBfVLRw4cis1ezk" "Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MzQxMTc1NDAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbCwianRpIjoiNDc4Y2ZhZGUtZjI0Yy00MDVkLTliYWItN2RlNGEzNGM4MzI5In0.guN7Fg8dqq7EYdckrJ-6Rdkj_5MOl6FaC4YUSOceDpU"
}, },
"urlProbe": "https://api.vimeo.com/users/{username}?fields=name%2Cgender%2Cbio%2Curi%2Clink%2Cbackground_video%2Clocation_details%2Cpictures%2Cverified%2Cmetadata.public_videos.total%2Cavailable_for_hire%2Ccan_work_remotely%2Cmetadata.connections.videos.total%2Cmetadata.connections.albums.total%2Cmetadata.connections.followers.total%2Cmetadata.connections.following.total%2Cmetadata.public_videos.total%2Cmetadata.connections.vimeo_experts.is_enrolled%2Ctotal_collection_count%2Ccreated_time%2Cprofile_preferences%2Cmembership%2Cclients%2Cskills%2Cproject_types%2Crates%2Ccategories%2Cis_expert%2Cprofile_discovery%2Cwebsites%2Ccontact_emails&fetch_user_profile=1", "urlProbe": "https://api.vimeo.com/users/{username}?fields=name%2Cgender%2Cbio%2Curi%2Clink%2Cbackground_video%2Clocation_details%2Cpictures%2Cverified%2Cmetadata.public_videos.total%2Cavailable_for_hire%2Ccan_work_remotely%2Cmetadata.connections.videos.total%2Cmetadata.connections.albums.total%2Cmetadata.connections.followers.total%2Cmetadata.connections.following.total%2Cmetadata.public_videos.total%2Cmetadata.connections.vimeo_experts.is_enrolled%2Ctotal_collection_count%2Ccreated_time%2Cprofile_preferences%2Cmembership%2Cclients%2Cskills%2Cproject_types%2Crates%2Ccategories%2Cis_expert%2Cprofile_discovery%2Cwebsites%2Ccontact_emails&fetch_user_profile=1",
"checkType": "status_code", "checkType": "status_code",
+1
View File
@@ -188,6 +188,7 @@ class Submitter:
) )
return entered_username if entered_username else supposed_username return entered_username if entered_username else supposed_username
# TODO: replace with checking.py/SimpleAiohttpChecker call
@staticmethod @staticmethod
async def get_html_response_to_compare( async def get_html_response_to_compare(
url: str, session: ClientSession = None, redirects=False, headers: Dict = None url: str, session: ClientSession = None, redirects=False, headers: Dict = None
+33
View File
@@ -8,6 +8,7 @@ from maigret.executors import (
AsyncioProgressbarExecutor, AsyncioProgressbarExecutor,
AsyncioProgressbarSemaphoreExecutor, AsyncioProgressbarSemaphoreExecutor,
AsyncioProgressbarQueueExecutor, AsyncioProgressbarQueueExecutor,
AsyncioQueueGeneratorExecutor,
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -76,3 +77,35 @@ async def test_asyncio_progressbar_queue_executor():
assert await executor.run(tasks) == [0, 3, 6, 9, 1, 4, 7, 2, 5, 8] assert await executor.run(tasks) == [0, 3, 6, 9, 1, 4, 7, 2, 5, 8]
assert executor.execution_time > 0.2 assert executor.execution_time > 0.2
assert executor.execution_time < 0.4 assert executor.execution_time < 0.4
@pytest.mark.asyncio
async def test_asyncio_queue_generator_executor():
    """Check result order and timing of the generator executor per pool size.

    The same ten ``func`` calls are run with 2, 3, 5 and 10 workers. For each
    pool size the yielded order must match one of the accepted sequences and
    the measured ``execution_time`` must fall strictly inside its window.
    NOTE(review): the expected values presumably follow from how long
    ``func`` takes for each ``n`` -- confirm against its definition.
    """
    tasks = [(func, [n], {}) for n in range(10)]

    # (workers, accepted result orders, min seconds, max seconds)
    scenarios = (
        (2, ([0, 1, 3, 2, 4, 6, 7, 5, 9, 8],), 0.5, 0.6),
        (3, ([0, 3, 1, 4, 6, 2, 7, 9, 5, 8],), 0.4, 0.5),
        (
            5,
            (
                [0, 3, 6, 1, 4, 7, 9, 2, 5, 8],
                [0, 3, 6, 1, 4, 9, 7, 2, 5, 8],
            ),
            0.3,
            0.4,
        ),
        (10, ([0, 3, 6, 9, 1, 4, 7, 2, 5, 8],), 0.2, 0.3),
    )

    for pool_size, accepted_orders, min_time, max_time in scenarios:
        executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=pool_size)
        results = []
        async for result in executor.run(tasks):
            results.append(result)

        assert results in accepted_orders
        assert min_time < executor.execution_time < max_time
+2 -3
View File
@@ -1,9 +1,8 @@
import pytest import pytest
from unittest.mock import AsyncMock, MagicMock, patch from unittest.mock import MagicMock, patch
from maigret.submit import Submitter, MaigretSite, MaigretEngine from maigret.submit import Submitter
from aiohttp import ClientSession from aiohttp import ClientSession
from maigret.sites import MaigretDatabase from maigret.sites import MaigretDatabase
from maigret.settings import Settings
import logging import logging