Refactoring and linting, added notifications about frequent search errors

This commit is contained in:
Soxoj
2021-04-30 12:03:13 +03:00
parent bfaf276f6e
commit bfa6afac32
20 changed files with 1351 additions and 787 deletions
+104 -92
View File
@@ -5,6 +5,7 @@ import logging
import os
from argparse import ArgumentTypeError
from datetime import datetime
from typing import Dict, Any
import pycountry
import xmind
@@ -16,83 +17,85 @@ from .result import QueryStatus
from .utils import is_country_tag, CaseConverter, enrich_link_str
SUPPORTED_JSON_REPORT_FORMATS = [
'simple',
'ndjson',
"simple",
"ndjson",
]
'''
"""
UTILS
'''
"""
def filter_supposed_data(data):
### interesting fields
allowed_fields = ['fullname', 'gender', 'location', 'age']
filtered_supposed_data = {CaseConverter.snake_to_title(k): v[0]
for k, v in data.items()
if k in allowed_fields}
# interesting fields
allowed_fields = ["fullname", "gender", "location", "age"]
filtered_supposed_data = {
CaseConverter.snake_to_title(k): v[0]
for k, v in data.items()
if k in allowed_fields
}
return filtered_supposed_data
'''
"""
REPORTS SAVING
'''
"""
def save_csv_report(filename: str, username: str, results: dict):
with open(filename, 'w', newline='', encoding='utf-8') as f:
with open(filename, "w", newline="", encoding="utf-8") as f:
generate_csv_report(username, results, f)
def save_txt_report(filename: str, username: str, results: dict):
with open(filename, 'w', encoding='utf-8') as f:
with open(filename, "w", encoding="utf-8") as f:
generate_txt_report(username, results, f)
def save_html_report(filename: str, context: dict):
template, _ = generate_report_template(is_pdf=False)
filled_template = template.render(**context)
with open(filename, 'w') as f:
with open(filename, "w") as f:
f.write(filled_template)
def save_pdf_report(filename: str, context: dict):
template, css = generate_report_template(is_pdf=True)
filled_template = template.render(**context)
with open(filename, 'w+b') as f:
with open(filename, "w+b") as f:
pisa.pisaDocument(io.StringIO(filled_template), dest=f, default_css=css)
def save_json_report(filename: str, username: str, results: dict, report_type: str):
with open(filename, 'w', encoding='utf-8') as f:
with open(filename, "w", encoding="utf-8") as f:
generate_json_report(username, results, f, report_type=report_type)
'''
"""
REPORTS GENERATING
'''
"""
def generate_report_template(is_pdf: bool):
"""
HTML/PDF template generation
HTML/PDF template generation
"""
def get_resource_content(filename):
return open(os.path.join(maigret_path, 'resources', filename)).read()
return open(os.path.join(maigret_path, "resources", filename)).read()
maigret_path = os.path.dirname(os.path.realpath(__file__))
if is_pdf:
template_content = get_resource_content('simple_report_pdf.tpl')
css_content = get_resource_content('simple_report_pdf.css')
template_content = get_resource_content("simple_report_pdf.tpl")
css_content = get_resource_content("simple_report_pdf.css")
else:
template_content = get_resource_content('simple_report.tpl')
template_content = get_resource_content("simple_report.tpl")
css_content = None
template = Template(template_content)
template.globals['title'] = CaseConverter.snake_to_title
template.globals['detect_link'] = enrich_link_str
template.globals["title"] = CaseConverter.snake_to_title # type: ignore
template.globals["detect_link"] = enrich_link_str # type: ignore
return template, css_content
@@ -100,15 +103,15 @@ def generate_report_context(username_results: list):
brief_text = []
usernames = {}
extended_info_count = 0
tags = {}
supposed_data = {}
tags: Dict[str, int] = {}
supposed_data: Dict[str, Any] = {}
first_seen = None
for username, id_type, results in username_results:
found_accounts = 0
new_ids = []
usernames[username] = {'type': id_type}
usernames[username] = {"type": id_type}
for website_name in results:
dictionary = results[website_name]
@@ -116,19 +119,19 @@ def generate_report_context(username_results: list):
if not dictionary:
continue
if dictionary.get('is_similar'):
if dictionary.get("is_similar"):
continue
status = dictionary.get('status')
status = dictionary.get("status")
if not status: # FIXME: currently in case of timeout
continue
if status.ids_data:
dictionary['ids_data'] = status.ids_data
dictionary["ids_data"] = status.ids_data
extended_info_count += 1
# detect first seen
created_at = status.ids_data.get('created_at')
created_at = status.ids_data.get("created_at")
if created_at:
if first_seen is None:
first_seen = created_at
@@ -138,37 +141,46 @@ def generate_report_context(username_results: list):
new_time = parse_datetime_str(created_at)
if new_time < known_time:
first_seen = created_at
except:
logging.debug('Problems with converting datetime %s/%s', first_seen, created_at)
except Exception as e:
logging.debug(
"Problems with converting datetime %s/%s: %s",
first_seen,
created_at,
str(e),
)
for k, v in status.ids_data.items():
# suppose target data
field = 'fullname' if k == 'name' else k
if not field in supposed_data:
field = "fullname" if k == "name" else k
if field not in supposed_data:
supposed_data[field] = []
supposed_data[field].append(v)
# suppose country
if k in ['country', 'locale']:
if k in ["country", "locale"]:
try:
if is_country_tag(k):
tag = pycountry.countries.get(alpha_2=v).alpha_2.lower()
else:
tag = pycountry.countries.search_fuzzy(v)[0].alpha_2.lower()
tag = pycountry.countries.search_fuzzy(v)[
0
].alpha_2.lower()
# TODO: move countries to another struct
tags[tag] = tags.get(tag, 0) + 1
except Exception as e:
logging.debug('pycountry exception', exc_info=True)
logging.debug(
"Pycountry exception: %s", str(e), exc_info=True
)
new_usernames = dictionary.get('ids_usernames')
new_usernames = dictionary.get("ids_usernames")
if new_usernames:
for u, utype in new_usernames.items():
if not u in usernames:
if u not in usernames:
new_ids.append((u, utype))
usernames[u] = {'type': utype}
usernames[u] = {"type": utype}
if status.status == QueryStatus.CLAIMED:
found_accounts += 1
dictionary['found'] = True
dictionary["found"] = True
else:
continue
@@ -177,22 +189,24 @@ def generate_report_context(username_results: list):
for t in status.tags:
tags[t] = tags.get(t, 0) + 1
brief_text.append(f'Search by {id_type} {username} returned {found_accounts} accounts.')
brief_text.append(
f"Search by {id_type} {username} returned {found_accounts} accounts."
)
if new_ids:
ids_list = []
for u, t in new_ids:
ids_list.append(f'{u} ({t})' if t != 'username' else u)
brief_text.append(f'Found target\'s other IDs: ' + ', '.join(ids_list) + '.')
ids_list.append(f"{u} ({t})" if t != "username" else u)
brief_text.append("Found target's other IDs: " + ", ".join(ids_list) + ".")
brief_text.append(f'Extended info extracted from {extended_info_count} accounts.')
brief_text.append(f"Extended info extracted from {extended_info_count} accounts.")
brief = ' '.join(brief_text).strip()
brief = " ".join(brief_text).strip()
tuple_sort = lambda d: sorted(d, key=lambda x: x[1], reverse=True)
if 'global' in tags:
if "global" in tags:
# remove tag 'global' useless for country detection
del tags['global']
del tags["global"]
first_username = username_results[0][0]
countries_lists = list(filter(lambda x: is_country_tag(x[0]), tags.items()))
@@ -201,35 +215,33 @@ def generate_report_context(username_results: list):
filtered_supposed_data = filter_supposed_data(supposed_data)
return {
'username': first_username,
'brief': brief,
'results': username_results,
'first_seen': first_seen,
'interests_tuple_list': tuple_sort(interests_list),
'countries_tuple_list': tuple_sort(countries_lists),
'supposed_data': filtered_supposed_data,
'generated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"username": first_username,
"brief": brief,
"results": username_results,
"first_seen": first_seen,
"interests_tuple_list": tuple_sort(interests_list),
"countries_tuple_list": tuple_sort(countries_lists),
"supposed_data": filtered_supposed_data,
"generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
def generate_csv_report(username: str, results: dict, csvfile):
writer = csv.writer(csvfile)
writer.writerow(['username',
'name',
'url_main',
'url_user',
'exists',
'http_status'
]
)
writer.writerow(
["username", "name", "url_main", "url_user", "exists", "http_status"]
)
for site in results:
writer.writerow([username,
site,
results[site]['url_main'],
results[site]['url_user'],
str(results[site]['status'].status),
results[site]['http_status'],
])
writer.writerow(
[
username,
site,
results[site]["url_main"],
results[site]["url_user"],
str(results[site]["status"].status),
results[site]["http_status"],
]
)
def generate_txt_report(username: str, results: dict, file):
@@ -242,12 +254,11 @@ def generate_txt_report(username: str, results: dict, file):
if dictionary.get("status").status == QueryStatus.CLAIMED:
exists_counter += 1
file.write(dictionary["url_user"] + "\n")
file.write(f'Total Websites Username Detected On : {exists_counter}')
file.write(f"Total Websites Username Detected On : {exists_counter}")
def generate_json_report(username: str, results: dict, file, report_type):
exists_counter = 0
is_report_per_line = report_type.startswith('ndjson')
is_report_per_line = report_type.startswith("ndjson")
all_json = {}
for sitename in results:
@@ -257,11 +268,11 @@ def generate_json_report(username: str, results: dict, file, report_type):
continue
data = dict(site_result)
data['status'] = data['status'].json()
data["status"] = data["status"].json()
if is_report_per_line:
data['sitename'] = sitename
file.write(json.dumps(data) + '\n')
data["sitename"] = sitename
file.write(json.dumps(data) + "\n")
else:
all_json[sitename] = data
@@ -269,9 +280,9 @@ def generate_json_report(username: str, results: dict, file, report_type):
file.write(json.dumps(all_json))
'''
"""
XMIND 8 Functions
'''
"""
def save_xmind_report(filename, username, results):
@@ -284,7 +295,6 @@ def save_xmind_report(filename, username, results):
def design_sheet(sheet, username, results):
##all tag list
alltags = {}
supposed_data = {}
@@ -300,7 +310,7 @@ def design_sheet(sheet, username, results):
dictionary = results[website_name]
if dictionary.get("status").status == QueryStatus.CLAIMED:
## firsttime I found that entry
# firsttime I found that entry
for tag in dictionary.get("status").tags:
if tag.strip() == "":
continue
@@ -329,22 +339,22 @@ def design_sheet(sheet, username, results):
# suppose target data
if not isinstance(v, list):
currentsublabel = userlink.addSubTopic()
field = 'fullname' if k == 'name' else k
if not field in supposed_data:
field = "fullname" if k == "name" else k
if field not in supposed_data:
supposed_data[field] = []
supposed_data[field].append(v)
currentsublabel.setTitle("%s: %s" % (k, v))
else:
for currentval in v:
currentsublabel = userlink.addSubTopic()
field = 'fullname' if k == 'name' else k
if not field in supposed_data:
field = "fullname" if k == "name" else k
if field not in supposed_data:
supposed_data[field] = []
supposed_data[field].append(currentval)
currentsublabel.setTitle("%s: %s" % (k, currentval))
### Add Supposed DATA
# add supposed data
filterede_supposed_data = filter_supposed_data(supposed_data)
if (len(filterede_supposed_data) > 0):
if len(filterede_supposed_data) > 0:
undefinedsection = root_topic1.addSubTopic()
undefinedsection.setTitle("SUPPOSED DATA")
for k, v in filterede_supposed_data.items():
@@ -353,7 +363,9 @@ def design_sheet(sheet, username, results):
def check_supported_json_format(value):
if value and not value in SUPPORTED_JSON_REPORT_FORMATS:
raise ArgumentTypeError(f'JSON report type must be one of the following types: '
+ ', '.join(SUPPORTED_JSON_REPORT_FORMATS))
if value and value not in SUPPORTED_JSON_REPORT_FORMATS:
raise ArgumentTypeError(
"JSON report type must be one of the following types: "
+ ", ".join(SUPPORTED_JSON_REPORT_FORMATS)
)
return value