first commit

Soxoj
2020-01-08 09:51:07 +03:00
commit ac0be37480
21 changed files with 22264 additions and 0 deletions
+8
@@ -0,0 +1,8 @@
.git/
.vscode/
screenshot/
tests/
*.txt
!/requirements.txt
venv/
+29
@@ -0,0 +1,29 @@
# Virtual Environment
venv/
# Editor Configurations
.vscode/
.idea/
# Python
__pycache__/
# Pip
src/
# Jupyter Notebook
.ipynb_checkpoints
*.ipynb
# Output files, except requirements.txt
*.txt
!requirements.txt
# Comma-Separated Values (CSV) Reports
*.csv
# Excluded sites list
tests/.excluded_sites
# MacOS Folder Metadata File
.DS_Store
+27
@@ -0,0 +1,27 @@
FROM python:3.7-alpine as build
WORKDIR /wheels
RUN apk add --no-cache \
g++ \
gcc \
git \
libxml2 \
libxml2-dev \
libxslt-dev \
linux-headers
COPY requirements.txt /opt/maigret/
RUN pip3 wheel -r /opt/maigret/requirements.txt
FROM python:3.7-alpine
WORKDIR /opt/maigret
ARG VCS_REF
ARG VCS_URL="https://gitlab.com/soxoj/maigret"
LABEL org.label-schema.vcs-ref=$VCS_REF \
org.label-schema.vcs-url=$VCS_URL
COPY --from=build /wheels /wheels
COPY . /opt/maigret/
RUN pip3 install -r requirements.txt -f /wheels \
&& rm -rf /wheels \
&& rm -rf /root/.cache/pip/*
ENTRYPOINT ["python", "maigret.py"]
+45
@@ -0,0 +1,45 @@
MIT License
Copyright (c) 2019 Soxoj
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
-------------------------------------------------------------------------------
MIT License
Copyright (c) 2019 Sherlock Project
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
+54
@@ -0,0 +1,54 @@
# Maigret
<p align="center">
<img src="static/maigret.png" />
</p>
<i>Commissioner Jules Maigret is a fictional French police detective created by Georges Simenon. His investigation method is based on understanding the personality of different people and their interactions.</i>
## About
The purpose of Maigret is to **collect a dossier on a person by username only**, checking for accounts on a huge number of sites.
This is a [sherlock](https://github.com/sherlock-project/) fork with cool features, under heavy development.
*Don't forget to regularly update the source code from the repo.*
More than 1300 sites are currently supported ([full list](/sites.md)).
## Main features
* Parsing of profile pages, [extracting](https://github.com/soxoj/socid_extractor) personal info, links to other profiles, etc.
* Recursive search by new usernames found
* Search by tags (site categories, countries); see the usage sketch below
* Censorship and captcha detection
* Very few false positives
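A quick usage sketch (the tag value here is illustrative; the flags come from `maigret.py`'s argument parser):
```bash
# search only sites with a given tag, skipping recursive search
python3 maigret alexaimephotographycars --tags photo --no-recursion
```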
## Installation
**NOTE**: Python 3.7 or higher and pip are required.
**Python 3.8 is recommended.**
```bash
# clone the repo and change directory
$ git clone https://git.rip/soxoj/maigret && cd maigret
# install the requirements
$ python3 -m pip install -r requirements.txt
```
## Demo with page parsing and recursive username search
```bash
python3 maigret alexaimephotographycars
```
![animation of recursive search](./static/recursive_search.svg)
[Full output](./static/recursive_search.md)
## License
MIT © [Maigret](https://git.rip/soxoj/maigret)<br/>
MIT © [Sherlock Project](https://github.com/sherlock-project/)<br/>
Original creator of the Sherlock Project: [Siddharth Dushantha](https://github.com/sdushantha)
+5
@@ -0,0 +1,5 @@
"""Sherlock Module
This module contains the main logic to search for usernames at social
networks.
"""
+15
@@ -0,0 +1,15 @@
#! /usr/bin/env python3
"""
Maigret (Sherlock fork): Find Usernames Across Social Networks Module
This module contains the main logic to search for usernames at social
networks.
"""
import asyncio
import maigret
if __name__ == "__main__":
asyncio.run(maigret.main())
+867
@@ -0,0 +1,867 @@
#! /usr/bin/env python3
"""
Maigret main module
"""
import asyncio
import csv
import http.cookiejar as cookielib
import json
import logging
import os
import platform
import re
import ssl
import sys
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from http.cookies import SimpleCookie
import aiohttp
import requests
from mock import Mock
from notify import QueryNotifyPrint
from result import QueryResult, QueryStatus
from sites import SitesInformation
from socid_extractor import parse, extract
module_name = "Maigret OSINT tool"
__version__ = "0.1.0"
supported_recursive_search_ids = (
'yandex_public_id',
'gaia_id',
'vk_id',
'ok_id',
'wikimapia_uid',
)
common_errors = {
'<title>Attention Required! | Cloudflare</title>': 'Cloudflare captcha',
'<title>Доступ ограничен</title>': 'Rostelecom censorship',
'document.getElementById(\'validate_form_submit\').disabled=true': 'Mail.ru captcha',
'Verifying your browser, please wait...<br>DDoS Protection by</font> Blazingfast.io': 'Blazingfast protection',
'404</h1><p class="error-card__description">Мы&nbsp;не&nbsp;нашли страницу': 'MegaFon 404 page',
}
unsupported_characters = '#'
cookies_file = 'cookies.txt'
async def get_response(request_future, error_type, social_network, logger):
html_text = None
status_code = 0
error_text = "General Unknown Error"
exception_text = None
try:
response = await request_future
status_code = response.status
response_content = await response.content.read()
charset = response.charset or 'utf-8'
decoded_content = response_content.decode(charset, 'ignore')
html_text = decoded_content
if status_code > 0:
error_text = None
logger.debug(html_text)
except asyncio.TimeoutError as errt:
error_text = "Timeout Error"
exception_text = str(errt)
except (ssl.SSLCertVerificationError, ssl.SSLError) as err:
error_text = "SSL Error"
exception_text = str(err)
except aiohttp.client_exceptions.ClientConnectorError as err:
error_text = "Error Connecting"
exception_text = str(err)
except aiohttp.http_exceptions.BadHttpMessage as err:
error_text = "HTTP Error"
exception_text = str(err)
except Exception as err:
logger.warning(f'Unhandled error while requesting {social_network}: {err}')
logger.debug(err, exc_info=True)
error_text = "Some Error"
exception_text = str(err)
# TODO: return only needed information
return html_text, status_code, error_text, exception_text
async def update_site_data_from_response(site, site_data, site_info, semaphore, logger):
async with semaphore:
future = site_info.get('request_future')
if not future:
# ignore: search by incompatible id type
return
error_type = site_info['errorType']
site_data[site]['resp'] = await get_response(request_future=future,
error_type=error_type,
social_network=site,
logger=logger)
# TODO: move into a separate module
def detect_error_page(html_text, status_code, fail_flags, ignore_403):
# Detect service restrictions such as a country restriction
for flag, msg in fail_flags.items():
if flag in html_text:
return 'Some site error', msg
# Detect common restrictions such as provider censorship and bot protection
for flag, msg in common_errors.items():
if flag in html_text:
return 'Error', msg
# Detect common site errors
if status_code == 403 and not ignore_403:
return 'Access denied', 'Access denied, use proxy/vpn'
elif status_code >= 500:
return f'Error {status_code}', f'Site error {status_code}'
return None, None
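# Illustrative example: a Cloudflare captcha page is reported via common_errors:
#   detect_error_page('<title>Attention Required! | Cloudflare</title>', 200, {}, False)
#   -> ('Error', 'Cloudflare captcha')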
async def maigret(username, site_data, query_notify, logger,
proxy=None, timeout=None, recursive_search=False,
id_type='username', tags=None, debug=False, forced=False,
max_connections=100):
"""Main search func
Checks for existence of username on various social media sites.
Keyword Arguments:
username -- String indicating username that report
should be created against.
site_data -- Dictionary containing all of the site data.
query_notify -- Object with base type of QueryNotify().
This will be used to notify the caller about
query results.
proxy -- String indicating the proxy URL
timeout -- Time in seconds to wait before timing out request.
Default is no timeout.
recursive_search -- Whether to parse pages for other usernames and search by them recursively.
Return Value:
Dictionary containing results from report. Key of dictionary is the name
of the social network site, and the value is another dictionary with
the following keys:
url_main: URL of main site.
url_user: URL of user on site (if account exists).
status: QueryResult() object indicating results of test for
account existence.
http_status: HTTP status code of query which checked for existence on
site.
response_text: Text that came back from request. May be None if
there was an HTTP error when checking for existence.
"""
# Notify caller that we are starting the query.
if tags is None:
tags = set()
query_notify.start(username, id_type)
# TODO: connector
connector = aiohttp.TCPConnector(ssl=False)
session = aiohttp.ClientSession(connector=connector)
# Results from analysis of all sites
results_total = {}
# First create futures for all requests. This allows for the requests to run in parallel
for social_network, net_info in site_data.items():
if net_info.get('type', 'username') != id_type:
continue
site_tags = set(net_info.get('tags', []))
if tags:
if not set(tags).intersection(site_tags):
continue
if 'disabled' in net_info and net_info['disabled'] and not forced:
continue
# Results from analysis of this specific site
results_site = {}
# Record URL of main site
results_site['url_main'] = net_info.get("urlMain")
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11.1; rv:55.0) Gecko/20100101 Firefox/55.0',
}
if "headers" in net_info:
# Override/append any extra headers required by a given site.
headers.update(net_info["headers"])
# URL of user on site (if it exists)
url = net_info.get('url').format(username)
# Don't make request if username is invalid for the site
regex_check = net_info.get("regexCheck")
if regex_check and re.search(regex_check, username) is None:
# No need to do the check at the site: this user name is not allowed.
results_site['status'] = QueryResult(username,
social_network,
url,
QueryStatus.ILLEGAL)
results_site["url_user"] = ""
results_site['http_status'] = ""
results_site['response_text'] = ""
query_notify.update(results_site['status'])
else:
# URL of user on site (if it exists)
results_site["url_user"] = url
url_probe = net_info.get("urlProbe")
if url_probe is None:
# Probe URL is normal one seen by people out on the web.
url_probe = url
else:
# There is a special URL for probing existence separate
# from where the user profile normally can be found.
url_probe = url_probe.format(username)
if net_info["errorType"] == 'status_code' and net_info.get("request_head_only", True):
# In most cases when we are detecting by status code,
# it is not necessary to get the entire body: we can
# detect fine with just the HEAD response.
request_method = session.head
else:
# Either this detect method needs the content associated
# with the GET response, or this specific website will
# not respond properly unless we request the whole page.
request_method = session.get
if net_info["errorType"] == "response_url":
# Site forwards request to a different URL if username not
# found. Disallow the redirect so we can capture the
# http status from the original URL request.
allow_redirects = False
else:
# Allow whatever redirect that the site wants to do.
# The final result of the request will be what is available.
allow_redirects = True
# TODO: pass cookies into requests (cookies_obj is loaded below but not used yet)
def parse_cookies(cookies_str):
cookies = SimpleCookie()
cookies.load(cookies_str)
return {key: morsel.value for key, morsel in cookies.items()}
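# e.g. parse_cookies('sessionid=abc; csrftoken=xyz')
#      -> {'sessionid': 'abc', 'csrftoken': 'xyz'}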
if os.path.exists(cookies_file):
cookies_obj = cookielib.MozillaCookieJar(cookies_file)
cookies_obj.load(ignore_discard=True, ignore_expires=True)
else:
cookies_obj = []
# The request coroutine is only created here and awaited later,
# so all site checks effectively run concurrently
if proxy is not None:
    # aiohttp takes a single proxy URL string (`proxy=`),
    # not a requests-style `proxies` dict
    future = request_method(url=url_probe, headers=headers,
                            proxy=proxy,
                            allow_redirects=allow_redirects,
                            timeout=timeout,
                            )
else:
    future = request_method(url=url_probe, headers=headers,
                            allow_redirects=allow_redirects,
                            timeout=timeout,
                            )
# Store future in data for access later
net_info["request_future"] = future
# Add this site's results into final dictionary with all of the other results.
results_total[social_network] = results_site
# TODO: move into top-level function
sem = asyncio.Semaphore(max_connections)
tasks = []
for social_network, net_info in site_data.items():
future = asyncio.ensure_future(update_site_data_from_response(social_network, site_data, net_info, sem, logger))
tasks.append(future)
await asyncio.gather(*tasks)
await session.close()
# TODO: split to separate functions
for social_network, net_info in site_data.items():
# Retrieve results again
results_site = results_total.get(social_network)
if not results_site:
continue
# Retrieve other site information again
url = results_site.get("url_user")
logger.debug(url)
status = results_site.get("status")
if status is not None:
# We have already determined the user doesn't exist here
continue
# Get the expected error type
error_type = net_info["errorType"]
# Get the failure messages and comments
failure_errors = net_info.get("errors", {})
# TODO: refactor
resp = net_info.get('resp')
if not resp:
logger.error(f'No response for {social_network}')
continue
html_text, status_code, error_text, exception_text = resp
# TODO: add elapsed request time counting
response_time = None
if debug:
with open('debug.txt', 'a') as f:
status = status_code or 'No response'
f.write(f'url: {url}\nerror: {str(error_text)}\nr: {status}\n')
if html_text:
f.write(f'code: {status}\nresponse: {str(html_text)}\n')
if status_code and not error_text:
error_text, site_error_text = detect_error_page(html_text, status_code, failure_errors,
'ignore_403' in net_info)
# presence flags: all of them must appear in the page;
# an empty list means the presence check passes by default
presence_flags = net_info.get("presenseStrs", [])  # NB: the DB key keeps this spelling
is_presence_detected = not presence_flags or bool(
    html_text and all(flag in html_text for flag in presence_flags))
if error_text is not None:
logger.debug(error_text)
result = QueryResult(username,
social_network,
url,
QueryStatus.UNKNOWN,
query_time=response_time,
context=error_text)
elif error_type == "message":
absence_flags = net_info.get("errorMsg")
is_absence_flags_list = isinstance(absence_flags, list)
absence_flags_set = set(absence_flags) if is_absence_flags_list else {absence_flags}
# Checks if the error message is in the HTML
is_absence_detected = any([(absence_flag in html_text) for absence_flag in absence_flags_set])
if not is_absence_detected and is_presence_detected:
result = QueryResult(username,
social_network,
url,
QueryStatus.CLAIMED,
query_time=response_time)
else:
result = QueryResult(username,
social_network,
url,
QueryStatus.AVAILABLE,
query_time=response_time)
elif error_type == "status_code":
# Checks if the status code of the response is 2XX
if 200 <= status_code < 300 and is_presence_detected:
result = QueryResult(username,
social_network,
url,
QueryStatus.CLAIMED,
query_time=response_time)
else:
result = QueryResult(username,
social_network,
url,
QueryStatus.AVAILABLE,
query_time=response_time)
elif error_type == "response_url":
# For this detection method, we have turned off the redirect.
# So, there is no need to check the response URL: it will always
# match the request. Instead, we will ensure that the response
# code indicates that the request was successful (i.e. no 404, or
# forward to some odd redirect).
if 200 <= status_code < 300 and is_presence_detected:
result = QueryResult(username,
social_network,
url,
QueryStatus.CLAIMED,
query_time=response_time)
else:
result = QueryResult(username,
social_network,
url,
QueryStatus.AVAILABLE,
query_time=response_time)
else:
# It should be impossible to ever get here...
raise ValueError(f"Unknown Error Type '{error_type}' for "
f"site '{social_network}'")
extracted_ids_data = {}
if recursive_search and result.status == QueryStatus.CLAIMED:
try:
extracted_ids_data = extract(html_text)
except Exception as e:
logger.warning(f'Error while parsing {social_network}: {e}', exc_info=True)
if extracted_ids_data:
new_usernames = {}
for k, v in extracted_ids_data.items():
if 'username' in k:
new_usernames[v] = 'username'
if k in supported_recursive_search_ids:
new_usernames[v] = k
results_site['ids_usernames'] = new_usernames
result.ids_data = extracted_ids_data
is_similar = net_info.get('similarSearch', False)
# Notify caller about results of query.
query_notify.update(result, is_similar)
# Save status of request
results_site['status'] = result
# Save results from request
results_site['http_status'] = status_code
results_site['is_similar'] = is_similar
# results_site['response_text'] = html_text
results_site['rank'] = net_info.get('rank', 0)
# Add this site's results into final dictionary with all of the other results.
results_total[social_network] = results_site
# Notify caller that all queries are finished.
query_notify.finish()
return results_total
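# Minimal illustrative call of the coroutine above (site_data is assumed to be
# a prepared sites dictionary, as built in main() from SitesInformation):
#   logger = logging.getLogger('maigret')
#   results = asyncio.run(maigret('johndoe', site_data, QueryNotifyPrint(), logger))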
def timeout_check(value):
"""Check Timeout Argument.
Checks timeout for validity.
Keyword Arguments:
value -- Time in seconds to wait before timing out request.
Return Value:
Floating point number representing the time (in seconds) that should be
used for the timeout.
NOTE: Will raise an exception if the timeout is invalid.
"""
from argparse import ArgumentTypeError
try:
timeout = float(value)
except ValueError:
raise ArgumentTypeError(f"Timeout '{value}' must be a number.")
if timeout <= 0:
raise ArgumentTypeError(f"Timeout '{value}' must be greater than 0.0s.")
return timeout
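# e.g. timeout_check('10') -> 10.0; timeout_check('0') raises ArgumentTypeError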
async def site_self_check(site_name, site_data, logger):
query_notify = Mock()
changes = {
'disabled': False,
}
check_data = [
(site_data['username_claimed'], QueryStatus.CLAIMED),
(site_data['username_unclaimed'], QueryStatus.AVAILABLE),
]
logger.info(f'Checking {site_name}...')
for username, status in check_data:
results = await maigret(
username,
{site_name: site_data},
query_notify,
logger,
timeout=30,
forced=True,
)
# don't disable entries with other ids types
if site_name not in results:
logger.info(results)
changes['disabled'] = True
continue
site_status = results[site_name]['status'].status
if site_status != status:
if site_status == QueryStatus.UNKNOWN:
msg = site_data.get('errorMsg')
etype = site_data.get('errorType')
logger.info(f'Error while searching {username} in {site_name}: {msg}, type {etype}')
# don't disable in case of available username
if status == QueryStatus.CLAIMED:
changes['disabled'] = True
elif status == QueryStatus.CLAIMED:
logger.info(f'Not found `{username}` in {site_name}, must be claimed')
changes['disabled'] = True
else:
logger.info(f'Found `{username}` in {site_name}, must be available')
changes['disabled'] = True
if not changes['disabled']:
    logger.info(f'Site {site_name} is okay')
return changes
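# Illustrative self-check of a single site entry ('GitHub' is an assumed example name):
#   changes = await site_self_check('GitHub', site_data_all['GitHub'], logger)
#   -> {'disabled': True} when the claimed/unclaimed test usernames misbehave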
async def self_check(json_file, logger):
sites = SitesInformation(json_file)
all_sites = {}
def disabled_count(data):
return len(list(filter(lambda x: x.get('disabled', False), data)))
async def update_site_data(site_name, site_data, all_sites, logger):
updates = await site_self_check(site_name, dict(site_data), logger)
all_sites[site_name].update(updates)
for site in sites:
all_sites[site.name] = site.information
disabled_old_count = disabled_count(all_sites.values())
tasks = []
for site_name, site_data in all_sites.items():
future = asyncio.ensure_future(update_site_data(site_name, site_data, all_sites, logger))
tasks.append(future)
await asyncio.gather(*tasks)
disabled_new_count = disabled_count(all_sites.values())
total_disabled = disabled_new_count - disabled_old_count
if total_disabled > 0:
message = 'Disabled'
else:
message = 'Enabled'
total_disabled *= -1
print(f'{message} {total_disabled} checked sites. Run with `--info` flag to get more information')
with open(json_file, 'w') as f:
json.dump(all_sites, f, indent=4)
async def main():
version_string = f"%(prog)s {__version__}\n" + \
f"{requests.__description__}: {requests.__version__}\n" + \
f"Python: {platform.python_version()}"
parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
description=f"{module_name} (Version {__version__})"
)
parser.add_argument("--version",
action="version", version=version_string,
help="Display version information and dependencies."
)
parser.add_argument("--info",
action="store_true", dest="info", default=False,
help="Display service information."
)
parser.add_argument("--verbose", "-v",
action="store_true", dest="verbose", default=False,
help="Display extra information and metrics."
)
parser.add_argument("-d", "--debug",
action="store_true", dest="debug", default=False,
help="Saving debugging information and sites responses in debug.txt."
)
parser.add_argument("--rank", "-r",
action="store_true", dest="rank", default=False,
help="Present websites ordered by their Alexa.com global rank in popularity.")
parser.add_argument("--folderoutput", "-fo", dest="folderoutput",
help="If using multiple usernames, the output of the results will be saved to this folder."
)
parser.add_argument("--output", "-o", dest="output",
help="If using single username, the output of the result will be saved to this file."
)
parser.add_argument("--csv",
action="store_true", dest="csv", default=False,
help="Create Comma-Separated Values (CSV) File."
)
parser.add_argument("--site",
action="append", metavar='SITE_NAME',
dest="site_list", default=None,
help="Limit analysis to just the listed sites (use several times to specify more than one)"
)
parser.add_argument("--proxy", "-p", metavar='PROXY_URL',
action="store", dest="proxy", default=None,
help="Make requests over a proxy. e.g. socks5://127.0.0.1:1080"
)
parser.add_argument("--json", "-j", metavar="JSON_FILE",
dest="json_file", default=None,
help="Load data from a JSON file or an online, valid, JSON file.")
parser.add_argument("--timeout",
action="store", metavar='TIMEOUT',
dest="timeout", type=timeout_check, default=10,
help="Time (in seconds) to wait for response to requests."
"Default timeout of 10.0s."
"A longer timeout will be more likely to get results from slow sites."
"On the other hand, this may cause a long delay to gather all results."
)
parser.add_argument("--print-not-found",
action="store_true", dest="print_not_found", default=False,
help="Print sites where the username was not found."
)
parser.add_argument("--print-errors",
action="store_true", dest="print_check_errors", default=False,
help="Print errors messages: connection, captcha, site country ban, etc."
)
parser.add_argument("--no-color",
action="store_true", dest="no_color", default=False,
help="Don't color terminal output"
)
parser.add_argument("--browse", "-b",
action="store_true", dest="browse", default=False,
help="Browse to all results on default bowser."
)
parser.add_argument("--no-recursion",
action="store_true", dest="disable_recursive_search", default=False,
help="Disable parsing pages for other usernames and recursive search by them."
)
parser.add_argument("--self-check",
action="store_true", default=False,
help="Do self check for sites and database and disable non-working ones."
)
parser.add_argument("--use-disabled-sites",
action="store_true", default=False,
help="Use disabled sites to search (may cause many false positives)."
)
parser.add_argument("--parse",
dest="parse_url", default='',
help="Parse page by URL and extract username and IDs to use for search."
)
parser.add_argument("username",
nargs='+', metavar='USERNAMES',
action="store",
help="One or more usernames to check with social networks."
)
parser.add_argument("--tags",
dest="tags", default='',
help="Specify tags of sites."
)
args = parser.parse_args()
# Logging
log_level = logging.ERROR
logging.basicConfig(
format='[%(filename)s:%(lineno)d] %(levelname)-3s %(asctime)s %(message)s',
datefmt='%H:%M:%S',
level=logging.ERROR
)
if args.debug:
log_level = logging.DEBUG
elif args.info:
log_level = logging.INFO
elif args.verbose:
log_level = logging.WARNING
logger = logging.getLogger('maigret')
logger.setLevel(log_level)
# Usernames initial list
usernames = {
u: 'username'
for u in args.username
if u not in ['-']
}
recursive_search_enabled = not args.disable_recursive_search
# Make prompts
if args.proxy is not None:
print("Using the proxy: " + args.proxy)
# Check if both output methods are entered as input.
if args.output is not None and args.folderoutput is not None:
print("You can only use one of the output methods.")
sys.exit(1)
# Check validity for single username output.
if args.output is not None and len(args.username) != 1:
print("You can only use --output with a single username")
sys.exit(1)
if args.parse_url:
page, _ = parse(args.parse_url, cookies_str='')
info = extract(page)
text = 'Extracted ID data from webpage: ' + ', '.join([f'{a}: {b}' for a, b in info.items()])
print(text)
for k, v in info.items():
if 'username' in k:
usernames[v] = 'username'
if k in supported_recursive_search_ids:
usernames[v] = k
if args.tags:
args.tags = set(str(args.tags).split(','))
if args.json_file is None:
args.json_file = \
os.path.join(os.path.dirname(os.path.realpath(__file__)),
"resources/data.json"
)
# Database self-checking
if args.self_check:
print('Maigret sites database self-checking...')
await self_check(args.json_file, logger)
# Create object with all information about sites we are aware of.
try:
sites = SitesInformation(args.json_file)
except Exception as error:
print(f"ERROR: {error}")
sys.exit(1)
# Create original dictionary from SitesInformation() object.
# Eventually, the rest of the code will be updated to use the new object
# directly, but this will glue the two pieces together.
site_data_all = {}
for site in sites:
site_data_all[site.name] = site.information
if args.site_list is None:
# Not desired to look at a sub-set of sites
site_data = site_data_all
else:
# User desires to selectively run queries on a sub-set of the site list.
# Make sure that the sites are supported & build up pruned site database.
site_data = {}
site_missing = []
for site in args.site_list:
for existing_site in site_data_all:
if site.lower() == existing_site.lower():
site_data[existing_site] = site_data_all[existing_site]
if not site_data:
# Build up list of sites not supported for future error message.
site_missing.append(f"'{site}'")
if site_missing:
print(
f"Error: Desired sites not found: {', '.join(site_missing)}.")
sys.exit(1)
if args.rank:
# Sort data by rank
site_data_copy = dict(site_data)
# sites without a known rank fall to the end via the sys.maxsize default
ranked_sites = sorted(site_data, key=lambda k: site_data[k].get("rank", sys.maxsize))
site_data = {}
for site in ranked_sites:
    site_data[site] = site_data_copy.get(site)
# Database consistency
enabled_count = len(list(filter(lambda x: not x.get('disabled', False), site_data.values())))
print(f'Sites in database, enabled/total: {enabled_count}/{len(site_data)}')
# Create notify object for query results.
query_notify = QueryNotifyPrint(result=None,
verbose=args.verbose,
print_found_only=not args.print_not_found,
skip_check_errors=not args.print_check_errors,
color=not args.no_color)
already_checked = set()
while usernames:
username, id_type = list(usernames.items())[0]
del usernames[username]
if username.lower() in already_checked:
continue
else:
already_checked.add(username.lower())
# check for characters that are generally not supported by sites
found_unsupported_chars = set(unsupported_characters).intersection(set(username))
if found_unsupported_chars:
pretty_chars_str = ','.join(map(lambda s: f'"{s}"', found_unsupported_chars))
print(f'Found unsupported URL characters: {pretty_chars_str}, skipping search by username "{username}"')
continue
results = await maigret(username,
site_data,
query_notify,
proxy=args.proxy,
timeout=args.timeout,
recursive_search=recursive_search_enabled,
id_type=id_type,
tags=args.tags,
debug=args.verbose,
logger=logger,
forced=args.use_disabled_sites,
)
if args.output:
result_file = args.output
elif args.folderoutput:
# The usernames results should be stored in a targeted folder.
# If the folder doesn't exist, create it first
os.makedirs(args.folderoutput, exist_ok=True)
result_file = os.path.join(args.folderoutput, f"{username}.txt")
else:
result_file = f"{username}.txt"
with open(result_file, "w", encoding="utf-8") as file:
exists_counter = 0
for website_name in results:
dictionary = results[website_name]
new_usernames = dictionary.get('ids_usernames')
if new_usernames:
for u, utype in new_usernames.items():
usernames[u] = utype
if dictionary.get("status").status == QueryStatus.CLAIMED:
exists_counter += 1
file.write(dictionary["url_user"] + "\n")
file.write(f"Total Websites Username Detected On : {exists_counter}")
if args.csv:
with open(username + ".csv", "w", newline='', encoding="utf-8") as csv_report:
writer = csv.writer(csv_report)
writer.writerow(['username',
'name',
'url_main',
'url_user',
'exists',
'http_status',
'response_time_s'
]
)
for site in results:
response_time_s = results[site]['status'].query_time
if response_time_s is None:
response_time_s = ""
writer.writerow([username,
site,
results[site]['url_main'],
results[site]['url_user'],
str(results[site]['status'].status),
results[site]['http_status'],
response_time_s
]
)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print('Maigret is interrupted.')
sys.exit(1)
+283
@@ -0,0 +1,283 @@
"""Sherlock Notify Module
This module defines the objects for notifying the caller about the
results of queries.
"""
import ast
from colorama import Fore, Style, init
from result import QueryStatus
class QueryNotify():
"""Query Notify Object.
Base class that describes methods available to notify the results of
a query.
It is intended that other classes inherit from this base class and
override the methods to implement specific functionality.
"""
def __init__(self, result=None):
"""Create Query Notify Object.
Contains information about a specific method of notifying the results
of a query.
Keyword Arguments:
self -- This object.
result -- Object of type QueryResult() containing
results for this query.
Return Value:
Nothing.
"""
self.result = result
return
def start(self, message=None, id_type='username'):
"""Notify Start.
Notify method for start of query. This method will be called before
any queries are performed. This method will typically be
overridden by higher level classes that will inherit from it.
Keyword Arguments:
self -- This object.
message -- Object that is used to give context to start
of query.
Default is None.
Return Value:
Nothing.
"""
return
def update(self, result):
"""Notify Update.
Notify method for query result. This method will typically be
overridden by higher level classes that will inherit from it.
Keyword Arguments:
self -- This object.
result -- Object of type QueryResult() containing
results for this query.
Return Value:
Nothing.
"""
self.result = result
return
def finish(self, message=None):
"""Notify Finish.
Notify method for finish of query. This method will be called after
all queries have been performed. This method will typically be
overridden by higher level classes that will inherit from it.
Keyword Arguments:
self -- This object.
message -- Object that is used to give context to start
of query.
Default is None.
Return Value:
Nothing.
"""
return
def __str__(self):
"""Convert Object To String.
Keyword Arguments:
self -- This object.
Return Value:
Nicely formatted string to get information about this object.
"""
result = str(self.result)
return result
class QueryNotifyPrint(QueryNotify):
"""Query Notify Print Object.
Query notify class that prints results.
"""
def __init__(self, result=None, verbose=False, print_found_only=False,
skip_check_errors=False, color=True):
"""Create Query Notify Print Object.
Contains information about a specific method of notifying the results
of a query.
Keyword Arguments:
self -- This object.
result -- Object of type QueryResult() containing
results for this query.
verbose -- Boolean indicating whether to give verbose output.
print_found_only -- Boolean indicating whether to only print found sites.
skip_check_errors -- Boolean indicating whether to skip printing
of sites whose check ended with an error.
color -- Boolean indicating whether to color terminal output.
Return Value:
Nothing.
"""
# Colorama module's initialization.
init(autoreset=True)
super().__init__(result)
self.verbose = verbose
self.print_found_only = print_found_only
self.skip_check_errors = skip_check_errors
self.color = color
return
def start(self, message, id_type):
"""Notify Start.
Will print the title to the standard output.
Keyword Arguments:
self -- This object.
message -- String containing username that the series
of queries are about.
Return Value:
Nothing.
"""
title = f"Checking {id_type}"
if self.color:
print(Style.BRIGHT + Fore.GREEN + "[" +
Fore.YELLOW + "*" +
Fore.GREEN + f"] {title}" +
Fore.WHITE + f" {message}" +
Fore.GREEN + " on:")
else:
print(f"[*] {title} {message} on:")
return
def get_additional_data_text(self, items, prepend=''):
text = ''
for num, item in enumerate(items):
box_symbol = '┣╸' if num != len(items) - 1 else '┗╸'
if isinstance(item, tuple):
    field_name, field_value = item
    if field_value.startswith('[\''):
        is_last_item = num == len(items) - 1
        prepend_symbols = ' ' * 3 if is_last_item else ''
        # the value is a string repr of a list; parse it safely
        # with ast.literal_eval instead of eval
        field_value = self.get_additional_data_text(ast.literal_eval(field_value), prepend_symbols)
text += f'\n{prepend}{box_symbol}{field_name}: {field_value}'
else:
text += f'\n{prepend}{box_symbol} {item}'
return text
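# Illustrative example of the tree rendering:
#   get_additional_data_text([('uid', '123'), 'note'])
#   -> '\n┣╸uid: 123\n┗╸ note'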
def update(self, result, is_similar=False):
"""Notify Update.
Will print the query result to the standard output.
Keyword Arguments:
self -- This object.
result -- Object of type QueryResult() containing
results for this query.
Return Value:
Nothing.
"""
self.result = result
if not self.result.ids_data:
ids_data_text = ""
else:
ids_data_text = self.get_additional_data_text(self.result.ids_data.items(), ' ')
def make_colored_terminal_notify(status, text, status_color, text_color, appendix):
text = [
f'{Style.BRIGHT}{Fore.WHITE}[{status_color}{status}{Fore.WHITE}]' +
f'{text_color} {text}: {Style.RESET_ALL}' +
f'{appendix}'
]
return ''.join(text)
def make_simple_terminal_notify(status, text, appendix):
return f'[{status}] {text}: {appendix}'
def make_terminal_notify(is_colored=True, *args):
if is_colored:
return make_colored_terminal_notify(*args)
else:
return make_simple_terminal_notify(*args)
notify = None
# Output to the terminal is desired.
if result.status == QueryStatus.CLAIMED:
color = Fore.BLUE if is_similar else Fore.GREEN
status = '?' if is_similar else '+'
notify = make_terminal_notify(
self.color,
status, result.site_name,
color, color,
result.site_url_user + ids_data_text
)
elif result.status == QueryStatus.AVAILABLE:
if not self.print_found_only:
notify = make_terminal_notify(
self.color,
'-', result.site_name,
Fore.RED, Fore.YELLOW,
'Not found!' + ids_data_text
)
elif result.status == QueryStatus.UNKNOWN:
if not self.skip_check_errors:
notify = make_terminal_notify(
self.color,
'?', result.site_name,
Fore.RED, Fore.RED,
self.result.context + ids_data_text
)
elif result.status == QueryStatus.ILLEGAL:
if not self.print_found_only:
text = 'Illegal Username Format For This Site!'
notify = make_terminal_notify(
self.color,
'-', result.site_name,
Fore.RED, Fore.YELLOW,
text + ids_data_text
)
else:
# It should be impossible to ever get here...
raise ValueError(f"Unknown Query Status '{str(result.status)}' for "
f"site '{self.result.site_name}'")
if notify:
print(notify)
return
def __str__(self):
"""Convert Object To String.
Keyword Arguments:
self -- This object.
Return Value:
Nicely formatted string to get information about this object.
"""
result = str(self.result)
return result
File diff suppressed because it is too large.
+93
@@ -0,0 +1,93 @@
"""Sherlock Result Module
This module defines various objects for recording the results of queries.
"""
from enum import Enum
class QueryStatus(Enum):
"""Query Status Enumeration.
Describes status of query about a given username.
"""
CLAIMED = "Claimed" # Username Detected
AVAILABLE = "Available" # Username Not Detected
UNKNOWN = "Unknown" # Error Occurred While Trying To Detect Username
ILLEGAL = "Illegal" # Username Not Allowable For This Site
def __str__(self):
"""Convert Object To String.
Keyword Arguments:
self -- This object.
Return Value:
Nicely formatted string to get information about this object.
"""
return self.value
class QueryResult():
"""Query Result Object.
Describes result of query about a given username.
"""
def __init__(self, username, site_name, site_url_user, status, ids_data=None,
query_time=None, context=None):
"""Create Query Result Object.
Contains information about a specific method of detecting usernames on
a given type of web sites.
Keyword Arguments:
self -- This object.
username -- String indicating username that query result
was about.
site_name -- String which identifies site.
site_url_user -- String containing URL for username on site.
NOTE: The account may or may not exist:
this just indicates what the URL would
be, if it did.
status -- Enumeration of type QueryStatus() indicating
the status of the query.
query_time -- Time (in seconds) required to perform query.
Default of None.
context -- String indicating any additional context
about the query. For example, if there was
an error, this might indicate the type of
error that occurred.
Default of None.
ids_data -- Extracted from website page info about other
usernames and inner ids.
Return Value:
Nothing.
"""
self.username = username
self.site_name = site_name
self.site_url_user = site_url_user
self.status = status
self.query_time = query_time
self.context = context
self.ids_data = ids_data
return
def __str__(self):
"""Convert Object To String.
Keyword Arguments:
self -- This object.
Return Value:
Nicely formatted string to get information about this object.
"""
status = str(self.status)
if self.context is not None:
# There is extra context information available about the results.
# Append it to the normal response text.
status += f" ({self.context})"
return status
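# Illustrative example:
#   str(QueryResult('user', 'Site', 'https://site.example/user', QueryStatus.UNKNOWN,
#                   context='Timeout Error'))
#   -> 'Unknown (Timeout Error)'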
+246
@@ -0,0 +1,246 @@
"""Sherlock Sites Information Module
This module supports storing information about web sites.
This is the raw data that will be used to search for usernames.
"""
import json
import operator
import os
import sys
import requests
class SiteInformation():
def __init__(self, name, url_home, url_username_format, popularity_rank,
username_claimed, username_unclaimed,
information):
"""Create Site Information Object.
Contains information about a specific web site.
Keyword Arguments:
self -- This object.
name -- String which identifies site.
url_home -- String containing URL for home of site.
url_username_format -- String containing URL for Username format
on site.
NOTE: The string should contain the
token "{}" where the username should
be substituted. For example, a string
of "https://somesite.com/users/{}"
indicates that the individual
usernames would show up under the
"https://somesite.com/users/" area of
the web site.
popularity_rank -- Integer indicating popularity of site.
In general, smaller numbers mean more
popular ("0" or None means ranking
information not available).
username_claimed -- String containing username which is known
to be claimed on web site.
username_unclaimed -- String containing username which is known
to be unclaimed on web site.
information -- Dictionary containing all known information
about web site.
NOTE: Custom information about how to
actually detect the existence of the
username will be included in this
dictionary. This information will
be needed by the detection method,
but it is only recorded in this
object for future use.
Return Value:
Nothing.
"""
self.name = name
self.url_home = url_home
self.url_username_format = url_username_format
if (popularity_rank is None) or (popularity_rank == 0):
# We do not know the popularity, so make site go to bottom of list.
popularity_rank = sys.maxsize
self.popularity_rank = popularity_rank
self.username_claimed = username_claimed
self.username_unclaimed = username_unclaimed
self.information = information
return
def __str__(self):
"""Convert Object To String.
Keyword Arguments:
self -- This object.
Return Value:
Nicely formatted string to get information about this object.
"""
return f"{self.name} ({self.url_home})"
class SitesInformation():
def __init__(self, data_file_path=None):
"""Create Sites Information Object.
Contains information about all supported web sites.
Keyword Arguments:
self -- This object.
data_file_path -- String which indicates path to data file.
The file name must end in ".json".
There are 3 possible formats:
* Absolute File Format
For example, "c:/stuff/data.json".
* Relative File Format
The current working directory is used
as the context.
For example, "data.json".
* URL Format
For example,
"https://example.com/data.json", or
"http://example.com/data.json".
An exception will be thrown if the path
to the data file is not in the expected
format, or if there was any problem loading
the file.
If this option is not specified, then a
default site list will be used.
Return Value:
Nothing.
"""
if data_file_path is None:
    # No file was specified: fall back to the default site list bundled
    # with the package (the same path maigret.py uses)
    data_file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                  "resources/data.json")
# Ensure that specified data file has correct extension.
if ".json" != data_file_path[-5:].lower():
raise FileNotFoundError(f"Incorrect JSON file extension for "
f"data file '{data_file_path}'."
)
if (("http://" == data_file_path[:7].lower()) or
("https://" == data_file_path[:8].lower())
):
# Reference is to a URL.
try:
response = requests.get(url=data_file_path)
except Exception as error:
raise FileNotFoundError(f"Problem while attempting to access "
f"data file URL '{data_file_path}': "
f"{str(error)}"
)
if response.status_code == 200:
    try:
        data = response.json()
        # keep the same structure as the local-file branch below
        site_data = data.get("sites")
        engines_data = data.get("engines")
except Exception as error:
raise ValueError(f"Problem parsing json contents at "
f"'{data_file_path}': {str(error)}."
)
else:
raise FileNotFoundError(f"Bad response while accessing "
f"data file URL '{data_file_path}'."
)
else:
# Reference is to a file.
try:
with open(data_file_path, "r", encoding="utf-8") as file:
try:
data = json.load(file)
site_data = data.get("sites")
engines_data = data.get("engines")
except Exception as error:
raise ValueError(f"Problem parsing json contents at "
f"'{data_file_path}': {str(error)}."
)
except FileNotFoundError as error:
raise FileNotFoundError(f"Problem while attempting to access "
f"data file '{data_file_path}'."
)
self.sites = {}
# Add all of site information from the json file to internal site list.
for site_name in site_data:
try:
site = site_data[site_name]
# If popularity unknown, make site be at bottom of list.
popularity_rank = site.get("rank", sys.maxsize)
if 'engine' in site:
engine_data = engines_data[site['engine']]['site']
site.update(engine_data)
self.sites[site_name] = \
SiteInformation(site_name,
site["urlMain"],
site["url"],
popularity_rank,
site["username_claimed"],
site["username_unclaimed"],
site
)
except KeyError as error:
raise ValueError(f"Problem parsing json contents at "
f"'{data_file_path}': "
f"Missing attribute {str(error)}."
)
return
def site_name_list(self, popularity_rank=False):
"""Get Site Name List.
Keyword Arguments:
self -- This object.
popularity_rank -- Boolean indicating if list should be sorted
by popularity rank.
Default value is False.
NOTE: List is sorted in ascending
alphabetical order if popularity rank
is not requested.
Return Value:
List of strings containing names of sites.
"""
if popularity_rank:
# Sort in ascending popularity rank order.
site_rank_name = \
sorted([(site.popularity_rank, site.name) for site in self],
key=operator.itemgetter(0)
)
site_names = [name for _, name in site_rank_name]
else:
# Sort in ascending alphabetical order.
site_names = sorted([site.name for site in self], key=str.lower)
return site_names
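# e.g. sites.site_name_list(popularity_rank=True)[:3] returns the three
# most popular site names (sites is an assumed SitesInformation instance)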
def __iter__(self):
"""Iterator For Object.
Keyword Arguments:
self -- This object.
Return Value:
Iterator for sites object.
"""
for site_name in self.sites:
yield self.sites[site_name]
def __len__(self):
"""Length For Object.
Keyword Arguments:
self -- This object.
Return Value:
Length of sites object.
"""
return len(self.sites)
+4
@@ -0,0 +1,4 @@
"""Sherlock Tests
This package contains various submodules used to run tests.
"""
+297
@@ -0,0 +1,297 @@
"""Sherlock Tests
This module contains various tests.
"""
from tests.base import SherlockBaseTest
import unittest
class SherlockDetectTests(SherlockBaseTest):
def test_detect_true_via_message(self):
"""Test Username Does Exist (Via Message).
This test ensures that the "message" detection mechanism of
ensuring that a Username does exist works properly.
Keyword Arguments:
self -- This object.
Return Value:
N/A.
Will trigger an assert if detection mechanism did not work as expected.
"""
site = 'Instructables'
site_data = self.site_data_all[site]
#Ensure that the site's detection method has not changed.
self.assertEqual("message", site_data["errorType"])
self.username_check([site_data["username_claimed"]],
[site],
exist_check=True
)
return
def test_detect_false_via_message(self):
"""Test Username Does Not Exist (Via Message).
This test ensures that the "message" detection mechanism of
ensuring that a Username does *not* exist works properly.
Keyword Arguments:
self -- This object.
Return Value:
N/A.
Will trigger an assert if detection mechanism did not work as expected.
"""
site = 'Instructables'
site_data = self.site_data_all[site]
#Ensure that the site's detection method has not changed.
self.assertEqual("message", site_data["errorType"])
self.username_check([site_data["username_unclaimed"]],
[site],
exist_check=False
)
return
def test_detect_true_via_status_code(self):
"""Test Username Does Exist (Via Status Code).
This test ensures that the "status code" detection mechanism of
ensuring that a Username does exist works properly.
Keyword Arguments:
self -- This object.
Return Value:
N/A.
Will trigger an assert if detection mechanism did not work as expected.
"""
site = 'Facebook'
site_data = self.site_data_all[site]
#Ensure that the site's detection method has not changed.
self.assertEqual("status_code", site_data["errorType"])
self.username_check([site_data["username_claimed"]],
[site],
exist_check=True
)
return
def test_detect_false_via_status_code(self):
"""Test Username Does Not Exist (Via Status Code).
This test ensures that the "status code" detection mechanism of
ensuring that a Username does *not* exist works properly.
Keyword Arguments:
self -- This object.
Return Value:
N/A.
Will trigger an assert if detection mechanism did not work as expected.
"""
site = 'Facebook'
site_data = self.site_data_all[site]
#Ensure that the site's detection method has not changed.
self.assertEqual("status_code", site_data["errorType"])
self.username_check([site_data["username_unclaimed"]],
[site],
exist_check=False
)
return
def test_detect_true_via_response_url(self):
"""Test Username Does Exist (Via Response URL).
This test ensures that the "response URL" detection mechanism of
ensuring that a Username does exist works properly.
Keyword Arguments:
self -- This object.
Return Value:
N/A.
Will trigger an assert if detection mechanism did not work as expected.
"""
site = 'Quora'
site_data = self.site_data_all[site]
#Ensure that the site's detection method has not changed.
self.assertEqual("response_url", site_data["errorType"])
self.username_check([site_data["username_claimed"]],
[site],
exist_check=True
)
return
def test_detect_false_via_response_url(self):
"""Test Username Does Not Exist (Via Response URL).
This test ensures that the "response URL" detection mechanism of
ensuring that a Username does *not* exist works properly.
Keyword Arguments:
self -- This object.
Return Value:
N/A.
Will trigger an assert if detection mechanism did not work as expected.
"""
site = 'Quora'
site_data = self.site_data_all[site]
#Ensure that the site's detection method has not changed.
self.assertEqual("response_url", site_data["errorType"])
self.username_check([site_data["username_unclaimed"]],
[site],
exist_check=False
)
return
class SherlockSiteCoverageTests(SherlockBaseTest):
def test_coverage_false_via_response_url(self):
"""Test Username Does Not Exist Site Coverage (Via Response URL).
This test checks all sites with the "response URL" detection mechanism
to ensure that a Username that does not exist is reported that way.
Keyword Arguments:
self -- This object.
Return Value:
N/A.
Will trigger an assert if detection mechanism did not work as expected.
"""
self.detect_type_check("response_url", exist_check=False)
return
def test_coverage_true_via_response_url(self):
"""Test Username Does Exist Site Coverage (Via Response URL).
This test checks all sites with the "response URL" detection mechanism
to ensure that a Username that does exist is reported that way.
Keyword Arguments:
self -- This object.
Return Value:
N/A.
Will trigger an assert if detection mechanism did not work as expected.
"""
self.detect_type_check("response_url", exist_check=True)
return
def test_coverage_false_via_status(self):
"""Test Username Does Not Exist Site Coverage (Via HTTP Status).
This test checks all sites with the "HTTP Status" detection mechanism
to ensure that a Username that does not exist is reported that way.
Keyword Arguments:
self -- This object.
Return Value:
N/A.
Will trigger an assert if detection mechanism did not work as expected.
"""
self.detect_type_check("status_code", exist_check=False)
return
def test_coverage_true_via_status(self):
"""Test Username Does Exist Site Coverage (Via HTTP Status).
This test checks all sites with the "HTTP Status" detection mechanism
to ensure that a Username that does exist is reported that way.
Keyword Arguments:
self -- This object.
Return Value:
N/A.
Will trigger an assert if detection mechanism did not work as expected.
"""
self.detect_type_check("status_code", exist_check=True)
return
def test_coverage_false_via_message(self):
"""Test Username Does Not Exist Site Coverage (Via Error Message).
This test checks all sites with the "Error Message" detection mechanism
to ensure that a Username that does not exist is reported that way.
Keyword Arguments:
self -- This object.
Return Value:
N/A.
Will trigger an assert if detection mechanism did not work as expected.
"""
self.detect_type_check("message", exist_check=False)
return
def test_coverage_true_via_message(self):
"""Test Username Does Exist Site Coverage (Via Error Message).
This test checks all sites with the "Error Message" detection mechanism
to ensure that a Username that does exist is reported that way.
Keyword Arguments:
self -- This object.
Return Value:
N/A.
Will trigger an assert if detection mechanism did not work as expected.
"""
self.detect_type_check("message", exist_check=True)
return
def test_coverage_total(self):
"""Test Site Coverage Is Total.
This test checks that all sites have test data available.
Keyword Arguments:
self -- This object.
Return Value:
N/A.
Will trigger an assert if we do not have total coverage.
"""
self.coverage_total_check()
return
+228
@@ -0,0 +1,228 @@
"""Sherlock Base Tests
This module contains various utilities for running tests.
"""
import asyncio
import logging
import os
import os.path
import unittest
import maigret
from result import QueryStatus
from result import QueryResult
from notify import QueryNotify
from sites import SitesInformation
import warnings
class SherlockBaseTest(unittest.TestCase):
def setUp(self):
"""Sherlock Base Test Setup.
Does common setup tasks for base Sherlock tests.
Keyword Arguments:
self -- This object.
Return Value:
N/A.
"""
#This ignores the ResourceWarning from an unclosed SSLSocket.
#TODO: Figure out how to fix the code so this is not needed.
warnings.simplefilter("ignore", ResourceWarning)
#Create object with all information about sites we are aware of.
sites = SitesInformation()
#Create original dictionary from SitesInformation() object.
#Eventually, the rest of the code will be updated to use the new object
#directly, but this will glue the two pieces together.
site_data_all = {}
for site in sites:
site_data_all[site.name] = site.information
self.site_data_all = site_data_all
# Load excluded sites list, if any
excluded_sites_path = os.path.join(os.path.dirname(os.path.realpath(maigret.__file__)), "tests/.excluded_sites")
try:
with open(excluded_sites_path, "r", encoding="utf-8") as excluded_sites_file:
self.excluded_sites = excluded_sites_file.read().splitlines()
except FileNotFoundError:
self.excluded_sites = []
#Create notify object for query results.
self.query_notify = QueryNotify()
self.tor = False
self.unique_tor = False
self.timeout = None
self.skip_error_sites = True
return
def site_data_filter(self, site_list):
"""Filter Site Data.
Keyword Arguments:
self -- This object.
site_list -- List of strings corresponding to sites which
should be filtered.
Return Value:
Dictionary containing sub-set of site data specified by 'site_list'.
"""
# Create new dictionary that has filtered site data based on input.
# Note that any site specified which is not understood will generate
# an error.
site_data = {}
for site in site_list:
with self.subTest(f"Checking test vector Site '{site}' "
f"exists in total site data."
):
site_data[site] = self.site_data_all[site]
return site_data
def username_check(self, username_list, site_list, exist_check=True):
"""Username Exist Check.
Keyword Arguments:
self -- This object.
username_list -- List of strings corresponding to usernames
which should exist on *all* of the sites.
site_list -- List of strings corresponding to sites which
should be filtered.
exist_check -- Boolean which indicates if this should be
a check for Username existence,
or non-existence.
Return Value:
N/A.
Will trigger an assert if Username does not have the expected
existence state.
"""
#Filter all site data down to just what is needed for this test.
site_data = self.site_data_filter(site_list)
if exist_check:
check_type_text = "claimed"
exist_result_desired = QueryStatus.CLAIMED
else:
check_type_text = "available"
exist_result_desired = QueryStatus.AVAILABLE
for username in username_list:
# this fork exposes the async maigret() coroutine instead of
# Sherlock's sherlock(); Tor options are not supported here
results = asyncio.run(maigret.maigret(username,
                                      site_data,
                                      self.query_notify,
                                      logging.getLogger('tests'),
                                      timeout=self.timeout
                                      ))
for site, result in results.items():
with self.subTest(f"Checking Username '{username}' "
f"{check_type_text} on Site '{site}'"
):
if (
(self.skip_error_sites == True) and
(result['status'].status == QueryStatus.UNKNOWN)
):
#Some error connecting to site.
self.skipTest(f"Skipping Username '{username}' "
f"{check_type_text} on Site '{site}': "
f"Site returned error status."
)
self.assertEqual(exist_result_desired,
result['status'].status)
return
def detect_type_check(self, detect_type, exist_check=True):
"""Username Exist Check.
Keyword Arguments:
self -- This object.
detect_type -- String corresponding to detection algorithm
which is desired to be tested.
Note that only sites which have documented
usernames which exist and do not exist
will be tested.
exist_check -- Boolean which indicates if this should be
a check for Username existence,
or non-existence.
Return Value:
N/A.
Runs tests on all sites using the indicated detection algorithm
and which also has test vectors specified.
Will trigger an assert if Username does not have the expected
existence state.
"""
#Dictionary of sites that should be tested for having a username.
#This will allow us to test sites with a common username in parallel.
sites_by_username = {}
for site, site_data in self.site_data_all.items():
if (
(site in self.excluded_sites) or
(site_data["errorType"] != detect_type) or
(site_data.get("username_claimed") is None) or
(site_data.get("username_unclaimed") is None)
):
# This is either not a site we are interested in, or the
# site does not contain the required information to do
# the tests.
pass
else:
                # We should run a test on this site.
                # Figure out which username to test with (claimed or unclaimed).
if exist_check:
username = site_data.get("username_claimed")
else:
username = site_data.get("username_unclaimed")
# Add this site to the list of sites corresponding to this
# username.
if username in sites_by_username:
sites_by_username[username].append(site)
else:
sites_by_username[username] = [site]
# Check on the username availability against all of the sites.
for username, site_list in sites_by_username.items():
self.username_check([username],
site_list,
exist_check=exist_check
)
return
def coverage_total_check(self):
"""Total Coverage Check.
Keyword Arguments:
self -- This object.
Return Value:
N/A.
        Checks that all Sites have full test data available.
Will trigger an assert if any Site does not have test coverage.
"""
site_no_tests_list = []
for site, site_data in self.site_data_all.items():
if (
(site_data.get("username_claimed") is None) or
(site_data.get("username_unclaimed") is None)
):
# Test information not available on this site.
site_no_tests_list.append(site)
self.assertEqual("", ", ".join(site_no_tests_list))
return
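# A minimal wiring sketch (hypothetical: the enclosing TestCase class is
# defined earlier in this file and its name is not shown here, so 'BaseTest'
# below is only a placeholder):
#
#   class DetectTypeTests(BaseTest):
#       def test_status_code_claimed(self):
#           self.detect_type_check("status_code", exist_check=True)
#
#       def test_coverage(self):
#           self.coverage_total_check()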
+14
View File
@@ -0,0 +1,14 @@
beautifulsoup4>=4.8.0
bs4>=0.0.1
certifi>=2019.6.16
colorama>=0.4.1
lxml>=4.4.0
PySocks>=1.7.0
requests>=2.22.0
requests-futures>=1.0.0
soupsieve>=1.9.2
stem>=1.8.0
torrequest>=0.1.0
git+https://github.com/soxoj/socid_extractor
aiohttp==3.5.4
mock==4.0.2
+1383
View File
File diff suppressed because it is too large
Binary file not shown (image, 15 KiB)
+90
View File
@@ -0,0 +1,90 @@
## Demo with page parsing and recursive username search
```bash
python3 maigret.py --ids --print-found --skip-errors alexaimephotographycars
[*] Checking username alexaimephotographycars on:
[+] 500px: https://500px.com/p/alexaimephotographycars
┣╸uid: dXJpOm5vZGU6VXNlcjoyNjQwMzQxNQ==
┣╸legacy_id: 26403415
┣╸username: alexaimephotographycars
┣╸name: Alex Aimé
┣╸website: www.flickr.com/photos/alexaimephotography/
┣╸facebook_link: www.instagram.com/street.reality.photography/
┣╸instagram_username: alexaimephotography
┗╸twitter_username: Alexaimephotogr
[*] Checking username alexaimephotography on:
[+] DeviantART: https://alexaimephotography.deviantart.com
┣╸country: France
┣╸registered_for_seconds: 55040868
┣╸gender: male
┣╸username: Alexaimephotography
┣╸twitter_username: alexaimephotogr
┣╸website: www.instagram.com/alexaimephotography/
┗╸links:
┗╸ https://www.instagram.com/alexaimephotography/
[+] EyeEm: https://www.eyeem.com/u/alexaimephotography
┣╸eyeem_id: 21974802
┣╸eyeem_username: alexaimephotography
┣╸fullname: Alex
┣╸followers: 10
┣╸friends: 2
┣╸liked_photos: 37
┣╸photos: 10
┗╸facebook_uid: 1534915183474093
[+] Facebook: https://www.facebook.com/alexaimephotography
[+] Gramho: https://gramho.com/explore-hashtag/alexaimephotography
[+] Instagram: https://www.instagram.com/alexaimephotography
┣╸username: alexaimephotography
┣╸full_name: Alexaimephotography
┣╸id: 6828488620
┣╸biography: 🇮🇹 🇲🇫 🇩🇪
Amateur photographer
Follow me @street.reality.photography
Sony A7ii
┗╸external_url: https://www.flickr.com/photos/alexaimephotography2020/
[+] Picuki: https://www.picuki.com/profile/alexaimephotography
[+] Pinterest: https://www.pinterest.com/alexaimephotography/
┣╸pinterest_username: alexaimephotography
┣╸fullname: alexaimephotography
┣╸image: https://s.pinimg.com/images/user/default_280.png
┣╸board_count: 3
┣╸pin_count: 4
┣╸country: FR
┣╸follower_count: 0
┣╸following_count: 1
┣╸is_website_verified: False
┣╸is_indexed: True
┣╸is_verified_merchant: False
┗╸locale: fr
[+] Reddit: https://www.reddit.com/user/alexaimephotography
┣╸reddit_id: t5_1nytpy
┣╸reddit_username: alexaimephotography
┣╸display_name: alexaimephotography
┣╸is_employee: False
┣╸is_nsfw: False
┣╸is_mod: True
┣╸is_following: True
┣╸has_user_profile: True
┣╸hide_from_robots: False
┣╸created_utc: 1562750403
┣╸total_karma: 43075
┗╸post_karma: 42574
[+] Tumblr: https://alexaimephotography.tumblr.com/
[+] VK: https://vk.com/alexaimephotography
[+] Vimeo: https://vimeo.com/alexaimephotography
┣╸uid: 75857717
┣╸name: AlexAimePhotography
┣╸username: alexaimephotography
┣╸location: France
┣╸created_at: 2017-12-06 06:49:28
┣╸is_staff: False
┗╸links:
┣╸ https://500px.com/alexaimephotography
┣╸ https://www.flickr.com/photos/photoambiance/
┣╸ https://www.instagram.com/alexaimephotography/
┣╸ https://www.youtube.com/channel/UC4NiYV3Yqih2WHcwKg4uPuQ
┗╸ https://flii.by/alexaimephotography/
[+] We Heart It: https://weheartit.com/alexaimephotography
[*] Checking username Alexaimephotogr on:
[+] Twitter: https://twitter.com/Alexaimephotogr
```
File diff suppressed because one or more lines are too long
Binary file not shown (image, 44 KiB)
+126
View File
@@ -0,0 +1,126 @@
#!/usr/bin/env python3
"""Maigret: Supported Site Listing with Alexa ranking and country tags
This module generates the listing of supported sites in the file `sites.md`
and pretty-prints the JSON file with sites data.
"""
import json
import sys
import requests
import logging
import threading
import xml.etree.ElementTree as ET
from datetime import datetime
from argparse import ArgumentParser, RawDescriptionHelpFormatter
RANKS = {str(i): str(i) for i in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 50, 100, 500]}
# Human-readable labels for the larger rank buckets.
RANKS.update({
    '1000': '1K',
    '5000': '5K',
    '10000': '10K',
    '100000': '100K',
    '10000000': '10M',
    '50000000': '50M',
})
def get_rank(domain_to_query, dest, print_errors=True):
    # Retrieve ranking data via the Alexa API.
url = f"http://data.alexa.com/data?cli=10&url={domain_to_query}"
xml_data = requests.get(url).text
root = ET.fromstring(xml_data)
try:
        # Get ranking for this site.
dest['rank'] = int(root.find('.//REACH').attrib['RANK'])
country = root.find('.//COUNTRY')
        if country is not None and country.attrib:
country_code = country.attrib['CODE']
tags = set(dest.get('tags', []))
if country_code:
tags.add(country_code.lower())
dest['tags'] = sorted(list(tags))
if 'type' in dest and dest['type'] != 'username':
dest['disabled'] = False
except Exception as e:
if print_errors:
logging.error(e)
# We did not find the rank for some reason.
print(f"Error retrieving rank information for '{domain_to_query}'")
print(f" Returned XML is |{xml_data}|")
return
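# A sketch of the XML shape this parser expects from the data.alexa.com
# endpoint (illustrative values only; only the REACH RANK attribute and the
# COUNTRY CODE attribute are read above):
#
#   <ALEXA>
#     <SD>
#       <REACH RANK="12345"/>
#       <COUNTRY CODE="US" NAME="United States" RANK="6789"/>
#     </SD>
#   </ALEXA>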
def get_step_rank(rank):
def get_readable_rank(r):
return RANKS[str(r)]
valid_step_ranks = sorted(map(int, RANKS.keys()))
    if rank == 0 or rank > valid_step_ranks[-1]:
        # No rank found, or rank beyond the largest step: use the largest bucket.
        return get_readable_rank(valid_step_ranks[-1])
    else:
        return get_readable_rank(list(filter(lambda x: x >= rank, valid_step_ranks))[0])
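# For example, with the RANKS steps above: get_step_rank(3) returns '3',
# get_step_rank(740) returns '1K' (the smallest step >= 740), and
# get_step_rank(0) falls back to the largest bucket for unranked sites.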
if __name__ == '__main__':
    parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter)
    parser.add_argument("--base", "-b", metavar="BASE_FILE",
                        dest="base_file", default="maigret/resources/data.json",
                        help="JSON file with sites data to update.")
    pool = []
args = parser.parse_args()
with open(args.base_file, "r", encoding="utf-8") as data_file:
sites_info = json.load(data_file)
data = sites_info['sites']
engines = sites_info['engines']
with open("sites.md", "w") as site_file:
data_length = len(data)
site_file.write(f"""
## List of supported sites: total {data_length}\n
Rank data fetched from Alexa by domain.
""")
for social_network in data:
url_main = data.get(social_network).get("urlMain")
data.get(social_network)["rank"] = 0
th = threading.Thread(target=get_rank, args=(url_main, data.get(social_network)))
pool.append((social_network, url_main, th))
th.start()
index = 1
for social_network, url_main, th in pool:
th.join()
            sys.stdout.write(f"\rUpdated {index} out of {data_length} entries")
            sys.stdout.flush()
            index += 1
sites_full_list = [(site, site_data['rank']) for site, site_data in data.items()]
        # Sort ascending by rank, keeping unranked (rank 0) sites at the end.
        sites_full_list.sort(key=lambda x: (x[1] == 0, x[1]))
        for site, rank in sites_full_list:
url_main = data[site]['urlMain']
valid_rank = get_step_rank(rank)
all_tags = data[site].get('tags', [])
tags = ', ' + ', '.join(all_tags) if all_tags else ''
note = ''
if data[site].get('disabled'):
note = ', search is disabled'
site_file.write(f'1. [{site}]({url_main})*: top {valid_rank}{tags}*{note}\n')
site_file.write(f'\nAlexa.com rank data fetched at ({datetime.utcnow()} UTC)\n')
sorted_json_data = json.dumps({'sites': data, 'engines': engines}, indent=2, sort_keys=True)
with open(args.base_file, "w") as data_file:
data_file.write(sorted_json_data)
print("\nFinished updating supported site listing!")