diff --git a/maigret/resources/db_meta.json b/maigret/resources/db_meta.json index c2a24ca..d563902 100644 --- a/maigret/resources/db_meta.json +++ b/maigret/resources/db_meta.json @@ -1,7 +1,7 @@ { "version": 1, - "updated_at": "2026-05-05T11:30:38Z", - "sites_count": 3154, + "updated_at": "2026-05-05T17:17:59Z", + "sites_count": 3155, "min_maigret_version": "0.6.0", "data_sha256": "acf9d9fef8412bf05fa09d50c1ae363e5c8394597b1aaa3f98a9a1c4e31ca356", "data_url": "https://raw.githubusercontent.com/soxoj/maigret/main/maigret/resources/data.json" diff --git a/tests/test_web.py b/tests/test_web.py new file mode 100644 index 0000000..950327f --- /dev/null +++ b/tests/test_web.py @@ -0,0 +1,172 @@ +"""Smoke tests for the Flask web interface in maigret.web.app. + +The goal is to catch breakage in the basic user flow (render index, kick off +search, redirect to results) without making real network calls. Heavy maigret +internals are mocked; the report-generation smoke test keeps `save_graph_report` +unmocked so regressions like `nt.options.groups = ...` (AttributeError on a +plain dict) are caught automatically. +""" +import os + +import pytest + +import maigret +import maigret.report +from maigret.web import app as web_app_module + + +CUR_PATH = os.path.dirname(os.path.realpath(__file__)) +TEST_DB = os.path.join(CUR_PATH, 'db.json') + + +class _SyncThread: + """Drop-in for threading.Thread that runs target synchronously on start().""" + + def __init__(self, target=None, args=(), kwargs=None, **_): + self._target = target + self._args = args + self._kwargs = kwargs or {} + + def start(self): + self._target(*self._args, **self._kwargs) + + +@pytest.fixture +def web_app(tmp_path): + web_app_module.app.config['TESTING'] = True + web_app_module.app.config['REPORTS_FOLDER'] = str(tmp_path) + web_app_module.app.config['MAIGRET_DB_FILE'] = TEST_DB + + web_app_module.background_jobs.clear() + web_app_module.job_results.clear() + + yield web_app_module + + web_app_module.background_jobs.clear() + web_app_module.job_results.clear() + + +@pytest.fixture +def client(web_app): + return web_app.app.test_client() + + +def test_index_renders(client): + resp = client.get('/') + assert resp.status_code == 200 + body = resp.get_data(as_text=True) + assert 'name="usernames"' in body + assert ' returns 200.""" + + def never_completes(usernames, options, timestamp): + # leave background_jobs[timestamp]['completed'] as False + pass + + monkeypatch.setattr(web_app, 'process_search_task', never_completes) + monkeypatch.setattr(web_app, 'Thread', _SyncThread) + + post = client.post('/search', data={'usernames': 'soxoj'}) + status_resp = client.get(post.location) + + assert status_resp.status_code == 200 + + +def test_completed_search_redirects_to_results(client, web_app, monkeypatch): + """Happy path: POST /search → background completes → /status/ → /results/.""" + + def fake_task(usernames, options, timestamp): + web_app.job_results[timestamp] = { + 'status': 'completed', + 'session_folder': f'search_{timestamp}', + 'graph_file': f'search_{timestamp}/combined_graph.html', + 'usernames': usernames, + 'individual_reports': [], + } + web_app.background_jobs[timestamp]['completed'] = True + + monkeypatch.setattr(web_app, 'process_search_task', fake_task) + monkeypatch.setattr(web_app, 'Thread', _SyncThread) + + post = client.post('/search', data={'usernames': 'soxoj'}) + assert post.status_code == 302 + + status_resp = client.get(post.location) + assert status_resp.status_code == 302 + assert '/results/search_' in status_resp.location + + results_resp = client.get(status_resp.location) + assert results_resp.status_code == 200 + assert b'soxoj' in results_resp.data + + +def test_failed_task_redirects_to_index(client, web_app, monkeypatch): + def failing_task(usernames, options, timestamp): + web_app.job_results[timestamp] = {'status': 'failed', 'error': 'boom'} + web_app.background_jobs[timestamp]['completed'] = True + + monkeypatch.setattr(web_app, 'process_search_task', failing_task) + monkeypatch.setattr(web_app, 'Thread', _SyncThread) + + post = client.post('/search', data={'usernames': 'soxoj'}) + status_resp = client.get(post.location) + + assert status_resp.status_code == 302 + assert status_resp.location.endswith('/') + + +def test_real_report_generation_does_not_crash(client, web_app, monkeypatch): + """End-to-end with mocked maigret.search but REAL report generation. + + This is the regression guard for bugs inside `save_graph_report` and friends + (e.g. `nt.options.groups = ...` raising AttributeError on a dict). If any of + the unmocked report functions throws, the task records a failed status and + this assertion catches it. + """ + + async def fake_search(*args, **kwargs): + return {} + + monkeypatch.setattr(maigret, 'search', fake_search) + # Mock the per-username report writers — they are not what we care about here, + # and pdf/html generation pulls in xhtml2pdf which is slow and brittle. + monkeypatch.setattr(maigret.report, 'save_csv_report', lambda *a, **kw: None) + monkeypatch.setattr(maigret.report, 'save_json_report', lambda *a, **kw: None) + monkeypatch.setattr(maigret.report, 'save_pdf_report', lambda *a, **kw: None) + monkeypatch.setattr(maigret.report, 'save_html_report', lambda *a, **kw: None) + monkeypatch.setattr(maigret.report, 'generate_report_context', lambda *a, **kw: {}) + monkeypatch.setattr(web_app, 'Thread', _SyncThread) + + post = client.post('/search', data={'usernames': 'testuser'}) + timestamp = post.location.rsplit('/', 1)[1] + + assert timestamp in web_app.job_results, 'background task did not record any result' + result = web_app.job_results[timestamp] + assert result['status'] == 'completed', ( + f"report generation failed: {result.get('error')!r}" + )