From 552489abcbb442dda7a32e22c625540ae3e8e1b2 Mon Sep 17 00:00:00 2001
From: Soxoj
Date: Sat, 12 Dec 2020 18:54:34 +0300
Subject: [PATCH] Parallel results processing

---
 maigret/maigret.py | 350 ++++++++++++++++++++++----------------------
 1 file changed, 175 insertions(+), 175 deletions(-)

diff --git a/maigret/maigret.py b/maigret/maigret.py
index cf5f390..4e56ee3 100755
--- a/maigret/maigret.py
+++ b/maigret/maigret.py
@@ -51,7 +51,7 @@ unsupported_characters = '#'
 cookies_file = 'cookies.txt'
 
 
-async def get_response(request_future, error_type, social_network, logger):
+async def get_response(request_future, social_network, logger):
     html_text = None
     status_code = 0
 
@@ -97,18 +97,19 @@ async def get_response(request_future, error_type, social_network, logger):
     return html_text, status_code, error_text, expection_text
 
 
-async def update_site_data_from_response(site, site_data, site_info, semaphore, logger):
+async def update_site_data_from_response(sitename, site_data, results_info, semaphore, logger, query_notify):
     async with semaphore:
-        future = site_info.get('request_future')
+        site_obj = site_data[sitename]
+        future = site_obj.get('request_future')
         if not future:
             # ignore: search by incompatible id type
             return
 
-        error_type = site_info['errorType']
-        site_data[site]['resp'] = await get_response(request_future=future,
-                                                     error_type=error_type,
-                                                     social_network=site,
-                                                     logger=logger)
+        response = await get_response(request_future=future,
+                                      social_network=sitename,
+                                      logger=logger)
+
+        site_data[sitename] = process_site_result(response, query_notify, logger, results_info, site_obj, sitename)
 
 
 # TODO: move info separate module
@@ -132,6 +133,152 @@ def detect_error_page(html_text, status_code, fail_flags, ignore_403):
     return None, None
 
 
+def process_site_result(response, query_notify, logger, results_info, net_info, social_network):
+    if not response:
+        logger.error(f'No response for {social_network}')
+        return results_info
+
+    # Retrieve other site information again
+    username = results_info['username']
+    is_parsing_enabled = results_info['parsing_enabled']
+    url = results_info.get("url_user")
+    logger.debug(url)
+
+    status = results_info.get("status")
+    if status is not None:
+        # We have already determined the user doesn't exist here
+        return results_info
+
+    # Get the expected error type
+    error_type = net_info["errorType"]
+
+    # Get the failure messages and comments
+    failure_errors = net_info.get("errors", {})
+
+    html_text, status_code, error_text, expection_text = response
+    site_error_text = '?'
+
+    # TODO: add elapsed request time counting
+    response_time = None
+
+    if logger.level == logging.DEBUG:
+        with open('debug.txt', 'a') as f:
+            status = status_code or 'No response'
+            f.write(f'url: {url}\nerror: {str(error_text)}\nr: {status}\n')
+            if html_text:
+                f.write(f'code: {status}\nresponse: {str(html_text)}\n')
+
+    if status_code and not error_text:
+        error_text, site_error_text = detect_error_page(html_text, status_code, failure_errors,
+                                                        'ignore_403' in net_info)
+
+    # presence flags; if no flags are specified, presence is assumed (True by default)
+    presense_flags = net_info.get("presenseStrs", [])
+    is_presense_detected = html_text and all(
+        [(presense_flag in html_text) for presense_flag in presense_flags]) or not presense_flags
+
+    if error_text is not None:
+        logger.debug(error_text)
+        result = QueryResult(username,
+                             social_network,
+                             url,
+                             QueryStatus.UNKNOWN,
+                             query_time=response_time,
+                             context=f'{error_text}: {site_error_text}')
+    elif error_type == "message":
+        absence_flags = net_info.get("errorMsg")
+        is_absence_flags_list = isinstance(absence_flags, list)
+        absence_flags_set = set(absence_flags) if is_absence_flags_list else {absence_flags}
+        # Checks if the error message is in the HTML
+        is_absence_detected = any([(absence_flag in html_text) for absence_flag in absence_flags_set])
+        if not is_absence_detected and is_presense_detected:
+            result = QueryResult(username,
+                                 social_network,
+                                 url,
+                                 QueryStatus.CLAIMED,
+                                 query_time=response_time)
+        else:
+            result = QueryResult(username,
+                                 social_network,
+                                 url,
+                                 QueryStatus.AVAILABLE,
+                                 query_time=response_time)
+    elif error_type == "status_code":
+        # Checks if the status code of the response is 2XX
+        if 200 <= status_code < 300 and is_presense_detected:
+            result = QueryResult(username,
+                                 social_network,
+                                 url,
+                                 QueryStatus.CLAIMED,
+                                 query_time=response_time)
+        else:
+            result = QueryResult(username,
+                                 social_network,
+                                 url,
+                                 QueryStatus.AVAILABLE,
+                                 query_time=response_time)
+    elif error_type == "response_url":
+        # For this detection method, we have turned off the redirect.
+        # So, there is no need to check the response URL: it will always
+        # match the request. Instead, we will ensure that the response
+        # code indicates that the request was successful (i.e. no 404, or
+        # forward to some odd redirect).
+        if 200 <= status_code < 300 and is_presense_detected:
+            result = QueryResult(username,
+                                 social_network,
+                                 url,
+                                 QueryStatus.CLAIMED,
+                                 query_time=response_time)
+        else:
+            result = QueryResult(username,
+                                 social_network,
+                                 url,
+                                 QueryStatus.AVAILABLE,
+                                 query_time=response_time)
+    else:
+        # It should be impossible to ever get here...
+        raise ValueError(f"Unknown Error Type '{error_type}' for "
+                         f"site '{social_network}'")
+
+    extracted_ids_data = {}
+
+    if is_parsing_enabled and result.status == QueryStatus.CLAIMED:
+        try:
+            extracted_ids_data = extract(html_text)
+        except Exception as e:
+            logger.warning(f'Error while parsing {social_network}: {e}', exc_info=True)
+
+    if extracted_ids_data:
+        new_usernames = {}
+        for k, v in extracted_ids_data.items():
+            if 'username' in k:
+                new_usernames[v] = 'username'
+            if k in supported_recursive_search_ids:
+                new_usernames[v] = k
+
+        results_info['ids_usernames'] = new_usernames
+        result.ids_data = extracted_ids_data
+
+    is_similar = net_info.get('similarSearch', False)
+    # Notify caller about results of query.
+    query_notify.update(result, is_similar)
+
+    # Save status of request
+    results_info['status'] = result
+
+    # Save results from request
+    results_info['http_status'] = status_code
+    results_info['is_similar'] = is_similar
+    # results_site['response_text'] = html_text
+    results_info['rank'] = net_info.get('rank', 0)
+    return results_info
+
+
 async def maigret(username, site_data, query_notify, logger,
                   proxy=None, timeout=None, recursive_search=False,
                   id_type='username', tags=None, debug=False, forced=False,
@@ -179,7 +326,7 @@ async def maigret(username, site_data, query_notify, logger,
 
     if logger.level == logging.DEBUG:
         future = session.get(url='https://icanhazip.com')
-        ip, status, error, expection = await get_response(future, None, 'probe', logger)
+        ip, status, error, expection = await get_response(future, 'probe', logger)
         if ip:
             logger.debug(f'My IP is: {ip.strip()}')
         else:
@@ -205,7 +352,9 @@ async def maigret(username, site_data, query_notify, logger,
         # Results from analysis of this specific site
         results_site = {}
 
-        # Record URL of main site
+        # Record URL of main site and username
+        results_site['username'] = username
+        results_site['parsing_enabled'] = recursive_search
         results_site['url_main'] = net_info.get("urlMain")
 
         headers = {
@@ -276,16 +425,14 @@ async def maigret(username, site_data, query_notify, logger,
             allow_redirects = True
 
         # TODO: cookies using
-        def parse_cookies(cookies_str):
-            cookies = SimpleCookie()
-            cookies.load(cookies_str)
-            return {key: morsel.value for key, morsel in cookies.items()}
-
-        if os.path.exists(cookies_file):
-            cookies_obj = cookielib.MozillaCookieJar(cookies_file)
-            cookies_obj.load(ignore_discard=True, ignore_expires=True)
-        else:
-            cookies_obj = []
+        # def parse_cookies(cookies_str):
+        #     cookies = SimpleCookie()
+        #     cookies.load(cookies_str)
+        #     return {key: morsel.value for key, morsel in cookies.items()}
+        #
+        # if os.path.exists(cookies_file):
+        #     cookies_obj = cookielib.MozillaCookieJar(cookies_file)
+        #     cookies_obj.load(ignore_discard=True, ignore_expires=True)
 
         future = request_method(url=url_probe, headers=headers,
                                 allow_redirects=allow_redirects,
@@ -303,162 +450,13 @@ async def maigret(username, site_data, query_notify, logger,
     sem = asyncio.Semaphore(max_connections)
 
     tasks = []
-    for social_network, net_info in site_data.items():
-        future = asyncio.ensure_future(update_site_data_from_response(social_network, site_data, net_info, sem, logger))
+    for sitename, result_obj in results_total.items():
+        update_site_coro = update_site_data_from_response(sitename, site_data, result_obj, sem, logger, query_notify)
+        future = asyncio.ensure_future(update_site_coro)
         tasks.append(future)
 
     await asyncio.gather(*tasks)
 
     await session.close()
 
-    # TODO: split to separate functions
-    for social_network, net_info in site_data.items():
-
-        # Retrieve results again
-        results_site = results_total.get(social_network)
-        if not results_site:
-            continue
-
-        # Retrieve other site information again
-        url = results_site.get("url_user")
-        logger.debug(url)
-
-        status = results_site.get("status")
-        if status is not None:
-            # We have already determined the user doesn't exist here
-            continue
-
-        # Get the expected error type
-        error_type = net_info["errorType"]
-
-        # Get the failure messages and comments
-        failure_errors = net_info.get("errors", {})
-
-        # TODO: refactor
-        resp = net_info.get('resp')
-        if not resp:
-            logger.error(f'No response for {social_network}')
-            continue
-
-        html_text, status_code, error_text, expection_text = resp
-        site_error_text = '?'
-
-        # TODO: add elapsed request time counting
-        response_time = None
-
-        if debug:
-            with open('debug.txt', 'a') as f:
-                status = status_code or 'No response'
-                f.write(f'url: {url}\nerror: {str(error_text)}\nr: {status}\n')
-                if html_text:
-                    f.write(f'code: {status}\nresponse: {str(html_text)}\n')
-
-        if status_code and not error_text:
-            error_text, site_error_text = detect_error_page(html_text, status_code, failure_errors,
-                                                            'ignore_403' in net_info)
-
-        # presense flags
-        # True by default
-        presense_flags = net_info.get("presenseStrs", [])
-        is_presense_detected = html_text and all(
-            [(presense_flag in html_text) for presense_flag in presense_flags]) or not presense_flags
-
-        if error_text is not None:
-            logger.debug(error_text)
-            result = QueryResult(username,
-                                 social_network,
-                                 url,
-                                 QueryStatus.UNKNOWN,
-                                 query_time=response_time,
-                                 context=f'{error_text}: {site_error_text}')
-        elif error_type == "message":
-            absence_flags = net_info.get("errorMsg")
-            is_absence_flags_list = isinstance(absence_flags, list)
-            absence_flags_set = set(absence_flags) if is_absence_flags_list else {absence_flags}
-            # Checks if the error message is in the HTML
-            is_absence_detected = any([(absence_flag in html_text) for absence_flag in absence_flags_set])
-            if not is_absence_detected and is_presense_detected:
-                result = QueryResult(username,
-                                     social_network,
-                                     url,
-                                     QueryStatus.CLAIMED,
-                                     query_time=response_time)
-            else:
-                result = QueryResult(username,
-                                     social_network,
-                                     url,
-                                     QueryStatus.AVAILABLE,
-                                     query_time=response_time)
-        elif error_type == "status_code":
-            # Checks if the status code of the response is 2XX
-            if (not status_code >= 300 or status_code < 200) and is_presense_detected:
-                result = QueryResult(username,
-                                     social_network,
-                                     url,
-                                     QueryStatus.CLAIMED,
-                                     query_time=response_time)
-            else:
-                result = QueryResult(username,
-                                     social_network,
-                                     url,
-                                     QueryStatus.AVAILABLE,
-                                     query_time=response_time)
-        elif error_type == "response_url":
-            # For this detection method, we have turned off the redirect.
-            # So, there is no need to check the response URL: it will always
-            # match the request. Instead, we will ensure that the response
-            # code indicates that the request was successful (i.e. no 404, or
-            # forward to some odd redirect).
-            if 200 <= status_code < 300 and is_presense_detected:
-                result = QueryResult(username,
-                                     social_network,
-                                     url,
-                                     QueryStatus.CLAIMED,
-                                     query_time=response_time)
-            else:
-                result = QueryResult(username,
-                                     social_network,
-                                     url,
-                                     QueryStatus.AVAILABLE,
-                                     query_time=response_time)
-        else:
-            # It should be impossible to ever get here...
-            raise ValueError(f"Unknown Error Type '{error_type}' for "
-                             f"site '{social_network}'")
-
-        extracted_ids_data = {}
-
-        if recursive_search and result.status == QueryStatus.CLAIMED:
-            try:
-                extracted_ids_data = extract(html_text)
-            except Exception as e:
-                logger.warning(f'Error while parsing {social_network}: {e}', exc_info=True)
-
-        if extracted_ids_data:
-            new_usernames = {}
-            for k, v in extracted_ids_data.items():
-                if 'username' in k:
-                    new_usernames[v] = 'username'
-                if k in supported_recursive_search_ids:
-                    new_usernames[v] = k
-
-            results_site['ids_usernames'] = new_usernames
-            result.ids_data = extracted_ids_data
-
-        is_similar = net_info.get('similarSearch', False)
-        # Notify caller about results of query.
-        query_notify.update(result, is_similar)
-
-        # Save status of request
-        results_site['status'] = result
-
-        # Save results from request
-        results_site['http_status'] = status_code
-        results_site['is_similar'] = is_similar
-        # results_site['response_text'] = html_text
-        results_site['rank'] = net_info.get('rank', 0)
-
-        # Add this site's results into final dictionary with all of the other results.
-        results_total[social_network] = results_site
-
     # Notify caller that all queries are finished.
     query_notify.finish()
 
@@ -675,7 +673,7 @@ async def main():
     )
     args = parser.parse_args()
 
-    # Logging 
+    # Logging
     log_level = logging.ERROR
     logging.basicConfig(
         format='[%(filename)s:%(lineno)d] %(levelname)-3s %(asctime)s %(message)s',
@@ -816,7 +814,7 @@ async def main():
             continue
 
         results = await maigret(username,
                                 dict(site_data),
                                 query_notify,
                                 proxy=args.proxy,
                                 timeout=args.timeout,
@@ -842,7 +840,9 @@ async def main():
         exists_counter = 0
         for website_name in results:
             dictionary = results[website_name]
-
+            # TODO: fix no site data issue
+            if not dictionary:
+                continue
             new_usernames = dictionary.get('ids_usernames')
             if new_usernames:
                 for u, utype in new_usernames.items():