8 Commits

Author SHA1 Message Date
7749a02706 feat: add history UX and expand retention-focused roadmap
Some checks failed
CI / test (push) Failing after 15s
2026-05-11 21:07:39 +02:00
964aee3481 merge: package presence and session memory readiness
Some checks failed
CI / test (push) Failing after 18s
2026-05-11 20:38:27 +02:00
013410999a feat: finalize package presence branch and docs alignment 2026-05-11 20:38:16 +02:00
fab1f3afbf lint
All checks were successful
CI / test (push) Successful in 23s
2026-05-06 05:09:35 +02:00
54b202bdc2 update
Some checks failed
CI / test (push) Failing after 19s
2026-05-06 05:06:45 +02:00
bbc75b1559 lint
Some checks failed
CI / test (push) Failing after 19s
2026-05-06 05:03:51 +02:00
d5e1822644 update
Some checks failed
CI / test (push) Failing after 15s
2026-05-06 05:02:38 +02:00
74a56e3113 merge: feature/rag-tier1 into main
Some checks failed
CI / test (push) Failing after 15s
2026-05-06 04:50:51 +02:00
16 changed files with 1101 additions and 51 deletions

View File

@@ -10,7 +10,20 @@ ______________________________________________________________________
### Added
- Nothing yet.
- Tier 3 core session memory implementation:
- new `src/tai/session_store.py` persistent ChromaDB store
- `--session-memory` option on `tai run`
- prior-session retrieval injected into analysis/follow-up prompts
- final response indexing at session end
- Planner enhancements for broader service detection:
- generic service candidate extraction from free text
- package presence probes in plans (`rpm -q` and `dpkg-query -W`)
- SSH read-only allowlist expanded to permit package presence commands (`rpm`, `dpkg-query`)
- Session memory tests in `tests/test_session_store.py`
### Changed
- Documentation alignment updates in README and ROADMAP to reflect implemented session memory and package-presence capabilities.
______________________________________________________________________

View File

@@ -191,9 +191,8 @@ pytest tests/test_plan.py tests/test_ai.py tests/test_cli.py
## Known Limits
- Service-specific presence checks currently apply to recognized service/subsystem names.
- Package-manager-level presence checks are not yet in the default read-only command allowlist.
- Tier 3 persistent session memory is not implemented yet.
- Deep service-specific probes (known binary/config/package aliases) are richer for recognized services than generic service names.
- Session memory is available via `--session-memory`, but dedicated history UX commands (`tai history`, `/history`) are not implemented yet.
## Changelog and Roadmap

View File

@@ -130,7 +130,7 @@ model weights alone. Three tiers of increasing capability, each buildable indepe
- Build compounding institutional memory from past troubleshooting sessions
- Keep all data local — no embeddings or session content leaves the network
---
______________________________________________________________________
### Technology Decisions Required
@@ -143,9 +143,9 @@ model weights alone. Three tiers of increasing capability, each buildable indepe
| Hybrid retrieval | Semantic only, BM25 only, hybrid | Hybrid (BM25 keyword + cosine semantic) for best recall | ⬜ Pending |
| Reranking | None, cross-encoder (`ms-marco-MiniLM`), LLM-as-judge | Cross-encoder rerank pass before prompt injection | ⬜ Pending |
| Runbook format | Markdown, YAML, JSON | Markdown (human-editable, version-controllable) | ✅ Implemented |
| Session index storage | Local `~/.tai/`, configurable path | `~/.tai/sessions/` with ChromaDB collection | ⬜ Pending |
| Session index storage | Local `~/.tai/`, configurable path | `~/.tai/sessions/` with ChromaDB collection | ✅ Implemented (core) |
---
______________________________________________________________________
### Tier 1 — Diagnostic Chunk Retrieval (in-memory, per-session)
@@ -155,31 +155,36 @@ Status: ✅ Implemented
On busy hosts this floods the context window with irrelevant output, degrading quality.
**Approach:**
- After collection, split each command's output into overlapping token chunks (e.g. 512 tokens, 64 overlap)
- Embed all chunks using `nomic-embed-text` via Ollama embeddings API
- On each question (initial + follow-up), embed the question and retrieve top-k chunks by cosine similarity
- Inject only retrieved chunks into the prompt, not the full dump
**New module:** `src/tai/rag_retriever.py`
- `chunk_report(report) -> list[Chunk]`
- `embed_chunks(chunks) -> list[EmbeddedChunk]`
- `retrieve(question, embedded_chunks, top_k) -> list[Chunk]`
**Changes to existing code:**
- `prompt_builder.py`: accept `retrieved_chunks` instead of full `CollectionReport` for RAG-mode prompts
- `cli.py`: embed report after collection, pass retriever to `_run_analysis` and `_run_followup_analysis`
- `ai_client.py`: add `embed(text) -> list[float]` method using Ollama `/api/embeddings`
**Companion features buildable at same time:**
- `--no-rag` flag to bypass retrieval and use full dump (backwards compat)
- Token budget display: show user how many tokens are being sent vs. saved
- Per-chunk source attribution in AI response (which command produced the evidence)
**Tests:**
- `tests/test_rag_retriever.py`: chunk splitting, cosine similarity ranking, top-k retrieval
- `tests/test_ai.py`: add `test_embed_returns_float_list()`
---
______________________________________________________________________
### Tier 2 — Runbook Knowledge Base (persistent, ChromaDB)
@@ -189,63 +194,74 @@ Status: ✅ Implemented
specific environments, distros, or internal conventions.
**Approach:**
- Maintain a version-controlled corpus of Markdown runbooks in `runbooks/` directory
- On first run (or `tai runbooks --sync`), embed all runbooks and persist to ChromaDB collection
- On each analysis, retrieve top-3 relevant runbook chunks alongside diagnostic chunks
- Inject as a separate `## Runbook Context` section in the prompt
**New module:** `src/tai/runbook_store.py`
- `RunbookStore`: wraps ChromaDB collection
- `sync(runbooks_dir) -> int` — embed and upsert all runbooks
- `query(question, top_k) -> list[RunbookChunk]`
**New directory:** `runbooks/`
- `ssh.md`, `nginx.md`, `postgres.md`, `disk.md`, `kernel.md`, etc.
- Each runbook: YAML frontmatter (`service`, `symptoms`, `tags`) + Markdown body
**New CLI command:** `tai runbooks --sync [--path ./runbooks]`
**Changes to existing code:**
- `prompt_builder.py`: add `build_message_with_runbooks(retrieved_chunks, runbook_chunks)`
- `cli.py`: optionally load `RunbookStore`, query it per analysis turn
**Companion features buildable at same time:**
- `tai runbooks --list` — show indexed runbooks and last sync time
- `tai runbooks --add <file>` — index a single runbook
- `/runbooks` slash command in interactive mode — show which runbooks were retrieved
- Runbook citation in AI output: "Based on runbook: `ssh.md#AuthenticationFailures`"
---
______________________________________________________________________
### Tier 3 — Session Memory Index (institutional learning)
Status: ⬜ Pending
Status: ✅ Implemented (core retrieval/indexing) / ⬜ UX commands pending
**Problem:** Every session starts from zero. Repeat incidents on the same host or
same issue type get no benefit from past work.
**Approach:**
**Implemented now:**
- On session end, embed the session summary (issue + root cause + actions) and upsert into a persistent ChromaDB collection (`~/.tai/sessions/`)
- On session start, query for similar past sessions by issue text + hostname
- Inject top-2 past sessions as `## Prior Sessions` context
- Optionally: `/history` command in interactive mode to surface past sessions explicitly
**Pending UX layer:**
- `/history` command in interactive mode to surface past sessions explicitly
**New module:** `src/tai/session_store.py`
- `SessionStore`: wraps ChromaDB collection at `~/.tai/sessions/`
- `index_session(session_log_path)` — embed and store completed session
- `query_similar(issue, host, top_k) -> list[PastSession]`
- `index_session(host, issue, summary, ai)` — embed and store completed session
- `query(question, host, ai, top_k) -> list[PastSession]`
**Changes to existing code:**
- `session_log.py`: add `summarise() -> str` method (issue + final AI response)
- `cli.py`: query `SessionStore` at session start, index at session end
- `cli.py`: query `SessionStore` during analysis turns and index final responses at session end
**Companion features buildable at same time:**
- `tai history` CLI subcommand — search past sessions by keyword
- `tai history --host <hostname>` — all sessions for a host
- `tai history --export <file>` — export session summaries as Markdown report
- Auto-suggest: "Similar issue found from 2 weeks ago — load context? [y/N]"
---
______________________________________________________________________
### Implementation Order
@@ -258,6 +274,7 @@ Tier 3 (session memory) ← Builds on Tier 2 infrastructure. Minimal extr
```
**Estimated effort:**
- Tier 1: 23 days (new module + prompt builder changes + tests)
- Tier 2: 34 days (ChromaDB + runbook authoring + CLI command + tests)
- Tier 3: 12 days (reuses Tier 2 infrastructure)
@@ -293,14 +310,205 @@ ______________________________________________________________________
| Date | Decision | Outcome |
|------|----------|---------|
| 2026-05-04 | Implementation language | Python — with single distributable binary via Nuitka |
| — | AI inference backend | vLLM (provisional) |
| | Default model | `gemma4:a4b` (provisional) |
| 2026-05-04 | AI backend API | OpenAI-compatible API endpoint (local Ollama by default) |
| 2026-05-04 | Default model | `gemma3:4b` |
| 2026-05-04 | SSH auth methods | Keypair only (ed25519/RSA); auto-accept new hosts; reject on key change (MITM) |
| 2026-05-04 | Bastion host support | `--jump-host` flag via SSH native ProxyJump |
| 2026-05-04 | SSH config behavior | Use `~/.ssh/config` by default; allow override via `--ignore-ssh-config` |
| 2026-05-04 | CLI vs interactive mode | Interactive: REPL for v0.1, `textual` TUI for v0.2+ |
| 2026-05-04 | RAG embedding model | `nomic-embed-text` via Ollama (local, air-gapped safe) — ⬜ pending confirmation |
| 2026-05-04 | RAG embedding model | `nomic-embed-text` via Ollama (local, air-gapped safe) |
| 2026-05-04 | RAG vector store (Tier 1) | In-memory numpy cosine similarity — zero deps, session-scoped |
| 2026-05-04 | RAG vector store (Tier 2/3) | `chromadb` embedded mode (default) or `qdrant` self-hosted — ⬜ pending confirmation |
| 2026-05-04 | RAG vector store (Tier 2/3) | `chromadb` embedded mode (default) or `qdrant` self-hosted |
| 2026-05-04 | RAG chunking unit | Command-boundary splitting — each collected command = one or more chunks |
| 2026-05-04 | Runbook format | Markdown with YAML frontmatter, version-controlled in `runbooks/` directory |
______________________________________________________________________
## End-State UX Goal
After the current CLI and memory roadmap phases are stable, the long-term UX goal is a full-screen terminal TUI with an ncurses-style workflow.
### Target End-State
- Split-pane troubleshooting workspace (diagnostics, AI output, and command/input area)
- Live command/probe status with clear success/failure indicators
- In-session history browser for prior questions, retrieved evidence, and related past sessions
- Keyboard-first navigation for operators running in SSH-only environments
### Delivery Approach
- Keep shipping incremental CLI features first (current roadmap order remains unchanged)
- Promote stable workflows into TUI panels once behavior is proven in CLI mode
- Treat the TUI as a final UX consolidation milestone, not a blocker for core troubleshooting capabilities
______________________________________________________________________
## Container Distribution Goal (Docker)
After core CLI/TUI workflows stabilize, provide an official Docker image as an additional distribution target.
### Container Execution Model (Decision)
- Docker is a one-shot invocation target, not a daemon/service mode
- Each run executes a single `tai` command and exits
- State is persisted only through mounted host volumes
### Why Docker Is Valuable Here
- Reproducible runtime: pin Python and dependency versions to remove host-level drift
- Faster operator onboarding: run with one command instead of local Python setup
- Cleaner CI/CD release path: publish versioned images aligned with git tags
- Safer local footprint: isolate dependencies from the host OS package manager
### Subgoals
1. Base image and runtime hardening
- Multi-stage Dockerfile with slim runtime image
- Non-root runtime user and minimal filesystem permissions
- Healthcheck for CLI startup and version command
2. Runtime integration for SSH workflows
- Documented mounts for `~/.ssh` (read-only where possible) and known-hosts handling
- Pass-through for SSH config when needed (`--ignore-ssh-config` behavior documented)
- Clear guidance for jump-host and bastion scenarios from inside the container
- Documented one-shot run examples for `tai run` and `tai history`
3. Persistent data strategy
- Required volume mount guidance for runbook store (`~/.tai/runbooks`)
- Required volume mount guidance for session memory/history (`~/.tai/sessions`)
- Optional bind mount for JSONL logs and report export artifacts
- Clear defaults for container paths and equivalent host path mappings
4. Release and quality gates
- Build and publish image on tagged releases
- Smoke tests in CI: probe mode, collect mode, and history command against mocked endpoints
- Version labeling (image tags and OCI metadata) tied to changelog/release tags
### Data Retention and Lifecycle Policy
Retention behavior must be explicit and configurable at runtime. Defaults should be conservative and documented.
1. Retention classes
- Session memory store (`~/.tai/sessions`): keep semantically indexed summaries for troubleshooting continuity
- Runbook store (`~/.tai/runbooks`): retain until explicitly replaced or pruned by sync policy
- JSONL logs and exported reports: operator-controlled retention with optional TTL cleanup
2. Retention controls
- Add CLI controls for age-based pruning (for example `--retain-days` on cleanup commands)
- Add host-scoped cleanup (delete history for one host) and full-store cleanup (all hosts)
- Add dry-run cleanup mode to show what would be deleted before applying changes
3. No-persist mode
- Add a documented ephemeral mode where no session memory or logs are written
- Ensure one-shot diagnostics can run in read-only operational contexts
### Configuration and State Persistence Model
Configuration and retained state should be predictable across container upgrades and host environments.
1. Mount and path contract
- Define canonical container paths for `~/.tai/runbooks`, `~/.tai/sessions`, and optional log/export paths
- Document required versus optional mounts and expected permissions for each
- Document UID/GID mapping guidance to prevent host volume ownership issues
2. Schema and compatibility
- Introduce explicit storage schema version metadata for persistent stores
- Define upgrade behavior for older stores (migrate, re-index, or fail with clear guidance)
- Add compatibility notes for image upgrades and rollback expectations
3. Backup and recovery
- Provide export/import workflows for session memory and runbook indexes
- Document minimal backup set and restore order for disaster recovery
### Security and Privacy for Retained Data
Persisted troubleshooting evidence can include sensitive operational data and must be handled accordingly.
1. Data minimization
- Add optional redaction hooks for common sensitive patterns before persistence
- Keep prompt-only transient data separate from persisted summary/index content
2. Runtime hardening
- Target non-root container execution with read-only root filesystem by default
- Require explicit writable mounts only for retained data locations
3. Auditable behavior
- Log retention-affecting operations (cleanup, purge, export/import) with timestamps and scope
- Define stable exit codes for cleanup and retention workflows to support automation
### Kubernetes Position
Kubernetes is out of scope for this delivery plan.
- `tai` is currently an operator-invoked troubleshooting client, not a long-running service
- AI inference is external to `tai` (OpenAI-compatible endpoint), reducing the need for in-cluster model orchestration
- SSH key/config handling and per-operator context are simpler with local or single-container execution
Kubernetes can be revisited only if `tai` evolves into a centralized multi-user service with queueing, RBAC, and shared tenancy requirements.
______________________________________________________________________
## Final Long-Term Goal: Full Rust Migration
This is a final-stage roadmap goal and remains explicitly out of near-term scope.
It should begin only after the Python implementation, TUI direction, Docker one-shot model,
and retention/persistence policies are stable and proven in production usage.
### Why This Is the Final Goal
- Improve execution latency and startup speed for both native runs and container one-shot invocations
- Produce a single, portable native binary with minimal runtime dependency footprint
- Strengthen reliability and memory safety under heavy log parsing and concurrent workflows
- Simplify long-term packaging and distribution across Linux targets
### Migration Objectives
1. Preserve feature parity first
- Match existing CLI behavior, interactive workflows, RAG integration, runbook management, and history/session-memory features
- Keep command semantics and safety boundaries equivalent during transition
2. Target both distribution modes
- Native Rust binary for direct operator use
- Docker image built around the Rust binary for one-shot execution with mounted persistent volumes
3. Keep compatibility guardrails
- Define persistent data format compatibility or migration tooling for runbook/session stores
- Preserve operator-visible flags where practical to reduce migration friction
### Suggested Delivery Phases
1. Build baseline Rust CLI scaffold with feature-flagged parity checkpoints
2. Port SSH execution and read-only policy enforcement modules
3. Port planner, collectors, prompt composition, and AI client adapters
4. Port session memory/history and runbook workflows with migration tests
5. Port interactive UX/TUI layer and deprecate Python runtime path
### Rust Toolchain End-State
- Standardize on Cargo-based build/test/lint pipeline (`cargo fmt`, `cargo clippy`, `cargo test`)
- Add release profile optimization and reproducible build settings
- Publish signed native artifacts and Docker images derived from Rust release binaries
### Decision Gate Before Starting
Begin Rust migration only when:
- Python roadmap milestones are complete and stable
- Container distribution and retention policy workflows are operationally validated
- A parity test matrix exists to prove behavior equivalence during migration

View File

@@ -54,3 +54,11 @@ select = ["E", "F", "I", "UP", "B"]
python_version = "3.11"
strict = true
warn_unused_configs = true
[[tool.mypy.overrides]]
module = ["chromadb", "chromadb.*"]
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = ["tai.chroma_telemetry"]
disable_error_code = ["misc"]

View File

@@ -7,9 +7,10 @@ disabled, so tai wires ChromaDB to this no-op client instead.
from __future__ import annotations
from typing import override
from chromadb.config import System
from chromadb.telemetry.product import ProductTelemetryClient, ProductTelemetryEvent
from overrides import override
class NoOpProductTelemetryClient(ProductTelemetryClient):

View File

@@ -30,6 +30,7 @@ from tai.prompt_builder import (
from tai.rag_retriever import EmbeddedChunk, chunk_report, retrieve_scored
from tai.runbook_store import RunbookChunk, RunbookStore
from tai.session_log import SessionLogger
from tai.session_store import PastSession, SessionStore
from tai.ssh_client import SSHClient, SSHCommandResult, SSHConnectionConfig, SSHSession
app = typer.Typer(no_args_is_help=True, add_completion=False)
@@ -38,6 +39,83 @@ app.add_typer(runbooks_app, name="runbooks")
console = Console()
@app.command("history")
def history(
query: Annotated[
str | None,
typer.Option("--query", help="Optional keyword to match issue/summary text."),
] = None,
host: Annotated[
str | None,
typer.Option("--host", help="Filter history by host."),
] = None,
limit: Annotated[
int,
typer.Option("--limit", min=1, help="Maximum number of sessions to display."),
] = 20,
export: Annotated[
str | None,
typer.Option("--export", help="Optional path to write results as Markdown."),
] = None,
session_memory_path: Annotated[
str,
typer.Option(
"--session-memory",
help="Path to persistent session memory store. Defaults to ~/.tai/sessions.",
),
] = "~/.tai/sessions",
) -> None:
"""Search or list previously indexed troubleshooting sessions."""
from pathlib import Path
try:
store = SessionStore(session_memory_path)
except Exception as exc: # noqa: BLE001
console.print(f"[red]Could not open session memory:[/red] {exc}")
raise typer.Exit(code=1) from exc
try:
if query:
sessions = store.search_keyword(query, host=host, limit=limit)
else:
sessions = store.list_recent(host=host, limit=limit)
except Exception as exc: # noqa: BLE001
console.print(f"[red]History query failed:[/red] {exc}")
raise typer.Exit(code=1) from exc
if not sessions:
scope = f" for host {host}" if host else ""
qualifier = f" matching '{query}'" if query else ""
console.print(f"[yellow]No session history found{scope}{qualifier}.[/yellow]")
return
title = f"{len(sessions)} session(s)"
if host:
title += f" for host={host}"
if query:
title += f" matching '{query}'"
console.print(f"[bold]{title}[/bold]")
for sess in sessions:
summary_line = (sess.summary or "").strip().splitlines()
excerpt = summary_line[0] if summary_line else "(no summary)"
if len(excerpt) > 120:
excerpt = f"{excerpt[:120].rstrip()}..."
console.print(
f" [green]{sess.session_id}[/green] host={sess.host} issue={sess.issue}\n"
f" [dim]{excerpt}[/dim]"
)
if export:
output_path = Path(export).expanduser().resolve()
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(
_format_history_markdown(sessions, query=query, host=host),
encoding="utf-8",
)
console.print(f"[green]✓ Exported[/green] {output_path}")
@app.command()
def run(
issue: Annotated[str, typer.Argument(help="Ticket text or issue summary.")],
@@ -151,6 +229,16 @@ def run(
help="Path to a synced runbook ChromaDB store. Enables Tier 2 RAG.",
),
] = None,
session_memory_path: Annotated[
str | None,
typer.Option(
"--session-memory",
help=(
"Path to persistent session memory store for prior-session retrieval "
"(Tier 4). Omit to disable."
),
),
] = None,
) -> None:
"""Start an interactive troubleshooting session scaffold."""
try:
@@ -207,6 +295,17 @@ def run(
except Exception as exc: # noqa: BLE001
console.print(f"[yellow]Runbook store unavailable:[/yellow] {exc}")
session_store: SessionStore | None = None
if session_memory_path:
try:
session_store = SessionStore(session_memory_path)
mem_count = session_store.count()
console.print(
f"[dim]Session memory: {mem_count} indexed at {session_memory_path}[/dim]"
)
except Exception as exc: # noqa: BLE001
console.print(f"[yellow]Session memory unavailable:[/yellow] {exc}")
try:
asyncio.run(
_async_main(
@@ -220,6 +319,7 @@ def run(
no_rag=no_rag,
rag_debug=rag_debug,
runbook_store=runbook_store,
session_store=session_store,
logger=logger,
)
)
@@ -245,6 +345,7 @@ async def _async_main(
no_rag: bool,
rag_debug: bool,
runbook_store: RunbookStore | None,
session_store: SessionStore | None,
logger: SessionLogger | None,
) -> None:
"""Open a single SSH session and run probe / collection / analysis through it."""
@@ -291,19 +392,22 @@ async def _async_main(
},
)
initial_response: str | None = None
if analyze and report is not None:
_run_analysis(
initial_response = _run_analysis(
ai_config,
req.issue,
report,
no_rag=no_rag,
rag_debug=rag_debug,
runbook_store=runbook_store,
session_store=session_store,
logger=logger,
)
interactive_response: str | None = None
if interactive:
await _interactive_loop(
interactive_response = await _interactive_loop(
session,
req,
ai_config,
@@ -311,9 +415,14 @@ async def _async_main(
no_rag=no_rag,
rag_debug=rag_debug,
runbook_store=runbook_store,
session_store=session_store,
logger=logger,
)
final_response = interactive_response or initial_response
if session_store is not None and final_response:
_index_session_memory(session_store, ai_config, req, final_response, logger=logger)
async def _interactive_loop(
session: SSHSession,
@@ -324,13 +433,14 @@ async def _interactive_loop(
no_rag: bool = False,
rag_debug: bool = False,
runbook_store: RunbookStore | None = None,
session_store: SessionStore | None = None,
logger: SessionLogger | None,
) -> None:
) -> str | None:
"""Run a follow-up loop for collecting and conversational analysis."""
console.print(
Panel(
"Ask questions directly, or use [bold]/collect[/bold], "
"[bold]/analyze[/bold], [bold]/help[/bold], [bold]/quit[/bold]",
"[bold]/analyze[/bold], [bold]/history[/bold], [bold]/help[/bold], [bold]/quit[/bold]",
title="[bold cyan]Interactive Mode[/bold cyan]",
border_style="cyan",
padding=(0, 1),
@@ -340,6 +450,7 @@ async def _interactive_loop(
prior_questions: list[str] = []
embedded_chunks: list[EmbeddedChunk] | None = None
ai_embed = AIClient(ai_config)
last_response: str | None = None
if not no_rag and report is not None:
embedded_chunks, index_error, index_ms = await asyncio.to_thread(
@@ -377,14 +488,14 @@ async def _interactive_loop(
else:
line = sys.stdin.readline() # non-TTY / piped mode
if not line:
return
return last_response
command = line.strip()
console.print(f"\n[bold cyan]tai[/bold cyan][dim] >[/dim] {command}")
except (EOFError, KeyboardInterrupt):
console.print("\n[yellow]Exiting interactive mode.[/yellow]")
if logger is not None:
logger.log_event("interactive_exit", {"reason": "signal_or_eof"})
return
return last_response
if not command:
continue
@@ -393,13 +504,14 @@ async def _interactive_loop(
console.print("[green]Bye.[/green]")
if logger is not None:
logger.log_event("interactive_exit", {"reason": "user_quit"})
return
return last_response
if command == "/help":
console.print(
Panel(
"[bold]/collect[/bold] — re-run diagnostics\n"
"[bold]/analyze[/bold] — re-analyze current diagnostics\n"
"[bold]/history[/bold] — show prior sessions for this host\n"
"[bold]/help[/bold] — show this message\n"
"[bold]/quit[/bold] — end session\n"
"[dim]Anything else is sent directly to the AI as a question.[/dim]",
@@ -410,6 +522,52 @@ async def _interactive_loop(
)
continue
if command.startswith("/history"):
if session_store is None:
console.print(
"[yellow]Session memory is disabled. "
"Use --session-memory to enable /history.[/yellow]"
)
continue
keyword = command.removeprefix("/history").strip()
try:
sessions = (
session_store.search_keyword(keyword, host=req.host, limit=5)
if keyword
else session_store.list_recent(host=req.host, limit=5)
)
except Exception as exc: # noqa: BLE001
console.print(f"[yellow]History unavailable:[/yellow] {exc}")
continue
if not sessions:
qualifier = f" matching '{keyword}'" if keyword else ""
console.print(f"[yellow]No prior sessions found{qualifier}.[/yellow]")
continue
lines = []
for sess in sessions:
issue = sess.issue.strip() or "(unknown issue)"
lines.append(f"- [bold]{sess.session_id}[/bold] — {issue}")
heading = "Prior sessions"
if keyword:
heading += f" matching '{keyword}'"
console.print(
Panel(
"\n".join(lines),
title=f"[bold cyan]{heading}[/bold cyan]",
border_style="cyan",
padding=(0, 1),
)
)
if logger is not None:
logger.log_event(
"interactive_history",
{"keyword": keyword or None, "count": len(sessions)},
)
continue
if command == "/collect":
plan = plan_from_request(req)
console.print(f"[cyan]Collecting diagnostics:[/cyan] {len(plan)} commands")
@@ -466,7 +624,7 @@ async def _interactive_loop(
console.print("[red]No diagnostics available to analyze.[/red]")
continue
_run_followup_analysis(
response = _run_followup_analysis(
ai_config,
req.issue,
report,
@@ -475,11 +633,14 @@ async def _interactive_loop(
embedded_chunks=embedded_chunks,
rag_debug=rag_debug,
runbook_store=runbook_store,
session_store=session_store,
logger=logger,
)
prior_questions.append("/analyze")
if logger is not None:
logger.log_event("interactive_followup", {"question": "/analyze"})
last_response = response
continue
continue
if report is None:
@@ -523,7 +684,7 @@ async def _interactive_loop(
console.print("[red]No diagnostics available to analyze.[/red]")
continue
_run_followup_analysis(
response = _run_followup_analysis(
ai_config,
req.issue,
report,
@@ -532,11 +693,13 @@ async def _interactive_loop(
embedded_chunks=embedded_chunks,
rag_debug=rag_debug,
runbook_store=runbook_store,
session_store=session_store,
logger=logger,
)
prior_questions.append(command)
if logger is not None:
logger.log_event("interactive_followup", {"question": command})
last_response = response
def _try_embed_report(
@@ -597,8 +760,9 @@ def _run_analysis(
no_rag: bool = False,
rag_debug: bool = False,
runbook_store: RunbookStore | None = None,
session_store: SessionStore | None = None,
logger: SessionLogger | None,
) -> None:
) -> str:
"""Send collected data to the AI and stream the analysis to stdout."""
console.print()
console.print(Rule("[bold cyan]Analysis[/bold cyan]", style="cyan"))
@@ -606,10 +770,16 @@ def _run_analysis(
ai = AIClient(ai_config)
system_prompt = build_system_prompt()
runbook_chunks = _query_runbooks(runbook_store, issue, ai, top_k=1)
past_sessions = _query_sessions(session_store, issue, report.host, ai, top_k=2)
user_message: str
if no_rag:
user_message = build_user_message(issue, report, runbook_chunks=runbook_chunks or None)
user_message = build_user_message(
issue,
report,
runbook_chunks=runbook_chunks or None,
past_sessions=past_sessions or None,
)
else:
try:
chunks = chunk_report(report)
@@ -628,16 +798,28 @@ def _run_analysis(
report.host,
selected,
runbook_chunks=runbook_chunks or None,
past_sessions=past_sessions or None,
)
else:
user_message = build_user_message(issue, report, runbook_chunks=runbook_chunks or None)
user_message = build_user_message(
issue,
report,
runbook_chunks=runbook_chunks or None,
past_sessions=past_sessions or None,
)
except Exception as exc: # noqa: BLE001
console.print(
"[yellow]RAG unavailable for initial analysis; using full-context fallback.[/yellow]"
"[yellow]RAG unavailable for initial analysis; "
"using full-context fallback.[/yellow]"
)
if logger is not None:
logger.log_event("rag_index", {"status": "fallback", "error": str(exc)})
user_message = build_user_message(issue, report, runbook_chunks=runbook_chunks or None)
user_message = build_user_message(
issue,
report,
runbook_chunks=runbook_chunks or None,
past_sessions=past_sessions or None,
)
try:
response = _complete_ai_response(
ai,
@@ -662,6 +844,7 @@ def _run_analysis(
"guardrail_warnings": warnings,
},
)
return response
except Exception as exc: # noqa: BLE001
console.print(f"[red]AI analysis failed:[/red] {exc}")
if logger is not None:
@@ -688,6 +871,7 @@ def _run_followup_analysis(
embedded_chunks: list[EmbeddedChunk] | None = None,
rag_debug: bool = False,
runbook_store: RunbookStore | None = None,
session_store: SessionStore | None = None,
logger: SessionLogger | None,
) -> str:
"""Run grounded follow-up analysis re-anchored to current diagnostics.
@@ -702,6 +886,7 @@ def _run_followup_analysis(
ai = AIClient(ai_config)
system_prompt = build_system_prompt()
runbook_chunks = _query_runbooks(runbook_store, question, ai, top_k=1)
past_sessions = _query_sessions(session_store, question, report.host, ai, top_k=2)
user_message: str
retrieved_names: list[str] = []
@@ -724,6 +909,7 @@ def _run_followup_analysis(
question,
prior_questions,
runbook_chunks=runbook_chunks or None,
past_sessions=past_sessions or None,
)
if rag_debug:
pairs = ", ".join(
@@ -741,12 +927,14 @@ def _run_followup_analysis(
user_message = build_followup_message(
issue, report, question, prior_questions,
runbook_chunks=runbook_chunks or None,
past_sessions=past_sessions or None,
)
else:
fallback_reason = "rag not indexed"
user_message = build_followup_message(
issue, report, question, prior_questions,
runbook_chunks=runbook_chunks or None,
past_sessions=past_sessions or None,
)
if logger is not None:
@@ -826,6 +1014,68 @@ def _query_runbooks(
return []
def _query_sessions(
store: SessionStore | None,
question: str,
host: str,
ai: AIClient,
*,
top_k: int = 2,
) -> list[PastSession]:
"""Query the session memory store silently; returns empty list on failures."""
if store is None:
return []
try:
return store.query(question, host, ai, top_k=top_k)
except Exception: # noqa: BLE001
return []
def _index_session_memory(
store: SessionStore,
ai_config: AIConfig,
req: TroubleshootRequest,
summary: str,
*,
logger: SessionLogger | None,
) -> None:
"""Persist final session summary for future retrieval; non-fatal on failure."""
try:
ai = AIClient(ai_config)
session_id = store.index_session(req.host, req.issue, summary, ai)
if logger is not None:
logger.log_event("session_memory_indexed", {"session_id": session_id})
except Exception as exc: # noqa: BLE001
if logger is not None:
logger.log_event("session_memory_error", {"error": str(exc)})
def _format_history_markdown(
sessions: list[PastSession],
*,
query: str | None,
host: str | None,
) -> str:
"""Render session history rows as a Markdown report."""
lines = ["# tai session history", ""]
if host:
lines.append(f"Host filter: {host}")
if query:
lines.append(f"Keyword filter: {query}")
if host or query:
lines.append("")
for sess in sessions:
lines.append(f"## {sess.session_id}")
lines.append(f"- Host: {sess.host}")
lines.append(f"- Issue: {sess.issue}")
lines.append("")
lines.append(sess.summary.strip() or "(no summary)")
lines.append("")
return "\n".join(lines).rstrip() + "\n"
# ---------------------------------------------------------------------------
# runbooks sub-app
# ---------------------------------------------------------------------------

View File

@@ -149,6 +149,37 @@ _SERVICE_BINARIES: dict[str, list[str]] = {
"apparmor": ["/usr/sbin/aa-status", "/sbin/apparmor_parser"],
}
_SERVICE_PACKAGES: dict[str, list[str]] = {
"docker": ["docker", "docker-ce"],
"sssd": ["sssd"],
"sshd": ["openssh-server", "openssh"],
"x2go": ["x2goserver", "x2goserver-xsession"],
"xorg": ["xorg-server", "xserver-xorg-core"],
"wayland": ["wayland", "xwayland"],
"selinux": ["selinux-policy", "selinux-policy-targeted"],
"apparmor": ["apparmor"],
}
_GENERIC_SERVICE_STOPWORDS: frozenset[str] = frozenset(
{
"a",
"an",
"and",
"app",
"application",
"daemon",
"for",
"is",
"my",
"not",
"service",
"systemd",
"the",
"unit",
"working",
}
)
# ---------------------------------------------------------------------------
# Command sets
# ---------------------------------------------------------------------------
@@ -225,6 +256,9 @@ def plan_from_request(request: TroubleshootRequest) -> CollectionPlan:
)
for idx, binary_path in enumerate(_SERVICE_BINARIES.get(svc, []), start=1):
plan.add(f"binary-{svc}-{idx}", f"ls -l {binary_path}")
for idx, package_name in enumerate(_service_package_candidates(svc), start=1):
plan.add(f"package-rpm-{svc}-{idx}", f"rpm -q {package_name}")
plan.add(f"package-dpkg-{svc}-{idx}", f"dpkg-query -W {package_name}")
plan.add(f"service-{svc}", f"systemctl status {svc}")
plan.add(f"journal-{svc}", f"journalctl -u {svc} -n 100 --no-pager")
for cfg_path in _SERVICE_CONFIGS.get(svc, []):
@@ -258,7 +292,11 @@ def _issue_words(issue: str) -> set[str]:
def _extract_services(issue: str) -> list[str]:
"""Return known service names mentioned in *issue*."""
"""Return service candidates mentioned in *issue*.
Includes known services plus generic service-like tokens near words such
as "service", "daemon", and "unit".
"""
words = _issue_words(issue)
found: list[str] = []
for svc in _KNOWN_SERVICES:
@@ -266,6 +304,58 @@ def _extract_services(issue: str) -> list[str]:
svc_words = {svc, svc.rstrip("d"), svc.replace("-", ""), svc.replace("-server", "")}
if words & svc_words:
found.append(svc)
for svc in _extract_generic_service_candidates(issue):
if svc not in found:
found.append(svc)
return found
def _extract_generic_service_candidates(issue: str) -> list[str]:
"""Extract likely service names from free text even when not pre-registered."""
tokens = [tok.lower() for tok in re.findall(r"[a-zA-Z0-9_.@-]+", issue)]
if not tokens:
return []
candidates: list[str] = []
for idx, token in enumerate(tokens):
normalized = token[:-8] if token.endswith(".service") else token
if _is_safe_service_name(normalized):
if token.endswith(".service") and normalized not in _GENERIC_SERVICE_STOPWORDS:
candidates.append(normalized)
if token in {"service", "daemon", "unit"}:
for neighbor in (idx - 1, idx + 1):
if neighbor < 0 or neighbor >= len(tokens):
continue
candidate = tokens[neighbor]
if candidate.endswith(".service"):
candidate = candidate[:-8]
if candidate in _GENERIC_SERVICE_STOPWORDS:
continue
if _is_safe_service_name(candidate):
candidates.append(candidate)
deduped: list[str] = []
seen: set[str] = set()
for item in candidates:
if item in seen:
continue
seen.add(item)
deduped.append(item)
return deduped
def _is_safe_service_name(name: str) -> bool:
"""Return True when *name* is safe to interpolate into read-only commands."""
if len(name) < 2 or len(name) > 64:
return False
return re.fullmatch(r"[a-z0-9_.@-]+", name) is not None
def _service_package_candidates(service: str) -> list[str]:
"""Return package names to probe for *service* presence."""
if service in _SERVICE_PACKAGES:
return _SERVICE_PACKAGES[service]
return [service]

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
from tai.collectors import CollectionReport
from tai.rag_retriever import Chunk
from tai.runbook_store import RunbookChunk
from tai.session_store import PastSession
_SYSTEM_PROMPT = """\
You are an expert Linux systems administrator and troubleshooting assistant.
@@ -21,7 +22,8 @@ Important rules:
- If a command shows "could not be executed (SSH error)" it means the remote host blocked or
rejected that specific command — it is not evidence about the service or system state.
- If service presence checks show a unit, binary, package, or config is missing, treat that as
evidence the component may be absent or not installed, not as proof that the component is broken.
evidence the component may be absent or not installed, not as proof that the
component is broken.
- If there is not enough data to diagnose the issue, say so plainly and list exactly what
additional commands or log files would be needed.
- Keep the response short. Skip sections that have nothing useful to say.
@@ -33,6 +35,7 @@ Important rules:
_MAX_RUNBOOK_CHARS = 500
_MAX_DIAGNOSTIC_CHUNK_CHARS = 700
_MAX_SESSION_SUMMARY_CHARS = 500
def build_system_prompt() -> str:
@@ -66,11 +69,34 @@ def _format_diagnostic_chunk(content: str) -> str:
return text[:_MAX_DIAGNOSTIC_CHUNK_CHARS].rstrip() + "\n...[truncated diagnostic context]"
def _format_session_context(past_sessions: list[PastSession]) -> str:
"""Format similar prior sessions as compact grounding context."""
lines: list[str] = ["## Similar prior sessions\n"]
lines.append(
"The following completed sessions were semantically similar. "
"Use them as historical hints, but prioritize current diagnostics if they conflict.\n"
)
for sess in past_sessions:
summary = sess.summary.strip()
if len(summary) > _MAX_SESSION_SUMMARY_CHARS:
summary = (
summary[:_MAX_SESSION_SUMMARY_CHARS].rstrip()
+ "\n...[truncated session summary]"
)
lines.append(f"### Session: {sess.session_id} (host={sess.host})\n")
lines.append(f"**Issue:** {sess.issue}")
lines.append("")
lines.append(summary)
lines.append("")
return "\n".join(lines)
def build_user_message(
issue: str,
report: CollectionReport,
*,
runbook_chunks: list[RunbookChunk] | None = None,
past_sessions: list[PastSession] | None = None,
) -> str:
"""Format *issue* and *report* into the user message sent to the AI."""
lines: list[str] = []
@@ -81,6 +107,9 @@ def build_user_message(
if runbook_chunks:
lines.append(_format_runbook_context(runbook_chunks))
if past_sessions:
lines.append(_format_session_context(past_sessions))
lines.append("## Collected diagnostics\n")
skipped: list[str] = []
@@ -126,9 +155,15 @@ def build_followup_message(
prior_questions: list[str],
*,
runbook_chunks: list[RunbookChunk] | None = None,
past_sessions: list[PastSession] | None = None,
) -> str:
"""Build a grounded follow-up message that re-anchors to diagnostics each turn."""
base = build_user_message(issue, report, runbook_chunks=runbook_chunks)
base = build_user_message(
issue,
report,
runbook_chunks=runbook_chunks,
past_sessions=past_sessions,
)
lines: list[str] = [base, "## Follow-up"]
if prior_questions:
@@ -157,6 +192,7 @@ def build_message_with_chunks(
prior_questions: list[str],
*,
runbook_chunks: list[RunbookChunk] | None = None,
past_sessions: list[PastSession] | None = None,
) -> str:
"""Build a follow-up message using only semantically retrieved diagnostic chunks.
@@ -178,6 +214,9 @@ def build_message_with_chunks(
if runbook_chunks:
lines.append(_format_runbook_context(runbook_chunks))
if past_sessions:
lines.append(_format_session_context(past_sessions))
lines.append("## Follow-up")
if prior_questions:
@@ -204,6 +243,7 @@ def build_analysis_message_with_chunks(
chunks: list[Chunk],
*,
runbook_chunks: list[RunbookChunk] | None = None,
past_sessions: list[PastSession] | None = None,
) -> str:
"""Build an initial analysis message from retrieved diagnostic chunks."""
lines: list[str] = []
@@ -213,6 +253,9 @@ def build_analysis_message_with_chunks(
if runbook_chunks:
lines.append(_format_runbook_context(runbook_chunks))
if past_sessions:
lines.append(_format_session_context(past_sessions))
lines.append("## Most relevant diagnostics (retrieved by semantic similarity)\n")
for chunk in chunks:
lines.append(f"### {chunk.name}\n")

View File

@@ -17,7 +17,7 @@ from __future__ import annotations
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from tai.ai_client import AIClient
@@ -123,7 +123,7 @@ class RunbookStore:
self._client = chromadb.PersistentClient(path=str(path))
else:
self._client = chromadb.PersistentClient(path=str(path), settings=settings)
self._collection = self._client.get_or_create_collection(
self._collection: Any = self._client.get_or_create_collection(
name=_COLLECTION_NAME,
metadata={"hnsw:space": "cosine"},
)
@@ -241,11 +241,14 @@ class RunbookStore:
return []
results = self._collection.get(include=["metadatas"])
metas = results.get("metadatas") or []
return [dict(m) for m in metas]
entries: list[dict[str, str]] = []
for meta in metas:
entries.append({str(k): str(v) for k, v in dict(meta).items()})
return entries
def count(self) -> int:
"""Return the number of indexed runbook documents."""
return self._collection.count()
return int(self._collection.count())
# ---------------------------------------------------------------------------

159
src/tai/session_store.py Normal file
View File

@@ -0,0 +1,159 @@
"""Persistent session memory store (Tier 4) backed by ChromaDB."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from tai.ai_client import AIClient
DEFAULT_SESSION_STORE_PATH = "~/.tai/sessions"
_COLLECTION_NAME = "tai_sessions"
@dataclass(slots=True)
class PastSession:
"""A retrieved prior session summary for prompt grounding."""
session_id: str
host: str
issue: str
summary: str
class SessionStore:
"""ChromaDB-backed persistent memory for prior troubleshooting sessions."""
def __init__(self, store_path: str | Path = DEFAULT_SESSION_STORE_PATH) -> None:
import chromadb
path = Path(store_path).expanduser().resolve()
path.mkdir(parents=True, exist_ok=True)
settings = None
try:
from chromadb.config import Settings
settings = Settings(
anonymized_telemetry=False,
chroma_product_telemetry_impl="tai.chroma_telemetry.NoOpProductTelemetryClient",
chroma_telemetry_impl="tai.chroma_telemetry.NoOpProductTelemetryClient",
)
except (ImportError, ModuleNotFoundError):
settings = None
if settings is None:
self._client = chromadb.PersistentClient(path=str(path))
else:
self._client = chromadb.PersistentClient(path=str(path), settings=settings)
self._collection: Any = self._client.get_or_create_collection(
name=_COLLECTION_NAME,
metadata={"hnsw:space": "cosine"},
)
def count(self) -> int:
"""Return number of indexed session summaries."""
return int(self._collection.count())
def index_session(self, host: str, issue: str, summary: str, ai: AIClient) -> str:
"""Embed and upsert one session summary into persistent storage."""
session_id = datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ")
embed_text = _build_embed_text(host=host, issue=issue, summary=summary)
embedding = ai.embed(embed_text)
self._collection.upsert(
ids=[session_id],
documents=[summary.strip()],
embeddings=[embedding],
metadatas=[{"host": host, "issue": issue}],
)
return session_id
def query(self, question: str, host: str, ai: AIClient, *, top_k: int = 2) -> list[PastSession]:
"""Return top-k semantically similar sessions for this host and question."""
if self._collection.count() == 0:
return []
q_embedding = ai.embed(f"host: {host}\nquestion: {question}")
results = self._collection.query(
query_embeddings=[q_embedding],
n_results=min(top_k, self._collection.count()),
include=["documents", "metadatas"],
)
sessions: list[PastSession] = []
ids = results.get("ids") or []
docs = results.get("documents") or []
metas = results.get("metadatas") or []
for id_list, doc_list, meta_list in zip(ids, docs, metas, strict=False):
for sid, doc, meta in zip(id_list, doc_list, meta_list, strict=False):
sessions.append(
PastSession(
session_id=str(sid),
host=str(meta.get("host", "")),
issue=str(meta.get("issue", "")),
summary=str(doc),
)
)
return sessions
def list_recent(self, *, host: str | None = None, limit: int = 20) -> list[PastSession]:
"""Return recent indexed sessions, optionally filtered by host."""
if limit < 1:
raise ValueError("limit must be >= 1")
count = self._collection.count()
if count == 0:
return []
results = self._collection.get(
include=["documents", "metadatas"],
limit=min(limit, count),
)
ids = results.get("ids") or []
docs = results.get("documents") or []
metas = results.get("metadatas") or []
sessions: list[PastSession] = []
for sid, doc, meta in zip(ids, docs, metas, strict=False):
sessions.append(
PastSession(
session_id=str(sid),
host=str(meta.get("host", "")),
issue=str(meta.get("issue", "")),
summary=str(doc),
)
)
if host:
host_norm = host.strip().lower()
sessions = [s for s in sessions if s.host.lower() == host_norm]
sessions.sort(key=lambda s: s.session_id, reverse=True)
return sessions[:limit]
def search_keyword(
self,
keyword: str,
*,
host: str | None = None,
limit: int = 20,
) -> list[PastSession]:
"""Return recent sessions matching a keyword in issue or summary text."""
term = keyword.strip().lower()
if not term:
return self.list_recent(host=host, limit=limit)
all_recent = self.list_recent(host=host, limit=max(limit, self.count()))
filtered = [
sess
for sess in all_recent
if term in sess.issue.lower() or term in sess.summary.lower()
]
return filtered[:limit]
def _build_embed_text(*, host: str, issue: str, summary: str) -> str:
"""Build embedding text with host/issue context and summary excerpt."""
excerpt = summary.strip()[:1000]
return f"host: {host}\nissue: {issue}\nsummary:\n{excerpt}"

View File

@@ -51,6 +51,7 @@ class SSHClient:
_READ_ONLY_COMMANDS = {
"cat",
"dmesg",
"dpkg-query",
"df",
"du",
"find",
@@ -61,6 +62,7 @@ class SSHClient:
"journalctl",
"ls",
"netstat",
"rpm",
"sed",
"ss",
"stat",

View File

@@ -5,6 +5,7 @@ from unittest.mock import MagicMock, patch
from tai.ai_client import DEFAULT_AI_HOST, DEFAULT_MODEL, AIClient, AIConfig
from tai.collectors import CollectedItem, CollectionReport
from tai.prompt_builder import build_followup_message, build_system_prompt, build_user_message
from tai.session_store import PastSession
from tai.ssh_client import SSHCommandResult
# ---------------------------------------------------------------------------
@@ -221,6 +222,24 @@ def test_build_user_message_handles_no_output() -> None:
assert "no output" in msg
def test_build_user_message_includes_prior_session_context() -> None:
report = _make_report([("kernel", "uname -a", 0, "Linux web01", "")])
msg = build_user_message(
"sssd broken",
report,
past_sessions=[
PastSession(
session_id="20260506T120000Z",
host="web01",
issue="sssd broken",
summary="Root cause was missing sssd package.",
)
],
)
assert "Similar prior sessions" in msg
assert "missing sssd package" in msg
def test_build_followup_message_includes_question_context() -> None:
report = _make_report([("kernel", "uname -a", 0, "Linux web01", "")])
msg = build_followup_message(

View File

@@ -1,3 +1,4 @@
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
@@ -338,3 +339,108 @@ def test_interactive_rag_debug_prints_retrieval_scores(monkeypatch) -> None: #
assert result.exit_code == 0
assert "RAG retrieve:" in result.stdout
def test_history_command_lists_sessions(monkeypatch) -> None: # type: ignore[no-untyped-def]
class FakeStore:
def __init__(self, _path: str) -> None:
pass
def list_recent(self, *, host: str | None = None, limit: int = 20):
del limit
if host == "web01":
return [
SimpleNamespace(
session_id="20260507T120000Z",
host="web01",
issue="nginx down",
summary="Root cause: bad config",
)
]
return []
monkeypatch.setattr("tai.cli.SessionStore", FakeStore)
runner = CliRunner()
result = runner.invoke(
app,
["history", "--session-memory", "~/.tai/sessions", "--host", "web01"],
)
assert result.exit_code == 0
assert "session(s)" in result.stdout
assert "20260507T120000Z" in result.stdout
def test_history_command_exports_markdown(monkeypatch, tmp_path: Path) -> None: # type: ignore[no-untyped-def]
class FakeStore:
def __init__(self, _path: str) -> None:
pass
def list_recent(self, *, host: str | None = None, limit: int = 20):
del host, limit
return [
SimpleNamespace(
session_id="20260507T120000Z",
host="web01",
issue="nginx down",
summary="Root cause: bad config",
)
]
monkeypatch.setattr("tai.cli.SessionStore", FakeStore)
export_path = tmp_path / "history.md"
runner = CliRunner()
result = runner.invoke(
app,
["history", "--session-memory", "~/.tai/sessions", "--export", str(export_path)],
)
assert result.exit_code == 0
assert "Exported" in result.stdout
text = export_path.read_text(encoding="utf-8")
assert "# tai session history" in text
assert "nginx down" in text
def test_interactive_history_without_store_shows_hint(monkeypatch) -> None: # type: ignore[no-untyped-def]
_mock_session(monkeypatch)
async def fake_collect_from_plan(_session, _plan) -> CollectionReport: # type: ignore[no-untyped-def]
return CollectionReport(
host="ssh.archflux.net",
items=[
CollectedItem(
name="kernel",
result=SSHCommandResult(
command="uname -a",
exit_code=0,
stdout="Linux test",
stderr="",
),
),
],
)
commands = iter(["/history", "/quit"])
monkeypatch.setattr("tai.cli.collect_from_plan", fake_collect_from_plan)
monkeypatch.setattr("tai.cli.console.input", lambda _prompt: next(commands))
monkeypatch.setattr("tai.cli._stdin_is_tty", lambda: True)
runner = CliRunner()
result = runner.invoke(
app,
[
"run", "apache failed",
"--host",
"ssh.archflux.net",
"--port",
"5566",
"--no-probe",
"--interactive",
],
)
assert result.exit_code == 0
assert "Session memory is disabled" in result.stdout

View File

@@ -107,8 +107,12 @@ def test_sssd_in_issue_adds_presence_service_and_config_commands() -> None:
assert "binary-sssd-1" in names
assert "service-sssd" in names
assert "journal-sssd" in names
assert "package-rpm-sssd-1" in names
assert "package-dpkg-sssd-1" in names
assert any("cat /etc/sssd/sssd.conf" in c for c in cmds)
assert any("ls -l /usr/sbin/sssd" in c for c in cmds)
assert any("rpm -q sssd" in c for c in cmds)
assert any("dpkg-query -W sssd" in c for c in cmds)
assert any("list-unit-files sssd.service" in c for c in cmds)
@@ -119,8 +123,12 @@ def test_docker_presence_probe_checks_package_and_binary() -> None:
assert "unit-file-docker" in names
assert "binary-docker-1" in names
assert "binary-docker-2" in names
assert "package-rpm-docker-1" in names
assert "package-dpkg-docker-1" in names
assert any("ls -l /usr/bin/docker" in c for c in cmds)
assert any("ls -l /usr/bin/dockerd" in c for c in cmds)
assert any("rpm -q docker" in c for c in cmds)
assert any("dpkg-query -W docker" in c for c in cmds)
def test_unknown_service_name_no_config_cat() -> None:
@@ -183,6 +191,16 @@ def test_extract_services_case_insensitive() -> None:
assert "nginx" in _extract_services("NGINX failed")
def test_extract_services_detects_generic_service_name() -> None:
services = _extract_services("myweirdapp service keeps failing")
assert "myweirdapp" in services
def test_extract_services_detects_dot_service_pattern() -> None:
services = _extract_services("please check foobar.service on this host")
assert "foobar" in services
# ---------------------------------------------------------------------------
# Plan length sanity
# ---------------------------------------------------------------------------

129
tests/test_session_store.py Normal file
View File

@@ -0,0 +1,129 @@
"""Tests for session_store with mocked ChromaDB."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import MagicMock, patch
from tai.session_store import PastSession, SessionStore, _build_embed_text
def _make_chromadb_mock() -> MagicMock:
collection = MagicMock()
collection.count.return_value = 0
client = MagicMock()
client.get_or_create_collection.return_value = collection
chroma_mod = MagicMock()
chroma_mod.PersistentClient.return_value = client
return chroma_mod
def _make_ai_mock(embedding: list[float] | None = None) -> MagicMock:
ai = MagicMock()
ai.embed.return_value = embedding or [0.1, 0.2, 0.3]
return ai
def test_build_embed_text_contains_host_issue_and_summary() -> None:
text = _build_embed_text(host="web01", issue="sssd broken", summary="Unit missing")
assert "host: web01" in text
assert "issue: sssd broken" in text
assert "Unit missing" in text
def test_index_session_upserts_with_metadata(tmp_path: Path) -> None:
chroma_mock = _make_chromadb_mock()
collection = chroma_mock.PersistentClient.return_value.get_or_create_collection.return_value
ai = _make_ai_mock()
with patch.dict("sys.modules", {"chromadb": chroma_mock}):
store = SessionStore(tmp_path / "store")
session_id = store.index_session("web01", "sssd broken", "summary text", ai)
assert session_id
collection.upsert.assert_called_once()
args = collection.upsert.call_args.kwargs
assert args["metadatas"][0]["host"] == "web01"
assert args["metadatas"][0]["issue"] == "sssd broken"
def test_query_returns_empty_when_no_docs(tmp_path: Path) -> None:
chroma_mock = _make_chromadb_mock()
ai = _make_ai_mock()
with patch.dict("sys.modules", {"chromadb": chroma_mock}):
store = SessionStore(tmp_path / "store")
results = store.query("why sssd", "web01", ai)
assert results == []
def test_query_returns_past_sessions(tmp_path: Path) -> None:
chroma_mock = _make_chromadb_mock()
collection = chroma_mock.PersistentClient.return_value.get_or_create_collection.return_value
collection.count.return_value = 1
collection.query.return_value = {
"ids": [["20260506T120000Z"]],
"documents": [["Root cause: package missing"]],
"metadatas": [[{"host": "web01", "issue": "sssd broken"}]],
}
ai = _make_ai_mock()
with patch.dict("sys.modules", {"chromadb": chroma_mock}):
store = SessionStore(tmp_path / "store")
results = store.query("sssd issue", "web01", ai)
assert len(results) == 1
assert isinstance(results[0], PastSession)
assert results[0].host == "web01"
assert "package missing" in results[0].summary
def test_list_recent_returns_sessions_sorted_desc(tmp_path: Path) -> None:
chroma_mock = _make_chromadb_mock()
collection = chroma_mock.PersistentClient.return_value.get_or_create_collection.return_value
collection.count.return_value = 3
collection.get.return_value = {
"ids": ["20260506T120000Z", "20260507T120000Z", "20260505T120000Z"],
"documents": ["older", "newer", "oldest"],
"metadatas": [
{"host": "web01", "issue": "i1"},
{"host": "web01", "issue": "i2"},
{"host": "db01", "issue": "i3"},
],
}
with patch.dict("sys.modules", {"chromadb": chroma_mock}):
store = SessionStore(tmp_path / "store")
results = store.list_recent(limit=2)
assert len(results) == 2
assert results[0].session_id == "20260507T120000Z"
assert results[1].session_id == "20260506T120000Z"
def test_search_keyword_filters_by_term_and_host(tmp_path: Path) -> None:
chroma_mock = _make_chromadb_mock()
collection = chroma_mock.PersistentClient.return_value.get_or_create_collection.return_value
collection.count.return_value = 3
collection.get.return_value = {
"ids": ["20260505T120000Z", "20260506T120000Z", "20260507T120000Z"],
"documents": [
"Root cause: nginx config typo",
"Root cause: package missing",
"Root cause: nginx port conflict",
],
"metadatas": [
{"host": "web01", "issue": "nginx fails"},
{"host": "web01", "issue": "sssd fails"},
{"host": "db01", "issue": "nginx start failed"},
],
}
with patch.dict("sys.modules", {"chromadb": chroma_mock}):
store = SessionStore(tmp_path / "store")
results = store.search_keyword("nginx", host="web01", limit=5)
assert len(results) == 1
assert results[0].host == "web01"
assert "nginx" in results[0].issue.lower()

View File

@@ -82,6 +82,8 @@ def test_allows_expected_read_only_commands() -> None:
"systemctl status apache2",
"cat /etc/hosts",
"ss -lntp",
"rpm -q sssd",
"dpkg-query -W sssd",
]:
client.validate_read_only_command(command)