Fix site checks: 4 → ip_reputation, 9 fixed, 16 disabled, 3 dead deleted (#2555)

* Fix site checks: 4 → ip_reputation, 9 fixed, 16 disabled, 3 dead deleted; clarify ip_reputation tag semantics

* Improved test coverage
This commit is contained in:
Soxoj
2026-04-23 21:17:07 +02:00
committed by GitHub
parent b1004588af
commit 25026e21ea
8 changed files with 730 additions and 111 deletions
+107
View File
@@ -56,3 +56,110 @@ async def test_import_aiohttp_cookies(cookie_test_server):
print(f"Server response: {result}")
assert result == {'cookies': {'a': 'b'}}
# ---- OnlyFans signing tests (pure-compute, no network) ----
class _FakeSite:
    """Bare-bones MaigretSite replacement exposing only the attributes
    that ParsingActivator.onlyfans() reads and writes."""

    def __init__(self, headers=None, activation=None):
        # Default activation mirrors the real OnlyFans signing config.
        default_activation = {
            "static_param": "jLM8LXHU1CGcuCzPMNwWX9osCScVuP4D",
            "checksum_indexes": [28, 3, 16, 32, 25, 24, 23, 0, 26],
            "checksum_constant": -180,
            "format": "57203:{}:{:x}:69cfa6d8",
            "url": "https://onlyfans.com/api2/v2/init",
        }
        self.headers = {} if not headers else headers
        self.activation = activation or default_activation
class _FakeResponse:
    """Stub of an HTTP response exposing only the ``cookies`` mapping."""

    def __init__(self, cookies=None):
        self.cookies = {} if not cookies else cookies
def test_onlyfans_sets_xbc_when_zero(monkeypatch):
    """A zero x-bc header is replaced with a fresh 40-char hex token.

    The cookie header is already present, so the init endpoint must not
    be hit; requests.get is patched to fail loudly if it runs anyway.
    """
    site = _FakeSite(headers={"x-bc": "0", "cookie": "existing=1"})
    import requests

    def boom(*a, **kw):  # pragma: no cover - sanity
        raise AssertionError("requests.get should not run when cookie is present")

    # Patching the shared requests module object covers any `import requests`
    # site, including maigret.activation — no need to probe its __dict__.
    monkeypatch.setattr(requests, "get", boom, raising=False)
    logger = Mock()
    ParsingActivator.onlyfans(site, logger, url="https://onlyfans.com/api2/v2/users/adam")
    # x-bc must be rewritten to a non-zero hex token
    assert site.headers["x-bc"] != "0"
    assert len(site.headers["x-bc"]) == 40  # 20 bytes → 40 hex chars
    # time / sign headers set for target URL
    assert "time" in site.headers and site.headers["time"].isdigit()
    assert site.headers["sign"].startswith("57203:")
def test_onlyfans_fetches_init_cookie_when_missing(monkeypatch):
    """When cookie header is absent, init endpoint is called and its cookies stored."""
    # No "cookie" key in headers -> activator must hit the init endpoint.
    site = _FakeSite(headers={"x-bc": "already_set_token", "user-id": "0"})
    import requests
    # Capture the outbound request instead of doing real network I/O.
    captured = {}
    def fake_get(url, headers=None, timeout=15):
        captured["url"] = url
        captured["headers"] = dict(headers or {})
        return _FakeResponse(cookies={"sess": "abc123", "csrf": "xyz"})
    monkeypatch.setattr(requests, "get", fake_get)
    logger = Mock()
    ParsingActivator.onlyfans(site, logger, url="https://onlyfans.com/api2/v2/users/adam")
    # init request made
    assert captured["url"] == site.activation["url"]
    # headers passed to init include freshly generated time/sign
    assert "time" in captured["headers"]
    assert captured["headers"]["sign"].startswith("57203:")
    # cookie header populated from response
    assert site.headers["cookie"] == "sess=abc123; csrf=xyz"
def test_onlyfans_signature_is_deterministic_for_same_time(monkeypatch):
    """Two calls with patched time produce identical signatures."""
    site1 = _FakeSite(headers={"x-bc": "token", "cookie": "c=1"})
    site2 = _FakeSite(headers={"x-bc": "token", "cookie": "c=1"})
    import maigret.activation
    # NOTE(review): this looks like a no-op unless maigret.activation reads a
    # module-level name `_time`; the effective patch is time.time below.
    # Confirm against maigret.activation whether this line can be dropped.
    monkeypatch.setattr(maigret.activation, "_time", __import__("time"), raising=False)
    fixed = 1_700_000_000.123
    import time as time_mod
    # Freeze the clock so both calls sign for the same timestamp.
    monkeypatch.setattr(time_mod, "time", lambda: fixed)
    logger = Mock()
    ParsingActivator.onlyfans(site1, logger, url="https://onlyfans.com/api2/v2/users/adam")
    ParsingActivator.onlyfans(site2, logger, url="https://onlyfans.com/api2/v2/users/adam")
    assert site1.headers["time"] == site2.headers["time"]
    assert site1.headers["sign"] == site2.headers["sign"]
def test_onlyfans_sign_differs_per_path(monkeypatch):
    """Different target URLs must yield different signatures."""
    import time as time_mod
    # Freeze the clock so the only varying signing input is the URL path.
    monkeypatch.setattr(time_mod, "time", lambda: 1_700_000_000.0)
    site = _FakeSite(headers={"x-bc": "token", "cookie": "c=1"})
    logger = Mock()
    signatures = []
    for target_url in (
        "https://onlyfans.com/api2/v2/users/adam",
        "https://onlyfans.com/api2/v2/users/bob",
    ):
        ParsingActivator.onlyfans(site, logger, url=target_url)
        signatures.append(site.headers["sign"])
    assert signatures[0] != signatures[1]
+240
View File
@@ -1,7 +1,22 @@
from argparse import ArgumentTypeError
from mock import Mock
import pytest
from maigret import search
from maigret.checking import (
detect_error_page,
extract_ids_data,
parse_usernames,
update_results_info,
get_failed_sites,
timeout_check,
debug_response_logging,
process_site_result,
)
from maigret.errors import CheckError
from maigret.result import MaigretCheckResult, MaigretCheckStatus
from maigret.sites import MaigretSite
def site_result_except(server, username, **kwargs):
@@ -67,3 +82,228 @@ async def test_checking_by_message_negative(httpserver, local_test_db):
result = await search('unclaimed', site_dict=sites_dict, logger=Mock())
assert result['Message']['status'].is_found() is True
# ---- Pure-function unit tests (no network) ----
def test_detect_error_page_site_specific():
    """A configured body marker is reported as a Site-specific error."""
    markers = {"Please enable JavaScript to proceed": "Scraping protection"}
    error = detect_error_page(
        "Please enable JavaScript to proceed",
        200,
        markers,
        ignore_403=False,
    )
    assert error is not None
    assert (error.type, error.desc) == ("Site-specific", "Scraping protection")
def test_detect_error_page_403():
    """A plain HTTP 403 maps to an Access-denied check error."""
    error = detect_error_page("some body", 403, {}, ignore_403=False)
    assert error is not None
    assert error.type == "Access denied"
def test_detect_error_page_403_ignored():
    """With ignore_403 set, a 403 is not an error (XenForo engine answers
    403 for missing members too)."""
    outcome = detect_error_page("not found body", 403, {}, ignore_403=True)
    assert outcome is None
def test_detect_error_page_999_linkedin():
    """Status 999 (LinkedIn bot suspicion) must not count as a Server error."""
    outcome = detect_error_page("", 999, {}, ignore_403=False)
    assert outcome is None
def test_detect_error_page_500():
    """5xx responses are reported as Server errors carrying the status code."""
    # 503 stands in for the whole 5xx family here.
    error = detect_error_page("", 503, {}, ignore_403=False)
    assert error is not None
    assert error.type == "Server"
    assert "503" in error.desc
def test_detect_error_page_ok():
    """An ordinary 200 response with a benign body yields no error."""
    error = detect_error_page("hello world", 200, {}, ignore_403=False)
    assert error is None
def test_parse_usernames_single_username():
    """A profile_username field maps to one 'username'-typed entry."""
    parsed = parse_usernames({"profile_username": "alice"}, Mock())
    assert parsed == {"alice": "username"}
def test_parse_usernames_list_of_usernames():
    """A stringified list in other_usernames is split into separate entries."""
    parsed = parse_usernames({"other_usernames": "['alice', 'bob']"}, Mock())
    assert parsed == {"alice": "username", "bob": "username"}
def test_parse_usernames_malformed_list():
    """A non-parseable other_usernames value is logged and skipped."""
    log = Mock()
    parsed = parse_usernames({"other_usernames": "not-a-list"}, log)
    # should swallow the error and just return empty
    assert parsed == {}
    assert log.warning.called
def test_parse_usernames_supported_id():
    """Any id field listed in SUPPORTED_IDS maps value -> id-type name."""
    from maigret.checking import SUPPORTED_IDS

    if not SUPPORTED_IDS:
        # Previously the test silently passed when the set was empty;
        # make the no-op explicit so it shows up in the test report.
        pytest.skip("SUPPORTED_IDS is empty; nothing to verify")
    # Use an arbitrary supported id key rather than hard-coding one.
    key = next(iter(SUPPORTED_IDS))
    logger = Mock()
    result = parse_usernames({key: "some_value"}, logger)
    assert result.get("some_value") == key
def test_update_results_info_links():
    """ids_usernames is carried through and link fields are merged."""
    out = update_results_info(
        {"username": "test"},
        {
            "links": "['https://example.com/a', 'https://example.com/b']",
            "website": "https://example.com/w",
        },
        {"alice": "username"},
    )
    assert out["ids_usernames"] == {"alice": "username"}
    assert "https://example.com/w" in out["ids_links"]
    assert "https://example.com/a" in out["ids_links"]
def test_update_results_info_no_website():
    """An empty links list and no website field produce no ids_links."""
    out = update_results_info({}, {"links": "[]"}, {})
    assert out["ids_links"] == []
def test_extract_ids_data_bad_html_returns_empty():
    """Arbitrary HTML must not raise — a dict (possibly empty) comes back."""
    html = "<html><body>nothing special</body></html>"
    extracted = extract_ids_data(html, Mock(), Mock(name="Site"))
    assert isinstance(extracted, dict)
def test_get_failed_sites_filters_permanent_errors():
    """Only sites with temporary (retryable) errors are returned as failed."""
    # Temporary errors (Request timeout, Connecting failure, etc.) are retryable → returned.
    # Permanent ones (Captcha, Access denied, etc.) and results without error → filtered out.
    good_status = MaigretCheckResult("u", "S1", "https://s1", MaigretCheckStatus.CLAIMED)
    timeout_err = MaigretCheckResult(
        "u", "S2", "https://s2", MaigretCheckStatus.UNKNOWN,
        error=CheckError("Request timeout", "slow server"),
    )
    captcha_err = MaigretCheckResult(
        "u", "S3", "https://s3", MaigretCheckStatus.UNKNOWN,
        error=CheckError("Captcha", "Cloudflare"),
    )
    results = {
        "S1": {"status": good_status},
        "S2": {"status": timeout_err},
        "S3": {"status": captcha_err},
        "S4": {},  # no status at all
    }
    failed = get_failed_sites(results)
    # Only the temporary-error site is retry-worthy
    assert failed == ["S2"]
def test_timeout_check_valid():
    """Numeric strings are converted to float seconds."""
    assert timeout_check("30") == 30.0
    assert timeout_check("2.5") == 2.5
def test_timeout_check_invalid():
    """Non-numeric and non-positive values raise ArgumentTypeError."""
    for bad_value in ("abc", "0", "-1"):
        with pytest.raises(ArgumentTypeError):
            timeout_check(bad_value)
def test_debug_response_logging_writes(tmp_path, monkeypatch):
    """A logged response lands in debug.log in the current working directory."""
    monkeypatch.chdir(tmp_path)
    debug_response_logging("https://example.com", "<html>hi</html>", 200, None)
    logged = (tmp_path / "debug.log").read_text()
    assert "https://example.com" in logged
    assert "200" in logged
def test_debug_response_logging_no_response(tmp_path, monkeypatch):
    """Absent response objects are logged with a 'No response' marker."""
    monkeypatch.chdir(tmp_path)
    debug_response_logging("https://example.com", None, None, CheckError("Timeout"))
    assert "No response" in (tmp_path / "debug.log").read_text()
def _make_site(data_overrides=None):
    """Build a minimal status_code-check MaigretSite, optionally overridden."""
    data = {
        "url": "https://x/{username}",
        "urlMain": "https://x",
        "checkType": "status_code",
        "usernameClaimed": "a",
        "usernameUnclaimed": "b",
    }
    data.update(data_overrides or {})
    return MaigretSite("TestSite", data)
def test_process_site_result_no_response_returns_info():
    """Without a response tuple the info dict is returned untouched."""
    info = {"username": "a", "parsing_enabled": False, "url_user": "https://x/a"}
    assert process_site_result(None, Mock(), Mock(), info, _make_site()) is info
def test_process_site_result_status_already_set():
    """A pre-existing status short-circuits processing and is kept as-is."""
    existing = MaigretCheckResult("a", "S", "u", MaigretCheckStatus.ILLEGAL)
    info = {
        "username": "a",
        "parsing_enabled": False,
        "status": existing,
        "url_user": "u",
    }
    out = process_site_result(("<html/>", 200, None), Mock(), Mock(), info, _make_site())
    assert out["status"] is existing
def test_process_site_result_status_code_claimed():
    """HTTP 200 under status_code checking marks the account as claimed."""
    info = {"username": "a", "parsing_enabled": False, "url_user": "https://x/a"}
    checked = process_site_result(
        ("<html/>", 200, None), Mock(), Mock(), info, _make_site({"checkType": "status_code"})
    )
    assert checked["status"].status == MaigretCheckStatus.CLAIMED
    assert checked["http_status"] == 200
def test_process_site_result_status_code_available():
    """HTTP 404 under status_code checking marks the username as available."""
    info = {"username": "a", "parsing_enabled": False, "url_user": "https://x/a"}
    checked = process_site_result(
        ("<html/>", 404, None), Mock(), Mock(), info, _make_site({"checkType": "status_code"})
    )
    assert checked["status"].status == MaigretCheckStatus.AVAILABLE
def test_process_site_result_message_claimed():
    """A presence marker in the body marks the account as claimed."""
    site = _make_site(
        {
            "checkType": "message",
            "presenseStrs": ["profile-name"],
            "absenceStrs": ["not found"],
        }
    )
    info = {"username": "a", "parsing_enabled": False, "url_user": "https://x/a"}
    body = "<div class='profile-name'>Alice</div>"
    out = process_site_result((body, 200, None), Mock(), Mock(), info, site)
    assert out["status"].status == MaigretCheckStatus.CLAIMED
def test_process_site_result_message_available_by_absence():
    """The absence marker takes priority even when a presence marker matches."""
    site = _make_site(
        {
            "checkType": "message",
            "presenseStrs": ["profile-name"],
            "absenceStrs": ["not found"],
        }
    )
    info = {"username": "a", "parsing_enabled": False, "url_user": "https://x/a"}
    body = "<h1>not found</h1> profile-name too"
    out = process_site_result((body, 200, None), Mock(), Mock(), info, site)
    assert out["status"].status == MaigretCheckStatus.AVAILABLE
def test_process_site_result_with_error_is_unknown():
    """A check error in the response forces UNKNOWN status and keeps the error."""
    info = {"username": "a", "parsing_enabled": False, "url_user": "https://x/a"}
    response = ("body", 403, CheckError("Captcha", "Cloudflare"))
    out = process_site_result(
        response, Mock(), Mock(), info, _make_site({"checkType": "status_code"})
    )
    assert out["status"].status == MaigretCheckStatus.UNKNOWN
    assert out["status"].error is not None
+227
View File
@@ -10,8 +10,15 @@ import xmind # type: ignore[import-untyped]
from jinja2 import Template
from maigret.report import (
filter_supposed_data,
sort_report_by_data_points,
_md_format_value,
generate_csv_report,
generate_txt_report,
save_csv_report,
save_txt_report,
save_json_report,
save_markdown_report,
save_xmind_report,
save_html_report,
save_pdf_report,
@@ -456,3 +463,223 @@ def test_text_report_broken():
assert brief_part in report_text
assert 'us' in report_text
assert 'photo' in report_text
def test_filter_supposed_data():
    """Only whitelisted fields survive, capitalized, with list values unwrapped."""
    payload = {
        'fullname': ['Alice'],
        'gender': ['female'],
        'location': ['Berlin'],
        'age': ['30'],
        'email': ['x@y.z'],  # not whitelisted -> dropped
        'bio': ['hi'],       # not whitelisted -> dropped
    }
    expected = {
        'Fullname': 'Alice',
        'Gender': 'female',
        'Location': 'Berlin',
        'Age': '30',
    }
    assert filter_supposed_data(payload) == expected
def test_filter_supposed_data_empty():
    """Empty input and fully non-whitelisted input both yield an empty dict."""
    for payload in ({}, {'nope': ['v']}):
        assert filter_supposed_data(payload) == {}
def test_filter_supposed_data_scalar_values():
    """Scalars are kept whole — a past bug indexed v[0] on strings, silently
    turning "Alice" into "A"."""
    payload = {
        'fullname': 'Alice',
        'gender': 'female',
        'location': 'Berlin',
        'age': 30,
    }
    expected = {
        'Fullname': 'Alice',
        'Gender': 'female',
        'Location': 'Berlin',
        'Age': 30,
    }
    assert filter_supposed_data(payload) == expected
def test_filter_supposed_data_empty_list_yields_empty_string():
    """An empty list value degrades to '' instead of raising IndexError."""
    filtered = filter_supposed_data({'fullname': []})
    assert filtered == {'Fullname': ''}
def test_filter_supposed_data_mixed_values():
    """List and scalar values can coexist; lists contribute their first item."""
    filtered = filter_supposed_data({'fullname': ['Alice', 'Alicia'], 'gender': 'female'})
    assert filtered == {'Fullname': 'Alice', 'Gender': 'female'}
def test_sort_report_by_data_points():
    """Sites with more extracted ids_data fields must sort to the front."""
    status_many = MaigretCheckResult('', '', '', MaigretCheckStatus.CLAIMED)
    status_many.ids_data = {'a': 1, 'b': 2, 'c': 3}
    status_one = MaigretCheckResult('', '', '', MaigretCheckStatus.CLAIMED)
    status_one.ids_data = {'a': 1}
    # No ids_data attribute set at all for this one.
    status_none = MaigretCheckResult('', '', '', MaigretCheckStatus.CLAIMED)
    results = {
        'few': {'status': status_one},
        'many': {'status': status_many},
        'zero': {'status': status_none},
        'nostatus': {},
    }
    sorted_out = sort_report_by_data_points(results)
    keys = list(sorted_out.keys())
    # site with 3 ids_data fields must come first
    assert keys[0] == 'many'
    # site with 1 field next
    assert keys[1] == 'few'
def test_md_format_value_list():
    """Lists are rendered as a comma-separated string."""
    rendered = _md_format_value(['a', 'b', 'c'])
    assert rendered == 'a, b, c'
def test_md_format_value_url():
    """http(s) strings become markdown links pointing at themselves."""
    for url in ('https://example.com', 'http://x.y'):
        assert _md_format_value(url) == f'[{url}]({url})'
def test_md_format_value_plain():
    """Non-URL scalars are rendered as their plain string form."""
    assert _md_format_value(42) == '42'
    assert _md_format_value('hello') == 'hello'
def test_save_csv_report(tmp_path):
    """CSV report contains the header row and the found-site row.

    Writes into tmp_path instead of the working directory so test runs do
    not leave report_test.csv artifacts behind or collide in parallel runs.
    """
    filename = str(tmp_path / 'report_test.csv')
    save_csv_report(filename, 'test', EXAMPLE_RESULTS)
    with open(filename) as f:
        content = f.read()
    assert 'username,name,url_main' in content
    assert 'test,GitHub' in content
def test_save_txt_report(tmp_path):
    """Text report lists the found profile URL and the totals line.

    Uses tmp_path so no report_test.txt artifact is left in the repo dir.
    """
    filename = str(tmp_path / 'report_test.txt')
    save_txt_report(filename, 'test', EXAMPLE_RESULTS)
    with open(filename) as f:
        content = f.read()
    assert 'https://www.github.com/test' in content
    assert 'Total Websites Username Detected On : 1' in content
def test_save_json_report_simple(tmp_path):
    """Simple JSON report keys sites by name.

    Uses tmp_path so no report_test.json artifact is left in the repo dir.
    """
    filename = str(tmp_path / 'report_test.json')
    save_json_report(filename, 'test', EXAMPLE_RESULTS, 'simple')
    with open(filename) as f:
        data = json.load(f)
    assert 'GitHub' in data
def test_save_json_report_ndjson(tmp_path):
    """NDJSON report emits one JSON object per line with a sitename field.

    Uses tmp_path so no report_test_ndjson.json artifact is left behind.
    """
    filename = str(tmp_path / 'report_test_ndjson.json')
    save_json_report(filename, 'test', EXAMPLE_RESULTS, 'ndjson')
    with open(filename) as f:
        lines = f.readlines()
    assert len(lines) == 1
    assert json.loads(lines[0])['sitename'] == 'GitHub'
def _markdown_context_with_rich_ids():
    """Build a context with found accounts, ids_data (incl. image, url, list) to exercise all branches."""
    # Start from the shared GOOD_RESULT fixture and enrich it in-place.
    found_result = copy.deepcopy(GOOD_RESULT)
    found_result.tags = ['photo', 'us']
    found_result.ids_data = {
        "fullname": "Alice",
        "name": "Alice A.",
        "location": "Berlin",
        "bio": "Photographer",
        "external_url": "https://example.com/profile",
        "image": "https://example.com/avatar.png",  # must be skipped
        "aliases": ["alice", "alicea"],  # list value
        "last_online": "2024-01-02 10:00:00",
    }
    data = {
        # Fully-populated found account — exercises the rich rendering path.
        'Github': {
            'username': 'alice',
            'parsing_enabled': True,
            'url_main': 'https://github.com/',
            'url_user': 'https://github.com/alice',
            'status': found_result,
            'http_status': 200,
            'is_similar': False,
            'rank': 1,
            'site': MaigretSite('Github', {}),
            'found': True,
            'ids_data': found_result.ids_data,
        },
        # Similar-only match — exercises the is_similar rendering branch.
        'Similar': {
            'username': 'alice',
            'url_user': 'https://other.com/alice',
            'is_similar': True,
            'found': True,
            'status': copy.deepcopy(GOOD_RESULT),
        },
    }
    return {
        'username': 'alice',
        'generated_at': '2024-01-02 10:00',
        'brief': 'Search returned 1 account',
        'countries_tuple_list': [('us', 1)],
        'interests_tuple_list': [('photo', 1)],
        'first_seen': '2023-01-01',
        'results': [('alice', 'username', data)],
    }
def test_save_markdown_report(tmp_path):
    """Full markdown report: summary, per-site sections, tags and ids rendering.

    Uses tmp_path so no report_test.md artifact is left in the repo dir.
    """
    filename = str(tmp_path / 'report_test.md')
    context = _markdown_context_with_rich_ids()
    save_markdown_report(filename, context, run_info={'sites_count': 100, 'flags': '--top-sites 100'})
    with open(filename) as f:
        content = f.read()
    assert '# Report by searching on username "alice"' in content
    assert '## Summary' in content
    assert '## Accounts found' in content
    assert '### Github' in content
    assert '[https://github.com/alice](https://github.com/alice)' in content
    assert 'Ethical use' in content
    assert '100 sites checked' in content
    # image field must NOT appear in per-site listing
    assert 'avatar.png' not in content
    # list field rendered with join
    assert 'alice, alicea' in content
    # external url formatted as markdown link
    assert '[https://example.com/profile](https://example.com/profile)' in content
def test_save_markdown_report_minimal_context(tmp_path):
    """No run_info, no first_seen — exercise the fallback branches.

    Uses tmp_path so no report_test_min.md artifact is left in the repo dir.
    """
    filename = str(tmp_path / 'report_test_min.md')
    context = {
        'username': 'bob',
        'brief': 'nothing found',
        'results': [],
    }
    save_markdown_report(filename, context)
    with open(filename) as f:
        content = f.read()
    assert '# Report by searching on username "bob"' in content
    assert '## Summary' in content
def test_get_plaintext_report_minimal():
    """Without countries/interests data their sections are omitted entirely."""
    report = get_plaintext_report(
        {
            'brief': 'Nothing to report.',
            'interests_tuple_list': [],
            'countries_tuple_list': [],
        }
    )
    assert 'Nothing to report.' in report
    assert 'Countries:' not in report
    assert 'Interests' not in report