Files
maigret/utils/check_top_n.py
T
Soxoj 97cc4b46d9 Improve site-check quality: fix broken site configs, add diagnostic utilities, and make self-check report-only by default with opt-in auto-disable. (#2301)
- Fix VK and TradingView checkType; add Reddit and Microsoft Learn API-style probes where appropriate; adjust or disable entries that are unreliable under anti-bot protection.
- Self-check: stop aggressive auto-disable; default to reporting issues only; add --auto-disable and --diagnose for optional fixes and deeper output.
- Tooling: add utils/site_check.py and utils/check_top_n.py (and related helpers) to inspect and rank site behavior against the top-N list.
- Scope: aligns with fixing top-traffic / high-impact sites and making diagnostics repeatable without silently flipping disabled flags.
2026-03-22 16:48:35 +01:00

481 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Mass site checking utility for Maigret development.
Check top-N sites from data.json and generate a report.
Usage:
python utils/check_top_n.py --top 100 # Check top 100 sites
python utils/check_top_n.py --top 50 --parallel 10 # Check with 10 parallel requests
python utils/check_top_n.py --top 100 --output report.json
python utils/check_top_n.py --top 100 --only-broken # Report only broken, error and timeout sites
"""
import argparse
import asyncio
import json
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Add parent dir for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
try:
import aiohttp
except ImportError:
print("aiohttp not installed. Run: pip install aiohttp")
sys.exit(1)
class Colors:
    """Namespace of ANSI escape sequences used to style console output."""

    # Bright foreground colours.
    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    CYAN = "\033[96m"
    # Attribute codes: RESET clears all styling, BOLD switches on bold text.
    RESET = "\033[0m"
    BOLD = "\033[1m"
def color(text: str, c: str) -> str:
    """Return *text* prefixed with the escape code *c* and followed by the reset code."""
    return "{}{}{}".format(c, text, Colors.RESET)
@dataclass
class SiteCheckResult:
    """Outcome of probing one site with its claimed and unclaimed usernames."""

    # Identity and configuration copied from the site's database entry.
    site_name: str
    alexa_rank: int
    disabled: bool
    check_type: str

    # Overall verdict; one of:
    # working, broken, timeout, error, anti_bot, disabled (or "unknown").
    status: str = "unknown"

    # Raw HTTP outcome of each probe (None when no response was received).
    claimed_http_status: Optional[int] = None
    unclaimed_http_status: Optional[int] = None
    claimed_error: Optional[str] = None
    unclaimed_error: Optional[str] = None

    # Problems found, softer observations, and suggested configuration fixes.
    issues: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
    recommendations: List[str] = field(default_factory=list)

    # Wall-clock duration of the check, in milliseconds.
    check_time_ms: int = 0
# Browser-like request headers sent with every probe; per-site "headers"
# entries from the site configuration are merged on top of a copy of these.
DEFAULT_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0.0.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.5",
}
async def check_url(url: str, headers: dict, timeout: int = 15) -> dict:
    """Fetch *url* once with GET and summarise the response.

    Args:
        url: Address to request; redirects are followed.
        headers: Request headers to send.
        timeout: Total time budget for the request, in seconds.

    Returns:
        A dict with the keys ``status`` (HTTP status or None), ``final_url``
        (URL after redirects or None), ``content_length`` (length of the
        decoded body), ``error`` / ``error_type`` (None on success, otherwise
        a message and one of ``"timeout"``, ``"client"``, ``"content"``,
        ``"unknown"``), ``content`` (decoded body or None) and ``markers``
        (boolean keyword heuristics computed on the lower-cased body).

    This function does not raise for request failures; they are reported
    through ``error`` / ``error_type``.
    """
    result = {
        "status": None,
        "final_url": None,
        "content_length": 0,
        "error": None,
        "error_type": None,
        "content": None,
        "markers": {},
    }
    try:
        # NOTE(review): ssl=False turns off certificate verification --
        # presumably intentional for a diagnostic probe; confirm.
        connector = aiohttp.TCPConnector(ssl=False)
        timeout_obj = aiohttp.ClientTimeout(total=timeout)
        async with aiohttp.ClientSession(connector=connector, timeout=timeout_obj) as session:
            async with session.get(url, headers=headers, allow_redirects=True) as resp:
                result["status"] = resp.status
                result["final_url"] = str(resp.url)
                try:
                    text = await resp.text()
                except asyncio.TimeoutError:
                    # The total timeout also covers reading the body. Let it
                    # reach the outer handler so it is reported as a timeout
                    # rather than as an (empty-message) content error.
                    raise
                except Exception as e:
                    # Best effort: keep status/final_url even if the body
                    # cannot be read or decoded.
                    result["error"] = f"Content error: {e}"
                    result["error_type"] = "content"
                else:
                    result["content_length"] = len(text)
                    result["content"] = text
                    text_lower = text.lower()
                    result["markers"] = {
                        "404_text": any(m in text_lower for m in ["not found", "404", "doesn't exist"]),
                        "captcha": any(m in text_lower for m in ["captcha", "recaptcha", "challenge"]),
                        "cloudflare": "cloudflare" in text_lower,
                        "login": any(m in text_lower for m in ["log in", "login", "sign in"]),
                    }
    except asyncio.TimeoutError:
        result["error"] = "Timeout"
        result["error_type"] = "timeout"
    except aiohttp.ClientError as e:
        result["error"] = str(e)
        result["error_type"] = "client"
    except Exception as e:
        # Top-level boundary for a single probe: report, never propagate.
        result["error"] = str(e)
        result["error_type"] = "unknown"
    return result
def _apply_check_type(result: "SiteCheckResult", config: dict,
                      claimed_result: dict, unclaimed_result: dict) -> None:
    """Judge two error-free probes against the site's checkType, updating *result* in place."""
    check_type = result.check_type
    claimed_status = claimed_result["status"]
    unclaimed_status = unclaimed_result["status"]
    claimed_final = claimed_result["final_url"]
    unclaimed_final = unclaimed_result["final_url"]

    if check_type == "status_code":
        if claimed_status != unclaimed_status:
            result.status = "working"
            return
        result.status = "broken"
        result.issues.append(f"Same status code ({claimed_status}) for both")
        # Status codes cannot tell the pages apart, but the redirects can.
        if claimed_final != unclaimed_final:
            result.recommendations.append("Switch to checkType: response_url")
    elif check_type == "response_url":
        if claimed_final != unclaimed_final:
            result.status = "working"
            return
        result.status = "broken"
        result.issues.append("Same final URL for both")
        if claimed_status != unclaimed_status:
            result.recommendations.append("Switch to checkType: status_code")
    elif check_type == "message":
        # "or []" also covers keys that are present but set to null.
        presense_strs = config.get("presenseStrs") or []
        absence_strs = config.get("absenceStrs") or []
        claimed_content = claimed_result.get("content") or ""
        unclaimed_content = unclaimed_result.get("content") or ""

        if presense_strs and not any(s in claimed_content for s in presense_strs):
            result.status = "broken"
            result.issues.append(f"presenseStrs not found: {presense_strs}")
            # Check if status_code would work
            if claimed_status != unclaimed_status:
                result.recommendations.append(
                    f"Switch to checkType: status_code ({claimed_status} vs {unclaimed_status})"
                )
        elif any(s in claimed_content for s in absence_strs):
            result.status = "broken"
            result.issues.append("absenceStrs found in claimed page")
        elif absence_strs and not any(s in unclaimed_content for s in absence_strs):
            result.status = "broken"
            # Recorded as an issue (not a warning) so that a site marked
            # broken always carries the reason in its issue list.
            result.issues.append("absenceStrs not found in unclaimed page")
        else:
            result.status = "working"
    else:
        result.status = "unknown"
        result.warnings.append(f"Unknown checkType: {check_type}")


async def check_site(site_name: str, config: dict, timeout: int = 15) -> SiteCheckResult:
    """Probe one site with its claimed and unclaimed usernames and classify it.

    Args:
        site_name: Name of the site entry.
        config: The site's configuration dict (keys read here: alexaRank,
            disabled, checkType, url, urlMain, urlSubpath, usernameClaimed,
            usernameUnclaimed, headers, presenseStrs, absenceStrs).
        timeout: Per-request timeout in seconds.

    Returns:
        A SiteCheckResult whose ``status`` is one of disabled, error,
        timeout, anti_bot, broken, working or unknown, with issues,
        warnings and recommendations filled in accordingly.
    """
    start_time = time.time()
    result = SiteCheckResult(
        site_name=site_name,
        alexa_rank=config.get("alexaRank", 999999),
        disabled=config.get("disabled", False),
        check_type=config.get("checkType", "status_code"),
    )

    # Skip disabled sites
    if result.disabled:
        result.status = "disabled"
        return result

    claimed = config.get("usernameClaimed")
    unclaimed = config.get("usernameUnclaimed", "noonewouldeverusethis7")
    if not claimed:
        result.status = "error"
        result.issues.append("No usernameClaimed defined")
        return result

    # Build the URL template, expanding the site-level placeholders first.
    url_template = (
        config.get("url", "")
        .replace("{urlMain}", config.get("urlMain", ""))
        .replace("{urlSubpath}", config.get("urlSubpath", ""))
    )

    # Site-specific headers override the browser-like defaults.
    headers = DEFAULT_HEADERS.copy()
    if config.get("headers"):
        headers.update(config["headers"])

    # Probe both usernames concurrently.
    try:
        claimed_result, unclaimed_result = await asyncio.gather(
            check_url(url_template.replace("{username}", claimed), headers, timeout),
            check_url(url_template.replace("{username}", unclaimed), headers, timeout),
        )
    except Exception as e:
        result.status = "error"
        result.issues.append(f"Check failed: {e}")
        result.check_time_ms = int((time.time() - start_time) * 1000)
        return result

    result.claimed_http_status = claimed_result["status"]
    result.unclaimed_http_status = unclaimed_result["status"]
    result.claimed_error = claimed_result.get("error")
    result.unclaimed_error = unclaimed_result.get("error")

    claimed_markers = claimed_result.get("markers", {})

    # Categorize: transport-level and anti-bot outcomes take precedence over
    # the checkType comparison, which is only meaningful for clean responses.
    if "timeout" in (claimed_result["error_type"], unclaimed_result["error_type"]):
        result.status = "timeout"
        result.issues.append("Request timeout")
    elif claimed_result["status"] in (403, 429):
        result.status = "anti_bot"
        result.issues.append(f"Anti-bot protection (HTTP {claimed_result['status']})")
    elif claimed_markers.get("captcha"):
        result.status = "anti_bot"
        result.issues.append("Captcha detected")
    elif claimed_markers.get("cloudflare"):
        result.status = "anti_bot"
        result.warnings.append("Cloudflare protection detected")
    elif claimed_result["error"] or unclaimed_result["error"]:
        result.status = "error"
        if claimed_result["error"]:
            result.issues.append(f"Claimed error: {claimed_result['error']}")
        if unclaimed_result["error"]:
            result.issues.append(f"Unclaimed error: {unclaimed_result['error']}")
    else:
        _apply_check_type(result, config, claimed_result, unclaimed_result)

    result.check_time_ms = int((time.time() - start_time) * 1000)
    return result
def load_sites(db_path: Path) -> Dict[str, dict]:
    """Load the site database and return its ``"sites"`` mapping.

    Args:
        db_path: Path to the JSON database file.

    Returns:
        The value stored under the top-level ``"sites"`` key, or an empty
        dict when that key is absent.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Decode explicitly as UTF-8 so non-ASCII content loads the same way
    # regardless of the platform's locale encoding.
    with open(db_path, encoding="utf-8") as f:
        data = json.load(f)
    return data.get("sites", {})
def get_top_sites(sites: Dict[str, dict], n: int) -> List[Tuple[str, dict]]:
    """Return the first *n* ``(name, config)`` pairs ordered by ascending alexaRank.

    Entries without an ``alexaRank`` sort as rank 999999; entries with equal
    rank keep the order in which they appear in *sites*.
    """
    by_rank = sorted(
        sites.items(),
        key=lambda entry: entry[1].get("alexaRank", 999999),
    )
    return by_rank[:n]
async def check_sites_batch(sites: List[Tuple[str, dict]], parallel: int = 5,
                            timeout: int = 15, progress_callback=None) -> List[SiteCheckResult]:
    """Check several sites concurrently, at most *parallel* at a time.

    Args:
        sites: ``(name, config)`` pairs to check.
        parallel: Maximum number of sites checked at once; values below 1
            are treated as 1.
        timeout: Per-request timeout in seconds, passed to ``check_site``.
        progress_callback: Optional callable invoked as
            ``progress_callback(completed, total, site_name)`` each time a
            site finishes, where *completed* runs from 1 to ``len(sites)``.

    Returns:
        One result per input site, in the same order as *sites*.
    """
    total = len(sites)
    # Semaphore(0) would block forever and a negative value raises.
    semaphore = asyncio.Semaphore(max(1, parallel))
    completed = 0

    async def check_with_semaphore(name, config):
        nonlocal completed
        async with semaphore:
            site_result = await check_site(name, config, timeout)
        # Report finished work so the indicator ends at total/total.
        completed += 1
        if progress_callback:
            progress_callback(completed, total, name)
        return site_result

    tasks = [check_with_semaphore(name, config) for name, config in sites]
    return list(await asyncio.gather(*tasks))
def print_progress(current: int, total: int, site_name: str):
    """Redraw a single-line progress bar on stdout.

    Args:
        current: Number of items processed so far.
        total: Total number of items; 0 is rendered as an empty bar at 0%.
        site_name: Label shown after the counters (padded to 30 columns).
    """
    bar_width = 30
    if total > 0:
        # Integer arithmetic avoids float truncation (e.g. 29/100 -> 28%).
        pct = current * 100 // total
        filled = bar_width * current // total
    else:
        pct = 0
        filled = 0
    # Filled cells followed by empty cells, always bar_width characters wide.
    bar = "█" * filled + "░" * (bar_width - filled)
    # "\r" returns to the start of the line so each call overwrites the last.
    print(f"\r[{bar}] {pct:3d}% ({current}/{total}) {site_name:<30}", end="", flush=True)
def generate_report(results: List[SiteCheckResult]) -> dict:
    """Aggregate individual check results into a summary report.

    The returned dict holds ``summary`` (total plus a count per status),
    ``by_status`` (site names grouped by status), ``issues`` and
    ``recommendations`` (one entry per site that has any, with its rank).
    """
    known_statuses = (
        "working", "broken", "disabled", "timeout", "anti_bot", "error", "unknown",
    )
    summary = {"total": len(results)}
    # Pre-seed every known status so the counters exist even when zero.
    summary.update(dict.fromkeys(known_statuses, 0))

    by_status = defaultdict(list)
    issues = []
    recommendations = []

    for item in results:
        summary[item.status] = summary.get(item.status, 0) + 1
        by_status[item.status].append(item.site_name)
        if item.issues:
            issues.append({
                "site": item.site_name,
                "rank": item.alexa_rank,
                "issues": item.issues,
            })
        if item.recommendations:
            recommendations.append({
                "site": item.site_name,
                "rank": item.alexa_rank,
                "recommendations": item.recommendations,
            })

    return {
        "summary": summary,
        "by_status": by_status,
        "issues": issues,
        "recommendations": recommendations,
    }
def print_report(report: dict, results: List[SiteCheckResult]):
    """Print a human-readable version of *report* to stdout.

    Args:
        report: Report dict with the keys ``summary``, ``by_status`` and
            ``recommendations`` as produced by ``generate_report``.
        results: The check results the report was built from; used to look
            up each listed site's rank and reasons.
    """
    summary = report["summary"]
    by_status = report["by_status"]

    # Index results once instead of scanning the list for every printed
    # site; setdefault keeps the first result if a name ever repeats.
    by_name = {}
    for res in results:
        by_name.setdefault(res.site_name, res)

    def reasons(site: str) -> str:
        # Some verdicts are recorded only as warnings (e.g. Cloudflare
        # detection); fall back to them so the reason is never blank.
        res = by_name[site]
        return ", ".join(res.issues or res.warnings)

    def print_overflow(names: list, limit: int) -> None:
        if len(names) > limit:
            print(f" ... and {len(names) - limit} more")

    print(f"\n{'='*60}")
    print(f"{color('SITE CHECK REPORT', Colors.CYAN)}")
    print(f"{'='*60}\n")
    print(f"{color('SUMMARY:', Colors.BOLD)}")
    print(f" Total sites checked: {summary['total']}")
    print(f" {color('Working:', Colors.GREEN)} {summary['working']}")
    print(f" {color('Broken:', Colors.RED)} {summary['broken']}")
    print(f" {color('Disabled:', Colors.YELLOW)} {summary['disabled']}")
    print(f" {color('Timeout:', Colors.YELLOW)} {summary['timeout']}")
    print(f" {color('Anti-bot:', Colors.YELLOW)} {summary['anti_bot']}")
    print(f" {color('Error:', Colors.RED)} {summary['error']}")

    # Broken sites (.get tolerates a plain dict, e.g. a report reloaded from JSON)
    broken = by_status.get("broken", [])
    if broken:
        print(f"\n{color('BROKEN SITES:', Colors.RED)}")
        for site in broken[:20]:
            print(f" - {site} (rank {by_name[site].alexa_rank}): {reasons(site)}")
        print_overflow(broken, 20)

    # Timeout sites
    timed_out = by_status.get("timeout", [])
    if timed_out:
        print(f"\n{color('TIMEOUT SITES:', Colors.YELLOW)}")
        for site in timed_out[:10]:
            print(f" - {site}")
        print_overflow(timed_out, 10)

    # Anti-bot sites
    protected = by_status.get("anti_bot", [])
    if protected:
        print(f"\n{color('ANTI-BOT PROTECTED:', Colors.YELLOW)}")
        for site in protected[:10]:
            print(f" - {site}: {reasons(site)}")
        print_overflow(protected, 10)

    # Recommendations
    recommendations = report["recommendations"]
    if recommendations:
        print(f"\n{color('RECOMMENDATIONS:', Colors.CYAN)}")
        for rec in recommendations[:15]:
            print(f" {rec['site']} (rank {rec['rank']}):")
            for suggestion in rec["recommendations"]:
                print(f" -> {suggestion}")
        print_overflow(recommendations, 15)
async def main():
    """Command-line entry point: check the top-N sites and report the outcome.

    Parses the command line, loads the site database located relative to
    this file, checks the selected sites, then prints either a formatted
    report or JSON, and optionally writes the JSON report to a file.
    Exits with status 1 when the database file is missing, and with an
    argparse usage error when --top or --parallel is below 1.
    """
    parser = argparse.ArgumentParser(
        description="Mass site checking for Maigret",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--top", "-n", type=int, default=100,
                        help="Check top N sites by Alexa rank (default: 100)")
    parser.add_argument("--parallel", "-p", type=int, default=5,
                        help="Number of parallel requests (default: 5)")
    parser.add_argument("--timeout", "-t", type=int, default=15,
                        help="Request timeout in seconds (default: 15)")
    parser.add_argument("--output", "-o", help="Output JSON report to file")
    parser.add_argument("--include-disabled", action="store_true",
                        help="Include disabled sites in results")
    parser.add_argument("--only-broken", action="store_true",
                        help="Only show broken sites")
    parser.add_argument("--json", action="store_true",
                        help="Output as JSON only")
    args = parser.parse_args()

    # Reject values that cannot produce a meaningful run: a non-positive
    # --parallel would leave no slot for any check to start.
    if args.top < 1:
        parser.error("--top must be at least 1")
    if args.parallel < 1:
        parser.error("--parallel must be at least 1")

    # Load sites
    db_path = Path(__file__).parent.parent / "maigret" / "resources" / "data.json"
    if not db_path.exists():
        print(f"Database not found: {db_path}", file=sys.stderr)
        sys.exit(1)
    sites = load_sites(db_path)
    top_sites = get_top_sites(sites, args.top)

    if not args.json:
        print(f"Checking top {len(top_sites)} sites (parallel={args.parallel}, timeout={args.timeout}s)...")
        print()

    # Run checks (no progress bar in JSON mode, to keep stdout parseable)
    progress = print_progress if not args.json else None
    results = await check_sites_batch(top_sites, args.parallel, args.timeout, progress)
    if not args.json:
        print()  # Clear progress line

    # Filter results
    if not args.include_disabled:
        results = [r for r in results if r.status != "disabled"]
    if args.only_broken:
        results = [r for r in results if r.status in ("broken", "error", "timeout")]

    # Generate report
    report = generate_report(results)
    output = {
        "report": report,
        "results": [asdict(r) for r in results],
    }

    # Output
    if args.json:
        print(json.dumps(output, indent=2))
    else:
        print_report(report, results)

    # Save to file
    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(output, f, indent=2)
        # In JSON mode stdout must contain only the JSON document.
        print(f"\nReport saved to: {args.output}",
              file=sys.stderr if args.json else sys.stdout)
# Run the async command-line entry point only when this file is executed as
# a script, not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())