Sites re-check (#2423)

This commit is contained in:
Soxoj
2026-03-27 22:41:55 +01:00
committed by GitHub
parent 184519b202
commit fa1a4d1b4a
6 changed files with 341 additions and 76 deletions
+67 -9
View File
@@ -26,6 +26,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
try:
import aiohttp
from yarl import URL as YarlURL
except ImportError:
print("aiohttp not installed. Run: pip install aiohttp")
sys.exit(1)
@@ -74,8 +75,14 @@ def color(text: str, c: str) -> str:
async def check_url_aiohttp(url: str, headers: dict = None, follow_redirects: bool = True,
timeout: int = 15, ssl_verify: bool = False) -> dict:
"""Check a URL using aiohttp and return detailed response info."""
timeout: int = 15, ssl_verify: bool = False,
method: str = "GET", payload: dict = None) -> dict:
"""Check a URL using aiohttp and return detailed response info.
Args:
method: HTTP method ("GET" or "POST").
payload: JSON payload for POST requests (dict, will be serialized).
"""
headers = headers or DEFAULT_HEADERS.copy()
result = {
"method": "aiohttp",
@@ -96,7 +103,14 @@ async def check_url_aiohttp(url: str, headers: dict = None, follow_redirects: bo
timeout_obj = aiohttp.ClientTimeout(total=timeout)
async with aiohttp.ClientSession(connector=connector, timeout=timeout_obj) as session:
async with session.get(url, headers=headers, allow_redirects=follow_redirects) as resp:
# Use encoded=True if URL contains percent-encoded chars to prevent double-encoding
request_url = YarlURL(url, encoded=True) if '%' in url else url
request_kwargs = dict(headers=headers, allow_redirects=follow_redirects)
if method.upper() == "POST" and payload is not None:
request_kwargs["json"] = payload
request_fn = session.post if method.upper() == "POST" else session.get
async with request_fn(request_url, **request_kwargs) as resp:
result["status"] = resp.status
result["final_url"] = str(resp.url)
@@ -438,21 +452,54 @@ async def diagnose_site(site_config: dict, site_name: str) -> dict:
print(f" {color('[!]', Colors.RED)} No usernameClaimed defined")
return diagnosis
# Build full URL
# Build full URL (display URL)
url_template = url.replace("{urlMain}", url_main).replace("{urlSubpath}", site_config.get("urlSubpath", ""))
# Build probe URL (what Maigret actually requests)
url_probe = site_config.get("urlProbe", "")
if url_probe:
probe_template = url_probe.replace("{urlMain}", url_main).replace("{urlSubpath}", site_config.get("urlSubpath", ""))
else:
probe_template = url_template
# Detect request method and payload
request_method = site_config.get("requestMethod", "GET").upper()
request_payload_template = site_config.get("requestPayload")
headers = DEFAULT_HEADERS.copy()
# For API probes (urlProbe, POST), use neutral Accept header instead of text/html
# which can cause servers to return HTML instead of JSON
if url_probe or request_method == "POST":
headers["Accept"] = "*/*"
if site_config.get("headers"):
headers.update(site_config["headers"])
if url_probe:
print(f" urlProbe: {url_probe}")
if request_method != "GET":
print(f" requestMethod: {request_method}")
if request_payload_template:
print(f" requestPayload: {request_payload_template}")
# 2. Connectivity test
print(f"\n--- {color('2. CONNECTIVITY TEST', Colors.BOLD)} ---")
url_claimed = url_template.replace("{username}", claimed)
url_unclaimed = url_template.replace("{username}", unclaimed)
probe_claimed = probe_template.replace("{username}", claimed)
probe_unclaimed = probe_template.replace("{username}", unclaimed)
# Build payloads with username substituted
payload_claimed = None
payload_unclaimed = None
if request_payload_template and request_method == "POST":
payload_claimed = json.loads(
json.dumps(request_payload_template).replace("{username}", claimed)
)
payload_unclaimed = json.loads(
json.dumps(request_payload_template).replace("{username}", unclaimed)
)
result_claimed, result_unclaimed = await asyncio.gather(
check_url_aiohttp(url_claimed, headers),
check_url_aiohttp(url_unclaimed, headers)
check_url_aiohttp(probe_claimed, headers, method=request_method, payload=payload_claimed),
check_url_aiohttp(probe_unclaimed, headers, method=request_method, payload=payload_unclaimed)
)
print(f" Claimed ({claimed}): status={result_claimed['status']}, error={result_claimed['error']}")
@@ -523,7 +570,18 @@ async def diagnose_site(site_config: dict, site_name: str) -> dict:
diagnosis["warnings"].append(f"absenceStrs not found in unclaimed page")
print(f" {color('[WARN]', Colors.YELLOW)} absenceStrs not found in unclaimed page")
if presense_found_claimed and not absence_found_claimed and absence_found_unclaimed:
# Check works if: claimed is detected as present AND unclaimed is detected as absent.
# Presence detection: presenseStrs found (or empty = always true).
# Absence detection: absenceStrs found in unclaimed (or empty = never, rely on presenseStrs only).
# With only presenseStrs: works if found in claimed but NOT in unclaimed.
# With only absenceStrs: works if found in unclaimed but NOT in claimed.
# With both: standard combination.
claimed_is_present = presense_found_claimed and not absence_found_claimed
unclaimed_is_absent = (
(absence_strs and absence_found_unclaimed) or
(presense_strs and not presense_found_unclaimed)
)
if claimed_is_present and unclaimed_is_absent:
print(f" {color('[OK]', Colors.GREEN)} Message check should work correctly")
diagnosis["working"] = True
+64
View File
@@ -4,6 +4,7 @@ This module generates the listing of supported sites in file `SITES.md`
and pretty prints file with sites data.
"""
import sys
import socket
import requests
import logging
import threading
@@ -64,6 +65,49 @@ def get_base_domain(url):
return ""
def check_dns(domain, timeout=5):
"""Check if a domain resolves via DNS. Returns True if it resolves."""
try:
socket.setdefaulttimeout(timeout)
socket.getaddrinfo(domain, None)
return True
except (socket.gaierror, socket.timeout, OSError):
return False
def check_sites_dns(sites):
"""Check DNS resolution for all sites. Returns a set of site names that failed."""
SKIP_TLDS = ('.onion', '.i2p')
domains = {}
for site in sites:
domain = get_base_domain(site.url_main)
if domain and not any(domain.endswith(tld) for tld in SKIP_TLDS):
domains.setdefault(domain, []).append(site)
failed_sites = set()
results = {}
def resolve(domain):
results[domain] = check_dns(domain)
threads = []
for domain in domains:
t = threading.Thread(target=resolve, args=(domain,))
threads.append(t)
t.start()
for t in threads:
t.join()
for domain, resolved in results.items():
if not resolved:
for site in domains[domain]:
failed_sites.add(site.name)
logging.warning(f"DNS resolution failed for {domain}")
return failed_sites
def get_step_rank(rank):
def get_readable_rank(r):
return RANKS[str(r)]
@@ -86,6 +130,8 @@ def main():
parser.add_argument('--empty-only', help='update only sites without rating', action='store_true')
parser.add_argument('--exclude-engine', help='do not update score with certain engine',
action="append", dest="exclude_engine_list", default=[])
parser.add_argument('--dns-check', help='disable sites whose domains do not resolve via DNS',
action='store_true')
pool = list()
@@ -103,6 +149,24 @@ Rank data fetched from Majestic Million by domains.
""")
if args.dns_check:
print("Checking DNS resolution for all site domains...")
failed = check_sites_dns(sites_subset)
disabled_count = 0
re_enabled_count = 0
for site in sites_subset:
if site.name in failed:
if not site.disabled:
site.disabled = True
disabled_count += 1
print(f" Disabled {site.name}: DNS does not resolve ({get_base_domain(site.url_main)})")
else:
if site.disabled:
# Re-enable previously disabled site if DNS now resolves
# (only if it was likely disabled due to DNS failure)
pass
print(f"DNS check complete: {disabled_count} site(s) disabled, {len(failed)} domain(s) unresolvable.")
majestic_ranks = {}
if args.with_rank:
majestic_ranks = fetch_majestic_million()