Merge pull request #127 from soxoj/extraction-notify-tests
Improve extracting ids from URLs, tests
.gitignore (+1 -4)
@@ -22,9 +22,6 @@ src/
# Comma-Separated Values (CSV) Reports
*.csv

# Excluded sites list
tests/.excluded_sites

# MacOS Folder Metadata File
.DS_Store
/reports/
@@ -33,4 +30,4 @@ tests/.excluded_sites
.coverage
dist/
htmlcov/
-test_*
+/test_*
maigret/maigret.py (+29 -13)
@@ -60,6 +60,17 @@ def notify_about_errors(search_results: QueryResultWrapper, query_notify):
    )


+def extract_ids_from_url(url: str, db: MaigretDatabase) -> dict:
+    results = {}
+    for s in db.sites:
+        result = s.extract_id_from_url(url)
+        if not result:
+            continue
+        _id, _type = result
+        results[_id] = _type
+    return results
+
+
def extract_ids_from_page(url, logger, timeout=5) -> dict:
    results = {}
    # url, headers
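Taken together with the new MaigretSite.extract_id_from_url method further down, this helper tries every site's URL regexp against a single URL and collects the recognized identifiers. A minimal sketch of the intended call pattern, mirroring the new tests; MaigretDatabase().load_from_file(...) and the JSON path are assumptions based on how maigret loads its bundled site list, not part of this diff:

from maigret.maigret import extract_ids_from_url
from maigret.sites import MaigretDatabase

# Assumed loader and path; adjust to your checkout.
db = MaigretDatabase().load_from_file('maigret/resources/data.json')

# Every site whose url_regexp matches contributes one {id: type} entry.
print(extract_ids_from_url('https://vk.com/id123', db))  # {'123': 'vk_id'} per the new tests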
@@ -105,10 +116,8 @@ def extract_ids_from_results(results: QueryResultWrapper, db: MaigretDatabase) -
            ids_results[u] = utype

        for url in dictionary.get('ids_links', []):
-            for s in db.sites:
-                u = s.detect_username(url)
-                if u:
-                    ids_results[u] = 'username'
+            ids_results.update(extract_ids_from_url(url, db))

    return ids_results
@@ -129,10 +138,9 @@ def setup_arguments_parser():
    )
    parser.add_argument(
        "username",
-        nargs='?',
+        nargs='*',
        metavar="USERNAMES",
-        action="append",
-        help="One or more usernames to check with social networks.",
+        help="One or more usernames to search by.",
    )
    parser.add_argument(
        "--version",
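The old nargs='?' plus action="append" combination accepted at most one username and yielded [None] when none was given (hence the old 'username': [None] expectation in tests/test_cli.py below); nargs='*' collects any number of positional usernames into a flat list and defaults to []. A standalone sketch of the difference, not maigret's full parser:

import argparse

# Old behavior: at most one username; absent -> [None] via the append action.
old = argparse.ArgumentParser()
old.add_argument("username", nargs='?', action="append")
print(old.parse_args([]).username)                  # [None]

# New behavior: zero or more usernames in a flat list; absent -> [].
new = argparse.ArgumentParser()
new.add_argument("username", nargs='*', metavar="USERNAMES")
print(new.parse_args([]).username)                  # []
print(new.parse_args(['user1', 'user2']).username)  # ['user1', 'user2']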
@@ -231,7 +239,9 @@ def setup_arguments_parser():
        help="Make requests over a proxy. e.g. socks5://127.0.0.1:1080",
    )

-    filter_group = parser.add_argument_group('Site filtering', 'Options to set site search scope')
+    filter_group = parser.add_argument_group(
+        'Site filtering', 'Options to set site search scope'
+    )
    filter_group.add_argument(
        "-a",
        "--all-sites",
@@ -269,7 +279,7 @@ def setup_arguments_parser():
    modes_group = parser.add_argument_group(
        'Operating modes',
        'Various functions except the default search by a username. '
-        'Modes are executed sequentially in the order of declaration.'
+        'Modes are executed sequentially in the order of declaration.',
    )
    modes_group.add_argument(
        "--parse",
@@ -296,10 +306,12 @@ def setup_arguments_parser():
        "--stats",
        action="store_true",
        default=False,
-        help="Show database statistics (most frequent sites engines and tags)."
+        help="Show database statistics (most frequent sites engines and tags).",
    )

-    output_group = parser.add_argument_group('Output options', 'Options to change verbosity and view of the console output')
+    output_group = parser.add_argument_group(
+        'Output options', 'Options to change verbosity and view of the console output'
+    )
    output_group.add_argument(
        "--print-not-found",
        action="store_true",
@@ -354,7 +366,9 @@ def setup_arguments_parser():
        help="Don't show progressbar.",
    )

-    report_group = parser.add_argument_group('Report formats', 'Supported formats of report files')
+    report_group = parser.add_argument_group(
+        'Report formats', 'Supported formats of report files'
+    )
    report_group.add_argument(
        "-T",
        "--txt",
@@ -446,7 +460,9 @@ async def main():
        print("Using the proxy: " + args.proxy)

    if args.parse_url:
-        extracted_ids = extract_ids_from_page(args.parse_url, logger, timeout=args.timeout)
+        extracted_ids = extract_ids_from_page(
+            args.parse_url, logger, timeout=args.timeout
+        )
        usernames.update(extracted_ids)

    if args.tags:
maigret/notify.py
@@ -282,6 +282,8 @@ class QueryNotifyPrint(QueryNotify):
            sys.stdout.write("\x1b[1K\r")
        print(notify)

+        return notify
+
    def __str__(self):
        """Convert Object To String.
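Returning the formatted line from update() is what lets the notifier be unit-tested without capturing stdout; the new tests/test_notify.py further down relies on it. A usage sketch built from those tests:

from maigret.notify import QueryNotifyPrint
from maigret.result import QueryResult, QueryStatus

# update() still prints the line, but now also returns it for inspection.
notifier = QueryNotifyPrint(color=False)
line = notifier.update(QueryResult(
    username="test",
    status=QueryStatus.CLAIMED,
    site_name="TEST_SITE",
    site_url_user="http://example.com/test",
))
assert line == "[+] TEST_SITE: http://example.com/test"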
maigret/resources/data.json
@@ -14365,6 +14365,7 @@
        "ru"
      ],
      "checkType": "response_url",
+     "regexCheck": "^(?!id\\d)\\w*$",
      "alexaRank": 27,
      "urlMain": "https://vk.com/",
      "url": "https://vk.com/{username}",
@@ -14379,6 +14380,7 @@
      "checkType": "response_url",
      "alexaRank": 27,
      "urlMain": "https://vk.com/",
+     "regexCheck": "^\\d+$",
      "url": "https://vk.com/id{username}",
      "source": "VK",
      "usernameClaimed": "270433952",
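These two regexCheck patterns are what let the extractor tell VK's two URL schemes apart: vk.com/id<digits> is a numeric profile id, while the alias entry's negative lookahead rejects anything that merely looks like one. A quick demonstration of the patterns themselves:

import re

# The two regexCheck values added above, applied to candidate ids.
vk_alias = re.compile(r"^(?!id\d)\w*$")  # usernames, excluding id<digits>
vk_numeric = re.compile(r"^\d+$")        # numeric ids for vk.com/id{username}

print(bool(vk_numeric.match("123")))     # True  -> vk.com/id123 yields ('123', 'vk_id')
print(bool(vk_alias.match("id123")))     # False -> the lookahead rejects it
print(bool(vk_alias.match("ida123")))    # True  -> plain username, as the tests expect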
maigret/sites.py (+14 -1)
@@ -3,7 +3,7 @@
import copy
import json
import sys
-from typing import Optional, List, Dict, Any
+from typing import Optional, List, Dict, Any, Tuple

import requests
@@ -146,6 +146,19 @@ class MaigretSite:

        return None

+    def extract_id_from_url(self, url: str) -> Optional[Tuple[str, str]]:
+        if not self.url_regexp:
+            return None
+
+        match_groups = self.url_regexp.match(url)
+        if not match_groups:
+            return None
+
+        _id = match_groups.groups()[-1].rstrip("/")
+        _type = self.type
+
+        return _id, _type
+
    @property
    def pretty_name(self):
        if self.source:
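url_regexp is the pattern URLMatcher.make_profile_url_regexp builds from the site's url template (see the utils.py hunk below), so the last capture group is always the {username} slot, and the rstrip("/") tolerates a trailing slash captured by a looser group. Spelled out with a hand-written pattern for the VK numeric-id entry; the compiled regexp's shape is inferred from the expectations in tests/test_utils.py:

import re

# What extract_id_from_url does for a single site, with the url_regexp
# written out by hand (assumed shape, matching the test expectations).
url_regexp = re.compile(r'^https?://(www.)?vk\.com/id(\d+)$')

match_groups = url_regexp.match('https://vk.com/id123')
if match_groups:
    _id = match_groups.groups()[-1].rstrip("/")  # last group = {username} slot
    _type = 'vk_id'                              # the site's "type" field
    print((_id, _type))                          # ('123', 'vk_id')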
maigret/utils.py (+4 -2)
@@ -55,9 +55,11 @@ class URLMatcher:
        url_main_part = self.extract_main_part(url)
        for c in self.UNSAFE_SYMBOLS:
            url_main_part = url_main_part.replace(c, f"\\{c}")
-        username_regexp = username_regexp or ".+?"
+        prepared_username_regexp = (username_regexp or ".+?").lstrip('^').rstrip('$')

-        url_regexp = url_main_part.replace("{username}", f"({username_regexp})")
+        url_regexp = url_main_part.replace(
+            "{username}", f"({prepared_username_regexp})"
+        )
        regexp_str = self._HTTP_URL_RE_STR.replace("(.+)", url_regexp)

        return re.compile(regexp_str)
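The point of stripping the anchors: site regexCheck patterns such as ^\d+$ are written to validate a bare username, and embedding them verbatim in the middle of the URL pattern (as the old line did) produces a regexp with inner anchors that can never match. A standalone demonstration:

import re

raw = r"^\d+$"  # a regexCheck value as stored in data.json

# Old behavior: inner ^/$ anchors end up mid-pattern and kill the match.
broken = re.compile(r"^https?://(www.)?vk\.com/id(" + raw + r")$")
# New behavior: anchors stripped before embedding.
fixed = re.compile(r"^https?://(www.)?vk\.com/id(" + raw.lstrip('^').rstrip('$') + r")$")

print(broken.match("https://vk.com/id123"))          # None
print(fixed.match("https://vk.com/id123").group(2))  # '123'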
tests/test_cli.py (+12 -1)
@@ -51,6 +51,17 @@ def test_args_search_mode(argparser):
    assert args == Namespace(**want_args)


+def test_args_search_mode_several_usernames(argparser):
+    args = argparser.parse_args('username1 username2'.split())
+
+    assert args.username == ['username1', 'username2']
+
+    want_args = dict(DEFAULT_ARGS)
+    want_args.update({'username': ['username1', 'username2']})
+
+    assert args == Namespace(**want_args)
+
+
def test_args_self_check_mode(argparser):
    args = argparser.parse_args('--self-check --site GitHub'.split())
@@ -59,7 +70,7 @@ def test_args_self_check_mode(argparser):
        {
            'self_check': True,
            'site_list': ['GitHub'],
-            'username': [None],
+            'username': [],
        }
    )
tests/test_maigret.py (+12 -5)
@@ -5,7 +5,8 @@ import copy
import pytest
from mock import Mock

-from maigret.maigret import self_check, maigret, extract_ids_from_page, extract_ids_from_results
+from maigret.maigret import self_check, maigret
+from maigret.maigret import extract_ids_from_page, extract_ids_from_results, extract_ids_from_url
from maigret.sites import MaigretSite
from maigret.result import QueryResult, QueryStatus
@@ -137,11 +138,18 @@ def test_maigret_results(test_db):
    assert results == RESULTS_EXAMPLE


+def test_extract_ids_from_url(default_db):
+    assert extract_ids_from_url('https://www.reddit.com/user/test', default_db) == {'test': 'username'}
+    assert extract_ids_from_url('https://vk.com/id123', default_db) == {'123': 'vk_id'}
+    assert extract_ids_from_url('https://vk.com/ida123', default_db) == {'ida123': 'username'}
+    assert extract_ids_from_url('https://my.mail.ru/yandex.ru/dipres8904/', default_db) == {'dipres8904': 'username'}
+    assert extract_ids_from_url('https://reviews.yandex.ru/user/adbced123', default_db) == {'adbced123': 'yandex_public_id'}
+
+
@pytest.mark.slow
def test_extract_ids_from_page(test_db):
    logger = Mock()
-    extract_ids_from_page('https://www.reddit.com/user/test', logger) == {'test': 'username'}
+    found_ids = extract_ids_from_page('https://www.reddit.com/user/test', logger)
+    assert found_ids == {'test': 'username'}


def test_extract_ids_from_results(test_db):
@@ -149,5 +157,4 @@ def test_extract_ids_from_results(test_db):
    TEST_EXAMPLE['Reddit']['ids_usernames'] = {'test1': 'yandex_public_id'}
    TEST_EXAMPLE['Reddit']['ids_links'] = ['https://www.reddit.com/user/test2']

-    extract_ids_from_results(TEST_EXAMPLE, test_db) == {'test1': 'yandex_public_id', 'test2': 'username'}
+    found_ids = extract_ids_from_results(TEST_EXAMPLE, test_db)
+    assert found_ids == {'test1': 'yandex_public_id', 'test2': 'username'}
tests/test_notify.py (new file)
@@ -0,0 +1,49 @@
+from maigret.errors import CheckError
+from maigret.notify import QueryNotifyPrint
+from maigret.result import QueryStatus, QueryResult
+
+
+def test_notify_illegal():
+    n = QueryNotifyPrint(color=False)
+
+    assert n.update(QueryResult(
+        username="test",
+        status=QueryStatus.ILLEGAL,
+        site_name="TEST_SITE",
+        site_url_user="http://example.com/test"
+    )) == "[-] TEST_SITE: Illegal Username Format For This Site!"
+
+
+def test_notify_claimed():
+    n = QueryNotifyPrint(color=False)
+
+    assert n.update(QueryResult(
+        username="test",
+        status=QueryStatus.CLAIMED,
+        site_name="TEST_SITE",
+        site_url_user="http://example.com/test"
+    )) == "[+] TEST_SITE: http://example.com/test"
+
+
+def test_notify_available():
+    n = QueryNotifyPrint(color=False)
+
+    assert n.update(QueryResult(
+        username="test",
+        status=QueryStatus.AVAILABLE,
+        site_name="TEST_SITE",
+        site_url_user="http://example.com/test"
+    )) == "[-] TEST_SITE: Not found!"
+
+
+def test_notify_unknown():
+    n = QueryNotifyPrint(color=False)
+    result = QueryResult(
+        username="test",
+        status=QueryStatus.UNKNOWN,
+        site_name="TEST_SITE",
+        site_url_user="http://example.com/test"
+    )
+    result.error = CheckError('Type', 'Reason')
+
+    assert n.update(result) == "[?] TEST_SITE: Type error: Reason"
tests/test_utils.py
@@ -68,8 +68,10 @@ def test_url_extract_main_part():
    ]

    url_regexp = re.compile('^https?://(www.)?flickr.com/photos/(.+?)$')
+    # combine parts variations
    for url_parts in itertools.product(*parts):
        url = ''.join(url_parts)
+        # ensure all combinations give valid main part
        assert URLMatcher.extract_main_part(url) == url_main_part
        assert not url_regexp.match(url) is None
@@ -84,8 +86,10 @@ def test_url_make_profile_url_regexp():
        ['/', ''],
    ]

+    # combine parts variations
    for url_parts in itertools.product(*parts):
        url = ''.join(url_parts)
+        # ensure all combinations match pattern
        assert (
            URLMatcher.make_profile_url_regexp(url).pattern
            == r'^https?://(www.)?flickr\.com/photos/(.+?)$'