Files
maigret/tests/test_checking.py

468 lines
16 KiB
Python

from argparse import ArgumentTypeError
from mock import Mock
import pytest
from maigret import search
from maigret.checking import (
detect_error_page,
extract_ids_data,
parse_usernames,
update_results_info,
get_failed_sites,
timeout_check,
debug_response_logging,
process_site_result,
)
from maigret.errors import CheckError
from maigret.result import MaigretCheckResult, MaigretCheckStatus
from maigret.sites import MaigretSite
def site_result_except(server, username, **kwargs):
query = f'id={username}'
server.expect_request('/url', query_string=query).respond_with_data(**kwargs)
@pytest.mark.slow
@pytest.mark.asyncio
async def test_checking_by_status_code(httpserver, local_test_db):
sites_dict = local_test_db.sites_dict
site_result_except(httpserver, 'claimed', status=200)
site_result_except(httpserver, 'unclaimed', status=404)
result = await search('claimed', site_dict=sites_dict, logger=Mock())
assert result['StatusCode']['status'].is_found() is True
result = await search('unclaimed', site_dict=sites_dict, logger=Mock())
assert result['StatusCode']['status'].is_found() is False
@pytest.mark.slow
@pytest.mark.asyncio
async def test_checking_by_message_positive_full(httpserver, local_test_db):
sites_dict = local_test_db.sites_dict
site_result_except(httpserver, 'claimed', response_data="user profile")
site_result_except(httpserver, 'unclaimed', response_data="404 not found")
result = await search('claimed', site_dict=sites_dict, logger=Mock())
assert result['Message']['status'].is_found() is True
result = await search('unclaimed', site_dict=sites_dict, logger=Mock())
assert result['Message']['status'].is_found() is False
@pytest.mark.slow
@pytest.mark.asyncio
async def test_checking_by_message_positive_part(httpserver, local_test_db):
sites_dict = local_test_db.sites_dict
site_result_except(httpserver, 'claimed', response_data="profile")
site_result_except(httpserver, 'unclaimed', response_data="404")
result = await search('claimed', site_dict=sites_dict, logger=Mock())
assert result['Message']['status'].is_found() is True
result = await search('unclaimed', site_dict=sites_dict, logger=Mock())
assert result['Message']['status'].is_found() is False
@pytest.mark.slow
@pytest.mark.asyncio
async def test_checking_by_message_negative(httpserver, local_test_db):
sites_dict = local_test_db.sites_dict
site_result_except(httpserver, 'claimed', response_data="")
site_result_except(httpserver, 'unclaimed', response_data="user 404")
result = await search('claimed', site_dict=sites_dict, logger=Mock())
assert result['Message']['status'].is_found() is False
result = await search('unclaimed', site_dict=sites_dict, logger=Mock())
assert result['Message']['status'].is_found() is True
# ---- Pure-function unit tests (no network) ----
def test_detect_error_page_site_specific():
err = detect_error_page(
"Please enable JavaScript to proceed",
200,
{"Please enable JavaScript to proceed": "Scraping protection"},
ignore_403=False,
)
assert err is not None
assert err.type == "Site-specific"
assert err.desc == "Scraping protection"
def test_detect_error_page_403():
err = detect_error_page("some body", 403, {}, ignore_403=False)
assert err is not None
assert err.type == "Access denied"
def test_detect_error_page_403_ignored():
# XenForo engine uses ignore403 because member-not-found also returns 403
assert detect_error_page("not found body", 403, {}, ignore_403=True) is None
def test_detect_error_page_999_linkedin():
# LinkedIn returns 999 on bot suspicion — must NOT be reported as Server error
assert detect_error_page("", 999, {}, ignore_403=False) is None
def test_detect_error_page_500():
err = detect_error_page("", 503, {}, ignore_403=False)
assert err is not None
assert err.type == "Server"
assert "503" in err.desc
def test_detect_error_page_ok():
assert detect_error_page("hello world", 200, {}, ignore_403=False) is None
def test_parse_usernames_single_username():
logger = Mock()
result = parse_usernames({"profile_username": "alice"}, logger)
assert result == {"alice": "username"}
def test_parse_usernames_list_of_usernames():
logger = Mock()
result = parse_usernames({"other_usernames": "['alice', 'bob']"}, logger)
assert result == {"alice": "username", "bob": "username"}
def test_parse_usernames_malformed_list():
logger = Mock()
result = parse_usernames({"other_usernames": "not-a-list"}, logger)
# should swallow the error and just return empty
assert result == {}
assert logger.warning.called
def test_parse_usernames_supported_id():
logger = Mock()
# "telegram" is in SUPPORTED_IDS per socid_extractor
from maigret.checking import SUPPORTED_IDS
if SUPPORTED_IDS:
key = next(iter(SUPPORTED_IDS))
result = parse_usernames({key: "some_value"}, logger)
assert result.get("some_value") == key
def test_update_results_info_links():
info = {"username": "test"}
result = update_results_info(
info,
{"links": "['https://example.com/a', 'https://example.com/b']", "website": "https://example.com/w"},
{"alice": "username"},
)
assert result["ids_usernames"] == {"alice": "username"}
assert "https://example.com/w" in result["ids_links"]
assert "https://example.com/a" in result["ids_links"]
def test_update_results_info_no_website():
info = {}
result = update_results_info(info, {"links": "[]"}, {})
assert result["ids_links"] == []
def test_extract_ids_data_bad_html_returns_empty():
logger = Mock()
# Random HTML should not raise — returns {} if nothing matches
out = extract_ids_data("<html><body>nothing special</body></html>", logger, Mock(name="Site"))
assert isinstance(out, dict)
def test_get_failed_sites_filters_permanent_errors():
# Temporary errors (Request timeout, Connecting failure, etc.) are retryable → returned.
# Permanent ones (Captcha, Access denied, etc.) and results without error → filtered out.
good_status = MaigretCheckResult("u", "S1", "https://s1", MaigretCheckStatus.CLAIMED)
timeout_err = MaigretCheckResult(
"u", "S2", "https://s2", MaigretCheckStatus.UNKNOWN,
error=CheckError("Request timeout", "slow server"),
)
captcha_err = MaigretCheckResult(
"u", "S3", "https://s3", MaigretCheckStatus.UNKNOWN,
error=CheckError("Captcha", "Cloudflare"),
)
results = {
"S1": {"status": good_status},
"S2": {"status": timeout_err},
"S3": {"status": captcha_err},
"S4": {}, # no status at all
}
failed = get_failed_sites(results)
# Only the temporary-error site is retry-worthy
assert failed == ["S2"]
def test_timeout_check_valid():
assert timeout_check("2.5") == 2.5
assert timeout_check("30") == 30.0
def test_timeout_check_invalid():
with pytest.raises(ArgumentTypeError):
timeout_check("abc")
with pytest.raises(ArgumentTypeError):
timeout_check("0")
with pytest.raises(ArgumentTypeError):
timeout_check("-1")
def test_debug_response_logging_writes(tmp_path, monkeypatch):
monkeypatch.chdir(tmp_path)
debug_response_logging("https://example.com", "<html>hi</html>", 200, None)
out = (tmp_path / "debug.log").read_text()
assert "https://example.com" in out
assert "200" in out
def test_debug_response_logging_no_response(tmp_path, monkeypatch):
monkeypatch.chdir(tmp_path)
debug_response_logging("https://example.com", None, None, CheckError("Timeout"))
out = (tmp_path / "debug.log").read_text()
assert "No response" in out
def _make_site(data_overrides=None):
base = {
"url": "https://x/{username}",
"urlMain": "https://x",
"checkType": "status_code",
"usernameClaimed": "a",
"usernameUnclaimed": "b",
}
if data_overrides:
base.update(data_overrides)
return MaigretSite("TestSite", base)
def test_process_site_result_no_response_returns_info():
site = _make_site()
info = {"username": "a", "parsing_enabled": False, "url_user": "https://x/a"}
out = process_site_result(None, Mock(), Mock(), info, site)
assert out is info
def test_process_site_result_status_already_set():
site = _make_site()
pre = MaigretCheckResult("a", "S", "u", MaigretCheckStatus.ILLEGAL)
info = {"username": "a", "parsing_enabled": False, "status": pre, "url_user": "u"}
# Since status is already set, function returns without changes
out = process_site_result(("<html/>", 200, None), Mock(), Mock(), info, site)
assert out["status"] is pre
def test_process_site_result_status_code_claimed():
site = _make_site({"checkType": "status_code"})
info = {"username": "a", "parsing_enabled": False, "url_user": "https://x/a"}
out = process_site_result(("<html/>", 200, None), Mock(), Mock(), info, site)
assert out["status"].status == MaigretCheckStatus.CLAIMED
assert out["http_status"] == 200
def test_process_site_result_status_code_available():
site = _make_site({"checkType": "status_code"})
info = {"username": "a", "parsing_enabled": False, "url_user": "https://x/a"}
out = process_site_result(("<html/>", 404, None), Mock(), Mock(), info, site)
assert out["status"].status == MaigretCheckStatus.AVAILABLE
def test_process_site_result_message_claimed():
site = _make_site({
"checkType": "message",
"presenseStrs": ["profile-name"],
"absenceStrs": ["not found"],
})
info = {"username": "a", "parsing_enabled": False, "url_user": "https://x/a"}
out = process_site_result(("<div class='profile-name'>Alice</div>", 200, None), Mock(), Mock(), info, site)
assert out["status"].status == MaigretCheckStatus.CLAIMED
def test_process_site_result_message_available_by_absence():
site = _make_site({
"checkType": "message",
"presenseStrs": ["profile-name"],
"absenceStrs": ["not found"],
})
info = {"username": "a", "parsing_enabled": False, "url_user": "https://x/a"}
out = process_site_result(("<h1>not found</h1> profile-name too", 200, None), Mock(), Mock(), info, site)
# absence marker wins even if presence marker also appears
assert out["status"].status == MaigretCheckStatus.AVAILABLE
def test_process_site_result_with_error_is_unknown():
site = _make_site({"checkType": "status_code"})
info = {"username": "a", "parsing_enabled": False, "url_user": "https://x/a"}
resp = ("body", 403, CheckError("Captcha", "Cloudflare"))
out = process_site_result(resp, Mock(), Mock(), info, site)
assert out["status"].status == MaigretCheckStatus.UNKNOWN
assert out["status"].error is not None
# ---- CurlCffiChecker: TLS impersonation header sanitisation ----
class _FakeCurlResponse:
def __init__(self, text="ok", status_code=200):
self.text = text
self.status_code = status_code
class _FakeCurlSession:
"""Captures the kwargs of the last .get/.post/.head call for assertions."""
last_method = None
last_kwargs = None
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
return False
async def get(self, **kwargs):
type(self).last_method = 'get'
type(self).last_kwargs = kwargs
return _FakeCurlResponse()
async def post(self, **kwargs):
type(self).last_method = 'post'
type(self).last_kwargs = kwargs
return _FakeCurlResponse()
async def head(self, **kwargs):
type(self).last_method = 'head'
type(self).last_kwargs = kwargs
return _FakeCurlResponse()
@pytest.fixture
def fake_curl_cffi(monkeypatch):
"""Replace CurlCffiAsyncSession with a recorder. Resets capture between tests."""
from maigret import checking
_FakeCurlSession.last_method = None
_FakeCurlSession.last_kwargs = None
monkeypatch.setattr(checking, 'CurlCffiAsyncSession', _FakeCurlSession)
return _FakeCurlSession
@pytest.mark.asyncio
async def test_curl_cffi_strips_random_user_agent_to_let_impersonation_drive_ua(fake_curl_cffi):
"""Regression: maigret used to forward `get_random_user_agent()` (often Chrome 91)
to curl_cffi alongside `impersonate="chrome"` (Chrome 131 TLS). Cloudflare composite
bot scoring rejects the resulting "Chrome 91 UA + Chrome 131 TLS" combo with a JS
challenge. The fix strips User-Agent and Connection from the headers passed to
curl_cffi so the impersonation default UA wins.
"""
from maigret.checking import CurlCffiChecker
checker = CurlCffiChecker(logger=Mock(), browser_emulate='chrome')
checker.prepare(
url='https://example.com/u/test',
headers={
"User-Agent": "Mozilla/5.0 ... Chrome/91.0.4472.124 ...", # maigret default
"Connection": "close", # maigret default
},
allow_redirects=True,
timeout=10,
method='get',
)
await checker.check()
sent = fake_curl_cffi.last_kwargs
assert fake_curl_cffi.last_method == 'get'
assert sent['impersonate'] == 'chrome'
# The whole point of the fix: random UA must not leak through.
assert sent['headers'] is None or 'User-Agent' not in sent['headers']
assert sent['headers'] is None or 'user-agent' not in {k.lower() for k in sent['headers']}
# Connection: close also stripped (interferes with impersonation defaults).
assert sent['headers'] is None or 'Connection' not in sent['headers']
@pytest.mark.asyncio
async def test_curl_cffi_preserves_site_specific_headers(fake_curl_cffi):
"""Site-specific headers (e.g. Content-Type for POST APIs, auth tokens, cookies)
must survive the User-Agent strip — only UA and Connection are removed.
"""
from maigret.checking import CurlCffiChecker
checker = CurlCffiChecker(logger=Mock(), browser_emulate='chrome')
checker.prepare(
url='https://example.com/api',
headers={
"User-Agent": "Mozilla/5.0 random",
"Connection": "close",
"Content-Type": "application/json",
"X-Csrf-Token": "abc123",
},
allow_redirects=True,
timeout=10,
method='get',
)
await checker.check()
sent_headers = fake_curl_cffi.last_kwargs['headers']
assert sent_headers is not None
assert sent_headers.get("Content-Type") == "application/json"
assert sent_headers.get("X-Csrf-Token") == "abc123"
# Sanity: stripped pair is gone
assert "User-Agent" not in sent_headers
assert "Connection" not in sent_headers
@pytest.mark.asyncio
async def test_curl_cffi_handles_empty_headers(fake_curl_cffi):
"""No headers at all → headers kwarg is None (not an empty dict that could confuse
curl_cffi's impersonation header injection)."""
from maigret.checking import CurlCffiChecker
checker = CurlCffiChecker(logger=Mock(), browser_emulate='chrome')
checker.prepare(
url='https://example.com/u/test',
headers=None,
allow_redirects=True,
timeout=10,
method='get',
)
await checker.check()
assert fake_curl_cffi.last_kwargs['headers'] is None
assert fake_curl_cffi.last_kwargs['impersonate'] == 'chrome'
@pytest.mark.asyncio
async def test_curl_cffi_strips_ua_for_post_too(fake_curl_cffi):
"""The same UA-strip must apply on POST (e.g. Discord-style POST username probes
with `tls_fingerprint`)."""
from maigret.checking import CurlCffiChecker
checker = CurlCffiChecker(logger=Mock(), browser_emulate='chrome')
checker.prepare(
url='https://example.com/api/check',
headers={
"User-Agent": "Mozilla/5.0 random",
"Content-Type": "application/json",
},
allow_redirects=True,
timeout=10,
method='post',
payload={"username": "test"},
)
await checker.check()
sent = fake_curl_cffi.last_kwargs
assert fake_curl_cffi.last_method == 'post'
assert sent['json'] == {"username": "test"}
assert "User-Agent" not in sent['headers']
assert sent['headers'].get("Content-Type") == "application/json"