Compare commits

..

8 Commits

Author SHA1 Message Date
copilot-swe-agent[bot] b59179b4d5 Rebase: squash branch onto main with single Virgool data.json change 2026-03-24 22:01:31 +00:00
copilot-swe-agent[bot] a2142634e1 fix(virgool): enable virgool.io via POST user-existence API
Use existing POST support from main to enable virgool.io:
- checkType: message with presenseStrs for user_exist:true
- urlProbe: POST /api/v1.4/auth/user-existence
- requestMethod: POST with requestPayload for username lookup
- Content-Type: application/json header
- absenceStrs with Persian user not found message
- disabled flag removed — POST API bypasses JS cookie protection
2026-03-24 22:01:18 +00:00
copilot-swe-agent[bot] ab01dfce92 Merge remote-tracking branch 'origin/copilot/fix-broken-site-virgool' into copilot/fix-broken-site-virgool 2026-03-24 21:33:35 +00:00
copilot-swe-agent[bot] 64cca25b12 fix(virgool): use existing POST support from main to enable virgool.io via user-existence API
Co-authored-by: soxoj <31013580+soxoj@users.noreply.github.com>
Agent-Logs-Url: https://github.com/soxoj/maigret/sessions/e4d95115-25eb-44aa-b144-14d4bdc905c6
2026-03-24 21:32:45 +00:00
copilot-swe-agent[bot] 7ba2fd31ea refactor: move json import to module level and address review comments
Co-authored-by: soxoj <31013580+soxoj@users.noreply.github.com>
Agent-Logs-Url: https://github.com/soxoj/maigret/sessions/e7f4ab84-917a-49fc-bfbd-9bbaf76027f8
2026-03-24 21:21:32 +00:00
copilot-swe-agent[bot] 4d70f0f7c9 feat(virgool): add POST support and use user-existence API to bypass JS cookies
Co-authored-by: soxoj <31013580+soxoj@users.noreply.github.com>
Agent-Logs-Url: https://github.com/soxoj/maigret/sessions/e7f4ab84-917a-49fc-bfbd-9bbaf76027f8
2026-03-24 21:20:03 +00:00
copilot-swe-agent[bot] 2def9a2014 fix(virgool): switch from status_code to message check type with errors for JS cookies
The virgool.io site uses JS-generated cookies for anti-bot protection,
causing HTTP 200 for all URLs regardless of user existence. The
status_code check type always produced false positives.

Changes:
- Switch checkType from status_code to message so presence/absence
  strings are actually used for detection
- Add presenseStrs with profile-specific markers that won't match
  the JS cookie challenge page
- Add errors field to detect the JS cookie challenge page and report
  a meaningful error message
- Keep disabled: true as the site requires JS execution

Co-authored-by: soxoj <31013580+soxoj@users.noreply.github.com>
Agent-Logs-Url: https://github.com/soxoj/maigret/sessions/2df41b87-39e2-47c7-934a-f03106cef094
2026-03-22 21:00:40 +00:00
copilot-swe-agent[bot] 0615deab8b Initial plan 2026-03-22 20:36:33 +00:00
50 changed files with 5810 additions and 6018 deletions
-7
View File
@@ -1,10 +1,3 @@
#!/bin/sh
echo 'Activating update_sitesmd hook script...'
poetry run update_sitesmd
echo 'Regenerating db_meta.json...'
python3 utils/generate_db_meta.py
git add maigret/resources/db_meta.json
git add maigret/resources/data.json
git add sites.md
+15 -24
View File
@@ -1,30 +1,21 @@
name: Upload Python Package to PyPI when a Release is Published
name: Upload Python Package to PyPI when a Release is Created
on:
release:
types: [published]
jobs:
pypi-publish:
name: Publish release to PyPI
runs-on: ubuntu-latest
environment:
name: pypi
url: https://pypi.org/p/maigret
permissions:
types: [created]
push:
tags:
- "v*"
permissions:
id-token: write
contents: read
jobs:
build-and-publish:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.x"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install build
- name: Build package
run: |
python -m build
- name: Publish package distributions to PyPI
- uses: astral-sh/setup-uv@v3
- run: uv build
- name: Publish to PyPI (Trusted Publishing)
uses: pypa/gh-action-pypi-publish@release/v1
with:
packages-dir: dist
-3
View File
@@ -27,9 +27,6 @@ jobs:
pip3 install .
python3 ./utils/update_site_data.py --empty-only
- name: Regenerate db_meta.json
run: python3 utils/generate_db_meta.py
- name: Remove ambiguous main tag
run: git tag -d main || true
-1
View File
@@ -43,4 +43,3 @@ settings.json
# other
*.egg-info
build
LLM
+2 -1
View File
@@ -1,6 +1,7 @@
MIT License
Copyright (c) 2020-2026 Soxoj
Copyright (c) 2019 Sherlock Project
Copyright (c) 2020-2021 Soxoj
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
+452
View File
@@ -0,0 +1,452 @@
# Site checks — guide (Maigret)
Working document for future changes: workflow, findings from reviews, and practical steps. See also [`site-checks-playbook.md`](site-checks-playbook.md) (short checklist), [`socid_extractor_improvements.log`](socid_extractor_improvements.log) (proposals for upstream identity extraction), and the code in [`maigret/checking.py`](../maigret/checking.py).
**Documentation maintenance:** whenever you improve Maigret, add search tooling, or change check logic, update **this file** and [`site-checks-playbook.md`](site-checks-playbook.md) in sync (see the section at the end). If you change rules about the JSON API check or the `socid_extractor` log format, update **[`socid_extractor_improvements.log`](socid_extractor_improvements.log)** (template / header) together with this guide.
---
## 1. How checks work
Logic lives in `process_site_result` ([`maigret/checking.py`](../maigret/checking.py)):
| `checkType` | Meaning |
|-------------|---------|
| `message` | Profile is “found” if the HTML contains **none** of the `absenceStrs` substrings **and** at least one `presenseStrs` marker matches. If `presenseStrs` is **empty**, presence is treated as true for **any** page (risky configuration). |
| `status_code` | HTTP **2xx** is enough — only safe if the server does **not** return 200 for “user not found”. |
| `response_url` | Custom flow with **redirects disabled** so the status/URL of the *first* response can be used. |
For other `checkType` values, [`make_site_result`](../maigret/checking.py) sets **`allow_redirects=True`**: the client follows redirects and `process_site_result` sees the **final** response body and status (not the pre-redirect hop). You do **not** need to “turn on” follow-redirect separately for most sites.
Sites with an `engine` field (e.g. XenForo) are merged with a template from the `engines` section in [`maigret/resources/data.json`](../maigret/resources/data.json) ([`MaigretSite.update_from_engine`](../maigret/sites.py)).
### `urlProbe`: probe URL vs reported profile URL
- **`url`** — pattern for the **public profile page** users should open (what appears in reports as `url_user`). Supports `{username}`, `{urlMain}`, `{urlSubpath}`; the username segment is URL-encoded when the string is built ([`make_site_result`](../maigret/checking.py)).
- **`urlProbe`** (optional) — if set, Maigret sends the HTTP **GET** (or HEAD where applicable) to **this** URL for the check, instead of to `url`. Same placeholders. Use it when the reliable signal is a **JSON/API** endpoint but the human-facing link must stay on the main site (e.g. `https://picsart.com/u/{username}` + probe `https://api.picsart.com/users/show/{username}.json`, or GitHubs `https://github.com/{username}` + `https://api.github.com/users/{username}`).
If `urlProbe` is omitted, the probe URL defaults to `url`.
### Redirects and final URL as a signal
If the **HTML shell** looks the same for “user exists” and “user does not exist” (typical SPA), it is still worth checking whether the **server** behaves differently:
- **Final URL** after redirects (e.g. profile canonical URL vs `/404` path).
- **Redirect chain** length or target host (e.g. lander vs profile).
If that differs reliably, you may be able to use **`checkType`: `response_url`** in [`data.json`](../maigret/resources/data.json) (no auto-follow) or extend logic — but only when the difference is stable.
**Server-side HTTP vs client-side navigation.** Maigret follows **HTTP** redirects only; it does **not** run JavaScript. If the browser shows a navigation to `/u/name/posts` or `/not-found` **after** the SPA bundle loads, that may never appear as an extra hop in `curl`/aiohttp — only a **trailing-slash** `301` might show up. Always confirm with `curl -sIL` / a small script whether the **Location** chain differs for real vs fake users before relying on URL-based rules.
**Empirical check (claimed vs non-existent usernames, `GET` with follow redirects, no JS):**
| Site | Result |
|------|--------|
| **Kaskus** | No HTTP redirects beyond the request path; same generic `<title>` and near-identical body length — **no** discriminating signal from redirects alone. |
| **Bibsonomy** | Both requests redirect to **`/pow-challenge/?return=/user/...`** (proof-of-work). Only the `return` path changes with the username; **both** existing and fake hit the same challenge flow — not a profile-vs-missing distinction. |
| **Picsart (web UI `https://picsart.com/u/{username}`)** | Only a **trailing-slash** `301`; the first HTML is the same empty app shell (~3 KiB) for real and fake users. Browser-only routes such as `…/posts` vs `…/not-found` are **not** visible as additional HTTP redirects in this pipeline. |
**Picsart — workable check via public API.** The site exposes **`https://api.picsart.com/users/show/{username}.json`**: JSON with `"status":"success"` and a user object when the account exists, and `"reason":"user_not_found"` when it does not. Put that URL in **`urlProbe`**, set **`url`** to the web profile pattern **`https://picsart.com/u/{username}`**, and use **`checkType`: `message`** with narrow `presenseStrs` / `absenceStrs` so reports show the human link while the request hits the API (see **`urlProbe`** above).
For **Kaskus** and **Bibsonomy**, HTTP-level comparison still does **not** unlock a safe check without PoW / richer signals; keep **`disabled: true`** until something stable appears (API, SSR markers, etc.).
---
## 2. Standard checks: public JSON API and `socid_extractor` log
### 2.1 Public JSON API (always)
When diagnosing a site—especially **SPAs**, **soft 404s**, or **near-identical HTML** for real vs fake users—**routinely look for a public JSON (or JSON-like) API** used for profile or user lookup. Typical leads: paths containing `/api/`, `/v1/`, `graphql`, `users/show`, `.json` suffixes, or the same endpoints mobile apps use. Verify with `curl` (or the Maigret request path) that **claimed** and **unclaimed** usernames produce **reliably different** bodies or status codes. If such an endpoint is more stable than HTML, put it in **`urlProbe`** and keep **`url`** as the canonical profile page on the main site (see **`urlProbe`** in section 1). If there is no separate public URL for humans, you may still point **`url`** at the API only (reports will show that URL).
This is a **standard** part of site-check work, not an optional extra.
### 2.2 Mandatory: [`LLM/socid_extractor_improvements.log`](socid_extractor_improvements.log)
If you discover **either**:
1. **JSON embedded in HTML** with user/profile fields (inline scripts, `__NEXT_DATA__`, `application/ld+json`, hydration blobs, etc.), or
2. A **standalone JSON HTTP response** (public API) with user/profile data for that service,
you **must append** a proposal block to **[`LLM/socid_extractor_improvements.log`](socid_extractor_improvements.log)**.
**Why:** Maigret calls [`socid_extractor.extract`](https://pypi.org/project/socid-extractor/) on the response body ([`extract_ids_data` in `checking.py`](../maigret/checking.py)) to fill `ids_data`. New payloads usually need a **new scheme** upstream (`flags`, `regex`, optional `extract_json`, `fields`, optional `url_mutations` / `transforms`), matching patterns such as **`GitHub API`** or **`Gitlab API`** in `socid_extractor`s `schemes.py`.
**Each log entry must include:**
- **Date** — ISO `YYYY-MM-DD` (day you add the entry).
- **Example username** — Prefer the sites `usernameClaimed` from `data.json`, or any account that reproduces the payload.
- **Proposal** — Use the **block template** in the log file: detection idea, optional URL mutation, and field mappings in the same style as existing schemes.
If the service is **already covered** by an existing `socid_extractor` scheme, add a **short** entry anyway (date, example username, scheme name, “already implemented”) so there is an audit trail.
Do **not** paste secrets, cookies, or full private JSON; short key names and structure hints are enough.
---
## 3. Improvement workflow
### Phase A — Reproduce
1. Targeted run:
```bash
maigret --db /path/to/maigret/resources/data.json \
TEST_USERNAME \
--site "SiteName" \
--print-not-found --print-errors \
--no-progressbar -vv
```
2. Run separately with a **real** existing username and a **definitely non-existent** one (as `usernameClaimed` / `usernameUnclaimed` in JSON).
3. If needed: `-vvv` and `debug.log` (raw response).
4. Automated pair check:
```bash
maigret --db ... --self-check --site "SiteName" --no-progressbar
```
### Phase B — Classify the cause
| Symptom | Likely cause |
|---------|----------------|
| False “found” with `status_code` | Soft 404 (200 on a “not found” page). |
| False “found” with `message` | Overly broad `presenseStrs` (`name`, `email`, JSON keys) or stale `absenceStrs`. |
| Same HTML for different users | SPA / skeleton shell before hydration — also compare **final URL / redirect chain** (see above); if still identical, often `disabled`. |
| Login page instead of profile | XenForo etc.: guest, `ignore403`, “must be logged in” strings. |
| reCAPTCHA / “Checking your browser” / “not a bot” | Bot protection; Maigrets default User-Agent may worsen the response. |
| Redirect to another domain / lander | Stale URL template. |
### Phase C — Edits in [`data.json`](../maigret/resources/data.json)
1. Update `url` / `urlMain` if needed (HTTPS, new profile path).
2. Replace inappropriate `status_code` with `message` (or `response_url`), choosing:
- **`absenceStrs`** — only what reliably appears on the “user does not exist” page;
- **`presenseStrs`** — narrow markers of a real profile (avoid generic words).
3. For XenForo: override only fields that differ in the site entry; do not break the global `engines` template.
4. Refresh `usernameClaimed` / `usernameUnclaimed` if reference accounts disappeared.
5. Set **`headers`** (e.g. another `User-Agent`) if the site serves a captcha only to “suspicious” clients.
6. Use **`errors`**: HTML substring → meaningful check error (UNKNOWN), so it is not confused with “available”.
### Phase D — Decision criteria
| Outcome | When to use |
|---------|-------------|
| **Check fixed** | The `claimed` / `unclaimed` pair behaves predictably, `--self-check` passes, no regression on a similar site with the same engine. |
| **Check disabled** (`disabled: true`) | Cloudflare / anti-bot / login required / indistinguishable SPA without stable markers. |
| **Entry removed** | **Only** if the domain/service is gone (NXDOMAIN, clearly dead project), not “because it is hard to fix”. |
### Phase E — Before commit
- `maigret --self-check` for affected sites.
- `make test`.
---
## 4. Findings from reviews (concrete site batch)
Summary from an earlier false-positive review for: OpenSea, Mercado Livre, Redtube, Toms Guide, Kaggle, Kaskus, Livemaster, TechPowerUp, authorSTREAM, Bibsonomy, Bulbagarden, iXBT, Serebii, Picsart, Hashnode, hi5.
### What most often broke checks
1. **`status_code` where content checks are needed** — soft 404 with status 200.
2. **Broad `presenseStrs`** — matches on error pages or generic SPA shells.
3. **XenForo + guest** — HTML includes strings like “You must be logged in” that overlap the engine template.
4. **User-Agent** — on some sites (e.g. Kaggle) the default UA triggered a reCAPTCHA page instead of profile HTML; a deliberate `User-Agent` in site `headers` helped.
5. **SPAs and redirects** — identical first HTML, redirect to lander / another product (hi5 → Tagged), URL format changes by region (Mercado Livre).
### What worked as a fix
- Switching to **`message`** with narrow strings from **`<title>`** or unique markup where stable (**Kaggle**, **Mercado Livre**, **Hashnode**).
- For **Kaggle**, additionally: **`headers`**, **`errors`** for browser-check text.
- **Redtube** stayed valid on **`status_code`** with a stable **404** for non-existent users.
- **Picsart**: the web profile URL is a thin SPA shell; use the **JSON API** (`api.picsart.com/users/show/{username}.json`) in **`url`** with **`message`**-style markers (`"status":"success"` vs `user_not_found`), not the browser-only `/posts` vs `/not-found` navigation.
- For **Weblate / Anubis Anti-Bot**: Setting `headers` with a basic script User-Agent (e.g. `python-requests/2.25.1`) rather than the default browser UA completely bypassed the Anubis Proof-of-Work challenge HTTP 307 redirect, instantly recovering the native HTTP 404 framework.
### What required disabling checks
Where you **cannot** reliably tell “profile exists” from “no profile” without bypassing protection, login, or full JS:
- Anti-bot / captcha / “not a bot” page;
- Guest-only access to the needed page;
- SPA with indistinguishable first response;
- Forums returning **403** and a login page instead of a member profile for the member-search URL;
- Stale URLs that redirect to a stub.
In those cases **`disabled: true`** is better than false “found”; remove the DB entry only on **actual** domain death.
### Code notes
- For the `status_code` branch in `process_site_result`, use **strict** comparison `check_type == "status_code"`, not a substring match inside `"status_code"`.
- Treat empty `presenseStrs` with `message` as risky: when debugging, watch DEBUG-level logs if that diagnostics exists in code.
---
## 5. Future ideas (Maigret improvements)
- A mode or script: one site, two usernames, print statuses and first N bytes of the response (wrapper around `maigret()`).
- Document in CLI help that **`--use-disabled-sites`** is needed to analyze disabled entries.
---
## 6. Development utilities
### 6.1 `utils/site_check.py` — Single site diagnostics
A comprehensive utility for testing individual sites with multiple modes:
```bash
# Basic comparison of claimed vs unclaimed (aiohttp)
python utils/site_check.py --site "VK" --check-claimed
# Test via Maigret's checker directly
python utils/site_check.py --site "VK" --maigret
# Compare aiohttp vs Maigret results (find discrepancies)
python utils/site_check.py --site "VK" --compare-methods
# Full diagnosis with recommendations
python utils/site_check.py --site "VK" --diagnose
# Test with custom URL
python utils/site_check.py --url "https://example.com/{username}" --compare user1 user2
# Find a valid username for a site
python utils/site_check.py --site "VK" --find-user
```
**Key features:**
- `--maigret` — Uses Maigret's actual checking code, not raw aiohttp
- `--compare-methods` — Shows if aiohttp and Maigret see different results (useful for debugging)
- `--diagnose` — Validates checkType against actual responses, suggests fixes
- Color output with markers detection (captcha, cloudflare, login, etc.)
- `--json` flag for machine-readable output
**When to use each mode:**
| Mode | Use case |
|------|----------|
| `--check-claimed` | Quick sanity check: do claimed/unclaimed still differ? |
| `--maigret` | Verify Maigret's actual behavior matches expectations |
| `--compare-methods` | Debug "works in curl but fails in Maigret" issues |
| `--diagnose` | Full analysis when a site is broken, get fix recommendations |
### 6.2 `utils/check_top_n.py` — Mass site checking
Batch-check top N sites by Alexa rank with categorized reporting:
```bash
# Check top 100 sites
python utils/check_top_n.py --top 100
# Faster with more parallelism
python utils/check_top_n.py --top 100 --parallel 10
# Output JSON report
python utils/check_top_n.py --top 100 --output report.json
# Only show broken sites
python utils/check_top_n.py --top 100 --only-broken
```
**Output categories:**
- `working` — Site check passes
- `broken` — Check fails (wrong status, missing markers)
- `timeout` — Request timed out
- `anti_bot` — 403/429 or captcha detected
- `error` — Connection or other errors
- `disabled` — Already disabled in data.json
**Report includes:**
- Summary counts by category
- List of broken sites with issues
- Recommendations for fixes (e.g., "Switch to checkType: status_code")
### 6.3 Self-check behavior (`--self-check`)
The self-check command has been improved to be less aggressive:
```bash
# Check sites WITHOUT auto-disabling (default)
maigret --self-check --site "VK"
# Auto-disable failing sites (old behavior)
maigret --self-check --site "VK" --auto-disable
# Show detailed diagnosis for each failure
maigret --self-check --site "VK" --diagnose
```
**Behavior changes:**
| Flag | Effect |
|------|--------|
| `--self-check` alone | Reports issues but does NOT disable sites |
| `--auto-disable` | Automatically disables sites that fail (opt-in) |
| `--diagnose` | Prints detailed diagnosis with recommendations |
**Why this matters:**
- Old behavior was too aggressive — sites got disabled without explanation
- New behavior reports issues and suggests fixes
- Explicit `--auto-disable` required to modify database
---
## 7. Lessons learned (practical observations)
Collected from hands-on work fixing top-ranked sites (Reddit, Wikipedia, Microsoft Learn, Baidu, etc.).
### 7.1 JSON API is the first thing to look for
Both Reddit and Microsoft Learn had working public APIs that solved the problem entirely. The web pages were SPAs or blocked by anti-bot measures, but the APIs worked reliably:
- **Reddit**: `https://api.reddit.com/user/{username}/about` — returns JSON with user data or `{"message": "Not Found", "error": 404}`.
- **Microsoft Learn**: `https://learn.microsoft.com/api/profiles/{username}` — returns JSON with `userName` field or HTTP 404.
This confirms the playbook recommendation: always check for `/api/`, `.json`, GraphQL endpoints before giving up on a site.
### 7.2 `urlProbe` is a powerful tool
It separates "what we check" (API) from "what we show the user" (human-readable profile URL). Reddit is a perfect example:
```json
{
"url": "https://www.reddit.com/user/{username}",
"urlProbe": "https://api.reddit.com/user/{username}/about",
"checkType": "message",
"presenseStrs": ["\"name\":"],
"absenceStrs": ["Not Found"]
}
```
The check hits the API, but reports display `www.reddit.com/user/blue`.
### 7.3 aiohttp ≠ curl ≠ requests
Wikipedia returned HTTP 200 for `curl` and Python `requests`, but HTTP 403 for `aiohttp`. This is **TLS fingerprinting** — the server identifies the HTTP library by cryptographic characteristics of the TLS handshake, not by headers.
**Key insight:** Changing `User-Agent` does **not** help against TLS fingerprinting. Always test with aiohttp directly (or via Maigret with `-vvv` and `debug.log`), not just `curl`.
```python
# This returns 403 for Wikipedia even with browser UA:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers={"User-Agent": "Mozilla/5.0 ..."}) as resp:
print(resp.status) # 403
```
### 7.4 HTTP 403 in Maigret can mean different things
Initially it seemed Wikipedia was returning 403, but `curl` showed 200. Only `debug.log` revealed the real picture — aiohttp was getting blocked at TLS level.
**Lesson:** Use `-vvv` flag and inspect `debug.log` for raw response status and body. The warning message alone may be misleading.
### 7.5 Dead services migrate, not disappear
MSDN Social and TechNet profiles redirected to Microsoft Learn. Instead of deleting old entries:
1. Keep old entries with `disabled: true` as historical record.
2. Create a new entry for the current service with working API.
This preserves audit trail and avoids breaking existing workflows.
### 7.6 `status_code` is more reliable than `message` for APIs
Microsoft Learn API returns HTTP 404 for non-existent users — a clean signal without HTML parsing. For JSON APIs that return proper HTTP status codes, `status_code` is often the best choice:
```json
{
"checkType": "status_code",
"urlProbe": "https://learn.microsoft.com/api/profiles/{username}"
}
```
No need for fragile string matching when the API speaks HTTP correctly.
### 7.8 Engine templates can silently break across many sites
The **vBulletin** engine template has `absenceStrs` in five languages ("This user has not registered…", "Пользователь не зарегистрирован…", etc.). In a batch review of ~12 vBulletin forums (oneclickchicks, mirf, Pesiq, VKMOnline, forum.zone-game.info, etc.), **none** of the absence strings matched — the forums returned identical pages for both claimed and unclaimed usernames. Root cause: many of these forums require login to view member profiles, so they serve a generic page (no "user not registered" message at all) instead of an informative error.
**Lesson:** When a whole engine class shows false positives, do not patch sites one by one — check whether the **engine template** itself still matches the actual error pages. A template written for one version/language pack may silently stop working after a forum upgrade or config change.
### 7.9 Search-by-author URLs are architecturally unreliable
Several sites (OnanistovNet, Shoppingzone, Pogovorim, Astrogalaxy, Sexwin) used a phpBB-style `search.php?keywords=&terms=all&author={username}` URL as the check endpoint. This searches for **posts** by that author, not for the user account itself. Even if the markers worked, a user who exists but has zero posts would be indistinguishable from a non-existent user. And in practice, the sites changed their response format — some now return HTTP 404, others dropped the expected Russian absence text altogether.
**Lesson:** Avoid author-search URLs as the check endpoint; they test "has posts" rather than "account exists" and are doubly fragile (both logic mismatch and format drift).
### 7.10 Some sites generate a page for any path — permanent false positives
Two distinct patterns:
- **Pbase** creates a stub page titled "pbase Artist {username}" for **every** URL, real or fake. Both return HTTP 200 with nearly identical content (~3.3 KB). No markers can distinguish them.
- **ffm.bio** is even trickier: for the non-existent username `a.slomkoowski` it generated a page titled "mr.a" with description "a is a", apparently fuzzy-matching the path to the closest real entry. Both return HTTP 200 with large, content-rich pages.
**Lesson:** Before writing markers for a site, verify that the "unclaimed" URL actually produces an **error-like** response (different status, different title, unique error text). If the site always returns a plausible-looking page, no combination of `presenseStrs` / `absenceStrs` will help — `disabled: true` is the only safe option.
### 7.11 TLS fingerprinting can degrade over time (Kaggle)
Kaggle was previously fixed with a custom `User-Agent` header and `errors` for the "Checking your browser" captcha page. In the latest batch review, aiohttp receives HTTP 404 with identical content for **both** claimed and unclaimed usernames — the site now blocks the entire request before it reaches the profile page. This matches the TLS fingerprinting pattern seen earlier with Wikipedia (section 7.3), but here the degradation happened **after** a working fix was already in place.
**Lesson:** Sites that rely on bot-detection can tighten their rules at any time. A working `User-Agent` override today may fail tomorrow. When a previously fixed site starts returning identical responses for both usernames, suspect TLS fingerprinting first, and accept `disabled: true` if no public API is available.
### 7.12 API endpoints may bypass Cloudflare even when the main site is blocked
All four Fandom wikis returned HTTP 403 with a Cloudflare "Just a moment..." challenge when aiohttp accessed the user profile page (`/wiki/User:{username}`). However, the **MediaWiki API** on the same domain (`/api.php?action=query&list=users&ususers={username}&format=json`) returned clean JSON without any challenge. Similarly, **Substack** served a captcha-laden SPA for `/@{username}`, but its `public_profile` API (`/api/v1/user/{username}/public_profile`) responded with proper JSON and correct HTTP 404 for missing users.
This is likely because API routes are excluded from the Cloudflare WAF rules or use a different pipeline than the HTML-serving paths.
**Lesson:** When a site's main pages are blocked by Cloudflare or similar WAF, still check API endpoints on the **same domain** — they may not go through the same protection layer. This is especially true for:
- MediaWiki's `api.php` on wiki farms (Fandom, Wikia, self-hosted MediaWiki)
- REST API paths (`/api/v1/`, `/api/v2/`) on SPA-heavy sites
- Internal data endpoints that the SPA itself calls
### 7.13 GraphQL APIs often support GET, not just POST
**hashnode** exposes a GraphQL endpoint at `https://gql.hashnode.com`. While GraphQL is typically associated with POST requests, many implementations also support **GET** with the query passed as a URL parameter. This is critical for Maigret, which only supports GET/HEAD for `urlProbe`.
```
GET https://gql.hashnode.com?query=%7Buser(username%3A%20%22melwinalm%22)%20%7B%20name%20username%20%7D%7D
→ {"data":{"user":{"name":"Melwin D'Almeida","username":"melwinalm"}}}
GET https://gql.hashnode.com?query=%7Buser(username%3A%20%22a.slomkoowski%22)%20%7B%20name%20username%20%7D%7D
→ {"data":{"user":null}}
```
**Lesson:** Before giving up on a GraphQL-only site, try the same query via GET with `?query=...` (URL-encoded). Many GraphQL servers accept both methods.
### 7.14 URL-encoding resolves template placeholder conflicts
The hashnode GraphQL query `{user(username: "{username}") { name }}` contains curly braces that conflict with Maigret's `{username}` placeholder — Python's `str.format()` would raise a `KeyError` on `{user(username...}`.
The fix: URL-encode the GraphQL braces (`{` → `%7B`, `}` → `%7D`) but leave `{username}` as-is. Python's `.format()` only interprets literal `{…}` as placeholders, not `%7B…%7D`, and the GraphQL server decodes the percent-encoding on its end:
```
urlProbe: https://gql.hashnode.com?query=%7Buser(username%3A%20%22{username}%22)%20%7B%20name%20username%20%7D%7D
```
After `.format(username="melwinalm")`:
```
https://gql.hashnode.com?query=%7Buser(username%3A%20%22melwinalm%22)%20%7B%20name%20username%20%7D%7D
```
**Lesson:** When a `urlProbe` needs literal curly braces (GraphQL, JSON in URL, etc.), percent-encode them. This is a general technique for any `data.json` URL field processed by `.format()`.
### 7.7 The playbook classification works
The decision tree from the documentation accurately describes real-world cases:
| Situation | Playbook says | Actual result |
|-----------|---------------|---------------|
| Captcha (Baidu) | `disabled: true` | Correct |
| TLS fingerprinting (Wikipedia) | `disabled: true` (anti-bot) | Correct |
| Working API available (Reddit, MS Learn) | Use `urlProbe` | Correct |
| Service migrated (MSDN → MS Learn) | Update URL or create new entry | Correct |
---
## Documentation maintenance
For any of the changes below, **always** keep these artifacts in sync — this file ([`site-checks-guide.md`](site-checks-guide.md)), [`site-checks-playbook.md`](site-checks-playbook.md), and (when rules or templates change) the header/template in [`socid_extractor_improvements.log`](socid_extractor_improvements.log):
- Maigret code changes (including [`maigret/checking.py`](../maigret/checking.py), request executors, CLI);
- New or changed search tools / helper utilities for site checks;
- Changes to rules or semantics of `checkType`, `data.json` fields, self-check, etc.;
- Changes to the **public JSON API** diagnostic step or **mandatory** `socid_extractor` logging rules.
Prefer updating the guide, playbook, and log template in one commit or in the same task so instructions do not diverge. **Append-only:** new proposals go at the bottom of `socid_extractor_improvements.log`; do not delete historical entries when editing the template.
+87
View File
@@ -0,0 +1,87 @@
# Site checks — playbook (Maigret)
Short checklist for edits to [`maigret/resources/data.json`](../maigret/resources/data.json) and, when needed, [`maigret/checking.py`](../maigret/checking.py). Full guide: [`site-checks-guide.md`](site-checks-guide.md). Upstream extraction proposals: [`socid_extractor_improvements.log`](socid_extractor_improvements.log).
**Documentation maintenance:** whenever you improve Maigret, add search tooling, or change check logic, update **both** this file and [`site-checks-guide.md`](site-checks-guide.md) (see the “Documentation maintenance” section at the end of that file). When JSON API / `socid_extractor` logging rules change, update the **template header** in [`socid_extractor_improvements.log`](socid_extractor_improvements.log) in the same change.
## 0. Standard checks (do alongside reproduce / classify)
- **Public JSON API:** always look for a stable JSON (or GraphQL JSON) profile endpoint (`/api/`, `.json`, mobile-style URLs). When the API is more reliable than HTML, set **`urlProbe`** to that endpoint and keep **`url`** as the human-readable profile link (e.g. `https://picsart.com/u/{username}`). If there is no separate profile URL, use the API as `url` only. Details: **`urlProbe`** and section **2.1** in [`site-checks-guide.md`](site-checks-guide.md).
- **`socid_extractor` log (mandatory):** if you find **embedded user JSON in HTML** or a **standalone JSON profile API**, append a dated entry (with **example username**) to [`socid_extractor_improvements.log`](socid_extractor_improvements.log). Details: section **2.2** in [`site-checks-guide.md`](site-checks-guide.md).
## 1. Reproduce
- Run a targeted check:
`maigret USER --db /path/to/maigret/resources/data.json --site "SiteName" --print-not-found --print-errors --no-progressbar -vv`
- Compare an **existing** and a **non-existent** username (as `usernameClaimed` / `usernameUnclaimed` in JSON).
- With `-vvv`, inspect `debug.log` (raw response in the log).
## 2. Classify the cause
| Symptom | Typical cause | Action |
|--------|-----------------|--------|
| HTTP 200 for “user does not exist” | Soft 404 | Move from `status_code` to `message` or `response_url`; add `absenceStrs` / narrow `presenseStrs` |
| Generic words match (`name`, `email`) | `presenseStrs` too broad | Remove generic markers; add profile-specific ones |
| Same HTML without JS | SPA / skeleton shell | Compare **final URL and HTTP redirects** (Maigret already follows redirects by default). If the browser shows extra routes (`/posts`, `/not-found`) only **after JS**, they will **not** appear to Maigret — try a **public JSON/API** endpoint for the same site if one exists. See **Redirects and final URL** and **Picsart** in [`site-checks-guide.md`](site-checks-guide.md). |
| 403 / “Log in” / guest-only | Auth or anti-bot required | `disabled: true` |
| reCAPTCHA / “Checking your browser” | Bot protection | Try a reasonable `User-Agent` in `headers`; else `errors` + UNKNOWN or `disabled` |
| Domain does not resolve / persistent timeout | Dead service | Remove entry **only** after confirming the domain is dead |
## 3. Data edits
1. Update `url` / `urlMain` if needed (HTTPS redirects). Use optional **`urlProbe`** when the HTTP check should hit a different URL than the profile link shown in reports (API vs web UI).
2. For `message`: **always** tune string pairs so `absenceStrs` fire on “no user” pages and `presenseStrs` fire on real profiles without false absence hits.
3. Engine (`engine`, e.g. XenForo): override only differing fields in the site entry so other sites are not broken.
4. Keep `status_code` only if the response **reliably** differs by status code without soft 404.
## 4. Verify
- `maigret --self-check --site "SiteName" --db ...` for touched entries.
- `make test` before commit.
## 5. Code notes
- `process_site_result` uses strict comparison to `"status_code"` for `checkType` (not a substring trick).
- Empty `presenseStrs` with `message` means “presence always true”; a debug line is logged only at DEBUG level.
## 6. Development utilities
Quick reference for site check utilities. Full details: section **6** in [`site-checks-guide.md`](site-checks-guide.md).
| Command | Purpose |
|---------|---------|
| `python utils/site_check.py --site "X" --check-claimed` | Quick aiohttp comparison |
| `python utils/site_check.py --site "X" --maigret` | Test via Maigret checker |
| `python utils/site_check.py --site "X" --compare-methods` | Find aiohttp vs Maigret discrepancies |
| `python utils/site_check.py --site "X" --diagnose` | Full diagnosis with fix recommendations |
| `python utils/check_top_n.py --top 100` | Mass-check top 100 sites |
| `maigret --self-check --site "X"` | Self-check (reports only, no auto-disable) |
| `maigret --self-check --site "X" --auto-disable` | Self-check with auto-disable |
| `maigret --self-check --site "X" --diagnose` | Self-check with detailed diagnosis |
## 7. Quick tips (lessons learned)
Practical observations from fixing top-ranked sites. Full details: section **7** in [`site-checks-guide.md`](site-checks-guide.md).
| Tip | Why it matters |
|-----|----------------|
| **API first** | Reddit, Microsoft Learn — APIs worked when web pages were blocked. Always check `/api/`, `.json` endpoints. |
| **`urlProbe` separates check from display** | Check via API, show human URL in reports. Example: Reddit API → `www.reddit.com/user/` link. |
| **aiohttp ≠ curl** | Wikipedia returned 200 for curl, 403 for aiohttp (TLS fingerprinting). Always test with Maigret directly. |
| **Use `debug.log`** | Run with `-vvv` to see raw response. Warning messages alone can be misleading. |
| **`status_code` for clean APIs** | If API returns proper 404 for missing users, prefer `status_code` over `message`. |
| **Migrate, don't delete** | MSDN → Microsoft Learn: keep old entry disabled, create new one for current service. |
| **Engine templates break silently** | vBulletin `absenceStrs` failed on ~12 forums at once — many require login, showing a generic page with no error text. Check the engine template first. |
| **Search-by-author is unreliable** | phpBB `search.php?author=` checks for posts, not accounts. A user with zero posts looks identical to a non-existent user. Avoid these URLs. |
| **Some sites always generate a page** | Pbase stubs "pbase Artist {name}" for any path; ffm.bio fuzzy-matches to the nearest real entry. No markers can help — `disabled: true`. |
| **TLS fingerprinting degrades over time** | Kaggle's custom `User-Agent` fix stopped working — aiohttp now gets 404 for both usernames. Accept `disabled: true` when no API exists. |
| **API endpoints bypass Cloudflare** | Fandom `api.php` and Substack `/api/v1/` returned clean JSON while main pages were blocked by Cloudflare. Always try API paths on the same domain. |
| **Inspect Network tab for POST APIs** | Many modern platforms (e.g., Discord) heavily protect HTML profiles but expose unauthenticated `POST` endpoints for username checks. Maigret supports this natively: define `"request_method": "POST"` and `"request_payload": {"username": "{username}"}` in `data.json` to query them! |
| **Strict JSON markers are bulletproof** | When probing APIs, use `checkType: "message"` with exact JSON substrings (like `"{\"taken\": false}"`). Unlike HTML layout checks, this approach is immune to UI redesigns, A/B testing, and language translations. |
| **GraphQL supports GET too** | hashnode GraphQL works via `GET ?query=...` (URL-encoded). You can use either native POST payloads or GET `urlProbe` for GraphQL. |
| **URL-encode braces for template safety** | GraphQL `{...}` conflicts with Maigret's `{username}`. Use `%7B`/`%7D` for literal braces in `urlProbe``.format()` ignores percent-encoded chars. |
| **Anti-bot bypass via simple UA** | "Anubis" anti-bot PoW screens (like on Weblate) intercept modern browser UAs via HTTP 307. Hardcoding `"headers": {"User-Agent": "python-requests/2.25.1"}` circumvents the scraper filter and restores default detection logic. |
## 8. Documentation maintenance
When you change Maigret, add search tools, or change check logic, keep **this playbook**, [`site-checks-guide.md`](site-checks-guide.md), and (when applicable) the template in [`socid_extractor_improvements.log`](socid_extractor_improvements.log) aligned. New log **entries** are append-only at the bottom of that file.
+4
View File
@@ -0,0 +1,4 @@
include LICENSE
include README.md
include requirements.txt
include maigret/resources/*
+1 -15
View File
@@ -25,7 +25,7 @@
<i>The Commissioner Jules Maigret is a fictional French police detective, created by Georges Simenon. His investigation method is based on understanding the personality of different people and their interactions.</i>
<b>👉👉👉 [Online Telegram bot](https://t.me/maigret_search_bot) | 🏢 [Commercial use & API](#commercial-use)</b>
<b>👉👉👉 [Online Telegram bot](https://t.me/maigret_search_bot)</b>
## About
@@ -112,10 +112,6 @@ docker run -v /mydir:/app/reports soxoj/maigret:latest username --html
docker build -t maigret .
```
### Troubleshooting
If you encounter build errors during installation, check the [troubleshooting guide](https://maigret.readthedocs.io/en/latest/installation.html#troubleshooting).
## Usage examples
```bash
@@ -196,16 +192,6 @@ The authors and developers of this tool bear no responsibility for any misuse or
If you have any questions, suggestions, or feedback, please feel free to [open an issue](https://github.com/soxoj/maigret/issues), create a [GitHub discussion](https://github.com/soxoj/maigret/discussions), or contact the author directly via [Telegram](https://t.me/soxoj).
## Commercial Use
If you need a **daily updated database** of supported sites or an **API for username checks**, feel free to reach out:
📧 [maigret@soxoj.com](mailto:maigret@soxoj.com)
Available options:
- Up-to-date site database - regularly maintained and updated list of 5K+ sites, delivered daily
- Username check API - programmatic access to Maigret's search capabilities for integration into your products
## SOWEL classification
This tool uses the following OSINT techniques:
+7 -106
View File
@@ -82,63 +82,11 @@ id types, sites will be filtered automatically.
ids. Useful for repeated scanning with found known irrelevant usernames.
``--db`` - Load Maigret database from a JSON file or an online, valid,
JSON file. See :ref:`custom-database` below.
``--no-autoupdate`` - Disable the automatic database update check that
runs at startup. The currently cached (or bundled) database is used
as-is.
``--force-update`` - Force a database update check at startup, ignoring
the usual check interval. Implies ``--no-autoupdate`` for the rest of
the run after the explicit update finishes.
JSON file.
``--retries RETRIES`` - Count of attempts to restart temporarily failed
requests.
.. _custom-database:
Using a custom sites database
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The ``--db`` flag accepts three forms:
1. **HTTP(S) URL** — fetched as-is, e.g.
``--db https://example.com/my_db.json``.
2. **Local file path** — absolute (``--db /tmp/private.json``) or
relative to the current working directory
(``--db LLM/maigret_private_db.json``).
3. **Module-relative path** — kept for backwards compatibility, resolved
against the installed ``maigret/`` package directory (e.g. the
default ``resources/data.json``).
Resolution order for local paths: the path is first tried as given
(absolute or cwd-relative); if that file does not exist, Maigret falls
back to the legacy module-relative resolution. If neither location
contains the file, Maigret exits with an error rather than silently
loading the bundled database.
When ``--db`` points to a custom file, automatic database updates are
skipped — the file is used exactly as provided.
On every run Maigret prints the database it actually loaded, for
example::
[+] Using sites database: /path/to/maigret_private_db.json (6 sites)
If loading the requested database fails for any other reason (corrupt
JSON, missing required keys, …), Maigret prints a warning, falls back
to the bundled database, and reports the fallback explicitly::
[-] Falling back to bundled database: /…/maigret/resources/data.json
[+] Using sites database: /…/maigret/resources/data.json (3154 sites)
A typical invocation against a private database, with auto-update
disabled and all sites scanned, looks like::
python3 -m maigret username \
--db LLM/maigret_private_db.json \
--no-autoupdate -a
Reports
-------
@@ -158,9 +106,6 @@ username).
``-J``, ``--json`` - Generate a JSON report of specific type: simple,
ndjson (one report per username). E.g. ``--json ndjson``
``-M``, ``--md`` - Generate a Markdown report (general report on all
usernames). See :ref:`markdown-report` below.
``-fo``, ``--folderoutput`` - Results will be saved to this folder,
``results`` by default. Will be created if doesnt exist.
@@ -185,60 +130,16 @@ Other operations modes
``--version`` - Display version information and dependencies.
``--self-check`` - Do self-checking for sites and database. Each site is
tested by looking up its known-claimed and known-unclaimed usernames and
verifying that the results match expectations. Individual site failures
(network errors, unexpected exceptions, etc.) are caught and logged
without stopping the overall process, so the check always runs to
completion. After checking, Maigret reports a summary of issues found.
If any sites were disabled (see ``--auto-disable``), Maigret asks if you
want to save updates; answering y/Y will rewrite the local database.
``--auto-disable`` - Used with ``--self-check``: automatically disable
sites that fail checks (incorrect detection of claimed/unclaimed
usernames, connection errors, or unexpected exceptions). Without this
flag, ``--self-check`` only **reports** issues without modifying the
database.
``--diagnose`` - Used with ``--self-check``: print detailed diagnosis
information for each failing site, including the check type, the list
of issues found, and recommendations (e.g. suggesting a different
``checkType``).
``--self-check`` - Do self-checking for sites and database and disable
non-working ones **for current search session** by default. Its useful
for testing new internet connection (it depends on provider/hosting on
which sites there will be censorship stub or captcha display). After
checking Maigret asks if you want to save updates, answering y/Y will
rewrite the local database.
``--submit URL`` - Do an automatic analysis of the given account URL or
site main page URL to determine the site engine and methods to check
account presence. After checking Maigret asks if you want to add the
site, answering y/Y will rewrite the local database.
.. _markdown-report:
Markdown report (LLM-friendly)
------------------------------
The ``--md`` / ``-M`` flag generates a Markdown report designed for both human reading and analysis by AI assistants (ChatGPT, Claude, etc.).
.. code-block:: console
maigret username --md
The report includes:
- **Summary** with aggregated personal data (all fullnames, locations, bios found across accounts), country tags, website tags, first/last seen timestamps.
- **Per-account sections** with profile URL, site tags, and all extracted fields (username, bio, follower count, linked accounts, etc.).
- **Possible false positives** disclaimer explaining that accounts may belong to different people.
- **Ethical use** notice about applicable data protection laws.
**Using with AI tools:**
The Markdown format is optimized for LLM context windows. You can feed the report directly to an AI assistant for follow-up analysis:
.. code-block:: console
# Generate the report
maigret johndoe --md
# Feed it to an AI tool
cat reports/report_johndoe.md | llm "Analyze this OSINT report and summarize key findings"
The structured Markdown with per-site sections makes it easy for AI tools to extract relationships, cross-reference identities, and identify patterns across accounts.
-39
View File
@@ -69,21 +69,6 @@ Use the following commands to check Maigret:
make speed
Site naming conventions
-----------------------------------------------
Site names are the keys in ``data.json`` and appear in user-facing reports. Follow these rules:
- **Title Case** by default: ``Product Hunt``, ``Hacker News``.
- **Lowercase** only if the brand itself is written that way: ``kofi``, ``note``, ``hi5``.
- **No domain suffix** (``calendly.com````Calendly``), unless the domain is part of the recognized brand name: ``last.fm``, ``VC.ru``, ``Archive.org``.
- **No full UPPERCASE** unless the brand is an acronym: ``VK``, ``CNET``, ``ICQ``, ``IFTTT``.
- **No** ``www.`` **or** ``https://`` **prefix** in the name.
- **Spaces** are allowed when the brand uses them: ``Star Citizen``, ``Google Maps``.
- **{username} templates** in names are acceptable: ``{username}.tilda.ws``.
When in doubt, check how the service refers to itself on its homepage.
How to fix false-positives
-----------------------------------------------
@@ -137,30 +122,6 @@ There are few options for sites data.json helpful in various cases:
- ``regexCheck`` - a regex to check if the username is valid, in case of frequent false-positives
- ``requestMethod`` - set the HTTP method to use (e.g., ``POST``). By default, Maigret natively defaults to GET or HEAD.
- ``requestPayload`` - a dictionary with the JSON payload to send for POST requests (e.g., ``{"username": "{username}"}``), extremely useful for parsing GraphQL or modern JSON APIs.
- ``protection`` - a list of protection types detected on the site (see below).
``protection`` (site protection tracking)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The ``protection`` field records what kind of anti-bot protection a site uses. Maigret reads this field and automatically applies the appropriate bypass mechanism.
Supported values:
- ``tls_fingerprint`` — the site fingerprints the TLS handshake (JA3/JA4) and blocks non-browser clients. Maigret automatically uses ``curl_cffi`` with Chrome browser emulation to bypass this. Requires the ``curl_cffi`` package (included as a dependency). Examples: Instagram, NPM, Codepen, Kickstarter, Letterboxd.
- ``ip_reputation`` — the site blocks requests from datacenter/cloud IPs regardless of headers or TLS. Cannot be bypassed automatically; run Maigret from a regular internet connection (not a datacenter) or use a proxy (``--proxy``). Examples: Reddit, Patreon, Figma.
- ``js_challenge`` — the site serves a JavaScript challenge page (e.g. "Just a moment...") that cannot be solved without a browser. Maigret detects challenge signatures and returns UNKNOWN instead of a false positive.
Example:
.. code-block:: json
"Instagram": {
"url": "https://www.instagram.com/{username}/",
"checkType": "message",
"presenseStrs": ["\"routePath\":\"\\/"],
"absenceStrs": ["\"routePath\":null"],
"protection": ["tls_fingerprint"]
}
``urlProbe`` (optional profile probe URL)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-29
View File
@@ -170,35 +170,6 @@ Maigret will do retries of the requests with temporary errors got (connection fa
One attempt by default, can be changed with option ``--retries N``.
Database self-check
-------------------
Maigret includes a self-check mode (``--self-check``) that validates every site
in the database by looking up its known-claimed and known-unclaimed usernames
and verifying that the detection results match expectations.
The self-check is **error-resilient**: if an individual site check raises an
unexpected exception (e.g. a network error or a parsing failure), the error is
caught, logged, and recorded as an issue — the remaining sites continue to be
checked without interruption. This means the process always runs to completion,
even when checking hundreds of sites with ``-a --self-check``.
Use ``--auto-disable`` together with ``--self-check`` to automatically disable
sites that fail checks. Without it, issues are only reported. Use ``--diagnose``
to print detailed per-site diagnosis including the check type, specific issues,
and recommendations.
.. code-block:: console
# Report-only mode (no changes to the database)
maigret --self-check
# Automatically disable failing sites and save updates
maigret -a --self-check --auto-disable
# Show detailed diagnosis for each failing site
maigret -a --self-check --diagnose
Archives and mirrors checking
-----------------------------
-36
View File
@@ -90,39 +90,3 @@ Docker
# manual build
docker build -t maigret .
Troubleshooting
---------------
If you encounter build errors during installation such as ``cannot find ft2build.h``
or errors related to ``reportlab`` / ``_renderPM``, you need to install system-level
dependencies required to compile native extensions.
**Debian/Ubuntu/Kali:**
.. code-block:: bash
sudo apt install -y libfreetype6-dev libjpeg-dev libffi-dev
**Fedora/RHEL/CentOS:**
.. code-block:: bash
sudo dnf install -y freetype-devel libjpeg-devel libffi-devel
**Arch Linux:**
.. code-block:: bash
sudo pacman -S freetype2 libjpeg-turbo libffi
**macOS (Homebrew):**
.. code-block:: bash
brew install freetype
After installing the system dependencies, retry the maigret installation.
If you continue to have issues, consider using Docker instead, which includes all
necessary dependencies.
-74
View File
@@ -27,77 +27,3 @@ Missing any of these files is not an error.
If the next settings file contains already known option,
this option will be rewrited. So it is possible to make
custom configuration for different users and directories.
.. _database-auto-update:
Database auto-update
--------------------
Maigret ships with a bundled site database, but it gets outdated between releases. To keep the database current, Maigret automatically checks for updates on startup.
**How it works:**
1. On startup, Maigret checks if more than 24 hours have passed since the last update check.
2. If so, it fetches a lightweight metadata file (~200 bytes) from GitHub to see if a newer database is available.
3. If a newer, compatible database exists, Maigret downloads it to ``~/.maigret/data.json`` and uses it instead of the bundled copy.
4. If the download fails or the new database is incompatible with your Maigret version, the bundled database is used as a fallback.
The downloaded database has **higher priority** than the bundled one — it replaces, not overlays.
**Status messages** are printed only when an action occurs:
.. code-block:: text
[*] DB auto-update: checking for updates...
[+] DB auto-update: database updated successfully (3180 sites)
[*] DB auto-update: database is up to date (3157 sites)
[!] DB auto-update: latest database requires maigret >= 0.6.0, you have 0.5.0
**Forcing an update:**
Use the ``--force-update`` flag to check for updates immediately, ignoring the check interval:
.. code-block:: console
maigret username --force-update
The update happens at startup, then the search continues normally with the freshly downloaded database.
**Disabling auto-update:**
Use the ``--no-autoupdate`` flag to skip the update check entirely:
.. code-block:: console
maigret username --no-autoupdate
Or set it permanently in ``~/.maigret/settings.json``:
.. code-block:: json
{
"no_autoupdate": true
}
This is recommended for **Docker containers**, **CI pipelines**, and **air-gapped environments**.
**Configuration options** (in ``settings.json``):
.. list-table::
:header-rows: 1
:widths: 35 15 50
* - Setting
- Default
- Description
* - ``no_autoupdate``
- ``false``
- Disable auto-update entirely
* - ``autoupdate_check_interval_hours``
- ``24``
- How often to check for updates (in hours)
* - ``db_update_meta_url``
- GitHub raw URL
- URL of the metadata file (for custom mirrors)
**Using a custom database** with ``--db`` always skips auto-update — you are explicitly choosing your data source.
+1 -6
View File
@@ -10,12 +10,7 @@ The use of tags allows you to select a subset of the sites from big Maigret DB f
There are several types of tags:
1. **Country codes**: ``us``, ``jp``, ``br``... (`ISO 3166-1 alpha-2 <https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2>`_). A country tag means that having an account on the site implies a connection to that country — either origin or residence. The goal is attribution, not perfect accuracy.
- **Global sites** (GitHub, YouTube, Reddit, Medium, etc.) get **no country tag** — an account there says nothing about where a person is from.
- **Regional/local sites** where an account implies a specific country **must** have a country tag: ``VK````ru``, ``Naver````kr``, ``Zhihu````cn``.
- Multiple country tags are allowed when a service is used predominantly in a few countries (e.g. ``Xing````de``, ``eu``).
- Do **not** assign country tags based on traffic statistics alone — a site popular in India by traffic is not "Indian" if it is used globally.
1. **Country codes**: ``us``, ``jp``, ``br``... (`ISO 3166-1 alpha-2 <https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2>`_). These tags reflect the site language and regional origin of its users and are then used to locate the owner of a username. If the regional origin is difficult to establish or a site is positioned as worldwide, `no country code is given`. There could be multiple country code tags for one site.
2. **Site engines**. Most of them are forum engines now: ``uCoz``, ``vBulletin``, ``XenForo`` et al. Full list of engines stored in the Maigret database.
+1 -1
View File
@@ -33,7 +33,7 @@ Use Cases
If you experience many false positives, you can do the following:
- Install the last development version of Maigret from GitHub
- Run Maigret with ``--self-check --auto-disable`` flag and agree on disabling of problematic sites
- Run Maigret with ``--self-check`` flag and agree on disabling of problematic sites
3. Search for accounts with username ``machine42`` and generate HTML and PDF reports.
+1 -1
View File
@@ -1,3 +1,3 @@
"""Maigret version file"""
__version__ = '0.6.0'
__version__ = '0.5.0'
+14 -3
View File
@@ -30,6 +30,17 @@ class ParsingActivator:
jwt_token = r.json()["jwt"]
site.headers["Authorization"] = "jwt " + jwt_token
@staticmethod
def spotify(site, logger, cookies={}):
headers = dict(site.headers)
if "Authorization" in headers:
del headers["Authorization"]
import requests
r = requests.get(site.activation["url"])
bearer_token = r.json()["accessToken"]
site.headers["authorization"] = f"Bearer {bearer_token}"
@staticmethod
def weibo(site, logger):
headers = dict(site.headers)
@@ -43,7 +54,7 @@ class ParsingActivator:
logger.debug(
f"1 stage: {'success' if r.status_code == 302 else 'no 302 redirect, fail!'}"
)
location = r.headers.get("Location", "")
location = r.headers.get("Location")
# 2 stage: go to passport visitor page
headers["Referer"] = location
@@ -73,9 +84,9 @@ def import_aiohttp_cookies(cookiestxt_filename):
cookies = CookieJar()
cookies_list = []
for domain in cookies_obj._cookies.values(): # type: ignore[attr-defined]
for domain in cookies_obj._cookies.values():
for key, cookie in list(domain.values())[0].items():
c: Morsel = Morsel()
c = Morsel()
c.set(key, cookie.value, cookie.value)
c["domain"] = cookie.domain
c["path"] = cookie.path
+20 -128
View File
@@ -6,7 +6,7 @@ import random
import re
import ssl
import sys
from typing import Any, Dict, List, Optional, Tuple
from typing import Dict, List, Optional, Tuple
from urllib.parse import quote
# Third party imports
@@ -15,7 +15,7 @@ from alive_progress import alive_bar
from aiohttp import ClientSession, TCPConnector, http_exceptions
from aiohttp.client_exceptions import ClientConnectorError, ServerDisconnectedError
from python_socks import _errors as proxy_errors
from socid_extractor import extract # type: ignore[import-not-found]
from socid_extractor import extract
try:
from mock import Mock
@@ -80,7 +80,7 @@ class SimpleAiohttpChecker(CheckerBase):
async def _make_request(
self, session, url, headers, allow_redirects, timeout, method, logger, payload=None
) -> Tuple[Optional[str], int, Optional[CheckError]]:
) -> Tuple[str, int, Optional[CheckError]]:
try:
if method.lower() == 'get':
request_method = session.get
@@ -136,21 +136,15 @@ class SimpleAiohttpChecker(CheckerBase):
logger.debug(e, exc_info=True)
return None, 0, CheckError("Unexpected", str(e))
async def check(self) -> Tuple[Optional[str], int, Optional[CheckError]]:
async def check(self) -> Tuple[str, int, Optional[CheckError]]:
from aiohttp_socks import ProxyConnector
# Use a real SSL context instead of ssl=False to avoid TLS fingerprinting
# blocks by Cloudflare and similar WAFs. Certificate verification is
# disabled to handle sites with invalid/expired certs.
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
connector = (
ProxyConnector.from_url(self.proxy)
if self.proxy
else TCPConnector(ssl=ssl_context)
else TCPConnector(ssl=False)
)
connector.verify_ssl = False
async with ClientSession(
connector=connector,
@@ -195,7 +189,7 @@ class AiodnsDomainResolver(CheckerBase):
self.url = url
return None
async def check(self) -> Tuple[Optional[str], int, Optional[CheckError]]:
async def check(self) -> Tuple[str, int, Optional[CheckError]]:
status = 404
error = None
text = ''
@@ -213,76 +207,6 @@ class AiodnsDomainResolver(CheckerBase):
return text, status, error
try:
from curl_cffi.requests import AsyncSession as CurlCffiAsyncSession
CURL_CFFI_AVAILABLE = True
except ImportError:
CURL_CFFI_AVAILABLE = False
class CurlCffiChecker(CheckerBase):
"""Checker using curl_cffi to emulate browser TLS fingerprint and bypass WAF."""
def __init__(self, *args, **kwargs):
self.logger = kwargs.get('logger', Mock())
self.browser_emulate = kwargs.get('browser_emulate', 'chrome')
self.url = None
self.headers = None
self.allow_redirects = True
self.timeout = 0
self.method = 'get'
self.payload = None
def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get', payload=None):
self.url = url
self.headers = headers
self.allow_redirects = allow_redirects
self.timeout = timeout
self.method = method
self.payload = payload
return None
async def close(self):
pass
async def check(self) -> Tuple[Optional[str], int, Optional[CheckError]]:
try:
async with CurlCffiAsyncSession() as session:
kwargs = {
'url': self.url,
'headers': self.headers,
'allow_redirects': self.allow_redirects,
'timeout': self.timeout if self.timeout else 10,
'impersonate': self.browser_emulate,
}
if self.payload and self.method.lower() == 'post':
kwargs['json'] = self.payload
if self.method.lower() == 'post':
response = await session.post(**kwargs)
elif self.method.lower() == 'head':
response = await session.head(**kwargs)
else:
response = await session.get(**kwargs)
status_code = response.status_code
decoded_content = response.text
self.logger.debug(decoded_content)
error = CheckError("Connection lost") if status_code == 0 else None
return decoded_content, status_code, error
except asyncio.TimeoutError as e:
return None, 0, CheckError("Request timeout", str(e))
except KeyboardInterrupt:
return None, 0, CheckError("Interrupted")
except Exception as e:
self.logger.debug(e, exc_info=True)
return None, 0, CheckError("Unexpected", str(e))
class CheckerMock:
def __init__(self, *args, **kwargs):
pass
@@ -290,7 +214,7 @@ class CheckerMock:
def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get', payload=None):
return None
async def check(self) -> Tuple[Optional[str], int, Optional[CheckError]]:
async def check(self) -> Tuple[str, int, Optional[CheckError]]:
await asyncio.sleep(0)
return '', 0, None
@@ -539,17 +463,7 @@ def make_site_result(
# workaround to prevent slash errors
url = re.sub("(?<!:)/+", "/", url)
# Select checker: use curl_cffi for sites requiring TLS impersonation
needs_impersonation = 'tls_fingerprint' in site.protection
if needs_impersonation and CURL_CFFI_AVAILABLE:
checker = CurlCffiChecker(logger=logger, browser_emulate='chrome')
elif needs_impersonation and not CURL_CFFI_AVAILABLE:
logger.warning(
f"Site {site.name} requires TLS impersonation (curl_cffi) but it's not installed. "
"Install with: pip install curl_cffi"
)
checker = options["checkers"][site.protocol]
else:
# always clearweb_checker for now
checker = options["checkers"][site.protocol]
# site check is disabled
@@ -885,7 +799,7 @@ async def maigret(
with alive_bar(
len(tasks_dict), title="Searching", force_tty=True, disable=no_progressbar
) as progress:
async for result in executor.run(list(tasks_dict.values())): # type: ignore[arg-type]
async for result in executor.run(tasks_dict.values()):
cur_results.append(result)
progress()
@@ -961,13 +875,12 @@ async def site_self_check(
If False (default), only report issues without disabling.
diagnose: If True, print detailed diagnosis information.
"""
changes: Dict[str, Any] = {
changes = {
"disabled": False,
"issues": [],
"recommendations": [],
}
try:
check_data = [
(site.username_claimed, MaigretCheckStatus.CLAIMED),
(site.username_unclaimed, MaigretCheckStatus.AVAILABLE),
@@ -1009,7 +922,7 @@ async def site_self_check(
results_cache[username] = results_dict[site.name]
if result.error and 'Cannot connect to host' in result.error.desc:
changes["issues"].append("Cannot connect to host")
changes["issues"].append(f"Cannot connect to host")
if auto_disable:
changes["disabled"] = True
@@ -1067,11 +980,11 @@ async def site_self_check(
if diagnose and changes["issues"]:
print(f"\n--- {site.name} DIAGNOSIS ---")
print(f" Check type: {site.check_type}")
print(" Issues:")
print(f" Issues:")
for issue in changes["issues"]:
print(f" - {issue}")
if changes["recommendations"]:
print(" Recommendations:")
print(f" Recommendations:")
for rec in changes["recommendations"]:
print(f" -> {rec}")
@@ -1092,19 +1005,6 @@ async def site_self_check(
site.tags.remove("unchecked")
db.update_site(site)
except Exception as e:
logger.warning(
f"Self-check of {site.name} failed with unexpected error: {e}",
exc_info=True,
)
changes["issues"].append(f"Unexpected error: {e}")
if auto_disable and not site.disabled:
changes["disabled"] = True
site.disabled = True
db.update_site(site)
if not silent:
print(f"Disabled site {site.name} (unexpected error)...")
return changes
@@ -1119,7 +1019,6 @@ async def self_check(
i2p_proxy=None,
auto_disable=False,
diagnose=False,
no_progressbar=False,
) -> dict:
"""
Run self-check on sites.
@@ -1154,20 +1053,9 @@ async def self_check(
tasks.append((site.name, future))
if tasks:
with alive_bar(len(tasks), title='Self-checking', force_tty=True, disable=no_progressbar) as progress:
with alive_bar(len(tasks), title='Self-checking', force_tty=True) as progress:
for site_name, f in tasks:
try:
result = await f
except Exception as e:
logger.warning(
f"Self-check task for {site_name} raised unexpected error: {e}",
exc_info=True,
)
result = {
"disabled": False,
"issues": [f"Unexpected error: {e}"],
"recommendations": [],
}
result['site_name'] = site_name
all_results.append(result)
progress() # Update the progress bar
@@ -1203,6 +1091,10 @@ async def self_check(
needs_update = total_disabled != 0 or unchecked_new_count != unchecked_old_count
# For backwards compatibility, return bool if auto_disable is True
if auto_disable:
return needs_update
return {
'needs_update': needs_update,
'results': all_results,
@@ -1226,7 +1118,7 @@ def parse_usernames(extracted_ids_data, logger) -> Dict:
elif "usernames" in k:
try:
tree = ast.literal_eval(v)
if isinstance(tree, list):
if type(tree) == list:
for n in tree:
new_usernames[n] = "username"
except Exception as e:
-342
View File
@@ -1,342 +0,0 @@
"""
Database auto-update logic for maigret.
Checks a lightweight meta file to determine if a newer site database is available,
downloads it if compatible, and caches it locally in ~/.maigret/.
"""
import hashlib
import json
import logging
import os
import os.path as path
import tempfile
from datetime import datetime, timezone
from typing import Optional
import requests
from colorama import Fore, Style
from .__version__ import __version__
logger = logging.getLogger("maigret")
_use_color = True
def _print_info(msg: str) -> None:
text = f"[*] {msg}"
if _use_color:
print(Style.BRIGHT + Fore.GREEN + text + Style.RESET_ALL)
else:
print(text)
def _print_success(msg: str) -> None:
text = f"[+] {msg}"
if _use_color:
print(Style.BRIGHT + Fore.GREEN + text + Style.RESET_ALL)
else:
print(text)
def _print_warning(msg: str) -> None:
text = f"[!] {msg}"
if _use_color:
print(Style.BRIGHT + Fore.YELLOW + text + Style.RESET_ALL)
else:
print(text)
DEFAULT_META_URL = (
"https://raw.githubusercontent.com/soxoj/maigret/main/maigret/resources/db_meta.json"
)
DEFAULT_CHECK_INTERVAL_HOURS = 24
MAIGRET_HOME = path.expanduser("~/.maigret")
CACHED_DB_PATH = path.join(MAIGRET_HOME, "data.json")
STATE_PATH = path.join(MAIGRET_HOME, "autoupdate_state.json")
BUNDLED_DB_PATH = path.join(path.dirname(path.realpath(__file__)), "resources", "data.json")
def _parse_version(version_str: str) -> tuple:
"""Parse a version string like '0.5.0' into a comparable tuple (0, 5, 0)."""
try:
return tuple(int(x) for x in version_str.strip().split("."))
except (ValueError, AttributeError):
return (0, 0, 0)
def _ensure_maigret_home() -> None:
os.makedirs(MAIGRET_HOME, exist_ok=True)
def _load_state() -> dict:
try:
with open(STATE_PATH, "r", encoding="utf-8") as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError, OSError):
return {}
def _save_state(state: dict) -> None:
_ensure_maigret_home()
tmp_path = STATE_PATH + ".tmp"
try:
with open(tmp_path, "w", encoding="utf-8") as f:
json.dump(state, f, indent=2, ensure_ascii=False)
os.replace(tmp_path, STATE_PATH)
except OSError:
try:
os.unlink(tmp_path)
except OSError:
pass
def _needs_check(state: dict, interval_hours: int) -> bool:
last_check = state.get("last_check_at")
if not last_check:
return True
try:
last_dt = datetime.fromisoformat(last_check.replace("Z", "+00:00"))
elapsed = (datetime.now(timezone.utc) - last_dt).total_seconds() / 3600
return elapsed >= interval_hours
except (ValueError, TypeError):
return True
def _fetch_meta(meta_url: str, timeout: int = 10) -> Optional[dict]:
try:
response = requests.get(meta_url, timeout=timeout)
if response.status_code == 200:
return response.json()
except Exception:
pass
return None
def _is_version_compatible(meta: dict) -> bool:
min_ver = meta.get("min_maigret_version", "0.0.0")
return _parse_version(__version__) >= _parse_version(min_ver)
def _is_update_available(meta: dict, state: dict) -> bool:
if not path.isfile(CACHED_DB_PATH):
return True
remote_date = meta.get("updated_at", "")
cached_date = state.get("last_meta", {}).get("updated_at", "")
return remote_date > cached_date
def _download_and_verify(data_url: str, expected_sha256: str, timeout: int = 60) -> Optional[str]:
_ensure_maigret_home()
tmp_fd, tmp_path = tempfile.mkstemp(dir=MAIGRET_HOME, suffix=".json")
try:
response = requests.get(data_url, timeout=timeout)
if response.status_code != 200:
return None
content = response.content
actual_sha256 = hashlib.sha256(content).hexdigest()
if actual_sha256 != expected_sha256:
_print_warning("DB auto-update: SHA-256 mismatch, download rejected")
return None
# Validate JSON structure
data = json.loads(content)
if not all(k in data for k in ("sites", "engines", "tags")):
_print_warning("DB auto-update: invalid database structure")
return None
os.write(tmp_fd, content)
os.close(tmp_fd)
tmp_fd = None
os.replace(tmp_path, CACHED_DB_PATH)
return CACHED_DB_PATH
except Exception:
return None
finally:
if tmp_fd is not None:
os.close(tmp_fd)
try:
os.unlink(tmp_path)
except OSError:
pass
def _best_local() -> str:
"""Return cached DB if it exists and is valid, otherwise bundled."""
if path.isfile(CACHED_DB_PATH):
try:
with open(CACHED_DB_PATH, "r", encoding="utf-8") as f:
data = json.load(f)
if "sites" in data:
return CACHED_DB_PATH
except (json.JSONDecodeError, OSError):
pass
return BUNDLED_DB_PATH
def _now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def resolve_db_path(
db_file_arg: str,
no_autoupdate: bool = False,
meta_url: str = DEFAULT_META_URL,
check_interval_hours: int = DEFAULT_CHECK_INTERVAL_HOURS,
color: bool = True,
) -> str:
"""
Determine which database file to use, potentially downloading an update.
Returns the path to the database file that should be loaded.
"""
global _use_color
_use_color = color
default_db_name = "resources/data.json"
# User specified a custom DB — skip auto-update
is_url = db_file_arg.startswith("http://") or db_file_arg.startswith("https://")
is_default = db_file_arg == default_db_name
if is_url:
return db_file_arg
if not is_default:
# Try the path as-is (absolute or relative to cwd) first.
if path.isfile(db_file_arg):
return path.abspath(db_file_arg)
# Fall back to legacy behavior: resolve relative to the maigret module dir.
module_relative = path.join(path.dirname(path.realpath(__file__)), db_file_arg)
if module_relative != db_file_arg and path.isfile(module_relative):
return module_relative
if module_relative != db_file_arg:
raise FileNotFoundError(
f"Custom database file not found: {db_file_arg!r} "
f"(also tried {module_relative!r})"
)
raise FileNotFoundError(f"Custom database file not found: {db_file_arg!r}")
# Auto-update disabled
if no_autoupdate:
return _best_local()
# Check interval
_ensure_maigret_home()
state = _load_state()
if not _needs_check(state, check_interval_hours):
return _best_local()
# Time to check
_print_info("DB auto-update: checking for updates...")
meta = _fetch_meta(meta_url)
if meta is None:
_print_warning("DB auto-update: could not reach update server, using local database")
state["last_check_at"] = _now_iso()
_save_state(state)
return _best_local()
# Version compatibility
if not _is_version_compatible(meta):
min_ver = meta.get("min_maigret_version", "?")
_print_warning(
f"DB auto-update: latest database requires maigret >= {min_ver}, "
f"you have {__version__}. Please upgrade with: pip install -U maigret"
)
state["last_check_at"] = _now_iso()
_save_state(state)
return _best_local()
# Check if update available
if not _is_update_available(meta, state):
sites_count = meta.get("sites_count", "?")
_print_info(f"DB auto-update: database is up to date ({sites_count} sites)")
state["last_check_at"] = _now_iso()
state["last_meta"] = meta
_save_state(state)
return _best_local()
# Download update
new_count = meta.get("sites_count", "?")
old_count = state.get("last_meta", {}).get("sites_count")
if old_count:
_print_info(f"DB auto-update: downloading updated database ({new_count} sites, was {old_count})...")
else:
_print_info(f"DB auto-update: downloading database ({new_count} sites)...")
data_url = meta.get("data_url", "")
expected_sha = meta.get("data_sha256", "")
result = _download_and_verify(data_url, expected_sha)
if result is None:
_print_warning("DB auto-update: download failed, using local database")
state["last_check_at"] = _now_iso()
_save_state(state)
return _best_local()
_print_success(f"DB auto-update: database updated successfully ({new_count} sites)")
state["last_check_at"] = _now_iso()
state["last_meta"] = meta
state["cached_db_sha256"] = expected_sha
_save_state(state)
return CACHED_DB_PATH
def force_update(
meta_url: str = DEFAULT_META_URL,
color: bool = True,
) -> bool:
"""
Force check for database updates and download if available.
Returns True if database was updated, False otherwise.
"""
global _use_color
_use_color = color
_ensure_maigret_home()
_print_info("DB update: checking for updates...")
meta = _fetch_meta(meta_url)
if meta is None:
_print_warning("DB update: could not reach update server")
return False
if not _is_version_compatible(meta):
min_ver = meta.get("min_maigret_version", "?")
_print_warning(
f"DB update: latest database requires maigret >= {min_ver}, "
f"you have {__version__}. Please upgrade with: pip install -U maigret"
)
return False
state = _load_state()
new_count = meta.get("sites_count", "?")
old_count = state.get("last_meta", {}).get("sites_count")
if not _is_update_available(meta, state):
_print_info(f"DB update: database is already up to date ({new_count} sites)")
state["last_check_at"] = _now_iso()
state["last_meta"] = meta
_save_state(state)
return False
if old_count:
_print_info(f"DB update: downloading updated database ({new_count} sites, was {old_count})...")
else:
_print_info(f"DB update: downloading database ({new_count} sites)...")
data_url = meta.get("data_url", "")
expected_sha = meta.get("data_sha256", "")
result = _download_and_verify(data_url, expected_sha)
if result is None:
_print_warning("DB update: download failed")
return False
_print_success(f"DB update: database updated successfully ({new_count} sites)")
state["last_check_at"] = _now_iso()
state["last_meta"] = meta
state["cached_db_sha256"] = expected_sha
_save_state(state)
return True
-2
View File
@@ -58,8 +58,6 @@ COMMON_ERRORS = {
'Censorship', 'MGTS'
),
'Incapsula incident ID': CheckError('Bot protection', 'Incapsula'),
'<title>Client Challenge</title>': CheckError('Bot protection', 'Anti-bot challenge'),
'<title>DDoS-Guard</title>': CheckError('Bot protection', 'DDoS-Guard'),
'Сайт заблокирован хостинг-провайдером': CheckError(
'Site-specific', 'Site is disabled (Beget)'
),
+4 -4
View File
@@ -103,7 +103,7 @@ class AsyncioProgressbarQueueExecutor(AsyncExecutor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.workers_count = kwargs.get('in_parallel', 10)
self.queue: asyncio.Queue = asyncio.Queue(self.workers_count)
self.queue = asyncio.Queue(self.workers_count)
self.timeout = kwargs.get('timeout')
# Pass a progress function; alive_bar by default
self.progress_func = kwargs.get('progress_func', alive_bar)
@@ -184,10 +184,10 @@ class AsyncioQueueGeneratorExecutor:
# Deprecated: will be removed soon, don't use it
def __init__(self, *args, **kwargs):
self.workers_count = kwargs.get('in_parallel', 10)
self.queue: asyncio.Queue = asyncio.Queue()
self.queue = asyncio.Queue()
self.timeout = kwargs.get('timeout')
self.logger = kwargs['logger']
self._results: asyncio.Queue = asyncio.Queue()
self._results = asyncio.Queue()
self._stop_signal = object()
async def worker(self):
@@ -209,7 +209,7 @@ class AsyncioQueueGeneratorExecutor:
result = kwargs.get('default')
await self._results.put(result)
except Exception as e:
self.logger.error(f"Error in worker: {e}", exc_info=True)
self.logger.error(f"Error in worker: {e}")
finally:
self.queue.task_done()
+10 -77
View File
@@ -13,7 +13,7 @@ from argparse import ArgumentParser, RawDescriptionHelpFormatter
from typing import List, Tuple
import os.path as path
from socid_extractor import extract, parse # type: ignore[import-not-found]
from socid_extractor import extract, parse
from .__version__ import __version__
from .checking import (
@@ -37,7 +37,6 @@ from .report import (
get_plaintext_report,
sort_report_by_data_points,
save_graph_report,
save_markdown_report,
)
from .sites import MaigretDatabase
from .submit import Submitter
@@ -76,7 +75,7 @@ def extract_ids_from_page(url, logger, timeout=5) -> dict:
elif 'usernames' in k:
try:
tree = ast.literal_eval(v)
if isinstance(tree, list):
if type(tree) == list:
for n in tree:
results[n] = 'username'
except Exception as e:
@@ -202,20 +201,6 @@ def setup_arguments_parser(settings: Settings):
default=settings.sites_db_path,
help="Load Maigret database from a JSON file or HTTP web resource.",
)
parser.add_argument(
"--no-autoupdate",
action="store_true",
dest="no_autoupdate",
default=settings.no_autoupdate,
help="Disable automatic database updates on startup.",
)
parser.add_argument(
"--force-update",
action="store_true",
dest="force_update",
default=False,
help="Force check for database updates and download if available.",
)
parser.add_argument(
"--cookies-jar-file",
metavar="COOKIE_FILE",
@@ -466,14 +451,6 @@ def setup_arguments_parser(settings: Settings):
default=settings.pdf_report,
help="Generate a PDF report (general report on all usernames).",
)
report_group.add_argument(
"-M",
"--md",
action="store_true",
dest="md",
default=settings.md_report,
help="Generate a Markdown report (general report on all usernames).",
)
report_group.add_argument(
"-G",
"--graph",
@@ -566,25 +543,9 @@ async def main():
else:
args.exclude_tags = []
from .db_updater import resolve_db_path, force_update, BUNDLED_DB_PATH
if args.force_update:
force_update(
meta_url=settings.db_update_meta_url,
color=not args.no_color,
)
try:
db_file = resolve_db_path(
db_file_arg=args.db_file,
no_autoupdate=args.no_autoupdate or args.force_update,
meta_url=settings.db_update_meta_url,
check_interval_hours=settings.autoupdate_check_interval_hours,
color=not args.no_color,
)
except FileNotFoundError as e:
logger.error(str(e))
sys.exit(2)
db_file = args.db_file \
if (args.db_file.startswith("http://") or args.db_file.startswith("https://")) \
else path.join(path.dirname(path.realpath(__file__)), args.db_file)
if args.top_sites == 0 or args.all_sites:
args.top_sites = sys.maxsize
@@ -599,21 +560,7 @@ async def main():
)
# Create object with all information about sites we are aware of.
try:
db = MaigretDatabase().load_from_path(db_file)
query_notify.success(f'Using sites database: {db_file} ({len(db.sites)} sites)')
except Exception as e:
logger.warning(f"Failed to load database from {db_file}: {e}")
if db_file != BUNDLED_DB_PATH:
query_notify.warning(
f'Falling back to bundled database: {BUNDLED_DB_PATH}'
)
db = MaigretDatabase().load_from_path(BUNDLED_DB_PATH)
query_notify.success(
f'Using sites database: {BUNDLED_DB_PATH} ({len(db.sites)} sites)'
)
else:
raise
get_top_sites_for_id = lambda x: db.ranked_sites_dict(
top=args.top_sites,
tags=args.tags,
@@ -653,10 +600,13 @@ async def main():
i2p_proxy=args.i2p_proxy,
auto_disable=args.auto_disable,
diagnose=args.diagnose,
no_progressbar=args.no_progressbar,
)
# Handle both old (bool) and new (dict) return types
if isinstance(check_result, dict):
is_need_update = check_result.get('needs_update', False)
else:
is_need_update = check_result
if is_need_update:
if input('Do you want to save changes permanently? [Yn]\n').lower() in (
@@ -822,7 +772,7 @@ async def main():
# reporting for all the result
if general_results:
if args.html or args.pdf or args.md:
if args.html or args.pdf:
query_notify.warning('Generating report info...')
report_context = generate_report_context(general_results)
# determine main username
@@ -842,23 +792,6 @@ async def main():
save_pdf_report(filename, report_context)
query_notify.warning(f'PDF report on all usernames saved in {filename}')
if args.md:
username = username.replace('/', '_')
filename = report_filepath_tpl.format(username=username, postfix='.md')
run_flags = []
if args.tags:
run_flags.append(f"--tags {args.tags}")
if args.site_list:
run_flags.append(f"--site {','.join(args.site_list)}")
if args.all_sites:
run_flags.append("--all-sites")
run_info = {
"sites_count": sum(len(d) for _, _, d in general_results),
"flags": " ".join(run_flags) if run_flags else None,
}
save_markdown_report(filename, report_context, run_info=run_info)
query_notify.warning(f'Markdown report on all usernames saved in {filename}')
if args.graph:
username = username.replace('/', '_')
filename = report_filepath_tpl.format(
+1 -1
View File
@@ -174,7 +174,7 @@ class QueryNotifyPrint(QueryNotify):
else:
return self.make_simple_terminal_notify(*args)
def start(self, message=None, id_type="username"):
def start(self, message, id_type):
"""Notify Start.
Will print the title to the standard output.
+12 -151
View File
@@ -7,7 +7,7 @@ import os
from datetime import datetime
from typing import Dict, Any
import xmind # type: ignore[import-untyped]
import xmind
from dateutil.tz import gettz
from dateutil.parser import parse as parse_datetime_str
from jinja2 import Template
@@ -79,7 +79,7 @@ def save_pdf_report(filename: str, context: dict):
filled_template = template.render(**context)
# moved here to speed up the launch of Maigret
from xhtml2pdf import pisa # type: ignore[import-untyped]
from xhtml2pdf import pisa
with open(filename, "w+b") as f:
pisa.pisaDocument(io.StringIO(filled_template), dest=f, default_css=css)
@@ -91,9 +91,9 @@ def save_json_report(filename: str, username: str, results: dict, report_type: s
class MaigretGraph:
other_params: dict = {'size': 10, 'group': 3}
site_params: dict = {'size': 15, 'group': 2}
username_params: dict = {'size': 20, 'group': 1}
other_params = {'size': 10, 'group': 3}
site_params = {'size': 15, 'group': 2}
username_params = {'size': 20, 'group': 1}
def __init__(self, graph):
self.G = graph
@@ -121,12 +121,12 @@ class MaigretGraph:
def save_graph_report(filename: str, username_results: list, db: MaigretDatabase):
import networkx as nx
G: Any = nx.Graph()
G = nx.Graph()
graph = MaigretGraph(G)
base_site_nodes = {}
site_account_nodes = {}
processed_values: Dict[str, Any] = {} # Track processed values to avoid duplicates
processed_values = {} # Track processed values to avoid duplicates
for username, id_type, results in username_results:
# Add username node, using normalized version directly if different
@@ -239,7 +239,7 @@ def save_graph_report(filename: str, username_results: list, db: MaigretDatabase
G.remove_nodes_from(single_degree_sites)
# Generate interactive visualization
from pyvis.network import Network # type: ignore[import-untyped]
from pyvis.network import Network
nt = Network(notebook=True, height="750px", width="100%")
nt.from_nx(G)
@@ -257,144 +257,6 @@ def get_plaintext_report(context: dict) -> str:
return output.strip()
def _md_format_value(value) -> str:
"""Format a value for Markdown output, detecting links."""
if isinstance(value, list):
return ", ".join(str(v) for v in value)
s = str(value)
if s.startswith("http://") or s.startswith("https://"):
return f"[{s}]({s})"
return s
def save_markdown_report(filename: str, context: dict, run_info: dict = None):
username = context.get("username", "unknown")
generated_at = context.get("generated_at", "")
brief = context.get("brief", "")
countries = context.get("countries_tuple_list", [])
interests = context.get("interests_tuple_list", [])
first_seen = context.get("first_seen")
results = context.get("results", [])
# Collect ALL values for key fields across all accounts
all_fields: Dict[str, list] = {}
last_seen = None
for _, _, data in results:
for _, v in data.items():
if not v.get("found") or v.get("is_similar"):
continue
ids_data = v.get("ids_data", {})
# Map multiple source fields to unified output fields
field_sources = {
"fullname": ("fullname", "name"),
"location": ("location", "country", "city", "country_code", "locale", "region"),
"gender": ("gender",),
"bio": ("bio", "about", "description"),
}
for out_field, source_keys in field_sources.items():
for src in source_keys:
val = ids_data.get(src)
if val:
all_fields.setdefault(out_field, [])
val_str = str(val)
if val_str not in all_fields[out_field]:
all_fields[out_field].append(val_str)
# Track last_seen
for ts_field in ("last_online", "latest_activity_at", "updated_at"):
ts = ids_data.get(ts_field)
if ts and (last_seen is None or str(ts) > str(last_seen)):
last_seen = ts
lines = []
lines.append(f"# Report by searching on username \"{username}\"\n")
# Generated line with run info
gen_line = f"Generated at {generated_at} by [Maigret](https://github.com/soxoj/maigret)"
if run_info:
parts = []
if run_info.get("sites_count"):
parts.append(f"{run_info['sites_count']} sites checked")
if run_info.get("flags"):
parts.append(f"flags: `{run_info['flags']}`")
if parts:
gen_line += f" ({', '.join(parts)})"
lines.append(f"{gen_line}\n")
# Summary
lines.append("## Summary\n")
lines.append(f"{brief}\n")
if all_fields:
lines.append("**Information extracted from accounts:**\n")
for field, values in all_fields.items():
title = CaseConverter.snake_to_title(field)
lines.append(f"- {title}: {'; '.join(values)}")
lines.append("")
if countries:
geo = ", ".join(f"{code} (x{count})" for code, count in countries)
lines.append(f"**Country tags:** {geo}\n")
if interests:
tags = ", ".join(f"{tag} (x{count})" for tag, count in interests)
lines.append(f"**Website tags:** {tags}\n")
if first_seen:
lines.append(f"**First seen:** {first_seen}")
if last_seen:
lines.append(f"**Last seen:** {last_seen}")
if first_seen or last_seen:
lines.append("")
# Accounts found
lines.append("## Accounts found\n")
for u, id_type, data in results:
for site_name, v in data.items():
if not v.get("found") or v.get("is_similar"):
continue
lines.append(f"### {site_name}\n")
lines.append(f"- **URL:** [{v.get('url_user', '')}]({v.get('url_user', '')})")
tags = v.get("status") and v["status"].tags or []
if tags:
lines.append(f"- **Tags:** {', '.join(tags)}")
lines.append("")
ids_data = v.get("ids_data", {})
if ids_data:
for field, value in ids_data.items():
if field == "image":
continue
title = CaseConverter.snake_to_title(field)
lines.append(f"- {title}: {_md_format_value(value)}")
lines.append("")
# Possible false positives
lines.append("## Possible false positives\n")
lines.append(
f"This report was generated by searching for accounts matching the username `{username}`. "
f"Accounts listed above may belong to different people who happen to use the same "
f"or similar username. Results without extracted personal information could contain "
f"some false positive findings. Always verify findings before drawing conclusions.\n"
)
# Ethical use
lines.append("## Ethical use\n")
lines.append(
"This report is a result of a technical collection of publicly available information "
"from online accounts and does not constitute personal data processing. If you intend "
"to use this data for personal data processing or collection purposes, ensure your use "
"complies with applicable laws and regulations in your jurisdiction (such as GDPR, "
"CCPA, and similar).\n"
)
with open(filename, "w", encoding="utf-8") as f:
f.write("\n".join(lines))
"""
REPORTS GENERATING
"""
@@ -491,12 +353,11 @@ def generate_report_context(username_results: list):
if k in ["country", "locale"]:
try:
if is_country_tag(k):
country = pycountry.countries.get(alpha_2=v)
tag = country.alpha_2.lower() # type: ignore[union-attr]
tag = pycountry.countries.get(alpha_2=v).alpha_2.lower()
else:
tag = pycountry.countries.search_fuzzy(v)[
0
].alpha_2.lower() # type: ignore[attr-defined]
].alpha_2.lower()
# TODO: move countries to another struct
tags[tag] = tags.get(tag, 0) + 1
except Exception as e:
@@ -652,8 +513,8 @@ def add_xmind_subtopic(userlink, k, v, supposed_data):
def design_xmind_sheet(sheet, username, results):
alltags: Dict[str, Any] = {}
supposed_data: Dict[str, Any] = {}
alltags = {}
supposed_data = {}
sheet.setTitle("%s Analysis" % (username))
root_topic1 = sheet.getRootTopic()
+3219 -2303
View File
File diff suppressed because it is too large Load Diff
-8
View File
@@ -1,8 +0,0 @@
{
"version": 1,
"updated_at": "2026-04-10T10:28:14Z",
"sites_count": 3150,
"min_maigret_version": "0.6.0",
"data_sha256": "72a493fef4eb8958fe8ed0c9b895841ec10c335f1b8e5e9b24b50784be6ad017",
"data_url": "https://raw.githubusercontent.com/soxoj/maigret/main/maigret/resources/data.json"
}
+1 -5
View File
@@ -54,9 +54,5 @@
"graph_report": false,
"pdf_report": false,
"html_report": false,
"md_report": false,
"web_interface_port": 5000,
"no_autoupdate": false,
"db_update_meta_url": "https://raw.githubusercontent.com/soxoj/maigret/main/maigret/resources/db_meta.json",
"autoupdate_check_interval_hours": 24
"web_interface_port": 5000
}
-4
View File
@@ -42,11 +42,7 @@ class Settings:
pdf_report: bool
html_report: bool
graph_report: bool
md_report: bool
web_interface_port: int
no_autoupdate: bool
db_update_meta_url: str
autoupdate_check_interval_hours: int
# submit mode settings
presence_strings: list
+7 -34
View File
@@ -92,12 +92,10 @@ class MaigretSite:
# Alexa traffic rank
alexa_rank = None
# Source (in case a site is a mirror of another site)
source: Optional[str] = None
source = None
# URL protocol (http/https)
protocol = ''
# Protection types detected on this site (e.g. ["tls_fingerprint", "ddos_guard"])
protection: List[str] = []
def __init__(self, name, information):
self.name = name
@@ -175,7 +173,7 @@ class MaigretSite:
self.__dict__[CaseConverter.camel_to_snake(group)],
)
self.url_regexp = URLMatcher.make_profile_url_regexp(url, self.regex_check or "")
self.url_regexp = URLMatcher.make_profile_url_regexp(url, self.regex_check)
def detect_username(self, url: str) -> Optional[str]:
if self.url_regexp:
@@ -464,9 +462,9 @@ class MaigretDatabase:
"tags": self._tags,
}
json_data = json.dumps(db_data, indent=4, ensure_ascii=False)
json_data = json.dumps(db_data, indent=4)
with open(filename, "w", encoding="utf-8") as f:
with open(filename, "w") as f:
f.write(json_data)
return self
@@ -566,7 +564,7 @@ class MaigretDatabase:
def get_scan_stats(self, sites_dict):
sites = sites_dict or self.sites_dict
found_flags: Dict[str, int] = {}
found_flags = {}
for _, s in sites.items():
if "presense_flag" in s.stats:
flag = s.stats["presense_flag"]
@@ -587,10 +585,8 @@ class MaigretDatabase:
def get_db_stats(self, is_markdown=False):
# Initialize counters
sites_dict = self.sites_dict
urls: Dict[str, int] = {}
tags: Dict[str, int] = {}
engine_total: Dict[str, int] = {}
engine_enabled: Dict[str, int] = {}
urls = {}
tags = {}
disabled_count = 0
message_checks_one_factor = 0
status_checks = 0
@@ -613,14 +609,6 @@ class MaigretDatabase:
elif site.check_type == 'status_code':
status_checks += 1
# Count engines
if site.engine:
engine_total[site.engine] = engine_total.get(site.engine, 0) + 1
if not site.disabled:
engine_enabled[site.engine] = (
engine_enabled.get(site.engine, 0) + 1
)
# Count tags
if not site.tags:
tags["NO_TAGS"] = tags.get("NO_TAGS", 0) + 1
@@ -657,26 +645,11 @@ class MaigretDatabase:
f"Sites with probing: {', '.join(sorted(site_with_probing))}",
f"Sites with activation: {', '.join(sorted(site_with_activation))}",
self._format_top_items("profile URLs", urls, 20, is_markdown),
self._format_engine_stats(engine_total, engine_enabled, is_markdown),
self._format_top_items("tags", tags, 20, is_markdown, self._tags),
]
return separator.join(output)
def _format_engine_stats(self, engine_total, engine_enabled, is_markdown):
"""Format per-engine enabled/total counts, sorted by total descending."""
output = "Sites by engine:\n"
for engine, total in sorted(
engine_total.items(), key=lambda x: x[1], reverse=True
):
enabled = engine_enabled.get(engine, 0)
perc = round(100 * enabled / total, 1) if total else 0.0
if is_markdown:
output += f"- `{engine}`: {enabled}/{total} ({perc}%)\n"
else:
output += f"{enabled}/{total} ({perc}%)\t{engine}\n"
return output
def _format_top_items(
self, title, items_dict, limit, is_markdown, valid_items=None
):
+24 -27
View File
@@ -6,7 +6,8 @@ import logging
from typing import Any, Dict, List, Optional, Tuple
from aiohttp import ClientSession, TCPConnector
import cloudscraper # type: ignore[import-untyped]
from aiohttp_socks import ProxyConnector
import cloudscraper
from colorama import Fore, Style
from .activation import import_aiohttp_cookies
@@ -67,10 +68,8 @@ class Submitter:
else:
cookie_jar = import_aiohttp_cookies(args.cookie_file)
ssl_context = __import__('ssl').create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = __import__('ssl').CERT_NONE
connector = ProxyConnector.from_url(proxy) if proxy else TCPConnector(ssl=ssl_context)
connector = ProxyConnector.from_url(proxy) if proxy else TCPConnector(ssl=False)
connector.verify_ssl = False
self.session = ClientSession(
connector=connector, trust_env=True, cookie_jar=cookie_jar
)
@@ -89,9 +88,7 @@ class Submitter:
alexa_rank = 0
try:
reach_elem = root.find('.//REACH')
if reach_elem is not None:
alexa_rank = int(reach_elem.attrib['RANK'])
alexa_rank = int(root.find('.//REACH').attrib['RANK'])
except Exception:
pass
@@ -130,7 +127,7 @@ class Submitter:
async def detect_known_engine(
self, url_exists, url_mainpage, session, follow_redirects, headers
) -> Tuple[List[MaigretSite], str]:
) -> [List[MaigretSite], str]:
session = session or self.session
resp_text, _ = await self.get_html_response_to_compare(
@@ -194,9 +191,8 @@ class Submitter:
# TODO: replace with checking.py/SimpleAiohttpChecker call
@staticmethod
async def get_html_response_to_compare(
url: str, session: Optional[ClientSession] = None, redirects=False, headers: Optional[Dict] = None
url: str, session: ClientSession = None, redirects=False, headers: Dict = None
):
assert session is not None, "session must not be None"
async with session.get(
url, allow_redirects=redirects, headers=headers
) as response:
@@ -215,10 +211,10 @@ class Submitter:
username: str,
url_exists: str,
cookie_filename="", # TODO: use cookies
session: Optional[ClientSession] = None,
session: ClientSession = None,
follow_redirects=False,
headers: Optional[dict] = None,
) -> Tuple[Optional[List[str]], Optional[List[str]], str, str]:
headers: dict = None,
) -> Tuple[List[str], List[str], str, str]:
random_username = generate_random_username()
url_of_non_existing_account = url_exists.lower().replace(
@@ -273,8 +269,11 @@ class Submitter:
tokens_a = set(re.split(f'[{self.SEPARATORS}]', first_html_response))
tokens_b = set(re.split(f'[{self.SEPARATORS}]', second_html_response))
a_minus_b: List[str] = [x.strip('\\') for x in tokens_a.difference(tokens_b)]
b_minus_a: List[str] = [x.strip('\\') for x in tokens_b.difference(tokens_a)]
a_minus_b = tokens_a.difference(tokens_b)
b_minus_a = tokens_b.difference(tokens_a)
a_minus_b = list(map(lambda x: x.strip('\\'), a_minus_b))
b_minus_a = list(map(lambda x: x.strip('\\'), b_minus_a))
# Filter out strings containing usernames
a_minus_b = [s for s in a_minus_b if username.lower() not in s.lower()]
@@ -379,7 +378,7 @@ class Submitter:
).strip()
if field in ['tags', 'presense_strs', 'absence_strs']:
new_value = list(map(str.strip, new_value.split(','))) # type: ignore[assignment]
new_value = list(map(str.strip, new_value.split(',')))
if new_value:
setattr(site, field, new_value)
@@ -425,12 +424,12 @@ class Submitter:
f"{Fore.YELLOW}[!] Sites with domain \"{domain_raw}\" already exists in the Maigret database!{Style.RESET_ALL}"
)
site_status = lambda s: "(disabled)" if s.disabled else ""
status = lambda s: "(disabled)" if s.disabled else ""
url_block = lambda s: f"\n\t{s.url_main}\n\t{s.url}"
print(
"\n".join(
[
f"{site.name} {site_status(site)}{url_block(site)}"
f"{site.name} {status(site)}{url_block(site)}"
for site in matched_sites
]
)
@@ -498,7 +497,7 @@ class Submitter:
)
print('Detecting site engine, please wait...')
sites: List[MaigretSite] = []
sites = []
text = None
try:
sites, text = await self.detect_known_engine(
@@ -511,7 +510,7 @@ class Submitter:
except KeyboardInterrupt:
print('Engine detect process is interrupted.')
if text and 'cloudflare' in text.lower():
if 'cloudflare' in text.lower():
print(
'Cloudflare protection detected. I will use cloudscraper for further work'
)
@@ -574,8 +573,6 @@ class Submitter:
found = True
break
assert chosen_site is not None, "No sites to check"
if not found:
print(
f"{Fore.RED}[!] The check for site '{chosen_site.name}' failed!{Style.RESET_ALL}"
@@ -634,8 +631,8 @@ class Submitter:
# chosen_site.alexa_rank = rank
self.logger.info(chosen_site.json)
stripped_site = chosen_site.strip_engine_data()
self.logger.info(stripped_site.json)
site_data = chosen_site.strip_engine_data()
self.logger.info(site_data.json)
if old_site:
# Update old site with new values and log changes
@@ -654,7 +651,7 @@ class Submitter:
for field, display_name in fields_to_check.items():
old_value = getattr(old_site, field)
new_value = getattr(stripped_site, field)
new_value = getattr(site_data, field)
if field == 'tags' and not new_tags:
continue
if str(old_value) != str(new_value):
@@ -664,7 +661,7 @@ class Submitter:
old_site.__dict__[field] = new_value
# update the site
final_site = old_site if old_site else stripped_site
final_site = old_site if old_site else site_data
self.db.update_site(final_site)
# save the db in file
+2 -5
View File
@@ -8,7 +8,7 @@ from typing import Any
DEFAULT_USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36",
]
@@ -71,10 +71,7 @@ class URLMatcher:
def ascii_data_display(data: str) -> Any:
try:
return ast.literal_eval(data)
except (ValueError, SyntaxError):
return data
def get_dict_ascii_tree(items, prepend="", new_line=True):
@@ -89,7 +86,7 @@ def get_dict_ascii_tree(items, prepend="", new_line=True):
new_result + new_line if num != len(items) - 1 else last_result + new_line
)
if isinstance(item, tuple):
if type(item) == tuple:
field_name, field_value = item
if field_value.startswith("['"):
is_last_item = num == len(items) - 1
+2 -3
View File
@@ -13,7 +13,6 @@ import os
import asyncio
from datetime import datetime
from threading import Thread
from typing import Any, Dict
import maigret
import maigret.settings
from maigret.sites import MaigretDatabase
@@ -24,7 +23,7 @@ app = Flask(__name__)
app.secret_key = os.getenv('FLASK_SECRET_KEY', os.urandom(24).hex())
# add background job tracking
background_jobs: Dict[str, Any] = {}
background_jobs = {}
job_results = {}
# Configuration
@@ -261,7 +260,7 @@ def search():
target=process_search_task, args=(usernames, options, timestamp)
),
}
background_jobs[timestamp]['thread'].start() # type: ignore[union-attr]
background_jobs[timestamp]['thread'].start()
return redirect(url_for('status', timestamp=timestamp))
Generated
+766 -902
View File
File diff suppressed because it is too large Load Diff
+2 -2
View File
@@ -1,5 +1,5 @@
maigret @ https://github.com/soxoj/maigret/archive/refs/heads/main.zip
pefile==2023.2.7 # do not bump while pyinstaller is 6.11.1, there is a conflict
psutil==7.2.2
pyinstaller==6.19.0
psutil==7.1.3
pyinstaller==6.16.0
pywin32-ctypes==0.2.3
+5 -6
View File
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "maigret"
version = "0.6.0"
version = "0.5.0"
description = "🕵️‍♂️ Collect a dossier on a person by username from thousands of sites."
authors = ["Soxoj <soxoj@protonmail.com>"]
readme = "README.md"
@@ -38,18 +38,18 @@ arabic-reshaper = "^3.0.0"
async-timeout = "^5.0.1"
attrs = ">=25.3,<27.0"
certifi = ">=2025.6.15,<2027.0.0"
chardet = ">=5,<8"
chardet = "^5.0.0"
colorama = "^0.4.6"
future = "^1.0.0"
future-annotations= "^1.0.0"
html5lib = "^1.1"
idna = "^3.4"
Jinja2 = "^3.1.6"
lxml = ">=6.0.2,<7.0"
lxml = ">=5.3,<7.0"
MarkupSafe = "^3.0.2"
mock = "^5.1.0"
multidict = "^6.6.3"
pycountry = ">=24.6.1,<27.0.0"
pycountry = "^24.6.1"
PyPDF2 = "^3.0.1"
PySocks = "^1.7.1"
python-bidi = "^0.6.3"
@@ -57,7 +57,7 @@ requests = "^2.32.4"
requests-futures = "^1.0.2"
requests-toolbelt = "^1.0.0"
six = "^1.17.0"
socid-extractor = ">=0.0.27,<0.0.29"
socid-extractor = "^0.0.27"
soupsieve = "^2.6"
stem = "^1.8.1"
torrequest = "^0.1.0"
@@ -74,7 +74,6 @@ cloudscraper = "^1.2.71"
flask = {extras = ["async"], version = "^3.1.1"}
asgiref = "^3.9.1"
platformdirs = "^4.3.8"
curl-cffi = ">=0.14,<1.0"
[tool.poetry.group.dev.dependencies]
+3
View File
@@ -0,0 +1,3 @@
[mutmut]
paths_to_mutate=maigret/
tests_dir=tests/
+1006 -919
View File
File diff suppressed because it is too large Load Diff
-3
View File
@@ -48,9 +48,6 @@ DEFAULT_ARGS: Dict[str, Any] = {
'web': None,
'with_domains': False,
'xmind': False,
'md': False,
'no_autoupdate': False,
'force_update': False,
}
-83
View File
@@ -4,30 +4,6 @@ import pytest
from maigret.utils import is_country_tag
TOP_SITES_ALEXA_RANK_LIMIT = 50
KNOWN_SOCIAL_DOMAINS = [
"facebook.com",
"instagram.com",
"twitter.com",
"tiktok.com",
"vk.com",
"reddit.com",
"pinterest.com",
"snapchat.com",
"linkedin.com",
"tumblr.com",
"threads.net",
"bsky.app",
"myspace.com",
"weibo.com",
"mastodon.social",
"gab.com",
"minds.com",
"clubhouse.com",
]
@pytest.mark.slow
def test_tags_validity(default_db):
unknown_tags = set()
@@ -43,62 +19,3 @@ def test_tags_validity(default_db):
# if you see "unchecked" tag error, please, do
# maigret --db `pwd`/maigret/resources/data.json --self-check --tag unchecked --use-disabled-sites
assert unknown_tags == set()
@pytest.mark.slow
def test_top_sites_have_category_tag(default_db):
"""Top sites by alexaRank must have at least one category tag (not just country codes)."""
sites_ranked = sorted(
[s for s in default_db.sites if s.alexa_rank],
key=lambda s: s.alexa_rank,
)[:TOP_SITES_ALEXA_RANK_LIMIT]
missing_category = []
for site in sites_ranked:
category_tags = [t for t in site.tags if not is_country_tag(t)]
if not category_tags:
missing_category.append(f"{site.name} (rank {site.alexa_rank})")
assert missing_category == [], (
f"{len(missing_category)} top-{TOP_SITES_ALEXA_RANK_LIMIT} sites have no category tag: "
+ ", ".join(missing_category[:20])
)
@pytest.mark.slow
def test_no_unused_tags_in_registry(default_db):
"""Every tag in the registry should be used by at least one site."""
all_used_tags = set()
for site in default_db.sites:
for tag in site.tags:
if not is_country_tag(tag):
all_used_tags.add(tag)
registered_tags = set(default_db._tags)
unused = registered_tags - all_used_tags
assert unused == set(), f"Tags registered but not used by any site: {unused}"
@pytest.mark.slow
def test_social_networks_have_social_tag(default_db):
"""Known social network domains must have the 'social' tag."""
from urllib.parse import urlparse
missing_social = []
for site in default_db.sites:
url = site.url_main or ""
try:
hostname = urlparse(url).hostname or ""
except Exception:
continue
for domain in KNOWN_SOCIAL_DOMAINS:
if hostname == domain or hostname.endswith("." + domain):
if "social" not in site.tags:
missing_social.append(f"{site.name} ({domain})")
break
assert missing_social == [], (
f"{len(missing_social)} known social networks missing 'social' tag: "
+ ", ".join(missing_social)
)
-236
View File
@@ -1,236 +0,0 @@
"""Tests for the database auto-update system."""
import json
import os
import hashlib
from datetime import datetime, timezone, timedelta
from unittest.mock import patch, MagicMock
import pytest
from maigret.db_updater import (
_parse_version,
_needs_check,
_is_version_compatible,
_is_update_available,
_load_state,
_save_state,
_best_local,
_now_iso,
resolve_db_path,
force_update,
CACHED_DB_PATH,
BUNDLED_DB_PATH,
STATE_PATH,
MAIGRET_HOME,
)
def test_parse_version():
assert _parse_version("0.5.0") == (0, 5, 0)
assert _parse_version("1.2.3") == (1, 2, 3)
assert _parse_version("bad") == (0, 0, 0)
assert _parse_version("") == (0, 0, 0)
def test_needs_check_no_state():
assert _needs_check({}, 24) is True
def test_needs_check_recent():
state = {"last_check_at": _now_iso()}
assert _needs_check(state, 24) is False
def test_needs_check_expired():
old_time = (datetime.now(timezone.utc) - timedelta(hours=25)).strftime("%Y-%m-%dT%H:%M:%SZ")
state = {"last_check_at": old_time}
assert _needs_check(state, 24) is True
def test_needs_check_corrupt():
state = {"last_check_at": "not-a-date"}
assert _needs_check(state, 24) is True
def test_version_compatible():
with patch("maigret.db_updater.__version__", "0.5.0"):
assert _is_version_compatible({"min_maigret_version": "0.5.0"}) is True
assert _is_version_compatible({"min_maigret_version": "0.4.0"}) is True
assert _is_version_compatible({"min_maigret_version": "0.6.0"}) is False
assert _is_version_compatible({}) is True # missing field = compatible
def test_update_available_no_cache(tmp_path):
with patch("maigret.db_updater.CACHED_DB_PATH", str(tmp_path / "nonexistent.json")):
assert _is_update_available({"updated_at": "2026-01-01T00:00:00Z"}, {}) is True
def test_update_available_newer(tmp_path):
cache = tmp_path / "data.json"
cache.write_text("{}")
with patch("maigret.db_updater.CACHED_DB_PATH", str(cache)):
state = {"last_meta": {"updated_at": "2026-01-01T00:00:00Z"}}
meta = {"updated_at": "2026-02-01T00:00:00Z"}
assert _is_update_available(meta, state) is True
def test_update_available_same(tmp_path):
cache = tmp_path / "data.json"
cache.write_text("{}")
with patch("maigret.db_updater.CACHED_DB_PATH", str(cache)):
state = {"last_meta": {"updated_at": "2026-01-01T00:00:00Z"}}
meta = {"updated_at": "2026-01-01T00:00:00Z"}
assert _is_update_available(meta, state) is False
def test_load_state_missing(tmp_path):
with patch("maigret.db_updater.STATE_PATH", str(tmp_path / "missing.json")):
assert _load_state() == {}
def test_load_state_corrupt(tmp_path):
corrupt = tmp_path / "state.json"
corrupt.write_text("not json{{{")
with patch("maigret.db_updater.STATE_PATH", str(corrupt)):
assert _load_state() == {}
def test_save_and_load_state(tmp_path):
state_file = tmp_path / "state.json"
with patch("maigret.db_updater.STATE_PATH", str(state_file)):
with patch("maigret.db_updater.MAIGRET_HOME", str(tmp_path)):
_save_state({"last_check_at": "2026-01-01T00:00:00Z"})
loaded = _load_state()
assert loaded["last_check_at"] == "2026-01-01T00:00:00Z"
def test_best_local_with_valid_cache(tmp_path):
cache = tmp_path / "data.json"
cache.write_text('{"sites": {}, "engines": {}, "tags": []}')
with patch("maigret.db_updater.CACHED_DB_PATH", str(cache)):
assert _best_local() == str(cache)
def test_best_local_with_corrupt_cache(tmp_path):
cache = tmp_path / "data.json"
cache.write_text("not json")
with patch("maigret.db_updater.CACHED_DB_PATH", str(cache)):
assert _best_local() == BUNDLED_DB_PATH
def test_best_local_no_cache(tmp_path):
with patch("maigret.db_updater.CACHED_DB_PATH", str(tmp_path / "missing.json")):
assert _best_local() == BUNDLED_DB_PATH
def test_resolve_db_path_custom_url():
result = resolve_db_path("https://example.com/db.json")
assert result == "https://example.com/db.json"
def test_resolve_db_path_custom_file(tmp_path):
custom_db = tmp_path / "custom" / "path.json"
custom_db.parent.mkdir(parents=True)
custom_db.write_text("{}")
result = resolve_db_path(str(custom_db))
assert result.endswith("custom/path.json")
def test_resolve_db_path_no_autoupdate(tmp_path):
with patch("maigret.db_updater.CACHED_DB_PATH", str(tmp_path / "missing.json")):
result = resolve_db_path("resources/data.json", no_autoupdate=True)
assert result == BUNDLED_DB_PATH
def test_resolve_db_path_no_autoupdate_with_cache(tmp_path):
cache = tmp_path / "data.json"
cache.write_text('{"sites": {}, "engines": {}, "tags": []}')
with patch("maigret.db_updater.CACHED_DB_PATH", str(cache)):
result = resolve_db_path("resources/data.json", no_autoupdate=True)
assert result == str(cache)
@patch("maigret.db_updater._fetch_meta")
def test_resolve_db_path_network_failure(mock_fetch, tmp_path):
mock_fetch.return_value = None
with patch("maigret.db_updater.MAIGRET_HOME", str(tmp_path)):
with patch("maigret.db_updater.STATE_PATH", str(tmp_path / "state.json")):
with patch("maigret.db_updater.CACHED_DB_PATH", str(tmp_path / "missing.json")):
result = resolve_db_path("resources/data.json")
assert result == BUNDLED_DB_PATH
# --- force_update tests ---
@patch("maigret.db_updater._fetch_meta")
def test_force_update_network_failure(mock_fetch, tmp_path):
mock_fetch.return_value = None
with patch("maigret.db_updater.MAIGRET_HOME", str(tmp_path)):
with patch("maigret.db_updater.STATE_PATH", str(tmp_path / "state.json")):
assert force_update() is False
@patch("maigret.db_updater._fetch_meta")
def test_force_update_incompatible_version(mock_fetch, tmp_path):
mock_fetch.return_value = {"min_maigret_version": "99.0.0", "sites_count": 100}
with patch("maigret.db_updater.MAIGRET_HOME", str(tmp_path)):
with patch("maigret.db_updater.STATE_PATH", str(tmp_path / "state.json")):
assert force_update() is False
@patch("maigret.db_updater._download_and_verify")
@patch("maigret.db_updater._fetch_meta")
def test_force_update_success(mock_fetch, mock_download, tmp_path):
mock_fetch.return_value = {
"min_maigret_version": "0.1.0",
"sites_count": 3200,
"updated_at": "2099-01-01T00:00:00Z",
"data_url": "https://example.com/data.json",
"data_sha256": "abc123",
}
mock_download.return_value = str(tmp_path / "data.json")
with patch("maigret.db_updater.MAIGRET_HOME", str(tmp_path)):
with patch("maigret.db_updater.STATE_PATH", str(tmp_path / "state.json")):
with patch("maigret.db_updater.CACHED_DB_PATH", str(tmp_path / "missing.json")):
assert force_update() is True
state = _load_state()
assert state["last_meta"]["sites_count"] == 3200
@patch("maigret.db_updater._fetch_meta")
def test_force_update_already_up_to_date(mock_fetch, tmp_path):
cache = tmp_path / "data.json"
cache.write_text('{"sites": {}, "engines": {}, "tags": []}')
state_file = tmp_path / "state.json"
state_file.write_text(json.dumps({
"last_check_at": _now_iso(),
"last_meta": {"updated_at": "2026-01-01T00:00:00Z", "sites_count": 3000},
}))
mock_fetch.return_value = {
"min_maigret_version": "0.1.0",
"sites_count": 3000,
"updated_at": "2026-01-01T00:00:00Z",
}
with patch("maigret.db_updater.MAIGRET_HOME", str(tmp_path)):
with patch("maigret.db_updater.STATE_PATH", str(state_file)):
with patch("maigret.db_updater.CACHED_DB_PATH", str(cache)):
assert force_update() is False
@patch("maigret.db_updater._download_and_verify")
@patch("maigret.db_updater._fetch_meta")
def test_force_update_download_fails(mock_fetch, mock_download, tmp_path):
mock_fetch.return_value = {
"min_maigret_version": "0.1.0",
"sites_count": 3200,
"updated_at": "2099-01-01T00:00:00Z",
"data_url": "https://example.com/data.json",
"data_sha256": "abc123",
}
mock_download.return_value = None
with patch("maigret.db_updater.MAIGRET_HOME", str(tmp_path)):
with patch("maigret.db_updater.STATE_PATH", str(tmp_path / "state.json")):
with patch("maigret.db_updater.CACHED_DB_PATH", str(tmp_path / "missing.json")):
assert force_update() is False
+2 -2
View File
@@ -36,7 +36,7 @@ def test_notify_about_errors():
},
}
notifications = notify_about_errors(results, query_notify=None, show_statistics=True)
results = notify_about_errors(results, query_notify=None, show_statistics=True)
# Check the output
expected_output = [
@@ -55,4 +55,4 @@ def test_notify_about_errors():
('Access denied: 25.0%', '!'),
('You can see detailed site check errors with a flag `--print-errors`', '-'),
]
assert notifications == expected_output
assert results == expected_output
+9 -10
View File
@@ -3,7 +3,6 @@
import pytest
import asyncio
import logging
from typing import Any, List, Tuple, Callable, Dict
from maigret.executors import (
AsyncioSimpleExecutor,
AsyncioProgressbarExecutor,
@@ -22,7 +21,7 @@ async def func(n):
@pytest.mark.asyncio
async def test_simple_asyncio_executor():
tasks: List[Tuple[Callable, list, dict]] = [(func, [n], {}) for n in range(10)]
tasks = [(func, [n], {}) for n in range(10)]
executor = AsyncioSimpleExecutor(logger=logger)
assert await executor.run(tasks) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
assert executor.execution_time > 0.2
@@ -31,7 +30,7 @@ async def test_simple_asyncio_executor():
@pytest.mark.asyncio
async def test_asyncio_progressbar_executor():
tasks: List[Tuple[Callable, list, dict]] = [(func, [n], {}) for n in range(10)]
tasks = [(func, [n], {}) for n in range(10)]
executor = AsyncioProgressbarExecutor(logger=logger)
# no guarantees for the results order
@@ -42,7 +41,7 @@ async def test_asyncio_progressbar_executor():
@pytest.mark.asyncio
async def test_asyncio_progressbar_semaphore_executor():
tasks: List[Tuple[Callable, list, dict]] = [(func, [n], {}) for n in range(10)]
tasks = [(func, [n], {}) for n in range(10)]
executor = AsyncioProgressbarSemaphoreExecutor(logger=logger, in_parallel=5)
# no guarantees for the results order
@@ -54,7 +53,7 @@ async def test_asyncio_progressbar_semaphore_executor():
@pytest.mark.slow
@pytest.mark.asyncio
async def test_asyncio_progressbar_queue_executor():
tasks: List[Tuple[Callable, list, dict]] = [(func, [n], {}) for n in range(10)]
tasks = [(func, [n], {}) for n in range(10)]
executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=2)
assert await executor.run(tasks) == [0, 1, 3, 2, 4, 6, 7, 5, 9, 8]
@@ -82,22 +81,22 @@ async def test_asyncio_progressbar_queue_executor():
@pytest.mark.asyncio
async def test_asyncio_queue_generator_executor():
tasks: List[Tuple[Callable, list, dict]] = [(func, [n], {}) for n in range(10)]
tasks = [(func, [n], {}) for n in range(10)]
executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=2)
results = [result async for result in executor.run(tasks)] # type: ignore[arg-type]
results = [result async for result in executor.run(tasks)]
assert results == [0, 1, 3, 2, 4, 6, 7, 5, 9, 8]
assert executor.execution_time > 0.5
assert executor.execution_time < 0.6
executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=3)
results = [result async for result in executor.run(tasks)] # type: ignore[arg-type]
results = [result async for result in executor.run(tasks)]
assert results == [0, 3, 1, 4, 6, 2, 7, 9, 5, 8]
assert executor.execution_time > 0.4
assert executor.execution_time < 0.5
executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=5)
results = [result async for result in executor.run(tasks)] # type: ignore[arg-type]
results = [result async for result in executor.run(tasks)]
assert results in (
[0, 3, 6, 1, 4, 7, 9, 2, 5, 8],
[0, 3, 6, 1, 4, 9, 7, 2, 5, 8],
@@ -106,7 +105,7 @@ async def test_asyncio_queue_generator_executor():
assert executor.execution_time < 0.4
executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=10)
results = [result async for result in executor.run(tasks)] # type: ignore[arg-type]
results = [result async for result in executor.run(tasks)]
assert results == [0, 3, 6, 9, 1, 4, 7, 2, 5, 8]
assert executor.execution_time > 0.2
assert executor.execution_time < 0.3
+2 -84
View File
@@ -2,7 +2,6 @@
import asyncio
import copy
from unittest.mock import patch
import pytest
from mock import Mock
@@ -12,8 +11,7 @@ from maigret.maigret import (
extract_ids_from_page,
extract_ids_from_results,
)
from maigret.checking import site_self_check
from maigret.sites import MaigretSite, MaigretDatabase
from maigret.sites import MaigretSite
from maigret.result import MaigretCheckResult, MaigretCheckStatus
from tests.conftest import RESULTS_EXAMPLE
@@ -39,86 +37,6 @@ async def test_self_check_db(test_db):
assert test_db.sites_dict['InvalidInactive'].disabled is True
@pytest.mark.slow
@pytest.mark.asyncio
async def test_self_check_no_progressbar(test_db):
"""Verify that no_progressbar=True disables the alive_bar in self_check."""
logger = Mock()
with patch('maigret.checking.alive_bar') as mock_alive_bar:
mock_bar = Mock()
mock_alive_bar.return_value.__enter__ = Mock(return_value=mock_bar)
mock_alive_bar.return_value.__exit__ = Mock(return_value=False)
await self_check(
test_db, test_db.sites_dict, logger, silent=True,
no_progressbar=True,
)
# First call is the self-check progress bar; subsequent calls are
# from inner search() invocations.
self_check_call = mock_alive_bar.call_args_list[0]
_, kwargs = self_check_call
assert kwargs.get('title') == 'Self-checking'
assert kwargs.get('disable') is True
@pytest.mark.slow
@pytest.mark.asyncio
async def test_self_check_progressbar_enabled_by_default(test_db):
"""Verify that alive_bar is enabled by default (no_progressbar=False)."""
logger = Mock()
with patch('maigret.checking.alive_bar') as mock_alive_bar:
mock_bar = Mock()
mock_alive_bar.return_value.__enter__ = Mock(return_value=mock_bar)
mock_alive_bar.return_value.__exit__ = Mock(return_value=False)
await self_check(
test_db, test_db.sites_dict, logger, silent=True,
)
self_check_call = mock_alive_bar.call_args_list[0]
_, kwargs = self_check_call
assert kwargs.get('title') == 'Self-checking'
assert kwargs.get('disable') is False
@pytest.mark.asyncio
async def test_site_self_check_handles_exception(test_db):
"""Verify that site_self_check catches unexpected exceptions and returns a valid result."""
logger = Mock()
sem = asyncio.Semaphore(1)
site = test_db.sites_dict['ValidActive']
with patch('maigret.checking.maigret', side_effect=RuntimeError("test crash")):
result = await site_self_check(site, logger, sem, test_db)
assert isinstance(result, dict)
assert "issues" in result
assert len(result["issues"]) > 0
assert any("Unexpected error" in issue for issue in result["issues"])
@pytest.mark.asyncio
async def test_self_check_handles_task_exception(test_db):
"""Verify that self_check continues when individual site checks raise exceptions."""
logger = Mock()
with patch('maigret.checking.maigret', side_effect=RuntimeError("test crash")):
result = await self_check(
test_db, test_db.sites_dict, logger, silent=True,
no_progressbar=True,
)
assert isinstance(result, dict)
assert 'results' in result
assert len(result['results']) == len(test_db.sites_dict)
for r in result['results']:
assert 'site_name' in r
assert 'issues' in r
@pytest.mark.slow
@pytest.mark.skip(reason="broken, fixme")
def test_maigret_results(test_db):
@@ -194,7 +112,7 @@ def test_extract_ids_from_page(test_db):
def test_extract_ids_from_results(test_db):
TEST_EXAMPLE: dict = copy.deepcopy(RESULTS_EXAMPLE)
TEST_EXAMPLE = copy.deepcopy(RESULTS_EXAMPLE)
TEST_EXAMPLE['Reddit']['ids_usernames'] = {'test1': 'yandex_public_id'}
TEST_EXAMPLE['Reddit']['ids_links'] = ['https://www.reddit.com/user/test2']
+1 -1
View File
@@ -6,7 +6,7 @@ import os
import pytest
from io import StringIO
import xmind # type: ignore[import-untyped]
import xmind
from jinja2 import Template
from maigret.report import (
+1 -3
View File
@@ -1,10 +1,8 @@
"""Maigret Database test functions"""
from typing import Any, Dict
from maigret.sites import MaigretDatabase, MaigretSite
EXAMPLE_DB: Dict[str, Any] = {
EXAMPLE_DB = {
'engines': {
"XenForo": {
"presenseStrs": ["XenForo"],
+2 -2
View File
@@ -28,7 +28,7 @@ async def test_detect_known_engine(test_db, local_test_db):
url_exists = "https://devforum.zoom.us/u/adam"
url_mainpage = "https://devforum.zoom.us/"
# Mock extract_username_dialog to return "adam"
submitter.extract_username_dialog = MagicMock(return_value="adam") # type: ignore[method-assign]
submitter.extract_username_dialog = MagicMock(return_value="adam")
sites, resp_text = await submitter.detect_known_engine(
url_exists, url_mainpage, session=None, follow_redirects=False, headers=None
@@ -111,7 +111,7 @@ async def test_check_features_manually_success(settings):
@pytest.mark.slow
@pytest.mark.asyncio
async def test_check_features_manually_cloudflare(settings):
async def test_check_features_manually_success(settings):
# Setup
db = MaigretDatabase()
logger = logging.getLogger("test_logger")
-59
View File
@@ -1,59 +0,0 @@
"""Generate db_meta.json from data.json for the auto-update system."""
import argparse
import hashlib
import json
import os.path as path
import sys
from datetime import datetime, timezone
RESOURCES_DIR = path.join(path.dirname(path.dirname(path.abspath(__file__))), "maigret", "resources")
DATA_JSON_PATH = path.join(RESOURCES_DIR, "data.json")
META_JSON_PATH = path.join(RESOURCES_DIR, "db_meta.json")
DEFAULT_DATA_URL = "https://raw.githubusercontent.com/soxoj/maigret/main/maigret/resources/data.json"
def get_current_version():
version_file = path.join(path.dirname(path.dirname(path.abspath(__file__))), "maigret", "__version__.py")
with open(version_file) as f:
for line in f:
if line.startswith("__version__"):
return line.split("=")[1].strip().strip("'\"")
return "0.0.0"
def main():
parser = argparse.ArgumentParser(description="Generate db_meta.json from data.json")
parser.add_argument("--min-version", default=None, help="Minimum compatible maigret version (default: current version)")
parser.add_argument("--data-url", default=DEFAULT_DATA_URL, help="URL where data.json can be downloaded")
args = parser.parse_args()
min_version = args.min_version or get_current_version()
with open(DATA_JSON_PATH, "rb") as f:
raw = f.read()
sha256 = hashlib.sha256(raw).hexdigest()
data = json.loads(raw)
sites_count = len(data.get("sites", {}))
meta = {
"version": 1,
"updated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"sites_count": sites_count,
"min_maigret_version": min_version,
"data_sha256": sha256,
"data_url": args.data_url,
}
with open(META_JSON_PATH, "w", encoding="utf-8") as f:
json.dump(meta, f, indent=4, ensure_ascii=False)
print(f"Generated {META_JSON_PATH}")
print(f" sites: {sites_count}")
print(f" sha256: {sha256[:16]}...")
print(f" min_version: {min_version}")
if __name__ == "__main__":
main()
+9 -67
View File
@@ -26,7 +26,6 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
try:
import aiohttp
from yarl import URL as YarlURL
except ImportError:
print("aiohttp not installed. Run: pip install aiohttp")
sys.exit(1)
@@ -75,14 +74,8 @@ def color(text: str, c: str) -> str:
async def check_url_aiohttp(url: str, headers: dict = None, follow_redirects: bool = True,
timeout: int = 15, ssl_verify: bool = False,
method: str = "GET", payload: dict = None) -> dict:
"""Check a URL using aiohttp and return detailed response info.
Args:
method: HTTP method ("GET" or "POST").
payload: JSON payload for POST requests (dict, will be serialized).
"""
timeout: int = 15, ssl_verify: bool = False) -> dict:
"""Check a URL using aiohttp and return detailed response info."""
headers = headers or DEFAULT_HEADERS.copy()
result = {
"method": "aiohttp",
@@ -103,14 +96,7 @@ async def check_url_aiohttp(url: str, headers: dict = None, follow_redirects: bo
timeout_obj = aiohttp.ClientTimeout(total=timeout)
async with aiohttp.ClientSession(connector=connector, timeout=timeout_obj) as session:
# Use encoded=True if URL contains percent-encoded chars to prevent double-encoding
request_url = YarlURL(url, encoded=True) if '%' in url else url
request_kwargs = dict(headers=headers, allow_redirects=follow_redirects)
if method.upper() == "POST" and payload is not None:
request_kwargs["json"] = payload
request_fn = session.post if method.upper() == "POST" else session.get
async with request_fn(request_url, **request_kwargs) as resp:
async with session.get(url, headers=headers, allow_redirects=follow_redirects) as resp:
result["status"] = resp.status
result["final_url"] = str(resp.url)
@@ -452,54 +438,21 @@ async def diagnose_site(site_config: dict, site_name: str) -> dict:
print(f" {color('[!]', Colors.RED)} No usernameClaimed defined")
return diagnosis
# Build full URL (display URL)
# Build full URL
url_template = url.replace("{urlMain}", url_main).replace("{urlSubpath}", site_config.get("urlSubpath", ""))
# Build probe URL (what Maigret actually requests)
url_probe = site_config.get("urlProbe", "")
if url_probe:
probe_template = url_probe.replace("{urlMain}", url_main).replace("{urlSubpath}", site_config.get("urlSubpath", ""))
else:
probe_template = url_template
# Detect request method and payload
request_method = site_config.get("requestMethod", "GET").upper()
request_payload_template = site_config.get("requestPayload")
headers = DEFAULT_HEADERS.copy()
# For API probes (urlProbe, POST), use neutral Accept header instead of text/html
# which can cause servers to return HTML instead of JSON
if url_probe or request_method == "POST":
headers["Accept"] = "*/*"
if site_config.get("headers"):
headers.update(site_config["headers"])
if url_probe:
print(f" urlProbe: {url_probe}")
if request_method != "GET":
print(f" requestMethod: {request_method}")
if request_payload_template:
print(f" requestPayload: {request_payload_template}")
# 2. Connectivity test
print(f"\n--- {color('2. CONNECTIVITY TEST', Colors.BOLD)} ---")
probe_claimed = probe_template.replace("{username}", claimed)
probe_unclaimed = probe_template.replace("{username}", unclaimed)
# Build payloads with username substituted
payload_claimed = None
payload_unclaimed = None
if request_payload_template and request_method == "POST":
payload_claimed = json.loads(
json.dumps(request_payload_template).replace("{username}", claimed)
)
payload_unclaimed = json.loads(
json.dumps(request_payload_template).replace("{username}", unclaimed)
)
url_claimed = url_template.replace("{username}", claimed)
url_unclaimed = url_template.replace("{username}", unclaimed)
result_claimed, result_unclaimed = await asyncio.gather(
check_url_aiohttp(probe_claimed, headers, method=request_method, payload=payload_claimed),
check_url_aiohttp(probe_unclaimed, headers, method=request_method, payload=payload_unclaimed)
check_url_aiohttp(url_claimed, headers),
check_url_aiohttp(url_unclaimed, headers)
)
print(f" Claimed ({claimed}): status={result_claimed['status']}, error={result_claimed['error']}")
@@ -570,18 +523,7 @@ async def diagnose_site(site_config: dict, site_name: str) -> dict:
diagnosis["warnings"].append(f"absenceStrs not found in unclaimed page")
print(f" {color('[WARN]', Colors.YELLOW)} absenceStrs not found in unclaimed page")
# Check works if: claimed is detected as present AND unclaimed is detected as absent.
# Presence detection: presenseStrs found (or empty = always true).
# Absence detection: absenceStrs found in unclaimed (or empty = never, rely on presenseStrs only).
# With only presenseStrs: works if found in claimed but NOT in unclaimed.
# With only absenceStrs: works if found in unclaimed but NOT in claimed.
# With both: standard combination.
claimed_is_present = presense_found_claimed and not absence_found_claimed
unclaimed_is_absent = (
(absence_strs and absence_found_unclaimed) or
(presense_strs and not presense_found_unclaimed)
)
if claimed_is_present and unclaimed_is_absent:
if presense_found_claimed and not absence_found_claimed and absence_found_unclaimed:
print(f" {color('[OK]', Colors.GREEN)} Message check should work correctly")
diagnosis["working"] = True
-84
View File
@@ -4,7 +4,6 @@ This module generates the listing of supported sites in file `SITES.md`
and pretty prints file with sites data.
"""
import sys
import socket
import requests
import logging
import threading
@@ -65,49 +64,6 @@ def get_base_domain(url):
return ""
def check_dns(domain, timeout=5):
"""Check if a domain resolves via DNS. Returns True if it resolves."""
try:
socket.setdefaulttimeout(timeout)
socket.getaddrinfo(domain, None)
return True
except (socket.gaierror, socket.timeout, OSError):
return False
def check_sites_dns(sites):
"""Check DNS resolution for all sites. Returns a set of site names that failed."""
SKIP_TLDS = ('.onion', '.i2p')
domains = {}
for site in sites:
domain = get_base_domain(site.url_main)
if domain and not any(domain.endswith(tld) for tld in SKIP_TLDS):
domains.setdefault(domain, []).append(site)
failed_sites = set()
results = {}
def resolve(domain):
results[domain] = check_dns(domain)
threads = []
for domain in domains:
t = threading.Thread(target=resolve, args=(domain,))
threads.append(t)
t.start()
for t in threads:
t.join()
for domain, resolved in results.items():
if not resolved:
for site in domains[domain]:
failed_sites.add(site.name)
logging.warning(f"DNS resolution failed for {domain}")
return failed_sites
def get_step_rank(rank):
def get_readable_rank(r):
return RANKS[str(r)]
@@ -130,8 +86,6 @@ def main():
parser.add_argument('--empty-only', help='update only sites without rating', action='store_true')
parser.add_argument('--exclude-engine', help='do not update score with certain engine',
action="append", dest="exclude_engine_list", default=[])
parser.add_argument('--dns-check', help='disable sites whose domains do not resolve via DNS',
action='store_true')
pool = list()
@@ -149,24 +103,6 @@ Rank data fetched from Majestic Million by domains.
""")
if args.dns_check:
print("Checking DNS resolution for all site domains...")
failed = check_sites_dns(sites_subset)
disabled_count = 0
re_enabled_count = 0
for site in sites_subset:
if site.name in failed:
if not site.disabled:
site.disabled = True
disabled_count += 1
print(f" Disabled {site.name}: DNS does not resolve ({get_base_domain(site.url_main)})")
else:
if site.disabled:
# Re-enable previously disabled site if DNS now resolves
# (only if it was likely disabled due to DNS failure)
pass
print(f"DNS check complete: {disabled_count} site(s) disabled, {len(failed)} domain(s) unresolvable.")
majestic_ranks = {}
if args.with_rank:
majestic_ranks = fetch_majestic_million()
@@ -217,26 +153,6 @@ Rank data fetched from Majestic Million by domains.
site_file.write(f'\nThe list was updated at ({datetime.now(timezone.utc).date()})\n')
db.save_to_file(args.base_file)
# Regenerate db_meta.json to stay in sync with data.json
try:
import hashlib, json, os
db_data_raw = open(args.base_file, 'rb').read()
db_data_parsed = json.loads(db_data_raw)
meta = {
"version": 1,
"updated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"sites_count": len(db_data_parsed.get("sites", {})),
"min_maigret_version": "0.5.0",
"data_sha256": hashlib.sha256(db_data_raw).hexdigest(),
"data_url": "https://raw.githubusercontent.com/soxoj/maigret/main/maigret/resources/data.json",
}
meta_path = os.path.join(os.path.dirname(args.base_file), "db_meta.json")
with open(meta_path, "w", encoding="utf-8") as mf:
json.dump(meta, mf, indent=4, ensure_ascii=False)
print(f"Updated {meta_path} ({meta['sites_count']} sites)")
except Exception as e:
print(f"Warning: could not regenerate db_meta.json: {e}")
statistics_text = db.get_db_stats(is_markdown=True)
site_file.write('## Statistics\n\n')
site_file.write(statistics_text)