From 314eb25d1fae2a60b14a385478d4fce3420152d2 Mon Sep 17 00:00:00 2001 From: Soxoj Date: Sat, 20 Mar 2021 20:57:07 +0300 Subject: [PATCH] Created async requests executors, some sites fixes --- maigret/checking.py | 146 +++++++++++++++++++++++++++++------- maigret/maigret.py | 5 +- maigret/resources/data.json | 77 ++++++++++++------- maigret/submit.py | 2 +- tests/test_checking.py | 66 ++++++++++++++++ 5 files changed, 238 insertions(+), 58 deletions(-) create mode 100644 tests/test_checking.py diff --git a/maigret/checking.py b/maigret/checking.py index deb99e6..e606aab 100644 --- a/maigret/checking.py +++ b/maigret/checking.py @@ -3,6 +3,9 @@ import logging import re import ssl import sys +import tqdm +import time +from typing import Callable, Any, Iterable, Tuple import aiohttp import tqdm.asyncio @@ -37,6 +40,95 @@ common_errors = { unsupported_characters = '#' +QueryDraft = Tuple[Callable, Any, Any] +QueriesDraft = Iterable[QueryDraft] + +class AsyncExecutor: + def __init__(self, *args, **kwargs): + self.logger = kwargs['logger'] + + async def run(self, tasks: QueriesDraft): + start_time = time.time() + results = await self._run(tasks) + self.execution_time = time.time() - start_time + self.logger.debug(f'Spent time: {self.execution_time}') + return results + + async def _run(self, tasks: QueriesDraft): + await asyncio.sleep(0) + + +class AsyncioSimpleExecutor(AsyncExecutor): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + async def _run(self, tasks: QueriesDraft): + futures = [f(*args, **kwargs) for f, args, kwargs in tasks] + return await asyncio.gather(*futures) + + +class AsyncioProgressbarExecutor(AsyncExecutor): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + async def _run(self, tasks: QueriesDraft): + futures = [f(*args, **kwargs) for f, args, kwargs in tasks] + results = [] + for f in tqdm.asyncio.tqdm.as_completed(futures): + results.append(await f) + return results + + +class AsyncioProgressbarSemaphoreExecutor(AsyncExecutor): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.semaphore = asyncio.Semaphore(kwargs.get('in_parallel', 1)) + + async def _run(self, tasks: QueriesDraft): + async def _wrap_query(q: QueryDraft): + async with self.semaphore: + f, args, kwargs = q + return await f(*args, **kwargs) + + async def semaphore_gather(tasks: QueriesDraft): + coros = [_wrap_query(q) for q in tasks] + results = [] + for f in tqdm.asyncio.tqdm.as_completed(coros): + results.append(await f) + return results + + return await semaphore_gather(tasks) + + +class AsyncioProgressbarQueueExecutor(AsyncExecutor): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.workers_count = kwargs.get('in_parallel', 10) + self.progress_func = kwargs.get('progress_func', tqdm.tqdm) + self.queue = asyncio.Queue(self.workers_count) + + async def worker(self): + while True: + f, args, kwargs = await self.queue.get() + result = await f(*args, **kwargs) + self.results.append(result) + self.progress.update(1) + self.queue.task_done() + + async def _run(self, tasks: QueriesDraft): + self.results = [] + workers = [asyncio.create_task(self.worker()) + for _ in range(self.workers_count)] + task_list = list(tasks) + self.progress = self.progress_func(total=len(task_list)) + for t in task_list: + await self.queue.put(t) + await self.queue.join() + for w in workers: + w.cancel() + self.progress.close() + return self.results + async def get_response(request_future, site_name, logger): html_text = None @@ -87,19 +179,18 @@ async def get_response(request_future, site_name, logger): return html_text, status_code, error_text, expection_text -async def update_site_dict_from_response(sitename, site_dict, results_info, semaphore, logger, query_notify): - async with semaphore: - site_obj = site_dict[sitename] - future = site_obj.request_future - if not future: - # ignore: search by incompatible id type - return +async def update_site_dict_from_response(sitename, site_dict, results_info, logger, query_notify): + site_obj = site_dict[sitename] + future = site_obj.request_future + if not future: + # ignore: search by incompatible id type + return - response = await get_response(request_future=future, - site_name=sitename, - logger=logger) + response = await get_response(request_future=future, + site_name=sitename, + logger=logger) - site_dict[sitename] = process_site_result(response, query_notify, logger, results_info, site_obj) + return sitename, process_site_result(response, query_notify, logger, results_info, site_obj) # TODO: move to separate class @@ -454,32 +545,33 @@ async def maigret(username, site_dict, query_notify, logger, # Add this site's results into final dictionary with all of the other results. results_total[site_name] = results_site - # TODO: move into top-level function - - sem = asyncio.Semaphore(max_connections) - - tasks = [] + coroutines = [] for sitename, result_obj in results_total.items(): - update_site_coro = update_site_dict_from_response(sitename, site_dict, result_obj, sem, logger, query_notify) - future = asyncio.ensure_future(update_site_coro) - tasks.append(future) + coroutines.append((update_site_dict_from_response, [sitename, site_dict, result_obj, logger, query_notify], {})) if no_progressbar: - await asyncio.gather(*tasks) + executor = AsyncioSimpleExecutor(logger=logger) else: - for f in tqdm.asyncio.tqdm.as_completed(tasks, timeout=timeout): - try: - await f - except asyncio.exceptions.TimeoutError: - # TODO: write timeout to results - pass + executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=max_connections, timeout=timeout+0.5) + + results = await executor.run(coroutines) await session.close() # Notify caller that all queries are finished. query_notify.finish() - return results_total + data = {} + for result in results: + # TODO: still can be empty + if result: + try: + data[result[0]] = result[1] + except Exception as e: + logger.error(e, exc_info=True) + logger.info(result) + + return data def timeout_check(value): diff --git a/maigret/maigret.py b/maigret/maigret.py index b613d76..9f37704 100755 --- a/maigret/maigret.py +++ b/maigret/maigret.py @@ -261,7 +261,7 @@ async def main(): print('Maigret sites database self-checking...') is_need_update = await self_check(db, site_data, logger, max_connections=args.connections) if is_need_update: - if input('Do you want to save changes permanently? [yYnN]\n').lower() == 'y': + if input('Do you want to save changes permanently? [Yn]\n').lower() == 'y': db.save_to_file(args.db_file) print('Database was successfully updated.') else: @@ -337,14 +337,13 @@ async def main(): max_connections=args.connections, ) - username_result = (username, id_type, results) general_results.append((username, id_type, results)) # TODO: tests for website_name in results: dictionary = results[website_name] # TODO: fix no site data issue - if not dictionary: + if not dictionary or not recursive_search_enabled: continue new_usernames = dictionary.get('ids_usernames') diff --git a/maigret/resources/data.json b/maigret/resources/data.json index 64e1277..d78e4b0 100644 --- a/maigret/resources/data.json +++ b/maigret/resources/data.json @@ -2426,8 +2426,6 @@ }, "Ccmixter": { "tags": [ - "global", - "in", "us" ], "checkType": "message", @@ -2443,13 +2441,18 @@ }, "Cent": { "tags": [ - "in", - "mx", - "tw", - "us" + "us", + "art", + "writing" ], + "urlProbe": "https://beta.cent.co/data/user/profile?userHandles={username}", "checkType": "message", - "absenceStrs": "Cent", + "presenseStrs": [ + "display_name" + ], + "absenceStrs": [ + "\"results\":[]" + ], "alexaRank": 31175, "url": "https://beta.cent.co/@{username}", "urlMain": "https://cent.co/", @@ -11713,15 +11716,13 @@ "usernameClaimed": "vitaline", "usernameUnclaimed": "noonewouldeverusethis7" }, - "Sevenforums": { + "SevenForums": { "tags": [ "gb", "us" ], - "checkType": "message", - "absenceStrs": "Just a moment...", + "engine": "vBulletin", "alexaRank": 20828, - "url": "https://www.sevenforums.com/members/{username}.html", "urlMain": "https://www.sevenforums.com", "usernameClaimed": "adam", "usernameUnclaimed": "noonewouldeverusethis7" @@ -12349,7 +12350,7 @@ "us" ], "headers": { - "authorization": "Bearer BQA6sdhtUg3hadjln7DCoAK6sLn7KrHfsn2DObW2gr-W3HgF0h1KZGVYgwispRDR1tqRntVeTd0Duvb2q4g" + "authorization": "Bearer BQBQDhCkzUqE4QBPyqrSyRZbBRp5pdttS7rj9J8qT7OllWuJazqP6CcE-1eGcNoRkxNl9Ds9JCdgY3soi6U" }, "errors": { "Spotify is currently not available in your country.": "Access denied in your country, use proxy/vpn" @@ -12774,6 +12775,7 @@ "usernameUnclaimed": "noonewouldeverusethis7" }, "TJournal": { + "similarSearch": true, "tags": [ "ru" ], @@ -12900,18 +12902,6 @@ "usernameClaimed": "taplink.ru", "usernameUnclaimed": "noonewouldeverusethis77777" }, - "Taringa": { - "tags": [ - "ar" - ], - "checkType": "message", - "absenceStrs": "Moved Permanently", - "alexaRank": 4125, - "url": "https://www.taringa.net/{username}", - "urlMain": "https://taringa.net/", - "usernameClaimed": "blue", - "usernameUnclaimed": "noonewouldeverusethis7" - }, "TechPowerUp": { "tags": [ "us" @@ -13690,7 +13680,7 @@ "sec-ch-ua": "Google Chrome\";v=\"87\", \" Not;A Brand\";v=\"99\", \"Chromium\";v=\"87\"", "authorization": "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA", "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36", - "x-guest-token": "1372637128920825857" + "x-guest-token": "1373308975769391104" }, "errors": { "Bad guest token": "x-guest-token update required" @@ -13865,6 +13855,7 @@ "usernameUnclaimed": "noonewouldeverusethis7" }, "VC.ru": { + "similarSearch": true, "tags": [ "ru" ], @@ -14062,7 +14053,7 @@ "video" ], "headers": { - "Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTYxMDcyNjAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbH0.kzWxBf1qCJwjpZYUP6w-Pf4VptBMKpKUaMw8VnYwtPU" + "Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTYyNjMwODAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbH0.YKtLE0-AGmaXJNF99dVKjPW8z5_-wDs6tnnjVOybDaQ" }, "activation": { "url": "https://vimeo.com/_rv/viewer", @@ -14091,7 +14082,7 @@ "usernameClaimed": "blue", "usernameUnclaimed": "noonewouldeverusethis7" }, - "Virtualireland": { + "VirtualIreland": { "tags": [ "ie", "ru" @@ -23657,6 +23648,21 @@ "urlMain": "https://linuxpip.org", "usernameClaimed": "diehard", "usernameUnclaimed": "noonewouldeverusethis7" + }, + "Taringa": { + "checkType": "message", + "presenseStrs": [ + "User", + " user-username", + " UserFeed" + ], + "absenceStrs": [ + "problema" + ], + "url": "https://www.taringa.net/{username}", + "urlMain": "https://www.taringa.net", + "usernameClaimed": "UniversoGIA", + "usernameUnclaimed": "noonewouldeverusethis7" } }, "engines": { @@ -23727,6 +23733,7 @@ ], "checkType": "message", "errors": { + "\u041f\u0440\u043e\u0441\u0442\u0438\u0442\u0435, \u043d\u043e \u0432\u0430\u0448 IP \u0432 \u0441\u043f\u0438\u0441\u043a\u0435 \u0437\u0430\u043f\u0440\u0435\u0449\u0435\u043d\u043d\u044b\u0445 \u0430\u0434\u043c\u0438\u043d\u0438\u0441\u0442\u0440\u0430\u0446\u0438\u0435\u0439 \u0444\u043e\u0440\u0443\u043c\u0430": "IP ban", "You have been banned": "IP ban", "The administrator has banned your IP address": "IP ban", "\u0418\u0437\u0432\u0438\u043d\u0438\u0442\u0435, \u0441\u0435\u0440\u0432\u0435\u0440 \u043f\u0435\u0440\u0435\u0433\u0440\u0443\u0436\u0435\u043d. \u041f\u043e\u0436\u0430\u043b\u0443\u0439\u0441\u0442\u0430, \u043f\u043e\u043f\u0440\u043e\u0431\u0443\u0439\u0442\u0435 \u0437\u0430\u0439\u0442\u0438 \u043f\u043e\u0437\u0436\u0435.": "Server is overloaded" @@ -23762,6 +23769,7 @@ "error404" ], "checkType": "message", + "requestHeadOnly": false, "url": "{urlMain}/author/{username}/" }, "presenseStrs": [ @@ -23769,6 +23777,21 @@ "/wp-includes/wlwmanifest.xml" ] }, + "Flarum": { + "name": "Flarum", + "site": { + "presenseStrs": [ + "\"attributes\":{\"username\"" + ], + "absenceStrs": [ + "NotFound" + ], + "checkType": "message" + }, + "presenseStrs": [ + "flarum-loading-error" + ] + }, "engine404": { "name": "engine404", "site": { diff --git a/maigret/submit.py b/maigret/submit.py index 9f88b03..d31744b 100644 --- a/maigret/submit.py +++ b/maigret/submit.py @@ -170,7 +170,7 @@ async def submit_dialog(db, url_exists, cookie_file): print(f'Sorry, we couldn\'t find params to detect account presence/absence in {site.name}.') print('Try to run this mode again and increase features count or choose others.') else: - if input(f'Site {site.name} successfully checked. Do you want to save it in the Maigret DB? [yY] ') in 'yY': + if input(f'Site {site.name} successfully checked. Do you want to save it in the Maigret DB? [Yn] ').lower() in 'y': db.update_site(site) return True diff --git a/tests/test_checking.py b/tests/test_checking.py new file mode 100644 index 0000000..a0bddf5 --- /dev/null +++ b/tests/test_checking.py @@ -0,0 +1,66 @@ +"""Maigret checking logic test functions""" +import pytest +import asyncio +import logging +from maigret.checking import AsyncioSimpleExecutor, AsyncioProgressbarExecutor, AsyncioProgressbarSemaphoreExecutor, AsyncioProgressbarQueueExecutor + +logger = logging.getLogger(__name__) + +async def func(n): + await asyncio.sleep(0.1 * (n % 3)) + return n + + +@pytest.mark.asyncio +async def test_simple_asyncio_executor(): + tasks = [(func, [n], {}) for n in range(10)] + executor = AsyncioSimpleExecutor(logger=logger) + assert await executor.run(tasks) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + assert executor.execution_time > 0.2 + assert executor.execution_time < 0.3 + +@pytest.mark.asyncio +async def test_asyncio_progressbar_executor(): + tasks = [(func, [n], {}) for n in range(10)] + + executor = AsyncioProgressbarExecutor(logger=logger) + # no guarantees for the results order + assert sorted(await executor.run(tasks)) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + assert executor.execution_time > 0.2 + assert executor.execution_time < 0.3 + + +@pytest.mark.asyncio +async def test_asyncio_progressbar_semaphore_executor(): + tasks = [(func, [n], {}) for n in range(10)] + + executor = AsyncioProgressbarSemaphoreExecutor(logger=logger, in_parallel=5) + # no guarantees for the results order + assert sorted(await executor.run(tasks)) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + assert executor.execution_time > 0.2 + assert executor.execution_time < 0.4 + + +@pytest.mark.asyncio +async def test_asyncio_progressbar_queue_executor(): + tasks = [(func, [n], {}) for n in range(10)] + + executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=2) + assert await executor.run(tasks) == [0, 1, 3, 2, 4, 6, 7, 5, 9, 8] + assert executor.execution_time > 0.5 + assert executor.execution_time < 0.6 + + executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=3) + assert await executor.run(tasks) == [0, 3, 1, 4, 6, 2, 7, 9, 5, 8] + assert executor.execution_time > 0.4 + assert executor.execution_time < 0.5 + + executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=5) + assert await executor.run(tasks) == [0, 3, 6, 1, 4, 7, 9, 2, 5, 8] + assert executor.execution_time > 0.3 + assert executor.execution_time < 0.4 + + executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=10) + assert await executor.run(tasks) == [0, 3, 6, 9, 1, 4, 7, 2, 5, 8] + assert executor.execution_time > 0.2 + assert executor.execution_time < 0.3 \ No newline at end of file