diff --git a/maigret/checking.py b/maigret/checking.py index e19f456..5080024 100644 --- a/maigret/checking.py +++ b/maigret/checking.py @@ -37,7 +37,7 @@ SUPPORTED_IDS = ( "uidme_uguid", ) -unsupported_characters = "#" +BAD_CHARS = "#" async def get_response(request_future, logger) -> Tuple[str, int, Optional[CheckError]]: diff --git a/maigret/errors.py b/maigret/errors.py index 11484fd..9ba2183 100644 --- a/maigret/errors.py +++ b/maigret/errors.py @@ -1,6 +1,7 @@ from typing import Dict, List, Any from .result import QueryResult +from .types import QueryResultWrapper # error got as a result of completed search query @@ -104,9 +105,9 @@ def solution_of(err_type) -> str: return ERRORS_TYPES.get(err_type, '') -def extract_and_group(search_res: dict) -> List[Dict[str, Any]]: +def extract_and_group(search_res: QueryResultWrapper) -> List[Dict[str, Any]]: errors_counts: Dict[str, int] = {} - for r in search_res: + for r in search_res.values(): if r and isinstance(r, dict) and r.get('status'): if not isinstance(r['status'], QueryResult): continue diff --git a/maigret/maigret.py b/maigret/maigret.py index c3c3597..d807052 100755 --- a/maigret/maigret.py +++ b/maigret/maigret.py @@ -8,6 +8,7 @@ import os import sys import platform from argparse import ArgumentParser, RawDescriptionHelpFormatter +from typing import List, Tuple import requests from socid_extractor import extract, parse, __version__ as socid_version @@ -16,7 +17,7 @@ from .checking import ( timeout_check, SUPPORTED_IDS, self_check, - unsupported_characters, + BAD_CHARS, maigret, ) from . import errors @@ -33,13 +34,14 @@ from .report import ( ) from .sites import MaigretDatabase from .submit import submit_dialog +from .types import QueryResultWrapper from .utils import get_dict_ascii_tree __version__ = '0.2.1' -def notify_about_errors(search_results, query_notify): - errs = errors.extract_and_group(search_results.values()) +def notify_about_errors(search_results: QueryResultWrapper, query_notify): + errs = errors.extract_and_group(search_results) was_errs_displayed = False for e in errs: if not errors.is_important(e): @@ -58,6 +60,58 @@ def notify_about_errors(search_results, query_notify): ) +def extract_ids_from_page(url, logger, timeout=5) -> dict: + results = {} + # url, headers + reqs: List[Tuple[str, set]] = [(url, set())] + try: + # temporary workaround for URL mutations MVP + from socid_extractor import mutate_url + + reqs += list(mutate_url(url)) + except Exception as e: + logger.warning(e) + + for req in reqs: + url, headers = req + print(f'Scanning webpage by URL {url}...') + page, _ = parse(url, cookies_str='', headers=headers, timeout=timeout) + logger.debug(page) + info = extract(page) + if not info: + print('Nothing extracted') + else: + print(get_dict_ascii_tree(info.items(), new_line=False), ' ') + for k, v in info.items(): + if 'username' in k: + results[v] = 'username' + if k in SUPPORTED_IDS: + results[v] = k + + return results + + +def extract_ids_from_results(results: QueryResultWrapper, db: MaigretDatabase) -> dict: + ids_results = {} + for website_name in results: + dictionary = results[website_name] + # TODO: fix no site data issue + if not dictionary: + continue + + new_usernames = dictionary.get('ids_usernames') + if new_usernames: + for u, utype in new_usernames.items(): + ids_results[u] = utype + + for url in dictionary.get('ids_links', []): + for s in db.sites: + u = s.detect_username(url) + if u: + ids_results[u] = 'username' + return ids_results + + def setup_arguments_parser(): version_string = '\n'.join( [ @@ -392,31 +446,8 @@ async def main(): print("Using the proxy: " + args.proxy) if args.parse_url: - # url, headers - reqs = [(args.parse_url, set())] - try: - # temporary workaround for URL mutations MVP - from socid_extractor import mutate_url - - reqs += list(mutate_url(args.parse_url)) - except Exception as e: - logger.warning(e) - pass - - for req in reqs: - url, headers = req - print(f'Scanning webpage by URL {url}...') - page, _ = parse(url, cookies_str='', headers=headers) - info = extract(page) - if not info: - print('Nothing extracted') - else: - print(get_dict_ascii_tree(info.items(), new_line=False), ' ') - for k, v in info.items(): - if 'username' in k: - usernames[v] = 'username' - if k in SUPPORTED_IDS: - usernames[v] = k + extracted_ids = extract_ids_from_page(args.parse_url, logger, timeout=args.timeout) + usernames.update(extracted_ids) if args.tags: args.tags = list(set(str(args.tags).split(','))) @@ -471,6 +502,7 @@ async def main(): print('Updates will be applied only for current search session.') print(db.get_scan_stats(site_data)) + # Database statistics if args.stats: print(db.get_db_stats(db.sites_dict)) @@ -480,11 +512,6 @@ async def main(): # Define one report filename template report_filepath_tpl = os.path.join(args.folderoutput, 'report_{username}{postfix}') - # Database stats - # TODO: verbose info about filtered sites - # enabled_count = len(list(filter(lambda x: not x.disabled, site_data.values()))) - # print(f'Sites in database, enabled/total: {enabled_count}/{len(site_data)}') - if usernames == {}: # magic params to exit after init query_notify.warning('No usernames to check, exiting.') @@ -493,14 +520,14 @@ async def main(): if not site_data: query_notify.warning('No sites to check, exiting!') sys.exit(2) - else: + + query_notify.warning( + f'Starting a search on top {len(site_data)} sites from the Maigret database...' + ) + if not args.all_sites: query_notify.warning( - f'Starting a search on top {len(site_data)} sites from the Maigret database...' + 'You can run search by full list of sites with flag `-a`', '!' ) - if not args.all_sites: - query_notify.warning( - 'You can run search by full list of sites with flag `-a`', '!' - ) already_checked = set() general_results = [] @@ -511,8 +538,8 @@ async def main(): if username.lower() in already_checked: continue - else: - already_checked.add(username.lower()) + + already_checked.add(username.lower()) if username in args.ignore_ids_list: query_notify.warning( @@ -521,10 +548,7 @@ async def main(): continue # check for characters do not supported by sites generally - found_unsupported_chars = set(unsupported_characters).intersection( - set(username) - ) - + found_unsupported_chars = set(BAD_CHARS).intersection(set(username)) if found_unsupported_chars: pretty_chars_str = ','.join( map(lambda s: f'"{s}"', found_unsupported_chars) @@ -558,22 +582,9 @@ async def main(): general_results.append((username, id_type, results)) # TODO: tests - for website_name in results: - dictionary = results[website_name] - # TODO: fix no site data issue - if not dictionary or not recursive_search_enabled: - continue - - new_usernames = dictionary.get('ids_usernames') - if new_usernames: - for u, utype in new_usernames.items(): - usernames[u] = utype - - for url in dictionary.get('ids_links', []): - for s in db.sites: - u = s.detect_username(url) - if u: - usernames[u] = 'username' + if recursive_search_enabled: + extracted_ids = extract_ids_from_results(results, db) + usernames.update(extracted_ids) # reporting for a one username if args.xmind: diff --git a/maigret/report.py b/maigret/report.py index b411ff6..c3d20a9 100644 --- a/maigret/report.py +++ b/maigret/report.py @@ -3,7 +3,6 @@ import io import json import logging import os -from argparse import ArgumentTypeError from datetime import datetime from typing import Dict, Any diff --git a/tests/conftest.py b/tests/conftest.py index 1e7cbdd..66002a6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,7 +12,7 @@ from maigret.maigret import setup_arguments_parser CUR_PATH = os.path.dirname(os.path.realpath(__file__)) JSON_FILE = os.path.join(CUR_PATH, '../maigret/resources/data.json') TEST_JSON_FILE = os.path.join(CUR_PATH, 'db.json') -empty_mark = Mark('', [], {}) +empty_mark = Mark('', (), {}) def by_slow_marker(item): diff --git a/tests/test_maigret.py b/tests/test_maigret.py index 2701e20..0f3fef5 100644 --- a/tests/test_maigret.py +++ b/tests/test_maigret.py @@ -1,14 +1,35 @@ """Maigret main module test functions""" import asyncio +import copy import pytest from mock import Mock -from maigret.maigret import self_check, maigret +from maigret.maigret import self_check, maigret, extract_ids_from_page, extract_ids_from_results from maigret.sites import MaigretSite from maigret.result import QueryResult, QueryStatus +RESULTS_EXAMPLE = { + 'Reddit': { + 'cookies': None, + 'parsing_enabled': False, + 'url_main': 'https://www.reddit.com/', + 'username': 'Facebook', + }, + 'GooglePlayStore': { + 'cookies': None, + 'http_status': 200, + 'is_similar': False, + 'parsing_enabled': False, + 'rank': 1, + 'url_main': 'https://play.google.com/store', + 'url_user': 'https://play.google.com/store/apps/developer?id=Facebook', + 'username': 'Facebook', + }, +} + + @pytest.mark.slow def test_self_check_db_positive_disable(test_db): logger = Mock() @@ -113,21 +134,20 @@ def test_maigret_results(test_db): assert results['Reddit'].get('future') is None del results['GooglePlayStore']['future'] - assert results == { - 'Reddit': { - 'cookies': None, - 'parsing_enabled': False, - 'url_main': 'https://www.reddit.com/', - 'username': 'Facebook', - }, - 'GooglePlayStore': { - 'cookies': None, - 'http_status': 200, - 'is_similar': False, - 'parsing_enabled': False, - 'rank': 1, - 'url_main': 'https://play.google.com/store', - 'url_user': 'https://play.google.com/store/apps/developer?id=Facebook', - 'username': 'Facebook', - }, - } + assert results == RESULTS_EXAMPLE + + +@pytest.mark.slow +def test_extract_ids_from_page(test_db): + logger = Mock() + found_ids = extract_ids_from_page('https://www.reddit.com/user/test', logger) + assert found_ids == {'test': 'username'} + + +def test_extract_ids_from_results(test_db): + TEST_EXAMPLE = copy.deepcopy(RESULTS_EXAMPLE) + TEST_EXAMPLE['Reddit']['ids_usernames'] = {'test1': 'yandex_public_id'} + TEST_EXAMPLE['Reddit']['ids_links'] = ['https://www.reddit.com/user/test2'] + + found_ids = extract_ids_from_results(TEST_EXAMPLE, test_db) + assert found_ids == {'test1': 'yandex_public_id', 'test2': 'username'}