Refactoring, test coverage increased to 60% (#1943)

This commit is contained in:
Soxoj
2024-12-08 02:13:28 +01:00
committed by GitHub
parent 4b1317789d
commit c66d776f8a
19 changed files with 326 additions and 226 deletions
+62 -49
View File
@@ -31,7 +31,7 @@ from .executors import (
AsyncioSimpleExecutor,
AsyncioProgressbarQueueExecutor,
)
from .result import QueryResult, QueryStatus
from .result import MaigretCheckResult, MaigretCheckStatus
from .sites import MaigretDatabase, MaigretSite
from .types import QueryOptions, QueryResultWrapper
from .utils import ascii_data_display, get_random_user_agent
@@ -322,7 +322,7 @@ def process_site_result(
break
def build_result(status, **kwargs):
return QueryResult(
return MaigretCheckResult(
username,
site_name,
url,
@@ -334,11 +334,11 @@ def process_site_result(
if check_error:
logger.warning(check_error)
result = QueryResult(
result = MaigretCheckResult(
username,
site_name,
url,
QueryStatus.UNKNOWN,
MaigretCheckStatus.UNKNOWN,
query_time=response_time,
error=check_error,
context=str(CheckError),
@@ -350,15 +350,15 @@ def process_site_result(
[(absence_flag in html_text) for absence_flag in site.absence_strs]
)
if not is_absence_detected and is_presense_detected:
result = build_result(QueryStatus.CLAIMED)
result = build_result(MaigretCheckStatus.CLAIMED)
else:
result = build_result(QueryStatus.AVAILABLE)
result = build_result(MaigretCheckStatus.AVAILABLE)
elif check_type in "status_code":
# Checks if the status code of the response is 2XX
if 200 <= status_code < 300:
result = build_result(QueryStatus.CLAIMED)
result = build_result(MaigretCheckStatus.CLAIMED)
else:
result = build_result(QueryStatus.AVAILABLE)
result = build_result(MaigretCheckStatus.AVAILABLE)
elif check_type == "response_url":
# For this detection method, we have turned off the redirect.
# So, there is no need to check the response URL: it will always
@@ -366,9 +366,9 @@ def process_site_result(
# code indicates that the request was successful (i.e. no 404, or
# forward to some odd redirect).
if 200 <= status_code < 300 and is_presense_detected:
result = build_result(QueryStatus.CLAIMED)
result = build_result(MaigretCheckStatus.CLAIMED)
else:
result = build_result(QueryStatus.AVAILABLE)
result = build_result(MaigretCheckStatus.AVAILABLE)
else:
# It should be impossible to ever get here...
raise ValueError(
@@ -377,33 +377,11 @@ def process_site_result(
extracted_ids_data = {}
if is_parsing_enabled and result.status == QueryStatus.CLAIMED:
try:
extracted_ids_data = extract(html_text)
except Exception as e:
logger.warning(f"Error while parsing {site.name}: {e}", exc_info=True)
if is_parsing_enabled and result.status == MaigretCheckStatus.CLAIMED:
extracted_ids_data = extract_ids_data(html_text, logger, site)
if extracted_ids_data:
new_usernames = {}
for k, v in extracted_ids_data.items():
if "username" in k and not "usernames" in k:
new_usernames[v] = "username"
elif "usernames" in k:
try:
tree = ast.literal_eval(v)
if type(tree) == list:
for n in tree:
new_usernames[n] = "username"
except Exception as e:
logger.warning(e)
if k in SUPPORTED_IDS:
new_usernames[v] = k
results_info["ids_usernames"] = new_usernames
links = ascii_data_display(extracted_ids_data.get("links", "[]"))
if "website" in extracted_ids_data:
links.append(extracted_ids_data["website"])
results_info["ids_links"] = links
new_usernames = parse_usernames(extracted_ids_data, logger)
results_info = update_results_info(results_info, extracted_ids_data, new_usernames)
result.ids_data = extracted_ids_data
# Save status of request
@@ -462,29 +440,29 @@ def make_site_result(
# site check is disabled
if site.disabled and not options['forced']:
logger.debug(f"Site {site.name} is disabled, skipping...")
results_site["status"] = QueryResult(
results_site["status"] = MaigretCheckResult(
username,
site.name,
url,
QueryStatus.ILLEGAL,
MaigretCheckStatus.ILLEGAL,
error=CheckError("Check is disabled"),
)
# current username type could not be applied
elif site.type != options["id_type"]:
results_site["status"] = QueryResult(
results_site["status"] = MaigretCheckResult(
username,
site.name,
url,
QueryStatus.ILLEGAL,
MaigretCheckStatus.ILLEGAL,
error=CheckError('Unsupported identifier type', f'Want "{site.type}"'),
)
# username is not allowed.
elif site.regex_check and re.search(site.regex_check, username) is None:
results_site["status"] = QueryResult(
results_site["status"] = MaigretCheckResult(
username,
site.name,
url,
QueryStatus.ILLEGAL,
MaigretCheckStatus.ILLEGAL,
error=CheckError(
'Unsupported username format', f'Want "{site.regex_check}"'
),
@@ -731,11 +709,11 @@ async def maigret(
continue
default_result: QueryResultWrapper = {
'site': site,
'status': QueryResult(
'status': MaigretCheckResult(
username,
sitename,
'',
QueryStatus.UNKNOWN,
MaigretCheckStatus.UNKNOWN,
error=CheckError('Request failed'),
),
}
@@ -819,8 +797,8 @@ async def site_self_check(
}
check_data = [
(site.username_claimed, QueryStatus.CLAIMED),
(site.username_unclaimed, QueryStatus.AVAILABLE),
(site.username_claimed, MaigretCheckStatus.CLAIMED),
(site.username_unclaimed, MaigretCheckStatus.AVAILABLE),
]
logger.info(f"Checking {site.name}...")
@@ -859,7 +837,7 @@ async def site_self_check(
site_status = result.status
if site_status != status:
if site_status == QueryStatus.UNKNOWN:
if site_status == MaigretCheckStatus.UNKNOWN:
msgs = site.absence_strs
etype = site.check_type
logger.warning(
@@ -871,9 +849,9 @@ async def site_self_check(
if skip_errors:
pass
# don't disable in case of available username
elif status == QueryStatus.CLAIMED:
elif status == MaigretCheckStatus.CLAIMED:
changes["disabled"] = True
elif status == QueryStatus.CLAIMED:
elif status == MaigretCheckStatus.CLAIMED:
logger.warning(
f"Not found `{username}` in {site.name}, must be claimed"
)
@@ -960,3 +938,38 @@ async def self_check(
print(f"Unchecked sites verified: {unchecked_old_count - unchecked_new_count}")
return total_disabled != 0 or unchecked_new_count != unchecked_old_count
def extract_ids_data(html_text, logger, site) -> Dict:
    """
    Call ``extract`` on the page text without letting its errors escape.

    :param html_text: text passed as-is to ``extract``.
    :param logger: object whose ``warning`` method receives the failure
        message (called with ``exc_info=True``).
    :param site: object whose ``name`` attribute is put into the message.
    :return: whatever ``extract`` returns, or an empty dict if it raised.
    """
    ids_data = {}
    try:
        ids_data = extract(html_text)
    except Exception as e:
        # Best effort: a parsing failure is reported but must not abort
        # the processing of the site result.
        logger.warning(f"Error while parsing {site.name}: {e}", exc_info=True)
    return ids_data
def parse_usernames(extracted_ids_data, logger) -> Dict:
    """
    Build a mapping of identifier value -> identifier type from extracted data.

    Rules applied to every ``key, value`` pair of ``extracted_ids_data``:

    * key contains ``"usernames"``: the value is parsed with
      ``ast.literal_eval``; if the result is a list, every element is
      recorded with the type ``"username"``. Any failure is logged through
      ``logger.warning`` and the pair is otherwise ignored.
    * key contains ``"username"`` (but not ``"usernames"``): the value itself
      is recorded with the type ``"username"``.
    * key is a member of ``SUPPORTED_IDS``: the value is recorded with the key
      as its type. This check is independent of the two above and, being
      applied last, wins when the same value was already recorded.

    :param extracted_ids_data: mapping of field name to extracted value.
    :param logger: object with a ``warning`` method, used for parse errors.
    :return: dict ``{identifier_value: identifier_type}``; empty for empty input.
    """
    new_usernames = {}
    for key, value in extracted_ids_data.items():
        # Test the plural form first: every key containing "usernames" also
        # contains "username", so this order needs each substring check once.
        if "usernames" in key:
            try:
                parsed = ast.literal_eval(value)
                if isinstance(parsed, list):
                    for name in parsed:
                        new_usernames[name] = "username"
            except Exception as e:
                # Deliberately broad: malformed literals and unhashable list
                # items are reported, never fatal.
                logger.warning(e)
        elif "username" in key:
            new_usernames[value] = "username"

        if key in SUPPORTED_IDS:
            new_usernames[value] = key

    return new_usernames
def update_results_info(results_info, extracted_ids_data, new_usernames):
    """
    Store extracted usernames and links in ``results_info`` and return it.

    Writes two keys into ``results_info`` (the object is modified in place):

    * ``"ids_usernames"`` - ``new_usernames`` as given;
    * ``"ids_links"`` - the result of ``ascii_data_display`` applied to the
      ``"links"`` entry of ``extracted_ids_data`` (``"[]"`` when absent),
      with the ``"website"`` entry appended if that key is present.

    :return: the same ``results_info`` object.
    """
    # Usernames are stored before the links are computed, so they are kept
    # even if building the links fails.
    results_info["ids_usernames"] = new_usernames

    raw_links = extracted_ids_data.get("links", "[]")
    ids_links = ascii_data_display(raw_links)
    if "website" in extracted_ids_data:
        ids_links.append(extracted_ids_data["website"])
    results_info["ids_links"] = ids_links

    return results_info
+45 -3
View File
@@ -1,6 +1,6 @@
from typing import Dict, List, Any
from typing import Dict, List, Any, Tuple
from .result import QueryResult
from .result import MaigretCheckResult
from .types import QueryResultWrapper
@@ -114,7 +114,7 @@ def extract_and_group(search_res: QueryResultWrapper) -> List[Dict[str, Any]]:
errors_counts: Dict[str, int] = {}
for r in search_res.values():
if r and isinstance(r, dict) and r.get('status'):
if not isinstance(r['status'], QueryResult):
if not isinstance(r['status'], MaigretCheckResult):
continue
err = r['status'].error
@@ -133,3 +133,45 @@ def extract_and_group(search_res: QueryResultWrapper) -> List[Dict[str, Any]]:
)
return counts
def notify_about_errors(
    search_results: QueryResultWrapper, query_notify, show_statistics=False
) -> List[Tuple]:
    """
    Build the list of error notifications for a finished search.

    Nothing is printed here: the function only returns ``(text, symbol)``
    tuples that the caller can hand over to a notify object.
    ``query_notify`` is accepted but not used by this function.

    Order of the returned items:

    1. one ``'!'`` entry per error group for which ``is_important`` is true,
       extended with the capitalized ``solution_of`` hint when there is one;
    2. if ``show_statistics`` is set, a ``'-'`` header followed by one ``'!'``
       entry per error group;
    3. if step 1 produced anything, a ``'-'`` hint about `--print-errors`.

    Example:
    [
        ('Too many errors of type "timeout" (50.0%)', '!'),
        ('Verbose error statistics:', '-'),
    ]
    """
    notifications: List[Tuple] = []
    grouped_errors = extract_and_group(search_results)

    for err_info in grouped_errors:
        if is_important(err_info):
            message = f'Too many errors of type "{err_info["err"]}" ({round(err_info["perc"],2)}%)'
            hint = solution_of(err_info['err'])
            if hint:
                message = '. '.join([message, hint.capitalize()])
            notifications.append((message, '!'))

    # Only important errors have been collected so far, so a non-empty list
    # means at least one of them will be displayed.
    has_important_errors = len(notifications) > 0

    if show_statistics:
        notifications.append(('Verbose error statistics:', '-'))
        notifications.extend(
            (f'{err_info["err"]}: {round(err_info["perc"],2)}%', '!')
            for err_info in grouped_errors
        )

    if has_important_errors:
        notifications.append(
            ('You can see detailed site check errors with a flag `--print-errors`', '-')
        )

    return notifications
+3 -29
View File
@@ -45,34 +45,6 @@ from .settings import Settings
from .permutator import Permute
def notify_about_errors(
    search_results: QueryResultWrapper, query_notify, show_statistics=False
):
    """
    Report grouped search errors through ``query_notify.warning``.

    Emits a warning for every error group that ``errors.is_important``
    accepts (with the capitalized ``errors.solution_of`` hint appended when
    available), then, if ``show_statistics`` is set, a header and one line
    per error group, and finally a hint about `--print-errors` when at least
    one important error was reported.
    """
    grouped_errors = errors.extract_and_group(search_results)
    important_shown = False

    for err_info in grouped_errors:
        if errors.is_important(err_info):
            message = f'Too many errors of type "{err_info["err"]}" ({round(err_info["perc"],2)}%)'
            hint = errors.solution_of(err_info['err'])
            if hint:
                message = '. '.join([message, hint.capitalize()])
            query_notify.warning(message, '!')
            important_shown = True

    if show_statistics:
        query_notify.warning(f'Verbose error statistics:')
        for err_info in grouped_errors:
            stat_line = f'{err_info["err"]}: {round(err_info["perc"],2)}%'
            query_notify.warning(stat_line, '!')

    if important_shown:
        query_notify.warning(
            'You can see detailed site check errors with a flag `--print-errors`'
        )
def extract_ids_from_page(url, logger, timeout=5) -> dict:
results = {}
# url, headers
@@ -693,7 +665,9 @@ async def main():
check_domains=args.with_domains,
)
notify_about_errors(results, query_notify, show_statistics=args.verbose)
errs = errors.notify_about_errors(results, query_notify, show_statistics=args.verbose)
for e in errs:
query_notify.warning(*e)
if args.reports_sorting == "data":
results = sort_report_by_data_points(results)
+5 -5
View File
@@ -8,7 +8,7 @@ import sys
from colorama import Fore, Style, init
from .result import QueryStatus
from .result import MaigretCheckStatus
from .utils import get_dict_ascii_tree
@@ -245,7 +245,7 @@ class QueryNotifyPrint(QueryNotify):
ids_data_text = get_dict_ascii_tree(self.result.ids_data.items(), " ")
# Output to the terminal is desired.
if result.status == QueryStatus.CLAIMED:
if result.status == MaigretCheckStatus.CLAIMED:
color = Fore.BLUE if is_similar else Fore.GREEN
status = "?" if is_similar else "+"
notify = self.make_terminal_notify(
@@ -255,7 +255,7 @@ class QueryNotifyPrint(QueryNotify):
color,
result.site_url_user + ids_data_text,
)
elif result.status == QueryStatus.AVAILABLE:
elif result.status == MaigretCheckStatus.AVAILABLE:
if not self.print_found_only:
notify = self.make_terminal_notify(
"-",
@@ -264,7 +264,7 @@ class QueryNotifyPrint(QueryNotify):
Fore.YELLOW,
"Not found!" + ids_data_text,
)
elif result.status == QueryStatus.UNKNOWN:
elif result.status == MaigretCheckStatus.UNKNOWN:
if not self.skip_check_errors:
notify = self.make_terminal_notify(
"?",
@@ -273,7 +273,7 @@ class QueryNotifyPrint(QueryNotify):
Fore.RED,
str(self.result.error) + ids_data_text,
)
elif result.status == QueryStatus.ILLEGAL:
elif result.status == MaigretCheckStatus.ILLEGAL:
if not self.print_found_only:
text = "Illegal Username Format For This Site!"
notify = self.make_terminal_notify(
+6 -6
View File
@@ -13,7 +13,7 @@ from dateutil.parser import parse as parse_datetime_str
from jinja2 import Template
from .checking import SUPPORTED_IDS
from .result import QueryStatus
from .result import MaigretCheckStatus
from .sites import MaigretDatabase
from .utils import is_country_tag, CaseConverter, enrich_link_str
@@ -142,7 +142,7 @@ def save_graph_report(filename: str, username_results: list, db: MaigretDatabase
if not status: # FIXME: currently in case of timeout
continue
if dictionary["status"].status != QueryStatus.CLAIMED:
if dictionary["status"].status != MaigretCheckStatus.CLAIMED:
continue
site_fallback_name = dictionary.get(
@@ -341,7 +341,7 @@ def generate_report_context(username_results: list):
new_ids.append((u, utype))
usernames[u] = {"type": utype}
if status.status == QueryStatus.CLAIMED:
if status.status == MaigretCheckStatus.CLAIMED:
found_accounts += 1
dictionary["found"] = True
else:
@@ -421,7 +421,7 @@ def generate_txt_report(username: str, results: dict, file):
continue
if (
dictionary.get("status")
and dictionary["status"].status == QueryStatus.CLAIMED
and dictionary["status"].status == MaigretCheckStatus.CLAIMED
):
exists_counter += 1
file.write(dictionary["url_user"] + "\n")
@@ -438,7 +438,7 @@ def generate_json_report(username: str, results: dict, file, report_type):
if not site_result or not site_result.get("status"):
continue
if site_result["status"].status != QueryStatus.CLAIMED:
if site_result["status"].status != MaigretCheckStatus.CLAIMED:
continue
data = dict(site_result)
@@ -499,7 +499,7 @@ def design_xmind_sheet(sheet, username, results):
continue
result_status = dictionary.get("status")
# TODO: fix the reason
if not result_status or result_status.status != QueryStatus.CLAIMED:
if not result_status or result_status.status != MaigretCheckStatus.CLAIMED:
continue
stripped_tags = list(map(lambda x: x.strip(), result_status.tags))
+1 -1
View File
@@ -17376,7 +17376,7 @@
"video"
],
"headers": {
"Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MzM0NDE4ODAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbCwianRpIjoiYzRlNDQ4ZTgtZmFmNC00OWY1LTkyYmMtZWVmZWMzNWNlOTM1In0.nm4mnYvn8hm3u5gfNXh1r451U-R5O2MFOqz40DqixQo"
"Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MzM2MTc5MjAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbCwianRpIjoiNGYxM2M4N2ItYWMwMy00Y2JhLWExMDctNmNiODhmM2U3NjZjIn0.Y7CWEWckdSMsmJ8ROPmhHR6el2QCYJRDl0RLPpdJOKc"
},
"activation": {
"url": "https://vimeo.com/_rv/viewer",
+6 -11
View File
@@ -6,7 +6,7 @@ This module defines various objects for recording the results of queries.
from enum import Enum
class QueryStatus(Enum):
class MaigretCheckStatus(Enum):
"""Query Status Enumeration.
Describes status of query about a given username.
@@ -29,10 +29,9 @@ class QueryStatus(Enum):
return self.value
class QueryResult:
"""Query Result Object.
Describes result of query about a given username.
class MaigretCheckResult:
"""
Describes result of checking a given username on a given site
"""
def __init__(
@@ -47,11 +46,7 @@ class QueryResult:
error=None,
tags=[],
):
"""Create Query Result Object.
Contains information about a specific method of detecting usernames on
a given type of web sites.
"""
Keyword Arguments:
self -- This object.
username -- String indicating username that query result
@@ -98,7 +93,7 @@ class QueryResult:
}
def is_found(self):
return self.status == QueryStatus.CLAIMED
return self.status == MaigretCheckStatus.CLAIMED
def __str__(self):
"""Convert Object To String.
+7 -3
View File
@@ -9,11 +9,12 @@ import cloudscraper
from colorama import Fore, Style
from .activation import import_aiohttp_cookies
from .result import QueryResult
from .result import MaigretCheckResult
from .settings import Settings
from .sites import MaigretDatabase, MaigretEngine, MaigretSite
from .utils import get_random_user_agent
from .checking import site_self_check
from .utils import get_match_ratio
class CloudflareSession:
@@ -73,6 +74,9 @@ class Submitter:
@staticmethod
def get_alexa_rank(site_url_main):
import requests
import xml.etree.ElementTree as ElementTree
url = f"http://data.alexa.com/data?cli=10&url={site_url_main}"
xml_data = requests.get(url).text
root = ElementTree.fromstring(xml_data)
@@ -91,7 +95,7 @@ class Submitter:
async def site_self_check(self, site, semaphore, silent=False):
# Call the general function from the checking.py
changes = await checking_site_self_check(
changes = await site_self_check(
site=site,
logger=self.logger,
semaphore=semaphore,