Improve site-check quality: fix broken site configs, add diagnostic utilities, and make self-check report-only by default with opt-in auto-disable. (#2301)

- Fix VK and TradingView checkType; add Reddit and Microsoft Learn API-style probes where appropriate; adjust or disable entries that are unreliable under anti-bot protection.
- Self-check: stop aggressive auto-disable; default to reporting issues only; add --auto-disable and --diagnose for optional fixes and deeper output.
- Tooling: add utils/site_check.py and utils/check_top_n.py (and related helpers) to inspect and rank site behavior against the top-N list
- Scope: aligns with fixing top-traffic / high-impact sites and making diagnostics repeatable without silently flipping disabled flags
This commit is contained in:
Soxoj
2026-03-22 16:48:35 +01:00
committed by GitHub
parent f3b741d283
commit 97cc4b46d9
14 changed files with 1959 additions and 65 deletions
+102 -13
View File
@@ -826,9 +826,21 @@ async def site_self_check(
i2p_proxy=None,
skip_errors=False,
cookies=None,
auto_disable=False,
diagnose=False,
):
"""
Self-check a site configuration.
Args:
auto_disable: If True, automatically disable sites that fail checks.
If False (default), only report issues without disabling.
diagnose: If True, print detailed diagnosis information.
"""
changes = {
"disabled": False,
"issues": [],
"recommendations": [],
}
check_data = [
@@ -838,6 +850,8 @@ async def site_self_check(
logger.info(f"Checking {site.name}...")
results_cache = {}
for username, status in check_data:
async with semaphore:
results_dict = await maigret(
@@ -859,15 +873,20 @@ async def site_self_check(
# TODO: make normal checking
if site.name not in results_dict:
logger.info(results_dict)
changes["disabled"] = True
changes["issues"].append(f"Site {site.name} not in results (wrong id_type?)")
if auto_disable:
changes["disabled"] = True
continue
logger.debug(results_dict)
result = results_dict[site.name]["status"]
results_cache[username] = results_dict[site.name]
if result.error and 'Cannot connect to host' in result.error.desc:
changes["disabled"] = True
changes["issues"].append(f"Cannot connect to host")
if auto_disable:
changes["disabled"] = True
site_status = result.status
@@ -875,6 +894,8 @@ async def site_self_check(
if site_status == MaigretCheckStatus.UNKNOWN:
msgs = site.absence_strs
etype = site.check_type
error_msg = f"Error checking {username}: {result.context}"
changes["issues"].append(error_msg)
logger.warning(
f"Error while searching {username} in {site.name}: {result.context}, {msgs}, type {etype}"
)
@@ -884,28 +905,62 @@ async def site_self_check(
if skip_errors:
pass
# don't disable in case of available username
elif status == MaigretCheckStatus.CLAIMED:
elif status == MaigretCheckStatus.CLAIMED and auto_disable:
changes["disabled"] = True
elif status == MaigretCheckStatus.CLAIMED:
changes["issues"].append(f"Claimed user '{username}' not detected as claimed")
logger.warning(
f"Not found `{username}` in {site.name}, must be claimed"
)
logger.info(results_dict[site.name])
changes["disabled"] = True
if auto_disable:
changes["disabled"] = True
else:
changes["issues"].append(f"Unclaimed user '{username}' detected as claimed")
logger.warning(f"Found `{username}` in {site.name}, must be available")
logger.info(results_dict[site.name])
changes["disabled"] = True
if auto_disable:
changes["disabled"] = True
logger.info(f"Site {site.name} checking is finished")
if changes["disabled"] != site.disabled:
# Generate recommendations based on issues
if changes["issues"] and len(results_cache) == 2:
claimed_result = results_cache.get(site.username_claimed, {})
unclaimed_result = results_cache.get(site.username_unclaimed, {})
claimed_http = claimed_result.get("http_status")
unclaimed_http = unclaimed_result.get("http_status")
if claimed_http and unclaimed_http:
if claimed_http != unclaimed_http and site.check_type != "status_code":
changes["recommendations"].append(
f"Consider checkType: status_code (HTTP {claimed_http} vs {unclaimed_http})"
)
# Print diagnosis if requested
if diagnose and changes["issues"]:
print(f"\n--- {site.name} DIAGNOSIS ---")
print(f" Check type: {site.check_type}")
print(f" Issues:")
for issue in changes["issues"]:
print(f" - {issue}")
if changes["recommendations"]:
print(f" Recommendations:")
for rec in changes["recommendations"]:
print(f" -> {rec}")
# Only modify site if auto_disable is enabled
if auto_disable and changes["disabled"] != site.disabled:
site.disabled = changes["disabled"]
logger.info(f"Switching property 'disabled' for {site.name} to {site.disabled}")
db.update_site(site)
if not silent:
action = "Disabled" if site.disabled else "Enabled"
print(f"{action} site {site.name}...")
elif changes["issues"] and not silent and not diagnose:
# Report issues without disabling
print(f"Issues found in {site.name}: {len(changes['issues'])} (not auto-disabled)")
# remove service tag "unchecked"
if "unchecked" in site.tags:
@@ -924,10 +979,24 @@ async def self_check(
proxy=None,
tor_proxy=None,
i2p_proxy=None,
) -> bool:
auto_disable=False,
diagnose=False,
) -> dict:
"""
Run self-check on sites.
Args:
auto_disable: If True, automatically disable sites that fail checks.
If False (default), only report issues without disabling.
diagnose: If True, print detailed diagnosis for each failing site.
Returns:
dict with 'needs_update' bool and 'results' list of check results
"""
sem = asyncio.Semaphore(max_connections)
tasks = []
all_sites = site_data
all_results = []
def disabled_count(lst):
return len(list(filter(lambda x: x.disabled, lst)))
@@ -939,15 +1008,18 @@ async def self_check(
for _, site in all_sites.items():
check_coro = site_self_check(
site, logger, sem, db, silent, proxy, tor_proxy, i2p_proxy, skip_errors=True
site, logger, sem, db, silent, proxy, tor_proxy, i2p_proxy,
skip_errors=True, auto_disable=auto_disable, diagnose=diagnose
)
future = asyncio.ensure_future(check_coro)
tasks.append(future)
tasks.append((site.name, future))
if tasks:
with alive_bar(len(tasks), title='Self-checking', force_tty=True) as progress:
for f in asyncio.as_completed(tasks):
await f
for site_name, f in tasks:
result = await f
result['site_name'] = site_name
all_results.append(result)
progress() # Update the progress bar
unchecked_new_count = len(
@@ -956,7 +1028,10 @@ async def self_check(
disabled_new_count = disabled_count(all_sites.values())
total_disabled = disabled_new_count - disabled_old_count
if total_disabled:
# Count issues
total_issues = sum(1 for r in all_results if r.get('issues'))
if auto_disable and total_disabled:
if total_disabled >= 0:
message = "Disabled"
else:
@@ -968,11 +1043,25 @@ async def self_check(
f"{message} {total_disabled} ({disabled_old_count} => {disabled_new_count}) checked sites. "
"Run with `--info` flag to get more information"
)
elif total_issues and not silent:
print(f"\nFound issues in {total_issues} sites (auto-disable is OFF)")
print("Use --auto-disable to automatically disable failing sites")
print("Use --diagnose to see detailed diagnosis for each site")
if unchecked_new_count != unchecked_old_count:
print(f"Unchecked sites verified: {unchecked_old_count - unchecked_new_count}")
return total_disabled != 0 or unchecked_new_count != unchecked_old_count
needs_update = total_disabled != 0 or unchecked_new_count != unchecked_old_count
# For backwards compatibility, return bool if auto_disable is True
if auto_disable:
return needs_update
return {
'needs_update': needs_update,
'results': all_results,
'total_issues': total_issues,
}
def extract_ids_data(html_text, logger, site) -> Dict: