20 Commits

Author SHA1 Message Date
e49670a664 docs(roadmap): add Phase 6 RAG & Knowledge Layer plan
Some checks failed
CI / test (push) Failing after 15s
- Three-tier RAG architecture: diagnostic chunks, runbook KB, session memory
- Technology decisions table with options and recommendations
- Per-tier: approach, new modules, changes to existing code, companion features
- Implementation order and effort estimates
- New dependencies and optional pyproject.toml group
- Decisions log entries for RAG choices pending confirmation
2026-05-04 18:23:33 +02:00
4870bd3bfe ci: rename release.yml to tag.yml, fix trigger to match non-v tags
All checks were successful
CI / test (push) Successful in 20s
Tag Build / build (push) Successful in 8m33s
- Trigger was 'v*' but tags are bare semver (0.3.0) — fix to '[0-9]*'
- Rename to tag.yml to reflect tag-driven build purpose
- Add zip to apt dependencies (required for release zip step)
2026-05-04 06:48:34 +02:00
5798d87993 Merge branch 'feature/interactive-ux-improvements'
All checks were successful
CI / test (push) Successful in 20s
2026-05-04 06:43:33 +02:00
2c738579bd feat(ux): improve interactive mode readability and input visibility
All checks were successful
CI / test (push) Successful in 19s
- Replace plain 'tai>' prompt with styled console.input() bold cyan prompt
- Wrap interactive mode entry in a Rich Panel with border
- Frame each AI response with Rule dividers (──── AI Response ────)
- Style guardrail warnings with ⚠ prefix and bold yellow
- Improve /help output with formatted Panel showing all commands
- Style collection report: ✓/✗ per item with color, truncation in dim
- Style probe output: ✓/✗ with green/red, host info in dim
- Add Rule header divider on session start
2026-05-04 06:37:50 +02:00
27feeed8bf feat: add combined release zip with binary and deb package
All checks were successful
CI / test (push) Successful in 20s
2026-05-04 06:24:19 +02:00
96178c1438 chore: remove logs from tracking, add requirements.txt, improve .gitignore
All checks were successful
CI / test (push) Successful in 20s
2026-05-04 06:21:40 +02:00
021e95b04f test
All checks were successful
CI / test (push) Successful in 19s
2026-05-04 06:16:30 +02:00
6aa59bdd6b fix: strip v prefix from tag when generating deb version
All checks were successful
CI / test (push) Successful in 20s
2026-05-04 06:13:53 +02:00
530be62185 feat(cli): add response guardrails and grounded followup re-anchoring 2026-05-04 06:11:55 +02:00
2662d1b253 feat(cli): add structured JSONL session logging for AI output 2026-05-04 06:03:39 +02:00
fdcde37e46 feat(cli): support conversational AI follow-ups in interactive mode 2026-05-04 05:58:26 +02:00
67a0cb3e69 feat(cli): add interactive follow-up loop with slash commands 2026-05-04 05:54:15 +02:00
d092b508c3 chore: set deb version equal to tag
Some checks failed
Release / build (push) Failing after 8m20s
CI / test (push) Successful in 19s
2026-05-04 05:48:46 +02:00
7e1cac8bd1 feat: build and upload deb package in release workflow
Some checks failed
CI / test (push) Successful in 20s
Release / build (push) Has been cancelled
2026-05-04 05:48:10 +02:00
5fea8fe096 fix: align release Python setup with CI fallback logic
All checks were successful
CI / test (push) Successful in 19s
Release / build (push) Successful in 8m22s
2026-05-04 05:26:14 +02:00
05adbf7cc9 run
All checks were successful
CI / test (push) Successful in 19s
2026-05-04 05:24:14 +02:00
69d2bdd661 fix: use python3.12 explicitly for venv on Ubuntu Noble runner
Some checks failed
CI / test (push) Successful in 19s
Release / build (push) Failing after 3s
2026-05-04 05:19:43 +02:00
f88048762e fix: remove python3.11-venv, runner uses python3.12 on Ubuntu Noble
All checks were successful
CI / test (push) Successful in 19s
2026-05-04 05:17:37 +02:00
60f42c7754 fix: install python3.11-venv explicitly in release workflow
All checks were successful
CI / test (push) Successful in 19s
2026-05-04 05:13:18 +02:00
33dff26d2b fix: always install python3-venv in release workflow
All checks were successful
CI / test (push) Successful in 19s
2026-05-04 05:07:39 +02:00
15 changed files with 936 additions and 144 deletions

View File

@@ -59,13 +59,9 @@ jobs:
- name: Ensure Python and pip are available - name: Ensure Python and pip are available
run: | run: |
if command -v python3 >/dev/null 2>&1 && python3 -m pip --version >/dev/null 2>&1; then
python3 --version
exit 0
fi
if command -v apt-get >/dev/null 2>&1; then if command -v apt-get >/dev/null 2>&1; then
apt-get update apt-get update
apt-get install -y python3.12 python3.12-venv python3-pip || \
apt-get install -y python3 python3-pip python3-venv apt-get install -y python3 python3-pip python3-venv
elif command -v dnf >/dev/null 2>&1; then elif command -v dnf >/dev/null 2>&1; then
dnf install -y python3 python3-pip dnf install -y python3 python3-pip
@@ -76,11 +72,11 @@ jobs:
exit 1 exit 1
fi fi
python3 --version python3.12 --version || python3 --version
- name: Install package and dev dependencies - name: Install package and dev dependencies
run: | run: |
python3 -m venv .venv python3.12 -m venv .venv 2>/dev/null || python3 -m venv .venv
. .venv/bin/activate . .venv/bin/activate
python -m pip install --upgrade pip python -m pip install --upgrade pip
python -m pip install -e .[dev] python -m pip install -e .[dev]

View File

@@ -1,110 +0,0 @@
name: Release
on:
push:
tags:
- "v*"
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Ensure git is available
run: |
if command -v git >/dev/null 2>&1; then
git --version
exit 0
fi
if command -v apt-get >/dev/null 2>&1; then
apt-get update
apt-get install -y git
elif command -v dnf >/dev/null 2>&1; then
dnf install -y git
elif command -v yum >/dev/null 2>&1; then
yum install -y git
else
echo "No supported package manager found to install git."
exit 1
fi
- name: Checkout source (native git)
env:
CI_GIT_TOKEN: ${{ secrets.CI_GIT_TOKEN }}
run: |
if [ -z "${CI_GIT_TOKEN:-}" ]; then
echo "Missing secret CI_GIT_TOKEN. Add it in repository Actions secrets."
exit 1
fi
auth_server="${GITHUB_SERVER_URL#https://}"
auth_server="${auth_server#http://}"
remote_url="https://oauth2:${CI_GIT_TOKEN}@${auth_server}/${GITHUB_REPOSITORY}.git"
if [ -n "${GITHUB_WORKSPACE:-}" ]; then
cd "$GITHUB_WORKSPACE"
fi
if [ ! -d .git ]; then
git init
fi
git remote remove origin >/dev/null 2>&1 || true
git remote add origin "$remote_url"
# Fetch the tag by SHA so we get the exact tagged commit
git fetch --depth 1 origin "$GITHUB_SHA"
git checkout --force FETCH_HEAD
- name: Ensure Python and build dependencies are available
run: |
if ! command -v python3 >/dev/null 2>&1; then
if command -v apt-get >/dev/null 2>&1; then
apt-get update
apt-get install -y python3 python3-pip python3-venv patchelf ccache
elif command -v dnf >/dev/null 2>&1; then
dnf install -y python3 python3-pip patchelf ccache
fi
fi
# patchelf is required by Nuitka for standalone Linux binaries
command -v patchelf >/dev/null 2>&1 || {
apt-get update && apt-get install -y patchelf
}
python3 --version
- name: Set up venv and install package + build deps
run: |
python3 -m venv .venv
. .venv/bin/activate
python -m pip install --upgrade pip
python -m pip install -e ".[build]"
- name: Derive version from tag
id: version
run: echo "tag=${GITHUB_REF_NAME}" >> "$GITHUB_OUTPUT"
- name: Build standalone binary with Nuitka
run: |
. .venv/bin/activate
python -m nuitka \
--standalone \
--onefile \
--output-filename=tai \
--output-dir=dist \
--assume-yes-for-downloads \
--include-package=tai \
src/tai/cli.py
- name: Smoke-test the binary
run: dist/tai --help
- name: Upload binary artifact
uses: actions/upload-artifact@v3
with:
name: tai-linux-amd64-${{ steps.version.outputs.tag }}
path: dist/tai
if-no-files-found: error
retention-days: 90

166
.gitea/workflows/tag.yml Normal file
View File

@@ -0,0 +1,166 @@
name: Tag Build
on:
push:
tags:
- "[0-9]*"
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Ensure git is available
run: |
if command -v git >/dev/null 2>&1; then
git --version
exit 0
fi
if command -v apt-get >/dev/null 2>&1; then
apt-get update
apt-get install -y git
elif command -v dnf >/dev/null 2>&1; then
dnf install -y git
elif command -v yum >/dev/null 2>&1; then
yum install -y git
else
echo "No supported package manager found to install git."
exit 1
fi
- name: Checkout source (native git)
env:
CI_GIT_TOKEN: ${{ secrets.CI_GIT_TOKEN }}
run: |
if [ -z "${CI_GIT_TOKEN:-}" ]; then
echo "Missing secret CI_GIT_TOKEN. Add it in repository Actions secrets."
exit 1
fi
auth_server="${GITHUB_SERVER_URL#https://}"
auth_server="${auth_server#http://}"
remote_url="https://oauth2:${CI_GIT_TOKEN}@${auth_server}/${GITHUB_REPOSITORY}.git"
if [ -n "${GITHUB_WORKSPACE:-}" ]; then
cd "$GITHUB_WORKSPACE"
fi
if [ ! -d .git ]; then
git init
fi
git remote remove origin >/dev/null 2>&1 || true
git remote add origin "$remote_url"
# Fetch the tag by SHA so we get the exact tagged commit
git fetch --depth 1 origin "$GITHUB_SHA"
git checkout --force FETCH_HEAD
- name: Ensure Python and build dependencies are available
run: |
if command -v apt-get >/dev/null 2>&1; then
apt-get update
apt-get install -y python3.12 python3.12-venv python3-pip patchelf ccache zip || \
apt-get install -y python3 python3-pip python3-venv patchelf ccache zip
elif command -v dnf >/dev/null 2>&1; then
dnf install -y python3 python3-pip python3-devel patchelf ccache
elif command -v yum >/dev/null 2>&1; then
yum install -y python3 python3-pip python3-devel patchelf ccache
else
echo "No supported package manager found to install Python/build deps."
exit 1
fi
python3.12 --version || python3 --version
- name: Set up venv and install package + build deps
run: |
python3.12 -m venv .venv 2>/dev/null || python3 -m venv .venv
. .venv/bin/activate
python -m pip install --upgrade pip
python -m pip install -e ".[build]"
- name: Derive version from tag
id: version
run: |
tag="${GITHUB_REF_NAME}"
deb_version="${tag#v}" # Remove leading 'v' if present
echo "tag=${tag}" >> "$GITHUB_OUTPUT"
echo "deb_version=${deb_version}" >> "$GITHUB_OUTPUT"
- name: Build standalone binary with Nuitka
run: |
. .venv/bin/activate
python -m nuitka \
--standalone \
--onefile \
--output-filename=tai \
--output-dir=dist \
--assume-yes-for-downloads \
--include-package=tai \
src/tai/cli.py
- name: Smoke-test the binary
run: dist/tai --help
- name: Build .deb package
run: |
pkg_root="pkgroot"
pkg_name="tai"
deb_version="${{ steps.version.outputs.deb_version }}"
arch="amd64"
out_dir="dist"
deb_dir="${pkg_root}/${pkg_name}_${deb_version}_${arch}"
rm -rf "${pkg_root}"
mkdir -p "${deb_dir}/DEBIAN"
mkdir -p "${deb_dir}/usr/bin"
install -m 0755 dist/tai "${deb_dir}/usr/bin/tai"
cat > "${deb_dir}/DEBIAN/control" <<EOF
Package: ${pkg_name}
Version: ${deb_version}
Section: admin
Priority: optional
Architecture: ${arch}
Maintainer: tai maintainers <noreply@example.com>
Description: tai Linux troubleshooting assistant
EOF
dpkg-deb --build "${deb_dir}" "${out_dir}/${pkg_name}_${deb_version}_${arch}.deb"
- name: Create release zip with binary and deb
run: |
cd dist
deb_version="${{ steps.version.outputs.deb_version }}"
zip_name="tai-${deb_version}-linux-amd64.zip"
zip "${zip_name}" \
tai \
"tai_${deb_version}_amd64.deb"
cd ..
- name: Upload binary artifact
uses: actions/upload-artifact@v3
with:
name: tai-linux-amd64-${{ steps.version.outputs.tag }}
path: dist/tai
if-no-files-found: error
retention-days: 90
- name: Upload deb artifact
uses: actions/upload-artifact@v3
with:
name: tai-deb-amd64-${{ steps.version.outputs.tag }}
path: dist/tai_${{ steps.version.outputs.deb_version }}_amd64.deb
if-no-files-found: error
retention-days: 90
- name: Upload combined release zip
uses: actions/upload-artifact@v3
with:
name: tai-release-${{ steps.version.outputs.tag }}
path: dist/tai-${{ steps.version.outputs.deb_version }}-linux-amd64.zip
if-no-files-found: error
retention-days: 90

3
.gitignore vendored
View File

@@ -24,3 +24,6 @@ htmlcov/
# IDE # IDE
.vscode/ .vscode/
# Logs and session files
logs/

View File

@@ -117,6 +117,170 @@ Prepare for broader use.
______________________________________________________________________ ______________________________________________________________________
## Phase 6 — RAG & Knowledge Layer
Introduce Retrieval-Augmented Generation to ground AI responses in evidence rather than
model weights alone. Three tiers of increasing capability, each buildable independently.
### Goals
- Eliminate prompt flooding on hosts with large log output
- Ground recommendations in version-controlled runbooks, not model improvisation
- Build compounding institutional memory from past troubleshooting sessions
- Keep all data local — no embeddings or session content leaves the network
---
### Technology Decisions Required
| Decision | Options | Recommendation | Status |
|---|---|---|---|
| Embedding model | `nomic-embed-text`, `mxbai-embed-large`, `all-minilm` | `nomic-embed-text` via Ollama (local, 274MB, strong perf) | ⬜ Pending |
| Vector store — Tier 1 | In-memory numpy cosine, `faiss-cpu` | numpy (zero deps) for session scope | ⬜ Pending |
| Vector store — Tier 2/3 | `chromadb`, `qdrant`, `weaviate`, `pgvector` | `chromadb` (embedded mode, no server needed) or `qdrant` (self-hosted, REST API, production-grade) | ⬜ Pending |
| Chunking strategy | Fixed token, sentence-aware, command-boundary | Command-boundary splitting (natural unit for diagnostics) | ⬜ Pending |
| Hybrid retrieval | Semantic only, BM25 only, hybrid | Hybrid (BM25 keyword + cosine semantic) for best recall | ⬜ Pending |
| Reranking | None, cross-encoder (`ms-marco-MiniLM`), LLM-as-judge | Cross-encoder rerank pass before prompt injection | ⬜ Pending |
| Runbook format | Markdown, YAML, JSON | Markdown (human-editable, version-controllable) | ⬜ Pending |
| Session index storage | Local `~/.tai/`, configurable path | `~/.tai/sessions/` with ChromaDB collection | ⬜ Pending |
---
### Tier 1 — Diagnostic Chunk Retrieval (in-memory, per-session)
**Problem:** Current flow injects all collected output into the prompt as one block.
On busy hosts this floods the context window with irrelevant output, degrading quality.
**Approach:**
- After collection, split each command's output into overlapping token chunks (e.g. 512 tokens, 64 overlap)
- Embed all chunks using `nomic-embed-text` via Ollama embeddings API
- On each question (initial + follow-up), embed the question and retrieve top-k chunks by cosine similarity
- Inject only retrieved chunks into the prompt, not the full dump
**New module:** `src/tai/rag_retriever.py`
- `chunk_report(report) -> list[Chunk]`
- `embed_chunks(chunks) -> list[EmbeddedChunk]`
- `retrieve(question, embedded_chunks, top_k) -> list[Chunk]`
**Changes to existing code:**
- `prompt_builder.py`: accept `retrieved_chunks` instead of full `CollectionReport` for RAG-mode prompts
- `cli.py`: embed report after collection, pass retriever to `_run_analysis` and `_run_followup_analysis`
- `ai_client.py`: add `embed(text) -> list[float]` method using Ollama `/api/embeddings`
**Companion features buildable at same time:**
- `--no-rag` flag to bypass retrieval and use full dump (backwards compat)
- Token budget display: show user how many tokens are being sent vs. saved
- Per-chunk source attribution in AI response (which command produced the evidence)
**Tests:**
- `tests/test_rag_retriever.py`: chunk splitting, cosine similarity ranking, top-k retrieval
- `tests/test_ai.py`: add `test_embed_returns_float_list()`
---
### Tier 2 — Runbook Knowledge Base (persistent, ChromaDB)
**Problem:** AI improvises remediation steps from training data, which may be wrong for
specific environments, distros, or internal conventions.
**Approach:**
- Maintain a version-controlled corpus of Markdown runbooks in `runbooks/` directory
- On first run (or `tai runbooks --sync`), embed all runbooks and persist to ChromaDB collection
- On each analysis, retrieve top-3 relevant runbook chunks alongside diagnostic chunks
- Inject as a separate `## Runbook Context` section in the prompt
**New module:** `src/tai/runbook_store.py`
- `RunbookStore`: wraps ChromaDB collection
- `sync(runbooks_dir) -> int` — embed and upsert all runbooks
- `query(question, top_k) -> list[RunbookChunk]`
**New directory:** `runbooks/`
- `ssh.md`, `nginx.md`, `postgres.md`, `disk.md`, `kernel.md`, etc.
- Each runbook: YAML frontmatter (`service`, `symptoms`, `tags`) + Markdown body
**New CLI command:** `tai runbooks --sync [--path ./runbooks]`
**Changes to existing code:**
- `prompt_builder.py`: add `build_message_with_runbooks(retrieved_chunks, runbook_chunks)`
- `cli.py`: optionally load `RunbookStore`, query it per analysis turn
**Companion features buildable at same time:**
- `tai runbooks --list` — show indexed runbooks and last sync time
- `tai runbooks --add <file>` — index a single runbook
- `/runbooks` slash command in interactive mode — show which runbooks were retrieved
- Runbook citation in AI output: "Based on runbook: `ssh.md#AuthenticationFailures`"
---
### Tier 3 — Session Memory Index (institutional learning)
**Problem:** Every session starts from zero. Repeat incidents on the same host or
same issue type get no benefit from past work.
**Approach:**
- On session end, embed the session summary (issue + root cause + actions) and upsert into a persistent ChromaDB collection (`~/.tai/sessions/`)
- On session start, query for similar past sessions by issue text + hostname
- Inject top-2 past sessions as `## Prior Sessions` context
- Optionally: `/history` command in interactive mode to surface past sessions explicitly
**New module:** `src/tai/session_store.py`
- `SessionStore`: wraps ChromaDB collection at `~/.tai/sessions/`
- `index_session(session_log_path)` — embed and store completed session
- `query_similar(issue, host, top_k) -> list[PastSession]`
**Changes to existing code:**
- `session_log.py`: add `summarise() -> str` method (issue + final AI response)
- `cli.py`: query `SessionStore` at session start, index at session end
**Companion features buildable at same time:**
- `tai history` CLI subcommand — search past sessions by keyword
- `tai history --host <hostname>` — all sessions for a host
- `tai history --export <file>` — export session summaries as Markdown report
- Auto-suggest: "Similar issue found from 2 weeks ago — load context? [y/N]"
---
### Implementation Order
```
Tier 1 (diagnostic chunks) ← Start here. Zero new infra. Immediate prompt quality gain.
Tier 2 (runbook KB) ← After Tier 1. Requires ChromaDB dep + runbook authoring.
Tier 3 (session memory) ← Builds on Tier 2 infrastructure. Minimal extra work.
```
**Estimated effort:**
- Tier 1: 23 days (new module + prompt builder changes + tests)
- Tier 2: 34 days (ChromaDB + runbook authoring + CLI command + tests)
- Tier 3: 12 days (reuses Tier 2 infrastructure)
### New Dependencies
```
# Tier 1 (zero new runtime deps — uses Ollama HTTP API already in use)
# No additions needed
# Tier 2 + 3
chromadb>=0.5,<1.0 # embedded vector store, no separate server
# OR
qdrant-client>=1.9,<2.0 # if self-hosted Qdrant preferred
sentence-transformers>=3.0 # optional: cross-encoder reranking
```
### New pyproject.toml optional group
```toml
[project.optional-dependencies]
rag = [
"chromadb>=0.5,<1.0",
"sentence-transformers>=3.0,<4.0",
]
```
______________________________________________________________________
## Decisions Log ## Decisions Log
| Date | Decision | Outcome | | Date | Decision | Outcome |
@@ -128,3 +292,8 @@ ______________________________________________________________________
| 2026-05-04 | Bastion host support | `--jump-host` flag via SSH native ProxyJump | | 2026-05-04 | Bastion host support | `--jump-host` flag via SSH native ProxyJump |
| 2026-05-04 | SSH config behavior | Use `~/.ssh/config` by default; allow override via `--ignore-ssh-config` | | 2026-05-04 | SSH config behavior | Use `~/.ssh/config` by default; allow override via `--ignore-ssh-config` |
| 2026-05-04 | CLI vs interactive mode | Interactive: REPL for v0.1, `textual` TUI for v0.2+ | | 2026-05-04 | CLI vs interactive mode | Interactive: REPL for v0.1, `textual` TUI for v0.2+ |
| 2026-05-04 | RAG embedding model | `nomic-embed-text` via Ollama (local, air-gapped safe) — ⬜ pending confirmation |
| 2026-05-04 | RAG vector store (Tier 1) | In-memory numpy cosine similarity — zero deps, session-scoped |
| 2026-05-04 | RAG vector store (Tier 2/3) | `chromadb` embedded mode (default) or `qdrant` self-hosted — ⬜ pending confirmation |
| 2026-05-04 | RAG chunking unit | Command-boundary splitting — each collected command = one or more chunks |
| 2026-05-04 | Runbook format | Markdown with YAML frontmatter, version-controlled in `runbooks/` directory |

15
requirements.txt Normal file
View File

@@ -0,0 +1,15 @@
# Core dependencies
typer>=0.12,<1.0
rich>=13.7,<14.0
asyncssh>=2.14,<3.0
openai>=1.30,<2.0
# Development dependencies
pytest>=8.2,<9.0
ruff>=0.5,<1.0
mypy>=1.10,<2.0
mdformat>=0.7,<1.0
yamllint>=1.35,<2.0
# Build dependencies
nuitka>=2.4,<3.0

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
from collections.abc import Iterator from collections.abc import Iterator
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any, cast
from openai import OpenAI from openai import OpenAI
@@ -88,6 +89,20 @@ class AIClient:
if delta: if delta:
yield delta yield delta
def stream_messages(self, messages: list[dict[str, str]]) -> Iterator[str]:
"""Stream a completion from an explicit chat history."""
stream = self._client.chat.completions.create(
model=self._config.model,
max_tokens=self._config.max_tokens,
stream=True,
messages=cast(Any, messages),
)
for chunk in cast(Iterator[Any], stream):
delta = chunk.choices[0].delta.content
if delta:
yield delta
def summary(self) -> str: def summary(self) -> str:
"""Human-readable description of the AI config.""" """Human-readable description of the AI config."""
return f"host={self._config.host} model={self._config.model}" return f"host={self._config.host} model={self._config.model}"

36
src/tai/ai_guardrails.py Normal file
View File

@@ -0,0 +1,36 @@
"""Heuristic checks for AI response quality and safety."""
from __future__ import annotations
import re
_RISKY_ACTION_PATTERNS = [
r"\bsystemctl\s+(restart|stop|start)\b",
r"\b(edit|modify|change)\s+/etc/",
r"\bpasswd\b",
r"\bapt\s+install\b",
r"\bdnf\s+install\b",
r"\byum\s+install\b",
]
def validate_ai_response(response: str) -> list[str]:
"""Return warning messages for potentially unsafe or weakly grounded output."""
warnings: list[str] = []
if "Evidence" not in response:
warnings.append("Response is missing an Evidence section.")
if "`" not in response:
warnings.append("Response does not include quoted evidence snippets.")
lower_response = response.lower()
for pattern in _RISKY_ACTION_PATTERNS:
if re.search(pattern, lower_response):
warnings.append(
"Response suggests potentially modifying actions; "
"prefer read-only verification unless remediation was explicitly requested."
)
break
return warnings

View File

@@ -8,14 +8,19 @@ from typing import Annotated
import typer import typer
from rich.console import Console from rich.console import Console
from rich.markdown import Markdown from rich.markdown import Markdown
from rich.panel import Panel
from rich.rule import Rule
from rich.text import Text
from tai.ai_client import DEFAULT_AI_HOST, DEFAULT_MODEL, AIClient, AIConfig from tai.ai_client import DEFAULT_AI_HOST, DEFAULT_MODEL, AIClient, AIConfig
from tai.ai_guardrails import validate_ai_response
from tai.collectors import CollectionReport, collect_from_plan from tai.collectors import CollectionReport, collect_from_plan
from tai.input_parser import InputValidationError, build_request from tai.input_parser import InputValidationError, build_request
from tai.models import TroubleshootRequest from tai.models import TroubleshootRequest
from tai.plan import plan_from_request from tai.plan import plan_from_request
from tai.prompt_builder import build_system_prompt, build_user_message from tai.prompt_builder import build_followup_message, build_system_prompt, build_user_message
from tai.ssh_client import SSHClient, SSHCommandResult, SSHConnectionConfig from tai.session_log import SessionLogger
from tai.ssh_client import SSHClient, SSHCommandResult, SSHConnectionConfig, SSHSession
app = typer.Typer(no_args_is_help=True, add_completion=False) app = typer.Typer(no_args_is_help=True, add_completion=False)
console = Console() console = Console()
@@ -66,6 +71,13 @@ def run(
help="Send collected diagnostics to AI for analysis.", help="Send collected diagnostics to AI for analysis.",
), ),
] = False, ] = False,
interactive: Annotated[
bool,
typer.Option(
"--interactive/--no-interactive",
help="Start interactive follow-up mode (/collect, /analyze, /quit).",
),
] = False,
ai_host: Annotated[ ai_host: Annotated[
str, str,
typer.Option("--ai-host", help="OpenAI-compatible AI backend URL."), typer.Option("--ai-host", help="OpenAI-compatible AI backend URL."),
@@ -78,6 +90,13 @@ def run(
str, str,
typer.Option("--ai-key", help="API key for the AI backend (not needed for Ollama)."), typer.Option("--ai-key", help="API key for the AI backend (not needed for Ollama)."),
] = "ollama", ] = "ollama",
log_file: Annotated[
str | None,
typer.Option(
"--log-file",
help="Optional JSONL file path to log AI and session output.",
),
] = None,
) -> None: ) -> None:
"""Start an interactive troubleshooting session scaffold.""" """Start an interactive troubleshooting session scaffold."""
try: try:
@@ -103,22 +122,34 @@ def run(
) )
summary = SSHClient(config).summary() summary = SSHClient(config).summary()
console.print("[bold green]tai[/bold green]") console.print(Rule("[bold green]tai[/bold green]", style="green"))
console.print(f"Issue: {req.issue}") console.print(f" [bold]Issue:[/bold] {req.issue}")
console.print(f"SSH: {summary}") console.print(f" [bold]SSH:[/bold] {summary}")
if req.target_paths: if req.target_paths:
console.print(f"Paths: {', '.join(str(p) for p in req.target_paths)}") console.print(f" [bold]Paths:[/bold] {', '.join(str(p) for p in req.target_paths)}")
console.print()
if not (probe or collect or analyze): if not (probe or collect or analyze or interactive):
return # nothing SSH-related requested return # nothing SSH-related requested
ai_config = AIConfig(host=ai_host, model=model, api_key=ai_key) ai_config = AIConfig(host=ai_host, model=model, api_key=ai_key)
if analyze: logger = SessionLogger.create(log_file) if log_file else None
if analyze or interactive:
console.print(f"[cyan]AI:[/cyan] {AIClient(ai_config).summary()}") console.print(f"[cyan]AI:[/cyan] {AIClient(ai_config).summary()}")
try: try:
asyncio.run(_async_main(config, req, probe=probe, collect=collect, analyze=analyze, asyncio.run(
ai_config=ai_config)) _async_main(
config,
req,
probe=probe,
collect=collect,
analyze=analyze,
interactive=interactive,
ai_config=ai_config,
logger=logger,
)
)
except typer.Exit: except typer.Exit:
raise raise
except TimeoutError as exc: except TimeoutError as exc:
@@ -136,14 +167,38 @@ async def _async_main(
probe: bool, probe: bool,
collect: bool, collect: bool,
analyze: bool, analyze: bool,
interactive: bool,
ai_config: AIConfig, ai_config: AIConfig,
logger: SessionLogger | None,
) -> None: ) -> None:
"""Open a single SSH session and run probe / collection / analysis through it.""" """Open a single SSH session and run probe / collection / analysis through it."""
client = SSHClient(config) client = SSHClient(config)
if logger is not None:
logger.log_event(
"session_start",
{
"host": req.host,
"port": req.port,
"issue": req.issue,
"probe": probe,
"collect": collect,
"analyze": analyze,
"interactive": interactive,
},
)
async with client.connect() as session: async with client.connect() as session:
if probe: if probe:
result = await session.probe() result = await session.probe()
_handle_probe_result(result) _handle_probe_result(result)
if logger is not None:
logger.log_event(
"probe_result",
{
"exit_code": result.exit_code,
"stdout": result.stdout,
"stderr": result.stderr,
},
)
report: CollectionReport | None = None report: CollectionReport | None = None
if collect or analyze: if collect or analyze:
@@ -151,38 +206,177 @@ async def _async_main(
console.print(f"[cyan]Collecting diagnostics:[/cyan] {len(plan)} commands") console.print(f"[cyan]Collecting diagnostics:[/cyan] {len(plan)} commands")
report = await collect_from_plan(session, plan) report = await collect_from_plan(session, plan)
_handle_collection_report(report) _handle_collection_report(report)
if logger is not None:
logger.log_event(
"collection_summary",
{
"total": report.total,
"failed": report.failed,
},
)
if analyze and report is not None: if analyze and report is not None:
_run_analysis(ai_config, req.issue, report) _run_analysis(ai_config, req.issue, report, logger=logger)
if interactive:
await _interactive_loop(session, req, ai_config, report, logger=logger)
async def _interactive_loop(
session: SSHSession,
req: TroubleshootRequest,
ai_config: AIConfig,
report: CollectionReport | None,
logger: SessionLogger | None,
) -> None:
"""Run a follow-up loop for collecting and conversational analysis."""
console.print(
Panel(
"Ask questions directly, or use [bold]/collect[/bold], "
"[bold]/analyze[/bold], [bold]/help[/bold], [bold]/quit[/bold]",
title="[bold cyan]Interactive Mode[/bold cyan]",
border_style="cyan",
padding=(0, 1),
)
)
prior_questions: list[str] = []
while True:
try:
command = console.input("\n[bold cyan]tai[/bold cyan][dim] >[/dim] ").strip()
except (EOFError, KeyboardInterrupt):
console.print("\n[yellow]Exiting interactive mode.[/yellow]")
if logger is not None:
logger.log_event("interactive_exit", {"reason": "signal_or_eof"})
return
if not command:
continue
if command in {"/quit", "/exit"}:
console.print("[green]Bye.[/green]")
if logger is not None:
logger.log_event("interactive_exit", {"reason": "user_quit"})
return
if command == "/help":
console.print(
Panel(
"[bold]/collect[/bold] — re-run diagnostics\n"
"[bold]/analyze[/bold] — re-analyze current diagnostics\n"
"[bold]/help[/bold] — show this message\n"
"[bold]/quit[/bold] — end session\n"
"[dim]Anything else is sent directly to the AI as a question.[/dim]",
title="[bold]Commands[/bold]",
border_style="dim",
padding=(0, 1),
)
)
continue
if command == "/collect":
plan = plan_from_request(req)
console.print(f"[cyan]Collecting diagnostics:[/cyan] {len(plan)} commands")
report = await collect_from_plan(session, plan)
_handle_collection_report(report)
if logger is not None:
logger.log_event(
"collection_summary",
{
"total": report.total,
"failed": report.failed,
},
)
continue
if command == "/analyze":
if report is None:
plan = plan_from_request(req)
console.print(f"[cyan]Collecting diagnostics:[/cyan] {len(plan)} commands")
report = await collect_from_plan(session, plan)
_handle_collection_report(report)
if report is None:
console.print("[red]No diagnostics available to analyze.[/red]")
continue
_run_followup_analysis(
ai_config,
req.issue,
report,
"Provide an updated diagnosis from the current diagnostics.",
prior_questions,
logger=logger,
)
prior_questions.append("/analyze")
if logger is not None:
logger.log_event("interactive_followup", {"question": "/analyze"})
continue
if report is None:
plan = plan_from_request(req)
console.print(f"[cyan]Collecting diagnostics:[/cyan] {len(plan)} commands")
report = await collect_from_plan(session, plan)
_handle_collection_report(report)
if report is None:
console.print("[red]No diagnostics available to analyze.[/red]")
continue
_run_followup_analysis(
ai_config,
req.issue,
report,
command,
prior_questions,
logger=logger,
)
prior_questions.append(command)
if logger is not None:
logger.log_event("interactive_followup", {"question": command})
def _handle_probe_result(result: SSHCommandResult) -> None: def _handle_probe_result(result: SSHCommandResult) -> None:
"""Handle and render probe output for success or failure.""" """Handle and render probe output for success or failure."""
console.print("[cyan]Running SSH probe:[/cyan] uname -a") console.print("[dim]▶ SSH probe:[/dim] uname -a")
if result.exit_code != 0: if result.exit_code != 0:
details = result.stderr or result.stdout or "no error output from ssh" details = result.stderr or result.stdout or "no error output from ssh"
console.print(f"[red]Probe failed (exit {result.exit_code}):[/red] {details}") console.print(f"[bold red]Probe failed[/bold red] (exit {result.exit_code}): {details}")
raise typer.Exit(code=1) raise typer.Exit(code=1)
output = result.stdout or "(no output)" output = result.stdout or "(no output)"
console.print("[bold green]Probe succeeded.[/bold green]") console.print("[bold green]Probe succeeded.[/bold green]")
console.print(f"Remote: {output}") console.print(f" [dim]{output}[/dim]")
def _handle_collection_report(report: CollectionReport) -> None: def _handle_collection_report(report: CollectionReport) -> None:
"""Render collected command status and truncation hints.""" """Render collected command status and truncation hints."""
console.print( failed_label = (
f"[bold]Collection complete:[/bold] {report.total} commands, {report.failed} failed" f"[red]{report.failed} failed[/red]" if report.failed else "[green]0 failed[/green]"
) )
console.print(f"[bold]Collection complete:[/bold] {report.total} commands, {failed_label}")
for item in report.items: for item in report.items:
status = "ok" if item.result.exit_code == 0 else f"exit {item.result.exit_code}"
truncated = item.result.stdout_truncated or item.result.stderr_truncated truncated = item.result.stdout_truncated or item.result.stderr_truncated
trunc = " (truncated)" if truncated else "" trunc_label = " [dim](truncated)[/dim]" if truncated else ""
console.print(f"- {item.name}: {status}{trunc}") if item.result.exit_code == 0:
console.print(f" [green]✓[/green] [dim]{item.name}[/dim]{trunc_label}")
else:
console.print(
f" [red]✗[/red] {item.name} "
f"[red](exit {item.result.exit_code})[/red]{trunc_label}"
)
def _run_analysis(ai_config: AIConfig, issue: str, report: CollectionReport) -> None: def _run_analysis(
ai_config: AIConfig,
issue: str,
report: CollectionReport,
*,
logger: SessionLogger | None,
) -> None:
"""Send collected data to the AI and stream the analysis to stdout.""" """Send collected data to the AI and stream the analysis to stdout."""
console.print("[cyan]Analyzing...[/cyan]\n") console.print()
console.print(Rule("[bold cyan]Analysis[/bold cyan]", style="cyan"))
console.print()
ai = AIClient(ai_config) ai = AIClient(ai_config)
system_prompt = build_system_prompt() system_prompt = build_system_prompt()
user_message = build_user_message(issue, report) user_message = build_user_message(issue, report)
@@ -190,9 +384,78 @@ def _run_analysis(ai_config: AIConfig, issue: str, report: CollectionReport) ->
chunks: list[str] = [] chunks: list[str] = []
for chunk in ai.stream(system_prompt, user_message): for chunk in ai.stream(system_prompt, user_message):
chunks.append(chunk) chunks.append(chunk)
console.print(Markdown("".join(chunks))) response = "".join(chunks)
console.print(Markdown(response))
warnings = validate_ai_response(response)
for item in warnings:
warn_text = Text()
warn_text.append("⚠ Guardrail: ", style="bold yellow")
warn_text.append(item, style="yellow")
console.print(warn_text)
if logger is not None:
logger.log_event(
"analysis_response",
{
"issue": issue,
"response": response,
"guardrail_warnings": warnings,
},
)
except Exception as exc: # noqa: BLE001 except Exception as exc: # noqa: BLE001
console.print(f"[red]AI analysis failed:[/red] {exc}") console.print(f"[red]AI analysis failed:[/red] {exc}")
if logger is not None:
logger.log_event("analysis_error", {"error": str(exc)})
raise typer.Exit(code=1) from exc
def _run_followup_analysis(
ai_config: AIConfig,
issue: str,
report: CollectionReport,
question: str,
prior_questions: list[str],
*,
logger: SessionLogger | None,
) -> str:
"""Run grounded follow-up analysis re-anchored to current diagnostics."""
console.print()
console.print(Rule("[bold cyan]AI Response[/bold cyan]", style="cyan"))
console.print()
ai = AIClient(ai_config)
system_prompt = build_system_prompt()
user_message = build_followup_message(issue, report, question, prior_questions)
try:
chunks: list[str] = []
for chunk in ai.stream(system_prompt, user_message):
chunks.append(chunk)
response = "".join(chunks)
console.print(Markdown(response))
console.print(Rule(style="dim"))
warnings = validate_ai_response(response)
for item in warnings:
warn_text = Text()
warn_text.append("⚠ Guardrail: ", style="bold yellow")
warn_text.append(item, style="yellow")
console.print(warn_text)
if logger is not None:
logger.log_event(
"analysis_response",
{
"last_user_message": question,
"response": response,
"guardrail_warnings": warnings,
},
)
return response
except Exception as exc: # noqa: BLE001
console.print(f"[red]AI analysis failed:[/red] {exc}")
if logger is not None:
logger.log_event("analysis_error", {"error": str(exc), "question": question})
raise typer.Exit(code=1) from exc raise typer.Exit(code=1) from exc

View File

@@ -15,12 +15,15 @@ Your job:
Important rules: Important rules:
- Only draw conclusions from data that is actually present. Do not speculate or invent evidence. - Only draw conclusions from data that is actually present. Do not speculate or invent evidence.
- For every root-cause claim, quote at least one exact snippet from collected output in backticks.
- If a command shows "could not be executed (SSH error)" it means the remote host blocked or - If a command shows "could not be executed (SSH error)" it means the remote host blocked or
rejected that specific command — it is not evidence about the service or system state. rejected that specific command — it is not evidence about the service or system state.
- If there is not enough data to diagnose the issue, say so plainly and list exactly what - If there is not enough data to diagnose the issue, say so plainly and list exactly what
additional commands or log files would be needed. additional commands or log files would be needed.
- Keep the response short. Skip sections that have nothing useful to say. - Keep the response short. Skip sections that have nothing useful to say.
- Never suggest commands that modify the system unless explicitly asked. - Never suggest commands that modify the system unless explicitly asked.
- Default to read-only verification steps. Do not suggest restarting services or editing configs
unless the user explicitly asks for remediation actions.
- Format with clear sections: **Root Cause**, **Evidence**, **Recommended Actions**. - Format with clear sections: **Root Cause**, **Evidence**, **Recommended Actions**.
""" """
@@ -72,3 +75,27 @@ def build_user_message(issue: str, report: CollectionReport) -> str:
) )
return "\n".join(lines) return "\n".join(lines)
def build_followup_message(
issue: str,
report: CollectionReport,
question: str,
prior_questions: list[str],
) -> str:
"""Build a grounded follow-up message that re-anchors to diagnostics each turn."""
base = build_user_message(issue, report)
lines: list[str] = [base, "## Follow-up"]
if prior_questions:
lines.append("\nRecent user follow-up questions:")
for idx, item in enumerate(prior_questions[-5:], start=1):
lines.append(f"{idx}. {item}")
lines.append("\nCurrent follow-up question:")
lines.append(question)
lines.append(
"\nAnswer strictly from the collected diagnostics above. "
"If evidence is insufficient, explicitly say so."
)
return "\n".join(lines)

34
src/tai/session_log.py Normal file
View File

@@ -0,0 +1,34 @@
"""Structured session logging helpers for troubleshooting runs."""
from __future__ import annotations
import json
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
@dataclass(slots=True)
class SessionLogger:
"""Append JSONL events to a log file for post-run analysis."""
path: Path
@classmethod
def create(cls, file_path: str) -> SessionLogger:
"""Create a logger for *file_path*, ensuring parent directories exist."""
path = Path(file_path).expanduser()
path.parent.mkdir(parents=True, exist_ok=True)
return cls(path=path)
def log_event(self, event: str, payload: dict[str, Any]) -> None:
"""Write one timestamped event row to the JSONL log."""
row = {
"ts": datetime.now(UTC).isoformat(),
"event": event,
"payload": payload,
}
with self.path.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(row, ensure_ascii=True))
handle.write("\n")

View File

@@ -4,7 +4,7 @@ from unittest.mock import MagicMock, patch
from tai.ai_client import DEFAULT_AI_HOST, DEFAULT_MODEL, AIClient, AIConfig from tai.ai_client import DEFAULT_AI_HOST, DEFAULT_MODEL, AIClient, AIConfig
from tai.collectors import CollectedItem, CollectionReport from tai.collectors import CollectedItem, CollectionReport
from tai.prompt_builder import build_system_prompt, build_user_message from tai.prompt_builder import build_followup_message, build_system_prompt, build_user_message
from tai.ssh_client import SSHCommandResult from tai.ssh_client import SSHCommandResult
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -116,6 +116,34 @@ def test_stream_yields_chunks() -> None:
assert result == ["Root ", "cause ", "found."] assert result == ["Root ", "cause ", "found."]
def test_stream_messages_yields_chunks() -> None:
config = AIConfig()
client = AIClient(config)
def _make_chunk(text: str | None) -> MagicMock:
delta = MagicMock()
delta.content = text
choice = MagicMock()
choice.delta = delta
chunk = MagicMock()
chunk.choices = [choice]
return chunk
mock_chunks = [_make_chunk("A"), _make_chunk(None), _make_chunk("B")]
with patch.object(client._client.chat.completions, "create", return_value=iter(mock_chunks)):
result = list(
client.stream_messages(
[
{"role": "system", "content": "sys"},
{"role": "user", "content": "question"},
]
)
)
assert result == ["A", "B"]
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# prompt_builder # prompt_builder
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -190,3 +218,16 @@ def test_build_user_message_handles_no_output() -> None:
report = _make_report([("empty", "cat /nonexistent", 1, "", "")]) report = _make_report([("empty", "cat /nonexistent", 1, "", "")])
msg = build_user_message("test", report) msg = build_user_message("test", report)
assert "no output" in msg assert "no output" in msg
def test_build_followup_message_includes_question_context() -> None:
report = _make_report([("kernel", "uname -a", 0, "Linux web01", "")])
msg = build_followup_message(
"nginx is failing",
report,
"what should I check next?",
["is nginx running?", "show me logs"],
)
assert "Current follow-up question" in msg
assert "what should I check next?" in msg
assert "Recent user follow-up questions" in msg

View File

@@ -0,0 +1,24 @@
"""Tests for AI response guardrails."""
from tai.ai_guardrails import validate_ai_response
def test_validate_ai_response_flags_missing_evidence_and_quotes() -> None:
warnings = validate_ai_response("Root cause only, no structure.")
assert any("Evidence section" in item for item in warnings)
assert any("quoted evidence" in item for item in warnings)
def test_validate_ai_response_flags_risky_actions() -> None:
text = "Evidence: `PasswordAuthentication no`\nRun systemctl restart sshd now."
warnings = validate_ai_response(text)
assert any("modifying actions" in item for item in warnings)
def test_validate_ai_response_allows_grounded_read_only_answer() -> None:
text = (
"Evidence: `PasswordAuthentication no`\n"
"Recommended Actions: run `journalctl -u sshd -n 200 --no-pager`"
)
warnings = validate_ai_response(text)
assert not warnings

View File

@@ -137,5 +137,96 @@ def test_collect_success_prints_summary(monkeypatch) -> None: # type: ignore[no
assert result.exit_code == 0 assert result.exit_code == 0
assert "Collection complete" in result.stdout assert "Collection complete" in result.stdout
assert "kernel: ok" in result.stdout assert "kernel" in result.stdout
assert "journal: ok (truncated)" in result.stdout assert "journal" in result.stdout
assert "truncated" in result.stdout
def test_interactive_collect_then_quit(monkeypatch) -> None: # type: ignore[no-untyped-def]
_mock_session(monkeypatch)
async def fake_collect_from_plan(_session, _plan) -> CollectionReport: # type: ignore[no-untyped-def]
return CollectionReport(
host="ssh.archflux.net",
items=[
CollectedItem(
name="kernel",
result=SSHCommandResult(
command="uname -a",
exit_code=0,
stdout="Linux test",
stderr="",
),
),
],
)
commands = iter(["/collect", "/quit"])
monkeypatch.setattr("tai.cli.collect_from_plan", fake_collect_from_plan)
monkeypatch.setattr("tai.cli.console.input", lambda _prompt: next(commands))
runner = CliRunner()
result = runner.invoke(
app,
[
"apache failed",
"--host",
"ssh.archflux.net",
"--port",
"5566",
"--no-probe",
"--interactive",
],
)
assert result.exit_code == 0
assert "ask questions directly" in result.stdout.lower()
assert "Collection complete" in result.stdout
assert "Bye." in result.stdout
def test_interactive_unknown_command_prints_hint(monkeypatch) -> None: # type: ignore[no-untyped-def]
_mock_session(monkeypatch)
async def fake_collect_from_plan(_session, _plan) -> CollectionReport: # type: ignore[no-untyped-def]
return CollectionReport(
host="ssh.archflux.net",
items=[
CollectedItem(
name="kernel",
result=SSHCommandResult(
command="uname -a",
exit_code=0,
stdout="Linux test",
stderr="",
),
),
],
)
commands = iter(["what should I check next?", "/quit"])
monkeypatch.setattr("tai.cli.collect_from_plan", fake_collect_from_plan)
monkeypatch.setattr(
"tai.cli.AIClient.stream",
lambda *_args, **_kwargs: iter(["Check logs."]),
)
monkeypatch.setattr("tai.cli.console.input", lambda _prompt: next(commands))
runner = CliRunner()
result = runner.invoke(
app,
[
"apache failed",
"--host",
"ssh.archflux.net",
"--port",
"5566",
"--no-probe",
"--interactive",
],
)
assert result.exit_code == 0
assert "AI Response" in result.stdout
assert "Check logs." in result.stdout

22
tests/test_session_log.py Normal file
View File

@@ -0,0 +1,22 @@
"""Tests for structured session logging."""
from __future__ import annotations
import json
from tai.session_log import SessionLogger
def test_session_logger_writes_jsonl_row(tmp_path) -> None: # type: ignore[no-untyped-def]
log_path = tmp_path / "logs" / "session.jsonl"
logger = SessionLogger.create(str(log_path))
logger.log_event("analysis_response", {"response": "Root cause is X"})
lines = log_path.read_text(encoding="utf-8").splitlines()
assert len(lines) == 1
row = json.loads(lines[0])
assert row["event"] == "analysis_response"
assert row["payload"]["response"] == "Root cause is X"
assert "ts" in row