Improve extraction of ids from URLs; add tests

This commit is contained in:
Soxoj
2021-05-06 22:35:44 +03:00
parent 009d51c380
commit 0e9655c46a
10 changed files with 129 additions and 26 deletions
+1 -4
View File
@@ -22,9 +22,6 @@ src/
# Comma-Separated Values (CSV) Reports
*.csv
# Excluded sites list
tests/.excluded_sites
# MacOS Folder Metadata File
.DS_Store
/reports/
@@ -33,4 +30,4 @@ tests/.excluded_sites
.coverage
dist/
htmlcov/
test_*
/test_*
+29 -13
View File
@@ -60,6 +60,17 @@ def notify_about_errors(search_results: QueryResultWrapper, query_notify):
)
def extract_ids_from_url(url: str, db: MaigretDatabase) -> dict:
    """Collect identifiers recognized in *url* by every site in the database.

    Returns a mapping of extracted identifier -> identifier type
    (e.g. 'username', 'vk_id'); empty when no site matches.
    """
    found = {}
    for site in db.sites:
        extracted = site.extract_id_from_url(url)
        if extracted:
            identifier, id_type = extracted
            found[identifier] = id_type
    return found
def extract_ids_from_page(url, logger, timeout=5) -> dict:
results = {}
# url, headers
@@ -105,10 +116,8 @@ def extract_ids_from_results(results: QueryResultWrapper, db: MaigretDatabase) -
ids_results[u] = utype
for url in dictionary.get('ids_links', []):
for s in db.sites:
u = s.detect_username(url)
if u:
ids_results[u] = 'username'
ids_results.update(extract_ids_from_url(url, db))
return ids_results
@@ -129,10 +138,9 @@ def setup_arguments_parser():
)
parser.add_argument(
"username",
nargs='?',
nargs='*',
metavar="USERNAMES",
action="append",
help="One or more usernames to check with social networks.",
help="One or more usernames to search by.",
)
parser.add_argument(
"--version",
@@ -231,7 +239,9 @@ def setup_arguments_parser():
help="Make requests over a proxy. e.g. socks5://127.0.0.1:1080",
)
filter_group = parser.add_argument_group('Site filtering', 'Options to set site search scope')
filter_group = parser.add_argument_group(
'Site filtering', 'Options to set site search scope'
)
filter_group.add_argument(
"-a",
"--all-sites",
@@ -269,7 +279,7 @@ def setup_arguments_parser():
modes_group = parser.add_argument_group(
'Operating modes',
'Various functions except the default search by a username. '
'Modes are executed sequentially in the order of declaration.'
'Modes are executed sequentially in the order of declaration.',
)
modes_group.add_argument(
"--parse",
@@ -296,10 +306,12 @@ def setup_arguments_parser():
"--stats",
action="store_true",
default=False,
help="Show database statistics (most frequent sites engines and tags)."
help="Show database statistics (most frequent sites engines and tags).",
)
output_group = parser.add_argument_group('Output options', 'Options to change verbosity and view of the console output')
output_group = parser.add_argument_group(
'Output options', 'Options to change verbosity and view of the console output'
)
output_group.add_argument(
"--print-not-found",
action="store_true",
@@ -354,7 +366,9 @@ def setup_arguments_parser():
help="Don't show progressbar.",
)
report_group = parser.add_argument_group('Report formats', 'Supported formats of report files')
report_group = parser.add_argument_group(
'Report formats', 'Supported formats of report files'
)
report_group.add_argument(
"-T",
"--txt",
@@ -446,7 +460,9 @@ async def main():
print("Using the proxy: " + args.proxy)
if args.parse_url:
extracted_ids = extract_ids_from_page(args.parse_url, logger, timeout=args.timeout)
extracted_ids = extract_ids_from_page(
args.parse_url, logger, timeout=args.timeout
)
usernames.update(extracted_ids)
if args.tags:
+2
View File
@@ -282,6 +282,8 @@ class QueryNotifyPrint(QueryNotify):
sys.stdout.write("\x1b[1K\r")
print(notify)
return notify
def __str__(self):
"""Convert Object To String.
+2
View File
@@ -14365,6 +14365,7 @@
"ru"
],
"checkType": "response_url",
"regexCheck": "^(?!id\\d)\\w*$",
"alexaRank": 27,
"urlMain": "https://vk.com/",
"url": "https://vk.com/{username}",
@@ -14379,6 +14380,7 @@
"checkType": "response_url",
"alexaRank": 27,
"urlMain": "https://vk.com/",
"regexCheck": "^\\d+$",
"url": "https://vk.com/id{username}",
"source": "VK",
"usernameClaimed": "270433952",
+14 -1
View File
@@ -3,7 +3,7 @@
import copy
import json
import sys
from typing import Optional, List, Dict, Any
from typing import Optional, List, Dict, Any, Tuple
import requests
@@ -146,6 +146,19 @@ class MaigretSite:
return None
def extract_id_from_url(self, url: str) -> Optional[Tuple[str, str]]:
    """Match *url* against this site's profile-URL regexp.

    Returns a ``(identifier, id_type)`` pair on success, or ``None``
    when the site has no regexp or the URL does not match.
    """
    regexp = self.url_regexp
    if not regexp:
        return None
    match = regexp.match(url)
    if match is None:
        return None
    # The last capture group holds the username/id part of the URL;
    # a trailing slash is not part of the identifier.
    identifier = match.groups()[-1].rstrip("/")
    return identifier, self.type
@property
def pretty_name(self):
if self.source:
+4 -2
View File
@@ -55,9 +55,11 @@ class URLMatcher:
url_main_part = self.extract_main_part(url)
for c in self.UNSAFE_SYMBOLS:
url_main_part = url_main_part.replace(c, f"\\{c}")
username_regexp = username_regexp or ".+?"
prepared_username_regexp = (username_regexp or ".+?").lstrip('^').rstrip('$')
url_regexp = url_main_part.replace("{username}", f"({username_regexp})")
url_regexp = url_main_part.replace(
"{username}", f"({prepared_username_regexp})"
)
regexp_str = self._HTTP_URL_RE_STR.replace("(.+)", url_regexp)
return re.compile(regexp_str)
+12 -1
View File
@@ -51,6 +51,17 @@ def test_args_search_mode(argparser):
assert args == Namespace(**want_args)
def test_args_search_mode_several_usernames(argparser):
    """Several positional usernames are collected into a list."""
    parsed = argparser.parse_args(['username1', 'username2'])
    assert parsed.username == ['username1', 'username2']
    expected = {**DEFAULT_ARGS, 'username': ['username1', 'username2']}
    assert parsed == Namespace(**expected)
def test_args_self_check_mode(argparser):
args = argparser.parse_args('--self-check --site GitHub'.split())
@@ -59,7 +70,7 @@ def test_args_self_check_mode(argparser):
{
'self_check': True,
'site_list': ['GitHub'],
'username': [None],
'username': [],
}
)
+12 -5
View File
@@ -5,7 +5,8 @@ import copy
import pytest
from mock import Mock
from maigret.maigret import self_check, maigret, extract_ids_from_page, extract_ids_from_results
from maigret.maigret import self_check, maigret
from maigret.maigret import extract_ids_from_page, extract_ids_from_results, extract_ids_from_url
from maigret.sites import MaigretSite
from maigret.result import QueryResult, QueryStatus
@@ -137,11 +138,18 @@ def test_maigret_results(test_db):
assert results == RESULTS_EXAMPLE
def test_extract_ids_from_url(default_db):
    """Known profile URLs resolve to their identifier and id type."""
    cases = [
        ('https://www.reddit.com/user/test', {'test': 'username'}),
        ('https://vk.com/id123', {'123': 'vk_id'}),
        ('https://vk.com/ida123', {'ida123': 'username'}),
        ('https://my.mail.ru/yandex.ru/dipres8904/', {'dipres8904': 'username'}),
        ('https://reviews.yandex.ru/user/adbced123', {'adbced123': 'yandex_public_id'}),
    ]
    for url, expected in cases:
        assert extract_ids_from_url(url, default_db) == expected
@pytest.mark.slow
def test_extract_ids_from_page(test_db):
    """Scraping a live profile page yields the expected identifier map."""
    logger = Mock()
    found_ids = extract_ids_from_page('https://www.reddit.com/user/test', logger)
    # A bare `==` comparison discards its result and can never fail the
    # test (flake8-bugbear B015) — assert the outcome explicitly.
    assert found_ids == {'test': 'username'}
def test_extract_ids_from_results(test_db):
@@ -149,5 +157,4 @@ def test_extract_ids_from_results(test_db):
TEST_EXAMPLE['Reddit']['ids_usernames'] = {'test1': 'yandex_public_id'}
TEST_EXAMPLE['Reddit']['ids_links'] = ['https://www.reddit.com/user/test2']
found_ids = extract_ids_from_results(TEST_EXAMPLE, test_db)
assert found_ids == {'test1': 'yandex_public_id', 'test2': 'username'}
extract_ids_from_results(TEST_EXAMPLE, test_db) == {'test1': 'yandex_public_id', 'test2': 'username'}
+49
View File
@@ -0,0 +1,49 @@
from maigret.errors import CheckError
from maigret.notify import QueryNotifyPrint
from maigret.result import QueryStatus, QueryResult
def test_notify_illegal():
    """ILLEGAL status is reported as an invalid-username message."""
    notifier = QueryNotifyPrint(color=False)
    result = QueryResult(
        username="test",
        status=QueryStatus.ILLEGAL,
        site_name="TEST_SITE",
        site_url_user="http://example.com/test",
    )
    assert notifier.update(result) == "[-] TEST_SITE: Illegal Username Format For This Site!"
def test_notify_claimed():
    """CLAIMED status is reported with the profile URL."""
    notifier = QueryNotifyPrint(color=False)
    result = QueryResult(
        username="test",
        status=QueryStatus.CLAIMED,
        site_name="TEST_SITE",
        site_url_user="http://example.com/test",
    )
    assert notifier.update(result) == "[+] TEST_SITE: http://example.com/test"
def test_notify_available():
    """AVAILABLE status is reported as a not-found message."""
    notifier = QueryNotifyPrint(color=False)
    result = QueryResult(
        username="test",
        status=QueryStatus.AVAILABLE,
        site_name="TEST_SITE",
        site_url_user="http://example.com/test",
    )
    assert notifier.update(result) == "[-] TEST_SITE: Not found!"
def test_notify_unknown():
    """UNKNOWN status includes the attached error type and reason."""
    notifier = QueryNotifyPrint(color=False)
    result = QueryResult(
        username="test",
        status=QueryStatus.UNKNOWN,
        site_name="TEST_SITE",
        site_url_user="http://example.com/test",
    )
    result.error = CheckError('Type', 'Reason')
    expected = "[?] TEST_SITE: Type error: Reason"
    assert notifier.update(result) == expected
+4
View File
@@ -68,8 +68,10 @@ def test_url_extract_main_part():
]
url_regexp = re.compile('^https?://(www.)?flickr.com/photos/(.+?)$')
# combine parts variations
for url_parts in itertools.product(*parts):
url = ''.join(url_parts)
# ensure all combinations give valid main part
assert URLMatcher.extract_main_part(url) == url_main_part
assert not url_regexp.match(url) is None
@@ -84,8 +86,10 @@ def test_url_make_profile_url_regexp():
['/', ''],
]
# combine parts variations
for url_parts in itertools.product(*parts):
url = ''.join(url_parts)
# ensure all combinations match pattern
assert (
URLMatcher.make_profile_url_regexp(url).pattern
== r'^https?://(www.)?flickr\.com/photos/(.+?)$'