* Add AI mode
Soxoj
2026-04-23 12:12:54 +02:00
committed by GitHub
parent 4bd2f7cb35
commit b1004588af
7 changed files with 320 additions and 16 deletions
maigret/ai.py (new file, +158)
@@ -0,0 +1,158 @@
"""Maigret AI Analysis Module
Provides AI-powered analysis of search results using OpenAI-compatible APIs.
"""
import asyncio
import json
import os
import sys
import threading
import aiohttp
def load_ai_prompt() -> str:
    """Load the AI system prompt from the resources directory."""
    maigret_path = os.path.dirname(os.path.realpath(__file__))
    prompt_path = os.path.join(maigret_path, "resources", "ai_prompt.txt")
    with open(prompt_path, "r", encoding="utf-8") as f:
        return f.read()
def resolve_api_key(settings) -> str | None:
    """Resolve OpenAI API key from settings or environment variable.

    Priority: settings.openai_api_key > OPENAI_API_KEY env var.
    """
    key = getattr(settings, "openai_api_key", None)
    if key:
        return key
    return os.environ.get("OPENAI_API_KEY")
class _Spinner:
    """Simple animated spinner for terminal output."""

    FRAMES = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]

    def __init__(self, text=""):
        self.text = text
        self._stop = threading.Event()
        self._thread = None

    def start(self):
        self._thread = threading.Thread(target=self._spin, daemon=True)
        self._thread.start()

    def _spin(self):
        i = 0
        while not self._stop.is_set():
            frame = self.FRAMES[i % len(self.FRAMES)]
            sys.stderr.write(f"\r{frame} {self.text}")
            sys.stderr.flush()
            i += 1
            self._stop.wait(0.08)

    def stop(self):
        self._stop.set()
        if self._thread:
            self._thread.join()
        sys.stderr.write("\r\033[2K")
        sys.stderr.flush()
async def print_streaming(text: str, delay: float = 0.04):
    """Print text word by word with a delay, simulating streaming LLM output."""
    words = text.split(" ")
    for i, word in enumerate(words):
        if i > 0:
            sys.stdout.write(" ")
        sys.stdout.write(word)
        sys.stdout.flush()
        await asyncio.sleep(delay)
    sys.stdout.write("\n")
    sys.stdout.flush()
async def get_ai_analysis(
    api_key: str,
    markdown_report: str,
    model: str = "gpt-4o",
    api_base_url: str = "https://api.openai.com/v1",
) -> str:
    """Send the markdown report to an OpenAI-compatible API and return the analysis.

    Uses streaming to display tokens as they arrive.
    Raises on HTTP errors with descriptive messages.
    """
    system_prompt = load_ai_prompt()
    url = f"{api_base_url.rstrip('/')}/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": model,
        "stream": True,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": markdown_report},
        ],
    }
    spinner = _Spinner("Analysing the data with AI...")
    spinner.start()
    first_token = True
    full_response = []
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=payload, headers=headers) as resp:
                if resp.status == 401:
                    raise RuntimeError("Invalid OpenAI API key (HTTP 401)")
                if resp.status == 429:
                    raise RuntimeError("OpenAI API rate limit exceeded (HTTP 429)")
                if resp.status != 200:
                    body = await resp.text()
                    raise RuntimeError(
                        f"OpenAI API error (HTTP {resp.status}): {body[:500]}"
                    )
                async for line in resp.content:
                    decoded = line.decode("utf-8").strip()
                    if not decoded or not decoded.startswith("data: "):
                        continue
                    data_str = decoded[len("data: "):]
                    if data_str == "[DONE]":
                        break
                    try:
                        chunk = json.loads(data_str)
                    except json.JSONDecodeError:
                        continue
                    delta = chunk.get("choices", [{}])[0].get("delta", {})
                    content = delta.get("content", "")
                    if not content:
                        continue
                    if first_token:
                        spinner.stop()
                        print()
                        first_token = False
                    sys.stdout.write(content)
                    sys.stdout.flush()
                    # Accumulate streamed chunks so the full analysis can be returned
                    full_response.append(content)
    except Exception:
        spinner.stop()
        raise
    if first_token:
        # No tokens received — stop spinner anyway
        spinner.stop()
        print()
    return "".join(full_response)
maigret/maigret.py (+80 -14)
@@ -494,6 +494,21 @@ def setup_arguments_parser(settings: Settings):
" (one report per username).", " (one report per username).",
) )
report_group.add_argument(
"--ai",
action="store_true",
dest="ai",
default=False,
help="Generate an AI-powered analysis of the search results using OpenAI API. "
"Requires OPENAI_API_KEY env var or openai_api_key in settings.",
)
report_group.add_argument(
"--ai-model",
dest="ai_model",
default=settings.openai_model,
help="OpenAI model to use for AI analysis (default: gpt-4o).",
)
parser.add_argument( parser.add_argument(
"--reports-sorting", "--reports-sorting",
default=settings.report_sorting, default=settings.report_sorting,
@@ -596,6 +611,7 @@ async def main():
    print_found_only=not args.print_not_found,
    skip_check_errors=not args.print_check_errors,
    color=not args.no_color,
+    silent=args.ai,
)
# Create object with all information about sites we are aware of.
@@ -711,17 +727,33 @@ async def main():
    + get_dict_ascii_tree(usernames, prepend="\t")
)
+if args.ai:
+    from .ai import resolve_api_key
+    if not resolve_api_key(settings):
+        query_notify.warning(
+            'AI analysis requires an OpenAI API key. '
+            'Set OPENAI_API_KEY environment variable or add '
+            'openai_api_key to settings.json.'
+        )
+        sys.exit(1)
if not site_data:
    query_notify.warning('No sites to check, exiting!')
    sys.exit(2)
-query_notify.warning(
-    f'Starting a search on top {len(site_data)} sites from the Maigret database...'
-)
-if not args.all_sites:
-    query_notify.warning(
-        'You can run search by full list of sites with flag `-a`', '!'
-    )
+if args.ai:
+    query_notify.warning(
+        f'Starting AI-assisted search on top {len(site_data)} sites from the Maigret database...'
+    )
+else:
+    query_notify.warning(
+        f'Starting a search on top {len(site_data)} sites from the Maigret database...'
+    )
+    if not args.all_sites:
+        query_notify.warning(
+            'You can run search by full list of sites with flag `-a`', '!'
+        )
already_checked = set()
general_results = []
@@ -774,11 +806,12 @@ async def main():
    check_domains=args.with_domains,
)
-errs = errors.notify_about_errors(
-    results, query_notify, show_statistics=args.verbose
-)
-for e in errs:
-    query_notify.warning(*e)
+if not args.ai:
+    errs = errors.notify_about_errors(
+        results, query_notify, show_statistics=args.verbose
+    )
+    for e in errs:
+        query_notify.warning(*e)
if args.reports_sorting == "data":
    results = sort_report_by_data_points(results)
@@ -867,10 +900,43 @@ async def main():
save_graph_report(filename, general_results, db)
query_notify.warning(f'Graph report on all usernames saved in {filename}')
-text_report = get_plaintext_report(report_context)
-if text_report:
-    query_notify.info('Short text report:')
-    print(text_report)
+if not args.ai:
+    text_report = get_plaintext_report(report_context)
+    if text_report:
+        query_notify.info('Short text report:')
+        print(text_report)
+if args.ai:
+    from .ai import get_ai_analysis, resolve_api_key
+    from .report import generate_markdown_report
+    api_key = resolve_api_key(settings)
+    run_flags = []
+    if args.tags:
+        run_flags.append(f"--tags {args.tags}")
+    if args.site_list:
+        run_flags.append(f"--site {','.join(args.site_list)}")
+    if args.all_sites:
+        run_flags.append("--all-sites")
+    run_info = {
+        "sites_count": sum(len(d) for _, _, d in general_results),
+        "flags": " ".join(run_flags) if run_flags else None,
+    }
+    md_report = generate_markdown_report(report_context, run_info=run_info)
+    try:
+        await get_ai_analysis(
+            api_key=api_key,
+            markdown_report=md_report,
+            model=args.ai_model,
+            api_base_url=getattr(
+                settings, 'openai_api_base_url', 'https://api.openai.com/v1'
+            ),
+        )
+    except Exception as e:
+        query_notify.warning(f'AI analysis failed: {e}')
# update database
db.save_to_file(db_file)
maigret/notify.py (+8)
@@ -123,6 +123,7 @@ class QueryNotifyPrint(QueryNotify):
print_found_only=False,
skip_check_errors=False,
color=True,
+silent=False,
):
"""Create Query Notify Print Object.
@@ -149,6 +150,7 @@ class QueryNotifyPrint(QueryNotify):
self.print_found_only = print_found_only
self.skip_check_errors = skip_check_errors
self.color = color
+self.silent = silent
return
@@ -187,6 +189,9 @@ class QueryNotifyPrint(QueryNotify):
Nothing.
"""
+if self.silent:
+    return
title = f"Checking {id_type}" title = f"Checking {id_type}"
if self.color: if self.color:
print( print(
@@ -236,6 +241,9 @@ class QueryNotifyPrint(QueryNotify):
Return Value:
Nothing.
"""
+if self.silent:
+    return
notify = None
self.result = result
maigret/report.py (+7 -2)
@@ -267,7 +267,7 @@ def _md_format_value(value) -> str:
return s
-def save_markdown_report(filename: str, context: dict, run_info: dict = None):
+def generate_markdown_report(context: dict, run_info: dict = None) -> str:
    username = context.get("username", "unknown")
    generated_at = context.get("generated_at", "")
    brief = context.get("brief", "")
@@ -391,8 +391,13 @@ def save_markdown_report(filename: str, context: dict, run_info: dict = None):
"CCPA, and similar).\n" "CCPA, and similar).\n"
) )
return "\n".join(lines)
def save_markdown_report(filename: str, context: dict, run_info: dict = None):
content = generate_markdown_report(context, run_info)
with open(filename, "w", encoding="utf-8") as f: with open(filename, "w", encoding="utf-8") as f:
f.write("\n".join(lines)) f.write(content)
""" """
maigret/resources/ai_prompt.txt (new file, +62)
@@ -0,0 +1,62 @@
You are an OSINT analyst that converts raw username-investigation reports into a short, clean human-readable summary.
Your task:
Read the attached account-discovery report and produce a concise report in exactly this style:
# Investigation Summary
Name: <most likely real full name>
Location: <most likely current location>
Occupation: <short combined description based only on strong signals>
Interests: <3-6 broad interests inferred from platform types, bios, and activity>
Languages: <languages supported by strong evidence only>
Website: <main personal website if clearly present>
Username: <main username> (variant: <variant usernames if any>)
Platforms: <number> profiles, active from <first year> to <last year>
Confidence: <High / Medium / Low> — <one short explanation why>
# Other leads
- <lead 1>
- <lead 2>
- <lead 3 if needed>
Rules:
1. Use only information supported by the report.
2. Resolve identity using consistency of username, full name, bio, links, company, and location.
3. Prefer strong repeated signals over one-off weak signals.
4. If one profile clearly conflicts with the rest, mention it in "Other leads" as a likely false positive instead of mixing it into the main identity.
5. Keep the tone analytical and neutral.
6. Do not mention every platform individually.
7. Do not include raw URLs except for the main website.
8. Do not mention NSFW/adult platforms in the main summary unless they are the only source for a critical lead; if such a profile looks inconsistent, mention it only as a likely false positive.
9. "Occupation" should be a compact merged description, for example: "Chief Product Officer (CPO) at ..., entrepreneur, OSINT community founder".
10. "Interests" should be broad categories, not noisy tags. Convert raw platform/tag evidence into natural categories like OSINT, software development, blogging, gaming, streaming, etc.
11. "Languages" should only include languages clearly supported by bios, texts, country tags, or profile content.
12. For "Platforms", count the profiles reported as found by the report summary, not manually deduplicated.
13. For active years, use the earliest and latest reliable dates from the consistent identity cluster. Ignore obvious outlier dates if they belong to likely false positives or weak profiles.
14. For confidence:
- High = strong consistency across username, name, bio, links, location, and/or company
- Medium = partial consistency with some gaps
- Low = mostly username-only matches
15. If some field is not reliably known, omit speculation and use the best cautious wording possible.
16. For "Name", output only the most likely real personal name in clean canonical form.
- Remove nicknames, handles, aliases, or bracketed parts such as "(Soxoj)".
- Example: "Dmitriy (Soxoj) Danilov" -> "Dmitriy Danilov".
17. For "Website", output only the plain domain or URL as text, not a markdown hyperlink.
18. In "Other leads", do not label conflicting profiles as "false positive", "likely unrelated", or "potentially a false positive".
- Instead, use neutral intelligence wording such as:
"Accounts were found that are most likely unrelated to the main identity, but may indicate possible cross-border activity and should be verified."
19. When describing anomalies in "Other leads", prefer cautious investigative phrasing:
- "may be unrelated"
- "requires verification"
- "could indicate separate activity"
- "should be checked manually"
20. Do not include nicknames or aliases inside the Name field unless they are clearly part of the legal or real-world name.
Output requirements:
- Return only the final formatted text.
- Keep it short.
- No preamble, no explanations.
Now analyze the following report
maigret/resources/settings.json (+3)
@@ -55,6 +55,9 @@
"pdf_report": false, "pdf_report": false,
"html_report": false, "html_report": false,
"md_report": false, "md_report": false,
"openai_api_key": "",
"openai_model": "gpt-4o",
"openai_api_base_url": "https://api.openai.com/v1",
"web_interface_port": 5000, "web_interface_port": 5000,
"no_autoupdate": false, "no_autoupdate": false,
"db_update_meta_url": "https://raw.githubusercontent.com/soxoj/maigret/main/maigret/resources/db_meta.json", "db_update_meta_url": "https://raw.githubusercontent.com/soxoj/maigret/main/maigret/resources/db_meta.json",
+2
@@ -49,6 +49,8 @@ DEFAULT_ARGS: Dict[str, Any] = {
'with_domains': False,
'xmind': False,
'md': False,
+'ai': False,
+'ai_model': 'gpt-4o',
'no_autoupdate': False,
'force_update': False,
}