mirror of
https://github.com/soxoj/maigret.git
synced 2026-05-07 06:24:35 +00:00
Refactored to decrease cyclomatic complexity
This commit is contained in:
+42
-41
@@ -54,10 +54,9 @@ async def get_response(request_future, logger) -> Tuple[str, int, Optional[Check
|
||||
decoded_content = response_content.decode(charset, "ignore")
|
||||
html_text = decoded_content
|
||||
|
||||
error = None
|
||||
if status_code == 0:
|
||||
error = CheckError("Connection lost")
|
||||
else:
|
||||
error = None
|
||||
|
||||
logger.debug(html_text)
|
||||
|
||||
@@ -73,11 +72,10 @@ async def get_response(request_future, logger) -> Tuple[str, int, Optional[Check
|
||||
error = CheckError("Interrupted")
|
||||
except Exception as e:
|
||||
# python-specific exceptions
|
||||
if sys.version_info.minor > 6:
|
||||
if isinstance(e, ssl.SSLCertVerificationError) or isinstance(
|
||||
e, ssl.SSLError
|
||||
):
|
||||
error = CheckError("SSL", str(e))
|
||||
if sys.version_info.minor > 6 and (
|
||||
isinstance(e, ssl.SSLCertVerificationError) or isinstance(e, ssl.SSLError)
|
||||
):
|
||||
error = CheckError("SSL", str(e))
|
||||
else:
|
||||
logger.debug(e, exc_info=True)
|
||||
error = CheckError("Unexpected", str(e))
|
||||
@@ -109,6 +107,14 @@ def detect_error_page(
|
||||
return None
|
||||
|
||||
|
||||
def debug_response_logging(url, html_text, status_code, check_error):
|
||||
with open("debug.log", "a") as f:
|
||||
status = status_code or "No response"
|
||||
f.write(f"url: {url}\nerror: {check_error}\nr: {status}\n")
|
||||
if html_text:
|
||||
f.write(f"code: {status}\nresponse: {str(html_text)}\n")
|
||||
|
||||
|
||||
def process_site_result(
|
||||
response, query_notify, logger, results_info: QueryResultWrapper, site: MaigretSite
|
||||
):
|
||||
@@ -142,11 +148,7 @@ def process_site_result(
|
||||
response_time = None
|
||||
|
||||
if logger.level == logging.DEBUG:
|
||||
with open("debug.txt", "a") as f:
|
||||
status = status_code or "No response"
|
||||
f.write(f"url: {url}\nerror: {check_error}\nr: {status}\n")
|
||||
if html_text:
|
||||
f.write(f"code: {status}\nresponse: {str(html_text)}\n")
|
||||
debug_response_logging(url, html_text, status_code, check_error)
|
||||
|
||||
# additional check for errors
|
||||
if status_code and not check_error:
|
||||
@@ -154,29 +156,34 @@ def process_site_result(
|
||||
html_text, status_code, site.errors, site.ignore403
|
||||
)
|
||||
|
||||
if site.activation and html_text:
|
||||
is_need_activation = any(
|
||||
[s for s in site.activation["marks"] if s in html_text]
|
||||
)
|
||||
if is_need_activation:
|
||||
method = site.activation["method"]
|
||||
try:
|
||||
activate_fun = getattr(ParsingActivator(), method)
|
||||
# TODO: async call
|
||||
activate_fun(site, logger)
|
||||
except AttributeError:
|
||||
logger.warning(
|
||||
f"Activation method {method} for site {site.name} not found!"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed activation {method} for site {site.name}: {str(e)}", exc_info=True)
|
||||
# TODO: temporary check error
|
||||
# parsing activation
|
||||
is_need_activation = any(
|
||||
[s for s in site.activation.get("marks", []) if s in html_text]
|
||||
)
|
||||
|
||||
if site.activation and html_text and is_need_activation:
|
||||
method = site.activation["method"]
|
||||
try:
|
||||
activate_fun = getattr(ParsingActivator(), method)
|
||||
# TODO: async call
|
||||
activate_fun(site, logger)
|
||||
except AttributeError:
|
||||
logger.warning(
|
||||
f"Activation method {method} for site {site.name} not found!"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed activation {method} for site {site.name}: {str(e)}",
|
||||
exc_info=True,
|
||||
)
|
||||
# TODO: temporary check error
|
||||
|
||||
site_name = site.pretty_name
|
||||
# presense flags
|
||||
# True by default
|
||||
presense_flags = site.presense_strs
|
||||
is_presense_detected = False
|
||||
|
||||
if html_text:
|
||||
if not presense_flags:
|
||||
is_presense_detected = True
|
||||
@@ -263,9 +270,6 @@ def process_site_result(
|
||||
results_info["ids_links"] = eval(extracted_ids_data.get("links", "[]"))
|
||||
result.ids_data = extracted_ids_data
|
||||
|
||||
# Notify caller about results of query.
|
||||
query_notify.update(result, site.similar_search)
|
||||
|
||||
# Save status of request
|
||||
results_info["status"] = result
|
||||
|
||||
@@ -413,6 +417,8 @@ async def check_site_for_username(
|
||||
response, query_notify, logger, default_result, site
|
||||
)
|
||||
|
||||
query_notify.update(response_result['status'], site.similar_search)
|
||||
|
||||
return site.name, response_result
|
||||
|
||||
|
||||
@@ -617,15 +623,10 @@ async def site_self_check(
|
||||
"disabled": False,
|
||||
}
|
||||
|
||||
try:
|
||||
check_data = [
|
||||
(site.username_claimed, QueryStatus.CLAIMED),
|
||||
(site.username_unclaimed, QueryStatus.AVAILABLE),
|
||||
]
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
logger.error(site.__dict__)
|
||||
check_data = []
|
||||
check_data = [
|
||||
(site.username_claimed, QueryStatus.CLAIMED),
|
||||
(site.username_unclaimed, QueryStatus.AVAILABLE),
|
||||
]
|
||||
|
||||
logger.info(f"Checking {site.name}...")
|
||||
|
||||
|
||||
+3
-1
@@ -54,7 +54,9 @@ COMMON_ERRORS = {
|
||||
'Censorship', 'MGTS'
|
||||
),
|
||||
'Incapsula incident ID': CheckError('Bot protection', 'Incapsula'),
|
||||
'Сайт заблокирован хостинг-провайдером': CheckError('Site-specific', 'Site is disabled (Beget)'),
|
||||
'Сайт заблокирован хостинг-провайдером': CheckError(
|
||||
'Site-specific', 'Site is disabled (Beget)'
|
||||
),
|
||||
}
|
||||
|
||||
ERRORS_TYPES = {
|
||||
|
||||
+28
-34
@@ -152,6 +152,27 @@ class QueryNotifyPrint(QueryNotify):
|
||||
|
||||
return
|
||||
|
||||
def make_colored_terminal_notify(
|
||||
self, status, text, status_color, text_color, appendix
|
||||
):
|
||||
text = [
|
||||
f"{Style.BRIGHT}{Fore.WHITE}[{status_color}{status}{Fore.WHITE}]"
|
||||
+ f"{text_color} {text}: {Style.RESET_ALL}"
|
||||
+ f"{appendix}"
|
||||
]
|
||||
return "".join(text)
|
||||
|
||||
def make_simple_terminal_notify(
|
||||
self, status, text, status_color, text_color, appendix
|
||||
):
|
||||
return f"[{status}] {text}: {appendix}"
|
||||
|
||||
def make_terminal_notify(self, *args):
|
||||
if self.color:
|
||||
return self.make_colored_terminal_notify(*args)
|
||||
else:
|
||||
return self.make_simple_terminal_notify(*args)
|
||||
|
||||
def start(self, message, id_type):
|
||||
"""Notify Start.
|
||||
|
||||
@@ -204,40 +225,18 @@ class QueryNotifyPrint(QueryNotify):
|
||||
Return Value:
|
||||
Nothing.
|
||||
"""
|
||||
notify = None
|
||||
self.result = result
|
||||
|
||||
if not self.result.ids_data:
|
||||
ids_data_text = ""
|
||||
else:
|
||||
ids_data_text = ""
|
||||
if self.result.ids_data:
|
||||
ids_data_text = get_dict_ascii_tree(self.result.ids_data.items(), " ")
|
||||
|
||||
def make_colored_terminal_notify(
|
||||
status, text, status_color, text_color, appendix
|
||||
):
|
||||
text = [
|
||||
f"{Style.BRIGHT}{Fore.WHITE}[{status_color}{status}{Fore.WHITE}]"
|
||||
+ f"{text_color} {text}: {Style.RESET_ALL}"
|
||||
+ f"{appendix}"
|
||||
]
|
||||
return "".join(text)
|
||||
|
||||
def make_simple_terminal_notify(status, text, appendix):
|
||||
return f"[{status}] {text}: {appendix}"
|
||||
|
||||
def make_terminal_notify(is_colored=True, *args):
|
||||
if is_colored:
|
||||
return make_colored_terminal_notify(*args)
|
||||
else:
|
||||
return make_simple_terminal_notify(*args)
|
||||
|
||||
notify = None
|
||||
|
||||
# Output to the terminal is desired.
|
||||
if result.status == QueryStatus.CLAIMED:
|
||||
color = Fore.BLUE if is_similar else Fore.GREEN
|
||||
status = "?" if is_similar else "+"
|
||||
notify = make_terminal_notify(
|
||||
self.color,
|
||||
notify = self.make_terminal_notify(
|
||||
status,
|
||||
result.site_name,
|
||||
color,
|
||||
@@ -246,8 +245,7 @@ class QueryNotifyPrint(QueryNotify):
|
||||
)
|
||||
elif result.status == QueryStatus.AVAILABLE:
|
||||
if not self.print_found_only:
|
||||
notify = make_terminal_notify(
|
||||
self.color,
|
||||
notify = self.make_terminal_notify(
|
||||
"-",
|
||||
result.site_name,
|
||||
Fore.RED,
|
||||
@@ -256,8 +254,7 @@ class QueryNotifyPrint(QueryNotify):
|
||||
)
|
||||
elif result.status == QueryStatus.UNKNOWN:
|
||||
if not self.skip_check_errors:
|
||||
notify = make_terminal_notify(
|
||||
self.color,
|
||||
notify = self.make_terminal_notify(
|
||||
"?",
|
||||
result.site_name,
|
||||
Fore.RED,
|
||||
@@ -267,8 +264,7 @@ class QueryNotifyPrint(QueryNotify):
|
||||
elif result.status == QueryStatus.ILLEGAL:
|
||||
if not self.print_found_only:
|
||||
text = "Illegal Username Format For This Site!"
|
||||
notify = make_terminal_notify(
|
||||
self.color,
|
||||
notify = self.make_terminal_notify(
|
||||
"-",
|
||||
result.site_name,
|
||||
Fore.RED,
|
||||
@@ -286,8 +282,6 @@ class QueryNotifyPrint(QueryNotify):
|
||||
sys.stdout.write("\x1b[1K\r")
|
||||
print(notify)
|
||||
|
||||
return
|
||||
|
||||
def __str__(self):
|
||||
"""Convert Object To String.
|
||||
|
||||
|
||||
+40
-44
@@ -293,11 +293,20 @@ def save_xmind_report(filename, username, results):
|
||||
os.remove(filename)
|
||||
workbook = xmind.load(filename)
|
||||
sheet = workbook.getPrimarySheet()
|
||||
design_sheet(sheet, username, results)
|
||||
design_xmind_sheet(sheet, username, results)
|
||||
xmind.save(workbook, path=filename)
|
||||
|
||||
|
||||
def design_sheet(sheet, username, results):
|
||||
def add_xmind_subtopic(userlink, k, v, supposed_data):
|
||||
currentsublabel = userlink.addSubTopic()
|
||||
field = "fullname" if k == "name" else k
|
||||
if field not in supposed_data:
|
||||
supposed_data[field] = []
|
||||
supposed_data[field].append(v)
|
||||
currentsublabel.setTitle("%s: %s" % (k, v))
|
||||
|
||||
|
||||
def design_xmind_sheet(sheet, username, results):
|
||||
alltags = {}
|
||||
supposed_data = {}
|
||||
|
||||
@@ -311,56 +320,43 @@ def design_sheet(sheet, username, results):
|
||||
|
||||
for website_name in results:
|
||||
dictionary = results[website_name]
|
||||
result_status = dictionary.get("status")
|
||||
if result_status.status != QueryStatus.CLAIMED:
|
||||
continue
|
||||
|
||||
if dictionary.get("status").status == QueryStatus.CLAIMED:
|
||||
# firsttime I found that entry
|
||||
for tag in dictionary.get("status").tags:
|
||||
if tag.strip() == "":
|
||||
continue
|
||||
if tag not in alltags.keys():
|
||||
if not is_country_tag(tag):
|
||||
tagsection = root_topic1.addSubTopic()
|
||||
tagsection.setTitle(tag)
|
||||
alltags[tag] = tagsection
|
||||
stripped_tags = list(map(lambda x: x.strip(), result_status.tags))
|
||||
normalized_tags = list(
|
||||
filter(lambda x: x and not is_country_tag(x), stripped_tags)
|
||||
)
|
||||
|
||||
category = None
|
||||
for tag in dictionary.get("status").tags:
|
||||
if tag.strip() == "":
|
||||
continue
|
||||
if not is_country_tag(tag):
|
||||
category = tag
|
||||
category = None
|
||||
for tag in normalized_tags:
|
||||
if tag in alltags.keys():
|
||||
continue
|
||||
tagsection = root_topic1.addSubTopic()
|
||||
tagsection.setTitle(tag)
|
||||
alltags[tag] = tagsection
|
||||
category = tag
|
||||
|
||||
if category is None:
|
||||
userlink = undefinedsection.addSubTopic()
|
||||
userlink.addLabel(dictionary.get("status").site_url_user)
|
||||
section = alltags[category] if category else undefinedsection
|
||||
userlink = section.addSubTopic()
|
||||
userlink.addLabel(result_status.site_url_user)
|
||||
|
||||
ids_data = result_status.ids_data or {}
|
||||
for k, v in ids_data.items():
|
||||
# suppose target data
|
||||
if isinstance(v, list):
|
||||
for currentval in v:
|
||||
add_xmind_subtopic(userlink, k, currentval, supposed_data)
|
||||
else:
|
||||
userlink = alltags[category].addSubTopic()
|
||||
userlink.addLabel(dictionary.get("status").site_url_user)
|
||||
add_xmind_subtopic(userlink, k, v, supposed_data)
|
||||
|
||||
if dictionary.get("status").ids_data:
|
||||
for k, v in dictionary.get("status").ids_data.items():
|
||||
# suppose target data
|
||||
if not isinstance(v, list):
|
||||
currentsublabel = userlink.addSubTopic()
|
||||
field = "fullname" if k == "name" else k
|
||||
if field not in supposed_data:
|
||||
supposed_data[field] = []
|
||||
supposed_data[field].append(v)
|
||||
currentsublabel.setTitle("%s: %s" % (k, v))
|
||||
else:
|
||||
for currentval in v:
|
||||
currentsublabel = userlink.addSubTopic()
|
||||
field = "fullname" if k == "name" else k
|
||||
if field not in supposed_data:
|
||||
supposed_data[field] = []
|
||||
supposed_data[field].append(currentval)
|
||||
currentsublabel.setTitle("%s: %s" % (k, currentval))
|
||||
# add supposed data
|
||||
filterede_supposed_data = filter_supposed_data(supposed_data)
|
||||
if len(filterede_supposed_data) > 0:
|
||||
filtered_supposed_data = filter_supposed_data(supposed_data)
|
||||
if len(filtered_supposed_data) > 0:
|
||||
undefinedsection = root_topic1.addSubTopic()
|
||||
undefinedsection.setTitle("SUPPOSED DATA")
|
||||
for k, v in filterede_supposed_data.items():
|
||||
for k, v in filtered_supposed_data.items():
|
||||
currentsublabel = undefinedsection.addSubTopic()
|
||||
currentsublabel.setTitle("%s: %s" % (k, v))
|
||||
|
||||
|
||||
+18
-17
@@ -167,6 +167,17 @@ class MaigretSite:
|
||||
|
||||
return result
|
||||
|
||||
def get_url_type(self) -> str:
|
||||
url = URLMatcher.extract_main_part(self.url)
|
||||
if url.startswith("{username}"):
|
||||
url = "SUBDOMAIN"
|
||||
elif url == "":
|
||||
url = f"{self.url} ({self.engine})"
|
||||
else:
|
||||
parts = url.split("/")
|
||||
url = "/" + "/".join(parts[1:])
|
||||
return url
|
||||
|
||||
def update(self, updates: "dict") -> "MaigretSite":
|
||||
self.__dict__.update(updates)
|
||||
self.update_detectors()
|
||||
@@ -405,34 +416,23 @@ class MaigretDatabase:
|
||||
if not sites_dict:
|
||||
sites_dict = self.sites_dict()
|
||||
|
||||
urls = {}
|
||||
tags = {}
|
||||
output = ""
|
||||
disabled_count = 0
|
||||
total_count = len(sites_dict)
|
||||
urls = {}
|
||||
tags = {}
|
||||
|
||||
for _, site in sites_dict.items():
|
||||
if site.disabled:
|
||||
disabled_count += 1
|
||||
|
||||
url = URLMatcher.extract_main_part(site.url)
|
||||
if url.startswith("{username}"):
|
||||
url = "SUBDOMAIN"
|
||||
elif url == "":
|
||||
url = f"{site.url} ({site.engine})"
|
||||
else:
|
||||
parts = url.split("/")
|
||||
url = "/" + "/".join(parts[1:])
|
||||
|
||||
urls[url] = urls.get(url, 0) + 1
|
||||
url_type = site.get_url_type()
|
||||
urls[url_type] = urls.get(url_type, 0) + 1
|
||||
|
||||
if not site.tags:
|
||||
tags["NO_TAGS"] = tags.get("NO_TAGS", 0) + 1
|
||||
|
||||
for tag in site.tags:
|
||||
if is_country_tag(tag):
|
||||
# currenty do not display country tags
|
||||
continue
|
||||
for tag in filter(lambda x: not is_country_tag(x), site.tags):
|
||||
tags[tag] = tags.get(tag, 0) + 1
|
||||
|
||||
output += f"Enabled/total sites: {total_count - disabled_count}/{total_count}\n"
|
||||
@@ -441,8 +441,9 @@ class MaigretDatabase:
|
||||
if count == 1:
|
||||
break
|
||||
output += f"{count}\t{url}\n"
|
||||
|
||||
output += "Top sites' tags:\n"
|
||||
for tag, count in sorted(tags.items(), key=lambda x: x[1], reverse=True):
|
||||
for tag, count in sorted(tags.items(), key=lambda x: x[1], reverse=True)[:20]:
|
||||
mark = ""
|
||||
if tag not in SUPPORTED_TAGS:
|
||||
mark = " (non-standard)"
|
||||
|
||||
Reference in New Issue
Block a user