Compare commits

..

2 Commits

Author SHA1 Message Date
copilot-swe-agent[bot] e6624bc0b0 Add automated solution for closing invalid Telegram PRs
Co-authored-by: soxoj <31013580+soxoj@users.noreply.github.com>
2025-08-22 00:22:39 +00:00
copilot-swe-agent[bot] 7467f56854 Initial plan 2025-08-22 00:17:57 +00:00
39 changed files with 28570 additions and 31959 deletions
@@ -0,0 +1,61 @@
name: Close Invalid Telegram PRs
on:
schedule:
# Run daily at 2 AM UTC
- cron: '0 2 * * *'
workflow_dispatch:
# Allow manual triggering
inputs:
dry_run:
description: 'Run in dry-run mode (show what would be closed without closing)'
required: false
default: 'false'
type: boolean
jobs:
close-invalid-prs:
runs-on: ubuntu-latest
permissions:
# Need write permissions for pull requests and issues
pull-requests: write
issues: write
contents: read
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install requests
- name: Make script executable
run: chmod +x utils/close_invalid_telegram_prs.py
- name: Run PR closer script (dry-run for manual trigger)
if: github.event_name == 'workflow_dispatch' && github.event.inputs.dry_run == 'true'
run: |
python utils/close_invalid_telegram_prs.py --dry-run
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Run PR closer script (live for manual trigger)
if: github.event_name == 'workflow_dispatch' && github.event.inputs.dry_run == 'false'
run: |
python utils/close_invalid_telegram_prs.py
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Run PR closer script (automated daily)
if: github.event_name == 'schedule'
run: |
python utils/close_invalid_telegram_prs.py --dry-run
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+39 -54
View File
@@ -2,69 +2,54 @@ name: Package exe with PyInstaller - Windows
on:
push:
branches: [main, dev]
branches: [ main, dev ]
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Checkout
uses: actions/checkout@v4
# Wine Python (not Linux) runs PyInstaller; altgraph needs pkg_resources — reinstall setuptools after all deps.
- name: Prepare requirements for Wine (setuptools last)
run: |
set -euo pipefail
cp pyinstaller/requirements.txt pyinstaller/requirements-wine.txt
{
echo ""
echo "# CI: setuptools last so pkg_resources exists for PyInstaller/altgraph in Wine"
echo "setuptools==70.0.0"
} >> pyinstaller/requirements-wine.txt
- name: PyInstaller Windows Build
uses: JackMcKew/pyinstaller-action-windows@main
with:
path: pyinstaller
- name: PyInstaller Windows Build
uses: JackMcKew/pyinstaller-action-windows@main
with:
path: pyinstaller
requirements: requirements-wine.txt
- name: Upload PyInstaller Binary to Workflow as Artifact
uses: actions/upload-artifact@v4
with:
name: maigret_standalone_win32
path: pyinstaller/dist/windows
- name: Upload PyInstaller Binary to Workflow as Artifact
if: success()
uses: actions/upload-artifact@v4
with:
name: maigret_standalone_win32
path: pyinstaller/dist/windows
- name: Download PyInstaller Binary
uses: actions/download-artifact@v4
with:
name: maigret_standalone_win32
- name: Download PyInstaller Binary
if: success()
uses: actions/download-artifact@v4
with:
name: maigret_standalone_win32
- name: Create New Release and Upload PyInstaller Binary to Release
uses: ncipollo/release-action@v1.14.0
id: create_release
with:
allowUpdates: true
draft: false
prerelease: false
artifactErrorsFailBuild: true
makeLatest: true
replacesArtifacts: true
artifacts: maigret_standalone.exe
name: Development Windows Release [${{ github.ref_name }}]
tag: ${{ github.ref_name }}
body: |
This is a development release built from the **${{ github.ref_name }}** branch.
- name: Create New Release and Upload PyInstaller Binary to Release
if: success()
uses: ncipollo/release-action@v1.14.0
id: create_release
with:
allowUpdates: true
draft: false
prerelease: false
artifactErrorsFailBuild: true
makeLatest: true
replacesArtifacts: true
artifacts: maigret_standalone.exe
name: Development Windows Release [${{ github.ref_name }}]
tag: ${{ github.ref_name }}
body: |
This is a development release built from the **${{ github.ref_name }}** branch.
Take into account that `dev` releases may be unstable.
Please, use [the development release](https://github.com/soxoj/maigret/releases/tag/main) build from the **main** branch.
Take into account that `dev` releases may be unstable.
Please, use [the development release](https://github.com/soxoj/maigret/releases/tag/main) build from the **main** branch.
Instructions:
- Download the attached file `maigret_standalone.exe` to get the Windows executable.
- Video guide on how to run it: https://youtu.be/qIgwTZOmMmM
- For detailed documentation, visit: https://maigret.readthedocs.io/en/latest/
Instructions:
- Download the attached file `maigret_standalone.exe` to get the Windows executable.
- Video guide on how to run it: https://youtu.be/qIgwTZOmMmM
- For detailed documentation, visit: https://maigret.readthedocs.io/en/latest/
env:
GITHUB_TOKEN: ${{ github.token }}
env:
GITHUB_TOKEN: ${{ github.token }}
-3
View File
@@ -22,9 +22,6 @@ jobs:
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install system dependencies
run: |
sudo apt-get update && sudo apt-get install -y libcairo2-dev
- name: Install dependencies
run: |
python -m pip install --upgrade pip
+17 -40
View File
@@ -1,57 +1,34 @@
name: Update sites rating and statistics
on:
push:
branches: [ main ]
concurrency:
group: update-sites-${{ github.ref }}
cancel-in-progress: true
pull_request:
branches: [ dev ]
types: [opened, synchronize]
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
uses: actions/checkout@v2.3.2
with:
ref: main
ref: ${{ github.event.pull_request.head.sha }}
fetch-depth: 0 # otherwise, there would be errors pushing refs to the destination repository.
- name: Install system dependencies
run: |
sudo apt-get update && sudo apt-get install -y libcairo2-dev
- name: Build application
- name: build application
run: |
pip3 install .
python3 ./utils/update_site_data.py --empty-only
- name: Remove ambiguous main tag
run: git tag -d main || true
- name: Check for meaningful changes
id: check
- name: Commit and push changes
run: |
REAL_CHANGES=$(git diff --unified=0 sites.md | grep '^[+-][^+-]' | grep -v 'The list was updated at' | wc -l)
if [ "$REAL_CHANGES" -gt 0 ]; then
echo "has_changes=true" >> $GITHUB_OUTPUT
else
echo "has_changes=false" >> $GITHUB_OUTPUT
fi
- name: Delete existing PR branch
if: steps.check.outputs.has_changes == 'true'
run: git push origin --delete auto/update-sites-list || true
- name: Create Pull Request
if: steps.check.outputs.has_changes == 'true'
uses: peter-evans/create-pull-request@v7
with:
token: ${{ secrets.GITHUB_TOKEN }}
commit-message: "Updated site list and statistics"
title: "Automated Sites List Update"
body: "Automated changes to sites.md based on new Alexa rankings/statistics."
branch: "auto/update-sites-list"
base: main
delete-branch: true
git config --global user.name "Maigret autoupdate"
git config --global user.email "soxoj@protonmail.com"
echo `git name-rev ${{ github.event.pull_request.head.sha }} --name-only`
export BRANCH=`git name-rev ${{ github.event.pull_request.head.sha }} --name-only | sed 's/remotes\/origin\///'`
echo $BRANCH
git remote -v
git checkout $BRANCH
git add sites.md
git commit -m "Updated site list and statistics"
git push origin $BRANCH
+7 -9
View File
@@ -1,18 +1,16 @@
FROM python:3.11-slim
FROM python:3.10-slim
LABEL maintainer="Soxoj <soxoj@protonmail.com>"
WORKDIR /app
RUN pip install --no-cache-dir --upgrade pip
RUN apt-get update && \
apt-get install --no-install-recommends -y \
build-essential \
python3-dev \
pkg-config \
libcairo2-dev \
gcc \
musl-dev \
libxml2 \
libxml2-dev \
libxslt1-dev \
&& rm -rf /var/lib/apt/lists/* /tmp/*
libxslt-dev \
&& \
rm -rf /var/lib/apt/lists/* /tmp/*
COPY . .
RUN YARL_NO_EXTENSIONS=1 python3 -m pip install --no-cache-dir .
# For production use, set FLASK_HOST to a specific IP address for security
ENV FLASK_HOST=0.0.0.0
ENTRYPOINT ["maigret"]
-452
View File
@@ -1,452 +0,0 @@
# Site checks — guide (Maigret)
Working document for future changes: workflow, findings from reviews, and practical steps. See also [`site-checks-playbook.md`](site-checks-playbook.md) (short checklist), [`socid_extractor_improvements.log`](socid_extractor_improvements.log) (proposals for upstream identity extraction), and the code in [`maigret/checking.py`](../maigret/checking.py).
**Documentation maintenance:** whenever you improve Maigret, add search tooling, or change check logic, update **this file** and [`site-checks-playbook.md`](site-checks-playbook.md) in sync (see the section at the end). If you change rules about the JSON API check or the `socid_extractor` log format, update **[`socid_extractor_improvements.log`](socid_extractor_improvements.log)** (template / header) together with this guide.
---
## 1. How checks work
Logic lives in `process_site_result` ([`maigret/checking.py`](../maigret/checking.py)):
| `checkType` | Meaning |
|-------------|---------|
| `message` | Profile is “found” if the HTML contains **none** of the `absenceStrs` substrings **and** at least one `presenseStrs` marker matches. If `presenseStrs` is **empty**, presence is treated as true for **any** page (risky configuration). |
| `status_code` | HTTP **2xx** is enough — only safe if the server does **not** return 200 for “user not found”. |
| `response_url` | Custom flow with **redirects disabled** so the status/URL of the *first* response can be used. |
For other `checkType` values, [`make_site_result`](../maigret/checking.py) sets **`allow_redirects=True`**: the client follows redirects and `process_site_result` sees the **final** response body and status (not the pre-redirect hop). You do **not** need to “turn on” follow-redirect separately for most sites.
Sites with an `engine` field (e.g. XenForo) are merged with a template from the `engines` section in [`maigret/resources/data.json`](../maigret/resources/data.json) ([`MaigretSite.update_from_engine`](../maigret/sites.py)).
### `urlProbe`: probe URL vs reported profile URL
- **`url`** — pattern for the **public profile page** users should open (what appears in reports as `url_user`). Supports `{username}`, `{urlMain}`, `{urlSubpath}`; the username segment is URL-encoded when the string is built ([`make_site_result`](../maigret/checking.py)).
- **`urlProbe`** (optional) — if set, Maigret sends the HTTP **GET** (or HEAD where applicable) to **this** URL for the check, instead of to `url`. Same placeholders. Use it when the reliable signal is a **JSON/API** endpoint but the human-facing link must stay on the main site (e.g. `https://picsart.com/u/{username}` + probe `https://api.picsart.com/users/show/{username}.json`, or GitHubs `https://github.com/{username}` + `https://api.github.com/users/{username}`).
If `urlProbe` is omitted, the probe URL defaults to `url`.
### Redirects and final URL as a signal
If the **HTML shell** looks the same for “user exists” and “user does not exist” (typical SPA), it is still worth checking whether the **server** behaves differently:
- **Final URL** after redirects (e.g. profile canonical URL vs `/404` path).
- **Redirect chain** length or target host (e.g. lander vs profile).
If that differs reliably, you may be able to use **`checkType`: `response_url`** in [`data.json`](../maigret/resources/data.json) (no auto-follow) or extend logic — but only when the difference is stable.
**Server-side HTTP vs client-side navigation.** Maigret follows **HTTP** redirects only; it does **not** run JavaScript. If the browser shows a navigation to `/u/name/posts` or `/not-found` **after** the SPA bundle loads, that may never appear as an extra hop in `curl`/aiohttp — only a **trailing-slash** `301` might show up. Always confirm with `curl -sIL` / a small script whether the **Location** chain differs for real vs fake users before relying on URL-based rules.
**Empirical check (claimed vs non-existent usernames, `GET` with follow redirects, no JS):**
| Site | Result |
|------|--------|
| **Kaskus** | No HTTP redirects beyond the request path; same generic `<title>` and near-identical body length — **no** discriminating signal from redirects alone. |
| **Bibsonomy** | Both requests redirect to **`/pow-challenge/?return=/user/...`** (proof-of-work). Only the `return` path changes with the username; **both** existing and fake hit the same challenge flow — not a profile-vs-missing distinction. |
| **Picsart (web UI `https://picsart.com/u/{username}`)** | Only a **trailing-slash** `301`; the first HTML is the same empty app shell (~3 KiB) for real and fake users. Browser-only routes such as `…/posts` vs `…/not-found` are **not** visible as additional HTTP redirects in this pipeline. |
**Picsart — workable check via public API.** The site exposes **`https://api.picsart.com/users/show/{username}.json`**: JSON with `"status":"success"` and a user object when the account exists, and `"reason":"user_not_found"` when it does not. Put that URL in **`urlProbe`**, set **`url`** to the web profile pattern **`https://picsart.com/u/{username}`**, and use **`checkType`: `message`** with narrow `presenseStrs` / `absenceStrs` so reports show the human link while the request hits the API (see **`urlProbe`** above).
For **Kaskus** and **Bibsonomy**, HTTP-level comparison still does **not** unlock a safe check without PoW / richer signals; keep **`disabled: true`** until something stable appears (API, SSR markers, etc.).
---
## 2. Standard checks: public JSON API and `socid_extractor` log
### 2.1 Public JSON API (always)
When diagnosing a site—especially **SPAs**, **soft 404s**, or **near-identical HTML** for real vs fake users—**routinely look for a public JSON (or JSON-like) API** used for profile or user lookup. Typical leads: paths containing `/api/`, `/v1/`, `graphql`, `users/show`, `.json` suffixes, or the same endpoints mobile apps use. Verify with `curl` (or the Maigret request path) that **claimed** and **unclaimed** usernames produce **reliably different** bodies or status codes. If such an endpoint is more stable than HTML, put it in **`urlProbe`** and keep **`url`** as the canonical profile page on the main site (see **`urlProbe`** in section 1). If there is no separate public URL for humans, you may still point **`url`** at the API only (reports will show that URL).
This is a **standard** part of site-check work, not an optional extra.
### 2.2 Mandatory: [`LLM/socid_extractor_improvements.log`](socid_extractor_improvements.log)
If you discover **either**:
1. **JSON embedded in HTML** with user/profile fields (inline scripts, `__NEXT_DATA__`, `application/ld+json`, hydration blobs, etc.), or
2. A **standalone JSON HTTP response** (public API) with user/profile data for that service,
you **must append** a proposal block to **[`LLM/socid_extractor_improvements.log`](socid_extractor_improvements.log)**.
**Why:** Maigret calls [`socid_extractor.extract`](https://pypi.org/project/socid-extractor/) on the response body ([`extract_ids_data` in `checking.py`](../maigret/checking.py)) to fill `ids_data`. New payloads usually need a **new scheme** upstream (`flags`, `regex`, optional `extract_json`, `fields`, optional `url_mutations` / `transforms`), matching patterns such as **`GitHub API`** or **`Gitlab API`** in `socid_extractor`s `schemes.py`.
**Each log entry must include:**
- **Date** — ISO `YYYY-MM-DD` (day you add the entry).
- **Example username** — Prefer the sites `usernameClaimed` from `data.json`, or any account that reproduces the payload.
- **Proposal** — Use the **block template** in the log file: detection idea, optional URL mutation, and field mappings in the same style as existing schemes.
If the service is **already covered** by an existing `socid_extractor` scheme, add a **short** entry anyway (date, example username, scheme name, “already implemented”) so there is an audit trail.
Do **not** paste secrets, cookies, or full private JSON; short key names and structure hints are enough.
---
## 3. Improvement workflow
### Phase A — Reproduce
1. Targeted run:
```bash
maigret --db /path/to/maigret/resources/data.json \
TEST_USERNAME \
--site "SiteName" \
--print-not-found --print-errors \
--no-progressbar -vv
```
2. Run separately with a **real** existing username and a **definitely non-existent** one (as `usernameClaimed` / `usernameUnclaimed` in JSON).
3. If needed: `-vvv` and `debug.log` (raw response).
4. Automated pair check:
```bash
maigret --db ... --self-check --site "SiteName" --no-progressbar
```
### Phase B — Classify the cause
| Symptom | Likely cause |
|---------|----------------|
| False “found” with `status_code` | Soft 404 (200 on a “not found” page). |
| False “found” with `message` | Overly broad `presenseStrs` (`name`, `email`, JSON keys) or stale `absenceStrs`. |
| Same HTML for different users | SPA / skeleton shell before hydration — also compare **final URL / redirect chain** (see above); if still identical, often `disabled`. |
| Login page instead of profile | XenForo etc.: guest, `ignore403`, “must be logged in” strings. |
| reCAPTCHA / “Checking your browser” / “not a bot” | Bot protection; Maigrets default User-Agent may worsen the response. |
| Redirect to another domain / lander | Stale URL template. |
### Phase C — Edits in [`data.json`](../maigret/resources/data.json)
1. Update `url` / `urlMain` if needed (HTTPS, new profile path).
2. Replace inappropriate `status_code` with `message` (or `response_url`), choosing:
- **`absenceStrs`** — only what reliably appears on the “user does not exist” page;
- **`presenseStrs`** — narrow markers of a real profile (avoid generic words).
3. For XenForo: override only fields that differ in the site entry; do not break the global `engines` template.
4. Refresh `usernameClaimed` / `usernameUnclaimed` if reference accounts disappeared.
5. Set **`headers`** (e.g. another `User-Agent`) if the site serves a captcha only to “suspicious” clients.
6. Use **`errors`**: HTML substring → meaningful check error (UNKNOWN), so it is not confused with “available”.
### Phase D — Decision criteria
| Outcome | When to use |
|---------|-------------|
| **Check fixed** | The `claimed` / `unclaimed` pair behaves predictably, `--self-check` passes, no regression on a similar site with the same engine. |
| **Check disabled** (`disabled: true`) | Cloudflare / anti-bot / login required / indistinguishable SPA without stable markers. |
| **Entry removed** | **Only** if the domain/service is gone (NXDOMAIN, clearly dead project), not “because it is hard to fix”. |
### Phase E — Before commit
- `maigret --self-check` for affected sites.
- `make test`.
---
## 4. Findings from reviews (concrete site batch)
Summary from an earlier false-positive review for: OpenSea, Mercado Livre, Redtube, Toms Guide, Kaggle, Kaskus, Livemaster, TechPowerUp, authorSTREAM, Bibsonomy, Bulbagarden, iXBT, Serebii, Picsart, Hashnode, hi5.
### What most often broke checks
1. **`status_code` where content checks are needed** — soft 404 with status 200.
2. **Broad `presenseStrs`** — matches on error pages or generic SPA shells.
3. **XenForo + guest** — HTML includes strings like “You must be logged in” that overlap the engine template.
4. **User-Agent** — on some sites (e.g. Kaggle) the default UA triggered a reCAPTCHA page instead of profile HTML; a deliberate `User-Agent` in site `headers` helped.
5. **SPAs and redirects** — identical first HTML, redirect to lander / another product (hi5 → Tagged), URL format changes by region (Mercado Livre).
### What worked as a fix
- Switching to **`message`** with narrow strings from **`<title>`** or unique markup where stable (**Kaggle**, **Mercado Livre**, **Hashnode**).
- For **Kaggle**, additionally: **`headers`**, **`errors`** for browser-check text.
- **Redtube** stayed valid on **`status_code`** with a stable **404** for non-existent users.
- **Picsart**: the web profile URL is a thin SPA shell; use the **JSON API** (`api.picsart.com/users/show/{username}.json`) in **`url`** with **`message`**-style markers (`"status":"success"` vs `user_not_found`), not the browser-only `/posts` vs `/not-found` navigation.
- For **Weblate / Anubis Anti-Bot**: Setting `headers` with a basic script User-Agent (e.g. `python-requests/2.25.1`) rather than the default browser UA completely bypassed the Anubis Proof-of-Work challenge HTTP 307 redirect, instantly recovering the native HTTP 404 framework.
### What required disabling checks
Where you **cannot** reliably tell “profile exists” from “no profile” without bypassing protection, login, or full JS:
- Anti-bot / captcha / “not a bot” page;
- Guest-only access to the needed page;
- SPA with indistinguishable first response;
- Forums returning **403** and a login page instead of a member profile for the member-search URL;
- Stale URLs that redirect to a stub.
In those cases **`disabled: true`** is better than false “found”; remove the DB entry only on **actual** domain death.
### Code notes
- For the `status_code` branch in `process_site_result`, use **strict** comparison `check_type == "status_code"`, not a substring match inside `"status_code"`.
- Treat empty `presenseStrs` with `message` as risky: when debugging, watch DEBUG-level logs if that diagnostics exists in code.
---
## 5. Future ideas (Maigret improvements)
- A mode or script: one site, two usernames, print statuses and first N bytes of the response (wrapper around `maigret()`).
- Document in CLI help that **`--use-disabled-sites`** is needed to analyze disabled entries.
---
## 6. Development utilities
### 6.1 `utils/site_check.py` — Single site diagnostics
A comprehensive utility for testing individual sites with multiple modes:
```bash
# Basic comparison of claimed vs unclaimed (aiohttp)
python utils/site_check.py --site "VK" --check-claimed
# Test via Maigret's checker directly
python utils/site_check.py --site "VK" --maigret
# Compare aiohttp vs Maigret results (find discrepancies)
python utils/site_check.py --site "VK" --compare-methods
# Full diagnosis with recommendations
python utils/site_check.py --site "VK" --diagnose
# Test with custom URL
python utils/site_check.py --url "https://example.com/{username}" --compare user1 user2
# Find a valid username for a site
python utils/site_check.py --site "VK" --find-user
```
**Key features:**
- `--maigret` — Uses Maigret's actual checking code, not raw aiohttp
- `--compare-methods` — Shows if aiohttp and Maigret see different results (useful for debugging)
- `--diagnose` — Validates checkType against actual responses, suggests fixes
- Color output with markers detection (captcha, cloudflare, login, etc.)
- `--json` flag for machine-readable output
**When to use each mode:**
| Mode | Use case |
|------|----------|
| `--check-claimed` | Quick sanity check: do claimed/unclaimed still differ? |
| `--maigret` | Verify Maigret's actual behavior matches expectations |
| `--compare-methods` | Debug "works in curl but fails in Maigret" issues |
| `--diagnose` | Full analysis when a site is broken, get fix recommendations |
### 6.2 `utils/check_top_n.py` — Mass site checking
Batch-check top N sites by Alexa rank with categorized reporting:
```bash
# Check top 100 sites
python utils/check_top_n.py --top 100
# Faster with more parallelism
python utils/check_top_n.py --top 100 --parallel 10
# Output JSON report
python utils/check_top_n.py --top 100 --output report.json
# Only show broken sites
python utils/check_top_n.py --top 100 --only-broken
```
**Output categories:**
- `working` — Site check passes
- `broken` — Check fails (wrong status, missing markers)
- `timeout` — Request timed out
- `anti_bot` — 403/429 or captcha detected
- `error` — Connection or other errors
- `disabled` — Already disabled in data.json
**Report includes:**
- Summary counts by category
- List of broken sites with issues
- Recommendations for fixes (e.g., "Switch to checkType: status_code")
### 6.3 Self-check behavior (`--self-check`)
The self-check command has been improved to be less aggressive:
```bash
# Check sites WITHOUT auto-disabling (default)
maigret --self-check --site "VK"
# Auto-disable failing sites (old behavior)
maigret --self-check --site "VK" --auto-disable
# Show detailed diagnosis for each failure
maigret --self-check --site "VK" --diagnose
```
**Behavior changes:**
| Flag | Effect |
|------|--------|
| `--self-check` alone | Reports issues but does NOT disable sites |
| `--auto-disable` | Automatically disables sites that fail (opt-in) |
| `--diagnose` | Prints detailed diagnosis with recommendations |
**Why this matters:**
- Old behavior was too aggressive — sites got disabled without explanation
- New behavior reports issues and suggests fixes
- Explicit `--auto-disable` required to modify database
---
## 7. Lessons learned (practical observations)
Collected from hands-on work fixing top-ranked sites (Reddit, Wikipedia, Microsoft Learn, Baidu, etc.).
### 7.1 JSON API is the first thing to look for
Both Reddit and Microsoft Learn had working public APIs that solved the problem entirely. The web pages were SPAs or blocked by anti-bot measures, but the APIs worked reliably:
- **Reddit**: `https://api.reddit.com/user/{username}/about` — returns JSON with user data or `{"message": "Not Found", "error": 404}`.
- **Microsoft Learn**: `https://learn.microsoft.com/api/profiles/{username}` — returns JSON with `userName` field or HTTP 404.
This confirms the playbook recommendation: always check for `/api/`, `.json`, GraphQL endpoints before giving up on a site.
### 7.2 `urlProbe` is a powerful tool
It separates "what we check" (API) from "what we show the user" (human-readable profile URL). Reddit is a perfect example:
```json
{
"url": "https://www.reddit.com/user/{username}",
"urlProbe": "https://api.reddit.com/user/{username}/about",
"checkType": "message",
"presenseStrs": ["\"name\":"],
"absenceStrs": ["Not Found"]
}
```
The check hits the API, but reports display `www.reddit.com/user/blue`.
### 7.3 aiohttp ≠ curl ≠ requests
Wikipedia returned HTTP 200 for `curl` and Python `requests`, but HTTP 403 for `aiohttp`. This is **TLS fingerprinting** — the server identifies the HTTP library by cryptographic characteristics of the TLS handshake, not by headers.
**Key insight:** Changing `User-Agent` does **not** help against TLS fingerprinting. Always test with aiohttp directly (or via Maigret with `-vvv` and `debug.log`), not just `curl`.
```python
# This returns 403 for Wikipedia even with browser UA:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers={"User-Agent": "Mozilla/5.0 ..."}) as resp:
print(resp.status) # 403
```
### 7.4 HTTP 403 in Maigret can mean different things
Initially it seemed Wikipedia was returning 403, but `curl` showed 200. Only `debug.log` revealed the real picture — aiohttp was getting blocked at TLS level.
**Lesson:** Use `-vvv` flag and inspect `debug.log` for raw response status and body. The warning message alone may be misleading.
### 7.5 Dead services migrate, not disappear
MSDN Social and TechNet profiles redirected to Microsoft Learn. Instead of deleting old entries:
1. Keep old entries with `disabled: true` as historical record.
2. Create a new entry for the current service with working API.
This preserves audit trail and avoids breaking existing workflows.
### 7.6 `status_code` is more reliable than `message` for APIs
Microsoft Learn API returns HTTP 404 for non-existent users — a clean signal without HTML parsing. For JSON APIs that return proper HTTP status codes, `status_code` is often the best choice:
```json
{
"checkType": "status_code",
"urlProbe": "https://learn.microsoft.com/api/profiles/{username}"
}
```
No need for fragile string matching when the API speaks HTTP correctly.
### 7.8 Engine templates can silently break across many sites
The **vBulletin** engine template has `absenceStrs` in five languages ("This user has not registered…", "Пользователь не зарегистрирован…", etc.). In a batch review of ~12 vBulletin forums (oneclickchicks, mirf, Pesiq, VKMOnline, forum.zone-game.info, etc.), **none** of the absence strings matched — the forums returned identical pages for both claimed and unclaimed usernames. Root cause: many of these forums require login to view member profiles, so they serve a generic page (no "user not registered" message at all) instead of an informative error.
**Lesson:** When a whole engine class shows false positives, do not patch sites one by one — check whether the **engine template** itself still matches the actual error pages. A template written for one version/language pack may silently stop working after a forum upgrade or config change.
### 7.9 Search-by-author URLs are architecturally unreliable
Several sites (OnanistovNet, Shoppingzone, Pogovorim, Astrogalaxy, Sexwin) used a phpBB-style `search.php?keywords=&terms=all&author={username}` URL as the check endpoint. This searches for **posts** by that author, not for the user account itself. Even if the markers worked, a user who exists but has zero posts would be indistinguishable from a non-existent user. And in practice, the sites changed their response format — some now return HTTP 404, others dropped the expected Russian absence text altogether.
**Lesson:** Avoid author-search URLs as the check endpoint; they test "has posts" rather than "account exists" and are doubly fragile (both logic mismatch and format drift).
### 7.10 Some sites generate a page for any path — permanent false positives
Two distinct patterns:
- **Pbase** creates a stub page titled "pbase Artist {username}" for **every** URL, real or fake. Both return HTTP 200 with nearly identical content (~3.3 KB). No markers can distinguish them.
- **ffm.bio** is even trickier: for the non-existent username `a.slomkoowski` it generated a page titled "mr.a" with description "a is a", apparently fuzzy-matching the path to the closest real entry. Both return HTTP 200 with large, content-rich pages.
**Lesson:** Before writing markers for a site, verify that the "unclaimed" URL actually produces an **error-like** response (different status, different title, unique error text). If the site always returns a plausible-looking page, no combination of `presenseStrs` / `absenceStrs` will help — `disabled: true` is the only safe option.
### 7.11 TLS fingerprinting can degrade over time (Kaggle)
Kaggle was previously fixed with a custom `User-Agent` header and `errors` for the "Checking your browser" captcha page. In the latest batch review, aiohttp receives HTTP 404 with identical content for **both** claimed and unclaimed usernames — the site now blocks the entire request before it reaches the profile page. This matches the TLS fingerprinting pattern seen earlier with Wikipedia (section 7.3), but here the degradation happened **after** a working fix was already in place.
**Lesson:** Sites that rely on bot-detection can tighten their rules at any time. A working `User-Agent` override today may fail tomorrow. When a previously fixed site starts returning identical responses for both usernames, suspect TLS fingerprinting first, and accept `disabled: true` if no public API is available.
### 7.12 API endpoints may bypass Cloudflare even when the main site is blocked
All four Fandom wikis returned HTTP 403 with a Cloudflare "Just a moment..." challenge when aiohttp accessed the user profile page (`/wiki/User:{username}`). However, the **MediaWiki API** on the same domain (`/api.php?action=query&list=users&ususers={username}&format=json`) returned clean JSON without any challenge. Similarly, **Substack** served a captcha-laden SPA for `/@{username}`, but its `public_profile` API (`/api/v1/user/{username}/public_profile`) responded with proper JSON and correct HTTP 404 for missing users.
This is likely because API routes are excluded from the Cloudflare WAF rules or use a different pipeline than the HTML-serving paths.
**Lesson:** When a site's main pages are blocked by Cloudflare or similar WAF, still check API endpoints on the **same domain** — they may not go through the same protection layer. This is especially true for:
- MediaWiki's `api.php` on wiki farms (Fandom, Wikia, self-hosted MediaWiki)
- REST API paths (`/api/v1/`, `/api/v2/`) on SPA-heavy sites
- Internal data endpoints that the SPA itself calls
### 7.13 GraphQL APIs often support GET, not just POST
**hashnode** exposes a GraphQL endpoint at `https://gql.hashnode.com`. While GraphQL is typically associated with POST requests, many implementations also support **GET** with the query passed as a URL parameter. This is critical for Maigret, which only supports GET/HEAD for `urlProbe`.
```
GET https://gql.hashnode.com?query=%7Buser(username%3A%20%22melwinalm%22)%20%7B%20name%20username%20%7D%7D
→ {"data":{"user":{"name":"Melwin D'Almeida","username":"melwinalm"}}}
GET https://gql.hashnode.com?query=%7Buser(username%3A%20%22a.slomkoowski%22)%20%7B%20name%20username%20%7D%7D
→ {"data":{"user":null}}
```
**Lesson:** Before giving up on a GraphQL-only site, try the same query via GET with `?query=...` (URL-encoded). Many GraphQL servers accept both methods.
### 7.14 URL-encoding resolves template placeholder conflicts
The hashnode GraphQL query `{user(username: "{username}") { name }}` contains curly braces that conflict with Maigret's `{username}` placeholder — Python's `str.format()` would raise a `KeyError` on `{user(username...}`.
The fix: URL-encode the GraphQL braces (`{` → `%7B`, `}` → `%7D`) but leave `{username}` as-is. Python's `.format()` only interprets literal `{…}` as placeholders, not `%7B…%7D`, and the GraphQL server decodes the percent-encoding on its end:
```
urlProbe: https://gql.hashnode.com?query=%7Buser(username%3A%20%22{username}%22)%20%7B%20name%20username%20%7D%7D
```
After `.format(username="melwinalm")`:
```
https://gql.hashnode.com?query=%7Buser(username%3A%20%22melwinalm%22)%20%7B%20name%20username%20%7D%7D
```
**Lesson:** When a `urlProbe` needs literal curly braces (GraphQL, JSON in URL, etc.), percent-encode them. This is a general technique for any `data.json` URL field processed by `.format()`.
### 7.7 The playbook classification works
The decision tree from the documentation accurately describes real-world cases:
| Situation | Playbook says | Actual result |
|-----------|---------------|---------------|
| Captcha (Baidu) | `disabled: true` | Correct |
| TLS fingerprinting (Wikipedia) | `disabled: true` (anti-bot) | Correct |
| Working API available (Reddit, MS Learn) | Use `urlProbe` | Correct |
| Service migrated (MSDN → MS Learn) | Update URL or create new entry | Correct |
---
## Documentation maintenance
For any of the changes below, **always** keep these artifacts in sync — this file ([`site-checks-guide.md`](site-checks-guide.md)), [`site-checks-playbook.md`](site-checks-playbook.md), and (when rules or templates change) the header/template in [`socid_extractor_improvements.log`](socid_extractor_improvements.log):
- Maigret code changes (including [`maigret/checking.py`](../maigret/checking.py), request executors, CLI);
- New or changed search tools / helper utilities for site checks;
- Changes to rules or semantics of `checkType`, `data.json` fields, self-check, etc.;
- Changes to the **public JSON API** diagnostic step or **mandatory** `socid_extractor` logging rules.
Prefer updating the guide, playbook, and log template in one commit or in the same task so instructions do not diverge. **Append-only:** new proposals go at the bottom of `socid_extractor_improvements.log`; do not delete historical entries when editing the template.
-87
View File
@@ -1,87 +0,0 @@
# Site checks — playbook (Maigret)
Short checklist for edits to [`maigret/resources/data.json`](../maigret/resources/data.json) and, when needed, [`maigret/checking.py`](../maigret/checking.py). Full guide: [`site-checks-guide.md`](site-checks-guide.md). Upstream extraction proposals: [`socid_extractor_improvements.log`](socid_extractor_improvements.log).
**Documentation maintenance:** whenever you improve Maigret, add search tooling, or change check logic, update **both** this file and [`site-checks-guide.md`](site-checks-guide.md) (see the “Documentation maintenance” section at the end of that file). When JSON API / `socid_extractor` logging rules change, update the **template header** in [`socid_extractor_improvements.log`](socid_extractor_improvements.log) in the same change.
## 0. Standard checks (do alongside reproduce / classify)
- **Public JSON API:** always look for a stable JSON (or GraphQL JSON) profile endpoint (`/api/`, `.json`, mobile-style URLs). When the API is more reliable than HTML, set **`urlProbe`** to that endpoint and keep **`url`** as the human-readable profile link (e.g. `https://picsart.com/u/{username}`). If there is no separate profile URL, use the API as `url` only. Details: **`urlProbe`** and section **2.1** in [`site-checks-guide.md`](site-checks-guide.md).
- **`socid_extractor` log (mandatory):** if you find **embedded user JSON in HTML** or a **standalone JSON profile API**, append a dated entry (with **example username**) to [`socid_extractor_improvements.log`](socid_extractor_improvements.log). Details: section **2.2** in [`site-checks-guide.md`](site-checks-guide.md).
## 1. Reproduce
- Run a targeted check:
`maigret USER --db /path/to/maigret/resources/data.json --site "SiteName" --print-not-found --print-errors --no-progressbar -vv`
- Compare an **existing** and a **non-existent** username (as `usernameClaimed` / `usernameUnclaimed` in JSON).
- With `-vvv`, inspect `debug.log` (raw response in the log).
## 2. Classify the cause
| Symptom | Typical cause | Action |
|--------|-----------------|--------|
| HTTP 200 for “user does not exist” | Soft 404 | Move from `status_code` to `message` or `response_url`; add `absenceStrs` / narrow `presenseStrs` |
| Generic words match (`name`, `email`) | `presenseStrs` too broad | Remove generic markers; add profile-specific ones |
| Same HTML without JS | SPA / skeleton shell | Compare **final URL and HTTP redirects** (Maigret already follows redirects by default). If the browser shows extra routes (`/posts`, `/not-found`) only **after JS**, they will **not** appear to Maigret — try a **public JSON/API** endpoint for the same site if one exists. See **Redirects and final URL** and **Picsart** in [`site-checks-guide.md`](site-checks-guide.md). |
| 403 / “Log in” / guest-only | Auth or anti-bot required | `disabled: true` |
| reCAPTCHA / “Checking your browser” | Bot protection | Try a reasonable `User-Agent` in `headers`; else `errors` + UNKNOWN or `disabled` |
| Domain does not resolve / persistent timeout | Dead service | Remove entry **only** after confirming the domain is dead |
## 3. Data edits
1. Update `url` / `urlMain` if needed (HTTPS redirects). Use optional **`urlProbe`** when the HTTP check should hit a different URL than the profile link shown in reports (API vs web UI).
2. For `message`: **always** tune string pairs so `absenceStrs` fire on “no user” pages and `presenseStrs` fire on real profiles without false absence hits.
3. Engine (`engine`, e.g. XenForo): override only differing fields in the site entry so other sites are not broken.
4. Keep `status_code` only if the response **reliably** differs by status code without soft 404.
## 4. Verify
- `maigret --self-check --site "SiteName" --db ...` for touched entries.
- `make test` before commit.
## 5. Code notes
- `process_site_result` uses strict comparison to `"status_code"` for `checkType` (not a substring trick).
- Empty `presenseStrs` with `message` means “presence always true”; a debug line is logged only at DEBUG level.
## 6. Development utilities
Quick reference for site check utilities. Full details: section **6** in [`site-checks-guide.md`](site-checks-guide.md).
| Command | Purpose |
|---------|---------|
| `python utils/site_check.py --site "X" --check-claimed` | Quick aiohttp comparison |
| `python utils/site_check.py --site "X" --maigret` | Test via Maigret checker |
| `python utils/site_check.py --site "X" --compare-methods` | Find aiohttp vs Maigret discrepancies |
| `python utils/site_check.py --site "X" --diagnose` | Full diagnosis with fix recommendations |
| `python utils/check_top_n.py --top 100` | Mass-check top 100 sites |
| `maigret --self-check --site "X"` | Self-check (reports only, no auto-disable) |
| `maigret --self-check --site "X" --auto-disable` | Self-check with auto-disable |
| `maigret --self-check --site "X" --diagnose` | Self-check with detailed diagnosis |
## 7. Quick tips (lessons learned)
Practical observations from fixing top-ranked sites. Full details: section **7** in [`site-checks-guide.md`](site-checks-guide.md).
| Tip | Why it matters |
|-----|----------------|
| **API first** | Reddit, Microsoft Learn — APIs worked when web pages were blocked. Always check `/api/`, `.json` endpoints. |
| **`urlProbe` separates check from display** | Check via API, show human URL in reports. Example: Reddit API → `www.reddit.com/user/` link. |
| **aiohttp ≠ curl** | Wikipedia returned 200 for curl, 403 for aiohttp (TLS fingerprinting). Always test with Maigret directly. |
| **Use `debug.log`** | Run with `-vvv` to see raw response. Warning messages alone can be misleading. |
| **`status_code` for clean APIs** | If API returns proper 404 for missing users, prefer `status_code` over `message`. |
| **Migrate, don't delete** | MSDN → Microsoft Learn: keep old entry disabled, create new one for current service. |
| **Engine templates break silently** | vBulletin `absenceStrs` failed on ~12 forums at once — many require login, showing a generic page with no error text. Check the engine template first. |
| **Search-by-author is unreliable** | phpBB `search.php?author=` checks for posts, not accounts. A user with zero posts looks identical to a non-existent user. Avoid these URLs. |
| **Some sites always generate a page** | Pbase stubs "pbase Artist {name}" for any path; ffm.bio fuzzy-matches to the nearest real entry. No markers can help — `disabled: true`. |
| **TLS fingerprinting degrades over time** | Kaggle's custom `User-Agent` fix stopped working — aiohttp now gets 404 for both usernames. Accept `disabled: true` when no API exists. |
| **API endpoints bypass Cloudflare** | Fandom `api.php` and Substack `/api/v1/` returned clean JSON while main pages were blocked by Cloudflare. Always try API paths on the same domain. |
| **Inspect Network tab for POST APIs** | Many modern platforms (e.g., Discord) heavily protect HTML profiles but expose unauthenticated `POST` endpoints for username checks. Maigret supports this natively: define `"request_method": "POST"` and `"request_payload": {"username": "{username}"}` in `data.json` to query them! |
| **Strict JSON markers are bulletproof** | When probing APIs, use `checkType: "message"` with exact JSON substrings (like `"{\"taken\": false}"`). Unlike HTML layout checks, this approach is immune to UI redesigns, A/B testing, and language translations. |
| **GraphQL supports GET too** | hashnode GraphQL works via `GET ?query=...` (URL-encoded). You can use either native POST payloads or GET `urlProbe` for GraphQL. |
| **URL-encode braces for template safety** | GraphQL `{...}` conflicts with Maigret's `{username}`. Use `%7B`/`%7D` for literal braces in `urlProbe``.format()` ignores percent-encoded chars. |
| **Anti-bot bypass via simple UA** | "Anubis" anti-bot PoW screens (like on Weblate) intercept modern browser UAs via HTTP 307. Hardcoding `"headers": {"User-Agent": "python-requests/2.25.1"}` circumvents the scraper filter and restores default detection logic. |
## 8. Documentation maintenance
When you change Maigret, add search tools, or change check logic, keep **this playbook**, [`site-checks-guide.md`](site-checks-guide.md), and (when applicable) the template in [`socid_extractor_improvements.log`](socid_extractor_improvements.log) aligned. New log **entries** are append-only at the bottom of that file.
+2 -2
View File
@@ -25,7 +25,7 @@
<i>The Commissioner Jules Maigret is a fictional French police detective, created by Georges Simenon. His investigation method is based on understanding the personality of different people and their interactions.</i>
<b>👉👉👉 [Online Telegram bot](https://t.me/maigret_search_bot)</b>
<b>👉👉👉 [Online Telegram bot](https://t.me/osint_maigret_bot)</b>
## About
@@ -53,7 +53,7 @@ See the full description of Maigret features [in the documentation](https://maig
## Installation
‼️ Maigret is available online via [official Telegram bot](https://t.me/maigret_search_bot). Consider using it if you don't want to install anything.
‼️ Maigret is available online via [official Telegram bot](https://t.me/osint_maigret_bot). Consider using it if you don't want to install anything.
### Windows
+121
View File
@@ -0,0 +1,121 @@
# Invalid Telegram PR Auto-Closer
This repository includes an automated solution to identify and close pull requests with titles matching the pattern "Invalid result https://t.me/...". These PRs are typically auto-generated or spam submissions that should not be processed.
## Components
### 1. Python Script (`utils/close_invalid_telegram_prs.py`)
A utility script that:
- Searches for open PRs matching the pattern "Invalid result https://t.me/..."
- Optionally closes them with a descriptive comment
- Supports dry-run mode for testing
- Uses the GitHub API to interact with the repository
#### Usage
```bash
# Dry run (show what would be closed without closing)
python utils/close_invalid_telegram_prs.py --dry-run
# Close matching PRs interactively
python utils/close_invalid_telegram_prs.py
# Close PRs with custom comment
python utils/close_invalid_telegram_prs.py --comment "Custom closure message"
# Use with different repository
python utils/close_invalid_telegram_prs.py --owner username --repo repository
```
#### Requirements
- Python 3.6+
- `requests` library: `pip install requests`
- GitHub personal access token with repository access
#### Authentication
Set your GitHub token via:
- Command line: `--token YOUR_TOKEN`
- Environment variable: `export GITHUB_TOKEN=YOUR_TOKEN`
### 2. GitHub Actions Workflow (`.github/workflows/close-invalid-telegram-prs.yml`)
An automated workflow that:
- Runs daily at 2 AM UTC (in dry-run mode by default)
- Can be manually triggered with option to actually close PRs
- Uses the repository's `GITHUB_TOKEN` for authentication
#### Manual Trigger
1. Go to the Actions tab in your GitHub repository
2. Select "Close Invalid Telegram PRs" workflow
3. Click "Run workflow"
4. Choose whether to run in dry-run mode or actually close PRs
### 3. Tests (`tests/test_close_invalid_telegram_prs.py`)
Unit tests that verify:
- Correct identification of matching PR titles
- Proper rejection of non-matching titles
- Case-insensitive pattern matching
- Whitespace handling
Run tests with:
```bash
python tests/test_close_invalid_telegram_prs.py
```
## Pattern Detection
The script identifies PRs with titles matching:
- `Invalid result https://t.me/...` (case insensitive)
- Various whitespace and formatting variations
- Any Telegram URL after the pattern
### Examples of Matching Titles
- "Invalid result https://t.me/someuser"
- "INVALID RESULT https://t.me/channel123"
- "Invalid Result https://t.me/bot_name"
- " Invalid result https://t.me/user/123 " (with whitespace)
### Examples of Non-Matching Titles
- "Valid result https://t.me/someuser" (not "Invalid")
- "Invalid results https://t.me/someuser" (plural "results")
- "Fix invalid result https://t.me/someuser" (extra words)
- "Invalid result http://t.me/someuser" (http instead of https)
## Security
- The GitHub Actions workflow only has the minimum required permissions
- The script requires explicit confirmation before closing PRs (except in automated mode)
- All actions are logged and can be audited
- Dry-run mode is available for testing
## Customization
You can customize the behavior by:
- Modifying the regex pattern in `is_invalid_telegram_pr()` function
- Changing the default comment message
- Adjusting the GitHub Actions schedule
- Adding additional validation logic
## Troubleshooting
### Common Issues
1. **Permission Denied**: Ensure your GitHub token has the required permissions
2. **No PRs Found**: This is normal if there are no matching PRs
3. **Rate Limiting**: The script handles GitHub API rate limits automatically
### Debug Mode
Run with verbose output:
```bash
python utils/close_invalid_telegram_prs.py --dry-run
```
This will show exactly which PRs match the pattern without closing them.
+1 -19
View File
@@ -31,32 +31,14 @@ two-letter country codes (**not a language!**). E.g. photo, dating, sport; jp, u
Multiple tags can be associated with one site. **Warning**: tags markup is
not stable now. Read more :doc:`in the separate section <tags>`.
``--exclude-tags`` - Exclude sites with specific tags from the search
(blacklist). E.g. ``--exclude-tags porn,dating`` will skip all sites
tagged with ``porn`` or ``dating``. Can be combined with ``--tags`` to
include certain categories while excluding others. Read more
:doc:`in the separate section <tags>`.
``-n``, ``--max-connections`` - Allowed number of concurrent connections
**(default: 100)**.
``-a``, ``--all-sites`` - Use all sites for scan **(default: top 500)**.
``--top-sites`` - Count of sites for scan ranked by Majestic Million
``--top-sites`` - Count of sites for scan ranked by Alexa Top
**(default: top 500)**.
**Mirrors:** After the top *N* sites by Majestic Million rank are chosen (respecting
``--tags``, ``--use-disabled-sites``, etc.), Maigret may add extra sites
whose database field ``source`` names a **parent platform** that itself falls
in the Majestic Million top *N* when ranking **including disabled** sites. For example,
if ``Twitter`` ranks in the first 500 by Majestic Million, a mirror such as ``memory.lol``
(with ``source: Twitter``) is included even though it has no rank and would
otherwise be cut off. The same applies to Instagram-related mirrors (e.g.
Picuki) when ``Instagram`` is in that parent top *N* by rank—even if the
official ``Instagram`` entry is disabled and not scanned by default, its
mirrors can still be pulled in. The final list is the ranked top *N* plus
these mirrors (no fixed upper bound on mirror count).
``--timeout`` - Time (in seconds) to wait for responses from sites
**(default: 30)**. A longer timeout will be more likely to get results
from slow sites. On the other hand, this may cause a long delay to
-35
View File
@@ -22,16 +22,8 @@ The supported methods (``checkType`` values in ``data.json``) are:
- ``status_code`` - checks that status code of the response is 2XX
- ``response_url`` - check if there is not redirect and the response is 2XX
.. note::
Maigret natively treats specific anti-bot HTTP status codes (like LinkedIn's ``HTTP 999``) as a standard "Not Found/Available" signal instead of throwing an infrastructure Server Error, gracefully preventing false positives.
See the details of check mechanisms in the `checking.py <https://github.com/soxoj/maigret/blob/main/maigret/checking.py#L339>`_ file.
.. note::
Maigret now uses the **Majestic Million** dataset for site popularity sorting instead of the discontinued Alexa Rank API. For backward compatibility with existing configurations and parsers, the ranking field in `data.json` and internal site models remains named ``alexaRank`` and ``alexa_rank``.
**Mirrors and ``--top-sites``:** When you limit scans with ``--top-sites N``, Maigret also includes *mirror* sites (entries whose ``source`` field points at a parent platform such as Twitter or Instagram) if that parent would appear in the Majestic Million top *N* when disabled sites are considered for ranking. See the **Mirrors** paragraph under ``--top-sites`` in :doc:`command-line-options`.
Testing
-------
@@ -120,33 +112,6 @@ There are few options for sites data.json helpful in various cases:
- ``headers`` - a dictionary of additional headers to be sent to the site
- ``requestHeadOnly`` - set to ``true`` if it's enough to make a HEAD request to the site
- ``regexCheck`` - a regex to check if the username is valid, in case of frequent false-positives
- ``requestMethod`` - set the HTTP method to use (e.g., ``POST``). By default, Maigret natively defaults to GET or HEAD.
- ``requestPayload`` - a dictionary with the JSON payload to send for POST requests (e.g., ``{"username": "{username}"}``), extremely useful for parsing GraphQL or modern JSON APIs.
``urlProbe`` (optional profile probe URL)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
By default Maigret performs the HTTP request to the same URL as ``url`` (the public profile link pattern).
If you set ``urlProbe`` in ``data.json``, Maigret **fetches** that URL for the presence check (API, GraphQL, JSON endpoint, etc.), while **reports and ``url_user``** still use ``url`` — the human-readable profile page users should open.
Placeholders: ``{username}``, ``{urlMain}``, ``{urlSubpath}`` (same as for ``url``). Example: GitHub uses ``url`` ``https://github.com/{username}`` and ``urlProbe`` ``https://api.github.com/users/{username}``; Picsart uses the web profile ``https://picsart.com/u/{username}`` and probes ``https://api.picsart.com/users/show/{username}.json``.
Implementation: ``make_site_result`` in `checking.py <https://github.com/soxoj/maigret/blob/main/maigret/checking.py>`_.
Site check fixes using LLM
--------------------------
.. note::
The ``LLM/`` directory at the root of the repository contains detailed instructions for editing site checks (in Markdown format): checklist, full guide to ``checkType`` / ``data.json`` / ``urlProbe``, handling false positives, searching for public JSON APIs, and the proposal log for ``socid_extractor``.
Main files:
- `site-checks-playbook.md <https://github.com/soxoj/maigret/blob/main/LLM/site-checks-playbook.md>`_ — short checklist
- `site-checks-guide.md <https://github.com/soxoj/maigret/blob/main/LLM/site-checks-guide.md>`_ — detailed guide
- `socid_extractor_improvements.log <https://github.com/soxoj/maigret/blob/main/LLM/socid_extractor_improvements.log>`_ — template and entries for identity extractor improvements
These files should be kept up-to-date whenever changes are made to the check logic in the code or in ``data.json``.
.. _activation-mechanism:
-16
View File
@@ -23,19 +23,3 @@ Usage
``--tags coding`` -- search on sites related to software development.
``--tags ucoz`` -- search on uCoz sites only (mostly CIS countries)
Blacklisting (excluding) tags
------------------------------
You can exclude sites with certain tags from the search using ``--exclude-tags``:
``--exclude-tags porn,dating`` -- skip all sites tagged with ``porn`` or ``dating``.
``--exclude-tags ru`` -- skip all Russian sites.
You can combine ``--tags`` and ``--exclude-tags`` to fine-tune your search:
``--tags forum --exclude-tags ru`` -- search on forum sites, but skip Russian ones.
In the web interface, the tag cloud supports three states per tag:
click once to **include** (green), click again to **exclude** (dark/strikethrough),
and click once more to return to **neutral** (red).
+1 -1
View File
@@ -13,7 +13,7 @@ Use Cases
---------
1. Search for accounts with username ``machine42`` on top 500 sites (by default, according to Majestic Million rank) from the Maigret DB.
1. Search for accounts with username ``machine42`` on top 500 sites (by default, according to Alexa rank) from the Maigret DB.
.. code-block:: console
+26 -191
View File
@@ -61,49 +61,30 @@ class SimpleAiohttpChecker(CheckerBase):
self.headers = None
self.allow_redirects = True
self.timeout = 0
self.allow_redirects = True
self.timeout = 0
self.method = 'get'
self.payload = None
def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get', payload=None):
def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get'):
self.url = url
self.headers = headers
self.allow_redirects = allow_redirects
self.timeout = timeout
self.method = method
self.payload = payload
return None
async def close(self):
pass
async def _make_request(
self, session, url, headers, allow_redirects, timeout, method, logger, payload=None
self, session, url, headers, allow_redirects, timeout, method, logger
) -> Tuple[str, int, Optional[CheckError]]:
try:
if method.lower() == 'get':
request_method = session.get
elif method.lower() == 'post':
request_method = session.post
elif method.lower() == 'head':
request_method = session.head
else:
request_method = session.get
kwargs = {
'url': url,
'headers': headers,
'allow_redirects': allow_redirects,
'timeout': timeout,
}
if payload and method.lower() == 'post':
if headers and headers.get('Content-Type') == 'application/x-www-form-urlencoded':
kwargs['data'] = payload
else:
kwargs['json'] = payload
async with request_method(**kwargs) as response:
request_method = session.get if method == 'get' else session.head
async with request_method(
url=url,
headers=headers,
allow_redirects=allow_redirects,
timeout=timeout,
) as response:
status_code = response.status
response_content = await response.content.read()
charset = response.charset or "utf-8"
@@ -160,7 +141,6 @@ class SimpleAiohttpChecker(CheckerBase):
self.timeout,
self.method,
self.logger,
self.payload,
)
if error and str(error) == "Invalid proxy response":
@@ -185,7 +165,7 @@ class AiodnsDomainResolver(CheckerBase):
self.logger = kwargs.get('logger', Mock())
self.resolver = aiodns.DNSResolver(loop=loop)
def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get', payload=None):
def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get'):
self.url = url
return None
@@ -211,7 +191,7 @@ class CheckerMock:
def __init__(self, *args, **kwargs):
pass
def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get', payload=None):
def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get'):
return None
async def check(self) -> Tuple[str, int, Optional[CheckError]]:
@@ -240,11 +220,6 @@ def detect_error_page(
if status_code == 403 and not ignore_403:
return CheckError("Access denied", "403 status code, use proxy/vpn")
elif status_code == 999:
# LinkedIn anti-bot / HTTP 999 workaround. It shouldn't trigger an infrastructure
# Server Error because it represents a valid "Not Found / Blocked" state for the username.
pass
elif status_code >= 500:
return CheckError("Server", f"{status_code} status code")
@@ -332,12 +307,6 @@ def process_site_result(
if html_text:
if not presense_flags:
if check_type == "message" and logger.isEnabledFor(logging.DEBUG):
logger.debug(
"Site %s uses checkType message with empty presenseStrs; "
"presence is treated as true for any page.",
site.name,
)
is_presense_detected = True
site.stats["presense_flag"] = None
else:
@@ -380,7 +349,7 @@ def process_site_result(
result = build_result(MaigretCheckStatus.CLAIMED)
else:
result = build_result(MaigretCheckStatus.AVAILABLE)
elif check_type == "status_code":
elif check_type in "status_code":
# Checks if the status code of the response is 2XX
if 200 <= status_code < 300:
result = build_result(MaigretCheckStatus.CLAIMED)
@@ -519,9 +488,7 @@ def make_site_result(
for k, v in site.get_params.items():
url_probe += f"&{k}={v}"
if site.request_method:
request_method = site.request_method.lower()
elif site.check_type == "status_code" and site.request_head_only:
if site.check_type == "status_code" and site.request_head_only:
# In most cases when we are detecting by status code,
# it is not necessary to get the entire body: we can
# detect fine with just the HEAD response.
@@ -532,15 +499,6 @@ def make_site_result(
# not respond properly unless we request the whole page.
request_method = 'get'
payload = None
if site.request_payload:
payload = {}
for k, v in site.request_payload.items():
if isinstance(v, str):
payload[k] = v.format(username=username)
else:
payload[k] = v
if site.check_type == "response_url":
# Site forwards request to a different URL if username not
# found. Disallow the redirect so we can capture the
@@ -557,7 +515,6 @@ def make_site_result(
headers=headers,
allow_redirects=allow_redirects,
timeout=options['timeout'],
payload=payload,
)
# Store future request object in the results object
@@ -584,39 +541,6 @@ async def check_site_for_username(
return site.name, default_result
response = await checker.check()
html_text = response[0] if response and response[0] else ""
# Retry once after token-style activation (e.g. Twitter guest token refresh).
act = site.activation
if act and html_text:
marks = act.get("marks") or []
if marks and any(m in html_text for m in marks):
method = act["method"]
try:
activate_fun = getattr(ParsingActivator(), method)
activate_fun(site, logger)
except AttributeError as e:
logger.warning(
f"Activation method {method} for site {site.name} not found!",
exc_info=True,
)
except Exception as e:
logger.warning(
f"Failed activation {method} for site {site.name}: {str(e)}",
exc_info=True,
)
else:
merged = dict(checker.headers or {})
merged.update(site.headers)
checker.prepare(
url=checker.url,
headers=merged,
allow_redirects=checker.allow_redirects,
timeout=checker.timeout,
method=checker.method,
payload=getattr(checker, 'payload', None),
)
response = await checker.check()
response_result = process_site_result(
response, query_notify, logger, default_result, site
@@ -864,21 +788,9 @@ async def site_self_check(
i2p_proxy=None,
skip_errors=False,
cookies=None,
auto_disable=False,
diagnose=False,
):
"""
Self-check a site configuration.
Args:
auto_disable: If True, automatically disable sites that fail checks.
If False (default), only report issues without disabling.
diagnose: If True, print detailed diagnosis information.
"""
changes = {
"disabled": False,
"issues": [],
"recommendations": [],
}
check_data = [
@@ -888,8 +800,6 @@ async def site_self_check(
logger.info(f"Checking {site.name}...")
results_cache = {}
for username, status in check_data:
async with semaphore:
results_dict = await maigret(
@@ -911,20 +821,15 @@ async def site_self_check(
# TODO: make normal checking
if site.name not in results_dict:
logger.info(results_dict)
changes["issues"].append(f"Site {site.name} not in results (wrong id_type?)")
if auto_disable:
changes["disabled"] = True
changes["disabled"] = True
continue
logger.debug(results_dict)
result = results_dict[site.name]["status"]
results_cache[username] = results_dict[site.name]
if result.error and 'Cannot connect to host' in result.error.desc:
changes["issues"].append(f"Cannot connect to host")
if auto_disable:
changes["disabled"] = True
changes["disabled"] = True
site_status = result.status
@@ -932,8 +837,6 @@ async def site_self_check(
if site_status == MaigretCheckStatus.UNKNOWN:
msgs = site.absence_strs
etype = site.check_type
error_msg = f"Error checking {username}: {result.context}"
changes["issues"].append(error_msg)
logger.warning(
f"Error while searching {username} in {site.name}: {result.context}, {msgs}, type {etype}"
)
@@ -943,62 +846,28 @@ async def site_self_check(
if skip_errors:
pass
# don't disable in case of available username
elif status == MaigretCheckStatus.CLAIMED and auto_disable:
elif status == MaigretCheckStatus.CLAIMED:
changes["disabled"] = True
elif status == MaigretCheckStatus.CLAIMED:
changes["issues"].append(f"Claimed user '{username}' not detected as claimed")
logger.warning(
f"Not found `{username}` in {site.name}, must be claimed"
)
logger.info(results_dict[site.name])
if auto_disable:
changes["disabled"] = True
changes["disabled"] = True
else:
changes["issues"].append(f"Unclaimed user '{username}' detected as claimed")
logger.warning(f"Found `{username}` in {site.name}, must be available")
logger.info(results_dict[site.name])
if auto_disable:
changes["disabled"] = True
changes["disabled"] = True
logger.info(f"Site {site.name} checking is finished")
# Generate recommendations based on issues
if changes["issues"] and len(results_cache) == 2:
claimed_result = results_cache.get(site.username_claimed, {})
unclaimed_result = results_cache.get(site.username_unclaimed, {})
claimed_http = claimed_result.get("http_status")
unclaimed_http = unclaimed_result.get("http_status")
if claimed_http and unclaimed_http:
if claimed_http != unclaimed_http and site.check_type != "status_code":
changes["recommendations"].append(
f"Consider checkType: status_code (HTTP {claimed_http} vs {unclaimed_http})"
)
# Print diagnosis if requested
if diagnose and changes["issues"]:
print(f"\n--- {site.name} DIAGNOSIS ---")
print(f" Check type: {site.check_type}")
print(f" Issues:")
for issue in changes["issues"]:
print(f" - {issue}")
if changes["recommendations"]:
print(f" Recommendations:")
for rec in changes["recommendations"]:
print(f" -> {rec}")
# Only modify site if auto_disable is enabled
if auto_disable and changes["disabled"] != site.disabled:
if changes["disabled"] != site.disabled:
site.disabled = changes["disabled"]
logger.info(f"Switching property 'disabled' for {site.name} to {site.disabled}")
db.update_site(site)
if not silent:
action = "Disabled" if site.disabled else "Enabled"
print(f"{action} site {site.name}...")
elif changes["issues"] and not silent and not diagnose:
# Report issues without disabling
print(f"Issues found in {site.name}: {len(changes['issues'])} (not auto-disabled)")
# remove service tag "unchecked"
if "unchecked" in site.tags:
@@ -1017,24 +886,10 @@ async def self_check(
proxy=None,
tor_proxy=None,
i2p_proxy=None,
auto_disable=False,
diagnose=False,
) -> dict:
"""
Run self-check on sites.
Args:
auto_disable: If True, automatically disable sites that fail checks.
If False (default), only report issues without disabling.
diagnose: If True, print detailed diagnosis for each failing site.
Returns:
dict with 'needs_update' bool and 'results' list of check results
"""
) -> bool:
sem = asyncio.Semaphore(max_connections)
tasks = []
all_sites = site_data
all_results = []
def disabled_count(lst):
return len(list(filter(lambda x: x.disabled, lst)))
@@ -1046,18 +901,15 @@ async def self_check(
for _, site in all_sites.items():
check_coro = site_self_check(
site, logger, sem, db, silent, proxy, tor_proxy, i2p_proxy,
skip_errors=True, auto_disable=auto_disable, diagnose=diagnose
site, logger, sem, db, silent, proxy, tor_proxy, i2p_proxy, skip_errors=True
)
future = asyncio.ensure_future(check_coro)
tasks.append((site.name, future))
tasks.append(future)
if tasks:
with alive_bar(len(tasks), title='Self-checking', force_tty=True) as progress:
for site_name, f in tasks:
result = await f
result['site_name'] = site_name
all_results.append(result)
for f in asyncio.as_completed(tasks):
await f
progress() # Update the progress bar
unchecked_new_count = len(
@@ -1066,10 +918,7 @@ async def self_check(
disabled_new_count = disabled_count(all_sites.values())
total_disabled = disabled_new_count - disabled_old_count
# Count issues
total_issues = sum(1 for r in all_results if r.get('issues'))
if auto_disable and total_disabled:
if total_disabled:
if total_disabled >= 0:
message = "Disabled"
else:
@@ -1081,25 +930,11 @@ async def self_check(
f"{message} {total_disabled} ({disabled_old_count} => {disabled_new_count}) checked sites. "
"Run with `--info` flag to get more information"
)
elif total_issues and not silent:
print(f"\nFound issues in {total_issues} sites (auto-disable is OFF)")
print("Use --auto-disable to automatically disable failing sites")
print("Use --diagnose to see detailed diagnosis for each site")
if unchecked_new_count != unchecked_old_count:
print(f"Unchecked sites verified: {unchecked_old_count - unchecked_new_count}")
needs_update = total_disabled != 0 or unchecked_new_count != unchecked_old_count
# For backwards compatibility, return bool if auto_disable is True
if auto_disable:
return needs_update
return {
'needs_update': needs_update,
'results': all_results,
'total_issues': total_issues,
}
return total_disabled != 0 or unchecked_new_count != unchecked_old_count
def extract_ids_data(html_text, logger, site) -> Dict:
-3
View File
@@ -32,9 +32,6 @@ COMMON_ERRORS = {
'<title>Attention Required! | Cloudflare</title>': CheckError(
'Captcha', 'Cloudflare'
),
'<title>Just a moment</title>': CheckError(
'Bot protection', 'Cloudflare challenge page'
),
'Please stand by, while we are checking your browser': CheckError(
'Bot protection', 'Cloudflare'
),
+3 -39
View File
@@ -277,12 +277,6 @@ def setup_arguments_parser(settings: Settings):
filter_group.add_argument(
"--tags", dest="tags", default='', help="Specify tags of sites (see `--stats`)."
)
filter_group.add_argument(
"--exclude-tags",
dest="exclude_tags",
default='',
help="Specify tags to exclude from search (blacklist).",
)
filter_group.add_argument(
"--site",
action="append",
@@ -322,19 +316,7 @@ def setup_arguments_parser(settings: Settings):
"--self-check",
action="store_true",
default=settings.self_check_enabled,
help="Do self check for sites and database. Use --auto-disable to disable failing sites.",
)
modes_group.add_argument(
"--auto-disable",
action="store_true",
default=False,
help="With --self-check: automatically disable sites that fail checks.",
)
modes_group.add_argument(
"--diagnose",
action="store_true",
default=False,
help="With --self-check: print detailed diagnosis for each failing site.",
help="Do self check for sites and database and disable non-working ones.",
)
modes_group.add_argument(
"--stats",
@@ -538,11 +520,6 @@ async def main():
if args.tags:
args.tags = list(set(str(args.tags).split(',')))
if args.exclude_tags:
args.exclude_tags = list(set(str(args.exclude_tags).split(',')))
else:
args.exclude_tags = []
db_file = args.db_file \
if (args.db_file.startswith("http://") or args.db_file.startswith("https://")) \
else path.join(path.dirname(path.realpath(__file__)), args.db_file)
@@ -564,7 +541,6 @@ async def main():
get_top_sites_for_id = lambda x: db.ranked_sites_dict(
top=args.top_sites,
tags=args.tags,
excluded_tags=args.exclude_tags,
names=args.site_list,
disabled=args.use_disabled_sites,
id_type=x,
@@ -590,7 +566,7 @@ async def main():
query_notify.success(
f'Maigret sites database self-check started for {len(site_data)} sites...'
)
check_result = await self_check(
is_need_update = await self_check(
db,
site_data,
logger,
@@ -598,16 +574,7 @@ async def main():
max_connections=args.connections,
tor_proxy=args.tor_proxy,
i2p_proxy=args.i2p_proxy,
auto_disable=args.auto_disable,
diagnose=args.diagnose,
)
# Handle both old (bool) and new (dict) return types
if isinstance(check_result, dict):
is_need_update = check_result.get('needs_update', False)
else:
is_need_update = check_result
if is_need_update:
if input('Do you want to save changes permanently? [Yn]\n').lower() in (
'y',
@@ -644,10 +611,7 @@ async def main():
port = (
args.web if args.web else 5000
) # args.web is either the specified port or 5000 by default
# Host configuration: secure by default, but allow override via environment
host = os.getenv('FLASK_HOST', '127.0.0.1')
app.run(host=host, port=port)
app.run(port=port)
return
if usernames == {}:
+24272 -24700
View File
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -5,7 +5,7 @@ from typing import List
SETTINGS_FILES_PATHS = [
path.join(path.dirname(path.realpath(__file__)), "resources/settings.json"),
path.expanduser('~/.maigret/settings.json'),
'~/.maigret/settings.json',
path.join(os.getcwd(), 'settings.json'),
]
+2 -64
View File
@@ -65,10 +65,6 @@ class MaigretSite:
url_probe = None
# Type of check to perform
check_type = ""
# HTTP request method (GET, POST, HEAD, etc.)
request_method = ""
# HTTP request payload (for POST, PUT, etc.)
request_payload: Dict[str, Any] = {}
# Whether to only send HEAD requests (GET by default)
request_head_only = ""
# GET parameters to include in requests
@@ -141,8 +137,6 @@ class MaigretSite:
'regex_check',
'url_probe',
'check_type',
'request_method',
'request_payload',
'request_head_only',
'get_params',
'presense_strs',
@@ -324,7 +318,6 @@ class MaigretDatabase:
reverse=False,
top=sys.maxsize,
tags=[],
excluded_tags=[],
names=[],
disabled=True,
id_type="username",
@@ -332,30 +325,19 @@ class MaigretDatabase:
"""
Ranking and filtering of the sites list
When ``top`` is limited (not "all sites"), **mirrors** may be appended after
the Alexa-ranked slice. A mirror is any filtered site with a non-empty
``source`` field equal to the name of a site that appears in the first
``top`` positions of a **parent ranking** that includes disabled sites.
Thus mirrors such as third-party viewers (e.g. for Twitter or Instagram)
are still scanned when their parent platform ranks highly, even if the
official site is disabled and omitted from the main list.
Args:
reverse (bool, optional): Reverse the sorting order. Defaults to False.
top (int, optional): Maximum number of sites to return. Defaults to sys.maxsize.
tags (list, optional): List of tags to filter sites by (whitelist). Defaults to empty list.
excluded_tags (list, optional): List of tags to exclude sites by (blacklist). Defaults to empty list.
tags (list, optional): List of tags to filter sites by. Defaults to empty list.
names (list, optional): List of site names (or urls, see MaigretSite.__eq__) to filter by. Defaults to empty list.
disabled (bool, optional): Whether to include disabled sites. Defaults to True.
id_type (str, optional): Type of identifier to filter by. Defaults to "username".
Returns:
dict: Dictionary of filtered and ranked sites (base top slice plus mirrors),
with site names as keys and MaigretSite objects as values
dict: Dictionary of filtered and ranked sites, with site names as keys and MaigretSite objects as values
"""
normalized_names = list(map(str.lower, names))
normalized_tags = list(map(str.lower, tags))
normalized_excluded_tags = list(map(str.lower, excluded_tags))
is_name_ok = lambda x: x.name.lower() in normalized_names
is_source_ok = lambda x: x.source and x.source.lower() in normalized_names
@@ -369,22 +351,6 @@ class MaigretDatabase:
)
is_id_type_ok = lambda x: x.type == id_type
is_excluded_by_tag = lambda x: set(
map(str.lower, x.tags)
).intersection(set(normalized_excluded_tags))
is_excluded_by_engine = lambda x: (
isinstance(x.engine, str)
and x.engine.lower() in normalized_excluded_tags
)
is_excluded_by_protocol = lambda x: (
x.protocol and x.protocol in normalized_excluded_tags
)
is_not_excluded = lambda x: not excluded_tags or not (
is_excluded_by_tag(x)
or is_excluded_by_engine(x)
or is_excluded_by_protocol(x)
)
filter_tags_engines_fun = (
lambda x: not tags
or is_engine_ok(x)
@@ -395,7 +361,6 @@ class MaigretDatabase:
filter_fun = (
lambda x: filter_tags_engines_fun(x)
and is_not_excluded(x)
and filter_names_fun(x)
and is_disabled_needed(x)
and is_id_type_ok(x)
@@ -406,33 +371,6 @@ class MaigretDatabase:
sorted_list = sorted(
filtered_list, key=lambda x: x.alexa_rank, reverse=reverse
)[:top]
# Mirrors: sites whose `source` matches a parent platform that ranks in the
# top `top` by Alexa when disabled entries are included in the ranking pool
# (so e.g. Instagram can be a parent for Picuki even if Instagram is disabled).
if top < sys.maxsize and sorted_list:
filter_fun_ranking_parents = (
lambda x: filter_tags_engines_fun(x)
and is_not_excluded(x)
and filter_names_fun(x)
and is_id_type_ok(x)
)
ranking_pool = [s for s in self.sites if filter_fun_ranking_parents(s)]
sorted_parents = sorted(
ranking_pool, key=lambda x: x.alexa_rank, reverse=reverse
)[:top]
parent_names_lower = {s.name.lower() for s in sorted_parents}
base_names = {s.name for s in sorted_list}
def is_mirror(s) -> bool:
if not s.source or s.name in base_names:
return False
return s.source.lower() in parent_names_lower
mirrors = [s for s in filtered_list if is_mirror(s)]
mirrors.sort(key=lambda x: (x.alexa_rank, x.name))
sorted_list = list(sorted_list) + mirrors
return {site.name: site for site in sorted_list}
@property
+4 -14
View File
@@ -409,13 +409,8 @@ class Submitter:
self.logger.info('Domain is %s', domain_raw)
# check for existence
domain_re = re.compile(
r'://(www\.)?' + re.escape(domain_raw) + r'(/|$)'
)
matched_sites = list(
filter(
lambda x: domain_re.search(x.url_main + x.url), self.db.sites
)
filter(lambda x: domain_raw in x.url_main + x.url, self.db.sites)
)
if matched_sites:
@@ -453,14 +448,9 @@ class Submitter:
old_site = next(
(site for site in matched_sites if site.name == site_name), None
)
if old_site is None:
print(
f'{Fore.RED}[!] Site "{site_name}" not found in the matched list. Proceeding without updating an existing site.{Style.RESET_ALL}'
)
else:
print(
f'{Fore.GREEN}[+] We will update site "{old_site.name}" in case of success.{Style.RESET_ALL}'
)
print(
f'{Fore.GREEN}[+] We will update site "{old_site.name}" in case of success.{Style.RESET_ALL}'
)
# Check if the site check is ordinary or not
if old_site and (old_site.url_probe or old_site.activation):
+6 -17
View File
@@ -19,15 +19,14 @@ from maigret.sites import MaigretDatabase
from maigret.report import generate_report_context
app = Flask(__name__)
# Use environment variable for secret key, generate random one if not set
app.secret_key = os.getenv('FLASK_SECRET_KEY', os.urandom(24).hex())
app.secret_key = 'your-secret-key-here'
# add background job tracking
background_jobs = {}
job_results = {}
# Configuration
app.config["MAIGRET_DB_FILE"] = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'resources', 'data.json')
app.config["MAIGRET_DB_FILE"] = os.path.join('maigret', 'resources', 'data.json')
app.config["COOKIES_FILE"] = "cookies.txt"
app.config["UPLOAD_FOLDER"] = 'uploads'
app.config["REPORTS_FOLDER"] = os.path.abspath('/tmp/maigret_reports')
@@ -49,14 +48,12 @@ async def maigret_search(username, options):
top_sites = 999999999 # effectively all
tags = options.get('tags', [])
excluded_tags = options.get('excluded_tags', [])
site_list = options.get('site_list', [])
logger.info(f"Filtering sites by tags: {tags}, excluded: {excluded_tags}")
logger.info(f"Filtering sites by tags: {tags}")
sites = db.ranked_sites_dict(
top=top_sites,
tags=tags,
excluded_tags=excluded_tags,
names=site_list,
disabled=False,
id_type='username',
@@ -227,8 +224,7 @@ def search():
# Get selected tags - ensure it's a list
selected_tags = request.form.getlist('tags')
excluded_tags = request.form.getlist('excluded_tags')
logging.info(f"Selected tags: {selected_tags}, Excluded tags: {excluded_tags}")
logging.info(f"Selected tags: {selected_tags}")
options = {
'top_sites': request.form.get('top_sites') or '500',
@@ -243,14 +239,13 @@ def search():
'i2p_proxy': request.form.get('i2p_proxy', None) or None,
'permute': 'permute' in request.form,
'tags': selected_tags, # Pass selected tags as a list
'excluded_tags': excluded_tags, # Pass excluded tags as a list
'site_list': [
s.strip() for s in request.form.get('site', '').split(',') if s.strip()
],
}
logging.info(
f"Starting search for usernames: {usernames} with tags: {selected_tags}, excluded: {excluded_tags}"
f"Starting search for usernames: {usernames} with tags: {selected_tags}"
)
# Start background job
@@ -343,10 +338,4 @@ if __name__ == '__main__':
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
debug_mode = os.getenv('FLASK_DEBUG', 'False').lower() in ['true', '1', 't']
# Host configuration: secure by default
# Use 127.0.0.1 for local development, 0.0.0.0 only if explicitly set
host = os.getenv('FLASK_HOST', '127.0.0.1')
port = int(os.getenv('FLASK_PORT', '5000'))
app.run(host=host, port=port, debug=debug_mode)
app.run(debug=debug_mode)
+8 -145
View File
@@ -28,11 +28,6 @@
background-color: #28a745;
}
.tag.excluded {
background-color: #343a40;
text-decoration: line-through;
}
.tag:hover {
transform: translateY(-2px);
box-shadow: 0 2px 5px rgba(0, 0, 0, 0.2);
@@ -173,16 +168,7 @@
</div>
<div class="mb-3">
<label class="form-label">Tags (click to cycle: include → exclude → neutral)</label>
<div class="mb-2">
<small class="text-muted">
<span style="display:inline-block;width:12px;height:12px;background:#28a745;border-radius:50%;"></span> Included (whitelist)
&nbsp;&nbsp;
<span style="display:inline-block;width:12px;height:12px;background:#343a40;border-radius:50%;"></span> Excluded (blacklist)
&nbsp;&nbsp;
<span style="display:inline-block;width:12px;height:12px;background:#dc3545;border-radius:50%;"></span> Neutral
</small>
</div>
<label class="form-label">Tags (click to select)</label>
<div class="tag-cloud" id="tagCloud"></div>
<select multiple class="hidden-select" id="tags" name="tags">
<option value="gaming">Gaming</option>
@@ -244,89 +230,6 @@
<option value="q&a">Q&A</option>
<option value="crypto">Crypto</option>
<option value="ai">AI</option>
<!-- Country tags -->
<option value="ae" data-group="country">AE - United Arab Emirates</option>
<option value="ao" data-group="country">AO - Angola</option>
<option value="ar" data-group="country">AR - Argentina</option>
<option value="at" data-group="country">AT - Austria</option>
<option value="au" data-group="country">AU - Australia</option>
<option value="az" data-group="country">AZ - Azerbaijan</option>
<option value="bd" data-group="country">BD - Bangladesh</option>
<option value="be" data-group="country">BE - Belgium</option>
<option value="bg" data-group="country">BG - Bulgaria</option>
<option value="br" data-group="country">BR - Brazil</option>
<option value="by" data-group="country">BY - Belarus</option>
<option value="ca" data-group="country">CA - Canada</option>
<option value="ch" data-group="country">CH - Switzerland</option>
<option value="cl" data-group="country">CL - Chile</option>
<option value="cn" data-group="country">CN - China</option>
<option value="co" data-group="country">CO - Colombia</option>
<option value="cr" data-group="country">CR - Costa Rica</option>
<option value="cz" data-group="country">CZ - Czechia</option>
<option value="de" data-group="country">DE - Germany</option>
<option value="dk" data-group="country">DK - Denmark</option>
<option value="dz" data-group="country">DZ - Algeria</option>
<option value="ee" data-group="country">EE - Estonia</option>
<option value="eg" data-group="country">EG - Egypt</option>
<option value="es" data-group="country">ES - Spain</option>
<option value="eu" data-group="country">EU - European Union</option>
<option value="fi" data-group="country">FI - Finland</option>
<option value="fr" data-group="country">FR - France</option>
<option value="gb" data-group="country">GB - United Kingdom</option>
<option value="global" data-group="country">🌍 Global</option>
<option value="gr" data-group="country">GR - Greece</option>
<option value="hk" data-group="country">HK - Hong Kong</option>
<option value="hr" data-group="country">HR - Croatia</option>
<option value="hu" data-group="country">HU - Hungary</option>
<option value="id" data-group="country">ID - Indonesia</option>
<option value="ie" data-group="country">IE - Ireland</option>
<option value="il" data-group="country">IL - Israel</option>
<option value="in" data-group="country">IN - India</option>
<option value="ir" data-group="country">IR - Iran</option>
<option value="it" data-group="country">IT - Italy</option>
<option value="jp" data-group="country">JP - Japan</option>
<option value="kg" data-group="country">KG - Kyrgyzstan</option>
<option value="kr" data-group="country">KR - Korea</option>
<option value="kz" data-group="country">KZ - Kazakhstan</option>
<option value="la" data-group="country">LA - Laos</option>
<option value="lk" data-group="country">LK - Sri Lanka</option>
<option value="lt" data-group="country">LT - Lithuania</option>
<option value="ma" data-group="country">MA - Morocco</option>
<option value="md" data-group="country">MD - Moldova</option>
<option value="mg" data-group="country">MG - Madagascar</option>
<option value="mk" data-group="country">MK - North Macedonia</option>
<option value="mx" data-group="country">MX - Mexico</option>
<option value="ng" data-group="country">NG - Nigeria</option>
<option value="nl" data-group="country">NL - Netherlands</option>
<option value="no" data-group="country">NO - Norway</option>
<option value="ph" data-group="country">PH - Philippines</option>
<option value="pk" data-group="country">PK - Pakistan</option>
<option value="pl" data-group="country">PL - Poland</option>
<option value="pt" data-group="country">PT - Portugal</option>
<option value="re" data-group="country">RE - Réunion</option>
<option value="ro" data-group="country">RO - Romania</option>
<option value="rs" data-group="country">RS - Serbia</option>
<option value="ru" data-group="country">RU - Russia</option>
<option value="sa" data-group="country">SA - Saudi Arabia</option>
<option value="sd" data-group="country">SD - Sudan</option>
<option value="se" data-group="country">SE - Sweden</option>
<option value="sg" data-group="country">SG - Singapore</option>
<option value="sk" data-group="country">SK - Slovakia</option>
<option value="sv" data-group="country">SV - El Salvador</option>
<option value="th" data-group="country">TH - Thailand</option>
<option value="tn" data-group="country">TN - Tunisia</option>
<option value="tr" data-group="country">TR - Türkiye</option>
<option value="tw" data-group="country">TW - Taiwan</option>
<option value="ua" data-group="country">UA - Ukraine</option>
<option value="uk" data-group="country">UK - United Kingdom</option>
<option value="us" data-group="country">US - United States</option>
<option value="uz" data-group="country">UZ - Uzbekistan</option>
<option value="ve" data-group="country">VE - Venezuela</option>
<option value="vi" data-group="country">VI - Virgin Islands</option>
<option value="vn" data-group="country">VN - Viet Nam</option>
<option value="za" data-group="country">ZA - South Africa</option>
</select>
<select multiple class="hidden-select" id="excludedTags" name="excluded_tags">
</select>
</div>
</div>
@@ -389,66 +292,26 @@
}
document.addEventListener('DOMContentLoaded', function () {
// Tag cloud functionality with include/exclude (whitelist/blacklist) support
// Tag cloud functionality
const tagCloud = document.getElementById('tagCloud');
const hiddenSelect = document.getElementById('tags');
const excludedSelect = document.getElementById('excludedTags');
const allTags = Array.from(hiddenSelect.options).map(opt => ({
value: opt.value,
label: opt.text,
group: opt.dataset.group || 'category'
label: opt.text
}));
function updateTagSelects() {
// Clear and repopulate hidden selects based on tag states
Array.from(hiddenSelect.options).forEach(opt => opt.selected = false);
// Clear excluded select
excludedSelect.innerHTML = '';
document.querySelectorAll('#tagCloud .tag').forEach(tagEl => {
const val = tagEl.dataset.value;
if (tagEl.classList.contains('selected')) {
const option = Array.from(hiddenSelect.options).find(opt => opt.value === val);
if (option) option.selected = true;
} else if (tagEl.classList.contains('excluded')) {
const opt = document.createElement('option');
opt.value = val;
opt.selected = true;
excludedSelect.appendChild(opt);
}
});
}
let lastGroup = '';
allTags.forEach(tag => {
if (tag.group !== lastGroup && tag.group === 'country') {
const separator = document.createElement('div');
separator.style.cssText = 'width:100%;margin:8px 0 4px;padding:4px 0;border-top:1px solid rgba(0,0,0,0.15);font-size:13px;color:#666;';
separator.textContent = 'Countries';
tagCloud.appendChild(separator);
}
lastGroup = tag.group;
const tagElement = document.createElement('span');
tagElement.className = 'tag';
tagElement.textContent = tag.label;
tagElement.dataset.value = tag.value;
// Single click cycles: neutral -> included -> excluded -> neutral
tagElement.addEventListener('click', function (e) {
e.preventDefault();
if (this.classList.contains('selected')) {
// included -> excluded
this.classList.remove('selected');
this.classList.add('excluded');
} else if (this.classList.contains('excluded')) {
// excluded -> neutral
this.classList.remove('excluded');
} else {
// neutral -> included
this.classList.add('selected');
tagElement.addEventListener('click', function () {
const isSelected = this.classList.toggle('selected');
const option = Array.from(hiddenSelect.options).find(opt => opt.value === tag.value);
if (option) {
option.selected = isSelected;
}
updateTagSelects();
});
tagCloud.appendChild(tagElement);
Generated
+1122 -1575
View File
File diff suppressed because it is too large Load Diff
+2 -2
View File
@@ -1,5 +1,5 @@
maigret @ https://github.com/soxoj/maigret/archive/refs/heads/main.zip
pefile==2023.2.7 # do not bump while pyinstaller is 6.11.1, there is a conflict
psutil==7.1.3
pyinstaller==6.16.0
psutil==6.1.1
pyinstaller==6.11.1
pywin32-ctypes==0.2.3
+8 -9
View File
@@ -31,13 +31,13 @@ classifiers = [
# Install with dev dependencies:
# poetry install --with dev
python = "^3.10"
aiodns = ">=3,<5"
aiodns = "^3.0.0"
aiohttp = "^3.12.14"
aiohttp-socks = ">=0.10.1,<0.12.0"
aiohttp-socks = "^0.10.1"
arabic-reshaper = "^3.0.0"
async-timeout = "^5.0.1"
attrs = ">=25.3,<27.0"
certifi = ">=2025.6.15,<2027.0.0"
attrs = "^25.3.0"
certifi = "^2025.6.15"
chardet = "^5.0.0"
colorama = "^0.4.6"
future = "^1.0.0"
@@ -55,7 +55,6 @@ PySocks = "^1.7.1"
python-bidi = "^0.6.3"
requests = "^2.32.4"
requests-futures = "^1.0.2"
requests-toolbelt = "^1.0.0"
six = "^1.17.0"
socid-extractor = "^0.0.27"
soupsieve = "^2.6"
@@ -80,16 +79,16 @@ platformdirs = "^4.3.8"
# How to add a new dev dependency: poetry add black --group dev
# Install dev dependencies with: poetry install --with dev
flake8 = "^7.1.1"
pytest = ">=8.3.4,<10.0.0"
pytest = "^8.3.4"
pytest-asyncio = "^1.0.0"
pytest-cov = ">=6,<8"
pytest-cov = "^6.0.0"
pytest-httpserver = "^1.0.0"
pytest-rerunfailures = ">=15.1,<17.0"
pytest-rerunfailures = "^15.1"
reportlab = "^4.4.3"
mypy = "^1.14.1"
tuna = "^0.5.11"
coverage = "^7.9.2"
black = ">=25.1,<27.0"
black = "^25.1.0"
[tool.poetry.scripts]
# Run with: poetry run maigret <username>
-3
View File
@@ -1,3 +0,0 @@
[mutmut]
paths_to_mutate=maigret/
tests_dir=tests/
+2537 -2647
View File
File diff suppressed because it is too large Load Diff
-34
View File
@@ -5,13 +5,11 @@ from typing import Dict, Any
DEFAULT_ARGS: Dict[str, Any] = {
'all_sites': False,
'auto_disable': False,
'connections': 100,
'cookie_file': None,
'csv': False,
'db_file': 'resources/data.json',
'debug': False,
'diagnose': False,
'disable_extracting': False,
'disable_recursive_search': False,
'folderoutput': 'reports',
@@ -36,7 +34,6 @@ DEFAULT_ARGS: Dict[str, Any] = {
'site_list': [],
'stats': False,
'tags': '',
'exclude_tags': '',
'timeout': 30,
'tor_proxy': 'socks5://127.0.0.1:9050',
'i2p_proxy': 'http://127.0.0.1:4444',
@@ -106,34 +103,3 @@ def test_args_multiple_sites(argparser):
for arg in vars(args):
assert getattr(args, arg) == want_args[arg]
def test_args_exclude_tags(argparser):
args = argparser.parse_args('--exclude-tags porn,dating username'.split())
want_args = dict(DEFAULT_ARGS)
want_args.update(
{
'exclude_tags': 'porn,dating',
'username': ['username'],
}
)
for arg in vars(args):
assert getattr(args, arg) == want_args[arg]
def test_args_tags_with_exclude_tags(argparser):
args = argparser.parse_args('--tags coding --exclude-tags porn username'.split())
want_args = dict(DEFAULT_ARGS)
want_args.update(
{
'tags': 'coding',
'exclude_tags': 'porn',
'username': ['username'],
}
)
for arg in vars(args):
assert getattr(args, arg) == want_args[arg]
+84
View File
@@ -0,0 +1,84 @@
"""Tests for the close_invalid_telegram_prs utility."""
import unittest
import sys
import os
# Add the utils directory to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'utils'))
from close_invalid_telegram_prs import is_invalid_telegram_pr
class TestCloseInvalidTelegramPRs(unittest.TestCase):
"""Test cases for the invalid Telegram PR detection."""
def test_valid_invalid_telegram_pr_titles(self):
"""Test that valid invalid Telegram PR titles are correctly identified."""
valid_titles = [
"Invalid result https://t.me/someuser",
"invalid result https://t.me/channel123",
"Invalid Result https://t.me/bot_name",
"INVALID RESULT https://t.me/test",
"Invalid result https://t.me/user/123",
"Invalid result https://t.me/s/channel_name",
]
for title in valid_titles:
with self.subTest(title=title):
self.assertTrue(is_invalid_telegram_pr(title),
f"Title should be identified as invalid: {title}")
def test_invalid_telegram_pr_titles_not_matching(self):
"""Test that non-matching titles are correctly rejected."""
invalid_titles = [
"Valid result https://t.me/someuser", # "Valid" instead of "Invalid"
"Invalid results https://t.me/someuser", # "results" instead of "result"
"Invalid result http://t.me/someuser", # "http" instead of "https"
"Invalid result https://telegram.me/someuser", # Wrong domain
"Fix invalid result https://t.me/someuser", # Extra words before
"Invalid result for https://t.me/someuser", # Extra words in between
"Added telegram site", # Completely different
"Fix false positives", # Unrelated
"", # Empty title
"Invalid result", # Missing URL
"https://t.me/someuser", # Missing "Invalid result"
]
for title in invalid_titles:
with self.subTest(title=title):
self.assertFalse(is_invalid_telegram_pr(title),
f"Title should NOT be identified as invalid: {title}")
def test_whitespace_handling(self):
"""Test that whitespace is handled correctly."""
titles_with_whitespace = [
" Invalid result https://t.me/someuser ", # Leading/trailing spaces
"\tInvalid result https://t.me/someuser\t", # Tabs
"Invalid\tresult\thttps://t.me/someuser", # Tabs between words
"Invalid result https://t.me/someuser", # Multiple spaces
]
for title in titles_with_whitespace:
with self.subTest(title=title):
self.assertTrue(is_invalid_telegram_pr(title),
f"Title with whitespace should be identified: {title}")
def test_case_insensitive(self):
"""Test that the pattern matching is case insensitive."""
case_variations = [
"invalid result https://t.me/someuser",
"Invalid Result https://t.me/someuser",
"INVALID RESULT https://t.me/someuser",
"Invalid result https://T.ME/someuser",
"iNvAlId ReSuLt https://t.me/someuser",
]
for title in case_variations:
with self.subTest(title=title):
self.assertTrue(is_invalid_telegram_pr(title),
f"Case variation should be identified: {title}")
if __name__ == '__main__':
unittest.main()
+1 -3
View File
@@ -27,9 +27,7 @@ async def test_self_check_db(test_db):
assert test_db.sites_dict['ValidActive'].disabled is False
assert test_db.sites_dict['InvalidInactive'].disabled is True
await self_check(
test_db, test_db.sites_dict, logger, silent=False, auto_disable=True
)
await self_check(test_db, test_db.sites_dict, logger, silent=False)
assert test_db.sites_dict['InvalidActive'].disabled is True
assert test_db.sites_dict['ValidInactive'].disabled is False
-53
View File
@@ -1,53 +0,0 @@
import unittest
from unittest.mock import patch, mock_open
from maigret.settings import Settings
class TestSettings(unittest.TestCase):
@patch('json.load')
@patch('builtins.open', new_callable=mock_open)
def test_settings_cascade_and_override(self, mock_file, mock_json_load):
file1_data = {"timeout": 10, "retries_count": 3, "proxy_url": "http://proxy1"}
file2_data = {"timeout": 20, "recursive_search": True}
file3_data = {"proxy_url": "http://proxy3", "print_not_found": False}
mock_json_load.side_effect = [file1_data, file2_data, file3_data]
settings = Settings()
paths = ['file1.json', 'file2.json', 'file3.json']
was_inited, msg = settings.load(paths)
self.assertTrue(was_inited)
self.assertEqual(settings.retries_count, 3)
self.assertEqual(settings.timeout, 20)
self.assertTrue(settings.recursive_search)
self.assertEqual(settings.proxy_url, "http://proxy3")
self.assertFalse(settings.print_not_found)
@patch('builtins.open')
def test_settings_file_not_found(self, mock_open_func):
mock_open_func.side_effect = FileNotFoundError()
settings = Settings()
paths = ['nonexistent.json']
was_inited, msg = settings.load(paths)
self.assertFalse(was_inited)
self.assertIn('None of the default settings files found', msg)
@patch('json.load')
@patch('builtins.open', new_callable=mock_open)
def test_settings_invalid_json(self, mock_file, mock_json_load):
mock_json_load.side_effect = ValueError("Expecting value")
settings = Settings()
paths = ['invalid.json']
was_inited, msg = settings.load(paths)
self.assertFalse(was_inited)
self.assertIsInstance(msg, ValueError)
self.assertIn('Problem with parsing json contents', str(msg))
-91
View File
@@ -182,97 +182,6 @@ def test_ranked_sites_dict_id_type():
assert len(db.ranked_sites_dict(id_type='gaia_id')) == 1
def test_ranked_sites_dict_excluded_tags():
db = MaigretDatabase()
db.update_site(MaigretSite('3', {'alexaRank': 1000, 'engine': 'ucoz'}))
db.update_site(MaigretSite('1', {'alexaRank': 2, 'tags': ['forum']}))
db.update_site(MaigretSite('2', {'alexaRank': 10, 'tags': ['ru', 'forum']}))
# excluding by tag
assert list(db.ranked_sites_dict(excluded_tags=['ru']).keys()) == ['1', '3']
assert list(db.ranked_sites_dict(excluded_tags=['forum']).keys()) == ['3']
# excluding by engine
assert list(db.ranked_sites_dict(excluded_tags=['ucoz']).keys()) == ['1', '2']
# combining include and exclude tags
assert list(db.ranked_sites_dict(tags=['forum'], excluded_tags=['ru']).keys()) == ['1']
# excluding non-existent tag has no effect
assert list(db.ranked_sites_dict(excluded_tags=['nonexistent']).keys()) == ['1', '2', '3']
# exclude all
assert list(db.ranked_sites_dict(excluded_tags=['forum', 'ucoz']).keys()) == []
def test_ranked_sites_dict_excluded_tags_with_top():
"""Excluded tags should also prevent mirrors from being included."""
db = MaigretDatabase()
db.update_site(
MaigretSite('Parent', {'alexaRank': 1, 'tags': ['forum'], 'type': 'username'})
)
db.update_site(
MaigretSite('Mirror', {'alexaRank': 999999, 'source': 'Parent', 'tags': ['forum'], 'type': 'username'})
)
db.update_site(
MaigretSite('Other', {'alexaRank': 2, 'tags': ['coding'], 'type': 'username'})
)
# Without exclusion, mirror should be included
result = db.ranked_sites_dict(top=1, id_type='username')
assert 'Parent' in result
assert 'Mirror' in result
# With exclusion of 'forum', both Parent and Mirror should be excluded
result = db.ranked_sites_dict(top=2, excluded_tags=['forum'], id_type='username')
assert 'Parent' not in result
assert 'Mirror' not in result
assert 'Other' in result
def test_ranked_sites_dict_mirrors_disabled_parent():
"""Mirror is included when parent ranks in top N but parent is disabled."""
db = MaigretDatabase()
db.update_site(
MaigretSite(
'ParentPlatform',
{'alexaRank': 5, 'disabled': True, 'type': 'username'},
)
)
db.update_site(
MaigretSite(
'OtherSite',
{'alexaRank': 100, 'type': 'username'},
)
)
db.update_site(
MaigretSite(
'MirrorSite',
{
'alexaRank': 99999999,
'source': 'ParentPlatform',
'type': 'username',
},
)
)
result = db.ranked_sites_dict(top=1, disabled=False, id_type='username')
assert list(result.keys()) == ['OtherSite', 'MirrorSite']
def test_ranked_sites_dict_mirrors_no_extra_without_parent_in_top():
db = MaigretDatabase()
db.update_site(MaigretSite('A', {'alexaRank': 1, 'type': 'username'}))
db.update_site(
MaigretSite(
'B',
{'alexaRank': 2, 'source': 'NotInDb', 'type': 'username'},
)
)
assert list(db.ranked_sites_dict(top=1, id_type='username').keys()) == ['A']
def test_get_url_template():
site = MaigretSite(
"test",
+1 -84
View File
@@ -1,10 +1,8 @@
import re
import pytest
from unittest.mock import MagicMock, patch
from maigret.submit import Submitter
from aiohttp import ClientSession
from maigret.sites import MaigretDatabase, MaigretSite
from maigret.sites import MaigretDatabase
import logging
@@ -277,84 +275,3 @@ async def test_dialog_adds_site_negative(settings):
await submitter.close()
assert result is False
def test_domain_matching_exact():
"""Test that domain matching uses proper boundary checks, not substring matching.
x.com should NOT match sites like 500px.com, mix.com, etc.
"""
domain_raw = "x.com"
domain_re = re.compile(
r'://(www\.)?' + re.escape(domain_raw) + r'(/|$)'
)
# These should NOT match x.com
non_matching = [
MaigretSite("500px", {"url": "https://500px.com/p/{username}", "urlMain": "https://500px.com/"}),
MaigretSite("Mix", {"url": "https://mix.com/{username}", "urlMain": "https://mix.com"}),
MaigretSite("Screwfix", {"url": "{urlMain}{urlSubpath}/members/?username={username}", "urlMain": "https://community.screwfix.com"}),
MaigretSite("Wix", {"url": "https://{username}.wix.com", "urlMain": "https://wix.com/"}),
MaigretSite("1x", {"url": "https://1x.com/{username}", "urlMain": "https://1x.com"}),
MaigretSite("Roblox", {"url": "https://www.roblox.com/user.aspx?username={username}", "urlMain": "https://www.roblox.com/"}),
]
for site in non_matching:
assert not domain_re.search(site.url_main + site.url), \
f"x.com should NOT match site {site.name} ({site.url_main})"
def test_domain_matching_positive():
"""Test that domain matching correctly matches the exact domain."""
domain_raw = "x.com"
domain_re = re.compile(
r'://(www\.)?' + re.escape(domain_raw) + r'(/|$)'
)
# These SHOULD match x.com
matching = [
MaigretSite("X", {"url": "https://x.com/{username}", "urlMain": "https://x.com"}),
MaigretSite("X-www", {"url": "https://www.x.com/{username}", "urlMain": "https://www.x.com"}),
]
for site in matching:
assert domain_re.search(site.url_main + site.url), \
f"x.com SHOULD match site {site.name} ({site.url_main})"
def test_dialog_nonexistent_site_name_no_crash():
"""Test that entering a site name not in the matched list doesn't crash.
This tests the fix for: AttributeError: 'NoneType' object has no attribute 'name'
The old_site should be None when user enters a name not in matched_sites,
and the code should handle it gracefully.
"""
# Simulate the logic that was crashing
matched_sites = [
MaigretSite("ValidActive", {"url": "https://example.com/{username}", "urlMain": "https://example.com"}),
MaigretSite("InvalidActive", {"url": "https://example.com/alt/{username}", "urlMain": "https://example.com"}),
]
site_name = "NonExistentSite"
old_site = next(
(site for site in matched_sites if site.name == site_name), None
)
# This is what the old code did - it would crash here
assert old_site is None
# The fix: check before accessing .name
if old_site is None:
result = "not found"
else:
result = old_site.name
assert result == "not found"
# And when site_name IS in matched_sites, it should work
site_name = "ValidActive"
old_site = next(
(site for site in matched_sites if site.name == site_name), None
)
assert old_site is not None
assert old_site.name == "ValidActive"
-63
View File
@@ -1,63 +0,0 @@
"""Tests for the Twitter / X site entry and GraphQL probe."""
import re
import pytest
import requests
from maigret.sites import MaigretSite
def _twitter_site(site: MaigretSite) -> None:
assert site.name == "Twitter"
assert site.disabled is False
assert site.check_type == "message"
assert site.url_probe and "{username}" in site.url_probe
assert "UserByScreenName" in site.url_probe or "graphql" in site.url_probe
assert site.regex_check
assert re.fullmatch(site.regex_check, site.username_claimed)
assert re.fullmatch(site.regex_check, site.username_unclaimed)
assert site.absence_strs
assert site.activation.get("method") == "twitter"
assert site.activation.get("url")
assert "authorization" in {k.lower() for k in site.headers.keys()}
def test_twitter_site_entry_config(default_db):
"""Twitter entry in data.json must define probe URL, regex, and activation."""
site = default_db.sites_dict["Twitter"]
assert isinstance(site, MaigretSite)
_twitter_site(site)
@pytest.mark.slow
def test_twitter_graphql_probe_claimed_vs_unclaimed(default_db):
"""
Live check: guest activation + UserByScreenName GraphQL returns a user for
usernameClaimed and no user for usernameUnclaimed (same flow as urlProbe).
"""
site = default_db.sites_dict["Twitter"]
_twitter_site(site)
headers = dict(site.headers)
headers.pop("x-guest-token", None)
act = requests.post(site.activation["url"], headers=headers, timeout=45)
assert act.status_code == 200, act.text[:500]
body = act.json()
assert "guest_token" in body
headers["x-guest-token"] = body["guest_token"]
def fetch(username: str) -> dict:
url = site.url_probe.format(username=username)
resp = requests.get(url, headers=headers, timeout=45)
resp.raise_for_status()
return resp.json()
claimed_json = fetch(site.username_claimed)
assert "data" in claimed_json
assert claimed_json["data"].get("user") is not None
unclaimed_json = fetch(site.username_unclaimed)
data = unclaimed_json.get("data") or {}
assert data == {} or data.get("user") is None
-480
View File
@@ -1,480 +0,0 @@
#!/usr/bin/env python3
"""
Mass site checking utility for Maigret development.
Check top-N sites from data.json and generate a report.
Usage:
python utils/check_top_n.py --top 100 # Check top 100 sites
python utils/check_top_n.py --top 50 --parallel 10 # Check with 10 parallel requests
python utils/check_top_n.py --top 100 --output report.json
python utils/check_top_n.py --top 100 --fix # Auto-fix simple issues
"""
import argparse
import asyncio
import json
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Add parent dir for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
try:
import aiohttp
except ImportError:
print("aiohttp not installed. Run: pip install aiohttp")
sys.exit(1)
class Colors:
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
CYAN = "\033[96m"
RESET = "\033[0m"
BOLD = "\033[1m"
def color(text: str, c: str) -> str:
return f"{c}{text}{Colors.RESET}"
@dataclass
class SiteCheckResult:
"""Result of checking a single site."""
site_name: str
alexa_rank: int
disabled: bool
check_type: str
# Status
status: str = "unknown" # working, broken, timeout, error, anti_bot, disabled
# HTTP results
claimed_http_status: Optional[int] = None
unclaimed_http_status: Optional[int] = None
claimed_error: Optional[str] = None
unclaimed_error: Optional[str] = None
# Issues detected
issues: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
# Recommendations
recommendations: List[str] = field(default_factory=list)
# Timing
check_time_ms: int = 0
DEFAULT_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
}
async def check_url(url: str, headers: dict, timeout: int = 15) -> dict:
"""Quick URL check returning status and basic info."""
result = {
"status": None,
"final_url": None,
"content_length": 0,
"error": None,
"error_type": None,
"content": None,
"markers": {},
}
try:
connector = aiohttp.TCPConnector(ssl=False)
timeout_obj = aiohttp.ClientTimeout(total=timeout)
async with aiohttp.ClientSession(connector=connector, timeout=timeout_obj) as session:
async with session.get(url, headers=headers, allow_redirects=True) as resp:
result["status"] = resp.status
result["final_url"] = str(resp.url)
try:
text = await resp.text()
result["content_length"] = len(text)
result["content"] = text
text_lower = text.lower()
result["markers"] = {
"404_text": any(m in text_lower for m in ["not found", "404", "doesn't exist"]),
"captcha": any(m in text_lower for m in ["captcha", "recaptcha", "challenge"]),
"cloudflare": "cloudflare" in text_lower,
"login": any(m in text_lower for m in ["log in", "login", "sign in"]),
}
except Exception as e:
result["error"] = f"Content error: {e}"
result["error_type"] = "content"
except asyncio.TimeoutError:
result["error"] = "Timeout"
result["error_type"] = "timeout"
except aiohttp.ClientError as e:
result["error"] = str(e)
result["error_type"] = "client"
except Exception as e:
result["error"] = str(e)
result["error_type"] = "unknown"
return result
async def check_site(site_name: str, config: dict, timeout: int = 15) -> SiteCheckResult:
"""Check a single site and return detailed result."""
start_time = time.time()
result = SiteCheckResult(
site_name=site_name,
alexa_rank=config.get("alexaRank", 999999),
disabled=config.get("disabled", False),
check_type=config.get("checkType", "status_code"),
)
# Skip disabled sites
if result.disabled:
result.status = "disabled"
return result
# Build URL
url_template = config.get("url", "")
url_main = config.get("urlMain", "")
url_subpath = config.get("urlSubpath", "")
url_template = url_template.replace("{urlMain}", url_main).replace("{urlSubpath}", url_subpath)
claimed = config.get("usernameClaimed")
unclaimed = config.get("usernameUnclaimed", "noonewouldeverusethis7")
if not claimed:
result.status = "error"
result.issues.append("No usernameClaimed defined")
return result
# Prepare headers
headers = DEFAULT_HEADERS.copy()
if config.get("headers"):
headers.update(config["headers"])
# Check both URLs
url_claimed = url_template.replace("{username}", claimed)
url_unclaimed = url_template.replace("{username}", unclaimed)
try:
claimed_result, unclaimed_result = await asyncio.gather(
check_url(url_claimed, headers, timeout),
check_url(url_unclaimed, headers, timeout),
)
except Exception as e:
result.status = "error"
result.issues.append(f"Check failed: {e}")
return result
result.claimed_http_status = claimed_result["status"]
result.unclaimed_http_status = unclaimed_result["status"]
result.claimed_error = claimed_result.get("error")
result.unclaimed_error = unclaimed_result.get("error")
# Categorize result
if claimed_result["error_type"] == "timeout" or unclaimed_result["error_type"] == "timeout":
result.status = "timeout"
result.issues.append("Request timeout")
elif claimed_result["status"] == 403 or claimed_result["status"] == 429:
result.status = "anti_bot"
result.issues.append(f"Anti-bot protection (HTTP {claimed_result['status']})")
elif claimed_result.get("markers", {}).get("captcha"):
result.status = "anti_bot"
result.issues.append("Captcha detected")
elif claimed_result.get("markers", {}).get("cloudflare"):
result.status = "anti_bot"
result.warnings.append("Cloudflare protection detected")
elif claimed_result["error"] or unclaimed_result["error"]:
result.status = "error"
if claimed_result["error"]:
result.issues.append(f"Claimed error: {claimed_result['error']}")
if unclaimed_result["error"]:
result.issues.append(f"Unclaimed error: {unclaimed_result['error']}")
else:
# Validate check type
check_type = config.get("checkType", "status_code")
if check_type == "status_code":
if claimed_result["status"] == unclaimed_result["status"]:
result.status = "broken"
result.issues.append(f"Same status code ({claimed_result['status']}) for both")
# Suggest fix
if claimed_result["final_url"] != unclaimed_result["final_url"]:
result.recommendations.append("Switch to checkType: response_url")
else:
result.status = "working"
elif check_type == "response_url":
if claimed_result["final_url"] == unclaimed_result["final_url"]:
result.status = "broken"
result.issues.append("Same final URL for both")
if claimed_result["status"] != unclaimed_result["status"]:
result.recommendations.append("Switch to checkType: status_code")
else:
result.status = "working"
elif check_type == "message":
presense_strs = config.get("presenseStrs", [])
absence_strs = config.get("absenceStrs", [])
claimed_content = claimed_result.get("content", "") or ""
unclaimed_content = unclaimed_result.get("content", "") or ""
presense_ok = not presense_strs or any(s in claimed_content for s in presense_strs)
absence_claimed = absence_strs and any(s in claimed_content for s in absence_strs)
absence_unclaimed = absence_strs and any(s in unclaimed_content for s in absence_strs)
if presense_strs and not presense_ok:
result.status = "broken"
result.issues.append(f"presenseStrs not found: {presense_strs}")
# Check if status_code would work
if claimed_result["status"] != unclaimed_result["status"]:
result.recommendations.append(f"Switch to checkType: status_code ({claimed_result['status']} vs {unclaimed_result['status']})")
elif absence_claimed:
result.status = "broken"
result.issues.append(f"absenceStrs found in claimed page")
elif absence_strs and not absence_unclaimed:
result.status = "broken"
result.warnings.append("absenceStrs not found in unclaimed page")
else:
result.status = "working"
else:
result.status = "unknown"
result.warnings.append(f"Unknown checkType: {check_type}")
result.check_time_ms = int((time.time() - start_time) * 1000)
return result
def load_sites(db_path: Path) -> Dict[str, dict]:
"""Load all sites from data.json."""
with open(db_path) as f:
data = json.load(f)
return data.get("sites", {})
def get_top_sites(sites: Dict[str, dict], n: int) -> List[Tuple[str, dict]]:
"""Get top N sites by Alexa rank."""
ranked = []
for name, config in sites.items():
rank = config.get("alexaRank", 999999)
ranked.append((name, config, rank))
ranked.sort(key=lambda x: x[2])
return [(name, config) for name, config, _ in ranked[:n]]
async def check_sites_batch(sites: List[Tuple[str, dict]], parallel: int = 5,
timeout: int = 15, progress_callback=None) -> List[SiteCheckResult]:
"""Check multiple sites with parallelism control."""
results = []
semaphore = asyncio.Semaphore(parallel)
async def check_with_semaphore(name, config, index):
async with semaphore:
if progress_callback:
progress_callback(index, len(sites), name)
return await check_site(name, config, timeout)
tasks = [
check_with_semaphore(name, config, i)
for i, (name, config) in enumerate(sites)
]
results = await asyncio.gather(*tasks)
return results
def print_progress(current: int, total: int, site_name: str):
"""Print progress indicator."""
pct = int(current / total * 100)
bar_width = 30
filled = int(bar_width * current / total)
bar = "" * filled + "" * (bar_width - filled)
print(f"\r[{bar}] {pct:3d}% ({current}/{total}) {site_name:<30}", end="", flush=True)
def generate_report(results: List[SiteCheckResult]) -> dict:
"""Generate a summary report from check results."""
report = {
"summary": {
"total": len(results),
"working": 0,
"broken": 0,
"disabled": 0,
"timeout": 0,
"anti_bot": 0,
"error": 0,
"unknown": 0,
},
"by_status": defaultdict(list),
"issues": [],
"recommendations": [],
}
for r in results:
report["summary"][r.status] = report["summary"].get(r.status, 0) + 1
report["by_status"][r.status].append(r.site_name)
if r.issues:
report["issues"].append({
"site": r.site_name,
"rank": r.alexa_rank,
"issues": r.issues,
})
if r.recommendations:
report["recommendations"].append({
"site": r.site_name,
"rank": r.alexa_rank,
"recommendations": r.recommendations,
})
return report
def print_report(report: dict, results: List[SiteCheckResult]):
"""Print a formatted report to console."""
summary = report["summary"]
print(f"\n{'='*60}")
print(f"{color('SITE CHECK REPORT', Colors.CYAN)}")
print(f"{'='*60}\n")
print(f"{color('SUMMARY:', Colors.BOLD)}")
print(f" Total sites checked: {summary['total']}")
print(f" {color('Working:', Colors.GREEN)} {summary['working']}")
print(f" {color('Broken:', Colors.RED)} {summary['broken']}")
print(f" {color('Disabled:', Colors.YELLOW)} {summary['disabled']}")
print(f" {color('Timeout:', Colors.YELLOW)} {summary['timeout']}")
print(f" {color('Anti-bot:', Colors.YELLOW)} {summary['anti_bot']}")
print(f" {color('Error:', Colors.RED)} {summary['error']}")
# Broken sites
if report["by_status"]["broken"]:
print(f"\n{color('BROKEN SITES:', Colors.RED)}")
for site in report["by_status"]["broken"][:20]:
r = next(x for x in results if x.site_name == site)
print(f" - {site} (rank {r.alexa_rank}): {', '.join(r.issues)}")
if len(report["by_status"]["broken"]) > 20:
print(f" ... and {len(report['by_status']['broken']) - 20} more")
# Timeout sites
if report["by_status"]["timeout"]:
print(f"\n{color('TIMEOUT SITES:', Colors.YELLOW)}")
for site in report["by_status"]["timeout"][:10]:
print(f" - {site}")
if len(report["by_status"]["timeout"]) > 10:
print(f" ... and {len(report['by_status']['timeout']) - 10} more")
# Anti-bot sites
if report["by_status"]["anti_bot"]:
print(f"\n{color('ANTI-BOT PROTECTED:', Colors.YELLOW)}")
for site in report["by_status"]["anti_bot"][:10]:
r = next(x for x in results if x.site_name == site)
print(f" - {site}: {', '.join(r.issues)}")
if len(report["by_status"]["anti_bot"]) > 10:
print(f" ... and {len(report['by_status']['anti_bot']) - 10} more")
# Recommendations
if report["recommendations"]:
print(f"\n{color('RECOMMENDATIONS:', Colors.CYAN)}")
for rec in report["recommendations"][:15]:
print(f" {rec['site']} (rank {rec['rank']}):")
for r in rec["recommendations"]:
print(f" -> {r}")
if len(report["recommendations"]) > 15:
print(f" ... and {len(report['recommendations']) - 15} more")
async def main():
parser = argparse.ArgumentParser(
description="Mass site checking for Maigret",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("--top", "-n", type=int, default=100,
help="Check top N sites by Alexa rank (default: 100)")
parser.add_argument("--parallel", "-p", type=int, default=5,
help="Number of parallel requests (default: 5)")
parser.add_argument("--timeout", "-t", type=int, default=15,
help="Request timeout in seconds (default: 15)")
parser.add_argument("--output", "-o", help="Output JSON report to file")
parser.add_argument("--include-disabled", action="store_true",
help="Include disabled sites in results")
parser.add_argument("--only-broken", action="store_true",
help="Only show broken sites")
parser.add_argument("--json", action="store_true",
help="Output as JSON only")
args = parser.parse_args()
# Load sites
db_path = Path(__file__).parent.parent / "maigret" / "resources" / "data.json"
if not db_path.exists():
print(f"Database not found: {db_path}")
sys.exit(1)
sites = load_sites(db_path)
top_sites = get_top_sites(sites, args.top)
if not args.json:
print(f"Checking top {len(top_sites)} sites (parallel={args.parallel}, timeout={args.timeout}s)...")
print()
# Run checks
progress = print_progress if not args.json else None
results = await check_sites_batch(top_sites, args.parallel, args.timeout, progress)
if not args.json:
print() # Clear progress line
# Filter results
if not args.include_disabled:
results = [r for r in results if r.status != "disabled"]
if args.only_broken:
results = [r for r in results if r.status in ("broken", "error", "timeout")]
# Generate report
report = generate_report(results)
# Output
if args.json:
output = {
"report": report,
"results": [asdict(r) for r in results],
}
print(json.dumps(output, indent=2))
else:
print_report(report, results)
# Save to file
if args.output:
output = {
"report": report,
"results": [asdict(r) for r in results],
}
with open(args.output, "w") as f:
json.dump(output, f, indent=2)
print(f"\nReport saved to: {args.output}")
if __name__ == "__main__":
asyncio.run(main())
+205
View File
@@ -0,0 +1,205 @@
#!/usr/bin/env python3
"""
Utility script to close pull requests with titles matching "Invalid result https://t.me/..."
This script identifies and closes PRs that follow the pattern of invalid telegram results,
which are typically auto-generated or spam PRs that should not be processed.
"""
import argparse
import os
import re
import sys
from typing import List, Optional
try:
import requests
except ImportError:
print("Error: requests library is required. Install with: pip install requests")
sys.exit(1)
class GitHubAPI:
"""Simple GitHub API wrapper for managing pull requests."""
def __init__(self, token: str, owner: str, repo: str):
self.token = token
self.owner = owner
self.repo = repo
self.base_url = "https://api.github.com"
self.headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
}
def get_open_prs(self) -> List[dict]:
"""Get all open pull requests."""
url = f"{self.base_url}/repos/{self.owner}/{self.repo}/pulls"
params = {"state": "open", "per_page": 100}
all_prs = []
page = 1
while True:
params["page"] = page
response = requests.get(url, headers=self.headers, params=params)
response.raise_for_status()
prs = response.json()
if not prs:
break
all_prs.extend(prs)
page += 1
return all_prs
def close_pr(self, pr_number: int, comment: Optional[str] = None) -> bool:
"""Close a pull request with an optional comment."""
try:
# Add comment if provided
if comment:
comment_url = f"{self.base_url}/repos/{self.owner}/{self.repo}/issues/{pr_number}/comments"
comment_data = {"body": comment}
response = requests.post(comment_url, headers=self.headers, json=comment_data)
response.raise_for_status()
# Close the PR
close_url = f"{self.base_url}/repos/{self.owner}/{self.repo}/pulls/{pr_number}"
close_data = {"state": "closed"}
response = requests.patch(close_url, headers=self.headers, json=close_data)
response.raise_for_status()
return True
except requests.RequestException as e:
print(f"Error closing PR #{pr_number}: {e}")
return False
def is_invalid_telegram_pr(title: str) -> bool:
"""
Check if a PR title matches the pattern "Invalid result https://t.me/..."
Args:
title: The PR title to check
Returns:
True if the title matches the pattern, False otherwise
"""
# Pattern: "Invalid result https://t.me/..." (case insensitive)
pattern = r"^invalid\s+result\s+https://t\.me/.*"
return bool(re.match(pattern, title.strip(), re.IGNORECASE))
def find_invalid_telegram_prs(github_api: GitHubAPI) -> List[dict]:
"""
Find all open PRs that match the invalid telegram pattern.
Args:
github_api: GitHub API wrapper instance
Returns:
List of PR dictionaries that match the pattern
"""
all_prs = github_api.get_open_prs()
matching_prs = []
for pr in all_prs:
if is_invalid_telegram_pr(pr["title"]):
matching_prs.append(pr)
return matching_prs
def main():
"""Main function to find and close invalid telegram PRs."""
parser = argparse.ArgumentParser(
description="Close pull requests with titles matching 'Invalid result https://t.me/...'"
)
parser.add_argument(
"--token",
required=False,
help="GitHub personal access token (or set GITHUB_TOKEN env var)"
)
parser.add_argument(
"--owner",
default="soxoj",
help="Repository owner (default: soxoj)"
)
parser.add_argument(
"--repo",
default="maigret",
help="Repository name (default: maigret)"
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be closed without actually closing PRs"
)
parser.add_argument(
"--comment",
default="Automatically closing this PR as it appears to be an invalid result for a Telegram URL. "
"If this is a legitimate PR, please reopen it with a more descriptive title.",
help="Comment to add when closing PRs"
)
args = parser.parse_args()
# Get GitHub token
token = args.token or os.getenv("GITHUB_TOKEN")
if not token:
print("Error: GitHub token is required. Provide via --token or GITHUB_TOKEN env var")
sys.exit(1)
# Initialize GitHub API
try:
github_api = GitHubAPI(token, args.owner, args.repo)
except Exception as e:
print(f"Error initializing GitHub API: {e}")
sys.exit(1)
# Find matching PRs
print(f"Searching for PRs matching pattern in {args.owner}/{args.repo}...")
try:
matching_prs = find_invalid_telegram_prs(github_api)
except Exception as e:
print(f"Error fetching PRs: {e}")
sys.exit(1)
if not matching_prs:
print("No PRs found matching the pattern 'Invalid result https://t.me/...'")
return
print(f"Found {len(matching_prs)} PR(s) matching the pattern:")
for pr in matching_prs:
print(f" - PR #{pr['number']}: {pr['title']}")
print(f" Created by: {pr['user']['login']}")
print(f" URL: {pr['html_url']}")
print()
if args.dry_run:
print("Dry run mode: No PRs were actually closed.")
return
# Confirm before closing
response = input(f"Close {len(matching_prs)} PR(s)? [y/N]: ")
if response.lower() != 'y':
print("Cancelled.")
return
# Close PRs
closed_count = 0
for pr in matching_prs:
print(f"Closing PR #{pr['number']}: {pr['title']}")
if github_api.close_pr(pr['number'], args.comment):
closed_count += 1
print(f" ✓ Closed successfully")
else:
print(f" ✗ Failed to close")
print(f"\nClosed {closed_count} out of {len(matching_prs)} PRs.")
if __name__ == "__main__":
main()
-223
View File
@@ -1,223 +0,0 @@
#!/usr/bin/env python3
"""
Probe likely false-positive sites among the top-N Alexa-ranked entries.
For each of K random *distinct* usernames taken from ``usernameClaimed`` fields in
the Maigret database, runs a clean ``maigret`` scan (``--top-sites N --json simple|ndjson``).
Sites that return CLAIMED in *every* run are reported: unrelated random claimed
handles are unlikely to all exist on the same third-party site, so such sites are
candidates for broken checks.
"""
from __future__ import annotations
import argparse
import json
import random
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
def repo_root() -> Path:
return Path(__file__).resolve().parent.parent
def load_username_claimed_pool(db_path: Path) -> list[str]:
with db_path.open(encoding="utf-8") as f:
data = json.load(f)
sites = data.get("sites") or {}
seen: set[str] = set()
pool: list[str] = []
for _name, site in sites.items():
u = (site or {}).get("usernameClaimed")
if not u or not isinstance(u, str):
continue
u = u.strip()
if not u or u in seen:
continue
seen.add(u)
pool.append(u)
return pool
def run_maigret(
*,
username: str,
db_path: Path,
out_dir: Path,
top_sites: int,
json_format: str,
quiet: bool,
) -> Path:
"""Run maigret subprocess; return path to the written JSON report."""
safe = username.replace("/", "_")
report_name = f"report_{safe}_{json_format}.json"
report_path = out_dir / report_name
cmd = [
sys.executable,
"-m",
"maigret",
username,
"--db",
str(db_path),
"--top-sites",
str(top_sites),
"--json",
json_format,
"--folderoutput",
str(out_dir),
"--no-progressbar",
"--no-color",
"--no-recursion",
"--no-extracting",
]
sink = subprocess.DEVNULL if quiet else None
proc = subprocess.run(
cmd,
cwd=str(repo_root()),
text=True,
stdout=sink,
stderr=sink,
)
if proc.returncode != 0:
raise RuntimeError(
f"maigret exited with {proc.returncode} for username {username!r}"
)
if not report_path.is_file():
raise FileNotFoundError(f"Expected report missing: {report_path}")
return report_path
def claimed_sites_from_report(path: Path, json_format: str) -> set[str]:
if json_format == "simple":
with path.open(encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, dict):
return set()
return set(data.keys())
# ndjson: one object per line, each has "sitename"
sites: set[str] = set()
with path.open(encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
obj = json.loads(line)
name = obj.get("sitename")
if isinstance(name, str) and name:
sites.add(name)
return sites
def main() -> int:
parser = argparse.ArgumentParser(
description=(
"Pick random distinct usernameClaimed values, run maigret --top-sites N "
"with JSON reports, and list sites that claimed all of them (suspicious FP)."
)
)
parser.add_argument(
"--db",
"-b",
type=Path,
default=repo_root() / "maigret" / "resources" / "data.json",
help="Path to Maigret data.json (a temp copy is used for runs).",
)
parser.add_argument(
"--top-sites",
"-n",
type=int,
default=500,
metavar="N",
help="Value for maigret --top-sites (default: 500).",
)
parser.add_argument(
"--samples",
"-k",
type=int,
default=5,
metavar="K",
help="How many distinct random usernames to draw (default: 5).",
)
parser.add_argument(
"--seed",
type=int,
default=None,
help="RNG seed for reproducible username selection.",
)
parser.add_argument(
"--json",
dest="json_format",
default="simple",
choices=["simple", "ndjson"],
help="JSON report type passed to maigret -J (default: simple).",
)
parser.add_argument(
"--verbose",
"-v",
action="store_true",
default=False,
help="Print maigret stdout/stderr (default: suppress child output).",
)
args = parser.parse_args()
quiet = not args.verbose
db_src = args.db.resolve()
if not db_src.is_file():
print(f"Database not found: {db_src}", file=sys.stderr)
return 2
pool = load_username_claimed_pool(db_src)
if len(pool) < args.samples:
print(
f"Need at least {args.samples} distinct usernameClaimed entries, "
f"found {len(pool)}.",
file=sys.stderr,
)
return 2
rng = random.Random(args.seed)
picked = rng.sample(pool, args.samples)
print(f"Database: {db_src}")
print(f"--top-sites {args.top_sites}, {args.samples} random usernameClaimed:")
for i, u in enumerate(picked, 1):
print(f" {i}. {u}")
site_sets: list[set[str]] = []
with tempfile.TemporaryDirectory(prefix="maigret_fp_probe_") as tmp:
tmp_path = Path(tmp)
db_work = tmp_path / "data.json"
shutil.copyfile(db_src, db_work)
for u in picked:
print(f"\nRunning maigret for {u!r} ...", flush=True)
report = run_maigret(
username=u,
db_path=db_work,
out_dir=tmp_path,
top_sites=args.top_sites,
json_format=args.json_format,
quiet=quiet,
)
sites = claimed_sites_from_report(report, args.json_format)
site_sets.append(sites)
print(f" -> {len(sites)} positive site(s) in JSON", flush=True)
always = set.intersection(*site_sets) if site_sets else set()
print("\n--- Sites with CLAIMED in all runs (candidates for false positives) ---")
if not always:
print("(none)")
else:
for name in sorted(always):
print(name)
return 0
if __name__ == "__main__":
raise SystemExit(main())
-750
View File
@@ -1,750 +0,0 @@
#!/usr/bin/env python3
"""
Site check utility for Maigret development.
Quickly test site availability, find valid usernames, and diagnose check issues.
Usage:
python utils/site_check.py --site "SiteName" --check-claimed
python utils/site_check.py --site "SiteName" --maigret # Test via Maigret
python utils/site_check.py --site "SiteName" --compare-methods # aiohttp vs Maigret
python utils/site_check.py --url "https://example.com/user/{username}" --test "john"
python utils/site_check.py --site "SiteName" --find-user
python utils/site_check.py --site "SiteName" --diagnose # Full diagnosis
"""
import argparse
import asyncio
import json
import logging
import re
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Add parent dir for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
try:
import aiohttp
except ImportError:
print("aiohttp not installed. Run: pip install aiohttp")
sys.exit(1)
# Maigret imports (optional, for --maigret mode)
MAIGRET_AVAILABLE = False
try:
from maigret.sites import MaigretDatabase, MaigretSite
from maigret.checking import (
SimpleAiohttpChecker,
check_site_for_username,
process_site_result,
make_site_result,
)
from maigret.notify import QueryNotifyPrint
from maigret.result import QueryStatus
MAIGRET_AVAILABLE = True
except ImportError:
pass
DEFAULT_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
}
COMMON_USERNAMES = ["blue", "test", "admin", "user", "john", "alex", "david", "mike", "chris", "dan"]
class Colors:
"""ANSI color codes for terminal output."""
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
MAGENTA = "\033[95m"
CYAN = "\033[96m"
RESET = "\033[0m"
BOLD = "\033[1m"
def color(text: str, c: str) -> str:
"""Wrap text with color codes."""
return f"{c}{text}{Colors.RESET}"
async def check_url_aiohttp(url: str, headers: dict = None, follow_redirects: bool = True,
timeout: int = 15, ssl_verify: bool = False) -> dict:
"""Check a URL using aiohttp and return detailed response info."""
headers = headers or DEFAULT_HEADERS.copy()
result = {
"method": "aiohttp",
"url": url,
"status": None,
"final_url": None,
"redirects": [],
"content_length": 0,
"content": None,
"title": None,
"error": None,
"error_type": None,
"markers": {},
}
try:
connector = aiohttp.TCPConnector(ssl=ssl_verify)
timeout_obj = aiohttp.ClientTimeout(total=timeout)
async with aiohttp.ClientSession(connector=connector, timeout=timeout_obj) as session:
async with session.get(url, headers=headers, allow_redirects=follow_redirects) as resp:
result["status"] = resp.status
result["final_url"] = str(resp.url)
# Get redirect history
if resp.history:
result["redirects"] = [str(r.url) for r in resp.history]
# Read content
try:
text = await resp.text()
result["content_length"] = len(text)
result["content"] = text
# Extract title
title_match = re.search(r'<title>([^<]*)</title>', text, re.IGNORECASE)
if title_match:
result["title"] = title_match.group(1).strip()[:100]
# Check common markers
text_lower = text.lower()
markers = {
"404_text": any(m in text_lower for m in ["not found", "404", "doesn't exist", "does not exist"]),
"profile_markers": any(m in text_lower for m in ["profile", "user", "member", "account"]),
"error_markers": any(m in text_lower for m in ["error", "banned", "suspended", "blocked"]),
"login_required": any(m in text_lower for m in ["log in", "login", "sign in", "signin"]),
"captcha": any(m in text_lower for m in ["captcha", "recaptcha", "challenge", "verify you"]),
"cloudflare": "cloudflare" in text_lower or "cf-ray" in text_lower,
"rate_limit": any(m in text_lower for m in ["rate limit", "too many requests", "429"]),
}
result["markers"] = markers
# First 500 chars of body for inspection
result["body_preview"] = text[:500].replace("\n", " ").strip()
except Exception as e:
result["error"] = f"Content read error: {e}"
result["error_type"] = "content_error"
except asyncio.TimeoutError:
result["error"] = "Timeout"
result["error_type"] = "timeout"
except aiohttp.ClientError as e:
result["error"] = f"Client error: {e}"
result["error_type"] = "client_error"
except Exception as e:
result["error"] = f"Error: {e}"
result["error_type"] = "unknown"
return result
async def check_url_maigret(site: 'MaigretSite', username: str, logger=None) -> dict:
"""Check a URL using Maigret's checking mechanism."""
if not MAIGRET_AVAILABLE:
return {"error": "Maigret not available", "method": "maigret"}
if logger is None:
logger = logging.getLogger("site_check")
logger.setLevel(logging.WARNING)
result = {
"method": "maigret",
"url": None,
"status": None,
"status_str": None,
"http_status": None,
"final_url": None,
"error": None,
"error_type": None,
"ids_data": None,
}
try:
# Create query options
options = {
"parsing": False,
"cookie_jar": None,
"timeout": 15,
}
# Create a simple notifier
class SilentNotify:
def start(self, msg=None): pass
def update(self, status, similar=False): pass
def finish(self, msg=None, status=None): pass
notifier = SilentNotify()
# Run the check
site_name, site_result = await check_site_for_username(
site, username, options, logger, notifier
)
result["url"] = site_result.get("url_user")
result["status"] = site_result.get("status")
result["status_str"] = str(site_result.get("status"))
result["http_status"] = site_result.get("http_status")
result["ids_data"] = site_result.get("ids_data")
# Check for errors
status = site_result.get("status")
if status and hasattr(status, 'error') and status.error:
result["error"] = f"{status.error.type}: {status.error.desc}"
result["error_type"] = str(status.error.type)
except Exception as e:
result["error"] = str(e)
result["error_type"] = "exception"
return result
async def find_valid_username(url_template: str, usernames: list = None, headers: dict = None) -> Optional[str]:
"""Try common usernames to find one that works."""
usernames = usernames or COMMON_USERNAMES
headers = headers or DEFAULT_HEADERS.copy()
print(f"Testing {len(usernames)} usernames on {url_template}...")
for username in usernames:
url = url_template.replace("{username}", username)
result = await check_url_aiohttp(url, headers)
status = result["status"]
markers = result.get("markers", {})
# Good signs: 200 status, profile markers, no 404 text
if status == 200 and not markers.get("404_text") and markers.get("profile_markers"):
print(f" {color('[+]', Colors.GREEN)} {username}: status={status}, has profile markers")
return username
elif status == 200 and not markers.get("404_text"):
print(f" {color('[?]', Colors.YELLOW)} {username}: status={status}, might work")
else:
print(f" {color('[-]', Colors.RED)} {username}: status={status}")
return None
async def compare_users_aiohttp(url_template: str, claimed: str, unclaimed: str = "noonewouldeverusethis7",
headers: dict = None) -> Tuple[dict, dict]:
"""Compare responses for claimed vs unclaimed usernames using aiohttp."""
headers = headers or DEFAULT_HEADERS.copy()
print(f"\n{'='*60}")
print(f"Comparing: {color(claimed, Colors.GREEN)} vs {color(unclaimed, Colors.RED)}")
print(f"URL template: {url_template}")
print(f"Method: aiohttp")
print(f"{'='*60}\n")
url_claimed = url_template.replace("{username}", claimed)
url_unclaimed = url_template.replace("{username}", unclaimed)
result_claimed, result_unclaimed = await asyncio.gather(
check_url_aiohttp(url_claimed, headers),
check_url_aiohttp(url_unclaimed, headers)
)
def print_result(name, r, c):
print(f"--- {color(name, c)} ---")
print(f" URL: {r['url']}")
print(f" Status: {color(str(r['status']), Colors.GREEN if r['status'] == 200 else Colors.RED)}")
if r["redirects"]:
print(f" Redirects: {' -> '.join(r['redirects'])} -> {r['final_url']}")
print(f" Final URL: {r['final_url']}")
print(f" Content length: {r['content_length']}")
print(f" Title: {r['title']}")
if r["error"]:
print(f" Error: {color(r['error'], Colors.RED)}")
print(f" Markers: {r['markers']}")
print()
print_result(f"CLAIMED ({claimed})", result_claimed, Colors.GREEN)
print_result(f"UNCLAIMED ({unclaimed})", result_unclaimed, Colors.RED)
# Analysis
print(f"--- {color('ANALYSIS', Colors.CYAN)} ---")
recommendations = []
if result_claimed["status"] != result_unclaimed["status"]:
print(f" [!] Status codes differ: {result_claimed['status']} vs {result_unclaimed['status']}")
recommendations.append(("status_code", f"Status codes: {result_claimed['status']} vs {result_unclaimed['status']}"))
if result_claimed["final_url"] != result_unclaimed["final_url"]:
print(f" [!] Final URLs differ")
recommendations.append(("response_url", "Final URLs differ"))
if result_claimed["content_length"] != result_unclaimed["content_length"]:
diff = abs(result_claimed["content_length"] - result_unclaimed["content_length"])
print(f" [!] Content length differs by {diff} bytes")
recommendations.append(("message", f"Content differs by {diff} bytes"))
if result_claimed["title"] != result_unclaimed["title"]:
print(f" [!] Titles differ:")
print(f" Claimed: {result_claimed['title']}")
print(f" Unclaimed: {result_unclaimed['title']}")
recommendations.append(("message", f"Titles differ: '{result_claimed['title']}' vs '{result_unclaimed['title']}'"))
# Check for problems
if result_claimed.get("markers", {}).get("captcha"):
print(f" {color('[WARN]', Colors.YELLOW)} Captcha detected on claimed page")
if result_claimed.get("markers", {}).get("cloudflare"):
print(f" {color('[WARN]', Colors.YELLOW)} Cloudflare protection detected")
if result_claimed.get("markers", {}).get("login_required"):
print(f" {color('[WARN]', Colors.YELLOW)} Login may be required")
if recommendations:
print(f"\n {color('Recommended checkType:', Colors.BOLD)} {recommendations[0][0]}")
else:
print(f" {color('[!]', Colors.RED)} No clear difference found - site may need special handling")
return result_claimed, result_unclaimed
async def compare_methods(site: 'MaigretSite', claimed: str, unclaimed: str) -> dict:
"""Compare aiohttp vs Maigret results for the same site."""
if not MAIGRET_AVAILABLE:
print(color("Maigret not available for comparison", Colors.RED))
return {}
print(f"\n{'='*60}")
print(f"{color('METHOD COMPARISON', Colors.CYAN)}: aiohttp vs Maigret")
print(f"Site: {site.name}")
print(f"Claimed: {claimed}, Unclaimed: {unclaimed}")
print(f"{'='*60}\n")
# Build URL template
url_template = site.url
url_template = url_template.replace("{urlMain}", site.url_main or "")
url_template = url_template.replace("{urlSubpath}", getattr(site, 'url_subpath', '') or "")
headers = DEFAULT_HEADERS.copy()
if hasattr(site, 'headers') and site.headers:
headers.update(site.headers)
# Run all checks in parallel
url_claimed = url_template.replace("{username}", claimed)
url_unclaimed = url_template.replace("{username}", unclaimed)
aiohttp_claimed, aiohttp_unclaimed, maigret_claimed, maigret_unclaimed = await asyncio.gather(
check_url_aiohttp(url_claimed, headers),
check_url_aiohttp(url_unclaimed, headers),
check_url_maigret(site, claimed),
check_url_maigret(site, unclaimed),
)
def status_icon(status):
if status == 200:
return color("200", Colors.GREEN)
elif status == 404:
return color("404", Colors.YELLOW)
elif status and status >= 400:
return color(str(status), Colors.RED)
return str(status)
def maigret_status_icon(status_str):
if "Claimed" in str(status_str):
return color("Claimed", Colors.GREEN)
elif "Available" in str(status_str):
return color("Available", Colors.YELLOW)
else:
return color(str(status_str), Colors.RED)
print(f"{'Method':<12} {'Username':<25} {'HTTP Status':<12} {'Result':<20}")
print("-" * 70)
print(f"{'aiohttp':<12} {claimed:<25} {status_icon(aiohttp_claimed['status']):<20} {'OK' if not aiohttp_claimed['error'] else aiohttp_claimed['error'][:20]}")
print(f"{'aiohttp':<12} {unclaimed:<25} {status_icon(aiohttp_unclaimed['status']):<20} {'OK' if not aiohttp_unclaimed['error'] else aiohttp_unclaimed['error'][:20]}")
print(f"{'Maigret':<12} {claimed:<25} {status_icon(maigret_claimed.get('http_status')):<20} {maigret_status_icon(maigret_claimed.get('status_str'))}")
print(f"{'Maigret':<12} {unclaimed:<25} {status_icon(maigret_unclaimed.get('http_status')):<20} {maigret_status_icon(maigret_unclaimed.get('status_str'))}")
# Check for discrepancies
print(f"\n--- {color('DISCREPANCY ANALYSIS', Colors.CYAN)} ---")
issues = []
if aiohttp_claimed['status'] != maigret_claimed.get('http_status'):
issues.append(f"HTTP status mismatch for claimed: aiohttp={aiohttp_claimed['status']}, Maigret={maigret_claimed.get('http_status')}")
if aiohttp_unclaimed['status'] != maigret_unclaimed.get('http_status'):
issues.append(f"HTTP status mismatch for unclaimed: aiohttp={aiohttp_unclaimed['status']}, Maigret={maigret_unclaimed.get('http_status')}")
# Check Maigret detection correctness
claimed_detected = "Claimed" in str(maigret_claimed.get('status_str', ''))
unclaimed_detected = "Available" in str(maigret_unclaimed.get('status_str', ''))
if not claimed_detected:
issues.append(f"Maigret did NOT detect claimed user '{claimed}' as Claimed")
if not unclaimed_detected:
issues.append(f"Maigret did NOT detect unclaimed user '{unclaimed}' as Available")
if issues:
for issue in issues:
print(f" {color('[!]', Colors.RED)} {issue}")
else:
print(f" {color('[OK]', Colors.GREEN)} Both methods agree on results")
return {
"aiohttp_claimed": aiohttp_claimed,
"aiohttp_unclaimed": aiohttp_unclaimed,
"maigret_claimed": maigret_claimed,
"maigret_unclaimed": maigret_unclaimed,
"issues": issues,
}
async def diagnose_site(site_config: dict, site_name: str) -> dict:
"""Full diagnosis of a site configuration."""
print(f"\n{'='*60}")
print(f"{color('FULL SITE DIAGNOSIS', Colors.CYAN)}: {site_name}")
print(f"{'='*60}\n")
diagnosis = {
"site_name": site_name,
"issues": [],
"warnings": [],
"recommendations": [],
"working": False,
}
# 1. Config analysis
print(f"--- {color('1. CONFIGURATION', Colors.BOLD)} ---")
check_type = site_config.get("checkType", "status_code")
url = site_config.get("url", "")
url_main = site_config.get("urlMain", "")
claimed = site_config.get("usernameClaimed")
unclaimed = site_config.get("usernameUnclaimed", "noonewouldeverusethis7")
disabled = site_config.get("disabled", False)
print(f" checkType: {check_type}")
print(f" URL: {url}")
print(f" urlMain: {url_main}")
print(f" usernameClaimed: {claimed}")
print(f" disabled: {disabled}")
if disabled:
diagnosis["issues"].append("Site is disabled")
print(f" {color('[!]', Colors.YELLOW)} Site is disabled")
if not claimed:
diagnosis["issues"].append("No usernameClaimed defined")
print(f" {color('[!]', Colors.RED)} No usernameClaimed defined")
return diagnosis
# Build full URL
url_template = url.replace("{urlMain}", url_main).replace("{urlSubpath}", site_config.get("urlSubpath", ""))
headers = DEFAULT_HEADERS.copy()
if site_config.get("headers"):
headers.update(site_config["headers"])
# 2. Connectivity test
print(f"\n--- {color('2. CONNECTIVITY TEST', Colors.BOLD)} ---")
url_claimed = url_template.replace("{username}", claimed)
url_unclaimed = url_template.replace("{username}", unclaimed)
result_claimed, result_unclaimed = await asyncio.gather(
check_url_aiohttp(url_claimed, headers),
check_url_aiohttp(url_unclaimed, headers)
)
print(f" Claimed ({claimed}): status={result_claimed['status']}, error={result_claimed['error']}")
print(f" Unclaimed ({unclaimed}): status={result_unclaimed['status']}, error={result_unclaimed['error']}")
# Check for common problems
if result_claimed["error_type"] == "timeout":
diagnosis["issues"].append("Timeout on claimed username")
if result_unclaimed["error_type"] == "timeout":
diagnosis["issues"].append("Timeout on unclaimed username")
if result_claimed.get("markers", {}).get("cloudflare"):
diagnosis["warnings"].append("Cloudflare protection detected")
if result_claimed.get("markers", {}).get("captcha"):
diagnosis["warnings"].append("Captcha detected")
if result_claimed["status"] == 403:
diagnosis["issues"].append("403 Forbidden - possible anti-bot protection")
if result_claimed["status"] == 429:
diagnosis["issues"].append("429 Rate Limited")
# 3. Check type validation
print(f"\n--- {color('3. CHECK TYPE VALIDATION', Colors.BOLD)} ---")
if check_type == "status_code":
if result_claimed["status"] == result_unclaimed["status"]:
diagnosis["issues"].append(f"status_code check but same status ({result_claimed['status']}) for both")
print(f" {color('[FAIL]', Colors.RED)} Same status code for claimed and unclaimed: {result_claimed['status']}")
else:
print(f" {color('[OK]', Colors.GREEN)} Status codes differ: {result_claimed['status']} vs {result_unclaimed['status']}")
diagnosis["working"] = True
elif check_type == "response_url":
if result_claimed["final_url"] == result_unclaimed["final_url"]:
diagnosis["issues"].append("response_url check but same final URL for both")
print(f" {color('[FAIL]', Colors.RED)} Same final URL for both")
else:
print(f" {color('[OK]', Colors.GREEN)} Final URLs differ")
diagnosis["working"] = True
elif check_type == "message":
presense_strs = site_config.get("presenseStrs", [])
absence_strs = site_config.get("absenceStrs", [])
print(f" presenseStrs: {presense_strs}")
print(f" absenceStrs: {absence_strs}")
claimed_content = result_claimed.get("content", "") or ""
unclaimed_content = result_unclaimed.get("content", "") or ""
# Check presenseStrs
presense_found_claimed = any(s in claimed_content for s in presense_strs) if presense_strs else True
presense_found_unclaimed = any(s in unclaimed_content for s in presense_strs) if presense_strs else True
# Check absenceStrs
absence_found_claimed = any(s in claimed_content for s in absence_strs) if absence_strs else False
absence_found_unclaimed = any(s in unclaimed_content for s in absence_strs) if absence_strs else False
print(f" Claimed - presenseStrs found: {presense_found_claimed}, absenceStrs found: {absence_found_claimed}")
print(f" Unclaimed - presenseStrs found: {presense_found_unclaimed}, absenceStrs found: {absence_found_unclaimed}")
if presense_strs and not presense_found_claimed:
diagnosis["issues"].append(f"presenseStrs {presense_strs} not found in claimed page")
print(f" {color('[FAIL]', Colors.RED)} presenseStrs not found in claimed page")
if absence_strs and absence_found_claimed:
diagnosis["issues"].append(f"absenceStrs {absence_strs} found in claimed page (should not be)")
print(f" {color('[FAIL]', Colors.RED)} absenceStrs found in claimed page")
if absence_strs and not absence_found_unclaimed:
diagnosis["warnings"].append(f"absenceStrs not found in unclaimed page")
print(f" {color('[WARN]', Colors.YELLOW)} absenceStrs not found in unclaimed page")
if presense_found_claimed and not absence_found_claimed and absence_found_unclaimed:
print(f" {color('[OK]', Colors.GREEN)} Message check should work correctly")
diagnosis["working"] = True
# 4. Recommendations
print(f"\n--- {color('4. RECOMMENDATIONS', Colors.BOLD)} ---")
if not diagnosis["working"]:
# Suggest alternatives
if result_claimed["status"] != result_unclaimed["status"]:
diagnosis["recommendations"].append(f"Switch to checkType: status_code (status {result_claimed['status']} vs {result_unclaimed['status']})")
if result_claimed["final_url"] != result_unclaimed["final_url"]:
diagnosis["recommendations"].append("Switch to checkType: response_url")
if result_claimed["title"] != result_unclaimed["title"]:
diagnosis["recommendations"].append(f"Use title as marker: presenseStrs=['{result_claimed['title']}'] or absenceStrs=['{result_unclaimed['title']}']")
if diagnosis["recommendations"]:
for rec in diagnosis["recommendations"]:
print(f" -> {rec}")
elif diagnosis["working"]:
print(f" {color('Site appears to be working correctly', Colors.GREEN)}")
else:
print(f" {color('No clear fix found - site may need special handling or should be disabled', Colors.RED)}")
# Summary
print(f"\n--- {color('SUMMARY', Colors.BOLD)} ---")
if diagnosis["issues"]:
print(f" Issues: {len(diagnosis['issues'])}")
for issue in diagnosis["issues"]:
print(f" - {issue}")
if diagnosis["warnings"]:
print(f" Warnings: {len(diagnosis['warnings'])}")
for warn in diagnosis["warnings"]:
print(f" - {warn}")
print(f" Working: {color('YES', Colors.GREEN) if diagnosis['working'] else color('NO', Colors.RED)}")
return diagnosis
def load_site_from_db(site_name: str) -> Tuple[Optional[dict], Optional['MaigretSite']]:
"""Load site config from data.json. Returns (config_dict, MaigretSite or None)."""
db_path = Path(__file__).parent.parent / "maigret" / "resources" / "data.json"
with open(db_path) as f:
data = json.load(f)
config = None
if site_name in data["sites"]:
config = data["sites"][site_name]
else:
# Try case-insensitive search
for name, cfg in data["sites"].items():
if name.lower() == site_name.lower():
config = cfg
site_name = name
break
if not config:
return None, None
# Also load MaigretSite if available
maigret_site = None
if MAIGRET_AVAILABLE:
try:
db = MaigretDatabase().load_from_path(db_path)
maigret_site = db.sites_dict.get(site_name)
except Exception:
pass
return config, maigret_site
async def main():
parser = argparse.ArgumentParser(
description="Site check utility for Maigret development",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s --site "VK" --check-claimed # Test site with aiohttp
%(prog)s --site "VK" --maigret # Test site with Maigret
%(prog)s --site "VK" --compare-methods # Compare aiohttp vs Maigret
%(prog)s --site "VK" --diagnose # Full diagnosis
%(prog)s --url "https://vk.com/{username}" --compare blue nobody123
%(prog)s --site "VK" --find-user # Find a valid username
"""
)
parser.add_argument("--site", "-s", help="Site name from data.json")
parser.add_argument("--url", "-u", help="URL template with {username}")
parser.add_argument("--test", "-t", help="Username to test")
parser.add_argument("--compare", "-c", nargs=2, metavar=("CLAIMED", "UNCLAIMED"),
help="Compare two usernames")
parser.add_argument("--find-user", "-f", action="store_true",
help="Find a valid username")
parser.add_argument("--check-claimed", action="store_true",
help="Check if claimed username still works (aiohttp)")
parser.add_argument("--maigret", "-m", action="store_true",
help="Test using Maigret's checker instead of aiohttp")
parser.add_argument("--compare-methods", action="store_true",
help="Compare aiohttp vs Maigret results")
parser.add_argument("--diagnose", "-d", action="store_true",
help="Full diagnosis of site configuration")
parser.add_argument("--headers", help="Custom headers as JSON")
parser.add_argument("--timeout", type=int, default=15, help="Request timeout in seconds")
parser.add_argument("--json", action="store_true", help="Output results as JSON")
args = parser.parse_args()
url_template = None
claimed = None
unclaimed = "noonewouldeverusethis7"
headers = DEFAULT_HEADERS.copy()
site_config = None
maigret_site = None
# Load from site name
if args.site:
site_config, maigret_site = load_site_from_db(args.site)
if not site_config:
print(f"Site '{args.site}' not found in database")
sys.exit(1)
url_template = site_config.get("url", "")
url_main = site_config.get("urlMain", "")
url_subpath = site_config.get("urlSubpath", "")
url_template = url_template.replace("{urlMain}", url_main).replace("{urlSubpath}", url_subpath)
claimed = site_config.get("usernameClaimed")
unclaimed = site_config.get("usernameUnclaimed", unclaimed)
if site_config.get("headers"):
headers.update(site_config["headers"])
if not args.json:
print(f"Loaded site: {args.site}")
print(f" URL: {url_template}")
print(f" Claimed: {claimed}")
print(f" CheckType: {site_config.get('checkType', 'unknown')}")
print(f" Disabled: {site_config.get('disabled', False)}")
# Override with explicit URL
if args.url:
url_template = args.url
# Custom headers
if args.headers:
headers.update(json.loads(args.headers))
# Actions
if args.diagnose:
if not site_config:
print("--diagnose requires --site")
sys.exit(1)
result = await diagnose_site(site_config, args.site)
if args.json:
print(json.dumps(result, indent=2, default=str))
elif args.compare_methods:
if not maigret_site:
if not MAIGRET_AVAILABLE:
print("Maigret imports not available")
else:
print("Could not load MaigretSite object")
sys.exit(1)
result = await compare_methods(maigret_site, claimed, unclaimed)
if args.json:
print(json.dumps(result, indent=2, default=str))
elif args.maigret:
if not maigret_site:
if not MAIGRET_AVAILABLE:
print("Maigret imports not available")
else:
print("Could not load MaigretSite object")
sys.exit(1)
print(f"\n--- Testing with Maigret ---")
for username in [claimed, unclaimed]:
result = await check_url_maigret(maigret_site, username)
print(f" {username}: status={result.get('status_str')}, http={result.get('http_status')}, error={result.get('error')}")
elif args.find_user:
if not url_template:
print("--find-user requires --site or --url")
sys.exit(1)
result = await find_valid_username(url_template, headers=headers)
if result:
print(f"\n{color('Found valid username:', Colors.GREEN)} {result}")
else:
print(f"\n{color('No valid username found', Colors.RED)}")
elif args.compare:
if not url_template:
print("--compare requires --site or --url")
sys.exit(1)
result = await compare_users_aiohttp(url_template, args.compare[0], args.compare[1], headers)
if args.json:
# Remove content field for JSON output (too large)
for r in result:
if isinstance(r, dict) and "content" in r:
del r["content"]
print(json.dumps(result, indent=2, default=str))
elif args.check_claimed and claimed:
result = await compare_users_aiohttp(url_template, claimed, unclaimed, headers)
elif args.test:
if not url_template:
print("--test requires --site or --url")
sys.exit(1)
url = url_template.replace("{username}", args.test)
result = await check_url_aiohttp(url, headers, timeout=args.timeout)
if "content" in result:
del result["content"] # Too large for display
print(json.dumps(result, indent=2, default=str))
else:
# Default: check claimed username if available
if url_template and claimed:
await compare_users_aiohttp(url_template, claimed, unclaimed, headers)
else:
parser.print_help()
if __name__ == "__main__":
asyncio.run(main())
+39 -50
View File
@@ -24,44 +24,36 @@ RANKS.update({
'100000000': '100M',
})
SEMAPHORE = threading.Semaphore(20)
import csv
import io
from urllib.parse import urlparse
def get_rank(domain_to_query, site, print_errors=True):
with SEMAPHORE:
# Retrieve ranking data via alexa API
url = f"http://data.alexa.com/data?cli=10&url={domain_to_query}"
xml_data = requests.get(url).text
root = ET.fromstring(xml_data)
def fetch_majestic_million():
print("Fetching Majestic Million CSV (this may take a few seconds)...")
ranks = {}
url = "https://downloads.majestic.com/majestic_million.csv"
try:
response = requests.get(url, stream=True)
response.raise_for_status()
csv_file = io.StringIO(response.text)
reader = csv.reader(csv_file)
next(reader) # skip headers
for row in reader:
if not row or len(row) < 3:
continue
rank = int(row[0])
domain = row[2].lower()
ranks[domain] = rank
except Exception as e:
logging.error(f"Error fetching Majestic Million: {e}")
print(f"Loaded {len(ranks)} domains from Majestic Million.")
return ranks
try:
#Get ranking for this site.
site.alexa_rank = int(root.find('.//REACH').attrib['RANK'])
# country = root.find('.//COUNTRY')
# if not country is None and country.attrib:
# country_code = country.attrib['CODE']
# tags = set(site.tags)
# if country_code:
# tags.add(country_code.lower())
# site.tags = sorted(list(tags))
# if site.type != 'username':
# site.disabled = False
except Exception as e:
if print_errors:
logging.error(e)
# We did not find the rank for some reason.
print(f"Error retrieving rank information for '{domain_to_query}'")
print(f" Returned XML is |{xml_data}|")
def get_base_domain(url):
try:
netloc = urlparse(url).netloc
if netloc.startswith('www.'):
netloc = netloc[4:]
return netloc.lower()
except Exception:
return ""
return
def get_step_rank(rank):
@@ -99,33 +91,30 @@ def main():
with open("sites.md", "w") as site_file:
site_file.write(f"""
## List of supported sites (search methods): total {len(sites_subset)}\n
Rank data fetched from Majestic Million by domains.
Rank data fetched from Alexa by domains.
""")
majestic_ranks = {}
if args.with_rank:
majestic_ranks = fetch_majestic_million()
for site in sites_subset:
if not args.with_rank:
break
url_main = site.url_main
if site.alexa_rank < sys.maxsize and args.empty_only:
continue
if args.exclude_engine_list and site.engine in args.exclude_engine_list:
continue
domain = get_base_domain(site.url_main)
if domain in majestic_ranks:
site.alexa_rank = majestic_ranks[domain]
else:
site.alexa_rank = sys.maxsize
# In memory matching complete, no threads to join
site.alexa_rank = 0
th = threading.Thread(target=get_rank, args=(url_main, site,))
pool.append((site.name, url_main, th))
th.start()
if args.with_rank:
print("Successfully updated ranks matching Majestic Million dataset.")
index = 1
for site_name, url_main, th in pool:
th.join()
sys.stdout.write("\r{0}".format(f"Updated {index} out of {len(sites_subset)} entries"))
sys.stdout.flush()
index = index + 1
sites_full_list = [(s, int(s.alexa_rank)) for s in sites_subset]