Compare commits


33 Commits

Author SHA1 Message Date
soxoj 9b7f36dc24 Merge pull request #166 from soxoj/0.3.0
Bump to 0.3.0
2021-06-03 00:13:35 +03:00
Soxoj 05167ad30c Bump to 0.3.0 2021-06-02 23:58:06 +03:00
soxoj cee6f0aa43 Merge pull request #165 from soxoj/i2p-support
Added I2P sites support
2021-06-02 23:49:05 +03:00
Soxoj 02cf330e37 Added I2P sites support 2021-06-02 23:45:11 +03:00
soxoj 5c8f7a3af0 Merge pull request #164 from soxoj/dns-checks-some-fixes
Added some domains for new DNS checker, fixed reports generation crashes
2021-06-02 23:26:26 +03:00
Soxoj 13e1b6f4d1 Added some domains for new DNS checker, fixed reports generation crashes 2021-06-02 23:16:44 +03:00
soxoj 5179cb56eb Merge pull request #163 from soxoj/sites-01-06-21
Added several sites
2021-06-01 00:30:19 +03:00
Soxoj 1a2c7e944a Added several sites 2021-06-01 00:28:14 +03:00
soxoj f7eae046a1 Merge pull request #162 from soxoj/new-sites-26-05-21
Added some new sites
2021-05-26 23:15:21 +03:00
Soxoj bdff08cb70 Added some new sites 2021-05-26 23:08:30 +03:00
soxoj a468cb1cd3 Merge pull request #161 from soxoj/xmind-report-fix
XMind report generation fix
2021-05-25 23:16:28 +03:00
Soxoj 0fe933e8a1 XMind report generation fix 2021-05-25 23:11:25 +03:00
soxoj 5c3de91181 Merge pull request #160 from soxoj/report-sort-fix
Fix for empty status in results data
2021-05-23 14:19:10 +03:00
Soxoj 3356463102 Fix for empty status in results data 2021-05-23 14:17:06 +03:00
soxoj 7ac03cf5ca Merge pull request #159 from soxoj/reports-data-sorting
Sort by number of data points (#105)
2021-05-22 20:26:37 +03:00
Soxoj 4aeacef07d Sort by number of data points (#105) 2021-05-22 20:23:53 +03:00
soxoj 8de1830cf3 Merge pull request #158 from soxoj/dns-resolving
Added DNS checker
2021-05-22 03:16:47 +03:00
Soxoj ba6169659e Added DNS checker 2021-05-22 03:12:04 +03:00
soxoj 4a5c5c3f07 Update README.md 2021-05-21 02:17:19 +03:00
soxoj 4ba7fcb1ff Merge pull request #157 from soxoj/tor-checking
Added checker of Tor sites
2021-05-20 23:30:08 +03:00
Soxoj a76f95858f Added checker of Tor sites 2021-05-20 23:26:02 +03:00
soxoj bea900dda0 Merge pull request #155 from soxoj/0.2.4
Bump to 0.2.4
2021-05-18 01:20:00 +03:00
Soxoj bb1bde833d Bump to 0.2.4 2021-05-18 01:17:35 +03:00
soxoj 5b405c6abb Merge pull request #154 from soxoj/tests-improving
Improved tests
2021-05-18 00:57:31 +03:00
Soxoj 99fa58ceed Disabled Twitter activation test 2021-05-18 00:55:18 +03:00
Soxoj c71e404f63 Added test dependencies 2021-05-18 00:49:13 +03:00
Soxoj 2c04ccce57 Improved tests 2021-05-18 00:43:56 +03:00
soxoj 435db7cdc9 Merge pull request #153 from soxoj/sites-update-16-05-21
Several sites added, updated site list
2021-05-17 00:35:56 +03:00
Soxoj 413a0502a4 Several sites added, updated site list 2021-05-16 17:02:41 +03:00
soxoj 2aedcc3166 Merge pull request #152 from soxoj/cli-plaintext-report
Added text report to CLI output
2021-05-15 16:57:22 +03:00
Soxoj 28835204f5 Added text report to CLI output 2021-05-15 16:55:05 +03:00
soxoj b11a247dfd Merge pull request #151 from soxoj/tags-socid-extractor
Tags updated, added tests for tags
2021-05-15 14:55:01 +03:00
Soxoj c9219d91ec Tags updated, added tests for tags
Added several sites
Updated socid_extractor version to avoid bug #150
2021-05-15 14:51:30 +03:00
27 changed files with 4357 additions and 3366 deletions
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
-python -m pip install flake8 pytest pytest-rerunfailures
+python -m pip install -r test-requirements.txt
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- name: Test with pytest
run: |
+10
View File
@@ -2,6 +2,16 @@
## [Unreleased]
+## [0.3.0] - 2021-06-02
+* added support of Tor and I2P sites
+* added experimental DNS checking feature
+* implemented sorting by data points for reports
+* reports fixes
+## [0.2.4] - 2021-05-18
+* cli output report
+* various improvements
## [0.2.3] - 2021-05-12
* added Yelp and yelp_userid support
* tags markup stabilization
+2 -8
View File
@@ -8,12 +8,6 @@
<a href="https://pypi.org/project/maigret/">
<img alt="PyPI - Downloads" src="https://img.shields.io/pypi/dw/maigret?style=flat-square">
</a>
-<a href="https://gitter.im/maigret-osint/community">
-<img alt="Chat - Gitter" src="./static/chat_gitter.svg" />
-</a>
-<a href="https://twitter.com/intent/follow?screen_name=sox0j">
-<img src="https://img.shields.io/twitter/follow/sox0j?label=Follow%20sox0j&style=social&color=blue" alt="Follow @sox0j" />
-</a>
</p>
<p align="center">
<img src="./static/maigret.png" height="200"/>
@@ -24,9 +18,9 @@
## About
-**Maigret** collect a dossier on a person **by username only**, checking for accounts on a huge number of sites and gathering all the available information from web pages. Maigret is an easy-to-use and powerful fork of [Sherlock](https://github.com/sherlock-project/sherlock).
+**Maigret** collect a dossier on a person **by username only**, checking for accounts on a huge number of sites and gathering all the available information from web pages. No API keys required. Maigret is an easy-to-use and powerful fork of [Sherlock](https://github.com/sherlock-project/sherlock).
-Currently supported more than 2000 sites ([full list](./sites.md)), search is launched against 500 popular sites in descending order of popularity by default.
+Currently supported more than 2000 sites ([full list](./sites.md)), search is launched against 500 popular sites in descending order of popularity by default. Also supported checking of Tor sites, I2P sites, and domains (via DNS resolving).
## Main features
+1 -1
View File
@@ -1,3 +1,3 @@
"""Maigret version file"""
-__version__ = '0.2.3'
+__version__ = '0.3.0'
+1 -1
View File
@@ -35,7 +35,7 @@ class ParsingActivator:
site.headers["authorization"] = f"Bearer {bearer_token}"
-async def import_aiohttp_cookies(cookiestxt_filename):
+def import_aiohttp_cookies(cookiestxt_filename):
cookies_obj = MozillaCookieJar(cookiestxt_filename)
cookies_obj.load(ignore_discard=True, ignore_expires=True)
+208 -66
View File
@@ -9,6 +9,7 @@ from typing import Tuple, Optional, Dict, List
from urllib.parse import quote
import aiohttp
+import aiodns
import tqdm.asyncio
from aiohttp_socks import ProxyConnector
from python_socks import _errors as proxy_errors
@@ -26,7 +27,7 @@ from .executors import (
from .result import QueryResult, QueryStatus
from .sites import MaigretDatabase, MaigretSite
from .types import QueryOptions, QueryResultWrapper
-from .utils import get_random_user_agent
+from .utils import get_random_user_agent, ascii_data_display
SUPPORTED_IDS = (
@@ -43,49 +44,142 @@ SUPPORTED_IDS = (
BAD_CHARS = "#"
-async def get_response(request_future, logger) -> Tuple[str, int, Optional[CheckError]]:
-html_text = None
-status_code = 0
-error: Optional[CheckError] = CheckError("Unknown")
-try:
-response = await request_future
-status_code = response.status
-response_content = await response.content.read()
-charset = response.charset or "utf-8"
-decoded_content = response_content.decode(charset, "ignore")
-html_text = decoded_content
-error = None
-if status_code == 0:
-error = CheckError("Connection lost")
-logger.debug(html_text)
-except asyncio.TimeoutError as e:
-error = CheckError("Request timeout", str(e))
-except ClientConnectorError as e:
-error = CheckError("Connecting failure", str(e))
-except ServerDisconnectedError as e:
-error = CheckError("Server disconnected", str(e))
-except aiohttp.http_exceptions.BadHttpMessage as e:
-error = CheckError("HTTP", str(e))
-except proxy_errors.ProxyError as e:
-error = CheckError("Proxy", str(e))
-except KeyboardInterrupt:
-error = CheckError("Interrupted")
-except Exception as e:
-# python-specific exceptions
-if sys.version_info.minor > 6 and (
-isinstance(e, ssl.SSLCertVerificationError) or isinstance(e, ssl.SSLError)
-):
-error = CheckError("SSL", str(e))
-else:
-logger.debug(e, exc_info=True)
-error = CheckError("Unexpected", str(e))
-return str(html_text), status_code, error
+class CheckerBase:
+pass
+class SimpleAiohttpChecker(CheckerBase):
+def __init__(self, *args, **kwargs):
+proxy = kwargs.get('proxy')
+cookie_jar = kwargs.get('cookie_jar')
+self.logger = kwargs.get('logger', Mock())
+# make http client session
+connector = (
+ProxyConnector.from_url(proxy) if proxy else aiohttp.TCPConnector(ssl=False)
+)
+connector.verify_ssl = False
+self.session = aiohttp.ClientSession(
+connector=connector, trust_env=True, cookie_jar=cookie_jar
+)
+def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get'):
+if method == 'get':
+request_method = self.session.get
+else:
+request_method = self.session.head
+future = request_method(
+url=url,
+headers=headers,
+allow_redirects=allow_redirects,
+timeout=timeout,
+)
+return future
+async def close(self):
+await self.session.close()
async def check(self, future) -> Tuple[str, int, Optional[CheckError]]:
html_text = None
status_code = 0
error: Optional[CheckError] = CheckError("Unknown")
try:
response = await future
status_code = response.status
response_content = await response.content.read()
charset = response.charset or "utf-8"
decoded_content = response_content.decode(charset, "ignore")
html_text = decoded_content
error = None
if status_code == 0:
error = CheckError("Connection lost")
self.logger.debug(html_text)
except asyncio.TimeoutError as e:
error = CheckError("Request timeout", str(e))
except ClientConnectorError as e:
error = CheckError("Connecting failure", str(e))
except ServerDisconnectedError as e:
error = CheckError("Server disconnected", str(e))
except aiohttp.http_exceptions.BadHttpMessage as e:
error = CheckError("HTTP", str(e))
except proxy_errors.ProxyError as e:
error = CheckError("Proxy", str(e))
except KeyboardInterrupt:
error = CheckError("Interrupted")
except Exception as e:
# python-specific exceptions
if sys.version_info.minor > 6 and (
isinstance(e, ssl.SSLCertVerificationError)
or isinstance(e, ssl.SSLError)
):
error = CheckError("SSL", str(e))
else:
self.logger.debug(e, exc_info=True)
error = CheckError("Unexpected", str(e))
return str(html_text), status_code, error
class ProxiedAiohttpChecker(SimpleAiohttpChecker):
def __init__(self, *args, **kwargs):
proxy = kwargs.get('proxy')
cookie_jar = kwargs.get('cookie_jar')
self.logger = kwargs.get('logger', Mock())
connector = ProxyConnector.from_url(proxy)
connector.verify_ssl = False
self.session = aiohttp.ClientSession(
connector=connector, trust_env=True, cookie_jar=cookie_jar
)
class AiodnsDomainResolver(CheckerBase):
def __init__(self, *args, **kwargs):
loop = asyncio.get_event_loop()
self.logger = kwargs.get('logger', Mock())
self.resolver = aiodns.DNSResolver(loop=loop)
def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get'):
return self.resolver.query(url, 'A')
async def check(self, future) -> Tuple[str, int, Optional[CheckError]]:
status = 404
error = None
text = ''
try:
res = await future
text = str(res[0].host)
status = 200
except aiodns.error.DNSError:
pass
except Exception as e:
self.logger.error(e, exc_info=True)
error = CheckError('DNS resolve error', str(e))
return text, status, error
class CheckerMock:
def __init__(self, *args, **kwargs):
pass
def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get'):
return None
async def check(self, future) -> Tuple[str, int, Optional[CheckError]]:
await asyncio.sleep(0)
return '', 0, None
async def close(self):
return
# TODO: move to separate class # TODO: move to separate class
@@ -233,9 +327,9 @@ def process_site_result(
result = build_result(QueryStatus.CLAIMED)
else:
result = build_result(QueryStatus.AVAILABLE)
-elif check_type == "status_code":
+elif check_type in "status_code":
# Checks if the status code of the response is 2XX
-if is_presense_detected and (not status_code >= 300 or status_code < 200):
+if 200 <= status_code < 300:
result = build_result(QueryStatus.CLAIMED)
else:
result = build_result(QueryStatus.AVAILABLE)
@@ -272,7 +366,7 @@ def process_site_result(
new_usernames[v] = k
results_info["ids_usernames"] = new_usernames
-links = eval(extracted_ids_data.get("links", "[]"))
+links = ascii_data_display(extracted_ids_data.get("links", "[]"))
if "website" in extracted_ids_data:
links.append(extracted_ids_data["website"])
results_info["ids_links"] = links
@@ -322,7 +416,8 @@ def make_site_result(
# workaround to prevent slash errors
url = re.sub("(?<!:)/+", "/", url)
-session = options['session']
+# always clearweb_checker for now
+checker = options["checkers"][site.protocol]
# site check is disabled
if site.disabled and not options['forced']:
@@ -381,12 +476,12 @@ def make_site_result(
# In most cases when we are detecting by status code,
# it is not necessary to get the entire body: we can
# detect fine with just the HEAD response.
-request_method = session.head
+request_method = 'head'
else:
# Either this detect method needs the content associated
# with the GET response, or this specific website will
# not respond properly unless we request the whole page.
-request_method = session.get
+request_method = 'get'
if site.check_type == "response_url":
# Site forwards request to a different URL if username not
@@ -398,7 +493,8 @@ def make_site_result(
# The final result of the request will be what is available.
allow_redirects = True
-future = request_method(
+future = checker.prepare(
+method=request_method,
url=url_probe,
headers=headers,
allow_redirects=allow_redirects,
@@ -407,6 +503,7 @@
# Store future request object in the results object
results_site["future"] = future
+results_site["checker"] = checker
return results_site
@@ -419,7 +516,9 @@ async def check_site_for_username(
if not future:
return site.name, default_result
-response = await get_response(request_future=future, logger=logger)
+checker = default_result["checker"]
+response = await checker.check(future=future)
response_result = process_site_result(
response, query_notify, logger, default_result, site
@@ -430,9 +529,9 @@ async def check_site_for_username(
return site.name, response_result
-async def debug_ip_request(session, logger):
+async def debug_ip_request(checker, logger):
-future = session.get(url="https://icanhazip.com")
+future = checker.prepare(url="https://icanhazip.com")
-ip, status, check_error = await get_response(future, logger)
+ip, status, check_error = await checker.check(future)
if ip:
logger.debug(f"My IP is: {ip.strip()}")
else:
@@ -456,7 +555,9 @@ async def maigret(
logger,
query_notify=None,
proxy=None,
-timeout=None,
+tor_proxy=None,
+i2p_proxy=None,
+timeout=3,
is_parsing_enabled=False,
id_type="username",
debug=False,
@@ -465,6 +566,7 @@ async def maigret(
no_progressbar=False,
cookies=None,
retries=0,
+check_domains=False,
) -> QueryResultWrapper:
"""Main search func
@@ -478,7 +580,7 @@ async def maigret(
query results.
logger -- Standard Python logger object.
timeout -- Time in seconds to wait before timing out request.
-Default is no timeout.
+Default is 3 seconds.
is_parsing_enabled -- Extract additional info from account pages.
id_type -- Type of username to search.
Default is 'username', see all supported here:
@@ -508,23 +610,36 @@ async def maigret(
query_notify.start(username, id_type)
-# make http client session
-connector = (
-ProxyConnector.from_url(proxy) if proxy else aiohttp.TCPConnector(ssl=False)
-)
-connector.verify_ssl = False
cookie_jar = None
if cookies:
logger.debug(f"Using cookies jar file {cookies}")
-cookie_jar = await import_aiohttp_cookies(cookies)
+cookie_jar = import_aiohttp_cookies(cookies)
-session = aiohttp.ClientSession(
-connector=connector, trust_env=True, cookie_jar=cookie_jar
-)
+clearweb_checker = SimpleAiohttpChecker(
+proxy=proxy, cookie_jar=cookie_jar, logger=logger
+)
+# TODO
+tor_checker = CheckerMock()
+if tor_proxy:
+tor_checker = ProxiedAiohttpChecker( # type: ignore
+proxy=tor_proxy, cookie_jar=cookie_jar, logger=logger
+)
+# TODO
+i2p_checker = CheckerMock()
+if i2p_proxy:
+i2p_checker = ProxiedAiohttpChecker( # type: ignore
+proxy=i2p_proxy, cookie_jar=cookie_jar, logger=logger
+)
+# TODO
+dns_checker = CheckerMock()
+if check_domains:
+dns_checker = AiodnsDomainResolver(logger=logger) # type: ignore
if logger.level == logging.DEBUG:
-await debug_ip_request(session, logger)
+await debug_ip_request(clearweb_checker, logger)
# setup parallel executor
executor: Optional[AsyncExecutor] = None
@@ -538,7 +653,12 @@ async def maigret(
# make options objects for all the requests
options: QueryOptions = {}
options["cookies"] = cookie_jar
-options["session"] = session
+options["checkers"] = {
+'': clearweb_checker,
+'tor': tor_checker,
+'dns': dns_checker,
+'i2p': i2p_checker,
+}
options["parsing"] = is_parsing_enabled
options["timeout"] = timeout
options["id_type"] = id_type
@@ -591,7 +711,11 @@ async def maigret(
)
# closing http client session
-await session.close()
+await clearweb_checker.close()
+if tor_proxy:
+await tor_checker.close()
+if i2p_proxy:
+await i2p_checker.close()
# notify caller that all queries are finished
query_notify.finish()
@@ -625,7 +749,13 @@ def timeout_check(value):
async def site_self_check(
-site: MaigretSite, logger, semaphore, db: MaigretDatabase, silent=False
+site: MaigretSite,
+logger,
+semaphore,
+db: MaigretDatabase,
+silent=False,
+tor_proxy=None,
+i2p_proxy=None,
):
changes = {
"disabled": False,
@@ -649,6 +779,8 @@ async def site_self_check(
forced=True,
no_progressbar=True,
retries=1,
+tor_proxy=tor_proxy,
+i2p_proxy=i2p_proxy,
)
# don't disable entries with other ids types
@@ -658,6 +790,8 @@ async def site_self_check(
changes["disabled"] = True
continue
+logger.debug(results_dict)
result = results_dict[site.name]["status"]
site_status = result.status
@@ -696,7 +830,13 @@ async def site_self_check(
async def self_check(
-db: MaigretDatabase, site_data: dict, logger, silent=False, max_connections=10
+db: MaigretDatabase,
+site_data: dict,
+logger,
+silent=False,
+max_connections=10,
+tor_proxy=None,
+i2p_proxy=None,
) -> bool:
sem = asyncio.Semaphore(max_connections)
tasks = []
@@ -708,7 +848,9 @@ async def self_check(
disabled_old_count = disabled_count(all_sites.values())
for _, site in all_sites.items():
-check_coro = site_self_check(site, logger, sem, db, silent)
+check_coro = site_self_check(
+site, logger, sem, db, silent, tor_proxy, i2p_proxy
+)
future = asyncio.ensure_future(check_coro)
tasks.append(future)
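Note: the changes above replace the old module-level get_response() helper with checker classes that share a prepare()/check()/close() interface. A minimal usage sketch follows; it is illustrative only (not an excerpt of the repository), with the URL and logger name made up:

    import asyncio
    import logging

    async def demo():
        # SimpleAiohttpChecker is the clearweb checker added in this diff:
        # prepare() builds the request future, check() awaits it and returns
        # (html_text, status_code, error), close() closes the aiohttp session.
        checker = SimpleAiohttpChecker(proxy=None, cookie_jar=None,
                                       logger=logging.getLogger("demo"))
        future = checker.prepare(url="https://example.com/user/test")
        html, status, error = await checker.check(future)
        print(status, error)
        await checker.close()

    asyncio.run(demo())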
+47 -1
View File
@@ -32,6 +32,8 @@ from .report import (
save_txt_report,
SUPPORTED_JSON_REPORT_FORMATS,
save_json_report,
+get_plaintext_report,
+sort_report_by_data_points,
)
from .sites import MaigretDatabase
from .submit import submit_dialog
@@ -237,6 +239,26 @@ def setup_arguments_parser():
default=None, default=None,
help="Make requests over a proxy. e.g. socks5://127.0.0.1:1080", help="Make requests over a proxy. e.g. socks5://127.0.0.1:1080",
) )
parser.add_argument(
"--tor-proxy",
metavar='TOR_PROXY_URL',
action="store",
default='socks5://127.0.0.1:9050',
help="Specify URL of your Tor gateway. Default is socks5://127.0.0.1:9050",
)
parser.add_argument(
"--i2p-proxy",
metavar='I2P_PROXY_URL',
action="store",
default='http://127.0.0.1:4444',
help="Specify URL of your I2P gateway. Default is http://127.0.0.1:4444",
)
parser.add_argument(
"--with-domains",
action="store_true",
default=False,
help="Enable (experimental) feature of checking domains on usernames.",
)
filter_group = parser.add_argument_group( filter_group = parser.add_argument_group(
'Site filtering', 'Options to set site search scope' 'Site filtering', 'Options to set site search scope'
@@ -419,6 +441,13 @@ def setup_arguments_parser():
help=f"Generate a JSON report of specific type: {', '.join(SUPPORTED_JSON_REPORT_FORMATS)}" help=f"Generate a JSON report of specific type: {', '.join(SUPPORTED_JSON_REPORT_FORMATS)}"
" (one report per username).", " (one report per username).",
) )
parser.add_argument(
"--reports-sorting",
default='default',
choices=('default', 'data'),
help="Method of results sorting in reports (default: in order of getting the result)",
)
return parser return parser
@@ -507,7 +536,12 @@ async def main():
if args.self_check:
print('Maigret sites database self-checking...')
is_need_update = await self_check(
-db, site_data, logger, max_connections=args.connections
+db,
+site_data,
+logger,
+max_connections=args.connections,
+tor_proxy=args.tor_proxy,
+i2p_proxy=args.i2p_proxy,
)
if is_need_update:
if input('Do you want to save changes permanently? [Yn]\n').lower() in (
@@ -583,6 +617,8 @@ async def main():
site_dict=dict(sites_to_check), site_dict=dict(sites_to_check),
query_notify=query_notify, query_notify=query_notify,
proxy=args.proxy, proxy=args.proxy,
tor_proxy=args.tor_proxy,
i2p_proxy=args.i2p_proxy,
timeout=args.timeout, timeout=args.timeout,
is_parsing_enabled=parsing_enabled, is_parsing_enabled=parsing_enabled,
id_type=id_type, id_type=id_type,
@@ -593,10 +629,14 @@ async def main():
max_connections=args.connections, max_connections=args.connections,
no_progressbar=args.no_progressbar, no_progressbar=args.no_progressbar,
retries=args.retries, retries=args.retries,
check_domains=args.with_domains,
) )
notify_about_errors(results, query_notify) notify_about_errors(results, query_notify)
if args.reports_sorting == "data":
results = sort_report_by_data_points(results)
general_results.append((username, id_type, results)) general_results.append((username, id_type, results))
# TODO: tests # TODO: tests
@@ -646,6 +686,12 @@ async def main():
filename = report_filepath_tpl.format(username=username, postfix='.pdf') filename = report_filepath_tpl.format(username=username, postfix='.pdf')
save_pdf_report(filename, report_context) save_pdf_report(filename, report_context)
query_notify.warning(f'PDF report on all usernames saved in {filename}') query_notify.warning(f'PDF report on all usernames saved in {filename}')
text_report = get_plaintext_report(report_context)
if text_report:
query_notify.info('Short text report:')
print(text_report)
# update database # update database
db.save_to_file(args.db_file) db.save_to_file(args.db_file)
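The new gateway options default to the standard local gateways (socks5://127.0.0.1:9050 for Tor, http://127.0.0.1:4444 for I2P), so a hypothetical run that also enables the experimental domain checks and data-point sorting might look like:

    maigret soxoj --with-domains --reports-sorting data --tor-proxy socks5://127.0.0.1:9050 --i2p-proxy http://127.0.0.1:4444

(Illustrative command line only; the flag names and defaults are taken from the argparse changes above, the username is an arbitrary example.)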
+10 -3
View File
@@ -205,13 +205,20 @@ class QueryNotifyPrint(QueryNotify):
else:
print(f"[*] {title} {message} on:")
-def warning(self, message, symbol="-"):
-msg = f"[{symbol}] {message}"
+def _colored_print(self, fore_color, msg):
if self.color:
-print(Style.BRIGHT + Fore.YELLOW + msg)
+print(Style.BRIGHT + fore_color + msg)
else:
print(msg)
+def warning(self, message, symbol="-"):
+msg = f"[{symbol}] {message}"
+self._colored_print(Fore.YELLOW, msg)
+def info(self, message, symbol="*"):
+msg = f"[{symbol}] {message}"
+self._colored_print(Fore.BLUE, msg)
def update(self, result, is_similar=False):
"""Notify Update.
+47 -9
View File
@@ -36,6 +36,18 @@ def filter_supposed_data(data):
return filtered_supposed_data return filtered_supposed_data
def sort_report_by_data_points(results):
return dict(
sorted(
results.items(),
key=lambda x: len(
(x[1].get('status') and x[1]['status'].ids_data or {}).keys()
),
reverse=True,
)
)
""" """
REPORTS SAVING REPORTS SAVING
""" """
@@ -70,6 +82,17 @@ def save_json_report(filename: str, username: str, results: dict, report_type: s
generate_json_report(username, results, f, report_type=report_type) generate_json_report(username, results, f, report_type=report_type)
def get_plaintext_report(context: dict) -> str:
output = (context['brief'] + " ").replace('. ', '.\n')
interests = list(map(lambda x: x[0], context.get('interests_tuple_list', [])))
countries = list(map(lambda x: x[0], context.get('countries_tuple_list', [])))
if countries:
output += f'Countries: {", ".join(countries)}\n'
if interests:
output += f'Interests (tags): {", ".join(interests)}\n'
return output.strip()
""" """
REPORTS GENERATING REPORTS GENERATING
""" """
@@ -215,6 +238,7 @@ def generate_report_context(username_results: list):
return { return {
"username": first_username, "username": first_username,
# TODO: return brief list
"brief": brief, "brief": brief,
"results": username_results, "results": username_results,
"first_seen": first_seen, "first_seen": first_seen,
@@ -231,14 +255,18 @@ def generate_csv_report(username: str, results: dict, csvfile):
["username", "name", "url_main", "url_user", "exists", "http_status"]
)
for site in results:
+# TODO: fix the reason
+status = 'Unknown'
+if "status" in results[site]:
+status = str(results[site]["status"].status)
writer.writerow(
[
username,
site,
-results[site]["url_main"],
+results[site].get("url_main", ""),
-results[site]["url_user"],
+results[site].get("url_user", ""),
-str(results[site]["status"].status),
+status,
-results[site]["http_status"],
+results[site].get("http_status", 0),
]
)
@@ -250,7 +278,10 @@ def generate_txt_report(username: str, results: dict, file):
# TODO: fix no site data issue
if not dictionary:
continue
-if dictionary.get("status").status == QueryStatus.CLAIMED:
+if (
+dictionary.get("status")
+and dictionary["status"].status == QueryStatus.CLAIMED
+):
exists_counter += 1
file.write(dictionary["url_user"] + "\n")
file.write(f"Total Websites Username Detected On : {exists_counter}")
@@ -263,14 +294,18 @@ def generate_json_report(username: str, results: dict, file, report_type):
for sitename in results:
site_result = results[sitename]
# TODO: fix no site data issue
-if not site_result or site_result.get("status").status != QueryStatus.CLAIMED:
+if not site_result or not site_result.get("status"):
+continue
+if site_result["status"].status != QueryStatus.CLAIMED:
continue
data = dict(site_result)
data["status"] = data["status"].json()
data["site"] = data["site"].json
-if "future" in data:
-del data["future"]
+for field in ["future", "checker"]:
+if field in data:
+del data[field]
if is_report_per_line:
data["sitename"] = sitename
@@ -319,8 +354,11 @@ def design_xmind_sheet(sheet, username, results):
for website_name in results:
dictionary = results[website_name]
+if not dictionary:
+continue
result_status = dictionary.get("status")
-if result_status.status != QueryStatus.CLAIMED:
+# TODO: fix the reason
+if not result_status or result_status.status != QueryStatus.CLAIMED:
continue
stripped_tags = list(map(lambda x: x.strip(), result_status.tags))
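The two new report helpers introduced in this file (sort_report_by_data_points and get_plaintext_report) are wired into the CLI in the maigret.py changes earlier in this compare. A rough sketch of that flow, assuming `results` is the per-site dict returned by the search for `username`:

    from maigret.report import (
        generate_report_context,
        get_plaintext_report,
        sort_report_by_data_points,
    )

    def print_sorted_text_report(username: str, results: dict) -> None:
        # Put the sites with the most extracted data points first (issue #105),
        # then render the short plaintext summary added for CLI output.
        ordered = sort_report_by_data_points(results)
        context = generate_report_context([(username, 'username', ordered)])
        print(get_plaintext_report(context))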
+2322 -1907
View File
File diff suppressed because it is too large
+1 -1
View File
@@ -68,7 +68,7 @@
<div class="row-mb">
<div class="col-md">
<div class="card flex-md-row mb-4 box-shadow h-md-250">
-<img class="card-img-right flex-auto d-md-block" alt="Photo" style="width: 200px; height: 200px; object-fit: scale-down;" src="{{ v.status.ids_data.image or 'https://i.imgur.com/040fmbw.png' }}" data-holder-rendered="true">
+<img class="card-img-right flex-auto d-md-block" alt="Photo" style="width: 200px; height: 200px; object-fit: scale-down;" src="{{ v.status and v.status.ids_data and v.status.ids_data.image or 'https://i.imgur.com/040fmbw.png' }}" data-holder-rendered="true">
<div class="card-body d-flex flex-column align-items-start" style="padding-top: 0;">
<h3 class="mb-0" style="padding-top: 1rem;">
<a class="text-dark" href="{{ v.url_main }}" target="_blank">{{ k }}</a>
+13 -2
View File
@@ -61,9 +61,12 @@ SUPPORTED_TAGS = [
"military", "military",
"auto", "auto",
"gambling", "gambling",
"business",
"cybercriminal", "cybercriminal",
"review", "review",
"bookmarks",
"design",
"tor",
"i2p",
] ]
@@ -121,6 +124,8 @@ class MaigretSite:
alexa_rank = None alexa_rank = None
source = None source = None
protocol = ''
def __init__(self, name, information): def __init__(self, name, information):
self.name = name self.name = name
self.url_subpath = "" self.url_subpath = ""
@@ -300,12 +305,18 @@ class MaigretDatabase:
lambda x: isinstance(x.engine, str) and x.engine.lower() in normalized_tags
)
is_tags_ok = lambda x: set(x.tags).intersection(set(normalized_tags))
+is_protocol_in_tags = lambda x: x.protocol and x.protocol in normalized_tags
is_disabled_needed = lambda x: not x.disabled or (
"disabled" in tags or disabled
)
is_id_type_ok = lambda x: x.type == id_type
-filter_tags_engines_fun = lambda x: not tags or is_engine_ok(x) or is_tags_ok(x)
+filter_tags_engines_fun = (
+lambda x: not tags
+or is_engine_ok(x)
+or is_tags_ok(x)
+or is_protocol_in_tags(x)
+)
filter_names_fun = lambda x: not names or is_name_ok(x) or is_source_ok(x)
filter_fun = (
+31 -10
View File
@@ -32,6 +32,8 @@ HEADERS = {
"User-Agent": get_random_user_agent(), "User-Agent": get_random_user_agent(),
} }
SEPARATORS = "\"'"
RATIO = 0.6 RATIO = 0.6
TOP_FEATURES = 5 TOP_FEATURES = 5
URL_RE = re.compile(r"https?://(www\.)?") URL_RE = re.compile(r"https?://(www\.)?")
@@ -195,7 +197,7 @@ async def detect_known_engine(
def extract_username_dialog(url):
url_parts = url.rstrip("/").split("/")
-supposed_username = url_parts[-1]
+supposed_username = url_parts[-1].strip('@')
entered_username = input(
f'Is "{supposed_username}" a valid username? If not, write it manually: '
)
@@ -203,38 +205,53 @@ def extract_username_dialog(url):
async def check_features_manually(
-db, url_exists, url_mainpage, cookie_file, logger, redirects=True
+db, url_exists, url_mainpage, cookie_file, logger, redirects=False
):
+custom_headers = {}
+while True:
+header_key = input(
+'Specify custom header if you need or just press Enter to skip. Header name: '
+)
+if not header_key:
+break
+header_value = input('Header value: ')
+custom_headers[header_key.strip()] = header_value.strip()
supposed_username = extract_username_dialog(url_exists)
non_exist_username = "noonewouldeverusethis7"
url_user = url_exists.replace(supposed_username, "{username}")
url_not_exists = url_exists.replace(supposed_username, non_exist_username)
+headers = dict(HEADERS)
+headers.update(custom_headers)
# cookies
cookie_dict = None
if cookie_file:
logger.info(f'Use {cookie_file} for cookies')
-cookie_jar = await import_aiohttp_cookies(cookie_file)
+cookie_jar = import_aiohttp_cookies(cookie_file)
cookie_dict = {c.key: c.value for c in cookie_jar}
exists_resp = requests.get(
-url_exists, cookies=cookie_dict, headers=HEADERS, allow_redirects=redirects
+url_exists, cookies=cookie_dict, headers=headers, allow_redirects=redirects
)
+logger.debug(url_exists)
logger.debug(exists_resp.status_code)
logger.debug(exists_resp.text)
non_exists_resp = requests.get(
-url_not_exists, cookies=cookie_dict, headers=HEADERS, allow_redirects=redirects
+url_not_exists, cookies=cookie_dict, headers=headers, allow_redirects=redirects
)
+logger.debug(url_not_exists)
logger.debug(non_exists_resp.status_code)
logger.debug(non_exists_resp.text)
a = exists_resp.text
b = non_exists_resp.text
-tokens_a = set(a.split('"'))
+tokens_a = set(re.split(f'[{SEPARATORS}]', a))
-tokens_b = set(b.split('"'))
+tokens_b = set(re.split(f'[{SEPARATORS}]', b))
a_minus_b = tokens_a.difference(tokens_b)
b_minus_a = tokens_b.difference(tokens_a)
@@ -255,7 +272,7 @@ async def check_features_manually(
features = input("If features was not detected correctly, write it manually: ")
if features:
-presence_list = features.split(",")
+presence_list = list(map(str.strip, features.split(",")))
absence_list = sorted(b_minus_a, key=get_match_ratio, reverse=True)[
:top_features_count
@@ -264,7 +281,7 @@ async def check_features_manually(
features = input("If features was not detected correctly, write it manually: ")
if features:
-absence_list = features.split(",")
+absence_list = list(map(str.strip, features.split(",")))
site_data = {
"absenceStrs": absence_list,
@@ -276,6 +293,9 @@ async def check_features_manually(
"checkType": "message", "checkType": "message",
} }
if headers != HEADERS:
site_data['headers'] = headers
site = MaigretSite(url_mainpage.split("/")[-1], site_data) site = MaigretSite(url_mainpage.split("/")[-1], site_data)
return site return site
@@ -283,6 +303,7 @@ async def check_features_manually(
async def submit_dialog(db, url_exists, cookie_file, logger): async def submit_dialog(db, url_exists, cookie_file, logger):
domain_raw = URL_RE.sub("", url_exists).strip().strip("/") domain_raw = URL_RE.sub("", url_exists).strip().strip("/")
domain_raw = domain_raw.split("/")[0] domain_raw = domain_raw.split("/")[0]
logger.info('Domain is %s', domain_raw)
# check for existence # check for existence
matched_sites = list(filter(lambda x: domain_raw in x.url_main + x.url, db.sites)) matched_sites = list(filter(lambda x: domain_raw in x.url_main + x.url, db.sites))
@@ -355,7 +376,7 @@ async def submit_dialog(db, url_exists, cookie_file, logger):
return False
chosen_site.name = input("Change site name if you want: ") or chosen_site.name
-chosen_site.tags = input("Site tags: ").split(',')
+chosen_site.tags = list(map(str.strip, input("Site tags: ").split(',')))
rank = get_alexa_rank(chosen_site.url_main)
if rank:
print(f'New alexa rank: {rank}')
+8 -1
View File
@@ -1,5 +1,7 @@
+import ast
import re
import random
+from typing import Any
DEFAULT_USER_AGENTS = [
@@ -65,6 +67,10 @@ class URLMatcher:
return re.compile(regexp_str)
+def ascii_data_display(data: str) -> Any:
+return ast.literal_eval(data)
def get_dict_ascii_tree(items, prepend="", new_line=True):
text = ""
for num, item in enumerate(items):
@@ -75,7 +81,8 @@ def get_dict_ascii_tree(items, prepend="", new_line=True):
if field_value.startswith("['"):
is_last_item = num == len(items) - 1
prepend_symbols = " " * 3 if is_last_item else ""
-field_value = get_dict_ascii_tree(eval(field_value), prepend_symbols)
+data = ascii_data_display(field_value)
+field_value = get_dict_ascii_tree(data, prepend_symbols)
text += f"\n{prepend}{box_symbol}{field_name}: {field_value}"
else:
text += f"\n{prepend}{box_symbol} {item}"
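The ascii_data_display() helper added above swaps the bare eval() of extracted profile data for ast.literal_eval, so only Python literals are accepted. A small illustration with made-up values:

    from maigret.utils import ascii_data_display

    links = ascii_data_display("['https://example.com/a', 'https://example.com/b']")
    assert links == ['https://example.com/a', 'https://example.com/b']

    # Arbitrary expressions are rejected instead of being executed, e.g.
    # ascii_data_display("__import__('os').system('id')") raises ValueError.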
+2 -1
View File
@@ -1,3 +1,4 @@
+aiodns==3.0.0
aiohttp==3.7.4
aiohttp-socks==0.5.5
arabic-reshaper==2.1.1
@@ -26,7 +27,7 @@ python-socks==1.1.2
requests>=2.24.0
requests-futures==1.0.0
six==1.15.0
-socid-extractor>=0.0.19
+socid-extractor>=0.0.21
soupsieve==2.1
stem==1.8.0
torrequest==0.1.0
+1 -1
View File
@@ -12,7 +12,7 @@ with open('requirements.txt') as rf:
requires = rf.read().splitlines()
setup(name='maigret',
-version='0.2.3',
+version='0.3.0',
description='Collect a dossier on a person by username from a huge number of sites',
long_description=long_description,
long_description_content_type="text/markdown",
+1408 -1345
View File
File diff suppressed because it is too large
+6
View File
@@ -0,0 +1,6 @@
flake8==3.8.4
pytest==6.2.4
pytest-asyncio==0.14.0
pytest-cov==2.10.1
pytest-httpserver==1.0.0
pytest-rerunfailures==9.1.1
+12 -5
View File
@@ -12,6 +12,7 @@ from maigret.maigret import setup_arguments_parser
CUR_PATH = os.path.dirname(os.path.realpath(__file__)) CUR_PATH = os.path.dirname(os.path.realpath(__file__))
JSON_FILE = os.path.join(CUR_PATH, '../maigret/resources/data.json') JSON_FILE = os.path.join(CUR_PATH, '../maigret/resources/data.json')
TEST_JSON_FILE = os.path.join(CUR_PATH, 'db.json') TEST_JSON_FILE = os.path.join(CUR_PATH, 'db.json')
LOCAL_TEST_JSON_FILE = os.path.join(CUR_PATH, 'local.json')
empty_mark = Mark('', (), {}) empty_mark = Mark('', (), {})
@@ -36,16 +37,17 @@ def remove_test_reports():
@pytest.fixture(scope='session')
def default_db():
-db = MaigretDatabase().load_from_file(JSON_FILE)
-return db
+return MaigretDatabase().load_from_file(JSON_FILE)
@pytest.fixture(scope='function')
def test_db():
-db = MaigretDatabase().load_from_file(TEST_JSON_FILE)
-return db
+return MaigretDatabase().load_from_file(TEST_JSON_FILE)
+@pytest.fixture(scope='function')
+def local_test_db():
+return MaigretDatabase().load_from_file(LOCAL_TEST_JSON_FILE)
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
@@ -58,3 +60,8 @@ def reports_autoclean():
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def argparser(): def argparser():
return setup_arguments_parser() return setup_arguments_parser()
@pytest.fixture(scope="session")
def httpserver_listen_address():
return ("localhost", 8989)
+21
View File
@@ -0,0 +1,21 @@
{
"engines": {},
"sites": {
"StatusCode": {
"checkType": "status_code",
"url": "http://localhost:8989/url?id={username}",
"urlMain": "http://localhost:8989/",
"usernameClaimed": "claimed",
"usernameUnclaimed": "unclaimed"
},
"Message": {
"checkType": "message",
"url": "http://localhost:8989/url?id={username}",
"urlMain": "http://localhost:8989/",
"presenseStrs": ["user", "profile"],
"absenseStrs": ["not found", "404"],
"usernameClaimed": "claimed",
"usernameUnclaimed": "unclaimed"
}
}
}
+2 -1
View File
@@ -22,6 +22,7 @@ httpbin.org FALSE / FALSE 0 a b
"""
+@pytest.mark.skip(reason="periodically fails")
@pytest.mark.slow
def test_twitter_activation(default_db):
twitter_site = default_db.sites_dict['Twitter']
@@ -39,7 +40,7 @@ async def test_import_aiohttp_cookies():
with open(cookies_filename, 'w') as f:
f.write(COOKIES_TXT)
-cookie_jar = await import_aiohttp_cookies(cookies_filename)
+cookie_jar = import_aiohttp_cookies(cookies_filename)
assert list(cookie_jar._cookies.keys()) == ['xss.is', 'httpbin.org']
url = 'https://httpbin.org/cookies'
+69
View File
@@ -0,0 +1,69 @@
from mock import Mock
import pytest
from maigret import search
def site_result_except(server, username, **kwargs):
query = f'id={username}'
server.expect_request('/url', query_string=query).respond_with_data(**kwargs)
@pytest.mark.slow
@pytest.mark.asyncio
async def test_checking_by_status_code(httpserver, local_test_db):
sites_dict = local_test_db.sites_dict
site_result_except(httpserver, 'claimed', status=200)
site_result_except(httpserver, 'unclaimed', status=404)
result = await search('claimed', site_dict=sites_dict, logger=Mock())
assert result['StatusCode']['status'].is_found() is True
result = await search('unclaimed', site_dict=sites_dict, logger=Mock())
assert result['StatusCode']['status'].is_found() is False
@pytest.mark.slow
@pytest.mark.asyncio
async def test_checking_by_message_positive_full(httpserver, local_test_db):
sites_dict = local_test_db.sites_dict
site_result_except(httpserver, 'claimed', response_data="user profile")
site_result_except(httpserver, 'unclaimed', response_data="404 not found")
result = await search('claimed', site_dict=sites_dict, logger=Mock())
assert result['Message']['status'].is_found() is True
result = await search('unclaimed', site_dict=sites_dict, logger=Mock())
assert result['Message']['status'].is_found() is False
@pytest.mark.slow
@pytest.mark.asyncio
async def test_checking_by_message_positive_part(httpserver, local_test_db):
sites_dict = local_test_db.sites_dict
site_result_except(httpserver, 'claimed', response_data="profile")
site_result_except(httpserver, 'unclaimed', response_data="404")
result = await search('claimed', site_dict=sites_dict, logger=Mock())
assert result['Message']['status'].is_found() is True
result = await search('unclaimed', site_dict=sites_dict, logger=Mock())
assert result['Message']['status'].is_found() is False
@pytest.mark.slow
@pytest.mark.asyncio
async def test_checking_by_message_negative(httpserver, local_test_db):
sites_dict = local_test_db.sites_dict
site_result_except(httpserver, 'claimed', response_data="")
site_result_except(httpserver, 'unclaimed', response_data="user 404")
result = await search('claimed', site_dict=sites_dict, logger=Mock())
assert result['Message']['status'].is_found() is False
result = await search('unclaimed', site_dict=sites_dict, logger=Mock())
assert result['Message']['status'].is_found() is True
+4
View File
@@ -25,17 +25,21 @@ DEFAULT_ARGS: Dict[str, Any] = {
'print_check_errors': False,
'print_not_found': False,
'proxy': None,
+'reports_sorting': 'default',
'retries': 1,
'self_check': False,
'site_list': [],
'stats': False,
'tags': '',
'timeout': 30,
+'tor_proxy': 'socks5://127.0.0.1:9050',
+'i2p_proxy': 'http://127.0.0.1:4444',
'top_sites': 500,
'txt': False,
'use_disabled_sites': False,
'username': [],
'verbose': False,
+'with_domains': False,
'xmind': False,
}
+15
View File
@@ -0,0 +1,15 @@
"""Maigret data test functions"""
from maigret.utils import is_country_tag
from maigret.sites import SUPPORTED_TAGS
def test_tags_validity(default_db):
unknown_tags = set()
for site in default_db.sites:
for tag in filter(lambda x: not is_country_tag(x), site.tags):
if tag not in SUPPORTED_TAGS:
unknown_tags.add(tag)
assert unknown_tags == set()
+1
View File
@@ -138,6 +138,7 @@ def test_maigret_results(test_db):
assert results['Reddit'].get('future') is None
del results['GooglePlayStore']['future']
+del results['GooglePlayStore']['checker']
assert results == RESULTS_EXAMPLE
+109 -2
View File
@@ -16,6 +16,7 @@ from maigret.report import (
generate_report_template,
generate_report_context,
generate_json_report,
+get_plaintext_report,
)
from maigret.result import QueryResult, QueryStatus
from maigret.sites import MaigretSite
@@ -44,6 +45,19 @@ EXAMPLE_RESULTS = {
}
}
BROKEN_RESULTS = {
'GitHub': {
'username': 'test',
'parsing_enabled': True,
'url_main': 'https://www.github.com/',
'url_user': 'https://www.github.com/test',
'http_status': 200,
'is_similar': False,
'rank': 78,
'site': MaigretSite('test', {}),
}
}
GOOD_500PX_RESULT = copy.deepcopy(GOOD_RESULT) GOOD_500PX_RESULT = copy.deepcopy(GOOD_RESULT)
GOOD_500PX_RESULT.tags = ['photo', 'us', 'global'] GOOD_500PX_RESULT.tags = ['photo', 'us', 'global']
GOOD_500PX_RESULT.ids_data = { GOOD_500PX_RESULT.ids_data = {
@@ -238,10 +252,13 @@ TEST = [
]
SUPPOSED_BRIEF = """Search by username alexaimephotographycars returned 1 accounts. Found target's other IDs: alexaimephotography, Alexaimephotogr. Search by username alexaimephotography returned 2 accounts. Search by username Alexaimephotogr returned 1 accounts. Extended info extracted from 3 accounts."""
+SUPPOSED_BROKEN_BRIEF = """Search by username alexaimephotographycars returned 0 accounts. Search by username alexaimephotography returned 2 accounts. Search by username Alexaimephotogr returned 1 accounts. Extended info extracted from 2 accounts."""
-SUPPOSED_INTERESTS = "Interests: photo <span class=\"text-muted\">(2)</span>, news <span class=\"text-muted\">(1)</span>, social <span class=\"text-muted\">(1)</span>"
SUPPOSED_GEO = "Geo: us <span class=\"text-muted\">(3)</span>"
+SUPPOSED_BROKEN_GEO = "Geo: us <span class=\"text-muted\">(2)</span>"
+SUPPOSED_INTERESTS = "Interests: photo <span class=\"text-muted\">(2)</span>, news <span class=\"text-muted\">(1)</span>, social <span class=\"text-muted\">(1)</span>"
+SUPPOSED_BROKEN_INTERESTS = "Interests: news <span class=\"text-muted\">(1)</span>, photo <span class=\"text-muted\">(1)</span>, social <span class=\"text-muted\">(1)</span>"
def test_generate_report_template():
@@ -269,6 +286,19 @@ def test_generate_csv_report():
] ]
def test_generate_csv_report_broken():
csvfile = StringIO()
generate_csv_report('test', BROKEN_RESULTS, csvfile)
csvfile.seek(0)
data = csvfile.readlines()
assert data == [
'username,name,url_main,url_user,exists,http_status\r\n',
'test,GitHub,https://www.github.com/,https://www.github.com/test,Unknown,200\r\n',
]
def test_generate_txt_report(): def test_generate_txt_report():
txtfile = StringIO() txtfile = StringIO()
generate_txt_report('test', EXAMPLE_RESULTS, txtfile) generate_txt_report('test', EXAMPLE_RESULTS, txtfile)
@@ -282,6 +312,18 @@ def test_generate_txt_report():
] ]
def test_generate_txt_report_broken():
txtfile = StringIO()
generate_txt_report('test', BROKEN_RESULTS, txtfile)
txtfile.seek(0)
data = txtfile.readlines()
assert data == [
'Total Websites Username Detected On : 0',
]
def test_generate_json_simple_report(): def test_generate_json_simple_report():
jsonfile = StringIO() jsonfile = StringIO()
MODIFIED_RESULTS = dict(EXAMPLE_RESULTS) MODIFIED_RESULTS = dict(EXAMPLE_RESULTS)
@@ -295,6 +337,19 @@ def test_generate_json_simple_report():
assert list(json.loads(data[0]).keys()) == ['GitHub', 'GitHub2'] assert list(json.loads(data[0]).keys()) == ['GitHub', 'GitHub2']
def test_generate_json_simple_report_broken():
jsonfile = StringIO()
MODIFIED_RESULTS = dict(BROKEN_RESULTS)
MODIFIED_RESULTS['GitHub2'] = BROKEN_RESULTS['GitHub']
generate_json_report('test', BROKEN_RESULTS, jsonfile, 'simple')
jsonfile.seek(0)
data = jsonfile.readlines()
assert len(data) == 1
assert list(json.loads(data[0]).keys()) == []
def test_generate_json_ndjson_report(): def test_generate_json_ndjson_report():
jsonfile = StringIO() jsonfile = StringIO()
MODIFIED_RESULTS = dict(EXAMPLE_RESULTS) MODIFIED_RESULTS = dict(EXAMPLE_RESULTS)
@@ -328,6 +383,20 @@ def test_save_xmind_report():
) )
def test_save_xmind_report_broken():
filename = 'report_test.xmind'
save_xmind_report(filename, 'test', BROKEN_RESULTS)
workbook = xmind.load(filename)
sheet = workbook.getPrimarySheet()
data = sheet.getData()
assert data['title'] == 'test Analysis'
assert data['topic']['title'] == 'test'
assert len(data['topic']['topics']) == 1
assert data['topic']['topics'][0]['title'] == 'Undefined'
def test_html_report(): def test_html_report():
report_name = 'report_test.html' report_name = 'report_test.html'
context = generate_report_context(TEST) context = generate_report_context(TEST)
@@ -340,9 +409,47 @@ def test_html_report():
assert SUPPOSED_INTERESTS in report_text assert SUPPOSED_INTERESTS in report_text
def test_html_report_broken():
report_name = 'report_test_broken.html'
BROKEN_DATA = copy.deepcopy(TEST)
BROKEN_DATA[0][2]['500px']['status'] = None
context = generate_report_context(BROKEN_DATA)
save_html_report(report_name, context)
report_text = open(report_name).read()
assert SUPPOSED_BROKEN_BRIEF in report_text
assert SUPPOSED_BROKEN_GEO in report_text
assert SUPPOSED_BROKEN_INTERESTS in report_text
def test_pdf_report(): def test_pdf_report():
report_name = 'report_test.pdf' report_name = 'report_test.pdf'
context = generate_report_context(TEST) context = generate_report_context(TEST)
save_pdf_report(report_name, context) save_pdf_report(report_name, context)
assert os.path.exists(report_name) assert os.path.exists(report_name)
def test_text_report():
context = generate_report_context(TEST)
report_text = get_plaintext_report(context)
for brief_part in SUPPOSED_BRIEF.split():
assert brief_part in report_text
assert 'us' in report_text
assert 'photo' in report_text
def test_text_report_broken():
BROKEN_DATA = copy.deepcopy(TEST)
BROKEN_DATA[0][2]['500px']['status'] = None
context = generate_report_context(BROKEN_DATA)
report_text = get_plaintext_report(context)
for brief_part in SUPPOSED_BROKEN_BRIEF.split():
assert brief_part in report_text
assert 'us' in report_text
assert 'photo' in report_text
+5
View File
@@ -57,6 +57,11 @@ def test_enrich_link_str():
)
+def test_url_extract_main_part_negative():
+url_main_part = 'None'
+assert URLMatcher.extract_main_part(url_main_part) == ''
def test_url_extract_main_part():
url_main_part = 'flickr.com/photos/alexaimephotography'