Compare commits

..

45 Commits

Author SHA1 Message Date
soxoj c6661e22ff Merge pull request #72 from soxoj/v0.1.15
Bump to 0.1.15
2021-03-14 20:15:12 +03:00
Soxoj fdb68b5e80 Bump to 0.1.15 2021-03-14 20:11:32 +03:00
soxoj 9fe6b99239 Merge pull request #71 from soxoj/html-report-img-fix
Fixed HTML report images hiding for small screens + some minor fixes
2021-03-14 17:31:12 +03:00
Soxoj b9d303fde3 Fixed HTML report images hiding for small screens + some minor fixes 2021-03-14 16:15:31 +03:00
soxoj d29e88d96f Merge pull request #70 from soxoj/extracting-flag
Added separate `no-extracing` flag to rule page parsing
2021-03-14 13:22:29 +03:00
Soxoj 731a8e01f9 Added separate no-extracing flag to rule page parsing 2021-03-14 13:03:29 +03:00
soxoj cf7acfd8c8 Merge pull request #69 from soxoj/tiktok-fix
TikTok fixes
2021-03-13 00:02:25 +03:00
soxoj 9e6bd05acc Merge pull request #68 from soxoj/ssl-error-catching
Fixed catching of python-specific exception
2021-03-13 00:00:45 +03:00
Soxoj 6ea1dc33f7 TikTok fixes 2021-03-12 23:58:46 +03:00
Soxoj d5bc92d26a Fixed catching of python-specific exception 2021-03-12 23:34:59 +03:00
soxoj f7263c9b3c Merge pull request #67 from soxoj/fp-fixes
Some false positives fixes
2021-03-12 23:31:54 +03:00
Soxoj e6f82a8ba3 Some false positives fixes 2021-03-12 22:53:53 +03:00
soxoj ba7a38092c Merge pull request #65 from soxoj/dependabot/pip/aiohttp-3.7.4
Bump aiohttp from 3.7.3 to 3.7.4
2021-02-26 22:06:04 +03:00
dependabot[bot] 92a1677213 Bump aiohttp from 3.7.3 to 3.7.4
Bumps [aiohttp](https://github.com/aio-libs/aiohttp) from 3.7.3 to 3.7.4.
- [Release notes](https://github.com/aio-libs/aiohttp/releases)
- [Changelog](https://github.com/aio-libs/aiohttp/blob/master/CHANGES.rst)
- [Commits](https://github.com/aio-libs/aiohttp/compare/v3.7.3...v3.7.4)

Signed-off-by: dependabot[bot] <support@github.com>
2021-02-26 03:07:44 +00:00
soxoj 9bbc5e61a7 Merge pull request #64 from soxoj/version-update
Bump version to 0.1.14
2021-02-25 22:47:31 +03:00
Soxoj da3e3f6719 Bump version to 0.1.14 2021-02-25 22:45:48 +03:00
soxoj d28221462a Merge pull request #63 from soxoj/updates
Updates
2021-02-25 22:35:47 +03:00
Soxoj 5baccbae0c Updates 2021-02-25 22:34:07 +03:00
soxoj 65de06dc13 Merge pull request #62 from soxoj/socid-dep-update
Update socid-extractor version
2021-02-18 23:13:32 +03:00
Soxoj dd71bc19c0 Fix requirements again 2021-02-18 23:11:51 +03:00
Soxoj 0625867f2a Fix requirements conflict 2021-02-18 23:09:40 +03:00
Soxoj ac7ff47fad Update socid-extractor version 2021-02-18 23:06:28 +03:00
soxoj 0449142745 Merge pull request #61 from soxoj/steam-updates
Spotify added, Steam ID processing updated
2021-02-18 20:57:38 +03:00
Soxoj 1a77bc7472 Spotify added, Steam ID processing updated 2021-02-18 20:53:26 +03:00
soxoj 8391d7317d Merge pull request #59 from soxoj/small-updates
Tags updates
2021-02-18 00:48:17 +03:00
soxoj 8bf789633e Merge pull request #58 from Matrix86/main
Fix for docker
2021-02-18 00:37:38 +03:00
Soxoj 2714ff8fff Tags updates 2021-02-18 00:35:59 +03:00
Gianluca b7c02456e7 fix: docker build returns an error on the pillow compilation 2021-02-17 11:51:33 +01:00
soxoj 15af5e14f2 Merge pull request #57 from soxoj/small-updates
Added `--ignore-ids` option, some sites updates
2021-02-17 03:02:04 +03:00
Soxoj f24ad4abfe Added --ignore-ids option, some sites updates 2021-02-17 02:58:57 +03:00
soxoj 2e3eceed81 Merge pull request #56 from soxoj/stats-discourse
Added stats flag, added Discourse engine
2021-02-15 23:21:59 +03:00
Soxoj 9bc3615afc Added stats flag, added Discourse engine 2021-02-15 23:15:09 +03:00
soxoj a9543e8303 Merge pull request #55 from soxoj/username-extraction
Improved extraction of usernames from links in personal data
2021-02-15 01:59:36 +03:00
Soxoj 31df4eb44d Fixed deepcopy problem for 3.6 2021-02-15 01:58:14 +03:00
Soxoj 89c33e5409 Removed pattern typing for compatibility 2021-02-15 01:52:53 +03:00
Soxoj c0956a0e23 Improved extraction of usernames from links in personal data 2021-02-15 01:36:10 +03:00
soxoj bb4c5dc67a Merge pull request #54 from soxoj/sites-update
Added several sites, updated sites list
2021-02-13 23:26:37 +03:00
Soxoj c16fc7c002 Added several sites, updated sites list 2021-02-13 23:24:53 +03:00
soxoj 53f72edaff Merge pull request #53 from soxoj/json-reports-submit-improvements
Added JSON reports
2021-02-13 01:10:55 +03:00
Soxoj 631de7b346 Added reports of JSON format (simple, njdson); improved submit logic; added several sites 2021-02-13 01:06:05 +03:00
soxoj 7676c053f9 Merge pull request #51 from soxoj/submit-mode
Experimental site submit mode
2021-02-09 00:45:41 +03:00
Soxoj 90135d4676 Experimental site submit mode 2021-02-09 00:43:59 +03:00
soxoj 4f9dace1de Merge pull request #50 from soxoj/favicons
Favicons added to sites list
2021-02-07 00:55:00 +03:00
Soxoj cdec320062 Ordered list format fixed 2021-02-07 00:53:05 +03:00
Soxoj 10426c07aa Favicons added to sites list 2021-02-07 00:43:42 +03:00
20 changed files with 5725 additions and 4877 deletions
+16
@@ -2,6 +2,22 @@
## [Unreleased]
## [0.1.15] - 2021-03-14
* improved HTML reports
* fixed python-3.6- specific error
* false positives fixes
## [0.1.14] - 2021-02-25
* added JSON export formats
* improved tags markup
* realized username detection in userinfo links
* added DB stats CLI option
* added site submit logic and CLI option
* added Spotify parsing activation
* main logic refactoring
* fixed Dockerfile
* fixed requirements
## [0.1.13] - 2021-02-06
* improved sites list filtering
* pretty console messages
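The 0.1.14 changelog entry "added JSON export formats" refers to the simple and ndjson report styles introduced in PR #53. As a generic offline sketch of the difference between the two styles (the field names below are illustrative, not Maigret's actual report schema):

```python
import json

# Illustrative result rows, not the project's real schema.
results = [
    {'site': 'GitHub', 'username': 'alice', 'found': True},
    {'site': 'GitLab', 'username': 'alice', 'found': False},
]

# "Simple" JSON: one array holding every result.
simple = json.dumps(results)

# NDJSON (newline-delimited JSON): one standalone JSON object per line,
# convenient for streaming and line-by-line processing.
ndjson = '\n'.join(json.dumps(row) for row in results)
```

NDJSON files can be consumed one line at a time without parsing the whole document, which is why tools like `jq` handle them well.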
+1
@@ -13,6 +13,7 @@ RUN pip install --upgrade pip \
    libxml2 \
    libxml2-dev \
    libxslt-dev \
    jpeg-dev \
    && YARL_NO_EXTENSIONS=1 python3 -m pip install maigret \
    && apk del .build-dependencies \
    && rm -rf /var/cache/apk/* \
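For context, this one-line fix (PR #58, "docker build returns an error on the pillow compilation") adds `jpeg-dev` to the Alpine build dependencies so that Pillow's C extension can find the libjpeg headers at compile time. A minimal sketch of the pattern, with the package list trimmed to the relevant bits (base image and exact package set are assumptions, not the project's actual Dockerfile):

```dockerfile
FROM python:3.9-alpine
# jpeg-dev supplies the libjpeg headers Pillow needs at build time; without
# it, `pip install pillow` fails during compilation on Alpine. If build
# dependencies are later removed, the runtime JPEG library must be kept.
RUN apk add --no-cache jpeg-dev zlib-dev gcc musl-dev \
    && pip install pillow
```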
-4
@@ -72,10 +72,6 @@ docker run maigret user
[PDF report](./static/report_alexaimephotographycars.pdf), [HTML report](https://htmlpreview.github.io/?https://raw.githubusercontent.com/soxoj/maigret/main/static/report_alexaimephotographycars.html)
```bash
maigret alexaimephotographycars
```
![animation of recursive search](./static/recursive_search.svg)
![HTML report screenshot](./static/report_alexaimephotography_html_screenshot.png)
+9
@@ -27,6 +27,15 @@ class ParsingActivator:
        jwt_token = r.json()['jwt']
        site.headers['Authorization'] = 'jwt ' + jwt_token
    @staticmethod
    def spotify(site, logger, cookies={}):
        headers = dict(site.headers)
        if 'Authorization' in headers:
            del headers['Authorization']

        r = requests.get(site.activation['url'])
        bearer_token = r.json()['accessToken']
        site.headers['authorization'] = f'Bearer {bearer_token}'
    @staticmethod
    def xssis(site, logger, cookies={}):
        if not cookies:
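The new Spotify activator follows the same pattern as the existing JWT one above: fetch a fresh token anonymously from the site's activation URL, then attach it to the site's headers for subsequent requests. An offline sketch of that pattern (`SiteStub` and `fetch_token` are stand-ins; the real method does `requests.get(site.activation['url']).json()['accessToken']`):

```python
class SiteStub:
    """Minimal stand-in for a MaigretSite entry (hypothetical)."""
    def __init__(self):
        self.headers = {'User-Agent': 'test-agent'}
        self.activation = {'url': 'https://example.com/get_access_token',
                           'method': 'spotify'}

def fetch_token(url):
    # Stubbed network call so the sketch runs offline; the real activator
    # performs an anonymous HTTP GET here.
    return 'fresh-token'

def spotify_activate(site):
    headers = dict(site.headers)
    headers.pop('Authorization', None)  # drop any stale token from the copy
    bearer_token = fetch_token(site.activation['url'])
    site.headers['authorization'] = f'Bearer {bearer_token}'

site = SiteStub()
spotify_activate(site)
print(site.headers['authorization'])  # Bearer fresh-token
```

The activator is later dispatched by name via `getattr(ParsingActivator(), site.activation['method'])` in the checking code, which is why it only needs to mutate `site.headers` rather than return anything.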
+610
@@ -0,0 +1,610 @@
import asyncio
import logging
import re
import ssl
import sys
import aiohttp
import tqdm.asyncio
from aiohttp_socks import ProxyConnector
from mock import Mock
from python_socks import _errors as proxy_errors
from socid_extractor import extract
from .activation import ParsingActivator, import_aiohttp_cookies
from .result import QueryResult, QueryStatus
from .sites import MaigretDatabase, MaigretSite
supported_recursive_search_ids = (
    'yandex_public_id',
    'gaia_id',
    'vk_id',
    'ok_id',
    'wikimapia_uid',
    'steam_id',
)

common_errors = {
    '<title>Attention Required! | Cloudflare</title>': 'Cloudflare captcha',
    'Please stand by, while we are checking your browser': 'Cloudflare captcha',
    '<title>Доступ ограничен</title>': 'Rostelecom censorship',
    'document.getElementById(\'validate_form_submit\').disabled=true': 'Mail.ru captcha',
    'Verifying your browser, please wait...<br>DDoS Protection by</font> Blazingfast.io': 'Blazingfast protection',
    '404</h1><p class="error-card__description">Мы&nbsp;не&nbsp;нашли страницу': 'MegaFon 404 page',
    'Доступ к информационному ресурсу ограничен на основании Федерального закона': 'MGTS censorship',
    'Incapsula incident ID': 'Incapsula antibot protection',
}

unsupported_characters = '#'
async def get_response(request_future, site_name, logger):
    html_text = None
    status_code = 0

    error_text = "General Unknown Error"
    expection_text = None

    try:
        response = await request_future

        status_code = response.status
        response_content = await response.content.read()
        charset = response.charset or 'utf-8'
        decoded_content = response_content.decode(charset, 'ignore')
        html_text = decoded_content

        if status_code > 0:
            error_text = None

        logger.debug(html_text)
    except asyncio.TimeoutError as errt:
        error_text = "Timeout Error"
        expection_text = str(errt)
    except aiohttp.client_exceptions.ClientConnectorError as err:
        error_text = "Error Connecting"
        expection_text = str(err)
    except aiohttp.http_exceptions.BadHttpMessage as err:
        error_text = "HTTP Error"
        expection_text = str(err)
    except proxy_errors.ProxyError as err:
        error_text = "Proxy Error"
        expection_text = str(err)
    except Exception as err:
        # python-specific exceptions
        if sys.version_info.minor > 6:
            if isinstance(err, ssl.SSLCertVerificationError) or isinstance(err, ssl.SSLError):
                error_text = "SSL Error"
                expection_text = str(err)
        else:
            logger.warning(f'Unhandled error while requesting {site_name}: {err}')
            logger.debug(err, exc_info=True)
            error_text = "Some Error"
            expection_text = str(err)

    # TODO: return only needed information
    return html_text, status_code, error_text, expection_text
async def update_site_dict_from_response(sitename, site_dict, results_info, semaphore, logger, query_notify):
    async with semaphore:
        site_obj = site_dict[sitename]
        future = site_obj.request_future
        if not future:
            # ignore: search by incompatible id type
            return

        response = await get_response(request_future=future,
                                      site_name=sitename,
                                      logger=logger)

    site_dict[sitename] = process_site_result(response, query_notify, logger, results_info, site_obj)
# TODO: move to separate class
def detect_error_page(html_text, status_code, fail_flags, ignore_403):
    # Detect service restrictions such as a country restriction
    for flag, msg in fail_flags.items():
        if flag in html_text:
            return 'Some site error', msg

    # Detect common restrictions such as provider censorship and bot protection
    for flag, msg in common_errors.items():
        if flag in html_text:
            return 'Error', msg

    # Detect common site errors
    if status_code == 403 and not ignore_403:
        return 'Access denied', 'Access denied, use proxy/vpn'
    elif status_code >= 500:
        return f'Error {status_code}', f'Site error {status_code}'

    return None, None
def process_site_result(response, query_notify, logger, results_info, site: MaigretSite):
    if not response:
        return results_info

    fulltags = site.tags

    # Retrieve other site information again
    username = results_info['username']
    is_parsing_enabled = results_info['parsing_enabled']
    url = results_info.get("url_user")
    logger.debug(url)

    status = results_info.get("status")
    if status is not None:
        # We have already determined the user doesn't exist here
        return results_info

    # Get the expected check type
    check_type = site.check_type

    # Get the failure messages and comments
    failure_errors = site.errors

    # TODO: refactor
    if not response:
        logger.error(f'No response for {site.name}')
        return results_info

    html_text, status_code, error_text, expection_text = response
    site_error_text = '?'

    # TODO: add elapsed request time counting
    response_time = None

    if logger.level == logging.DEBUG:
        with open('debug.txt', 'a') as f:
            status = status_code or 'No response'
            f.write(f'url: {url}\nerror: {str(error_text)}\nr: {status}\n')
            if html_text:
                f.write(f'code: {status}\nresponse: {str(html_text)}\n')

    if status_code and not error_text:
        error_text, site_error_text = detect_error_page(html_text, status_code, failure_errors,
                                                        site.ignore_403)

    if site.activation and html_text:
        is_need_activation = any([s for s in site.activation['marks'] if s in html_text])
        if is_need_activation:
            method = site.activation['method']
            try:
                activate_fun = getattr(ParsingActivator(), method)
                # TODO: async call
                activate_fun(site, logger)
            except AttributeError:
                logger.warning(f'Activation method {method} for site {site.name} not found!')
            except Exception as e:
                logger.warning(f'Failed activation {method} for site {site.name}: {e}')

    # presense flags
    # True by default
    presense_flags = site.presense_strs
    is_presense_detected = False
    if html_text:
        if not presense_flags:
            is_presense_detected = True
            site.stats['presense_flag'] = None
        else:
            for presense_flag in presense_flags:
                if presense_flag in html_text:
                    is_presense_detected = True
                    site.stats['presense_flag'] = presense_flag
                    logger.info(presense_flag)
                    break

    if error_text is not None:
        logger.debug(error_text)
        result = QueryResult(username,
                             site.name,
                             url,
                             QueryStatus.UNKNOWN,
                             query_time=response_time,
                             context=f'{error_text}: {site_error_text}', tags=fulltags)
    elif check_type == "message":
        absence_flags = site.absence_strs
        is_absence_flags_list = isinstance(absence_flags, list)
        absence_flags_set = set(absence_flags) if is_absence_flags_list else {absence_flags}
        # Checks if the error message is in the HTML
        is_absence_detected = any([(absence_flag in html_text) for absence_flag in absence_flags_set])
        if not is_absence_detected and is_presense_detected:
            result = QueryResult(username,
                                 site.name,
                                 url,
                                 QueryStatus.CLAIMED,
                                 query_time=response_time, tags=fulltags)
        else:
            result = QueryResult(username,
                                 site.name,
                                 url,
                                 QueryStatus.AVAILABLE,
                                 query_time=response_time, tags=fulltags)
    elif check_type == "status_code":
        # Checks if the status code of the response is 2XX
        if (not status_code >= 300 or status_code < 200) and is_presense_detected:
            result = QueryResult(username,
                                 site.name,
                                 url,
                                 QueryStatus.CLAIMED,
                                 query_time=response_time, tags=fulltags)
        else:
            result = QueryResult(username,
                                 site.name,
                                 url,
                                 QueryStatus.AVAILABLE,
                                 query_time=response_time, tags=fulltags)
    elif check_type == "response_url":
        # For this detection method, we have turned off the redirect.
        # So, there is no need to check the response URL: it will always
        # match the request. Instead, we will ensure that the response
        # code indicates that the request was successful (i.e. no 404, or
        # forward to some odd redirect).
        if 200 <= status_code < 300 and is_presense_detected:
            result = QueryResult(username,
                                 site.name,
                                 url,
                                 QueryStatus.CLAIMED,
                                 query_time=response_time, tags=fulltags)
        else:
            result = QueryResult(username,
                                 site.name,
                                 url,
                                 QueryStatus.AVAILABLE,
                                 query_time=response_time, tags=fulltags)
    else:
        # It should be impossible to ever get here...
        raise ValueError(f"Unknown check type '{check_type}' for "
                         f"site '{site.name}'")

    extracted_ids_data = {}

    if is_parsing_enabled and result.status == QueryStatus.CLAIMED:
        try:
            extracted_ids_data = extract(html_text)
        except Exception as e:
            logger.warning(f'Error while parsing {site.name}: {e}', exc_info=True)

        if extracted_ids_data:
            new_usernames = {}
            for k, v in extracted_ids_data.items():
                if 'username' in k:
                    new_usernames[v] = 'username'
                if k in supported_recursive_search_ids:
                    new_usernames[v] = k

            results_info['ids_usernames'] = new_usernames
            results_info['ids_links'] = eval(extracted_ids_data.get('links', '[]'))
            result.ids_data = extracted_ids_data

    # Notify caller about results of query.
    query_notify.update(result, site.similar_search)

    # Save status of request
    results_info['status'] = result

    # Save results from request
    results_info['http_status'] = status_code
    results_info['is_similar'] = site.similar_search
    # results_site['response_text'] = html_text
    results_info['rank'] = site.alexa_rank

    return results_info
async def maigret(username, site_dict, query_notify, logger,
                  proxy=None, timeout=None, is_parsing_enabled=False,
                  id_type='username', debug=False, forced=False,
                  max_connections=100, no_progressbar=False,
                  cookies=None):
    """Main search func

    Checks for existence of username on various social media sites.

    Keyword Arguments:
    username            -- String indicating username that report
                           should be created against.
    site_dict           -- Dictionary containing all of the site data.
    query_notify        -- Object with base type of QueryNotify().
                           This will be used to notify the caller about
                           query results.
    proxy               -- String indicating the proxy URL
    timeout             -- Time in seconds to wait before timing out request.
                           Default is no timeout.
    is_parsing_enabled  -- Search for other usernames in website pages.

    Return Value:
    Dictionary containing results from report. Key of dictionary is the name
    of the social network site, and the value is another dictionary with
    the following keys:
        url_main:      URL of main site.
        url_user:      URL of user on site (if account exists).
        status:        QueryResult() object indicating results of test for
                       account existence.
        http_status:   HTTP status code of query which checked for existence on
                       site.
        response_text: Text that came back from request. May be None if
                       there was an HTTP error when checking for existence.
    """

    # Notify caller that we are starting the query.
    query_notify.start(username, id_type)

    # TODO: connector
    connector = ProxyConnector.from_url(proxy) if proxy else aiohttp.TCPConnector(ssl=False)
    # connector = aiohttp.TCPConnector(ssl=False)
    connector.verify_ssl = False

    cookie_jar = None
    if cookies:
        logger.debug(f'Using cookies jar file {cookies}')
        cookie_jar = await import_aiohttp_cookies(cookies)

    session = aiohttp.ClientSession(connector=connector, trust_env=True, cookie_jar=cookie_jar)

    if logger.level == logging.DEBUG:
        future = session.get(url='https://icanhazip.com')
        ip, status, error, expection = await get_response(future, None, logger)
        if ip:
            logger.debug(f'My IP is: {ip.strip()}')
        else:
            logger.debug(f'IP requesting {error}: {expection}')

    # Results from analysis of all sites
    results_total = {}

    # First create futures for all requests. This allows for the requests to run in parallel
    for site_name, site in site_dict.items():
        if site.type != id_type:
            continue

        if site.disabled and not forced:
            logger.debug(f'Site {site.name} is disabled, skipping...')
            continue

        # Results from analysis of this specific site
        results_site = {}

        # Record URL of main site and username
        results_site['username'] = username
        results_site['parsing_enabled'] = is_parsing_enabled
        results_site['url_main'] = site.url_main
        results_site['cookies'] = cookie_jar and cookie_jar.filter_cookies(site.url_main) or None

        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11.1; rv:55.0) Gecko/20100101 Firefox/55.0',
        }
        headers.update(site.headers)

        if not 'url' in site.__dict__:
            logger.error('No URL for site %s', site.name)

        # URL of user on site (if it exists)
        url = site.url.format(
            urlMain=site.url_main,
            urlSubpath=site.url_subpath,
            username=username
        )

        # workaround to prevent slash errors
        url = re.sub('(?<!:)/+', '/', url)

        # Don't make request if username is invalid for the site
        if site.regex_check and re.search(site.regex_check, username) is None:
            # No need to do the check at the site: this user name is not allowed.
            results_site['status'] = QueryResult(username,
                                                 site_name,
                                                 url,
                                                 QueryStatus.ILLEGAL)
            results_site["url_user"] = ""
            results_site['http_status'] = ""
            results_site['response_text'] = ""
            query_notify.update(results_site['status'])
        else:
            # URL of user on site (if it exists)
            results_site["url_user"] = url
            url_probe = site.url_probe
            if url_probe is None:
                # Probe URL is normal one seen by people out on the web.
                url_probe = url
            else:
                # There is a special URL for probing existence separate
                # from where the user profile normally can be found.
                url_probe = url_probe.format(
                    urlMain=site.url_main,
                    urlSubpath=site.url_subpath,
                    username=username,
                )

            for k, v in site.get_params.items():
                url_probe += f'&{k}={v}'

            if site.check_type == 'status_code' and site.request_head_only:
                # In most cases when we are detecting by status code,
                # it is not necessary to get the entire body: we can
                # detect fine with just the HEAD response.
                request_method = session.head
            else:
                # Either this detect method needs the content associated
                # with the GET response, or this specific website will
                # not respond properly unless we request the whole page.
                request_method = session.get

            if site.check_type == "response_url":
                # Site forwards request to a different URL if username not
                # found. Disallow the redirect so we can capture the
                # http status from the original URL request.
                allow_redirects = False
            else:
                # Allow whatever redirect that the site wants to do.
                # The final result of the request will be what is available.
                allow_redirects = True

            future = request_method(url=url_probe, headers=headers,
                                    allow_redirects=allow_redirects,
                                    timeout=timeout,
                                    )

            # Store future in data for access later
            # TODO: move to separate obj
            site.request_future = future

        # Add this site's results into final dictionary with all of the other results.
        results_total[site_name] = results_site

    # TODO: move into top-level function
    sem = asyncio.Semaphore(max_connections)
    tasks = []

    for sitename, result_obj in results_total.items():
        update_site_coro = update_site_dict_from_response(sitename, site_dict, result_obj, sem, logger, query_notify)
        future = asyncio.ensure_future(update_site_coro)
        tasks.append(future)

    if no_progressbar:
        await asyncio.gather(*tasks)
    else:
        for f in tqdm.asyncio.tqdm.as_completed(tasks):
            await f

    await session.close()

    # Notify caller that all queries are finished.
    query_notify.finish()

    return results_total
def timeout_check(value):
    """Check Timeout Argument.

    Checks timeout for validity.

    Keyword Arguments:
    value -- Time in seconds to wait before timing out request.

    Return Value:
    Floating point number representing the time (in seconds) that should be
    used for the timeout.

    NOTE: Will raise an exception if the timeout is invalid.
    """
    from argparse import ArgumentTypeError

    try:
        timeout = float(value)
    except ValueError:
        raise ArgumentTypeError(f"Timeout '{value}' must be a number.")
    if timeout <= 0:
        raise ArgumentTypeError(f"Timeout '{value}' must be greater than 0.0s.")
    return timeout
async def site_self_check(site, logger, semaphore, db: MaigretDatabase, silent=False):
    query_notify = Mock()
    changes = {
        'disabled': False,
    }

    try:
        check_data = [
            (site.username_claimed, QueryStatus.CLAIMED),
            (site.username_unclaimed, QueryStatus.AVAILABLE),
        ]
    except Exception as e:
        logger.error(e)
        logger.error(site.__dict__)
        check_data = []

    logger.info(f'Checking {site.name}...')

    for username, status in check_data:
        async with semaphore:
            results_dict = await maigret(
                username,
                {site.name: site},
                query_notify,
                logger,
                timeout=30,
                id_type=site.type,
                forced=True,
                no_progressbar=True,
            )

            # don't disable entries with other ids types
            # TODO: make normal checking
            if site.name not in results_dict:
                logger.info(results_dict)
                changes['disabled'] = True
                continue

            result = results_dict[site.name]['status']

        site_status = result.status

        if site_status != status:
            if site_status == QueryStatus.UNKNOWN:
                msgs = site.absence_strs
                etype = site.check_type
                logger.warning(
                    f'Error while searching {username} in {site.name}: {result.context}, {msgs}, type {etype}')
                # don't disable in case of available username
                if status == QueryStatus.CLAIMED:
                    changes['disabled'] = True
            elif status == QueryStatus.CLAIMED:
                logger.warning(f'Not found `{username}` in {site.name}, must be claimed')
                logger.info(results_dict[site.name])
                changes['disabled'] = True
            else:
                logger.warning(f'Found `{username}` in {site.name}, must be available')
                logger.info(results_dict[site.name])
                changes['disabled'] = True

    logger.info(f'Site {site.name} checking is finished')

    if changes['disabled'] != site.disabled:
        site.disabled = changes['disabled']
        db.update_site(site)
        if not silent:
            action = 'Disabled' if site.disabled else 'Enabled'
            print(f'{action} site {site.name}...')

    return changes
async def self_check(db: MaigretDatabase, site_data: dict, logger, silent=False,
                     max_connections=10) -> bool:
    sem = asyncio.Semaphore(max_connections)
    tasks = []
    all_sites = site_data

    def disabled_count(lst):
        return len(list(filter(lambda x: x.disabled, lst)))

    disabled_old_count = disabled_count(all_sites.values())

    for _, site in all_sites.items():
        check_coro = site_self_check(site, logger, sem, db, silent)
        future = asyncio.ensure_future(check_coro)
        tasks.append(future)

    for f in tqdm.asyncio.tqdm.as_completed(tasks):
        await f

    disabled_new_count = disabled_count(all_sites.values())
    total_disabled = disabled_new_count - disabled_old_count

    if total_disabled >= 0:
        message = 'Disabled'
    else:
        message = 'Enabled'
        total_disabled *= -1

    if not silent:
        print(
            f'{message} {total_disabled} ({disabled_old_count} => {disabled_new_count}) checked sites. Run with `--info` flag to get more information')

    return total_disabled != 0
+73 -614
@@ -2,615 +2,22 @@
 Maigret main module
 """
-import asyncio
-import logging
 import os
 import platform
-import re
-import ssl
 import sys
 from argparse import ArgumentParser, RawDescriptionHelpFormatter
-import aiohttp
 import requests
-import tqdm.asyncio
-from aiohttp_socks import ProxyConnector
-from mock import Mock
-from python_socks import _errors as proxy_errors
-from socid_extractor import parse, extract, __version__ as socid_version
-from .activation import ParsingActivator, import_aiohttp_cookies
+from socid_extractor import parse, __version__ as socid_version
+from .checking import *
 from .notify import QueryNotifyPrint
 from .report import save_csv_report, save_xmind_report, save_html_report, save_pdf_report, \
-    generate_report_context, save_txt_report
+    generate_report_context, save_txt_report, SUPPORTED_JSON_REPORT_FORMATS, check_supported_json_format, \
+    save_json_report
-from .result import QueryResult, QueryStatus
-from .sites import MaigretDatabase, MaigretSite
+from .submit import submit_dialog
-__version__ = '0.1.13'
+__version__ = '0.1.15'
supported_recursive_search_ids = (
'yandex_public_id',
'gaia_id',
'vk_id',
'ok_id',
'wikimapia_uid',
)
common_errors = {
'<title>Attention Required! | Cloudflare</title>': 'Cloudflare captcha',
'Please stand by, while we are checking your browser': 'Cloudflare captcha',
'<title>Доступ ограничен</title>': 'Rostelecom censorship',
'document.getElementById(\'validate_form_submit\').disabled=true': 'Mail.ru captcha',
'Verifying your browser, please wait...<br>DDoS Protection by</font> Blazingfast.io': 'Blazingfast protection',
'404</h1><p class="error-card__description">Мы&nbsp;не&nbsp;нашли страницу': 'MegaFon 404 page',
'Доступ к информационному ресурсу ограничен на основании Федерального закона': 'MGTS censorship',
'Incapsula incident ID': 'Incapsula antibot protection',
}
unsupported_characters = '#'
async def get_response(request_future, site_name, logger):
html_text = None
status_code = 0
error_text = "General Unknown Error"
expection_text = None
try:
response = await request_future
status_code = response.status
response_content = await response.content.read()
charset = response.charset or 'utf-8'
decoded_content = response_content.decode(charset, 'ignore')
html_text = decoded_content
if status_code > 0:
error_text = None
logger.debug(html_text)
except asyncio.TimeoutError as errt:
error_text = "Timeout Error"
expection_text = str(errt)
except (ssl.SSLCertVerificationError, ssl.SSLError) as err:
error_text = "SSL Error"
expection_text = str(err)
except aiohttp.client_exceptions.ClientConnectorError as err:
error_text = "Error Connecting"
expection_text = str(err)
except aiohttp.http_exceptions.BadHttpMessage as err:
error_text = "HTTP Error"
expection_text = str(err)
except proxy_errors.ProxyError as err:
error_text = "Proxy Error"
expection_text = str(err)
except Exception as err:
logger.warning(f'Unhandled error while requesting {site_name}: {err}')
logger.debug(err, exc_info=True)
error_text = "Some Error"
expection_text = str(err)
# TODO: return only needed information
return html_text, status_code, error_text, expection_text
async def update_site_dict_from_response(sitename, site_dict, results_info, semaphore, logger, query_notify):
async with semaphore:
site_obj = site_dict[sitename]
future = site_obj.request_future
if not future:
# ignore: search by incompatible id type
return
response = await get_response(request_future=future,
site_name=sitename,
logger=logger)
site_dict[sitename] = process_site_result(response, query_notify, logger, results_info, site_obj)
# TODO: move info separate module
def detect_error_page(html_text, status_code, fail_flags, ignore_403):
# Detect service restrictions such as a country restriction
for flag, msg in fail_flags.items():
if flag in html_text:
return 'Some site error', msg
# Detect common restrictions such as provider censorship and bot protection
for flag, msg in common_errors.items():
if flag in html_text:
return 'Error', msg
# Detect common site errors
if status_code == 403 and not ignore_403:
return 'Access denied', 'Access denied, use proxy/vpn'
elif status_code >= 500:
return f'Error {status_code}', f'Site error {status_code}'
return None, None
def process_site_result(response, query_notify, logger, results_info, site: MaigretSite):
if not response:
return results_info
fulltags = site.tags
# Retrieve other site information again
username = results_info['username']
is_parsing_enabled = results_info['parsing_enabled']
url = results_info.get("url_user")
logger.debug(url)
status = results_info.get("status")
if status is not None:
# We have already determined the user doesn't exist here
return results_info
# Get the expected check type
check_type = site.check_type
# Get the failure messages and comments
failure_errors = site.errors
# TODO: refactor
if not response:
logger.error(f'No response for {site.name}')
return results_info
html_text, status_code, error_text, expection_text = response
site_error_text = '?'
# TODO: add elapsed request time counting
response_time = None
if logger.level == logging.DEBUG:
with open('debug.txt', 'a') as f:
status = status_code or 'No response'
f.write(f'url: {url}\nerror: {str(error_text)}\nr: {status}\n')
if html_text:
f.write(f'code: {status}\nresponse: {str(html_text)}\n')
if status_code and not error_text:
error_text, site_error_text = detect_error_page(html_text, status_code, failure_errors,
site.ignore_403)
if site.activation and html_text:
is_need_activation = any([s for s in site.activation['marks'] if s in html_text])
if is_need_activation:
method = site.activation['method']
try:
activate_fun = getattr(ParsingActivator(), method)
# TODO: async call
activate_fun(site, logger)
except AttributeError:
logger.warning(f'Activation method {method} for site {site.name} not found!')
# presense flags
# True by default
presense_flags = site.presense_strs
is_presense_detected = False
if html_text:
if not presense_flags:
is_presense_detected = True
site.stats['presense_flag'] = None
else:
for presense_flag in presense_flags:
if presense_flag in html_text:
is_presense_detected = True
site.stats['presense_flag'] = presense_flag
logger.info(presense_flag)
break
if error_text is not None:
logger.debug(error_text)
result = QueryResult(username,
site.name,
url,
QueryStatus.UNKNOWN,
query_time=response_time,
context=f'{error_text}: {site_error_text}', tags=fulltags)
elif check_type == "message":
absence_flags = site.absence_strs
is_absence_flags_list = isinstance(absence_flags, list)
absence_flags_set = set(absence_flags) if is_absence_flags_list else {absence_flags}
# Checks if the error message is in the HTML
is_absence_detected = any([(absence_flag in html_text) for absence_flag in absence_flags_set])
if not is_absence_detected and is_presense_detected:
result = QueryResult(username,
site.name,
url,
QueryStatus.CLAIMED,
query_time=response_time, tags=fulltags)
else:
result = QueryResult(username,
site.name,
url,
QueryStatus.AVAILABLE,
query_time=response_time, tags=fulltags)
elif check_type == "status_code":
# Checks if the status code of the response is 2XX
if status_code and 200 <= status_code < 300 and is_presense_detected:
result = QueryResult(username,
site.name,
url,
QueryStatus.CLAIMED,
query_time=response_time, tags=fulltags)
else:
result = QueryResult(username,
site.name,
url,
QueryStatus.AVAILABLE,
query_time=response_time, tags=fulltags)
elif check_type == "response_url":
# For this detection method, we have turned off the redirect.
# So, there is no need to check the response URL: it will always
# match the request. Instead, we will ensure that the response
# code indicates that the request was successful (i.e. no 404, or
# forward to some odd redirect).
if 200 <= status_code < 300 and is_presense_detected:
result = QueryResult(username,
site.name,
url,
QueryStatus.CLAIMED,
query_time=response_time, tags=fulltags)
else:
result = QueryResult(username,
site.name,
url,
QueryStatus.AVAILABLE,
query_time=response_time, tags=fulltags)
else:
# It should be impossible to ever get here...
raise ValueError(f"Unknown check type '{check_type}' for "
f"site '{site.name}'")
extracted_ids_data = {}
if is_parsing_enabled and result.status == QueryStatus.CLAIMED:
try:
extracted_ids_data = extract(html_text)
except Exception as e:
logger.warning(f'Error while parsing {site.name}: {e}', exc_info=True)
if extracted_ids_data:
new_usernames = {}
for k, v in extracted_ids_data.items():
if 'username' in k:
new_usernames[v] = 'username'
if k in supported_recursive_search_ids:
new_usernames[v] = k
results_info['ids_usernames'] = new_usernames
result.ids_data = extracted_ids_data
# Notify caller about results of query.
query_notify.update(result, site.similar_search)
# Save status of request
results_info['status'] = result
# Save results from request
results_info['http_status'] = status_code
results_info['is_similar'] = site.similar_search
# results_site['response_text'] = html_text
results_info['rank'] = site.alexa_rank
return results_info
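The three check types handled above (`message`, `status_code`, `response_url`) boil down to a small decision rule. A simplified standalone sketch (the helper name and signature are illustrative, not maigret's API; the real code also tracks tags, query time, and error context):

```python
# Simplified decision logic for the three check types above (sketch only).
def decide(check_type, presence_detected, absence_detected=False, status_code=None):
    if check_type == 'message':
        # claimed if no absence marker is present and a presence marker is
        claimed = not absence_detected and presence_detected
    elif check_type in ('status_code', 'response_url'):
        # claimed on a 2XX response plus a presence marker
        claimed = status_code is not None and 200 <= status_code < 300 and presence_detected
    else:
        raise ValueError(f"Unknown check type '{check_type}'")
    return 'CLAIMED' if claimed else 'AVAILABLE'

print(decide('message', True))                       # CLAIMED
print(decide('status_code', True, status_code=404))  # AVAILABLE
```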
async def maigret(username, site_dict, query_notify, logger,
proxy=None, timeout=None, recursive_search=False,
id_type='username', debug=False, forced=False,
max_connections=100, no_progressbar=False,
cookies=None):
"""Main search func
Checks for existence of username on various social media sites.
Keyword Arguments:
username -- String indicating username that report
should be created against.
site_dict -- Dictionary containing all of the site data.
query_notify -- Object with base type of QueryNotify().
This will be used to notify the caller about
query results.
proxy -- String indicating the proxy URL
timeout -- Time in seconds to wait before timing out request.
Default is no timeout.
recursive_search -- Parse website pages for other usernames and search by them recursively.
Return Value:
Dictionary containing results from report. Key of dictionary is the name
of the social network site, and the value is another dictionary with
the following keys:
url_main: URL of main site.
url_user: URL of user on site (if account exists).
status: QueryResult() object indicating results of test for
account existence.
http_status: HTTP status code of query which checked for existence on
site.
response_text: Text that came back from request. May be None if
there was an HTTP error when checking for existence.
"""
# Notify caller that we are starting the query.
query_notify.start(username, id_type)
# TODO: connector
connector = ProxyConnector.from_url(proxy) if proxy else aiohttp.TCPConnector(ssl=False)
# connector = aiohttp.TCPConnector(ssl=False)
connector.verify_ssl = False
cookie_jar = None
if cookies:
cookie_jar = await import_aiohttp_cookies(cookies)
session = aiohttp.ClientSession(connector=connector, trust_env=True, cookie_jar=cookie_jar)
if logger.level == logging.DEBUG:
future = session.get(url='https://icanhazip.com')
ip, status, error, exception = await get_response(future, None, logger)
if ip:
logger.debug(f'My IP is: {ip.strip()}')
else:
logger.debug(f'IP request failed with {error}: {exception}')
# Results from analysis of all sites
results_total = {}
# First create futures for all requests. This allows for the requests to run in parallel
for site_name, site in site_dict.items():
if site.type != id_type:
continue
if site.disabled and not forced:
logger.debug(f'Site {site.name} is disabled, skipping...')
continue
# Results from analysis of this specific site
results_site = {}
# Record URL of main site and username
results_site['username'] = username
results_site['parsing_enabled'] = recursive_search
results_site['url_main'] = site.url_main
results_site['cookies'] = cookie_jar and cookie_jar.filter_cookies(site.url_main) or None
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11.1; rv:55.0) Gecko/20100101 Firefox/55.0',
}
headers.update(site.headers)
if 'url' not in site.__dict__:
logger.error('No URL for site %s', site.name)
continue
# URL of user on site (if it exists)
url = site.url.format(
urlMain=site.url_main,
urlSubpath=site.url_subpath,
username=username
)
# workaround to prevent slash errors
url = re.sub('(?<!:)/+', '/', url)
# Don't make request if username is invalid for the site
if site.regex_check and re.search(site.regex_check, username) is None:
# No need to do the check at the site: this user name is not allowed.
results_site['status'] = QueryResult(username,
site_name,
url,
QueryStatus.ILLEGAL)
results_site["url_user"] = ""
results_site['http_status'] = ""
results_site['response_text'] = ""
query_notify.update(results_site['status'])
else:
# URL of user on site (if it exists)
results_site["url_user"] = url
url_probe = site.url_probe
if url_probe is None:
# Probe URL is normal one seen by people out on the web.
url_probe = url
else:
# There is a special URL for probing existence separate
# from where the user profile normally can be found.
url_probe = url_probe.format(
urlMain=site.url_main,
urlSubpath=site.url_subpath,
username=username,
)
for k, v in site.get_params.items():
# use '?' for the first query parameter, '&' for the rest
url_probe += ('&' if '?' in url_probe else '?') + f'{k}={v}'
if site.check_type == 'status_code' and site.request_head_only:
# In most cases when we are detecting by status code,
# it is not necessary to get the entire body: we can
# detect fine with just the HEAD response.
request_method = session.head
else:
# Either this detect method needs the content associated
# with the GET response, or this specific website will
# not respond properly unless we request the whole page.
request_method = session.get
if site.check_type == "response_url":
# Site forwards request to a different URL if username not
# found. Disallow the redirect so we can capture the
# http status from the original URL request.
allow_redirects = False
else:
# Allow whatever redirect that the site wants to do.
# The final result of the request will be what is available.
allow_redirects = True
future = request_method(url=url_probe, headers=headers,
allow_redirects=allow_redirects,
timeout=timeout,
)
# Store future in data for access later
# TODO: move to separate obj
site.request_future = future
# Add this site's results into final dictionary with all of the other results.
results_total[site_name] = results_site
# TODO: move into top-level function
sem = asyncio.Semaphore(max_connections)
tasks = []
for sitename, result_obj in results_total.items():
update_site_coro = update_site_dict_from_response(sitename, site_dict, result_obj, sem, logger, query_notify)
future = asyncio.ensure_future(update_site_coro)
tasks.append(future)
if no_progressbar:
await asyncio.gather(*tasks)
else:
for f in tqdm.asyncio.tqdm.as_completed(tasks):
await f
await session.close()
# Notify caller that all queries are finished.
query_notify.finish()
return results_total
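The return structure documented in the docstring above can be consumed like this. A standalone sketch with a mocked results dictionary; `QueryResult`/`QueryStatus` are stood in by a namedtuple and plain strings (assumption: the real object exposes a `status` attribute, as documented):

```python
from collections import namedtuple

# Stand-in for maigret's QueryResult: only the `status` attribute matters here.
Status = namedtuple('Status', 'status')

# A mocked `results_total`-shaped dictionary, per the docstring:
# site name -> {url_main, url_user, status, http_status, ...}
results = {
    'GitHub': {'url_main': 'https://github.com', 'url_user': 'https://github.com/soxoj',
               'status': Status('CLAIMED'), 'http_status': 200},
    'Reddit': {'url_main': 'https://reddit.com', 'url_user': '',
               'status': Status('AVAILABLE'), 'http_status': 404},
}

# Collect profile URLs for accounts that were found.
found = [info['url_user'] for info in results.values()
         if info['status'].status == 'CLAIMED']
print(found)  # ['https://github.com/soxoj']
```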
def timeout_check(value):
"""Check Timeout Argument.
Checks timeout for validity.
Keyword Arguments:
value -- Time in seconds to wait before timing out request.
Return Value:
Floating point number representing the time (in seconds) that should be
used for the timeout.
NOTE: Will raise an exception if the timeout is invalid.
"""
from argparse import ArgumentTypeError
try:
timeout = float(value)
except ValueError:
raise ArgumentTypeError(f"Timeout '{value}' must be a number.")
if timeout <= 0:
raise ArgumentTypeError(f"Timeout '{value}' must be greater than 0.0s.")
return timeout
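Since `timeout_check` is used as an argparse `type=` converter, its contract is easy to verify in isolation (function body reproduced from above so the sketch is self-contained):

```python
from argparse import ArgumentTypeError

def timeout_check(value):
    # Convert and validate a timeout CLI argument; raise ArgumentTypeError on bad input.
    try:
        timeout = float(value)
    except ValueError:
        raise ArgumentTypeError(f"Timeout '{value}' must be a number.")
    if timeout <= 0:
        raise ArgumentTypeError(f"Timeout '{value}' must be greater than 0.0s.")
    return timeout

assert timeout_check('10') == 10.0
try:
    timeout_check('-1')
except ArgumentTypeError as e:
    print(e)  # Timeout '-1' must be greater than 0.0s.
```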
async def site_self_check(site, logger, semaphore, db: MaigretDatabase, silent=False):
query_notify = Mock()
changes = {
'disabled': False,
}
try:
check_data = [
(site.username_claimed, QueryStatus.CLAIMED),
(site.username_unclaimed, QueryStatus.AVAILABLE),
]
except AttributeError:
# site entry lacks claimed/unclaimed test usernames, nothing to check
logger.error(f'No test usernames for {site.name}: {site.__dict__}')
return changes
logger.info(f'Checking {site.name}...')
for username, status in check_data:
async with semaphore:
results_dict = await maigret(
username,
{site.name: site},
query_notify,
logger,
timeout=30,
id_type=site.type,
forced=True,
no_progressbar=True,
)
# don't disable entries with other ids types
# TODO: make normal checking
if site.name not in results_dict:
logger.info(results_dict)
changes['disabled'] = True
continue
result = results_dict[site.name]['status']
site_status = result.status
if site_status != status:
if site_status == QueryStatus.UNKNOWN:
msgs = site.absence_strs
etype = site.check_type
logger.warning(f'Error while searching {username} in {site.name}: {result.context}, {msgs}, type {etype}')
# don't disable in case of available username
if status == QueryStatus.CLAIMED:
changes['disabled'] = True
elif status == QueryStatus.CLAIMED:
logger.warning(f'Not found `{username}` in {site.name}, must be claimed')
logger.info(results_dict[site.name])
changes['disabled'] = True
else:
logger.warning(f'Found `{username}` in {site.name}, must be available')
logger.info(results_dict[site.name])
changes['disabled'] = True
logger.info(f'Site {site.name} checking is finished')
if changes['disabled'] != site.disabled:
site.disabled = changes['disabled']
db.update_site(site)
if not silent:
action = 'Disabled' if site.disabled else 'Enabled'
print(f'{action} site {site.name}...')
return changes
async def self_check(db: MaigretDatabase, site_data: dict, logger, silent=False,
max_connections=10) -> bool:
sem = asyncio.Semaphore(max_connections)
tasks = []
all_sites = site_data
def disabled_count(lst):
return len(list(filter(lambda x: x.disabled, lst)))
disabled_old_count = disabled_count(all_sites.values())
for _, site in all_sites.items():
check_coro = site_self_check(site, logger, sem, db, silent)
future = asyncio.ensure_future(check_coro)
tasks.append(future)
for f in tqdm.asyncio.tqdm.as_completed(tasks):
await f
disabled_new_count = disabled_count(all_sites.values())
total_disabled = disabled_new_count - disabled_old_count
if total_disabled >= 0:
message = 'Disabled'
else:
message = 'Enabled'
total_disabled *= -1
if not silent:
print(f'{message} {total_disabled} ({disabled_old_count} => {disabled_new_count}) checked sites. Run with `--info` flag to get more information')
return total_disabled != 0
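The sign handling at the end of `self_check` (a negative delta means previously disabled sites were re-enabled) can be illustrated standalone (helper name is illustrative):

```python
def describe_delta(old, new):
    # Mirrors self_check's reporting: positive delta => more sites disabled,
    # negative delta => previously disabled sites were enabled.
    delta = new - old
    message = 'Disabled' if delta >= 0 else 'Enabled'
    return f'{message} {abs(delta)} ({old} => {new}) checked sites'

print(describe_delta(10, 13))  # Disabled 3 (10 => 13) checked sites
print(describe_delta(10, 7))   # Enabled 3 (10 => 7) checked sites
```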
async def main():
@@ -650,9 +57,9 @@ async def main():
                        action="store", dest="proxy", default=None,
                        help="Make requests over a proxy. e.g. socks5://127.0.0.1:1080"
                        )
-    parser.add_argument("--json", "-j", metavar="JSON_FILE",
-                        dest="json_file", default=None,
-                        help="Load data from a JSON file or an online, valid, JSON file.")
+    parser.add_argument("--db", metavar="DB_FILE",
+                        dest="db_file", default=None,
+                        help="Load Maigret database from a JSON file or an online, valid, JSON file.")
    parser.add_argument("--cookies-jar-file", metavar="COOKIE_FILE",
                        dest="cookie_file", default=None,
                        help="File with cookies.")
@@ -660,7 +67,7 @@ async def main():
                        action="store", metavar='TIMEOUT',
                        dest="timeout", type=timeout_check, default=10,
                        help="Time (in seconds) to wait for response to requests."
-                             "Default timeout of 10.0s."
+                             "Default timeout of 10.0s. "
                             "A longer timeout will be more likely to get results from slow sites."
                             "On the other hand, this may cause a long delay to gather all results."
                        )
@@ -685,6 +92,10 @@ async def main():
                        action="store_true", dest="print_check_errors", default=False,
                        help="Print errors messages: connection, captcha, site country ban, etc."
                        )
+    parser.add_argument("--submit", metavar='EXISTING_USER_URL',
+                        type=str, dest="new_site_to_submit", default=False,
+                        help="URL of existing profile in new site to submit."
+                        )
    parser.add_argument("--no-color",
                        action="store_true", dest="no_color", default=False,
                        help="Don't color terminal output"
@@ -695,12 +106,20 @@ async def main():
                        )
    parser.add_argument("--no-recursion",
                        action="store_true", dest="disable_recursive_search", default=False,
-                        help="Disable parsing pages for other usernames and recursive search by them."
+                        help="Disable recursive search by additional data extracted from pages."
+                        )
+    parser.add_argument("--no-extracting",
+                        action="store_true", dest="disable_extracting", default=False,
+                        help="Disable parsing pages for additional data and other usernames."
                        )
    parser.add_argument("--self-check",
                        action="store_true", default=False,
                        help="Do self check for sites and database and disable non-working ones."
                        )
+    parser.add_argument("--stats",
+                        action="store_true", default=False,
+                        help="Show database statistics."
+                        )
    parser.add_argument("--use-disabled-sites",
                        action="store_true", default=False,
                        help="Use disabled sites to search (may cause many false positives)."
@@ -713,6 +132,11 @@ async def main():
                        dest="id_type", default='username',
                        help="Specify identifier(s) type (default: username)."
                        )
+    parser.add_argument("--ignore-ids",
+                        action="append", metavar='IGNORED_IDS',
+                        dest="ignore_ids_list", default=[],
+                        help="Do not make search by the specified username or other ids."
+                        )
    parser.add_argument("username",
                        nargs='+', metavar='USERNAMES',
                        action="store",
@@ -738,7 +162,7 @@ async def main():
                        action="store_true", dest="html", default=False,
                        help="Create an HTML report file (general report on all usernames)."
                        )
-    parser.add_argument("-X","--xmind",
+    parser.add_argument("-X", "--xmind",
                        action="store_true",
                        dest="xmind", default=False,
                        help="Generate an XMind 8 mindmap report (one report per username)."
@@ -748,6 +172,12 @@ async def main():
                        dest="pdf", default=False,
                        help="Generate a PDF report (general report on all usernames)."
                        )
+    parser.add_argument("-J", "--json",
+                        action="store", metavar='REPORT_TYPE',
+                        dest="json", default='', type=check_supported_json_format,
+                        help=f"Generate a JSON report of specific type: {', '.join(SUPPORTED_JSON_REPORT_FORMATS)}"
+                             " (one report per username)."
+                        )
    args = parser.parse_args()
@@ -774,8 +204,10 @@ async def main():
        u: args.id_type
        for u in args.username
        if u not in ['-']
+        and u not in args.ignore_ids_list
    }
+    parsing_enabled = not args.disable_extracting
    recursive_search_enabled = not args.disable_recursive_search
    # Make prompts
@@ -796,8 +228,8 @@ async def main():
    if args.tags:
        args.tags = list(set(str(args.tags).split(',')))
-    if args.json_file is None:
-        args.json_file = \
+    if args.db_file is None:
+        args.db_file = \
            os.path.join(os.path.dirname(os.path.realpath(__file__)),
                         "resources/data.json"
                         )
@@ -813,24 +245,32 @@ async def main():
                             color=not args.no_color)
    # Create object with all information about sites we are aware of.
-    db = MaigretDatabase().load_from_file(args.json_file)
+    db = MaigretDatabase().load_from_file(args.db_file)
    get_top_sites_for_id = lambda x: db.ranked_sites_dict(top=args.top_sites, tags=args.tags,
                                                          names=args.site_list,
                                                          disabled=False, id_type=x)
    site_data = get_top_sites_for_id(args.id_type)
+    if args.new_site_to_submit:
+        is_submitted = await submit_dialog(db, args.new_site_to_submit)
+        if is_submitted:
+            db.save_to_file(args.db_file)
    # Database self-checking
    if args.self_check:
        print('Maigret sites database self-checking...')
        is_need_update = await self_check(db, site_data, logger, max_connections=args.connections)
        if is_need_update:
            if input('Do you want to save changes permanently? [yYnN]\n').lower() == 'y':
-                db.save_to_file(args.json_file)
+                db.save_to_file(args.db_file)
                print('Database was successfully updated.')
            else:
                print('Updates will be applied only for current search session.')
-        print(db.get_stats(site_data))
+        print(db.get_scan_stats(site_data))
+    if args.stats:
+        print(db.get_db_stats(db.sites_dict))
    # Make reports folder if it does not exist
    os.makedirs(args.folderoutput, exist_ok=True)
@@ -869,12 +309,17 @@ async def main():
        else:
            already_checked.add(username.lower())
+        if username in args.ignore_ids_list:
+            query_notify.warning(f'Skip a search by username {username} because it\'s marked as ignored.')
+            continue
        # check for characters do not supported by sites generally
        found_unsupported_chars = set(unsupported_characters).intersection(set(username))
        if found_unsupported_chars:
            pretty_chars_str = ','.join(map(lambda s: f'"{s}"', found_unsupported_chars))
-            query_notify.warning(f'Found unsupported URL characters: {pretty_chars_str}, skip search by username "{username}"')
+            query_notify.warning(
+                f'Found unsupported URL characters: {pretty_chars_str}, skip search by username "{username}"')
            continue
        sites_to_check = get_top_sites_for_id(id_type)
@@ -884,7 +329,7 @@ async def main():
                                query_notify,
                                proxy=args.proxy,
                                timeout=args.timeout,
-                                recursive_search=recursive_search_enabled,
+                                is_parsing_enabled=parsing_enabled,
                                id_type=id_type,
                                debug=args.verbose,
                                logger=logger,
@@ -902,11 +347,18 @@ async def main():
            # TODO: fix no site data issue
            if not dictionary:
                continue
            new_usernames = dictionary.get('ids_usernames')
            if new_usernames:
                for u, utype in new_usernames.items():
                    usernames[u] = utype
+            for url in dictionary.get('ids_links', []):
+                for s in db.sites:
+                    u = s.detect_username(url)
+                    if u:
+                        usernames[u] = 'username'
        # reporting for a one username
        if args.xmind:
            filename = report_filepath_tpl.format(username=username, postfix='.xmind')
@@ -923,6 +375,12 @@ async def main():
            save_txt_report(filename, username, results)
            query_notify.warning(f'TXT report for {username} saved in (unknown)')
+        if args.json:
+            filename = report_filepath_tpl.format(username=username, postfix=f'_{args.json}.json')
+            save_json_report(filename, username, results, report_type=args.json)
+            query_notify.warning(f'JSON {args.json} report for {username} saved in (unknown)')
    # reporting for all the result
    if general_results:
        if args.html or args.pdf:
@@ -941,7 +399,7 @@ async def main():
            save_pdf_report(filename, report_context)
            query_notify.warning(f'PDF report on all usernames saved in (unknown)')
    # update database
-    db.save_to_file(args.json_file)
+    db.save_to_file(args.db_file)
def run():
@@ -952,5 +410,6 @@ def run():
        print('Maigret is interrupted.')
        sys.exit(1)
if __name__ == "__main__":
    run()
@@ -1,4 +1,5 @@
 import csv
+import json
 import io
 import logging
 import os
@@ -7,11 +8,17 @@ import xmind
 from datetime import datetime
 from jinja2 import Template
 from xhtml2pdf import pisa
+from argparse import ArgumentTypeError
 from dateutil.parser import parse as parse_datetime_str
 from .result import QueryStatus
 from .utils import is_country_tag, CaseConverter, enrich_link_str
+SUPPORTED_JSON_REPORT_FORMATS = [
+    'simple',
+    'ndjson',
+]
 '''
 UTILS
@@ -51,6 +58,10 @@ def save_pdf_report(filename: str, context: dict):
     with open(filename, 'w+b') as f:
         pisa.pisaDocument(io.StringIO(filled_template), dest=f, default_css=css)
+def save_json_report(filename: str, username: str, results: dict, report_type: str):
+    with open(filename, 'w', encoding='utf-8') as f:
+        generate_json_report(username, results, f, report_type=report_type)
 '''
 REPORTS GENERATING
@@ -225,6 +236,30 @@ def generate_txt_report(username: str, results: dict, file):
                 file.write(dictionary["url_user"] + "\n")
     file.write(f'Total Websites Username Detected On : {exists_counter}')
+def generate_json_report(username: str, results: dict, file, report_type):
+    exists_counter = 0
+    is_report_per_line = report_type.startswith('ndjson')
+    all_json = {}
+    for sitename in results:
+        site_result = results[sitename]
+        # TODO: fix no site data issue
+        if not site_result or site_result.get("status").status != QueryStatus.CLAIMED:
+            continue
+        data = dict(site_result)
+        data['status'] = data['status'].json()
+        if is_report_per_line:
+            data['sitename'] = sitename
+            file.write(json.dumps(data) + '\n')
+        else:
+            all_json[sitename] = data
+    if not is_report_per_line:
+        file.write(json.dumps(all_json))
 '''
 XMIND 8 Functions
 '''
@@ -306,3 +341,9 @@ def design_sheet(sheet, username, results):
             currentsublabel.setTitle("%s: %s" % (k, v))
+def check_supported_json_format(value):
+    if value and value not in SUPPORTED_JSON_REPORT_FORMATS:
+        raise ArgumentTypeError('JSON report type must be one of the following types: '
+                                + ', '.join(SUPPORTED_JSON_REPORT_FORMATS))
+    return value
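The two JSON report formats added here differ only in framing: `simple` writes one object keyed by site name, `ndjson` writes one object per line with the site name folded into each record. A sketch of both shapes for the same (hypothetical, minimal) data:

```python
import json

# Hypothetical minimal records, not maigret's full per-site schema.
records = {
    'GitHub': {'url_user': 'https://github.com/soxoj', 'status': 'Claimed'},
}

# 'simple': a single JSON object keyed by site name.
simple = json.dumps(records)

# 'ndjson': one JSON object per line, with the site name inside each record.
nd_lines = []
for sitename, data in records.items():
    data = dict(data, sitename=sitename)
    nd_lines.append(json.dumps(data))
ndjson = '\n'.join(nd_lines)

print(simple)
print(ndjson)
```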
@@ -68,7 +68,7 @@
 <div class="row-mb">
   <div class="col-md">
     <div class="card flex-md-row mb-4 box-shadow h-md-250">
-      <img class="card-img-right flex-auto d-none d-md-block" alt="Photo" style="width: 200px; height: 200px; object-fit: scale-down;" src="{{ v.status.ids_data.image or 'https://i.imgur.com/040fmbw.png' }}" data-holder-rendered="true">
+      <img class="card-img-right flex-auto d-md-block" alt="Photo" style="width: 200px; height: 200px; object-fit: scale-down;" src="{{ v.status.ids_data.image or 'https://i.imgur.com/040fmbw.png' }}" data-holder-rendered="true">
       <div class="card-body d-flex flex-column align-items-start" style="padding-top: 0;">
         <h3 class="mb-0" style="padding-top: 1rem;">
           <a class="text-dark" href="{{ v.url_main }}" target="_blank">{{ k }}</a>
@@ -1,4 +1,4 @@
-"""Sherlock Result Module
+"""Maigret Result Module
 This module defines various objects for recording the results of queries.
 """
@@ -74,6 +74,15 @@ class QueryResult():
         self.ids_data = ids_data
         self.tags = tags
+    def json(self):
+        return {
+            'username': self.username,
+            'site_name': self.site_name,
+            'url': self.site_url_user,
+            'status': str(self.status),
+            'ids': self.ids_data or {},
+            'tags': self.tags,
+        }
     def __str__(self):
         """Convert Object To String.
@@ -2,11 +2,21 @@
 """Maigret Sites Information"""
 import copy
 import json
+import re
 import sys
 import requests
-from .utils import CaseConverter
+from .utils import CaseConverter, URLMatcher, is_country_tag
+# TODO: move to data.json
+SUPPORTED_TAGS = [
+    'gaming', 'coding', 'photo', 'music', 'blog', 'finance', 'freelance', 'dating',
+    'tech', 'forum', 'porn', 'erotic', 'webcam', 'video', 'movies', 'hacking', 'art',
+    'discussion', 'sharing', 'writing', 'wiki', 'business', 'shopping', 'sport',
+    'books', 'news', 'documents', 'travel', 'maps', 'hobby', 'apps', 'classified',
+    'career', 'geosocial', 'streaming', 'education', 'networking', 'torrent',
+]
 class MaigretEngine:
@@ -21,6 +31,16 @@ class MaigretEngine:
 class MaigretSite:
+    NOT_SERIALIZABLE_FIELDS = [
+        'name',
+        'engineData',
+        'requestFuture',
+        'detectedEngine',
+        'engineObj',
+        'stats',
+        'urlRegexp',
+    ]
     def __init__(self, name, information):
         self.name = name
@@ -57,10 +77,29 @@ class MaigretSite:
             # We do not know the popularity, so make site go to bottom of list.
             self.alexa_rank = sys.maxsize
+        self.update_detectors()
     def __str__(self):
         return f"{self.name} ({self.url_main})"
+    def update_detectors(self):
+        if 'url' in self.__dict__:
+            url = self.url
+            for group in ['urlMain', 'urlSubpath']:
+                if group in url:
+                    url = url.replace('{' + group + '}', self.__dict__[CaseConverter.camel_to_snake(group)])
+            self.url_regexp = URLMatcher.make_profile_url_regexp(url, self.regex_check)
+    def detect_username(self, url: str) -> str:
+        if self.url_regexp:
+            match_groups = self.url_regexp.match(url)
+            if match_groups:
+                return match_groups.groups()[-1].rstrip('/')
+        return None
     @property
     def json(self):
         result = {}
@@ -70,7 +109,7 @@ class MaigretSite:
             # strip empty elements
             if v in (False, '', [], {}, None, sys.maxsize, 'username'):
                 continue
-            if field in ['name', 'engineData', 'requestFuture', 'detectedEngine', 'engineObj', 'stats']:
+            if field in self.NOT_SERIALIZABLE_FIELDS:
                 continue
             result[field] = v
@@ -78,6 +117,7 @@ class MaigretSite:
     def update(self, updates: dict) -> MaigretSite:
         self.__dict__.update(updates)
+        self.update_detectors()
         return self
@@ -95,6 +135,7 @@ class MaigretSite:
             self.__dict__[field] = v
         self.engine_obj = engine
+        self.update_detectors()
         return self
@@ -103,6 +144,8 @@ class MaigretSite:
             return self
         self.request_future = None
+        self.url_regexp = None
         self_copy = copy.deepcopy(self)
         engine_data = self_copy.engine_obj.site
         site_data_keys = list(self_copy.__dict__.keys())
@@ -277,7 +320,7 @@ class MaigretDatabase:
         return self.load_from_json(data)
-    def get_stats(self, sites_dict):
+    def get_scan_stats(self, sites_dict):
         sites = sites_dict or self.sites_dict
         found_flags = {}
         for _, s in sites.items():
@@ -286,3 +329,52 @@ class MaigretDatabase:
                 found_flags[flag] = found_flags.get(flag, 0) + 1
         return found_flags
+    def get_db_stats(self, sites_dict):
+        if not sites_dict:
+            sites_dict = self.sites_dict()
+        output = ''
+        disabled_count = 0
+        total_count = len(sites_dict)
+        urls = {}
+        tags = {}
+        for _, site in sites_dict.items():
+            if site.disabled:
+                disabled_count += 1
+            url = URLMatcher.extract_main_part(site.url)
+            if url.startswith('{username}'):
+                url = 'SUBDOMAIN'
+            elif url == '':
+                url = f'{site.url} ({site.engine})'
+            else:
+                parts = url.split('/')
+                url = '/' + '/'.join(parts[1:])
+            urls[url] = urls.get(url, 0) + 1
+            if not site.tags:
+                tags['NO_TAGS'] = tags.get('NO_TAGS', 0) + 1
+            for tag in site.tags:
+                if is_country_tag(tag):
+                    # currently do not display country tags
+                    continue
+                tags[tag] = tags.get(tag, 0) + 1
+        output += f'Enabled/total sites: {total_count - disabled_count}/{total_count}\n'
+        output += 'Top sites\' profile URLs:\n'
+        for url, count in sorted(urls.items(), key=lambda x: x[1], reverse=True)[:20]:
+            if count == 1:
+                break
+            output += f'{count}\t{url}\n'
+        output += 'Top sites\' tags:\n'
+        for tag, count in sorted(tags.items(), key=lambda x: x[1], reverse=True):
+            mark = ''
+            if tag not in SUPPORTED_TAGS:
+                mark = ' (non-standard)'
+            output += f'{count}\t{tag}{mark}\n'
+        return output
@@ -0,0 +1,172 @@
import asyncio
import difflib
import json
import logging
import re
import requests
from mock import Mock
from .checking import *
DESIRED_STRINGS = ["username", "not found", "пользователь", "profile", "lastname", "firstname", "biography",
"birthday", "репутация", "информация", "e-mail"]
RATIO = 0.6
TOP_FEATURES = 5
URL_RE = re.compile(r'https?://(www\.)?')
def get_match_ratio(x):
return round(max([
difflib.SequenceMatcher(a=x.lower(), b=y).ratio()
for y in DESIRED_STRINGS
]), 2)
def extract_domain(url):
return '/'.join(url.split('/', 3)[:3])
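To make the ranking concrete: `get_match_ratio` scores a candidate token by its best `difflib` similarity to any marker string, and `extract_domain` keeps only scheme plus host. A self-contained rerun of both helpers with a trimmed marker list (illustration only, not the module's full list):

```python
import difflib

# Trimmed copy of the helpers above, for illustration only.
MARKERS = ["username", "not found", "profile"]

def get_match_ratio(x):
    # Highest fuzzy-similarity score of x against any marker string
    return round(max(
        difflib.SequenceMatcher(a=x.lower(), b=y).ratio()
        for y in MARKERS
    ), 2)

def extract_domain(url):
    # Splitting on the first three '/' keeps 'scheme://host' only
    return '/'.join(url.split('/', 3)[:3])

assert extract_domain('https://flickr.com/photos/alice') == 'https://flickr.com'
assert get_match_ratio('Username') == 1.0  # exact match after lowercasing
```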
async def site_self_check(site, logger, semaphore, db: MaigretDatabase, silent=False):
query_notify = Mock()
changes = {
'disabled': False,
}
check_data = [
(site.username_claimed, QueryStatus.CLAIMED),
(site.username_unclaimed, QueryStatus.AVAILABLE),
]
logger.info(f'Checking {site.name}...')
for username, status in check_data:
async with semaphore:
results_dict = await maigret(
username,
{site.name: site},
query_notify,
logger,
timeout=30,
id_type=site.type,
forced=True,
no_progressbar=True,
)
# don't disable entries with other id types
# TODO: implement proper checking
if site.name not in results_dict:
logger.info(results_dict)
changes['disabled'] = True
continue
result = results_dict[site.name]['status']
site_status = result.status
if site_status != status:
if site_status == QueryStatus.UNKNOWN:
msgs = site.absence_strs
etype = site.check_type
logger.warning(
f'Error while searching {username} in {site.name}: {result.context}, {msgs}, type {etype}')
# don't disable in case of available username
if status == QueryStatus.CLAIMED:
changes['disabled'] = True
elif status == QueryStatus.CLAIMED:
logger.warning(f'`{username}` not found in {site.name}, but it should be claimed')
logger.info(results_dict[site.name])
changes['disabled'] = True
else:
logger.warning(f'`{username}` found in {site.name}, but it should be available')
logger.info(results_dict[site.name])
changes['disabled'] = True
logger.info(f'Site {site.name} checking is finished')
return changes
async def submit_dialog(db, url_exists):
domain_raw = URL_RE.sub('', url_exists).strip().strip('/')
domain_raw = domain_raw.split('/')[0]
matched_sites = list(filter(lambda x: domain_raw in x.url_main+x.url, db.sites))
if matched_sites:
print(f'Sites with domain "{domain_raw}" already exist in the Maigret database!')
status = lambda s: '(disabled)' if s.disabled else ''
url_block = lambda s: f'\n\t{s.url_main}\n\t{s.url}'
print('\n'.join([f'{site.name} {status(site)}{url_block(site)}' for site in matched_sites]))
return False
url_parts = url_exists.split('/')
supposed_username = url_parts[-1]
new_name = input(f'Is "{supposed_username}" a valid username? If not, write it manually: ')
if new_name:
supposed_username = new_name
non_exist_username = 'noonewouldeverusethis7'
url_user = url_exists.replace(supposed_username, '{username}')
url_not_exists = url_exists.replace(supposed_username, non_exist_username)
a = requests.get(url_exists).text
b = requests.get(url_not_exists).text
tokens_a = set(a.split('"'))
tokens_b = set(b.split('"'))
a_minus_b = tokens_a.difference(tokens_b)
b_minus_a = tokens_b.difference(tokens_a)
top_features_count = int(input(f'Specify the number of features to extract [default {TOP_FEATURES}]: ') or TOP_FEATURES)
presence_list = sorted(a_minus_b, key=get_match_ratio, reverse=True)[:top_features_count]
print('Detected text features of existing account: ' + ', '.join(presence_list))
features = input('If the features were not detected correctly, write them manually: ')
if features:
presence_list = features.split(',')
absence_list = sorted(b_minus_a, key=get_match_ratio, reverse=True)[:top_features_count]
print('Detected text features of non-existing account: ' + ', '.join(absence_list))
features = input('If the features were not detected correctly, write them manually: ')
if features:
absence_list = features.split(',')
url_main = extract_domain(url_exists)
site_data = {
'absenceStrs': absence_list,
'presenseStrs': presence_list,
'url': url_user,
'urlMain': url_main,
'usernameClaimed': supposed_username,
'usernameUnclaimed': non_exist_username,
'checkType': 'message',
}
site = MaigretSite(url_main.split('/')[-1], site_data)
print(site.__dict__)
sem = asyncio.Semaphore(1)
log_level = logging.INFO
logging.basicConfig(
format='[%(filename)s:%(lineno)d] %(levelname)-3s %(asctime)s %(message)s',
datefmt='%H:%M:%S',
level=log_level
)
logger = logging.getLogger('site-submit')
logger.setLevel(log_level)
result = await site_self_check(site, logger, sem, db)
if result['disabled']:
print(f'Sorry, we couldn\'t find params to detect account presence/absence in {site.name}.')
print('Try to run this mode again and increase features count or choose others.')
else:
if input(f'Site {site.name} successfully checked. Do you want to save it in the Maigret DB? [yY] ') in 'yY':
db.update_site(site)
return True
return False
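The core of `submit_dialog` is the token set-difference step: both pages are split on `"` and strings unique to each side become candidate presence/absence features. A minimal sketch with hypothetical HTML stand-ins for the two responses:

```python
# Hypothetical HTML for an existing and a missing account page.
page_exists = '<div class="profile">"alice"</div><span>"reputation"</span>'
page_missing = '<div class="error">"not found"</div>'

# Tokenize on double quotes and diff the token sets, as submit_dialog does.
tokens_a = set(page_exists.split('"'))
tokens_b = set(page_missing.split('"'))

presence_candidates = tokens_a - tokens_b  # only on the "exists" page
absence_candidates = tokens_b - tokens_a   # only on the "missing" page

assert 'profile' in presence_candidates
assert 'not found' in absence_candidates
```

In the real flow these candidate sets are then ranked with `get_match_ratio`, so tokens resembling known marker strings ("profile", "not found", etc.) surface first.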
@@ -1,4 +1,5 @@
import re
import sys
class CaseConverter:
@@ -28,4 +29,30 @@ def enrich_link_str(link: str) -> str:
link = link.strip()
if link.startswith('www.') or (link.startswith('http') and '//' in link):
return f'<a class="auto-link" href="{link}">{link}</a>'
return link
class URLMatcher:
_HTTP_URL_RE_STR = '^https?://(www.)?(.+)$'
HTTP_URL_RE = re.compile(_HTTP_URL_RE_STR)
UNSAFE_SYMBOLS = '.?'
@classmethod
def extract_main_part(cls, url: str) -> str:
match = cls.HTTP_URL_RE.search(url)
if match and match.group(2):
return match.group(2).rstrip('/')
return ''
@classmethod
def make_profile_url_regexp(cls, url: str, username_regexp: str = ''):
url_main_part = cls.extract_main_part(url)
for c in cls.UNSAFE_SYMBOLS:
url_main_part = url_main_part.replace(c, f'\\{c}')
username_regexp = username_regexp or '.+?'
url_regexp = url_main_part.replace('{username}', f'({username_regexp})')
regexp_str = cls._HTTP_URL_RE_STR.replace('(.+)', url_regexp)
return re.compile(regexp_str)
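A self-contained walk-through of what `make_profile_url_regexp` produces for the template used in the tests: escape the unsafe characters, substitute the `{username}` placeholder with a lazy capture group, and splice the result into the base URL pattern.

```python
import re

# Mirror of the transformation in make_profile_url_regexp above.
template = 'flickr.com/photos/{username}'  # main part, scheme already stripped
escaped = template.replace('.', r'\.').replace('?', r'\?')  # UNSAFE_SYMBOLS
pattern = '^https?://(www.)?(.+)$'.replace(
    '(.+)', escaped.replace('{username}', '(.+?)'))

regexp = re.compile(pattern)
m = regexp.match('https://www.flickr.com/photos/alice')
assert pattern == r'^https?://(www.)?flickr\.com/photos/(.+?)$'
assert m is not None and m.group(2) == 'alice'  # group(2) is the username
```

The lazy `(.+?)` plus the `$` anchor makes the capture consume exactly the trailing path segment, which is why `detect_username` can read the username back out of group 2.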
@@ -1,4 +1,4 @@
aiohttp==3.7.4
aiohttp-socks==0.5.5
arabic-reshaper==2.1.1
async-timeout==3.0.1
@@ -25,16 +25,15 @@ PySocks==1.7.1
python-bidi==0.4.2
python-socks==1.1.2
reportlab==3.5.59
requests>=2.24.0
requests-futures==1.0.0
six==1.15.0
socid-extractor>=0.0.13
soupsieve==2.1
stem==1.8.0
torrequest==0.1.0
tqdm==4.55.0
typing-extensions==3.7.4.3
webencodings==0.5.1
xhtml2pdf==0.2.5
XMind==1.2.0
@@ -12,7 +12,7 @@ with open('requirements.txt') as rf:
requires = rf.read().splitlines()
setup(name='maigret',
version='0.1.15',
description='Collect a dossier on a person by username from a huge number of sites',
long_description=long_description,
long_description_content_type="text/markdown",
File diff suppressed because it is too large
@@ -1,5 +1,6 @@
"""Maigret reports test functions"""
import copy
import json
import os
from io import StringIO
@@ -7,7 +8,7 @@ import xmind
from jinja2 import Template
from maigret.report import generate_csv_report, generate_txt_report, save_xmind_report, save_html_report, \
save_pdf_report, generate_report_template, generate_report_context, generate_json_report
from maigret.result import QueryResult, QueryStatus
EXAMPLE_RESULTS = {
@@ -146,6 +147,32 @@ def test_generate_txt_report():
]
def test_generate_json_simple_report():
jsonfile = StringIO()
MODIFIED_RESULTS = dict(EXAMPLE_RESULTS)
MODIFIED_RESULTS['GitHub2'] = EXAMPLE_RESULTS['GitHub']
generate_json_report('test', MODIFIED_RESULTS, jsonfile, 'simple')
jsonfile.seek(0)
data = jsonfile.readlines()
assert len(data) == 1
assert list(json.loads(data[0]).keys()) == ['GitHub', 'GitHub2']
def test_generate_json_ndjson_report():
jsonfile = StringIO()
MODIFIED_RESULTS = dict(EXAMPLE_RESULTS)
MODIFIED_RESULTS['GitHub2'] = EXAMPLE_RESULTS['GitHub']
generate_json_report('test', MODIFIED_RESULTS, jsonfile, 'ndjson')
jsonfile.seek(0)
data = jsonfile.readlines()
assert len(data) == 2
assert json.loads(data[0])['sitename'] == 'GitHub'
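The two tests above pin down the difference between the formats: `simple` serializes the whole result set as one JSON object on one line, while `ndjson` emits one JSON object per site per line. A sketch of that contrast with placeholder result dicts (not the real Maigret report schema):

```python
import io
import json

# Placeholder results standing in for Maigret report dicts.
results = {'GitHub': {'sitename': 'GitHub'}, 'GitHub2': {'sitename': 'GitHub2'}}

simple = io.StringIO()
json.dump(results, simple)                 # one object, one line

ndjson = io.StringIO()
for site in results.values():
    ndjson.write(json.dumps(site) + '\n')  # one object per line

assert len(simple.getvalue().splitlines()) == 1
assert len(ndjson.getvalue().splitlines()) == 2
```

NDJSON makes large reports streamable: each line can be parsed independently without loading the whole file.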
def test_save_xmind_report():
filename = 'report_test.xmind'
save_xmind_report(filename, 'test', EXAMPLE_RESULTS)
@@ -113,6 +113,14 @@ def test_saving_site_error():
assert amperka.strip_engine_data().json['errors'] == {'error1': 'text1'}
def test_site_url_detector():
db = MaigretDatabase()
db.load_from_json(EXAMPLE_DB)
assert db.sites[0].url_regexp.pattern == r'^https?://(www.)?forum\.amperka\.ru/members/\?username=(.+?)$'
assert db.sites[0].detect_username('http://forum.amperka.ru/members/?username=test') == 'test'
def test_ranked_sites_dict():
db = MaigretDatabase()
db.update_site(MaigretSite('3', {'alexaRank': 1000, 'engine': 'ucoz'}))
@@ -1,5 +1,7 @@
"""Maigret utils test functions"""
import itertools
import re
from maigret.utils import CaseConverter, is_country_tag, enrich_link_str, URLMatcher
def test_case_convert_camel_to_snake():
@@ -32,3 +34,33 @@ def test_is_country_tag():
def test_enrich_link_str():
assert enrich_link_str('test') == 'test'
assert enrich_link_str(' www.flickr.com/photos/alexaimephotography/') == '<a class="auto-link" href="www.flickr.com/photos/alexaimephotography/">www.flickr.com/photos/alexaimephotography/</a>'
def test_url_extract_main_part():
url_main_part = 'flickr.com/photos/alexaimephotography'
parts = [
['http://', 'https://'],
['www.', ''],
[url_main_part],
['/', ''],
]
url_regexp = re.compile('^https?://(www.)?flickr.com/photos/(.+?)$')
for url_parts in itertools.product(*parts):
url = ''.join(url_parts)
assert URLMatcher.extract_main_part(url) == url_main_part
assert url_regexp.match(url) is not None
def test_url_make_profile_url_regexp():
url_main_part = 'flickr.com/photos/{username}'
parts = [
['http://', 'https://'],
['www.', ''],
[url_main_part],
['/', ''],
]
for url_parts in itertools.product(*parts):
url = ''.join(url_parts)
assert URLMatcher.make_profile_url_regexp(url).pattern == r'^https?://(www.)?flickr\.com/photos/(.+?)$'
@@ -121,7 +121,9 @@ Rank data fetched from Alexa by domains.
note = ''
if site.disabled:
note = ', search is disabled'
favicon = f"![](https://www.google.com/s2/favicons?domain={url_main})"
site_file.write(f'1. {favicon} [{site}]({url_main})*: top {valid_rank}{tags}*{note}\n')
db.update_site(site)
site_file.write(f'\nAlexa.com rank data fetched at ({datetime.utcnow()} UTC)\n')