Compare commits


54 Commits

Author SHA1 Message Date
soxoj bdfb4911ce Merge pull request #122 from soxoj/0.2.1-bugfix
Fixed json report generation bug, bump to 0.2.1
2021-05-02 20:14:22 +03:00
Soxoj 951be44452 Fixed test fixture scope 2021-05-02 20:12:36 +03:00
Soxoj 188edc1b7f Fixed json report generation bug, bump to 0.2.1 2021-05-02 20:06:15 +03:00
soxoj ec0d3a1f70 Merge pull request #121 from soxoj/0.2.0
Bump to 0.2.0, yank 0.1.20
2021-05-02 20:02:58 +03:00
Soxoj a084203ee1 Bump to 0.2.0, yank 0.1.20 2021-05-02 20:00:20 +03:00
soxoj 1afdda7336 Merge pull request #119 from soxoj/0.1.20
Bump to 0.1.20
2021-05-02 12:05:08 +03:00
Soxoj 252d12ff9e Bump to 0.1.20 2021-05-02 12:02:53 +03:00
soxoj 6afb17e24f Merge pull request #118 from soxoj/submit-improving-new-sites
Some sites added, submit mode improved
2021-05-02 11:08:52 +03:00
Soxoj 7fdd965bb2 Some sites added, submit mode improved 2021-05-02 11:06:37 +03:00
soxoj 8e30e969f9 Merge pull request #117 from soxoj/retries-refactoring
Introduced `--retries` flag, made thorough refactoring
2021-05-01 23:58:28 +03:00
Soxoj 5ee91f6659 Introduced --retries flag, made thorough refactoring
- updated sites list
- test scripts linting
2021-05-01 23:54:01 +03:00
soxoj 7fd4a2c516 Merge pull request #116 from soxoj/refactoring-errors
Refactoring and linting, added notifications about frequent search errors
2021-04-30 12:06:29 +03:00
Soxoj bfa6afac32 Refactoring and linting, added notifications about frequent search errors 2021-04-30 12:03:13 +03:00
soxoj bfaf276f6e Merge pull request #115 from soxoj/submit-source-improving
Added some new sites, implemented filtering by source site with `--na…
2021-04-29 17:18:31 +03:00
Soxoj c9194b20ba Added some new sites, implemented filtering by source site with --name, improved submit mode 2021-04-29 17:11:17 +03:00
soxoj a30a012550 Merge pull request #114 from soxoj/new-sites-source-feature
Added some new sites and introduced 'source' feature
2021-04-29 15:17:13 +03:00
Soxoj 2cdc9bb276 Added some new sites and introduced 'source' feature 2021-04-29 15:14:21 +03:00
soxoj 99fc6c8a8f Merge pull request #113 from soxoj/errors-stats
Errors stats MVP, some fp fixes
2021-04-25 01:13:39 +03:00
Soxoj b269c4a8e0 Added new modules 2021-04-25 01:12:15 +03:00
Soxoj f43dc5bd6f Errors stats MVP, some fp fixes 2021-04-25 01:08:23 +03:00
soxoj 83cda9e37f Merge pull request #112 from soxoj/tapd-added
Sites update
2021-04-19 00:25:55 +03:00
soxoj cc3df85690 Merge branch 'main' into tapd-added 2021-04-18 22:40:27 +03:00
Soxoj 8007e92021 Sites update 2021-04-18 22:38:30 +03:00
soxoj daaddbde4e Merge pull request #111 from soxoj/fp-fixes-18-04-21
Some false positives fixes
2021-04-18 15:26:11 +03:00
Soxoj cea5073962 Some false positives fixes 2021-04-18 15:20:35 +03:00
soxoj b345512489 Merge pull request #110 from soxoj/0.1.19
Bump to 0.1.19
2021-04-14 23:16:30 +03:00
Soxoj 786cb59145 Bump to 0.1.19 2021-04-14 23:14:33 +03:00
soxoj 481baddec6 Merge pull request #109 from soxoj/fp-fixes
Some false positive fixes
2021-04-12 23:18:47 +03:00
Soxoj ecb3d76581 Some false positive fixes 2021-04-12 23:16:26 +03:00
soxoj 8a8fab5bed Merge pull request #108 from soxoj/async-tasks-timeout
Added asyncio tasks with timeouts, non-blocking work with queue
2021-04-12 23:01:59 +03:00
Soxoj 2fee65fe4e Added asyncio tasks with timeouts, non-blocking work with queue 2021-04-11 17:56:27 +03:00
soxoj dabba859f3 Merge pull request #107 from soxoj/main-module-bugfix
Fixed maigret-as-a-module start
2021-04-06 00:36:45 +03:00
Soxoj 74d4d40abd Fixed maigret-as-a-module start 2021-04-06 00:33:39 +03:00
soxoj d6f6d78d3f Merge pull request #104 from soxoj/ascii-tree-bugfix
Fixed ascii tree bug
2021-04-02 09:08:14 +03:00
Soxoj 1b61c5085e Fixed ascii tree bug 2021-04-02 09:03:22 +03:00
soxoj 01e20518c1 Merge pull request #100 from soxoj/fp-fixes
Fixed some false positives
2021-03-31 23:20:18 +03:00
Soxoj 8477385289 Fixed some false positives 2021-03-31 23:17:47 +03:00
soxoj 491dd8f166 Merge pull request #99 from soxoj/no-progressbar-option
Added `--no-progressbar` flag
2021-03-30 19:47:42 +03:00
Soxoj c64b7a1c85 Added --no-progressbar flag 2021-03-30 19:44:01 +03:00
soxoj 03511a7a8f Merge pull request #97 from soxoj/wizard
Some API improvements
2021-03-30 01:16:12 +03:00
Soxoj 7f1a0fae03 Some API improvements 2021-03-30 01:14:46 +03:00
soxoj b0de174df2 Merge pull request #96 from soxoj/wizard
Added search wizard script as an API usage example
2021-03-30 01:11:12 +03:00
Soxoj b5db3f0035 Added search wizard script as an API usage example 2021-03-30 01:09:06 +03:00
soxoj 53d698bb7b Merge pull request #95 from soxoj/socid-bump
Updated socid_extractor version
2021-03-30 00:37:02 +03:00
soxoj 23fff42ca7 Merge pull request #94 from soxoj/dependabot/pip/lxml-4.6.3
Bump lxml from 4.6.2 to 4.6.3
2021-03-30 00:34:13 +03:00
Soxoj 51d9e6f5f6 Bump to v0.1.17 2021-03-30 00:33:51 +03:00
Soxoj 640c04f20b Updated socid_extractor version 2021-03-30 00:31:40 +03:00
dependabot[bot] 69f78e331b Bump lxml from 4.6.2 to 4.6.3
Bumps [lxml](https://github.com/lxml/lxml) from 4.6.2 to 4.6.3.
- [Release notes](https://github.com/lxml/lxml/releases)
- [Changelog](https://github.com/lxml/lxml/blob/master/CHANGES.txt)
- [Commits](https://github.com/lxml/lxml/compare/lxml-4.6.2...lxml-4.6.3)

Signed-off-by: dependabot[bot] <support@github.com>
2021-03-29 21:25:19 +00:00
soxoj 69c315b00e Merge pull request #93 from soxoj/docs-requirements
Documentation and API improving
2021-03-30 00:24:49 +03:00
Soxoj b755628a1d Documentation and API improving 2021-03-30 00:19:17 +03:00
soxoj 7490a412db Merge pull request #92 from soxoj/ignore403-bugfix
Fixed bug with ignore403 for engine-based sites
2021-03-28 17:40:35 +03:00
Soxoj 2741680d4a Fixed bug with ignore403 for engine-based sites 2021-03-28 17:37:18 +03:00
soxoj e5fc221ce2 Merge pull request #91 from soxoj/async-3.6.9-fix
Fix of 3.6.9 asyncio create_task error
2021-03-24 21:43:11 +03:00
Soxoj a044e3dd79 Fix of 3.6.9 asyncio create_task error 2021-03-24 21:37:56 +03:00
35 changed files with 8675 additions and 6084 deletions
+1 -1
@@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.6, 3.7, 3.8, 3.9]
python-version: [3.6.9, 3.7, 3.8, 3.9]
steps:
- uses: actions/checkout@v2
+25
@@ -2,6 +2,31 @@
## [Unreleased]
## [0.2.1] - 2021-05-02
* fixed json reports generation bug, added tests
## [0.2.0] - 2021-05-02
* added `--retries` option
* added `source` feature for sites' mirrors
* improved `submit` mode
* lot of style and logic fixes
## [0.1.20] - 2021-05-02 [YANKED]
## [0.1.19] - 2021-04-14
* added `--no-progressbar` option
* fixed ascii tree bug
* fixed `python -m maigret` run
* fixed requests freeze with timeout async tasks
## [0.1.18] - 2021-03-30
* some API improvements
## [0.1.17] - 2021-03-30
* simplified maigret search API
* improved documentation
* fixed 403 response code ignoring bug
## [0.1.16] - 2021-03-21
* improved URL parsing mode
* improved sites submit mode
+2 -1
@@ -26,6 +26,7 @@ Currently supported more than 2000 sites ([full list](./sites.md)), by default s
* Search by tags (site categories, countries)
* Censorship and captcha detection
* Very few false positives
* Failed requests' restarts
## Installation
@@ -49,7 +50,7 @@ pip3 install .
git clone https://github.com/soxoj/maigret && cd maigret
```
You can use your a free virtual machine, the repo will be automatically cloned:
You can use a free virtual machine, the repo will be automatically cloned:
[![Open in Cloud Shell](https://user-images.githubusercontent.com/27065646/92304704-8d146d80-ef80-11ea-8c29-0deaabb1c702.png)](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/soxoj/maigret&tutorial=README.md) [![Run on Repl.it](https://user-images.githubusercontent.com/27065646/92304596-bf719b00-ef7f-11ea-987f-2c1f3c323088.png)](https://repl.it/github/soxoj/maigret)
<a href="https://colab.research.google.com/gist//soxoj/879b51bc3b2f8b695abb054090645000/maigret.ipynb"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab" height="40"></a>
Executable
+5
@@ -0,0 +1,5 @@
#!/bin/sh
FILES="maigret wizard.py maigret.py tests"
echo 'black'
black --skip-string-normalization $FILES
Executable
+11
@@ -0,0 +1,11 @@
#!/bin/sh
FILES="maigret wizard.py maigret.py tests"
echo 'syntax errors or undefined names'
flake8 --count --select=E9,F63,F7,F82 --show-source --statistics $FILES
echo 'warning'
flake8 --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics --ignore=E731,W503 $FILES
echo 'mypy'
mypy ./maigret ./wizard.py ./tests
+1 -1
@@ -1,4 +1,4 @@
#! /usr/bin/env python3
#!/usr/bin/env python3
import asyncio
import sys
+4
@@ -1 +1,5 @@
"""Maigret"""
from .checking import maigret as search
from .sites import MaigretEngine, MaigretSite, MaigretDatabase
from .notify import QueryNotifyPrint as Notifier
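These re-exports define the public API that the wizard script and the documentation build on. A minimal usage sketch, assuming the 0.2.x keyword names visible in the maigret.py diff later on this page (the database path and the exact argument set are assumptions, not a confirmed snippet from the repo):

import asyncio
import logging

import maigret

async def run():
    # default database path mirrors the fallback in maigret.py below (assumption)
    db = maigret.MaigretDatabase().load_from_file('maigret/resources/data.json')
    notifier = maigret.Notifier(print_found_only=True, color=False)
    # maigret.search is checking.maigret re-exported above
    return await maigret.search(
        username='soxoj',
        site_dict=db.ranked_sites_dict(top=100),
        query_notify=notifier,
        timeout=30,
        logger=logging.getLogger('maigret'),
    )

results = asyncio.run(run())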
+2 -2
@@ -6,7 +6,7 @@ Maigret entrypoint
import asyncio
import maigret
from .maigret import main
if __name__ == "__main__":
asyncio.run(maigret.main())
asyncio.run(main())
+25 -23
@@ -9,46 +9,48 @@ class ParsingActivator:
@staticmethod
def twitter(site, logger, cookies={}):
headers = dict(site.headers)
del headers['x-guest-token']
r = requests.post(site.activation['url'], headers=headers)
del headers["x-guest-token"]
r = requests.post(site.activation["url"], headers=headers)
logger.info(r)
j = r.json()
guest_token = j[site.activation['src']]
site.headers['x-guest-token'] = guest_token
guest_token = j[site.activation["src"]]
site.headers["x-guest-token"] = guest_token
@staticmethod
def vimeo(site, logger, cookies={}):
headers = dict(site.headers)
if 'Authorization' in headers:
del headers['Authorization']
r = requests.get(site.activation['url'], headers=headers)
jwt_token = r.json()['jwt']
site.headers['Authorization'] = 'jwt ' + jwt_token
if "Authorization" in headers:
del headers["Authorization"]
r = requests.get(site.activation["url"], headers=headers)
jwt_token = r.json()["jwt"]
site.headers["Authorization"] = "jwt " + jwt_token
@staticmethod
def spotify(site, logger, cookies={}):
headers = dict(site.headers)
if 'Authorization' in headers:
del headers['Authorization']
r = requests.get(site.activation['url'])
bearer_token = r.json()['accessToken']
site.headers['authorization'] = f'Bearer {bearer_token}'
if "Authorization" in headers:
del headers["Authorization"]
r = requests.get(site.activation["url"])
bearer_token = r.json()["accessToken"]
site.headers["authorization"] = f"Bearer {bearer_token}"
@staticmethod
def xssis(site, logger, cookies={}):
if not cookies:
logger.debug('You must have cookies to activate xss.is parsing!')
logger.debug("You must have cookies to activate xss.is parsing!")
return
headers = dict(site.headers)
post_data = {
'_xfResponseType': 'json',
'_xfToken': '1611177919,a2710362e45dad9aa1da381e21941a38'
"_xfResponseType": "json",
"_xfToken": "1611177919,a2710362e45dad9aa1da381e21941a38",
}
headers['content-type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
r = requests.post(site.activation['url'], headers=headers, cookies=cookies, data=post_data)
csrf = r.json()['csrf']
site.get_params['_xfToken'] = csrf
headers["content-type"] = "application/x-www-form-urlencoded; charset=UTF-8"
r = requests.post(
site.activation["url"], headers=headers, cookies=cookies, data=post_data
)
csrf = r.json()["csrf"]
site.get_params["_xfToken"] = csrf
async def import_aiohttp_cookies(cookiestxt_filename):
@@ -62,8 +64,8 @@ async def import_aiohttp_cookies(cookiestxt_filename):
for key, cookie in list(domain.values())[0].items():
c = Morsel()
c.set(key, cookie.value, cookie.value)
c['domain'] = cookie.domain
c['path'] = cookie.path
c["domain"] = cookie.domain
c["path"] = cookie.path
cookies_list.append((key, c))
cookies.update_cookies(cookies_list)
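A hedged sketch of how the jar built by import_aiohttp_cookies plugs into a request session (the fetch helper is illustrative; cookie_jar is a standard aiohttp.ClientSession parameter, and the module path is an assumption):

import aiohttp

from maigret.activation import import_aiohttp_cookies

async def fetch_with_cookies(url: str, cookiestxt_filename: str) -> str:
    # convert a Netscape cookies.txt export into an aiohttp jar, as above
    jar = await import_aiohttp_cookies(cookiestxt_filename)
    async with aiohttp.ClientSession(cookie_jar=jar) as session:
        async with session.get(url) as response:
            return await response.text()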
+425 -410
File diff suppressed because it is too large.
+115
@@ -0,0 +1,115 @@
from typing import Dict, List, Any
from .result import QueryResult
# error received as a result of a completed search query
class CheckError:
_type = 'Unknown'
_desc = ''
def __init__(self, typename, desc=''):
self._type = typename
self._desc = desc
def __str__(self):
if not self._desc:
return f'{self._type} error'
return f'{self._type} error: {self._desc}'
@property
def type(self):
return self._type
@property
def desc(self):
return self._desc
COMMON_ERRORS = {
'<title>Attention Required! | Cloudflare</title>': CheckError(
'Captcha', 'Cloudflare'
),
'Please stand by, while we are checking your browser': CheckError(
'Bot protection', 'Cloudflare'
),
'<title>Доступ ограничен</title>': CheckError('Censorship', 'Rostelecom'),
'document.getElementById(\'validate_form_submit\').disabled=true': CheckError(
'Captcha', 'Mail.ru'
),
'Verifying your browser, please wait...<br>DDoS Protection by</font> Blazingfast.io': CheckError(
'Bot protection', 'Blazingfast'
),
'404</h1><p class="error-card__description">Мы&nbsp;не&nbsp;нашли страницу': CheckError(
'Resolving', 'MegaFon 404 page'
),
'Доступ к информационному ресурсу ограничен на основании Федерального закона': CheckError(
'Censorship', 'MGTS'
),
'Incapsula incident ID': CheckError('Bot protection', 'Incapsula'),
}
ERRORS_TYPES = {
'Captcha': 'Try to switch to another IP address or to use service cookies',
'Bot protection': 'Try to switch to another IP address',
'Censorship': 'switch to another internet service provider',
'Request timeout': 'Try to increase timeout or to switch to another internet service provider',
}
TEMPORARY_ERRORS_TYPES = [
'Request timeout',
'Unknown',
'Request failed',
'Connecting failure',
'HTTP',
'Proxy',
'Interrupted',
'Connection lost',
]
THRESHOLD = 3 # percent
def is_important(err_data):
return err_data['perc'] >= THRESHOLD
def is_permanent(err_type):
return err_type not in TEMPORARY_ERRORS_TYPES
def detect(text):
for flag, err in COMMON_ERRORS.items():
if flag in text:
return err
return None
def solution_of(err_type) -> str:
return ERRORS_TYPES.get(err_type, '')
def extract_and_group(search_res: dict) -> List[Dict[str, Any]]:
errors_counts: Dict[str, int] = {}
for r in search_res:
if r and isinstance(r, dict) and r.get('status'):
if not isinstance(r['status'], QueryResult):
continue
err = r['status'].error
if not err:
continue
errors_counts[err.type] = errors_counts.get(err.type, 0) + 1
counts = []
for err, count in sorted(errors_counts.items(), key=lambda x: x[1], reverse=True):
counts.append(
{
'err': err,
'count': count,
'perc': round(count / len(search_res), 2) * 100,
}
)
return counts
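Taken together, the module works like this; a small sketch with illustrative inputs (the module path is an assumption, and search_results stands for the dict produced by a finished search):

from maigret.errors import detect, solution_of, extract_and_group, is_important

# detect() scans a response body for known captcha/ban fingerprints
err = detect('<title>Attention Required! | Cloudflare</title>')
assert err is not None and err.type == 'Captcha'
print(err)                    # Captcha error: Cloudflare
print(solution_of(err.type))  # advice string from ERRORS_TYPES

# extract_and_group() aggregates errors over all results; is_important()
# applies the 3-percent THRESHOLD before anything is reported to the user
search_results = {}  # placeholder for a finished search's results dict
for entry in extract_and_group(search_results.values()):
    if is_important(entry):
        print(entry['err'], entry['count'], f"{entry['perc']}%")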
+118
@@ -0,0 +1,118 @@
import asyncio
import time
import tqdm
import sys
from typing import Iterable, Any, List
from .types import QueryDraft
def create_task_func():
if sys.version_info.minor > 6:
create_asyncio_task = asyncio.create_task
else:
loop = asyncio.get_event_loop()
create_asyncio_task = loop.create_task
return create_asyncio_task
class AsyncExecutor:
def __init__(self, *args, **kwargs):
self.logger = kwargs['logger']
async def run(self, tasks: Iterable[QueryDraft]):
start_time = time.time()
results = await self._run(tasks)
self.execution_time = time.time() - start_time
self.logger.debug(f'Spent time: {self.execution_time}')
return results
async def _run(self, tasks: Iterable[QueryDraft]):
await asyncio.sleep(0)
class AsyncioSimpleExecutor(AsyncExecutor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
async def _run(self, tasks: Iterable[QueryDraft]):
futures = [f(*args, **kwargs) for f, args, kwargs in tasks]
return await asyncio.gather(*futures)
class AsyncioProgressbarExecutor(AsyncExecutor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
async def _run(self, tasks: Iterable[QueryDraft]):
futures = [f(*args, **kwargs) for f, args, kwargs in tasks]
results = []
for f in tqdm.asyncio.tqdm.as_completed(futures):
results.append(await f)
return results
class AsyncioProgressbarSemaphoreExecutor(AsyncExecutor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.semaphore = asyncio.Semaphore(kwargs.get('in_parallel', 1))
async def _run(self, tasks: Iterable[QueryDraft]):
async def _wrap_query(q: QueryDraft):
async with self.semaphore:
f, args, kwargs = q
return await f(*args, **kwargs)
async def semaphore_gather(tasks: Iterable[QueryDraft]):
coros = [_wrap_query(q) for q in tasks]
results = []
for f in tqdm.asyncio.tqdm.as_completed(coros):
results.append(await f)
return results
return await semaphore_gather(tasks)
class AsyncioProgressbarQueueExecutor(AsyncExecutor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.workers_count = kwargs.get('in_parallel', 10)
self.progress_func = kwargs.get('progress_func', tqdm.tqdm)
self.queue = asyncio.Queue(self.workers_count)
self.timeout = kwargs.get('timeout')
async def worker(self):
while True:
try:
f, args, kwargs = self.queue.get_nowait()
except asyncio.QueueEmpty:
return
query_future = f(*args, **kwargs)
query_task = create_task_func()(query_future)
try:
result = await asyncio.wait_for(query_task, timeout=self.timeout)
except asyncio.TimeoutError:
result = kwargs.get('default')
self.results.append(result)
self.progress.update(1)
self.queue.task_done()
async def _run(self, queries: Iterable[QueryDraft]):
self.results: List[Any] = []
queries_list = list(queries)
min_workers = min(len(queries_list), self.workers_count)
workers = [create_task_func()(self.worker()) for _ in range(min_workers)]
self.progress = self.progress_func(total=len(queries_list))
for t in queries_list:
await self.queue.put(t)
await self.queue.join()
for w in workers:
w.cancel()
self.progress.close()
return self.results
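A hedged sketch of driving the queue-based executor directly, assuming the new module lands as maigret.executors. Each QueryDraft is a (coroutine function, args, kwargs) tuple, matching the unpacking in worker() above; the probe coroutine is illustrative:

import asyncio
import logging

from maigret.executors import AsyncioProgressbarQueueExecutor

async def probe(site, default=None):
    await asyncio.sleep(0.1)  # stand-in for a real site check
    return f'{site}: ok'

async def demo():
    executor = AsyncioProgressbarQueueExecutor(
        logger=logging.getLogger('maigret'),
        in_parallel=10,  # number of worker tasks
        timeout=5,       # per-task asyncio.wait_for limit
    )
    drafts = [(probe, (site,), {'default': None}) for site in ('a', 'b', 'c')]
    return await executor.run(drafts)

print(asyncio.run(demo()))  # asyncio.run needs 3.7+; on 3.6 use loop.run_until_complete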
+397 -203
@@ -12,185 +12,349 @@ from argparse import ArgumentParser, RawDescriptionHelpFormatter
import requests
from socid_extractor import extract, parse, __version__ as socid_version
from .checking import timeout_check, supported_recursive_search_ids, self_check, unsupported_characters, maigret
from .checking import (
timeout_check,
supported_recursive_search_ids,
self_check,
unsupported_characters,
maigret,
)
from . import errors
from .notify import QueryNotifyPrint
from .report import save_csv_report, save_xmind_report, save_html_report, save_pdf_report, \
generate_report_context, save_txt_report, SUPPORTED_JSON_REPORT_FORMATS, check_supported_json_format, \
save_json_report
from .report import (
save_csv_report,
save_xmind_report,
save_html_report,
save_pdf_report,
generate_report_context,
save_txt_report,
SUPPORTED_JSON_REPORT_FORMATS,
check_supported_json_format,
save_json_report,
)
from .sites import MaigretDatabase
from .submit import submit_dialog
from .utils import get_dict_ascii_tree
__version__ = '0.1.16'
__version__ = '0.2.1'
def notify_about_errors(search_results, query_notify):
errs = errors.extract_and_group(search_results.values())
was_errs_displayed = False
for e in errs:
if not errors.is_important(e):
continue
text = f'Too many errors of type "{e["err"]}" ({e["perc"]}%)'
solution = errors.solution_of(e['err'])
if solution:
text = '. '.join([text, solution])
query_notify.warning(text, '!')
was_errs_displayed = True
if was_errs_displayed:
query_notify.warning(
'You can see detailed site check errors with a flag `--print-errors`'
)
def setup_arguments_parser():
version_string = '\n'.join(
[
f'%(prog)s {__version__}',
f'Socid-extractor: {socid_version}',
f'Aiohttp: {aiohttp.__version__}',
f'Requests: {requests.__version__}',
f'Python: {platform.python_version()}',
]
)
parser = ArgumentParser(
formatter_class=RawDescriptionHelpFormatter,
description=f"Maigret v{__version__}",
)
parser.add_argument(
"--version",
action="version",
version=version_string,
help="Display version information and dependencies.",
)
parser.add_argument(
"--info",
"-vv",
action="store_true",
dest="info",
default=False,
help="Display service information.",
)
parser.add_argument(
"--verbose",
"-v",
action="store_true",
dest="verbose",
default=False,
help="Display extra information and metrics.",
)
parser.add_argument(
"-d",
"--debug",
"-vvv",
action="store_true",
dest="debug",
default=False,
help="Saving debugging information and sites responses in debug.txt.",
)
parser.add_argument(
"--site",
action="append",
metavar='SITE_NAME',
dest="site_list",
default=[],
help="Limit analysis to just the listed sites (use several times to specify more than one)",
)
parser.add_argument(
"--proxy",
"-p",
metavar='PROXY_URL',
action="store",
dest="proxy",
default=None,
help="Make requests over a proxy. e.g. socks5://127.0.0.1:1080",
)
parser.add_argument(
"--db",
metavar="DB_FILE",
dest="db_file",
default=None,
help="Load Maigret database from a JSON file or an online, valid, JSON file.",
)
parser.add_argument(
"--cookies-jar-file",
metavar="COOKIE_FILE",
dest="cookie_file",
default=None,
help="File with cookies.",
)
parser.add_argument(
"--timeout",
action="store",
metavar='TIMEOUT',
dest="timeout",
type=timeout_check,
default=30,
help="Time (in seconds) to wait for response to requests. "
"Default timeout of 30.0s. "
"A longer timeout will be more likely to get results from slow sites. "
"On the other hand, this may cause a long delay to gather all results. ",
)
parser.add_argument(
"--retries",
action="store",
type=int,
metavar='RETRIES',
default=1,
help="Attempts to restart temporary failed requests.",
)
parser.add_argument(
"-n",
"--max-connections",
action="store",
type=int,
dest="connections",
default=100,
help="Allowed number of concurrent connections.",
)
parser.add_argument(
"-a",
"--all-sites",
action="store_true",
dest="all_sites",
default=False,
help="Use all sites for scan.",
)
parser.add_argument(
"--top-sites",
action="store",
default=500,
type=int,
help="Count of sites for scan ranked by Alexa Top (default: 500).",
)
parser.add_argument(
"--print-not-found",
action="store_true",
dest="print_not_found",
default=False,
help="Print sites where the username was not found.",
)
parser.add_argument(
"--print-errors",
action="store_true",
dest="print_check_errors",
default=False,
help="Print errors messages: connection, captcha, site country ban, etc.",
)
parser.add_argument(
"--submit",
metavar='EXISTING_USER_URL',
type=str,
dest="new_site_to_submit",
default=False,
help="URL of existing profile in new site to submit.",
)
parser.add_argument(
"--no-color",
action="store_true",
dest="no_color",
default=False,
help="Don't color terminal output",
)
parser.add_argument(
"--no-progressbar",
action="store_true",
dest="no_progressbar",
default=False,
help="Don't show progressbar.",
)
parser.add_argument(
"--browse",
"-b",
action="store_true",
dest="browse",
default=False,
help="Browse to all results on default bowser.",
)
parser.add_argument(
"--no-recursion",
action="store_true",
dest="disable_recursive_search",
default=False,
help="Disable recursive search by additional data extracted from pages.",
)
parser.add_argument(
"--no-extracting",
action="store_true",
dest="disable_extracting",
default=False,
help="Disable parsing pages for additional data and other usernames.",
)
parser.add_argument(
"--self-check",
action="store_true",
default=False,
help="Do self check for sites and database and disable non-working ones.",
)
parser.add_argument(
"--stats", action="store_true", default=False, help="Show database statistics."
)
parser.add_argument(
"--use-disabled-sites",
action="store_true",
default=False,
help="Use disabled sites to search (may cause many false positives).",
)
parser.add_argument(
"--parse",
dest="parse_url",
default='',
help="Parse page by URL and extract username and IDs to use for search.",
)
parser.add_argument(
"--id-type",
dest="id_type",
default='username',
help="Specify identifier(s) type (default: username).",
)
parser.add_argument(
"--ignore-ids",
action="append",
metavar='IGNORED_IDS',
dest="ignore_ids_list",
default=[],
help="Do not make search by the specified username or other ids.",
)
parser.add_argument(
"username",
nargs='+',
metavar='USERNAMES',
action="store",
help="One or more usernames to check with social networks.",
)
parser.add_argument(
"--tags", dest="tags", default='', help="Specify tags of sites."
)
# reports options
parser.add_argument(
"--folderoutput",
"-fo",
dest="folderoutput",
default="reports",
help="If using multiple usernames, the output of the results will be saved to this folder.",
)
parser.add_argument(
"-T",
"--txt",
action="store_true",
dest="txt",
default=False,
help="Create a TXT report (one report per username).",
)
parser.add_argument(
"-C",
"--csv",
action="store_true",
dest="csv",
default=False,
help="Create a CSV report (one report per username).",
)
parser.add_argument(
"-H",
"--html",
action="store_true",
dest="html",
default=False,
help="Create an HTML report file (general report on all usernames).",
)
parser.add_argument(
"-X",
"--xmind",
action="store_true",
dest="xmind",
default=False,
help="Generate an XMind 8 mindmap report (one report per username).",
)
parser.add_argument(
"-P",
"--pdf",
action="store_true",
dest="pdf",
default=False,
help="Generate a PDF report (general report on all usernames).",
)
parser.add_argument(
"-J",
"--json",
action="store",
metavar='REPORT_TYPE',
dest="json",
default='',
type=check_supported_json_format,
help=f"Generate a JSON report of specific type: {', '.join(SUPPORTED_JSON_REPORT_FORMATS)}"
" (one report per username).",
)
return parser
async def main():
version_string = '\n'.join([
f'%(prog)s {__version__}',
f'Socid-extractor: {socid_version}',
f'Aiohttp: {aiohttp.__version__}',
f'Requests: {requests.__version__}',
f'Python: {platform.python_version()}',
])
parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
description=f"Maigret v{__version__}"
)
parser.add_argument("--version",
action="version", version=version_string,
help="Display version information and dependencies."
)
parser.add_argument("--info", "-vv",
action="store_true", dest="info", default=False,
help="Display service information."
)
parser.add_argument("--verbose", "-v",
action="store_true", dest="verbose", default=False,
help="Display extra information and metrics."
)
parser.add_argument("-d", "--debug", "-vvv",
action="store_true", dest="debug", default=False,
help="Saving debugging information and sites responses in debug.txt."
)
parser.add_argument("--site",
action="append", metavar='SITE_NAME',
dest="site_list", default=[],
help="Limit analysis to just the listed sites (use several times to specify more than one)"
)
parser.add_argument("--proxy", "-p", metavar='PROXY_URL',
action="store", dest="proxy", default=None,
help="Make requests over a proxy. e.g. socks5://127.0.0.1:1080"
)
parser.add_argument("--db", metavar="DB_FILE",
dest="db_file", default=None,
help="Load Maigret database from a JSON file or an online, valid, JSON file.")
parser.add_argument("--cookies-jar-file", metavar="COOKIE_FILE",
dest="cookie_file", default=None,
help="File with cookies.")
parser.add_argument("--timeout",
action="store", metavar='TIMEOUT',
dest="timeout", type=timeout_check, default=10,
help="Time (in seconds) to wait for response to requests."
"Default timeout of 10.0s. "
"A longer timeout will be more likely to get results from slow sites."
"On the other hand, this may cause a long delay to gather all results."
)
parser.add_argument("-n", "--max-connections",
action="store", type=int,
dest="connections", default=100,
help="Allowed number of concurrent connections."
)
parser.add_argument("-a", "--all-sites",
action="store_true", dest="all_sites", default=False,
help="Use all sites for scan."
)
parser.add_argument("--top-sites",
action="store", default=500, type=int,
help="Count of sites for scan ranked by Alexa Top (default: 500)."
)
parser.add_argument("--print-not-found",
action="store_true", dest="print_not_found", default=False,
help="Print sites where the username was not found."
)
parser.add_argument("--print-errors",
action="store_true", dest="print_check_errors", default=False,
help="Print errors messages: connection, captcha, site country ban, etc."
)
parser.add_argument("--submit", metavar='EXISTING_USER_URL',
type=str, dest="new_site_to_submit", default=False,
help="URL of existing profile in new site to submit."
)
parser.add_argument("--no-color",
action="store_true", dest="no_color", default=False,
help="Don't color terminal output"
)
parser.add_argument("--browse", "-b",
action="store_true", dest="browse", default=False,
help="Browse to all results on default bowser."
)
parser.add_argument("--no-recursion",
action="store_true", dest="disable_recursive_search", default=False,
help="Disable recursive search by additional data extracted from pages."
)
parser.add_argument("--no-extracting",
action="store_true", dest="disable_extracting", default=False,
help="Disable parsing pages for additional data and other usernames."
)
parser.add_argument("--self-check",
action="store_true", default=False,
help="Do self check for sites and database and disable non-working ones."
)
parser.add_argument("--stats",
action="store_true", default=False,
help="Show database statistics."
)
parser.add_argument("--use-disabled-sites",
action="store_true", default=False,
help="Use disabled sites to search (may cause many false positives)."
)
parser.add_argument("--parse",
dest="parse_url", default='',
help="Parse page by URL and extract username and IDs to use for search."
)
parser.add_argument("--id-type",
dest="id_type", default='username',
help="Specify identifier(s) type (default: username)."
)
parser.add_argument("--ignore-ids",
action="append", metavar='IGNORED_IDS',
dest="ignore_ids_list", default=[],
help="Do not make search by the specified username or other ids."
)
parser.add_argument("username",
nargs='+', metavar='USERNAMES',
action="store",
help="One or more usernames to check with social networks."
)
parser.add_argument("--tags",
dest="tags", default='',
help="Specify tags of sites."
)
# reports options
parser.add_argument("--folderoutput", "-fo", dest="folderoutput", default="reports",
help="If using multiple usernames, the output of the results will be saved to this folder."
)
parser.add_argument("-T", "--txt",
action="store_true", dest="txt", default=False,
help="Create a TXT report (one report per username)."
)
parser.add_argument("-C", "--csv",
action="store_true", dest="csv", default=False,
help="Create a CSV report (one report per username)."
)
parser.add_argument("-H", "--html",
action="store_true", dest="html", default=False,
help="Create an HTML report file (general report on all usernames)."
)
parser.add_argument("-X", "--xmind",
action="store_true",
dest="xmind", default=False,
help="Generate an XMind 8 mindmap report (one report per username)."
)
parser.add_argument("-P", "--pdf",
action="store_true",
dest="pdf", default=False,
help="Generate a PDF report (general report on all usernames)."
)
parser.add_argument("-J", "--json",
action="store", metavar='REPORT_TYPE',
dest="json", default='', type=check_supported_json_format,
help=f"Generate a JSON report of specific type: {', '.join(SUPPORTED_JSON_REPORT_FORMATS)}"
" (one report per username)."
)
args = parser.parse_args()
arg_parser = setup_arguments_parser()
args = arg_parser.parse_args()
# Logging
log_level = logging.ERROR
logging.basicConfig(
format='[%(filename)s:%(lineno)d] %(levelname)-3s %(asctime)s %(message)s',
datefmt='%H:%M:%S',
level=log_level
level=log_level,
)
if args.debug:
@@ -207,8 +371,7 @@ async def main():
usernames = {
u: args.id_type
for u in args.username
if u not in ['-']
and u not in args.ignore_ids_list
if u not in ['-'] and u not in args.ignore_ids_list
}
parsing_enabled = not args.disable_extracting
@@ -224,8 +387,10 @@ async def main():
try:
# temporary workaround for URL mutations MVP
from socid_extractor import mutate_url
reqs += list(mutate_url(args.parse_url))
except:
except Exception as e:
logger.warning(e)
pass
for req in reqs:
@@ -247,38 +412,47 @@ async def main():
args.tags = list(set(str(args.tags).split(',')))
if args.db_file is None:
args.db_file = \
os.path.join(os.path.dirname(os.path.realpath(__file__)),
"resources/data.json"
)
args.db_file = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "resources/data.json"
)
if args.top_sites == 0 or args.all_sites:
args.top_sites = sys.maxsize
# Create notify object for query results.
query_notify = QueryNotifyPrint(result=None,
verbose=args.verbose,
print_found_only=not args.print_not_found,
skip_check_errors=not args.print_check_errors,
color=not args.no_color)
query_notify = QueryNotifyPrint(
result=None,
verbose=args.verbose,
print_found_only=not args.print_not_found,
skip_check_errors=not args.print_check_errors,
color=not args.no_color,
)
# Create object with all information about sites we are aware of.
db = MaigretDatabase().load_from_file(args.db_file)
get_top_sites_for_id = lambda x: db.ranked_sites_dict(top=args.top_sites, tags=args.tags,
names=args.site_list,
disabled=False, id_type=x)
get_top_sites_for_id = lambda x: db.ranked_sites_dict(
top=args.top_sites,
tags=args.tags,
names=args.site_list,
disabled=False,
id_type=x,
)
site_data = get_top_sites_for_id(args.id_type)
if args.new_site_to_submit:
is_submitted = await submit_dialog(db, args.new_site_to_submit, args.cookie_file)
is_submitted = await submit_dialog(
db, args.new_site_to_submit, args.cookie_file, logger
)
if is_submitted:
db.save_to_file(args.db_file)
# Database self-checking
if args.self_check:
print('Maigret sites database self-checking...')
is_need_update = await self_check(db, site_data, logger, max_connections=args.connections)
is_need_update = await self_check(
db, site_data, logger, max_connections=args.connections
)
if is_need_update:
if input('Do you want to save changes permanently? [Yn]\n').lower() == 'y':
db.save_to_file(args.db_file)
@@ -310,9 +484,13 @@ async def main():
query_notify.warning('No sites to check, exiting!')
sys.exit(2)
else:
query_notify.warning(f'Starting a search on top {len(site_data)} sites from the Maigret database...')
query_notify.warning(
f'Starting a search on top {len(site_data)} sites from the Maigret database...'
)
if not args.all_sites:
query_notify.warning(f'You can run search by full list of sites with flag `-a`', '!')
query_notify.warning(
'You can run search by full list of sites with flag `-a`', '!'
)
already_checked = set()
general_results = []
@@ -327,33 +505,45 @@ async def main():
already_checked.add(username.lower())
if username in args.ignore_ids_list:
query_notify.warning(f'Skip a search by username {username} cause it\'s marked as ignored.')
query_notify.warning(
f'Skip a search by username {username} cause it\'s marked as ignored.'
)
continue
# check for characters do not supported by sites generally
found_unsupported_chars = set(unsupported_characters).intersection(set(username))
found_unsupported_chars = set(unsupported_characters).intersection(
set(username)
)
if found_unsupported_chars:
pretty_chars_str = ','.join(map(lambda s: f'"{s}"', found_unsupported_chars))
pretty_chars_str = ','.join(
map(lambda s: f'"{s}"', found_unsupported_chars)
)
query_notify.warning(
f'Found unsupported URL characters: {pretty_chars_str}, skip search by username "{username}"')
f'Found unsupported URL characters: {pretty_chars_str}, skip search by username "{username}"'
)
continue
sites_to_check = get_top_sites_for_id(id_type)
results = await maigret(username,
dict(sites_to_check),
query_notify,
proxy=args.proxy,
timeout=args.timeout,
is_parsing_enabled=parsing_enabled,
id_type=id_type,
debug=args.verbose,
logger=logger,
cookies=args.cookie_file,
forced=args.use_disabled_sites,
max_connections=args.connections,
)
results = await maigret(
username=username,
site_dict=dict(sites_to_check),
query_notify=query_notify,
proxy=args.proxy,
timeout=args.timeout,
is_parsing_enabled=parsing_enabled,
id_type=id_type,
debug=args.verbose,
logger=logger,
cookies=args.cookie_file,
forced=args.use_disabled_sites,
max_connections=args.connections,
no_progressbar=args.no_progressbar,
retries=args.retries,
)
notify_about_errors(results, query_notify)
general_results.append((username, id_type, results))
@@ -392,9 +582,13 @@ async def main():
query_notify.warning(f'TXT report for {username} saved in {filename}')
if args.json:
filename = report_filepath_tpl.format(username=username, postfix=f'_{args.json}.json')
filename = report_filepath_tpl.format(
username=username, postfix=f'_{args.json}.json'
)
save_json_report(filename, username, results, report_type=args.json)
query_notify.warning(f'JSON {args.json} report for {username} saved in {filename}')
query_notify.warning(
f'JSON {args.json} report for {username} saved in {filename}'
)
# reporting for all the result
if general_results:
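One practical payoff of extracting setup_arguments_parser() out of main() is that the CLI becomes testable without running a search; a sketch with illustrative values:

from maigret.maigret import setup_arguments_parser

parser = setup_arguments_parser()
args = parser.parse_args(['soxoj', '--retries', '3', '-a', '--no-progressbar'])
assert args.retries == 3
assert args.all_sites and args.no_progressbar
assert args.username == ['soxoj']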
+61 -36
@@ -11,7 +11,7 @@ from .result import QueryStatus
from .utils import get_dict_ascii_tree
class QueryNotify():
class QueryNotify:
"""Query Notify Object.
Base class that describes methods available to notify the results of
@@ -39,7 +39,7 @@ class QueryNotify():
return
def start(self, message=None, id_type='username'):
def start(self, message=None, id_type="username"):
"""Notify Start.
Notify method for start of query. This method will be called before
@@ -116,8 +116,14 @@ class QueryNotifyPrint(QueryNotify):
Query notify class that prints results.
"""
def __init__(self, result=None, verbose=False, print_found_only=False,
skip_check_errors=False, color=True):
def __init__(
self,
result=None,
verbose=False,
print_found_only=False,
skip_check_errors=False,
color=True,
):
"""Create Query Notify Print Object.
Contains information about a specific method of notifying the results
@@ -162,22 +168,29 @@ class QueryNotifyPrint(QueryNotify):
title = f"Checking {id_type}"
if self.color:
print(Style.BRIGHT + Fore.GREEN + "[" +
Fore.YELLOW + "*" +
Fore.GREEN + f"] {title}" +
Fore.WHITE + f" {message}" +
Fore.GREEN + " on:")
print(
Style.BRIGHT
+ Fore.GREEN
+ "["
+ Fore.YELLOW
+ "*"
+ Fore.GREEN
+ f"] {title}"
+ Fore.WHITE
+ f" {message}"
+ Fore.GREEN
+ " on:"
)
else:
print(f"[*] {title} {message} on:")
def warning(self, message, symbol='-'):
msg = f'[{symbol}] {message}'
def warning(self, message, symbol="-"):
msg = f"[{symbol}] {message}"
if self.color:
print(Style.BRIGHT + Fore.YELLOW + msg)
else:
print(msg)
def update(self, result, is_similar=False):
"""Notify Update.
@@ -196,18 +209,20 @@ class QueryNotifyPrint(QueryNotify):
if not self.result.ids_data:
ids_data_text = ""
else:
ids_data_text = get_dict_ascii_tree(self.result.ids_data.items(), ' ')
ids_data_text = get_dict_ascii_tree(self.result.ids_data.items(), " ")
def make_colored_terminal_notify(status, text, status_color, text_color, appendix):
def make_colored_terminal_notify(
status, text, status_color, text_color, appendix
):
text = [
f'{Style.BRIGHT}{Fore.WHITE}[{status_color}{status}{Fore.WHITE}]' +
f'{text_color} {text}: {Style.RESET_ALL}' +
f'{appendix}'
f"{Style.BRIGHT}{Fore.WHITE}[{status_color}{status}{Fore.WHITE}]"
+ f"{text_color} {text}: {Style.RESET_ALL}"
+ f"{appendix}"
]
return ''.join(text)
return "".join(text)
def make_simple_terminal_notify(status, text, appendix):
return f'[{status}] {text}: {appendix}'
return f"[{status}] {text}: {appendix}"
def make_terminal_notify(is_colored=True, *args):
if is_colored:
@@ -220,45 +235,55 @@ class QueryNotifyPrint(QueryNotify):
# Output to the terminal is desired.
if result.status == QueryStatus.CLAIMED:
color = Fore.BLUE if is_similar else Fore.GREEN
status = '?' if is_similar else '+'
status = "?" if is_similar else "+"
notify = make_terminal_notify(
self.color,
status, result.site_name,
color, color,
result.site_url_user + ids_data_text
status,
result.site_name,
color,
color,
result.site_url_user + ids_data_text,
)
elif result.status == QueryStatus.AVAILABLE:
if not self.print_found_only:
notify = make_terminal_notify(
self.color,
'-', result.site_name,
Fore.RED, Fore.YELLOW,
'Not found!' + ids_data_text
"-",
result.site_name,
Fore.RED,
Fore.YELLOW,
"Not found!" + ids_data_text,
)
elif result.status == QueryStatus.UNKNOWN:
if not self.skip_check_errors:
notify = make_terminal_notify(
self.color,
'?', result.site_name,
Fore.RED, Fore.RED,
self.result.context + ids_data_text
"?",
result.site_name,
Fore.RED,
Fore.RED,
str(self.result.error) + ids_data_text,
)
elif result.status == QueryStatus.ILLEGAL:
if not self.print_found_only:
text = 'Illegal Username Format For This Site!'
text = "Illegal Username Format For This Site!"
notify = make_terminal_notify(
self.color,
'-', result.site_name,
Fore.RED, Fore.YELLOW,
text + ids_data_text
"-",
result.site_name,
Fore.RED,
Fore.YELLOW,
text + ids_data_text,
)
else:
# It should be impossible to ever get here...
raise ValueError(f"Unknown Query Status '{str(result.status)}' for "
f"site '{self.result.site_name}'")
raise ValueError(
f"Unknown Query Status '{str(result.status)}' for "
f"site '{self.result.site_name}'"
)
if notify:
sys.stdout.write('\x1b[1K\r')
sys.stdout.write("\x1b[1K\r")
print(notify)
return
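The notifier's behavior is unchanged by the reformatting; a tiny sketch of the warning path shown above, using the message format from notify_about_errors in maigret.py:

from maigret.notify import QueryNotifyPrint

notify = QueryNotifyPrint(color=False, print_found_only=True)
notify.warning('Too many errors of type "Request timeout" (5.0%)', '!')
# with color=False this prints: [!] Too many errors of type "Request timeout" (5.0%)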
+107 -92
@@ -5,6 +5,7 @@ import logging
import os
from argparse import ArgumentTypeError
from datetime import datetime
from typing import Dict, Any
import pycountry
import xmind
@@ -16,83 +17,85 @@ from .result import QueryStatus
from .utils import is_country_tag, CaseConverter, enrich_link_str
SUPPORTED_JSON_REPORT_FORMATS = [
'simple',
'ndjson',
"simple",
"ndjson",
]
'''
"""
UTILS
'''
"""
def filter_supposed_data(data):
### interesting fields
allowed_fields = ['fullname', 'gender', 'location', 'age']
filtered_supposed_data = {CaseConverter.snake_to_title(k): v[0]
for k, v in data.items()
if k in allowed_fields}
# interesting fields
allowed_fields = ["fullname", "gender", "location", "age"]
filtered_supposed_data = {
CaseConverter.snake_to_title(k): v[0]
for k, v in data.items()
if k in allowed_fields
}
return filtered_supposed_data
'''
"""
REPORTS SAVING
'''
"""
def save_csv_report(filename: str, username: str, results: dict):
with open(filename, 'w', newline='', encoding='utf-8') as f:
with open(filename, "w", newline="", encoding="utf-8") as f:
generate_csv_report(username, results, f)
def save_txt_report(filename: str, username: str, results: dict):
with open(filename, 'w', encoding='utf-8') as f:
with open(filename, "w", encoding="utf-8") as f:
generate_txt_report(username, results, f)
def save_html_report(filename: str, context: dict):
template, _ = generate_report_template(is_pdf=False)
filled_template = template.render(**context)
with open(filename, 'w') as f:
with open(filename, "w") as f:
f.write(filled_template)
def save_pdf_report(filename: str, context: dict):
template, css = generate_report_template(is_pdf=True)
filled_template = template.render(**context)
with open(filename, 'w+b') as f:
with open(filename, "w+b") as f:
pisa.pisaDocument(io.StringIO(filled_template), dest=f, default_css=css)
def save_json_report(filename: str, username: str, results: dict, report_type: str):
with open(filename, 'w', encoding='utf-8') as f:
with open(filename, "w", encoding="utf-8") as f:
generate_json_report(username, results, f, report_type=report_type)
'''
"""
REPORTS GENERATING
'''
"""
def generate_report_template(is_pdf: bool):
"""
HTML/PDF template generation
HTML/PDF template generation
"""
def get_resource_content(filename):
return open(os.path.join(maigret_path, 'resources', filename)).read()
return open(os.path.join(maigret_path, "resources", filename)).read()
maigret_path = os.path.dirname(os.path.realpath(__file__))
if is_pdf:
template_content = get_resource_content('simple_report_pdf.tpl')
css_content = get_resource_content('simple_report_pdf.css')
template_content = get_resource_content("simple_report_pdf.tpl")
css_content = get_resource_content("simple_report_pdf.css")
else:
template_content = get_resource_content('simple_report.tpl')
template_content = get_resource_content("simple_report.tpl")
css_content = None
template = Template(template_content)
template.globals['title'] = CaseConverter.snake_to_title
template.globals['detect_link'] = enrich_link_str
template.globals["title"] = CaseConverter.snake_to_title # type: ignore
template.globals["detect_link"] = enrich_link_str # type: ignore
return template, css_content
@@ -100,15 +103,15 @@ def generate_report_context(username_results: list):
brief_text = []
usernames = {}
extended_info_count = 0
tags = {}
supposed_data = {}
tags: Dict[str, int] = {}
supposed_data: Dict[str, Any] = {}
first_seen = None
for username, id_type, results in username_results:
found_accounts = 0
new_ids = []
usernames[username] = {'type': id_type}
usernames[username] = {"type": id_type}
for website_name in results:
dictionary = results[website_name]
@@ -116,19 +119,19 @@ def generate_report_context(username_results: list):
if not dictionary:
continue
if dictionary.get('is_similar'):
if dictionary.get("is_similar"):
continue
status = dictionary.get('status')
status = dictionary.get("status")
if not status: # FIXME: currently in case of timeout
continue
if status.ids_data:
dictionary['ids_data'] = status.ids_data
dictionary["ids_data"] = status.ids_data
extended_info_count += 1
# detect first seen
created_at = status.ids_data.get('created_at')
created_at = status.ids_data.get("created_at")
if created_at:
if first_seen is None:
first_seen = created_at
@@ -138,37 +141,46 @@ def generate_report_context(username_results: list):
new_time = parse_datetime_str(created_at)
if new_time < known_time:
first_seen = created_at
except:
logging.debug('Problems with converting datetime %s/%s', first_seen, created_at)
except Exception as e:
logging.debug(
"Problems with converting datetime %s/%s: %s",
first_seen,
created_at,
str(e),
)
for k, v in status.ids_data.items():
# suppose target data
field = 'fullname' if k == 'name' else k
if not field in supposed_data:
field = "fullname" if k == "name" else k
if field not in supposed_data:
supposed_data[field] = []
supposed_data[field].append(v)
# suppose country
if k in ['country', 'locale']:
if k in ["country", "locale"]:
try:
if is_country_tag(k):
tag = pycountry.countries.get(alpha_2=v).alpha_2.lower()
else:
tag = pycountry.countries.search_fuzzy(v)[0].alpha_2.lower()
tag = pycountry.countries.search_fuzzy(v)[
0
].alpha_2.lower()
# TODO: move countries to another struct
tags[tag] = tags.get(tag, 0) + 1
except Exception as e:
logging.debug('pycountry exception', exc_info=True)
logging.debug(
"Pycountry exception: %s", str(e), exc_info=True
)
new_usernames = dictionary.get('ids_usernames')
new_usernames = dictionary.get("ids_usernames")
if new_usernames:
for u, utype in new_usernames.items():
if not u in usernames:
if u not in usernames:
new_ids.append((u, utype))
usernames[u] = {'type': utype}
usernames[u] = {"type": utype}
if status.status == QueryStatus.CLAIMED:
found_accounts += 1
dictionary['found'] = True
dictionary["found"] = True
else:
continue
@@ -177,22 +189,24 @@ def generate_report_context(username_results: list):
for t in status.tags:
tags[t] = tags.get(t, 0) + 1
brief_text.append(f'Search by {id_type} {username} returned {found_accounts} accounts.')
brief_text.append(
f"Search by {id_type} {username} returned {found_accounts} accounts."
)
if new_ids:
ids_list = []
for u, t in new_ids:
ids_list.append(f'{u} ({t})' if t != 'username' else u)
brief_text.append(f'Found target\'s other IDs: ' + ', '.join(ids_list) + '.')
ids_list.append(f"{u} ({t})" if t != "username" else u)
brief_text.append("Found target's other IDs: " + ", ".join(ids_list) + ".")
brief_text.append(f'Extended info extracted from {extended_info_count} accounts.')
brief_text.append(f"Extended info extracted from {extended_info_count} accounts.")
brief = ' '.join(brief_text).strip()
brief = " ".join(brief_text).strip()
tuple_sort = lambda d: sorted(d, key=lambda x: x[1], reverse=True)
if 'global' in tags:
if "global" in tags:
# remove tag 'global' useless for country detection
del tags['global']
del tags["global"]
first_username = username_results[0][0]
countries_lists = list(filter(lambda x: is_country_tag(x[0]), tags.items()))
@@ -201,35 +215,33 @@ def generate_report_context(username_results: list):
filtered_supposed_data = filter_supposed_data(supposed_data)
return {
'username': first_username,
'brief': brief,
'results': username_results,
'first_seen': first_seen,
'interests_tuple_list': tuple_sort(interests_list),
'countries_tuple_list': tuple_sort(countries_lists),
'supposed_data': filtered_supposed_data,
'generated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"username": first_username,
"brief": brief,
"results": username_results,
"first_seen": first_seen,
"interests_tuple_list": tuple_sort(interests_list),
"countries_tuple_list": tuple_sort(countries_lists),
"supposed_data": filtered_supposed_data,
"generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
def generate_csv_report(username: str, results: dict, csvfile):
writer = csv.writer(csvfile)
writer.writerow(['username',
'name',
'url_main',
'url_user',
'exists',
'http_status'
]
)
writer.writerow(
["username", "name", "url_main", "url_user", "exists", "http_status"]
)
for site in results:
writer.writerow([username,
site,
results[site]['url_main'],
results[site]['url_user'],
str(results[site]['status'].status),
results[site]['http_status'],
])
writer.writerow(
[
username,
site,
results[site]["url_main"],
results[site]["url_user"],
str(results[site]["status"].status),
results[site]["http_status"],
]
)
def generate_txt_report(username: str, results: dict, file):
@@ -242,12 +254,11 @@ def generate_txt_report(username: str, results: dict, file):
if dictionary.get("status").status == QueryStatus.CLAIMED:
exists_counter += 1
file.write(dictionary["url_user"] + "\n")
file.write(f'Total Websites Username Detected On : {exists_counter}')
file.write(f"Total Websites Username Detected On : {exists_counter}")
def generate_json_report(username: str, results: dict, file, report_type):
exists_counter = 0
is_report_per_line = report_type.startswith('ndjson')
is_report_per_line = report_type.startswith("ndjson")
all_json = {}
for sitename in results:
@@ -257,11 +268,14 @@ def generate_json_report(username: str, results: dict, file, report_type):
continue
data = dict(site_result)
data['status'] = data['status'].json()
data["status"] = data["status"].json()
data["site"] = data["site"].json
if "future" in data:
del data["future"]
if is_report_per_line:
data['sitename'] = sitename
file.write(json.dumps(data) + '\n')
data["sitename"] = sitename
file.write(json.dumps(data) + "\n")
else:
all_json[sitename] = data
@@ -269,9 +283,9 @@ def generate_json_report(username: str, results: dict, file, report_type):
file.write(json.dumps(all_json))
'''
"""
XMIND 8 Functions
'''
"""
def save_xmind_report(filename, username, results):
@@ -284,7 +298,6 @@ def save_xmind_report(filename, username, results):
def design_sheet(sheet, username, results):
##all tag list
alltags = {}
supposed_data = {}
@@ -300,7 +313,7 @@ def design_sheet(sheet, username, results):
dictionary = results[website_name]
if dictionary.get("status").status == QueryStatus.CLAIMED:
## firsttime I found that entry
# firsttime I found that entry
for tag in dictionary.get("status").tags:
if tag.strip() == "":
continue
@@ -329,22 +342,22 @@ def design_sheet(sheet, username, results):
# suppose target data
if not isinstance(v, list):
currentsublabel = userlink.addSubTopic()
field = 'fullname' if k == 'name' else k
if not field in supposed_data:
field = "fullname" if k == "name" else k
if field not in supposed_data:
supposed_data[field] = []
supposed_data[field].append(v)
currentsublabel.setTitle("%s: %s" % (k, v))
else:
for currentval in v:
currentsublabel = userlink.addSubTopic()
field = 'fullname' if k == 'name' else k
if not field in supposed_data:
field = "fullname" if k == "name" else k
if field not in supposed_data:
supposed_data[field] = []
supposed_data[field].append(currentval)
currentsublabel.setTitle("%s: %s" % (k, currentval))
### Add Supposed DATA
# add supposed data
filterede_supposed_data = filter_supposed_data(supposed_data)
if (len(filterede_supposed_data) > 0):
if len(filterede_supposed_data) > 0:
undefinedsection = root_topic1.addSubTopic()
undefinedsection.setTitle("SUPPOSED DATA")
for k, v in filterede_supposed_data.items():
@@ -353,7 +366,9 @@ def design_sheet(sheet, username, results):
def check_supported_json_format(value):
if value and not value in SUPPORTED_JSON_REPORT_FORMATS:
raise ArgumentTypeError(f'JSON report type must be one of the following types: '
+ ', '.join(SUPPORTED_JSON_REPORT_FORMATS))
if value and value not in SUPPORTED_JSON_REPORT_FORMATS:
raise ArgumentTypeError(
"JSON report type must be one of the following types: "
+ ", ".join(SUPPORTED_JSON_REPORT_FORMATS)
)
return value
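A hedged sketch of consuming the 'ndjson' variant written by generate_json_report above: one JSON object per line, with 'sitename' added per record and 'status' serialized via QueryResult.json() (see result.py below); the filename is illustrative:

import json

with open('report_soxoj_ndjson.json') as f:
    for line in f:
        record = json.loads(line)
        print(record['sitename'], record['status']['status'], record['status']['url'])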
+4927 -3426
File diff suppressed because it is too large.
+24 -9
@@ -10,6 +10,7 @@ class QueryStatus(Enum):
Describes status of query about a given username.
"""
CLAIMED = "Claimed" # Username Detected
AVAILABLE = "Available" # Username Not Detected
UNKNOWN = "Unknown" # Error Occurred While Trying To Detect Username
@@ -27,14 +28,24 @@ class QueryStatus(Enum):
return self.value
class QueryResult():
class QueryResult:
"""Query Result Object.
Describes result of query about a given username.
"""
def __init__(self, username, site_name, site_url_user, status, ids_data=None,
query_time=None, context=None, tags=[]):
def __init__(
self,
username,
site_name,
site_url_user,
status,
ids_data=None,
query_time=None,
context=None,
error=None,
tags=[],
):
"""Create Query Result Object.
Contains information about a specific method of detecting usernames on
@@ -73,17 +84,21 @@ class QueryResult():
self.context = context
self.ids_data = ids_data
self.tags = tags
self.error = error
def json(self):
return {
'username': self.username,
'site_name': self.site_name,
'url': self.site_url_user,
'status': str(self.status),
'ids': self.ids_data or {},
'tags': self.tags,
"username": self.username,
"site_name": self.site_name,
"url": self.site_url_user,
"status": str(self.status),
"ids": self.ids_data or {},
"tags": self.tags,
}
def is_found(self):
return self.status == QueryStatus.CLAIMED
def __str__(self):
"""Convert Object To String.
+187 -112
@@ -1,8 +1,9 @@
# -*- coding: future_annotations -*-
"""Maigret Sites Information"""
import copy
import json
import sys
from typing import Optional, List, Dict, Any
import requests
@@ -10,19 +11,56 @@ from .utils import CaseConverter, URLMatcher, is_country_tag
# TODO: move to data.json
SUPPORTED_TAGS = [
'gaming', 'coding', 'photo', 'music', 'blog', 'finance', 'freelance', 'dating',
'tech', 'forum', 'porn', 'erotic', 'webcam', 'video', 'movies', 'hacking', 'art',
'discussion', 'sharing', 'writing', 'wiki', 'business', 'shopping', 'sport',
'books', 'news', 'documents', 'travel', 'maps', 'hobby', 'apps', 'classified',
'career', 'geosocial', 'streaming', 'education', 'networking', 'torrent',
'science', 'medicine',
"gaming",
"coding",
"photo",
"music",
"blog",
"finance",
"freelance",
"dating",
"tech",
"forum",
"porn",
"erotic",
"webcam",
"video",
"movies",
"hacking",
"art",
"discussion",
"sharing",
"writing",
"wiki",
"business",
"shopping",
"sport",
"books",
"news",
"documents",
"travel",
"maps",
"hobby",
"apps",
"classified",
"career",
"geosocial",
"streaming",
"education",
"networking",
"torrent",
"science",
"medicine",
"reading",
"stock",
]
class MaigretEngine:
site: Dict[str, Any] = {}
def __init__(self, name, data):
self.name = name
self.site = {}
self.__dict__.update(data)
@property
@@ -32,43 +70,49 @@ class MaigretEngine:
class MaigretSite:
NOT_SERIALIZABLE_FIELDS = [
'name',
'engineData',
'requestFuture',
'detectedEngine',
'engineObj',
'stats',
'urlRegexp',
"name",
"engineData",
"requestFuture",
"detectedEngine",
"engineObj",
"stats",
"urlRegexp",
]
username_claimed = ""
username_unclaimed = ""
url_subpath = ""
url_main = ""
url = ""
disabled = False
similar_search = False
ignore403 = False
tags: List[str] = []
type = "username"
headers: Dict[str, str] = {}
errors: Dict[str, str] = {}
activation: Dict[str, Any] = {}
regex_check = None
url_probe = None
check_type = ""
request_head_only = ""
get_params: Dict[str, Any] = {}
presense_strs: List[str] = []
absence_strs: List[str] = []
stats: Dict[str, Any] = {}
engine = None
engine_data: Dict[str, Any] = {}
engine_obj: Optional["MaigretEngine"] = None
request_future = None
alexa_rank = None
source = None
def __init__(self, name, information):
self.name = name
self.disabled = False
self.similar_search = False
self.ignore_403 = False
self.tags = []
self.type = 'username'
self.headers = {}
self.errors = {}
self.activation = {}
self.url_subpath = ''
self.regex_check = None
self.url_probe = None
self.check_type = ''
self.request_head_only = ''
self.get_params = {}
self.presense_strs = []
self.absence_strs = []
self.stats = {}
self.engine = None
self.engine_data = {}
self.engine_obj = None
self.request_future = None
self.alexa_rank = None
self.url_subpath = ""
for k, v in information.items():
self.__dict__[CaseConverter.camel_to_snake(k)] = v
@@ -83,22 +127,31 @@ class MaigretSite:
return f"{self.name} ({self.url_main})"
def update_detectors(self):
if 'url' in self.__dict__:
if "url" in self.__dict__:
url = self.url
for group in ['urlMain', 'urlSubpath']:
for group in ["urlMain", "urlSubpath"]:
if group in url:
url = url.replace('{' + group + '}', self.__dict__[CaseConverter.camel_to_snake(group)])
url = url.replace(
"{" + group + "}",
self.__dict__[CaseConverter.camel_to_snake(group)],
)
self.url_regexp = URLMatcher.make_profile_url_regexp(url, self.regex_check)
def detect_username(self, url: str) -> str:
def detect_username(self, url: str) -> Optional[str]:
if self.url_regexp:
match_groups = self.url_regexp.match(url)
if match_groups:
return match_groups.groups()[-1].rstrip('/')
return match_groups.groups()[-1].rstrip("/")
return None
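Example (a sketch of the detector round-trip; the URL template and the expected match come from the XenForo test assertions later in this diff):
from maigret.sites import MaigretSite
site = MaigretSite('Amperka', {
    'url': '{urlMain}{urlSubpath}/members/?username={username}',
    'urlMain': 'http://forum.amperka.ru',
})
site.update_detectors()
# url_regexp is now ^https?://(www.)?forum\.amperka\.ru/members/\?username=(.+?)$
print(site.detect_username('http://forum.amperka.ru/members/?username=test'))  # test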
@property
def pretty_name(self):
if self.source:
return f"{self.name} [{self.source}]"
return self.name
@property
def json(self):
result = {}
@@ -106,7 +159,7 @@ class MaigretSite:
# convert to camelCase
field = CaseConverter.snake_to_camel(k)
# strip empty elements
if v in (False, '', [], {}, None, sys.maxsize, 'username'):
if v in (False, "", [], {}, None, sys.maxsize, "username"):
continue
if field in self.NOT_SERIALIZABLE_FIELDS:
continue
@@ -114,13 +167,13 @@ class MaigretSite:
return result
def update(self, updates: dict) -> MaigretSite:
def update(self, updates: "dict") -> "MaigretSite":
self.__dict__.update(updates)
self.update_detectors()
return self
def update_from_engine(self, engine: MaigretEngine) -> MaigretSite:
def update_from_engine(self, engine: MaigretEngine) -> "MaigretSite":
engine_data = engine.site
for k, v in engine_data.items():
field = CaseConverter.camel_to_snake(k)
@@ -138,7 +191,7 @@ class MaigretSite:
return self
def strip_engine_data(self) -> MaigretSite:
def strip_engine_data(self) -> "MaigretSite":
if not self.engine_obj:
return self
@@ -146,7 +199,7 @@ class MaigretSite:
self.url_regexp = None
self_copy = copy.deepcopy(self)
engine_data = self_copy.engine_obj.site
engine_data = self_copy.engine_obj and self_copy.engine_obj.site or {}
site_data_keys = list(self_copy.__dict__.keys())
for k in engine_data.keys():
@@ -183,29 +236,47 @@ class MaigretDatabase:
def sites_dict(self):
return {site.name: site for site in self._sites}
def ranked_sites_dict(self, reverse=False, top=sys.maxsize, tags=[], names=[],
disabled=True, id_type='username'):
def ranked_sites_dict(
self,
reverse=False,
top=sys.maxsize,
tags=[],
names=[],
disabled=True,
id_type="username",
):
"""
Ranking and filtering of the sites list
Ranking and filtering of the sites list
"""
normalized_names = list(map(str.lower, names))
normalized_tags = list(map(str.lower, tags))
is_name_ok = lambda x: x.name.lower() in normalized_names
is_engine_ok = lambda x: isinstance(x.engine, str) and x.engine.lower() in normalized_tags
is_source_ok = lambda x: x.source and x.source.lower() in normalized_names
is_engine_ok = (
lambda x: isinstance(x.engine, str) and x.engine.lower() in normalized_tags
)
is_tags_ok = lambda x: set(x.tags).intersection(set(normalized_tags))
is_disabled_needed = lambda x: not x.disabled or ('disabled' in tags or disabled)
is_disabled_needed = lambda x: not x.disabled or (
"disabled" in tags or disabled
)
is_id_type_ok = lambda x: x.type == id_type
filter_tags_engines_fun = lambda x: not tags or is_engine_ok(x) or is_tags_ok(x)
filter_names_fun = lambda x: not names or is_name_ok(x)
filter_names_fun = lambda x: not names or is_name_ok(x) or is_source_ok(x)
filter_fun = lambda x: filter_tags_engines_fun(x) and filter_names_fun(x) \
and is_disabled_needed(x) and is_id_type_ok(x)
filter_fun = (
lambda x: filter_tags_engines_fun(x)
and filter_names_fun(x)
and is_disabled_needed(x)
and is_id_type_ok(x)
)
filtered_list = [s for s in self.sites if filter_fun(s)]
sorted_list = sorted(filtered_list, key=lambda x: x.alexa_rank, reverse=reverse)[:top]
sorted_list = sorted(
filtered_list, key=lambda x: x.alexa_rank, reverse=reverse
)[:top]
return {site.name: site for site in sorted_list}
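Example (a usage sketch of the filtering above; the data file path is the one used by the example script at the end of this diff):
from maigret.sites import MaigretDatabase
db = MaigretDatabase().load_from_file('./maigret/resources/data.json')
# top 100 photo-related sites by Alexa rank
photo_sites = db.ranked_sites_dict(top=100, tags=['photo'])
# names now also match a site's 'source' attribute, not just its name
gh_sites = db.ranked_sites_dict(names=['github'])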
@property
@@ -216,7 +287,7 @@ class MaigretDatabase:
def engines_dict(self):
return {engine.name: engine for engine in self._engines}
def update_site(self, site: MaigretSite) -> MaigretDatabase:
def update_site(self, site: MaigretSite) -> "MaigretDatabase":
for s in self._sites:
if s.name == site.name:
s = site
@@ -225,20 +296,20 @@ class MaigretDatabase:
self._sites.append(site)
return self
def save_to_file(self, filename: str) -> MaigretDatabase:
def save_to_file(self, filename: str) -> "MaigretDatabase":
db_data = {
'sites': {site.name: site.strip_engine_data().json for site in self._sites},
'engines': {engine.name: engine.json for engine in self._engines},
"sites": {site.name: site.strip_engine_data().json for site in self._sites},
"engines": {engine.name: engine.json for engine in self._engines},
}
json_data = json.dumps(db_data, indent=4)
with open(filename, 'w') as f:
with open(filename, "w") as f:
f.write(json_data)
return self
def load_from_json(self, json_data: dict) -> MaigretDatabase:
def load_from_json(self, json_data: dict) -> "MaigretDatabase":
# Add all of site information from the json file to internal site list.
site_data = json_data.get("sites", {})
engines_data = json_data.get("engines", {})
@@ -250,30 +321,32 @@ class MaigretDatabase:
try:
maigret_site = MaigretSite(site_name, site_data[site_name])
engine = site_data[site_name].get('engine')
engine = site_data[site_name].get("engine")
if engine:
maigret_site.update_from_engine(self.engines_dict[engine])
self._sites.append(maigret_site)
except KeyError as error:
raise ValueError(f"Problem parsing json content for site {site_name}: "
f"Missing attribute {str(error)}."
)
raise ValueError(
f"Problem parsing json content for site {site_name}: "
f"Missing attribute {str(error)}."
)
return self
def load_from_str(self, db_str: str) -> MaigretDatabase:
def load_from_str(self, db_str: "str") -> "MaigretDatabase":
try:
data = json.loads(db_str)
except Exception as error:
raise ValueError(f"Problem parsing json contents from str"
f"'{db_str[:50]}'...: {str(error)}."
)
raise ValueError(
f"Problem parsing json contents from str"
f"'{db_str[:50]}'...: {str(error)}."
)
return self.load_from_json(data)
def load_from_url(self, url: str) -> MaigretDatabase:
is_url_valid = url.startswith('http://') or url.startswith('https://')
def load_from_url(self, url: str) -> "MaigretDatabase":
is_url_valid = url.startswith("http://") or url.startswith("https://")
if not is_url_valid:
raise FileNotFoundError(f"Invalid data file URL '{url}'.")
@@ -281,38 +354,40 @@ class MaigretDatabase:
try:
response = requests.get(url=url)
except Exception as error:
raise FileNotFoundError(f"Problem while attempting to access "
f"data file URL '{url}': "
f"{str(error)}"
)
raise FileNotFoundError(
f"Problem while attempting to access "
f"data file URL '{url}': "
f"{str(error)}"
)
if response.status_code == 200:
try:
data = response.json()
except Exception as error:
raise ValueError(f"Problem parsing json contents at "
f"'{url}': {str(error)}."
)
raise ValueError(
f"Problem parsing json contents at " f"'{url}': {str(error)}."
)
else:
raise FileNotFoundError(f"Bad response while accessing "
f"data file URL '{url}'."
)
raise FileNotFoundError(
f"Bad response while accessing " f"data file URL '{url}'."
)
return self.load_from_json(data)
def load_from_file(self, filename: str) -> MaigretDatabase:
def load_from_file(self, filename: "str") -> "MaigretDatabase":
try:
with open(filename, 'r', encoding='utf-8') as file:
with open(filename, "r", encoding="utf-8") as file:
try:
data = json.load(file)
except Exception as error:
raise ValueError(f"Problem parsing json contents from "
f"file '{filename}': {str(error)}."
)
raise ValueError(
f"Problem parsing json contents from "
f"file '{filename}': {str(error)}."
)
except FileNotFoundError as error:
raise FileNotFoundError(f"Problem while attempting to access "
f"data file '{filename}'."
)
raise FileNotFoundError(
f"Problem while attempting to access " f"data file '{filename}'."
) from error
return self.load_from_json(data)
@@ -320,8 +395,8 @@ class MaigretDatabase:
sites = sites_dict or self.sites_dict
found_flags = {}
for _, s in sites.items():
if 'presense_flag' in s.stats:
flag = s.stats['presense_flag']
if "presense_flag" in s.stats:
flag = s.stats["presense_flag"]
found_flags[flag] = found_flags.get(flag, 0) + 1
return found_flags
@@ -330,7 +405,7 @@ class MaigretDatabase:
if not sites_dict:
sites_dict = self.sites_dict()
output = ''
output = ""
disabled_count = 0
total_count = len(sites_dict)
urls = {}
@@ -341,18 +416,18 @@ class MaigretDatabase:
disabled_count += 1
url = URLMatcher.extract_main_part(site.url)
if url.startswith('{username}'):
url = 'SUBDOMAIN'
elif url == '':
url = f'{site.url} ({site.engine})'
if url.startswith("{username}"):
url = "SUBDOMAIN"
elif url == "":
url = f"{site.url} ({site.engine})"
else:
parts = url.split('/')
url = '/' + '/'.join(parts[1:])
parts = url.split("/")
url = "/" + "/".join(parts[1:])
urls[url] = urls.get(url, 0) + 1
if not site.tags:
tags['NO_TAGS'] = tags.get('NO_TAGS', 0) + 1
tags["NO_TAGS"] = tags.get("NO_TAGS", 0) + 1
for tag in site.tags:
if is_country_tag(tag):
@@ -360,17 +435,17 @@ class MaigretDatabase:
continue
tags[tag] = tags.get(tag, 0) + 1
output += f'Enabled/total sites: {total_count - disabled_count}/{total_count}\n'
output += 'Top sites\' profile URLs:\n'
output += f"Enabled/total sites: {total_count - disabled_count}/{total_count}\n"
output += "Top sites' profile URLs:\n"
for url, count in sorted(urls.items(), key=lambda x: x[1], reverse=True)[:20]:
if count == 1:
break
output += f'{count}\t{url}\n'
output += 'Top sites\' tags:\n'
output += f"{count}\t{url}\n"
output += "Top sites' tags:\n"
for tag, count in sorted(tags.items(), key=lambda x: x[1], reverse=True):
mark = ''
if not tag in SUPPORTED_TAGS:
mark = ' (non-standard)'
output += f'{count}\t{tag}{mark}\n'
mark = ""
if tag not in SUPPORTED_TAGS:
mark = " (non-standard)"
output += f"{count}\t{tag}{mark}\n"
return output
+198 -96
@@ -1,35 +1,58 @@
import asyncio
import difflib
import re
from typing import List
import requests
from .checking import *
from .activation import import_aiohttp_cookies
from .checking import maigret
from .result import QueryStatus
from .sites import MaigretDatabase, MaigretSite, MaigretEngine
from .utils import get_random_user_agent
DESIRED_STRINGS = ["username", "not found", "пользователь", "profile", "lastname", "firstname", "biography",
"birthday", "репутация", "информация", "e-mail"]
DESIRED_STRINGS = [
"username",
"not found",
"пользователь",
"profile",
"lastname",
"firstname",
"biography",
"birthday",
"репутация",
"информация",
"e-mail",
]
SUPPOSED_USERNAMES = ['alex', 'god', 'admin', 'red', 'blue', 'john']
SUPPOSED_USERNAMES = ["alex", "god", "admin", "red", "blue", "john"]
HEADERS = {
"User-Agent": get_random_user_agent(),
}
RATIO = 0.6
TOP_FEATURES = 5
URL_RE = re.compile(r'https?://(www\.)?')
URL_RE = re.compile(r"https?://(www\.)?")
def get_match_ratio(x):
return round(max([
difflib.SequenceMatcher(a=x.lower(), b=y).ratio()
for y in DESIRED_STRINGS
]), 2)
return round(
max(
[difflib.SequenceMatcher(a=x.lower(), b=y).ratio() for y in DESIRED_STRINGS]
),
2,
)
def extract_mainpage_url(url):
return '/'.join(url.split('/', 3)[:3])
return "/".join(url.split("/", 3)[:3])
async def site_self_check(site, logger, semaphore, db: MaigretDatabase, silent=False):
query_notify = Mock()
changes = {
'disabled': False,
"disabled": False,
}
check_data = [
@@ -37,14 +60,13 @@ async def site_self_check(site, logger, semaphore, db: MaigretDatabase, silent=F
(site.username_unclaimed, QueryStatus.AVAILABLE),
]
logger.info(f'Checking {site.name}...')
logger.info(f"Checking {site.name}...")
for username, status in check_data:
results_dict = await maigret(
username,
{site.name: site},
query_notify,
logger,
username=username,
site_dict={site.name: site},
logger=logger,
timeout=30,
id_type=site.type,
forced=True,
@@ -55,10 +77,10 @@ async def site_self_check(site, logger, semaphore, db: MaigretDatabase, silent=F
# TODO: make normal checking
if site.name not in results_dict:
logger.info(results_dict)
changes['disabled'] = True
changes["disabled"] = True
continue
result = results_dict[site.name]['status']
result = results_dict[site.name]["status"]
site_status = result.status
@@ -67,71 +89,111 @@ async def site_self_check(site, logger, semaphore, db: MaigretDatabase, silent=F
msgs = site.absence_strs
etype = site.check_type
logger.warning(
f'Error while searching {username} in {site.name}: {result.context}, {msgs}, type {etype}')
"Error while searching '%s' in %s: %s, %s, check type %s",
username,
site.name,
result.context,
msgs,
etype,
)
# don't disable in case of available username
if status == QueryStatus.CLAIMED:
changes['disabled'] = True
changes["disabled"] = True
elif status == QueryStatus.CLAIMED:
logger.warning(f'Not found `{username}` in {site.name}, must be claimed')
logger.warning(
f"Not found `{username}` in {site.name}, must be claimed"
)
logger.info(results_dict[site.name])
changes['disabled'] = True
changes["disabled"] = True
else:
logger.warning(f'Found `{username}` in {site.name}, must be available')
logger.warning(f"Found `{username}` in {site.name}, must be available")
logger.info(results_dict[site.name])
changes['disabled'] = True
changes["disabled"] = True
logger.info(f'Site {site.name} checking is finished')
logger.info(f"Site {site.name} checking is finished")
return changes
async def detect_known_engine(db, url_exists, url_mainpage):
def generate_additional_fields_dialog(engine: MaigretEngine, dialog):
fields = {}
if 'urlSubpath' in engine.site.get('url', ''):
msg = (
'Detected engine supposes an additional URL subpath (/forum/, /blog/, etc.). '
'Enter it manually if it exists: '
)
subpath = input(msg).strip('/')
if subpath:
fields['urlSubpath'] = f'/{subpath}'
return fields
async def detect_known_engine(
db, url_exists, url_mainpage, logger
) -> List[MaigretSite]:
try:
r = requests.get(url_mainpage)
except Exception as e:
print(e)
print('Some error while checking main page')
return None
logger.warning(e)
print("Some error while checking main page")
return []
for e in db.engines:
strs_to_check = e.__dict__.get('presenseStrs')
for engine in db.engines:
strs_to_check = engine.__dict__.get("presenseStrs")
if strs_to_check and r and r.text:
all_strs_in_response = True
for s in strs_to_check:
if not s in r.text:
if s not in r.text:
all_strs_in_response = False
sites = []
if all_strs_in_response:
engine_name = e.__dict__.get('name')
print(f'Detected engine {engine_name} for site {url_mainpage}')
engine_name = engine.__dict__.get("name")
sites = []
for u in SUPPOSED_USERNAMES:
print(f"Detected engine {engine_name} for site {url_mainpage}")
usernames_to_check = SUPPOSED_USERNAMES
supposed_username = extract_username_dialog(url_exists)
if supposed_username:
usernames_to_check = [supposed_username] + usernames_to_check
add_fields = generate_additional_fields_dialog(engine, url_exists)
for u in usernames_to_check:
site_data = {
'urlMain': url_mainpage,
'name': url_mainpage.split('//')[0],
'engine': engine_name,
'usernameClaimed': u,
'usernameUnclaimed': 'noonewouldeverusethis7',
"urlMain": url_mainpage,
"name": url_mainpage.split("//")[1],
"engine": engine_name,
"usernameClaimed": u,
"usernameUnclaimed": "noonewouldeverusethis7",
**add_fields,
}
logger.info(site_data)
maigret_site = MaigretSite(url_mainpage.split('/')[-1], site_data)
maigret_site = MaigretSite(url_mainpage.split("/")[-1], site_data)
maigret_site.update_from_engine(db.engines_dict[engine_name])
sites.append(maigret_site)
return sites
return None
return []
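Example (the core engine-detection idea above, reduced to a synchronous sketch; the forum URL is a hypothetical placeholder):
import requests
from maigret.sites import MaigretDatabase
db = MaigretDatabase().load_from_file('./maigret/resources/data.json')
html = requests.get('https://forum.example.com').text
for engine in db.engines:
    strs_to_check = engine.__dict__.get('presenseStrs') or []
    # an engine is detected when all of its marker strings occur in the page
    if strs_to_check and all(s in html for s in strs_to_check):
        print(f'Detected engine {engine.name}')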
async def check_features_manually(db, url_exists, url_mainpage, cookie_file):
url_parts = url_exists.split('/')
def extract_username_dialog(url):
url_parts = url.rstrip("/").split("/")
supposed_username = url_parts[-1]
new_name = input(f'Is "{supposed_username}" a valid username? If not, write it manually: ')
if new_name:
supposed_username = new_name
non_exist_username = 'noonewouldeverusethis7'
entered_username = input(
f'Is "{supposed_username}" a valid username? If not, write it manually: '
)
return entered_username if entered_username else supposed_username
url_user = url_exists.replace(supposed_username, '{username}')
async def check_features_manually(
db, url_exists, url_mainpage, cookie_file, logger, redirects=True
):
supposed_username = extract_username_dialog(url_exists)
non_exist_username = "noonewouldeverusethis7"
url_user = url_exists.replace(supposed_username, "{username}")
url_not_exists = url_exists.replace(supposed_username, non_exist_username)
# cookies
@@ -140,8 +202,20 @@ async def check_features_manually(db, url_exists, url_mainpage, cookie_file):
cookie_jar = await import_aiohttp_cookies(cookie_file)
cookie_dict = {c.key: c.value for c in cookie_jar}
a = requests.get(url_exists, cookies=cookie_dict).text
b = requests.get(url_not_exists, cookies=cookie_dict).text
exists_resp = requests.get(
url_exists, cookies=cookie_dict, headers=HEADERS, allow_redirects=redirects
)
logger.debug(exists_resp.status_code)
logger.debug(exists_resp.text)
non_exists_resp = requests.get(
url_not_exists, cookies=cookie_dict, headers=HEADERS, allow_redirects=redirects
)
logger.debug(non_exists_resp.status_code)
logger.debug(non_exists_resp.text)
a = exists_resp.text
b = non_exists_resp.text
tokens_a = set(a.split('"'))
tokens_b = set(b.split('"'))
@@ -149,85 +223,113 @@ async def check_features_manually(db, url_exists, url_mainpage, cookie_file):
a_minus_b = tokens_a.difference(tokens_b)
b_minus_a = tokens_b.difference(tokens_a)
top_features_count = int(input(f'Specify count of features to extract [default {TOP_FEATURES}]: ') or TOP_FEATURES)
if len(a_minus_b) == len(b_minus_a) == 0:
print("The pages for existing and non-existing account are the same!")
presence_list = sorted(a_minus_b, key=get_match_ratio, reverse=True)[:top_features_count]
top_features_count = int(
input(f"Specify count of features to extract [default {TOP_FEATURES}]: ")
or TOP_FEATURES
)
print('Detected text features of existing account: ' + ', '.join(presence_list))
features = input('If features was not detected correctly, write it manually: ')
presence_list = sorted(a_minus_b, key=get_match_ratio, reverse=True)[
:top_features_count
]
print("Detected text features of existing account: " + ", ".join(presence_list))
features = input("If features was not detected correctly, write it manually: ")
if features:
presence_list = features.split(',')
presence_list = features.split(",")
absence_list = sorted(b_minus_a, key=get_match_ratio, reverse=True)[:top_features_count]
print('Detected text features of non-existing account: ' + ', '.join(absence_list))
features = input('If features was not detected correctly, write it manually: ')
absence_list = sorted(b_minus_a, key=get_match_ratio, reverse=True)[
:top_features_count
]
print("Detected text features of non-existing account: " + ", ".join(absence_list))
features = input("If features was not detected correctly, write it manually: ")
if features:
absence_list = features.split(',')
absence_list = features.split(",")
site_data = {
'absenceStrs': absence_list,
'presenseStrs': presence_list,
'url': url_user,
'urlMain': url_mainpage,
'usernameClaimed': supposed_username,
'usernameUnclaimed': non_exist_username,
'checkType': 'message',
"absenceStrs": absence_list,
"presenseStrs": presence_list,
"url": url_user,
"urlMain": url_mainpage,
"usernameClaimed": supposed_username,
"usernameUnclaimed": non_exist_username,
"checkType": "message",
}
site = MaigretSite(url_mainpage.split('/')[-1], site_data)
site = MaigretSite(url_mainpage.split("/")[-1], site_data)
return site
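Example (the feature extraction above boils down to diffing the quote-separated tokens of the two pages; a self-contained sketch with made-up HTML):
a = '<div class="profile-card">alex</div>'    # page of an existing account
b = '<div class="error-404">not found</div>'  # page of a missing account
tokens_a, tokens_b = set(a.split('"')), set(b.split('"'))
print(tokens_a - tokens_b)  # candidate presence strings, e.g. 'profile-card'
print(tokens_b - tokens_a)  # candidate absence strings, e.g. 'error-404'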
async def submit_dialog(db, url_exists, cookie_file):
domain_raw = URL_RE.sub('', url_exists).strip().strip('/')
domain_raw = domain_raw.split('/')[0]
async def submit_dialog(db, url_exists, cookie_file, logger):
domain_raw = URL_RE.sub("", url_exists).strip().strip("/")
domain_raw = domain_raw.split("/")[0]
# check for existence
matched_sites = list(filter(lambda x: domain_raw in x.url_main + x.url, db.sites))
if matched_sites:
print(f'Sites with domain "{domain_raw}" already exists in the Maigret database!')
status = lambda s: '(disabled)' if s.disabled else ''
url_block = lambda s: f'\n\t{s.url_main}\n\t{s.url}'
print('\n'.join([f'{site.name} {status(site)}{url_block(site)}' for site in matched_sites]))
return False
print(
f'Sites with domain "{domain_raw}" already exist in the Maigret database!'
)
status = lambda s: "(disabled)" if s.disabled else ""
url_block = lambda s: f"\n\t{s.url_main}\n\t{s.url}"
print(
"\n".join(
[
f"{site.name} {status(site)}{url_block(site)}"
for site in matched_sites
]
)
)
if input("Do you want to continue? [yN] ").lower() in "n":
return False
url_mainpage = extract_mainpage_url(url_exists)
sites = await detect_known_engine(db, url_exists, url_mainpage)
sites = await detect_known_engine(db, url_exists, url_mainpage, logger)
if not sites:
print('Unable to detect site engine, lets generate checking features')
sites = [await check_features_manually(db, url_exists, url_mainpage, cookie_file)]
print("Unable to detect site engine, lets generate checking features")
sites = [
await check_features_manually(
db, url_exists, url_mainpage, cookie_file, logger
)
]
print(sites[0].__dict__)
logger.debug(sites[0].__dict__)
sem = asyncio.Semaphore(1)
log_level = logging.INFO
logging.basicConfig(
format='[%(filename)s:%(lineno)d] %(levelname)-3s %(asctime)s %(message)s',
datefmt='%H:%M:%S',
level=log_level
)
logger = logging.getLogger('site-submit')
logger.setLevel(log_level)
found = False
chosen_site = None
for s in sites:
chosen_site = s
result = await site_self_check(s, logger, sem, db)
if not result['disabled']:
if not result["disabled"]:
found = True
break
if not found:
print(f'Sorry, we couldn\'t find params to detect account presence/absence in {chosen_site.name}.')
print('Try to run this mode again and increase features count or choose others.')
print(
f"Sorry, we couldn't find params to detect account presence/absence in {chosen_site.name}."
)
print(
"Try to run this mode again and increase features count or choose others."
)
else:
if input(f'Site {chosen_site.name} successfully checked. Do you want to save it in the Maigret DB? [Yn] ').lower() in 'y':
print(chosen_site.json)
if (
input(
f"Site {chosen_site.name} successfully checked. Do you want to save it in the Maigret DB? [Yn] "
).lower()
in "y"
):
logger.debug(chosen_site.json)
site_data = chosen_site.strip_engine_data()
print(site_data.json)
logger.debug(site_data.json)
db.update_site(site_data)
return True
+11
@@ -0,0 +1,11 @@
from typing import Callable, List, Dict, Tuple, Any
# search query
QueryDraft = Tuple[Callable, List, Dict]
# options dict
QueryOptions = Dict[str, Any]
# TODO: throw out
QueryResultWrapper = Dict[str, Any]
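Example (a QueryDraft packs a callable with its positional and keyword arguments, the shape the executor tests below build; the types are inlined here since the module path is not shown in this view):
from typing import Callable, Dict, List, Tuple
QueryDraft = Tuple[Callable, List, Dict]
def check(username: str, retries: int = 0) -> str:
    return f'checking {username} (retries={retries})'
draft: QueryDraft = (check, ['alex'], {'retries': 1})
func, args, kwargs = draft
print(func(*args, **kwargs))  # checking alex (retries=1)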
+33 -23
@@ -1,78 +1,88 @@
import re
import random
DEFAULT_USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36",
]
class CaseConverter:
@staticmethod
def camel_to_snake(camelcased_string: str) -> str:
return re.sub(r'(?<!^)(?=[A-Z])', '_', camelcased_string).lower()
return re.sub(r"(?<!^)(?=[A-Z])", "_", camelcased_string).lower()
@staticmethod
def snake_to_camel(snakecased_string: str) -> str:
formatted = ''.join(word.title() for word in snakecased_string.split('_'))
formatted = "".join(word.title() for word in snakecased_string.split("_"))
result = formatted[0].lower() + formatted[1:]
return result
@staticmethod
def snake_to_title(snakecased_string: str) -> str:
words = snakecased_string.split('_')
words = snakecased_string.split("_")
words[0] = words[0].title()
return ' '.join(words)
return " ".join(words)
def is_country_tag(tag: str) -> bool:
"""detect if tag represent a country"""
return bool(re.match("^([a-zA-Z]){2}$", tag)) or tag == 'global'
return bool(re.match("^([a-zA-Z]){2}$", tag)) or tag == "global"
def enrich_link_str(link: str) -> str:
link = link.strip()
if link.startswith('www.') or (link.startswith('http') and '//' in link):
if link.startswith("www.") or (link.startswith("http") and "//" in link):
return f'<a class="auto-link" href="{link}">{link}</a>'
return link
class URLMatcher:
_HTTP_URL_RE_STR = '^https?://(www.)?(.+)$'
_HTTP_URL_RE_STR = "^https?://(www.)?(.+)$"
HTTP_URL_RE = re.compile(_HTTP_URL_RE_STR)
UNSAFE_SYMBOLS = '.?'
UNSAFE_SYMBOLS = ".?"
@classmethod
def extract_main_part(self, url: str) -> str:
match = self.HTTP_URL_RE.search(url)
if match and match.group(2):
return match.group(2).rstrip('/')
return match.group(2).rstrip("/")
return ''
return ""
@classmethod
def make_profile_url_regexp(self, url: str, username_regexp: str = ''):
def make_profile_url_regexp(self, url: str, username_regexp: str = ""):
url_main_part = self.extract_main_part(url)
for c in self.UNSAFE_SYMBOLS:
url_main_part = url_main_part.replace(c, f'\\{c}')
username_regexp = username_regexp or '.+?'
url_main_part = url_main_part.replace(c, f"\\{c}")
username_regexp = username_regexp or ".+?"
url_regexp = url_main_part.replace('{username}', f'({username_regexp})')
regexp_str = self._HTTP_URL_RE_STR.replace('(.+)', url_regexp)
url_regexp = url_main_part.replace("{username}", f"({username_regexp})")
regexp_str = self._HTTP_URL_RE_STR.replace("(.+)", url_regexp)
return re.compile(regexp_str)
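Example (a sketch of the profile-URL matcher; the expected pattern matches the test assertion later in this diff):
from maigret.utils import URLMatcher
regexp = URLMatcher.make_profile_url_regexp('https://flickr.com/photos/{username}')
print(regexp.pattern)  # ^https?://(www.)?flickr\.com/photos/(.+?)$
match = regexp.match('https://www.flickr.com/photos/alex')
print(match.group(2))  # alex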
def get_dict_ascii_tree(items, prepend='', new_line=True):
text = ''
def get_dict_ascii_tree(items, prepend="", new_line=True):
text = ""
for num, item in enumerate(items):
box_symbol = '┣╸' if num != len(items) - 1 else '┗╸'
box_symbol = "┣╸" if num != len(items) - 1 else "┗╸"
if type(item) == tuple:
field_name, field_value = item
if field_value.startswith('[\''):
if field_value.startswith("['"):
is_last_item = num == len(items) - 1
prepend_symbols = ' ' * 3 if is_last_item else ''
field_value = print_ascii_tree(eval(field_value), prepend_symbols)
text += f'\n{prepend}{box_symbol}{field_name}: {field_value}'
prepend_symbols = " " * 3 if is_last_item else ""
field_value = get_dict_ascii_tree(eval(field_value), prepend_symbols)
text += f"\n{prepend}{box_symbol}{field_name}: {field_value}"
else:
text += f'\n{prepend}{box_symbol} {item}'
text += f"\n{prepend}{box_symbol} {item}"
if not new_line:
text = text[1:]
return text
def get_random_user_agent():
return random.choice(DEFAULT_USER_AGENTS)
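Example (a sketch of the ASCII-tree helper; the output starts with a newline when new_line=True):
from maigret.utils import get_dict_ascii_tree
print(get_dict_ascii_tree([('username', 'alex'), ('site', 'GitHub')]))
# ┣╸username: alex
# ┗╸site: GitHub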
+2 -4
@@ -14,21 +14,19 @@ future-annotations==1.0.0
html5lib==1.1
idna==2.10
Jinja2==2.11.3
lxml==4.6.2
lxml==4.6.3
MarkupSafe==1.1.1
mock==4.0.2
multidict==5.1.0
Pillow==8.1.1
pycountry==20.7.3
PyPDF2==1.26.0
PySocks==1.7.1
python-bidi==0.4.2
python-socks==1.1.2
reportlab==3.5.59
requests>=2.24.0
requests-futures==1.0.0
six==1.15.0
socid-extractor>=0.0.15
socid-extractor>=0.0.16
soupsieve==2.1
stem==1.8.0
torrequest==0.1.0
+6
@@ -1,3 +1,9 @@
[egg_info]
tag_build =
tag_date = 0
[flake8]
per-file-ignores = __init__.py:F401
[mypy]
ignore_missing_imports = True
+1 -1
@@ -12,7 +12,7 @@ with open('requirements.txt') as rf:
requires = rf.read().splitlines()
setup(name='maigret',
version='0.1.15',
version='0.2.1',
description='Collect a dossier on a person by username from a huge number of sites',
long_description=long_description,
long_description_content_type="text/markdown",
+1479 -1452
File diff suppressed because it is too large
Executable
+2
@@ -0,0 +1,2 @@
#!/bin/sh
pytest tests
+10 -1
@@ -9,6 +9,7 @@ from maigret.sites import MaigretDatabase
CUR_PATH = os.path.dirname(os.path.realpath(__file__))
JSON_FILE = os.path.join(CUR_PATH, '../maigret/resources/data.json')
TEST_JSON_FILE = os.path.join(CUR_PATH, 'db.json')
empty_mark = Mark('', [], {})
@@ -26,7 +27,8 @@ def get_test_reports_filenames():
def remove_test_reports():
reports_list = get_test_reports_filenames()
for f in reports_list: os.remove(f)
for f in reports_list:
os.remove(f)
logging.error(f'Removed test reports {reports_list}')
@@ -37,6 +39,13 @@ def default_db():
return db
@pytest.fixture(scope='function')
def test_db():
db = MaigretDatabase().load_from_file(TEST_JSON_FILE)
return db
@pytest.fixture(autouse=True)
def reports_autoclean():
remove_test_reports()
+26
@@ -0,0 +1,26 @@
{
"engines": {},
"sites": {
"GooglePlayStore": {
"tags": ["global", "us"],
"disabled": false,
"checkType": "status_code",
"alexaRank": 1,
"url": "https://play.google.com/store/apps/developer?id={username}",
"urlMain": "https://play.google.com/store",
"usernameClaimed": "Facebook_nosuchname",
"usernameUnclaimed": "noonewouldeverusethis7"
},
"Reddit": {
"tags": ["news", "social", "us"],
"checkType": "status_code",
"presenseStrs": ["totalKarma"],
"disabled": true,
"alexaRank": 17,
"url": "https://www.reddit.com/user/{username}",
"urlMain": "https://www.reddit.com/",
"usernameClaimed": "blue",
"usernameUnclaimed": "noonewouldeverusethis7"
}
}
}
+3 -2
@@ -44,8 +44,9 @@ async def test_import_aiohttp_cookies():
url = 'https://httpbin.org/cookies'
connector = aiohttp.TCPConnector(ssl=False)
session = aiohttp.ClientSession(connector=connector, trust_env=True,
cookie_jar=cookie_jar)
session = aiohttp.ClientSession(
connector=connector, trust_env=True, cookie_jar=cookie_jar
)
response = await session.get(url=url)
result = json.loads(await response.content.read())
@@ -2,10 +2,16 @@
import pytest
import asyncio
import logging
from maigret.checking import AsyncioSimpleExecutor, AsyncioProgressbarExecutor, AsyncioProgressbarSemaphoreExecutor, AsyncioProgressbarQueueExecutor
from maigret.executors import (
AsyncioSimpleExecutor,
AsyncioProgressbarExecutor,
AsyncioProgressbarSemaphoreExecutor,
AsyncioProgressbarQueueExecutor,
)
logger = logging.getLogger(__name__)
async def func(n):
await asyncio.sleep(0.1 * (n % 3))
return n
@@ -19,6 +25,7 @@ async def test_simple_asyncio_executor():
assert executor.execution_time > 0.2
assert executor.execution_time < 0.3
@pytest.mark.asyncio
async def test_asyncio_progressbar_executor():
tasks = [(func, [n], {}) for n in range(10)]
+123 -96
@@ -4,103 +4,130 @@ import asyncio
import pytest
from mock import Mock
from maigret.maigret import self_check
from maigret.sites import MaigretDatabase
from maigret.maigret import self_check, maigret
from maigret.sites import MaigretSite
from maigret.result import QueryResult, QueryStatus
EXAMPLE_DB = {
'engines': {
},
'sites': {
"GooglePlayStore": {
"tags": [
"global",
"us"
],
"disabled": False,
"checkType": "status_code",
"alexaRank": 1,
"url": "https://play.google.com/store/apps/developer?id={username}",
"urlMain": "https://play.google.com/store",
"usernameClaimed": "Facebook_nosuchname",
"usernameUnclaimed": "noonewouldeverusethis7"
@pytest.mark.slow
def test_self_check_db_positive_disable(test_db):
logger = Mock()
assert test_db.sites[0].disabled is False
loop = asyncio.get_event_loop()
loop.run_until_complete(
self_check(test_db, test_db.sites_dict, logger, silent=True)
)
assert test_db.sites[0].disabled is True
@pytest.mark.slow
def test_self_check_db_positive_enable(test_db):
logger = Mock()
test_db.sites[0].disabled = True
test_db.sites[0].username_claimed = 'Facebook'
assert test_db.sites[0].disabled is True
loop = asyncio.get_event_loop()
loop.run_until_complete(
self_check(test_db, test_db.sites_dict, logger, silent=True)
)
assert test_db.sites[0].disabled is False
@pytest.mark.slow
def test_self_check_db_negative_disabled(test_db):
logger = Mock()
test_db.sites[0].disabled = True
assert test_db.sites[0].disabled is True
loop = asyncio.get_event_loop()
loop.run_until_complete(
self_check(test_db, test_db.sites_dict, logger, silent=True)
)
assert test_db.sites[0].disabled is True
@pytest.mark.slow
def test_self_check_db_negative_enabled(test_db):
logger = Mock()
test_db.sites[0].disabled = False
test_db.sites[0].username_claimed = 'Facebook'
assert test_db.sites[0].disabled is False
loop = asyncio.get_event_loop()
loop.run_until_complete(
self_check(test_db, test_db.sites_dict, logger, silent=True)
)
assert test_db.sites[0].disabled is False
@pytest.mark.slow
def test_maigret_results(test_db):
logger = Mock()
username = 'Facebook'
loop = asyncio.get_event_loop()
results = loop.run_until_complete(
maigret(username, site_dict=test_db.sites_dict, logger=logger, timeout=30)
)
assert isinstance(results, dict)
reddit_site = results['Reddit']['site']
assert isinstance(reddit_site, MaigretSite)
assert reddit_site.json == {
'tags': ['news', 'social', 'us'],
'checkType': 'status_code',
'presenseStrs': ['totalKarma'],
'disabled': True,
'alexaRank': 17,
'url': 'https://www.reddit.com/user/{username}',
'urlMain': 'https://www.reddit.com/',
'usernameClaimed': 'blue',
'usernameUnclaimed': 'noonewouldeverusethis7',
}
del results['Reddit']['site']
del results['GooglePlayStore']['site']
reddit_status = results['Reddit']['status']
assert isinstance(reddit_status, QueryResult)
assert reddit_status.status == QueryStatus.ILLEGAL
playstore_status = results['GooglePlayStore']['status']
assert isinstance(playstore_status, QueryResult)
assert playstore_status.status == QueryStatus.CLAIMED
del results['Reddit']['status']
del results['GooglePlayStore']['status']
assert results['Reddit'].get('future') is None
del results['GooglePlayStore']['future']
assert results == {
'Reddit': {
'cookies': None,
'parsing_enabled': False,
'url_main': 'https://www.reddit.com/',
'username': 'Facebook',
},
"Reddit": {
"tags": [
"news",
"social",
"us"
],
"checkType": "status_code",
"presenseStrs": [
"totalKarma"
],
"disabled": True,
"alexaRank": 17,
"url": "https://www.reddit.com/user/{username}",
"urlMain": "https://www.reddit.com/",
"usernameClaimed": "blue",
"usernameUnclaimed": "noonewouldeverusethis7"
'GooglePlayStore': {
'cookies': None,
'http_status': 200,
'is_similar': False,
'parsing_enabled': False,
'rank': 1,
'url_main': 'https://play.google.com/store',
'url_user': 'https://play.google.com/store/apps/developer?id=Facebook',
'username': 'Facebook',
},
}
}
@pytest.mark.slow
def test_self_check_db_positive_disable():
logger = Mock()
db = MaigretDatabase()
db.load_from_json(EXAMPLE_DB)
assert db.sites[0].disabled == False
loop = asyncio.get_event_loop()
loop.run_until_complete(self_check(db, db.sites_dict, logger, silent=True))
assert db.sites[0].disabled == True
@pytest.mark.slow
def test_self_check_db_positive_enable():
logger = Mock()
db = MaigretDatabase()
db.load_from_json(EXAMPLE_DB)
db.sites[0].disabled = True
db.sites[0].username_claimed = 'Facebook'
assert db.sites[0].disabled == True
loop = asyncio.get_event_loop()
loop.run_until_complete(self_check(db, db.sites_dict, logger, silent=True))
assert db.sites[0].disabled == False
@pytest.mark.slow
def test_self_check_db_negative_disabled():
logger = Mock()
db = MaigretDatabase()
db.load_from_json(EXAMPLE_DB)
db.sites[0].disabled = True
assert db.sites[0].disabled == True
loop = asyncio.get_event_loop()
loop.run_until_complete(self_check(db, db.sites_dict, logger, silent=True))
assert db.sites[0].disabled == True
@pytest.mark.slow
def test_self_check_db_negative_enabled():
logger = Mock()
db = MaigretDatabase()
db.load_from_json(EXAMPLE_DB)
db.sites[0].disabled = False
db.sites[0].username_claimed = 'Facebook'
assert db.sites[0].disabled == False
loop = asyncio.get_event_loop()
loop.run_until_complete(self_check(db, db.sites_dict, logger, silent=True))
assert db.sites[0].disabled == False
+208 -70
@@ -7,9 +7,22 @@ from io import StringIO
import xmind
from jinja2 import Template
from maigret.report import generate_csv_report, generate_txt_report, save_xmind_report, save_html_report, \
save_pdf_report, generate_report_template, generate_report_context, generate_json_report
from maigret.report import (
generate_csv_report,
generate_txt_report,
save_xmind_report,
save_html_report,
save_pdf_report,
generate_report_template,
generate_report_context,
generate_json_report,
)
from maigret.result import QueryResult, QueryStatus
from maigret.sites import MaigretSite
GOOD_RESULT = QueryResult('', '', '', QueryStatus.CLAIMED)
BAD_RESULT = QueryResult('', '', '', QueryStatus.AVAILABLE)
EXAMPLE_RESULTS = {
'GitHub': {
@@ -17,90 +30,212 @@ EXAMPLE_RESULTS = {
'parsing_enabled': True,
'url_main': 'https://www.github.com/',
'url_user': 'https://www.github.com/test',
'status': QueryResult('test',
'GitHub',
'https://www.github.com/test',
QueryStatus.CLAIMED,
tags=['test_tag']),
'status': QueryResult(
'test',
'GitHub',
'https://www.github.com/test',
QueryStatus.CLAIMED,
tags=['test_tag'],
),
'http_status': 200,
'is_similar': False,
'rank': 78
'rank': 78,
'site': MaigretSite('test', {}),
}
}
GOOD_RESULT = QueryResult('', '', '', QueryStatus.CLAIMED)
BAD_RESULT = QueryResult('', '', '', QueryStatus.AVAILABLE)
GOOD_500PX_RESULT = copy.deepcopy(GOOD_RESULT)
GOOD_500PX_RESULT.tags = ['photo', 'us', 'global']
GOOD_500PX_RESULT.ids_data = {"uid": "dXJpOm5vZGU6VXNlcjoyNjQwMzQxNQ==", "legacy_id": "26403415",
"username": "alexaimephotographycars", "name": "Alex Aim\u00e9",
"website": "www.flickr.com/photos/alexaimephotography/",
"facebook_link": " www.instagram.com/street.reality.photography/",
"instagram_username": "alexaimephotography", "twitter_username": "Alexaimephotogr"}
GOOD_500PX_RESULT.ids_data = {
"uid": "dXJpOm5vZGU6VXNlcjoyNjQwMzQxNQ==",
"legacy_id": "26403415",
"username": "alexaimephotographycars",
"name": "Alex Aim\u00e9",
"website": "www.flickr.com/photos/alexaimephotography/",
"facebook_link": " www.instagram.com/street.reality.photography/",
"instagram_username": "alexaimephotography",
"twitter_username": "Alexaimephotogr",
}
GOOD_REDDIT_RESULT = copy.deepcopy(GOOD_RESULT)
GOOD_REDDIT_RESULT.tags = ['news', 'us']
GOOD_REDDIT_RESULT.ids_data = {"reddit_id": "t5_1nytpy", "reddit_username": "alexaimephotography",
"fullname": "alexaimephotography",
"image": "https://styles.redditmedia.com/t5_1nytpy/styles/profileIcon_7vmhdwzd3g931.jpg?width=256&height=256&crop=256:256,smart&frame=1&s=4f355f16b4920844a3f4eacd4237a7bf76b2e97e",
"is_employee": "False", "is_nsfw": "False", "is_mod": "True", "is_following": "True",
"has_user_profile": "True", "hide_from_robots": "False",
"created_at": "2019-07-10 12:20:03", "total_karma": "53959", "post_karma": "52738"}
GOOD_REDDIT_RESULT.ids_data = {
"reddit_id": "t5_1nytpy",
"reddit_username": "alexaimephotography",
"fullname": "alexaimephotography",
"image": "https://styles.redditmedia.com/t5_1nytpy/styles/profileIcon_7vmhdwzd3g931.jpg?width=256&height=256&crop=256:256,smart&frame=1&s=4f355f16b4920844a3f4eacd4237a7bf76b2e97e",
"is_employee": "False",
"is_nsfw": "False",
"is_mod": "True",
"is_following": "True",
"has_user_profile": "True",
"hide_from_robots": "False",
"created_at": "2019-07-10 12:20:03",
"total_karma": "53959",
"post_karma": "52738",
}
GOOD_IG_RESULT = copy.deepcopy(GOOD_RESULT)
GOOD_IG_RESULT.tags = ['photo', 'global']
GOOD_IG_RESULT.ids_data = {"instagram_username": "alexaimephotography", "fullname": "Alexaimephotography",
"id": "6828488620",
"image": "https://scontent-hel3-1.cdninstagram.com/v/t51.2885-19/s320x320/95420076_1169632876707608_8741505804647006208_n.jpg?_nc_ht=scontent-hel3-1.cdninstagram.com&_nc_ohc=jd87OUGsX4MAX_Ym5GX&tp=1&oh=0f42badd68307ba97ec7fb1ef7b4bfd4&oe=601E5E6F",
"bio": "Photographer \nChild of fine street arts",
"external_url": "https://www.flickr.com/photos/alexaimephotography2020/"}
GOOD_IG_RESULT.ids_data = {
"instagram_username": "alexaimephotography",
"fullname": "Alexaimephotography",
"id": "6828488620",
"image": "https://scontent-hel3-1.cdninstagram.com/v/t51.2885-19/s320x320/95420076_1169632876707608_8741505804647006208_n.jpg?_nc_ht=scontent-hel3-1.cdninstagram.com&_nc_ohc=jd87OUGsX4MAX_Ym5GX&tp=1&oh=0f42badd68307ba97ec7fb1ef7b4bfd4&oe=601E5E6F",
"bio": "Photographer \nChild of fine street arts",
"external_url": "https://www.flickr.com/photos/alexaimephotography2020/",
}
GOOD_TWITTER_RESULT = copy.deepcopy(GOOD_RESULT)
GOOD_TWITTER_RESULT.tags = ['social', 'us']
TEST = [('alexaimephotographycars', 'username', {
'500px': {'username': 'alexaimephotographycars', 'parsing_enabled': True, 'url_main': 'https://500px.com/',
'url_user': 'https://500px.com/p/alexaimephotographycars',
'ids_usernames': {'alexaimephotographycars': 'username', 'alexaimephotography': 'username',
'Alexaimephotogr': 'username'}, 'status': GOOD_500PX_RESULT, 'http_status': 200,
'is_similar': False, 'rank': 2981},
'Reddit': {'username': 'alexaimephotographycars', 'parsing_enabled': True, 'url_main': 'https://www.reddit.com/',
'url_user': 'https://www.reddit.com/user/alexaimephotographycars', 'status': BAD_RESULT,
'http_status': 404, 'is_similar': False, 'rank': 17},
'Twitter': {'username': 'alexaimephotographycars', 'parsing_enabled': True, 'url_main': 'https://www.twitter.com/',
'url_user': 'https://twitter.com/alexaimephotographycars', 'status': BAD_RESULT, 'http_status': 400,
'is_similar': False, 'rank': 55},
'Instagram': {'username': 'alexaimephotographycars', 'parsing_enabled': True,
'url_main': 'https://www.instagram.com/',
'url_user': 'https://www.instagram.com/alexaimephotographycars', 'status': BAD_RESULT,
'http_status': 404, 'is_similar': False, 'rank': 29}}), ('alexaimephotography', 'username', {
'500px': {'username': 'alexaimephotography', 'parsing_enabled': True, 'url_main': 'https://500px.com/',
'url_user': 'https://500px.com/p/alexaimephotography', 'status': BAD_RESULT, 'http_status': 200,
'is_similar': False, 'rank': 2981},
'Reddit': {'username': 'alexaimephotography', 'parsing_enabled': True, 'url_main': 'https://www.reddit.com/',
'url_user': 'https://www.reddit.com/user/alexaimephotography',
'ids_usernames': {'alexaimephotography': 'username'}, 'status': GOOD_REDDIT_RESULT, 'http_status': 200,
'is_similar': False, 'rank': 17},
'Twitter': {'username': 'alexaimephotography', 'parsing_enabled': True, 'url_main': 'https://www.twitter.com/',
'url_user': 'https://twitter.com/alexaimephotography', 'status': BAD_RESULT, 'http_status': 400,
'is_similar': False, 'rank': 55},
'Instagram': {'username': 'alexaimephotography', 'parsing_enabled': True, 'url_main': 'https://www.instagram.com/',
'url_user': 'https://www.instagram.com/alexaimephotography',
'ids_usernames': {'alexaimephotography': 'username'}, 'status': GOOD_IG_RESULT, 'http_status': 200,
'is_similar': False, 'rank': 29}}), ('Alexaimephotogr', 'username', {
'500px': {'username': 'Alexaimephotogr', 'parsing_enabled': True, 'url_main': 'https://500px.com/',
'url_user': 'https://500px.com/p/Alexaimephotogr', 'status': BAD_RESULT, 'http_status': 200,
'is_similar': False, 'rank': 2981},
'Reddit': {'username': 'Alexaimephotogr', 'parsing_enabled': True, 'url_main': 'https://www.reddit.com/',
'url_user': 'https://www.reddit.com/user/Alexaimephotogr', 'status': BAD_RESULT, 'http_status': 404,
'is_similar': False, 'rank': 17},
'Twitter': {'username': 'Alexaimephotogr', 'parsing_enabled': True, 'url_main': 'https://www.twitter.com/',
'url_user': 'https://twitter.com/Alexaimephotogr', 'status': GOOD_TWITTER_RESULT, 'http_status': 400,
'is_similar': False, 'rank': 55},
'Instagram': {'username': 'Alexaimephotogr', 'parsing_enabled': True, 'url_main': 'https://www.instagram.com/',
'url_user': 'https://www.instagram.com/Alexaimephotogr', 'status': BAD_RESULT, 'http_status': 404,
'is_similar': False, 'rank': 29}})]
TEST = [
(
'alexaimephotographycars',
'username',
{
'500px': {
'username': 'alexaimephotographycars',
'parsing_enabled': True,
'url_main': 'https://500px.com/',
'url_user': 'https://500px.com/p/alexaimephotographycars',
'ids_usernames': {
'alexaimephotographycars': 'username',
'alexaimephotography': 'username',
'Alexaimephotogr': 'username',
},
'status': GOOD_500PX_RESULT,
'http_status': 200,
'is_similar': False,
'rank': 2981,
},
'Reddit': {
'username': 'alexaimephotographycars',
'parsing_enabled': True,
'url_main': 'https://www.reddit.com/',
'url_user': 'https://www.reddit.com/user/alexaimephotographycars',
'status': BAD_RESULT,
'http_status': 404,
'is_similar': False,
'rank': 17,
},
'Twitter': {
'username': 'alexaimephotographycars',
'parsing_enabled': True,
'url_main': 'https://www.twitter.com/',
'url_user': 'https://twitter.com/alexaimephotographycars',
'status': BAD_RESULT,
'http_status': 400,
'is_similar': False,
'rank': 55,
},
'Instagram': {
'username': 'alexaimephotographycars',
'parsing_enabled': True,
'url_main': 'https://www.instagram.com/',
'url_user': 'https://www.instagram.com/alexaimephotographycars',
'status': BAD_RESULT,
'http_status': 404,
'is_similar': False,
'rank': 29,
},
},
),
(
'alexaimephotography',
'username',
{
'500px': {
'username': 'alexaimephotography',
'parsing_enabled': True,
'url_main': 'https://500px.com/',
'url_user': 'https://500px.com/p/alexaimephotography',
'status': BAD_RESULT,
'http_status': 200,
'is_similar': False,
'rank': 2981,
},
'Reddit': {
'username': 'alexaimephotography',
'parsing_enabled': True,
'url_main': 'https://www.reddit.com/',
'url_user': 'https://www.reddit.com/user/alexaimephotography',
'ids_usernames': {'alexaimephotography': 'username'},
'status': GOOD_REDDIT_RESULT,
'http_status': 200,
'is_similar': False,
'rank': 17,
},
'Twitter': {
'username': 'alexaimephotography',
'parsing_enabled': True,
'url_main': 'https://www.twitter.com/',
'url_user': 'https://twitter.com/alexaimephotography',
'status': BAD_RESULT,
'http_status': 400,
'is_similar': False,
'rank': 55,
},
'Instagram': {
'username': 'alexaimephotography',
'parsing_enabled': True,
'url_main': 'https://www.instagram.com/',
'url_user': 'https://www.instagram.com/alexaimephotography',
'ids_usernames': {'alexaimephotography': 'username'},
'status': GOOD_IG_RESULT,
'http_status': 200,
'is_similar': False,
'rank': 29,
},
},
),
(
'Alexaimephotogr',
'username',
{
'500px': {
'username': 'Alexaimephotogr',
'parsing_enabled': True,
'url_main': 'https://500px.com/',
'url_user': 'https://500px.com/p/Alexaimephotogr',
'status': BAD_RESULT,
'http_status': 200,
'is_similar': False,
'rank': 2981,
},
'Reddit': {
'username': 'Alexaimephotogr',
'parsing_enabled': True,
'url_main': 'https://www.reddit.com/',
'url_user': 'https://www.reddit.com/user/Alexaimephotogr',
'status': BAD_RESULT,
'http_status': 404,
'is_similar': False,
'rank': 17,
},
'Twitter': {
'username': 'Alexaimephotogr',
'parsing_enabled': True,
'url_main': 'https://www.twitter.com/',
'url_user': 'https://twitter.com/Alexaimephotogr',
'status': GOOD_TWITTER_RESULT,
'http_status': 400,
'is_similar': False,
'rank': 55,
},
'Instagram': {
'username': 'Alexaimephotogr',
'parsing_enabled': True,
'url_main': 'https://www.instagram.com/',
'url_user': 'https://www.instagram.com/Alexaimephotogr',
'status': BAD_RESULT,
'http_status': 404,
'is_similar': False,
'rank': 29,
},
},
),
]
SUPPOSED_BRIEF = """Search by username alexaimephotographycars returned 1 accounts. Found target's other IDs: alexaimephotography, Alexaimephotogr. Search by username alexaimephotography returned 2 accounts. Search by username Alexaimephotogr returned 1 accounts. Extended info extracted from 3 accounts."""
@@ -187,7 +322,10 @@ def test_save_xmind_report():
assert data['topic']['topics'][0]['title'] == 'Undefined'
assert data['topic']['topics'][1]['title'] == 'test_tag'
assert len(data['topic']['topics'][1]['topics']) == 1
assert data['topic']['topics'][1]['topics'][0]['label'] == 'https://www.github.com/test'
assert (
data['topic']['topics'][1]['topics'][0]['label']
== 'https://www.github.com/test'
)
def test_html_report():
+14 -12
@@ -10,25 +10,21 @@ EXAMPLE_DB = {
"The specified member cannot be found. Please enter a member's entire name.",
],
"checkType": "message",
"errors": {
"You must be logged-in to do that.": "Login required"
},
"url": "{urlMain}{urlSubpath}/members/?username={username}"
}
"errors": {"You must be logged-in to do that.": "Login required"},
"url": "{urlMain}{urlSubpath}/members/?username={username}",
},
},
},
'sites': {
"Amperka": {
"engine": "XenForo",
"rank": 121613,
"tags": [
"ru"
],
"tags": ["ru"],
"urlMain": "http://forum.amperka.ru",
"usernameClaimed": "adam",
"usernameUnclaimed": "noonewouldeverusethis7"
"usernameUnclaimed": "noonewouldeverusethis7",
},
}
},
}
@@ -116,8 +112,14 @@ def test_site_url_detector():
db = MaigretDatabase()
db.load_from_json(EXAMPLE_DB)
assert db.sites[0].url_regexp.pattern == r'^https?://(www.)?forum\.amperka\.ru/members/\?username=(.+?)$'
assert db.sites[0].detect_username('http://forum.amperka.ru/members/?username=test') == 'test'
assert (
db.sites[0].url_regexp.pattern
== r'^https?://(www.)?forum\.amperka\.ru/members/\?username=(.+?)$'
)
assert (
db.sites[0].detect_username('http://forum.amperka.ru/members/?username=test')
== 'test'
)
def test_ranked_sites_dict():
+39 -6
@@ -2,7 +2,13 @@
import itertools
import re
from maigret.utils import CaseConverter, is_country_tag, enrich_link_str, URLMatcher, get_dict_ascii_tree
from maigret.utils import (
CaseConverter,
is_country_tag,
enrich_link_str,
URLMatcher,
get_dict_ascii_tree,
)
def test_case_convert_camel_to_snake():
@@ -26,6 +32,13 @@ def test_case_convert_snake_to_title():
assert b == 'Camel cased string'
def test_case_convert_camel_with_digits_to_snake():
a = 'ignore403'
b = CaseConverter.camel_to_snake(a)
assert b == 'ignore403'
def test_is_country_tag():
assert is_country_tag('ru') == True
assert is_country_tag('FR') == True
@@ -38,8 +51,10 @@ def test_is_country_tag():
def test_enrich_link_str():
assert enrich_link_str('test') == 'test'
assert enrich_link_str(
' www.flickr.com/photos/alexaimephotography/') == '<a class="auto-link" href="www.flickr.com/photos/alexaimephotography/">www.flickr.com/photos/alexaimephotography/</a>'
assert (
enrich_link_str(' www.flickr.com/photos/alexaimephotography/')
== '<a class="auto-link" href="www.flickr.com/photos/alexaimephotography/">www.flickr.com/photos/alexaimephotography/</a>'
)
def test_url_extract_main_part():
@@ -71,15 +86,32 @@ def test_url_make_profile_url_regexp():
for url_parts in itertools.product(*parts):
url = ''.join(url_parts)
assert URLMatcher.make_profile_url_regexp(url).pattern == r'^https?://(www.)?flickr\.com/photos/(.+?)$'
assert (
URLMatcher.make_profile_url_regexp(url).pattern
== r'^https?://(www.)?flickr\.com/photos/(.+?)$'
)
def test_get_dict_ascii_tree():
data = {'uid': 'dXJpOm5vZGU6VXNlcjoyNjQwMzQxNQ==', 'legacy_id': '26403415', 'username': 'alexaimephotographycars', 'name': 'Alex Aimé', 'created_at': '2018-05-04T10:17:01.000+0000', 'image': 'https://drscdn.500px.org/user_avatar/26403415/q%3D85_w%3D300_h%3D300/v2?webp=true&v=2&sig=0235678a4f7b65e007e864033ebfaf5ef6d87fad34f80a8639d985320c20fe3b', 'image_bg': 'https://drscdn.500px.org/user_cover/26403415/q%3D65_m%3D2048/v2?webp=true&v=1&sig=bea411fb158391a4fdad498874ff17088f91257e59dfb376ff67e3a44c3a4201', 'website': 'www.instagram.com/street.reality.photography/', 'facebook_link': ' www.instagram.com/street.reality.photography/', 'instagram_username': 'Street.Reality.Photography', 'twitter_username': 'Alexaimephotogr'}
data = {
'uid': 'dXJpOm5vZGU6VXNlcjoyNjQwMzQxNQ==',
'legacy_id': '26403415',
'username': 'alexaimephotographycars',
'name': 'Alex Aimé',
'created_at': '2018-05-04T10:17:01.000+0000',
'image': 'https://drscdn.500px.org/user_avatar/26403415/q%3D85_w%3D300_h%3D300/v2?webp=true&v=2&sig=0235678a4f7b65e007e864033ebfaf5ef6d87fad34f80a8639d985320c20fe3b',
'image_bg': 'https://drscdn.500px.org/user_cover/26403415/q%3D65_m%3D2048/v2?webp=true&v=1&sig=bea411fb158391a4fdad498874ff17088f91257e59dfb376ff67e3a44c3a4201',
'website': 'www.instagram.com/street.reality.photography/',
'facebook_link': ' www.instagram.com/street.reality.photography/',
'instagram_username': 'Street.Reality.Photography',
'twitter_username': 'Alexaimephotogr',
}
ascii_tree = get_dict_ascii_tree(data.items())
assert ascii_tree == """
assert (
ascii_tree
== """
uid: dXJpOm5vZGU6VXNlcjoyNjQwMzQxNQ==
legacy_id: 26403415
username: alexaimephotographycars
@@ -91,3 +123,4 @@ def test_get_dict_ascii_tree():
facebook_link: www.instagram.com/street.reality.photography/
instagram_username: Street.Reality.Photography
twitter_username: Alexaimephotogr"""
)
Executable
+71
@@ -0,0 +1,71 @@
#!/usr/bin/env python3
import asyncio
import logging
import maigret
# top popular sites from the Maigret database
TOP_SITES_COUNT = 300
# Maigret HTTP requests timeout
TIMEOUT = 10
# max parallel requests
MAX_CONNECTIONS = 50
if __name__ == '__main__':
# setup logging and asyncio
logger = logging.getLogger('maigret')
logger.setLevel(logging.WARNING)
loop = asyncio.get_event_loop()
# setup Maigret
db = maigret.MaigretDatabase().load_from_file('./maigret/resources/data.json')
# also can be downloaded from web
# db = MaigretDatabase().load_from_url(MAIGRET_DB_URL)
# user input
username = input('Enter username to search: ')
sites_count_raw = input(
f'Select the number of sites to search ({TOP_SITES_COUNT} for default, {len(db.sites_dict)} max): '
)
sites_count = int(sites_count_raw or TOP_SITES_COUNT)
sites = db.ranked_sites_dict(top=sites_count)
show_progressbar_raw = input('Do you want to show a progressbar? [Yn] ')
show_progressbar = show_progressbar_raw.lower() != 'n'
extract_info_raw = input(
'Do you want to extract additional info from accounts\' pages? [Yn] '
)
extract_info = extract_info_raw.lower() != 'n'
use_notifier_raw = input(
'Do you want to use notifier for displaying results while searching? [Yn] '
)
use_notifier = use_notifier_raw.lower() != 'n'
notifier = None
if use_notifier:
notifier = maigret.Notifier(print_found_only=True, skip_check_errors=True)
# search!
search_func = maigret.search(
username=username,
site_dict=sites,
timeout=TIMEOUT,
logger=logger,
max_connections=MAX_CONNECTIONS,
query_notify=notifier,
no_progressbar=(not show_progressbar),
is_parsing_enabled=extract_info,
)
results = loop.run_until_complete(search_func)
input('Search completed. Press Enter to show results.')
for sitename, data in results.items():
is_found = data['status'].is_found()
print(f'{sitename} - {"Found!" if is_found else "Not found"}')