Created async request executors, fixed some sites

This commit is contained in:
Soxoj
2021-03-20 20:57:07 +03:00
parent faa03b62e5
commit 314eb25d1f
5 changed files with 238 additions and 58 deletions
+119 -27
View File
@@ -3,6 +3,9 @@ import logging
import re
import ssl
import sys
import tqdm
import time
from typing import Callable, Any, Iterable, Tuple
import aiohttp
import tqdm.asyncio
@@ -37,6 +40,95 @@ common_errors = {
unsupported_characters = '#'

# A query draft is a (coroutine function, positional args, keyword args) triple.
QueryDraft = Tuple[Callable, Any, Any]
QueriesDraft = Iterable[QueryDraft]


class AsyncExecutor:
    """Base class for asynchronous batch executors.

    Subclasses implement :meth:`_run` with a concrete scheduling strategy
    (plain gather, semaphore-limited, worker queue, ...). :meth:`run`
    wraps the strategy with wall-clock timing.
    """

    def __init__(self, *args, **kwargs):
        # A logger is mandatory; fail fast with KeyError if it is missing.
        self.logger = kwargs['logger']
        # Initialize so the attribute exists even before run() is called.
        self.execution_time: float = 0.0

    async def run(self, tasks: QueriesDraft):
        """Execute *tasks* via the subclass strategy and record elapsed time."""
        start_time = time.time()
        results = await self._run(tasks)
        self.execution_time = time.time() - start_time
        self.logger.debug(f'Spent time: {self.execution_time}')
        return results

    async def _run(self, tasks: QueriesDraft):
        # Placeholder strategy: yields control once and returns None.
        # Subclasses must override this method.
        await asyncio.sleep(0)
class AsyncioSimpleExecutor(AsyncExecutor):
    """Run every query draft at once through a single asyncio.gather call.

    Results come back in submission order; there is no progress reporting.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    async def _run(self, tasks: QueriesDraft):
        coros = [fn(*fn_args, **fn_kwargs) for fn, fn_args, fn_kwargs in tasks]
        return await asyncio.gather(*coros)
class AsyncioProgressbarExecutor(AsyncExecutor):
    """Run all drafts concurrently with a tqdm progress bar.

    Results are collected in completion order, not submission order.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    async def _run(self, tasks: QueriesDraft):
        pending = [fn(*fn_args, **fn_kwargs) for fn, fn_args, fn_kwargs in tasks]
        collected = []
        # as_completed yields futures as they finish, advancing the bar.
        for finished in tqdm.asyncio.tqdm.as_completed(pending):
            collected.append(await finished)
        return collected
class AsyncioProgressbarSemaphoreExecutor(AsyncExecutor):
    """Limit concurrency with a semaphore while showing a tqdm progress bar."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # At most `in_parallel` drafts may hold the semaphore at once.
        self.semaphore = asyncio.Semaphore(kwargs.get('in_parallel', 1))

    async def _run(self, tasks: QueriesDraft):
        async def _guarded(draft: QueryDraft):
            # Acquire a concurrency slot before awaiting the underlying call.
            async with self.semaphore:
                fn, fn_args, fn_kwargs = draft
                return await fn(*fn_args, **fn_kwargs)

        guarded = [_guarded(draft) for draft in tasks]
        collected = []
        # Results arrive in completion order, driving the progress bar.
        for finished in tqdm.asyncio.tqdm.as_completed(guarded):
            collected.append(await finished)
        return collected
class AsyncioProgressbarQueueExecutor(AsyncExecutor):
    """Executor with a fixed pool of worker tasks fed from an asyncio.Queue.

    Results are appended in completion order, so the output order depends
    on per-task duration rather than on submission order.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.workers_count = kwargs.get('in_parallel', 10)
        self.progress_func = kwargs.get('progress_func', tqdm.tqdm)
        # Bounded queue applies backpressure to the producer loop in _run().
        self.queue = asyncio.Queue(self.workers_count)

    async def worker(self):
        """Consume query drafts from the queue until cancelled."""
        while True:
            f, args, kwargs = await self.queue.get()
            try:
                result = await f(*args, **kwargs)
                self.results.append(result)
            except Exception as e:
                # A failing query must not kill the worker; without the
                # finally-guaranteed task_done() below, queue.join() in
                # _run() would deadlock on the first exception.
                self.logger.error(e, exc_info=True)
            finally:
                self.progress.update(1)
                self.queue.task_done()

    async def _run(self, tasks: QueriesDraft):
        self.results = []
        workers = [asyncio.create_task(self.worker())
                   for _ in range(self.workers_count)]
        task_list = list(tasks)
        self.progress = self.progress_func(total=len(task_list))
        for t in task_list:
            await self.queue.put(t)
        # Block until every enqueued draft has been marked done.
        await self.queue.join()
        for w in workers:
            w.cancel()
        self.progress.close()
        return self.results
async def get_response(request_future, site_name, logger):
html_text = None
@@ -87,19 +179,18 @@ async def get_response(request_future, site_name, logger):
return html_text, status_code, error_text, expection_text
async def update_site_dict_from_response(sitename, site_dict, results_info, semaphore, logger, query_notify):
async with semaphore:
site_obj = site_dict[sitename]
future = site_obj.request_future
if not future:
# ignore: search by incompatible id type
return
async def update_site_dict_from_response(sitename, site_dict, results_info, logger, query_notify):
site_obj = site_dict[sitename]
future = site_obj.request_future
if not future:
# ignore: search by incompatible id type
return
response = await get_response(request_future=future,
site_name=sitename,
logger=logger)
response = await get_response(request_future=future,
site_name=sitename,
logger=logger)
site_dict[sitename] = process_site_result(response, query_notify, logger, results_info, site_obj)
return sitename, process_site_result(response, query_notify, logger, results_info, site_obj)
# TODO: move to separate class
@@ -454,32 +545,33 @@ async def maigret(username, site_dict, query_notify, logger,
# Add this site's results into final dictionary with all of the other results.
results_total[site_name] = results_site
# TODO: move into top-level function
sem = asyncio.Semaphore(max_connections)
tasks = []
coroutines = []
for sitename, result_obj in results_total.items():
update_site_coro = update_site_dict_from_response(sitename, site_dict, result_obj, sem, logger, query_notify)
future = asyncio.ensure_future(update_site_coro)
tasks.append(future)
coroutines.append((update_site_dict_from_response, [sitename, site_dict, result_obj, logger, query_notify], {}))
if no_progressbar:
await asyncio.gather(*tasks)
executor = AsyncioSimpleExecutor(logger=logger)
else:
for f in tqdm.asyncio.tqdm.as_completed(tasks, timeout=timeout):
try:
await f
except asyncio.exceptions.TimeoutError:
# TODO: write timeout to results
pass
executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=max_connections, timeout=timeout+0.5)
results = await executor.run(coroutines)
await session.close()
# Notify caller that all queries are finished.
query_notify.finish()
return results_total
data = {}
for result in results:
# TODO: still can be empty
if result:
try:
data[result[0]] = result[1]
except Exception as e:
logger.error(e, exc_info=True)
logger.info(result)
return data
def timeout_check(value):
+2 -3
View File
@@ -261,7 +261,7 @@ async def main():
print('Maigret sites database self-checking...')
is_need_update = await self_check(db, site_data, logger, max_connections=args.connections)
if is_need_update:
if input('Do you want to save changes permanently? [yYnN]\n').lower() == 'y':
if input('Do you want to save changes permanently? [Yn]\n').lower() == 'y':
db.save_to_file(args.db_file)
print('Database was successfully updated.')
else:
@@ -337,14 +337,13 @@ async def main():
max_connections=args.connections,
)
username_result = (username, id_type, results)
general_results.append((username, id_type, results))
# TODO: tests
for website_name in results:
dictionary = results[website_name]
# TODO: fix no site data issue
if not dictionary:
if not dictionary or not recursive_search_enabled:
continue
new_usernames = dictionary.get('ids_usernames')
+50 -27
View File
@@ -2426,8 +2426,6 @@
},
"Ccmixter": {
"tags": [
"global",
"in",
"us"
],
"checkType": "message",
@@ -2443,13 +2441,18 @@
},
"Cent": {
"tags": [
"in",
"mx",
"tw",
"us"
"us",
"art",
"writing"
],
"urlProbe": "https://beta.cent.co/data/user/profile?userHandles={username}",
"checkType": "message",
"absenceStrs": "<title>Cent</title>",
"presenseStrs": [
"display_name"
],
"absenceStrs": [
"\"results\":[]"
],
"alexaRank": 31175,
"url": "https://beta.cent.co/@{username}",
"urlMain": "https://cent.co/",
@@ -11713,15 +11716,13 @@
"usernameClaimed": "vitaline",
"usernameUnclaimed": "noonewouldeverusethis7"
},
"Sevenforums": {
"SevenForums": {
"tags": [
"gb",
"us"
],
"checkType": "message",
"absenceStrs": "<title>Just a moment...</title>",
"engine": "vBulletin",
"alexaRank": 20828,
"url": "https://www.sevenforums.com/members/{username}.html",
"urlMain": "https://www.sevenforums.com",
"usernameClaimed": "adam",
"usernameUnclaimed": "noonewouldeverusethis7"
@@ -12349,7 +12350,7 @@
"us"
],
"headers": {
"authorization": "Bearer BQA6sdhtUg3hadjln7DCoAK6sLn7KrHfsn2DObW2gr-W3HgF0h1KZGVYgwispRDR1tqRntVeTd0Duvb2q4g"
"authorization": "Bearer BQBQDhCkzUqE4QBPyqrSyRZbBRp5pdttS7rj9J8qT7OllWuJazqP6CcE-1eGcNoRkxNl9Ds9JCdgY3soi6U"
},
"errors": {
"Spotify is currently not available in your country.": "Access denied in your country, use proxy/vpn"
@@ -12774,6 +12775,7 @@
"usernameUnclaimed": "noonewouldeverusethis7"
},
"TJournal": {
"similarSearch": true,
"tags": [
"ru"
],
@@ -12900,18 +12902,6 @@
"usernameClaimed": "taplink.ru",
"usernameUnclaimed": "noonewouldeverusethis77777"
},
"Taringa": {
"tags": [
"ar"
],
"checkType": "message",
"absenceStrs": "Moved Permanently",
"alexaRank": 4125,
"url": "https://www.taringa.net/{username}",
"urlMain": "https://taringa.net/",
"usernameClaimed": "blue",
"usernameUnclaimed": "noonewouldeverusethis7"
},
"TechPowerUp": {
"tags": [
"us"
@@ -13690,7 +13680,7 @@
"sec-ch-ua": "Google Chrome\";v=\"87\", \" Not;A Brand\";v=\"99\", \"Chromium\";v=\"87\"",
"authorization": "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36",
"x-guest-token": "1372637128920825857"
"x-guest-token": "1373308975769391104"
},
"errors": {
"Bad guest token": "x-guest-token update required"
@@ -13865,6 +13855,7 @@
"usernameUnclaimed": "noonewouldeverusethis7"
},
"VC.ru": {
"similarSearch": true,
"tags": [
"ru"
],
@@ -14062,7 +14053,7 @@
"video"
],
"headers": {
"Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTYxMDcyNjAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbH0.kzWxBf1qCJwjpZYUP6w-Pf4VptBMKpKUaMw8VnYwtPU"
"Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTYyNjMwODAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbH0.YKtLE0-AGmaXJNF99dVKjPW8z5_-wDs6tnnjVOybDaQ"
},
"activation": {
"url": "https://vimeo.com/_rv/viewer",
@@ -14091,7 +14082,7 @@
"usernameClaimed": "blue",
"usernameUnclaimed": "noonewouldeverusethis7"
},
"Virtualireland": {
"VirtualIreland": {
"tags": [
"ie",
"ru"
@@ -23657,6 +23648,21 @@
"urlMain": "https://linuxpip.org",
"usernameClaimed": "diehard",
"usernameUnclaimed": "noonewouldeverusethis7"
},
"Taringa": {
"checkType": "message",
"presenseStrs": [
"User",
" user-username",
" UserFeed"
],
"absenceStrs": [
"problema"
],
"url": "https://www.taringa.net/{username}",
"urlMain": "https://www.taringa.net",
"usernameClaimed": "UniversoGIA",
"usernameUnclaimed": "noonewouldeverusethis7"
}
},
"engines": {
@@ -23727,6 +23733,7 @@
],
"checkType": "message",
"errors": {
"\u041f\u0440\u043e\u0441\u0442\u0438\u0442\u0435, \u043d\u043e \u0432\u0430\u0448 IP \u0432 \u0441\u043f\u0438\u0441\u043a\u0435 \u0437\u0430\u043f\u0440\u0435\u0449\u0435\u043d\u043d\u044b\u0445 \u0430\u0434\u043c\u0438\u043d\u0438\u0441\u0442\u0440\u0430\u0446\u0438\u0435\u0439 \u0444\u043e\u0440\u0443\u043c\u0430": "IP ban",
"You have been banned": "IP ban",
"The administrator has banned your IP address": "IP ban",
"\u0418\u0437\u0432\u0438\u043d\u0438\u0442\u0435, \u0441\u0435\u0440\u0432\u0435\u0440 \u043f\u0435\u0440\u0435\u0433\u0440\u0443\u0436\u0435\u043d. \u041f\u043e\u0436\u0430\u043b\u0443\u0439\u0441\u0442\u0430, \u043f\u043e\u043f\u0440\u043e\u0431\u0443\u0439\u0442\u0435 \u0437\u0430\u0439\u0442\u0438 \u043f\u043e\u0437\u0436\u0435.": "Server is overloaded"
@@ -23762,6 +23769,7 @@
"error404"
],
"checkType": "message",
"requestHeadOnly": false,
"url": "{urlMain}/author/{username}/"
},
"presenseStrs": [
@@ -23769,6 +23777,21 @@
"/wp-includes/wlwmanifest.xml"
]
},
"Flarum": {
"name": "Flarum",
"site": {
"presenseStrs": [
"\"attributes\":{\"username\""
],
"absenceStrs": [
"NotFound"
],
"checkType": "message"
},
"presenseStrs": [
"flarum-loading-error"
]
},
"engine404": {
"name": "engine404",
"site": {
+1 -1
View File
@@ -170,7 +170,7 @@ async def submit_dialog(db, url_exists, cookie_file):
print(f'Sorry, we couldn\'t find params to detect account presence/absence in {site.name}.')
print('Try to run this mode again and increase features count or choose others.')
else:
if input(f'Site {site.name} successfully checked. Do you want to save it in the Maigret DB? [yY] ') in 'yY':
if input(f'Site {site.name} successfully checked. Do you want to save it in the Maigret DB? [Yn] ').lower() in 'y':
db.update_site(site)
return True
+66
View File
@@ -0,0 +1,66 @@
"""Maigret checking logic test functions"""
import pytest
import asyncio
import logging
from maigret.checking import AsyncioSimpleExecutor, AsyncioProgressbarExecutor, AsyncioProgressbarSemaphoreExecutor, AsyncioProgressbarQueueExecutor
logger = logging.getLogger(__name__)
async def func(n):
    """Toy coroutine: sleeps 0.0, 0.1 or 0.2 s depending on n mod 3, returns n."""
    delay = 0.1 * (n % 3)
    await asyncio.sleep(delay)
    return n
@pytest.mark.asyncio
async def test_simple_asyncio_executor():
    """AsyncioSimpleExecutor keeps submission order via asyncio.gather."""
    drafts = [(func, [i], {}) for i in range(10)]
    runner = AsyncioSimpleExecutor(logger=logger)
    assert await runner.run(drafts) == list(range(10))
    # Slowest single task sleeps 0.2 s and everything runs concurrently.
    assert 0.2 < runner.execution_time < 0.3
@pytest.mark.asyncio
async def test_asyncio_progressbar_executor():
    """AsyncioProgressbarExecutor returns results in completion order."""
    drafts = [(func, [i], {}) for i in range(10)]
    runner = AsyncioProgressbarExecutor(logger=logger)
    # Completion order is not deterministic, so compare sorted results.
    assert sorted(await runner.run(drafts)) == list(range(10))
    assert 0.2 < runner.execution_time < 0.3
@pytest.mark.asyncio
async def test_asyncio_progressbar_semaphore_executor():
    """Semaphore executor bounds concurrency to `in_parallel` tasks."""
    drafts = [(func, [i], {}) for i in range(10)]
    runner = AsyncioProgressbarSemaphoreExecutor(logger=logger, in_parallel=5)
    # Completion order is not deterministic, so compare sorted results.
    assert sorted(await runner.run(drafts)) == list(range(10))
    assert 0.2 < runner.execution_time < 0.4
@pytest.mark.asyncio
async def test_asyncio_progressbar_queue_executor():
    """Worker count determines both completion order and total runtime."""
    drafts = [(func, [i], {}) for i in range(10)]
    # (workers, expected completion order, lower bound, upper bound)
    cases = [
        (2, [0, 1, 3, 2, 4, 6, 7, 5, 9, 8], 0.5, 0.6),
        (3, [0, 3, 1, 4, 6, 2, 7, 9, 5, 8], 0.4, 0.5),
        (5, [0, 3, 6, 1, 4, 7, 9, 2, 5, 8], 0.3, 0.4),
        (10, [0, 3, 6, 9, 1, 4, 7, 2, 5, 8], 0.2, 0.3),
    ]
    for workers, expected_order, lower, upper in cases:
        runner = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=workers)
        assert await runner.run(drafts) == expected_order
        assert lower < runner.execution_time < upper