mirror of
https://github.com/soxoj/maigret.git
synced 2026-05-07 06:24:35 +00:00
feat(virgool): add POST support and use user-existence API to bypass JS cookies
Co-authored-by: soxoj <31013580+soxoj@users.noreply.github.com> Agent-Logs-Url: https://github.com/soxoj/maigret/sessions/e7f4ab84-917a-49fc-bfbd-9bbaf76027f8
This commit is contained in:
+32
-7
@@ -63,28 +63,39 @@ class SimpleAiohttpChecker(CheckerBase):
|
|||||||
self.timeout = 0
|
self.timeout = 0
|
||||||
self.method = 'get'
|
self.method = 'get'
|
||||||
|
|
||||||
def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get'):
|
def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get', json_body=None):
|
||||||
self.url = url
|
self.url = url
|
||||||
self.headers = headers
|
self.headers = headers
|
||||||
self.allow_redirects = allow_redirects
|
self.allow_redirects = allow_redirects
|
||||||
self.timeout = timeout
|
self.timeout = timeout
|
||||||
self.method = method
|
self.method = method
|
||||||
|
self.json_body = json_body
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def close(self):
|
async def close(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
async def _make_request(
|
async def _make_request(
|
||||||
self, session, url, headers, allow_redirects, timeout, method, logger
|
self, session, url, headers, allow_redirects, timeout, method, logger, json_body=None
|
||||||
) -> Tuple[str, int, Optional[CheckError]]:
|
) -> Tuple[str, int, Optional[CheckError]]:
|
||||||
try:
|
try:
|
||||||
request_method = session.get if method == 'get' else session.head
|
if method == 'post':
|
||||||
async with request_method(
|
request_method = session.post
|
||||||
|
elif method == 'head':
|
||||||
|
request_method = session.head
|
||||||
|
else:
|
||||||
|
request_method = session.get
|
||||||
|
|
||||||
|
kwargs = dict(
|
||||||
url=url,
|
url=url,
|
||||||
headers=headers,
|
headers=headers,
|
||||||
allow_redirects=allow_redirects,
|
allow_redirects=allow_redirects,
|
||||||
timeout=timeout,
|
timeout=timeout,
|
||||||
) as response:
|
)
|
||||||
|
if method == 'post' and json_body is not None:
|
||||||
|
kwargs['json'] = json_body
|
||||||
|
|
||||||
|
async with request_method(**kwargs) as response:
|
||||||
status_code = response.status
|
status_code = response.status
|
||||||
response_content = await response.content.read()
|
response_content = await response.content.read()
|
||||||
charset = response.charset or "utf-8"
|
charset = response.charset or "utf-8"
|
||||||
@@ -141,6 +152,7 @@ class SimpleAiohttpChecker(CheckerBase):
|
|||||||
self.timeout,
|
self.timeout,
|
||||||
self.method,
|
self.method,
|
||||||
self.logger,
|
self.logger,
|
||||||
|
json_body=getattr(self, 'json_body', None),
|
||||||
)
|
)
|
||||||
|
|
||||||
if error and str(error) == "Invalid proxy response":
|
if error and str(error) == "Invalid proxy response":
|
||||||
@@ -165,7 +177,7 @@ class AiodnsDomainResolver(CheckerBase):
|
|||||||
self.logger = kwargs.get('logger', Mock())
|
self.logger = kwargs.get('logger', Mock())
|
||||||
self.resolver = aiodns.DNSResolver(loop=loop)
|
self.resolver = aiodns.DNSResolver(loop=loop)
|
||||||
|
|
||||||
def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get'):
|
def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get', json_body=None):
|
||||||
self.url = url
|
self.url = url
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@@ -494,7 +506,10 @@ def make_site_result(
|
|||||||
for k, v in site.get_params.items():
|
for k, v in site.get_params.items():
|
||||||
url_probe += f"&{k}={v}"
|
url_probe += f"&{k}={v}"
|
||||||
|
|
||||||
if site.check_type == "status_code" and site.request_head_only:
|
if site.request_method and site.request_method.lower() == 'post':
|
||||||
|
# Site explicitly requests POST method
|
||||||
|
request_method = 'post'
|
||||||
|
elif site.check_type == "status_code" and site.request_head_only:
|
||||||
# In most cases when we are detecting by status code,
|
# In most cases when we are detecting by status code,
|
||||||
# it is not necessary to get the entire body: we can
|
# it is not necessary to get the entire body: we can
|
||||||
# detect fine with just the HEAD response.
|
# detect fine with just the HEAD response.
|
||||||
@@ -505,6 +520,14 @@ def make_site_result(
|
|||||||
# not respond properly unless we request the whole page.
|
# not respond properly unless we request the whole page.
|
||||||
request_method = 'get'
|
request_method = 'get'
|
||||||
|
|
||||||
|
# Build JSON payload for POST requests by substituting {username}
|
||||||
|
json_body = None
|
||||||
|
if request_method == 'post' and site.request_payload:
|
||||||
|
import json as json_module
|
||||||
|
payload_str = json_module.dumps(site.request_payload)
|
||||||
|
payload_str = payload_str.replace('{username}', username)
|
||||||
|
json_body = json_module.loads(payload_str)
|
||||||
|
|
||||||
if site.check_type == "response_url":
|
if site.check_type == "response_url":
|
||||||
# Site forwards request to a different URL if username not
|
# Site forwards request to a different URL if username not
|
||||||
# found. Disallow the redirect so we can capture the
|
# found. Disallow the redirect so we can capture the
|
||||||
@@ -521,6 +544,7 @@ def make_site_result(
|
|||||||
headers=headers,
|
headers=headers,
|
||||||
allow_redirects=allow_redirects,
|
allow_redirects=allow_redirects,
|
||||||
timeout=options['timeout'],
|
timeout=options['timeout'],
|
||||||
|
json_body=json_body,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Store future request object in the results object
|
# Store future request object in the results object
|
||||||
@@ -577,6 +601,7 @@ async def check_site_for_username(
|
|||||||
allow_redirects=checker.allow_redirects,
|
allow_redirects=checker.allow_redirects,
|
||||||
timeout=checker.timeout,
|
timeout=checker.timeout,
|
||||||
method=checker.method,
|
method=checker.method,
|
||||||
|
json_body=getattr(checker, 'json_body', None),
|
||||||
)
|
)
|
||||||
response = await checker.check()
|
response = await checker.check()
|
||||||
|
|
||||||
|
|||||||
@@ -17676,24 +17676,31 @@
|
|||||||
"usernameUnclaimed": "smbepezbrg"
|
"usernameUnclaimed": "smbepezbrg"
|
||||||
},
|
},
|
||||||
"Virgool": {
|
"Virgool": {
|
||||||
"disabled": true,
|
|
||||||
"tags": [
|
"tags": [
|
||||||
"blog",
|
"blog",
|
||||||
"ir"
|
"ir"
|
||||||
],
|
],
|
||||||
"checkType": "message",
|
"checkType": "message",
|
||||||
"presenseStrs": [
|
"presenseStrs": [
|
||||||
"\"bio\""
|
"\"user_exist\":true",
|
||||||
|
"\"user_exist\": true"
|
||||||
],
|
],
|
||||||
"absenceStrs": [
|
"absenceStrs": [
|
||||||
"\u06f4\u06f0\u06f4"
|
"\u06a9\u0627\u0631\u0628\u0631\u06cc \u0628\u0627 \u0627\u06cc\u0646 \u0645\u0634\u062e\u0635\u0627\u062a \u06cc\u0627\u0641\u062a \u0646\u0634\u062f"
|
||||||
],
|
],
|
||||||
"errors": {
|
|
||||||
"<noscript>": "JS-generated cookies required"
|
|
||||||
},
|
|
||||||
"alexaRank": 1457,
|
"alexaRank": 1457,
|
||||||
"urlMain": "https://virgool.io/",
|
"urlMain": "https://virgool.io/",
|
||||||
"url": "https://virgool.io/@{username}",
|
"url": "https://virgool.io/@{username}",
|
||||||
|
"urlProbe": "https://virgool.io/api/v1.4/auth/user-existence",
|
||||||
|
"requestMethod": "post",
|
||||||
|
"requestPayload": {
|
||||||
|
"username": "{username}",
|
||||||
|
"type": "login",
|
||||||
|
"method": "username"
|
||||||
|
},
|
||||||
|
"headers": {
|
||||||
|
"Content-Type": "application/json"
|
||||||
|
},
|
||||||
"usernameClaimed": "blue",
|
"usernameClaimed": "blue",
|
||||||
"usernameUnclaimed": "noonewouldeverusethis7"
|
"usernameUnclaimed": "noonewouldeverusethis7"
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -67,6 +67,10 @@ class MaigretSite:
|
|||||||
check_type = ""
|
check_type = ""
|
||||||
# Whether to only send HEAD requests (GET by default)
|
# Whether to only send HEAD requests (GET by default)
|
||||||
request_head_only = ""
|
request_head_only = ""
|
||||||
|
# HTTP method override ("post" to use POST requests)
|
||||||
|
request_method = ""
|
||||||
|
# JSON payload template for POST requests (supports {username} placeholder)
|
||||||
|
request_payload: Dict[str, Any] = {}
|
||||||
# GET parameters to include in requests
|
# GET parameters to include in requests
|
||||||
get_params: Dict[str, Any] = {}
|
get_params: Dict[str, Any] = {}
|
||||||
|
|
||||||
@@ -138,6 +142,8 @@ class MaigretSite:
|
|||||||
'url_probe',
|
'url_probe',
|
||||||
'check_type',
|
'check_type',
|
||||||
'request_head_only',
|
'request_head_only',
|
||||||
|
'request_method',
|
||||||
|
'request_payload',
|
||||||
'get_params',
|
'get_params',
|
||||||
'presense_strs',
|
'presense_strs',
|
||||||
'absence_strs',
|
'absence_strs',
|
||||||
|
|||||||
@@ -16,6 +16,21 @@
|
|||||||
"absenseStrs": ["not found", "404"],
|
"absenseStrs": ["not found", "404"],
|
||||||
"usernameClaimed": "claimed",
|
"usernameClaimed": "claimed",
|
||||||
"usernameUnclaimed": "unclaimed"
|
"usernameUnclaimed": "unclaimed"
|
||||||
|
},
|
||||||
|
"PostMessage": {
|
||||||
|
"checkType": "message",
|
||||||
|
"url": "http://localhost:8989/profile?id={username}",
|
||||||
|
"urlMain": "http://localhost:8989/",
|
||||||
|
"urlProbe": "http://localhost:8989/api/check",
|
||||||
|
"requestMethod": "post",
|
||||||
|
"requestPayload": {
|
||||||
|
"username": "{username}",
|
||||||
|
"type": "lookup"
|
||||||
|
},
|
||||||
|
"presenseStrs": ["\"exists\":true", "\"exists\": true"],
|
||||||
|
"absenseStrs": ["not found"],
|
||||||
|
"usernameClaimed": "claimed",
|
||||||
|
"usernameUnclaimed": "unclaimed"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -67,3 +67,35 @@ async def test_checking_by_message_negative(httpserver, local_test_db):
|
|||||||
|
|
||||||
result = await search('unclaimed', site_dict=sites_dict, logger=Mock())
|
result = await search('unclaimed', site_dict=sites_dict, logger=Mock())
|
||||||
assert result['Message']['status'].is_found() is True
|
assert result['Message']['status'].is_found() is True
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.slow
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_checking_by_post_message(httpserver, local_test_db):
|
||||||
|
sites_dict = local_test_db.sites_dict
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
|
# Existing user: API responds with {"exists": true}
|
||||||
|
httpserver.expect_request(
|
||||||
|
'/api/check',
|
||||||
|
method='POST',
|
||||||
|
json={"username": "claimed", "type": "lookup"},
|
||||||
|
).respond_with_data(
|
||||||
|
json.dumps({"exists": True}), content_type="application/json"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Non-existing user: API responds with {"msg": "not found"}
|
||||||
|
httpserver.expect_request(
|
||||||
|
'/api/check',
|
||||||
|
method='POST',
|
||||||
|
json={"username": "unclaimed", "type": "lookup"},
|
||||||
|
).respond_with_data(
|
||||||
|
json.dumps({"msg": "not found"}), content_type="application/json"
|
||||||
|
)
|
||||||
|
|
||||||
|
result = await search('claimed', site_dict=sites_dict, logger=Mock())
|
||||||
|
assert result['PostMessage']['status'].is_found() is True
|
||||||
|
|
||||||
|
result = await search('unclaimed', site_dict=sites_dict, logger=Mock())
|
||||||
|
assert result['PostMessage']['status'].is_found() is False
|
||||||
|
|||||||
Reference in New Issue
Block a user