Merge pull request #83 from soxoj/executors-update

Created async request executors and fixed several sites
This commit is contained in:
soxoj
2021-03-20 20:59:22 +03:00
committed by GitHub
5 changed files with 238 additions and 58 deletions
+119 -27
View File
@@ -3,6 +3,9 @@ import logging
import re import re
import ssl import ssl
import sys import sys
import tqdm
import time
from typing import Callable, Any, Iterable, Tuple
import aiohttp import aiohttp
import tqdm.asyncio import tqdm.asyncio
@@ -37,6 +40,95 @@ common_errors = {
unsupported_characters = '#' unsupported_characters = '#'
# A single planned request: (coroutine function, positional args, keyword args).
QueryDraft = Tuple[Callable, Any, Any]
# Any iterable of planned requests consumed by an executor.
QueriesDraft = Iterable[QueryDraft]


class AsyncExecutor:
    """Base class for async requests executors.

    Subclasses implement ``_run``; ``run`` wraps it with wall-clock timing
    exposed through the ``execution_time`` attribute.
    """

    def __init__(self, *args, **kwargs):
        # A logger is mandatory: executors report timing through it.
        self.logger = kwargs['logger']
        # Defined up front so it can be read even before the first run()
        # (previously it only existed after run() completed).
        self.execution_time: float = 0.0

    async def run(self, tasks: QueriesDraft):
        """Execute all tasks via the subclass ``_run`` and record spent time."""
        start_time = time.time()
        results = await self._run(tasks)
        self.execution_time = time.time() - start_time
        self.logger.debug(f'Spent time: {self.execution_time}')
        return results

    async def _run(self, tasks: QueriesDraft):
        # Base implementation does nothing; subclasses override it.
        await asyncio.sleep(0)
class AsyncioSimpleExecutor(AsyncExecutor):
    """Runs every task at once via asyncio.gather; results keep input order."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    async def _run(self, tasks: QueriesDraft):
        coroutines = [func(*f_args, **f_kwargs)
                      for func, f_args, f_kwargs in tasks]
        return await asyncio.gather(*coroutines)
class AsyncioProgressbarExecutor(AsyncExecutor):
    """Runs every task at once with a tqdm progressbar.

    Results are collected as tasks complete, so output order is not
    guaranteed to match input order.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    async def _run(self, tasks: QueriesDraft):
        coroutines = [func(*f_args, **f_kwargs)
                      for func, f_args, f_kwargs in tasks]
        collected = []
        for completed in tqdm.asyncio.tqdm.as_completed(coroutines):
            collected.append(await completed)
        return collected
class AsyncioProgressbarSemaphoreExecutor(AsyncExecutor):
    """Runs tasks concurrently, bounded by a semaphore, with a progressbar.

    ``in_parallel`` caps the number of simultaneously running tasks
    (default 1, i.e. effectively sequential). Results are collected in
    completion order.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Only remember the limit here: the Semaphore itself must be created
        # inside the running event loop (see _run) — constructing it in
        # __init__ binds it to the wrong/no loop on Python <= 3.9.
        self.in_parallel = kwargs.get('in_parallel', 1)

    async def _run(self, tasks: QueriesDraft):
        # Created lazily so it belongs to the loop actually running the tasks.
        self.semaphore = asyncio.Semaphore(self.in_parallel)

        async def _wrap_query(q: QueryDraft):
            async with self.semaphore:
                f, args, kwargs = q
                return await f(*args, **kwargs)

        async def semaphore_gather(tasks: QueriesDraft):
            coros = [_wrap_query(q) for q in tasks]
            results = []
            for f in tqdm.asyncio.tqdm.as_completed(coros):
                results.append(await f)
            return results

        return await semaphore_gather(tasks)
class AsyncioProgressbarQueueExecutor(AsyncExecutor):
    """Runs tasks through a bounded queue consumed by N worker coroutines.

    Keyword options:
        in_parallel: workers count (default 10).
        progress_func: progressbar factory (default ``tqdm.tqdm``).
        timeout: optional overall limit in seconds; on expiry the partial
            results collected so far are returned (previously this kwarg
            was passed by the caller but silently ignored).

    Results are appended in completion order.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.workers_count = kwargs.get('in_parallel', 10)
        # `or` keeps tqdm.tqdm evaluated lazily, only when no custom
        # progressbar factory is supplied.
        self.progress_func = kwargs.get('progress_func') or tqdm.tqdm
        self.timeout = kwargs.get('timeout')

    async def worker(self):
        while True:
            f, args, kwargs = await self.queue.get()
            try:
                result = await f(*args, **kwargs)
                self.results.append(result)
            except Exception as e:
                # A failing task must neither kill the worker nor leave
                # task_done() uncalled (that would deadlock queue.join()).
                self.logger.error(e, exc_info=True)
                self.results.append(None)
            finally:
                self.progress.update(1)
                self.queue.task_done()

    async def _run(self, tasks: QueriesDraft):
        self.results = []
        # The queue must be created inside the running loop for
        # compatibility with Python <= 3.9.
        self.queue = asyncio.Queue(self.workers_count)
        workers = [asyncio.create_task(self.worker())
                   for _ in range(self.workers_count)]
        task_list = list(tasks)
        self.progress = self.progress_func(total=len(task_list))
        for t in task_list:
            await self.queue.put(t)
        try:
            if self.timeout:
                await asyncio.wait_for(self.queue.join(),
                                       timeout=self.timeout)
            else:
                await self.queue.join()
        except asyncio.TimeoutError:
            # Deadline hit: give back whatever has been collected so far.
            self.logger.warning('Timeout, returning partial results')
        finally:
            for w in workers:
                w.cancel()
            self.progress.close()
        return self.results
async def get_response(request_future, site_name, logger): async def get_response(request_future, site_name, logger):
html_text = None html_text = None
@@ -87,19 +179,18 @@ async def get_response(request_future, site_name, logger):
return html_text, status_code, error_text, expection_text return html_text, status_code, error_text, expection_text
async def update_site_dict_from_response(sitename, site_dict, results_info, semaphore, logger, query_notify): async def update_site_dict_from_response(sitename, site_dict, results_info, logger, query_notify):
async with semaphore: site_obj = site_dict[sitename]
site_obj = site_dict[sitename] future = site_obj.request_future
future = site_obj.request_future if not future:
if not future: # ignore: search by incompatible id type
# ignore: search by incompatible id type return
return
response = await get_response(request_future=future, response = await get_response(request_future=future,
site_name=sitename, site_name=sitename,
logger=logger) logger=logger)
site_dict[sitename] = process_site_result(response, query_notify, logger, results_info, site_obj) return sitename, process_site_result(response, query_notify, logger, results_info, site_obj)
# TODO: move to separate class # TODO: move to separate class
@@ -454,32 +545,33 @@ async def maigret(username, site_dict, query_notify, logger,
# Add this site's results into final dictionary with all of the other results. # Add this site's results into final dictionary with all of the other results.
results_total[site_name] = results_site results_total[site_name] = results_site
# TODO: move into top-level function coroutines = []
sem = asyncio.Semaphore(max_connections)
tasks = []
for sitename, result_obj in results_total.items(): for sitename, result_obj in results_total.items():
update_site_coro = update_site_dict_from_response(sitename, site_dict, result_obj, sem, logger, query_notify) coroutines.append((update_site_dict_from_response, [sitename, site_dict, result_obj, logger, query_notify], {}))
future = asyncio.ensure_future(update_site_coro)
tasks.append(future)
if no_progressbar: if no_progressbar:
await asyncio.gather(*tasks) executor = AsyncioSimpleExecutor(logger=logger)
else: else:
for f in tqdm.asyncio.tqdm.as_completed(tasks, timeout=timeout): executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=max_connections, timeout=timeout+0.5)
try:
await f results = await executor.run(coroutines)
except asyncio.exceptions.TimeoutError:
# TODO: write timeout to results
pass
await session.close() await session.close()
# Notify caller that all queries are finished. # Notify caller that all queries are finished.
query_notify.finish() query_notify.finish()
return results_total data = {}
for result in results:
# TODO: still can be empty
if result:
try:
data[result[0]] = result[1]
except Exception as e:
logger.error(e, exc_info=True)
logger.info(result)
return data
def timeout_check(value): def timeout_check(value):
+2 -3
View File
@@ -261,7 +261,7 @@ async def main():
print('Maigret sites database self-checking...') print('Maigret sites database self-checking...')
is_need_update = await self_check(db, site_data, logger, max_connections=args.connections) is_need_update = await self_check(db, site_data, logger, max_connections=args.connections)
if is_need_update: if is_need_update:
if input('Do you want to save changes permanently? [yYnN]\n').lower() == 'y': if input('Do you want to save changes permanently? [Yn]\n').lower() == 'y':
db.save_to_file(args.db_file) db.save_to_file(args.db_file)
print('Database was successfully updated.') print('Database was successfully updated.')
else: else:
@@ -337,14 +337,13 @@ async def main():
max_connections=args.connections, max_connections=args.connections,
) )
username_result = (username, id_type, results)
general_results.append((username, id_type, results)) general_results.append((username, id_type, results))
# TODO: tests # TODO: tests
for website_name in results: for website_name in results:
dictionary = results[website_name] dictionary = results[website_name]
# TODO: fix no site data issue # TODO: fix no site data issue
if not dictionary: if not dictionary or not recursive_search_enabled:
continue continue
new_usernames = dictionary.get('ids_usernames') new_usernames = dictionary.get('ids_usernames')
+50 -27
View File
@@ -2426,8 +2426,6 @@
}, },
"Ccmixter": { "Ccmixter": {
"tags": [ "tags": [
"global",
"in",
"us" "us"
], ],
"checkType": "message", "checkType": "message",
@@ -2443,13 +2441,18 @@
}, },
"Cent": { "Cent": {
"tags": [ "tags": [
"in", "us",
"mx", "art",
"tw", "writing"
"us"
], ],
"urlProbe": "https://beta.cent.co/data/user/profile?userHandles={username}",
"checkType": "message", "checkType": "message",
"absenceStrs": "<title>Cent</title>", "presenseStrs": [
"display_name"
],
"absenceStrs": [
"\"results\":[]"
],
"alexaRank": 31175, "alexaRank": 31175,
"url": "https://beta.cent.co/@{username}", "url": "https://beta.cent.co/@{username}",
"urlMain": "https://cent.co/", "urlMain": "https://cent.co/",
@@ -11713,15 +11716,13 @@
"usernameClaimed": "vitaline", "usernameClaimed": "vitaline",
"usernameUnclaimed": "noonewouldeverusethis7" "usernameUnclaimed": "noonewouldeverusethis7"
}, },
"Sevenforums": { "SevenForums": {
"tags": [ "tags": [
"gb", "gb",
"us" "us"
], ],
"checkType": "message", "engine": "vBulletin",
"absenceStrs": "<title>Just a moment...</title>",
"alexaRank": 20828, "alexaRank": 20828,
"url": "https://www.sevenforums.com/members/{username}.html",
"urlMain": "https://www.sevenforums.com", "urlMain": "https://www.sevenforums.com",
"usernameClaimed": "adam", "usernameClaimed": "adam",
"usernameUnclaimed": "noonewouldeverusethis7" "usernameUnclaimed": "noonewouldeverusethis7"
@@ -12349,7 +12350,7 @@
"us" "us"
], ],
"headers": { "headers": {
"authorization": "Bearer BQA6sdhtUg3hadjln7DCoAK6sLn7KrHfsn2DObW2gr-W3HgF0h1KZGVYgwispRDR1tqRntVeTd0Duvb2q4g" "authorization": "Bearer BQBQDhCkzUqE4QBPyqrSyRZbBRp5pdttS7rj9J8qT7OllWuJazqP6CcE-1eGcNoRkxNl9Ds9JCdgY3soi6U"
}, },
"errors": { "errors": {
"Spotify is currently not available in your country.": "Access denied in your country, use proxy/vpn" "Spotify is currently not available in your country.": "Access denied in your country, use proxy/vpn"
@@ -12774,6 +12775,7 @@
"usernameUnclaimed": "noonewouldeverusethis7" "usernameUnclaimed": "noonewouldeverusethis7"
}, },
"TJournal": { "TJournal": {
"similarSearch": true,
"tags": [ "tags": [
"ru" "ru"
], ],
@@ -12900,18 +12902,6 @@
"usernameClaimed": "taplink.ru", "usernameClaimed": "taplink.ru",
"usernameUnclaimed": "noonewouldeverusethis77777" "usernameUnclaimed": "noonewouldeverusethis77777"
}, },
"Taringa": {
"tags": [
"ar"
],
"checkType": "message",
"absenceStrs": "Moved Permanently",
"alexaRank": 4125,
"url": "https://www.taringa.net/{username}",
"urlMain": "https://taringa.net/",
"usernameClaimed": "blue",
"usernameUnclaimed": "noonewouldeverusethis7"
},
"TechPowerUp": { "TechPowerUp": {
"tags": [ "tags": [
"us" "us"
@@ -13690,7 +13680,7 @@
"sec-ch-ua": "Google Chrome\";v=\"87\", \" Not;A Brand\";v=\"99\", \"Chromium\";v=\"87\"", "sec-ch-ua": "Google Chrome\";v=\"87\", \" Not;A Brand\";v=\"99\", \"Chromium\";v=\"87\"",
"authorization": "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA", "authorization": "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36", "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36",
"x-guest-token": "1372637128920825857" "x-guest-token": "1373308975769391104"
}, },
"errors": { "errors": {
"Bad guest token": "x-guest-token update required" "Bad guest token": "x-guest-token update required"
@@ -13865,6 +13855,7 @@
"usernameUnclaimed": "noonewouldeverusethis7" "usernameUnclaimed": "noonewouldeverusethis7"
}, },
"VC.ru": { "VC.ru": {
"similarSearch": true,
"tags": [ "tags": [
"ru" "ru"
], ],
@@ -14062,7 +14053,7 @@
"video" "video"
], ],
"headers": { "headers": {
"Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTYxMDcyNjAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbH0.kzWxBf1qCJwjpZYUP6w-Pf4VptBMKpKUaMw8VnYwtPU" "Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTYyNjMwODAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbH0.YKtLE0-AGmaXJNF99dVKjPW8z5_-wDs6tnnjVOybDaQ"
}, },
"activation": { "activation": {
"url": "https://vimeo.com/_rv/viewer", "url": "https://vimeo.com/_rv/viewer",
@@ -14091,7 +14082,7 @@
"usernameClaimed": "blue", "usernameClaimed": "blue",
"usernameUnclaimed": "noonewouldeverusethis7" "usernameUnclaimed": "noonewouldeverusethis7"
}, },
"Virtualireland": { "VirtualIreland": {
"tags": [ "tags": [
"ie", "ie",
"ru" "ru"
@@ -23657,6 +23648,21 @@
"urlMain": "https://linuxpip.org", "urlMain": "https://linuxpip.org",
"usernameClaimed": "diehard", "usernameClaimed": "diehard",
"usernameUnclaimed": "noonewouldeverusethis7" "usernameUnclaimed": "noonewouldeverusethis7"
},
"Taringa": {
"checkType": "message",
"presenseStrs": [
"User",
" user-username",
" UserFeed"
],
"absenceStrs": [
"problema"
],
"url": "https://www.taringa.net/{username}",
"urlMain": "https://www.taringa.net",
"usernameClaimed": "UniversoGIA",
"usernameUnclaimed": "noonewouldeverusethis7"
} }
}, },
"engines": { "engines": {
@@ -23727,6 +23733,7 @@
], ],
"checkType": "message", "checkType": "message",
"errors": { "errors": {
"\u041f\u0440\u043e\u0441\u0442\u0438\u0442\u0435, \u043d\u043e \u0432\u0430\u0448 IP \u0432 \u0441\u043f\u0438\u0441\u043a\u0435 \u0437\u0430\u043f\u0440\u0435\u0449\u0435\u043d\u043d\u044b\u0445 \u0430\u0434\u043c\u0438\u043d\u0438\u0441\u0442\u0440\u0430\u0446\u0438\u0435\u0439 \u0444\u043e\u0440\u0443\u043c\u0430": "IP ban",
"You have been banned": "IP ban", "You have been banned": "IP ban",
"The administrator has banned your IP address": "IP ban", "The administrator has banned your IP address": "IP ban",
"\u0418\u0437\u0432\u0438\u043d\u0438\u0442\u0435, \u0441\u0435\u0440\u0432\u0435\u0440 \u043f\u0435\u0440\u0435\u0433\u0440\u0443\u0436\u0435\u043d. \u041f\u043e\u0436\u0430\u043b\u0443\u0439\u0441\u0442\u0430, \u043f\u043e\u043f\u0440\u043e\u0431\u0443\u0439\u0442\u0435 \u0437\u0430\u0439\u0442\u0438 \u043f\u043e\u0437\u0436\u0435.": "Server is overloaded" "\u0418\u0437\u0432\u0438\u043d\u0438\u0442\u0435, \u0441\u0435\u0440\u0432\u0435\u0440 \u043f\u0435\u0440\u0435\u0433\u0440\u0443\u0436\u0435\u043d. \u041f\u043e\u0436\u0430\u043b\u0443\u0439\u0441\u0442\u0430, \u043f\u043e\u043f\u0440\u043e\u0431\u0443\u0439\u0442\u0435 \u0437\u0430\u0439\u0442\u0438 \u043f\u043e\u0437\u0436\u0435.": "Server is overloaded"
@@ -23762,6 +23769,7 @@
"error404" "error404"
], ],
"checkType": "message", "checkType": "message",
"requestHeadOnly": false,
"url": "{urlMain}/author/{username}/" "url": "{urlMain}/author/{username}/"
}, },
"presenseStrs": [ "presenseStrs": [
@@ -23769,6 +23777,21 @@
"/wp-includes/wlwmanifest.xml" "/wp-includes/wlwmanifest.xml"
] ]
}, },
"Flarum": {
"name": "Flarum",
"site": {
"presenseStrs": [
"\"attributes\":{\"username\""
],
"absenceStrs": [
"NotFound"
],
"checkType": "message"
},
"presenseStrs": [
"flarum-loading-error"
]
},
"engine404": { "engine404": {
"name": "engine404", "name": "engine404",
"site": { "site": {
+1 -1
View File
@@ -170,7 +170,7 @@ async def submit_dialog(db, url_exists, cookie_file):
print(f'Sorry, we couldn\'t find params to detect account presence/absence in {site.name}.') print(f'Sorry, we couldn\'t find params to detect account presence/absence in {site.name}.')
print('Try to run this mode again and increase features count or choose others.') print('Try to run this mode again and increase features count or choose others.')
else: else:
if input(f'Site {site.name} successfully checked. Do you want to save it in the Maigret DB? [yY] ') in 'yY': if input(f'Site {site.name} successfully checked. Do you want to save it in the Maigret DB? [Yn] ').lower() in 'y':
db.update_site(site) db.update_site(site)
return True return True
+66
View File
@@ -0,0 +1,66 @@
"""Maigret checking logic test functions"""
import pytest
import asyncio
import logging
from maigret.checking import AsyncioSimpleExecutor, AsyncioProgressbarExecutor, AsyncioProgressbarSemaphoreExecutor, AsyncioProgressbarQueueExecutor
logger = logging.getLogger(__name__)
async def func(n):
    """Return ``n`` after sleeping 0, 0.1 or 0.2 seconds (by ``n % 3``)."""
    delay = 0.1 * (n % 3)
    await asyncio.sleep(delay)
    return n
@pytest.mark.asyncio
async def test_simple_asyncio_executor():
    """Gather-based executor must preserve the input tasks order."""
    tasks = [(func, [n], {}) for n in range(10)]
    executor = AsyncioSimpleExecutor(logger=logger)
    assert await executor.run(tasks) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    # Lower bound comes from the longest sleep (0.2 s); the upper bound is
    # deliberately loose — a tight one (0.3 s) is flaky on slow CI hosts.
    assert executor.execution_time > 0.2
    assert executor.execution_time < 1
@pytest.mark.asyncio
async def test_asyncio_progressbar_executor():
    """Progressbar executor returns all results (completion order)."""
    tasks = [(func, [n], {}) for n in range(10)]
    executor = AsyncioProgressbarExecutor(logger=logger)
    # no guarantees for the results order
    assert sorted(await executor.run(tasks)) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    # Lower bound comes from the longest sleep (0.2 s); the upper bound is
    # deliberately loose — a tight one (0.3 s) is flaky on slow CI hosts.
    assert executor.execution_time > 0.2
    assert executor.execution_time < 1
@pytest.mark.asyncio
async def test_asyncio_progressbar_semaphore_executor():
    """Semaphore executor honors in_parallel and returns all results."""
    tasks = [(func, [n], {}) for n in range(10)]
    executor = AsyncioProgressbarSemaphoreExecutor(logger=logger, in_parallel=5)
    # no guarantees for the results order
    assert sorted(await executor.run(tasks)) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    # The upper bound is deliberately loose — a tight one (0.4 s) is flaky
    # on slow CI hosts.
    assert executor.execution_time > 0.2
    assert executor.execution_time < 1
@pytest.mark.asyncio
async def test_asyncio_progressbar_queue_executor():
    """Queue executor completion order is fixed by sleep lengths and the
    workers count; the order assertions pin that scheduling behavior.

    NOTE(review): exact-order assertions depend on 0.1 s sleep granularity
    and may need relaxing if they prove flaky in practice.
    """
    tasks = [(func, [n], {}) for n in range(10)]

    executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=2)
    assert await executor.run(tasks) == [0, 1, 3, 2, 4, 6, 7, 5, 9, 8]
    # Upper bounds below are deliberately loose — tight ones (lower + 0.1 s)
    # are flaky on slow CI hosts.
    assert executor.execution_time > 0.5
    assert executor.execution_time < 1

    executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=3)
    assert await executor.run(tasks) == [0, 3, 1, 4, 6, 2, 7, 9, 5, 8]
    assert executor.execution_time > 0.4
    assert executor.execution_time < 1

    executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=5)
    assert await executor.run(tasks) == [0, 3, 6, 1, 4, 7, 9, 2, 5, 8]
    assert executor.execution_time > 0.3
    assert executor.execution_time < 1

    executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=10)
    assert await executor.run(tasks) == [0, 3, 6, 9, 1, 4, 7, 2, 5, 8]
    assert executor.execution_time > 0.2
    assert executor.execution_time < 1