Files
maigret/maigret/db_updater.py
T
Soxoj 269d50eedc DB update mechanism (#2458)
* Database update mechanism
2026-04-04 18:00:50 +02:00

331 lines
9.9 KiB
Python

"""
Database auto-update logic for maigret.
Checks a lightweight meta file to determine if a newer site database is available,
downloads it if compatible, and caches it locally in ~/.maigret/.
"""
import hashlib
import json
import logging
import os
import os.path as path
import tempfile
from datetime import datetime, timezone
from typing import Optional
import requests
from colorama import Fore, Style
from .__version__ import __version__
logger = logging.getLogger("maigret")
_use_color = True
def _print_info(msg: str) -> None:
text = f"[*] {msg}"
if _use_color:
print(Style.BRIGHT + Fore.GREEN + text + Style.RESET_ALL)
else:
print(text)
def _print_success(msg: str) -> None:
text = f"[+] {msg}"
if _use_color:
print(Style.BRIGHT + Fore.GREEN + text + Style.RESET_ALL)
else:
print(text)
def _print_warning(msg: str) -> None:
text = f"[!] {msg}"
if _use_color:
print(Style.BRIGHT + Fore.YELLOW + text + Style.RESET_ALL)
else:
print(text)
DEFAULT_META_URL = (
"https://raw.githubusercontent.com/soxoj/maigret/main/maigret/resources/db_meta.json"
)
DEFAULT_CHECK_INTERVAL_HOURS = 24
MAIGRET_HOME = path.expanduser("~/.maigret")
CACHED_DB_PATH = path.join(MAIGRET_HOME, "data.json")
STATE_PATH = path.join(MAIGRET_HOME, "autoupdate_state.json")
BUNDLED_DB_PATH = path.join(path.dirname(path.realpath(__file__)), "resources", "data.json")
def _parse_version(version_str: str) -> tuple:
"""Parse a version string like '0.5.0' into a comparable tuple (0, 5, 0)."""
try:
return tuple(int(x) for x in version_str.strip().split("."))
except (ValueError, AttributeError):
return (0, 0, 0)
def _ensure_maigret_home() -> None:
os.makedirs(MAIGRET_HOME, exist_ok=True)
def _load_state() -> dict:
try:
with open(STATE_PATH, "r", encoding="utf-8") as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError, OSError):
return {}
def _save_state(state: dict) -> None:
_ensure_maigret_home()
tmp_path = STATE_PATH + ".tmp"
try:
with open(tmp_path, "w", encoding="utf-8") as f:
json.dump(state, f, indent=2, ensure_ascii=False)
os.replace(tmp_path, STATE_PATH)
except OSError:
try:
os.unlink(tmp_path)
except OSError:
pass
def _needs_check(state: dict, interval_hours: int) -> bool:
last_check = state.get("last_check_at")
if not last_check:
return True
try:
last_dt = datetime.fromisoformat(last_check.replace("Z", "+00:00"))
elapsed = (datetime.now(timezone.utc) - last_dt).total_seconds() / 3600
return elapsed >= interval_hours
except (ValueError, TypeError):
return True
def _fetch_meta(meta_url: str, timeout: int = 10) -> Optional[dict]:
try:
response = requests.get(meta_url, timeout=timeout)
if response.status_code == 200:
return response.json()
except Exception:
pass
return None
def _is_version_compatible(meta: dict) -> bool:
min_ver = meta.get("min_maigret_version", "0.0.0")
return _parse_version(__version__) >= _parse_version(min_ver)
def _is_update_available(meta: dict, state: dict) -> bool:
if not path.isfile(CACHED_DB_PATH):
return True
remote_date = meta.get("updated_at", "")
cached_date = state.get("last_meta", {}).get("updated_at", "")
return remote_date > cached_date
def _download_and_verify(data_url: str, expected_sha256: str, timeout: int = 60) -> Optional[str]:
_ensure_maigret_home()
tmp_fd, tmp_path = tempfile.mkstemp(dir=MAIGRET_HOME, suffix=".json")
try:
response = requests.get(data_url, timeout=timeout)
if response.status_code != 200:
return None
content = response.content
actual_sha256 = hashlib.sha256(content).hexdigest()
if actual_sha256 != expected_sha256:
_print_warning("DB auto-update: SHA-256 mismatch, download rejected")
return None
# Validate JSON structure
data = json.loads(content)
if not all(k in data for k in ("sites", "engines", "tags")):
_print_warning("DB auto-update: invalid database structure")
return None
os.write(tmp_fd, content)
os.close(tmp_fd)
tmp_fd = None
os.replace(tmp_path, CACHED_DB_PATH)
return CACHED_DB_PATH
except Exception:
return None
finally:
if tmp_fd is not None:
os.close(tmp_fd)
try:
os.unlink(tmp_path)
except OSError:
pass
def _best_local() -> str:
"""Return cached DB if it exists and is valid, otherwise bundled."""
if path.isfile(CACHED_DB_PATH):
try:
with open(CACHED_DB_PATH, "r", encoding="utf-8") as f:
data = json.load(f)
if "sites" in data:
return CACHED_DB_PATH
except (json.JSONDecodeError, OSError):
pass
return BUNDLED_DB_PATH
def _now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def resolve_db_path(
db_file_arg: str,
no_autoupdate: bool = False,
meta_url: str = DEFAULT_META_URL,
check_interval_hours: int = DEFAULT_CHECK_INTERVAL_HOURS,
color: bool = True,
) -> str:
"""
Determine which database file to use, potentially downloading an update.
Returns the path to the database file that should be loaded.
"""
global _use_color
_use_color = color
default_db_name = "resources/data.json"
# User specified a custom DB — skip auto-update
is_url = db_file_arg.startswith("http://") or db_file_arg.startswith("https://")
is_default = db_file_arg == default_db_name
if is_url:
return db_file_arg
if not is_default:
return path.join(path.dirname(path.realpath(__file__)), db_file_arg)
# Auto-update disabled
if no_autoupdate:
return _best_local()
# Check interval
_ensure_maigret_home()
state = _load_state()
if not _needs_check(state, check_interval_hours):
return _best_local()
# Time to check
_print_info("DB auto-update: checking for updates...")
meta = _fetch_meta(meta_url)
if meta is None:
_print_warning("DB auto-update: could not reach update server, using local database")
state["last_check_at"] = _now_iso()
_save_state(state)
return _best_local()
# Version compatibility
if not _is_version_compatible(meta):
min_ver = meta.get("min_maigret_version", "?")
_print_warning(
f"DB auto-update: latest database requires maigret >= {min_ver}, "
f"you have {__version__}. Please upgrade with: pip install -U maigret"
)
state["last_check_at"] = _now_iso()
_save_state(state)
return _best_local()
# Check if update available
if not _is_update_available(meta, state):
sites_count = meta.get("sites_count", "?")
_print_info(f"DB auto-update: database is up to date ({sites_count} sites)")
state["last_check_at"] = _now_iso()
state["last_meta"] = meta
_save_state(state)
return _best_local()
# Download update
new_count = meta.get("sites_count", "?")
old_count = state.get("last_meta", {}).get("sites_count")
if old_count:
_print_info(f"DB auto-update: downloading updated database ({new_count} sites, was {old_count})...")
else:
_print_info(f"DB auto-update: downloading database ({new_count} sites)...")
data_url = meta.get("data_url", "")
expected_sha = meta.get("data_sha256", "")
result = _download_and_verify(data_url, expected_sha)
if result is None:
_print_warning("DB auto-update: download failed, using local database")
state["last_check_at"] = _now_iso()
_save_state(state)
return _best_local()
_print_success(f"DB auto-update: database updated successfully ({new_count} sites)")
state["last_check_at"] = _now_iso()
state["last_meta"] = meta
state["cached_db_sha256"] = expected_sha
_save_state(state)
return CACHED_DB_PATH
def force_update(
meta_url: str = DEFAULT_META_URL,
color: bool = True,
) -> bool:
"""
Force check for database updates and download if available.
Returns True if database was updated, False otherwise.
"""
global _use_color
_use_color = color
_ensure_maigret_home()
_print_info("DB update: checking for updates...")
meta = _fetch_meta(meta_url)
if meta is None:
_print_warning("DB update: could not reach update server")
return False
if not _is_version_compatible(meta):
min_ver = meta.get("min_maigret_version", "?")
_print_warning(
f"DB update: latest database requires maigret >= {min_ver}, "
f"you have {__version__}. Please upgrade with: pip install -U maigret"
)
return False
state = _load_state()
new_count = meta.get("sites_count", "?")
old_count = state.get("last_meta", {}).get("sites_count")
if not _is_update_available(meta, state):
_print_info(f"DB update: database is already up to date ({new_count} sites)")
state["last_check_at"] = _now_iso()
state["last_meta"] = meta
_save_state(state)
return False
if old_count:
_print_info(f"DB update: downloading updated database ({new_count} sites, was {old_count})...")
else:
_print_info(f"DB update: downloading database ({new_count} sites)...")
data_url = meta.get("data_url", "")
expected_sha = meta.get("data_sha256", "")
result = _download_and_verify(data_url, expected_sha)
if result is None:
_print_warning("DB update: download failed")
return False
_print_success(f"DB update: database updated successfully ({new_count} sites)")
state["last_check_at"] = _now_iso()
state["last_meta"] = meta
state["cached_db_sha256"] = expected_sha
_save_state(state)
return True