Files
maigret/maigret/web/app.py
T
2026-04-02 21:01:49 +02:00

354 lines
12 KiB
Python

from flask import (
Flask,
render_template,
request,
send_file,
Response,
flash,
redirect,
url_for,
)
import logging
import os
import asyncio
from datetime import datetime
from threading import Thread
from typing import Any, Dict
import maigret
import maigret.settings
from maigret.sites import MaigretDatabase
from maigret.report import generate_report_context
# Flask application object shared by every route in this module.
app = Flask(__name__)

# Session-signing key: FLASK_SECRET_KEY when set; otherwise a fresh random key
# is generated each time this module is loaded.
app.secret_key = os.getenv('FLASK_SECRET_KEY', os.urandom(24).hex())

# In-memory bookkeeping for background searches, keyed by the job timestamp:
#   background_jobs[ts] -> {'completed': bool, 'thread': Thread}
#   job_results[ts]     -> outcome dict ('status' plus report data or 'error')
background_jobs: Dict[str, Any] = {}
job_results = {}

# Paths used by the search and report code below.
app.config.update(
    MAIGRET_DB_FILE=os.path.join(
        os.path.dirname(os.path.dirname(__file__)), 'resources', 'data.json'
    ),
    COOKIES_FILE="cookies.txt",
    UPLOAD_FOLDER='uploads',
    REPORTS_FOLDER=os.path.abspath('/tmp/maigret_reports'),
)
def setup_logger(log_level, name):
    """Return the logger registered under *name*, set to *log_level*.

    Only the level is changed; no handlers or formatters are attached here.
    """
    named_logger = logging.getLogger(name)
    named_logger.setLevel(log_level)
    return named_logger
async def maigret_search(username, options):
    """Run a Maigret search for a single username.

    Args:
        username: Account name to look up.
        options: Mapping of search settings. Keys read here: ``top_sites``,
            ``all_sites``, ``tags``, ``excluded_tags``, ``site_list``,
            ``timeout``, ``use_cookies``, ``disable_extracting``,
            ``disable_recursive_search``, ``with_domains``, ``proxy``,
            ``tor_proxy`` and ``i2p_proxy``. Every key is optional.

    Returns:
        The value returned by ``maigret.search`` for the selected sites.

    Raises:
        Exception: Any error raised while loading the database, selecting
            sites or searching is logged and then re-raised unchanged.
    """
    logger = setup_logger(logging.WARNING, 'maigret')
    try:
        db = MaigretDatabase().load_from_path(app.config["MAIGRET_DB_FILE"])

        top_sites = int(options.get('top_sites') or 500)
        if options.get('all_sites'):
            top_sites = 999999999  # effectively all

        tags = options.get('tags', [])
        excluded_tags = options.get('excluded_tags', [])
        site_list = options.get('site_list', [])

        logger.info(f"Filtering sites by tags: {tags}, excluded: {excluded_tags}")
        sites = db.ranked_sites_dict(
            top=top_sites,
            tags=tags,
            excluded_tags=excluded_tags,
            names=site_list,
            disabled=False,
            id_type='username',
        )
        logger.info(f"Found {len(sites)} sites matching the tag criteria")

        # A missing, None or empty timeout falls back to 30, mirroring how
        # top_sites falls back to 500 above. A plain .get('timeout', 30) would
        # pass None/'' through to int() and fail.
        timeout = int(options.get('timeout') or 30)

        results = await maigret.search(
            username=username,
            site_dict=sites,
            timeout=timeout,
            logger=logger,
            id_type='username',
            cookies=app.config["COOKIES_FILE"] if options.get('use_cookies') else None,
            is_parsing_enabled=(not options.get('disable_extracting', False)),
            recursive_search_enabled=(
                not options.get('disable_recursive_search', False)
            ),
            check_domains=options.get('with_domains', False),
            proxy=options.get('proxy', None),
            tor_proxy=options.get('tor_proxy', None),
            i2p_proxy=options.get('i2p_proxy', None),
        )
        return results
    except Exception as e:
        logger.error(f"Error during search: {str(e)}")
        raise
async def search_multiple_usernames(usernames, options):
    """Search each username in turn and collect the successful results.

    Args:
        usernames: Iterable of raw username strings; each is stripped of
            surrounding whitespace before being searched.
        options: Search settings forwarded unchanged to ``maigret_search``.

    Returns:
        A list of ``(username, 'username', search_results)`` tuples, one per
        username whose search succeeded. A username whose search raises is
        logged and left out, so one failure does not abort the rest.
    """
    collected = []
    for username in usernames:
        try:
            cleaned = username.strip()
            found = await maigret_search(cleaned, options)
            collected.append((cleaned, 'username', found))
        except Exception as e:
            # Best effort: report the problem and move on to the next name.
            logging.error(f"Error searching username {username}: {str(e)}")
    return collected
def process_search_task(usernames, options, timestamp):
    """Background worker: search all usernames and write the report files.

    Intended to run in a worker thread, so it creates its own asyncio event
    loop to drive the async search and closes that loop when done.

    Args:
        usernames: Usernames to search.
        options: Search settings forwarded to ``search_multiple_usernames``.
        timestamp: Job key. Also used to name the session folder
            ``search_<timestamp>`` under ``REPORTS_FOLDER``.

    Side effects:
        * Writes ``combined_graph.html`` plus one CSV, JSON, PDF and HTML
          report per searched username into the session folder.
        * Stores the outcome in ``job_results[timestamp]``: either a
          ``'completed'`` entry with report paths relative to
          ``REPORTS_FOLDER``, or a ``'failed'`` entry carrying the error text.
        * Always sets ``background_jobs[timestamp]['completed']`` to True.

    No exception escapes the search or report steps: any failure is logged
    with its traceback and recorded as a failed job.
    """
    session_name = f"search_{timestamp}"
    loop = None
    try:
        # Setup a private event loop for this call and make it current.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        general_results = loop.run_until_complete(
            search_multiple_usernames(usernames, options)
        )

        os.makedirs(app.config["REPORTS_FOLDER"], exist_ok=True)
        session_folder = os.path.join(app.config["REPORTS_FOLDER"], session_name)
        os.makedirs(session_folder, exist_ok=True)

        graph_path = os.path.join(session_folder, "combined_graph.html")
        maigret.report.save_graph_report(
            graph_path,
            general_results,
            MaigretDatabase().load_from_path(app.config["MAIGRET_DB_FILE"]),
        )

        individual_reports = []
        for username, _id_type, results in general_results:
            # NOTE(review): username is embedded in the file name as-is; if it
            # can contain path separators the writes below will fail or land
            # in an unexpected place - confirm whether it needs sanitising.
            report_name = f"report_{username}"
            report_base = os.path.join(session_folder, report_name)
            # Same locations, relative to REPORTS_FOLDER, for later download.
            relative_base = os.path.join(session_name, report_name)

            # The PDF/HTML context is built from the combined results of the
            # whole session, not just this username.
            context = generate_report_context(general_results)

            maigret.report.save_csv_report(f"{report_base}.csv", username, results)
            maigret.report.save_json_report(
                f"{report_base}.json", username, results, report_type='ndjson'
            )
            maigret.report.save_pdf_report(f"{report_base}.pdf", context)
            maigret.report.save_html_report(f"{report_base}.html", context)

            claimed_profiles = []
            for site_name, site_data in results.items():
                status = site_data.get('status')
                if (
                    status
                    and status.status == maigret.result.MaigretCheckStatus.CLAIMED
                ):
                    claimed_profiles.append(
                        {
                            'site_name': site_name,
                            'url': site_data.get('url_user', ''),
                            'tags': status.tags,
                        }
                    )

            individual_reports.append(
                {
                    'username': username,
                    'csv_file': f"{relative_base}.csv",
                    'json_file': f"{relative_base}.json",
                    'pdf_file': f"{relative_base}.pdf",
                    'html_file': f"{relative_base}.html",
                    'claimed_profiles': claimed_profiles,
                }
            )

        # save results and mark job as complete using timestamp as key
        job_results[timestamp] = {
            'status': 'completed',
            'session_folder': session_name,
            'graph_file': os.path.join(session_name, "combined_graph.html"),
            'usernames': usernames,
            'individual_reports': individual_reports,
        }
    except Exception as e:
        # logging.exception keeps the same message and adds the traceback.
        logging.exception(
            f"Error in search task for timestamp {timestamp}: {str(e)}"
        )
        job_results[timestamp] = {'status': 'failed', 'error': str(e)}
    finally:
        # Flag completion first so the job state is correct even if cleanup
        # below fails; setdefault avoids a KeyError for an unregistered key.
        background_jobs.setdefault(timestamp, {})['completed'] = True
        if loop is not None:
            # Close the loop created above so it is not leaked per job.
            loop.close()
@app.route('/')
def index():
    """Render the landing page with site names and URLs for autocomplete.

    Returns:
        The rendered ``index.html`` template. ``site_options`` is a sorted,
        duplicate-free list holding every site name and every non-empty
        ``url_main`` from the Maigret database.
    """
    # load site data for autocomplete
    db = MaigretDatabase().load_from_path(app.config["MAIGRET_DB_FILE"])

    # Collect into a set: it deduplicates as we go and avoids scanning a
    # growing list once per site.
    site_options = set()
    for site in db.sites:
        site_options.add(site.name)
        if site.url_main:
            site_options.add(site.url_main)

    return render_template('index.html', site_options=sorted(site_options))
@app.route('/search', methods=['POST'])
def search():
    """Validate the search form, start a background search and redirect.

    Form fields read: ``usernames`` (comma and/or whitespace separated),
    ``tags``, ``excluded_tags``, ``top_sites``, ``timeout``, ``site``
    (comma separated), ``proxy``, ``tor_proxy``, ``i2p_proxy`` and the
    checkbox flags ``use_cookies``, ``all_sites``,
    ``disable_recursive_search``, ``disable_extracting``, ``with_domains``
    and ``permute``.

    Returns:
        A redirect to the status page of the new job, or a redirect back to
        the index page with a flash message when no username was supplied.
    """
    usernames_input = request.form.get('usernames', '').strip()
    # split() with no argument already drops empty tokens and surrounding
    # whitespace, so commas only need to be turned into spaces first.
    usernames = usernames_input.replace(',', ' ').split()
    # Validate after parsing: input made only of separators (e.g. ",,")
    # is non-empty text but contains no usernames.
    if not usernames:
        flash('At least one username is required', 'danger')
        return redirect(url_for('index'))

    # Create timestamp for this search session. The clock only has second
    # resolution, so add a numeric suffix if a job already uses this key;
    # otherwise the second job would overwrite the first one's entry and
    # both would write into the same report folder.
    base_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    timestamp = base_timestamp
    attempt = 1
    while timestamp in background_jobs:
        attempt += 1
        timestamp = f"{base_timestamp}_{attempt}"

    # Get selected tags - ensure it's a list
    selected_tags = request.form.getlist('tags')
    excluded_tags = request.form.getlist('excluded_tags')
    logging.info(f"Selected tags: {selected_tags}, Excluded tags: {excluded_tags}")

    options = {
        'top_sites': request.form.get('top_sites') or '500',
        'timeout': request.form.get('timeout') or '30',
        'use_cookies': 'use_cookies' in request.form,
        'all_sites': 'all_sites' in request.form,
        'disable_recursive_search': 'disable_recursive_search' in request.form,
        'disable_extracting': 'disable_extracting' in request.form,
        'with_domains': 'with_domains' in request.form,
        'proxy': request.form.get('proxy', None) or None,
        'tor_proxy': request.form.get('tor_proxy', None) or None,
        'i2p_proxy': request.form.get('i2p_proxy', None) or None,
        # NOTE(review): 'permute' is collected here but maigret_search does
        # not appear to read it - confirm whether it is still needed.
        'permute': 'permute' in request.form,
        'tags': selected_tags,  # Pass selected tags as a list
        'excluded_tags': excluded_tags,  # Pass excluded tags as a list
        'site_list': [
            s.strip() for s in request.form.get('site', '').split(',') if s.strip()
        ],
    }
    logging.info(
        f"Starting search for usernames: {usernames} with tags: {selected_tags}, excluded: {excluded_tags}"
    )

    # Register the job before starting the worker, because the worker marks
    # background_jobs[timestamp] as completed when it finishes.
    worker = Thread(target=process_search_task, args=(usernames, options, timestamp))
    background_jobs[timestamp] = {'completed': False, 'thread': worker}
    worker.start()

    return redirect(url_for('status', timestamp=timestamp))
@app.route('/status/<timestamp>')
def status(timestamp):
    """Show progress for a search job, or redirect once it has finished.

    Args:
        timestamp: Job key taken from the URL.

    Returns:
        * the ``status.html`` page while the job is still running;
        * a redirect to the results page when the job completed successfully;
        * a redirect to the index page, with a flash message, when the key is
          unknown, the job has no stored result, or the job failed.
    """
    logging.info(f"Status check for timestamp: {timestamp}")

    # Unknown job key.
    if timestamp not in background_jobs:
        flash('Invalid search session.', 'danger')
        logging.error(f"Invalid search session: {timestamp}")
        return redirect(url_for('index'))

    # Still running: show the waiting page.
    if not background_jobs[timestamp]['completed']:
        return render_template('status.html', timestamp=timestamp)

    # Finished: inspect the stored outcome.
    outcome = job_results.get(timestamp)
    if not outcome:
        flash('No results found for this search session.', 'warning')
        logging.error(f"No results found for completed session: {timestamp}")
        return redirect(url_for('index'))

    if outcome['status'] != 'completed':
        error_msg = outcome.get('error', 'Unknown error occurred.')
        flash(f'Search failed: {error_msg}', 'danger')
        logging.error(f"Search failed for session {timestamp}: {error_msg}")
        return redirect(url_for('index'))

    # The results route is addressed by the session folder name.
    return redirect(url_for('results', session_id=outcome['session_folder']))
@app.route('/results/<session_id>')
def results(session_id):
    """Render the results page for a completed search session.

    Args:
        session_id: Session folder name (``search_<timestamp>``) from the URL.

    Returns:
        The rendered ``results.html`` template, or a redirect to the index
        page with a flash message when no completed job has that folder name.
    """
    # Pick the first completed job whose session folder matches.
    result_data = None
    for candidate in job_results.values():
        if candidate.get('status') != 'completed':
            continue
        if candidate['session_folder'] == session_id:
            result_data = candidate
            break

    if not result_data:
        flash('No results found for this session ID.', 'danger')
        logging.error(f"Results for session {session_id} not found in job_results.")
        return redirect(url_for('index'))

    template_args = {
        'usernames': result_data['usernames'],
        'graph_file': result_data['graph_file'],
        'individual_reports': result_data['individual_reports'],
        'timestamp': session_id.replace('search_', ''),
    }
    return render_template('results.html', **template_args)
@app.route('/reports/<path:filename>')
def download_report(filename):
    """Serve a generated report file from ``REPORTS_FOLDER``.

    Args:
        filename: Path of the requested file, relative to ``REPORTS_FOLDER``,
            taken from the URL (untrusted input).

    Returns:
        The file via ``send_file``, or ``("File not found", 404)`` when the
        path resolves outside the reports folder or the file cannot be served.
    """
    try:
        os.makedirs(app.config["REPORTS_FOLDER"], exist_ok=True)
        reports_root = os.path.realpath(app.config["REPORTS_FOLDER"])
        # Resolve '..' segments and symlinks before checking containment.
        file_path = os.path.realpath(os.path.join(reports_root, filename))
        # Compare whole path components. A plain startswith() test would also
        # accept sibling directories that merely share the prefix, such as
        # "<reports_root>_other/...".
        if os.path.commonpath([reports_root, file_path]) != reports_root:
            raise ValueError("Invalid file path")
        return send_file(file_path)
    except Exception as e:
        logging.error(f"Error serving file {filename}: {str(e)}")
        return "File not found", 404
if __name__ == '__main__':
    # Script entry point: configure logging, read settings from the
    # environment and start the Flask development server.
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )

    # FLASK_DEBUG enables debug mode only for 'true', '1' or 't' (any case).
    debug_flag = os.getenv('FLASK_DEBUG', 'False').lower()
    debug_mode = debug_flag in ('true', '1', 't')

    # Host configuration: secure by default.
    # Binds to 127.0.0.1 unless FLASK_HOST says otherwise (e.g. 0.0.0.0).
    host = os.getenv('FLASK_HOST', '127.0.0.1')
    port = int(os.getenv('FLASK_PORT', '5000'))

    app.run(host=host, port=port, debug=debug_mode)