Merge pull request #124 from soxoj/refactoring-complexity-decrease

Refactored to decrease cyclomatic complexity
This commit is contained in:
soxoj
2021-05-05 10:59:11 +03:00
committed by GitHub
5 changed files with 131 additions and 137 deletions
+42 -41
View File
@@ -54,10 +54,9 @@ async def get_response(request_future, logger) -> Tuple[str, int, Optional[Check
decoded_content = response_content.decode(charset, "ignore") decoded_content = response_content.decode(charset, "ignore")
html_text = decoded_content html_text = decoded_content
error = None
if status_code == 0: if status_code == 0:
error = CheckError("Connection lost") error = CheckError("Connection lost")
else:
error = None
logger.debug(html_text) logger.debug(html_text)
@@ -73,11 +72,10 @@ async def get_response(request_future, logger) -> Tuple[str, int, Optional[Check
error = CheckError("Interrupted") error = CheckError("Interrupted")
except Exception as e: except Exception as e:
# python-specific exceptions # python-specific exceptions
if sys.version_info.minor > 6: if sys.version_info.minor > 6 and (
if isinstance(e, ssl.SSLCertVerificationError) or isinstance( isinstance(e, ssl.SSLCertVerificationError) or isinstance(e, ssl.SSLError)
e, ssl.SSLError ):
): error = CheckError("SSL", str(e))
error = CheckError("SSL", str(e))
else: else:
logger.debug(e, exc_info=True) logger.debug(e, exc_info=True)
error = CheckError("Unexpected", str(e)) error = CheckError("Unexpected", str(e))
@@ -109,6 +107,14 @@ def detect_error_page(
return None return None
def debug_response_logging(url, html_text, status_code, check_error):
    """Append a record describing one site response to ``debug.log``.

    The file is opened in append mode relative to the current working
    directory, so successive calls accumulate records.

    Args:
        url: Address that was requested; written verbatim.
        html_text: Response body. When falsy, only the header lines
            (url / error / status) are written.
        status_code: Status of the response. A falsy value (``0`` or
            ``None``) is reported as ``"No response"``.
        check_error: Error detected for the request, or ``None``; written
            via its string representation.
    """
    status = status_code or "No response"
    # Response bodies may contain arbitrary characters. Relying on the
    # platform default encoding (e.g. cp1252) can raise UnicodeEncodeError,
    # so the log is always written as UTF-8.
    with open("debug.log", "a", encoding="utf-8") as f:
        f.write(f"url: {url}\nerror: {check_error}\nr: {status}\n")
        if html_text:
            f.write(f"code: {status}\nresponse: {str(html_text)}\n")
def process_site_result( def process_site_result(
response, query_notify, logger, results_info: QueryResultWrapper, site: MaigretSite response, query_notify, logger, results_info: QueryResultWrapper, site: MaigretSite
): ):
@@ -142,11 +148,7 @@ def process_site_result(
response_time = None response_time = None
if logger.level == logging.DEBUG: if logger.level == logging.DEBUG:
with open("debug.txt", "a") as f: debug_response_logging(url, html_text, status_code, check_error)
status = status_code or "No response"
f.write(f"url: {url}\nerror: {check_error}\nr: {status}\n")
if html_text:
f.write(f"code: {status}\nresponse: {str(html_text)}\n")
# additional check for errors # additional check for errors
if status_code and not check_error: if status_code and not check_error:
@@ -154,29 +156,34 @@ def process_site_result(
html_text, status_code, site.errors, site.ignore403 html_text, status_code, site.errors, site.ignore403
) )
if site.activation and html_text: # parsing activation
is_need_activation = any( is_need_activation = any(
[s for s in site.activation["marks"] if s in html_text] [s for s in site.activation.get("marks", []) if s in html_text]
) )
if is_need_activation:
method = site.activation["method"] if site.activation and html_text and is_need_activation:
try: method = site.activation["method"]
activate_fun = getattr(ParsingActivator(), method) try:
# TODO: async call activate_fun = getattr(ParsingActivator(), method)
activate_fun(site, logger) # TODO: async call
except AttributeError: activate_fun(site, logger)
logger.warning( except AttributeError:
f"Activation method {method} for site {site.name} not found!" logger.warning(
) f"Activation method {method} for site {site.name} not found!"
except Exception as e: )
logger.warning(f"Failed activation {method} for site {site.name}: {str(e)}", exc_info=True) except Exception as e:
# TODO: temporary check error logger.warning(
f"Failed activation {method} for site {site.name}: {str(e)}",
exc_info=True,
)
# TODO: temporary check error
site_name = site.pretty_name site_name = site.pretty_name
# presense flags # presense flags
# True by default # True by default
presense_flags = site.presense_strs presense_flags = site.presense_strs
is_presense_detected = False is_presense_detected = False
if html_text: if html_text:
if not presense_flags: if not presense_flags:
is_presense_detected = True is_presense_detected = True
@@ -263,9 +270,6 @@ def process_site_result(
results_info["ids_links"] = eval(extracted_ids_data.get("links", "[]")) results_info["ids_links"] = eval(extracted_ids_data.get("links", "[]"))
result.ids_data = extracted_ids_data result.ids_data = extracted_ids_data
# Notify caller about results of query.
query_notify.update(result, site.similar_search)
# Save status of request # Save status of request
results_info["status"] = result results_info["status"] = result
@@ -413,6 +417,8 @@ async def check_site_for_username(
response, query_notify, logger, default_result, site response, query_notify, logger, default_result, site
) )
query_notify.update(response_result['status'], site.similar_search)
return site.name, response_result return site.name, response_result
@@ -617,15 +623,10 @@ async def site_self_check(
"disabled": False, "disabled": False,
} }
try: check_data = [
check_data = [ (site.username_claimed, QueryStatus.CLAIMED),
(site.username_claimed, QueryStatus.CLAIMED), (site.username_unclaimed, QueryStatus.AVAILABLE),
(site.username_unclaimed, QueryStatus.AVAILABLE), ]
]
except Exception as e:
logger.error(e)
logger.error(site.__dict__)
check_data = []
logger.info(f"Checking {site.name}...") logger.info(f"Checking {site.name}...")
+3 -1
View File
@@ -54,7 +54,9 @@ COMMON_ERRORS = {
'Censorship', 'MGTS' 'Censorship', 'MGTS'
), ),
'Incapsula incident ID': CheckError('Bot protection', 'Incapsula'), 'Incapsula incident ID': CheckError('Bot protection', 'Incapsula'),
'Сайт заблокирован хостинг-провайдером': CheckError('Site-specific', 'Site is disabled (Beget)'), 'Сайт заблокирован хостинг-провайдером': CheckError(
'Site-specific', 'Site is disabled (Beget)'
),
} }
ERRORS_TYPES = { ERRORS_TYPES = {
+28 -34
View File
@@ -152,6 +152,27 @@ class QueryNotifyPrint(QueryNotify):
return return
def make_colored_terminal_notify(
    self, status, text, status_color, text_color, appendix
):
    """Build a notification line decorated with terminal colour codes.

    The line has the shape ``[<status>] <text>: <appendix>``. The brackets
    are emitted bright white, the status marker uses ``status_color``, the
    text uses ``text_color``, and styling is reset before the appendix.

    Args:
        status: Short marker shown inside the square brackets.
        text: Label shown after the brackets.
        status_color: Colour sequence applied to ``status``.
        text_color: Colour sequence applied to ``text``.
        appendix: Trailing content, appended after the style reset.

    Returns:
        The assembled string.
    """
    status_badge = f"{Style.BRIGHT}{Fore.WHITE}[{status_color}{status}{Fore.WHITE}]"
    label = f"{text_color} {text}: {Style.RESET_ALL}"
    return status_badge + label + f"{appendix}"
def make_simple_terminal_notify(
    self, status, text, status_color, text_color, appendix
):
    """Build a plain notification line without any colour codes.

    ``status_color`` and ``text_color`` are accepted only so the signature
    matches the coloured variant; they do not affect the result.

    Returns:
        A string of the form ``[<status>] <text>: <appendix>``.
    """
    plain_line = f"[{status}] {text}: {appendix}"
    return plain_line
def make_terminal_notify(self, *args):
    """Build a notification line, coloured or plain depending on ``self.color``.

    Args:
        *args: Positional arguments forwarded unchanged to the selected
            builder (status, text, status colour, text colour, appendix).

    Returns:
        Whatever the selected builder returns.
    """
    # Pick the builder first, then call it once with the forwarded arguments.
    renderer = (
        self.make_colored_terminal_notify
        if self.color
        else self.make_simple_terminal_notify
    )
    return renderer(*args)
def start(self, message, id_type): def start(self, message, id_type):
"""Notify Start. """Notify Start.
@@ -204,40 +225,18 @@ class QueryNotifyPrint(QueryNotify):
Return Value: Return Value:
Nothing. Nothing.
""" """
notify = None
self.result = result self.result = result
if not self.result.ids_data: ids_data_text = ""
ids_data_text = "" if self.result.ids_data:
else:
ids_data_text = get_dict_ascii_tree(self.result.ids_data.items(), " ") ids_data_text = get_dict_ascii_tree(self.result.ids_data.items(), " ")
def make_colored_terminal_notify(
status, text, status_color, text_color, appendix
):
text = [
f"{Style.BRIGHT}{Fore.WHITE}[{status_color}{status}{Fore.WHITE}]"
+ f"{text_color} {text}: {Style.RESET_ALL}"
+ f"{appendix}"
]
return "".join(text)
def make_simple_terminal_notify(status, text, appendix):
return f"[{status}] {text}: {appendix}"
def make_terminal_notify(is_colored=True, *args):
if is_colored:
return make_colored_terminal_notify(*args)
else:
return make_simple_terminal_notify(*args)
notify = None
# Output to the terminal is desired. # Output to the terminal is desired.
if result.status == QueryStatus.CLAIMED: if result.status == QueryStatus.CLAIMED:
color = Fore.BLUE if is_similar else Fore.GREEN color = Fore.BLUE if is_similar else Fore.GREEN
status = "?" if is_similar else "+" status = "?" if is_similar else "+"
notify = make_terminal_notify( notify = self.make_terminal_notify(
self.color,
status, status,
result.site_name, result.site_name,
color, color,
@@ -246,8 +245,7 @@ class QueryNotifyPrint(QueryNotify):
) )
elif result.status == QueryStatus.AVAILABLE: elif result.status == QueryStatus.AVAILABLE:
if not self.print_found_only: if not self.print_found_only:
notify = make_terminal_notify( notify = self.make_terminal_notify(
self.color,
"-", "-",
result.site_name, result.site_name,
Fore.RED, Fore.RED,
@@ -256,8 +254,7 @@ class QueryNotifyPrint(QueryNotify):
) )
elif result.status == QueryStatus.UNKNOWN: elif result.status == QueryStatus.UNKNOWN:
if not self.skip_check_errors: if not self.skip_check_errors:
notify = make_terminal_notify( notify = self.make_terminal_notify(
self.color,
"?", "?",
result.site_name, result.site_name,
Fore.RED, Fore.RED,
@@ -267,8 +264,7 @@ class QueryNotifyPrint(QueryNotify):
elif result.status == QueryStatus.ILLEGAL: elif result.status == QueryStatus.ILLEGAL:
if not self.print_found_only: if not self.print_found_only:
text = "Illegal Username Format For This Site!" text = "Illegal Username Format For This Site!"
notify = make_terminal_notify( notify = self.make_terminal_notify(
self.color,
"-", "-",
result.site_name, result.site_name,
Fore.RED, Fore.RED,
@@ -286,8 +282,6 @@ class QueryNotifyPrint(QueryNotify):
sys.stdout.write("\x1b[1K\r") sys.stdout.write("\x1b[1K\r")
print(notify) print(notify)
return
def __str__(self): def __str__(self):
"""Convert Object To String. """Convert Object To String.
+40 -44
View File
@@ -293,11 +293,20 @@ def save_xmind_report(filename, username, results):
os.remove(filename) os.remove(filename)
workbook = xmind.load(filename) workbook = xmind.load(filename)
sheet = workbook.getPrimarySheet() sheet = workbook.getPrimarySheet()
design_sheet(sheet, username, results) design_xmind_sheet(sheet, username, results)
xmind.save(workbook, path=filename) xmind.save(workbook, path=filename)
def design_sheet(sheet, username, results): def add_xmind_subtopic(userlink, k, v, supposed_data):
currentsublabel = userlink.addSubTopic()
field = "fullname" if k == "name" else k
if field not in supposed_data:
supposed_data[field] = []
supposed_data[field].append(v)
currentsublabel.setTitle("%s: %s" % (k, v))
def design_xmind_sheet(sheet, username, results):
alltags = {} alltags = {}
supposed_data = {} supposed_data = {}
@@ -311,56 +320,43 @@ def design_sheet(sheet, username, results):
for website_name in results: for website_name in results:
dictionary = results[website_name] dictionary = results[website_name]
result_status = dictionary.get("status")
if result_status.status != QueryStatus.CLAIMED:
continue
if dictionary.get("status").status == QueryStatus.CLAIMED: stripped_tags = list(map(lambda x: x.strip(), result_status.tags))
# firsttime I found that entry normalized_tags = list(
for tag in dictionary.get("status").tags: filter(lambda x: x and not is_country_tag(x), stripped_tags)
if tag.strip() == "": )
continue
if tag not in alltags.keys():
if not is_country_tag(tag):
tagsection = root_topic1.addSubTopic()
tagsection.setTitle(tag)
alltags[tag] = tagsection
category = None category = None
for tag in dictionary.get("status").tags: for tag in normalized_tags:
if tag.strip() == "": if tag in alltags.keys():
continue continue
if not is_country_tag(tag): tagsection = root_topic1.addSubTopic()
category = tag tagsection.setTitle(tag)
alltags[tag] = tagsection
category = tag
if category is None: section = alltags[category] if category else undefinedsection
userlink = undefinedsection.addSubTopic() userlink = section.addSubTopic()
userlink.addLabel(dictionary.get("status").site_url_user) userlink.addLabel(result_status.site_url_user)
ids_data = result_status.ids_data or {}
for k, v in ids_data.items():
# suppose target data
if isinstance(v, list):
for currentval in v:
add_xmind_subtopic(userlink, k, currentval, supposed_data)
else: else:
userlink = alltags[category].addSubTopic() add_xmind_subtopic(userlink, k, v, supposed_data)
userlink.addLabel(dictionary.get("status").site_url_user)
if dictionary.get("status").ids_data:
for k, v in dictionary.get("status").ids_data.items():
# suppose target data
if not isinstance(v, list):
currentsublabel = userlink.addSubTopic()
field = "fullname" if k == "name" else k
if field not in supposed_data:
supposed_data[field] = []
supposed_data[field].append(v)
currentsublabel.setTitle("%s: %s" % (k, v))
else:
for currentval in v:
currentsublabel = userlink.addSubTopic()
field = "fullname" if k == "name" else k
if field not in supposed_data:
supposed_data[field] = []
supposed_data[field].append(currentval)
currentsublabel.setTitle("%s: %s" % (k, currentval))
# add supposed data # add supposed data
filterede_supposed_data = filter_supposed_data(supposed_data) filtered_supposed_data = filter_supposed_data(supposed_data)
if len(filterede_supposed_data) > 0: if len(filtered_supposed_data) > 0:
undefinedsection = root_topic1.addSubTopic() undefinedsection = root_topic1.addSubTopic()
undefinedsection.setTitle("SUPPOSED DATA") undefinedsection.setTitle("SUPPOSED DATA")
for k, v in filterede_supposed_data.items(): for k, v in filtered_supposed_data.items():
currentsublabel = undefinedsection.addSubTopic() currentsublabel = undefinedsection.addSubTopic()
currentsublabel.setTitle("%s: %s" % (k, v)) currentsublabel.setTitle("%s: %s" % (k, v))
+18 -17
View File
@@ -167,6 +167,17 @@ class MaigretSite:
return result return result
def get_url_type(self) -> str:
    """Classify this site's URL pattern into a short type label.

    The label is derived from ``URLMatcher.extract_main_part(self.url)``:

    * ``"SUBDOMAIN"`` when the main part starts with ``{username}``;
    * ``"<url> (<engine>)"`` when the main part is empty;
    * otherwise the main part with its first ``/``-separated segment
      dropped, prefixed with ``/``.

    Returns:
        The label string.
    """
    main_part = URLMatcher.extract_main_part(self.url)

    if main_part.startswith("{username}"):
        return "SUBDOMAIN"

    if main_part == "":
        return f"{self.url} ({self.engine})"

    # Discard the leading segment and keep only what follows the first "/".
    _, *path_segments = main_part.split("/")
    return "/" + "/".join(path_segments)
def update(self, updates: "dict") -> "MaigretSite": def update(self, updates: "dict") -> "MaigretSite":
self.__dict__.update(updates) self.__dict__.update(updates)
self.update_detectors() self.update_detectors()
@@ -405,34 +416,23 @@ class MaigretDatabase:
if not sites_dict: if not sites_dict:
sites_dict = self.sites_dict() sites_dict = self.sites_dict()
urls = {}
tags = {}
output = "" output = ""
disabled_count = 0 disabled_count = 0
total_count = len(sites_dict) total_count = len(sites_dict)
urls = {}
tags = {}
for _, site in sites_dict.items(): for _, site in sites_dict.items():
if site.disabled: if site.disabled:
disabled_count += 1 disabled_count += 1
url = URLMatcher.extract_main_part(site.url) url_type = site.get_url_type()
if url.startswith("{username}"): urls[url_type] = urls.get(url_type, 0) + 1
url = "SUBDOMAIN"
elif url == "":
url = f"{site.url} ({site.engine})"
else:
parts = url.split("/")
url = "/" + "/".join(parts[1:])
urls[url] = urls.get(url, 0) + 1
if not site.tags: if not site.tags:
tags["NO_TAGS"] = tags.get("NO_TAGS", 0) + 1 tags["NO_TAGS"] = tags.get("NO_TAGS", 0) + 1
for tag in site.tags: for tag in filter(lambda x: not is_country_tag(x), site.tags):
if is_country_tag(tag):
# currenty do not display country tags
continue
tags[tag] = tags.get(tag, 0) + 1 tags[tag] = tags.get(tag, 0) + 1
output += f"Enabled/total sites: {total_count - disabled_count}/{total_count}\n" output += f"Enabled/total sites: {total_count - disabled_count}/{total_count}\n"
@@ -441,8 +441,9 @@ class MaigretDatabase:
if count == 1: if count == 1:
break break
output += f"{count}\t{url}\n" output += f"{count}\t{url}\n"
output += "Top sites' tags:\n" output += "Top sites' tags:\n"
for tag, count in sorted(tags.items(), key=lambda x: x[1], reverse=True): for tag, count in sorted(tags.items(), key=lambda x: x[1], reverse=True)[:20]:
mark = "" mark = ""
if tag not in SUPPORTED_TAGS: if tag not in SUPPORTED_TAGS:
mark = " (non-standard)" mark = " (non-standard)"