diff --git a/format.sh b/format.sh new file mode 100755 index 0000000..604e98e --- /dev/null +++ b/format.sh @@ -0,0 +1,5 @@ +#!/bin/sh +FILES="maigret wizard.py maigret.py" + +echo 'black' +black --skip-string-normalization $FILES \ No newline at end of file diff --git a/lint.sh b/lint.sh new file mode 100755 index 0000000..5fec05e --- /dev/null +++ b/lint.sh @@ -0,0 +1,11 @@ +#!/bin/sh +FILES="maigret wizard.py maigret.py" + +echo 'syntax errors or undefined names' +flake8 --count --select=E9,F63,F7,F82 --show-source --statistics $FILES + +echo 'warning' +flake8 --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics --ignore=E731,W503 $FILES + +echo 'mypy' +mypy ./maigret \ No newline at end of file diff --git a/maigret.py b/maigret.py index 8e77e13..76d98a8 100755 --- a/maigret.py +++ b/maigret.py @@ -15,4 +15,4 @@ def run(): if __name__ == "__main__": - run() \ No newline at end of file + run() diff --git a/maigret/__init__.py b/maigret/__init__.py index 38786e8..8ccc7e8 100644 --- a/maigret/__init__.py +++ b/maigret/__init__.py @@ -2,4 +2,4 @@ from .checking import maigret as search from .sites import MaigretEngine, MaigretSite, MaigretDatabase -from .notify import QueryNotifyPrint as Notifier \ No newline at end of file +from .notify import QueryNotifyPrint as Notifier diff --git a/maigret/activation.py b/maigret/activation.py index a4346ad..fe22a87 100644 --- a/maigret/activation.py +++ b/maigret/activation.py @@ -9,46 +9,48 @@ class ParsingActivator: @staticmethod def twitter(site, logger, cookies={}): headers = dict(site.headers) - del headers['x-guest-token'] - r = requests.post(site.activation['url'], headers=headers) + del headers["x-guest-token"] + r = requests.post(site.activation["url"], headers=headers) logger.info(r) j = r.json() - guest_token = j[site.activation['src']] - site.headers['x-guest-token'] = guest_token + guest_token = j[site.activation["src"]] + site.headers["x-guest-token"] = guest_token @staticmethod def vimeo(site, logger, cookies={}): headers = dict(site.headers) - if 'Authorization' in headers: - del headers['Authorization'] - r = requests.get(site.activation['url'], headers=headers) - jwt_token = r.json()['jwt'] - site.headers['Authorization'] = 'jwt ' + jwt_token + if "Authorization" in headers: + del headers["Authorization"] + r = requests.get(site.activation["url"], headers=headers) + jwt_token = r.json()["jwt"] + site.headers["Authorization"] = "jwt " + jwt_token @staticmethod def spotify(site, logger, cookies={}): headers = dict(site.headers) - if 'Authorization' in headers: - del headers['Authorization'] - r = requests.get(site.activation['url']) - bearer_token = r.json()['accessToken'] - site.headers['authorization'] = f'Bearer {bearer_token}' + if "Authorization" in headers: + del headers["Authorization"] + r = requests.get(site.activation["url"]) + bearer_token = r.json()["accessToken"] + site.headers["authorization"] = f"Bearer {bearer_token}" @staticmethod def xssis(site, logger, cookies={}): if not cookies: - logger.debug('You must have cookies to activate xss.is parsing!') + logger.debug("You must have cookies to activate xss.is parsing!") return headers = dict(site.headers) post_data = { - '_xfResponseType': 'json', - '_xfToken': '1611177919,a2710362e45dad9aa1da381e21941a38' + "_xfResponseType": "json", + "_xfToken": "1611177919,a2710362e45dad9aa1da381e21941a38", } - headers['content-type'] = 'application/x-www-form-urlencoded; charset=UTF-8' - r = requests.post(site.activation['url'], headers=headers, cookies=cookies, 
data=post_data) - csrf = r.json()['csrf'] - site.get_params['_xfToken'] = csrf + headers["content-type"] = "application/x-www-form-urlencoded; charset=UTF-8" + r = requests.post( + site.activation["url"], headers=headers, cookies=cookies, data=post_data + ) + csrf = r.json()["csrf"] + site.get_params["_xfToken"] = csrf async def import_aiohttp_cookies(cookiestxt_filename): @@ -62,8 +64,8 @@ async def import_aiohttp_cookies(cookiestxt_filename): for key, cookie in list(domain.values())[0].items(): c = Morsel() c.set(key, cookie.value, cookie.value) - c['domain'] = cookie.domain - c['path'] = cookie.path + c["domain"] = cookie.domain + c["path"] = cookie.path cookies_list.append((key, c)) cookies.update_cookies(cookies_list) diff --git a/maigret/checking.py b/maigret/checking.py index 07e39ac..91d880a 100644 --- a/maigret/checking.py +++ b/maigret/checking.py @@ -5,135 +5,138 @@ import re import ssl import sys import tqdm -import time +from typing import Tuple, Optional import aiohttp import tqdm.asyncio from aiohttp_socks import ProxyConnector -from mock import Mock from python_socks import _errors as proxy_errors from socid_extractor import extract from .activation import ParsingActivator, import_aiohttp_cookies +from . import errors +from .errors import CheckError from .executors import AsyncioSimpleExecutor, AsyncioProgressbarQueueExecutor from .result import QueryResult, QueryStatus from .sites import MaigretDatabase, MaigretSite -from .types import CheckError from .utils import get_random_user_agent supported_recursive_search_ids = ( - 'yandex_public_id', - 'gaia_id', - 'vk_id', - 'ok_id', - 'wikimapia_uid', - 'steam_id', - 'uidme_uguid', + "yandex_public_id", + "gaia_id", + "vk_id", + "ok_id", + "wikimapia_uid", + "steam_id", + "uidme_uguid", ) -common_errors = { - '
Мы не нашли страницу': CheckError('Resolving', 'MegaFon 404 page'), - 'Доступ к информационному ресурсу ограничен на основании Федерального закона': CheckError('Censorship', 'MGTS'), - 'Incapsula incident ID': CheckError('Bot protection', 'Incapsula'), -} - -unsupported_characters = '#' +unsupported_characters = "#" -async def get_response(request_future, site_name, logger) -> (str, int, CheckError): +async def get_response( + request_future, site_name, logger +) -> Tuple[str, int, Optional[CheckError]]: html_text = None status_code = 0 - error = CheckError('Error') + error: Optional[CheckError] = CheckError("Error") try: response = await request_future status_code = response.status response_content = await response.content.read() - charset = response.charset or 'utf-8' - decoded_content = response_content.decode(charset, 'ignore') + charset = response.charset or "utf-8" + decoded_content = response_content.decode(charset, "ignore") html_text = decoded_content if status_code == 0: - error = CheckError('Connection lost') + error = CheckError("Connection lost") else: error = None logger.debug(html_text) except asyncio.TimeoutError as e: - error = CheckError('Request timeout', str(e)) + error = CheckError("Request timeout", str(e)) except aiohttp.client_exceptions.ClientConnectorError as e: - error = CheckError('Connecting failure', str(e)) + error = CheckError("Connecting failure", str(e)) except aiohttp.http_exceptions.BadHttpMessage as e: - error = CheckError('HTTP', str(e)) + error = CheckError("HTTP", str(e)) except proxy_errors.ProxyError as e: - error = CheckError('Proxy', str(e)) + error = CheckError("Proxy", str(e)) + except KeyboardInterrupt: + error = CheckError("Interrupted") except Exception as e: # python-specific exceptions if sys.version_info.minor > 6: - if isinstance(e, ssl.SSLCertVerificationError) or isinstance(e, ssl.SSLError): - error = CheckError('SSL', str(e)) + if isinstance(e, ssl.SSLCertVerificationError) or isinstance( + e, ssl.SSLError + ): + error = CheckError("SSL", str(e)) else: - logger.warning(f'Unhandled error while requesting {site_name}: {e}') + logger.warning(f"Unhandled error while requesting {site_name}: {e}") logger.debug(e, exc_info=True) - error = CheckError('Error', str(e)) + error = CheckError("Error", str(e)) # TODO: return only needed information - return html_text, status_code, error + return str(html_text), status_code, error -async def update_site_dict_from_response(sitename, site_dict, results_info, logger, query_notify): +async def update_site_dict_from_response( + sitename, site_dict, results_info, logger, query_notify +): site_obj = site_dict[sitename] future = site_obj.request_future if not future: # ignore: search by incompatible id type return - response = await get_response(request_future=future, - site_name=sitename, - logger=logger) + response = await get_response( + request_future=future, site_name=sitename, logger=logger + ) - return sitename, process_site_result(response, query_notify, logger, results_info, site_obj) + return sitename, process_site_result( + response, query_notify, logger, results_info, site_obj + ) # TODO: move to separate class -def detect_error_page(html_text, status_code, fail_flags, ignore_403) -> CheckError: +def detect_error_page( + html_text, status_code, fail_flags, ignore_403 +) -> Optional[CheckError]: # Detect service restrictions such as a country restriction for flag, msg in fail_flags.items(): if flag in html_text: - return CheckError('Site-specific', msg) + return CheckError("Site-specific", msg) # 
Detect common restrictions such as provider censorship and bot protection - for flag, err in common_errors.items(): - if flag in html_text: - return err + err = errors.detect(html_text) + if err: + return err # Detect common site errors if status_code == 403 and not ignore_403: - return CheckError('Access denied', '403 status code, use proxy/vpn') + return CheckError("Access denied", "403 status code, use proxy/vpn") elif status_code >= 500: - return CheckError(f'Server', f'{status_code} status code') + return CheckError("Server", f"{status_code} status code") return None -def process_site_result(response, query_notify, logger, results_info, site: MaigretSite): +def process_site_result( + response, query_notify, logger, results_info, site: MaigretSite +): if not response: return results_info fulltags = site.tags # Retrieve other site information again - username = results_info['username'] - is_parsing_enabled = results_info['parsing_enabled'] + username = results_info["username"] + is_parsing_enabled = results_info["parsing_enabled"] url = results_info.get("url_user") logger.debug(url) @@ -147,7 +150,7 @@ def process_site_result(response, query_notify, logger, results_info, site: Maig # TODO: refactor if not response: - logger.error(f'No response for {site.name}') + logger.error(f"No response for {site.name}") return results_info html_text, status_code, check_error = response @@ -156,28 +159,34 @@ def process_site_result(response, query_notify, logger, results_info, site: Maig response_time = None if logger.level == logging.DEBUG: - with open('debug.txt', 'a') as f: - status = status_code or 'No response' - f.write(f'url: {url}\nerror: {check_error}\nr: {status}\n') + with open("debug.txt", "a") as f: + status = status_code or "No response" + f.write(f"url: {url}\nerror: {check_error}\nr: {status}\n") if html_text: - f.write(f'code: {status}\nresponse: {str(html_text)}\n') + f.write(f"code: {status}\nresponse: {str(html_text)}\n") # additional check for errors if status_code and not check_error: - check_error = detect_error_page(html_text, status_code, site.errors, site.ignore403) + check_error = detect_error_page( + html_text, status_code, site.errors, site.ignore403 + ) if site.activation and html_text: - is_need_activation = any([s for s in site.activation['marks'] if s in html_text]) + is_need_activation = any( + [s for s in site.activation["marks"] if s in html_text] + ) if is_need_activation: - method = site.activation['method'] + method = site.activation["method"] try: activate_fun = getattr(ParsingActivator(), method) # TODO: async call activate_fun(site, logger) except AttributeError: - logger.warning(f'Activation method {method} for site {site.name} not found!') + logger.warning( + f"Activation method {method} for site {site.name} not found!" 
+ ) except Exception as e: - logger.warning(f'Failed activation {method} for site {site.name}: {e}') + logger.warning(f"Failed activation {method} for site {site.name}: {e}") site_name = site.pretty_name # presense flags @@ -187,56 +196,75 @@ def process_site_result(response, query_notify, logger, results_info, site: Maig if html_text: if not presense_flags: is_presense_detected = True - site.stats['presense_flag'] = None + site.stats["presense_flag"] = None else: for presense_flag in presense_flags: if presense_flag in html_text: is_presense_detected = True - site.stats['presense_flag'] = presense_flag + site.stats["presense_flag"] = presense_flag logger.debug(presense_flag) break if check_error: logger.debug(check_error) - result = QueryResult(username, - site_name, - url, - QueryStatus.UNKNOWN, - query_time=response_time, - error=check_error, - context=str(CheckError), tags=fulltags) + result = QueryResult( + username, + site_name, + url, + QueryStatus.UNKNOWN, + query_time=response_time, + error=check_error, + context=str(CheckError), + tags=fulltags, + ) elif check_type == "message": absence_flags = site.absence_strs is_absence_flags_list = isinstance(absence_flags, list) - absence_flags_set = set(absence_flags) if is_absence_flags_list else {absence_flags} + absence_flags_set = ( + set(absence_flags) if is_absence_flags_list else {absence_flags} + ) # Checks if the error message is in the HTML - is_absence_detected = any([(absence_flag in html_text) for absence_flag in absence_flags_set]) + is_absence_detected = any( + [(absence_flag in html_text) for absence_flag in absence_flags_set] + ) if not is_absence_detected and is_presense_detected: - result = QueryResult(username, - site_name, - url, - QueryStatus.CLAIMED, - query_time=response_time, tags=fulltags) + result = QueryResult( + username, + site_name, + url, + QueryStatus.CLAIMED, + query_time=response_time, + tags=fulltags, + ) else: - result = QueryResult(username, - site_name, - url, - QueryStatus.AVAILABLE, - query_time=response_time, tags=fulltags) + result = QueryResult( + username, + site_name, + url, + QueryStatus.AVAILABLE, + query_time=response_time, + tags=fulltags, + ) elif check_type == "status_code": # Checks if the status code of the response is 2XX if (not status_code >= 300 or status_code < 200) and is_presense_detected: - result = QueryResult(username, - site_name, - url, - QueryStatus.CLAIMED, - query_time=response_time, tags=fulltags) + result = QueryResult( + username, + site_name, + url, + QueryStatus.CLAIMED, + query_time=response_time, + tags=fulltags, + ) else: - result = QueryResult(username, - site_name, - url, - QueryStatus.AVAILABLE, - query_time=response_time, tags=fulltags) + result = QueryResult( + username, + site_name, + url, + QueryStatus.AVAILABLE, + query_time=response_time, + tags=fulltags, + ) elif check_type == "response_url": # For this detection method, we have turned off the redirect. # So, there is no need to check the response URL: it will always @@ -244,21 +272,28 @@ def process_site_result(response, query_notify, logger, results_info, site: Maig # code indicates that the request was successful (i.e. no 404, or # forward to some odd redirect). 
if 200 <= status_code < 300 and is_presense_detected: - result = QueryResult(username, - site_name, - url, - QueryStatus.CLAIMED, - query_time=response_time, tags=fulltags) + result = QueryResult( + username, + site_name, + url, + QueryStatus.CLAIMED, + query_time=response_time, + tags=fulltags, + ) else: - result = QueryResult(username, - site_name, - url, - QueryStatus.AVAILABLE, - query_time=response_time, tags=fulltags) + result = QueryResult( + username, + site_name, + url, + QueryStatus.AVAILABLE, + query_time=response_time, + tags=fulltags, + ) else: # It should be impossible to ever get here... - raise ValueError(f"Unknown check type '{check_type}' for " - f"site '{site.name}'") + raise ValueError( + f"Unknown check type '{check_type}' for " f"site '{site.name}'" + ) extracted_ids_data = {} @@ -266,39 +301,49 @@ def process_site_result(response, query_notify, logger, results_info, site: Maig try: extracted_ids_data = extract(html_text) except Exception as e: - logger.warning(f'Error while parsing {site.name}: {e}', exc_info=True) + logger.warning(f"Error while parsing {site.name}: {e}", exc_info=True) if extracted_ids_data: new_usernames = {} for k, v in extracted_ids_data.items(): - if 'username' in k: - new_usernames[v] = 'username' + if "username" in k: + new_usernames[v] = "username" if k in supported_recursive_search_ids: new_usernames[v] = k - results_info['ids_usernames'] = new_usernames - results_info['ids_links'] = eval(extracted_ids_data.get('links', '[]')) + results_info["ids_usernames"] = new_usernames + results_info["ids_links"] = eval(extracted_ids_data.get("links", "[]")) result.ids_data = extracted_ids_data # Notify caller about results of query. query_notify.update(result, site.similar_search) # Save status of request - results_info['status'] = result + results_info["status"] = result # Save results from request - results_info['http_status'] = status_code - results_info['is_similar'] = site.similar_search + results_info["http_status"] = status_code + results_info["is_similar"] = site.similar_search # results_site['response_text'] = html_text - results_info['rank'] = site.alexa_rank + results_info["rank"] = site.alexa_rank return results_info -async def maigret(username, site_dict, logger, query_notify=None, - proxy=None, timeout=None, is_parsing_enabled=False, - id_type='username', debug=False, forced=False, - max_connections=100, no_progressbar=False, - cookies=None): +async def maigret( + username, + site_dict, + logger, + query_notify=None, + proxy=None, + timeout=None, + is_parsing_enabled=False, + id_type="username", + debug=False, + forced=False, + max_connections=100, + no_progressbar=False, + cookies=None, +): """Main search func Checks for existence of username on certain sites. 
@@ -342,24 +387,28 @@ async def maigret(username, site_dict, logger, query_notify=None, query_notify.start(username, id_type) # TODO: connector - connector = ProxyConnector.from_url(proxy) if proxy else aiohttp.TCPConnector(ssl=False) + connector = ( + ProxyConnector.from_url(proxy) if proxy else aiohttp.TCPConnector(ssl=False) + ) # connector = aiohttp.TCPConnector(ssl=False) connector.verify_ssl = False cookie_jar = None if cookies: - logger.debug(f'Using cookies jar file {cookies}') + logger.debug(f"Using cookies jar file {cookies}") cookie_jar = await import_aiohttp_cookies(cookies) - session = aiohttp.ClientSession(connector=connector, trust_env=True, cookie_jar=cookie_jar) + session = aiohttp.ClientSession( + connector=connector, trust_env=True, cookie_jar=cookie_jar + ) if logger.level == logging.DEBUG: - future = session.get(url='https://icanhazip.com') + future = session.get(url="https://icanhazip.com") ip, status, check_error = await get_response(future, None, logger) if ip: - logger.debug(f'My IP is: {ip.strip()}') + logger.debug(f"My IP is: {ip.strip()}") else: - logger.debug(f'IP requesting {check_error[0]}: {check_error[1]}') + logger.debug(f"IP requesting {check_error[0]}: {check_error[1]}") # Results from analysis of all sites results_total = {} @@ -371,46 +420,45 @@ async def maigret(username, site_dict, logger, query_notify=None, continue if site.disabled and not forced: - logger.debug(f'Site {site.name} is disabled, skipping...') + logger.debug(f"Site {site.name} is disabled, skipping...") continue # Results from analysis of this specific site results_site = {} # Record URL of main site and username - results_site['username'] = username - results_site['parsing_enabled'] = is_parsing_enabled - results_site['url_main'] = site.url_main - results_site['cookies'] = cookie_jar and cookie_jar.filter_cookies(site.url_main) or None + results_site["username"] = username + results_site["parsing_enabled"] = is_parsing_enabled + results_site["url_main"] = site.url_main + results_site["cookies"] = ( + cookie_jar and cookie_jar.filter_cookies(site.url_main) or None + ) headers = { - 'User-Agent': get_random_user_agent(), + "User-Agent": get_random_user_agent(), } headers.update(site.headers) - if 'url' not in site.__dict__: - logger.error('No URL for site %s', site.name) + if "url" not in site.__dict__: + logger.error("No URL for site %s", site.name) # URL of user on site (if it exists) url = site.url.format( - urlMain=site.url_main, - urlSubpath=site.url_subpath, - username=username + urlMain=site.url_main, urlSubpath=site.url_subpath, username=username ) # workaround to prevent slash errors - url = re.sub('(? bool: +async def self_check( + db: MaigretDatabase, site_data: dict, logger, silent=False, max_connections=10 +) -> bool: sem = asyncio.Semaphore(max_connections) tasks = [] all_sites = site_data @@ -628,13 +672,15 @@ async def self_check(db: MaigretDatabase, site_data: dict, logger, silent=False, total_disabled = disabled_new_count - disabled_old_count if total_disabled >= 0: - message = 'Disabled' + message = "Disabled" else: - message = 'Enabled' + message = "Enabled" total_disabled *= -1 if not silent: print( - f'{message} {total_disabled} ({disabled_old_count} => {disabled_new_count}) checked sites. Run with `--info` flag to get more information') + f"{message} {total_disabled} ({disabled_old_count} => {disabled_new_count}) checked sites. 
" + "Run with `--info` flag to get more information" + ) return total_disabled != 0 diff --git a/maigret/errors.py b/maigret/errors.py new file mode 100644 index 0000000..4b6c64d --- /dev/null +++ b/maigret/errors.py @@ -0,0 +1,104 @@ +from typing import Dict, List, Any + +from .result import QueryResult + + +# error got as a result of completed search query +class CheckError: + _type = 'Unknown' + _desc = '' + + def __init__(self, typename, desc=''): + self._type = typename + self._desc = desc + + def __str__(self): + if not self._desc: + return f'{self._type} error' + + return f'{self._type} error: {self._desc}' + + @property + def type(self): + return self._type + + @property + def desc(self): + return self._desc + + +COMMON_ERRORS = { + '
Мы не нашли страницу': CheckError( + 'Resolving', 'MegaFon 404 page' + ), + 'Доступ к информационному ресурсу ограничен на основании Федерального закона': CheckError( + 'Censorship', 'MGTS' + ), + 'Incapsula incident ID': CheckError('Bot protection', 'Incapsula'), +} + +ERRORS_TYPES = { + 'Captcha': 'Try to switch to another IP address or to use service cookies', + 'Bot protection': 'Try to switch to another IP address', + 'Censorship': 'switch to another internet service provider', + 'Request timeout': 'Try to increase timeout or to switch to another internet service provider', +} + +THRESHOLD = 3 # percent + + +def is_important(err_data): + return err_data['perc'] >= THRESHOLD + + +def is_not_permanent(err_data): + return True + + +def detect(text): + for flag, err in COMMON_ERRORS.items(): + if flag in text: + return err + return None + + +def solution_of(err_type) -> str: + return ERRORS_TYPES.get(err_type, '') + + +def extract_and_group(search_res: dict) -> List[Dict[str, Any]]: + errors_counts: Dict[str, int] = {} + for r in search_res: + if r and isinstance(r, dict) and r.get('status'): + if not isinstance(r['status'], QueryResult): + continue + + err = r['status'].error + if not err: + continue + errors_counts[err.type] = errors_counts.get(err.type, 0) + 1 + + counts = [] + for err, count in sorted(errors_counts.items(), key=lambda x: x[1], reverse=True): + counts.append( + { + 'err': err, + 'count': count, + 'perc': round(count / len(search_res), 2) * 100, + } + ) + + return counts diff --git a/maigret/executors.py b/maigret/executors.py index f2695d5..d7dd598 100644 --- a/maigret/executors.py +++ b/maigret/executors.py @@ -2,7 +2,7 @@ import asyncio import time import tqdm import sys -from typing import Iterable +from typing import Iterable, Any, List from .types import QueryDraft @@ -100,14 +100,13 @@ class AsyncioProgressbarQueueExecutor(AsyncExecutor): self.queue.task_done() async def _run(self, queries: Iterable[QueryDraft]): - self.results = [] + self.results: List[Any] = [] queries_list = list(queries) min_workers = min(len(queries_list), self.workers_count) - workers = [create_task_func()(self.worker()) - for _ in range(min_workers)] + workers = [create_task_func()(self.worker()) for _ in range(min_workers)] self.progress = self.progress_func(total=len(queries_list)) for t in queries_list: diff --git a/maigret/maigret.py b/maigret/maigret.py index 7b3a871..8aabd4c 100755 --- a/maigret/maigret.py +++ b/maigret/maigret.py @@ -12,11 +12,26 @@ from argparse import ArgumentParser, RawDescriptionHelpFormatter import requests from socid_extractor import extract, parse, __version__ as socid_version -from .checking import timeout_check, supported_recursive_search_ids, self_check, unsupported_characters, maigret +from .checking import ( + timeout_check, + supported_recursive_search_ids, + self_check, + unsupported_characters, + maigret, +) +from . 
import errors from .notify import QueryNotifyPrint -from .report import save_csv_report, save_xmind_report, save_html_report, save_pdf_report, \ - generate_report_context, save_txt_report, SUPPORTED_JSON_REPORT_FORMATS, check_supported_json_format, \ - save_json_report +from .report import ( + save_csv_report, + save_xmind_report, + save_html_report, + save_pdf_report, + generate_report_context, + save_txt_report, + SUPPORTED_JSON_REPORT_FORMATS, + check_supported_json_format, + save_json_report, +) from .sites import MaigretDatabase from .submit import submit_dialog from .utils import get_dict_ascii_tree @@ -24,168 +39,301 @@ from .utils import get_dict_ascii_tree __version__ = '0.1.19' -async def main(): - version_string = '\n'.join([ - f'%(prog)s {__version__}', - f'Socid-extractor: {socid_version}', - f'Aiohttp: {aiohttp.__version__}', - f'Requests: {requests.__version__}', - f'Python: {platform.python_version()}', - ]) +def notify_about_errors(search_results, query_notify): + errs = errors.extract_and_group(search_results.values()) + was_errs_displayed = False + for e in errs: + if not errors.is_important(e): + continue + text = f'Too many errors of type "{e["err"]}" ({e["perc"]}%)' + solution = errors.solution_of(e['err']) + if solution: + text = '. '.join([text, solution]) - parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, - description=f"Maigret v{__version__}" - ) - parser.add_argument("--version", - action="version", version=version_string, - help="Display version information and dependencies." - ) - parser.add_argument("--info", "-vv", - action="store_true", dest="info", default=False, - help="Display service information." - ) - parser.add_argument("--verbose", "-v", - action="store_true", dest="verbose", default=False, - help="Display extra information and metrics." - ) - parser.add_argument("-d", "--debug", "-vvv", - action="store_true", dest="debug", default=False, - help="Saving debugging information and sites responses in debug.txt." - ) - parser.add_argument("--site", - action="append", metavar='SITE_NAME', - dest="site_list", default=[], - help="Limit analysis to just the listed sites (use several times to specify more than one)" - ) - parser.add_argument("--proxy", "-p", metavar='PROXY_URL', - action="store", dest="proxy", default=None, - help="Make requests over a proxy. e.g. socks5://127.0.0.1:1080" - ) - parser.add_argument("--db", metavar="DB_FILE", - dest="db_file", default=None, - help="Load Maigret database from a JSON file or an online, valid, JSON file.") - parser.add_argument("--cookies-jar-file", metavar="COOKIE_FILE", - dest="cookie_file", default=None, - help="File with cookies.") - parser.add_argument("--timeout", - action="store", metavar='TIMEOUT', - dest="timeout", type=timeout_check, default=10, - help="Time (in seconds) to wait for response to requests." - "Default timeout of 10.0s. " - "A longer timeout will be more likely to get results from slow sites." - "On the other hand, this may cause a long delay to gather all results." - ) - parser.add_argument("-n", "--max-connections", - action="store", type=int, - dest="connections", default=100, - help="Allowed number of concurrent connections." - ) - parser.add_argument("-a", "--all-sites", - action="store_true", dest="all_sites", default=False, - help="Use all sites for scan." - ) - parser.add_argument("--top-sites", - action="store", default=500, type=int, - help="Count of sites for scan ranked by Alexa Top (default: 500)." 
- ) - parser.add_argument("--print-not-found", - action="store_true", dest="print_not_found", default=False, - help="Print sites where the username was not found." - ) - parser.add_argument("--print-errors", - action="store_true", dest="print_check_errors", default=False, - help="Print errors messages: connection, captcha, site country ban, etc." - ) - parser.add_argument("--submit", metavar='EXISTING_USER_URL', - type=str, dest="new_site_to_submit", default=False, - help="URL of existing profile in new site to submit." - ) - parser.add_argument("--no-color", - action="store_true", dest="no_color", default=False, - help="Don't color terminal output" - ) - parser.add_argument("--no-progressbar", - action="store_true", dest="no_progressbar", default=False, - help="Don't show progressbar." - ) - parser.add_argument("--browse", "-b", - action="store_true", dest="browse", default=False, - help="Browse to all results on default bowser." - ) - parser.add_argument("--no-recursion", - action="store_true", dest="disable_recursive_search", default=False, - help="Disable recursive search by additional data extracted from pages." - ) - parser.add_argument("--no-extracting", - action="store_true", dest="disable_extracting", default=False, - help="Disable parsing pages for additional data and other usernames." - ) - parser.add_argument("--self-check", - action="store_true", default=False, - help="Do self check for sites and database and disable non-working ones." - ) - parser.add_argument("--stats", - action="store_true", default=False, - help="Show database statistics." - ) - parser.add_argument("--use-disabled-sites", - action="store_true", default=False, - help="Use disabled sites to search (may cause many false positives)." - ) - parser.add_argument("--parse", - dest="parse_url", default='', - help="Parse page by URL and extract username and IDs to use for search." - ) - parser.add_argument("--id-type", - dest="id_type", default='username', - help="Specify identifier(s) type (default: username)." - ) - parser.add_argument("--ignore-ids", - action="append", metavar='IGNORED_IDS', - dest="ignore_ids_list", default=[], - help="Do not make search by the specified username or other ids." - ) - parser.add_argument("username", - nargs='+', metavar='USERNAMES', - action="store", - help="One or more usernames to check with social networks." - ) - parser.add_argument("--tags", - dest="tags", default='', - help="Specify tags of sites." 
- ) + query_notify.warning(text, '!') + was_errs_displayed = True + + if was_errs_displayed: + query_notify.warning( + 'You can see detailed site check errors with a flag `--print-errors`' + ) + + +async def main(): + version_string = '\n'.join( + [ + f'%(prog)s {__version__}', + f'Socid-extractor: {socid_version}', + f'Aiohttp: {aiohttp.__version__}', + f'Requests: {requests.__version__}', + f'Python: {platform.python_version()}', + ] + ) + + parser = ArgumentParser( + formatter_class=RawDescriptionHelpFormatter, + description=f"Maigret v{__version__}", + ) + parser.add_argument( + "--version", + action="version", + version=version_string, + help="Display version information and dependencies.", + ) + parser.add_argument( + "--info", + "-vv", + action="store_true", + dest="info", + default=False, + help="Display service information.", + ) + parser.add_argument( + "--verbose", + "-v", + action="store_true", + dest="verbose", + default=False, + help="Display extra information and metrics.", + ) + parser.add_argument( + "-d", + "--debug", + "-vvv", + action="store_true", + dest="debug", + default=False, + help="Saving debugging information and sites responses in debug.txt.", + ) + parser.add_argument( + "--site", + action="append", + metavar='SITE_NAME', + dest="site_list", + default=[], + help="Limit analysis to just the listed sites (use several times to specify more than one)", + ) + parser.add_argument( + "--proxy", + "-p", + metavar='PROXY_URL', + action="store", + dest="proxy", + default=None, + help="Make requests over a proxy. e.g. socks5://127.0.0.1:1080", + ) + parser.add_argument( + "--db", + metavar="DB_FILE", + dest="db_file", + default=None, + help="Load Maigret database from a JSON file or an online, valid, JSON file.", + ) + parser.add_argument( + "--cookies-jar-file", + metavar="COOKIE_FILE", + dest="cookie_file", + default=None, + help="File with cookies.", + ) + parser.add_argument( + "--timeout", + action="store", + metavar='TIMEOUT', + dest="timeout", + type=timeout_check, + default=30, + help="Time (in seconds) to wait for response to requests. " + "Default timeout of 30.0s. " + "A longer timeout will be more likely to get results from slow sites. " + "On the other hand, this may cause a long delay to gather all results. 
", + ) + parser.add_argument( + "-n", + "--max-connections", + action="store", + type=int, + dest="connections", + default=100, + help="Allowed number of concurrent connections.", + ) + parser.add_argument( + "-a", + "--all-sites", + action="store_true", + dest="all_sites", + default=False, + help="Use all sites for scan.", + ) + parser.add_argument( + "--top-sites", + action="store", + default=500, + type=int, + help="Count of sites for scan ranked by Alexa Top (default: 500).", + ) + parser.add_argument( + "--print-not-found", + action="store_true", + dest="print_not_found", + default=False, + help="Print sites where the username was not found.", + ) + parser.add_argument( + "--print-errors", + action="store_true", + dest="print_check_errors", + default=False, + help="Print errors messages: connection, captcha, site country ban, etc.", + ) + parser.add_argument( + "--submit", + metavar='EXISTING_USER_URL', + type=str, + dest="new_site_to_submit", + default=False, + help="URL of existing profile in new site to submit.", + ) + parser.add_argument( + "--no-color", + action="store_true", + dest="no_color", + default=False, + help="Don't color terminal output", + ) + parser.add_argument( + "--no-progressbar", + action="store_true", + dest="no_progressbar", + default=False, + help="Don't show progressbar.", + ) + parser.add_argument( + "--browse", + "-b", + action="store_true", + dest="browse", + default=False, + help="Browse to all results on default bowser.", + ) + parser.add_argument( + "--no-recursion", + action="store_true", + dest="disable_recursive_search", + default=False, + help="Disable recursive search by additional data extracted from pages.", + ) + parser.add_argument( + "--no-extracting", + action="store_true", + dest="disable_extracting", + default=False, + help="Disable parsing pages for additional data and other usernames.", + ) + parser.add_argument( + "--self-check", + action="store_true", + default=False, + help="Do self check for sites and database and disable non-working ones.", + ) + parser.add_argument( + "--stats", action="store_true", default=False, help="Show database statistics." + ) + parser.add_argument( + "--use-disabled-sites", + action="store_true", + default=False, + help="Use disabled sites to search (may cause many false positives).", + ) + parser.add_argument( + "--parse", + dest="parse_url", + default='', + help="Parse page by URL and extract username and IDs to use for search.", + ) + parser.add_argument( + "--id-type", + dest="id_type", + default='username', + help="Specify identifier(s) type (default: username).", + ) + parser.add_argument( + "--ignore-ids", + action="append", + metavar='IGNORED_IDS', + dest="ignore_ids_list", + default=[], + help="Do not make search by the specified username or other ids.", + ) + parser.add_argument( + "username", + nargs='+', + metavar='USERNAMES', + action="store", + help="One or more usernames to check with social networks.", + ) + parser.add_argument( + "--tags", dest="tags", default='', help="Specify tags of sites." + ) # reports options - parser.add_argument("--folderoutput", "-fo", dest="folderoutput", default="reports", - help="If using multiple usernames, the output of the results will be saved to this folder." - ) - parser.add_argument("-T", "--txt", - action="store_true", dest="txt", default=False, - help="Create a TXT report (one report per username)." - ) - parser.add_argument("-C", "--csv", - action="store_true", dest="csv", default=False, - help="Create a CSV report (one report per username)." 
- ) - parser.add_argument("-H", "--html", - action="store_true", dest="html", default=False, - help="Create an HTML report file (general report on all usernames)." - ) - parser.add_argument("-X", "--xmind", - action="store_true", - dest="xmind", default=False, - help="Generate an XMind 8 mindmap report (one report per username)." - ) - parser.add_argument("-P", "--pdf", - action="store_true", - dest="pdf", default=False, - help="Generate a PDF report (general report on all usernames)." - ) - parser.add_argument("-J", "--json", - action="store", metavar='REPORT_TYPE', - dest="json", default='', type=check_supported_json_format, - help=f"Generate a JSON report of specific type: {', '.join(SUPPORTED_JSON_REPORT_FORMATS)}" - " (one report per username)." - ) + parser.add_argument( + "--folderoutput", + "-fo", + dest="folderoutput", + default="reports", + help="If using multiple usernames, the output of the results will be saved to this folder.", + ) + parser.add_argument( + "-T", + "--txt", + action="store_true", + dest="txt", + default=False, + help="Create a TXT report (one report per username).", + ) + parser.add_argument( + "-C", + "--csv", + action="store_true", + dest="csv", + default=False, + help="Create a CSV report (one report per username).", + ) + parser.add_argument( + "-H", + "--html", + action="store_true", + dest="html", + default=False, + help="Create an HTML report file (general report on all usernames).", + ) + parser.add_argument( + "-X", + "--xmind", + action="store_true", + dest="xmind", + default=False, + help="Generate an XMind 8 mindmap report (one report per username).", + ) + parser.add_argument( + "-P", + "--pdf", + action="store_true", + dest="pdf", + default=False, + help="Generate a PDF report (general report on all usernames).", + ) + parser.add_argument( + "-J", + "--json", + action="store", + metavar='REPORT_TYPE', + dest="json", + default='', + type=check_supported_json_format, + help=f"Generate a JSON report of specific type: {', '.join(SUPPORTED_JSON_REPORT_FORMATS)}" + " (one report per username).", + ) args = parser.parse_args() @@ -194,7 +342,7 @@ async def main(): logging.basicConfig( format='[%(filename)s:%(lineno)d] %(levelname)-3s %(asctime)s %(message)s', datefmt='%H:%M:%S', - level=log_level + level=log_level, ) if args.debug: @@ -211,8 +359,7 @@ async def main(): usernames = { u: args.id_type for u in args.username - if u not in ['-'] - and u not in args.ignore_ids_list + if u not in ['-'] and u not in args.ignore_ids_list } parsing_enabled = not args.disable_extracting @@ -228,8 +375,10 @@ async def main(): try: # temporary workaround for URL mutations MVP from socid_extractor import mutate_url + reqs += list(mutate_url(args.parse_url)) - except: + except Exception as e: + logger.warning(e) pass for req in reqs: @@ -251,38 +400,47 @@ async def main(): args.tags = list(set(str(args.tags).split(','))) if args.db_file is None: - args.db_file = \ - os.path.join(os.path.dirname(os.path.realpath(__file__)), - "resources/data.json" - ) + args.db_file = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "resources/data.json" + ) if args.top_sites == 0 or args.all_sites: args.top_sites = sys.maxsize # Create notify object for query results. 
- query_notify = QueryNotifyPrint(result=None, - verbose=args.verbose, - print_found_only=not args.print_not_found, - skip_check_errors=not args.print_check_errors, - color=not args.no_color) + query_notify = QueryNotifyPrint( + result=None, + verbose=args.verbose, + print_found_only=not args.print_not_found, + skip_check_errors=not args.print_check_errors, + color=not args.no_color, + ) # Create object with all information about sites we are aware of. db = MaigretDatabase().load_from_file(args.db_file) - get_top_sites_for_id = lambda x: db.ranked_sites_dict(top=args.top_sites, tags=args.tags, - names=args.site_list, - disabled=False, id_type=x) + get_top_sites_for_id = lambda x: db.ranked_sites_dict( + top=args.top_sites, + tags=args.tags, + names=args.site_list, + disabled=False, + id_type=x, + ) site_data = get_top_sites_for_id(args.id_type) if args.new_site_to_submit: - is_submitted = await submit_dialog(db, args.new_site_to_submit, args.cookie_file, logger) + is_submitted = await submit_dialog( + db, args.new_site_to_submit, args.cookie_file, logger + ) if is_submitted: db.save_to_file(args.db_file) # Database self-checking if args.self_check: print('Maigret sites database self-checking...') - is_need_update = await self_check(db, site_data, logger, max_connections=args.connections) + is_need_update = await self_check( + db, site_data, logger, max_connections=args.connections + ) if is_need_update: if input('Do you want to save changes permanently? [Yn]\n').lower() == 'y': db.save_to_file(args.db_file) @@ -314,9 +472,13 @@ async def main(): query_notify.warning('No sites to check, exiting!') sys.exit(2) else: - query_notify.warning(f'Starting a search on top {len(site_data)} sites from the Maigret database...') + query_notify.warning( + f'Starting a search on top {len(site_data)} sites from the Maigret database...' + ) if not args.all_sites: - query_notify.warning(f'You can run search by full list of sites with flag `-a`', '!') + query_notify.warning( + 'You can run search by full list of sites with flag `-a`', '!' + ) already_checked = set() general_results = [] @@ -331,34 +493,44 @@ async def main(): already_checked.add(username.lower()) if username in args.ignore_ids_list: - query_notify.warning(f'Skip a search by username {username} cause it\'s marked as ignored.') + query_notify.warning( + f'Skip a search by username {username} cause it\'s marked as ignored.' 
+ ) continue # check for characters do not supported by sites generally - found_unsupported_chars = set(unsupported_characters).intersection(set(username)) + found_unsupported_chars = set(unsupported_characters).intersection( + set(username) + ) if found_unsupported_chars: - pretty_chars_str = ','.join(map(lambda s: f'"{s}"', found_unsupported_chars)) + pretty_chars_str = ','.join( + map(lambda s: f'"{s}"', found_unsupported_chars) + ) query_notify.warning( - f'Found unsupported URL characters: {pretty_chars_str}, skip search by username "{username}"') + f'Found unsupported URL characters: {pretty_chars_str}, skip search by username "{username}"' + ) continue sites_to_check = get_top_sites_for_id(id_type) - results = await maigret(username=username, - site_dict=dict(sites_to_check), - query_notify=query_notify, - proxy=args.proxy, - timeout=args.timeout, - is_parsing_enabled=parsing_enabled, - id_type=id_type, - debug=args.verbose, - logger=logger, - cookies=args.cookie_file, - forced=args.use_disabled_sites, - max_connections=args.connections, - no_progressbar=args.no_progressbar, - ) + results = await maigret( + username=username, + site_dict=dict(sites_to_check), + query_notify=query_notify, + proxy=args.proxy, + timeout=args.timeout, + is_parsing_enabled=parsing_enabled, + id_type=id_type, + debug=args.verbose, + logger=logger, + cookies=args.cookie_file, + forced=args.use_disabled_sites, + max_connections=args.connections, + no_progressbar=args.no_progressbar, + ) + + notify_about_errors(results, query_notify) general_results.append((username, id_type, results)) @@ -397,9 +569,13 @@ async def main(): query_notify.warning(f'TXT report for {username} saved in {filename}') if args.json: - filename = report_filepath_tpl.format(username=username, postfix=f'_{args.json}.json') + filename = report_filepath_tpl.format( + username=username, postfix=f'_{args.json}.json' + ) save_json_report(filename, username, results, report_type=args.json) - query_notify.warning(f'JSON {args.json} report for {username} saved in {filename}') + query_notify.warning( + f'JSON {args.json} report for {username} saved in {filename}' + ) # reporting for all the result if general_results: diff --git a/maigret/notify.py b/maigret/notify.py index aff58e7..a6290c5 100644 --- a/maigret/notify.py +++ b/maigret/notify.py @@ -11,7 +11,7 @@ from .result import QueryStatus from .utils import get_dict_ascii_tree -class QueryNotify(): +class QueryNotify: """Query Notify Object. Base class that describes methods available to notify the results of @@ -39,7 +39,7 @@ class QueryNotify(): return - def start(self, message=None, id_type='username'): + def start(self, message=None, id_type="username"): """Notify Start. Notify method for start of query. This method will be called before @@ -116,8 +116,14 @@ class QueryNotifyPrint(QueryNotify): Query notify class that prints results. """ - def __init__(self, result=None, verbose=False, print_found_only=False, - skip_check_errors=False, color=True): + def __init__( + self, + result=None, + verbose=False, + print_found_only=False, + skip_check_errors=False, + color=True, + ): """Create Query Notify Print Object. 
Contains information about a specific method of notifying the results @@ -162,22 +168,29 @@ class QueryNotifyPrint(QueryNotify): title = f"Checking {id_type}" if self.color: - print(Style.BRIGHT + Fore.GREEN + "[" + - Fore.YELLOW + "*" + - Fore.GREEN + f"] {title}" + - Fore.WHITE + f" {message}" + - Fore.GREEN + " on:") + print( + Style.BRIGHT + + Fore.GREEN + + "[" + + Fore.YELLOW + + "*" + + Fore.GREEN + + f"] {title}" + + Fore.WHITE + + f" {message}" + + Fore.GREEN + + " on:" + ) else: print(f"[*] {title} {message} on:") - def warning(self, message, symbol='-'): - msg = f'[{symbol}] {message}' + def warning(self, message, symbol="-"): + msg = f"[{symbol}] {message}" if self.color: print(Style.BRIGHT + Fore.YELLOW + msg) else: print(msg) - def update(self, result, is_similar=False): """Notify Update. @@ -196,18 +209,20 @@ class QueryNotifyPrint(QueryNotify): if not self.result.ids_data: ids_data_text = "" else: - ids_data_text = get_dict_ascii_tree(self.result.ids_data.items(), ' ') + ids_data_text = get_dict_ascii_tree(self.result.ids_data.items(), " ") - def make_colored_terminal_notify(status, text, status_color, text_color, appendix): + def make_colored_terminal_notify( + status, text, status_color, text_color, appendix + ): text = [ - f'{Style.BRIGHT}{Fore.WHITE}[{status_color}{status}{Fore.WHITE}]' + - f'{text_color} {text}: {Style.RESET_ALL}' + - f'{appendix}' + f"{Style.BRIGHT}{Fore.WHITE}[{status_color}{status}{Fore.WHITE}]" + + f"{text_color} {text}: {Style.RESET_ALL}" + + f"{appendix}" ] - return ''.join(text) + return "".join(text) def make_simple_terminal_notify(status, text, appendix): - return f'[{status}] {text}: {appendix}' + return f"[{status}] {text}: {appendix}" def make_terminal_notify(is_colored=True, *args): if is_colored: @@ -220,45 +235,55 @@ class QueryNotifyPrint(QueryNotify): # Output to the terminal is desired. if result.status == QueryStatus.CLAIMED: color = Fore.BLUE if is_similar else Fore.GREEN - status = '?' if is_similar else '+' + status = "?" if is_similar else "+" notify = make_terminal_notify( self.color, - status, result.site_name, - color, color, - result.site_url_user + ids_data_text + status, + result.site_name, + color, + color, + result.site_url_user + ids_data_text, ) elif result.status == QueryStatus.AVAILABLE: if not self.print_found_only: notify = make_terminal_notify( self.color, - '-', result.site_name, - Fore.RED, Fore.YELLOW, - 'Not found!' + ids_data_text + "-", + result.site_name, + Fore.RED, + Fore.YELLOW, + "Not found!" + ids_data_text, ) elif result.status == QueryStatus.UNKNOWN: if not self.skip_check_errors: notify = make_terminal_notify( self.color, - '?', result.site_name, - Fore.RED, Fore.RED, - str(self.result.error) + ids_data_text + "?", + result.site_name, + Fore.RED, + Fore.RED, + str(self.result.error) + ids_data_text, ) elif result.status == QueryStatus.ILLEGAL: if not self.print_found_only: - text = 'Illegal Username Format For This Site!' + text = "Illegal Username Format For This Site!" notify = make_terminal_notify( self.color, - '-', result.site_name, - Fore.RED, Fore.YELLOW, - text + ids_data_text + "-", + result.site_name, + Fore.RED, + Fore.YELLOW, + text + ids_data_text, ) else: # It should be impossible to ever get here... 
- raise ValueError(f"Unknown Query Status '{str(result.status)}' for " - f"site '{self.result.site_name}'") + raise ValueError( + f"Unknown Query Status '{str(result.status)}' for " + f"site '{self.result.site_name}'" + ) if notify: - sys.stdout.write('\x1b[1K\r') + sys.stdout.write("\x1b[1K\r") print(notify) return diff --git a/maigret/report.py b/maigret/report.py index 3a5cb94..8cefa58 100644 --- a/maigret/report.py +++ b/maigret/report.py @@ -5,6 +5,7 @@ import logging import os from argparse import ArgumentTypeError from datetime import datetime +from typing import Dict, Any import pycountry import xmind @@ -16,83 +17,85 @@ from .result import QueryStatus from .utils import is_country_tag, CaseConverter, enrich_link_str SUPPORTED_JSON_REPORT_FORMATS = [ - 'simple', - 'ndjson', + "simple", + "ndjson", ] -''' +""" UTILS -''' +""" def filter_supposed_data(data): - ### interesting fields - allowed_fields = ['fullname', 'gender', 'location', 'age'] - filtered_supposed_data = {CaseConverter.snake_to_title(k): v[0] - for k, v in data.items() - if k in allowed_fields} + # interesting fields + allowed_fields = ["fullname", "gender", "location", "age"] + filtered_supposed_data = { + CaseConverter.snake_to_title(k): v[0] + for k, v in data.items() + if k in allowed_fields + } return filtered_supposed_data -''' +""" REPORTS SAVING -''' +""" def save_csv_report(filename: str, username: str, results: dict): - with open(filename, 'w', newline='', encoding='utf-8') as f: + with open(filename, "w", newline="", encoding="utf-8") as f: generate_csv_report(username, results, f) def save_txt_report(filename: str, username: str, results: dict): - with open(filename, 'w', encoding='utf-8') as f: + with open(filename, "w", encoding="utf-8") as f: generate_txt_report(username, results, f) def save_html_report(filename: str, context: dict): template, _ = generate_report_template(is_pdf=False) filled_template = template.render(**context) - with open(filename, 'w') as f: + with open(filename, "w") as f: f.write(filled_template) def save_pdf_report(filename: str, context: dict): template, css = generate_report_template(is_pdf=True) filled_template = template.render(**context) - with open(filename, 'w+b') as f: + with open(filename, "w+b") as f: pisa.pisaDocument(io.StringIO(filled_template), dest=f, default_css=css) def save_json_report(filename: str, username: str, results: dict, report_type: str): - with open(filename, 'w', encoding='utf-8') as f: + with open(filename, "w", encoding="utf-8") as f: generate_json_report(username, results, f, report_type=report_type) -''' +""" REPORTS GENERATING -''' +""" def generate_report_template(is_pdf: bool): """ - HTML/PDF template generation + HTML/PDF template generation """ def get_resource_content(filename): - return open(os.path.join(maigret_path, 'resources', filename)).read() + return open(os.path.join(maigret_path, "resources", filename)).read() maigret_path = os.path.dirname(os.path.realpath(__file__)) if is_pdf: - template_content = get_resource_content('simple_report_pdf.tpl') - css_content = get_resource_content('simple_report_pdf.css') + template_content = get_resource_content("simple_report_pdf.tpl") + css_content = get_resource_content("simple_report_pdf.css") else: - template_content = get_resource_content('simple_report.tpl') + template_content = get_resource_content("simple_report.tpl") css_content = None template = Template(template_content) - template.globals['title'] = CaseConverter.snake_to_title - template.globals['detect_link'] = enrich_link_str + 
template.globals["title"] = CaseConverter.snake_to_title # type: ignore + template.globals["detect_link"] = enrich_link_str # type: ignore return template, css_content @@ -100,15 +103,15 @@ def generate_report_context(username_results: list): brief_text = [] usernames = {} extended_info_count = 0 - tags = {} - supposed_data = {} + tags: Dict[str, int] = {} + supposed_data: Dict[str, Any] = {} first_seen = None for username, id_type, results in username_results: found_accounts = 0 new_ids = [] - usernames[username] = {'type': id_type} + usernames[username] = {"type": id_type} for website_name in results: dictionary = results[website_name] @@ -116,19 +119,19 @@ def generate_report_context(username_results: list): if not dictionary: continue - if dictionary.get('is_similar'): + if dictionary.get("is_similar"): continue - status = dictionary.get('status') + status = dictionary.get("status") if not status: # FIXME: currently in case of timeout continue if status.ids_data: - dictionary['ids_data'] = status.ids_data + dictionary["ids_data"] = status.ids_data extended_info_count += 1 # detect first seen - created_at = status.ids_data.get('created_at') + created_at = status.ids_data.get("created_at") if created_at: if first_seen is None: first_seen = created_at @@ -138,37 +141,46 @@ def generate_report_context(username_results: list): new_time = parse_datetime_str(created_at) if new_time < known_time: first_seen = created_at - except: - logging.debug('Problems with converting datetime %s/%s', first_seen, created_at) + except Exception as e: + logging.debug( + "Problems with converting datetime %s/%s: %s", + first_seen, + created_at, + str(e), + ) for k, v in status.ids_data.items(): # suppose target data - field = 'fullname' if k == 'name' else k - if not field in supposed_data: + field = "fullname" if k == "name" else k + if field not in supposed_data: supposed_data[field] = [] supposed_data[field].append(v) # suppose country - if k in ['country', 'locale']: + if k in ["country", "locale"]: try: if is_country_tag(k): tag = pycountry.countries.get(alpha_2=v).alpha_2.lower() else: - tag = pycountry.countries.search_fuzzy(v)[0].alpha_2.lower() + tag = pycountry.countries.search_fuzzy(v)[ + 0 + ].alpha_2.lower() # TODO: move countries to another struct tags[tag] = tags.get(tag, 0) + 1 except Exception as e: - logging.debug('pycountry exception', exc_info=True) + logging.debug( + "Pycountry exception: %s", str(e), exc_info=True + ) - new_usernames = dictionary.get('ids_usernames') + new_usernames = dictionary.get("ids_usernames") if new_usernames: for u, utype in new_usernames.items(): - if not u in usernames: + if u not in usernames: new_ids.append((u, utype)) - usernames[u] = {'type': utype} + usernames[u] = {"type": utype} if status.status == QueryStatus.CLAIMED: found_accounts += 1 - dictionary['found'] = True + dictionary["found"] = True else: continue @@ -177,22 +189,24 @@ def generate_report_context(username_results: list): for t in status.tags: tags[t] = tags.get(t, 0) + 1 - brief_text.append(f'Search by {id_type} {username} returned {found_accounts} accounts.') + brief_text.append( + f"Search by {id_type} {username} returned {found_accounts} accounts." 
+ ) if new_ids: ids_list = [] for u, t in new_ids: - ids_list.append(f'{u} ({t})' if t != 'username' else u) - brief_text.append(f'Found target\'s other IDs: ' + ', '.join(ids_list) + '.') + ids_list.append(f"{u} ({t})" if t != "username" else u) + brief_text.append("Found target's other IDs: " + ", ".join(ids_list) + ".") - brief_text.append(f'Extended info extracted from {extended_info_count} accounts.') + brief_text.append(f"Extended info extracted from {extended_info_count} accounts.") - brief = ' '.join(brief_text).strip() + brief = " ".join(brief_text).strip() tuple_sort = lambda d: sorted(d, key=lambda x: x[1], reverse=True) - if 'global' in tags: + if "global" in tags: # remove tag 'global' useless for country detection - del tags['global'] + del tags["global"] first_username = username_results[0][0] countries_lists = list(filter(lambda x: is_country_tag(x[0]), tags.items())) @@ -201,35 +215,33 @@ def generate_report_context(username_results: list): filtered_supposed_data = filter_supposed_data(supposed_data) return { - 'username': first_username, - 'brief': brief, - 'results': username_results, - 'first_seen': first_seen, - 'interests_tuple_list': tuple_sort(interests_list), - 'countries_tuple_list': tuple_sort(countries_lists), - 'supposed_data': filtered_supposed_data, - 'generated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + "username": first_username, + "brief": brief, + "results": username_results, + "first_seen": first_seen, + "interests_tuple_list": tuple_sort(interests_list), + "countries_tuple_list": tuple_sort(countries_lists), + "supposed_data": filtered_supposed_data, + "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } def generate_csv_report(username: str, results: dict, csvfile): writer = csv.writer(csvfile) - writer.writerow(['username', - 'name', - 'url_main', - 'url_user', - 'exists', - 'http_status' - ] - ) + writer.writerow( + ["username", "name", "url_main", "url_user", "exists", "http_status"] + ) for site in results: - writer.writerow([username, - site, - results[site]['url_main'], - results[site]['url_user'], - str(results[site]['status'].status), - results[site]['http_status'], - ]) + writer.writerow( + [ + username, + site, + results[site]["url_main"], + results[site]["url_user"], + str(results[site]["status"].status), + results[site]["http_status"], + ] + ) def generate_txt_report(username: str, results: dict, file): @@ -242,12 +254,11 @@ def generate_txt_report(username: str, results: dict, file): if dictionary.get("status").status == QueryStatus.CLAIMED: exists_counter += 1 file.write(dictionary["url_user"] + "\n") - file.write(f'Total Websites Username Detected On : {exists_counter}') + file.write(f"Total Websites Username Detected On : {exists_counter}") def generate_json_report(username: str, results: dict, file, report_type): - exists_counter = 0 - is_report_per_line = report_type.startswith('ndjson') + is_report_per_line = report_type.startswith("ndjson") all_json = {} for sitename in results: @@ -257,11 +268,11 @@ def generate_json_report(username: str, results: dict, file, report_type): continue data = dict(site_result) - data['status'] = data['status'].json() + data["status"] = data["status"].json() if is_report_per_line: - data['sitename'] = sitename - file.write(json.dumps(data) + '\n') + data["sitename"] = sitename + file.write(json.dumps(data) + "\n") else: all_json[sitename] = data @@ -269,9 +280,9 @@ def generate_json_report(username: str, results: dict, file, report_type): file.write(json.dumps(all_json)) -''' +""" 
XMIND 8 Functions -''' +""" def save_xmind_report(filename, username, results): @@ -284,7 +295,6 @@ def save_xmind_report(filename, username, results): def design_sheet(sheet, username, results): - ##all tag list alltags = {} supposed_data = {} @@ -300,7 +310,7 @@ def design_sheet(sheet, username, results): dictionary = results[website_name] if dictionary.get("status").status == QueryStatus.CLAIMED: - ## firsttime I found that entry + # firsttime I found that entry for tag in dictionary.get("status").tags: if tag.strip() == "": continue @@ -329,22 +339,22 @@ def design_sheet(sheet, username, results): # suppose target data if not isinstance(v, list): currentsublabel = userlink.addSubTopic() - field = 'fullname' if k == 'name' else k - if not field in supposed_data: + field = "fullname" if k == "name" else k + if field not in supposed_data: supposed_data[field] = [] supposed_data[field].append(v) currentsublabel.setTitle("%s: %s" % (k, v)) else: for currentval in v: currentsublabel = userlink.addSubTopic() - field = 'fullname' if k == 'name' else k - if not field in supposed_data: + field = "fullname" if k == "name" else k + if field not in supposed_data: supposed_data[field] = [] supposed_data[field].append(currentval) currentsublabel.setTitle("%s: %s" % (k, currentval)) - ### Add Supposed DATA + # add supposed data filterede_supposed_data = filter_supposed_data(supposed_data) - if (len(filterede_supposed_data) > 0): + if len(filterede_supposed_data) > 0: undefinedsection = root_topic1.addSubTopic() undefinedsection.setTitle("SUPPOSED DATA") for k, v in filterede_supposed_data.items(): @@ -353,7 +363,9 @@ def design_sheet(sheet, username, results): def check_supported_json_format(value): - if value and not value in SUPPORTED_JSON_REPORT_FORMATS: - raise ArgumentTypeError(f'JSON report type must be one of the following types: ' - + ', '.join(SUPPORTED_JSON_REPORT_FORMATS)) + if value and value not in SUPPORTED_JSON_REPORT_FORMATS: + raise ArgumentTypeError( + "JSON report type must be one of the following types: " + + ", ".join(SUPPORTED_JSON_REPORT_FORMATS) + ) return value diff --git a/maigret/resources/data.json b/maigret/resources/data.json index 145b319..c51c139 100644 --- a/maigret/resources/data.json +++ b/maigret/resources/data.json @@ -12148,7 +12148,7 @@ "us" ], "headers": { - "authorization": "Bearer BQAEeuyBT6S535Anlx4wU-pfPjjgiE8r2e7j0eOSnwZjSvjFvQgDzxwV__03-WNbwxPKyGehoJ5pQCBwUqs" + "authorization": "Bearer BQCe5Yx_Evl2m1Td_86SzknoVan7OZxN6y6WaR7xNrJb8vnZ5B7VZY401MdivLmCQcyv0LUkfo1M-15_m-E" }, "errors": { "Spotify is currently not available in your country.": "Access denied in your country, use proxy/vpn" @@ -13458,7 +13458,7 @@ "sec-ch-ua": "Google Chrome\";v=\"87\", \" Not;A Brand\";v=\"99\", \"Chromium\";v=\"87\"", "authorization": "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA", "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36", - "x-guest-token": "1387733472027070474" + "x-guest-token": "1388029767388106752" }, "errors": { "Bad guest token": "x-guest-token update required" @@ -13661,6 +13661,7 @@ "type": "vk_id", "checkType": "response_url", "alexaRank": 26, + "source": "VK", "url": "https://vk.com/id{username}", "urlMain": "https://vk.com/", "usernameClaimed": "270433952", @@ -13672,6 +13673,7 @@ ], "checkType": "status_code", "alexaRank": 28938, + "source": "VK", "url": "https://vkfaces.com/vk/user/{username}", 
"urlMain": "https://vkfaces.com", "usernameClaimed": "adam", @@ -13835,7 +13837,7 @@ "video" ], "headers": { - "Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTk2OTczNjAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbH0.yLRq0lhenTYfe0EKKJsk5HZJZt3ykUVNBGuiMCC5HR4" + "Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTk3NzM3NDAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbH0.4O4QL4IsoiKl0Cz1310Qjo9WablDr5LIyMOPQgMS1XE" }, "activation": { "url": "https://vimeo.com/_rv/viewer", @@ -16125,10 +16127,8 @@ "gb", "uk" ], - "checkType": "message", - "absenceStrs": "The specified member cannot be found. Please enter a member's entire name.", + "engine": "XenForo", "alexaRank": 12725, - "url": "https://forums.overclockers.co.uk/members/?username={username}", "urlMain": "https://forums.overclockers.co.uk", "usernameClaimed": "adam", "usernameUnclaimed": "noonewouldeverusethis7" @@ -23749,6 +23749,69 @@ "urlMain": "https://opensea.io", "usernameClaimed": "admin", "usernameUnclaimed": "noonewouldeverusethis7" + }, + "SmiHub": { + "checkType": "message", + "presenseStrs": [ + "profile", + "user-page", + "user", + " data-name=", + "user__img" + ], + "absenceStrs": [ + "text-lg mb-3" + ], + "source": "Instagram", + "url": "https://smihub.com/v/{username}", + "urlMain": "https://smihub.com", + "usernameClaimed": "blue", + "usernameUnclaimed": "noonewouldeverusethis7" + }, + "do100verno.info": { + "checkType": "message", + "presenseStrs": [ + "white-space: nowrap;" + ], + "absenceStrs": [ + "l-main", + " l-mainDcL", + " l-usrMenu" + ], + "url": "https://do100verno.info/card/{username}", + "urlMain": "https://do100verno.info", + "usernameClaimed": "ekostyle", + "usernameUnclaimed": "noonewouldeverusethis7" + }, + "www.kinokopilka.pro": { + "checkType": "message", + "presenseStrs": [ + "profile", + "user", + "people", + "users", + "/people" + ], + "url": "https://www.kinokopilka.pro/users/{username}", + "urlMain": "https://www.kinokopilka.pro", + "usernameClaimed": "admin", + "usernameUnclaimed": "noonewouldeverusethis7" + }, + "www.turpravda.com": { + "checkType": "message", + "presenseStrs": [ + "email", + " name" + ], + "absenceStrs": [ + "Title", + " Shortcut Icon", + " submit" + ], + "url": "https://www.turpravda.com/profile/{username}", + "urlMain": "https://www.turpravda.com", + "usernameClaimed": "admin", + "usernameUnclaimed": "noonewouldeverusethis7" } }, "engines": { diff --git a/maigret/result.py b/maigret/result.py index c84f80e..cb20eef 100644 --- a/maigret/result.py +++ b/maigret/result.py @@ -10,6 +10,7 @@ class QueryStatus(Enum): Describes status of query about a given username. """ + CLAIMED = "Claimed" # Username Detected AVAILABLE = "Available" # Username Not Detected UNKNOWN = "Unknown" # Error Occurred While Trying To Detect Username @@ -27,14 +28,24 @@ class QueryStatus(Enum): return self.value -class QueryResult(): +class QueryResult: """Query Result Object. Describes result of query about a given username. """ - def __init__(self, username, site_name, site_url_user, status, ids_data=None, - query_time=None, context=None, error=None, tags=[]): + def __init__( + self, + username, + site_name, + site_url_user, + status, + ids_data=None, + query_time=None, + context=None, + error=None, + tags=[], + ): """Create Query Result Object. 
Contains information about a specific method of detecting usernames on @@ -77,12 +88,12 @@ class QueryResult(): def json(self): return { - 'username': self.username, - 'site_name': self.site_name, - 'url': self.site_url_user, - 'status': str(self.status), - 'ids': self.ids_data or {}, - 'tags': self.tags, + "username": self.username, + "site_name": self.site_name, + "url": self.site_url_user, + "status": str(self.status), + "ids": self.ids_data or {}, + "tags": self.tags, } def is_found(self): diff --git a/maigret/sites.py b/maigret/sites.py index f77aa31..f821641 100644 --- a/maigret/sites.py +++ b/maigret/sites.py @@ -1,8 +1,9 @@ -# -*- coding: future_annotations -*- +# ****************************** -*- """Maigret Sites Information""" import copy import json import sys +from typing import Optional import requests @@ -10,12 +11,48 @@ from .utils import CaseConverter, URLMatcher, is_country_tag # TODO: move to data.json SUPPORTED_TAGS = [ - 'gaming', 'coding', 'photo', 'music', 'blog', 'finance', 'freelance', 'dating', - 'tech', 'forum', 'porn', 'erotic', 'webcam', 'video', 'movies', 'hacking', 'art', - 'discussion', 'sharing', 'writing', 'wiki', 'business', 'shopping', 'sport', - 'books', 'news', 'documents', 'travel', 'maps', 'hobby', 'apps', 'classified', - 'career', 'geosocial', 'streaming', 'education', 'networking', 'torrent', - 'science', 'medicine', 'reading', 'stock', + "gaming", + "coding", + "photo", + "music", + "blog", + "finance", + "freelance", + "dating", + "tech", + "forum", + "porn", + "erotic", + "webcam", + "video", + "movies", + "hacking", + "art", + "discussion", + "sharing", + "writing", + "wiki", + "business", + "shopping", + "sport", + "books", + "news", + "documents", + "travel", + "maps", + "hobby", + "apps", + "classified", + "career", + "geosocial", + "streaming", + "education", + "networking", + "torrent", + "science", + "medicine", + "reading", + "stock", ] @@ -32,13 +69,13 @@ class MaigretEngine: class MaigretSite: NOT_SERIALIZABLE_FIELDS = [ - 'name', - 'engineData', - 'requestFuture', - 'detectedEngine', - 'engineObj', - 'stats', - 'urlRegexp', + "name", + "engineData", + "requestFuture", + "detectedEngine", + "engineObj", + "stats", + "urlRegexp", ] def __init__(self, name, information): @@ -49,15 +86,15 @@ class MaigretSite: self.ignore403 = False self.tags = [] - self.type = 'username' + self.type = "username" self.headers = {} self.errors = {} self.activation = {} - self.url_subpath = '' + self.url_subpath = "" self.regex_check = None self.url_probe = None - self.check_type = '' - self.request_head_only = '' + self.check_type = "" + self.request_head_only = "" self.get_params = {} self.presense_strs = [] @@ -84,26 +121,29 @@ class MaigretSite: return f"{self.name} ({self.url_main})" def update_detectors(self): - if 'url' in self.__dict__: + if "url" in self.__dict__: url = self.url - for group in ['urlMain', 'urlSubpath']: + for group in ["urlMain", "urlSubpath"]: if group in url: - url = url.replace('{' + group + '}', self.__dict__[CaseConverter.camel_to_snake(group)]) + url = url.replace( + "{" + group + "}", + self.__dict__[CaseConverter.camel_to_snake(group)], + ) self.url_regexp = URLMatcher.make_profile_url_regexp(url, self.regex_check) - def detect_username(self, url: str) -> str: + def detect_username(self, url: str) -> Optional[str]: if self.url_regexp: match_groups = self.url_regexp.match(url) if match_groups: - return match_groups.groups()[-1].rstrip('/') + return match_groups.groups()[-1].rstrip("/") return None @property def 
pretty_name(self): if self.source: - return f'{self.name} [{self.source}]' + return f"{self.name} [{self.source}]" return self.name @property @@ -113,7 +153,7 @@ class MaigretSite: # convert to camelCase field = CaseConverter.snake_to_camel(k) # strip empty elements - if v in (False, '', [], {}, None, sys.maxsize, 'username'): + if v in (False, "", [], {}, None, sys.maxsize, "username"): continue if field in self.NOT_SERIALIZABLE_FIELDS: continue @@ -121,13 +161,13 @@ class MaigretSite: return result - def update(self, updates: dict) -> MaigretSite: + def update(self, updates: "dict") -> "MaigretSite": self.__dict__.update(updates) self.update_detectors() return self - def update_from_engine(self, engine: MaigretEngine) -> MaigretSite: + def update_from_engine(self, engine: MaigretEngine) -> "MaigretSite": engine_data = engine.site for k, v in engine_data.items(): field = CaseConverter.camel_to_snake(k) @@ -145,7 +185,7 @@ class MaigretSite: return self - def strip_engine_data(self) -> MaigretSite: + def strip_engine_data(self) -> "MaigretSite": if not self.engine_obj: return self @@ -190,30 +230,47 @@ class MaigretDatabase: def sites_dict(self): return {site.name: site for site in self._sites} - def ranked_sites_dict(self, reverse=False, top=sys.maxsize, tags=[], names=[], - disabled=True, id_type='username'): + def ranked_sites_dict( + self, + reverse=False, + top=sys.maxsize, + tags=[], + names=[], + disabled=True, + id_type="username", + ): """ - Ranking and filtering of the sites list + Ranking and filtering of the sites list """ normalized_names = list(map(str.lower, names)) normalized_tags = list(map(str.lower, tags)) is_name_ok = lambda x: x.name.lower() in normalized_names is_source_ok = lambda x: x.source and x.source.lower() in normalized_names - is_engine_ok = lambda x: isinstance(x.engine, str) and x.engine.lower() in normalized_tags + is_engine_ok = ( + lambda x: isinstance(x.engine, str) and x.engine.lower() in normalized_tags + ) is_tags_ok = lambda x: set(x.tags).intersection(set(normalized_tags)) - is_disabled_needed = lambda x: not x.disabled or ('disabled' in tags or disabled) + is_disabled_needed = lambda x: not x.disabled or ( + "disabled" in tags or disabled + ) is_id_type_ok = lambda x: x.type == id_type filter_tags_engines_fun = lambda x: not tags or is_engine_ok(x) or is_tags_ok(x) filter_names_fun = lambda x: not names or is_name_ok(x) or is_source_ok(x) - filter_fun = lambda x: filter_tags_engines_fun(x) and filter_names_fun(x) \ - and is_disabled_needed(x) and is_id_type_ok(x) + filter_fun = ( + lambda x: filter_tags_engines_fun(x) + and filter_names_fun(x) + and is_disabled_needed(x) + and is_id_type_ok(x) + ) filtered_list = [s for s in self.sites if filter_fun(s)] - sorted_list = sorted(filtered_list, key=lambda x: x.alexa_rank, reverse=reverse)[:top] + sorted_list = sorted( + filtered_list, key=lambda x: x.alexa_rank, reverse=reverse + )[:top] return {site.name: site for site in sorted_list} @property @@ -224,7 +281,7 @@ class MaigretDatabase: def engines_dict(self): return {engine.name: engine for engine in self._engines} - def update_site(self, site: MaigretSite) -> MaigretDatabase: + def update_site(self, site: MaigretSite) -> "MaigretDatabase": for s in self._sites: if s.name == site.name: s = site @@ -233,20 +290,20 @@ class MaigretDatabase: self._sites.append(site) return self - def save_to_file(self, filename: str) -> MaigretDatabase: + def save_to_file(self, filename: str) -> "MaigretDatabase": db_data = { - 'sites': {site.name: 
site.strip_engine_data().json for site in self._sites}, - 'engines': {engine.name: engine.json for engine in self._engines}, + "sites": {site.name: site.strip_engine_data().json for site in self._sites}, + "engines": {engine.name: engine.json for engine in self._engines}, } json_data = json.dumps(db_data, indent=4) - with open(filename, 'w') as f: + with open(filename, "w") as f: f.write(json_data) return self - def load_from_json(self, json_data: dict) -> MaigretDatabase: + def load_from_json(self, json_data: dict) -> "MaigretDatabase": # Add all of site information from the json file to internal site list. site_data = json_data.get("sites", {}) engines_data = json_data.get("engines", {}) @@ -258,30 +315,32 @@ class MaigretDatabase: try: maigret_site = MaigretSite(site_name, site_data[site_name]) - engine = site_data[site_name].get('engine') + engine = site_data[site_name].get("engine") if engine: maigret_site.update_from_engine(self.engines_dict[engine]) self._sites.append(maigret_site) except KeyError as error: - raise ValueError(f"Problem parsing json content for site {site_name}: " - f"Missing attribute {str(error)}." - ) + raise ValueError( + f"Problem parsing json content for site {site_name}: " + f"Missing attribute {str(error)}." + ) return self - def load_from_str(self, db_str: str) -> MaigretDatabase: + def load_from_str(self, db_str: "str") -> "MaigretDatabase": try: data = json.loads(db_str) except Exception as error: - raise ValueError(f"Problem parsing json contents from str" - f"'{db_str[:50]}'...: {str(error)}." - ) + raise ValueError( + f"Problem parsing json contents from str" + f"'{db_str[:50]}'...: {str(error)}." + ) return self.load_from_json(data) - def load_from_url(self, url: str) -> MaigretDatabase: - is_url_valid = url.startswith('http://') or url.startswith('https://') + def load_from_url(self, url: str) -> "MaigretDatabase": + is_url_valid = url.startswith("http://") or url.startswith("https://") if not is_url_valid: raise FileNotFoundError(f"Invalid data file URL '{url}'.") @@ -289,38 +348,40 @@ class MaigretDatabase: try: response = requests.get(url=url) except Exception as error: - raise FileNotFoundError(f"Problem while attempting to access " - f"data file URL '{url}': " - f"{str(error)}" - ) + raise FileNotFoundError( + f"Problem while attempting to access " + f"data file URL '{url}': " + f"{str(error)}" + ) if response.status_code == 200: try: data = response.json() except Exception as error: - raise ValueError(f"Problem parsing json contents at " - f"'{url}': {str(error)}." - ) + raise ValueError( + f"Problem parsing json contents at " f"'{url}': {str(error)}." + ) else: - raise FileNotFoundError(f"Bad response while accessing " - f"data file URL '{url}'." - ) + raise FileNotFoundError( + f"Bad response while accessing " f"data file URL '{url}'." + ) return self.load_from_json(data) - def load_from_file(self, filename: str) -> MaigretDatabase: + def load_from_file(self, filename: "str") -> "MaigretDatabase": try: - with open(filename, 'r', encoding='utf-8') as file: + with open(filename, "r", encoding="utf-8") as file: try: data = json.load(file) except Exception as error: - raise ValueError(f"Problem parsing json contents from " - f"file '{filename}': {str(error)}." - ) + raise ValueError( + f"Problem parsing json contents from " + f"file '{filename}': {str(error)}." + ) except FileNotFoundError as error: - raise FileNotFoundError(f"Problem while attempting to access " - f"data file '{filename}'." 
- ) + raise FileNotFoundError( + f"Problem while attempting to access " f"data file '{filename}'." + ) from error return self.load_from_json(data) @@ -328,8 +389,8 @@ class MaigretDatabase: sites = sites_dict or self.sites_dict found_flags = {} for _, s in sites.items(): - if 'presense_flag' in s.stats: - flag = s.stats['presense_flag'] + if "presense_flag" in s.stats: + flag = s.stats["presense_flag"] found_flags[flag] = found_flags.get(flag, 0) + 1 return found_flags @@ -338,7 +399,7 @@ class MaigretDatabase: if not sites_dict: sites_dict = self.sites_dict() - output = '' + output = "" disabled_count = 0 total_count = len(sites_dict) urls = {} @@ -349,18 +410,18 @@ class MaigretDatabase: disabled_count += 1 url = URLMatcher.extract_main_part(site.url) - if url.startswith('{username}'): - url = 'SUBDOMAIN' - elif url == '': - url = f'{site.url} ({site.engine})' + if url.startswith("{username}"): + url = "SUBDOMAIN" + elif url == "": + url = f"{site.url} ({site.engine})" else: - parts = url.split('/') - url = '/' + '/'.join(parts[1:]) + parts = url.split("/") + url = "/" + "/".join(parts[1:]) urls[url] = urls.get(url, 0) + 1 if not site.tags: - tags['NO_TAGS'] = tags.get('NO_TAGS', 0) + 1 + tags["NO_TAGS"] = tags.get("NO_TAGS", 0) + 1 for tag in site.tags: if is_country_tag(tag): @@ -368,17 +429,17 @@ class MaigretDatabase: continue tags[tag] = tags.get(tag, 0) + 1 - output += f'Enabled/total sites: {total_count - disabled_count}/{total_count}\n' - output += 'Top sites\' profile URLs:\n' + output += f"Enabled/total sites: {total_count - disabled_count}/{total_count}\n" + output += "Top sites' profile URLs:\n" for url, count in sorted(urls.items(), key=lambda x: x[1], reverse=True)[:20]: if count == 1: break - output += f'{count}\t{url}\n' - output += 'Top sites\' tags:\n' + output += f"{count}\t{url}\n" + output += "Top sites' tags:\n" for tag, count in sorted(tags.items(), key=lambda x: x[1], reverse=True): - mark = '' - if not tag in SUPPORTED_TAGS: - mark = ' (non-standard)' - output += f'{count}\t{tag}{mark}\n' + mark = "" + if tag not in SUPPORTED_TAGS: + mark = " (non-standard)" + output += f"{count}\t{tag}{mark}\n" return output diff --git a/maigret/submit.py b/maigret/submit.py index 40d276e..26892ec 100644 --- a/maigret/submit.py +++ b/maigret/submit.py @@ -1,39 +1,57 @@ +import asyncio import difflib +import re import requests -from .checking import * +from .activation import import_aiohttp_cookies +from .checking import maigret +from .result import QueryStatus +from .sites import MaigretDatabase, MaigretSite from .utils import get_random_user_agent -DESIRED_STRINGS = ["username", "not found", "пользователь", "profile", "lastname", "firstname", "biography", - "birthday", "репутация", "информация", "e-mail"] +DESIRED_STRINGS = [ + "username", + "not found", + "пользователь", + "profile", + "lastname", + "firstname", + "biography", + "birthday", + "репутация", + "информация", + "e-mail", +] -SUPPOSED_USERNAMES = ['alex', 'god', 'admin', 'red', 'blue', 'john'] +SUPPOSED_USERNAMES = ["alex", "god", "admin", "red", "blue", "john"] HEADERS = { - 'User-Agent': get_random_user_agent(), + "User-Agent": get_random_user_agent(), } RATIO = 0.6 TOP_FEATURES = 5 -URL_RE = re.compile(r'https?://(www\.)?') +URL_RE = re.compile(r"https?://(www\.)?") def get_match_ratio(x): - return round(max([ - difflib.SequenceMatcher(a=x.lower(), b=y).ratio() - for y in DESIRED_STRINGS - ]), 2) + return round( + max( + [difflib.SequenceMatcher(a=x.lower(), b=y).ratio() for y in DESIRED_STRINGS] + ), + 2, + ) 
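# get_match_ratio scores a token by its best difflib similarity to DESIRED_STRINGS;
# check_features_manually further below uses it to rank candidate page features.
# A rough, illustrative call (the token values are made up, ratios approximate):
#   sorted(["copyright", "usrname", "profil"], key=get_match_ratio, reverse=True)
#   # ~> ["usrname", "profil", "copyright"]  (tokens closest to "username"/"profile" first)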
def extract_mainpage_url(url): - return '/'.join(url.split('/', 3)[:3]) + return "/".join(url.split("/", 3)[:3]) async def site_self_check(site, logger, semaphore, db: MaigretDatabase, silent=False): changes = { - 'disabled': False, + "disabled": False, } check_data = [ @@ -41,7 +59,7 @@ async def site_self_check(site, logger, semaphore, db: MaigretDatabase, silent=F (site.username_unclaimed, QueryStatus.AVAILABLE), ] - logger.info(f'Checking {site.name}...') + logger.info(f"Checking {site.name}...") for username, status in check_data: results_dict = await maigret( @@ -58,10 +76,10 @@ async def site_self_check(site, logger, semaphore, db: MaigretDatabase, silent=F # TODO: make normal checking if site.name not in results_dict: logger.info(results_dict) - changes['disabled'] = True + changes["disabled"] = True continue - result = results_dict[site.name]['status'] + result = results_dict[site.name]["status"] site_status = result.status @@ -70,20 +88,23 @@ async def site_self_check(site, logger, semaphore, db: MaigretDatabase, silent=F msgs = site.absence_strs etype = site.check_type logger.warning( - f'Error while searching {username} in {site.name}: {result.context}, {msgs}, type {etype}') + f"Error while searching {username} in {site.name}: {result.context}, {msgs}, type {etype}" + ) # don't disable in case of available username if status == QueryStatus.CLAIMED: - changes['disabled'] = True + changes["disabled"] = True elif status == QueryStatus.CLAIMED: - logger.warning(f'Not found `{username}` in {site.name}, must be claimed') + logger.warning( + f"Not found `{username}` in {site.name}, must be claimed" + ) logger.info(results_dict[site.name]) - changes['disabled'] = True + changes["disabled"] = True else: - logger.warning(f'Found `{username}` in {site.name}, must be available') + logger.warning(f"Found `{username}` in {site.name}, must be available") logger.info(results_dict[site.name]) - changes['disabled'] = True + changes["disabled"] = True - logger.info(f'Site {site.name} checking is finished') + logger.info(f"Site {site.name} checking is finished") return changes @@ -93,31 +114,31 @@ async def detect_known_engine(db, url_exists, url_mainpage): r = requests.get(url_mainpage) except Exception as e: print(e) - print('Some error while checking main page') + print("Some error while checking main page") return None - for e in db.engines: - strs_to_check = e.__dict__.get('presenseStrs') + for engine in db.engines: + strs_to_check = engine.__dict__.get("presenseStrs") if strs_to_check and r and r.text: all_strs_in_response = True for s in strs_to_check: - if not s in r.text: + if s not in r.text: all_strs_in_response = False if all_strs_in_response: - engine_name = e.__dict__.get('name') - print(f'Detected engine {engine_name} for site {url_mainpage}') + engine_name = engine.__dict__.get("name") + print(f"Detected engine {engine_name} for site {url_mainpage}") sites = [] for u in SUPPOSED_USERNAMES: site_data = { - 'urlMain': url_mainpage, - 'name': url_mainpage.split('//')[0], - 'engine': engine_name, - 'usernameClaimed': u, - 'usernameUnclaimed': 'noonewouldeverusethis7', + "urlMain": url_mainpage, + "name": url_mainpage.split("//")[0], + "engine": engine_name, + "usernameClaimed": u, + "usernameUnclaimed": "noonewouldeverusethis7", } - maigret_site = MaigretSite(url_mainpage.split('/')[-1], site_data) + maigret_site = MaigretSite(url_mainpage.split("/")[-1], site_data) maigret_site.update_from_engine(db.engines_dict[engine_name]) sites.append(maigret_site) @@ -126,15 +147,19 @@ async def 
detect_known_engine(db, url_exists, url_mainpage): return None -async def check_features_manually(db, url_exists, url_mainpage, cookie_file, logger, redirects=True): - url_parts = url_exists.split('/') +async def check_features_manually( + db, url_exists, url_mainpage, cookie_file, logger, redirects=True +): + url_parts = url_exists.split("/") supposed_username = url_parts[-1] - new_name = input(f'Is "{supposed_username}" a valid username? If not, write it manually: ') + new_name = input( + f'Is "{supposed_username}" a valid username? If not, write it manually: ' + ) if new_name: supposed_username = new_name - non_exist_username = 'noonewouldeverusethis7' + non_exist_username = "noonewouldeverusethis7" - url_user = url_exists.replace(supposed_username, '{username}') + url_user = url_exists.replace(supposed_username, "{username}") url_not_exists = url_exists.replace(supposed_username, non_exist_username) # cookies @@ -143,15 +168,18 @@ async def check_features_manually(db, url_exists, url_mainpage, cookie_file, log cookie_jar = await import_aiohttp_cookies(cookie_file) cookie_dict = {c.key: c.value for c in cookie_jar} - exists_resp = requests.get(url_exists, cookies=cookie_dict, headers=HEADERS, allow_redirects=redirects) + exists_resp = requests.get( + url_exists, cookies=cookie_dict, headers=HEADERS, allow_redirects=redirects + ) logger.debug(exists_resp.status_code) logger.debug(exists_resp.text) - non_exists_resp = requests.get(url_not_exists, cookies=cookie_dict, headers=HEADERS, allow_redirects=redirects) + non_exists_resp = requests.get( + url_not_exists, cookies=cookie_dict, headers=HEADERS, allow_redirects=redirects + ) logger.debug(non_exists_resp.status_code) logger.debug(non_exists_resp.text) - a = exists_resp.text b = non_exists_resp.text @@ -162,61 +190,81 @@ async def check_features_manually(db, url_exists, url_mainpage, cookie_file, log b_minus_a = tokens_b.difference(tokens_a) if len(a_minus_b) == len(b_minus_a) == 0: - print('The pages for existing and non-existing account are the same!') + print("The pages for existing and non-existing account are the same!") - top_features_count = int(input(f'Specify count of features to extract [default {TOP_FEATURES}]: ') or TOP_FEATURES) + top_features_count = int( + input(f"Specify count of features to extract [default {TOP_FEATURES}]: ") + or TOP_FEATURES + ) - presence_list = sorted(a_minus_b, key=get_match_ratio, reverse=True)[:top_features_count] + presence_list = sorted(a_minus_b, key=get_match_ratio, reverse=True)[ + :top_features_count + ] - print('Detected text features of existing account: ' + ', '.join(presence_list)) - features = input('If features was not detected correctly, write it manually: ') + print("Detected text features of existing account: " + ", ".join(presence_list)) + features = input("If features was not detected correctly, write it manually: ") if features: - presence_list = features.split(',') + presence_list = features.split(",") - absence_list = sorted(b_minus_a, key=get_match_ratio, reverse=True)[:top_features_count] - print('Detected text features of non-existing account: ' + ', '.join(absence_list)) - features = input('If features was not detected correctly, write it manually: ') + absence_list = sorted(b_minus_a, key=get_match_ratio, reverse=True)[ + :top_features_count + ] + print("Detected text features of non-existing account: " + ", ".join(absence_list)) + features = input("If features was not detected correctly, write it manually: ") if features: - absence_list = features.split(',') + 
absence_list = features.split(",") site_data = { - 'absenceStrs': absence_list, - 'presenseStrs': presence_list, - 'url': url_user, - 'urlMain': url_mainpage, - 'usernameClaimed': supposed_username, - 'usernameUnclaimed': non_exist_username, - 'checkType': 'message', + "absenceStrs": absence_list, + "presenseStrs": presence_list, + "url": url_user, + "urlMain": url_mainpage, + "usernameClaimed": supposed_username, + "usernameUnclaimed": non_exist_username, + "checkType": "message", } - site = MaigretSite(url_mainpage.split('/')[-1], site_data) + site = MaigretSite(url_mainpage.split("/")[-1], site_data) return site async def submit_dialog(db, url_exists, cookie_file, logger): - domain_raw = URL_RE.sub('', url_exists).strip().strip('/') - domain_raw = domain_raw.split('/')[0] + domain_raw = URL_RE.sub("", url_exists).strip().strip("/") + domain_raw = domain_raw.split("/")[0] # check for existence matched_sites = list(filter(lambda x: domain_raw in x.url_main + x.url, db.sites)) if matched_sites: - print(f'Sites with domain "{domain_raw}" already exists in the Maigret database!') - status = lambda s: '(disabled)' if s.disabled else '' - url_block = lambda s: f'\n\t{s.url_main}\n\t{s.url}' - print('\n'.join([f'{site.name} {status(site)}{url_block(site)}' for site in matched_sites])) + print( + f'Sites with domain "{domain_raw}" already exists in the Maigret database!' + ) + status = lambda s: "(disabled)" if s.disabled else "" + url_block = lambda s: f"\n\t{s.url_main}\n\t{s.url}" + print( + "\n".join( + [ + f"{site.name} {status(site)}{url_block(site)}" + for site in matched_sites + ] + ) + ) - if input(f'Do you want to continue? [yN] ').lower() in 'n': + if input("Do you want to continue? [yN] ").lower() in "n": return False url_mainpage = extract_mainpage_url(url_exists) sites = await detect_known_engine(db, url_exists, url_mainpage) if not sites: - print('Unable to detect site engine, lets generate checking features') - sites = [await check_features_manually(db, url_exists, url_mainpage, cookie_file, logger)] + print("Unable to detect site engine, lets generate checking features") + sites = [ + await check_features_manually( + db, url_exists, url_mainpage, cookie_file, logger + ) + ] logger.debug(sites[0].__dict__) @@ -227,15 +275,24 @@ async def submit_dialog(db, url_exists, cookie_file, logger): for s in sites: chosen_site = s result = await site_self_check(s, logger, sem, db) - if not result['disabled']: + if not result["disabled"]: found = True break if not found: - print(f'Sorry, we couldn\'t find params to detect account presence/absence in {chosen_site.name}.') - print('Try to run this mode again and increase features count or choose others.') + print( + f"Sorry, we couldn't find params to detect account presence/absence in {chosen_site.name}." + ) + print( + "Try to run this mode again and increase features count or choose others." + ) else: - if input(f'Site {chosen_site.name} successfully checked. Do you want to save it in the Maigret DB? [Yn] ').lower() in 'y': + if ( + input( + f"Site {chosen_site.name} successfully checked. Do you want to save it in the Maigret DB? 
[Yn] " + ).lower() + in "y" + ): logger.debug(chosen_site.json) site_data = chosen_site.strip_engine_data() logger.debug(site_data.json) diff --git a/maigret/types.py b/maigret/types.py index 25e539d..b2f7b86 100644 --- a/maigret/types.py +++ b/maigret/types.py @@ -3,26 +3,3 @@ from typing import Callable, Any, Tuple # search query QueryDraft = Tuple[Callable, Any, Any] - -# error got as a result of completed search query -class CheckError: - _type = 'Unknown' - _desc = '' - - def __init__(self, typename, desc=''): - self._type = typename - self._desc = desc - - def __str__(self): - if not self._desc: - return f'{self._type} error' - - return f'{self._type} error: {self._desc}' - - @property - def type(self): - return self._type - - @property - def desc(self): - return self._desc diff --git a/maigret/utils.py b/maigret/utils.py index 877fb8a..3de46f3 100644 --- a/maigret/utils.py +++ b/maigret/utils.py @@ -3,80 +3,80 @@ import random DEFAULT_USER_AGENTS = [ - 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36', + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36", ] class CaseConverter: @staticmethod def camel_to_snake(camelcased_string: str) -> str: - return re.sub(r'(? str: - formatted = ''.join(word.title() for word in snakecased_string.split('_')) + formatted = "".join(word.title() for word in snakecased_string.split("_")) result = formatted[0].lower() + formatted[1:] return result @staticmethod def snake_to_title(snakecased_string: str) -> str: - words = snakecased_string.split('_') + words = snakecased_string.split("_") words[0] = words[0].title() - return ' '.join(words) + return " ".join(words) def is_country_tag(tag: str) -> bool: """detect if tag represent a country""" - return bool(re.match("^([a-zA-Z]){2}$", tag)) or tag == 'global' + return bool(re.match("^([a-zA-Z]){2}$", tag)) or tag == "global" def enrich_link_str(link: str) -> str: link = link.strip() - if link.startswith('www.') or (link.startswith('http') and '//' in link): + if link.startswith("www.") or (link.startswith("http") and "//" in link): return f'{link}' return link class URLMatcher: - _HTTP_URL_RE_STR = '^https?://(www.)?(.+)$' + _HTTP_URL_RE_STR = "^https?://(www.)?(.+)$" HTTP_URL_RE = re.compile(_HTTP_URL_RE_STR) - UNSAFE_SYMBOLS = '.?' + UNSAFE_SYMBOLS = ".?" @classmethod def extract_main_part(self, url: str) -> str: match = self.HTTP_URL_RE.search(url) if match and match.group(2): - return match.group(2).rstrip('/') + return match.group(2).rstrip("/") - return '' + return "" @classmethod - def make_profile_url_regexp(self, url: str, username_regexp: str = ''): + def make_profile_url_regexp(self, url: str, username_regexp: str = ""): url_main_part = self.extract_main_part(url) for c in self.UNSAFE_SYMBOLS: - url_main_part = url_main_part.replace(c, f'\\{c}') - username_regexp = username_regexp or '.+?' + url_main_part = url_main_part.replace(c, f"\\{c}") + username_regexp = username_regexp or ".+?" 
- url_regexp = url_main_part.replace('{username}', f'({username_regexp})') - regexp_str = self._HTTP_URL_RE_STR.replace('(.+)', url_regexp) + url_regexp = url_main_part.replace("{username}", f"({username_regexp})") + regexp_str = self._HTTP_URL_RE_STR.replace("(.+)", url_regexp) return re.compile(regexp_str) -def get_dict_ascii_tree(items, prepend='', new_line=True): - text = '' +def get_dict_ascii_tree(items, prepend="", new_line=True): + text = "" for num, item in enumerate(items): - box_symbol = '┣╸' if num != len(items) - 1 else '┗╸' + box_symbol = "┣╸" if num != len(items) - 1 else "┗╸" if type(item) == tuple: field_name, field_value = item - if field_value.startswith('[\''): + if field_value.startswith("['"): is_last_item = num == len(items) - 1 - prepend_symbols = ' ' * 3 if is_last_item else ' ┃ ' + prepend_symbols = " " * 3 if is_last_item else " ┃ " field_value = get_dict_ascii_tree(eval(field_value), prepend_symbols) - text += f'\n{prepend}{box_symbol}{field_name}: {field_value}' + text += f"\n{prepend}{box_symbol}{field_name}: {field_value}" else: - text += f'\n{prepend}{box_symbol} {item}' + text += f"\n{prepend}{box_symbol} {item}" if not new_line: text = text[1:] diff --git a/setup.cfg b/setup.cfg index 5daf14f..ce606a0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,3 +1,9 @@ [egg_info] tag_build = -tag_date = 0 \ No newline at end of file +tag_date = 0 + +[flake8] +per-file-ignores = __init__.py:F401 + +[mypy] +ignore_missing_imports = True \ No newline at end of file diff --git a/test.sh b/test.sh new file mode 100755 index 0000000..c30a4a9 --- /dev/null +++ b/test.sh @@ -0,0 +1,2 @@ +#!/bin/sh +pytest tests diff --git a/wizard.py b/wizard.py index 4ada966..8119de0 100755 --- a/wizard.py +++ b/wizard.py @@ -26,18 +26,24 @@ if __name__ == '__main__': # user input username = input('Enter username to search: ') - sites_count_raw = input(f'Select the number of sites to search ({TOP_SITES_COUNT} for default, {len(db.sites_dict)} max): ') + sites_count_raw = input( + f'Select the number of sites to search ({TOP_SITES_COUNT} for default, {len(db.sites_dict)} max): ' + ) sites_count = int(sites_count_raw) or TOP_SITES_COUNT sites = db.ranked_sites_dict(top=sites_count) show_progressbar_raw = input('Do you want to show a progressbar? [Yn] ') - show_progressbar = show_progressbar_raw.lower() != 'n' + show_progressbar = show_progressbar_raw.lower() != 'n' - extract_info_raw = input('Do you want to extract additional info from accounts\' pages? [Yn] ') - extract_info = extract_info_raw.lower() != 'n' + extract_info_raw = input( + 'Do you want to extract additional info from accounts\' pages? [Yn] ' + ) + extract_info = extract_info_raw.lower() != 'n' - use_notifier_raw = input('Do you want to use notifier for displaying results while searching? [Yn] ') + use_notifier_raw = input( + 'Do you want to use notifier for displaying results while searching? [Yn] ' + ) use_notifier = use_notifier_raw.lower() != 'n' notifier = None @@ -45,15 +51,16 @@ if __name__ == '__main__': notifier = maigret.Notifier(print_found_only=True, skip_check_errors=True) # search! 
- search_func = maigret.search(username=username, - site_dict=sites, - timeout=TIMEOUT, - logger=logger, - max_connections=MAX_CONNECTIONS, - query_notify=notifier, - no_progressbar=(not show_progressbar), - is_parsing_enabled=extract_info, - ) + search_func = maigret.search( + username=username, + site_dict=sites, + timeout=TIMEOUT, + logger=logger, + max_connections=MAX_CONNECTIONS, + query_notify=notifier, + no_progressbar=(not show_progressbar), + is_parsing_enabled=extract_info, + ) results = loop.run_until_complete(search_func)
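For reference, a minimal sketch of how the results produced by wizard.py could be handed to the reformatted reporting helpers shown earlier in this patch; it assumes those helpers live in maigret.report, that a single username was searched, and the output file name is made up:

    from maigret.report import generate_report_context, generate_csv_report

    # results: dict returned by maigret.search for one username (see wizard.py above)
    context = generate_report_context([(username, "username", results)])
    print(context["brief"])  # short summary, e.g. "Search by username ... returned N accounts."

    # write the per-site table using the reworked CSV helper
    with open(f"report_{username}.csv", "w", newline="") as csvfile:
        generate_csv_report(username, results, csvfile)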