diff --git a/maigret/checking.py b/maigret/checking.py index 50e05c9..f7694b9 100644 --- a/maigret/checking.py +++ b/maigret/checking.py @@ -54,10 +54,9 @@ async def get_response(request_future, logger) -> Tuple[str, int, Optional[Check decoded_content = response_content.decode(charset, "ignore") html_text = decoded_content + error = None if status_code == 0: error = CheckError("Connection lost") - else: - error = None logger.debug(html_text) @@ -73,11 +72,10 @@ async def get_response(request_future, logger) -> Tuple[str, int, Optional[Check error = CheckError("Interrupted") except Exception as e: # python-specific exceptions - if sys.version_info.minor > 6: - if isinstance(e, ssl.SSLCertVerificationError) or isinstance( - e, ssl.SSLError - ): - error = CheckError("SSL", str(e)) + if sys.version_info.minor > 6 and ( + isinstance(e, ssl.SSLCertVerificationError) or isinstance(e, ssl.SSLError) + ): + error = CheckError("SSL", str(e)) else: logger.debug(e, exc_info=True) error = CheckError("Unexpected", str(e)) @@ -109,6 +107,14 @@ def detect_error_page( return None +def debug_response_logging(url, html_text, status_code, check_error): + with open("debug.log", "a") as f: + status = status_code or "No response" + f.write(f"url: {url}\nerror: {check_error}\nr: {status}\n") + if html_text: + f.write(f"code: {status}\nresponse: {str(html_text)}\n") + + def process_site_result( response, query_notify, logger, results_info: QueryResultWrapper, site: MaigretSite ): @@ -142,11 +148,7 @@ def process_site_result( response_time = None if logger.level == logging.DEBUG: - with open("debug.txt", "a") as f: - status = status_code or "No response" - f.write(f"url: {url}\nerror: {check_error}\nr: {status}\n") - if html_text: - f.write(f"code: {status}\nresponse: {str(html_text)}\n") + debug_response_logging(url, html_text, status_code, check_error) # additional check for errors if status_code and not check_error: @@ -154,29 +156,34 @@ def process_site_result( html_text, status_code, 
site.errors, site.ignore403 ) - if site.activation and html_text: - is_need_activation = any( - [s for s in site.activation["marks"] if s in html_text] - ) - if is_need_activation: - method = site.activation["method"] - try: - activate_fun = getattr(ParsingActivator(), method) - # TODO: async call - activate_fun(site, logger) - except AttributeError: - logger.warning( - f"Activation method {method} for site {site.name} not found!" - ) - except Exception as e: - logger.warning(f"Failed activation {method} for site {site.name}: {str(e)}", exc_info=True) - # TODO: temporary check error + # parsing activation + is_need_activation = bool(site.activation and html_text) and any( + s in html_text for s in site.activation.get("marks", []) + ) + + if is_need_activation: + method = site.activation["method"] + try: + activate_fun = getattr(ParsingActivator(), method) + # TODO: async call + activate_fun(site, logger) + except AttributeError: + logger.warning( + f"Activation method {method} for site {site.name} not found!" + ) + except Exception as e: + logger.warning( + f"Failed activation {method} for site {site.name}: {str(e)}", + exc_info=True, + ) + # TODO: temporary check error site_name = site.pretty_name # presense flags # True by default presense_flags = site.presense_strs is_presense_detected = False + if html_text: if not presense_flags: is_presense_detected = True @@ -263,9 +270,6 @@ def process_site_result( results_info["ids_links"] = eval(extracted_ids_data.get("links", "[]")) result.ids_data = extracted_ids_data - # Notify caller about results of query. 
- query_notify.update(result, site.similar_search) - # Save status of request results_info["status"] = result @@ -413,6 +417,8 @@ async def check_site_for_username( response, query_notify, logger, default_result, site ) + query_notify.update(response_result['status'], site.similar_search) + return site.name, response_result @@ -617,15 +623,10 @@ async def site_self_check( "disabled": False, } - try: - check_data = [ - (site.username_claimed, QueryStatus.CLAIMED), - (site.username_unclaimed, QueryStatus.AVAILABLE), - ] - except Exception as e: - logger.error(e) - logger.error(site.__dict__) - check_data = [] + check_data = [ + (site.username_claimed, QueryStatus.CLAIMED), + (site.username_unclaimed, QueryStatus.AVAILABLE), + ] logger.info(f"Checking {site.name}...") diff --git a/maigret/errors.py b/maigret/errors.py index 77413ee..11484fd 100644 --- a/maigret/errors.py +++ b/maigret/errors.py @@ -54,7 +54,9 @@ COMMON_ERRORS = { 'Censorship', 'MGTS' ), 'Incapsula incident ID': CheckError('Bot protection', 'Incapsula'), - 'Сайт заблокирован хостинг-провайдером': CheckError('Site-specific', 'Site is disabled (Beget)'), + 'Сайт заблокирован хостинг-провайдером': CheckError( + 'Site-specific', 'Site is disabled (Beget)' + ), } ERRORS_TYPES = { diff --git a/maigret/notify.py b/maigret/notify.py index a6290c5..03d1049 100644 --- a/maigret/notify.py +++ b/maigret/notify.py @@ -152,6 +152,27 @@ class QueryNotifyPrint(QueryNotify): return + def make_colored_terminal_notify( + self, status, text, status_color, text_color, appendix + ): + text = [ + f"{Style.BRIGHT}{Fore.WHITE}[{status_color}{status}{Fore.WHITE}]" + + f"{text_color} {text}: {Style.RESET_ALL}" + + f"{appendix}" + ] + return "".join(text) + + def make_simple_terminal_notify( + self, status, text, status_color, text_color, appendix + ): + return f"[{status}] {text}: {appendix}" + + def make_terminal_notify(self, *args): + if self.color: + return self.make_colored_terminal_notify(*args) + else: + return 
self.make_simple_terminal_notify(*args) + def start(self, message, id_type): """Notify Start. @@ -204,40 +225,18 @@ class QueryNotifyPrint(QueryNotify): Return Value: Nothing. """ + notify = None self.result = result - if not self.result.ids_data: - ids_data_text = "" - else: + ids_data_text = "" + if self.result.ids_data: ids_data_text = get_dict_ascii_tree(self.result.ids_data.items(), " ") - def make_colored_terminal_notify( - status, text, status_color, text_color, appendix - ): - text = [ - f"{Style.BRIGHT}{Fore.WHITE}[{status_color}{status}{Fore.WHITE}]" - + f"{text_color} {text}: {Style.RESET_ALL}" - + f"{appendix}" - ] - return "".join(text) - - def make_simple_terminal_notify(status, text, appendix): - return f"[{status}] {text}: {appendix}" - - def make_terminal_notify(is_colored=True, *args): - if is_colored: - return make_colored_terminal_notify(*args) - else: - return make_simple_terminal_notify(*args) - - notify = None - # Output to the terminal is desired. if result.status == QueryStatus.CLAIMED: color = Fore.BLUE if is_similar else Fore.GREEN status = "?" if is_similar else "+" - notify = make_terminal_notify( - self.color, + notify = self.make_terminal_notify( status, result.site_name, color, @@ -246,8 +245,7 @@ class QueryNotifyPrint(QueryNotify): ) elif result.status == QueryStatus.AVAILABLE: if not self.print_found_only: - notify = make_terminal_notify( - self.color, + notify = self.make_terminal_notify( "-", result.site_name, Fore.RED, @@ -256,8 +254,7 @@ class QueryNotifyPrint(QueryNotify): ) elif result.status == QueryStatus.UNKNOWN: if not self.skip_check_errors: - notify = make_terminal_notify( - self.color, + notify = self.make_terminal_notify( "?", result.site_name, Fore.RED, @@ -267,8 +264,7 @@ class QueryNotifyPrint(QueryNotify): elif result.status == QueryStatus.ILLEGAL: if not self.print_found_only: text = "Illegal Username Format For This Site!" 
- notify = make_terminal_notify( - self.color, + notify = self.make_terminal_notify( "-", result.site_name, Fore.RED, @@ -286,8 +282,6 @@ class QueryNotifyPrint(QueryNotify): sys.stdout.write("\x1b[1K\r") print(notify) - return - def __str__(self): """Convert Object To String. diff --git a/maigret/report.py b/maigret/report.py index 62dfc0b..c24ddd1 100644 --- a/maigret/report.py +++ b/maigret/report.py @@ -293,11 +293,20 @@ def save_xmind_report(filename, username, results): os.remove(filename) workbook = xmind.load(filename) sheet = workbook.getPrimarySheet() - design_sheet(sheet, username, results) + design_xmind_sheet(sheet, username, results) xmind.save(workbook, path=filename) -def design_sheet(sheet, username, results): +def add_xmind_subtopic(userlink, k, v, supposed_data): + currentsublabel = userlink.addSubTopic() + field = "fullname" if k == "name" else k + if field not in supposed_data: + supposed_data[field] = [] + supposed_data[field].append(v) + currentsublabel.setTitle("%s: %s" % (k, v)) + + +def design_xmind_sheet(sheet, username, results): alltags = {} supposed_data = {} @@ -311,56 +320,43 @@ def design_sheet(sheet, username, results): for website_name in results: dictionary = results[website_name] + result_status = dictionary.get("status") + if result_status.status != QueryStatus.CLAIMED: + continue - if dictionary.get("status").status == QueryStatus.CLAIMED: - # firsttime I found that entry - for tag in dictionary.get("status").tags: - if tag.strip() == "": - continue - if tag not in alltags.keys(): - if not is_country_tag(tag): - tagsection = root_topic1.addSubTopic() - tagsection.setTitle(tag) - alltags[tag] = tagsection + stripped_tags = list(map(lambda x: x.strip(), result_status.tags)) + normalized_tags = list( + filter(lambda x: x and not is_country_tag(x), stripped_tags) + ) - category = None - for tag in dictionary.get("status").tags: - if tag.strip() == "": - continue - if not is_country_tag(tag): - category = tag + category = None + 
for tag in normalized_tags: + category = tag + if tag in alltags.keys(): + continue + tagsection = root_topic1.addSubTopic() + tagsection.setTitle(tag) + alltags[tag] = tagsection - if category is None: - userlink = undefinedsection.addSubTopic() - userlink.addLabel(dictionary.get("status").site_url_user) + section = alltags[category] if category else undefinedsection + userlink = section.addSubTopic() + userlink.addLabel(result_status.site_url_user) + + ids_data = result_status.ids_data or {} + for k, v in ids_data.items(): + # suppose target data + if isinstance(v, list): + for currentval in v: + add_xmind_subtopic(userlink, k, currentval, supposed_data) else: - userlink = alltags[category].addSubTopic() - userlink.addLabel(dictionary.get("status").site_url_user) + add_xmind_subtopic(userlink, k, v, supposed_data) - if dictionary.get("status").ids_data: - for k, v in dictionary.get("status").ids_data.items(): - # suppose target data - if not isinstance(v, list): - currentsublabel = userlink.addSubTopic() - field = "fullname" if k == "name" else k - if field not in supposed_data: - supposed_data[field] = [] - supposed_data[field].append(v) - currentsublabel.setTitle("%s: %s" % (k, v)) - else: - for currentval in v: - currentsublabel = userlink.addSubTopic() - field = "fullname" if k == "name" else k - if field not in supposed_data: - supposed_data[field] = [] - supposed_data[field].append(currentval) - currentsublabel.setTitle("%s: %s" % (k, currentval)) # add supposed data - filterede_supposed_data = filter_supposed_data(supposed_data) - if len(filterede_supposed_data) > 0: + filtered_supposed_data = filter_supposed_data(supposed_data) + if len(filtered_supposed_data) > 0: undefinedsection = root_topic1.addSubTopic() undefinedsection.setTitle("SUPPOSED DATA") - for k, v in filterede_supposed_data.items(): + for k, v in filtered_supposed_data.items(): currentsublabel = undefinedsection.addSubTopic() currentsublabel.setTitle("%s: %s" % (k, v)) diff --git 
a/maigret/sites.py b/maigret/sites.py index c7c064b..9fd66bb 100644 --- a/maigret/sites.py +++ b/maigret/sites.py @@ -167,6 +167,17 @@ class MaigretSite: return result + def get_url_type(self) -> str: + url = URLMatcher.extract_main_part(self.url) + if url.startswith("{username}"): + url = "SUBDOMAIN" + elif url == "": + url = f"{self.url} ({self.engine})" + else: + parts = url.split("/") + url = "/" + "/".join(parts[1:]) + return url + def update(self, updates: "dict") -> "MaigretSite": self.__dict__.update(updates) self.update_detectors() @@ -405,34 +416,23 @@ class MaigretDatabase: if not sites_dict: sites_dict = self.sites_dict() + urls = {} + tags = {} output = "" disabled_count = 0 total_count = len(sites_dict) - urls = {} - tags = {} for _, site in sites_dict.items(): if site.disabled: disabled_count += 1 - url = URLMatcher.extract_main_part(site.url) - if url.startswith("{username}"): - url = "SUBDOMAIN" - elif url == "": - url = f"{site.url} ({site.engine})" - else: - parts = url.split("/") - url = "/" + "/".join(parts[1:]) - - urls[url] = urls.get(url, 0) + 1 + url_type = site.get_url_type() + urls[url_type] = urls.get(url_type, 0) + 1 if not site.tags: tags["NO_TAGS"] = tags.get("NO_TAGS", 0) + 1 - for tag in site.tags: - if is_country_tag(tag): - # currenty do not display country tags - continue + for tag in filter(lambda x: not is_country_tag(x), site.tags): tags[tag] = tags.get(tag, 0) + 1 output += f"Enabled/total sites: {total_count - disabled_count}/{total_count}\n" @@ -441,8 +441,9 @@ class MaigretDatabase: if count == 1: break output += f"{count}\t{url}\n" + output += "Top sites' tags:\n" - for tag, count in sorted(tags.items(), key=lambda x: x[1], reverse=True): + for tag, count in sorted(tags.items(), key=lambda x: x[1], reverse=True)[:20]: mark = "" if tag not in SUPPORTED_TAGS: mark = " (non-standard)"