CLI test fixes

This commit is contained in:
Soxoj
2024-12-15 12:57:01 +01:00
parent f8f7c996ca
commit 86ea0b9212
4 changed files with 119 additions and 68 deletions
+1 -1
View File
@@ -1,7 +1,7 @@
LINT_FILES=maigret wizard.py tests LINT_FILES=maigret wizard.py tests
test: test:
coverage run --source=./maigret -m pytest tests coverage run --source=./maigret,./maigret/web -m pytest tests
coverage report -m coverage report -m
coverage html coverage html
+6 -6
View File
@@ -330,11 +330,9 @@ def setup_arguments_parser(settings: Settings):
type=int, type=int,
nargs='?', nargs='?',
const=5000, # default if --web is provided without a port const=5000, # default if --web is provided without a port
default=None, default=settings.web_interface_port,
help="Launches the web interface on the specified port (default: 5000 if no PORT is provided).", help="Launches the web interface on the specified port (default: 5000 if no PORT is provided).",
) )
output_group = parser.add_argument_group( output_group = parser.add_argument_group(
'Output options', 'Options to change verbosity and view of the console output' 'Output options', 'Options to change verbosity and view of the console output'
) )
@@ -494,12 +492,14 @@ async def main():
elif args.verbose: elif args.verbose:
log_level = logging.WARNING log_level = logging.WARNING
logger.setLevel(log_level) logger.setLevel(log_level)
if args.web is not None: if args.web is not None:
from maigret.web.app import app from maigret.web.app import app
port = args.web if args.web else 5000 # args.web is either the specified port or 5000 by const
app.run(port=port)
port = (
args.web if args.web else 5000
) # args.web is either the specified port or 5000 by const
app.run(port=port)
# Usernames initial list # Usernames initial list
usernames = { usernames = {
+103 -57
View File
@@ -1,5 +1,14 @@
# app.py # app.py
from flask import Flask, render_template, request, send_file, Response, flash, redirect, url_for from flask import (
Flask,
render_template,
request,
send_file,
Response,
flash,
redirect,
url_for,
)
import logging import logging
import os import os
import asyncio import asyncio
@@ -26,17 +35,19 @@ REPORTS_FOLDER = os.path.abspath('/tmp/maigret_reports')
os.makedirs(UPLOAD_FOLDER, exist_ok=True) os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(REPORTS_FOLDER, exist_ok=True) os.makedirs(REPORTS_FOLDER, exist_ok=True)
def setup_logger(log_level, name): def setup_logger(log_level, name):
logger = logging.getLogger(name) logger = logging.getLogger(name)
logger.setLevel(log_level) logger.setLevel(log_level)
return logger return logger
async def maigret_search(username, options): async def maigret_search(username, options):
logger = setup_logger(logging.WARNING, 'maigret') logger = setup_logger(logging.WARNING, 'maigret')
try: try:
db = MaigretDatabase().load_from_path(MAIGRET_DB_FILE) db = MaigretDatabase().load_from_path(MAIGRET_DB_FILE)
sites = db.ranked_sites_dict(top=int(options.get('top_sites', 500))) sites = db.ranked_sites_dict(top=int(options.get('top_sites', 500)))
results = await maigret.search( results = await maigret.search(
username=username, username=username,
site_dict=sites, site_dict=sites,
@@ -50,6 +61,7 @@ async def maigret_search(username, options):
logger.error(f"Error during search: {str(e)}") logger.error(f"Error during search: {str(e)}")
raise raise
async def search_multiple_usernames(usernames, options): async def search_multiple_usernames(usernames, options):
results = [] results = []
for username in usernames: for username in usernames:
@@ -60,109 +72,140 @@ async def search_multiple_usernames(usernames, options):
logging.error(f"Error searching username {username}: {str(e)}") logging.error(f"Error searching username {username}: {str(e)}")
return results return results
def process_search_task(usernames, options, timestamp): def process_search_task(usernames, options, timestamp):
try: try:
# Setup event loop for async operations # Setup event loop for async operations
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
# Run the search # Run the search
general_results = loop.run_until_complete(search_multiple_usernames(usernames, options)) general_results = loop.run_until_complete(
search_multiple_usernames(usernames, options)
)
# Create session folder # Create session folder
session_folder = os.path.join(REPORTS_FOLDER, f"search_{timestamp}") session_folder = os.path.join(REPORTS_FOLDER, f"search_{timestamp}")
os.makedirs(session_folder, exist_ok=True) os.makedirs(session_folder, exist_ok=True)
# Save the combined graph # Save the combined graph
graph_path = os.path.join(session_folder, "combined_graph.html") graph_path = os.path.join(session_folder, "combined_graph.html")
maigret.report.save_graph_report(graph_path, general_results, MaigretDatabase().load_from_path(MAIGRET_DB_FILE)) maigret.report.save_graph_report(
graph_path,
general_results,
MaigretDatabase().load_from_path(MAIGRET_DB_FILE),
)
# Save individual reports # Save individual reports
individual_reports = [] individual_reports = []
for username, id_type, results in general_results: for username, id_type, results in general_results:
report_base = os.path.join(session_folder, f"report_{username}") report_base = os.path.join(session_folder, f"report_{username}")
csv_path = f"{report_base}.csv" csv_path = f"{report_base}.csv"
json_path = f"{report_base}.json" json_path = f"{report_base}.json"
pdf_path = f"{report_base}.pdf" pdf_path = f"{report_base}.pdf"
html_path = f"{report_base}.html" html_path = f"{report_base}.html"
context = generate_report_context(general_results) context = generate_report_context(general_results)
maigret.report.save_csv_report(csv_path, username, results) maigret.report.save_csv_report(csv_path, username, results)
maigret.report.save_json_report(json_path, username, results, report_type='ndjson') maigret.report.save_json_report(
json_path, username, results, report_type='ndjson'
)
maigret.report.save_pdf_report(pdf_path, context) maigret.report.save_pdf_report(pdf_path, context)
maigret.report.save_html_report(html_path, context) maigret.report.save_html_report(html_path, context)
claimed_profiles = [] claimed_profiles = []
for site_name, site_data in results.items(): for site_name, site_data in results.items():
if (site_data.get('status') and if (
site_data['status'].status == maigret.result.MaigretCheckStatus.CLAIMED): site_data.get('status')
claimed_profiles.append({ and site_data['status'].status
'site_name': site_name, == maigret.result.MaigretCheckStatus.CLAIMED
'url': site_data.get('url_user', ''), ):
'tags': site_data.get('status').tags if site_data.get('status') else [] claimed_profiles.append(
}) {
'site_name': site_name,
individual_reports.append({ 'url': site_data.get('url_user', ''),
'username': username, 'tags': (
'csv_file': os.path.join(f"search_{timestamp}", f"report_{username}.csv"), site_data.get('status').tags
'json_file': os.path.join(f"search_{timestamp}", f"report_{username}.json"), if site_data.get('status')
'pdf_file': os.path.join(f"search_{timestamp}", f"report_{username}.pdf"), else []
'html_file': os.path.join(f"search_{timestamp}", f"report_{username}.html"), ),
'claimed_profiles': claimed_profiles, }
}) )
individual_reports.append(
{
'username': username,
'csv_file': os.path.join(
f"search_{timestamp}", f"report_{username}.csv"
),
'json_file': os.path.join(
f"search_{timestamp}", f"report_{username}.json"
),
'pdf_file': os.path.join(
f"search_{timestamp}", f"report_{username}.pdf"
),
'html_file': os.path.join(
f"search_{timestamp}", f"report_{username}.html"
),
'claimed_profiles': claimed_profiles,
}
)
# Save results and mark job as complete # Save results and mark job as complete
job_results[timestamp] = { job_results[timestamp] = {
'status': 'completed', 'status': 'completed',
'session_folder': f"search_{timestamp}", 'session_folder': f"search_{timestamp}",
'graph_file': os.path.join(f"search_{timestamp}", "combined_graph.html"), 'graph_file': os.path.join(f"search_{timestamp}", "combined_graph.html"),
'usernames': usernames, 'usernames': usernames,
'individual_reports': individual_reports 'individual_reports': individual_reports,
} }
except Exception as e: except Exception as e:
job_results[timestamp] = { job_results[timestamp] = {'status': 'failed', 'error': str(e)}
'status': 'failed',
'error': str(e)
}
finally: finally:
background_jobs[timestamp]['completed'] = True background_jobs[timestamp]['completed'] = True
@app.route('/') @app.route('/')
def index(): def index():
return render_template('index.html') return render_template('index.html')
@app.route('/search', methods=['POST']) @app.route('/search', methods=['POST'])
def search(): def search():
usernames_input = request.form.get('usernames', '').strip() usernames_input = request.form.get('usernames', '').strip()
if not usernames_input: if not usernames_input:
flash('At least one username is required', 'danger') flash('At least one username is required', 'danger')
return redirect(url_for('index')) return redirect(url_for('index'))
usernames = [u.strip() for u in usernames_input.replace(',', ' ').split() if u.strip()] usernames = [
u.strip() for u in usernames_input.replace(',', ' ').split() if u.strip()
]
# Create timestamp for this search session # Create timestamp for this search session
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
logging.info(f"Starting search for usernames: {usernames}") logging.info(f"Starting search for usernames: {usernames}")
options = { options = {
'top_sites': request.form.get('top_sites', '500'), 'top_sites': request.form.get('top_sites', '500'),
'timeout': request.form.get('timeout', '30'), 'timeout': request.form.get('timeout', '30'),
'id_type': 'username', # fixed as username 'id_type': 'username', # fixed as username
'use_cookies': 'use_cookies' in request.form, 'use_cookies': 'use_cookies' in request.form,
} }
# Start background job # Start background job
background_jobs[timestamp] = { background_jobs[timestamp] = {
'completed': False, 'completed': False,
'thread': Thread(target=process_search_task, args=(usernames, options, timestamp)) 'thread': Thread(
target=process_search_task, args=(usernames, options, timestamp)
),
} }
background_jobs[timestamp]['thread'].start() background_jobs[timestamp]['thread'].start()
logging.info(f"Search job started with timestamp: {timestamp}") logging.info(f"Search job started with timestamp: {timestamp}")
# Redirect to status page # Redirect to status page
return redirect(url_for('status', timestamp=timestamp)) return redirect(url_for('status', timestamp=timestamp))
@@ -170,19 +213,19 @@ def search():
@app.route('/status/<timestamp>') @app.route('/status/<timestamp>')
def status(timestamp): def status(timestamp):
logging.info(f"Status check for timestamp: {timestamp}") logging.info(f"Status check for timestamp: {timestamp}")
# Validate timestamp # Validate timestamp
if timestamp not in background_jobs: if timestamp not in background_jobs:
flash('Invalid search session', 'danger') flash('Invalid search session', 'danger')
return redirect(url_for('index')) return redirect(url_for('index'))
# Check if job is completed # Check if job is completed
if background_jobs[timestamp]['completed']: if background_jobs[timestamp]['completed']:
result = job_results.get(timestamp) result = job_results.get(timestamp)
if not result: if not result:
flash('No results found for this search session', 'warning') flash('No results found for this search session', 'warning')
return redirect(url_for('index')) return redirect(url_for('index'))
if result['status'] == 'completed': if result['status'] == 'completed':
# Redirect to results page once done # Redirect to results page once done
return redirect(url_for('results', session_id=result['session_folder'])) return redirect(url_for('results', session_id=result['session_folder']))
@@ -190,7 +233,7 @@ def status(timestamp):
error_msg = result.get('error', 'Unknown error occurred') error_msg = result.get('error', 'Unknown error occurred')
flash(f'Search failed: {error_msg}', 'danger') flash(f'Search failed: {error_msg}', 'danger')
return redirect(url_for('index')) return redirect(url_for('index'))
# If job is still running, show status page with a simple spinner # If job is still running, show status page with a simple spinner
return render_template('status.html', timestamp=timestamp) return render_template('status.html', timestamp=timestamp)
@@ -200,23 +243,25 @@ def results(session_id):
if not session_id.startswith('search_'): if not session_id.startswith('search_'):
flash('Invalid results session format', 'danger') flash('Invalid results session format', 'danger')
return redirect(url_for('index')) return redirect(url_for('index'))
result_data = next(
(r for r in job_results.values()
if r.get('status') == 'completed' and r['session_folder'] == session_id),
None
)
result_data = next(
(
r
for r in job_results.values()
if r.get('status') == 'completed' and r['session_folder'] == session_id
),
None,
)
return render_template( return render_template(
'results.html', 'results.html',
usernames=result_data['usernames'], usernames=result_data['usernames'],
graph_file=result_data['graph_file'], graph_file=result_data['graph_file'],
individual_reports=result_data['individual_reports'], individual_reports=result_data['individual_reports'],
timestamp=session_id.replace('search_', '') timestamp=session_id.replace('search_', ''),
) )
@app.route('/reports/<path:filename>') @app.route('/reports/<path:filename>')
def download_report(filename): def download_report(filename):
try: try:
@@ -226,9 +271,10 @@ def download_report(filename):
logging.error(f"Error serving file {filename}: {str(e)}") logging.error(f"Error serving file {filename}: {str(e)}")
return "File not found", 404 return "File not found", 404
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
) )
app.run(debug=True) app.run(debug=True)
+9 -4
View File
@@ -42,6 +42,7 @@ DEFAULT_ARGS: Dict[str, Any] = {
'use_disabled_sites': False, 'use_disabled_sites': False,
'username': [], 'username': [],
'verbose': False, 'verbose': False,
'web': 5000,
'with_domains': False, 'with_domains': False,
'xmind': False, 'xmind': False,
} }
@@ -55,7 +56,8 @@ def test_args_search_mode(argparser):
want_args = dict(DEFAULT_ARGS) want_args = dict(DEFAULT_ARGS)
want_args.update({'username': ['username']}) want_args.update({'username': ['username']})
assert args == Namespace(**want_args) for arg in vars(args):
assert getattr(args, arg) == want_args[arg]
def test_args_search_mode_several_usernames(argparser): def test_args_search_mode_several_usernames(argparser):
@@ -66,7 +68,8 @@ def test_args_search_mode_several_usernames(argparser):
want_args = dict(DEFAULT_ARGS) want_args = dict(DEFAULT_ARGS)
want_args.update({'username': ['username1', 'username2']}) want_args.update({'username': ['username1', 'username2']})
assert args == Namespace(**want_args) for arg in vars(args):
assert getattr(args, arg) == want_args[arg]
def test_args_self_check_mode(argparser): def test_args_self_check_mode(argparser):
@@ -81,7 +84,8 @@ def test_args_self_check_mode(argparser):
} }
) )
assert args == Namespace(**want_args) for arg in vars(args):
assert getattr(args, arg) == want_args[arg]
def test_args_multiple_sites(argparser): def test_args_multiple_sites(argparser):
@@ -97,4 +101,5 @@ def test_args_multiple_sites(argparser):
} }
) )
assert args == Namespace(**want_args) for arg in vars(args):
assert getattr(args, arg) == want_args[arg]