14 Commits

Author SHA1 Message Date
2c738579bd feat(ux): improve interactive mode readability and input visibility
All checks were successful
CI / test (push) Successful in 19s
- Replace plain 'tai>' prompt with styled console.input() bold cyan prompt
- Wrap interactive mode entry in a Rich Panel with border
- Frame each AI response with Rule dividers (──── AI Response ────)
- Style guardrail warnings with ⚠ prefix and bold yellow
- Improve /help output with formatted Panel showing all commands
- Style collection report: ✓/✗ per item with color, truncation in dim
- Style probe output: ✓/✗ with green/red, host info in dim
- Add Rule header divider on session start
2026-05-04 06:37:50 +02:00
6aa59bdd6b fix: strip v prefix from tag when generating deb version
All checks were successful
CI / test (push) Successful in 20s
2026-05-04 06:13:53 +02:00
530be62185 feat(cli): add response guardrails and grounded followup re-anchoring 2026-05-04 06:11:55 +02:00
2662d1b253 feat(cli): add structured JSONL session logging for AI output 2026-05-04 06:03:39 +02:00
fdcde37e46 feat(cli): support conversational AI follow-ups in interactive mode 2026-05-04 05:58:26 +02:00
67a0cb3e69 feat(cli): add interactive follow-up loop with slash commands 2026-05-04 05:54:15 +02:00
d092b508c3 chore: set deb version equal to tag
Some checks failed
Release / build (push) Failing after 8m20s
CI / test (push) Successful in 19s
2026-05-04 05:48:46 +02:00
7e1cac8bd1 feat: build and upload deb package in release workflow
Some checks failed
CI / test (push) Successful in 20s
Release / build (push) Has been cancelled
2026-05-04 05:48:10 +02:00
5fea8fe096 fix: align release Python setup with CI fallback logic
All checks were successful
CI / test (push) Successful in 19s
Release / build (push) Successful in 8m22s
2026-05-04 05:26:14 +02:00
05adbf7cc9 run
All checks were successful
CI / test (push) Successful in 19s
2026-05-04 05:24:14 +02:00
69d2bdd661 fix: use python3.12 explicitly for venv on Ubuntu Noble runner
Some checks failed
CI / test (push) Successful in 19s
Release / build (push) Failing after 3s
2026-05-04 05:19:43 +02:00
f88048762e fix: remove python3.11-venv, runner uses python3.12 on Ubuntu Noble
All checks were successful
CI / test (push) Successful in 19s
2026-05-04 05:17:37 +02:00
60f42c7754 fix: install python3.11-venv explicitly in release workflow
All checks were successful
CI / test (push) Successful in 19s
2026-05-04 05:13:18 +02:00
33dff26d2b fix: always install python3-venv in release workflow
All checks were successful
CI / test (push) Successful in 19s
2026-05-04 05:07:39 +02:00
12 changed files with 638 additions and 48 deletions

View File

@@ -59,13 +59,9 @@ jobs:
- name: Ensure Python and pip are available - name: Ensure Python and pip are available
run: | run: |
if command -v python3 >/dev/null 2>&1 && python3 -m pip --version >/dev/null 2>&1; then
python3 --version
exit 0
fi
if command -v apt-get >/dev/null 2>&1; then if command -v apt-get >/dev/null 2>&1; then
apt-get update apt-get update
apt-get install -y python3.12 python3.12-venv python3-pip || \
apt-get install -y python3 python3-pip python3-venv apt-get install -y python3 python3-pip python3-venv
elif command -v dnf >/dev/null 2>&1; then elif command -v dnf >/dev/null 2>&1; then
dnf install -y python3 python3-pip dnf install -y python3 python3-pip
@@ -76,11 +72,11 @@ jobs:
exit 1 exit 1
fi fi
python3 --version python3.12 --version || python3 --version
- name: Install package and dev dependencies - name: Install package and dev dependencies
run: | run: |
python3 -m venv .venv python3.12 -m venv .venv 2>/dev/null || python3 -m venv .venv
. .venv/bin/activate . .venv/bin/activate
python -m pip install --upgrade pip python -m pip install --upgrade pip
python -m pip install -e .[dev] python -m pip install -e .[dev]

View File

@@ -59,32 +59,35 @@ jobs:
- name: Ensure Python and build dependencies are available - name: Ensure Python and build dependencies are available
run: | run: |
if ! command -v python3 >/dev/null 2>&1; then
if command -v apt-get >/dev/null 2>&1; then if command -v apt-get >/dev/null 2>&1; then
apt-get update apt-get update
apt-get install -y python3.12 python3.12-venv python3-pip patchelf ccache || \
apt-get install -y python3 python3-pip python3-venv patchelf ccache apt-get install -y python3 python3-pip python3-venv patchelf ccache
elif command -v dnf >/dev/null 2>&1; then elif command -v dnf >/dev/null 2>&1; then
dnf install -y python3 python3-pip patchelf ccache dnf install -y python3 python3-pip python3-devel patchelf ccache
fi elif command -v yum >/dev/null 2>&1; then
yum install -y python3 python3-pip python3-devel patchelf ccache
else
echo "No supported package manager found to install Python/build deps."
exit 1
fi fi
# patchelf is required by Nuitka for standalone Linux binaries python3.12 --version || python3 --version
command -v patchelf >/dev/null 2>&1 || {
apt-get update && apt-get install -y patchelf
}
python3 --version
- name: Set up venv and install package + build deps - name: Set up venv and install package + build deps
run: | run: |
python3 -m venv .venv python3.12 -m venv .venv 2>/dev/null || python3 -m venv .venv
. .venv/bin/activate . .venv/bin/activate
python -m pip install --upgrade pip python -m pip install --upgrade pip
python -m pip install -e ".[build]" python -m pip install -e ".[build]"
- name: Derive version from tag - name: Derive version from tag
id: version id: version
run: echo "tag=${GITHUB_REF_NAME}" >> "$GITHUB_OUTPUT" run: |
tag="${GITHUB_REF_NAME}"
deb_version="${tag#v}" # Remove leading 'v' if present
echo "tag=${tag}" >> "$GITHUB_OUTPUT"
echo "deb_version=${deb_version}" >> "$GITHUB_OUTPUT"
- name: Build standalone binary with Nuitka - name: Build standalone binary with Nuitka
run: | run: |
@@ -101,6 +104,33 @@ jobs:
- name: Smoke-test the binary - name: Smoke-test the binary
run: dist/tai --help run: dist/tai --help
- name: Build .deb package
run: |
pkg_root="pkgroot"
pkg_name="tai"
deb_version="${{ steps.version.outputs.deb_version }}"
arch="amd64"
out_dir="dist"
deb_dir="${pkg_root}/${pkg_name}_${deb_version}_${arch}"
rm -rf "${pkg_root}"
mkdir -p "${deb_dir}/DEBIAN"
mkdir -p "${deb_dir}/usr/bin"
install -m 0755 dist/tai "${deb_dir}/usr/bin/tai"
cat > "${deb_dir}/DEBIAN/control" <<EOF
Package: ${pkg_name}
Version: ${deb_version}
Section: admin
Priority: optional
Architecture: ${arch}
Maintainer: tai maintainers <noreply@example.com>
Description: tai Linux troubleshooting assistant
EOF
dpkg-deb --build "${deb_dir}" "${out_dir}/${pkg_name}_${deb_version}_${arch}.deb"
- name: Upload binary artifact - name: Upload binary artifact
uses: actions/upload-artifact@v3 uses: actions/upload-artifact@v3
with: with:
@@ -108,3 +138,11 @@ jobs:
path: dist/tai path: dist/tai
if-no-files-found: error if-no-files-found: error
retention-days: 90 retention-days: 90
- name: Upload deb artifact
uses: actions/upload-artifact@v3
with:
name: tai-deb-amd64-${{ steps.version.outputs.tag }}
path: dist/tai_${{ steps.version.outputs.deb_version }}_amd64.deb
if-no-files-found: error
retention-days: 90

3
.gitignore vendored
View File

@@ -24,3 +24,6 @@ htmlcov/
# IDE # IDE
.vscode/ .vscode/
# Logs and session files
logs/

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
from collections.abc import Iterator from collections.abc import Iterator
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any, cast
from openai import OpenAI from openai import OpenAI
@@ -88,6 +89,20 @@ class AIClient:
if delta: if delta:
yield delta yield delta
def stream_messages(self, messages: list[dict[str, str]]) -> Iterator[str]:
"""Stream a completion from an explicit chat history."""
stream = self._client.chat.completions.create(
model=self._config.model,
max_tokens=self._config.max_tokens,
stream=True,
messages=cast(Any, messages),
)
for chunk in cast(Iterator[Any], stream):
delta = chunk.choices[0].delta.content
if delta:
yield delta
def summary(self) -> str: def summary(self) -> str:
"""Human-readable description of the AI config.""" """Human-readable description of the AI config."""
return f"host={self._config.host} model={self._config.model}" return f"host={self._config.host} model={self._config.model}"

36
src/tai/ai_guardrails.py Normal file
View File

@@ -0,0 +1,36 @@
"""Heuristic checks for AI response quality and safety."""
from __future__ import annotations
import re
_RISKY_ACTION_PATTERNS = [
r"\bsystemctl\s+(restart|stop|start)\b",
r"\b(edit|modify|change)\s+/etc/",
r"\bpasswd\b",
r"\bapt\s+install\b",
r"\bdnf\s+install\b",
r"\byum\s+install\b",
]
def validate_ai_response(response: str) -> list[str]:
"""Return warning messages for potentially unsafe or weakly grounded output."""
warnings: list[str] = []
if "Evidence" not in response:
warnings.append("Response is missing an Evidence section.")
if "`" not in response:
warnings.append("Response does not include quoted evidence snippets.")
lower_response = response.lower()
for pattern in _RISKY_ACTION_PATTERNS:
if re.search(pattern, lower_response):
warnings.append(
"Response suggests potentially modifying actions; "
"prefer read-only verification unless remediation was explicitly requested."
)
break
return warnings

View File

@@ -8,14 +8,19 @@ from typing import Annotated
import typer import typer
from rich.console import Console from rich.console import Console
from rich.markdown import Markdown from rich.markdown import Markdown
from rich.panel import Panel
from rich.rule import Rule
from rich.text import Text
from tai.ai_client import DEFAULT_AI_HOST, DEFAULT_MODEL, AIClient, AIConfig from tai.ai_client import DEFAULT_AI_HOST, DEFAULT_MODEL, AIClient, AIConfig
from tai.ai_guardrails import validate_ai_response
from tai.collectors import CollectionReport, collect_from_plan from tai.collectors import CollectionReport, collect_from_plan
from tai.input_parser import InputValidationError, build_request from tai.input_parser import InputValidationError, build_request
from tai.models import TroubleshootRequest from tai.models import TroubleshootRequest
from tai.plan import plan_from_request from tai.plan import plan_from_request
from tai.prompt_builder import build_system_prompt, build_user_message from tai.prompt_builder import build_followup_message, build_system_prompt, build_user_message
from tai.ssh_client import SSHClient, SSHCommandResult, SSHConnectionConfig from tai.session_log import SessionLogger
from tai.ssh_client import SSHClient, SSHCommandResult, SSHConnectionConfig, SSHSession
app = typer.Typer(no_args_is_help=True, add_completion=False) app = typer.Typer(no_args_is_help=True, add_completion=False)
console = Console() console = Console()
@@ -66,6 +71,13 @@ def run(
help="Send collected diagnostics to AI for analysis.", help="Send collected diagnostics to AI for analysis.",
), ),
] = False, ] = False,
interactive: Annotated[
bool,
typer.Option(
"--interactive/--no-interactive",
help="Start interactive follow-up mode (/collect, /analyze, /quit).",
),
] = False,
ai_host: Annotated[ ai_host: Annotated[
str, str,
typer.Option("--ai-host", help="OpenAI-compatible AI backend URL."), typer.Option("--ai-host", help="OpenAI-compatible AI backend URL."),
@@ -78,6 +90,13 @@ def run(
str, str,
typer.Option("--ai-key", help="API key for the AI backend (not needed for Ollama)."), typer.Option("--ai-key", help="API key for the AI backend (not needed for Ollama)."),
] = "ollama", ] = "ollama",
log_file: Annotated[
str | None,
typer.Option(
"--log-file",
help="Optional JSONL file path to log AI and session output.",
),
] = None,
) -> None: ) -> None:
"""Start an interactive troubleshooting session scaffold.""" """Start an interactive troubleshooting session scaffold."""
try: try:
@@ -103,22 +122,34 @@ def run(
) )
summary = SSHClient(config).summary() summary = SSHClient(config).summary()
console.print("[bold green]tai[/bold green]") console.print(Rule("[bold green]tai[/bold green]", style="green"))
console.print(f"Issue: {req.issue}") console.print(f" [bold]Issue:[/bold] {req.issue}")
console.print(f"SSH: {summary}") console.print(f" [bold]SSH:[/bold] {summary}")
if req.target_paths: if req.target_paths:
console.print(f"Paths: {', '.join(str(p) for p in req.target_paths)}") console.print(f" [bold]Paths:[/bold] {', '.join(str(p) for p in req.target_paths)}")
console.print()
if not (probe or collect or analyze): if not (probe or collect or analyze or interactive):
return # nothing SSH-related requested return # nothing SSH-related requested
ai_config = AIConfig(host=ai_host, model=model, api_key=ai_key) ai_config = AIConfig(host=ai_host, model=model, api_key=ai_key)
if analyze: logger = SessionLogger.create(log_file) if log_file else None
if analyze or interactive:
console.print(f"[cyan]AI:[/cyan] {AIClient(ai_config).summary()}") console.print(f"[cyan]AI:[/cyan] {AIClient(ai_config).summary()}")
try: try:
asyncio.run(_async_main(config, req, probe=probe, collect=collect, analyze=analyze, asyncio.run(
ai_config=ai_config)) _async_main(
config,
req,
probe=probe,
collect=collect,
analyze=analyze,
interactive=interactive,
ai_config=ai_config,
logger=logger,
)
)
except typer.Exit: except typer.Exit:
raise raise
except TimeoutError as exc: except TimeoutError as exc:
@@ -136,14 +167,38 @@ async def _async_main(
probe: bool, probe: bool,
collect: bool, collect: bool,
analyze: bool, analyze: bool,
interactive: bool,
ai_config: AIConfig, ai_config: AIConfig,
logger: SessionLogger | None,
) -> None: ) -> None:
"""Open a single SSH session and run probe / collection / analysis through it.""" """Open a single SSH session and run probe / collection / analysis through it."""
client = SSHClient(config) client = SSHClient(config)
if logger is not None:
logger.log_event(
"session_start",
{
"host": req.host,
"port": req.port,
"issue": req.issue,
"probe": probe,
"collect": collect,
"analyze": analyze,
"interactive": interactive,
},
)
async with client.connect() as session: async with client.connect() as session:
if probe: if probe:
result = await session.probe() result = await session.probe()
_handle_probe_result(result) _handle_probe_result(result)
if logger is not None:
logger.log_event(
"probe_result",
{
"exit_code": result.exit_code,
"stdout": result.stdout,
"stderr": result.stderr,
},
)
report: CollectionReport | None = None report: CollectionReport | None = None
if collect or analyze: if collect or analyze:
@@ -151,38 +206,177 @@ async def _async_main(
console.print(f"[cyan]Collecting diagnostics:[/cyan] {len(plan)} commands") console.print(f"[cyan]Collecting diagnostics:[/cyan] {len(plan)} commands")
report = await collect_from_plan(session, plan) report = await collect_from_plan(session, plan)
_handle_collection_report(report) _handle_collection_report(report)
if logger is not None:
logger.log_event(
"collection_summary",
{
"total": report.total,
"failed": report.failed,
},
)
if analyze and report is not None: if analyze and report is not None:
_run_analysis(ai_config, req.issue, report) _run_analysis(ai_config, req.issue, report, logger=logger)
if interactive:
await _interactive_loop(session, req, ai_config, report, logger=logger)
async def _interactive_loop(
session: SSHSession,
req: TroubleshootRequest,
ai_config: AIConfig,
report: CollectionReport | None,
logger: SessionLogger | None,
) -> None:
"""Run a follow-up loop for collecting and conversational analysis."""
console.print(
Panel(
"Ask questions directly, or use [bold]/collect[/bold], "
"[bold]/analyze[/bold], [bold]/help[/bold], [bold]/quit[/bold]",
title="[bold cyan]Interactive Mode[/bold cyan]",
border_style="cyan",
padding=(0, 1),
)
)
prior_questions: list[str] = []
while True:
try:
command = console.input("\n[bold cyan]tai[/bold cyan][dim] >[/dim] ").strip()
except (EOFError, KeyboardInterrupt):
console.print("\n[yellow]Exiting interactive mode.[/yellow]")
if logger is not None:
logger.log_event("interactive_exit", {"reason": "signal_or_eof"})
return
if not command:
continue
if command in {"/quit", "/exit"}:
console.print("[green]Bye.[/green]")
if logger is not None:
logger.log_event("interactive_exit", {"reason": "user_quit"})
return
if command == "/help":
console.print(
Panel(
"[bold]/collect[/bold] — re-run diagnostics\n"
"[bold]/analyze[/bold] — re-analyze current diagnostics\n"
"[bold]/help[/bold] — show this message\n"
"[bold]/quit[/bold] — end session\n"
"[dim]Anything else is sent directly to the AI as a question.[/dim]",
title="[bold]Commands[/bold]",
border_style="dim",
padding=(0, 1),
)
)
continue
if command == "/collect":
plan = plan_from_request(req)
console.print(f"[cyan]Collecting diagnostics:[/cyan] {len(plan)} commands")
report = await collect_from_plan(session, plan)
_handle_collection_report(report)
if logger is not None:
logger.log_event(
"collection_summary",
{
"total": report.total,
"failed": report.failed,
},
)
continue
if command == "/analyze":
if report is None:
plan = plan_from_request(req)
console.print(f"[cyan]Collecting diagnostics:[/cyan] {len(plan)} commands")
report = await collect_from_plan(session, plan)
_handle_collection_report(report)
if report is None:
console.print("[red]No diagnostics available to analyze.[/red]")
continue
_run_followup_analysis(
ai_config,
req.issue,
report,
"Provide an updated diagnosis from the current diagnostics.",
prior_questions,
logger=logger,
)
prior_questions.append("/analyze")
if logger is not None:
logger.log_event("interactive_followup", {"question": "/analyze"})
continue
if report is None:
plan = plan_from_request(req)
console.print(f"[cyan]Collecting diagnostics:[/cyan] {len(plan)} commands")
report = await collect_from_plan(session, plan)
_handle_collection_report(report)
if report is None:
console.print("[red]No diagnostics available to analyze.[/red]")
continue
_run_followup_analysis(
ai_config,
req.issue,
report,
command,
prior_questions,
logger=logger,
)
prior_questions.append(command)
if logger is not None:
logger.log_event("interactive_followup", {"question": command})
def _handle_probe_result(result: SSHCommandResult) -> None: def _handle_probe_result(result: SSHCommandResult) -> None:
"""Handle and render probe output for success or failure.""" """Handle and render probe output for success or failure."""
console.print("[cyan]Running SSH probe:[/cyan] uname -a") console.print("[dim]▶ SSH probe:[/dim] uname -a")
if result.exit_code != 0: if result.exit_code != 0:
details = result.stderr or result.stdout or "no error output from ssh" details = result.stderr or result.stdout or "no error output from ssh"
console.print(f"[red]Probe failed (exit {result.exit_code}):[/red] {details}") console.print(f"[bold red]Probe failed[/bold red] (exit {result.exit_code}): {details}")
raise typer.Exit(code=1) raise typer.Exit(code=1)
output = result.stdout or "(no output)" output = result.stdout or "(no output)"
console.print("[bold green]Probe succeeded.[/bold green]") console.print("[bold green]Probe succeeded.[/bold green]")
console.print(f"Remote: {output}") console.print(f" [dim]{output}[/dim]")
def _handle_collection_report(report: CollectionReport) -> None: def _handle_collection_report(report: CollectionReport) -> None:
"""Render collected command status and truncation hints.""" """Render collected command status and truncation hints."""
console.print( failed_label = (
f"[bold]Collection complete:[/bold] {report.total} commands, {report.failed} failed" f"[red]{report.failed} failed[/red]" if report.failed else "[green]0 failed[/green]"
) )
console.print(f"[bold]Collection complete:[/bold] {report.total} commands, {failed_label}")
for item in report.items: for item in report.items:
status = "ok" if item.result.exit_code == 0 else f"exit {item.result.exit_code}"
truncated = item.result.stdout_truncated or item.result.stderr_truncated truncated = item.result.stdout_truncated or item.result.stderr_truncated
trunc = " (truncated)" if truncated else "" trunc_label = " [dim](truncated)[/dim]" if truncated else ""
console.print(f"- {item.name}: {status}{trunc}") if item.result.exit_code == 0:
console.print(f" [green]✓[/green] [dim]{item.name}[/dim]{trunc_label}")
else:
console.print(
f" [red]✗[/red] {item.name} "
f"[red](exit {item.result.exit_code})[/red]{trunc_label}"
)
def _run_analysis(ai_config: AIConfig, issue: str, report: CollectionReport) -> None: def _run_analysis(
ai_config: AIConfig,
issue: str,
report: CollectionReport,
*,
logger: SessionLogger | None,
) -> None:
"""Send collected data to the AI and stream the analysis to stdout.""" """Send collected data to the AI and stream the analysis to stdout."""
console.print("[cyan]Analyzing...[/cyan]\n") console.print()
console.print(Rule("[bold cyan]Analysis[/bold cyan]", style="cyan"))
console.print()
ai = AIClient(ai_config) ai = AIClient(ai_config)
system_prompt = build_system_prompt() system_prompt = build_system_prompt()
user_message = build_user_message(issue, report) user_message = build_user_message(issue, report)
@@ -190,9 +384,78 @@ def _run_analysis(ai_config: AIConfig, issue: str, report: CollectionReport) ->
chunks: list[str] = [] chunks: list[str] = []
for chunk in ai.stream(system_prompt, user_message): for chunk in ai.stream(system_prompt, user_message):
chunks.append(chunk) chunks.append(chunk)
console.print(Markdown("".join(chunks))) response = "".join(chunks)
console.print(Markdown(response))
warnings = validate_ai_response(response)
for item in warnings:
warn_text = Text()
warn_text.append("⚠ Guardrail: ", style="bold yellow")
warn_text.append(item, style="yellow")
console.print(warn_text)
if logger is not None:
logger.log_event(
"analysis_response",
{
"issue": issue,
"response": response,
"guardrail_warnings": warnings,
},
)
except Exception as exc: # noqa: BLE001 except Exception as exc: # noqa: BLE001
console.print(f"[red]AI analysis failed:[/red] {exc}") console.print(f"[red]AI analysis failed:[/red] {exc}")
if logger is not None:
logger.log_event("analysis_error", {"error": str(exc)})
raise typer.Exit(code=1) from exc
def _run_followup_analysis(
ai_config: AIConfig,
issue: str,
report: CollectionReport,
question: str,
prior_questions: list[str],
*,
logger: SessionLogger | None,
) -> str:
"""Run grounded follow-up analysis re-anchored to current diagnostics."""
console.print()
console.print(Rule("[bold cyan]AI Response[/bold cyan]", style="cyan"))
console.print()
ai = AIClient(ai_config)
system_prompt = build_system_prompt()
user_message = build_followup_message(issue, report, question, prior_questions)
try:
chunks: list[str] = []
for chunk in ai.stream(system_prompt, user_message):
chunks.append(chunk)
response = "".join(chunks)
console.print(Markdown(response))
console.print(Rule(style="dim"))
warnings = validate_ai_response(response)
for item in warnings:
warn_text = Text()
warn_text.append("⚠ Guardrail: ", style="bold yellow")
warn_text.append(item, style="yellow")
console.print(warn_text)
if logger is not None:
logger.log_event(
"analysis_response",
{
"last_user_message": question,
"response": response,
"guardrail_warnings": warnings,
},
)
return response
except Exception as exc: # noqa: BLE001
console.print(f"[red]AI analysis failed:[/red] {exc}")
if logger is not None:
logger.log_event("analysis_error", {"error": str(exc), "question": question})
raise typer.Exit(code=1) from exc raise typer.Exit(code=1) from exc

View File

@@ -15,12 +15,15 @@ Your job:
Important rules: Important rules:
- Only draw conclusions from data that is actually present. Do not speculate or invent evidence. - Only draw conclusions from data that is actually present. Do not speculate or invent evidence.
- For every root-cause claim, quote at least one exact snippet from collected output in backticks.
- If a command shows "could not be executed (SSH error)" it means the remote host blocked or - If a command shows "could not be executed (SSH error)" it means the remote host blocked or
rejected that specific command — it is not evidence about the service or system state. rejected that specific command — it is not evidence about the service or system state.
- If there is not enough data to diagnose the issue, say so plainly and list exactly what - If there is not enough data to diagnose the issue, say so plainly and list exactly what
additional commands or log files would be needed. additional commands or log files would be needed.
- Keep the response short. Skip sections that have nothing useful to say. - Keep the response short. Skip sections that have nothing useful to say.
- Never suggest commands that modify the system unless explicitly asked. - Never suggest commands that modify the system unless explicitly asked.
- Default to read-only verification steps. Do not suggest restarting services or editing configs
unless the user explicitly asks for remediation actions.
- Format with clear sections: **Root Cause**, **Evidence**, **Recommended Actions**. - Format with clear sections: **Root Cause**, **Evidence**, **Recommended Actions**.
""" """
@@ -72,3 +75,27 @@ def build_user_message(issue: str, report: CollectionReport) -> str:
) )
return "\n".join(lines) return "\n".join(lines)
def build_followup_message(
issue: str,
report: CollectionReport,
question: str,
prior_questions: list[str],
) -> str:
"""Build a grounded follow-up message that re-anchors to diagnostics each turn."""
base = build_user_message(issue, report)
lines: list[str] = [base, "## Follow-up"]
if prior_questions:
lines.append("\nRecent user follow-up questions:")
for idx, item in enumerate(prior_questions[-5:], start=1):
lines.append(f"{idx}. {item}")
lines.append("\nCurrent follow-up question:")
lines.append(question)
lines.append(
"\nAnswer strictly from the collected diagnostics above. "
"If evidence is insufficient, explicitly say so."
)
return "\n".join(lines)

34
src/tai/session_log.py Normal file
View File

@@ -0,0 +1,34 @@
"""Structured session logging helpers for troubleshooting runs."""
from __future__ import annotations
import json
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
@dataclass(slots=True)
class SessionLogger:
"""Append JSONL events to a log file for post-run analysis."""
path: Path
@classmethod
def create(cls, file_path: str) -> SessionLogger:
"""Create a logger for *file_path*, ensuring parent directories exist."""
path = Path(file_path).expanduser()
path.parent.mkdir(parents=True, exist_ok=True)
return cls(path=path)
def log_event(self, event: str, payload: dict[str, Any]) -> None:
"""Write one timestamped event row to the JSONL log."""
row = {
"ts": datetime.now(UTC).isoformat(),
"event": event,
"payload": payload,
}
with self.path.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(row, ensure_ascii=True))
handle.write("\n")

View File

@@ -4,7 +4,7 @@ from unittest.mock import MagicMock, patch
from tai.ai_client import DEFAULT_AI_HOST, DEFAULT_MODEL, AIClient, AIConfig from tai.ai_client import DEFAULT_AI_HOST, DEFAULT_MODEL, AIClient, AIConfig
from tai.collectors import CollectedItem, CollectionReport from tai.collectors import CollectedItem, CollectionReport
from tai.prompt_builder import build_system_prompt, build_user_message from tai.prompt_builder import build_followup_message, build_system_prompt, build_user_message
from tai.ssh_client import SSHCommandResult from tai.ssh_client import SSHCommandResult
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -116,6 +116,34 @@ def test_stream_yields_chunks() -> None:
assert result == ["Root ", "cause ", "found."] assert result == ["Root ", "cause ", "found."]
def test_stream_messages_yields_chunks() -> None:
config = AIConfig()
client = AIClient(config)
def _make_chunk(text: str | None) -> MagicMock:
delta = MagicMock()
delta.content = text
choice = MagicMock()
choice.delta = delta
chunk = MagicMock()
chunk.choices = [choice]
return chunk
mock_chunks = [_make_chunk("A"), _make_chunk(None), _make_chunk("B")]
with patch.object(client._client.chat.completions, "create", return_value=iter(mock_chunks)):
result = list(
client.stream_messages(
[
{"role": "system", "content": "sys"},
{"role": "user", "content": "question"},
]
)
)
assert result == ["A", "B"]
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# prompt_builder # prompt_builder
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -190,3 +218,16 @@ def test_build_user_message_handles_no_output() -> None:
report = _make_report([("empty", "cat /nonexistent", 1, "", "")]) report = _make_report([("empty", "cat /nonexistent", 1, "", "")])
msg = build_user_message("test", report) msg = build_user_message("test", report)
assert "no output" in msg assert "no output" in msg
def test_build_followup_message_includes_question_context() -> None:
report = _make_report([("kernel", "uname -a", 0, "Linux web01", "")])
msg = build_followup_message(
"nginx is failing",
report,
"what should I check next?",
["is nginx running?", "show me logs"],
)
assert "Current follow-up question" in msg
assert "what should I check next?" in msg
assert "Recent user follow-up questions" in msg

View File

@@ -0,0 +1,24 @@
"""Tests for AI response guardrails."""
from tai.ai_guardrails import validate_ai_response
def test_validate_ai_response_flags_missing_evidence_and_quotes() -> None:
warnings = validate_ai_response("Root cause only, no structure.")
assert any("Evidence section" in item for item in warnings)
assert any("quoted evidence" in item for item in warnings)
def test_validate_ai_response_flags_risky_actions() -> None:
text = "Evidence: `PasswordAuthentication no`\nRun systemctl restart sshd now."
warnings = validate_ai_response(text)
assert any("modifying actions" in item for item in warnings)
def test_validate_ai_response_allows_grounded_read_only_answer() -> None:
text = (
"Evidence: `PasswordAuthentication no`\n"
"Recommended Actions: run `journalctl -u sshd -n 200 --no-pager`"
)
warnings = validate_ai_response(text)
assert not warnings

View File

@@ -137,5 +137,96 @@ def test_collect_success_prints_summary(monkeypatch) -> None: # type: ignore[no
assert result.exit_code == 0 assert result.exit_code == 0
assert "Collection complete" in result.stdout assert "Collection complete" in result.stdout
assert "kernel: ok" in result.stdout assert "kernel" in result.stdout
assert "journal: ok (truncated)" in result.stdout assert "journal" in result.stdout
assert "truncated" in result.stdout
def test_interactive_collect_then_quit(monkeypatch) -> None: # type: ignore[no-untyped-def]
_mock_session(monkeypatch)
async def fake_collect_from_plan(_session, _plan) -> CollectionReport: # type: ignore[no-untyped-def]
return CollectionReport(
host="ssh.archflux.net",
items=[
CollectedItem(
name="kernel",
result=SSHCommandResult(
command="uname -a",
exit_code=0,
stdout="Linux test",
stderr="",
),
),
],
)
commands = iter(["/collect", "/quit"])
monkeypatch.setattr("tai.cli.collect_from_plan", fake_collect_from_plan)
monkeypatch.setattr("tai.cli.console.input", lambda _prompt: next(commands))
runner = CliRunner()
result = runner.invoke(
app,
[
"apache failed",
"--host",
"ssh.archflux.net",
"--port",
"5566",
"--no-probe",
"--interactive",
],
)
assert result.exit_code == 0
assert "ask questions directly" in result.stdout.lower()
assert "Collection complete" in result.stdout
assert "Bye." in result.stdout
def test_interactive_unknown_command_prints_hint(monkeypatch) -> None: # type: ignore[no-untyped-def]
_mock_session(monkeypatch)
async def fake_collect_from_plan(_session, _plan) -> CollectionReport: # type: ignore[no-untyped-def]
return CollectionReport(
host="ssh.archflux.net",
items=[
CollectedItem(
name="kernel",
result=SSHCommandResult(
command="uname -a",
exit_code=0,
stdout="Linux test",
stderr="",
),
),
],
)
commands = iter(["what should I check next?", "/quit"])
monkeypatch.setattr("tai.cli.collect_from_plan", fake_collect_from_plan)
monkeypatch.setattr(
"tai.cli.AIClient.stream",
lambda *_args, **_kwargs: iter(["Check logs."]),
)
monkeypatch.setattr("tai.cli.console.input", lambda _prompt: next(commands))
runner = CliRunner()
result = runner.invoke(
app,
[
"apache failed",
"--host",
"ssh.archflux.net",
"--port",
"5566",
"--no-probe",
"--interactive",
],
)
assert result.exit_code == 0
assert "AI Response" in result.stdout
assert "Check logs." in result.stdout

22
tests/test_session_log.py Normal file
View File

@@ -0,0 +1,22 @@
"""Tests for structured session logging."""
from __future__ import annotations
import json
from tai.session_log import SessionLogger
def test_session_logger_writes_jsonl_row(tmp_path) -> None: # type: ignore[no-untyped-def]
log_path = tmp_path / "logs" / "session.jsonl"
logger = SessionLogger.create(str(log_path))
logger.log_event("analysis_response", {"response": "Root cause is X"})
lines = log_path.read_text(encoding="utf-8").splitlines()
assert len(lines) == 1
row = json.loads(lines[0])
assert row["event"] == "analysis_response"
assert row["payload"]["response"] == "Root cause is X"
assert "ts" in row