refactor:reduces the cognitive complexity of get_ai_analysis (#2581)

This commit is contained in:
Danilo Salve
2026-05-05 15:52:34 -03:00
committed by GitHub
parent 846feb6e7e
commit 52c8917e2c
+42 -38
View File
@@ -75,6 +75,45 @@ async def print_streaming(text: str, delay: float = 0.04):
sys.stdout.flush()
async def _check_response(resp):
"""Raise descriptive errors for non-success HTTP responses."""
if resp.status == 401:
raise RuntimeError("Invalid OpenAI API key (HTTP 401)")
if resp.status == 429:
raise RuntimeError("OpenAI API rate limit exceeded (HTTP 429)")
if resp.status != 200:
body = await resp.text()
raise RuntimeError(f"OpenAI API error (HTTP {resp.status}): {body[:500]}")
async def _stream_response(resp, spinner, first_token):
"""Stream tokens from resp, display them, and return (first_token, full_analysis)."""
full_response = []
async for line in resp.content:
decoded = line.decode("utf-8").strip()
if not decoded or not decoded.startswith("data: "):
continue
data_str = decoded[len("data: "):]
if data_str == "[DONE]":
break
try:
chunk = json.loads(data_str)
except json.JSONDecodeError:
continue
delta = chunk.get("choices", [{}])[0].get("delta", {})
content = delta.get("content", "")
if not content:
continue
if first_token:
spinner.stop()
print()
first_token = False
sys.stdout.write(content)
sys.stdout.flush()
full_response.append(content)
return first_token, "".join(full_response)
async def get_ai_analysis(
api_key: str,
markdown_report: str,
@@ -105,47 +144,12 @@ async def get_ai_analysis(
spinner = _Spinner("Analysing the data with AI...")
spinner.start()
first_token = True
full_response = []
try:
async with aiohttp.ClientSession() as session:
async with session.post(url, json=payload, headers=headers) as resp:
if resp.status == 401:
raise RuntimeError("Invalid OpenAI API key (HTTP 401)")
if resp.status == 429:
raise RuntimeError("OpenAI API rate limit exceeded (HTTP 429)")
if resp.status != 200:
body = await resp.text()
raise RuntimeError(
f"OpenAI API error (HTTP {resp.status}): {body[:500]}"
)
async for line in resp.content:
decoded = line.decode("utf-8").strip()
if not decoded or not decoded.startswith("data: "):
continue
data_str = decoded[len("data: "):]
if data_str == "[DONE]":
break
try:
chunk = json.loads(data_str)
except json.JSONDecodeError:
continue
delta = chunk.get("choices", [{}])[0].get("delta", {})
content = delta.get("content", "")
if not content:
continue
if first_token:
spinner.stop()
print()
first_token = False
sys.stdout.write(content)
sys.stdout.flush()
await _check_response(resp)
first_token, analysis = await _stream_response(resp, spinner, first_token)
except Exception:
spinner.stop()
raise
@@ -155,4 +159,4 @@ async def get_ai_analysis(
spinner.stop()
print()
return "".join(full_response)
return analysis