mirror of
https://github.com/soxoj/maigret.git
synced 2026-05-06 22:19:01 +00:00
Fix site checks: 5 fixed, 4 disabled; fix UA leak bug (#2569)
This commit is contained in:
@@ -307,3 +307,161 @@ def test_process_site_result_with_error_is_unknown():
|
||||
out = process_site_result(resp, Mock(), Mock(), info, site)
|
||||
assert out["status"].status == MaigretCheckStatus.UNKNOWN
|
||||
assert out["status"].error is not None
|
||||
|
||||
|
||||
# ---- CurlCffiChecker: TLS impersonation header sanitisation ----
|
||||
|
||||
|
||||
class _FakeCurlResponse:
|
||||
def __init__(self, text="ok", status_code=200):
|
||||
self.text = text
|
||||
self.status_code = status_code
|
||||
|
||||
|
||||
class _FakeCurlSession:
|
||||
"""Captures the kwargs of the last .get/.post/.head call for assertions."""
|
||||
|
||||
last_method = None
|
||||
last_kwargs = None
|
||||
|
||||
async def __aenter__(self):
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
async def get(self, **kwargs):
|
||||
type(self).last_method = 'get'
|
||||
type(self).last_kwargs = kwargs
|
||||
return _FakeCurlResponse()
|
||||
|
||||
async def post(self, **kwargs):
|
||||
type(self).last_method = 'post'
|
||||
type(self).last_kwargs = kwargs
|
||||
return _FakeCurlResponse()
|
||||
|
||||
async def head(self, **kwargs):
|
||||
type(self).last_method = 'head'
|
||||
type(self).last_kwargs = kwargs
|
||||
return _FakeCurlResponse()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fake_curl_cffi(monkeypatch):
|
||||
"""Replace CurlCffiAsyncSession with a recorder. Resets capture between tests."""
|
||||
from maigret import checking
|
||||
_FakeCurlSession.last_method = None
|
||||
_FakeCurlSession.last_kwargs = None
|
||||
monkeypatch.setattr(checking, 'CurlCffiAsyncSession', _FakeCurlSession)
|
||||
return _FakeCurlSession
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_curl_cffi_strips_random_user_agent_to_let_impersonation_drive_ua(fake_curl_cffi):
|
||||
"""Regression: maigret used to forward `get_random_user_agent()` (often Chrome 91)
|
||||
to curl_cffi alongside `impersonate="chrome"` (Chrome 131 TLS). Cloudflare composite
|
||||
bot scoring rejects the resulting "Chrome 91 UA + Chrome 131 TLS" combo with a JS
|
||||
challenge. The fix strips User-Agent and Connection from the headers passed to
|
||||
curl_cffi so the impersonation default UA wins.
|
||||
"""
|
||||
from maigret.checking import CurlCffiChecker
|
||||
|
||||
checker = CurlCffiChecker(logger=Mock(), browser_emulate='chrome')
|
||||
checker.prepare(
|
||||
url='https://example.com/u/test',
|
||||
headers={
|
||||
"User-Agent": "Mozilla/5.0 ... Chrome/91.0.4472.124 ...", # maigret default
|
||||
"Connection": "close", # maigret default
|
||||
},
|
||||
allow_redirects=True,
|
||||
timeout=10,
|
||||
method='get',
|
||||
)
|
||||
await checker.check()
|
||||
|
||||
sent = fake_curl_cffi.last_kwargs
|
||||
assert fake_curl_cffi.last_method == 'get'
|
||||
assert sent['impersonate'] == 'chrome'
|
||||
# The whole point of the fix: random UA must not leak through.
|
||||
assert sent['headers'] is None or 'User-Agent' not in sent['headers']
|
||||
assert sent['headers'] is None or 'user-agent' not in {k.lower() for k in sent['headers']}
|
||||
# Connection: close also stripped (interferes with impersonation defaults).
|
||||
assert sent['headers'] is None or 'Connection' not in sent['headers']
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_curl_cffi_preserves_site_specific_headers(fake_curl_cffi):
|
||||
"""Site-specific headers (e.g. Content-Type for POST APIs, auth tokens, cookies)
|
||||
must survive the User-Agent strip — only UA and Connection are removed.
|
||||
"""
|
||||
from maigret.checking import CurlCffiChecker
|
||||
|
||||
checker = CurlCffiChecker(logger=Mock(), browser_emulate='chrome')
|
||||
checker.prepare(
|
||||
url='https://example.com/api',
|
||||
headers={
|
||||
"User-Agent": "Mozilla/5.0 random",
|
||||
"Connection": "close",
|
||||
"Content-Type": "application/json",
|
||||
"X-Csrf-Token": "abc123",
|
||||
},
|
||||
allow_redirects=True,
|
||||
timeout=10,
|
||||
method='get',
|
||||
)
|
||||
await checker.check()
|
||||
|
||||
sent_headers = fake_curl_cffi.last_kwargs['headers']
|
||||
assert sent_headers is not None
|
||||
assert sent_headers.get("Content-Type") == "application/json"
|
||||
assert sent_headers.get("X-Csrf-Token") == "abc123"
|
||||
# Sanity: stripped pair is gone
|
||||
assert "User-Agent" not in sent_headers
|
||||
assert "Connection" not in sent_headers
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_curl_cffi_handles_empty_headers(fake_curl_cffi):
|
||||
"""No headers at all → headers kwarg is None (not an empty dict that could confuse
|
||||
curl_cffi's impersonation header injection)."""
|
||||
from maigret.checking import CurlCffiChecker
|
||||
|
||||
checker = CurlCffiChecker(logger=Mock(), browser_emulate='chrome')
|
||||
checker.prepare(
|
||||
url='https://example.com/u/test',
|
||||
headers=None,
|
||||
allow_redirects=True,
|
||||
timeout=10,
|
||||
method='get',
|
||||
)
|
||||
await checker.check()
|
||||
|
||||
assert fake_curl_cffi.last_kwargs['headers'] is None
|
||||
assert fake_curl_cffi.last_kwargs['impersonate'] == 'chrome'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_curl_cffi_strips_ua_for_post_too(fake_curl_cffi):
|
||||
"""The same UA-strip must apply on POST (e.g. Discord-style POST username probes
|
||||
with `tls_fingerprint`)."""
|
||||
from maigret.checking import CurlCffiChecker
|
||||
|
||||
checker = CurlCffiChecker(logger=Mock(), browser_emulate='chrome')
|
||||
checker.prepare(
|
||||
url='https://example.com/api/check',
|
||||
headers={
|
||||
"User-Agent": "Mozilla/5.0 random",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
allow_redirects=True,
|
||||
timeout=10,
|
||||
method='post',
|
||||
payload={"username": "test"},
|
||||
)
|
||||
await checker.check()
|
||||
|
||||
sent = fake_curl_cffi.last_kwargs
|
||||
assert fake_curl_cffi.last_method == 'post'
|
||||
assert sent['json'] == {"username": "test"}
|
||||
assert "User-Agent" not in sent['headers']
|
||||
assert sent['headers'].get("Content-Type") == "application/json"
|
||||
|
||||
Reference in New Issue
Block a user