Improve site-check quality: fix broken site configs, add diagnostic utilities, and make self-check report-only by default with opt-in auto-disable. (#2301)

- Fix VK and TradingView checkType; add Reddit and Microsoft Learn API-style probes where appropriate; adjust or disable entries that are unreliable under anti-bot protection.
- Self-check: stop aggressive auto-disable; default to reporting issues only; add --auto-disable and --diagnose for optional fixes and deeper output.
- Tooling: add utils/site_check.py and utils/check_top_n.py (and related helpers) to inspect and rank site behavior against the top-N list
- Scope: aligns with fixing top-traffic / high-impact sites and making diagnostics repeatable without silently flipping disabled flags
This commit is contained in:
Soxoj
2026-03-22 16:48:35 +01:00
committed by Soxoj
parent 4784ecdacc
commit c9ab9d676b
14 changed files with 1959 additions and 65 deletions
+480
View File
@@ -0,0 +1,480 @@
#!/usr/bin/env python3
"""
Mass site checking utility for Maigret development.
Check top-N sites from data.json and generate a report.
Usage:
python utils/check_top_n.py --top 100 # Check top 100 sites
python utils/check_top_n.py --top 50 --parallel 10 # Check with 10 parallel requests
python utils/check_top_n.py --top 100 --output report.json
python utils/check_top_n.py --top 100 --fix # Auto-fix simple issues
"""
import argparse
import asyncio
import json
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Add parent dir for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
try:
import aiohttp
except ImportError:
print("aiohttp not installed. Run: pip install aiohttp")
sys.exit(1)
class Colors:
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
CYAN = "\033[96m"
RESET = "\033[0m"
BOLD = "\033[1m"
def color(text: str, c: str) -> str:
return f"{c}{text}{Colors.RESET}"
@dataclass
class SiteCheckResult:
"""Result of checking a single site."""
site_name: str
alexa_rank: int
disabled: bool
check_type: str
# Status
status: str = "unknown" # working, broken, timeout, error, anti_bot, disabled
# HTTP results
claimed_http_status: Optional[int] = None
unclaimed_http_status: Optional[int] = None
claimed_error: Optional[str] = None
unclaimed_error: Optional[str] = None
# Issues detected
issues: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
# Recommendations
recommendations: List[str] = field(default_factory=list)
# Timing
check_time_ms: int = 0
DEFAULT_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
}
async def check_url(url: str, headers: dict, timeout: int = 15) -> dict:
"""Quick URL check returning status and basic info."""
result = {
"status": None,
"final_url": None,
"content_length": 0,
"error": None,
"error_type": None,
"content": None,
"markers": {},
}
try:
connector = aiohttp.TCPConnector(ssl=False)
timeout_obj = aiohttp.ClientTimeout(total=timeout)
async with aiohttp.ClientSession(connector=connector, timeout=timeout_obj) as session:
async with session.get(url, headers=headers, allow_redirects=True) as resp:
result["status"] = resp.status
result["final_url"] = str(resp.url)
try:
text = await resp.text()
result["content_length"] = len(text)
result["content"] = text
text_lower = text.lower()
result["markers"] = {
"404_text": any(m in text_lower for m in ["not found", "404", "doesn't exist"]),
"captcha": any(m in text_lower for m in ["captcha", "recaptcha", "challenge"]),
"cloudflare": "cloudflare" in text_lower,
"login": any(m in text_lower for m in ["log in", "login", "sign in"]),
}
except Exception as e:
result["error"] = f"Content error: {e}"
result["error_type"] = "content"
except asyncio.TimeoutError:
result["error"] = "Timeout"
result["error_type"] = "timeout"
except aiohttp.ClientError as e:
result["error"] = str(e)
result["error_type"] = "client"
except Exception as e:
result["error"] = str(e)
result["error_type"] = "unknown"
return result
async def check_site(site_name: str, config: dict, timeout: int = 15) -> SiteCheckResult:
"""Check a single site and return detailed result."""
start_time = time.time()
result = SiteCheckResult(
site_name=site_name,
alexa_rank=config.get("alexaRank", 999999),
disabled=config.get("disabled", False),
check_type=config.get("checkType", "status_code"),
)
# Skip disabled sites
if result.disabled:
result.status = "disabled"
return result
# Build URL
url_template = config.get("url", "")
url_main = config.get("urlMain", "")
url_subpath = config.get("urlSubpath", "")
url_template = url_template.replace("{urlMain}", url_main).replace("{urlSubpath}", url_subpath)
claimed = config.get("usernameClaimed")
unclaimed = config.get("usernameUnclaimed", "noonewouldeverusethis7")
if not claimed:
result.status = "error"
result.issues.append("No usernameClaimed defined")
return result
# Prepare headers
headers = DEFAULT_HEADERS.copy()
if config.get("headers"):
headers.update(config["headers"])
# Check both URLs
url_claimed = url_template.replace("{username}", claimed)
url_unclaimed = url_template.replace("{username}", unclaimed)
try:
claimed_result, unclaimed_result = await asyncio.gather(
check_url(url_claimed, headers, timeout),
check_url(url_unclaimed, headers, timeout),
)
except Exception as e:
result.status = "error"
result.issues.append(f"Check failed: {e}")
return result
result.claimed_http_status = claimed_result["status"]
result.unclaimed_http_status = unclaimed_result["status"]
result.claimed_error = claimed_result.get("error")
result.unclaimed_error = unclaimed_result.get("error")
# Categorize result
if claimed_result["error_type"] == "timeout" or unclaimed_result["error_type"] == "timeout":
result.status = "timeout"
result.issues.append("Request timeout")
elif claimed_result["status"] == 403 or claimed_result["status"] == 429:
result.status = "anti_bot"
result.issues.append(f"Anti-bot protection (HTTP {claimed_result['status']})")
elif claimed_result.get("markers", {}).get("captcha"):
result.status = "anti_bot"
result.issues.append("Captcha detected")
elif claimed_result.get("markers", {}).get("cloudflare"):
result.status = "anti_bot"
result.warnings.append("Cloudflare protection detected")
elif claimed_result["error"] or unclaimed_result["error"]:
result.status = "error"
if claimed_result["error"]:
result.issues.append(f"Claimed error: {claimed_result['error']}")
if unclaimed_result["error"]:
result.issues.append(f"Unclaimed error: {unclaimed_result['error']}")
else:
# Validate check type
check_type = config.get("checkType", "status_code")
if check_type == "status_code":
if claimed_result["status"] == unclaimed_result["status"]:
result.status = "broken"
result.issues.append(f"Same status code ({claimed_result['status']}) for both")
# Suggest fix
if claimed_result["final_url"] != unclaimed_result["final_url"]:
result.recommendations.append("Switch to checkType: response_url")
else:
result.status = "working"
elif check_type == "response_url":
if claimed_result["final_url"] == unclaimed_result["final_url"]:
result.status = "broken"
result.issues.append("Same final URL for both")
if claimed_result["status"] != unclaimed_result["status"]:
result.recommendations.append("Switch to checkType: status_code")
else:
result.status = "working"
elif check_type == "message":
presense_strs = config.get("presenseStrs", [])
absence_strs = config.get("absenceStrs", [])
claimed_content = claimed_result.get("content", "") or ""
unclaimed_content = unclaimed_result.get("content", "") or ""
presense_ok = not presense_strs or any(s in claimed_content for s in presense_strs)
absence_claimed = absence_strs and any(s in claimed_content for s in absence_strs)
absence_unclaimed = absence_strs and any(s in unclaimed_content for s in absence_strs)
if presense_strs and not presense_ok:
result.status = "broken"
result.issues.append(f"presenseStrs not found: {presense_strs}")
# Check if status_code would work
if claimed_result["status"] != unclaimed_result["status"]:
result.recommendations.append(f"Switch to checkType: status_code ({claimed_result['status']} vs {unclaimed_result['status']})")
elif absence_claimed:
result.status = "broken"
result.issues.append(f"absenceStrs found in claimed page")
elif absence_strs and not absence_unclaimed:
result.status = "broken"
result.warnings.append("absenceStrs not found in unclaimed page")
else:
result.status = "working"
else:
result.status = "unknown"
result.warnings.append(f"Unknown checkType: {check_type}")
result.check_time_ms = int((time.time() - start_time) * 1000)
return result
def load_sites(db_path: Path) -> Dict[str, dict]:
"""Load all sites from data.json."""
with open(db_path) as f:
data = json.load(f)
return data.get("sites", {})
def get_top_sites(sites: Dict[str, dict], n: int) -> List[Tuple[str, dict]]:
"""Get top N sites by Alexa rank."""
ranked = []
for name, config in sites.items():
rank = config.get("alexaRank", 999999)
ranked.append((name, config, rank))
ranked.sort(key=lambda x: x[2])
return [(name, config) for name, config, _ in ranked[:n]]
async def check_sites_batch(sites: List[Tuple[str, dict]], parallel: int = 5,
timeout: int = 15, progress_callback=None) -> List[SiteCheckResult]:
"""Check multiple sites with parallelism control."""
results = []
semaphore = asyncio.Semaphore(parallel)
async def check_with_semaphore(name, config, index):
async with semaphore:
if progress_callback:
progress_callback(index, len(sites), name)
return await check_site(name, config, timeout)
tasks = [
check_with_semaphore(name, config, i)
for i, (name, config) in enumerate(sites)
]
results = await asyncio.gather(*tasks)
return results
def print_progress(current: int, total: int, site_name: str):
"""Print progress indicator."""
pct = int(current / total * 100)
bar_width = 30
filled = int(bar_width * current / total)
bar = "" * filled + "" * (bar_width - filled)
print(f"\r[{bar}] {pct:3d}% ({current}/{total}) {site_name:<30}", end="", flush=True)
def generate_report(results: List[SiteCheckResult]) -> dict:
"""Generate a summary report from check results."""
report = {
"summary": {
"total": len(results),
"working": 0,
"broken": 0,
"disabled": 0,
"timeout": 0,
"anti_bot": 0,
"error": 0,
"unknown": 0,
},
"by_status": defaultdict(list),
"issues": [],
"recommendations": [],
}
for r in results:
report["summary"][r.status] = report["summary"].get(r.status, 0) + 1
report["by_status"][r.status].append(r.site_name)
if r.issues:
report["issues"].append({
"site": r.site_name,
"rank": r.alexa_rank,
"issues": r.issues,
})
if r.recommendations:
report["recommendations"].append({
"site": r.site_name,
"rank": r.alexa_rank,
"recommendations": r.recommendations,
})
return report
def print_report(report: dict, results: List[SiteCheckResult]):
"""Print a formatted report to console."""
summary = report["summary"]
print(f"\n{'='*60}")
print(f"{color('SITE CHECK REPORT', Colors.CYAN)}")
print(f"{'='*60}\n")
print(f"{color('SUMMARY:', Colors.BOLD)}")
print(f" Total sites checked: {summary['total']}")
print(f" {color('Working:', Colors.GREEN)} {summary['working']}")
print(f" {color('Broken:', Colors.RED)} {summary['broken']}")
print(f" {color('Disabled:', Colors.YELLOW)} {summary['disabled']}")
print(f" {color('Timeout:', Colors.YELLOW)} {summary['timeout']}")
print(f" {color('Anti-bot:', Colors.YELLOW)} {summary['anti_bot']}")
print(f" {color('Error:', Colors.RED)} {summary['error']}")
# Broken sites
if report["by_status"]["broken"]:
print(f"\n{color('BROKEN SITES:', Colors.RED)}")
for site in report["by_status"]["broken"][:20]:
r = next(x for x in results if x.site_name == site)
print(f" - {site} (rank {r.alexa_rank}): {', '.join(r.issues)}")
if len(report["by_status"]["broken"]) > 20:
print(f" ... and {len(report['by_status']['broken']) - 20} more")
# Timeout sites
if report["by_status"]["timeout"]:
print(f"\n{color('TIMEOUT SITES:', Colors.YELLOW)}")
for site in report["by_status"]["timeout"][:10]:
print(f" - {site}")
if len(report["by_status"]["timeout"]) > 10:
print(f" ... and {len(report['by_status']['timeout']) - 10} more")
# Anti-bot sites
if report["by_status"]["anti_bot"]:
print(f"\n{color('ANTI-BOT PROTECTED:', Colors.YELLOW)}")
for site in report["by_status"]["anti_bot"][:10]:
r = next(x for x in results if x.site_name == site)
print(f" - {site}: {', '.join(r.issues)}")
if len(report["by_status"]["anti_bot"]) > 10:
print(f" ... and {len(report['by_status']['anti_bot']) - 10} more")
# Recommendations
if report["recommendations"]:
print(f"\n{color('RECOMMENDATIONS:', Colors.CYAN)}")
for rec in report["recommendations"][:15]:
print(f" {rec['site']} (rank {rec['rank']}):")
for r in rec["recommendations"]:
print(f" -> {r}")
if len(report["recommendations"]) > 15:
print(f" ... and {len(report['recommendations']) - 15} more")
async def main():
parser = argparse.ArgumentParser(
description="Mass site checking for Maigret",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("--top", "-n", type=int, default=100,
help="Check top N sites by Alexa rank (default: 100)")
parser.add_argument("--parallel", "-p", type=int, default=5,
help="Number of parallel requests (default: 5)")
parser.add_argument("--timeout", "-t", type=int, default=15,
help="Request timeout in seconds (default: 15)")
parser.add_argument("--output", "-o", help="Output JSON report to file")
parser.add_argument("--include-disabled", action="store_true",
help="Include disabled sites in results")
parser.add_argument("--only-broken", action="store_true",
help="Only show broken sites")
parser.add_argument("--json", action="store_true",
help="Output as JSON only")
args = parser.parse_args()
# Load sites
db_path = Path(__file__).parent.parent / "maigret" / "resources" / "data.json"
if not db_path.exists():
print(f"Database not found: {db_path}")
sys.exit(1)
sites = load_sites(db_path)
top_sites = get_top_sites(sites, args.top)
if not args.json:
print(f"Checking top {len(top_sites)} sites (parallel={args.parallel}, timeout={args.timeout}s)...")
print()
# Run checks
progress = print_progress if not args.json else None
results = await check_sites_batch(top_sites, args.parallel, args.timeout, progress)
if not args.json:
print() # Clear progress line
# Filter results
if not args.include_disabled:
results = [r for r in results if r.status != "disabled"]
if args.only_broken:
results = [r for r in results if r.status in ("broken", "error", "timeout")]
# Generate report
report = generate_report(results)
# Output
if args.json:
output = {
"report": report,
"results": [asdict(r) for r in results],
}
print(json.dumps(output, indent=2))
else:
print_report(report, results)
# Save to file
if args.output:
output = {
"report": report,
"results": [asdict(r) for r in results],
}
with open(args.output, "w") as f:
json.dump(output, f, indent=2)
print(f"\nReport saved to: {args.output}")
if __name__ == "__main__":
asyncio.run(main())
+223
View File
@@ -0,0 +1,223 @@
#!/usr/bin/env python3
"""
Probe likely false-positive sites among the top-N Alexa-ranked entries.
For each of K random *distinct* usernames taken from ``usernameClaimed`` fields in
the Maigret database, runs a clean ``maigret`` scan (``--top-sites N --json simple|ndjson``).
Sites that return CLAIMED in *every* run are reported: unrelated random claimed
handles are unlikely to all exist on the same third-party site, so such sites are
candidates for broken checks.
"""
from __future__ import annotations
import argparse
import json
import random
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
def repo_root() -> Path:
return Path(__file__).resolve().parent.parent
def load_username_claimed_pool(db_path: Path) -> list[str]:
with db_path.open(encoding="utf-8") as f:
data = json.load(f)
sites = data.get("sites") or {}
seen: set[str] = set()
pool: list[str] = []
for _name, site in sites.items():
u = (site or {}).get("usernameClaimed")
if not u or not isinstance(u, str):
continue
u = u.strip()
if not u or u in seen:
continue
seen.add(u)
pool.append(u)
return pool
def run_maigret(
*,
username: str,
db_path: Path,
out_dir: Path,
top_sites: int,
json_format: str,
quiet: bool,
) -> Path:
"""Run maigret subprocess; return path to the written JSON report."""
safe = username.replace("/", "_")
report_name = f"report_{safe}_{json_format}.json"
report_path = out_dir / report_name
cmd = [
sys.executable,
"-m",
"maigret",
username,
"--db",
str(db_path),
"--top-sites",
str(top_sites),
"--json",
json_format,
"--folderoutput",
str(out_dir),
"--no-progressbar",
"--no-color",
"--no-recursion",
"--no-extracting",
]
sink = subprocess.DEVNULL if quiet else None
proc = subprocess.run(
cmd,
cwd=str(repo_root()),
text=True,
stdout=sink,
stderr=sink,
)
if proc.returncode != 0:
raise RuntimeError(
f"maigret exited with {proc.returncode} for username {username!r}"
)
if not report_path.is_file():
raise FileNotFoundError(f"Expected report missing: {report_path}")
return report_path
def claimed_sites_from_report(path: Path, json_format: str) -> set[str]:
if json_format == "simple":
with path.open(encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, dict):
return set()
return set(data.keys())
# ndjson: one object per line, each has "sitename"
sites: set[str] = set()
with path.open(encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
obj = json.loads(line)
name = obj.get("sitename")
if isinstance(name, str) and name:
sites.add(name)
return sites
def main() -> int:
parser = argparse.ArgumentParser(
description=(
"Pick random distinct usernameClaimed values, run maigret --top-sites N "
"with JSON reports, and list sites that claimed all of them (suspicious FP)."
)
)
parser.add_argument(
"--db",
"-b",
type=Path,
default=repo_root() / "maigret" / "resources" / "data.json",
help="Path to Maigret data.json (a temp copy is used for runs).",
)
parser.add_argument(
"--top-sites",
"-n",
type=int,
default=500,
metavar="N",
help="Value for maigret --top-sites (default: 500).",
)
parser.add_argument(
"--samples",
"-k",
type=int,
default=5,
metavar="K",
help="How many distinct random usernames to draw (default: 5).",
)
parser.add_argument(
"--seed",
type=int,
default=None,
help="RNG seed for reproducible username selection.",
)
parser.add_argument(
"--json",
dest="json_format",
default="simple",
choices=["simple", "ndjson"],
help="JSON report type passed to maigret -J (default: simple).",
)
parser.add_argument(
"--verbose",
"-v",
action="store_true",
default=False,
help="Print maigret stdout/stderr (default: suppress child output).",
)
args = parser.parse_args()
quiet = not args.verbose
db_src = args.db.resolve()
if not db_src.is_file():
print(f"Database not found: {db_src}", file=sys.stderr)
return 2
pool = load_username_claimed_pool(db_src)
if len(pool) < args.samples:
print(
f"Need at least {args.samples} distinct usernameClaimed entries, "
f"found {len(pool)}.",
file=sys.stderr,
)
return 2
rng = random.Random(args.seed)
picked = rng.sample(pool, args.samples)
print(f"Database: {db_src}")
print(f"--top-sites {args.top_sites}, {args.samples} random usernameClaimed:")
for i, u in enumerate(picked, 1):
print(f" {i}. {u}")
site_sets: list[set[str]] = []
with tempfile.TemporaryDirectory(prefix="maigret_fp_probe_") as tmp:
tmp_path = Path(tmp)
db_work = tmp_path / "data.json"
shutil.copyfile(db_src, db_work)
for u in picked:
print(f"\nRunning maigret for {u!r} ...", flush=True)
report = run_maigret(
username=u,
db_path=db_work,
out_dir=tmp_path,
top_sites=args.top_sites,
json_format=args.json_format,
quiet=quiet,
)
sites = claimed_sites_from_report(report, args.json_format)
site_sets.append(sites)
print(f" -> {len(sites)} positive site(s) in JSON", flush=True)
always = set.intersection(*site_sets) if site_sets else set()
print("\n--- Sites with CLAIMED in all runs (candidates for false positives) ---")
if not always:
print("(none)")
else:
for name in sorted(always):
print(name)
return 0
if __name__ == "__main__":
raise SystemExit(main())
+750
View File
@@ -0,0 +1,750 @@
#!/usr/bin/env python3
"""
Site check utility for Maigret development.
Quickly test site availability, find valid usernames, and diagnose check issues.
Usage:
python utils/site_check.py --site "SiteName" --check-claimed
python utils/site_check.py --site "SiteName" --maigret # Test via Maigret
python utils/site_check.py --site "SiteName" --compare-methods # aiohttp vs Maigret
python utils/site_check.py --url "https://example.com/user/{username}" --test "john"
python utils/site_check.py --site "SiteName" --find-user
python utils/site_check.py --site "SiteName" --diagnose # Full diagnosis
"""
import argparse
import asyncio
import json
import logging
import re
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Add parent dir for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
try:
import aiohttp
except ImportError:
print("aiohttp not installed. Run: pip install aiohttp")
sys.exit(1)
# Maigret imports (optional, for --maigret mode)
MAIGRET_AVAILABLE = False
try:
from maigret.sites import MaigretDatabase, MaigretSite
from maigret.checking import (
SimpleAiohttpChecker,
check_site_for_username,
process_site_result,
make_site_result,
)
from maigret.notify import QueryNotifyPrint
from maigret.result import QueryStatus
MAIGRET_AVAILABLE = True
except ImportError:
pass
DEFAULT_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
}
COMMON_USERNAMES = ["blue", "test", "admin", "user", "john", "alex", "david", "mike", "chris", "dan"]
class Colors:
"""ANSI color codes for terminal output."""
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
MAGENTA = "\033[95m"
CYAN = "\033[96m"
RESET = "\033[0m"
BOLD = "\033[1m"
def color(text: str, c: str) -> str:
"""Wrap text with color codes."""
return f"{c}{text}{Colors.RESET}"
async def check_url_aiohttp(url: str, headers: dict = None, follow_redirects: bool = True,
timeout: int = 15, ssl_verify: bool = False) -> dict:
"""Check a URL using aiohttp and return detailed response info."""
headers = headers or DEFAULT_HEADERS.copy()
result = {
"method": "aiohttp",
"url": url,
"status": None,
"final_url": None,
"redirects": [],
"content_length": 0,
"content": None,
"title": None,
"error": None,
"error_type": None,
"markers": {},
}
try:
connector = aiohttp.TCPConnector(ssl=ssl_verify)
timeout_obj = aiohttp.ClientTimeout(total=timeout)
async with aiohttp.ClientSession(connector=connector, timeout=timeout_obj) as session:
async with session.get(url, headers=headers, allow_redirects=follow_redirects) as resp:
result["status"] = resp.status
result["final_url"] = str(resp.url)
# Get redirect history
if resp.history:
result["redirects"] = [str(r.url) for r in resp.history]
# Read content
try:
text = await resp.text()
result["content_length"] = len(text)
result["content"] = text
# Extract title
title_match = re.search(r'<title>([^<]*)</title>', text, re.IGNORECASE)
if title_match:
result["title"] = title_match.group(1).strip()[:100]
# Check common markers
text_lower = text.lower()
markers = {
"404_text": any(m in text_lower for m in ["not found", "404", "doesn't exist", "does not exist"]),
"profile_markers": any(m in text_lower for m in ["profile", "user", "member", "account"]),
"error_markers": any(m in text_lower for m in ["error", "banned", "suspended", "blocked"]),
"login_required": any(m in text_lower for m in ["log in", "login", "sign in", "signin"]),
"captcha": any(m in text_lower for m in ["captcha", "recaptcha", "challenge", "verify you"]),
"cloudflare": "cloudflare" in text_lower or "cf-ray" in text_lower,
"rate_limit": any(m in text_lower for m in ["rate limit", "too many requests", "429"]),
}
result["markers"] = markers
# First 500 chars of body for inspection
result["body_preview"] = text[:500].replace("\n", " ").strip()
except Exception as e:
result["error"] = f"Content read error: {e}"
result["error_type"] = "content_error"
except asyncio.TimeoutError:
result["error"] = "Timeout"
result["error_type"] = "timeout"
except aiohttp.ClientError as e:
result["error"] = f"Client error: {e}"
result["error_type"] = "client_error"
except Exception as e:
result["error"] = f"Error: {e}"
result["error_type"] = "unknown"
return result
async def check_url_maigret(site: 'MaigretSite', username: str, logger=None) -> dict:
"""Check a URL using Maigret's checking mechanism."""
if not MAIGRET_AVAILABLE:
return {"error": "Maigret not available", "method": "maigret"}
if logger is None:
logger = logging.getLogger("site_check")
logger.setLevel(logging.WARNING)
result = {
"method": "maigret",
"url": None,
"status": None,
"status_str": None,
"http_status": None,
"final_url": None,
"error": None,
"error_type": None,
"ids_data": None,
}
try:
# Create query options
options = {
"parsing": False,
"cookie_jar": None,
"timeout": 15,
}
# Create a simple notifier
class SilentNotify:
def start(self, msg=None): pass
def update(self, status, similar=False): pass
def finish(self, msg=None, status=None): pass
notifier = SilentNotify()
# Run the check
site_name, site_result = await check_site_for_username(
site, username, options, logger, notifier
)
result["url"] = site_result.get("url_user")
result["status"] = site_result.get("status")
result["status_str"] = str(site_result.get("status"))
result["http_status"] = site_result.get("http_status")
result["ids_data"] = site_result.get("ids_data")
# Check for errors
status = site_result.get("status")
if status and hasattr(status, 'error') and status.error:
result["error"] = f"{status.error.type}: {status.error.desc}"
result["error_type"] = str(status.error.type)
except Exception as e:
result["error"] = str(e)
result["error_type"] = "exception"
return result
async def find_valid_username(url_template: str, usernames: list = None, headers: dict = None) -> Optional[str]:
"""Try common usernames to find one that works."""
usernames = usernames or COMMON_USERNAMES
headers = headers or DEFAULT_HEADERS.copy()
print(f"Testing {len(usernames)} usernames on {url_template}...")
for username in usernames:
url = url_template.replace("{username}", username)
result = await check_url_aiohttp(url, headers)
status = result["status"]
markers = result.get("markers", {})
# Good signs: 200 status, profile markers, no 404 text
if status == 200 and not markers.get("404_text") and markers.get("profile_markers"):
print(f" {color('[+]', Colors.GREEN)} {username}: status={status}, has profile markers")
return username
elif status == 200 and not markers.get("404_text"):
print(f" {color('[?]', Colors.YELLOW)} {username}: status={status}, might work")
else:
print(f" {color('[-]', Colors.RED)} {username}: status={status}")
return None
async def compare_users_aiohttp(url_template: str, claimed: str, unclaimed: str = "noonewouldeverusethis7",
headers: dict = None) -> Tuple[dict, dict]:
"""Compare responses for claimed vs unclaimed usernames using aiohttp."""
headers = headers or DEFAULT_HEADERS.copy()
print(f"\n{'='*60}")
print(f"Comparing: {color(claimed, Colors.GREEN)} vs {color(unclaimed, Colors.RED)}")
print(f"URL template: {url_template}")
print(f"Method: aiohttp")
print(f"{'='*60}\n")
url_claimed = url_template.replace("{username}", claimed)
url_unclaimed = url_template.replace("{username}", unclaimed)
result_claimed, result_unclaimed = await asyncio.gather(
check_url_aiohttp(url_claimed, headers),
check_url_aiohttp(url_unclaimed, headers)
)
def print_result(name, r, c):
print(f"--- {color(name, c)} ---")
print(f" URL: {r['url']}")
print(f" Status: {color(str(r['status']), Colors.GREEN if r['status'] == 200 else Colors.RED)}")
if r["redirects"]:
print(f" Redirects: {' -> '.join(r['redirects'])} -> {r['final_url']}")
print(f" Final URL: {r['final_url']}")
print(f" Content length: {r['content_length']}")
print(f" Title: {r['title']}")
if r["error"]:
print(f" Error: {color(r['error'], Colors.RED)}")
print(f" Markers: {r['markers']}")
print()
print_result(f"CLAIMED ({claimed})", result_claimed, Colors.GREEN)
print_result(f"UNCLAIMED ({unclaimed})", result_unclaimed, Colors.RED)
# Analysis
print(f"--- {color('ANALYSIS', Colors.CYAN)} ---")
recommendations = []
if result_claimed["status"] != result_unclaimed["status"]:
print(f" [!] Status codes differ: {result_claimed['status']} vs {result_unclaimed['status']}")
recommendations.append(("status_code", f"Status codes: {result_claimed['status']} vs {result_unclaimed['status']}"))
if result_claimed["final_url"] != result_unclaimed["final_url"]:
print(f" [!] Final URLs differ")
recommendations.append(("response_url", "Final URLs differ"))
if result_claimed["content_length"] != result_unclaimed["content_length"]:
diff = abs(result_claimed["content_length"] - result_unclaimed["content_length"])
print(f" [!] Content length differs by {diff} bytes")
recommendations.append(("message", f"Content differs by {diff} bytes"))
if result_claimed["title"] != result_unclaimed["title"]:
print(f" [!] Titles differ:")
print(f" Claimed: {result_claimed['title']}")
print(f" Unclaimed: {result_unclaimed['title']}")
recommendations.append(("message", f"Titles differ: '{result_claimed['title']}' vs '{result_unclaimed['title']}'"))
# Check for problems
if result_claimed.get("markers", {}).get("captcha"):
print(f" {color('[WARN]', Colors.YELLOW)} Captcha detected on claimed page")
if result_claimed.get("markers", {}).get("cloudflare"):
print(f" {color('[WARN]', Colors.YELLOW)} Cloudflare protection detected")
if result_claimed.get("markers", {}).get("login_required"):
print(f" {color('[WARN]', Colors.YELLOW)} Login may be required")
if recommendations:
print(f"\n {color('Recommended checkType:', Colors.BOLD)} {recommendations[0][0]}")
else:
print(f" {color('[!]', Colors.RED)} No clear difference found - site may need special handling")
return result_claimed, result_unclaimed
async def compare_methods(site: 'MaigretSite', claimed: str, unclaimed: str) -> dict:
"""Compare aiohttp vs Maigret results for the same site."""
if not MAIGRET_AVAILABLE:
print(color("Maigret not available for comparison", Colors.RED))
return {}
print(f"\n{'='*60}")
print(f"{color('METHOD COMPARISON', Colors.CYAN)}: aiohttp vs Maigret")
print(f"Site: {site.name}")
print(f"Claimed: {claimed}, Unclaimed: {unclaimed}")
print(f"{'='*60}\n")
# Build URL template
url_template = site.url
url_template = url_template.replace("{urlMain}", site.url_main or "")
url_template = url_template.replace("{urlSubpath}", getattr(site, 'url_subpath', '') or "")
headers = DEFAULT_HEADERS.copy()
if hasattr(site, 'headers') and site.headers:
headers.update(site.headers)
# Run all checks in parallel
url_claimed = url_template.replace("{username}", claimed)
url_unclaimed = url_template.replace("{username}", unclaimed)
aiohttp_claimed, aiohttp_unclaimed, maigret_claimed, maigret_unclaimed = await asyncio.gather(
check_url_aiohttp(url_claimed, headers),
check_url_aiohttp(url_unclaimed, headers),
check_url_maigret(site, claimed),
check_url_maigret(site, unclaimed),
)
def status_icon(status):
if status == 200:
return color("200", Colors.GREEN)
elif status == 404:
return color("404", Colors.YELLOW)
elif status and status >= 400:
return color(str(status), Colors.RED)
return str(status)
def maigret_status_icon(status_str):
if "Claimed" in str(status_str):
return color("Claimed", Colors.GREEN)
elif "Available" in str(status_str):
return color("Available", Colors.YELLOW)
else:
return color(str(status_str), Colors.RED)
print(f"{'Method':<12} {'Username':<25} {'HTTP Status':<12} {'Result':<20}")
print("-" * 70)
print(f"{'aiohttp':<12} {claimed:<25} {status_icon(aiohttp_claimed['status']):<20} {'OK' if not aiohttp_claimed['error'] else aiohttp_claimed['error'][:20]}")
print(f"{'aiohttp':<12} {unclaimed:<25} {status_icon(aiohttp_unclaimed['status']):<20} {'OK' if not aiohttp_unclaimed['error'] else aiohttp_unclaimed['error'][:20]}")
print(f"{'Maigret':<12} {claimed:<25} {status_icon(maigret_claimed.get('http_status')):<20} {maigret_status_icon(maigret_claimed.get('status_str'))}")
print(f"{'Maigret':<12} {unclaimed:<25} {status_icon(maigret_unclaimed.get('http_status')):<20} {maigret_status_icon(maigret_unclaimed.get('status_str'))}")
# Check for discrepancies
print(f"\n--- {color('DISCREPANCY ANALYSIS', Colors.CYAN)} ---")
issues = []
if aiohttp_claimed['status'] != maigret_claimed.get('http_status'):
issues.append(f"HTTP status mismatch for claimed: aiohttp={aiohttp_claimed['status']}, Maigret={maigret_claimed.get('http_status')}")
if aiohttp_unclaimed['status'] != maigret_unclaimed.get('http_status'):
issues.append(f"HTTP status mismatch for unclaimed: aiohttp={aiohttp_unclaimed['status']}, Maigret={maigret_unclaimed.get('http_status')}")
# Check Maigret detection correctness
claimed_detected = "Claimed" in str(maigret_claimed.get('status_str', ''))
unclaimed_detected = "Available" in str(maigret_unclaimed.get('status_str', ''))
if not claimed_detected:
issues.append(f"Maigret did NOT detect claimed user '{claimed}' as Claimed")
if not unclaimed_detected:
issues.append(f"Maigret did NOT detect unclaimed user '{unclaimed}' as Available")
if issues:
for issue in issues:
print(f" {color('[!]', Colors.RED)} {issue}")
else:
print(f" {color('[OK]', Colors.GREEN)} Both methods agree on results")
return {
"aiohttp_claimed": aiohttp_claimed,
"aiohttp_unclaimed": aiohttp_unclaimed,
"maigret_claimed": maigret_claimed,
"maigret_unclaimed": maigret_unclaimed,
"issues": issues,
}
async def diagnose_site(site_config: dict, site_name: str) -> dict:
"""Full diagnosis of a site configuration."""
print(f"\n{'='*60}")
print(f"{color('FULL SITE DIAGNOSIS', Colors.CYAN)}: {site_name}")
print(f"{'='*60}\n")
diagnosis = {
"site_name": site_name,
"issues": [],
"warnings": [],
"recommendations": [],
"working": False,
}
# 1. Config analysis
print(f"--- {color('1. CONFIGURATION', Colors.BOLD)} ---")
check_type = site_config.get("checkType", "status_code")
url = site_config.get("url", "")
url_main = site_config.get("urlMain", "")
claimed = site_config.get("usernameClaimed")
unclaimed = site_config.get("usernameUnclaimed", "noonewouldeverusethis7")
disabled = site_config.get("disabled", False)
print(f" checkType: {check_type}")
print(f" URL: {url}")
print(f" urlMain: {url_main}")
print(f" usernameClaimed: {claimed}")
print(f" disabled: {disabled}")
if disabled:
diagnosis["issues"].append("Site is disabled")
print(f" {color('[!]', Colors.YELLOW)} Site is disabled")
if not claimed:
diagnosis["issues"].append("No usernameClaimed defined")
print(f" {color('[!]', Colors.RED)} No usernameClaimed defined")
return diagnosis
# Build full URL
url_template = url.replace("{urlMain}", url_main).replace("{urlSubpath}", site_config.get("urlSubpath", ""))
headers = DEFAULT_HEADERS.copy()
if site_config.get("headers"):
headers.update(site_config["headers"])
# 2. Connectivity test
print(f"\n--- {color('2. CONNECTIVITY TEST', Colors.BOLD)} ---")
url_claimed = url_template.replace("{username}", claimed)
url_unclaimed = url_template.replace("{username}", unclaimed)
result_claimed, result_unclaimed = await asyncio.gather(
check_url_aiohttp(url_claimed, headers),
check_url_aiohttp(url_unclaimed, headers)
)
print(f" Claimed ({claimed}): status={result_claimed['status']}, error={result_claimed['error']}")
print(f" Unclaimed ({unclaimed}): status={result_unclaimed['status']}, error={result_unclaimed['error']}")
# Check for common problems
if result_claimed["error_type"] == "timeout":
diagnosis["issues"].append("Timeout on claimed username")
if result_unclaimed["error_type"] == "timeout":
diagnosis["issues"].append("Timeout on unclaimed username")
if result_claimed.get("markers", {}).get("cloudflare"):
diagnosis["warnings"].append("Cloudflare protection detected")
if result_claimed.get("markers", {}).get("captcha"):
diagnosis["warnings"].append("Captcha detected")
if result_claimed["status"] == 403:
diagnosis["issues"].append("403 Forbidden - possible anti-bot protection")
if result_claimed["status"] == 429:
diagnosis["issues"].append("429 Rate Limited")
# 3. Check type validation
print(f"\n--- {color('3. CHECK TYPE VALIDATION', Colors.BOLD)} ---")
if check_type == "status_code":
if result_claimed["status"] == result_unclaimed["status"]:
diagnosis["issues"].append(f"status_code check but same status ({result_claimed['status']}) for both")
print(f" {color('[FAIL]', Colors.RED)} Same status code for claimed and unclaimed: {result_claimed['status']}")
else:
print(f" {color('[OK]', Colors.GREEN)} Status codes differ: {result_claimed['status']} vs {result_unclaimed['status']}")
diagnosis["working"] = True
elif check_type == "response_url":
if result_claimed["final_url"] == result_unclaimed["final_url"]:
diagnosis["issues"].append("response_url check but same final URL for both")
print(f" {color('[FAIL]', Colors.RED)} Same final URL for both")
else:
print(f" {color('[OK]', Colors.GREEN)} Final URLs differ")
diagnosis["working"] = True
elif check_type == "message":
presense_strs = site_config.get("presenseStrs", [])
absence_strs = site_config.get("absenceStrs", [])
print(f" presenseStrs: {presense_strs}")
print(f" absenceStrs: {absence_strs}")
claimed_content = result_claimed.get("content", "") or ""
unclaimed_content = result_unclaimed.get("content", "") or ""
# Check presenseStrs
presense_found_claimed = any(s in claimed_content for s in presense_strs) if presense_strs else True
presense_found_unclaimed = any(s in unclaimed_content for s in presense_strs) if presense_strs else True
# Check absenceStrs
absence_found_claimed = any(s in claimed_content for s in absence_strs) if absence_strs else False
absence_found_unclaimed = any(s in unclaimed_content for s in absence_strs) if absence_strs else False
print(f" Claimed - presenseStrs found: {presense_found_claimed}, absenceStrs found: {absence_found_claimed}")
print(f" Unclaimed - presenseStrs found: {presense_found_unclaimed}, absenceStrs found: {absence_found_unclaimed}")
if presense_strs and not presense_found_claimed:
diagnosis["issues"].append(f"presenseStrs {presense_strs} not found in claimed page")
print(f" {color('[FAIL]', Colors.RED)} presenseStrs not found in claimed page")
if absence_strs and absence_found_claimed:
diagnosis["issues"].append(f"absenceStrs {absence_strs} found in claimed page (should not be)")
print(f" {color('[FAIL]', Colors.RED)} absenceStrs found in claimed page")
if absence_strs and not absence_found_unclaimed:
diagnosis["warnings"].append(f"absenceStrs not found in unclaimed page")
print(f" {color('[WARN]', Colors.YELLOW)} absenceStrs not found in unclaimed page")
if presense_found_claimed and not absence_found_claimed and absence_found_unclaimed:
print(f" {color('[OK]', Colors.GREEN)} Message check should work correctly")
diagnosis["working"] = True
# 4. Recommendations
print(f"\n--- {color('4. RECOMMENDATIONS', Colors.BOLD)} ---")
if not diagnosis["working"]:
# Suggest alternatives
if result_claimed["status"] != result_unclaimed["status"]:
diagnosis["recommendations"].append(f"Switch to checkType: status_code (status {result_claimed['status']} vs {result_unclaimed['status']})")
if result_claimed["final_url"] != result_unclaimed["final_url"]:
diagnosis["recommendations"].append("Switch to checkType: response_url")
if result_claimed["title"] != result_unclaimed["title"]:
diagnosis["recommendations"].append(f"Use title as marker: presenseStrs=['{result_claimed['title']}'] or absenceStrs=['{result_unclaimed['title']}']")
if diagnosis["recommendations"]:
for rec in diagnosis["recommendations"]:
print(f" -> {rec}")
elif diagnosis["working"]:
print(f" {color('Site appears to be working correctly', Colors.GREEN)}")
else:
print(f" {color('No clear fix found - site may need special handling or should be disabled', Colors.RED)}")
# Summary
print(f"\n--- {color('SUMMARY', Colors.BOLD)} ---")
if diagnosis["issues"]:
print(f" Issues: {len(diagnosis['issues'])}")
for issue in diagnosis["issues"]:
print(f" - {issue}")
if diagnosis["warnings"]:
print(f" Warnings: {len(diagnosis['warnings'])}")
for warn in diagnosis["warnings"]:
print(f" - {warn}")
print(f" Working: {color('YES', Colors.GREEN) if diagnosis['working'] else color('NO', Colors.RED)}")
return diagnosis
def load_site_from_db(site_name: str) -> Tuple[Optional[dict], Optional['MaigretSite']]:
"""Load site config from data.json. Returns (config_dict, MaigretSite or None)."""
db_path = Path(__file__).parent.parent / "maigret" / "resources" / "data.json"
with open(db_path) as f:
data = json.load(f)
config = None
if site_name in data["sites"]:
config = data["sites"][site_name]
else:
# Try case-insensitive search
for name, cfg in data["sites"].items():
if name.lower() == site_name.lower():
config = cfg
site_name = name
break
if not config:
return None, None
# Also load MaigretSite if available
maigret_site = None
if MAIGRET_AVAILABLE:
try:
db = MaigretDatabase().load_from_path(db_path)
maigret_site = db.sites_dict.get(site_name)
except Exception:
pass
return config, maigret_site
async def main():
parser = argparse.ArgumentParser(
description="Site check utility for Maigret development",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s --site "VK" --check-claimed # Test site with aiohttp
%(prog)s --site "VK" --maigret # Test site with Maigret
%(prog)s --site "VK" --compare-methods # Compare aiohttp vs Maigret
%(prog)s --site "VK" --diagnose # Full diagnosis
%(prog)s --url "https://vk.com/{username}" --compare blue nobody123
%(prog)s --site "VK" --find-user # Find a valid username
"""
)
parser.add_argument("--site", "-s", help="Site name from data.json")
parser.add_argument("--url", "-u", help="URL template with {username}")
parser.add_argument("--test", "-t", help="Username to test")
parser.add_argument("--compare", "-c", nargs=2, metavar=("CLAIMED", "UNCLAIMED"),
help="Compare two usernames")
parser.add_argument("--find-user", "-f", action="store_true",
help="Find a valid username")
parser.add_argument("--check-claimed", action="store_true",
help="Check if claimed username still works (aiohttp)")
parser.add_argument("--maigret", "-m", action="store_true",
help="Test using Maigret's checker instead of aiohttp")
parser.add_argument("--compare-methods", action="store_true",
help="Compare aiohttp vs Maigret results")
parser.add_argument("--diagnose", "-d", action="store_true",
help="Full diagnosis of site configuration")
parser.add_argument("--headers", help="Custom headers as JSON")
parser.add_argument("--timeout", type=int, default=15, help="Request timeout in seconds")
parser.add_argument("--json", action="store_true", help="Output results as JSON")
args = parser.parse_args()
url_template = None
claimed = None
unclaimed = "noonewouldeverusethis7"
headers = DEFAULT_HEADERS.copy()
site_config = None
maigret_site = None
# Load from site name
if args.site:
site_config, maigret_site = load_site_from_db(args.site)
if not site_config:
print(f"Site '{args.site}' not found in database")
sys.exit(1)
url_template = site_config.get("url", "")
url_main = site_config.get("urlMain", "")
url_subpath = site_config.get("urlSubpath", "")
url_template = url_template.replace("{urlMain}", url_main).replace("{urlSubpath}", url_subpath)
claimed = site_config.get("usernameClaimed")
unclaimed = site_config.get("usernameUnclaimed", unclaimed)
if site_config.get("headers"):
headers.update(site_config["headers"])
if not args.json:
print(f"Loaded site: {args.site}")
print(f" URL: {url_template}")
print(f" Claimed: {claimed}")
print(f" CheckType: {site_config.get('checkType', 'unknown')}")
print(f" Disabled: {site_config.get('disabled', False)}")
# Override with explicit URL
if args.url:
url_template = args.url
# Custom headers
if args.headers:
headers.update(json.loads(args.headers))
# Actions
if args.diagnose:
if not site_config:
print("--diagnose requires --site")
sys.exit(1)
result = await diagnose_site(site_config, args.site)
if args.json:
print(json.dumps(result, indent=2, default=str))
elif args.compare_methods:
if not maigret_site:
if not MAIGRET_AVAILABLE:
print("Maigret imports not available")
else:
print("Could not load MaigretSite object")
sys.exit(1)
result = await compare_methods(maigret_site, claimed, unclaimed)
if args.json:
print(json.dumps(result, indent=2, default=str))
elif args.maigret:
if not maigret_site:
if not MAIGRET_AVAILABLE:
print("Maigret imports not available")
else:
print("Could not load MaigretSite object")
sys.exit(1)
print(f"\n--- Testing with Maigret ---")
for username in [claimed, unclaimed]:
result = await check_url_maigret(maigret_site, username)
print(f" {username}: status={result.get('status_str')}, http={result.get('http_status')}, error={result.get('error')}")
elif args.find_user:
if not url_template:
print("--find-user requires --site or --url")
sys.exit(1)
result = await find_valid_username(url_template, headers=headers)
if result:
print(f"\n{color('Found valid username:', Colors.GREEN)} {result}")
else:
print(f"\n{color('No valid username found', Colors.RED)}")
elif args.compare:
if not url_template:
print("--compare requires --site or --url")
sys.exit(1)
result = await compare_users_aiohttp(url_template, args.compare[0], args.compare[1], headers)
if args.json:
# Remove content field for JSON output (too large)
for r in result:
if isinstance(r, dict) and "content" in r:
del r["content"]
print(json.dumps(result, indent=2, default=str))
elif args.check_claimed and claimed:
result = await compare_users_aiohttp(url_template, claimed, unclaimed, headers)
elif args.test:
if not url_template:
print("--test requires --site or --url")
sys.exit(1)
url = url_template.replace("{username}", args.test)
result = await check_url_aiohttp(url, headers, timeout=args.timeout)
if "content" in result:
del result["content"] # Too large for display
print(json.dumps(result, indent=2, default=str))
else:
# Default: check claimed username if available
if url_template and claimed:
await compare_users_aiohttp(url_template, claimed, unclaimed, headers)
else:
parser.print_help()
if __name__ == "__main__":
asyncio.run(main())