Main maigret function refactoring

This commit is contained in:
Soxoj
2021-05-05 18:02:13 +03:00
parent 2fb1f19948
commit 3cbb9df7b3
6 changed files with 115 additions and 85 deletions
+72 -61
View File
@@ -8,6 +8,7 @@ import os
import sys
import platform
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from typing import List, Tuple
import requests
from socid_extractor import extract, parse, __version__ as socid_version
@@ -16,7 +17,7 @@ from .checking import (
timeout_check,
SUPPORTED_IDS,
self_check,
unsupported_characters,
BAD_CHARS,
maigret,
)
from . import errors
@@ -33,13 +34,14 @@ from .report import (
)
from .sites import MaigretDatabase
from .submit import submit_dialog
from .types import QueryResultWrapper
from .utils import get_dict_ascii_tree
__version__ = '0.2.1'
def notify_about_errors(search_results, query_notify):
errs = errors.extract_and_group(search_results.values())
def notify_about_errors(search_results: QueryResultWrapper, query_notify):
errs = errors.extract_and_group(search_results)
was_errs_displayed = False
for e in errs:
if not errors.is_important(e):
@@ -58,6 +60,58 @@ def notify_about_errors(search_results, query_notify):
)
def extract_ids_from_page(url, logger, timeout=5) -> dict:
results = {}
# url, headers
reqs: List[Tuple[str, set]] = [(url, set())]
try:
# temporary workaround for URL mutations MVP
from socid_extractor import mutate_url
reqs += list(mutate_url(url))
except Exception as e:
logger.warning(e)
for req in reqs:
url, headers = req
print(f'Scanning webpage by URL {url}...')
page, _ = parse(url, cookies_str='', headers=headers, timeout=timeout)
logger.debug(page)
info = extract(page)
if not info:
print('Nothing extracted')
else:
print(get_dict_ascii_tree(info.items(), new_line=False), ' ')
for k, v in info.items():
if 'username' in k:
results[v] = 'username'
if k in SUPPORTED_IDS:
results[v] = k
return results
def extract_ids_from_results(results: QueryResultWrapper, db: MaigretDatabase) -> dict:
ids_results = {}
for website_name in results:
dictionary = results[website_name]
# TODO: fix no site data issue
if not dictionary:
continue
new_usernames = dictionary.get('ids_usernames')
if new_usernames:
for u, utype in new_usernames.items():
ids_results[u] = utype
for url in dictionary.get('ids_links', []):
for s in db.sites:
u = s.detect_username(url)
if u:
ids_results[u] = 'username'
return ids_results
def setup_arguments_parser():
version_string = '\n'.join(
[
@@ -392,31 +446,8 @@ async def main():
print("Using the proxy: " + args.proxy)
if args.parse_url:
# url, headers
reqs = [(args.parse_url, set())]
try:
# temporary workaround for URL mutations MVP
from socid_extractor import mutate_url
reqs += list(mutate_url(args.parse_url))
except Exception as e:
logger.warning(e)
pass
for req in reqs:
url, headers = req
print(f'Scanning webpage by URL {url}...')
page, _ = parse(url, cookies_str='', headers=headers)
info = extract(page)
if not info:
print('Nothing extracted')
else:
print(get_dict_ascii_tree(info.items(), new_line=False), ' ')
for k, v in info.items():
if 'username' in k:
usernames[v] = 'username'
if k in SUPPORTED_IDS:
usernames[v] = k
extracted_ids = extract_ids_from_page(args.parse_url, logger, timeout=args.timeout)
usernames.update(extracted_ids)
if args.tags:
args.tags = list(set(str(args.tags).split(',')))
@@ -471,6 +502,7 @@ async def main():
print('Updates will be applied only for current search session.')
print(db.get_scan_stats(site_data))
# Database statistics
if args.stats:
print(db.get_db_stats(db.sites_dict))
@@ -480,11 +512,6 @@ async def main():
# Define one report filename template
report_filepath_tpl = os.path.join(args.folderoutput, 'report_{username}{postfix}')
# Database stats
# TODO: verbose info about filtered sites
# enabled_count = len(list(filter(lambda x: not x.disabled, site_data.values())))
# print(f'Sites in database, enabled/total: {enabled_count}/{len(site_data)}')
if usernames == {}:
# magic params to exit after init
query_notify.warning('No usernames to check, exiting.')
@@ -493,14 +520,14 @@ async def main():
if not site_data:
query_notify.warning('No sites to check, exiting!')
sys.exit(2)
else:
query_notify.warning(
f'Starting a search on top {len(site_data)} sites from the Maigret database...'
)
if not args.all_sites:
query_notify.warning(
f'Starting a search on top {len(site_data)} sites from the Maigret database...'
'You can run search by full list of sites with flag `-a`', '!'
)
if not args.all_sites:
query_notify.warning(
'You can run search by full list of sites with flag `-a`', '!'
)
already_checked = set()
general_results = []
@@ -511,8 +538,8 @@ async def main():
if username.lower() in already_checked:
continue
else:
already_checked.add(username.lower())
already_checked.add(username.lower())
if username in args.ignore_ids_list:
query_notify.warning(
@@ -521,10 +548,7 @@ async def main():
continue
# check for characters do not supported by sites generally
found_unsupported_chars = set(unsupported_characters).intersection(
set(username)
)
found_unsupported_chars = set(BAD_CHARS).intersection(set(username))
if found_unsupported_chars:
pretty_chars_str = ','.join(
map(lambda s: f'"{s}"', found_unsupported_chars)
@@ -558,22 +582,9 @@ async def main():
general_results.append((username, id_type, results))
# TODO: tests
for website_name in results:
dictionary = results[website_name]
# TODO: fix no site data issue
if not dictionary or not recursive_search_enabled:
continue
new_usernames = dictionary.get('ids_usernames')
if new_usernames:
for u, utype in new_usernames.items():
usernames[u] = utype
for url in dictionary.get('ids_links', []):
for s in db.sites:
u = s.detect_username(url)
if u:
usernames[u] = 'username'
if recursive_search_enabled:
extracted_ids = extract_ids_from_results(results, db)
usernames.update(extracted_ids)
# reporting for a one username
if args.xmind: