mirror of
https://github.com/soxoj/maigret.git
synced 2026-05-09 08:04:32 +00:00
Compare commits
33 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 9b7f36dc24 | |||
| 05167ad30c | |||
| cee6f0aa43 | |||
| 02cf330e37 | |||
| 5c8f7a3af0 | |||
| 13e1b6f4d1 | |||
| 5179cb56eb | |||
| 1a2c7e944a | |||
| f7eae046a1 | |||
| bdff08cb70 | |||
| a468cb1cd3 | |||
| 0fe933e8a1 | |||
| 5c3de91181 | |||
| 3356463102 | |||
| 7ac03cf5ca | |||
| 4aeacef07d | |||
| 8de1830cf3 | |||
| ba6169659e | |||
| 4a5c5c3f07 | |||
| 4ba7fcb1ff | |||
| a76f95858f | |||
| bea900dda0 | |||
| bb1bde833d | |||
| 5b405c6abb | |||
| 99fa58ceed | |||
| c71e404f63 | |||
| 2c04ccce57 | |||
| 435db7cdc9 | |||
| 413a0502a4 | |||
| 2aedcc3166 | |||
| 28835204f5 | |||
| b11a247dfd | |||
| c9219d91ec |
@@ -26,7 +26,7 @@ jobs:
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
python -m pip install flake8 pytest pytest-rerunfailures
|
||||
python -m pip install -r test-requirements.txt
|
||||
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
|
||||
- name: Test with pytest
|
||||
run: |
|
||||
|
||||
@@ -2,6 +2,16 @@
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [0.3.0] - 2021-06-02
|
||||
* added support of Tor and I2P sites
|
||||
* added experimental DNS checking feature
|
||||
* implemented sorting by data points for reports
|
||||
* reports fixes
|
||||
|
||||
## [0.2.4] - 2021-05-18
|
||||
* cli output report
|
||||
* various improvements
|
||||
|
||||
## [0.2.3] - 2021-05-12
|
||||
* added Yelp and yelp_userid support
|
||||
* tags markup stabilization
|
||||
|
||||
@@ -8,12 +8,6 @@
|
||||
<a href="https://pypi.org/project/maigret/">
|
||||
<img alt="PyPI - Downloads" src="https://img.shields.io/pypi/dw/maigret?style=flat-square">
|
||||
</a>
|
||||
<a href="https://gitter.im/maigret-osint/community">
|
||||
<img alt="Chat - Gitter" src="./static/chat_gitter.svg" />
|
||||
</a>
|
||||
<a href="https://twitter.com/intent/follow?screen_name=sox0j">
|
||||
<img src="https://img.shields.io/twitter/follow/sox0j?label=Follow%20sox0j&style=social&color=blue" alt="Follow @sox0j" />
|
||||
</a>
|
||||
</p>
|
||||
<p align="center">
|
||||
<img src="./static/maigret.png" height="200"/>
|
||||
@@ -24,9 +18,9 @@
|
||||
|
||||
## About
|
||||
|
||||
**Maigret** collect a dossier on a person **by username only**, checking for accounts on a huge number of sites and gathering all the available information from web pages. Maigret is an easy-to-use and powerful fork of [Sherlock](https://github.com/sherlock-project/sherlock).
|
||||
**Maigret** collect a dossier on a person **by username only**, checking for accounts on a huge number of sites and gathering all the available information from web pages. No API keys required. Maigret is an easy-to-use and powerful fork of [Sherlock](https://github.com/sherlock-project/sherlock).
|
||||
|
||||
Currently supported more than 2000 sites ([full list](./sites.md)), search is launched against 500 popular sites in descending order of popularity by default.
|
||||
Currently supported more than 2000 sites ([full list](./sites.md)), search is launched against 500 popular sites in descending order of popularity by default. Also supported checking of Tor sites, I2P sites, and domains (via DNS resolving).
|
||||
|
||||
## Main features
|
||||
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
"""Maigret version file"""
|
||||
|
||||
__version__ = '0.2.3'
|
||||
__version__ = '0.3.0'
|
||||
|
||||
@@ -35,7 +35,7 @@ class ParsingActivator:
|
||||
site.headers["authorization"] = f"Bearer {bearer_token}"
|
||||
|
||||
|
||||
async def import_aiohttp_cookies(cookiestxt_filename):
|
||||
def import_aiohttp_cookies(cookiestxt_filename):
|
||||
cookies_obj = MozillaCookieJar(cookiestxt_filename)
|
||||
cookies_obj.load(ignore_discard=True, ignore_expires=True)
|
||||
|
||||
|
||||
+208
-66
@@ -9,6 +9,7 @@ from typing import Tuple, Optional, Dict, List
|
||||
from urllib.parse import quote
|
||||
|
||||
import aiohttp
|
||||
import aiodns
|
||||
import tqdm.asyncio
|
||||
from aiohttp_socks import ProxyConnector
|
||||
from python_socks import _errors as proxy_errors
|
||||
@@ -26,7 +27,7 @@ from .executors import (
|
||||
from .result import QueryResult, QueryStatus
|
||||
from .sites import MaigretDatabase, MaigretSite
|
||||
from .types import QueryOptions, QueryResultWrapper
|
||||
from .utils import get_random_user_agent
|
||||
from .utils import get_random_user_agent, ascii_data_display
|
||||
|
||||
|
||||
SUPPORTED_IDS = (
|
||||
@@ -43,49 +44,142 @@ SUPPORTED_IDS = (
|
||||
BAD_CHARS = "#"
|
||||
|
||||
|
||||
async def get_response(request_future, logger) -> Tuple[str, int, Optional[CheckError]]:
|
||||
html_text = None
|
||||
status_code = 0
|
||||
error: Optional[CheckError] = CheckError("Unknown")
|
||||
class CheckerBase:
|
||||
pass
|
||||
|
||||
try:
|
||||
response = await request_future
|
||||
|
||||
status_code = response.status
|
||||
response_content = await response.content.read()
|
||||
charset = response.charset or "utf-8"
|
||||
decoded_content = response_content.decode(charset, "ignore")
|
||||
html_text = decoded_content
|
||||
class SimpleAiohttpChecker(CheckerBase):
|
||||
def __init__(self, *args, **kwargs):
|
||||
proxy = kwargs.get('proxy')
|
||||
cookie_jar = kwargs.get('cookie_jar')
|
||||
self.logger = kwargs.get('logger', Mock())
|
||||
|
||||
error = None
|
||||
if status_code == 0:
|
||||
error = CheckError("Connection lost")
|
||||
# make http client session
|
||||
connector = (
|
||||
ProxyConnector.from_url(proxy) if proxy else aiohttp.TCPConnector(ssl=False)
|
||||
)
|
||||
connector.verify_ssl = False
|
||||
self.session = aiohttp.ClientSession(
|
||||
connector=connector, trust_env=True, cookie_jar=cookie_jar
|
||||
)
|
||||
|
||||
logger.debug(html_text)
|
||||
|
||||
except asyncio.TimeoutError as e:
|
||||
error = CheckError("Request timeout", str(e))
|
||||
except ClientConnectorError as e:
|
||||
error = CheckError("Connecting failure", str(e))
|
||||
except ServerDisconnectedError as e:
|
||||
error = CheckError("Server disconnected", str(e))
|
||||
except aiohttp.http_exceptions.BadHttpMessage as e:
|
||||
error = CheckError("HTTP", str(e))
|
||||
except proxy_errors.ProxyError as e:
|
||||
error = CheckError("Proxy", str(e))
|
||||
except KeyboardInterrupt:
|
||||
error = CheckError("Interrupted")
|
||||
except Exception as e:
|
||||
# python-specific exceptions
|
||||
if sys.version_info.minor > 6 and (
|
||||
isinstance(e, ssl.SSLCertVerificationError) or isinstance(e, ssl.SSLError)
|
||||
):
|
||||
error = CheckError("SSL", str(e))
|
||||
def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get'):
|
||||
if method == 'get':
|
||||
request_method = self.session.get
|
||||
else:
|
||||
logger.debug(e, exc_info=True)
|
||||
error = CheckError("Unexpected", str(e))
|
||||
request_method = self.session.head
|
||||
|
||||
return str(html_text), status_code, error
|
||||
future = request_method(
|
||||
url=url,
|
||||
headers=headers,
|
||||
allow_redirects=allow_redirects,
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
return future
|
||||
|
||||
async def close(self):
|
||||
await self.session.close()
|
||||
|
||||
async def check(self, future) -> Tuple[str, int, Optional[CheckError]]:
|
||||
html_text = None
|
||||
status_code = 0
|
||||
error: Optional[CheckError] = CheckError("Unknown")
|
||||
|
||||
try:
|
||||
response = await future
|
||||
|
||||
status_code = response.status
|
||||
response_content = await response.content.read()
|
||||
charset = response.charset or "utf-8"
|
||||
decoded_content = response_content.decode(charset, "ignore")
|
||||
html_text = decoded_content
|
||||
|
||||
error = None
|
||||
if status_code == 0:
|
||||
error = CheckError("Connection lost")
|
||||
|
||||
self.logger.debug(html_text)
|
||||
|
||||
except asyncio.TimeoutError as e:
|
||||
error = CheckError("Request timeout", str(e))
|
||||
except ClientConnectorError as e:
|
||||
error = CheckError("Connecting failure", str(e))
|
||||
except ServerDisconnectedError as e:
|
||||
error = CheckError("Server disconnected", str(e))
|
||||
except aiohttp.http_exceptions.BadHttpMessage as e:
|
||||
error = CheckError("HTTP", str(e))
|
||||
except proxy_errors.ProxyError as e:
|
||||
error = CheckError("Proxy", str(e))
|
||||
except KeyboardInterrupt:
|
||||
error = CheckError("Interrupted")
|
||||
except Exception as e:
|
||||
# python-specific exceptions
|
||||
if sys.version_info.minor > 6 and (
|
||||
isinstance(e, ssl.SSLCertVerificationError)
|
||||
or isinstance(e, ssl.SSLError)
|
||||
):
|
||||
error = CheckError("SSL", str(e))
|
||||
else:
|
||||
self.logger.debug(e, exc_info=True)
|
||||
error = CheckError("Unexpected", str(e))
|
||||
|
||||
return str(html_text), status_code, error
|
||||
|
||||
|
||||
class ProxiedAiohttpChecker(SimpleAiohttpChecker):
|
||||
def __init__(self, *args, **kwargs):
|
||||
proxy = kwargs.get('proxy')
|
||||
cookie_jar = kwargs.get('cookie_jar')
|
||||
self.logger = kwargs.get('logger', Mock())
|
||||
|
||||
connector = ProxyConnector.from_url(proxy)
|
||||
connector.verify_ssl = False
|
||||
self.session = aiohttp.ClientSession(
|
||||
connector=connector, trust_env=True, cookie_jar=cookie_jar
|
||||
)
|
||||
|
||||
|
||||
class AiodnsDomainResolver(CheckerBase):
|
||||
def __init__(self, *args, **kwargs):
|
||||
loop = asyncio.get_event_loop()
|
||||
self.logger = kwargs.get('logger', Mock())
|
||||
self.resolver = aiodns.DNSResolver(loop=loop)
|
||||
|
||||
def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get'):
|
||||
return self.resolver.query(url, 'A')
|
||||
|
||||
async def check(self, future) -> Tuple[str, int, Optional[CheckError]]:
|
||||
status = 404
|
||||
error = None
|
||||
text = ''
|
||||
|
||||
try:
|
||||
res = await future
|
||||
text = str(res[0].host)
|
||||
status = 200
|
||||
except aiodns.error.DNSError:
|
||||
pass
|
||||
except Exception as e:
|
||||
self.logger.error(e, exc_info=True)
|
||||
error = CheckError('DNS resolve error', str(e))
|
||||
|
||||
return text, status, error
|
||||
|
||||
|
||||
class CheckerMock:
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get'):
|
||||
return None
|
||||
|
||||
async def check(self, future) -> Tuple[str, int, Optional[CheckError]]:
|
||||
await asyncio.sleep(0)
|
||||
return '', 0, None
|
||||
|
||||
async def close(self):
|
||||
return
|
||||
|
||||
|
||||
# TODO: move to separate class
|
||||
@@ -233,9 +327,9 @@ def process_site_result(
|
||||
result = build_result(QueryStatus.CLAIMED)
|
||||
else:
|
||||
result = build_result(QueryStatus.AVAILABLE)
|
||||
elif check_type == "status_code":
|
||||
elif check_type in "status_code":
|
||||
# Checks if the status code of the response is 2XX
|
||||
if is_presense_detected and (not status_code >= 300 or status_code < 200):
|
||||
if 200 <= status_code < 300:
|
||||
result = build_result(QueryStatus.CLAIMED)
|
||||
else:
|
||||
result = build_result(QueryStatus.AVAILABLE)
|
||||
@@ -272,7 +366,7 @@ def process_site_result(
|
||||
new_usernames[v] = k
|
||||
|
||||
results_info["ids_usernames"] = new_usernames
|
||||
links = eval(extracted_ids_data.get("links", "[]"))
|
||||
links = ascii_data_display(extracted_ids_data.get("links", "[]"))
|
||||
if "website" in extracted_ids_data:
|
||||
links.append(extracted_ids_data["website"])
|
||||
results_info["ids_links"] = links
|
||||
@@ -322,7 +416,8 @@ def make_site_result(
|
||||
# workaround to prevent slash errors
|
||||
url = re.sub("(?<!:)/+", "/", url)
|
||||
|
||||
session = options['session']
|
||||
# always clearweb_checker for now
|
||||
checker = options["checkers"][site.protocol]
|
||||
|
||||
# site check is disabled
|
||||
if site.disabled and not options['forced']:
|
||||
@@ -381,12 +476,12 @@ def make_site_result(
|
||||
# In most cases when we are detecting by status code,
|
||||
# it is not necessary to get the entire body: we can
|
||||
# detect fine with just the HEAD response.
|
||||
request_method = session.head
|
||||
request_method = 'head'
|
||||
else:
|
||||
# Either this detect method needs the content associated
|
||||
# with the GET response, or this specific website will
|
||||
# not respond properly unless we request the whole page.
|
||||
request_method = session.get
|
||||
request_method = 'get'
|
||||
|
||||
if site.check_type == "response_url":
|
||||
# Site forwards request to a different URL if username not
|
||||
@@ -398,7 +493,8 @@ def make_site_result(
|
||||
# The final result of the request will be what is available.
|
||||
allow_redirects = True
|
||||
|
||||
future = request_method(
|
||||
future = checker.prepare(
|
||||
method=request_method,
|
||||
url=url_probe,
|
||||
headers=headers,
|
||||
allow_redirects=allow_redirects,
|
||||
@@ -407,6 +503,7 @@ def make_site_result(
|
||||
|
||||
# Store future request object in the results object
|
||||
results_site["future"] = future
|
||||
results_site["checker"] = checker
|
||||
|
||||
return results_site
|
||||
|
||||
@@ -419,7 +516,9 @@ async def check_site_for_username(
|
||||
if not future:
|
||||
return site.name, default_result
|
||||
|
||||
response = await get_response(request_future=future, logger=logger)
|
||||
checker = default_result["checker"]
|
||||
|
||||
response = await checker.check(future=future)
|
||||
|
||||
response_result = process_site_result(
|
||||
response, query_notify, logger, default_result, site
|
||||
@@ -430,9 +529,9 @@ async def check_site_for_username(
|
||||
return site.name, response_result
|
||||
|
||||
|
||||
async def debug_ip_request(session, logger):
|
||||
future = session.get(url="https://icanhazip.com")
|
||||
ip, status, check_error = await get_response(future, logger)
|
||||
async def debug_ip_request(checker, logger):
|
||||
future = checker.prepare(url="https://icanhazip.com")
|
||||
ip, status, check_error = await checker.check(future)
|
||||
if ip:
|
||||
logger.debug(f"My IP is: {ip.strip()}")
|
||||
else:
|
||||
@@ -456,7 +555,9 @@ async def maigret(
|
||||
logger,
|
||||
query_notify=None,
|
||||
proxy=None,
|
||||
timeout=None,
|
||||
tor_proxy=None,
|
||||
i2p_proxy=None,
|
||||
timeout=3,
|
||||
is_parsing_enabled=False,
|
||||
id_type="username",
|
||||
debug=False,
|
||||
@@ -465,6 +566,7 @@ async def maigret(
|
||||
no_progressbar=False,
|
||||
cookies=None,
|
||||
retries=0,
|
||||
check_domains=False,
|
||||
) -> QueryResultWrapper:
|
||||
"""Main search func
|
||||
|
||||
@@ -478,7 +580,7 @@ async def maigret(
|
||||
query results.
|
||||
logger -- Standard Python logger object.
|
||||
timeout -- Time in seconds to wait before timing out request.
|
||||
Default is no timeout.
|
||||
Default is 3 seconds.
|
||||
is_parsing_enabled -- Extract additional info from account pages.
|
||||
id_type -- Type of username to search.
|
||||
Default is 'username', see all supported here:
|
||||
@@ -508,23 +610,36 @@ async def maigret(
|
||||
|
||||
query_notify.start(username, id_type)
|
||||
|
||||
# make http client session
|
||||
connector = (
|
||||
ProxyConnector.from_url(proxy) if proxy else aiohttp.TCPConnector(ssl=False)
|
||||
)
|
||||
connector.verify_ssl = False
|
||||
|
||||
cookie_jar = None
|
||||
if cookies:
|
||||
logger.debug(f"Using cookies jar file {cookies}")
|
||||
cookie_jar = await import_aiohttp_cookies(cookies)
|
||||
cookie_jar = import_aiohttp_cookies(cookies)
|
||||
|
||||
session = aiohttp.ClientSession(
|
||||
connector=connector, trust_env=True, cookie_jar=cookie_jar
|
||||
clearweb_checker = SimpleAiohttpChecker(
|
||||
proxy=proxy, cookie_jar=cookie_jar, logger=logger
|
||||
)
|
||||
|
||||
# TODO
|
||||
tor_checker = CheckerMock()
|
||||
if tor_proxy:
|
||||
tor_checker = ProxiedAiohttpChecker( # type: ignore
|
||||
proxy=tor_proxy, cookie_jar=cookie_jar, logger=logger
|
||||
)
|
||||
|
||||
# TODO
|
||||
i2p_checker = CheckerMock()
|
||||
if i2p_proxy:
|
||||
i2p_checker = ProxiedAiohttpChecker( # type: ignore
|
||||
proxy=i2p_proxy, cookie_jar=cookie_jar, logger=logger
|
||||
)
|
||||
|
||||
# TODO
|
||||
dns_checker = CheckerMock()
|
||||
if check_domains:
|
||||
dns_checker = AiodnsDomainResolver(logger=logger) # type: ignore
|
||||
|
||||
if logger.level == logging.DEBUG:
|
||||
await debug_ip_request(session, logger)
|
||||
await debug_ip_request(clearweb_checker, logger)
|
||||
|
||||
# setup parallel executor
|
||||
executor: Optional[AsyncExecutor] = None
|
||||
@@ -538,7 +653,12 @@ async def maigret(
|
||||
# make options objects for all the requests
|
||||
options: QueryOptions = {}
|
||||
options["cookies"] = cookie_jar
|
||||
options["session"] = session
|
||||
options["checkers"] = {
|
||||
'': clearweb_checker,
|
||||
'tor': tor_checker,
|
||||
'dns': dns_checker,
|
||||
'i2p': i2p_checker,
|
||||
}
|
||||
options["parsing"] = is_parsing_enabled
|
||||
options["timeout"] = timeout
|
||||
options["id_type"] = id_type
|
||||
@@ -591,7 +711,11 @@ async def maigret(
|
||||
)
|
||||
|
||||
# closing http client session
|
||||
await session.close()
|
||||
await clearweb_checker.close()
|
||||
if tor_proxy:
|
||||
await tor_checker.close()
|
||||
if i2p_proxy:
|
||||
await i2p_checker.close()
|
||||
|
||||
# notify caller that all queries are finished
|
||||
query_notify.finish()
|
||||
@@ -625,7 +749,13 @@ def timeout_check(value):
|
||||
|
||||
|
||||
async def site_self_check(
|
||||
site: MaigretSite, logger, semaphore, db: MaigretDatabase, silent=False
|
||||
site: MaigretSite,
|
||||
logger,
|
||||
semaphore,
|
||||
db: MaigretDatabase,
|
||||
silent=False,
|
||||
tor_proxy=None,
|
||||
i2p_proxy=None,
|
||||
):
|
||||
changes = {
|
||||
"disabled": False,
|
||||
@@ -649,6 +779,8 @@ async def site_self_check(
|
||||
forced=True,
|
||||
no_progressbar=True,
|
||||
retries=1,
|
||||
tor_proxy=tor_proxy,
|
||||
i2p_proxy=i2p_proxy,
|
||||
)
|
||||
|
||||
# don't disable entries with other ids types
|
||||
@@ -658,6 +790,8 @@ async def site_self_check(
|
||||
changes["disabled"] = True
|
||||
continue
|
||||
|
||||
logger.debug(results_dict)
|
||||
|
||||
result = results_dict[site.name]["status"]
|
||||
|
||||
site_status = result.status
|
||||
@@ -696,7 +830,13 @@ async def site_self_check(
|
||||
|
||||
|
||||
async def self_check(
|
||||
db: MaigretDatabase, site_data: dict, logger, silent=False, max_connections=10
|
||||
db: MaigretDatabase,
|
||||
site_data: dict,
|
||||
logger,
|
||||
silent=False,
|
||||
max_connections=10,
|
||||
tor_proxy=None,
|
||||
i2p_proxy=None,
|
||||
) -> bool:
|
||||
sem = asyncio.Semaphore(max_connections)
|
||||
tasks = []
|
||||
@@ -708,7 +848,9 @@ async def self_check(
|
||||
disabled_old_count = disabled_count(all_sites.values())
|
||||
|
||||
for _, site in all_sites.items():
|
||||
check_coro = site_self_check(site, logger, sem, db, silent)
|
||||
check_coro = site_self_check(
|
||||
site, logger, sem, db, silent, tor_proxy, i2p_proxy
|
||||
)
|
||||
future = asyncio.ensure_future(check_coro)
|
||||
tasks.append(future)
|
||||
|
||||
|
||||
+47
-1
@@ -32,6 +32,8 @@ from .report import (
|
||||
save_txt_report,
|
||||
SUPPORTED_JSON_REPORT_FORMATS,
|
||||
save_json_report,
|
||||
get_plaintext_report,
|
||||
sort_report_by_data_points,
|
||||
)
|
||||
from .sites import MaigretDatabase
|
||||
from .submit import submit_dialog
|
||||
@@ -237,6 +239,26 @@ def setup_arguments_parser():
|
||||
default=None,
|
||||
help="Make requests over a proxy. e.g. socks5://127.0.0.1:1080",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--tor-proxy",
|
||||
metavar='TOR_PROXY_URL',
|
||||
action="store",
|
||||
default='socks5://127.0.0.1:9050',
|
||||
help="Specify URL of your Tor gateway. Default is socks5://127.0.0.1:9050",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--i2p-proxy",
|
||||
metavar='I2P_PROXY_URL',
|
||||
action="store",
|
||||
default='http://127.0.0.1:4444',
|
||||
help="Specify URL of your I2P gateway. Default is http://127.0.0.1:4444",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--with-domains",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="Enable (experimental) feature of checking domains on usernames.",
|
||||
)
|
||||
|
||||
filter_group = parser.add_argument_group(
|
||||
'Site filtering', 'Options to set site search scope'
|
||||
@@ -419,6 +441,13 @@ def setup_arguments_parser():
|
||||
help=f"Generate a JSON report of specific type: {', '.join(SUPPORTED_JSON_REPORT_FORMATS)}"
|
||||
" (one report per username).",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--reports-sorting",
|
||||
default='default',
|
||||
choices=('default', 'data'),
|
||||
help="Method of results sorting in reports (default: in order of getting the result)",
|
||||
)
|
||||
return parser
|
||||
|
||||
|
||||
@@ -507,7 +536,12 @@ async def main():
|
||||
if args.self_check:
|
||||
print('Maigret sites database self-checking...')
|
||||
is_need_update = await self_check(
|
||||
db, site_data, logger, max_connections=args.connections
|
||||
db,
|
||||
site_data,
|
||||
logger,
|
||||
max_connections=args.connections,
|
||||
tor_proxy=args.tor_proxy,
|
||||
i2p_proxy=args.i2p_proxy,
|
||||
)
|
||||
if is_need_update:
|
||||
if input('Do you want to save changes permanently? [Yn]\n').lower() in (
|
||||
@@ -583,6 +617,8 @@ async def main():
|
||||
site_dict=dict(sites_to_check),
|
||||
query_notify=query_notify,
|
||||
proxy=args.proxy,
|
||||
tor_proxy=args.tor_proxy,
|
||||
i2p_proxy=args.i2p_proxy,
|
||||
timeout=args.timeout,
|
||||
is_parsing_enabled=parsing_enabled,
|
||||
id_type=id_type,
|
||||
@@ -593,10 +629,14 @@ async def main():
|
||||
max_connections=args.connections,
|
||||
no_progressbar=args.no_progressbar,
|
||||
retries=args.retries,
|
||||
check_domains=args.with_domains,
|
||||
)
|
||||
|
||||
notify_about_errors(results, query_notify)
|
||||
|
||||
if args.reports_sorting == "data":
|
||||
results = sort_report_by_data_points(results)
|
||||
|
||||
general_results.append((username, id_type, results))
|
||||
|
||||
# TODO: tests
|
||||
@@ -646,6 +686,12 @@ async def main():
|
||||
filename = report_filepath_tpl.format(username=username, postfix='.pdf')
|
||||
save_pdf_report(filename, report_context)
|
||||
query_notify.warning(f'PDF report on all usernames saved in {filename}')
|
||||
|
||||
text_report = get_plaintext_report(report_context)
|
||||
if text_report:
|
||||
query_notify.info('Short text report:')
|
||||
print(text_report)
|
||||
|
||||
# update database
|
||||
db.save_to_file(args.db_file)
|
||||
|
||||
|
||||
+10
-3
@@ -205,13 +205,20 @@ class QueryNotifyPrint(QueryNotify):
|
||||
else:
|
||||
print(f"[*] {title} {message} on:")
|
||||
|
||||
def warning(self, message, symbol="-"):
|
||||
msg = f"[{symbol}] {message}"
|
||||
def _colored_print(self, fore_color, msg):
|
||||
if self.color:
|
||||
print(Style.BRIGHT + Fore.YELLOW + msg)
|
||||
print(Style.BRIGHT + fore_color + msg)
|
||||
else:
|
||||
print(msg)
|
||||
|
||||
def warning(self, message, symbol="-"):
|
||||
msg = f"[{symbol}] {message}"
|
||||
self._colored_print(Fore.YELLOW, msg)
|
||||
|
||||
def info(self, message, symbol="*"):
|
||||
msg = f"[{symbol}] {message}"
|
||||
self._colored_print(Fore.BLUE, msg)
|
||||
|
||||
def update(self, result, is_similar=False):
|
||||
"""Notify Update.
|
||||
|
||||
|
||||
+47
-9
@@ -36,6 +36,18 @@ def filter_supposed_data(data):
|
||||
return filtered_supposed_data
|
||||
|
||||
|
||||
def sort_report_by_data_points(results):
|
||||
return dict(
|
||||
sorted(
|
||||
results.items(),
|
||||
key=lambda x: len(
|
||||
(x[1].get('status') and x[1]['status'].ids_data or {}).keys()
|
||||
),
|
||||
reverse=True,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
"""
|
||||
REPORTS SAVING
|
||||
"""
|
||||
@@ -70,6 +82,17 @@ def save_json_report(filename: str, username: str, results: dict, report_type: s
|
||||
generate_json_report(username, results, f, report_type=report_type)
|
||||
|
||||
|
||||
def get_plaintext_report(context: dict) -> str:
|
||||
output = (context['brief'] + " ").replace('. ', '.\n')
|
||||
interests = list(map(lambda x: x[0], context.get('interests_tuple_list', [])))
|
||||
countries = list(map(lambda x: x[0], context.get('countries_tuple_list', [])))
|
||||
if countries:
|
||||
output += f'Countries: {", ".join(countries)}\n'
|
||||
if interests:
|
||||
output += f'Interests (tags): {", ".join(interests)}\n'
|
||||
return output.strip()
|
||||
|
||||
|
||||
"""
|
||||
REPORTS GENERATING
|
||||
"""
|
||||
@@ -215,6 +238,7 @@ def generate_report_context(username_results: list):
|
||||
|
||||
return {
|
||||
"username": first_username,
|
||||
# TODO: return brief list
|
||||
"brief": brief,
|
||||
"results": username_results,
|
||||
"first_seen": first_seen,
|
||||
@@ -231,14 +255,18 @@ def generate_csv_report(username: str, results: dict, csvfile):
|
||||
["username", "name", "url_main", "url_user", "exists", "http_status"]
|
||||
)
|
||||
for site in results:
|
||||
# TODO: fix the reason
|
||||
status = 'Unknown'
|
||||
if "status" in results[site]:
|
||||
status = str(results[site]["status"].status)
|
||||
writer.writerow(
|
||||
[
|
||||
username,
|
||||
site,
|
||||
results[site]["url_main"],
|
||||
results[site]["url_user"],
|
||||
str(results[site]["status"].status),
|
||||
results[site]["http_status"],
|
||||
results[site].get("url_main", ""),
|
||||
results[site].get("url_user", ""),
|
||||
status,
|
||||
results[site].get("http_status", 0),
|
||||
]
|
||||
)
|
||||
|
||||
@@ -250,7 +278,10 @@ def generate_txt_report(username: str, results: dict, file):
|
||||
# TODO: fix no site data issue
|
||||
if not dictionary:
|
||||
continue
|
||||
if dictionary.get("status").status == QueryStatus.CLAIMED:
|
||||
if (
|
||||
dictionary.get("status")
|
||||
and dictionary["status"].status == QueryStatus.CLAIMED
|
||||
):
|
||||
exists_counter += 1
|
||||
file.write(dictionary["url_user"] + "\n")
|
||||
file.write(f"Total Websites Username Detected On : {exists_counter}")
|
||||
@@ -263,14 +294,18 @@ def generate_json_report(username: str, results: dict, file, report_type):
|
||||
for sitename in results:
|
||||
site_result = results[sitename]
|
||||
# TODO: fix no site data issue
|
||||
if not site_result or site_result.get("status").status != QueryStatus.CLAIMED:
|
||||
if not site_result or not site_result.get("status"):
|
||||
continue
|
||||
|
||||
if site_result["status"].status != QueryStatus.CLAIMED:
|
||||
continue
|
||||
|
||||
data = dict(site_result)
|
||||
data["status"] = data["status"].json()
|
||||
data["site"] = data["site"].json
|
||||
if "future" in data:
|
||||
del data["future"]
|
||||
for field in ["future", "checker"]:
|
||||
if field in data:
|
||||
del data[field]
|
||||
|
||||
if is_report_per_line:
|
||||
data["sitename"] = sitename
|
||||
@@ -319,8 +354,11 @@ def design_xmind_sheet(sheet, username, results):
|
||||
|
||||
for website_name in results:
|
||||
dictionary = results[website_name]
|
||||
if not dictionary:
|
||||
continue
|
||||
result_status = dictionary.get("status")
|
||||
if result_status.status != QueryStatus.CLAIMED:
|
||||
# TODO: fix the reason
|
||||
if not result_status or result_status.status != QueryStatus.CLAIMED:
|
||||
continue
|
||||
|
||||
stripped_tags = list(map(lambda x: x.strip(), result_status.tags))
|
||||
|
||||
+2322
-1907
File diff suppressed because it is too large
Load Diff
@@ -68,7 +68,7 @@
|
||||
<div class="row-mb">
|
||||
<div class="col-md">
|
||||
<div class="card flex-md-row mb-4 box-shadow h-md-250">
|
||||
<img class="card-img-right flex-auto d-md-block" alt="Photo" style="width: 200px; height: 200px; object-fit: scale-down;" src="{{ v.status.ids_data.image or 'https://i.imgur.com/040fmbw.png' }}" data-holder-rendered="true">
|
||||
<img class="card-img-right flex-auto d-md-block" alt="Photo" style="width: 200px; height: 200px; object-fit: scale-down;" src="{{ v.status and v.status.ids_data and v.status.ids_data.image or 'https://i.imgur.com/040fmbw.png' }}" data-holder-rendered="true">
|
||||
<div class="card-body d-flex flex-column align-items-start" style="padding-top: 0;">
|
||||
<h3 class="mb-0" style="padding-top: 1rem;">
|
||||
<a class="text-dark" href="{{ v.url_main }}" target="_blank">{{ k }}</a>
|
||||
|
||||
+13
-2
@@ -61,9 +61,12 @@ SUPPORTED_TAGS = [
|
||||
"military",
|
||||
"auto",
|
||||
"gambling",
|
||||
"business",
|
||||
"cybercriminal",
|
||||
"review",
|
||||
"bookmarks",
|
||||
"design",
|
||||
"tor",
|
||||
"i2p",
|
||||
]
|
||||
|
||||
|
||||
@@ -121,6 +124,8 @@ class MaigretSite:
|
||||
alexa_rank = None
|
||||
source = None
|
||||
|
||||
protocol = ''
|
||||
|
||||
def __init__(self, name, information):
|
||||
self.name = name
|
||||
self.url_subpath = ""
|
||||
@@ -300,12 +305,18 @@ class MaigretDatabase:
|
||||
lambda x: isinstance(x.engine, str) and x.engine.lower() in normalized_tags
|
||||
)
|
||||
is_tags_ok = lambda x: set(x.tags).intersection(set(normalized_tags))
|
||||
is_protocol_in_tags = lambda x: x.protocol and x.protocol in normalized_tags
|
||||
is_disabled_needed = lambda x: not x.disabled or (
|
||||
"disabled" in tags or disabled
|
||||
)
|
||||
is_id_type_ok = lambda x: x.type == id_type
|
||||
|
||||
filter_tags_engines_fun = lambda x: not tags or is_engine_ok(x) or is_tags_ok(x)
|
||||
filter_tags_engines_fun = (
|
||||
lambda x: not tags
|
||||
or is_engine_ok(x)
|
||||
or is_tags_ok(x)
|
||||
or is_protocol_in_tags(x)
|
||||
)
|
||||
filter_names_fun = lambda x: not names or is_name_ok(x) or is_source_ok(x)
|
||||
|
||||
filter_fun = (
|
||||
|
||||
+31
-10
@@ -32,6 +32,8 @@ HEADERS = {
|
||||
"User-Agent": get_random_user_agent(),
|
||||
}
|
||||
|
||||
SEPARATORS = "\"'"
|
||||
|
||||
RATIO = 0.6
|
||||
TOP_FEATURES = 5
|
||||
URL_RE = re.compile(r"https?://(www\.)?")
|
||||
@@ -195,7 +197,7 @@ async def detect_known_engine(
|
||||
|
||||
def extract_username_dialog(url):
|
||||
url_parts = url.rstrip("/").split("/")
|
||||
supposed_username = url_parts[-1]
|
||||
supposed_username = url_parts[-1].strip('@')
|
||||
entered_username = input(
|
||||
f'Is "{supposed_username}" a valid username? If not, write it manually: '
|
||||
)
|
||||
@@ -203,38 +205,53 @@ def extract_username_dialog(url):
|
||||
|
||||
|
||||
async def check_features_manually(
|
||||
db, url_exists, url_mainpage, cookie_file, logger, redirects=True
|
||||
db, url_exists, url_mainpage, cookie_file, logger, redirects=False
|
||||
):
|
||||
custom_headers = {}
|
||||
while True:
|
||||
header_key = input(
|
||||
'Specify custom header if you need or just press Enter to skip. Header name: '
|
||||
)
|
||||
if not header_key:
|
||||
break
|
||||
header_value = input('Header value: ')
|
||||
custom_headers[header_key.strip()] = header_value.strip()
|
||||
|
||||
supposed_username = extract_username_dialog(url_exists)
|
||||
non_exist_username = "noonewouldeverusethis7"
|
||||
|
||||
url_user = url_exists.replace(supposed_username, "{username}")
|
||||
url_not_exists = url_exists.replace(supposed_username, non_exist_username)
|
||||
|
||||
headers = dict(HEADERS)
|
||||
headers.update(custom_headers)
|
||||
|
||||
# cookies
|
||||
cookie_dict = None
|
||||
if cookie_file:
|
||||
logger.info(f'Use {cookie_file} for cookies')
|
||||
cookie_jar = await import_aiohttp_cookies(cookie_file)
|
||||
cookie_jar = import_aiohttp_cookies(cookie_file)
|
||||
cookie_dict = {c.key: c.value for c in cookie_jar}
|
||||
|
||||
exists_resp = requests.get(
|
||||
url_exists, cookies=cookie_dict, headers=HEADERS, allow_redirects=redirects
|
||||
url_exists, cookies=cookie_dict, headers=headers, allow_redirects=redirects
|
||||
)
|
||||
logger.debug(url_exists)
|
||||
logger.debug(exists_resp.status_code)
|
||||
logger.debug(exists_resp.text)
|
||||
|
||||
non_exists_resp = requests.get(
|
||||
url_not_exists, cookies=cookie_dict, headers=HEADERS, allow_redirects=redirects
|
||||
url_not_exists, cookies=cookie_dict, headers=headers, allow_redirects=redirects
|
||||
)
|
||||
logger.debug(url_not_exists)
|
||||
logger.debug(non_exists_resp.status_code)
|
||||
logger.debug(non_exists_resp.text)
|
||||
|
||||
a = exists_resp.text
|
||||
b = non_exists_resp.text
|
||||
|
||||
tokens_a = set(a.split('"'))
|
||||
tokens_b = set(b.split('"'))
|
||||
tokens_a = set(re.split(f'[{SEPARATORS}]', a))
|
||||
tokens_b = set(re.split(f'[{SEPARATORS}]', b))
|
||||
|
||||
a_minus_b = tokens_a.difference(tokens_b)
|
||||
b_minus_a = tokens_b.difference(tokens_a)
|
||||
@@ -255,7 +272,7 @@ async def check_features_manually(
|
||||
features = input("If features was not detected correctly, write it manually: ")
|
||||
|
||||
if features:
|
||||
presence_list = features.split(",")
|
||||
presence_list = list(map(str.strip, features.split(",")))
|
||||
|
||||
absence_list = sorted(b_minus_a, key=get_match_ratio, reverse=True)[
|
||||
:top_features_count
|
||||
@@ -264,7 +281,7 @@ async def check_features_manually(
|
||||
features = input("If features was not detected correctly, write it manually: ")
|
||||
|
||||
if features:
|
||||
absence_list = features.split(",")
|
||||
absence_list = list(map(str.strip, features.split(",")))
|
||||
|
||||
site_data = {
|
||||
"absenceStrs": absence_list,
|
||||
@@ -276,6 +293,9 @@ async def check_features_manually(
|
||||
"checkType": "message",
|
||||
}
|
||||
|
||||
if headers != HEADERS:
|
||||
site_data['headers'] = headers
|
||||
|
||||
site = MaigretSite(url_mainpage.split("/")[-1], site_data)
|
||||
return site
|
||||
|
||||
@@ -283,6 +303,7 @@ async def check_features_manually(
|
||||
async def submit_dialog(db, url_exists, cookie_file, logger):
|
||||
domain_raw = URL_RE.sub("", url_exists).strip().strip("/")
|
||||
domain_raw = domain_raw.split("/")[0]
|
||||
logger.info('Domain is %s', domain_raw)
|
||||
|
||||
# check for existence
|
||||
matched_sites = list(filter(lambda x: domain_raw in x.url_main + x.url, db.sites))
|
||||
@@ -355,7 +376,7 @@ async def submit_dialog(db, url_exists, cookie_file, logger):
|
||||
return False
|
||||
|
||||
chosen_site.name = input("Change site name if you want: ") or chosen_site.name
|
||||
chosen_site.tags = input("Site tags: ").split(',')
|
||||
chosen_site.tags = list(map(str.strip, input("Site tags: ").split(',')))
|
||||
rank = get_alexa_rank(chosen_site.url_main)
|
||||
if rank:
|
||||
print(f'New alexa rank: {rank}')
|
||||
|
||||
+8
-1
@@ -1,5 +1,7 @@
|
||||
import ast
|
||||
import re
|
||||
import random
|
||||
from typing import Any
|
||||
|
||||
|
||||
DEFAULT_USER_AGENTS = [
|
||||
@@ -65,6 +67,10 @@ class URLMatcher:
|
||||
return re.compile(regexp_str)
|
||||
|
||||
|
||||
def ascii_data_display(data: str) -> Any:
|
||||
return ast.literal_eval(data)
|
||||
|
||||
|
||||
def get_dict_ascii_tree(items, prepend="", new_line=True):
|
||||
text = ""
|
||||
for num, item in enumerate(items):
|
||||
@@ -75,7 +81,8 @@ def get_dict_ascii_tree(items, prepend="", new_line=True):
|
||||
if field_value.startswith("['"):
|
||||
is_last_item = num == len(items) - 1
|
||||
prepend_symbols = " " * 3 if is_last_item else " ┃ "
|
||||
field_value = get_dict_ascii_tree(eval(field_value), prepend_symbols)
|
||||
data = ascii_data_display(field_value)
|
||||
field_value = get_dict_ascii_tree(data, prepend_symbols)
|
||||
text += f"\n{prepend}{box_symbol}{field_name}: {field_value}"
|
||||
else:
|
||||
text += f"\n{prepend}{box_symbol} {item}"
|
||||
|
||||
+2
-1
@@ -1,3 +1,4 @@
|
||||
aiodns==3.0.0
|
||||
aiohttp==3.7.4
|
||||
aiohttp-socks==0.5.5
|
||||
arabic-reshaper==2.1.1
|
||||
@@ -26,7 +27,7 @@ python-socks==1.1.2
|
||||
requests>=2.24.0
|
||||
requests-futures==1.0.0
|
||||
six==1.15.0
|
||||
socid-extractor>=0.0.19
|
||||
socid-extractor>=0.0.21
|
||||
soupsieve==2.1
|
||||
stem==1.8.0
|
||||
torrequest==0.1.0
|
||||
|
||||
@@ -12,7 +12,7 @@ with open('requirements.txt') as rf:
|
||||
requires = rf.read().splitlines()
|
||||
|
||||
setup(name='maigret',
|
||||
version='0.2.3',
|
||||
version='0.3.0',
|
||||
description='Collect a dossier on a person by username from a huge number of sites',
|
||||
long_description=long_description,
|
||||
long_description_content_type="text/markdown",
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
flake8==3.8.4
|
||||
pytest==6.2.4
|
||||
pytest-asyncio==0.14.0
|
||||
pytest-cov==2.10.1
|
||||
pytest-httpserver==1.0.0
|
||||
pytest-rerunfailures==9.1.1
|
||||
+12
-5
@@ -12,6 +12,7 @@ from maigret.maigret import setup_arguments_parser
|
||||
CUR_PATH = os.path.dirname(os.path.realpath(__file__))
|
||||
JSON_FILE = os.path.join(CUR_PATH, '../maigret/resources/data.json')
|
||||
TEST_JSON_FILE = os.path.join(CUR_PATH, 'db.json')
|
||||
LOCAL_TEST_JSON_FILE = os.path.join(CUR_PATH, 'local.json')
|
||||
empty_mark = Mark('', (), {})
|
||||
|
||||
|
||||
@@ -36,16 +37,17 @@ def remove_test_reports():
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def default_db():
|
||||
db = MaigretDatabase().load_from_file(JSON_FILE)
|
||||
|
||||
return db
|
||||
return MaigretDatabase().load_from_file(JSON_FILE)
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def test_db():
|
||||
db = MaigretDatabase().load_from_file(TEST_JSON_FILE)
|
||||
return MaigretDatabase().load_from_file(TEST_JSON_FILE)
|
||||
|
||||
return db
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def local_test_db():
|
||||
return MaigretDatabase().load_from_file(LOCAL_TEST_JSON_FILE)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
@@ -58,3 +60,8 @@ def reports_autoclean():
|
||||
@pytest.fixture(scope='session')
|
||||
def argparser():
|
||||
return setup_arguments_parser()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def httpserver_listen_address():
|
||||
return ("localhost", 8989)
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
{
|
||||
"engines": {},
|
||||
"sites": {
|
||||
"StatusCode": {
|
||||
"checkType": "status_code",
|
||||
"url": "http://localhost:8989/url?id={username}",
|
||||
"urlMain": "http://localhost:8989/",
|
||||
"usernameClaimed": "claimed",
|
||||
"usernameUnclaimed": "unclaimed"
|
||||
},
|
||||
"Message": {
|
||||
"checkType": "message",
|
||||
"url": "http://localhost:8989/url?id={username}",
|
||||
"urlMain": "http://localhost:8989/",
|
||||
"presenseStrs": ["user", "profile"],
|
||||
"absenseStrs": ["not found", "404"],
|
||||
"usernameClaimed": "claimed",
|
||||
"usernameUnclaimed": "unclaimed"
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -22,6 +22,7 @@ httpbin.org FALSE / FALSE 0 a b
|
||||
"""
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="periodically fails")
|
||||
@pytest.mark.slow
|
||||
def test_twitter_activation(default_db):
|
||||
twitter_site = default_db.sites_dict['Twitter']
|
||||
@@ -39,7 +40,7 @@ async def test_import_aiohttp_cookies():
|
||||
with open(cookies_filename, 'w') as f:
|
||||
f.write(COOKIES_TXT)
|
||||
|
||||
cookie_jar = await import_aiohttp_cookies(cookies_filename)
|
||||
cookie_jar = import_aiohttp_cookies(cookies_filename)
|
||||
assert list(cookie_jar._cookies.keys()) == ['xss.is', 'httpbin.org']
|
||||
|
||||
url = 'https://httpbin.org/cookies'
|
||||
|
||||
@@ -0,0 +1,69 @@
|
||||
from mock import Mock
|
||||
import pytest
|
||||
|
||||
from maigret import search
|
||||
|
||||
|
||||
def site_result_except(server, username, **kwargs):
|
||||
query = f'id={username}'
|
||||
server.expect_request('/url', query_string=query).respond_with_data(**kwargs)
|
||||
|
||||
|
||||
@pytest.mark.slow
|
||||
@pytest.mark.asyncio
|
||||
async def test_checking_by_status_code(httpserver, local_test_db):
|
||||
sites_dict = local_test_db.sites_dict
|
||||
|
||||
site_result_except(httpserver, 'claimed', status=200)
|
||||
site_result_except(httpserver, 'unclaimed', status=404)
|
||||
|
||||
result = await search('claimed', site_dict=sites_dict, logger=Mock())
|
||||
assert result['StatusCode']['status'].is_found() is True
|
||||
|
||||
result = await search('unclaimed', site_dict=sites_dict, logger=Mock())
|
||||
assert result['StatusCode']['status'].is_found() is False
|
||||
|
||||
|
||||
@pytest.mark.slow
|
||||
@pytest.mark.asyncio
|
||||
async def test_checking_by_message_positive_full(httpserver, local_test_db):
|
||||
sites_dict = local_test_db.sites_dict
|
||||
|
||||
site_result_except(httpserver, 'claimed', response_data="user profile")
|
||||
site_result_except(httpserver, 'unclaimed', response_data="404 not found")
|
||||
|
||||
result = await search('claimed', site_dict=sites_dict, logger=Mock())
|
||||
assert result['Message']['status'].is_found() is True
|
||||
|
||||
result = await search('unclaimed', site_dict=sites_dict, logger=Mock())
|
||||
assert result['Message']['status'].is_found() is False
|
||||
|
||||
|
||||
@pytest.mark.slow
|
||||
@pytest.mark.asyncio
|
||||
async def test_checking_by_message_positive_part(httpserver, local_test_db):
|
||||
sites_dict = local_test_db.sites_dict
|
||||
|
||||
site_result_except(httpserver, 'claimed', response_data="profile")
|
||||
site_result_except(httpserver, 'unclaimed', response_data="404")
|
||||
|
||||
result = await search('claimed', site_dict=sites_dict, logger=Mock())
|
||||
assert result['Message']['status'].is_found() is True
|
||||
|
||||
result = await search('unclaimed', site_dict=sites_dict, logger=Mock())
|
||||
assert result['Message']['status'].is_found() is False
|
||||
|
||||
|
||||
@pytest.mark.slow
|
||||
@pytest.mark.asyncio
|
||||
async def test_checking_by_message_negative(httpserver, local_test_db):
|
||||
sites_dict = local_test_db.sites_dict
|
||||
|
||||
site_result_except(httpserver, 'claimed', response_data="")
|
||||
site_result_except(httpserver, 'unclaimed', response_data="user 404")
|
||||
|
||||
result = await search('claimed', site_dict=sites_dict, logger=Mock())
|
||||
assert result['Message']['status'].is_found() is False
|
||||
|
||||
result = await search('unclaimed', site_dict=sites_dict, logger=Mock())
|
||||
assert result['Message']['status'].is_found() is True
|
||||
@@ -25,17 +25,21 @@ DEFAULT_ARGS: Dict[str, Any] = {
|
||||
'print_check_errors': False,
|
||||
'print_not_found': False,
|
||||
'proxy': None,
|
||||
'reports_sorting': 'default',
|
||||
'retries': 1,
|
||||
'self_check': False,
|
||||
'site_list': [],
|
||||
'stats': False,
|
||||
'tags': '',
|
||||
'timeout': 30,
|
||||
'tor_proxy': 'socks5://127.0.0.1:9050',
|
||||
'i2p_proxy': 'http://127.0.0.1:4444',
|
||||
'top_sites': 500,
|
||||
'txt': False,
|
||||
'use_disabled_sites': False,
|
||||
'username': [],
|
||||
'verbose': False,
|
||||
'with_domains': False,
|
||||
'xmind': False,
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
"""Maigret data test functions"""
|
||||
|
||||
from maigret.utils import is_country_tag
|
||||
from maigret.sites import SUPPORTED_TAGS
|
||||
|
||||
|
||||
def test_tags_validity(default_db):
|
||||
unknown_tags = set()
|
||||
|
||||
for site in default_db.sites:
|
||||
for tag in filter(lambda x: not is_country_tag(x), site.tags):
|
||||
if tag not in SUPPORTED_TAGS:
|
||||
unknown_tags.add(tag)
|
||||
|
||||
assert unknown_tags == set()
|
||||
@@ -138,6 +138,7 @@ def test_maigret_results(test_db):
|
||||
|
||||
assert results['Reddit'].get('future') is None
|
||||
del results['GooglePlayStore']['future']
|
||||
del results['GooglePlayStore']['checker']
|
||||
|
||||
assert results == RESULTS_EXAMPLE
|
||||
|
||||
|
||||
+109
-2
@@ -16,6 +16,7 @@ from maigret.report import (
|
||||
generate_report_template,
|
||||
generate_report_context,
|
||||
generate_json_report,
|
||||
get_plaintext_report,
|
||||
)
|
||||
from maigret.result import QueryResult, QueryStatus
|
||||
from maigret.sites import MaigretSite
|
||||
@@ -44,6 +45,19 @@ EXAMPLE_RESULTS = {
|
||||
}
|
||||
}
|
||||
|
||||
BROKEN_RESULTS = {
|
||||
'GitHub': {
|
||||
'username': 'test',
|
||||
'parsing_enabled': True,
|
||||
'url_main': 'https://www.github.com/',
|
||||
'url_user': 'https://www.github.com/test',
|
||||
'http_status': 200,
|
||||
'is_similar': False,
|
||||
'rank': 78,
|
||||
'site': MaigretSite('test', {}),
|
||||
}
|
||||
}
|
||||
|
||||
GOOD_500PX_RESULT = copy.deepcopy(GOOD_RESULT)
|
||||
GOOD_500PX_RESULT.tags = ['photo', 'us', 'global']
|
||||
GOOD_500PX_RESULT.ids_data = {
|
||||
@@ -238,10 +252,13 @@ TEST = [
|
||||
]
|
||||
|
||||
SUPPOSED_BRIEF = """Search by username alexaimephotographycars returned 1 accounts. Found target's other IDs: alexaimephotography, Alexaimephotogr. Search by username alexaimephotography returned 2 accounts. Search by username Alexaimephotogr returned 1 accounts. Extended info extracted from 3 accounts."""
|
||||
|
||||
SUPPOSED_INTERESTS = "Interests: photo <span class=\"text-muted\">(2)</span>, news <span class=\"text-muted\">(1)</span>, social <span class=\"text-muted\">(1)</span>"
|
||||
SUPPOSED_BROKEN_BRIEF = """Search by username alexaimephotographycars returned 0 accounts. Search by username alexaimephotography returned 2 accounts. Search by username Alexaimephotogr returned 1 accounts. Extended info extracted from 2 accounts."""
|
||||
|
||||
SUPPOSED_GEO = "Geo: us <span class=\"text-muted\">(3)</span>"
|
||||
SUPPOSED_BROKEN_GEO = "Geo: us <span class=\"text-muted\">(2)</span>"
|
||||
|
||||
SUPPOSED_INTERESTS = "Interests: photo <span class=\"text-muted\">(2)</span>, news <span class=\"text-muted\">(1)</span>, social <span class=\"text-muted\">(1)</span>"
|
||||
SUPPOSED_BROKEN_INTERESTS = "Interests: news <span class=\"text-muted\">(1)</span>, photo <span class=\"text-muted\">(1)</span>, social <span class=\"text-muted\">(1)</span>"
|
||||
|
||||
|
||||
def test_generate_report_template():
|
||||
@@ -269,6 +286,19 @@ def test_generate_csv_report():
|
||||
]
|
||||
|
||||
|
||||
def test_generate_csv_report_broken():
|
||||
csvfile = StringIO()
|
||||
generate_csv_report('test', BROKEN_RESULTS, csvfile)
|
||||
|
||||
csvfile.seek(0)
|
||||
data = csvfile.readlines()
|
||||
|
||||
assert data == [
|
||||
'username,name,url_main,url_user,exists,http_status\r\n',
|
||||
'test,GitHub,https://www.github.com/,https://www.github.com/test,Unknown,200\r\n',
|
||||
]
|
||||
|
||||
|
||||
def test_generate_txt_report():
|
||||
txtfile = StringIO()
|
||||
generate_txt_report('test', EXAMPLE_RESULTS, txtfile)
|
||||
@@ -282,6 +312,18 @@ def test_generate_txt_report():
|
||||
]
|
||||
|
||||
|
||||
def test_generate_txt_report_broken():
|
||||
txtfile = StringIO()
|
||||
generate_txt_report('test', BROKEN_RESULTS, txtfile)
|
||||
|
||||
txtfile.seek(0)
|
||||
data = txtfile.readlines()
|
||||
|
||||
assert data == [
|
||||
'Total Websites Username Detected On : 0',
|
||||
]
|
||||
|
||||
|
||||
def test_generate_json_simple_report():
|
||||
jsonfile = StringIO()
|
||||
MODIFIED_RESULTS = dict(EXAMPLE_RESULTS)
|
||||
@@ -295,6 +337,19 @@ def test_generate_json_simple_report():
|
||||
assert list(json.loads(data[0]).keys()) == ['GitHub', 'GitHub2']
|
||||
|
||||
|
||||
def test_generate_json_simple_report_broken():
|
||||
jsonfile = StringIO()
|
||||
MODIFIED_RESULTS = dict(BROKEN_RESULTS)
|
||||
MODIFIED_RESULTS['GitHub2'] = BROKEN_RESULTS['GitHub']
|
||||
generate_json_report('test', BROKEN_RESULTS, jsonfile, 'simple')
|
||||
|
||||
jsonfile.seek(0)
|
||||
data = jsonfile.readlines()
|
||||
|
||||
assert len(data) == 1
|
||||
assert list(json.loads(data[0]).keys()) == []
|
||||
|
||||
|
||||
def test_generate_json_ndjson_report():
|
||||
jsonfile = StringIO()
|
||||
MODIFIED_RESULTS = dict(EXAMPLE_RESULTS)
|
||||
@@ -328,6 +383,20 @@ def test_save_xmind_report():
|
||||
)
|
||||
|
||||
|
||||
def test_save_xmind_report_broken():
|
||||
filename = 'report_test.xmind'
|
||||
save_xmind_report(filename, 'test', BROKEN_RESULTS)
|
||||
|
||||
workbook = xmind.load(filename)
|
||||
sheet = workbook.getPrimarySheet()
|
||||
data = sheet.getData()
|
||||
|
||||
assert data['title'] == 'test Analysis'
|
||||
assert data['topic']['title'] == 'test'
|
||||
assert len(data['topic']['topics']) == 1
|
||||
assert data['topic']['topics'][0]['title'] == 'Undefined'
|
||||
|
||||
|
||||
def test_html_report():
|
||||
report_name = 'report_test.html'
|
||||
context = generate_report_context(TEST)
|
||||
@@ -340,9 +409,47 @@ def test_html_report():
|
||||
assert SUPPOSED_INTERESTS in report_text
|
||||
|
||||
|
||||
def test_html_report_broken():
|
||||
report_name = 'report_test_broken.html'
|
||||
BROKEN_DATA = copy.deepcopy(TEST)
|
||||
BROKEN_DATA[0][2]['500px']['status'] = None
|
||||
|
||||
context = generate_report_context(BROKEN_DATA)
|
||||
save_html_report(report_name, context)
|
||||
|
||||
report_text = open(report_name).read()
|
||||
|
||||
assert SUPPOSED_BROKEN_BRIEF in report_text
|
||||
assert SUPPOSED_BROKEN_GEO in report_text
|
||||
assert SUPPOSED_BROKEN_INTERESTS in report_text
|
||||
|
||||
|
||||
def test_pdf_report():
|
||||
report_name = 'report_test.pdf'
|
||||
context = generate_report_context(TEST)
|
||||
save_pdf_report(report_name, context)
|
||||
|
||||
assert os.path.exists(report_name)
|
||||
|
||||
|
||||
def test_text_report():
|
||||
context = generate_report_context(TEST)
|
||||
report_text = get_plaintext_report(context)
|
||||
|
||||
for brief_part in SUPPOSED_BRIEF.split():
|
||||
assert brief_part in report_text
|
||||
assert 'us' in report_text
|
||||
assert 'photo' in report_text
|
||||
|
||||
|
||||
def test_text_report_broken():
|
||||
BROKEN_DATA = copy.deepcopy(TEST)
|
||||
BROKEN_DATA[0][2]['500px']['status'] = None
|
||||
|
||||
context = generate_report_context(BROKEN_DATA)
|
||||
report_text = get_plaintext_report(context)
|
||||
|
||||
for brief_part in SUPPOSED_BROKEN_BRIEF.split():
|
||||
assert brief_part in report_text
|
||||
assert 'us' in report_text
|
||||
assert 'photo' in report_text
|
||||
|
||||
@@ -57,6 +57,11 @@ def test_enrich_link_str():
|
||||
)
|
||||
|
||||
|
||||
def test_url_extract_main_part_negative():
|
||||
url_main_part = 'None'
|
||||
assert URLMatcher.extract_main_part(url_main_part) == ''
|
||||
|
||||
|
||||
def test_url_extract_main_part():
|
||||
url_main_part = 'flickr.com/photos/alexaimephotography'
|
||||
|
||||
|
||||
Reference in New Issue
Block a user