Parallel execution optimization (#1897)

* Connection failure fix: removed futures, added semaphores

* Additional fixes

* Tqdm replace to alive_progress, poetry update

* Self-check mode fix, tests fixes

* Sites checks fixes (#1896)

* Fixed incorrect site names, added method to compare sites
This commit is contained in:
Soxoj
2024-11-26 13:55:12 +01:00
committed by GitHub
parent b370bc4c44
commit 324c118530
10 changed files with 1301 additions and 1134 deletions
+105 -101
View File
@@ -1,40 +1,40 @@
# Standard library imports
import ast
import asyncio
import logging
import random
import re
import ssl
import sys
from typing import Dict, List, Optional, Tuple
from urllib.parse import quote
# Third party imports
import aiodns
import alive_progress
from alive_progress import alive_bar
from aiohttp import ClientSession, TCPConnector, http_exceptions
from aiohttp.client_exceptions import ClientConnectorError, ServerDisconnectedError
from python_socks import _errors as proxy_errors
from socid_extractor import extract
try:
from mock import Mock
except ImportError:
from unittest.mock import Mock
import ast
import re
import ssl
import sys
import tqdm
import random
from typing import Tuple, Optional, Dict, List
from urllib.parse import quote
import aiodns
import tqdm.asyncio
from python_socks import _errors as proxy_errors
from socid_extractor import extract
from aiohttp import TCPConnector, ClientSession, http_exceptions
from aiohttp.client_exceptions import ServerDisconnectedError, ClientConnectorError
from .activation import ParsingActivator, import_aiohttp_cookies
# Local imports
from . import errors
from .activation import ParsingActivator, import_aiohttp_cookies
from .errors import CheckError
from .executors import (
AsyncExecutor,
AsyncioSimpleExecutor,
AsyncioProgressbarQueueExecutor,
)
from .result import QueryResult, QueryStatus
from .sites import MaigretDatabase, MaigretSite
from .types import QueryOptions, QueryResultWrapper
from .utils import get_random_user_agent, ascii_data_display
from .utils import ascii_data_display, get_random_user_agent
SUPPORTED_IDS = (
@@ -58,102 +58,99 @@ class CheckerBase:
class SimpleAiohttpChecker(CheckerBase):
def __init__(self, *args, **kwargs):
proxy = kwargs.get('proxy')
cookie_jar = kwargs.get('cookie_jar')
self.proxy = kwargs.get('proxy')
self.cookie_jar = kwargs.get('cookie_jar')
self.logger = kwargs.get('logger', Mock())
# moved here to speed up the launch of Maigret
from aiohttp_socks import ProxyConnector
# make http client session
connector = ProxyConnector.from_url(proxy) if proxy else TCPConnector(ssl=False)
connector.verify_ssl = False
self.session = ClientSession(
connector=connector, trust_env=True, cookie_jar=cookie_jar
)
self.url = None
self.headers = None
self.allow_redirects = True
self.timeout = 0
self.method = 'get'
def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get'):
if method == 'get':
request_method = self.session.get
else:
request_method = self.session.head
future = request_method(
url=url,
headers=headers,
allow_redirects=allow_redirects,
timeout=timeout,
)
return future
self.url = url
self.headers = headers
self.allow_redirects = allow_redirects
self.timeout = timeout
self.method = method
return None
async def close(self):
await self.session.close()
async def check(self, future) -> Tuple[str, int, Optional[CheckError]]:
html_text = None
status_code = 0
error: Optional[CheckError] = CheckError("Unknown")
pass
async def _make_request(self, session, url, headers, allow_redirects, timeout, method, logger) -> Tuple[str, int, Optional[CheckError]]:
try:
response = await future
request_method = session.get if method == 'get' else session.head
async with request_method(
url=url,
headers=headers,
allow_redirects=allow_redirects,
timeout=timeout,
) as response:
status_code = response.status
response_content = await response.content.read()
charset = response.charset or "utf-8"
decoded_content = response_content.decode(charset, "ignore")
status_code = response.status
response_content = await response.content.read()
charset = response.charset or "utf-8"
decoded_content = response_content.decode(charset, "ignore")
html_text = decoded_content
error = CheckError("Connection lost") if status_code == 0 else None
logger.debug(decoded_content)
error = None
if status_code == 0:
error = CheckError("Connection lost")
self.logger.debug(html_text)
return decoded_content, status_code, error
except asyncio.TimeoutError as e:
error = CheckError("Request timeout", str(e))
return None, 0, CheckError("Request timeout", str(e))
except ClientConnectorError as e:
error = CheckError("Connecting failure", str(e))
return None, 0, CheckError("Connecting failure", str(e))
except ServerDisconnectedError as e:
error = CheckError("Server disconnected", str(e))
return None, 0, CheckError("Server disconnected", str(e))
except http_exceptions.BadHttpMessage as e:
error = CheckError("HTTP", str(e))
return None, 0, CheckError("HTTP", str(e))
except proxy_errors.ProxyError as e:
error = CheckError("Proxy", str(e))
return None, 0, CheckError("Proxy", str(e))
except KeyboardInterrupt:
error = CheckError("Interrupted")
return None, 0, CheckError("Interrupted")
except Exception as e:
# python-specific exceptions
if sys.version_info.minor > 6 and (
isinstance(e, ssl.SSLCertVerificationError)
or isinstance(e, ssl.SSLError)
):
error = CheckError("SSL", str(e))
return None, 0, CheckError("SSL", str(e))
else:
self.logger.debug(e, exc_info=True)
error = CheckError("Unexpected", str(e))
logger.debug(e, exc_info=True)
return None, 0, CheckError("Unexpected", str(e))
if error == "Invalid proxy response":
self.logger.debug(error, exc_info=True)
async def check(self) -> Tuple[str, int, Optional[CheckError]]:
from aiohttp_socks import ProxyConnector
connector = ProxyConnector.from_url(self.proxy) if self.proxy else TCPConnector(ssl=False)
connector.verify_ssl = False
return str(html_text), status_code, error
async with ClientSession(
connector=connector,
trust_env=True,
cookie_jar=self.cookie_jar.copy() if self.cookie_jar else None
) as session:
html_text, status_code, error = await self._make_request(
session,
self.url,
self.headers,
self.allow_redirects,
self.timeout,
self.method,
self.logger
)
if error and str(error) == "Invalid proxy response":
self.logger.debug(error, exc_info=True)
return str(html_text) if html_text else '', status_code, error
class ProxiedAiohttpChecker(SimpleAiohttpChecker):
def __init__(self, *args, **kwargs):
proxy = kwargs.get('proxy')
cookie_jar = kwargs.get('cookie_jar')
self.proxy = kwargs.get('proxy')
self.cookie_jar = kwargs.get('cookie_jar')
self.logger = kwargs.get('logger', Mock())
# moved here to speed up the launch of Maigret
from aiohttp_socks import ProxyConnector
connector = ProxyConnector.from_url(proxy)
connector.verify_ssl = False
self.session = ClientSession(
connector=connector, trust_env=True, cookie_jar=cookie_jar
)
class AiodnsDomainResolver(CheckerBase):
if sys.platform == 'win32': # Temporary workaround for Windows
@@ -192,7 +189,7 @@ class CheckerMock:
def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get'):
return None
async def check(self, future) -> Tuple[str, int, Optional[CheckError]]:
async def check(self) -> Tuple[str, int, Optional[CheckError]]:
await asyncio.sleep(0)
return '', 0, None
@@ -544,13 +541,16 @@ async def check_site_for_username(
default_result = make_site_result(
site, username, options, logger, retry=kwargs.get('retry')
)
future = default_result.get("future")
if not future:
# future = default_result.get("future")
# if not future:
# return site.name, default_result
checker = default_result.get("checker")
if not checker:
print(f"error, no checker for {site.name}")
return site.name, default_result
checker = default_result["checker"]
response = await checker.check(future=future)
response = await checker.check()
response_result = process_site_result(
response, query_notify, logger, default_result, site
@@ -562,8 +562,8 @@ async def check_site_for_username(
async def debug_ip_request(checker, logger):
future = checker.prepare(url="https://icanhazip.com")
ip, status, check_error = await checker.check(future)
checker.prepare(url="https://icanhazip.com")
ip, status, check_error = await checker.check()
if ip:
logger.debug(f"My IP is: {ip.strip()}")
else:
@@ -753,10 +753,8 @@ async def maigret(
# closing http client session
await clearweb_checker.close()
if tor_proxy:
await tor_checker.close()
if i2p_proxy:
await i2p_checker.close()
await tor_checker.close()
await i2p_checker.close()
# notify caller that all queries are finished
query_notify.finish()
@@ -791,7 +789,7 @@ def timeout_check(value):
async def site_self_check(
site: MaigretSite,
logger,
logger: logging.Logger,
semaphore,
db: MaigretDatabase,
silent=False,
@@ -837,6 +835,9 @@ async def site_self_check(
result = results_dict[site.name]["status"]
if result.error and 'Cannot connect to host' in result.error.desc:
changes["disabled"] = True
site_status = result.status
if site_status != status:
@@ -864,6 +865,7 @@ async def site_self_check(
if changes["disabled"] != site.disabled:
site.disabled = changes["disabled"]
logger.info(f"Switching disabled status of {site.name} to {site.disabled}")
db.update_site(site)
if not silent:
action = "Disabled" if site.disabled else "Enabled"
@@ -880,7 +882,7 @@ async def site_self_check(
async def self_check(
db: MaigretDatabase,
site_data: dict,
logger,
logger: logging.Logger,
silent=False,
max_connections=10,
proxy=None,
@@ -905,8 +907,10 @@ async def self_check(
tasks.append(future)
if tasks:
for f in tqdm.asyncio.tqdm.as_completed(tasks):
await f
with alive_bar(len(tasks), title='Self-checking', force_tty=True) as progress:
for f in asyncio.as_completed(tasks):
await f
progress() # Update the progress bar
unchecked_new_count = len([site for site in all_sites.values() if "unchecked" in site.tags])
disabled_new_count = disabled_count(all_sites.values())
+42 -33
View File
@@ -1,12 +1,13 @@
import asyncio
import time
import tqdm
import sys
from typing import Iterable, Any, List
import time
from typing import Any, Iterable, List
import alive_progress
from alive_progress import alive_bar
from .types import QueryDraft
def create_task_func():
if sys.version_info.minor > 6:
create_asyncio_task = asyncio.create_task
@@ -34,9 +35,14 @@ class AsyncExecutor:
class AsyncioSimpleExecutor(AsyncExecutor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.semaphore = asyncio.Semaphore(kwargs.get('in_parallel', 100))
async def _run(self, tasks: Iterable[QueryDraft]):
futures = [f(*args, **kwargs) for f, args, kwargs in tasks]
async def sem_task(f, args, kwargs):
async with self.semaphore:
return await f(*args, **kwargs)
futures = [sem_task(f, args, kwargs) for f, args, kwargs in tasks]
return await asyncio.gather(*futures)
@@ -46,9 +52,20 @@ class AsyncioProgressbarExecutor(AsyncExecutor):
async def _run(self, tasks: Iterable[QueryDraft]):
futures = [f(*args, **kwargs) for f, args, kwargs in tasks]
total_tasks = len(futures)
results = []
for f in tqdm.asyncio.tqdm.as_completed(futures):
results.append(await f)
# Use alive_bar for progress tracking
with alive_bar(total_tasks, title='Searching', force_tty=True) as progress:
# Chunk progress updates for efficiency
async def track_task(task):
result = await task
progress() # Update progress bar once task completes
return result
# Use gather to run tasks concurrently and track progress
results = await asyncio.gather(*(track_task(f) for f in futures))
return results
@@ -66,8 +83,12 @@ class AsyncioProgressbarSemaphoreExecutor(AsyncExecutor):
async def semaphore_gather(tasks: Iterable[QueryDraft]):
coros = [_wrap_query(q) for q in tasks]
results = []
for f in tqdm.asyncio.tqdm.as_completed(coros):
results.append(await f)
# Use alive_bar correctly as a context manager
with alive_bar(len(coros), title='Searching', force_tty=True) as progress:
for f in asyncio.as_completed(coros):
results.append(await f)
progress() # Update the progress bar
return results
return await semaphore_gather(tasks)
@@ -77,24 +98,13 @@ class AsyncioProgressbarQueueExecutor(AsyncExecutor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.workers_count = kwargs.get('in_parallel', 10)
self.progress_func = kwargs.get('progress_func', tqdm.tqdm)
self.queue = asyncio.Queue(self.workers_count)
self.timeout = kwargs.get('timeout')
self.bar_update = None # Store the update function from alive_bar
async def increment_progress(self, count):
update_func = self.progress.update
if asyncio.iscoroutinefunction(update_func):
await update_func(count)
else:
update_func(count)
await asyncio.sleep(0)
async def stop_progress(self):
stop_func = self.progress.close
if asyncio.iscoroutinefunction(stop_func):
await stop_func()
else:
stop_func()
if self.bar_update:
self.bar_update(count)
await asyncio.sleep(0)
async def worker(self):
@@ -117,22 +127,21 @@ class AsyncioProgressbarQueueExecutor(AsyncExecutor):
async def _run(self, queries: Iterable[QueryDraft]):
self.results: List[Any] = []
queries_list = list(queries)
min_workers = min(len(queries_list), self.workers_count)
workers = [create_task_func()(self.worker()) for _ in range(min_workers)]
self.progress = self.progress_func(total=len(queries_list))
# Initialize alive_progress bar
with alive_bar(len(queries_list), title="Searching", force_tty=True) as bar:
self.bar_update = bar # `alive_bar` uses its instance to update progress
for t in queries_list:
await self.queue.put(t)
for t in queries_list:
await self.queue.put(t)
await self.queue.join()
await self.queue.join()
for w in workers:
w.cancel()
for w in workers:
w.cancel()
await self.stop_progress()
return self.results
return self.results
Generated
+1099 -915
View File
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -55,7 +55,7 @@ socid-extractor = "^0.0.26"
soupsieve = "^2.6"
stem = "^1.8.1"
torrequest = "^0.1.0"
tqdm = "^4.66.1"
alive_progress = "^2.4.1"
typing-extensions = "^4.8.0"
webencodings = "^0.5.1"
xhtml2pdf = "^0.2.11"
+2 -2
View File
@@ -3128,9 +3128,9 @@ Rank data fetched from Alexa by domains.
1. ![](https://www.google.com/s2/favicons?domain=https://archive.transformativeworks.org) [archive.transformativeworks.org (https://archive.transformativeworks.org)](https://archive.transformativeworks.org)*: top 100M*
1. ![](https://www.google.com/s2/favicons?domain=https://www.tnaflix.com) [www.tnaflix.com (https://www.tnaflix.com)](https://www.tnaflix.com)*: top 100M*
1. ![](https://www.google.com/s2/favicons?domain=https://massagerepublic.com) [massagerepublic.com (https://massagerepublic.com)](https://massagerepublic.com)*: top 100M*
1. ![](https://www.google.com/s2/favicons?domain=https://mynickname.com) [mynickname.com (https://mynickname.com)](https://mynickname.com)*: top 100M, unchecked*
1. ![](https://www.google.com/s2/favicons?domain=https://mynickname.com) [mynickname.com (https://mynickname.com)](https://mynickname.com)*: top 100M*
The list was updated at (2024-11-26 10:27:01.383232+00:00 UTC)
The list was updated at (2024-11-26 UTC)
## Statistics
Enabled/total sites: 2694/3126 = 86.18%
+28 -9
View File
@@ -1,25 +1,44 @@
{
"engines": {},
"sites": {
"GooglePlayStore": {
"ValidActive": {
"tags": ["global", "us"],
"disabled": false,
"checkType": "status_code",
"alexaRank": 1,
"url": "https://play.google.com/store/apps/developer?id={username}",
"urlMain": "https://play.google.com/store",
"usernameClaimed": "Facebook_nosuchname",
"usernameClaimed": "OpenAI",
"usernameUnclaimed": "noonewouldeverusethis7"
},
"Reddit": {
"tags": ["news", "social", "us"],
"InvalidActive": {
"tags": ["global", "us"],
"disabled": false,
"checkType": "status_code",
"presenseStrs": ["totalKarma"],
"alexaRank": 1,
"url": "https://play.google.com/store/apps/dev?id={username}",
"urlMain": "https://play.google.com/store",
"usernameClaimed": "OpenAI",
"usernameUnclaimed": "noonewouldeverusethis7"
},
"ValidInactive": {
"tags": ["global", "us"],
"disabled": true,
"alexaRank": 17,
"url": "https://www.reddit.com/user/{username}",
"urlMain": "https://www.reddit.com/",
"usernameClaimed": "blue",
"checkType": "status_code",
"alexaRank": 1,
"url": "https://play.google.com/store/apps/developer?id={username}",
"urlMain": "https://play.google.com/store",
"usernameClaimed": "OpenAI",
"usernameUnclaimed": "noonewouldeverusethis7"
},
"InvalidInactive": {
"tags": ["global", "us"],
"disabled": true,
"checkType": "status_code",
"alexaRank": 1,
"url": "https://play.google.com/store/apps/dev?id={username}",
"urlMain": "https://play.google.com/store",
"usernameClaimed": "OpenAI",
"usernameUnclaimed": "noonewouldeverusethis7"
}
}
+12 -55
View File
@@ -35,65 +35,22 @@ RESULTS_EXAMPLE = {
@pytest.mark.slow
def test_self_check_db_positive_disable(test_db):
logger = Mock()
assert test_db.sites[0].disabled is False
loop = asyncio.get_event_loop()
loop.run_until_complete(
self_check(test_db, test_db.sites_dict, logger, silent=True)
)
assert test_db.sites[0].disabled is True
@pytest.mark.slow
@pytest.mark.skip(reason="broken, fixme")
def test_self_check_db_positive_enable(test_db):
@pytest.mark.asyncio
async def test_self_check_db(test_db):
# initalize logger to debug
logger = Mock()
test_db.sites[0].disabled = True
test_db.sites[0].username_claimed = 'Skyeng'
assert test_db.sites[0].disabled is True
assert test_db.sites_dict['InvalidActive'].disabled is False
assert test_db.sites_dict['ValidInactive'].disabled is True
assert test_db.sites_dict['ValidActive'].disabled is False
assert test_db.sites_dict['InvalidInactive'].disabled is True
loop = asyncio.get_event_loop()
loop.run_until_complete(
self_check(test_db, test_db.sites_dict, logger, silent=True)
)
await self_check(test_db, test_db.sites_dict, logger, silent=False)
assert test_db.sites[0].disabled is False
@pytest.mark.slow
def test_self_check_db_negative_disabled(test_db):
logger = Mock()
test_db.sites[0].disabled = True
assert test_db.sites[0].disabled is True
loop = asyncio.get_event_loop()
loop.run_until_complete(
self_check(test_db, test_db.sites_dict, logger, silent=True)
)
assert test_db.sites[0].disabled is True
@pytest.mark.skip(reason='broken, fixme')
@pytest.mark.slow
def test_self_check_db_negative_enabled(test_db):
logger = Mock()
test_db.sites[0].disabled = False
test_db.sites[0].username_claimed = 'Skyeng'
assert test_db.sites[0].disabled is False
loop = asyncio.get_event_loop()
loop.run_until_complete(
self_check(test_db, test_db.sites_dict, logger, silent=True)
)
assert test_db.sites[0].disabled is False
assert test_db.sites_dict['InvalidActive'].disabled is True
assert test_db.sites_dict['ValidInactive'].disabled is False
assert test_db.sites_dict['ValidActive'].disabled is False
assert test_db.sites_dict['InvalidInactive'].disabled is True
@pytest.mark.slow
+6 -14
View File
@@ -3,23 +3,13 @@
This module generates the listing of supported sites in file `SITES.md`
and pretty prints file with sites data.
"""
import aiohttp
import asyncio
import json
import sys
import requests
import logging
import threading
import xml.etree.ElementTree as ET
from datetime import datetime
from argparse import ArgumentParser, RawDescriptionHelpFormatter
import tqdm.asyncio
from maigret.maigret import get_response, site_self_check
from maigret.sites import MaigretSite, MaigretDatabase, MaigretEngine
from maigret.utils import CaseConverter
from maigret.maigret import get_response
from maigret.sites import MaigretDatabase, MaigretEngine
async def check_engine_of_site(site_name, sites_with_engines, future, engine_name, semaphore, logger):
async with semaphore:
@@ -98,8 +88,10 @@ if __name__ == '__main__':
tasks.append(future)
# progress bar
for f in tqdm.asyncio.tqdm.as_completed(tasks):
loop.run_until_complete(f)
with alive_progress(len(tasks), title='Checking sites') as progress:
for f in asyncio.as_completed(tasks):
loop.run_until_complete(f)
progress()
print(f'Total detected {len(new_engine_sites)} sites on engine {engine_name}')
# dict with new found engine sites
+5 -3
View File
@@ -3,7 +3,7 @@ import json
import random
import re
import tqdm.asyncio
import alive_progress
from mock import Mock
import requests
@@ -181,7 +181,7 @@ if __name__ == '__main__':
raw_maigret_data = json.dumps({site.name: site.json for site in sites_subset})
new_sites = []
for site in tqdm.asyncio.tqdm(urls):
for site in alive_progress.alive_it(urls):
site_lowercase = site.lower()
domain_raw = URL_RE.sub('', site_lowercase).strip().strip('/')
@@ -271,7 +271,9 @@ if __name__ == '__main__':
future = asyncio.ensure_future(check_coro)
tasks.append(future)
for f in tqdm.asyncio.tqdm.as_completed(tasks, timeout=TIMEOUT):
with alive_progress(len(tasks), title='Checking sites') as progress:
for f in asyncio.as_completed(tasks):
progress()
try:
loop.run_until_complete(f)
except asyncio.exceptions.TimeoutError:
+1 -1
View File
@@ -137,7 +137,7 @@ Rank data fetched from Alexa by domains.
site_file.write(f'1. {favicon} [{site}]({url_main})*: top {valid_rank}{tags}*{note}\n')
db.update_site(site)
site_file.write(f'\nThe list was updated at ({datetime.now(timezone.utc)} UTC)\n')
site_file.write(f'\nThe list was updated at ({datetime.now(timezone.utc).date()} UTC)\n')
db.save_to_file(args.base_file)
statistics_text = db.get_db_stats(is_markdown=True)