16 Commits

Author SHA1 Message Date
fab1f3afbf lint
All checks were successful
CI / test (push) Successful in 23s
2026-05-06 05:09:35 +02:00
54b202bdc2 update
Some checks failed
CI / test (push) Failing after 19s
2026-05-06 05:06:45 +02:00
bbc75b1559 lint
Some checks failed
CI / test (push) Failing after 19s
2026-05-06 05:03:51 +02:00
d5e1822644 update
Some checks failed
CI / test (push) Failing after 15s
2026-05-06 05:02:38 +02:00
74a56e3113 merge: feature/rag-tier1 into main
Some checks failed
CI / test (push) Failing after 15s
2026-05-06 04:50:51 +02:00
57f4c0efaa feat: complete RAG runbook workflow and release docs
Some checks failed
CI / test (push) Failing after 15s
2026-05-06 04:48:41 +02:00
450de24d28 update
Some checks failed
CI / test (push) Failing after 32s
2026-05-06 03:34:01 +02:00
e943e84bd2 feat(rag): harden Tier 1 retrieval observability and stability
Some checks failed
CI / test (push) Failing after 15s
- Add --rag-debug flag to show retrieved chunk names and similarity scores
- Add explicit fallback notices when RAG indexing/query embedding fails
- Log RAG index/query metrics (duration, scores, top hit, token estimate)
- Normalize and cap chunk content for more stable prompt shape on small models
- Add hypothesis-continuity instruction for follow-up prompts
- Add retrieval scoring API and new tests for truncation/fallback/debug paths
2026-05-04 19:13:57 +02:00
5529960e79 feat(rag): add --embed-model flag with nomic-embed-text default
Some checks failed
CI / test (push) Failing after 15s
2026-05-04 18:41:55 +02:00
be181c2d7f feat(rag): implement Tier 1 in-memory RAG for interactive follow-ups
Some checks failed
CI / test (push) Failing after 15s
- Add embed() to AIClient using Ollama nomic-embed-text via /v1/embeddings
- Add DEFAULT_EMBED_MODEL and embed_model field to AIConfig
- New rag_retriever.py: chunk_report(), EmbeddedChunk, retrieve() (pure-Python cosine)
- prompt_builder: add build_message_with_chunks() for RAG-aware follow-up prompts
- cli: add --no-rag flag, embed report chunks after collection, retrieve top-5 per question
- Graceful fallback to full-context if embedding model unavailable
- 16 new tests in test_rag_retriever.py (67 total, all passing)
- Add chromadb>=0.5 as optional [rag] dep in pyproject.toml
- README: add step 3 (pull nomic-embed-text), update Suggested Tooling table
2026-05-04 18:36:12 +02:00
c1192cdb94 update
Some checks failed
CI / test (push) Failing after 15s
2026-05-04 18:30:54 +02:00
739e19f595 update
Some checks failed
CI / test (push) Failing after 15s
2026-05-04 18:30:33 +02:00
e49670a664 docs(roadmap): add Phase 6 RAG & Knowledge Layer plan
Some checks failed
CI / test (push) Failing after 15s
- Three-tier RAG architecture: diagnostic chunks, runbook KB, session memory
- Technology decisions table with options and recommendations
- Per-tier: approach, new modules, changes to existing code, companion features
- Implementation order and effort estimates
- New dependencies and optional pyproject.toml group
- Decisions log entries for RAG choices pending confirmation
2026-05-04 18:23:33 +02:00
4870bd3bfe ci: rename release.yml to tag.yml, fix trigger to match non-v tags
All checks were successful
CI / test (push) Successful in 20s
Tag Build / build (push) Successful in 8m33s
- Trigger was 'v*' but tags are bare semver (0.3.0) — fix to '[0-9]*'
- Rename to tag.yml to reflect tag-driven build purpose
- Add zip to apt dependencies (required for release zip step)
2026-05-04 06:48:34 +02:00
5798d87993 Merge branch 'feature/interactive-ux-improvements'
All checks were successful
CI / test (push) Successful in 20s
2026-05-04 06:43:33 +02:00
2c738579bd feat(ux): improve interactive mode readability and input visibility
All checks were successful
CI / test (push) Successful in 19s
- Replace plain 'tai>' prompt with styled console.input() bold cyan prompt
- Wrap interactive mode entry in a Rich Panel with border
- Frame each AI response with Rule dividers (──── AI Response ────)
- Style guardrail warnings with ⚠ prefix and bold yellow
- Improve /help output with formatted Panel showing all commands
- Style collection report: ✓/✗ per item with color, truncation in dim
- Style probe output: ✓/✗ with green/red, host info in dim
- Add Rule header divider on session start
2026-05-04 06:37:50 +02:00
34 changed files with 3949 additions and 158 deletions

View File

@@ -1,9 +1,9 @@
name: Release
name: Tag Build
on:
push:
tags:
- "v*"
- "[0-9]*"
jobs:
build:
@@ -61,8 +61,8 @@ jobs:
run: |
if command -v apt-get >/dev/null 2>&1; then
apt-get update
apt-get install -y python3.12 python3.12-venv python3-pip patchelf ccache || \
apt-get install -y python3 python3-pip python3-venv patchelf ccache
apt-get install -y python3.12 python3.12-venv python3-pip patchelf ccache zip || \
apt-get install -y python3 python3-pip python3-venv patchelf ccache zip
elif command -v dnf >/dev/null 2>&1; then
dnf install -y python3 python3-pip python3-devel patchelf ccache
elif command -v yum >/dev/null 2>&1; then

View File

@@ -10,27 +10,37 @@ ______________________________________________________________________
### Added
- `README.md` — project overview, description, example workflow, supported distributions, and suggested tooling
- `ROADMAP.md` — phased development plan covering decisions, data collection, AI integration, CLI design, and hardening
- `CHANGELOG.md` — this file; established changelog tracking for the project
- `.gitea/workflows/ci.yml` — Gitea Actions CI workflow for push and pull request events
- Gitea CI now uses native `git` checkout and system Python setup to avoid host-executor JavaScript action path issues
- Gitea native checkout now uses `CI_GIT_TOKEN` repository secret for authenticated fetch from private repos
- Gitea CI now installs dependencies in a local `.venv` to avoid Debian/PEP 668 externally-managed pip errors
- Python package scaffold with `src` layout and project metadata in `pyproject.toml`
- Initial CLI entrypoint with agreed SSH flags: `--identity-file`, `--jump-host`, and `--ignore-ssh-config`
- Input parsing/validation module and core request model
- SSH configuration scaffold module for upcoming connection/read-only execution work
- Implemented SSH module with real key-based command execution via system `ssh`
- Added explicit SSH port support across CLI, input parsing, request model, and SSH client (`--port`, e.g. 5566)
- Added live SSH connectivity probe (`uname -a`) enabled by default, with `--no-probe` opt-out and non-zero exit on failure
- Added baseline diagnostics collection via `--collect`, including service, journal, disk, and network checks
- Read-only command policy enforcement (allowlist + blocked shell operators)
- Added byte-limited SSH output capture with truncation markers for large command output
- Test scaffold (`pytest`) with initial parser and CLI coverage
- SSH test coverage for policy checks, SSH argument construction, and config summary behavior
- CI workflow for lint (`ruff`), type-check (`mypy`), and tests (`pytest`)
- CI coverage expanded with Markdown formatting checks (`mdformat --check`) and YAML linting (`yamllint`)
- Nothing yet.
______________________________________________________________________
## [0.4.0] - 2026-05-06
### Added
- `runbooks/` corpus with service troubleshooting guides: `ssh`, `nginx`, `postgres`, `disk`, `kernel`, `docker`, `sssd`, `xorg`, `wayland`, `x2go`, `selinux`, `apparmor`
- Runbook knowledge store module `src/tai/runbook_store.py` (persistent ChromaDB-backed index and query)
- Chroma telemetry no-op client `src/tai/chroma_telemetry.py` to suppress noisy local telemetry errors
- `tai runbooks` command group with:
- `sync` for indexing all Markdown runbooks
- `list` for listing indexed metadata
- `add` for indexing a single runbook file
- `--runbooks` option on `tai run` to enable Tier 2 runbook retrieval
- Initial analysis RAG path using retrieved diagnostic chunks (`build_analysis_message_with_chunks`)
- Follow-up RAG path updates with tighter `top_k` and runbook context injection
- AI runtime controls:
- `--ai-timeout-seconds`
- `--ai-max-tokens`
- Non-streaming AI completion path for improved local backend reliability
- Service/subsystem presence probes in collection plans:
- unit-file checks
- expected binary path checks
- status/journal/config probes for recognized services including `sssd`
- Prompt instruction for "component absent or not installed" interpretation when presence signals are missing
- Runbook store unit tests in `tests/test_runbook_store.py`
- CLI tests updated for `tai run` subcommand and non-streaming completion mocks
- README refreshed with current CLI, architecture layout, RAG/runbook workflow, and usage examples
- `docs/ARCHITECTURE.md` with end-to-end flow, module responsibilities, safety boundaries, and fallback behavior
### Removed
@@ -44,3 +54,5 @@ ______________________________________________________________________
- SSH bastion support: `--jump-host` flag using SSH native ProxyJump
- SSH config behavior: use `~/.ssh/config` by default; allow override via `--ignore-ssh-config`
- Interface: **interactive REPL** for v0.1; `textual`-based TUI (split-pane) for v0.2+
- RAG Tier 1 strategy: semantic diagnostic chunk retrieval with local embeddings
- RAG Tier 2 strategy: Markdown runbooks persisted in embedded ChromaDB

211
README.md
View File

@@ -1,93 +1,202 @@
# tai Linux AI Troubleshooting Agent
# tai - Linux AI Troubleshooting Agent
`tai` is an agentic AI-driven troubleshooting tool for Linux systems. It autonomously investigates issues on remote hosts via SSH, analyzes relevant logs and configuration files, and provides a clear diagnosis along with suggested remediation steps — all without making any changes to the target system.
`tai` is a read-only Linux troubleshooting assistant that connects to remote hosts via SSH, collects diagnostics, and runs grounded AI analysis using local models.
## Overview
The project is designed for operators who want AI speed without losing operational safety or evidence traceability.
Given a problem description and a target hostname, `tai` connects to the remote system over SSH, gathers relevant data (logs, configuration files, service status, etc.), and uses a locally-hosted AI model to reason about the root cause and recommend solutions.
## What tai Does
The agent operates in **read-only mode at all times**. It will never modify the target system under any circumstances — all suggestions are presented to the human troubleshooter for review and action.
- Runs safe, read-only remote checks over SSH
- Builds a diagnostics collection plan from issue text
- Supports one-shot analysis and interactive follow-up mode
- Uses local AI backends (OpenAI-compatible endpoint, typically Ollama)
- Uses RAG over collected diagnostics (Tier 1)
- Uses persistent runbook retrieval with ChromaDB (Tier 2)
- Emits structured Markdown analysis with evidence and actions
- Can log session and retrieval telemetry locally as JSONL
## Supported Distributions
## Safety Model
- Ubuntu
- Debian
- RHEL
- Rocky Linux
`tai` enforces read-only command policy on all remote commands.
## Example Workflow
- Allowlist based command validation
- Blocked shell operators (`>`, `>>`, `<`, `|`, `&&`, `||`, `;`)
- No write/mutation actions are executed on target hosts
A troubleshooter receives a ticket reporting that the Apache service on a remote server has failed to start. They provide `tai` with:
The tool may suggest remediation commands in output, but does not execute them.
1. The ticket description or error message
1. The hostname of the affected system
1. Any relevant directories to focus on
## Current Feature Set
`tai` then connects to the host, reads through system logs, service configurations, and any other related files, and returns a structured analysis of the likely cause along with recommended next steps.
### Core CLI
## Suggested Tooling
- `tai run ...` main troubleshooting entrypoint
- SSH options: host, port, identity file, jump host, SSH config control
- Live probe mode (`uname -a`)
- Diagnostics collection mode
- AI analysis mode
- Interactive loop with `/collect`, `/analyze`, `/help`, `/quit`
| Component | Tool |
|-----------|------|
| AI inference backend | [Ollama](https://ollama.com) |
| Model | `gemma3:4b`, `llama3.1:8b`, or `qwen2.5:7b` |
| Language | Python 3.11+ |
### AI and Prompting
______________________________________________________________________
- OpenAI-compatible AI client
- Configurable model, timeout, token budget
- Guardrails to keep responses evidence-based
- Initial and follow-up prompts grounded in collected diagnostics
- Non-streaming completion path for local backend reliability
## How-To: Setting Up the AI Backend (Arch Linux + RTX 3080)
### RAG and Knowledge
`tai` uses [Ollama](https://ollama.com) as its local AI backend. It exposes an OpenAI-compatible HTTP API that `tai` talks to — no cloud services, no data leaving your machine.
- Tier 1: semantic retrieval of diagnostic chunks per question
- Tier 2: persistent runbook knowledge base with ChromaDB
- Runbook retrieval injected as separate prompt context
- Retrieval debug output (`--rag-debug`)
- Full-context fallback if retrieval/indexing fails
An RTX 3080 (10 GB VRAM) comfortably runs 78B parameter models at 4-bit quantisation.
### Runbook Management
### 1. Install CUDA and Ollama
- `tai runbooks sync --path ./runbooks --store ~/.tai/runbooks`
- `tai runbooks list --store ~/.tai/runbooks`
- `tai runbooks add <file> --store ~/.tai/runbooks`
```bash
# CUDA runtime (skip if already installed)
sudo pacman -S cuda
### Presence and Absence Signals
# Ollama with CUDA support from the AUR
yay -S ollama-cuda
# or: paru -S ollama-cuda
For recognized services/subsystems (for example `sssd`, `docker`, `x2go`, `xorg`, `wayland`, `selinux`, `apparmor`), collection includes:
# Enable and start the service
sudo systemctl enable --now ollama
- service unit-file discovery (`systemctl list-unit-files ...`)
- binary presence checks via `ls -l <expected path>`
- service status and journals
- selected config path probes where defined
This improves analysis quality for "component missing/not installed" scenarios.
## Repository Layout
```text
src/tai/
cli.py # CLI commands and orchestration
ssh_client.py # SSH execution + read-only policy
collectors.py # execution of collection plans
plan.py # issue -> command plan builder
ai_client.py # OpenAI-compatible AI + embeddings client
ai_guardrails.py # response guardrails/validation
prompt_builder.py # prompt composition
rag_retriever.py # diagnostic chunk retrieval
runbook_store.py # persistent ChromaDB runbook index/query
chroma_telemetry.py # no-op Chroma telemetry client
session_log.py # JSONL session logging
input_parser.py # CLI input validation
models.py # domain request models
runbooks/
*.md # Markdown runbooks with frontmatter
tests/
test_*.py # unit and CLI coverage
```
### 2. Pull a model
## Installation
```bash
ollama pull gemma3:4b # ~3 GB — fast, good for sysadmin tasks
ollama pull llama3.1:8b # ~5 GB — stronger reasoning
ollama pull qwen2.5:7b # ~4.5 GB — strong structured output
python -m venv .venv
source .venv/bin/activate
pip install -e .
```
### 3. Verify the model works
RAG runbook storage requires optional dependencies:
```bash
ollama run gemma3:4b "what causes a systemd service to enter failed state?"
pip install -e .[rag]
```
### 4. Verify the HTTP API is running
Development dependencies:
`tai` communicates with Ollama over its OpenAI-compatible REST API:
```bash
pip install -e .[dev]
```
## AI Backend Setup (Ollama)
`tai` expects an OpenAI-compatible API endpoint, defaulting to `http://localhost:11434/v1`.
```bash
ollama pull gemma3:4b
ollama pull nomic-embed-text
```
Quick backend check:
```bash
curl http://localhost:11434/api/generate \
-d '{"model":"gemma3:4b","prompt":"hello","stream":false}'
```
A JSON response with a `response` field confirms everything is working.
## Usage
### 5. Point tai at your Ollama instance
Once `tai` AI integration is complete, use these flags:
### Basic Probe and Collect
```bash
tai "nginx failing to start" --host web01 \
--ai-host http://localhost:11434 \
--model gemma3:4b
tai run "nginx failing to start" \
--host web01 \
--probe \
--collect
```
The default values for `--ai-host` and `--model` will be `http://localhost:11434` and `gemma3:4b` respectively, so for local use you won't need to specify them explicitly.
### Analyze with RAG and Runbooks
```bash
tai run "why isnt sssd working?" \
--host ssh.archflux.net \
--port 5566 \
--probe --collect --analyze \
--runbooks ~/.tai/runbooks \
--rag-debug \
--ai-timeout-seconds 45 \
--ai-max-tokens 300
```
### Interactive Session
```bash
tai run "docker daemon keeps failing" \
--host app01 \
--collect \
--interactive \
--runbooks ~/.tai/runbooks
```
## Runbook Workflow
1. Write Markdown runbooks in `runbooks/` with frontmatter keys: `service`, `symptoms`, `tags`.
1. Sync the store.
1. Pass `--runbooks <store-path>` to `tai run`.
Example:
```bash
tai runbooks sync --path ./runbooks --store ~/.tai/runbooks
tai runbooks list --store ~/.tai/runbooks
```
## Testing
```bash
pytest
```
Focused suites:
```bash
pytest tests/test_plan.py tests/test_ai.py tests/test_cli.py
```
## Known Limits
- Service-specific presence checks currently apply to recognized service/subsystem names.
- Package-manager-level presence checks are not yet in the default read-only command allowlist.
- Tier 3 persistent session memory is not implemented yet.
## Changelog and Roadmap
- See `CHANGELOG.md` for release history.
- See `ROADMAP.md` for phase status and next milestones.
- See `docs/ARCHITECTURE.md` for module-level architecture and data flow.

View File

@@ -18,10 +18,11 @@ These must be resolved before meaningful development can begin.
### AI Backend & Model
- [ ] Confirm use of [vLLM](https://github.com/vllm-project/vllm) as the inference backend
- [ ] Confirm `gemma4:a4b` as the default model (or select an alternative)
- [x] OpenAI-compatible backend client implemented (`AIClient`)
- [x] Default local backend profile wired for Ollama (`http://localhost:11434/v1`)
- [x] Default model profile set to `gemma3:4b` (override via `--model`)
- [ ] Define minimum hardware requirements for running the model locally
- [ ] Decide whether the AI backend is bundled, self-hosted externally, or user-supplied
- [x] AI backend is user-supplied/self-hosted
### SSH Strategy
@@ -38,7 +39,7 @@ These must be resolved before meaningful development can begin.
### Scope & Constraints
- [ ] Define the supported scope of issues (services, network, disk, kernel, etc.)
- [ ] Confirm read-only guarantee — document exactly what "read-only" means in practice
- [x] Read-only guarantee implemented with command allowlist + blocked shell operator policy
- [x] **Decision: interactive REPL mode for v0.1, full TUI for v0.2+**
- v0.1: chat-loop REPL launched from CLI; human can follow up, correct, and redirect the agent
- v0.2+: `textual`-based TUI with split panes (collected data | AI output | input bar)
@@ -52,7 +53,7 @@ Basic project scaffolding and connectivity.
- [x] Finalise repository structure and language toolchain
- [x] Set up CI pipeline (linting, tests)
- [ ] Implement SSH connection module
- [x] Implement SSH connection module
- [x] Define SSH config model and probe interface scaffold
- [x] Connect to remote host
- [x] Execute read-only commands (e.g. `journalctl`, `systemctl status`, `cat`)
@@ -68,15 +69,15 @@ ______________________________________________________________________
Define what information the agent gathers and how.
- [ ] Identify the canonical set of data sources per issue type:
- [x] Identify a baseline canonical set of data sources per issue type:
- Service failures: `journalctl`, `systemctl`, service config files
- Network issues: `ip`, `ss`, `netstat`, firewall rules
- Disk issues: `df`, `du`, `dmesg`, `smartctl`
- General: `/var/log/syslog`, `/var/log/messages`, `dmesg`
- [ ] Implement pluggable "collector" modules per data source
- [ ] Implement directory traversal for user-specified paths (read-only)
- [x] Implement collectors and plan builder for baseline issue categories
- [x] Implement directory traversal for user-specified paths (read-only)
- [ ] Add support for per-distro variations (Ubuntu vs RHEL path differences, etc.)
- [ ] Write tests with mocked SSH output
- [x] Write tests with mocked SSH output
______________________________________________________________________
@@ -84,12 +85,12 @@ ______________________________________________________________________
Wire collected data into the local AI model.
- [ ] Implement vLLM client module
- [ ] Design prompt template: system context, collected data, issue description → diagnosis
- [ ] Implement response parsing and structured output (root cause + suggested steps)
- [ ] Tune context window usage — handle truncation for large log outputs
- [ ] Add streaming support for long AI responses
- [ ] Evaluate and test model output quality on common issue types
- [x] Implement OpenAI-compatible AI client module
- [x] Design prompt templates for initial and follow-up analysis
- [x] Implement response guardrail checks and structured response headings
- [x] Tune context usage with RAG retrieval and chunk/runbook truncation budgets
- [x] Implement reliable non-streaming completion path for local backends
- [ ] Continue output quality tuning and grounding evaluation on real hosts
______________________________________________________________________
@@ -97,11 +98,11 @@ ______________________________________________________________________
Polish the interface for real-world use.
- [ ] Design CLI interface (flags, subcommands, interactive prompts)
- [ ] Implement structured output: diagnosis, confidence, recommended actions
- [ ] Add `--verbose` / `--debug` mode showing raw collected data
- [x] Design CLI interface with run command, interactive prompts, and runbook subcommands
- [x] Implement structured output sections (Root Cause, Evidence, Recommended Actions)
- [x] Add RAG debug mode (`--rag-debug`) showing retrieval scores
- [ ] Support output to file or clipboard
- [ ] Write man page / `--help` documentation
- [x] Provide comprehensive `--help` command documentation via Typer options
______________________________________________________________________
@@ -117,6 +118,191 @@ Prepare for broader use.
______________________________________________________________________
## Phase 6 — RAG & Knowledge Layer
Introduce Retrieval-Augmented Generation to ground AI responses in evidence rather than
model weights alone. Three tiers of increasing capability, each buildable independently.
### Goals
- Eliminate prompt flooding on hosts with large log output
- Ground recommendations in version-controlled runbooks, not model improvisation
- Build compounding institutional memory from past troubleshooting sessions
- Keep all data local — no embeddings or session content leaves the network
______________________________________________________________________
### Technology Decisions Required
| Decision | Options | Recommendation | Status |
|---|---|---|---|
| Embedding model | `nomic-embed-text`, `mxbai-embed-large`, `all-minilm` | `nomic-embed-text` via Ollama (local, 274MB, strong perf) | ✅ Implemented |
| Vector store — Tier 1 | In-memory numpy cosine, `faiss-cpu` | numpy (zero deps) for session scope | ✅ Implemented |
| Vector store — Tier 2/3 | `chromadb`, `qdrant`, `weaviate`, `pgvector` | `chromadb` embedded mode | ✅ Tier 2 Implemented |
| Chunking strategy | Fixed token, sentence-aware, command-boundary | Command-boundary splitting (natural unit for diagnostics) | ✅ Implemented |
| Hybrid retrieval | Semantic only, BM25 only, hybrid | Hybrid (BM25 keyword + cosine semantic) for best recall | ⬜ Pending |
| Reranking | None, cross-encoder (`ms-marco-MiniLM`), LLM-as-judge | Cross-encoder rerank pass before prompt injection | ⬜ Pending |
| Runbook format | Markdown, YAML, JSON | Markdown (human-editable, version-controllable) | ✅ Implemented |
| Session index storage | Local `~/.tai/`, configurable path | `~/.tai/sessions/` with ChromaDB collection | ⬜ Pending |
______________________________________________________________________
### Tier 1 — Diagnostic Chunk Retrieval (in-memory, per-session)
Status: ✅ Implemented
**Problem:** Current flow injects all collected output into the prompt as one block.
On busy hosts this floods the context window with irrelevant output, degrading quality.
**Approach:**
- After collection, split each command's output into overlapping token chunks (e.g. 512 tokens, 64 overlap)
- Embed all chunks using `nomic-embed-text` via Ollama embeddings API
- On each question (initial + follow-up), embed the question and retrieve top-k chunks by cosine similarity
- Inject only retrieved chunks into the prompt, not the full dump
**New module:** `src/tai/rag_retriever.py`
- `chunk_report(report) -> list[Chunk]`
- `embed_chunks(chunks) -> list[EmbeddedChunk]`
- `retrieve(question, embedded_chunks, top_k) -> list[Chunk]`
**Changes to existing code:**
- `prompt_builder.py`: accept `retrieved_chunks` instead of full `CollectionReport` for RAG-mode prompts
- `cli.py`: embed report after collection, pass retriever to `_run_analysis` and `_run_followup_analysis`
- `ai_client.py`: add `embed(text) -> list[float]` method using Ollama `/api/embeddings`
**Companion features buildable at same time:**
- `--no-rag` flag to bypass retrieval and use full dump (backwards compat)
- Token budget display: show user how many tokens are being sent vs. saved
- Per-chunk source attribution in AI response (which command produced the evidence)
**Tests:**
- `tests/test_rag_retriever.py`: chunk splitting, cosine similarity ranking, top-k retrieval
- `tests/test_ai.py`: add `test_embed_returns_float_list()`
______________________________________________________________________
### Tier 2 — Runbook Knowledge Base (persistent, ChromaDB)
Status: ✅ Implemented
**Problem:** AI improvises remediation steps from training data, which may be wrong for
specific environments, distros, or internal conventions.
**Approach:**
- Maintain a version-controlled corpus of Markdown runbooks in `runbooks/` directory
- On first run (or `tai runbooks --sync`), embed all runbooks and persist to ChromaDB collection
- On each analysis, retrieve top-3 relevant runbook chunks alongside diagnostic chunks
- Inject as a separate `## Runbook Context` section in the prompt
**New module:** `src/tai/runbook_store.py`
- `RunbookStore`: wraps ChromaDB collection
- `sync(runbooks_dir) -> int` — embed and upsert all runbooks
- `query(question, top_k) -> list[RunbookChunk]`
**New directory:** `runbooks/`
- `ssh.md`, `nginx.md`, `postgres.md`, `disk.md`, `kernel.md`, etc.
- Each runbook: YAML frontmatter (`service`, `symptoms`, `tags`) + Markdown body
**New CLI command:** `tai runbooks --sync [--path ./runbooks]`
**Changes to existing code:**
- `prompt_builder.py`: add `build_message_with_runbooks(retrieved_chunks, runbook_chunks)`
- `cli.py`: optionally load `RunbookStore`, query it per analysis turn
**Companion features buildable at same time:**
- `tai runbooks --list` — show indexed runbooks and last sync time
- `tai runbooks --add <file>` — index a single runbook
- `/runbooks` slash command in interactive mode — show which runbooks were retrieved
- Runbook citation in AI output: "Based on runbook: `ssh.md#AuthenticationFailures`"
______________________________________________________________________
### Tier 3 — Session Memory Index (institutional learning)
Status: ⬜ Pending
**Problem:** Every session starts from zero. Repeat incidents on the same host or
same issue type get no benefit from past work.
**Approach:**
- On session end, embed the session summary (issue + root cause + actions) and upsert into a persistent ChromaDB collection (`~/.tai/sessions/`)
- On session start, query for similar past sessions by issue text + hostname
- Inject top-2 past sessions as `## Prior Sessions` context
- Optionally: `/history` command in interactive mode to surface past sessions explicitly
**New module:** `src/tai/session_store.py`
- `SessionStore`: wraps ChromaDB collection at `~/.tai/sessions/`
- `index_session(session_log_path)` — embed and store completed session
- `query_similar(issue, host, top_k) -> list[PastSession]`
**Changes to existing code:**
- `session_log.py`: add `summarise() -> str` method (issue + final AI response)
- `cli.py`: query `SessionStore` at session start, index at session end
**Companion features buildable at same time:**
- `tai history` CLI subcommand — search past sessions by keyword
- `tai history --host <hostname>` — all sessions for a host
- `tai history --export <file>` — export session summaries as Markdown report
- Auto-suggest: "Similar issue found from 2 weeks ago — load context? [y/N]"
______________________________________________________________________
### Implementation Order
```
Tier 1 (diagnostic chunks) ← Start here. Zero new infra. Immediate prompt quality gain.
Tier 2 (runbook KB) ← After Tier 1. Requires ChromaDB dep + runbook authoring.
Tier 3 (session memory) ← Builds on Tier 2 infrastructure. Minimal extra work.
```
**Estimated effort:**
- Tier 1: 23 days (new module + prompt builder changes + tests)
- Tier 2: 34 days (ChromaDB + runbook authoring + CLI command + tests)
- Tier 3: 12 days (reuses Tier 2 infrastructure)
### New Dependencies
```
# Tier 1 (zero new runtime deps — uses Ollama HTTP API already in use)
# No additions needed
# Tier 2 + 3
chromadb>=0.5,<1.0 # embedded vector store, no separate server
# OR
qdrant-client>=1.9,<2.0 # if self-hosted Qdrant preferred
sentence-transformers>=3.0 # optional: cross-encoder reranking
```
### New pyproject.toml optional group
```toml
[project.optional-dependencies]
rag = [
"chromadb>=0.5,<1.0",
"sentence-transformers>=3.0,<4.0",
]
```
______________________________________________________________________
## Decisions Log
| Date | Decision | Outcome |
@@ -128,3 +314,8 @@ ______________________________________________________________________
| 2026-05-04 | Bastion host support | `--jump-host` flag via SSH native ProxyJump |
| 2026-05-04 | SSH config behavior | Use `~/.ssh/config` by default; allow override via `--ignore-ssh-config` |
| 2026-05-04 | CLI vs interactive mode | Interactive: REPL for v0.1, `textual` TUI for v0.2+ |
| 2026-05-04 | RAG embedding model | `nomic-embed-text` via Ollama (local, air-gapped safe) — ⬜ pending confirmation |
| 2026-05-04 | RAG vector store (Tier 1) | In-memory numpy cosine similarity — zero deps, session-scoped |
| 2026-05-04 | RAG vector store (Tier 2/3) | `chromadb` embedded mode (default) or `qdrant` self-hosted — ⬜ pending confirmation |
| 2026-05-04 | RAG chunking unit | Command-boundary splitting — each collected command = one or more chunks |
| 2026-05-04 | Runbook format | Markdown with YAML frontmatter, version-controlled in `runbooks/` directory |

85
docs/ARCHITECTURE.md Normal file
View File

@@ -0,0 +1,85 @@
# Architecture
This document describes tai's current runtime architecture, module responsibilities, and data flow.
## High-Level Flow
1. User runs `tai run` with issue text and target host settings.
1. CLI validates input and opens a shared SSH session.
1. Probe and collection run against a read-only command plan.
1. Collection output is converted into diagnostic chunks.
1. Optional RAG retrieval selects top-k chunks per question.
1. Optional runbook retrieval selects top-k runbook chunks from ChromaDB.
1. Prompt builder composes system + user message.
1. AI completion returns analysis.
1. Guardrails validate response quality signals.
1. Optional session logger writes JSONL events.
## Module Layout
- `src/tai/cli.py`
- Command definitions (`run`, `runbooks sync/list/add`)
- Orchestration across SSH, collection, RAG, prompts, AI, and logging
- `src/tai/input_parser.py`
- User input validation and request normalization
- `src/tai/models.py`
- Core dataclasses (`TroubleshootRequest`)
- `src/tai/ssh_client.py`
- SSH invocation
- Read-only command policy validation
- Probe and command execution helpers
- `src/tai/plan.py`
- Issue keyword/service extraction
- Command plan generation
- Service/subsystem presence probes (unit files, binaries)
- `src/tai/collectors.py`
- Executes command plans and builds `CollectionReport`
- `src/tai/rag_retriever.py`
- Command-output chunking
- Embedding wrapper structures
- Similarity retrieval and scoring
- `src/tai/runbook_store.py`
- Persistent ChromaDB runbook indexing and querying
- `src/tai/chroma_telemetry.py`
- No-op telemetry adapter for Chroma local usage
- `src/tai/prompt_builder.py`
- Prompt assembly for full-context and retrieved-context paths
- `src/tai/ai_client.py`
- OpenAI-compatible completions and embeddings client
- `src/tai/ai_guardrails.py`
- Lightweight response guardrails and warnings
- `src/tai/session_log.py`
- Optional JSONL event logging
## Data Stores
- Runbook store (Tier 2): local ChromaDB path, default `~/.tai/runbooks`
- Session logs: optional JSONL file configured by `--log-file`
## Retrieval Layers
- Tier 1 (implemented): in-memory semantic retrieval over diagnostic chunks
- Tier 2 (implemented): persistent semantic retrieval over runbook corpus
- Tier 3 (pending): persistent retrieval over prior sessions
## Safety Boundaries
Read-only policy is enforced before each remote command execution.
- Allowed command families are explicitly enumerated.
- Shell composition operators are blocked.
- Commands that fail execution are recorded and surfaced to the model as non-evidence.
## Failure and Fallback Behavior
- If RAG indexing fails, analysis falls back to full-context prompts.
- If runbook store is unavailable, analysis proceeds without runbook context.
- If AI call fails, CLI exits with non-zero status and displays an error.
## Test Coverage Highlights
- Planner behavior and service detection
- Prompt formatting and guardrail-sensitive messaging
- CLI command behavior and interactive loop controls
- Runbook store parsing/index/query behavior (with mocked Chroma)
- SSH policy validation and command execution contract

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "tai"
version = "0.1.0"
version = "0.4.0"
description = "Linux AI-driven troubleshooting agent"
readme = "README.md"
requires-python = ">=3.11"
@@ -19,6 +19,9 @@ dependencies = [
]
[project.optional-dependencies]
rag = [
"chromadb>=0.5,<1.0",
]
dev = [
"pytest>=8.2,<9.0",
"ruff>=0.5,<1.0",
@@ -51,3 +54,11 @@ select = ["E", "F", "I", "UP", "B"]
python_version = "3.11"
strict = true
warn_unused_configs = true
[[tool.mypy.overrides]]
module = ["chromadb", "chromadb.*"]
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = ["tai.chroma_telemetry"]
disable_error_code = ["misc"]

86
runbooks/apparmor.md Normal file
View File

@@ -0,0 +1,86 @@
---
service: apparmor
symptoms: permission denied despite correct unix permissions, apparmor deny logs, service blocked by profile, executable transition denied, path access denied, snap confinement issue, profile in complain mode
tags: apparmor, security, profile, aa-status, audit, confinement, complain, enforce, snap
---
## Symptoms
- Application gets `Permission denied` even though Unix permissions look correct
- Service starts in complain mode but fails in enforce mode
- Log shows AppArmor `DENIED` entries
- Binary works when profile is disabled but fails when confinement is enabled
- Snap or packaged app cannot access expected files or sockets
## Diagnostics
### Check AppArmor status and loaded profiles
```
aa-status
systemctl status apparmor
```
Confirm whether the profile is loaded and whether it is in enforce or complain mode.
### Check denial logs
```
journalctl -k | grep -i apparmor
journalctl -b | grep -i DENIED
dmesg | grep -i apparmor
```
AppArmor denials usually identify the profile, operation, and path that was blocked.
### Inspect the active profile
```
find /etc/apparmor.d -maxdepth 2 -type f | sort
cat /etc/apparmor.d/<profile>
```
Look for missing file path rules, capability rules, and `ix`/`px` execution transitions.
### Check complain vs enforce mode
```
aa-status | grep complain
```
If the issue only occurs in enforce mode, the profile is too restrictive rather than the app being broken.
### Check profile parser and reload
```
apparmor_parser -r /etc/apparmor.d/<profile>
aa-status
```
Syntax or include errors can prevent an updated profile from loading.
## Remediation
**Profile too restrictive:**
Add the missing path, capability, or network rule to the profile, then reload AppArmor.
If the denial pattern is repetitive, use AppArmor tooling to review and refine the profile instead of disabling confinement globally.
**Need to observe without blocking:**
Temporarily switch the profile to complain mode:
```
aa-complain /etc/apparmor.d/<profile>
```
**Return to enforcement after fixing rules:**
```
aa-enforce /etc/apparmor.d/<profile>
```
**Profile reload after changes:**
```
apparmor_parser -r /etc/apparmor.d/<profile>
systemctl reload apparmor
```
Do not disable AppArmor globally when the issue is isolated to a single profile.

106
runbooks/disk.md Normal file
View File

@@ -0,0 +1,106 @@
---
service: disk
symptoms: no space left on device, disk full, inode exhaustion, df shows 100%, du large files, write failed, cannot create file, filesystem read-only, ext4 error
tags: disk, filesystem, storage, inodes, df, du, ext4, xfs, lvm, partition, full, space
---
## Symptoms
- `No space left on device` — disk or inode exhaustion
- `df -h` shows a filesystem at 100% (or near 100%)
- `df -i` shows inode usage at 100% — file count exhausted even if byte space is free
- Filesystem remounted read-only — kernel detected errors and protected itself
- Services failing to write logs, create temp files, or open sockets
## Diagnostics
### Overall disk usage
```
df -h
df -i
```
`df -h` shows byte space; `df -i` shows inode usage. Both can be independently exhausted.
Note which filesystem is full (`/`, `/var`, `/tmp`, `/home`, etc.).
### Find the large directories
```
du -sh /* 2>/dev/null | sort -rh | head -20
du -sh /var/* 2>/dev/null | sort -rh | head -20
du -sh /var/log/* 2>/dev/null | sort -rh | head -20
```
### Find large individual files
```
find / -xdev -type f -size +100M 2>/dev/null | sort -k5 -rn
find /var/log -type f -size +50M 2>/dev/null
```
### Find deleted-but-open files holding space
```
lsof +L1 2>/dev/null | grep -v "^COMMAND"
```
Files deleted while a process still has them open do not free space until the process releases the file descriptor.
### Inode exhaustion — find directories with many small files
```
find / -xdev -printf '%h\n' 2>/dev/null | sort | uniq -c | sort -rn | head -20
```
### Filesystem errors (after a crash or read-only remount)
```
dmesg | grep -i 'ext4\|xfs\|btrfs\|error\|corrupt'
journalctl -k | grep -i 'filesystem\|disk\|io error'
```
### LVM / partition layout
```
lsblk
pvs
vgs
lvs
```
## Remediation
**Large log files — truncate safely (do NOT rm while in use):**
```
truncate -s 0 /var/log/<logfile>
```
Or configure log rotation in `/etc/logrotate.d/`.
**Old journal logs eating space:**
```
journalctl --disk-usage
journalctl --vacuum-size=500M
journalctl --vacuum-time=30d
```
**Deleted-but-open files — restart the holding process to release space:**
Identify the PID from `lsof +L1`, then:
```
systemctl restart <service>
```
**Inode exhaustion — remove many small files:**
Common culprits: PHP session files in `/var/lib/php/sessions/`, old apt cache, tmp dirs.
```
find /var/lib/php/sessions -type f -mtime +7 -delete
apt-get clean
find /tmp -type f -mtime +3 -delete
```
**Extend LVM volume (if free extents exist in the volume group):**
```
lvextend -l +100%FREE /dev/<vg>/<lv>
resize2fs /dev/<vg>/<lv> # ext4
xfs_growfs /mountpoint # xfs
```

120
runbooks/docker.md Normal file
View File

@@ -0,0 +1,120 @@
---
service: docker
symptoms: cannot connect to docker daemon, docker daemon failed to start, docker socket permission denied, containers cannot resolve dns, docker network broken, daemon.json conflict, docker oom, unable to remove filesystem
tags: docker, dockerd, containerd, container, daemon, daemon.json, cgroup, dns, docker0, socket, compose
---
## Symptoms
- `Cannot connect to the Docker daemon. Is the docker daemon running on this host?`
- `permission denied` on `/var/run/docker.sock`
- `dockerd` fails to start after a `daemon.json` change
- Containers cannot resolve DNS or pull images
- Docker bridge/network disappears or container networking breaks after boot
- Container or daemon is killed by the kernel OOM killer
- `Error: Unable to remove filesystem` when removing a container
## Diagnostics
### Check daemon health and client target
```
docker info
systemctl is-active docker
systemctl status docker
ps -ef | grep dockerd
env | grep DOCKER_HOST
```
If `DOCKER_HOST` is set incorrectly, the CLI may be talking to the wrong daemon.
### Check daemon logs and startup failures
```
journalctl -u docker -n 200
journalctl -u containerd -n 100
cat /etc/docker/daemon.json
systemctl cat docker
```
Look for conflicts between `daemon.json` keys and systemd startup flags, especially duplicate `hosts` settings.
### Check socket permissions and group access
```
ls -la /var/run/docker.sock
id
getent group docker
ls -la ~/.docker/
```
If the user was added to the `docker` group recently, a new login shell may be required.
### Check kernel, cgroups, and memory pressure
```
uname -r
free -h
dmesg | grep -i -E 'docker|cgroup|oom|killed process'
```
Low memory, missing kernel features, or cgroup issues can stop containers or the daemon.
### Check Docker networking and DNS
```
docker network ls
ip addr show docker0
sysctl net.ipv4.ip_forward
cat /etc/resolv.conf
ps aux | grep dnsmasq
```
Loopback DNS resolvers in `/etc/resolv.conf` often break container DNS unless Docker is given explicit nameservers.
### Check storage and stuck mounts
```
df -h /var/lib/docker
docker system df
lsof /var/lib/docker
```
Bind-mounting `/var/lib/docker` into other containers can keep container filesystems busy and block removal.
## Remediation
**Daemon not running or client aimed at the wrong host:**
Unset an incorrect `DOCKER_HOST`, then start the daemon:
```
unset DOCKER_HOST
systemctl restart docker
```
**`daemon.json` conflicts with systemd flags:**
Remove duplicate settings or create a systemd override so `dockerd` is started without conflicting flags.
**Permission denied on Docker socket:**
Add the user to the `docker` group, then re-login:
```
usermod -aG docker $USER
newgrp docker
```
If `~/.docker/` was created by `sudo`, fix ownership:
```
sudo chown "$USER":"$USER" "$HOME/.docker" -R
sudo chmod g+rwx "$HOME/.docker" -R
```
**Container DNS broken:**
Configure explicit DNS servers in `/etc/docker/daemon.json`, then restart Docker.
**Docker networking disappears after boot:**
Stop the host network manager from managing Docker interfaces and confirm `net.ipv4.ip_forward=1`.
**OOM kills:**
Treat this as host memory pressure first; reduce workload, add memory, or enforce container memory limits.
**Unable to remove filesystem:**
Find the process holding the path open with `lsof`, then stop that process or the container bind-mounting `/var/lib/docker`.

117
runbooks/kernel.md Normal file
View File

@@ -0,0 +1,117 @@
---
service: kernel
symptoms: OOM kill, out of memory, high load average, kernel panic, segfault, soft lockup, CPU steal, system unresponsive, zombie processes, NMI watchdog
tags: kernel, oom, memory, load, cpu, panic, dmesg, segfault, lockup, swap, zombie
---
## Symptoms
- `Out of memory: Kill process <pid>` in dmesg — OOM killer fired
- Load average far above CPU count — system overloaded or I/O blocked
- `kernel: BUG: soft lockup` — CPU stuck in kernel code
- `segfault at ...` in dmesg — process crashed due to invalid memory access
- `kernel panic` — unrecoverable kernel error (visible only on console or serial)
- Many zombie (`Z`) processes in `ps` output
- High `%steal` in `top`/`vmstat` — hypervisor CPU contention
## Diagnostics
### Recent kernel messages
```
dmesg -T | tail -100
dmesg -T | grep -iE 'error|warn|oom|kill|panic|oops|fault|hung|lockup'
journalctl -k -n 200
```
### OOM events
```
dmesg -T | grep -i 'out of memory\|oom_kill\|killed process'
```
The log shows which process was killed, its RSS at time of kill, and available memory.
### Memory usage
```
free -h
cat /proc/meminfo | head -30
vmstat -s
```
`MemAvailable` is the key metric. If it is near zero and swap is also exhausted, OOM kills are imminent.
### Swap
```
swapon --show
cat /proc/swaps
vmstat 1 5
```
High `si`/`so` (swap-in/swap-out) in `vmstat` indicates active swapping and likely memory pressure.
### Load average and CPU
```
uptime
top -b -n1 | head -30
mpstat -P ALL 1 3
```
Load average above 2× CPU count sustained over 15 minutes is concerning.
High `%iowait` indicates processes blocked on disk I/O, not CPU-bound load.
### Process memory usage
```
ps aux --sort=-%mem | head -20
ps aux --sort=-%cpu | head -20
```
### Zombie processes
```
ps aux | awk '$8=="Z"'
```
Zombies cannot be killed; the parent must `wait()` for them or be killed itself.
### I/O wait and disk health
```
iostat -x 1 3
dmesg -T | grep -iE 'i/o error|hard resetting link|ata.*error|blk_update_request'
```
Persistent I/O errors alongside high load suggest failing storage.
## Remediation
**Memory pressure / frequent OOM kills:**
Identify the largest memory consumers from `ps aux --sort=-%mem`.
Consider increasing swap, adding RAM, tuning `vm.overcommit_memory`, or scaling the workload.
Do NOT just raise `vm.overcommit_ratio` without understanding the root consumer.
**Adjust OOM killer scoring for critical services (temporary, resets on reboot):**
```
echo -17 > /proc/<pid>/oom_adj # legacy
echo -1000 > /proc/<pid>/oom_score_adj # current kernels
```
**Swap exhausted — add a swapfile:**
```
fallocate -l 2G /swapfile
chmod 600 /swapfile
mkswap /swapfile
swapon /swapfile
```
**High I/O wait — find the I/O-heavy process:**
```
iotop -a -o -b -n3
```
**Zombie reaping — if parent is stuck:**
Kill the parent process (it will reap children on exit), then verify zombies disappear.

99
runbooks/nginx.md Normal file
View File

@@ -0,0 +1,99 @@
---
service: nginx
symptoms: 502 Bad Gateway, 504 Gateway Timeout, upstream connection refused, nginx not starting, failed to bind socket, permission denied reading config, configuration test failed
tags: nginx, web, http, https, proxy, upstream, reverse-proxy, load-balancer
---
## Symptoms
- `502 Bad Gateway` — nginx reached the upstream but got an invalid response, or upstream is down
- `504 Gateway Timeout` — upstream took too long to respond
- `111: Connection refused` in nginx error log — upstream process is not running or not on the expected port
- `nginx.service: Start request repeated too quickly` — crash-loop; check error log
- `[emerg] bind() to 0.0.0.0:80 failed (98: Address already in use)` — port conflict
- `[emerg] open() ... failed (13: Permission denied)` — file permission issue
## Diagnostics
### Service status
```
systemctl status nginx
```
### Config test
```
nginx -t
```
A config error is the most common reason for nginx failing to start or reload.
### Error log
```
journalctl -u nginx -n 100
tail -n 100 /var/log/nginx/error.log
```
For 502/504 errors look for: `connect() failed`, `upstream timed out`, `no live upstreams`.
### Access log — recent requests
```
tail -n 50 /var/log/nginx/access.log
```
### Check upstream services
For `proxy_pass` targets, verify the upstream is running:
```
systemctl status <upstream-service>
ss -tlnp | grep <upstream-port>
```
Common upstreams: `gunicorn`, `uwsgi`, `node`, `puma`, `php-fpm`.
### Port binding conflicts
```
ss -tlnp | grep ':80\|:443'
```
### Config files
```
cat /etc/nginx/nginx.conf
ls /etc/nginx/sites-enabled/
cat /etc/nginx/sites-enabled/<vhost>
```
Check `proxy_pass`, `upstream` blocks, `proxy_connect_timeout`, `proxy_read_timeout`.
## Remediation
**Upstream service not running:**
Start the upstream service, then verify nginx resumes proxying.
**Config syntax error:**
Fix the error shown by `nginx -t`, then:
```
systemctl reload nginx
```
**Port already in use:**
Find the conflicting process with `ss -tlnp | grep :80`, stop it, then restart nginx.
**Upstream timeouts — increase timeouts (caution: treat the slow upstream as the root cause):**
```nginx
proxy_connect_timeout 10s;
proxy_read_timeout 60s;
proxy_send_timeout 60s;
```
**Permission denied on log or socket file:**
```
ls -la /var/log/nginx/
ls -la /run/nginx.pid
chown -R www-data:www-data /var/log/nginx/
```

107
runbooks/postgres.md Normal file
View File

@@ -0,0 +1,107 @@
---
service: postgres
symptoms: connection refused port 5432, FATAL password authentication failed, replication lag, disk full, out of shared memory, too many connections, relation does not exist, could not connect to the primary
tags: postgres, postgresql, database, replication, pg, psql, disk, connections
---
## Symptoms
- `could not connect to server: Connection refused` — postgres not running or not on port 5432
- `FATAL: password authentication failed for user "<user>"` — wrong credentials or pg_hba mismatch
- `FATAL: too many connections` — connection pool exhausted
- `ERROR: could not resize shared memory segment` / `out of shared memory` — shared_buffers too high for system
- `PANIC: could not write to file "pg_wal/..."` — disk full on WAL directory
- Replication lag growing — standby falling behind primary
- `FATAL: could not connect to the primary server` — standby cannot reach primary
## Diagnostics
### Service status
```
systemctl status postgresql
systemctl status postgresql@<version>-main
```
### PostgreSQL logs
```
journalctl -u postgresql -n 100
tail -n 100 /var/log/postgresql/postgresql-*.log
```
### Is postgres listening?
```
ss -tlnp | grep 5432
```
### Disk space (WAL and data directory are the critical paths)
```
df -h
du -sh /var/lib/postgresql/
du -sh /var/lib/postgresql/*/main/pg_wal/
```
A full disk on the pg_wal partition causes a PANIC and hard crash.
### Connection count
```sql
SELECT count(*), state FROM pg_stat_activity GROUP BY state;
SELECT setting FROM pg_settings WHERE name = 'max_connections';
```
### Replication lag (run on primary)
```sql
SELECT client_addr, state, sent_lsn, write_lsn, flush_lsn, replay_lsn,
(sent_lsn - replay_lsn) AS lag_bytes
FROM pg_stat_replication;
```
### pg_hba.conf — authentication rules
```
cat /etc/postgresql/*/main/pg_hba.conf
```
Entries are matched top-to-bottom. `reject` or missing entry for the client IP causes auth failure even with correct credentials.
### Shared memory / kernel settings
```
cat /proc/sys/kernel/shmmax
cat /etc/postgresql/*/main/postgresql.conf | grep shared_buffers
```
`shared_buffers` must not exceed ~40% of RAM; kernel `shmmax` must accommodate it.
## Remediation
**Postgres not running:**
```
systemctl start postgresql
```
Check logs immediately after start for the failure reason.
**Authentication failure (pg_hba mismatch):**
Add or update the correct entry in `pg_hba.conf`, then reload:
```
systemctl reload postgresql
```
**Too many connections — increase limit (requires restart):**
In `postgresql.conf`:
```
max_connections = 200
```
Or deploy a connection pooler (`pgbouncer`).
**Disk full on WAL:**
Identify and remove old base backups or archived WAL segments under `/var/lib/postgresql/*/main/pg_wal/`.
Do NOT delete pg_wal files directly — use `pg_archivecleanup` or let archiving catch up.
**Replication lag — standby too far behind:**
Check network bandwidth and I/O on standby. If `wal_receiver_status_interval` lag is large, increase `wal_sender_timeout` temporarily.

112
runbooks/selinux.md Normal file
View File

@@ -0,0 +1,112 @@
---
service: selinux
symptoms: permission denied despite correct unix permissions, service blocked by selinux, avc denied, file context mismatch, port binding denied, boolean missing, domain transition failure
tags: selinux, avc, enforcing, security, policy, restorecon, audit, sealert, semanage
---
## Symptoms
- Service gets `Permission denied` even though file ownership and mode look correct
- Process cannot bind to a port or open a file after a config change
- AVC denials appear in audit logs
- App works when SELinux is permissive but fails in enforcing mode
- Newly created files under custom paths are inaccessible to a confined service
## Diagnostics
### Confirm SELinux mode and policy
```
getenforce
sestatus
cat /etc/selinux/config
```
If SELinux is `Permissive`, denials are logged but not enforced.
### Check AVC denials
```
auditctl -s
ausearch -m AVC,USER_AVC,SELINUX_ERR,USER_SELINUX_ERR -ts recent
journalctl -t setroubleshoot -n 50
dmesg | grep -i -e type=1300 -e type=1400
```
AVC denials are the primary source of truth for SELinux policy failures.
If AVCs are missing but SELinux still appears involved, temporarily disable `dontaudit` rules to expose hidden denials:
```
semodule -DB
```
Re-enable them after reproducing the issue:
```
semodule -B
```
### Inspect file contexts
```
ls -lZ /path/to/file
ps -eZ | grep <service>
matchpathcon -V /path/to/file
```
A service can have correct Unix permissions and still fail if the SELinux context is wrong.
### Check port labeling and booleans
```
semanage port -l | grep <port>
getsebool -a | grep <service-or-feature>
semanage boolean -l | grep <service-or-feature>
```
Custom ports often require explicit SELinux port labels.
### Check for relabeling needs
```
restorecon -nRv /path
matchpathcon /path/to/file
sealert -l "*"
```
`restorecon -n` shows what would change without modifying labels.
`sealert` is often the fastest way to turn a raw AVC into a concrete fix, but treat `audit2allow` suggestions as a last resort, not a first response.
## Remediation
**Wrong file context:**
Restore the default context:
```
restorecon -Rv /path
```
**Custom application path needs persistent labeling:**
```
semanage fcontext -a -t <type> '/custom/path(/.*)?'
restorecon -Rv /custom/path
```
**Custom port binding denied:**
Add the port label required by the service type:
```
semanage port -a -t <port_type> -p tcp <port>
```
**Boolean disabled:**
Enable the needed boolean persistently:
```
setsebool -P <boolean_name> on
```
**Still unsure whether SELinux is the blocker:**
Temporarily switch to permissive mode and reproduce the issue:
```
setenforce 0
```
If the problem still occurs, SELinux is not the root cause.
Do not disable SELinux or generate custom policy modules as a first response. Fix labels, booleans, or port mappings first.

100
runbooks/ssh.md Normal file
View File

@@ -0,0 +1,100 @@
---
service: ssh
symptoms: connection refused, authentication failed, host key mismatch, permission denied, timeout connecting, no route to host
tags: ssh, sshd, openssh, authentication, network, connectivity
---
## Symptoms
- `ssh: connect to host <hostname> port 22: Connection refused`
- `Permission denied (publickey)` — key not accepted or wrong user
- `WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!` — host key mismatch
- `Connection timed out` — firewall blocking or host unreachable
- `No route to host` — routing issue or host is down
## Diagnostics
### Is sshd running?
```
systemctl status sshd
systemctl status ssh
```
A stopped or failed sshd is the most common cause of "connection refused".
### Check sshd configuration
```
sshd -t
cat /etc/ssh/sshd_config
```
Look for: `PasswordAuthentication`, `PubkeyAuthentication yes`, `AuthorizedKeysFile`.
### Check authorised keys
```
ls -la ~/.ssh/
cat ~/.ssh/authorized_keys
```
Permissions must be: `~/.ssh``700`, `authorized_keys``600`.
Wrong permissions cause silent auth failure even with the correct key.
### Check sshd logs
```
journalctl -u sshd -n 100
journalctl -u ssh -n 100
grep sshd /var/log/auth.log | tail -50
```
Look for: `Invalid user`, `Failed publickey`, `Connection reset by peer`, `Too many authentication failures`.
### Check listening port
```
ss -tlnp | grep sshd
netstat -tlnp | grep :22
```
If sshd is running but not listening on the expected port, check `Port` in `/etc/ssh/sshd_config`.
### Firewall rules
```
iptables -L INPUT -n -v
nft list ruleset
ufw status verbose
```
A DROP rule on port 22 causes silent timeouts, not "connection refused".
## Remediation
**sshd not running:**
```
systemctl enable --now sshd
```
**Wrong permissions on authorized_keys:**
```
chmod 700 ~/.ssh
chmod 600 ~/.ssh/authorized_keys
chown -R $USER:$USER ~/.ssh
```
**sshd config error:**
Fix the error reported by `sshd -t`, then:
```
systemctl restart sshd
```
**Host key mismatch (expected after reinstall/reprovisioning):**
Remove the old key from the client:
```
ssh-keygen -R <hostname>
```
Only do this if you are certain the host was intentionally reprovisioned.
If the key change is unexpected, treat as a potential MITM and investigate before connecting.

115
runbooks/sssd.md Normal file
View File

@@ -0,0 +1,115 @@
---
service: sssd
symptoms: login denied, user not found, id command hangs, sudo rules missing, ldap auth failure, kerberos failure, cache stale, offline authentication not working
tags: sssd, ldap, kerberos, ad, identity, auth, pam, nss, sudo
---
## Symptoms
- `id <user>` hangs or returns no such user for a domain account
- SSH or console login fails for directory-backed users
- Group membership is missing or incomplete
- `sudo` rules from LDAP/AD do not appear
- Authentication works intermittently or only after cache flush
- Offline authentication fails when the directory is unreachable
## Diagnostics
### Check service health
```
systemctl status sssd
sssctl domain-list
sssctl config-check
cat /etc/nsswitch.conf
```
A running daemon with a valid config and `sss` present in `nsswitch.conf` are the first prerequisites.
### Check identity resolution
```
id <user>
getent passwd <user>
getent group <group>
```
If NSS lookups fail, the issue is often in SSSD configuration, connectivity, or cache.
### Check SSSD logs
```
journalctl -u sssd -n 100
ls -la /var/log/sssd/
tail -n 100 /var/log/sssd/*.log
sssctl logs-fetch
```
Look for: backend offline, LDAP bind failures, Kerberos errors, TLS problems, and access provider denials.
If the issue is unclear, raise `debug_level=6` in the relevant `[nss]`, `[pam]`, and `[domain/<name>]` sections. Raising debug only in `[sssd]` is not enough for most real failures.
### Check domain reachability
```
sssctl domain-status <domain>
ping <ldap-or-ad-host>
dig -t SRV _ldap._tcp.<domain>
cat /etc/resolv.conf
```
If the identity provider is unreachable, SSSD may serve cached data only or fail entirely.
### Check Kerberos and LDAP configuration
```
cat /etc/sssd/sssd.conf
cat /etc/krb5.conf
kinit <user>
klist
ldapsearch -ZZ -x -H ldap://<server> -b <base-dn>
```
Look for wrong realm names, bad server addresses, TLS settings, and access filters.
For AD or IPA providers, Kerberos and DNS are often the real dependency chain: broken SRV lookup, keytab issues, or a slow KDC will surface as SSSD failures.
### Check cache and permissions
```
ls -la /var/lib/sss/db/
sssctl cache-status
sssctl cache-expire -E
```
`/etc/sssd/sssd.conf` must usually be mode `600` or SSSD will refuse to start.
Do not wipe cache files blindly on an offline system that depends on cached logins.
## Remediation
**Config syntax or permission issue:**
Fix `sssd.conf`, set secure permissions, then restart:
```
chmod 600 /etc/sssd/sssd.conf
systemctl restart sssd
```
**Stale cache:**
Clear cache carefully, then repopulate with a fresh lookup:
```
sss_cache -E
id <user>
```
**Kerberos failure:**
Validate time sync, realm, keytab credentials, and KDC reachability before changing LDAP settings.
**Backend offline or `sdap_async_sys_connect request failed`:**
Treat as DNS/network first. Validate SRV records and TLS handshake before increasing `ldap_network_timeout` or `ldap_search_timeout`.
**Access denied despite successful lookup:**
Check `access_provider`, LDAP filters, HBAC rules, or AD group-based access restrictions.
**No `pam_sss` messages at all:**
The PAM stack is likely misconfigured. Fix the PAM/authselect profile before changing SSSD itself.

89
runbooks/wayland.md Normal file
View File

@@ -0,0 +1,89 @@
---
service: wayland
symptoms: wayland session fails, gdm falls back to xorg, black screen on login, fractional scaling broken, screen sharing broken, remote desktop broken, wlroots crash, compositor crash
tags: wayland, compositor, gnome, kde, mutter, wlroots, pipewire, xwayland, graphics
---
## Symptoms
- User selects a Wayland session but is returned to login
- GDM or another display manager falls back to Xorg
- Screen sharing, remote desktop, or clipboard integration is broken
- Apps requiring XWayland fail while native Wayland apps work
- Fractional scaling or multi-monitor layout behaves incorrectly
- Wayland compositor crashes after login
## Diagnostics
### Confirm the active session type
```
echo $XDG_SESSION_TYPE
loginctl show-session $XDG_SESSION_ID -p Type
echo $WAYLAND_DISPLAY
```
If the session type is `x11`, you are not debugging an active Wayland session.
### Check display manager and compositor logs
```
systemctl status gdm
journalctl -b | grep -iE 'wayland|mutter|kwin|wlroots|xwayland'
journalctl -b | grep -i 'renderer for'
```
Look for compositor crashes, GPU driver incompatibilities, and forced Xorg fallback messages.
### Check XWayland and PipeWire components
```
which Xwayland
systemctl --user status pipewire
systemctl --user status xdg-desktop-portal
systemctl --user status xdg-desktop-portal-gnome
systemctl --user status xdg-desktop-portal-kde
xlsclients -l
```
Broken screen sharing is often a PipeWire or portal issue, not a compositor issue.
`xlsclients -l` helps identify apps that are actually running under XWayland rather than native Wayland.
### Check GPU compatibility
```
lspci -k | grep -A3 -E 'VGA|3D|Display'
lsmod | grep -E 'nvidia|nouveau|amdgpu|i915'
```
Wayland support quality depends heavily on the GPU driver stack.
### Check environment and session overrides
```
env | grep -E 'WAYLAND|XDG|GDK_BACKEND|QT_QPA_PLATFORM'
cat /etc/gdm/custom.conf
wayland-info
```
Environment overrides can force apps onto X11 or disable Wayland entirely.
For NVIDIA systems, confirm the compositor is using a supported buffer path (GBM on current drivers is the expected default).
## Remediation
**Wayland disabled in display manager config:**
Check `WaylandEnable=false` or similar settings and remove the override if unintended.
**Fallback to Xorg on unsupported GPU stack:**
Upgrade or change the graphics driver; Wayland stability is often limited by the driver, not the compositor.
**Screen sharing broken:**
Fix PipeWire and `xdg-desktop-portal` services before changing compositor settings.
**XWayland-only app failures:**
Treat them separately from native Wayland issues; confirm `Xwayland` is installed and launching.
**Remote desktop, VM, or game input grabbing is broken:**
This is often a Wayland protocol/compositor support limitation, not a generic keyboard bug. Check compositor support for pointer constraints, relative pointer, and keyboard shortcut inhibit protocols.

106
runbooks/x2go.md Normal file
View File

@@ -0,0 +1,106 @@
---
service: x2go
symptoms: x2go session fails to start, x2go black screen, x2go disconnects immediately, no desktop in session, authentication failure, x2go agent not starting, sound forwarding broken
tags: x2go, nx, remote-desktop, x2goserver, x2goclient, session, desktop, xauth
---
## Symptoms
- X2Go login succeeds but the session immediately disconnects
- Black screen after login
- Session is created but no desktop appears
- `x2goruncommand error` or `X2Go Agent got stuck in state`
- Sound, clipboard, or drive sharing fails while login itself works
- Authentication works over SSH but X2Go session startup fails
## Diagnostics
### Check X2Go services and packages
```
systemctl status x2goserver
systemctl status sshd
rpm -qa | grep x2go
apt list --installed | grep x2go
which x2golistsessions
```
X2Go depends on working SSH plus installed `x2goserver` and `x2goserver-xsession` components.
### Check X2Go logs
```
journalctl -u x2goserver -n 100
journalctl -u sshd -n 100
ls -la ~/.x2go/
find ~/.x2go -type f -maxdepth 2 -print
x2golistsessions
```
Look for session startup failures, agent crashes, and auth helper errors.
### Check desktop environment startup command
```
cat /etc/x2go/Xsession
cat ~/.xsession
cat ~/.Xclients
```
A missing or broken desktop session command is a common cause of black screens.
### Check X11 and xauth availability
```
which xauth
xauth -V
ls -la ~/.Xauthority
which sshfs
```
X2Go requires a working X11 session setup. Missing `xauth` or a bad `.Xauthority` often breaks startup.
Filesystem and folder-sharing features may also depend on `sshfs` being installed.
### Check session limits and stale sessions
```
x2golistsessions
x2gocleansessions
ulimit -a
loginctl list-sessions
```
Stale sessions or per-user process limits can prevent a new desktop from starting.
### Check desktop dependencies
```
which startxfce4
which mate-session
which startplasma-x11
env | grep -E 'DESKTOP|XDG'
```
If the selected desktop command does not exist, X2Go may connect and then terminate immediately.
## Remediation
**Missing or broken desktop startup command:**
Set the session to a known-good desktop such as XFCE and verify the binary exists.
**Corrupt Xauthority or stale X2Go session files:**
Remove stale session state and regenerate auth files:
```
rm -f ~/.Xauthority
rm -rf ~/.x2go/C-*
```
**Missing `xauth` or X11 helpers:**
Install the missing X11 packages, then retry the session.
**Required server packages missing:**
Install `x2goserver` and `x2goserver-xsession` first, then retry before debugging desktop startup.
**SSH works but X2Go session fails:**
Treat it as a desktop startup or X11 auth problem, not an SSH transport problem.

94
runbooks/xorg.md Normal file
View File

@@ -0,0 +1,94 @@
---
service: xorg
symptoms: xorg black screen, display manager loop, no screens found, failed to start X server, GPU driver error, xrandr missing outputs, login screen not appearing
tags: xorg, x11, display, gpu, drm, xrandr, gdm, sddm, lightdm
---
## Symptoms
- Black screen after graphical boot
- Display manager loops back to login
- `no screens found` in Xorg log
- External monitors are missing or not detected
- X server fails after a driver update
- `startx` exits immediately with display or device errors
## Diagnostics
### Check display manager and Xorg service path
```
systemctl status display-manager
systemctl status gdm
systemctl status sddm
systemctl status lightdm
```
If the display manager is failing, inspect its logs before focusing on Xorg itself.
### Check Xorg logs
```
find /var/log -name 'Xorg*.log' -o -name 'Xorg.*.log'
grep -E '\(EE\)|\(WW\)' /var/log/Xorg.0.log
journalctl -b | grep -iE 'xorg|gdm|sddm|lightdm'
ls -la ~/.local/share/xorg/
```
Look for: `no screens found`, GPU module load failures, and permission/device access errors.
On rootless Xorg, logs are often under `~/.local/share/xorg/Xorg.0.log` instead of `/var/log/`.
### Check DRM and GPU driver state
```
lspci -k | grep -A3 -E 'VGA|3D|Display'
lsmod | grep -E 'nouveau|nvidia|amdgpu|i915'
dmesg | grep -iE 'drm|gpu|nvidia|amdgpu|i915'
```
Driver mismatches after kernel updates are a common cause of X startup failures.
### Check monitor detection and permissions
```
loginctl session-status
xrandr --query
ls -la /dev/dri/
ps -o user= -C Xorg
```
If `/dev/dri/*` permissions or seat assignment are wrong, X may fail to access the GPU.
### Check X configuration files
```
find /etc/X11 -maxdepth 3 -type f
cat /etc/X11/xorg.conf
cat /etc/X11/xorg.conf.d/*.conf
ls -la ~/.xinitrc ~/.xserverrc
```
Custom `Device`, `Monitor`, or `Screen` sections often break auto-detection.
An empty or broken `.xinitrc` can produce a black screen even when the X server itself started correctly.
## Remediation
**Bad static Xorg config:**
Move custom config aside and let auto-detection work unless the hardware truly needs manual config.
**Driver mismatch after update:**
Reinstall the GPU driver package matching the running kernel and reboot or restart the display manager.
**`no screens found`:**
Check whether the correct DRM module loaded and whether the display manager is running on the expected seat.
**Display manager loop:**
Correlate Xorg errors with PAM/auth logs; some loops are session startup failures, not graphics failures.
**Framebuffer mode failure:**
If X falls back to `fbdev` and errors with framebuffer/bus ID messages, remove the generic `fbdev` driver package and let Xorg use the proper modesetting or vendor driver.
**`SocketCreateListener() failed`:**
Check for stale sockets in `/tmp/.X11-unix`, especially after previous root-run Xorg sessions.

View File

@@ -10,6 +10,7 @@ from openai import OpenAI
DEFAULT_AI_HOST = "http://localhost:11434/v1"
DEFAULT_MODEL = "gemma3:4b"
DEFAULT_EMBED_MODEL = "nomic-embed-text"
@dataclass(slots=True)
@@ -21,6 +22,7 @@ class AIConfig:
api_key: str = "ollama" # Ollama ignores this; required by the openai client
timeout_seconds: float = 120.0
max_tokens: int = 4096
embed_model: str = DEFAULT_EMBED_MODEL
extra_headers: dict[str, str] = field(default_factory=dict)
@@ -106,3 +108,11 @@ class AIClient:
def summary(self) -> str:
"""Human-readable description of the AI config."""
return f"host={self._config.host} model={self._config.model}"
def embed(self, text: str) -> list[float]:
"""Embed *text* using the configured embedding model via the OpenAI-compatible endpoint."""
response = self._client.embeddings.create(
model=self._config.embed_model,
input=text,
)
return list(response.data[0].embedding)

View File

@@ -0,0 +1,22 @@
"""Local no-op telemetry implementation for ChromaDB.
ChromaDB expects a product telemetry client component. Some local package
combinations emit noisy PostHog errors even when anonymized telemetry is
disabled, so tai wires ChromaDB to this no-op client instead.
"""
from __future__ import annotations
from chromadb.config import System
from chromadb.telemetry.product import ProductTelemetryClient, ProductTelemetryEvent
class NoOpProductTelemetryClient(ProductTelemetryClient):
"""Telemetry client that intentionally drops all events."""
def __init__(self, system: System):
super().__init__(system)
def capture(self, event: ProductTelemetryEvent) -> None:
del event
return None

View File

@@ -3,23 +3,39 @@
from __future__ import annotations
import asyncio
import sys
from time import perf_counter
from typing import Annotated
import typer
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from rich.rule import Rule
from rich.text import Text
from tai.ai_client import DEFAULT_AI_HOST, DEFAULT_MODEL, AIClient, AIConfig
from tai.ai_client import DEFAULT_AI_HOST, DEFAULT_EMBED_MODEL, DEFAULT_MODEL, AIClient, AIConfig
from tai.ai_guardrails import validate_ai_response
from tai.collectors import CollectionReport, collect_from_plan
from tai.input_parser import InputValidationError, build_request
from tai.models import TroubleshootRequest
from tai.plan import plan_from_request
from tai.prompt_builder import build_followup_message, build_system_prompt, build_user_message
from tai.prompt_builder import (
build_analysis_message_with_chunks,
build_followup_message,
build_message_with_chunks,
build_system_prompt,
build_user_message,
)
from tai.rag_retriever import EmbeddedChunk, chunk_report, retrieve_scored
from tai.runbook_store import RunbookChunk, RunbookStore
from tai.session_log import SessionLogger
from tai.session_store import PastSession, SessionStore
from tai.ssh_client import SSHClient, SSHCommandResult, SSHConnectionConfig, SSHSession
app = typer.Typer(no_args_is_help=True, add_completion=False)
runbooks_app = typer.Typer(no_args_is_help=True, help="Manage the runbook knowledge base.")
app.add_typer(runbooks_app, name="runbooks")
console = Console()
@@ -87,6 +103,20 @@ def run(
str,
typer.Option("--ai-key", help="API key for the AI backend (not needed for Ollama)."),
] = "ollama",
ai_timeout_seconds: Annotated[
float,
typer.Option(
"--ai-timeout-seconds",
help="Timeout for AI requests/generation in seconds.",
),
] = 120.0,
ai_max_tokens: Annotated[
int,
typer.Option(
"--ai-max-tokens",
help="Upper bound for generated completion tokens.",
),
] = 1024,
log_file: Annotated[
str | None,
typer.Option(
@@ -94,6 +124,44 @@ def run(
help="Optional JSONL file path to log AI and session output.",
),
] = None,
no_rag: Annotated[
bool,
typer.Option(
"--no-rag",
help="Disable RAG; send full diagnostics to AI instead of retrieved chunks.",
),
] = False,
embed_model: Annotated[
str,
typer.Option(
"--embed-model",
help="Embedding model for RAG. Must be pulled in Ollama on the AI host.",
),
] = DEFAULT_EMBED_MODEL,
rag_debug: Annotated[
bool,
typer.Option(
"--rag-debug/--no-rag-debug",
help="Print retrieved chunk names/scores and log per-question retrieval metrics.",
),
] = False,
runbooks_path: Annotated[
str | None,
typer.Option(
"--runbooks",
help="Path to a synced runbook ChromaDB store. Enables Tier 2 RAG.",
),
] = None,
session_memory_path: Annotated[
str | None,
typer.Option(
"--session-memory",
help=(
"Path to persistent session memory store for prior-session retrieval "
"(Tier 4). Omit to disable."
),
),
] = None,
) -> None:
"""Start an interactive troubleshooting session scaffold."""
try:
@@ -119,20 +187,48 @@ def run(
)
summary = SSHClient(config).summary()
console.print("[bold green]tai[/bold green]")
console.print(f"Issue: {req.issue}")
console.print(f"SSH: {summary}")
console.print(Rule("[bold green]tai[/bold green]", style="green"))
console.print(f" [bold]Issue:[/bold] {req.issue}")
console.print(f" [bold]SSH:[/bold] {summary}")
if req.target_paths:
console.print(f"Paths: {', '.join(str(p) for p in req.target_paths)}")
console.print(f" [bold]Paths:[/bold] {', '.join(str(p) for p in req.target_paths)}")
console.print()
if not (probe or collect or analyze or interactive):
return # nothing SSH-related requested
ai_config = AIConfig(host=ai_host, model=model, api_key=ai_key)
ai_config = AIConfig(
host=ai_host,
model=model,
api_key=ai_key,
timeout_seconds=ai_timeout_seconds,
max_tokens=ai_max_tokens,
embed_model=embed_model,
)
logger = SessionLogger.create(log_file) if log_file else None
if analyze or interactive:
console.print(f"[cyan]AI:[/cyan] {AIClient(ai_config).summary()}")
runbook_store: RunbookStore | None = None
if runbooks_path is not None:
try:
runbook_store = RunbookStore(runbooks_path)
rb_count = runbook_store.count()
console.print(f"[dim]Runbooks: {rb_count} indexed at {runbooks_path}[/dim]")
except Exception as exc: # noqa: BLE001
console.print(f"[yellow]Runbook store unavailable:[/yellow] {exc}")
session_store: SessionStore | None = None
if session_memory_path:
try:
session_store = SessionStore(session_memory_path)
mem_count = session_store.count()
console.print(
f"[dim]Session memory: {mem_count} indexed at {session_memory_path}[/dim]"
)
except Exception as exc: # noqa: BLE001
console.print(f"[yellow]Session memory unavailable:[/yellow] {exc}")
try:
asyncio.run(
_async_main(
@@ -143,6 +239,10 @@ def run(
analyze=analyze,
interactive=interactive,
ai_config=ai_config,
no_rag=no_rag,
rag_debug=rag_debug,
runbook_store=runbook_store,
session_store=session_store,
logger=logger,
)
)
@@ -165,6 +265,10 @@ async def _async_main(
analyze: bool,
interactive: bool,
ai_config: AIConfig,
no_rag: bool,
rag_debug: bool,
runbook_store: RunbookStore | None,
session_store: SessionStore | None,
logger: SessionLogger | None,
) -> None:
"""Open a single SSH session and run probe / collection / analysis through it."""
@@ -211,11 +315,36 @@ async def _async_main(
},
)
initial_response: str | None = None
if analyze and report is not None:
_run_analysis(ai_config, req.issue, report, logger=logger)
initial_response = _run_analysis(
ai_config,
req.issue,
report,
no_rag=no_rag,
rag_debug=rag_debug,
runbook_store=runbook_store,
session_store=session_store,
logger=logger,
)
interactive_response: str | None = None
if interactive:
await _interactive_loop(session, req, ai_config, report, logger=logger)
interactive_response = await _interactive_loop(
session,
req,
ai_config,
report,
no_rag=no_rag,
rag_debug=rag_debug,
runbook_store=runbook_store,
session_store=session_store,
logger=logger,
)
final_response = interactive_response or initial_response
if session_store is not None and final_response:
_index_session_memory(session_store, ai_config, req, final_response, logger=logger)
async def _interactive_loop(
@@ -223,24 +352,73 @@ async def _interactive_loop(
req: TroubleshootRequest,
ai_config: AIConfig,
report: CollectionReport | None,
*,
no_rag: bool = False,
rag_debug: bool = False,
runbook_store: RunbookStore | None = None,
session_store: SessionStore | None = None,
logger: SessionLogger | None,
) -> None:
) -> str | None:
"""Run a follow-up loop for collecting and conversational analysis."""
console.print(
"[cyan]Interactive mode:[/cyan] "
"ask questions directly, or use /collect, /analyze, /help, /quit"
Panel(
"Ask questions directly, or use [bold]/collect[/bold], "
"[bold]/analyze[/bold], [bold]/help[/bold], [bold]/quit[/bold]",
title="[bold cyan]Interactive Mode[/bold cyan]",
border_style="cyan",
padding=(0, 1),
)
)
prior_questions: list[str] = []
embedded_chunks: list[EmbeddedChunk] | None = None
ai_embed = AIClient(ai_config)
last_response: str | None = None
if not no_rag and report is not None:
embedded_chunks, index_error, index_ms = await asyncio.to_thread(
_try_embed_report, report, ai_embed
)
if embedded_chunks is not None:
console.print(f"[dim]RAG: indexed {len(embedded_chunks)} diagnostic chunks[/dim]")
if logger is not None:
logger.log_event(
"rag_index",
{
"status": "ok",
"chunk_count": len(embedded_chunks),
"duration_ms": round(index_ms, 2),
},
)
else:
console.print(
"[yellow]RAG unavailable (indexing failed); using full-context fallback.[/yellow]"
)
if logger is not None:
logger.log_event(
"rag_index",
{
"status": "fallback",
"error": index_error,
"duration_ms": round(index_ms, 2),
},
)
while True:
try:
command = input("tai> ").strip()
if _stdin_is_tty():
command = console.input("\n[bold cyan]tai[/bold cyan][dim] >[/dim] ").strip()
else:
line = sys.stdin.readline() # non-TTY / piped mode
if not line:
return last_response
command = line.strip()
console.print(f"\n[bold cyan]tai[/bold cyan][dim] >[/dim] {command}")
except (EOFError, KeyboardInterrupt):
console.print("\n[yellow]Exiting interactive mode.[/yellow]")
if logger is not None:
logger.log_event("interactive_exit", {"reason": "signal_or_eof"})
return
return last_response
if not command:
continue
@@ -249,11 +427,21 @@ async def _interactive_loop(
console.print("[green]Bye.[/green]")
if logger is not None:
logger.log_event("interactive_exit", {"reason": "user_quit"})
return
return last_response
if command == "/help":
console.print("Commands: /collect, /analyze, /help, /quit")
console.print("Tip: any non-slash text is treated as a follow-up AI question.")
console.print(
Panel(
"[bold]/collect[/bold] — re-run diagnostics\n"
"[bold]/analyze[/bold] — re-analyze current diagnostics\n"
"[bold]/help[/bold] — show this message\n"
"[bold]/quit[/bold] — end session\n"
"[dim]Anything else is sent directly to the AI as a question.[/dim]",
title="[bold]Commands[/bold]",
border_style="dim",
padding=(0, 1),
)
)
continue
if command == "/collect":
@@ -261,6 +449,37 @@ async def _interactive_loop(
console.print(f"[cyan]Collecting diagnostics:[/cyan] {len(plan)} commands")
report = await collect_from_plan(session, plan)
_handle_collection_report(report)
if not no_rag:
embedded_chunks, index_error, index_ms = await asyncio.to_thread(
_try_embed_report, report, ai_embed
)
if embedded_chunks is not None:
console.print(
f"[dim]RAG: indexed {len(embedded_chunks)} diagnostic chunks[/dim]"
)
if logger is not None:
logger.log_event(
"rag_index",
{
"status": "ok",
"chunk_count": len(embedded_chunks),
"duration_ms": round(index_ms, 2),
},
)
else:
console.print(
"[yellow]RAG unavailable (indexing failed); "
"using full-context fallback.[/yellow]"
)
if logger is not None:
logger.log_event(
"rag_index",
{
"status": "fallback",
"error": index_error,
"duration_ms": round(index_ms, 2),
},
)
if logger is not None:
logger.log_event(
"collection_summary",
@@ -281,17 +500,23 @@ async def _interactive_loop(
console.print("[red]No diagnostics available to analyze.[/red]")
continue
_run_followup_analysis(
response = _run_followup_analysis(
ai_config,
req.issue,
report,
"Provide an updated diagnosis from the current diagnostics.",
prior_questions,
embedded_chunks=embedded_chunks,
rag_debug=rag_debug,
runbook_store=runbook_store,
session_store=session_store,
logger=logger,
)
prior_questions.append("/analyze")
if logger is not None:
logger.log_event("interactive_followup", {"question": "/analyze"})
last_response = response
continue
continue
if report is None:
@@ -299,46 +524,108 @@ async def _interactive_loop(
console.print(f"[cyan]Collecting diagnostics:[/cyan] {len(plan)} commands")
report = await collect_from_plan(session, plan)
_handle_collection_report(report)
if not no_rag:
embedded_chunks, index_error, index_ms = await asyncio.to_thread(
_try_embed_report, report, ai_embed
)
if embedded_chunks is not None:
console.print(
f"[dim]RAG: indexed {len(embedded_chunks)} diagnostic chunks[/dim]"
)
if logger is not None:
logger.log_event(
"rag_index",
{
"status": "ok",
"chunk_count": len(embedded_chunks),
"duration_ms": round(index_ms, 2),
},
)
else:
console.print(
"[yellow]RAG unavailable (indexing failed); "
"using full-context fallback.[/yellow]"
)
if logger is not None:
logger.log_event(
"rag_index",
{
"status": "fallback",
"error": index_error,
"duration_ms": round(index_ms, 2),
},
)
if report is None:
console.print("[red]No diagnostics available to analyze.[/red]")
continue
_run_followup_analysis(
response = _run_followup_analysis(
ai_config,
req.issue,
report,
command,
prior_questions,
embedded_chunks=embedded_chunks,
rag_debug=rag_debug,
runbook_store=runbook_store,
session_store=session_store,
logger=logger,
)
prior_questions.append(command)
if logger is not None:
logger.log_event("interactive_followup", {"question": command})
last_response = response
def _try_embed_report(
report: CollectionReport,
ai: AIClient,
) -> tuple[list[EmbeddedChunk] | None, str | None, float]:
"""Embed all diagnostic chunks from *report*.
Returns (chunks, error_message, duration_ms). On failure, chunks is None
and callers should fall back to non-RAG full-context prompts.
"""
start = perf_counter()
try:
chunks = chunk_report(report)
if not chunks:
return None, "no eligible chunks to index", (perf_counter() - start) * 1000.0
embedded = [EmbeddedChunk(chunk=c, embedding=ai.embed(c.content)) for c in chunks]
return embedded, None, (perf_counter() - start) * 1000.0
except Exception as exc: # noqa: BLE001
return None, str(exc), (perf_counter() - start) * 1000.0
def _handle_probe_result(result: SSHCommandResult) -> None:
"""Handle and render probe output for success or failure."""
console.print("[cyan]Running SSH probe:[/cyan] uname -a")
console.print("[dim]▶ SSH probe:[/dim] uname -a")
if result.exit_code != 0:
details = result.stderr or result.stdout or "no error output from ssh"
console.print(f"[red]Probe failed (exit {result.exit_code}):[/red] {details}")
console.print(f"[bold red]Probe failed[/bold red] (exit {result.exit_code}): {details}")
raise typer.Exit(code=1)
output = result.stdout or "(no output)"
console.print("[bold green]Probe succeeded.[/bold green]")
console.print(f"Remote: {output}")
console.print("[bold green]Probe succeeded.[/bold green]")
console.print(f" [dim]{output}[/dim]")
def _handle_collection_report(report: CollectionReport) -> None:
"""Render collected command status and truncation hints."""
console.print(
f"[bold]Collection complete:[/bold] {report.total} commands, {report.failed} failed"
failed_label = (
f"[red]{report.failed} failed[/red]" if report.failed else "[green]0 failed[/green]"
)
console.print(f"[bold]Collection complete:[/bold] {report.total} commands, {failed_label}")
for item in report.items:
status = "ok" if item.result.exit_code == 0 else f"exit {item.result.exit_code}"
truncated = item.result.stdout_truncated or item.result.stderr_truncated
trunc = " (truncated)" if truncated else ""
console.print(f"- {item.name}: {status}{trunc}")
trunc_label = " [dim](truncated)[/dim]" if truncated else ""
if item.result.exit_code == 0:
console.print(f" [green]✓[/green] [dim]{item.name}[/dim]{trunc_label}")
else:
console.print(
f" [red]✗[/red] {item.name} "
f"[red](exit {item.result.exit_code})[/red]{trunc_label}"
)
def _run_analysis(
@@ -346,23 +633,83 @@ def _run_analysis(
issue: str,
report: CollectionReport,
*,
no_rag: bool = False,
rag_debug: bool = False,
runbook_store: RunbookStore | None = None,
session_store: SessionStore | None = None,
logger: SessionLogger | None,
) -> None:
) -> str:
"""Send collected data to the AI and stream the analysis to stdout."""
console.print("[cyan]Analyzing...[/cyan]\n")
console.print()
console.print(Rule("[bold cyan]Analysis[/bold cyan]", style="cyan"))
console.print()
ai = AIClient(ai_config)
system_prompt = build_system_prompt()
user_message = build_user_message(issue, report)
runbook_chunks = _query_runbooks(runbook_store, issue, ai, top_k=1)
past_sessions = _query_sessions(session_store, issue, report.host, ai, top_k=2)
user_message: str
if no_rag:
user_message = build_user_message(
issue,
report,
runbook_chunks=runbook_chunks or None,
past_sessions=past_sessions or None,
)
else:
try:
chunks = chunk_report(report)
embedded = [EmbeddedChunk(chunk=c, embedding=ai.embed(c.content)) for c in chunks]
q_embedding = ai.embed(issue)
scored = retrieve_scored(q_embedding, embedded, top_k=3)
if rag_debug:
pairs = ", ".join(
f"{chunk.name}={score:.3f}" for chunk, score in scored
)
console.print(f"[dim]RAG retrieve (initial):[/dim] {pairs or 'no matches'}")
selected = [chunk for chunk, _score in scored]
if selected:
user_message = build_analysis_message_with_chunks(
issue,
report.host,
selected,
runbook_chunks=runbook_chunks or None,
past_sessions=past_sessions or None,
)
else:
user_message = build_user_message(
issue,
report,
runbook_chunks=runbook_chunks or None,
past_sessions=past_sessions or None,
)
except Exception as exc: # noqa: BLE001
console.print(
"[yellow]RAG unavailable for initial analysis; "
"using full-context fallback.[/yellow]"
)
if logger is not None:
logger.log_event("rag_index", {"status": "fallback", "error": str(exc)})
user_message = build_user_message(
issue,
report,
runbook_chunks=runbook_chunks or None,
past_sessions=past_sessions or None,
)
try:
chunks: list[str] = []
for chunk in ai.stream(system_prompt, user_message):
chunks.append(chunk)
response = "".join(chunks)
response = _complete_ai_response(
ai,
system_prompt,
user_message,
)
console.print(Markdown(response))
warnings = validate_ai_response(response)
for item in warnings:
console.print(f"[yellow]Guardrail warning:[/yellow] {item}")
warn_text = Text()
warn_text.append("⚠ Guardrail: ", style="bold yellow")
warn_text.append(item, style="yellow")
console.print(warn_text)
if logger is not None:
logger.log_event(
@@ -373,6 +720,7 @@ def _run_analysis(
"guardrail_warnings": warnings,
},
)
return response
except Exception as exc: # noqa: BLE001
console.print(f"[red]AI analysis failed:[/red] {exc}")
if logger is not None:
@@ -380,6 +728,15 @@ def _run_analysis(
raise typer.Exit(code=1) from exc
def _stdin_is_tty() -> bool:
return sys.stdin.isatty()
def _estimate_tokens(text: str) -> int:
"""Rough token estimate for metrics and tuning; assumes ~4 chars/token."""
return max(1, len(text) // 4)
def _run_followup_analysis(
ai_config: AIConfig,
issue: str,
@@ -387,24 +744,105 @@ def _run_followup_analysis(
question: str,
prior_questions: list[str],
*,
embedded_chunks: list[EmbeddedChunk] | None = None,
rag_debug: bool = False,
runbook_store: RunbookStore | None = None,
session_store: SessionStore | None = None,
logger: SessionLogger | None,
) -> str:
"""Run grounded follow-up analysis re-anchored to current diagnostics."""
console.print("[cyan]Analyzing...[/cyan]\n")
"""Run grounded follow-up analysis re-anchored to current diagnostics.
When *embedded_chunks* is provided, the question is embedded and top-k
relevant chunks are selected. If retrieval fails, a clear fallback message
is emitted and full diagnostic context is used.
"""
console.print()
console.print(Rule("[bold cyan]AI Response[/bold cyan]", style="cyan"))
console.print()
ai = AIClient(ai_config)
system_prompt = build_system_prompt()
user_message = build_followup_message(issue, report, question, prior_questions)
runbook_chunks = _query_runbooks(runbook_store, question, ai, top_k=1)
past_sessions = _query_sessions(session_store, question, report.host, ai, top_k=2)
user_message: str
retrieved_names: list[str] = []
retrieved_scores: list[float] = []
retrieval_ms = 0.0
fallback_reason: str | None = None
if embedded_chunks is not None:
retrieval_start = perf_counter()
try:
q_embedding = ai.embed(question)
scored = retrieve_scored(q_embedding, embedded_chunks, top_k=3)
retrieval_ms = (perf_counter() - retrieval_start) * 1000.0
retrieved_names = [chunk.name for chunk, _score in scored]
retrieved_scores = [round(score, 4) for _chunk, score in scored]
user_message = build_message_with_chunks(
issue,
report.host,
[chunk for chunk, _score in scored],
question,
prior_questions,
runbook_chunks=runbook_chunks or None,
past_sessions=past_sessions or None,
)
if rag_debug:
pairs = ", ".join(
f"{name}={score:.3f}"
for name, score in zip(retrieved_names, retrieved_scores, strict=False)
)
console.print(f"[dim]RAG retrieve:[/dim] {pairs or 'no matches'}")
except Exception as exc: # noqa: BLE001
retrieval_ms = (perf_counter() - retrieval_start) * 1000.0
fallback_reason = str(exc)
console.print(
"[yellow]RAG unavailable (query embedding failed); using full-context "
"fallback.[/yellow]"
)
user_message = build_followup_message(
issue, report, question, prior_questions,
runbook_chunks=runbook_chunks or None,
past_sessions=past_sessions or None,
)
else:
fallback_reason = "rag not indexed"
user_message = build_followup_message(
issue, report, question, prior_questions,
runbook_chunks=runbook_chunks or None,
past_sessions=past_sessions or None,
)
if logger is not None:
logger.log_event(
"rag_query",
{
"question": question,
"retrieved_chunk_names": retrieved_names,
"scores": retrieved_scores,
"retrieval_ms": round(retrieval_ms, 2),
"top_score": retrieved_scores[0] if retrieved_scores else None,
"used_fallback": fallback_reason is not None,
"fallback_reason": fallback_reason,
"estimated_prompt_tokens": _estimate_tokens(system_prompt + user_message),
},
)
try:
chunks: list[str] = []
for chunk in ai.stream(system_prompt, user_message):
chunks.append(chunk)
response = "".join(chunks)
response = _complete_ai_response(
ai,
system_prompt,
user_message,
)
console.print(Markdown(response))
console.print(Rule(style="dim"))
warnings = validate_ai_response(response)
for item in warnings:
console.print(f"[yellow]Guardrail warning:[/yellow] {item}")
warn_text = Text()
warn_text.append("⚠ Guardrail: ", style="bold yellow")
warn_text.append(item, style="yellow")
console.print(warn_text)
if logger is not None:
logger.log_event(
@@ -423,6 +861,186 @@ def _run_followup_analysis(
raise typer.Exit(code=1) from exc
def _complete_ai_response(
ai: AIClient,
system_prompt: str,
user_message: str,
) -> str:
"""Return a full AI completion in one request.
Some local backends intermittently stall on streaming before yielding a first
token; using a non-streaming completion path is more reliable for CLI runs.
"""
return ai.complete(system_prompt, user_message).content
def _query_runbooks(
store: RunbookStore | None,
question: str,
ai: AIClient,
*,
top_k: int = 3,
) -> list[RunbookChunk]:
"""Query the runbook store silently; returns empty list on any failure."""
if store is None:
return []
try:
return store.query(question, ai, top_k=top_k)
except Exception: # noqa: BLE001
return []
def _query_sessions(
store: SessionStore | None,
question: str,
host: str,
ai: AIClient,
*,
top_k: int = 2,
) -> list[PastSession]:
"""Query the session memory store silently; returns empty list on failures."""
if store is None:
return []
try:
return store.query(question, host, ai, top_k=top_k)
except Exception: # noqa: BLE001
return []
def _index_session_memory(
store: SessionStore,
ai_config: AIConfig,
req: TroubleshootRequest,
summary: str,
*,
logger: SessionLogger | None,
) -> None:
"""Persist final session summary for future retrieval; non-fatal on failure."""
try:
ai = AIClient(ai_config)
session_id = store.index_session(req.host, req.issue, summary, ai)
if logger is not None:
logger.log_event("session_memory_indexed", {"session_id": session_id})
except Exception as exc: # noqa: BLE001
if logger is not None:
logger.log_event("session_memory_error", {"error": str(exc)})
# ---------------------------------------------------------------------------
# runbooks sub-app
# ---------------------------------------------------------------------------
@runbooks_app.command("sync")
def runbooks_sync(
path: Annotated[
str,
typer.Option("--path", help="Directory containing runbook Markdown files."),
] = "./runbooks",
store_path: Annotated[
str,
typer.Option("--store", help="ChromaDB store path. Defaults to ~/.tai/runbooks."),
] = "~/.tai/runbooks",
ai_host: Annotated[
str,
typer.Option("--ai-host", help="OpenAI-compatible AI backend URL."),
] = DEFAULT_AI_HOST,
embed_model: Annotated[
str,
typer.Option("--embed-model", help="Embedding model name."),
] = DEFAULT_EMBED_MODEL,
ai_key: Annotated[
str,
typer.Option("--ai-key", help="API key for the AI backend."),
] = "ollama",
) -> None:
"""Embed and index all runbooks from PATH into the persistent store."""
from pathlib import Path
runbooks_dir = Path(path).expanduser().resolve()
if not runbooks_dir.is_dir():
console.print(f"[red]Directory not found:[/red] {runbooks_dir}")
raise typer.Exit(code=1)
ai_config = AIConfig(host=ai_host, model="", api_key=ai_key, embed_model=embed_model)
ai = AIClient(ai_config)
try:
store = RunbookStore(store_path)
count = store.sync(runbooks_dir, ai)
console.print(f"[green]✓ Synced {count} runbook(s)[/green] → {store_path}")
except Exception as exc: # noqa: BLE001
console.print(f"[red]Sync failed:[/red] {exc}")
raise typer.Exit(code=1) from exc
@runbooks_app.command("list")
def runbooks_list(
store_path: Annotated[
str,
typer.Option("--store", help="ChromaDB store path. Defaults to ~/.tai/runbooks."),
] = "~/.tai/runbooks",
) -> None:
"""List all indexed runbooks and their metadata."""
try:
store = RunbookStore(store_path)
entries = store.list_indexed()
except Exception as exc: # noqa: BLE001
console.print(f"[red]Could not open store:[/red] {exc}")
raise typer.Exit(code=1) from exc
if not entries:
console.print("[yellow]No runbooks indexed.[/yellow] Run [bold]tai runbooks sync[/bold].")
return
console.print(f"[bold]{len(entries)} indexed runbook(s):[/bold]")
for entry in sorted(entries, key=lambda e: e.get("title", "")):
title = entry.get("title", "?")
service = entry.get("service", "")
tags = entry.get("tags", "")
console.print(f" [green]{title}[/green] service={service} tags={tags}")
@runbooks_app.command("add")
def runbooks_add(
file: Annotated[str, typer.Argument(help="Path to a single runbook Markdown file.")],
store_path: Annotated[
str,
typer.Option("--store", help="ChromaDB store path. Defaults to ~/.tai/runbooks."),
] = "~/.tai/runbooks",
ai_host: Annotated[
str,
typer.Option("--ai-host", help="OpenAI-compatible AI backend URL."),
] = DEFAULT_AI_HOST,
embed_model: Annotated[
str,
typer.Option("--embed-model", help="Embedding model name."),
] = DEFAULT_EMBED_MODEL,
ai_key: Annotated[
str,
typer.Option("--ai-key", help="API key for the AI backend."),
] = "ollama",
) -> None:
"""Embed and index a single runbook file into the persistent store."""
from pathlib import Path
runbook_path = Path(file).expanduser().resolve()
if not runbook_path.is_file():
console.print(f"[red]File not found:[/red] {runbook_path}")
raise typer.Exit(code=1)
ai_config = AIConfig(host=ai_host, model="", api_key=ai_key, embed_model=embed_model)
ai = AIClient(ai_config)
try:
store = RunbookStore(store_path)
store.sync_single(runbook_path, ai)
console.print(f"[green]✓ Indexed[/green] {runbook_path.name}{store_path}")
except Exception as exc: # noqa: BLE001
console.print(f"[red]Add failed:[/red] {exc}")
raise typer.Exit(code=1) from exc
def main() -> None:
"""Console script entrypoint."""
app()

View File

@@ -91,6 +91,7 @@ _KNOWN_SERVICES: list[str] = [
"docker",
"containerd",
"kubelet",
"sssd",
"sshd",
"postfix",
"dovecot",
@@ -107,6 +108,11 @@ _KNOWN_SERVICES: list[str] = [
"crond",
"rsyslog",
"auditd",
"selinux",
"apparmor",
"xorg",
"wayland",
"x2go",
"firewalld",
"haproxy",
"varnish",
@@ -121,6 +127,7 @@ _SERVICE_CONFIGS: dict[str, list[str]] = {
"mysqld": ["/etc/my.cnf"],
"mariadb": ["/etc/mysql/mariadb.conf.d/50-server.cnf"],
"postgresql": ["/etc/postgresql"],
"sssd": ["/etc/sssd/sssd.conf"],
"sshd": ["/etc/ssh/sshd_config"],
"postfix": ["/etc/postfix/main.cf"],
"haproxy": ["/etc/haproxy/haproxy.cfg"],
@@ -128,8 +135,51 @@ _SERVICE_CONFIGS: dict[str, list[str]] = {
"redis-server": ["/etc/redis/redis.conf"],
"fail2ban": ["/etc/fail2ban/jail.conf"],
"ufw": ["/etc/ufw/ufw.conf"],
"x2go": ["/etc/x2go"],
}
_SERVICE_BINARIES: dict[str, list[str]] = {
"docker": ["/usr/bin/docker", "/usr/bin/dockerd"],
"sssd": ["/usr/sbin/sssd", "/usr/bin/sssctl"],
"sshd": ["/usr/sbin/sshd", "/usr/bin/ssh"],
"x2go": ["/usr/bin/x2golistsessions", "/usr/bin/x2goruncommand"],
"xorg": ["/usr/bin/Xorg", "/usr/bin/xrandr"],
"wayland": ["/usr/bin/wayland-info", "/usr/bin/Xwayland"],
"selinux": ["/usr/sbin/getenforce", "/usr/sbin/sestatus"],
"apparmor": ["/usr/sbin/aa-status", "/sbin/apparmor_parser"],
}
_SERVICE_PACKAGES: dict[str, list[str]] = {
"docker": ["docker", "docker-ce"],
"sssd": ["sssd"],
"sshd": ["openssh-server", "openssh"],
"x2go": ["x2goserver", "x2goserver-xsession"],
"xorg": ["xorg-server", "xserver-xorg-core"],
"wayland": ["wayland", "xwayland"],
"selinux": ["selinux-policy", "selinux-policy-targeted"],
"apparmor": ["apparmor"],
}
_GENERIC_SERVICE_STOPWORDS: frozenset[str] = frozenset(
{
"a",
"an",
"and",
"app",
"application",
"daemon",
"for",
"is",
"my",
"not",
"service",
"systemd",
"the",
"unit",
"working",
}
)
# ---------------------------------------------------------------------------
# Command sets
# ---------------------------------------------------------------------------
@@ -200,6 +250,15 @@ def plan_from_request(request: TroubleshootRequest) -> CollectionPlan:
if svc in seen:
continue
seen.add(svc)
plan.add(
f"unit-file-{svc}",
f"systemctl list-unit-files {svc}.service --no-pager --no-legend",
)
for idx, binary_path in enumerate(_SERVICE_BINARIES.get(svc, []), start=1):
plan.add(f"binary-{svc}-{idx}", f"ls -l {binary_path}")
for idx, package_name in enumerate(_service_package_candidates(svc), start=1):
plan.add(f"package-rpm-{svc}-{idx}", f"rpm -q {package_name}")
plan.add(f"package-dpkg-{svc}-{idx}", f"dpkg-query -W {package_name}")
plan.add(f"service-{svc}", f"systemctl status {svc}")
plan.add(f"journal-{svc}", f"journalctl -u {svc} -n 100 --no-pager")
for cfg_path in _SERVICE_CONFIGS.get(svc, []):
@@ -233,7 +292,11 @@ def _issue_words(issue: str) -> set[str]:
def _extract_services(issue: str) -> list[str]:
"""Return known service names mentioned in *issue*."""
"""Return service candidates mentioned in *issue*.
Includes known services plus generic service-like tokens near words such
as "service", "daemon", and "unit".
"""
words = _issue_words(issue)
found: list[str] = []
for svc in _KNOWN_SERVICES:
@@ -241,4 +304,58 @@ def _extract_services(issue: str) -> list[str]:
svc_words = {svc, svc.rstrip("d"), svc.replace("-", ""), svc.replace("-server", "")}
if words & svc_words:
found.append(svc)
for svc in _extract_generic_service_candidates(issue):
if svc not in found:
found.append(svc)
return found
def _extract_generic_service_candidates(issue: str) -> list[str]:
"""Extract likely service names from free text even when not pre-registered."""
tokens = [tok.lower() for tok in re.findall(r"[a-zA-Z0-9_.@-]+", issue)]
if not tokens:
return []
candidates: list[str] = []
for idx, token in enumerate(tokens):
normalized = token[:-8] if token.endswith(".service") else token
if _is_safe_service_name(normalized):
if token.endswith(".service") and normalized not in _GENERIC_SERVICE_STOPWORDS:
candidates.append(normalized)
if token in {"service", "daemon", "unit"}:
for neighbor in (idx - 1, idx + 1):
if neighbor < 0 or neighbor >= len(tokens):
continue
candidate = tokens[neighbor]
if candidate.endswith(".service"):
candidate = candidate[:-8]
if candidate in _GENERIC_SERVICE_STOPWORDS:
continue
if _is_safe_service_name(candidate):
candidates.append(candidate)
deduped: list[str] = []
seen: set[str] = set()
for item in candidates:
if item in seen:
continue
seen.add(item)
deduped.append(item)
return deduped
def _is_safe_service_name(name: str) -> bool:
"""Return True when *name* is safe to interpolate into read-only commands."""
if len(name) < 2 or len(name) > 64:
return False
return re.fullmatch(r"[a-z0-9_.@-]+", name) is not None
def _service_package_candidates(service: str) -> list[str]:
"""Return package names to probe for *service* presence."""
if service in _SERVICE_PACKAGES:
return _SERVICE_PACKAGES[service]
return [service]

View File

@@ -3,6 +3,9 @@
from __future__ import annotations
from tai.collectors import CollectionReport
from tai.rag_retriever import Chunk
from tai.runbook_store import RunbookChunk
from tai.session_store import PastSession
_SYSTEM_PROMPT = """\
You are an expert Linux systems administrator and troubleshooting assistant.
@@ -18,6 +21,9 @@ Important rules:
- For every root-cause claim, quote at least one exact snippet from collected output in backticks.
- If a command shows "could not be executed (SSH error)" it means the remote host blocked or
rejected that specific command — it is not evidence about the service or system state.
- If service presence checks show a unit, binary, package, or config is missing, treat that as
evidence the component may be absent or not installed, not as proof that the
component is broken.
- If there is not enough data to diagnose the issue, say so plainly and list exactly what
additional commands or log files would be needed.
- Keep the response short. Skip sections that have nothing useful to say.
@@ -27,18 +33,83 @@ Important rules:
- Format with clear sections: **Root Cause**, **Evidence**, **Recommended Actions**.
"""
_MAX_RUNBOOK_CHARS = 500
_MAX_DIAGNOSTIC_CHUNK_CHARS = 700
_MAX_SESSION_SUMMARY_CHARS = 500
def build_system_prompt() -> str:
"""Return the static system prompt for the troubleshooting agent."""
return _SYSTEM_PROMPT.strip()
def build_user_message(issue: str, report: CollectionReport) -> str:
def _format_runbook_context(runbook_chunks: list[RunbookChunk]) -> str:
"""Format retrieved runbook chunks as a Markdown context section."""
lines: list[str] = ["## Runbook context\n"]
lines.append(
"The following runbooks are relevant to this issue. "
"Use them to ground your diagnosis and recommendations in known procedures.\n"
)
for rb in runbook_chunks:
tag_str = f" — tags: {', '.join(rb.tags)}" if rb.tags else ""
content = rb.content.strip()
if len(content) > _MAX_RUNBOOK_CHARS:
content = content[:_MAX_RUNBOOK_CHARS].rstrip() + "\n...[truncated runbook context]"
lines.append(f"### Runbook: {rb.title} ({rb.service}){tag_str}\n")
lines.append(content)
lines.append("")
return "\n".join(lines)
def _format_diagnostic_chunk(content: str) -> str:
"""Cap diagnostic chunk size before prompt injection."""
text = content.strip()
if len(text) <= _MAX_DIAGNOSTIC_CHUNK_CHARS:
return text
return text[:_MAX_DIAGNOSTIC_CHUNK_CHARS].rstrip() + "\n...[truncated diagnostic context]"
def _format_session_context(past_sessions: list[PastSession]) -> str:
"""Format similar prior sessions as compact grounding context."""
lines: list[str] = ["## Similar prior sessions\n"]
lines.append(
"The following completed sessions were semantically similar. "
"Use them as historical hints, but prioritize current diagnostics if they conflict.\n"
)
for sess in past_sessions:
summary = sess.summary.strip()
if len(summary) > _MAX_SESSION_SUMMARY_CHARS:
summary = (
summary[:_MAX_SESSION_SUMMARY_CHARS].rstrip()
+ "\n...[truncated session summary]"
)
lines.append(f"### Session: {sess.session_id} (host={sess.host})\n")
lines.append(f"**Issue:** {sess.issue}")
lines.append("")
lines.append(summary)
lines.append("")
return "\n".join(lines)
def build_user_message(
issue: str,
report: CollectionReport,
*,
runbook_chunks: list[RunbookChunk] | None = None,
past_sessions: list[PastSession] | None = None,
) -> str:
"""Format *issue* and *report* into the user message sent to the AI."""
lines: list[str] = []
lines.append(f"## Issue reported\n\n{issue}\n")
lines.append(f"## Target host\n\n{report.host}\n")
if runbook_chunks:
lines.append(_format_runbook_context(runbook_chunks))
if past_sessions:
lines.append(_format_session_context(past_sessions))
lines.append("## Collected diagnostics\n")
skipped: list[str] = []
@@ -82,9 +153,17 @@ def build_followup_message(
report: CollectionReport,
question: str,
prior_questions: list[str],
*,
runbook_chunks: list[RunbookChunk] | None = None,
past_sessions: list[PastSession] | None = None,
) -> str:
"""Build a grounded follow-up message that re-anchors to diagnostics each turn."""
base = build_user_message(issue, report)
base = build_user_message(
issue,
report,
runbook_chunks=runbook_chunks,
past_sessions=past_sessions,
)
lines: list[str] = [base, "## Follow-up"]
if prior_questions:
@@ -98,4 +177,93 @@ def build_followup_message(
"\nAnswer strictly from the collected diagnostics above. "
"If evidence is insufficient, explicitly say so."
)
lines.append(
"Keep hypothesis continuity across turns: retain the previous leading "
"hypothesis unless newly retrieved evidence directly contradicts it."
)
return "\n".join(lines)
def build_message_with_chunks(
issue: str,
host: str,
chunks: list[Chunk],
question: str,
prior_questions: list[str],
*,
runbook_chunks: list[RunbookChunk] | None = None,
past_sessions: list[PastSession] | None = None,
) -> str:
"""Build a follow-up message using only semantically retrieved diagnostic chunks.
Used by the RAG path: instead of sending the full report, only the top-k
most relevant chunks are included, reducing token usage and focusing the AI.
If *runbook_chunks* are provided they are injected as a separate context
section before the follow-up question.
"""
lines: list[str] = []
lines.append(f"## Issue reported\n\n{issue}\n")
lines.append(f"## Target host\n\n{host}\n")
lines.append("## Most relevant diagnostics (retrieved by semantic similarity)\n")
for chunk in chunks:
lines.append(f"### {chunk.name}\n")
lines.append(_format_diagnostic_chunk(chunk.content))
lines.append("")
if runbook_chunks:
lines.append(_format_runbook_context(runbook_chunks))
if past_sessions:
lines.append(_format_session_context(past_sessions))
lines.append("## Follow-up")
if prior_questions:
lines.append("\nRecent user follow-up questions:")
for idx, q in enumerate(prior_questions[-5:], start=1):
lines.append(f"{idx}. {q}")
lines.append("\nCurrent follow-up question:")
lines.append(question)
lines.append(
"\nAnswer strictly from the retrieved diagnostics above. "
"If evidence is insufficient, explicitly say so."
)
lines.append(
"Keep hypothesis continuity across turns: retain the previous leading "
"hypothesis unless newly retrieved evidence directly contradicts it."
)
return "\n".join(lines)
def build_analysis_message_with_chunks(
issue: str,
host: str,
chunks: list[Chunk],
*,
runbook_chunks: list[RunbookChunk] | None = None,
past_sessions: list[PastSession] | None = None,
) -> str:
"""Build an initial analysis message from retrieved diagnostic chunks."""
lines: list[str] = []
lines.append(f"## Issue reported\n\n{issue}\n")
lines.append(f"## Target host\n\n{host}\n")
if runbook_chunks:
lines.append(_format_runbook_context(runbook_chunks))
if past_sessions:
lines.append(_format_session_context(past_sessions))
lines.append("## Most relevant diagnostics (retrieved by semantic similarity)\n")
for chunk in chunks:
lines.append(f"### {chunk.name}\n")
lines.append(_format_diagnostic_chunk(chunk.content))
lines.append("")
lines.append(
"Use the diagnostics above to provide an initial analysis. "
"If evidence is insufficient, state exactly what is missing."
)
return "\n".join(lines)

115
src/tai/rag_retriever.py Normal file
View File

@@ -0,0 +1,115 @@
"""In-memory RAG retriever for diagnostic report chunks (Tier 1).
Chunks one CollectionReport item per Chunk, embeds via AIClient, then
ranks chunks against a question using pure-Python cosine similarity.
No external vector store required — everything lives in process memory.
"""
from __future__ import annotations
import math
from dataclasses import dataclass
from tai.collectors import CollectionReport
DEFAULT_MAX_CHUNK_CHARS = 1800
@dataclass(slots=True)
class Chunk:
"""A single retrievable piece of diagnostic content."""
name: str
content: str
@dataclass(slots=True)
class EmbeddedChunk:
"""A Chunk paired with its embedding vector."""
chunk: Chunk
embedding: list[float]
def _normalize_text(text: str, *, max_chars: int) -> str:
"""Normalize whitespace and cap text length with a truncation marker."""
compact = text.strip()
if len(compact) <= max_chars:
return compact
clipped = compact[:max_chars].rstrip()
return f"{clipped}\n...[truncated for RAG]"
def chunk_report(
report: CollectionReport,
*,
max_chunk_chars: int = DEFAULT_MAX_CHUNK_CHARS,
) -> list[Chunk]:
"""Split a CollectionReport into one Chunk per diagnostic item.
Items that SSH could not execute at all (exit 255, no output) are dropped —
they carry no diagnostic signal. Chunk text is normalized and capped so the
prompt shape stays more stable on smaller local models.
"""
chunks: list[Chunk] = []
for item in report.items:
result = item.result
if result.exit_code == 255 and not result.stdout and not result.stderr:
continue
parts: list[str] = [
f"Command: {result.command}",
f"Exit code: {result.exit_code}",
]
if result.stdout:
parts.append(f"stdout:\n{_normalize_text(result.stdout, max_chars=max_chunk_chars)}")
if result.stderr:
parts.append(f"stderr:\n{_normalize_text(result.stderr, max_chars=max_chunk_chars)}")
if not result.stdout and not result.stderr:
parts.append("(no output)")
content = _normalize_text("\n".join(parts), max_chars=max_chunk_chars)
chunks.append(Chunk(name=item.name, content=content))
return chunks
def _cosine_similarity(a: list[float], b: list[float]) -> float:
"""Return cosine similarity in [-1, 1] using pure Python (no numpy)."""
dot = sum(x * y for x, y in zip(a, b, strict=False))
norm_a = math.sqrt(sum(x * x for x in a))
norm_b = math.sqrt(sum(x * x for x in b))
if norm_a == 0.0 or norm_b == 0.0:
return 0.0
return dot / (norm_a * norm_b)
def retrieve_scored(
question_embedding: list[float],
embedded_chunks: list[EmbeddedChunk],
*,
top_k: int = 5,
) -> list[tuple[Chunk, float]]:
"""Return top-k retrieved chunks with similarity scores."""
if not embedded_chunks or top_k <= 0:
return []
scored: list[tuple[float, Chunk]] = [
(_cosine_similarity(question_embedding, ec.embedding), ec.chunk)
for ec in embedded_chunks
]
scored.sort(key=lambda x: x[0], reverse=True)
return [(chunk, score) for score, chunk in scored[:top_k]]
def retrieve(
question_embedding: list[float],
embedded_chunks: list[EmbeddedChunk],
*,
top_k: int = 5,
) -> list[Chunk]:
"""Return the *top_k* chunks most similar to *question_embedding*."""
scored = retrieve_scored(
question_embedding,
embedded_chunks,
top_k=top_k,
)
return [chunk for chunk, _score in scored]

271
src/tai/runbook_store.py Normal file
View File

@@ -0,0 +1,271 @@
"""Persistent runbook knowledge base backed by ChromaDB (Tier 2).
Runbooks are Markdown files with YAML-style frontmatter describing a service,
its typical symptoms, and tags used for retrieval matching. The store embeds
each runbook via AIClient and persists the collection so that queries across
sessions are instant (no re-embedding on startup).
Typical flow
------------
1. User runs ``tai runbooks --sync ./runbooks`` once (or after adding files).
2. On each analysis turn, the store is queried with the user's question and the
top-k matching runbooks are injected as ``## Runbook Context`` in the prompt.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from tai.ai_client import AIClient
DEFAULT_STORE_PATH = "~/.tai/runbooks"
_COLLECTION_NAME = "tai_runbooks"
# ---------------------------------------------------------------------------
# Data types
# ---------------------------------------------------------------------------
@dataclass(slots=True)
class RunbookChunk:
"""A retrieved runbook document ready for prompt injection."""
title: str
service: str
tags: list[str]
content: str
@dataclass
class RunbookMeta:
"""Parsed frontmatter metadata from a runbook file."""
service: str = ""
symptoms: list[str] = field(default_factory=list)
tags: list[str] = field(default_factory=list)
# ---------------------------------------------------------------------------
# Frontmatter parsing
# ---------------------------------------------------------------------------
_FRONTMATTER_RE = re.compile(r"^\s*---\s*\n(.*?)\n---\s*\n", re.DOTALL)
_KV_RE = re.compile(r"^(\w+)\s*:\s*(.+)$")
def _parse_frontmatter(text: str) -> tuple[RunbookMeta, str]:
"""Split YAML-style frontmatter from Markdown body.
Supports simple ``key: value`` and ``key: item1, item2`` syntax only.
Returns ``(meta, body)``; if no frontmatter found, meta has empty fields.
"""
meta = RunbookMeta()
match = _FRONTMATTER_RE.match(text)
if not match:
return meta, text
for line in match.group(1).splitlines():
kv = _KV_RE.match(line.strip())
if not kv:
continue
key, value = kv.group(1).lower(), kv.group(2).strip()
if key == "service":
meta.service = value
elif key == "symptoms":
meta.symptoms = [s.strip() for s in value.split(",") if s.strip()]
elif key == "tags":
meta.tags = [t.strip() for t in value.split(",") if t.strip()]
body = text[match.end():]
return meta, body
# ---------------------------------------------------------------------------
# RunbookStore
# ---------------------------------------------------------------------------
class RunbookStore:
"""ChromaDB-backed store for runbook documents.
Parameters
----------
store_path:
Directory where ChromaDB persists its data.
Defaults to ``~/.tai/runbooks``.
"""
def __init__(self, store_path: str | Path = DEFAULT_STORE_PATH) -> None:
import chromadb # optional dep — imported lazily
path = Path(store_path).expanduser().resolve()
path.mkdir(parents=True, exist_ok=True)
settings = None
try:
from chromadb.config import Settings
settings = Settings(
anonymized_telemetry=False,
chroma_product_telemetry_impl="tai.chroma_telemetry.NoOpProductTelemetryClient",
chroma_telemetry_impl="tai.chroma_telemetry.NoOpProductTelemetryClient",
)
except (ImportError, ModuleNotFoundError):
# Test doubles may replace `chromadb` with a lightweight mock that
# does not expose the real config module.
settings = None
if settings is None:
self._client = chromadb.PersistentClient(path=str(path))
else:
self._client = chromadb.PersistentClient(path=str(path), settings=settings)
self._collection: Any = self._client.get_or_create_collection(
name=_COLLECTION_NAME,
metadata={"hnsw:space": "cosine"},
)
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def sync(self, runbooks_dir: Path, ai: AIClient) -> int:
"""Embed and upsert all ``*.md`` files from *runbooks_dir*.
Existing documents with the same ID are updated if the file content
has changed. Returns the number of runbooks successfully indexed.
"""
runbooks_dir = Path(runbooks_dir).expanduser().resolve()
if not runbooks_dir.is_dir():
raise FileNotFoundError(f"Runbooks directory not found: {runbooks_dir}")
files = sorted(runbooks_dir.glob("*.md"))
if not files:
return 0
ids: list[str] = []
documents: list[str] = []
embeddings: list[list[float]] = []
metadatas: list[dict[str, str]] = []
for path in files:
raw = path.read_text(encoding="utf-8")
meta, body = _parse_frontmatter(raw)
# Embed the full document (frontmatter stripped) for semantic recall
embed_text = _build_embed_text(path.stem, meta, body)
embedding = ai.embed(embed_text)
ids.append(path.stem)
documents.append(body.strip())
embeddings.append(embedding)
metadatas.append(
{
"title": path.stem,
"service": meta.service,
"tags": ", ".join(meta.tags),
"symptoms": ", ".join(meta.symptoms),
}
)
self._collection.upsert(
ids=ids,
documents=documents,
embeddings=embeddings,
metadatas=metadatas,
)
return len(ids)
def sync_single(self, runbook_path: Path, ai: AIClient) -> None:
"""Embed and upsert a single runbook file."""
path = Path(runbook_path).expanduser().resolve()
if not path.is_file():
raise FileNotFoundError(f"Runbook not found: {path}")
raw = path.read_text(encoding="utf-8")
meta, body = _parse_frontmatter(raw)
embed_text = _build_embed_text(path.stem, meta, body)
embedding = ai.embed(embed_text)
self._collection.upsert(
ids=[path.stem],
documents=[body.strip()],
embeddings=[embedding],
metadatas=[
{
"title": path.stem,
"service": meta.service,
"tags": ", ".join(meta.tags),
"symptoms": ", ".join(meta.symptoms),
}
],
)
def query(self, question: str, ai: AIClient, *, top_k: int = 3) -> list[RunbookChunk]:
"""Return the *top_k* most relevant runbooks for *question*.
Returns an empty list if the collection is empty or if the AI backend
is unavailable — callers should handle an empty result gracefully.
"""
if self._collection.count() == 0:
return []
q_embedding = ai.embed(question)
results = self._collection.query(
query_embeddings=[q_embedding],
n_results=min(top_k, self._collection.count()),
include=["documents", "metadatas"],
)
chunks: list[RunbookChunk] = []
docs = results.get("documents") or []
metas = results.get("metadatas") or []
for doc_list, meta_list in zip(docs, metas, strict=False):
for doc, meta in zip(doc_list, meta_list, strict=False):
chunks.append(
RunbookChunk(
title=str(meta.get("title", "")),
service=str(meta.get("service", "")),
tags=[t.strip() for t in str(meta.get("tags", "")).split(",") if t.strip()],
content=doc,
)
)
return chunks
def list_indexed(self) -> list[dict[str, str]]:
"""Return metadata for all indexed runbooks."""
if self._collection.count() == 0:
return []
results = self._collection.get(include=["metadatas"])
metas = results.get("metadatas") or []
entries: list[dict[str, str]] = []
for meta in metas:
entries.append({str(k): str(v) for k, v in dict(meta).items()})
return entries
def count(self) -> int:
"""Return the number of indexed runbook documents."""
return int(self._collection.count())
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _build_embed_text(title: str, meta: RunbookMeta, body: str) -> str:
"""Build the text to embed for a runbook — combines signals for best recall."""
parts: list[str] = [f"title: {title}"]
if meta.service:
parts.append(f"service: {meta.service}")
if meta.symptoms:
parts.append(f"symptoms: {', '.join(meta.symptoms)}")
if meta.tags:
parts.append(f"tags: {', '.join(meta.tags)}")
# Prepend a stripped excerpt of the body for additional signal
body_excerpt = body.strip()[:800]
parts.append(body_excerpt)
return "\n".join(parts)

105
src/tai/session_store.py Normal file
View File

@@ -0,0 +1,105 @@
"""Persistent session memory store (Tier 4) backed by ChromaDB."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from tai.ai_client import AIClient
DEFAULT_SESSION_STORE_PATH = "~/.tai/sessions"
_COLLECTION_NAME = "tai_sessions"
@dataclass(slots=True)
class PastSession:
"""A retrieved prior session summary for prompt grounding."""
session_id: str
host: str
issue: str
summary: str
class SessionStore:
"""ChromaDB-backed persistent memory for prior troubleshooting sessions."""
def __init__(self, store_path: str | Path = DEFAULT_SESSION_STORE_PATH) -> None:
import chromadb
path = Path(store_path).expanduser().resolve()
path.mkdir(parents=True, exist_ok=True)
settings = None
try:
from chromadb.config import Settings
settings = Settings(
anonymized_telemetry=False,
chroma_product_telemetry_impl="tai.chroma_telemetry.NoOpProductTelemetryClient",
chroma_telemetry_impl="tai.chroma_telemetry.NoOpProductTelemetryClient",
)
except (ImportError, ModuleNotFoundError):
settings = None
if settings is None:
self._client = chromadb.PersistentClient(path=str(path))
else:
self._client = chromadb.PersistentClient(path=str(path), settings=settings)
self._collection: Any = self._client.get_or_create_collection(
name=_COLLECTION_NAME,
metadata={"hnsw:space": "cosine"},
)
def count(self) -> int:
"""Return number of indexed session summaries."""
return int(self._collection.count())
def index_session(self, host: str, issue: str, summary: str, ai: AIClient) -> str:
"""Embed and upsert one session summary into persistent storage."""
session_id = datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ")
embed_text = _build_embed_text(host=host, issue=issue, summary=summary)
embedding = ai.embed(embed_text)
self._collection.upsert(
ids=[session_id],
documents=[summary.strip()],
embeddings=[embedding],
metadatas=[{"host": host, "issue": issue}],
)
return session_id
def query(self, question: str, host: str, ai: AIClient, *, top_k: int = 2) -> list[PastSession]:
"""Return top-k semantically similar sessions for this host and question."""
if self._collection.count() == 0:
return []
q_embedding = ai.embed(f"host: {host}\nquestion: {question}")
results = self._collection.query(
query_embeddings=[q_embedding],
n_results=min(top_k, self._collection.count()),
include=["documents", "metadatas"],
)
sessions: list[PastSession] = []
ids = results.get("ids") or []
docs = results.get("documents") or []
metas = results.get("metadatas") or []
for id_list, doc_list, meta_list in zip(ids, docs, metas, strict=False):
for sid, doc, meta in zip(id_list, doc_list, meta_list, strict=False):
sessions.append(
PastSession(
session_id=str(sid),
host=str(meta.get("host", "")),
issue=str(meta.get("issue", "")),
summary=str(doc),
)
)
return sessions
def _build_embed_text(*, host: str, issue: str, summary: str) -> str:
"""Build embedding text with host/issue context and summary excerpt."""
excerpt = summary.strip()[:1000]
return f"host: {host}\nissue: {issue}\nsummary:\n{excerpt}"

View File

@@ -51,6 +51,7 @@ class SSHClient:
_READ_ONLY_COMMANDS = {
"cat",
"dmesg",
"dpkg-query",
"df",
"du",
"find",
@@ -61,6 +62,7 @@ class SSHClient:
"journalctl",
"ls",
"netstat",
"rpm",
"sed",
"ss",
"stat",

View File

@@ -5,6 +5,7 @@ from unittest.mock import MagicMock, patch
from tai.ai_client import DEFAULT_AI_HOST, DEFAULT_MODEL, AIClient, AIConfig
from tai.collectors import CollectedItem, CollectionReport
from tai.prompt_builder import build_followup_message, build_system_prompt, build_user_message
from tai.session_store import PastSession
from tai.ssh_client import SSHCommandResult
# ---------------------------------------------------------------------------
@@ -174,6 +175,7 @@ def test_build_system_prompt_contains_key_instructions() -> None:
assert "Evidence" in prompt
assert "Recommended Actions" in prompt
assert "read-only" in prompt.lower()
assert "absent or not installed" in prompt
def test_build_user_message_contains_issue_and_host() -> None:
@@ -220,6 +222,24 @@ def test_build_user_message_handles_no_output() -> None:
assert "no output" in msg
def test_build_user_message_includes_prior_session_context() -> None:
report = _make_report([("kernel", "uname -a", 0, "Linux web01", "")])
msg = build_user_message(
"sssd broken",
report,
past_sessions=[
PastSession(
session_id="20260506T120000Z",
host="web01",
issue="sssd broken",
summary="Root cause was missing sssd package.",
)
],
)
assert "Similar prior sessions" in msg
assert "missing sssd package" in msg
def test_build_followup_message_includes_question_context() -> None:
report = _make_report([("kernel", "uname -a", 0, "Linux web01", "")])
msg = build_followup_message(

View File

@@ -1,9 +1,11 @@
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
from typer.testing import CliRunner
from tai.cli import app
from tai.collectors import CollectedItem, CollectionReport
from tai.rag_retriever import Chunk, EmbeddedChunk
from tai.ssh_client import SSHCommandResult
@@ -30,7 +32,7 @@ def test_run_command_prints_scaffold_summary() -> None:
result = runner.invoke(
app,
[
"apache failed",
"run", "apache failed",
"--host",
"web01",
"--port",
@@ -61,7 +63,7 @@ def test_probe_success_prints_remote_output_by_default(monkeypatch) -> None: #
runner = CliRunner()
result = runner.invoke(
app,
["apache failed", "--host", "ssh.archflux.net", "--port", "5566", "--probe"],
["run", "apache failed", "--host", "ssh.archflux.net", "--port", "5566", "--probe"],
)
assert result.exit_code == 0
@@ -83,7 +85,7 @@ def test_probe_failure_returns_non_zero(monkeypatch) -> None: # type: ignore[no
runner = CliRunner()
result = runner.invoke(
app,
["apache failed", "--host", "ssh.archflux.net", "--port", "5566", "--probe"],
["run", "apache failed", "--host", "ssh.archflux.net", "--port", "5566", "--probe"],
)
assert result.exit_code == 1
@@ -125,7 +127,7 @@ def test_collect_success_prints_summary(monkeypatch) -> None: # type: ignore[no
result = runner.invoke(
app,
[
"apache failed",
"run", "apache failed",
"--host",
"ssh.archflux.net",
"--port",
@@ -137,8 +139,9 @@ def test_collect_success_prints_summary(monkeypatch) -> None: # type: ignore[no
assert result.exit_code == 0
assert "Collection complete" in result.stdout
assert "kernel: ok" in result.stdout
assert "journal: ok (truncated)" in result.stdout
assert "kernel" in result.stdout
assert "journal" in result.stdout
assert "truncated" in result.stdout
def test_interactive_collect_then_quit(monkeypatch) -> None: # type: ignore[no-untyped-def]
@@ -163,13 +166,14 @@ def test_interactive_collect_then_quit(monkeypatch) -> None: # type: ignore[no-
commands = iter(["/collect", "/quit"])
monkeypatch.setattr("tai.cli.collect_from_plan", fake_collect_from_plan)
monkeypatch.setattr("builtins.input", lambda _prompt: next(commands))
monkeypatch.setattr("tai.cli.console.input", lambda _prompt: next(commands))
monkeypatch.setattr("tai.cli._stdin_is_tty", lambda: True)
runner = CliRunner()
result = runner.invoke(
app,
[
"apache failed",
"run", "apache failed",
"--host",
"ssh.archflux.net",
"--port",
@@ -180,8 +184,8 @@ def test_interactive_collect_then_quit(monkeypatch) -> None: # type: ignore[no-
)
assert result.exit_code == 0
assert "Interactive mode" in result.stdout
assert "Collection complete" in result.stdout
assert "ask questions directly" in result.stdout.lower()
assert "collection complete" in result.stdout.lower()
assert "Bye." in result.stdout
@@ -207,16 +211,17 @@ def test_interactive_unknown_command_prints_hint(monkeypatch) -> None: # type:
commands = iter(["what should I check next?", "/quit"])
monkeypatch.setattr("tai.cli.collect_from_plan", fake_collect_from_plan)
monkeypatch.setattr(
"tai.cli.AIClient.stream",
lambda *_args, **_kwargs: iter(["Check logs."]),
"tai.cli.AIClient.complete",
lambda *_args, **_kwargs: SimpleNamespace(content="Check logs."),
)
monkeypatch.setattr("builtins.input", lambda _prompt: next(commands))
monkeypatch.setattr("tai.cli.console.input", lambda _prompt: next(commands))
monkeypatch.setattr("tai.cli._stdin_is_tty", lambda: True)
runner = CliRunner()
result = runner.invoke(
app,
[
"apache failed",
"run", "apache failed",
"--host",
"ssh.archflux.net",
"--port",
@@ -227,5 +232,109 @@ def test_interactive_unknown_command_prints_hint(monkeypatch) -> None: # type:
)
assert result.exit_code == 0
assert "Analyzing..." in result.stdout
assert "AI Response" in result.stdout
assert "Check logs." in result.stdout
def test_interactive_prints_rag_fallback_notice_on_index_failure(monkeypatch) -> None: # type: ignore[no-untyped-def]
_mock_session(monkeypatch)
async def fake_collect_from_plan(_session, _plan) -> CollectionReport: # type: ignore[no-untyped-def]
return CollectionReport(
host="ssh.archflux.net",
items=[
CollectedItem(
name="kernel",
result=SSHCommandResult(
command="uname -a",
exit_code=0,
stdout="Linux test",
stderr="",
),
),
],
)
commands = iter(["what should I check next?", "/quit"])
monkeypatch.setattr("tai.cli.collect_from_plan", fake_collect_from_plan)
monkeypatch.setattr("tai.cli._try_embed_report", lambda *_args: (None, "embed failed", 1.0))
monkeypatch.setattr(
"tai.cli.AIClient.complete",
lambda *_args, **_kwargs: SimpleNamespace(content="Check logs."),
)
monkeypatch.setattr("tai.cli.console.input", lambda _prompt: next(commands))
monkeypatch.setattr("tai.cli._stdin_is_tty", lambda: True)
runner = CliRunner()
result = runner.invoke(
app,
[
"run", "apache failed",
"--host",
"ssh.archflux.net",
"--port",
"5566",
"--no-probe",
"--interactive",
],
)
assert result.exit_code == 0
assert "RAG unavailable (indexing failed)" in result.stdout
assert "AI Response" in result.stdout
def test_interactive_rag_debug_prints_retrieval_scores(monkeypatch) -> None: # type: ignore[no-untyped-def]
_mock_session(monkeypatch)
async def fake_collect_from_plan(_session, _plan) -> CollectionReport: # type: ignore[no-untyped-def]
return CollectionReport(
host="ssh.archflux.net",
items=[
CollectedItem(
name="kernel",
result=SSHCommandResult(
command="uname -a",
exit_code=0,
stdout="Linux test",
stderr="",
),
),
],
)
commands = iter(["what should I check next?", "/quit"])
monkeypatch.setattr("tai.cli.collect_from_plan", fake_collect_from_plan)
monkeypatch.setattr(
"tai.cli._try_embed_report",
lambda *_args: (
[EmbeddedChunk(chunk=Chunk(name="kernel", content="content"), embedding=[1.0, 0.0])],
None,
1.0,
),
)
monkeypatch.setattr("tai.cli.AIClient.embed", lambda *_args, **_kwargs: [1.0, 0.0])
monkeypatch.setattr(
"tai.cli.AIClient.complete",
lambda *_args, **_kwargs: SimpleNamespace(content="Check logs."),
)
monkeypatch.setattr("tai.cli.console.input", lambda _prompt: next(commands))
monkeypatch.setattr("tai.cli._stdin_is_tty", lambda: True)
runner = CliRunner()
result = runner.invoke(
app,
[
"run", "apache failed",
"--host",
"ssh.archflux.net",
"--port",
"5566",
"--no-probe",
"--interactive",
"--rag-debug",
],
)
assert result.exit_code == 0
assert "RAG retrieve:" in result.stdout

View File

@@ -80,6 +80,7 @@ def test_nginx_in_issue_adds_nginx_service_commands() -> None:
plan = plan_from_request(_req("nginx is failing to start"))
names = _names(plan)
cmds = _commands(plan)
assert "unit-file-nginx" in names
assert "service-nginx" in names
assert "journal-nginx" in names
assert any("systemctl status nginx" in c for c in cmds)
@@ -98,6 +99,38 @@ def test_sshd_adds_config_cat() -> None:
assert any("cat /etc/ssh/sshd_config" in c for c in cmds)
def test_sssd_in_issue_adds_presence_service_and_config_commands() -> None:
plan = plan_from_request(_req("troubleshoot sssd login failures"))
names = _names(plan)
cmds = _commands(plan)
assert "unit-file-sssd" in names
assert "binary-sssd-1" in names
assert "service-sssd" in names
assert "journal-sssd" in names
assert "package-rpm-sssd-1" in names
assert "package-dpkg-sssd-1" in names
assert any("cat /etc/sssd/sssd.conf" in c for c in cmds)
assert any("ls -l /usr/sbin/sssd" in c for c in cmds)
assert any("rpm -q sssd" in c for c in cmds)
assert any("dpkg-query -W sssd" in c for c in cmds)
assert any("list-unit-files sssd.service" in c for c in cmds)
def test_docker_presence_probe_checks_package_and_binary() -> None:
plan = plan_from_request(_req("docker daemon not running"))
names = _names(plan)
cmds = _commands(plan)
assert "unit-file-docker" in names
assert "binary-docker-1" in names
assert "binary-docker-2" in names
assert "package-rpm-docker-1" in names
assert "package-dpkg-docker-1" in names
assert any("ls -l /usr/bin/docker" in c for c in cmds)
assert any("ls -l /usr/bin/dockerd" in c for c in cmds)
assert any("rpm -q docker" in c for c in cmds)
assert any("dpkg-query -W docker" in c for c in cmds)
def test_unknown_service_name_no_config_cat() -> None:
plan = plan_from_request(_req("myweirdapp service crashed"))
cmds = _commands(plan)
@@ -158,6 +191,16 @@ def test_extract_services_case_insensitive() -> None:
assert "nginx" in _extract_services("NGINX failed")
def test_extract_services_detects_generic_service_name() -> None:
services = _extract_services("myweirdapp service keeps failing")
assert "myweirdapp" in services
def test_extract_services_detects_dot_service_pattern() -> None:
services = _extract_services("please check foobar.service on this host")
assert "foobar" in services
# ---------------------------------------------------------------------------
# Plan length sanity
# ---------------------------------------------------------------------------

198
tests/test_rag_retriever.py Normal file
View File

@@ -0,0 +1,198 @@
"""Tests for rag_retriever — pure-Python, no network calls."""
from __future__ import annotations
from tai.collectors import CollectedItem, CollectionReport
from tai.rag_retriever import (
Chunk,
EmbeddedChunk,
_cosine_similarity,
chunk_report,
retrieve,
retrieve_scored,
)
from tai.ssh_client import SSHCommandResult
def _report(*items: tuple[str, str, int]) -> CollectionReport:
"""Build a CollectionReport from (name, stdout, exit_code) tuples."""
return CollectionReport(
host="test-host",
items=[
CollectedItem(
name=name,
result=SSHCommandResult(
command=f"cmd-{name}",
exit_code=code,
stdout=stdout,
stderr="",
),
)
for name, stdout, code in items
],
)
# ---------------------------------------------------------------------------
# chunk_report
# ---------------------------------------------------------------------------
def test_chunk_report_creates_one_chunk_per_item() -> None:
report = _report(("kernel", "Linux test 6.1", 0), ("journal", "Started nginx.", 0))
chunks = chunk_report(report)
assert len(chunks) == 2
assert chunks[0].name == "kernel"
assert chunks[1].name == "journal"
def test_chunk_report_includes_stdout_in_content() -> None:
report = _report(("kernel", "Linux test 6.1", 0))
chunks = chunk_report(report)
assert "Linux test 6.1" in chunks[0].content
def test_chunk_report_includes_exit_code_in_content() -> None:
report = _report(("fail", "error output", 1))
chunks = chunk_report(report)
assert "Exit code: 1" in chunks[0].content
def test_chunk_report_skips_ssh_unreachable_items() -> None:
"""Items with exit 255 and no output represent SSH failures and are dropped."""
report = CollectionReport(
host="test-host",
items=[
CollectedItem(
name="unreachable",
result=SSHCommandResult(
command="some-cmd", exit_code=255, stdout="", stderr=""
),
),
CollectedItem(
name="ok",
result=SSHCommandResult(
command="uname -a", exit_code=0, stdout="Linux", stderr=""
),
),
],
)
chunks = chunk_report(report)
assert len(chunks) == 1
assert chunks[0].name == "ok"
def test_chunk_report_keeps_exit_255_with_output() -> None:
"""Exit 255 with stderr present is a real failure — keep it."""
report = CollectionReport(
host="test-host",
items=[
CollectedItem(
name="partial",
result=SSHCommandResult(
command="some-cmd",
exit_code=255,
stdout="",
stderr="Permission denied",
),
),
],
)
chunks = chunk_report(report)
assert len(chunks) == 1
assert "Permission denied" in chunks[0].content
def test_chunk_report_notes_no_output() -> None:
report = CollectionReport(
host="test-host",
items=[
CollectedItem(
name="silent",
result=SSHCommandResult(command="cmd", exit_code=0, stdout="", stderr=""),
),
],
)
chunks = chunk_report(report)
assert "(no output)" in chunks[0].content
def test_chunk_report_caps_large_content() -> None:
report = _report(("huge", "x" * 5000, 0))
chunks = chunk_report(report, max_chunk_chars=200)
assert len(chunks[0].content) <= 230
assert "...[truncated for RAG]" in chunks[0].content
# ---------------------------------------------------------------------------
# _cosine_similarity
# ---------------------------------------------------------------------------
def test_cosine_similarity_identical_vectors() -> None:
v = [1.0, 0.0, 0.0]
assert abs(_cosine_similarity(v, v) - 1.0) < 1e-9
def test_cosine_similarity_orthogonal_vectors() -> None:
a = [1.0, 0.0]
b = [0.0, 1.0]
assert abs(_cosine_similarity(a, b)) < 1e-9
def test_cosine_similarity_opposite_vectors() -> None:
a = [1.0, 0.0]
b = [-1.0, 0.0]
assert abs(_cosine_similarity(a, b) - (-1.0)) < 1e-9
def test_cosine_similarity_zero_vector_returns_zero() -> None:
assert _cosine_similarity([0.0, 0.0], [1.0, 0.0]) == 0.0
# ---------------------------------------------------------------------------
# retrieve
# ---------------------------------------------------------------------------
def _embedded(name: str, vec: list[float]) -> EmbeddedChunk:
return EmbeddedChunk(chunk=Chunk(name=name, content=f"content of {name}"), embedding=vec)
def test_retrieve_returns_top_k_by_similarity() -> None:
chunks = [
_embedded("close", [1.0, 0.0]), # most similar
_embedded("mid", [0.7, 0.7]),
_embedded("far", [0.0, 1.0]), # orthogonal to query
]
query = [1.0, 0.0]
result = retrieve(query, chunks, top_k=2)
assert len(result) == 2
assert result[0].name == "close"
assert result[1].name == "mid"
def test_retrieve_scored_includes_scores() -> None:
chunks = [
_embedded("close", [1.0, 0.0]),
_embedded("far", [0.0, 1.0]),
]
result = retrieve_scored([1.0, 0.0], chunks, top_k=2)
assert len(result) == 2
assert result[0][0].name == "close"
assert result[0][1] > result[1][1]
def test_retrieve_respects_top_k_larger_than_pool() -> None:
chunks = [_embedded("only", [1.0, 0.0])]
result = retrieve([1.0, 0.0], chunks, top_k=10)
assert len(result) == 1
def test_retrieve_empty_pool_returns_empty() -> None:
assert retrieve([1.0, 0.0], [], top_k=5) == []
def test_retrieve_top_k_zero_returns_empty() -> None:
chunks = [_embedded("x", [1.0, 0.0])]
assert retrieve([1.0, 0.0], chunks, top_k=0) == []

253
tests/test_runbook_store.py Normal file
View File

@@ -0,0 +1,253 @@
"""Tests for runbook_store — no network calls, ChromaDB mocked."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from tai.runbook_store import (
RunbookChunk,
RunbookMeta,
RunbookStore,
_build_embed_text,
_parse_frontmatter,
)
# ---------------------------------------------------------------------------
# _parse_frontmatter
# ---------------------------------------------------------------------------
def test_parse_frontmatter_extracts_service() -> None:
text = "---\nservice: nginx\n---\n## Body\nsome content"
meta, body = _parse_frontmatter(text)
assert meta.service == "nginx"
assert "## Body" in body
def test_parse_frontmatter_extracts_tags_as_list() -> None:
text = "---\ntags: nginx, web, http\n---\nbody"
meta, body = _parse_frontmatter(text)
assert meta.tags == ["nginx", "web", "http"]
def test_parse_frontmatter_extracts_symptoms_as_list() -> None:
text = "---\nsymptoms: 502 Bad Gateway, upstream refused\n---\nbody"
meta, body = _parse_frontmatter(text)
assert meta.symptoms == ["502 Bad Gateway", "upstream refused"]
def test_parse_frontmatter_returns_empty_meta_when_missing() -> None:
text = "# Just a heading\nno frontmatter here"
meta, body = _parse_frontmatter(text)
assert meta.service == ""
assert meta.tags == []
assert meta.symptoms == []
assert "Just a heading" in body
def test_parse_frontmatter_body_strips_delimiter() -> None:
text = "---\nservice: ssh\n---\nBody starts here."
_, body = _parse_frontmatter(text)
assert body.strip() == "Body starts here."
# ---------------------------------------------------------------------------
# _build_embed_text
# ---------------------------------------------------------------------------
def test_build_embed_text_includes_title_and_service() -> None:
meta = RunbookMeta(service="nginx", symptoms=["502"], tags=["web"])
result = _build_embed_text("nginx", meta, "body content")
assert "title: nginx" in result
assert "service: nginx" in result
def test_build_embed_text_includes_symptoms_and_tags() -> None:
meta = RunbookMeta(service="nginx", symptoms=["502 Bad Gateway"], tags=["web", "http"])
result = _build_embed_text("nginx", meta, "body")
assert "502 Bad Gateway" in result
assert "web" in result
def test_build_embed_text_includes_body_excerpt() -> None:
meta = RunbookMeta()
result = _build_embed_text("disk", meta, "check df -h output")
assert "check df -h output" in result
def test_build_embed_text_truncates_long_body() -> None:
meta = RunbookMeta()
long_body = "x" * 2000
result = _build_embed_text("disk", meta, long_body)
# Body excerpt is capped at 800 chars
assert len(result) < 1500
# ---------------------------------------------------------------------------
# RunbookStore — unit tests using tmp_path and mocked chromadb
# ---------------------------------------------------------------------------
def _make_chromadb_mock() -> MagicMock:
"""Return a chromadb mock that satisfies RunbookStore internals."""
collection = MagicMock()
collection.count.return_value = 0
client = MagicMock()
client.get_or_create_collection.return_value = collection
chroma_mod = MagicMock()
chroma_mod.PersistentClient.return_value = client
return chroma_mod
def _make_ai_mock(embedding: list[float] | None = None) -> MagicMock:
ai = MagicMock()
ai.embed.return_value = embedding or [0.1, 0.2, 0.3]
return ai
def test_runbook_store_sync_returns_count(tmp_path: Path) -> None:
(tmp_path / "nginx.md").write_text(
"---\nservice: nginx\ntags: web\nsymptoms: 502\n---\n## Body\ncontent"
)
(tmp_path / "ssh.md").write_text(
"---\nservice: ssh\ntags: ssh\nsymptoms: refused\n---\n## Body\ncontent"
)
chroma_mock = _make_chromadb_mock()
ai = _make_ai_mock()
with patch.dict("sys.modules", {"chromadb": chroma_mock}):
store = RunbookStore(tmp_path / "store")
count = store.sync(tmp_path, ai)
assert count == 2
def test_runbook_store_sync_calls_upsert(tmp_path: Path) -> None:
(tmp_path / "nginx.md").write_text("---\nservice: nginx\n---\nbody")
chroma_mock = _make_chromadb_mock()
collection = chroma_mock.PersistentClient.return_value.get_or_create_collection.return_value
ai = _make_ai_mock()
with patch.dict("sys.modules", {"chromadb": chroma_mock}):
store = RunbookStore(tmp_path / "store")
store.sync(tmp_path, ai)
collection.upsert.assert_called_once()
call_kwargs = collection.upsert.call_args.kwargs
assert "nginx" in call_kwargs["ids"]
def test_runbook_store_sync_empty_dir_returns_zero(tmp_path: Path) -> None:
chroma_mock = _make_chromadb_mock()
ai = _make_ai_mock()
with patch.dict("sys.modules", {"chromadb": chroma_mock}):
store = RunbookStore(tmp_path / "store")
count = store.sync(tmp_path, ai)
assert count == 0
def test_runbook_store_sync_missing_dir_raises(tmp_path: Path) -> None:
chroma_mock = _make_chromadb_mock()
ai = _make_ai_mock()
with patch.dict("sys.modules", {"chromadb": chroma_mock}):
store = RunbookStore(tmp_path / "store")
with pytest.raises(FileNotFoundError):
store.sync(tmp_path / "nonexistent", ai)
def test_runbook_store_query_returns_empty_when_no_docs(tmp_path: Path) -> None:
chroma_mock = _make_chromadb_mock()
# collection.count() returns 0 by default in our mock
ai = _make_ai_mock()
with patch.dict("sys.modules", {"chromadb": chroma_mock}):
store = RunbookStore(tmp_path / "store")
results = store.query("disk full", ai)
assert results == []
def test_runbook_store_query_returns_runbook_chunks(tmp_path: Path) -> None:
chroma_mock = _make_chromadb_mock()
collection = chroma_mock.PersistentClient.return_value.get_or_create_collection.return_value
collection.count.return_value = 2
collection.query.return_value = {
"documents": [["## Body\ncheck df -h"]],
"metadatas": [
[{"title": "disk", "service": "disk", "tags": "disk, storage", "symptoms": "full"}]
],
}
ai = _make_ai_mock()
with patch.dict("sys.modules", {"chromadb": chroma_mock}):
store = RunbookStore(tmp_path / "store")
results = store.query("disk is full", ai)
assert len(results) == 1
assert isinstance(results[0], RunbookChunk)
assert results[0].title == "disk"
assert results[0].service == "disk"
assert "disk" in results[0].tags
assert "df -h" in results[0].content
def test_runbook_store_list_indexed_returns_metadata(tmp_path: Path) -> None:
chroma_mock = _make_chromadb_mock()
collection = chroma_mock.PersistentClient.return_value.get_or_create_collection.return_value
collection.count.return_value = 1
collection.get.return_value = {
"metadatas": [{"title": "nginx", "service": "nginx", "tags": "web", "symptoms": "502"}]
}
with patch.dict("sys.modules", {"chromadb": chroma_mock}):
store = RunbookStore(tmp_path / "store")
entries = store.list_indexed()
assert len(entries) == 1
assert entries[0]["title"] == "nginx"
def test_runbook_store_count_delegates_to_collection(tmp_path: Path) -> None:
chroma_mock = _make_chromadb_mock()
collection = chroma_mock.PersistentClient.return_value.get_or_create_collection.return_value
collection.count.return_value = 5
with patch.dict("sys.modules", {"chromadb": chroma_mock}):
store = RunbookStore(tmp_path / "store")
assert store.count() == 5
def test_runbook_store_sync_single_upserts_one(tmp_path: Path) -> None:
runbook = tmp_path / "nginx.md"
runbook.write_text("---\nservice: nginx\ntags: web\n---\nbody text")
chroma_mock = _make_chromadb_mock()
collection = chroma_mock.PersistentClient.return_value.get_or_create_collection.return_value
ai = _make_ai_mock()
with patch.dict("sys.modules", {"chromadb": chroma_mock}):
store = RunbookStore(tmp_path / "store")
store.sync_single(runbook, ai)
collection.upsert.assert_called_once()
call_kwargs = collection.upsert.call_args.kwargs
assert call_kwargs["ids"] == ["nginx"]
def test_runbook_store_sync_single_missing_file_raises(tmp_path: Path) -> None:
chroma_mock = _make_chromadb_mock()
ai = _make_ai_mock()
with patch.dict("sys.modules", {"chromadb": chroma_mock}):
store = RunbookStore(tmp_path / "store")
with pytest.raises(FileNotFoundError):
store.sync_single(tmp_path / "missing.md", ai)

View File

@@ -0,0 +1,79 @@
"""Tests for session_store with mocked ChromaDB."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import MagicMock, patch
from tai.session_store import PastSession, SessionStore, _build_embed_text
def _make_chromadb_mock() -> MagicMock:
collection = MagicMock()
collection.count.return_value = 0
client = MagicMock()
client.get_or_create_collection.return_value = collection
chroma_mod = MagicMock()
chroma_mod.PersistentClient.return_value = client
return chroma_mod
def _make_ai_mock(embedding: list[float] | None = None) -> MagicMock:
ai = MagicMock()
ai.embed.return_value = embedding or [0.1, 0.2, 0.3]
return ai
def test_build_embed_text_contains_host_issue_and_summary() -> None:
text = _build_embed_text(host="web01", issue="sssd broken", summary="Unit missing")
assert "host: web01" in text
assert "issue: sssd broken" in text
assert "Unit missing" in text
def test_index_session_upserts_with_metadata(tmp_path: Path) -> None:
chroma_mock = _make_chromadb_mock()
collection = chroma_mock.PersistentClient.return_value.get_or_create_collection.return_value
ai = _make_ai_mock()
with patch.dict("sys.modules", {"chromadb": chroma_mock}):
store = SessionStore(tmp_path / "store")
session_id = store.index_session("web01", "sssd broken", "summary text", ai)
assert session_id
collection.upsert.assert_called_once()
args = collection.upsert.call_args.kwargs
assert args["metadatas"][0]["host"] == "web01"
assert args["metadatas"][0]["issue"] == "sssd broken"
def test_query_returns_empty_when_no_docs(tmp_path: Path) -> None:
chroma_mock = _make_chromadb_mock()
ai = _make_ai_mock()
with patch.dict("sys.modules", {"chromadb": chroma_mock}):
store = SessionStore(tmp_path / "store")
results = store.query("why sssd", "web01", ai)
assert results == []
def test_query_returns_past_sessions(tmp_path: Path) -> None:
chroma_mock = _make_chromadb_mock()
collection = chroma_mock.PersistentClient.return_value.get_or_create_collection.return_value
collection.count.return_value = 1
collection.query.return_value = {
"ids": [["20260506T120000Z"]],
"documents": [["Root cause: package missing"]],
"metadatas": [[{"host": "web01", "issue": "sssd broken"}]],
}
ai = _make_ai_mock()
with patch.dict("sys.modules", {"chromadb": chroma_mock}):
store = SessionStore(tmp_path / "store")
results = store.query("sssd issue", "web01", ai)
assert len(results) == 1
assert isinstance(results[0], PastSession)
assert results[0].host == "web01"
assert "package missing" in results[0].summary

View File

@@ -82,6 +82,8 @@ def test_allows_expected_read_only_commands() -> None:
"systemctl status apache2",
"cat /etc/hosts",
"ss -lntp",
"rpm -q sssd",
"dpkg-query -W sssd",
]:
client.validate_read_only_command(command)