Compare commits

...

15 Commits

Author SHA1 Message Date
soxoj 158f739a59 Merge pull request #129 from soxoj/0.2.2
Updated sites, improved submit dialog, bump to 0.2.2
2021-05-07 12:30:55 +03:00
Soxoj b6a207d0e3 Updated sites, improved submit dialog, bump to 0.2.2 2021-05-07 12:27:24 +03:00
soxoj d59867b0d9 Merge pull request #128 from soxoj/sites-improvements
Added several sites, some improvements
2021-05-07 01:23:23 +03:00
Soxoj 2145027196 Added several sites, some improvements 2021-05-07 01:20:20 +03:00
soxoj 386e9eba4f Merge pull request #127 from soxoj/extraction-notify-tests
Improve extracting ids from URLs, tests
2021-05-06 22:38:22 +03:00
Soxoj 0e9655c46a Improve extracting ids from URLs, tests 2021-05-06 22:35:44 +03:00
soxoj 009d51c380 Merge pull request #126 from soxoj/main-refactoring
Main maigret function refactoring
2021-05-05 23:32:27 +03:00
Soxoj 78e9688ece Test data fix 2021-05-05 23:27:30 +03:00
Soxoj 3cbb9df7b3 Main maigret function refactoring 2021-05-05 18:02:13 +03:00
soxoj 2fb1f19948 Merge pull request #125 from soxoj/argparser-tests
CLI arguments improvements, tests added
2021-05-05 15:34:36 +03:00
Soxoj 3b91a9cd31 CLI arguments improvements, tests added 2021-05-05 15:27:56 +03:00
soxoj 9858e71349 Merge pull request #124 from soxoj/refactoring-complexity-decrease
Refactored to decrease cyclomatic complexity
2021-05-05 10:59:11 +03:00
Soxoj c88e194d07 Refactored to decrease cyclomatic complexity 2021-05-05 10:55:33 +03:00
soxoj ad5c7fbc7d Merge pull request #123 from soxoj/new-sites-engines
Added some new sites, engines updates
2021-05-03 03:18:40 +03:00
Soxoj 66d6c7a93c Added some new sites, engines updates 2021-05-03 03:16:02 +03:00
21 changed files with 5029 additions and 3362 deletions
+6 -3
@@ -22,9 +22,12 @@ src/
# Comma-Separated Values (CSV) Reports
*.csv
+ # Excluded sites list
+ tests/.excluded_sites
# MacOS Folder Metadata File
.DS_Store
/reports/
+ # Testing
+ .coverage
+ dist/
+ htmlcov/
+ /test_*
+5
@@ -2,6 +2,11 @@
## [Unreleased]
+ ## [0.2.2] - 2021-05-07
+ * improved ids extractors
+ * updated sites and engines
+ * updated CLI options
## [0.2.1] - 2021-05-02
* fixed json reports generation bug, added tests
-18
@@ -34,24 +34,6 @@ class ParsingActivator:
bearer_token = r.json()["accessToken"]
site.headers["authorization"] = f"Bearer {bearer_token}"
- @staticmethod
- def xssis(site, logger, cookies={}):
- if not cookies:
- logger.debug("You must have cookies to activate xss.is parsing!")
- return
- headers = dict(site.headers)
- post_data = {
- "_xfResponseType": "json",
- "_xfToken": "1611177919,a2710362e45dad9aa1da381e21941a38",
- }
- headers["content-type"] = "application/x-www-form-urlencoded; charset=UTF-8"
- r = requests.post(
- site.activation["url"], headers=headers, cookies=cookies, data=post_data
- )
- csrf = r.json()["csrf"]
- site.get_params["_xfToken"] = csrf
async def import_aiohttp_cookies(cookiestxt_filename):
cookies_obj = MozillaCookieJar(cookiestxt_filename)
+31 -28
@@ -6,6 +6,7 @@ import ssl
import sys
import tqdm
from typing import Tuple, Optional, Dict, List
+ from urllib.parse import quote
import aiohttp
import tqdm.asyncio
@@ -27,7 +28,7 @@ from .types import QueryOptions, QueryResultWrapper
from .utils import get_random_user_agent
- supported_recursive_search_ids = (
+ SUPPORTED_IDS = (
"yandex_public_id",
"gaia_id",
"vk_id",
@@ -37,7 +38,7 @@ supported_recursive_search_ids = (
"uidme_uguid",
)
- unsupported_characters = "#"
+ BAD_CHARS = "#"
async def get_response(request_future, logger) -> Tuple[str, int, Optional[CheckError]]:
@@ -54,10 +55,9 @@ async def get_response(request_future, logger) -> Tuple[str, int, Optional[CheckError]]:
decoded_content = response_content.decode(charset, "ignore")
html_text = decoded_content
+ error = None
if status_code == 0:
error = CheckError("Connection lost")
- else:
- error = None
logger.debug(html_text)
@@ -73,9 +73,8 @@ async def get_response(request_future, logger) -> Tuple[str, int, Optional[CheckError]]:
error = CheckError("Interrupted")
except Exception as e:
# python-specific exceptions
- if sys.version_info.minor > 6:
- if isinstance(e, ssl.SSLCertVerificationError) or isinstance(
- e, ssl.SSLError
+ if sys.version_info.minor > 6 and (
+ isinstance(e, ssl.SSLCertVerificationError) or isinstance(e, ssl.SSLError)
):
error = CheckError("SSL", str(e))
else:
@@ -109,6 +108,14 @@ def detect_error_page(
return None
+ def debug_response_logging(url, html_text, status_code, check_error):
+ with open("debug.log", "a") as f:
+ status = status_code or "No response"
+ f.write(f"url: {url}\nerror: {check_error}\nr: {status}\n")
+ if html_text:
+ f.write(f"code: {status}\nresponse: {str(html_text)}\n")
def process_site_result(
response, query_notify, logger, results_info: QueryResultWrapper, site: MaigretSite
):
@@ -121,7 +128,7 @@ def process_site_result(
username = results_info["username"]
is_parsing_enabled = results_info["parsing_enabled"]
url = results_info.get("url_user")
- logger.debug(url)
+ logger.info(url)
status = results_info.get("status")
if status is not None:
@@ -142,11 +149,7 @@ def process_site_result(
response_time = None
if logger.level == logging.DEBUG:
- with open("debug.txt", "a") as f:
- status = status_code or "No response"
- f.write(f"url: {url}\nerror: {check_error}\nr: {status}\n")
- if html_text:
- f.write(f"code: {status}\nresponse: {str(html_text)}\n")
+ debug_response_logging(url, html_text, status_code, check_error)
# additional check for errors
if status_code and not check_error:
@@ -154,11 +157,12 @@
html_text, status_code, site.errors, site.ignore403
)
- if site.activation and html_text:
+ # parsing activation
is_need_activation = any(
- [s for s in site.activation["marks"] if s in html_text]
+ [s for s in site.activation.get("marks", []) if s in html_text]
)
- if is_need_activation:
+ if site.activation and html_text and is_need_activation:
method = site.activation["method"]
try:
activate_fun = getattr(ParsingActivator(), method)
@@ -169,13 +173,18 @@
f"Activation method {method} for site {site.name} not found!"
)
except Exception as e:
- logger.warning(f"Failed activation {method} for site {site.name}: {e}")
+ logger.warning(
+ f"Failed activation {method} for site {site.name}: {str(e)}",
+ exc_info=True,
+ )
+ # TODO: temporary check error
site_name = site.pretty_name
# presense flags
# True by default
presense_flags = site.presense_strs
is_presense_detected = False
if html_text:
if not presense_flags:
is_presense_detected = True
@@ -200,7 +209,7 @@ def process_site_result(
)
if check_error:
- logger.debug(check_error)
+ logger.warning(check_error)
result = QueryResult(
username,
site_name,
@@ -255,16 +264,13 @@
for k, v in extracted_ids_data.items():
if "username" in k:
new_usernames[v] = "username"
- if k in supported_recursive_search_ids:
+ if k in SUPPORTED_IDS:
new_usernames[v] = k
results_info["ids_usernames"] = new_usernames
results_info["ids_links"] = eval(extracted_ids_data.get("links", "[]"))
result.ids_data = extracted_ids_data
- # Notify caller about results of query.
- query_notify.update(result, site.similar_search)
# Save status of request
results_info["status"] = result
@@ -303,7 +309,7 @@ def make_site_result(
# URL of user on site (if it exists)
url = site.url.format(
- urlMain=site.url_main, urlSubpath=site.url_subpath, username=username
+ urlMain=site.url_main, urlSubpath=site.url_subpath, username=quote(username)
)
# workaround to prevent slash errors
@@ -412,6 +418,8 @@ async def check_site_for_username(
response, query_notify, logger, default_result, site
)
+ query_notify.update(response_result['status'], site.similar_search)
return site.name, response_result
@@ -616,15 +624,10 @@ async def site_self_check(
"disabled": False,
}
- try:
check_data = [
(site.username_claimed, QueryStatus.CLAIMED),
(site.username_unclaimed, QueryStatus.AVAILABLE),
]
- except Exception as e:
- logger.error(e)
- logger.error(site.__dict__)
- check_data = []
logger.info(f"Checking {site.name}...")
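The switch to quote(username) in make_site_result above guards profile-URL construction against characters that would break the request path. A minimal sketch of the effect, using a made-up URL template rather than a real database entry:

from urllib.parse import quote

# Hypothetical template standing in for a site's url field.
url_template = "https://example.com/{username}"
username = "user name#1"

url = url_template.format(username=quote(username))
assert url == "https://example.com/user%20name%231"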
+17 -2
@@ -1,6 +1,7 @@
from typing import Dict, List, Any
from .result import QueryResult
+ from .types import QueryResultWrapper
# error got as a result of completed search query
@@ -34,6 +35,12 @@ COMMON_ERRORS = {
'Please stand by, while we are checking your browser': CheckError(
'Bot protection', 'Cloudflare'
),
+ '<span data-translate="checking_browser">Checking your browser before accessing</span>': CheckError(
+ 'Bot protection', 'Cloudflare'
+ ),
+ 'This website is using a security service to protect itself from online attacks.': CheckError(
+ 'Access denied', 'Cloudflare'
+ ),
'<title>Доступ ограничен</title>': CheckError('Censorship', 'Rostelecom'),
'document.getElementById(\'validate_form_submit\').disabled=true': CheckError(
'Captcha', 'Mail.ru'
@@ -48,6 +55,9 @@ COMMON_ERRORS = {
'Censorship', 'MGTS'
),
'Incapsula incident ID': CheckError('Bot protection', 'Incapsula'),
+ 'Сайт заблокирован хостинг-провайдером': CheckError(
+ 'Site-specific', 'Site is disabled (Beget)'
+ ),
}
ERRORS_TYPES = {
@@ -57,6 +67,11 @@ ERRORS_TYPES = {
'Request timeout': 'Try to increase timeout or to switch to another internet service provider',
}
+ # TODO: checking for reason
+ ERRORS_REASONS = {
+ 'Login required': 'Add authorization cookies through `--cookies-jar-file` (see cookies.txt)',
+ }
TEMPORARY_ERRORS_TYPES = [
'Request timeout',
'Unknown',
@@ -90,9 +105,9 @@ def solution_of(err_type) -> str:
return ERRORS_TYPES.get(err_type, '')
- def extract_and_group(search_res: dict) -> List[Dict[str, Any]]:
+ def extract_and_group(search_res: QueryResultWrapper) -> List[Dict[str, Any]]:
errors_counts: Dict[str, int] = {}
- for r in search_res:
+ for r in search_res.values():
if r and isinstance(r, dict) and r.get('status'):
if not isinstance(r['status'], QueryResult):
continue
+256 -216
@@ -8,15 +8,16 @@ import os
import sys
import platform
from argparse import ArgumentParser, RawDescriptionHelpFormatter
+ from typing import List, Tuple
import requests
from socid_extractor import extract, parse, __version__ as socid_version
from .checking import (
timeout_check,
- supported_recursive_search_ids,
+ SUPPORTED_IDS,
self_check,
- unsupported_characters,
+ BAD_CHARS,
maigret,
)
from . import errors
@@ -29,18 +30,18 @@ from .report import (
generate_report_context,
save_txt_report,
SUPPORTED_JSON_REPORT_FORMATS,
- check_supported_json_format,
save_json_report,
)
from .sites import MaigretDatabase
from .submit import submit_dialog
+ from .types import QueryResultWrapper
from .utils import get_dict_ascii_tree
- __version__ = '0.2.1'
+ __version__ = '0.2.2'
- def notify_about_errors(search_results, query_notify):
- errs = errors.extract_and_group(search_results.values())
+ def notify_about_errors(search_results: QueryResultWrapper, query_notify):
+ errs = errors.extract_and_group(search_results)
was_errs_displayed = False
for e in errs:
if not errors.is_important(e):
@@ -59,6 +60,67 @@ def notify_about_errors(search_results, query_notify):
)
def extract_ids_from_url(url: str, db: MaigretDatabase) -> dict:
results = {}
for s in db.sites:
result = s.extract_id_from_url(url)
if not result:
continue
_id, _type = result
results[_id] = _type
return results
def extract_ids_from_page(url, logger, timeout=5) -> dict:
results = {}
# url, headers
reqs: List[Tuple[str, set]] = [(url, set())]
try:
# temporary workaround for URL mutations MVP
from socid_extractor import mutate_url
reqs += list(mutate_url(url))
except Exception as e:
logger.warning(e)
for req in reqs:
url, headers = req
print(f'Scanning webpage by URL {url}...')
page, _ = parse(url, cookies_str='', headers=headers, timeout=timeout)
logger.debug(page)
info = extract(page)
if not info:
print('Nothing extracted')
else:
print(get_dict_ascii_tree(info.items(), new_line=False), ' ')
for k, v in info.items():
if 'username' in k:
results[v] = 'username'
if k in SUPPORTED_IDS:
results[v] = k
return results
def extract_ids_from_results(results: QueryResultWrapper, db: MaigretDatabase) -> dict:
ids_results = {}
for website_name in results:
dictionary = results[website_name]
# TODO: fix no site data issue
if not dictionary:
continue
new_usernames = dictionary.get('ids_usernames')
if new_usernames:
for u, utype in new_usernames.items():
ids_results[u] = utype
for url in dictionary.get('ids_links', []):
ids_results.update(extract_ids_from_url(url, db))
return ids_results
def setup_arguments_parser():
version_string = '\n'.join(
[
@@ -74,68 +136,18 @@ def setup_arguments_parser():
formatter_class=RawDescriptionHelpFormatter,
description=f"Maigret v{__version__}",
)
+ parser.add_argument(
+ "username",
+ nargs='*',
+ metavar="USERNAMES",
+ help="One or more usernames to search by.",
+ )
parser.add_argument(
"--version",
action="version",
version=version_string,
help="Display version information and dependencies.",
)
parser.add_argument(
"--info",
"-vv",
action="store_true",
dest="info",
default=False,
help="Display service information.",
)
parser.add_argument(
"--verbose",
"-v",
action="store_true",
dest="verbose",
default=False,
help="Display extra information and metrics.",
)
parser.add_argument(
"-d",
"--debug",
"-vvv",
action="store_true",
dest="debug",
default=False,
help="Saving debugging information and sites responses in debug.txt.",
)
parser.add_argument(
"--site",
action="append",
metavar='SITE_NAME',
dest="site_list",
default=[],
help="Limit analysis to just the listed sites (use several times to specify more than one)",
)
parser.add_argument(
"--proxy",
"-p",
metavar='PROXY_URL',
action="store",
dest="proxy",
default=None,
help="Make requests over a proxy. e.g. socks5://127.0.0.1:1080",
)
parser.add_argument(
"--db",
metavar="DB_FILE",
dest="db_file",
default=None,
help="Load Maigret database from a JSON file or an online, valid, JSON file.",
)
parser.add_argument(
"--cookies-jar-file",
metavar="COOKIE_FILE",
dest="cookie_file",
default=None,
help="File with cookies.",
)
parser.add_argument(
"--timeout",
action="store",
@@ -143,7 +155,7 @@ def setup_arguments_parser():
dest="timeout",
type=timeout_check,
default=30,
- help="Time (in seconds) to wait for response to requests. "
+ help="Time in seconds to wait for response to requests. "
"Default timeout of 30.0s. "
"A longer timeout will be more likely to get results from slow sites. "
"On the other hand, this may cause a long delay to gather all results. ",
@@ -165,65 +177,6 @@ def setup_arguments_parser():
default=100,
help="Allowed number of concurrent connections.",
)
parser.add_argument(
"-a",
"--all-sites",
action="store_true",
dest="all_sites",
default=False,
help="Use all sites for scan.",
)
parser.add_argument(
"--top-sites",
action="store",
default=500,
type=int,
help="Count of sites for scan ranked by Alexa Top (default: 500).",
)
parser.add_argument(
"--print-not-found",
action="store_true",
dest="print_not_found",
default=False,
help="Print sites where the username was not found.",
)
parser.add_argument(
"--print-errors",
action="store_true",
dest="print_check_errors",
default=False,
help="Print errors messages: connection, captcha, site country ban, etc.",
)
parser.add_argument(
"--submit",
metavar='EXISTING_USER_URL',
type=str,
dest="new_site_to_submit",
default=False,
help="URL of existing profile in new site to submit.",
)
parser.add_argument(
"--no-color",
action="store_true",
dest="no_color",
default=False,
help="Don't color terminal output",
)
parser.add_argument(
"--no-progressbar",
action="store_true",
dest="no_progressbar",
default=False,
help="Don't show progressbar.",
)
parser.add_argument(
"--browse",
"-b",
action="store_true",
dest="browse",
default=False,
help="Browse to all results on default bowser.",
)
parser.add_argument(
"--no-recursion",
action="store_true",
@@ -238,33 +191,27 @@ def setup_arguments_parser():
default=False,
help="Disable parsing pages for additional data and other usernames.",
)
parser.add_argument(
"--self-check",
action="store_true",
default=False,
help="Do self check for sites and database and disable non-working ones.",
)
parser.add_argument(
"--stats", action="store_true", default=False, help="Show database statistics."
)
parser.add_argument(
"--use-disabled-sites",
action="store_true",
default=False,
help="Use disabled sites to search (may cause many false positives).",
)
parser.add_argument(
"--parse",
dest="parse_url",
default='',
help="Parse page by URL and extract username and IDs to use for search.",
)
parser.add_argument(
"--id-type",
dest="id_type",
default='username',
+ choices=SUPPORTED_IDS,
help="Specify identifier(s) type (default: username).",
)
parser.add_argument(
"--db",
metavar="DB_FILE",
dest="db_file",
default=None,
help="Load Maigret database from a JSON file or an online, valid, JSON file.",
)
parser.add_argument(
"--cookies-jar-file",
metavar="COOKIE_FILE",
dest="cookie_file",
default=None,
help="File with cookies.",
)
parser.add_argument(
"--ignore-ids",
action="append",
@@ -273,25 +220,156 @@ def setup_arguments_parser():
default=[],
help="Do not make search by the specified username or other ids.",
)
parser.add_argument(
"username",
nargs='+',
metavar='USERNAMES',
action="store",
help="One or more usernames to check with social networks.",
)
parser.add_argument(
"--tags", dest="tags", default='', help="Specify tags of sites."
)
# reports options
parser.add_argument(
"--folderoutput",
"-fo",
dest="folderoutput",
default="reports",
+ metavar="PATH",
help="If using multiple usernames, the output of the results will be saved to this folder.",
)
parser.add_argument(
"--proxy",
"-p",
metavar='PROXY_URL',
action="store",
dest="proxy",
default=None,
help="Make requests over a proxy. e.g. socks5://127.0.0.1:1080",
)
filter_group = parser.add_argument_group(
'Site filtering', 'Options to set site search scope'
)
filter_group.add_argument(
"-a",
"--all-sites",
action="store_true",
dest="all_sites",
default=False,
help="Use all sites for scan.",
)
filter_group.add_argument(
"--top-sites",
action="store",
default=500,
metavar="N",
type=int,
help="Count of sites for scan ranked by Alexa Top (default: 500).",
)
filter_group.add_argument(
"--tags", dest="tags", default='', help="Specify tags of sites (see `--stats`)."
)
filter_group.add_argument(
"--site",
action="append",
metavar='SITE_NAME',
dest="site_list",
default=[],
help="Limit analysis to just the specified sites (multiple option).",
)
filter_group.add_argument(
"--use-disabled-sites",
action="store_true",
default=False,
help="Use disabled sites to search (may cause many false positives).",
)
modes_group = parser.add_argument_group(
'Operating modes',
'Various functions except the default search by a username. '
'Modes are executed sequentially in the order of declaration.',
)
modes_group.add_argument(
"--parse",
dest="parse_url",
default='',
metavar='URL',
help="Parse page by URL and extract username and IDs to use for search.",
)
modes_group.add_argument(
"--submit",
metavar='URL',
type=str,
dest="new_site_to_submit",
default=False,
help="URL of existing profile in new site to submit.",
)
modes_group.add_argument(
"--self-check",
action="store_true",
default=False,
help="Do self check for sites and database and disable non-working ones.",
)
modes_group.add_argument(
"--stats",
action="store_true",
default=False,
help="Show database statistics (most frequent sites engines and tags).",
)
output_group = parser.add_argument_group(
'Output options', 'Options to change verbosity and view of the console output'
)
output_group.add_argument(
"--print-not-found",
action="store_true",
dest="print_not_found",
default=False,
help="Print sites where the username was not found.",
)
output_group.add_argument(
"--print-errors",
action="store_true",
dest="print_check_errors",
default=False,
help="Print errors messages: connection, captcha, site country ban, etc.",
)
output_group.add_argument(
"--verbose",
"-v",
action="store_true",
dest="verbose",
default=False,
help="Display extra information and metrics.",
)
output_group.add_argument(
"--info",
"-vv",
action="store_true",
dest="info",
default=False,
help="Display extra/service information and metrics.",
)
output_group.add_argument(
"--debug",
"-vvv",
"-d",
action="store_true",
dest="debug",
default=False,
help="Display extra/service/debug information and metrics, save responses in debug.log.",
)
output_group.add_argument(
"--no-color",
action="store_true",
dest="no_color",
default=False,
help="Don't color terminal output",
)
output_group.add_argument(
"--no-progressbar",
action="store_true",
dest="no_progressbar",
default=False,
help="Don't show progressbar.",
)
report_group = parser.add_argument_group(
'Report formats', 'Supported formats of report files'
)
+ report_group.add_argument(
"-T",
"--txt",
action="store_true",
@@ -299,7 +377,7 @@ def setup_arguments_parser():
default=False,
help="Create a TXT report (one report per username).",
)
- parser.add_argument(
+ report_group.add_argument(
"-C",
"--csv",
action="store_true",
@@ -307,7 +385,7 @@
default=False,
help="Create a CSV report (one report per username).",
)
- parser.add_argument(
+ report_group.add_argument(
"-H",
"--html",
action="store_true",
@@ -315,7 +393,7 @@
default=False,
help="Create an HTML report file (general report on all usernames).",
)
- parser.add_argument(
+ report_group.add_argument(
"-X",
"--xmind",
action="store_true",
@@ -323,7 +401,7 @@
default=False,
help="Generate an XMind 8 mindmap report (one report per username).",
)
- parser.add_argument(
+ report_group.add_argument(
"-P",
"--pdf",
action="store_true",
@@ -331,14 +409,14 @@
default=False,
help="Generate a PDF report (general report on all usernames).",
)
- parser.add_argument(
+ report_group.add_argument(
"-J",
"--json",
action="store",
- metavar='REPORT_TYPE',
+ metavar='TYPE',
dest="json",
default='',
- type=check_supported_json_format,
+ choices=SUPPORTED_JSON_REPORT_FORMATS,
help=f"Generate a JSON report of specific type: {', '.join(SUPPORTED_JSON_REPORT_FORMATS)}"
" (one report per username).",
)
@@ -371,7 +449,7 @@ async def main():
usernames = {
u: args.id_type
for u in args.username
- if u not in ['-'] and u not in args.ignore_ids_list
+ if u and u not in ['-'] and u not in args.ignore_ids_list
}
parsing_enabled = not args.disable_extracting
@@ -382,31 +460,10 @@ async def main():
print("Using the proxy: " + args.proxy)
if args.parse_url:
- # url, headers
- reqs = [(args.parse_url, set())]
- try:
- # temporary workaround for URL mutations MVP
- from socid_extractor import mutate_url
- reqs += list(mutate_url(args.parse_url))
- except Exception as e:
- logger.warning(e)
- pass
- for req in reqs:
- url, headers = req
- print(f'Scanning webpage by URL {url}...')
- page, _ = parse(url, cookies_str='', headers=headers)
- info = extract(page)
- if not info:
- print('Nothing extracted')
- else:
- print(get_dict_ascii_tree(info.items(), new_line=False), ' ')
- for k, v in info.items():
- if 'username' in k:
- usernames[v] = 'username'
- if k in supported_recursive_search_ids:
- usernames[v] = k
+ extracted_ids = extract_ids_from_page(
+ args.parse_url, logger, timeout=args.timeout
+ )
+ usernames.update(extracted_ids)
if args.tags:
args.tags = list(set(str(args.tags).split(',')))
@@ -434,7 +491,7 @@ async def main():
top=args.top_sites,
tags=args.tags,
names=args.site_list,
- disabled=False,
+ disabled=args.use_disabled_sites,
id_type=x,
)
@@ -454,13 +511,17 @@ async def main():
db, site_data, logger, max_connections=args.connections
)
if is_need_update:
- if input('Do you want to save changes permanently? [Yn]\n').lower() == 'y':
+ if input('Do you want to save changes permanently? [Yn]\n').lower() in (
+ 'y',
+ '',
+ ):
db.save_to_file(args.db_file)
print('Database was successfully updated.')
else:
print('Updates will be applied only for current search session.')
- print(db.get_scan_stats(site_data))
+ print('Scan sessions flags stats: ' + str(db.get_scan_stats(site_data)))
+ # Database statistics
if args.stats:
print(db.get_db_stats(db.sites_dict))
@@ -470,11 +531,6 @@ async def main():
# Define one report filename template
report_filepath_tpl = os.path.join(args.folderoutput, 'report_{username}{postfix}')
- # Database stats
- # TODO: verbose info about filtered sites
- # enabled_count = len(list(filter(lambda x: not x.disabled, site_data.values())))
- # print(f'Sites in database, enabled/total: {enabled_count}/{len(site_data)}')
if usernames == {}:
# magic params to exit after init
query_notify.warning('No usernames to check, exiting.')
@@ -483,7 +539,7 @@ async def main():
if not site_data:
query_notify.warning('No sites to check, exiting!')
sys.exit(2)
- else:
query_notify.warning(
f'Starting a search on top {len(site_data)} sites from the Maigret database...'
)
@@ -501,7 +557,7 @@ async def main():
if username.lower() in already_checked:
continue
- else:
already_checked.add(username.lower())
if username in args.ignore_ids_list:
@@ -511,10 +567,7 @@ async def main():
continue
# check for characters do not supported by sites generally
- found_unsupported_chars = set(unsupported_characters).intersection(
- set(username)
- )
+ found_unsupported_chars = set(BAD_CHARS).intersection(set(username))
if found_unsupported_chars:
pretty_chars_str = ','.join(
map(lambda s: f'"{s}"', found_unsupported_chars)
@@ -548,22 +601,9 @@ async def main():
general_results.append((username, id_type, results))
# TODO: tests
- for website_name in results:
- dictionary = results[website_name]
- # TODO: fix no site data issue
- if not dictionary or not recursive_search_enabled:
- continue
- new_usernames = dictionary.get('ids_usernames')
- if new_usernames:
- for u, utype in new_usernames.items():
- usernames[u] = utype
- for url in dictionary.get('ids_links', []):
- for s in db.sites:
- u = s.detect_username(url)
- if u:
- usernames[u] = 'username'
+ if recursive_search_enabled:
+ extracted_ids = extract_ids_from_results(results, db)
+ usernames.update(extracted_ids)
# reporting for a one username
if args.xmind:
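As an aside on the regrouped CLI above, the parser returned by setup_arguments_parser() can be exercised directly (the new tests drive it the same way via a conftest fixture); the flag values below are arbitrary examples, assuming the package is importable:

from maigret.maigret import setup_arguments_parser

parser = setup_arguments_parser()
# Arbitrary example invocation; every flag used here appears in the diff above.
args = parser.parse_args("soxoj --top-sites 300 --tags us --html".split())
assert args.username == ["soxoj"]
assert args.top_sites == 300 and args.html is True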
+28 -32
@@ -152,6 +152,27 @@ class QueryNotifyPrint(QueryNotify):
return
def make_colored_terminal_notify(
self, status, text, status_color, text_color, appendix
):
text = [
f"{Style.BRIGHT}{Fore.WHITE}[{status_color}{status}{Fore.WHITE}]"
+ f"{text_color} {text}: {Style.RESET_ALL}"
+ f"{appendix}"
]
return "".join(text)
def make_simple_terminal_notify(
self, status, text, status_color, text_color, appendix
):
return f"[{status}] {text}: {appendix}"
def make_terminal_notify(self, *args):
if self.color:
return self.make_colored_terminal_notify(*args)
else:
return self.make_simple_terminal_notify(*args)
def start(self, message, id_type):
"""Notify Start.
@@ -204,40 +225,18 @@ class QueryNotifyPrint(QueryNotify):
Return Value:
Nothing.
"""
+ notify = None
self.result = result
- if not self.result.ids_data:
ids_data_text = ""
- else:
+ if self.result.ids_data:
ids_data_text = get_dict_ascii_tree(self.result.ids_data.items(), " ")
- def make_colored_terminal_notify(
- status, text, status_color, text_color, appendix
- ):
- text = [
- f"{Style.BRIGHT}{Fore.WHITE}[{status_color}{status}{Fore.WHITE}]"
- + f"{text_color} {text}: {Style.RESET_ALL}"
- + f"{appendix}"
- ]
- return "".join(text)
- def make_simple_terminal_notify(status, text, appendix):
- return f"[{status}] {text}: {appendix}"
- def make_terminal_notify(is_colored=True, *args):
- if is_colored:
- return make_colored_terminal_notify(*args)
- else:
- return make_simple_terminal_notify(*args)
- notify = None
# Output to the terminal is desired.
if result.status == QueryStatus.CLAIMED:
color = Fore.BLUE if is_similar else Fore.GREEN
status = "?" if is_similar else "+"
- notify = make_terminal_notify(
- self.color,
+ notify = self.make_terminal_notify(
status,
result.site_name,
color,
@@ -246,8 +245,7 @@ class QueryNotifyPrint(QueryNotify):
)
elif result.status == QueryStatus.AVAILABLE:
if not self.print_found_only:
- notify = make_terminal_notify(
- self.color,
+ notify = self.make_terminal_notify(
"-",
result.site_name,
Fore.RED,
@@ -256,8 +254,7 @@
)
elif result.status == QueryStatus.UNKNOWN:
if not self.skip_check_errors:
- notify = make_terminal_notify(
- self.color,
+ notify = self.make_terminal_notify(
"?",
result.site_name,
Fore.RED,
@@ -267,8 +264,7 @@
elif result.status == QueryStatus.ILLEGAL:
if not self.print_found_only:
text = "Illegal Username Format For This Site!"
- notify = make_terminal_notify(
- self.color,
+ notify = self.make_terminal_notify(
"-",
result.site_name,
Fore.RED,
@@ -286,7 +282,7 @@ class QueryNotifyPrint(QueryNotify):
sys.stdout.write("\x1b[1K\r")
print(notify)
- return
+ return notify
def __str__(self):
"""Convert Object To String.
+36 -50
@@ -3,7 +3,6 @@ import io
import json
import logging
import os
- from argparse import ArgumentTypeError
from datetime import datetime
from typing import Dict, Any
@@ -293,11 +292,20 @@ def save_xmind_report(filename, username, results):
os.remove(filename)
workbook = xmind.load(filename)
sheet = workbook.getPrimarySheet()
- design_sheet(sheet, username, results)
+ design_xmind_sheet(sheet, username, results)
xmind.save(workbook, path=filename)
- def design_sheet(sheet, username, results):
+ def add_xmind_subtopic(userlink, k, v, supposed_data):
+ currentsublabel = userlink.addSubTopic()
+ field = "fullname" if k == "name" else k
+ if field not in supposed_data:
+ supposed_data[field] = []
+ supposed_data[field].append(v)
+ currentsublabel.setTitle("%s: %s" % (k, v))
+ def design_xmind_sheet(sheet, username, results):
alltags = {}
supposed_data = {}
@@ -311,64 +319,42 @@ def design_sheet(sheet, username, results):
for website_name in results:
dictionary = results[website_name]
- if dictionary.get("status").status == QueryStatus.CLAIMED:
- # firsttime I found that entry
- for tag in dictionary.get("status").tags:
- if tag.strip() == "":
- continue
- if tag not in alltags.keys():
- if not is_country_tag(tag):
- tagsection = root_topic1.addSubTopic()
- tagsection.setTitle(tag)
- alltags[tag] = tagsection
- category = None
- for tag in dictionary.get("status").tags:
- if tag.strip() == "":
- continue
- if not is_country_tag(tag):
- category = tag
- if category is None:
- userlink = undefinedsection.addSubTopic()
- userlink.addLabel(dictionary.get("status").site_url_user)
- else:
- userlink = alltags[category].addSubTopic()
- userlink.addLabel(dictionary.get("status").site_url_user)
- if dictionary.get("status").ids_data:
- for k, v in dictionary.get("status").ids_data.items():
- # suppose target data
- if not isinstance(v, list):
- currentsublabel = userlink.addSubTopic()
- field = "fullname" if k == "name" else k
- if field not in supposed_data:
- supposed_data[field] = []
- supposed_data[field].append(v)
- currentsublabel.setTitle("%s: %s" % (k, v))
- else:
- for currentval in v:
- currentsublabel = userlink.addSubTopic()
- field = "fullname" if k == "name" else k
- if field not in supposed_data:
- supposed_data[field] = []
- supposed_data[field].append(currentval)
- currentsublabel.setTitle("%s: %s" % (k, currentval))
+ result_status = dictionary.get("status")
+ if result_status.status != QueryStatus.CLAIMED:
+ continue
+ stripped_tags = list(map(lambda x: x.strip(), result_status.tags))
+ normalized_tags = list(
+ filter(lambda x: x and not is_country_tag(x), stripped_tags)
+ )
+ category = None
+ for tag in normalized_tags:
+ if tag in alltags.keys():
+ continue
+ tagsection = root_topic1.addSubTopic()
+ tagsection.setTitle(tag)
+ alltags[tag] = tagsection
+ category = tag
+ section = alltags[category] if category else undefinedsection
+ userlink = section.addSubTopic()
+ userlink.addLabel(result_status.site_url_user)
+ ids_data = result_status.ids_data or {}
+ for k, v in ids_data.items():
+ # suppose target data
+ if isinstance(v, list):
+ for currentval in v:
+ add_xmind_subtopic(userlink, k, currentval, supposed_data)
+ else:
+ add_xmind_subtopic(userlink, k, v, supposed_data)
# add supposed data
- filterede_supposed_data = filter_supposed_data(supposed_data)
- if len(filterede_supposed_data) > 0:
+ filtered_supposed_data = filter_supposed_data(supposed_data)
+ if len(filtered_supposed_data) > 0:
undefinedsection = root_topic1.addSubTopic()
undefinedsection.setTitle("SUPPOSED DATA")
- for k, v in filterede_supposed_data.items():
+ for k, v in filtered_supposed_data.items():
currentsublabel = undefinedsection.addSubTopic()
currentsublabel.setTitle("%s: %s" % (k, v))
- def check_supported_json_format(value):
- if value and value not in SUPPORTED_JSON_REPORT_FORMATS:
- raise ArgumentTypeError(
- "JSON report type must be one of the following types: "
- + ", ".join(SUPPORTED_JSON_REPORT_FORMATS)
- )
- return value
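The deleted check_supported_json_format helper above goes together with the CLI switch from a custom type= validator to argparse's built-in choices= handling. A minimal standalone sketch of that pattern (the format names here are assumed for illustration only):

from argparse import ArgumentParser

SUPPORTED_JSON_REPORT_FORMATS = ("simple", "ndjson")  # assumed example values

parser = ArgumentParser()
parser.add_argument(
    "-J", "--json",
    action="store",
    metavar="TYPE",
    dest="json",
    default="",
    choices=SUPPORTED_JSON_REPORT_FORMATS,
)

args = parser.parse_args(["--json", "ndjson"])
assert args.json == "ndjson"
# An unsupported value makes argparse exit with an "invalid choice" error,
# which is what the removed ArgumentTypeError-based check used to do manually.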
+3133 -1871
File diff suppressed because it is too large
+34 -20
@@ -3,7 +3,7 @@
import copy
import json
import sys
- from typing import Optional, List, Dict, Any
+ from typing import Optional, List, Dict, Any, Tuple
import requests
@@ -146,6 +146,19 @@ class MaigretSite:
return None
def extract_id_from_url(self, url: str) -> Optional[Tuple[str, str]]:
if not self.url_regexp:
return None
match_groups = self.url_regexp.match(url)
if not match_groups:
return None
_id = match_groups.groups()[-1].rstrip("/")
_type = self.type
return _id, _type
@property
def pretty_name(self):
if self.source:
@@ -167,6 +180,17 @@ class MaigretSite:
return result
def get_url_type(self) -> str:
url = URLMatcher.extract_main_part(self.url)
if url.startswith("{username}"):
url = "SUBDOMAIN"
elif url == "":
url = f"{self.url} ({self.engine})"
else:
parts = url.split("/")
url = "/" + "/".join(parts[1:])
return url
def update(self, updates: "dict") -> "MaigretSite":
self.__dict__.update(updates)
self.update_detectors()
@@ -405,44 +429,34 @@ class MaigretDatabase:
if not sites_dict:
sites_dict = self.sites_dict()
- urls = {}
- tags = {}
output = ""
disabled_count = 0
total_count = len(sites_dict)
+ urls = {}
+ tags = {}
for _, site in sites_dict.items():
if site.disabled:
disabled_count += 1
- url = URLMatcher.extract_main_part(site.url)
- if url.startswith("{username}"):
- url = "SUBDOMAIN"
- elif url == "":
- url = f"{site.url} ({site.engine})"
- else:
- parts = url.split("/")
- url = "/" + "/".join(parts[1:])
- urls[url] = urls.get(url, 0) + 1
+ url_type = site.get_url_type()
+ urls[url_type] = urls.get(url_type, 0) + 1
if not site.tags:
tags["NO_TAGS"] = tags.get("NO_TAGS", 0) + 1
- for tag in site.tags:
- if is_country_tag(tag):
- # currenty do not display country tags
- continue
+ for tag in filter(lambda x: not is_country_tag(x), site.tags):
tags[tag] = tags.get(tag, 0) + 1
output += f"Enabled/total sites: {total_count - disabled_count}/{total_count}\n"
- output += "Top sites' profile URLs:\n"
+ output += "Top profile URLs:\n"
for url, count in sorted(urls.items(), key=lambda x: x[1], reverse=True)[:20]:
if count == 1:
break
output += f"{count}\t{url}\n"
- output += "Top sites' tags:\n"
- for tag, count in sorted(tags.items(), key=lambda x: x[1], reverse=True):
+ output += "Top tags:\n"
+ for tag, count in sorted(tags.items(), key=lambda x: x[1], reverse=True)[:20]:
mark = ""
if tag not in SUPPORTED_TAGS:
mark = " (non-standard)"
+7
@@ -291,7 +291,13 @@ async def submit_dialog(db, url_exists, cookie_file, logger):
url_mainpage = extract_mainpage_url(url_exists)
+ print('Detecting site engine, please wait...')
+ sites = []
+ try:
sites = await detect_known_engine(db, url_exists, url_mainpage, logger)
+ except KeyboardInterrupt:
+ print('Engine detect process is interrupted.')
if not sites:
print("Unable to detect site engine, lets generate checking features")
sites = [
@@ -304,6 +310,7 @@ async def submit_dialog(db, url_exists, cookie_file, logger):
sem = asyncio.Semaphore(1)
+ print("Checking, please wait...")
found = False
chosen_site = None
for s in sites:
+4 -2
@@ -55,9 +55,11 @@ class URLMatcher:
url_main_part = self.extract_main_part(url)
for c in self.UNSAFE_SYMBOLS:
url_main_part = url_main_part.replace(c, f"\\{c}")
- username_regexp = username_regexp or ".+?"
- url_regexp = url_main_part.replace("{username}", f"({username_regexp})")
+ prepared_username_regexp = (username_regexp or ".+?").lstrip('^').rstrip('$')
+ url_regexp = url_main_part.replace(
+ "{username}", f"({prepared_username_regexp})"
+ )
regexp_str = self._HTTP_URL_RE_STR.replace("(.+)", url_regexp)
return re.compile(regexp_str)
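The lstrip('^')/rstrip('$') preparation above matters because a site's username_regexp may carry its own anchors, which would otherwise land in the middle of the combined profile-URL pattern and make it unmatchable. A small illustration with a hypothetical username pattern and example.com as a stand-in host:

import re

username_regexp = r"^[a-zA-Z0-9_-]+$"  # hypothetical site-supplied pattern
prepared = username_regexp.lstrip('^').rstrip('$')  # strip embedded anchors

url_pattern = re.compile(rf"^https?://(www\.)?example\.com/({prepared})$")
match = url_pattern.match("https://example.com/soxoj")
assert match and match.group(2) == "soxoj"
# With the anchors kept, "^" and "$" would sit mid-pattern and the URL would never match.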
+1 -1
@@ -12,7 +12,7 @@ with open('requirements.txt') as rf:
requires = rf.read().splitlines()
setup(name='maigret',
- version='0.2.1',
+ version='0.2.2',
description='Collect a dossier on a person by username from a huge number of sites',
long_description=long_description,
long_description_content_type="text/markdown",
+1187 -1052
File diff suppressed because it is too large
+3 -1
@@ -1,2 +1,4 @@
#!/bin/sh
- pytest tests
+ coverage run --source=./maigret -m pytest tests
+ coverage report -m
+ coverage html
+8 -1
@@ -6,11 +6,13 @@ import pytest
from _pytest.mark import Mark
from maigret.sites import MaigretDatabase
+ from maigret.maigret import setup_arguments_parser
CUR_PATH = os.path.dirname(os.path.realpath(__file__))
JSON_FILE = os.path.join(CUR_PATH, '../maigret/resources/data.json')
TEST_JSON_FILE = os.path.join(CUR_PATH, 'db.json')
- empty_mark = Mark('', [], {})
+ empty_mark = Mark('', (), {})
def by_slow_marker(item):
@@ -51,3 +53,8 @@ def reports_autoclean():
remove_test_reports()
yield
remove_test_reports()
+ @pytest.fixture(scope='session')
+ def argparser():
+ return setup_arguments_parser()
+93
@@ -0,0 +1,93 @@
"""Maigret command-line arguments parsing tests"""
from argparse import Namespace
from typing import Dict, Any
DEFAULT_ARGS: Dict[str, Any] = {
'all_sites': False,
'connections': 100,
'cookie_file': None,
'csv': False,
'db_file': None,
'debug': False,
'disable_extracting': False,
'disable_recursive_search': False,
'folderoutput': 'reports',
'html': False,
'id_type': 'username',
'ignore_ids_list': [],
'info': False,
'json': '',
'new_site_to_submit': False,
'no_color': False,
'no_progressbar': False,
'parse_url': '',
'pdf': False,
'print_check_errors': False,
'print_not_found': False,
'proxy': None,
'retries': 1,
'self_check': False,
'site_list': [],
'stats': False,
'tags': '',
'timeout': 30,
'top_sites': 500,
'txt': False,
'use_disabled_sites': False,
'username': [],
'verbose': False,
'xmind': False,
}
def test_args_search_mode(argparser):
args = argparser.parse_args('username'.split())
assert args.username == ['username']
want_args = dict(DEFAULT_ARGS)
want_args.update({'username': ['username']})
assert args == Namespace(**want_args)
def test_args_search_mode_several_usernames(argparser):
args = argparser.parse_args('username1 username2'.split())
assert args.username == ['username1', 'username2']
want_args = dict(DEFAULT_ARGS)
want_args.update({'username': ['username1', 'username2']})
assert args == Namespace(**want_args)
def test_args_self_check_mode(argparser):
args = argparser.parse_args('--self-check --site GitHub'.split())
want_args = dict(DEFAULT_ARGS)
want_args.update(
{
'self_check': True,
'site_list': ['GitHub'],
'username': [],
}
)
assert args == Namespace(**want_args)
def test_args_multiple_sites(argparser):
args = argparser.parse_args(
'--site GitHub VK --site PornHub --site Taringa,Steam'.split()
)
want_args = dict(DEFAULT_ARGS)
want_args.update(
{
'site_list': ['GitHub', 'PornHub', 'Taringa,Steam'],
'username': ['VK'],
}
)
assert args == Namespace(**want_args)
+61 -17
@@ -1,14 +1,40 @@
"""Maigret main module test functions"""
import asyncio
+ import copy
import pytest
from mock import Mock
from maigret.maigret import self_check, maigret
+ from maigret.maigret import (
+ extract_ids_from_page,
+ extract_ids_from_results,
+ extract_ids_from_url,
+ )
from maigret.sites import MaigretSite
from maigret.result import QueryResult, QueryStatus
RESULTS_EXAMPLE = {
'Reddit': {
'cookies': None,
'parsing_enabled': False,
'url_main': 'https://www.reddit.com/',
'username': 'Facebook',
},
'GooglePlayStore': {
'cookies': None,
'http_status': 200,
'is_similar': False,
'parsing_enabled': False,
'rank': 1,
'url_main': 'https://play.google.com/store',
'url_user': 'https://play.google.com/store/apps/developer?id=Facebook',
'username': 'Facebook',
},
}
@pytest.mark.slow
def test_self_check_db_positive_disable(test_db):
logger = Mock()
@@ -113,21 +139,39 @@ def test_maigret_results(test_db):
assert results['Reddit'].get('future') is None
del results['GooglePlayStore']['future']
- assert results == {
- 'Reddit': {
- 'cookies': None,
- 'parsing_enabled': False,
- 'url_main': 'https://www.reddit.com/',
- 'username': 'Facebook',
- },
- 'GooglePlayStore': {
- 'cookies': None,
- 'http_status': 200,
- 'is_similar': False,
- 'parsing_enabled': False,
- 'rank': 1,
- 'url_main': 'https://play.google.com/store',
- 'url_user': 'https://play.google.com/store/apps/developer?id=Facebook',
- 'username': 'Facebook',
- },
- }
+ assert results == RESULTS_EXAMPLE
+ def test_extract_ids_from_url(default_db):
+ assert extract_ids_from_url('https://www.reddit.com/user/test', default_db) == {
+ 'test': 'username'
+ }
+ assert extract_ids_from_url('https://vk.com/id123', default_db) == {'123': 'vk_id'}
+ assert extract_ids_from_url('https://vk.com/ida123', default_db) == {
+ 'ida123': 'username'
+ }
+ assert extract_ids_from_url(
+ 'https://my.mail.ru/yandex.ru/dipres8904/', default_db
+ ) == {'dipres8904': 'username'}
+ assert extract_ids_from_url(
+ 'https://reviews.yandex.ru/user/adbced123', default_db
+ ) == {'adbced123': 'yandex_public_id'}
+ @pytest.mark.slow
+ def test_extract_ids_from_page(test_db):
+ logger = Mock()
+ extract_ids_from_page('https://www.reddit.com/user/test', logger) == {
+ 'test': 'username'
+ }
+ def test_extract_ids_from_results(test_db):
+ TEST_EXAMPLE = copy.deepcopy(RESULTS_EXAMPLE)
+ TEST_EXAMPLE['Reddit']['ids_usernames'] = {'test1': 'yandex_public_id'}
+ TEST_EXAMPLE['Reddit']['ids_links'] = ['https://www.reddit.com/user/test2']
+ extract_ids_from_results(TEST_EXAMPLE, test_db) == {
+ 'test1': 'yandex_public_id',
+ 'test2': 'username',
+ }
+64
@@ -0,0 +1,64 @@
from maigret.errors import CheckError
from maigret.notify import QueryNotifyPrint
from maigret.result import QueryStatus, QueryResult
def test_notify_illegal():
n = QueryNotifyPrint(color=False)
assert (
n.update(
QueryResult(
username="test",
status=QueryStatus.ILLEGAL,
site_name="TEST_SITE",
site_url_user="http://example.com/test",
)
)
== "[-] TEST_SITE: Illegal Username Format For This Site!"
)
def test_notify_claimed():
n = QueryNotifyPrint(color=False)
assert (
n.update(
QueryResult(
username="test",
status=QueryStatus.CLAIMED,
site_name="TEST_SITE",
site_url_user="http://example.com/test",
)
)
== "[+] TEST_SITE: http://example.com/test"
)
def test_notify_available():
n = QueryNotifyPrint(color=False)
assert (
n.update(
QueryResult(
username="test",
status=QueryStatus.AVAILABLE,
site_name="TEST_SITE",
site_url_user="http://example.com/test",
)
)
== "[-] TEST_SITE: Not found!"
)
def test_notify_unknown():
n = QueryNotifyPrint(color=False)
result = QueryResult(
username="test",
status=QueryStatus.UNKNOWN,
site_name="TEST_SITE",
site_url_user="http://example.com/test",
)
result.error = CheckError('Type', 'Reason')
assert n.update(result) == "[?] TEST_SITE: Type error: Reason"
+24 -17
@@ -40,13 +40,13 @@ def test_case_convert_camel_with_digits_to_snake():
def test_is_country_tag():
- assert is_country_tag('ru') == True
- assert is_country_tag('FR') == True
- assert is_country_tag('a1') == False
- assert is_country_tag('dating') == False
- assert is_country_tag('global') == True
+ assert is_country_tag('ru') is True
+ assert is_country_tag('FR') is True
+ assert is_country_tag('a1') is False
+ assert is_country_tag('dating') is False
+ assert is_country_tag('global') is True
def test_enrich_link_str():
@@ -68,8 +68,10 @@ def test_url_extract_main_part():
]
url_regexp = re.compile('^https?://(www.)?flickr.com/photos/(.+?)$')
+ # combine parts variations
for url_parts in itertools.product(*parts):
url = ''.join(url_parts)
+ # ensure all combinations give valid main part
assert URLMatcher.extract_main_part(url) == url_main_part
assert not url_regexp.match(url) is None
@@ -84,8 +86,10 @@ def test_url_make_profile_url_regexp():
['/', ''],
]
+ # combine parts variations
for url_parts in itertools.product(*parts):
url = ''.join(url_parts)
+ # ensure all combinations match pattern
assert (
URLMatcher.make_profile_url_regexp(url).pattern
== r'^https?://(www.)?flickr\.com/photos/(.+?)$'
@@ -98,6 +102,7 @@ def test_get_dict_ascii_tree():
'legacy_id': '26403415',
'username': 'alexaimephotographycars',
'name': 'Alex Aimé',
+ 'links': "['www.instagram.com/street.reality.photography/']",
'created_at': '2018-05-04T10:17:01.000+0000',
'image': 'https://drscdn.500px.org/user_avatar/26403415/q%3D85_w%3D300_h%3D300/v2?webp=true&v=2&sig=0235678a4f7b65e007e864033ebfaf5ef6d87fad34f80a8639d985320c20fe3b',
'image_bg': 'https://drscdn.500px.org/user_cover/26403415/q%3D65_m%3D2048/v2?webp=true&v=1&sig=bea411fb158391a4fdad498874ff17088f91257e59dfb376ff67e3a44c3a4201',
@@ -107,20 +112,22 @@ def test_get_dict_ascii_tree():
'twitter_username': 'Alexaimephotogr',
}
- ascii_tree = get_dict_ascii_tree(data.items())
+ ascii_tree = get_dict_ascii_tree(data.items(), prepend=" ")
assert (
ascii_tree
== """
┣╸uid: dXJpOm5vZGU6VXNlcjoyNjQwMzQxNQ==
┣╸legacy_id: 26403415
┣╸username: alexaimephotographycars
┣╸name: Alex Aimé
+ ┣╸links:
+ ┃ ┗╸ www.instagram.com/street.reality.photography/
┣╸created_at: 2018-05-04T10:17:01.000+0000
┣╸image: https://drscdn.500px.org/user_avatar/26403415/q%3D85_w%3D300_h%3D300/v2?webp=true&v=2&sig=0235678a4f7b65e007e864033ebfaf5ef6d87fad34f80a8639d985320c20fe3b
┣╸image_bg: https://drscdn.500px.org/user_cover/26403415/q%3D65_m%3D2048/v2?webp=true&v=1&sig=bea411fb158391a4fdad498874ff17088f91257e59dfb376ff67e3a44c3a4201
┣╸website: www.instagram.com/street.reality.photography/
┣╸facebook_link: www.instagram.com/street.reality.photography/
┣╸instagram_username: Street.Reality.Photography
┗╸twitter_username: Alexaimephotogr"""
)
+1 -1
@@ -87,7 +87,7 @@ if __name__ == '__main__':
with open("sites.md", "w") as site_file:
site_file.write(f"""
- ## List of supported sites: total {len(sites_subset)}\n
+ ## List of supported sites (search methods): total {len(sites_subset)}\n
Rank data fetched from Alexa by domains.
""")