Mirror of https://github.com/soxoj/maigret.git, synced 2026-05-06 22:19:01 +00:00
Commit c9ab9d676b
- Fix VK and TradingView checkType; add Reddit and Microsoft Learn API-style probes where appropriate; adjust or disable entries that are unreliable under anti-bot protection.
- Self-check: stop aggressive auto-disable; default to reporting issues only; add --auto-disable and --diagnose for optional fixes and deeper output.
- Tooling: add utils/site_check.py and utils/check_top_n.py (and related helpers) to inspect and rank site behavior against the top-N list.
- Scope: aligns with fixing top-traffic / high-impact sites and making diagnostics repeatable without silently flipping disabled flags.
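A minimal sketch of the revised self-check flow described above. The --auto-disable and --diagnose flag names come from the commit message; how they combine with the existing maigret self-check entry point is assumed here, not confirmed:

    maigret --self-check                            # new default: report issues only, change nothing
    maigret --self-check --auto-disable --diagnose  # optionally flip disabled flags, deeper output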
481 lines
17 KiB
Python
#!/usr/bin/env python3
"""
Mass site checking utility for Maigret development.
Check top-N sites from data.json and generate a report.

Usage:
    python utils/check_top_n.py --top 100               # Check top 100 sites
    python utils/check_top_n.py --top 50 --parallel 10  # Check with 10 parallel requests
    python utils/check_top_n.py --top 100 --output report.json
"""

import argparse
import asyncio
import json
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# Add parent dir for imports
sys.path.insert(0, str(Path(__file__).parent.parent))

try:
    import aiohttp
except ImportError:
    print("aiohttp not installed. Run: pip install aiohttp")
    sys.exit(1)


class Colors:
    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    CYAN = "\033[96m"
    RESET = "\033[0m"
    BOLD = "\033[1m"


def color(text: str, c: str) -> str:
    return f"{c}{text}{Colors.RESET}"


@dataclass
class SiteCheckResult:
    """Result of checking a single site."""
    site_name: str
    alexa_rank: int
    disabled: bool
    check_type: str

    # Status
    status: str = "unknown"  # working, broken, timeout, error, anti_bot, disabled

    # HTTP results
    claimed_http_status: Optional[int] = None
    unclaimed_http_status: Optional[int] = None
    claimed_error: Optional[str] = None
    unclaimed_error: Optional[str] = None

    # Issues detected
    issues: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)

    # Recommendations
    recommendations: List[str] = field(default_factory=list)

    # Timing
    check_time_ms: int = 0


DEFAULT_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.5",
}

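# The browser-like defaults above reduce trivial bot blocking; check_site()
# merges any per-site "headers" from data.json over them, so site-specific
# overrides win.

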
async def check_url(url: str, headers: dict, timeout: int = 15) -> dict:
    """Quick URL check returning status and basic info."""
    result = {
        "status": None,
        "final_url": None,
        "content_length": 0,
        "error": None,
        "error_type": None,
        "content": None,
        "markers": {},
    }

    try:
        connector = aiohttp.TCPConnector(ssl=False)
        timeout_obj = aiohttp.ClientTimeout(total=timeout)

        async with aiohttp.ClientSession(connector=connector, timeout=timeout_obj) as session:
            async with session.get(url, headers=headers, allow_redirects=True) as resp:
                result["status"] = resp.status
                result["final_url"] = str(resp.url)

                try:
                    text = await resp.text()
                    result["content_length"] = len(text)
                    result["content"] = text

                    text_lower = text.lower()
                    result["markers"] = {
                        "404_text": any(m in text_lower for m in ["not found", "404", "doesn't exist"]),
                        "captcha": any(m in text_lower for m in ["captcha", "recaptcha", "challenge"]),
                        "cloudflare": "cloudflare" in text_lower,
                        "login": any(m in text_lower for m in ["log in", "login", "sign in"]),
                    }
                except Exception as e:
                    result["error"] = f"Content error: {e}"
                    result["error_type"] = "content"

    except asyncio.TimeoutError:
        result["error"] = "Timeout"
        result["error_type"] = "timeout"
    except aiohttp.ClientError as e:
        result["error"] = str(e)
        result["error_type"] = "client"
    except Exception as e:
        result["error"] = str(e)
        result["error_type"] = "unknown"

    return result

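# Shape sketch of check_url()'s return value for a reachable page (values are
# illustrative, not from a real probe):
#
#   {"status": 200, "final_url": "https://example.com/alice",
#    "content_length": 51234, "error": None, "error_type": None,
#    "content": "<html>...", "markers": {"404_text": False, "captcha": False,
#    "cloudflare": False, "login": True}}

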
async def check_site(site_name: str, config: dict, timeout: int = 15) -> SiteCheckResult:
    """Check a single site and return detailed result."""
    start_time = time.time()

    result = SiteCheckResult(
        site_name=site_name,
        alexa_rank=config.get("alexaRank", 999999),
        disabled=config.get("disabled", False),
        check_type=config.get("checkType", "status_code"),
    )

    # Skip disabled sites
    if result.disabled:
        result.status = "disabled"
        return result

    # Build URL
    url_template = config.get("url", "")
    url_main = config.get("urlMain", "")
    url_subpath = config.get("urlSubpath", "")
    url_template = url_template.replace("{urlMain}", url_main).replace("{urlSubpath}", url_subpath)

    claimed = config.get("usernameClaimed")
    unclaimed = config.get("usernameUnclaimed", "noonewouldeverusethis7")

    if not claimed:
        result.status = "error"
        result.issues.append("No usernameClaimed defined")
        return result

    # Prepare headers
    headers = DEFAULT_HEADERS.copy()
    if config.get("headers"):
        headers.update(config["headers"])

    # Check both URLs
    url_claimed = url_template.replace("{username}", claimed)
    url_unclaimed = url_template.replace("{username}", unclaimed)

    try:
        claimed_result, unclaimed_result = await asyncio.gather(
            check_url(url_claimed, headers, timeout),
            check_url(url_unclaimed, headers, timeout),
        )
    except Exception as e:
        result.status = "error"
        result.issues.append(f"Check failed: {e}")
        return result

    result.claimed_http_status = claimed_result["status"]
    result.unclaimed_http_status = unclaimed_result["status"]
    result.claimed_error = claimed_result.get("error")
    result.unclaimed_error = unclaimed_result.get("error")

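    # Triage order matters here: timeouts are reported first, then anti-bot
    # signals (403/429, captcha, Cloudflare markers), then transport errors;
    # only clean responses reach the per-checkType validation below.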
    # Categorize result
    if claimed_result["error_type"] == "timeout" or unclaimed_result["error_type"] == "timeout":
        result.status = "timeout"
        result.issues.append("Request timeout")

    elif claimed_result["status"] == 403 or claimed_result["status"] == 429:
        result.status = "anti_bot"
        result.issues.append(f"Anti-bot protection (HTTP {claimed_result['status']})")

    elif claimed_result.get("markers", {}).get("captcha"):
        result.status = "anti_bot"
        result.issues.append("Captcha detected")

    elif claimed_result.get("markers", {}).get("cloudflare"):
        result.status = "anti_bot"
        result.warnings.append("Cloudflare protection detected")

    elif claimed_result["error"] or unclaimed_result["error"]:
        result.status = "error"
        if claimed_result["error"]:
            result.issues.append(f"Claimed error: {claimed_result['error']}")
        if unclaimed_result["error"]:
            result.issues.append(f"Unclaimed error: {unclaimed_result['error']}")

    else:
        # Validate check type
        check_type = config.get("checkType", "status_code")

        if check_type == "status_code":
            if claimed_result["status"] == unclaimed_result["status"]:
                result.status = "broken"
                result.issues.append(f"Same status code ({claimed_result['status']}) for both")
                # Suggest fix
                if claimed_result["final_url"] != unclaimed_result["final_url"]:
                    result.recommendations.append("Switch to checkType: response_url")
            else:
                result.status = "working"

        elif check_type == "response_url":
            if claimed_result["final_url"] == unclaimed_result["final_url"]:
                result.status = "broken"
                result.issues.append("Same final URL for both")
                if claimed_result["status"] != unclaimed_result["status"]:
                    result.recommendations.append("Switch to checkType: status_code")
            else:
                result.status = "working"

        elif check_type == "message":
            # "presense" is the historical spelling used in data.json.
            presense_strs = config.get("presenseStrs", [])
            absence_strs = config.get("absenceStrs", [])

            claimed_content = claimed_result.get("content", "") or ""
            unclaimed_content = unclaimed_result.get("content", "") or ""

            presense_ok = not presense_strs or any(s in claimed_content for s in presense_strs)
            absence_claimed = absence_strs and any(s in claimed_content for s in absence_strs)
            absence_unclaimed = absence_strs and any(s in unclaimed_content for s in absence_strs)

            if presense_strs and not presense_ok:
                result.status = "broken"
                result.issues.append(f"presenseStrs not found: {presense_strs}")
                # Check if status_code would work
                if claimed_result["status"] != unclaimed_result["status"]:
                    result.recommendations.append(f"Switch to checkType: status_code ({claimed_result['status']} vs {unclaimed_result['status']})")
            elif absence_claimed:
                result.status = "broken"
                result.issues.append("absenceStrs found in claimed page")
            elif absence_strs and not absence_unclaimed:
                result.status = "broken"
                result.issues.append("absenceStrs not found in unclaimed page")
            else:
                result.status = "working"

        else:
            result.status = "unknown"
            result.warnings.append(f"Unknown checkType: {check_type}")

    result.check_time_ms = int((time.time() - start_time) * 1000)
    return result

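# For reference, a minimal data.json entry exercising the "message" check type
# (field names match those read above, including the historical "presense"
# spelling; the site itself is hypothetical):
#
#   "ExampleSite": {
#       "url": "{urlMain}/user/{username}",
#       "urlMain": "https://example.com",
#       "usernameClaimed": "alice",
#       "usernameUnclaimed": "noonewouldeverusethis7",
#       "checkType": "message",
#       "presenseStrs": ["profile-header"],
#       "absenceStrs": ["Page not found"],
#       "alexaRank": 1234
#   }

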
def load_sites(db_path: Path) -> Dict[str, dict]:
    """Load all sites from data.json."""
    with open(db_path, encoding="utf-8") as f:
        data = json.load(f)
    return data.get("sites", {})


def get_top_sites(sites: Dict[str, dict], n: int) -> List[Tuple[str, dict]]:
    """Get top N sites by Alexa rank."""
    ranked = []
    for name, config in sites.items():
        rank = config.get("alexaRank", 999999)
        ranked.append((name, config, rank))

    ranked.sort(key=lambda x: x[2])
    return [(name, config) for name, config, _ in ranked[:n]]


async def check_sites_batch(sites: List[Tuple[str, dict]], parallel: int = 5,
                            timeout: int = 15, progress_callback=None) -> List[SiteCheckResult]:
    """Check multiple sites with parallelism control."""
    semaphore = asyncio.Semaphore(parallel)

    async def check_with_semaphore(name, config, index):
        async with semaphore:
            if progress_callback:
                # Report 1-based progress so the final update reads (N/N)
                progress_callback(index + 1, len(sites), name)
            return await check_site(name, config, timeout)

    tasks = [
        check_with_semaphore(name, config, i)
        for i, (name, config) in enumerate(sites)
    ]

    results = await asyncio.gather(*tasks)
    return results

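# Concurrency note: with parallel=5 at most five check_site() coroutines run
# at once, and asyncio.gather() preserves input order, so results stay aligned
# with the rank-sorted site list regardless of completion order.

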
def print_progress(current: int, total: int, site_name: str):
    """Print progress indicator."""
    pct = int(current / total * 100)
    bar_width = 30
    filled = int(bar_width * current / total)
    bar = "█" * filled + "░" * (bar_width - filled)
    print(f"\r[{bar}] {pct:3d}% ({current}/{total}) {site_name:<30}", end="", flush=True)


def generate_report(results: List[SiteCheckResult]) -> dict:
    """Generate a summary report from check results."""
    report = {
        "summary": {
            "total": len(results),
            "working": 0,
            "broken": 0,
            "disabled": 0,
            "timeout": 0,
            "anti_bot": 0,
            "error": 0,
            "unknown": 0,
        },
        "by_status": defaultdict(list),
        "issues": [],
        "recommendations": [],
    }

    for r in results:
        report["summary"][r.status] = report["summary"].get(r.status, 0) + 1
        report["by_status"][r.status].append(r.site_name)

        if r.issues:
            report["issues"].append({
                "site": r.site_name,
                "rank": r.alexa_rank,
                "issues": r.issues,
            })

        if r.recommendations:
            report["recommendations"].append({
                "site": r.site_name,
                "rank": r.alexa_rank,
                "recommendations": r.recommendations,
            })

    return report

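# Shape sketch of the generated report (counts and names are illustrative):
#
#   {"summary": {"total": 100, "working": 62, "broken": 21, ...},
#    "by_status": {"working": ["GitHub", ...], "broken": [...]},
#    "issues": [{"site": "...", "rank": 3, "issues": ["Same final URL for both"]}],
#    "recommendations": [{"site": "...", "rank": 9, "recommendations": ["..."]}]}

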
def print_report(report: dict, results: List[SiteCheckResult]):
    """Print a formatted report to console."""
    summary = report["summary"]

    print(f"\n{'='*60}")
    print(f"{color('SITE CHECK REPORT', Colors.CYAN)}")
    print(f"{'='*60}\n")

    print(f"{color('SUMMARY:', Colors.BOLD)}")
    print(f"  Total sites checked: {summary['total']}")
    print(f"  {color('Working:', Colors.GREEN)} {summary['working']}")
    print(f"  {color('Broken:', Colors.RED)} {summary['broken']}")
    print(f"  {color('Disabled:', Colors.YELLOW)} {summary['disabled']}")
    print(f"  {color('Timeout:', Colors.YELLOW)} {summary['timeout']}")
    print(f"  {color('Anti-bot:', Colors.YELLOW)} {summary['anti_bot']}")
    print(f"  {color('Error:', Colors.RED)} {summary['error']}")

    # Broken sites
    if report["by_status"]["broken"]:
        print(f"\n{color('BROKEN SITES:', Colors.RED)}")
        for site in report["by_status"]["broken"][:20]:
            r = next(x for x in results if x.site_name == site)
            print(f"  - {site} (rank {r.alexa_rank}): {', '.join(r.issues)}")
        if len(report["by_status"]["broken"]) > 20:
            print(f"  ... and {len(report['by_status']['broken']) - 20} more")

    # Timeout sites
    if report["by_status"]["timeout"]:
        print(f"\n{color('TIMEOUT SITES:', Colors.YELLOW)}")
        for site in report["by_status"]["timeout"][:10]:
            print(f"  - {site}")
        if len(report["by_status"]["timeout"]) > 10:
            print(f"  ... and {len(report['by_status']['timeout']) - 10} more")

    # Anti-bot sites
    if report["by_status"]["anti_bot"]:
        print(f"\n{color('ANTI-BOT PROTECTED:', Colors.YELLOW)}")
        for site in report["by_status"]["anti_bot"][:10]:
            r = next(x for x in results if x.site_name == site)
            print(f"  - {site}: {', '.join(r.issues)}")
        if len(report["by_status"]["anti_bot"]) > 10:
            print(f"  ... and {len(report['by_status']['anti_bot']) - 10} more")

    # Recommendations
    if report["recommendations"]:
        print(f"\n{color('RECOMMENDATIONS:', Colors.CYAN)}")
        for rec in report["recommendations"][:15]:
            print(f"  {rec['site']} (rank {rec['rank']}):")
            for r in rec["recommendations"]:
                print(f"    -> {r}")
        if len(report["recommendations"]) > 15:
            print(f"  ... and {len(report['recommendations']) - 15} more")


async def main():
    parser = argparse.ArgumentParser(
        description="Mass site checking for Maigret",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--top", "-n", type=int, default=100,
                        help="Check top N sites by Alexa rank (default: 100)")
    parser.add_argument("--parallel", "-p", type=int, default=5,
                        help="Number of parallel requests (default: 5)")
    parser.add_argument("--timeout", "-t", type=int, default=15,
                        help="Request timeout in seconds (default: 15)")
    parser.add_argument("--output", "-o", help="Output JSON report to file")
    parser.add_argument("--include-disabled", action="store_true",
                        help="Include disabled sites in results")
    parser.add_argument("--only-broken", action="store_true",
                        help="Only show broken sites")
    parser.add_argument("--json", action="store_true",
                        help="Output as JSON only")

    args = parser.parse_args()

    # Load sites
    db_path = Path(__file__).parent.parent / "maigret" / "resources" / "data.json"
    if not db_path.exists():
        print(f"Database not found: {db_path}")
        sys.exit(1)

    sites = load_sites(db_path)
    top_sites = get_top_sites(sites, args.top)

    if not args.json:
        print(f"Checking top {len(top_sites)} sites (parallel={args.parallel}, timeout={args.timeout}s)...")
        print()

    # Run checks
    progress = print_progress if not args.json else None
    results = await check_sites_batch(top_sites, args.parallel, args.timeout, progress)

    if not args.json:
        print()  # Clear progress line

    # Filter results
    if not args.include_disabled:
        results = [r for r in results if r.status != "disabled"]
    if args.only_broken:
        results = [r for r in results if r.status in ("broken", "error", "timeout")]

    # Generate report
    report = generate_report(results)

    # Output
    if args.json:
        output = {
            "report": report,
            "results": [asdict(r) for r in results],
        }
        print(json.dumps(output, indent=2))
    else:
        print_report(report, results)

    # Save to file
    if args.output:
        output = {
            "report": report,
            "results": [asdict(r) for r in results],
        }
        with open(args.output, "w") as f:
            json.dump(output, f, indent=2)
        print(f"\nReport saved to: {args.output}")


if __name__ == "__main__":
    asyncio.run(main())