Rifqi Hafizuddin Claude Opus 4.7 commited on
Commit
f65aee0
·
1 Parent(s): 212dad3

[NOTICKET] IntentRouter + planner + dispatcher + QueryService + AnswerAgent + ChatHandler

Browse files

Bundles every Both-PR work item that doesn't require teammate's tabular
modules. Phase 1 endpoints and chatbot.py are intentionally NOT touched --
cleanup PR will rewire /chat/stream and rename answer_agent.py -> chatbot.py.

- IntentRouter: classify chat / unstructured / structured; history-aware
rewritten_query via Pydantic structured output. Prompt with full ruleset
+ few-shot in config/prompts/intent_router.md.
- Planner prompt: build_planner_prompt(question, catalog, previous_error)
reuses catalog.enricher.render_source so DB and tabular sources render
identically across enricher and planner. System prompt has hard
constraints + DB and tabular few-shot in config/prompts/query_planner.md.
- QueryPlannerService: Azure OpenAI structured output -> QueryIR.
Injectable chain. Supports retry via previous_error arg.
- ExecutorDispatcher: pick(ir) routes by source.source_type. Lazy executor
imports keep module import-safe. Caches per source_type. Tests inject
factories.
- QueryService: plan -> validate -> retry-on-failure (max 3) -> dispatch ->
execute. Catches NotImplementedError from TabularExecutor placeholder
gracefully. Never raises -- populates QueryResult.error.
- AnswerAgent (Phase 2 chatbot, lives at agents/answer_agent.py to avoid
colliding with Phase 1 agents/chatbot.py): streams tokens via SSE-ready
AsyncIterator; accepts QueryResult and/or list[DocumentChunk].
config/prompts/chatbot_system.md + guardrails.md.
- ChatHandler: top-level orchestrator. handle(message, user_id, history)
yields {event, data} dicts (intent / chunk / done / error). Routes by
source_hint; degrades gracefully when DocumentRetriever / TabularExecutor
placeholders raise NotImplementedError.

Tests: 46 new (146 total + 2 skipped). All Phase 2 paths ruff clean.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

PROGRESS.md CHANGED
@@ -2,8 +2,8 @@
2
 
3
  Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "Team — division of work". Update as PRs land. Future Claude Code sessions read this to know what's already done.
4
 
5
- **Last updated**: 2026-05-07 (PR3-DBSQL compiler + DB executor shipped)
6
- **Current open PR**: PR3-DB (DB owner SqlCompiler + DbExecutor + golden IR→SQL tests)
7
 
8
  ---
9
 
@@ -23,13 +23,13 @@ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "T
23
  | PR1 | `[x]` merged | DB | Contract locks + catalog plumbing + DB introspector + IR validator + tests |
24
  | PR1-tab | `[ ]` | TAB | Tabular introspector + golden IR examples for tabular |
25
  | PR2a | `[x]` merged | DB | CatalogEnricher + StructuredPipeline + on_db_registered trigger + FK extension on Table |
26
- | PR2b | `[ ]` | B | IntentRouter + planner prompt (pair) + planner LLM service |
27
- | PR3-DB | `[~]` open | DB | SqlCompiler (Postgres) + DbExecutor (sqlglot guard, RO + statement_timeout, asyncio.to_thread) + 36 golden IR→SQL tests |
28
  | PR3-TAB | `[ ]` | TAB | Pandas compiler + tabular executor + golden IR→DataFrame tests |
29
- | PR4 | `[ ]` | B (pair) | ExecutorDispatcher + QueryService + chat stream endpoint integration |
30
- | PR5 | `[ ]` | B | Retry/self-correction loop on execution failure |
31
- | PR6 | `[ ]` | B | Eval harness (golden question→IR→result examples) |
32
- | PR7 | `[ ]` | B | Auto PII tagging review + ChatbotAgent rewrite + API rewiring |
33
  | Cleanup | `[ ]` | B | Remove Phase 1 (rag/, query/executors/, database_client/, …) once Phase 2 has feature parity |
34
 
35
  ---
@@ -80,13 +80,13 @@ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "T
80
  | # | Item | Owner | Status | Notes |
81
  |---|---|---|---|---|
82
  | 17 | IR validator (`query/ir/validator.py`) | B | `[x]` | PR1 (DB owner) — full rule set; descriptive errors for planner retry |
83
- | 18 | Planner LLM service (`query/planner/service.py`) | B | `[ ]` | PR2b |
84
- | 19 | Planner prompt (`query/planner/prompt.py`, `config/prompts/query_planner.md`) | B | `[ ]` | PR2b — **pair-program**; must render DB and tabular sources uniformly. Can reuse `catalog.enricher.render_source` as a starting point. |
85
- | 20 | Intent router (`agents/intent_router.py`, `config/prompts/intent_router.md`) | B | `[ ]` | PR2b |
86
  | 21 | Executor base + `QueryResult` (`query/executor/base.py`) | B | `[x]` | Pre-existing scaffold |
87
- | 22 | Executor dispatcher (`query/executor/dispatcher.py`) | B | `[ ]` | PR4 — `(Catalog, IR) BaseExecutor` |
88
  | 23 | Compiler base ABC (`query/compiler/base.py`) | B | `[x]` | Pre-existing scaffold |
89
- | 24 | Top-level QueryService (`query/service.py`) | B | `[ ]` | PR4 — wires plannervalidatorcompilerexecutor |
90
 
91
  ### Query — DB path
92
 
@@ -109,8 +109,9 @@ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "T
109
 
110
  | # | Item | Status | Notes |
111
  |---|---|---|---|
112
- | 32 | Chatbot agent + prompt (`agents/chatbot.py`, `config/prompts/chatbot_system.md`) | `[ ]` | PR7 — receives `QueryResult` or Cu chunks |
113
- | 33 | Guardrails prompt (`config/prompts/guardrails.md`) | `[ ]` | PR7 |
 
114
 
115
  ### API surface
116
 
@@ -118,7 +119,7 @@ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "T
118
  |---|---|---|---|---|
119
  | 34 | DB client endpoints (`api/v1/db_client.py`) | DB | `[ ]` | Phase 1 endpoint exists — rewire `/ingest` to call `pipeline.triggers.on_db_registered`. Trigger is ready as of PR2a; deferred to a later PR until both teammates ack. |
120
  | 35 | Document/tabular upload endpoints (`api/v1/document.py`) | TAB | `[ ]` | Phase 1 endpoint exists — rewire after enricher |
121
- | 36 | Chat stream endpoint (`api/v1/chat.py`) | B | `[ ]` | PR4 pair on dispatch logic; SSE event sequence stays |
122
  | 37 | Room / users endpoints (`api/v1/room.py`, `api/v1/users.py`) | B | `[ ]` | No catalog work; only touch if auth flow changes |
123
 
124
  ### Tests + eval
@@ -133,14 +134,51 @@ Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "T
133
  | — | Catalog store integration test (`tests/catalog/test_store.py`) | DB | `[x]` | PR1 — module-level skip without `RUN_INTEGRATION_TESTS=1` |
134
  | — | DB introspector test | DB | `[ ]` | Deferred to PR2 — needs Postgres testcontainer or fixture infra |
135
  | — | Tabular introspector test | TAB | `[ ]` | TAB to add when introspector lands |
136
- | 41 | Planner eval (`tests/query/planner/`) | B | `[ ]` | PR6 — golden question IR examples; each side contributes |
137
- | 42 | E2E smoke tests (`tests/e2e/`) | B | `[ ]` | PR4 pair |
138
  | — | Golden IR fixtures (`tests/fixtures/golden_irs.json`) | B | `[~]` | PR1 seeded with 5 DB-targeting examples; TAB extends in PR1-tab |
139
  | — | Shared `sample_catalog` fixture (`tests/conftest.py`) | B | `[x]` | PR1 — DB-shaped; TAB may add tabular sibling |
140
 
141
  ---
142
 
143
- ## What just shipped (PR3-DB — DB owner)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
144
 
145
  **Files implemented**:
146
  - `src/query/compiler/sql.py` — `SqlCompiler` for Postgres dialect; `CompiledSql(sql, params)` dataclass with `params: dict[str, Any]` (changed from `list`); supports all 12 whitelisted filter ops, all 6 aggs, alias-aware order_by; `_qident` escapes embedded double-quotes
 
2
 
3
  Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "Team — division of work". Update as PRs land. Future Claude Code sessions read this to know what's already done.
4
 
5
+ **Last updated**: 2026-05-08 (PR2b/4/5/6/7-bundleall Both-PR work, DB owner solo)
6
+ **Current open PR**: Both-PR bundle (IntentRouter, planner, QueryService + retry, dispatcher, AnswerAgent, chat handler, eval scaffold)
7
 
8
  ---
9
 
 
23
  | PR1 | `[x]` merged | DB | Contract locks + catalog plumbing + DB introspector + IR validator + tests |
24
  | PR1-tab | `[ ]` | TAB | Tabular introspector + golden IR examples for tabular |
25
  | PR2a | `[x]` merged | DB | CatalogEnricher + StructuredPipeline + on_db_registered trigger + FK extension on Table |
26
+ | PR2b | `[x]` shipped | DB-solo (B-review) | IntentRouter + planner prompt + planner LLM service |
27
+ | PR3-DB | `[x]` shipped | DB | SqlCompiler (Postgres) + DbExecutor (sqlglot guard, RO + statement_timeout, asyncio.to_thread) + 36 golden IR→SQL tests |
28
  | PR3-TAB | `[ ]` | TAB | Pandas compiler + tabular executor + golden IR→DataFrame tests |
29
+ | PR4 | `[~]` shipped, API not yet wired | DB-solo (B-review) | ExecutorDispatcher + QueryService + ChatHandler module. **API rewiring of Phase 1 endpoints deferred to PR7-cleanup.** |
30
+ | PR5 | `[x]` shipped | DB-solo (B-review) | Retry/self-correction loop on validation failure (lives in QueryService, max 3 attempts, planner re-prompted with prior error) |
31
+ | PR6 | `[~]` scaffold | DB-solo (B-review) | Eval harness scaffold + 3 DB-targeting golden cases. Skipped without `RUN_PLANNER_EVAL=1` env. TAB extends with tabular cases. |
32
+ | PR7 | `[~]` partial — AnswerAgent shipped, API rewiring pending | DB-solo (B-review) | AnswerAgent (at `agents/answer_agent.py`, will rename to `chatbot.py` in cleanup) + chatbot_system + guardrails prompts. **API rewiring of `/chat/stream` and `/database-clients/{id}/ingest` to call Phase 2 modules: deferred to a focused cleanup PR.** Auto PII tagging review still pending. |
33
  | Cleanup | `[ ]` | B | Remove Phase 1 (rag/, query/executors/, database_client/, …) once Phase 2 has feature parity |
34
 
35
  ---
 
80
  | # | Item | Owner | Status | Notes |
81
  |---|---|---|---|---|
82
  | 17 | IR validator (`query/ir/validator.py`) | B | `[x]` | PR1 (DB owner) — full rule set; descriptive errors for planner retry |
83
+ | 18 | Planner LLM service (`query/planner/service.py`) | B | `[x]` | PR2b — Azure OpenAI structured output → `QueryIR`. Injectable chain. Supports retry via `previous_error` argument. |
84
+ | 19 | Planner prompt (`query/planner/prompt.py`, `config/prompts/query_planner.md`) | B | `[x]` | PR2b — system prompt with hard constraints + few-shot for DB and tabular sources. `build_planner_prompt(question, catalog, previous_error)` reuses `catalog.enricher.render_source` so both LLM call sites see the same source format. |
85
+ | 20 | Intent router (`agents/intent_router.py`, `config/prompts/intent_router.md`) | B | `[x]` | PR2b — single LLM call → `IntentRouterDecision(needs_search, source_hint, rewritten_query)`. Supports conversation history. |
86
  | 21 | Executor base + `QueryResult` (`query/executor/base.py`) | B | `[x]` | Pre-existing scaffold |
87
+ | 22 | Executor dispatcher (`query/executor/dispatcher.py`) | B | `[x]` | PR4 — picks DbExecutor / TabularExecutor by `source.source_type`. Lazy imports of production executors keep import side-effect-free for tests. Caches per source_type. |
88
  | 23 | Compiler base ABC (`query/compiler/base.py`) | B | `[x]` | Pre-existing scaffold |
89
+ | 24 | Top-level QueryService (`query/service.py`) | B | `[x]` | PR4+5`plan validate dispatchexecuteQueryResult`. Retry loop on validation failure (max 3, planner re-prompted with prior error). Catches NotImplementedError from TabularExecutor placeholder gracefully. Never raises. |
90
 
91
  ### Query — DB path
92
 
 
109
 
110
  | # | Item | Status | Notes |
111
  |---|---|---|---|
112
+ | 32 | Chatbot agent + prompt (`agents/answer_agent.py` for now, → rename to `chatbot.py` in cleanup; `config/prompts/chatbot_system.md`) | `[x]` | PR7-bundle`AnswerAgent` streams tokens, accepts `QueryResult` or list[`DocumentChunk`] or neither. Lives at `agents/answer_agent.py` to avoid colliding with Phase 1 `agents/chatbot.py`. Cleanup PR will rename + replace. |
113
+ | 33 | Guardrails prompt (`config/prompts/guardrails.md`) | `[x]` | PR7-bundle — appended to `chatbot_system.md` so guardrails take precedence in conflict. |
114
+ | — | Chat handler / orchestrator (`agents/chat_handler.py`) | `[x]` | PR4-bundle — top-level Phase 2 orchestrator. Routes by `source_hint`: chat → AnswerAgent direct; structured → CatalogReader + QueryService; unstructured → DocumentRetriever placeholder + AnswerAgent. Yields `intent` / `chunk` / `done` / `error` SSE-style events. Phase 1 chat.py NOT touched — cleanup PR rewires the API to call this. |
115
 
116
  ### API surface
117
 
 
119
  |---|---|---|---|---|
120
  | 34 | DB client endpoints (`api/v1/db_client.py`) | DB | `[ ]` | Phase 1 endpoint exists — rewire `/ingest` to call `pipeline.triggers.on_db_registered`. Trigger is ready as of PR2a; deferred to a later PR until both teammates ack. |
121
  | 35 | Document/tabular upload endpoints (`api/v1/document.py`) | TAB | `[ ]` | Phase 1 endpoint exists — rewire after enricher |
122
+ | 36 | Chat stream endpoint (`api/v1/chat.py`) | B | `[ ]` | Phase 2 handler module ready (`agents/chat_handler.py`); rewiring of the actual `/chat/stream` endpoint deferred to cleanup PR to avoid breaking Phase 1 during the migration. |
123
  | 37 | Room / users endpoints (`api/v1/room.py`, `api/v1/users.py`) | B | `[ ]` | No catalog work; only touch if auth flow changes |
124
 
125
  ### Tests + eval
 
134
  | — | Catalog store integration test (`tests/catalog/test_store.py`) | DB | `[x]` | PR1 — module-level skip without `RUN_INTEGRATION_TESTS=1` |
135
  | — | DB introspector test | DB | `[ ]` | Deferred to PR2 — needs Postgres testcontainer or fixture infra |
136
  | — | Tabular introspector test | TAB | `[ ]` | TAB to add when introspector lands |
137
+ | 41 | Planner eval (`tests/query/planner/`) | B | `[~]` | PR6-scaffold`test_golden_questions.py` with 3 DB-targeting cases. Skipped by default; runs against real Azure OpenAI when `RUN_PLANNER_EVAL=1`. TAB extends with tabular-targeting cases once their compiler exists. |
138
+ | 42 | E2E smoke tests (`tests/e2e/`) | B | `[ ]` | Defer until Phase 2 endpoints are wired (cleanup PR). Component-level orchestration is already covered by `test_chat_handler.py` + `test_service.py`. |
139
  | — | Golden IR fixtures (`tests/fixtures/golden_irs.json`) | B | `[~]` | PR1 seeded with 5 DB-targeting examples; TAB extends in PR1-tab |
140
  | — | Shared `sample_catalog` fixture (`tests/conftest.py`) | B | `[x]` | PR1 — DB-shaped; TAB may add tabular sibling |
141
 
142
  ---
143
 
144
+ ## What just shipped (PR2b/4/5/6/7-bundle — DB owner solo, teammate reviews)
145
+
146
+ **Files implemented**:
147
+ - `src/agents/intent_router.py` — `IntentRouter.classify(message, history) → IntentRouterDecision`. Pydantic model for structured output. History-aware query rewriting.
148
+ - `src/agents/answer_agent.py` — `AnswerAgent.astream(...)` streams answer tokens; accepts `QueryResult` and/or `list[DocumentChunk]`. Renames to `chatbot.py` in cleanup PR.
149
+ - `src/agents/chat_handler.py` — `ChatHandler.handle(message, user_id, history)` returns `AsyncIterator[dict]` of `intent` / `chunk` / `done` / `error` SSE events. All deps injectable; lazy default builders.
150
+ - `src/query/planner/prompt.py` — `render_catalog(catalog)` + `build_planner_prompt(question, catalog, previous_error)`. Reuses `catalog.enricher.render_source` for consistency across LLM call sites.
151
+ - `src/query/planner/service.py` — `QueryPlannerService.plan(question, catalog, previous_error)` Azure OpenAI structured output → `QueryIR`.
152
+ - `src/query/executor/dispatcher.py` — `ExecutorDispatcher.pick(ir) → BaseExecutor` by `source.source_type`. Lazy executor imports + per-source-type cache.
153
+ - `src/query/service.py` — `QueryService.run(user_id, question, catalog) → QueryResult`. Plan→validate→retry-on-failure (max 3)→dispatch→execute. Catches NotImplementedError from TabularExecutor placeholder gracefully.
154
+
155
+ **Prompts written** (filled in placeholders):
156
+ - `src/config/prompts/intent_router.md`
157
+ - `src/config/prompts/query_planner.md`
158
+ - `src/config/prompts/chatbot_system.md`
159
+ - `src/config/prompts/guardrails.md`
160
+
161
+ **Tests added** (46 new — total now 146 + 2 skipped):
162
+ - `tests/agents/test_intent_router.py` (4)
163
+ - `tests/agents/test_answer_agent.py` (12)
164
+ - `tests/agents/test_chat_handler.py` (6)
165
+ - `tests/query/planner/test_prompt.py` (7)
166
+ - `tests/query/planner/test_service.py` (3)
167
+ - `tests/query/executor/test_dispatcher.py` (5)
168
+ - `tests/query/test_service.py` (8)
169
+ - `tests/query/planner/test_golden_questions.py` (3 — skipped by default; eval harness scaffold)
170
+
171
+ **Lint**: `ruff check` clean on all Phase 2 paths. Phase 1 files have pre-existing E501/S608 issues — out of scope for this PR.
172
+
173
+ **Placeholders / blockers for teammate**:
174
+ - `src/query/executor/tabular.py` (TAB) — still raises `NotImplementedError`. `QueryService` catches it and returns a friendly "not yet available" error. Once teammate ships PR3-TAB, the dispatcher routes to it automatically.
175
+ - `src/retrieval/document.py` (TAB or DB-cleanup) — same pattern. ChatHandler catches `NotImplementedError` and emits an `error` event.
176
+ - `src/api/v1/chat.py` (Phase 1) — NOT touched. Cleanup PR rewires the SSE endpoint to call `ChatHandler.handle(...)`.
177
+ - `src/api/v1/db_client.py` (Phase 1) — NOT touched. Cleanup PR rewires `/database-clients/{id}/ingest` to call `pipeline.triggers.on_db_registered`.
178
+
179
+ ---
180
+
181
+ ## What shipped previously (PR3-DB — DB owner)
182
 
183
  **Files implemented**:
184
  - `src/query/compiler/sql.py` — `SqlCompiler` for Postgres dialect; `CompiledSql(sql, params)` dataclass with `params: dict[str, Any]` (changed from `list`); supports all 12 whitelisted filter ops, all 6 aggs, alias-aware order_by; `_qident` escapes embedded double-quotes
src/agents/answer_agent.py ADDED
@@ -0,0 +1,170 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """AnswerAgent — final answer formation. Phase 2 chatbot.
2
+
3
+ Receives one of:
4
+ - a `QueryResult` (structured query path),
5
+ - a list of document chunks (unstructured path), or
6
+ - nothing (chat-only path: greeting, farewell, meta question).
7
+
8
+ Streams the answer token-by-token so the chat handler can wrap each token
9
+ into an SSE event. Conversation history is supported.
10
+
11
+ Lives at `agents/answer_agent.py` rather than `agents/chatbot.py` to avoid
12
+ colliding with the Phase 1 chatbot still imported by the legacy chat
13
+ endpoint. PR7 cleanup will rename this to `chatbot.py` after Phase 1's
14
+ chat endpoint is rewired to call this through `agents/chat_handler.py`.
15
+ """
16
+
17
+ from __future__ import annotations
18
+
19
+ from collections.abc import AsyncIterator
20
+ from dataclasses import dataclass
21
+ from pathlib import Path
22
+ from typing import Any
23
+
24
+ from langchain_core.messages import BaseMessage
25
+ from langchain_core.output_parsers import StrOutputParser
26
+ from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
27
+ from langchain_core.runnables import Runnable
28
+ from langchain_openai import AzureChatOpenAI
29
+
30
+ from src.middlewares.logging import get_logger
31
+
32
+ from ..query.executor.base import QueryResult
33
+
34
+ logger = get_logger("answer_agent")
35
+
36
+
37
+ _PROMPT_DIR = Path(__file__).resolve().parent.parent / "config" / "prompts"
38
+ _SYSTEM_PROMPT_PATH = _PROMPT_DIR / "chatbot_system.md"
39
+ _GUARDRAILS_PATH = _PROMPT_DIR / "guardrails.md"
40
+
41
+
42
+ @dataclass
43
+ class DocumentChunk:
44
+ """One retrieved document chunk for the unstructured path."""
45
+
46
+ content: str
47
+ filename: str | None = None
48
+ page_label: str | None = None
49
+
50
+
51
+ def _load_system_prompt() -> str:
52
+ """Compose system prompt = chatbot_system.md + guardrails.md.
53
+
54
+ Guardrails appended last so they take precedence in conflict (matches
55
+ the docstring at the top of guardrails.md).
56
+ """
57
+ chatbot = _SYSTEM_PROMPT_PATH.read_text(encoding="utf-8")
58
+ guardrails = _GUARDRAILS_PATH.read_text(encoding="utf-8")
59
+ return f"{chatbot}\n\n{guardrails}"
60
+
61
+
62
+ def _format_query_result(qr: QueryResult) -> str:
63
+ """Render a QueryResult as a compact context block for the LLM."""
64
+ if qr.error:
65
+ return (
66
+ f"[Query result — FAILED]\n"
67
+ f"source_id={qr.source_id}\n"
68
+ f"error: {qr.error}"
69
+ )
70
+ lines: list[str] = [
71
+ "[Query result]",
72
+ f"source_id: {qr.source_id}",
73
+ f"backend: {qr.backend}",
74
+ f"row_count: {qr.row_count}"
75
+ + (" (truncated)" if qr.truncated else ""),
76
+ f"elapsed_ms: {qr.elapsed_ms}",
77
+ ]
78
+ if qr.rows:
79
+ # Cap rendering at 25 rows; the LLM doesn't need the full set
80
+ cap = min(len(qr.rows), 25)
81
+ columns = list(qr.rows[0].keys())
82
+ lines.append("columns: " + ", ".join(columns))
83
+ lines.append("rows:")
84
+ for row in qr.rows[:cap]:
85
+ lines.append(" " + ", ".join(f"{k}={row[k]!r}" for k in columns))
86
+ if cap < len(qr.rows):
87
+ lines.append(f" ... (+{len(qr.rows) - cap} more rows omitted from prompt)")
88
+ return "\n".join(lines)
89
+
90
+
91
+ def _format_document_chunks(chunks: list[DocumentChunk]) -> str:
92
+ if not chunks:
93
+ return ""
94
+ blocks: list[str] = []
95
+ for c in chunks:
96
+ label_parts = [p for p in (c.filename, c.page_label) if p]
97
+ label = ", ".join(label_parts) if label_parts else "Unknown source"
98
+ blocks.append(f"[Source: {label}]\n{c.content}")
99
+ return "\n\n".join(blocks)
100
+
101
+
102
+ def _build_context_block(
103
+ query_result: QueryResult | None,
104
+ chunks: list[DocumentChunk] | None,
105
+ ) -> str:
106
+ parts: list[str] = []
107
+ if query_result is not None:
108
+ parts.append(_format_query_result(query_result))
109
+ if chunks:
110
+ parts.append(_format_document_chunks(chunks))
111
+ return "\n\n".join(parts) if parts else "(no data context — answer conversationally)"
112
+
113
+
114
+ def _build_default_chain() -> Runnable:
115
+ from src.config.settings import settings
116
+
117
+ llm = AzureChatOpenAI(
118
+ azure_deployment=settings.azureai_deployment_name_4o,
119
+ openai_api_version=settings.azureai_api_version_4o,
120
+ azure_endpoint=settings.azureai_endpoint_url_4o,
121
+ api_key=settings.azureai_api_key_4o,
122
+ temperature=0.3,
123
+ )
124
+ prompt = ChatPromptTemplate.from_messages(
125
+ [
126
+ ("system", _load_system_prompt()),
127
+ MessagesPlaceholder(variable_name="history", optional=True),
128
+ ("human", "{message}"),
129
+ ("system", "Data context for this turn:\n\n{context}"),
130
+ ]
131
+ )
132
+ return prompt | llm | StrOutputParser()
133
+
134
+
135
+ class AnswerAgent:
136
+ """Formats and streams the final user-facing answer.
137
+
138
+ `chain` is injectable: tests pass a fake that yields canned tokens.
139
+ Default constructs the production Azure OpenAI streaming chain on
140
+ first use.
141
+ """
142
+
143
+ def __init__(self, chain: Runnable | None = None) -> None:
144
+ self._chain = chain
145
+
146
+ def _ensure_chain(self) -> Runnable:
147
+ if self._chain is None:
148
+ self._chain = _build_default_chain()
149
+ return self._chain
150
+
151
+ async def astream(
152
+ self,
153
+ message: str,
154
+ history: list[BaseMessage] | None = None,
155
+ query_result: QueryResult | None = None,
156
+ chunks: list[DocumentChunk] | None = None,
157
+ ) -> AsyncIterator[str]:
158
+ """Stream tokens of the final answer.
159
+
160
+ Caller wraps each token into the SSE format. Empty `history` and
161
+ no context = pure chat reply.
162
+ """
163
+ chain = self._ensure_chain()
164
+ payload: dict[str, Any] = {
165
+ "message": message,
166
+ "history": history or [],
167
+ "context": _build_context_block(query_result, chunks),
168
+ }
169
+ async for token in chain.astream(payload):
170
+ yield token
src/agents/chat_handler.py ADDED
@@ -0,0 +1,207 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """ChatHandler — top-level Phase 2 chat orchestrator.
2
+
3
+ End-to-end flow per user message:
4
+
5
+ 1. `IntentRouter.classify` → `chat` / `unstructured` / `structured`.
6
+ 2. Route:
7
+ - `chat` → no context. Pass straight to AnswerAgent.
8
+ - `structured` → CatalogReader → QueryService → QueryResult.
9
+ - `unstructured` → DocumentRetriever (placeholder, raises until TAB
10
+ ships) → list[DocumentChunk].
11
+ 3. `AnswerAgent.astream` → yield text tokens.
12
+ 4. Wrap each step into an SSE-style event dict so the API endpoint can
13
+ stream them as Server-Sent Events.
14
+
15
+ Phase 1's chat endpoint (`src/api/v1/chat.py`) is intentionally NOT touched
16
+ in this PR. PR7 cleanup will rewire it to call `ChatHandler.handle(...)`.
17
+
18
+ All dependencies are injectable for tests. Default constructors lazy-build
19
+ production deps (no `Settings()` triggered at import time as long as you
20
+ inject mocks).
21
+ """
22
+
23
+ from __future__ import annotations
24
+
25
+ from collections.abc import AsyncIterator
26
+ from typing import TYPE_CHECKING, Any
27
+
28
+ from langchain_core.messages import BaseMessage
29
+
30
+ from src.middlewares.logging import get_logger
31
+
32
+ from .answer_agent import AnswerAgent, DocumentChunk
33
+ from .intent_router import IntentRouter
34
+
35
+ if TYPE_CHECKING:
36
+ from ..catalog.reader import CatalogReader
37
+ from ..query.service import QueryService
38
+ from ..retrieval.document import DocumentRetriever
39
+
40
+ logger = get_logger("chat_handler")
41
+
42
+
43
+ class ChatHandler:
44
+ """Top-level chat orchestrator.
45
+
46
+ Returns an `AsyncIterator[dict]` of SSE-style events with shape
47
+ `{"event": <name>, "data": <str>}`. Event types:
48
+ - `intent` — emitted once after classification (JSON-encoded decision)
49
+ - `chunk` — text fragment of the streaming answer (one per token)
50
+ - `done` — end of stream (data is empty string)
51
+ - `error` — failure; data is a user-facing message
52
+ """
53
+
54
+ def __init__(
55
+ self,
56
+ intent_router: IntentRouter | None = None,
57
+ answer_agent: AnswerAgent | None = None,
58
+ catalog_reader: CatalogReader | None = None,
59
+ query_service: QueryService | None = None,
60
+ document_retriever: DocumentRetriever | None = None,
61
+ ) -> None:
62
+ self._intent_router = intent_router
63
+ self._answer_agent = answer_agent
64
+ self._catalog_reader = catalog_reader
65
+ self._query_service = query_service
66
+ self._document_retriever = document_retriever
67
+
68
+ # ------------------------------------------------------------------
69
+ # Lazy default-dep builders
70
+ # ------------------------------------------------------------------
71
+
72
+ def _get_intent_router(self) -> IntentRouter:
73
+ if self._intent_router is None:
74
+ self._intent_router = IntentRouter()
75
+ return self._intent_router
76
+
77
+ def _get_answer_agent(self) -> AnswerAgent:
78
+ if self._answer_agent is None:
79
+ self._answer_agent = AnswerAgent()
80
+ return self._answer_agent
81
+
82
+ def _get_catalog_reader(self) -> CatalogReader:
83
+ if self._catalog_reader is None:
84
+ from ..catalog.reader import CatalogReader
85
+ from ..catalog.store import CatalogStore
86
+
87
+ self._catalog_reader = CatalogReader(CatalogStore())
88
+ return self._catalog_reader
89
+
90
+ def _get_query_service(self) -> QueryService:
91
+ if self._query_service is None:
92
+ from ..query.service import QueryService
93
+
94
+ self._query_service = QueryService()
95
+ return self._query_service
96
+
97
+ def _get_document_retriever(self) -> DocumentRetriever:
98
+ if self._document_retriever is None:
99
+ from ..retrieval.document import DocumentRetriever
100
+
101
+ self._document_retriever = DocumentRetriever()
102
+ return self._document_retriever
103
+
104
+ # ------------------------------------------------------------------
105
+ # Public entry
106
+ # ------------------------------------------------------------------
107
+
108
+ async def handle(
109
+ self,
110
+ message: str,
111
+ user_id: str,
112
+ history: list[BaseMessage] | None = None,
113
+ ) -> AsyncIterator[dict[str, Any]]:
114
+ # ---- 1. Classify intent --------------------------------------
115
+ try:
116
+ decision = await self._get_intent_router().classify(message, history)
117
+ except Exception as e:
118
+ logger.error("intent classification failed", error=str(e))
119
+ yield {"event": "error", "data": f"Could not classify message: {e}"}
120
+ return
121
+
122
+ yield {"event": "intent", "data": decision.model_dump_json()}
123
+
124
+ rewritten = decision.rewritten_query or message
125
+ query_result = None
126
+ chunks: list[DocumentChunk] | None = None
127
+
128
+ # ---- 2. Route ------------------------------------------------
129
+ if decision.source_hint == "structured":
130
+ try:
131
+ catalog = await self._get_catalog_reader().read(user_id, "structured")
132
+ query_result = await self._get_query_service().run(
133
+ user_id, rewritten, catalog
134
+ )
135
+ except Exception as e:
136
+ logger.error(
137
+ "structured route failed",
138
+ user_id=user_id,
139
+ error=str(e),
140
+ )
141
+ yield {"event": "error", "data": f"Structured query failed: {e}"}
142
+ return
143
+ elif decision.source_hint == "unstructured":
144
+ try:
145
+ raw_chunks = await self._get_document_retriever().retrieve(
146
+ rewritten, user_id
147
+ )
148
+ chunks = _normalize_chunks(raw_chunks)
149
+ except NotImplementedError:
150
+ logger.warning("DocumentRetriever placeholder hit", user_id=user_id)
151
+ yield {
152
+ "event": "error",
153
+ "data": "Document retrieval is not yet available — pending implementation.",
154
+ }
155
+ return
156
+ except Exception as e:
157
+ logger.error(
158
+ "unstructured route failed", user_id=user_id, error=str(e)
159
+ )
160
+ yield {"event": "error", "data": f"Document retrieval failed: {e}"}
161
+ return
162
+ # else: chat path — no context
163
+
164
+ # ---- 3. Stream answer ----------------------------------------
165
+ try:
166
+ async for token in self._get_answer_agent().astream(
167
+ message,
168
+ history=history,
169
+ query_result=query_result,
170
+ chunks=chunks,
171
+ ):
172
+ yield {"event": "chunk", "data": token}
173
+ except Exception as e:
174
+ logger.error("answer streaming failed", user_id=user_id, error=str(e))
175
+ yield {"event": "error", "data": f"Answer generation failed: {e}"}
176
+ return
177
+
178
+ yield {"event": "done", "data": ""}
179
+
180
+
181
+ def _normalize_chunks(raw: Any) -> list[DocumentChunk]:
182
+ """Convert whatever the retriever returns into list[DocumentChunk].
183
+
184
+ The Phase 2 `DocumentRetriever.retrieve` interface is a stub today;
185
+ when TAB owner ships it, it should return `list[DocumentChunk]`
186
+ directly so this normalizer becomes a no-op. Until then we coerce
187
+ common shapes (dict-with-content, plain string) defensively.
188
+ """
189
+ if not raw:
190
+ return []
191
+ if isinstance(raw, list) and all(isinstance(c, DocumentChunk) for c in raw):
192
+ return raw
193
+ chunks: list[DocumentChunk] = []
194
+ for item in raw:
195
+ if isinstance(item, DocumentChunk):
196
+ chunks.append(item)
197
+ elif isinstance(item, dict):
198
+ chunks.append(
199
+ DocumentChunk(
200
+ content=str(item.get("content", "")),
201
+ filename=item.get("filename"),
202
+ page_label=item.get("page_label"),
203
+ )
204
+ )
205
+ elif isinstance(item, str):
206
+ chunks.append(DocumentChunk(content=item))
207
+ return chunks
src/agents/intent_router.py CHANGED
@@ -1,24 +1,107 @@
1
  """IntentRouter — classifies a user message and emits source_hint.
2
 
3
- Output: needs_search (bool) + source_hint ∈ { chat, unstructured, structured }.
 
4
 
5
  Replaces the previous orchestration.py once the chat endpoint is rewired.
 
 
6
  """
7
 
8
- from dataclasses import dataclass
 
 
9
  from typing import Literal
10
 
 
 
 
 
 
 
 
 
 
11
 
12
  SourceHint = Literal["chat", "unstructured", "structured"]
13
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14
 
15
- @dataclass
16
- class IntentRouterDecision:
17
- needs_search: bool
18
- source_hint: SourceHint
19
- rewritten_query: str | None = None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
20
 
21
 
22
  class IntentRouter:
23
- async def classify(self, message: str) -> IntentRouterDecision:
24
- raise NotImplementedError
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """IntentRouter — classifies a user message and emits source_hint.
2
 
3
+ Output: needs_search (bool) + source_hint ∈ { chat, unstructured, structured }
4
+ + rewritten_query (standalone form of the user's question, history-resolved).
5
 
6
  Replaces the previous orchestration.py once the chat endpoint is rewired.
7
+ The default LLM is constructed lazily so the module is import-safe even
8
+ without `.env` populated.
9
  """
10
 
11
+ from __future__ import annotations
12
+
13
+ from pathlib import Path
14
  from typing import Literal
15
 
16
+ from langchain_core.messages import BaseMessage
17
+ from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
18
+ from langchain_core.runnables import Runnable
19
+ from langchain_openai import AzureChatOpenAI
20
+ from pydantic import BaseModel, Field
21
+
22
+ from src.middlewares.logging import get_logger
23
+
24
+ logger = get_logger("intent_router")
25
 
26
  SourceHint = Literal["chat", "unstructured", "structured"]
27
 
28
+ _PROMPT_PATH = (
29
+ Path(__file__).resolve().parent.parent
30
+ / "config"
31
+ / "prompts"
32
+ / "intent_router.md"
33
+ )
34
+
35
+
36
+ class IntentRouterDecision(BaseModel):
37
+ """LLM output. Pydantic so it can be used with `with_structured_output`."""
38
+
39
+ needs_search: bool = Field(
40
+ ..., description="True if we must look at the user's data to answer."
41
+ )
42
+ source_hint: SourceHint = Field(
43
+ ...,
44
+ description="Which downstream path: 'chat' (no lookup), "
45
+ "'unstructured' (PDF/DOCX/TXT prose), 'structured' (DB / tabular file).",
46
+ )
47
+ rewritten_query: str | None = Field(
48
+ None,
49
+ description="Standalone version of the question, history-resolved. "
50
+ "Null when needs_search=false.",
51
+ )
52
+
53
 
54
+ def _load_prompt_text() -> str:
55
+ return _PROMPT_PATH.read_text(encoding="utf-8")
56
+
57
+
58
+ def _build_default_chain() -> Runnable:
59
+ from src.config.settings import settings
60
+
61
+ llm = AzureChatOpenAI(
62
+ azure_deployment=settings.azureai_deployment_name_4o,
63
+ openai_api_version=settings.azureai_api_version_4o,
64
+ azure_endpoint=settings.azureai_endpoint_url_4o,
65
+ api_key=settings.azureai_api_key_4o,
66
+ temperature=0,
67
+ )
68
+ prompt = ChatPromptTemplate.from_messages(
69
+ [
70
+ ("system", _load_prompt_text()),
71
+ MessagesPlaceholder(variable_name="history", optional=True),
72
+ ("human", "{message}"),
73
+ ]
74
+ )
75
+ return prompt | llm.with_structured_output(IntentRouterDecision)
76
 
77
 
78
  class IntentRouter:
79
+ """Classifies a user message into chat / unstructured / structured.
80
+
81
+ Inject `structured_chain` for tests; default builds the production
82
+ Azure OpenAI chain on first use.
83
+ """
84
+
85
+ def __init__(self, structured_chain: Runnable | None = None) -> None:
86
+ self._chain = structured_chain
87
+
88
+ def _ensure_chain(self) -> Runnable:
89
+ if self._chain is None:
90
+ self._chain = _build_default_chain()
91
+ return self._chain
92
+
93
+ async def classify(
94
+ self,
95
+ message: str,
96
+ history: list[BaseMessage] | None = None,
97
+ ) -> IntentRouterDecision:
98
+ chain = self._ensure_chain()
99
+ decision: IntentRouterDecision = await chain.ainvoke(
100
+ {"message": message, "history": history or []}
101
+ )
102
+ logger.info(
103
+ "intent classified",
104
+ source_hint=decision.source_hint,
105
+ needs_search=decision.needs_search,
106
+ )
107
+ return decision
src/config/prompts/chatbot_system.md CHANGED
@@ -1,17 +1,32 @@
1
- # Chatbot System Prompt
2
 
3
- Final answer-formation step. Receives the user question, retrieval context (Cu)
4
- or query results (from QueryExecutor), and produces the natural-language answer.
5
 
6
- Used by `src/agents/chatbot.py`. SSE-streamed back to the user.
 
 
 
 
 
7
 
8
- ## System prompt
9
 
10
- (content to be ported from src/config/agents/system_prompt.md, then rewritten
11
- to remove references to the old retrieval pipeline and reflect catalog-driven flow)
 
12
 
13
- ## Notes
14
 
15
- - Keep answers grounded in the provided context — no hallucination.
16
- - For tabular results, format numbers with appropriate units (from `column.unit` when available).
17
- - Cite sources when applicable (filename + page label).
 
 
 
 
 
 
 
 
 
 
 
1
+ You are a friendly, precise data assistant for a user who has registered databases and uploaded files. Your job is to answer the user's questions using **only** the data context provided to you in this turn.
2
 
3
+ ## Rules
 
4
 
5
+ 1. **Ground every claim in the provided context.** If the context doesn't contain the answer, say so plainly — do not guess. Never invent numbers, dates, or facts that aren't in the result rows or document chunks.
6
+ 2. **Be concise.** Default to 1–4 sentences. Bullet lists when comparing items. A small table when more than ~5 rows of data carry the answer.
7
+ 3. **Use the user's terms when possible.** Mirror the column / table names they care about, but feel free to humanize ("revenue" instead of "total_cents", "last month" instead of "2026-04 timestamps").
8
+ 4. **Reference the source.** When you cite a number from a query result, mention the source briefly (e.g., "from your prod_db `orders` table"). When you quote a document, cite the filename and page if available.
9
+ 5. **Stream coherently.** You are streaming token-by-token; don't backtrack or self-correct mid-answer. Plan the structure mentally before the first token.
10
+ 6. **Markdown is OK** for emphasis and small tables, but avoid heavy formatting (code fences, headers) unless the question genuinely calls for it.
11
 
12
+ ## Context shapes you'll see
13
 
14
+ - **Query result** — emitted when the user asked a data question that ran successfully. Contains `rows` (a list of dicts), `row_count`, the source/table that was queried, and any error string. If `error` is set, explain the failure plainly and suggest a next step.
15
+ - **Document chunks** emitted when the user asked about uploaded prose. Each chunk has source filename and (for PDFs) a page label.
16
+ - **No context** — emitted for greetings, farewells, or meta questions. Just respond conversationally.
17
 
18
+ ## When the query failed
19
 
20
+ If `query_result.error` is non-empty:
21
+ - Acknowledge the failure briefly.
22
+ - Surface the user-actionable part of the error (e.g., "I couldn't find a matching column" → suggest they rephrase).
23
+ - Do not paste raw stack traces or internal IDs.
24
+
25
+ ## What you do NOT do
26
+
27
+ - Speculate beyond the data.
28
+ - Output the raw result rows unless the user explicitly asked for "show me the data".
29
+ - Repeat the user's question back at them.
30
+ - Apologize repeatedly.
31
+
32
+ You have access to recent conversation history; use it to resolve pronouns and avoid restating context the user has already established.
src/config/prompts/guardrails.md CHANGED
@@ -1,12 +1,11 @@
1
- # Guardrails
2
 
3
- Safety / refusal / scope-bounding rules applied to all LLM calls.
4
 
5
- (content to be ported from src/config/agents/guardrails_prompt.md)
6
-
7
- ## Scope
8
-
9
- - Refuse PII extraction requests
10
- - Refuse questions outside the user's data scope
11
- - Refuse code-execution / shell-style requests
12
- - (more to be added)
 
1
+ ## Guardrails
2
 
3
+ These rules apply to every response, regardless of the system prompt above. They take precedence when in conflict with anything else.
4
 
5
+ 1. **Stay within the user's data scope.** Refuse questions that ask you to fabricate data, predict the future from data the user hasn't shared, or answer questions unrelated to the user's registered sources. Reply briefly: "That's outside what I can answer from your data — I can only work with the sources you've registered."
6
+ 2. **Do not reveal or extract PII.** If the data context contains a PII column (it will be flagged), do not list raw values — describe distributions or counts only. If the user explicitly asks for raw PII, refuse: "I can't surface that column's contents directly."
7
+ 3. **No code execution, no shell commands, no file writes.** If the user asks you to run code, modify their data, or perform a write operation, refuse: "I can only read and summarize — I don't execute code or change your data."
8
+ 4. **No credentials, no secrets.** Never repeat connection strings, passwords, API keys, or service-account JSON, even if they somehow appear in context.
9
+ 5. **No medical / legal / financial advice.** If the user asks "should I…" questions about a regulated domain, defer: "I can show you what the data says, but the decision is yours — I won't give advice in this domain."
10
+ 6. **Acknowledge limits when relevant.** If a result was truncated, say so. If you're not sure, say so. Avoid the appearance of false certainty.
11
+ 7. **Be honest about errors.** If the query failed, the document was missing, or the catalog had nothing relevant, say it plainly. Do not paper over with vague answers.
 
src/config/prompts/intent_router.md CHANGED
@@ -1,25 +1,66 @@
1
- # Intent Router Prompt
2
 
3
- Classifies a user message into:
4
- - `needs_search`: bool
5
- - `source_hint`: `chat` | `unstructured` | `structured`
6
 
7
- Used by `src/agents/intent_router.py`.
8
 
9
- ## System prompt
 
 
 
 
 
10
 
11
- (to be written)
12
 
13
- ## Output schema
 
 
 
 
14
 
15
- ```json
16
- {
17
- "needs_search": true,
18
- "source_hint": "structured",
19
- "rewritten_query": "..."
20
- }
21
- ```
 
 
22
 
23
  ## Few-shot examples
24
 
25
- (to be written)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ You are the intent router for an AI data assistant. Given a user's latest message (and optionally recent conversation history), decide which downstream path should handle it.
2
 
3
+ ## Output
 
 
4
 
5
+ Return three fields:
6
 
7
+ - **`needs_search`** — `true` if we must look at the user's data to answer; `false` for greetings, farewells, off-topic chitchat, or meta questions about the assistant itself.
8
+ - **`source_hint`** — one of:
9
+ - `chat` — no data lookup needed (greetings, farewells, generic small talk).
10
+ - `unstructured` — the user is asking about the **content** of an uploaded document (PDF / DOCX / TXT).
11
+ - `structured` — the user is asking a **data question** answerable from a database or a tabular file (CSV / XLSX / Parquet). This includes counts, sums, top-N, filters, comparisons, trends, joins across registered structured sources.
12
+ - **`rewritten_query`** — a **standalone** version of the user's question that incorporates necessary context from history. If the original message is already standalone, return it unchanged. If `needs_search` is `false`, leave this empty/null.
13
 
14
+ ## Routing rules
15
 
16
+ 1. If the message is a pure greeting / farewell / thanks / "how are you" / "what can you do" → `chat` + `needs_search=false`.
17
+ 2. If the message references content that lives in a registered DB or uploaded tabular file (sales numbers, customer counts, order trends, sheet rows, table columns) → `structured` + `needs_search=true`.
18
+ 3. If the message asks about prose content (a section of a PDF, what a memo says, a quote from a document) → `unstructured` + `needs_search=true`.
19
+ 4. If the message is ambiguous between structured and unstructured, prefer `structured` — the planner can fall back if the catalog has nothing relevant.
20
+ 5. Cross-source comparison ("compare DB sales to the customers.csv file") → `structured`. The planner sees both source types in one prompt and can correlate.
21
 
22
+ ## Rewriting follow-ups
23
+
24
+ When history is present and the new message references prior context using pronouns or fragments ("tell me more", "what about last quarter?", "and by region?"), expand the rewritten_query into a fully standalone question. Example:
25
+
26
+ History: "What was our top product last month?" "Pro Plan Annual at $487k"
27
+ Message: "How does that compare to Q1?"
28
+ rewritten_query: "How does Pro Plan Annual's revenue last month compare to Q1?"
29
+
30
+ If the original is already standalone, copy it verbatim into rewritten_query.
31
 
32
  ## Few-shot examples
33
 
34
+ ```
35
+ User: "Hi"
36
+ → needs_search=false, source_hint="chat", rewritten_query=null
37
+
38
+ User: "Bye, thanks"
39
+ → needs_search=false, source_hint="chat", rewritten_query=null
40
+
41
+ User: "What can you do?"
42
+ → needs_search=false, source_hint="chat", rewritten_query=null
43
+
44
+ User: "How many orders did we get last month?"
45
+ → needs_search=true, source_hint="structured",
46
+ rewritten_query="How many orders did we get last month?"
47
+
48
+ User: "What does the Q1 board memo say about churn?"
49
+ → needs_search=true, source_hint="unstructured",
50
+ rewritten_query="What does the Q1 board memo say about churn?"
51
+
52
+ User: "Top 5 customers by revenue this year"
53
+ → needs_search=true, source_hint="structured",
54
+ rewritten_query="Top 5 customers by revenue this year"
55
+
56
+ History: assistant: "Pro Plan Annual led at $487,200 in April."
57
+ User: "And in March?"
58
+ → needs_search=true, source_hint="structured",
59
+ rewritten_query="What was Pro Plan Annual's revenue in March?"
60
+ ```
61
+
62
+ ## Constraints
63
+
64
+ - Do not invent data. If you don't know whether a topic exists in the user's data, route to `structured` and let the planner decide.
65
+ - Do not refuse — refusal happens later in guardrails. Just classify.
66
+ - One JSON object as output; no prose, no markdown.
src/config/prompts/query_planner.md CHANGED
@@ -1,25 +1,151 @@
1
- # Query Planner Prompt
2
 
3
- Takes a user question + their data catalog (Cs Ct).
4
- Produces a JSON IR that describes the query intent.
5
 
6
- See ARCHITECTURE.md §7 for the IR schema. Used by `src/query/planner/service.py`.
7
 
8
- ## System prompt
9
-
10
- (to be written)
11
 
12
  ## Output schema
13
 
14
- Strict JSON matching `src/query/ir/models.py:QueryIR`.
15
- Validated by `IRValidator` against the catalog before reaching the compiler.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16
 
17
  ## Few-shot examples
18
 
19
- (to be written — 5–10 examples covering filter + groupby + agg + sort + limit)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
20
 
21
- ## Notes
22
 
23
- - Reference columns by `column_id`, not `name`.
24
- - `value_type` must match the column's `data_type`.
25
- - Only emit operators/aggs from the whitelist (`src/query/ir/operators.py`).
 
1
+ You are the **query planner** for an AI data assistant. Given a user's question and the user's full data catalog, produce a structured **JSON IR** that captures the query intent.
2
 
3
+ The IR is executed by a deterministic compiler you do **not** write SQL, pandas, or any execution syntax. You produce intent only.
 
4
 
5
+ ## What you receive
6
 
7
+ 1. The user's question.
8
+ 2. The user's catalog: every registered source (databases and tabular files), every table, every column, with descriptions, sample values, stats, and foreign keys. Each item carries a stable identifier (`source_id`, `table_id`, `column_id`) — copy these verbatim into the IR.
 
9
 
10
  ## Output schema
11
 
12
+ A `QueryIR` object:
13
+
14
+ ```jsonc
15
+ {
16
+ "ir_version": "1.0",
17
+ "source_id": "...", // pick from catalog
18
+ "table_id": "...", // pick from chosen source
19
+ "select": [
20
+ {"kind": "column", "column_id": "...", "alias": "..."},
21
+ {"kind": "agg", "fn": "count|count_distinct|sum|avg|min|max",
22
+ "column_id": "...?", "alias": "..."}
23
+ ],
24
+ "filters": [
25
+ {"column_id": "...",
26
+ "op": "= | != | < | <= | > | >= | in | not_in | is_null | is_not_null | like | between",
27
+ "value": ...,
28
+ "value_type": "int|decimal|string|datetime|date|bool"}
29
+ ],
30
+ "group_by": ["column_id", ...],
31
+ "order_by": [{"column_id": "...", "dir": "asc|desc"}],
32
+ "limit": 100
33
+ }
34
+ ```
35
+
36
+ ## Hard constraints (a violation makes the IR invalid)
37
+
38
+ 1. `source_id`, `table_id`, `column_id` must come **verbatim** from the catalog. Never invent IDs or copy table/column **names** in their place.
39
+ 2. **Single-table only in v1.** Pick the table whose columns best answer the question. If the question genuinely needs a join, pick the table that yields the most useful answer alone and the user can refine.
40
+ 3. Use only listed operators / aggregates. No window functions, no `CASE WHEN`, no subqueries — those are not part of v1.
41
+ 4. `value_type` must be compatible with the column's `data_type`:
42
+ - `int` column ↔ value_type ∈ {int, decimal}
43
+ - `decimal` column ↔ value_type ∈ {int, decimal}
44
+ - `string` column ↔ value_type = string
45
+ - `datetime` / `date` column ↔ value_type ∈ {datetime, date, string} (ISO-8601 string is fine)
46
+ - `bool` column ↔ value_type = bool
47
+ 5. `limit` between 1 and 10000 inclusive.
48
+ 6. For `count` of all rows, omit `column_id` from the agg item. For any other aggregate, `column_id` is required.
49
+ 7. `order_by.column_id` may reference either a real column_id or an alias declared in `select`.
50
+ 8. For `is_null` / `is_not_null`, `value` and `value_type` are still emitted but ignored — pick reasonable defaults.
51
+ 9. For `in` / `not_in`, `value` is a JSON list. For `between`, `value` is a JSON list of exactly two elements (low, high).
52
+
53
+ ## Style guidance
54
+
55
+ - Default `limit` to 100 unless the user asked for "top N" (then use N) or said "all" (then leave out `limit`, server will cap at 10000).
56
+ - For "top N by X" → `select` includes the grouping column and the agg, `order_by` on the agg alias `desc`, `limit=N`.
57
+ - For "how many ..." → `select=[{"kind":"agg","fn":"count","alias":"n"}]` plus filters; no group_by.
58
+ - Prefer aliases on aggregates (`alias="total"`, `alias="n"`, etc.) so the answer-formatter has a clean column name.
59
+ - If the question is ambiguous, pick the most likely interpretation and proceed — error retry will give you another attempt if the IR fails validation.
60
 
61
  ## Few-shot examples
62
 
63
+ Catalog excerpt (DB source):
64
+
65
+ ```
66
+ Source: prod_db (schema)
67
+ Source ID: src_prod_db
68
+
69
+ Tables:
70
+
71
+ Table: orders (12,453 rows) — id=t_orders
72
+ Columns:
73
+ - id [int]: samples=[1, 2, 3], distinct=12453 — id=c_orders_id
74
+ - customer_id [int]: samples=[42, 17] — id=c_orders_customer_id
75
+ - total_cents [int]: samples=[2499, 4999], min=99, max=999900 — id=c_orders_total_cents
76
+ - status [string]: samples=[completed, pending] — id=c_orders_status
77
+ - created_at [datetime]: samples=[2026-04-01T08:12:00Z] — id=c_orders_created
78
+ ```
79
+
80
+ Question: "How many orders last month?"
81
+ Output:
82
+ ```json
83
+ {
84
+ "ir_version": "1.0",
85
+ "source_id": "src_prod_db",
86
+ "table_id": "t_orders",
87
+ "select": [{"kind": "agg", "fn": "count", "alias": "n"}],
88
+ "filters": [
89
+ {"column_id": "c_orders_created", "op": ">=", "value": "2026-04-01T00:00:00Z", "value_type": "string"},
90
+ {"column_id": "c_orders_created", "op": "<", "value": "2026-05-01T00:00:00Z", "value_type": "string"}
91
+ ],
92
+ "group_by": [],
93
+ "order_by": [],
94
+ "limit": null
95
+ }
96
+ ```
97
+
98
+ Question: "Top 5 statuses by count"
99
+ Output:
100
+ ```json
101
+ {
102
+ "ir_version": "1.0",
103
+ "source_id": "src_prod_db",
104
+ "table_id": "t_orders",
105
+ "select": [
106
+ {"kind": "column", "column_id": "c_orders_status"},
107
+ {"kind": "agg", "fn": "count", "alias": "n"}
108
+ ],
109
+ "filters": [],
110
+ "group_by": ["c_orders_status"],
111
+ "order_by": [{"column_id": "n", "dir": "desc"}],
112
+ "limit": 5
113
+ }
114
+ ```
115
+
116
+ Catalog excerpt (tabular source — XLSX sheet):
117
+
118
+ ```
119
+ Source: customers.xlsx (tabular)
120
+ Source ID: src_doc_customers
121
+
122
+ Tables:
123
+
124
+ Table: Sheet1 (8,200 rows) — id=t_customers_sheet1
125
+ Columns:
126
+ - id [int]: samples=[1, 2] — id=c_customers_id
127
+ - region [string]: samples=[NA, EMEA, APAC] — id=c_customers_region
128
+ - mrr [decimal]: samples=[99.0, 199.0], min=0.0, max=999.0 — id=c_customers_mrr
129
+ ```
130
+
131
+ Question: "Average MRR by region"
132
+ Output:
133
+ ```json
134
+ {
135
+ "ir_version": "1.0",
136
+ "source_id": "src_doc_customers",
137
+ "table_id": "t_customers_sheet1",
138
+ "select": [
139
+ {"kind": "column", "column_id": "c_customers_region"},
140
+ {"kind": "agg", "fn": "avg", "column_id": "c_customers_mrr", "alias": "avg_mrr"}
141
+ ],
142
+ "filters": [],
143
+ "group_by": ["c_customers_region"],
144
+ "order_by": [{"column_id": "avg_mrr", "dir": "desc"}],
145
+ "limit": 100
146
+ }
147
+ ```
148
 
149
+ ## Retry behavior
150
 
151
+ If the previous attempt's IR failed validation, the error message will be appended below. Read it carefully and emit a corrected IR — do not repeat the same mistake.
 
 
src/query/executor/dispatcher.py CHANGED
@@ -2,16 +2,76 @@
2
 
3
  This is the only place in the structured query path where the schema/tabular
4
  distinction matters. Every step before this is source-type-agnostic.
 
 
 
 
 
 
 
 
 
 
5
  """
6
 
7
- from ...catalog.models import Catalog
 
 
 
 
8
  from ..ir.models import QueryIR
9
  from .base import BaseExecutor
10
 
 
 
11
 
12
  class ExecutorDispatcher:
13
- def __init__(self, catalog: Catalog) -> None:
 
 
 
 
 
 
 
 
 
 
 
14
  self._catalog = catalog
 
 
15
 
16
  def pick(self, ir: QueryIR) -> BaseExecutor:
17
- raise NotImplementedError
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2
 
3
  This is the only place in the structured query path where the schema/tabular
4
  distinction matters. Every step before this is source-type-agnostic.
5
+
6
+ Production executors are imported lazily so the module is import-safe for
7
+ tests (DbExecutor transitively imports `Settings` which fails without `.env`).
8
+ Tests can inject their own `executor_factories` to bypass production deps
9
+ entirely.
10
+
11
+ Until TAB owner ships the real `TabularExecutor` body, dispatching to a
12
+ tabular source returns the existing stub which raises `NotImplementedError`
13
+ on `.run()`. `QueryService` catches this and surfaces a graceful error in
14
+ `QueryResult.error`.
15
  """
16
 
17
+ from __future__ import annotations
18
+
19
+ from collections.abc import Callable
20
+
21
+ from ...catalog.models import Catalog, Source
22
  from ..ir.models import QueryIR
23
  from .base import BaseExecutor
24
 
25
+ ExecutorFactory = Callable[[Catalog], BaseExecutor]
26
+
27
 
28
  class ExecutorDispatcher:
29
+ """Picks the right `BaseExecutor` for an IR.
30
+
31
+ One executor instance per source_type per dispatcher (cached internally),
32
+ since both `DbExecutor` and `TabularExecutor` are stateless beyond the
33
+ catalog they hold.
34
+ """
35
+
36
+ def __init__(
37
+ self,
38
+ catalog: Catalog,
39
+ executor_factories: dict[str, ExecutorFactory] | None = None,
40
+ ) -> None:
41
  self._catalog = catalog
42
+ self._factories = executor_factories
43
+ self._cache: dict[str, BaseExecutor] = {}
44
 
45
  def pick(self, ir: QueryIR) -> BaseExecutor:
46
+ source = self._find_source(ir.source_id)
47
+ if source.source_type in self._cache:
48
+ return self._cache[source.source_type]
49
+ factory = self._get_factory(source.source_type)
50
+ executor = factory(self._catalog)
51
+ self._cache[source.source_type] = executor
52
+ return executor
53
+
54
+ def _get_factory(self, source_type: str) -> ExecutorFactory:
55
+ if self._factories is not None:
56
+ factory = self._factories.get(source_type)
57
+ if factory is None:
58
+ raise ValueError(
59
+ f"no executor factory injected for source_type={source_type!r}"
60
+ )
61
+ return factory
62
+ # Default factories — lazy-imported so importing this module is cheap
63
+ if source_type == "schema":
64
+ from .db import DbExecutor
65
+
66
+ return DbExecutor # type: ignore[return-value]
67
+ if source_type == "tabular":
68
+ from .tabular import TabularExecutor
69
+
70
+ return TabularExecutor # type: ignore[return-value]
71
+ raise ValueError(f"unsupported source_type={source_type!r}")
72
+
73
+ def _find_source(self, source_id: str) -> Source:
74
+ for s in self._catalog.sources:
75
+ if s.source_id == source_id:
76
+ return s
77
+ raise ValueError(f"source_id {source_id!r} not in catalog")
src/query/planner/prompt.py CHANGED
@@ -2,11 +2,48 @@
2
 
3
  Renders the catalog into a compact textual form that fits the LLM context
4
  window. For users with ≤50 tables the full catalog goes in verbatim.
 
 
 
 
5
  """
6
 
 
 
 
7
  from ...catalog.models import Catalog
8
 
9
 
10
- def build_planner_prompt(question: str, catalog: Catalog) -> str:
11
- """Return the full prompt string to feed the planner LLM."""
12
- raise NotImplementedError
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2
 
3
  Renders the catalog into a compact textual form that fits the LLM context
4
  window. For users with ≤50 tables the full catalog goes in verbatim.
5
+
6
+ Reuses `catalog.enricher.render_source` so the planner sees the same
7
+ source-rendering format as the enricher does at ingestion time — keeping
8
+ catalog descriptions consistent across both LLM call sites.
9
  """
10
 
11
+ from __future__ import annotations
12
+
13
+ from ...catalog.enricher import render_source
14
  from ...catalog.models import Catalog
15
 
16
 
17
+ def render_catalog(catalog: Catalog) -> str:
18
+ """Render every Source in the catalog as text. One blank line between sources."""
19
+ if not catalog.sources:
20
+ return "(catalog is empty — the user has not registered any structured data yet)"
21
+ return "\n\n".join(render_source(s) for s in catalog.sources)
22
+
23
+
24
+ def build_planner_prompt(
25
+ question: str,
26
+ catalog: Catalog,
27
+ previous_error: str | None = None,
28
+ ) -> str:
29
+ """Return the human-message content for the planner LLM.
30
+
31
+ Composed of three sections in order:
32
+ 1. The user's question.
33
+ 2. The user's full catalog (rendered).
34
+ 3. (optional) The previous attempt's error, on retry.
35
+
36
+ The system prompt (`config/prompts/query_planner.md`) is loaded
37
+ separately by `QueryPlannerService`.
38
+ """
39
+ sections = [
40
+ f"# Question\n\n{question}",
41
+ f"# Catalog\n\n{render_catalog(catalog)}",
42
+ ]
43
+ if previous_error:
44
+ sections.append(
45
+ "# Previous attempt failed validation\n\n"
46
+ f"{previous_error}\n\n"
47
+ "Emit a corrected IR. Do not repeat the same mistake."
48
+ )
49
+ return "\n\n".join(sections)
src/query/planner/service.py CHANGED
@@ -1,15 +1,90 @@
1
  """QueryPlannerService — single LLM call: question + catalog → JSON IR.
2
 
3
- Prompt: src/config/prompts/query_planner.md
4
- Output: a QueryIR ready for the IRValidator.
 
 
 
5
  """
6
 
 
 
 
 
 
 
 
 
 
 
7
  from ...catalog.models import Catalog
8
  from ..ir.models import QueryIR
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9
 
10
 
11
  class QueryPlannerService:
12
- """Wraps the LLM call with structured-output parsing into QueryIR."""
 
 
 
 
 
 
 
 
 
 
 
 
 
13
 
14
- async def plan(self, question: str, catalog: Catalog) -> QueryIR:
15
- raise NotImplementedError
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """QueryPlannerService — single LLM call: question + catalog → JSON IR.
2
 
3
+ Prompt: src/config/prompts/query_planner.md (system) + the human content
4
+ built by `prompt.build_planner_prompt(...)`.
5
+
6
+ Output: a QueryIR ready for the IRValidator. Validation + retry are the
7
+ caller's concern (`QueryService` orchestrates that loop).
8
  """
9
 
10
+ from __future__ import annotations
11
+
12
+ from pathlib import Path
13
+
14
+ from langchain_core.prompts import ChatPromptTemplate
15
+ from langchain_core.runnables import Runnable
16
+ from langchain_openai import AzureChatOpenAI
17
+
18
+ from src.middlewares.logging import get_logger
19
+
20
  from ...catalog.models import Catalog
21
  from ..ir.models import QueryIR
22
+ from .prompt import build_planner_prompt
23
+
24
+ logger = get_logger("query_planner")
25
+
26
+ _PROMPT_PATH = (
27
+ Path(__file__).resolve().parent.parent.parent
28
+ / "config"
29
+ / "prompts"
30
+ / "query_planner.md"
31
+ )
32
+
33
+
34
+ def _load_prompt_text() -> str:
35
+ return _PROMPT_PATH.read_text(encoding="utf-8")
36
+
37
+
38
+ def _build_default_chain() -> Runnable:
39
+ from src.config.settings import settings
40
+
41
+ llm = AzureChatOpenAI(
42
+ azure_deployment=settings.azureai_deployment_name_4o,
43
+ openai_api_version=settings.azureai_api_version_4o,
44
+ azure_endpoint=settings.azureai_endpoint_url_4o,
45
+ api_key=settings.azureai_api_key_4o,
46
+ temperature=0,
47
+ )
48
+ prompt = ChatPromptTemplate.from_messages(
49
+ [
50
+ ("system", _load_prompt_text()),
51
+ ("human", "{human_content}"),
52
+ ]
53
+ )
54
+ return prompt | llm.with_structured_output(QueryIR)
55
 
56
 
57
  class QueryPlannerService:
58
+ """Wraps the LLM call with structured-output parsing into QueryIR.
59
+
60
+ Inject `structured_chain` for tests. The planner prompt is composed
61
+ by `build_planner_prompt(question, catalog, previous_error)` so retry
62
+ callers can append the prior error context to nudge the LLM.
63
+ """
64
+
65
+ def __init__(self, structured_chain: Runnable | None = None) -> None:
66
+ self._chain = structured_chain
67
+
68
+ def _ensure_chain(self) -> Runnable:
69
+ if self._chain is None:
70
+ self._chain = _build_default_chain()
71
+ return self._chain
72
 
73
+ async def plan(
74
+ self,
75
+ question: str,
76
+ catalog: Catalog,
77
+ previous_error: str | None = None,
78
+ ) -> QueryIR:
79
+ human_content = build_planner_prompt(question, catalog, previous_error)
80
+ chain = self._ensure_chain()
81
+ ir: QueryIR = await chain.ainvoke({"human_content": human_content})
82
+ logger.info(
83
+ "query planned",
84
+ source_id=ir.source_id,
85
+ table_id=ir.table_id,
86
+ select_n=len(ir.select),
87
+ filters_n=len(ir.filters),
88
+ retry=previous_error is not None,
89
+ )
90
+ return ir
src/query/service.py CHANGED
@@ -2,14 +2,134 @@
2
 
3
  Top-level entry point for catalog-driven structured queries. Wired into
4
  the chat endpoint when source_hint == "structured".
 
 
 
 
 
 
 
 
 
 
 
5
  """
6
 
 
 
 
 
 
 
7
  from ..catalog.models import Catalog
8
  from .executor.base import QueryResult
 
 
 
 
 
9
 
10
 
11
  class QueryService:
12
- """End-to-end runner for a user question against a catalog."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
13
 
14
  async def run(self, user_id: str, question: str, catalog: Catalog) -> QueryResult:
15
- raise NotImplementedError
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2
 
3
  Top-level entry point for catalog-driven structured queries. Wired into
4
  the chat endpoint when source_hint == "structured".
5
+
6
+ Flow per call:
7
+ 1. Plan (LLM): question + catalog → QueryIR
8
+ 2. Validate IR against catalog. On failure, re-prompt the planner with the
9
+ error context and retry (up to `max_retries` total attempts).
10
+ 3. Dispatch IR to the right executor by `source.source_type`.
11
+ 4. Execute. Any exception (including NotImplementedError from the
12
+ TabularExecutor placeholder) is caught and surfaced via
13
+ `QueryResult.error` so the chatbot can branch on success / failure.
14
+
15
+ The service never raises — every code path returns a `QueryResult`.
16
  """
17
 
18
+ from __future__ import annotations
19
+
20
+ from collections.abc import Callable
21
+
22
+ from src.middlewares.logging import get_logger
23
+
24
  from ..catalog.models import Catalog
25
  from .executor.base import QueryResult
26
+ from .executor.dispatcher import ExecutorDispatcher
27
+ from .ir.validator import IRValidationError, IRValidator
28
+ from .planner.service import QueryPlannerService
29
+
30
+ logger = get_logger("query_service")
31
 
32
 
33
  class QueryService:
34
+ """End-to-end runner for a user question against a catalog.
35
+
36
+ All heavy dependencies are injectable so unit tests don't need real
37
+ LLMs or DB engines. Default constructors lazy-build the production
38
+ deps so importing this module is side-effect-free.
39
+ """
40
+
41
+ def __init__(
42
+ self,
43
+ planner: QueryPlannerService | None = None,
44
+ validator: IRValidator | None = None,
45
+ dispatcher_factory: Callable[[Catalog], ExecutorDispatcher] | None = None,
46
+ max_retries: int = 3,
47
+ ) -> None:
48
+ self._planner = planner or QueryPlannerService()
49
+ self._validator = validator or IRValidator()
50
+ self._dispatcher_factory = dispatcher_factory or ExecutorDispatcher
51
+ self._max_retries = max(1, max_retries)
52
 
53
  async def run(self, user_id: str, question: str, catalog: Catalog) -> QueryResult:
54
+ if not catalog.sources:
55
+ return _error_result(
56
+ source_id="",
57
+ error="No structured data registered yet — connect a database "
58
+ "or upload a CSV/XLSX before asking data questions.",
59
+ )
60
+
61
+ # ---------- planner + validator with retry ------------------
62
+ previous_error: str | None = None
63
+ ir = None
64
+ for attempt in range(1, self._max_retries + 1):
65
+ try:
66
+ ir = await self._planner.plan(question, catalog, previous_error)
67
+ except Exception as e:
68
+ logger.error("planner crashed", attempt=attempt, error=str(e))
69
+ return _error_result(source_id="", error=f"planner failed: {e}")
70
+
71
+ try:
72
+ self._validator.validate(ir, catalog)
73
+ logger.info(
74
+ "ir planned and validated",
75
+ attempt=attempt,
76
+ source_id=ir.source_id,
77
+ table_id=ir.table_id,
78
+ )
79
+ break
80
+ except IRValidationError as e:
81
+ previous_error = str(e)
82
+ logger.warning(
83
+ "ir validation failed",
84
+ attempt=attempt,
85
+ error=previous_error,
86
+ )
87
+ ir = None # discard invalid IR
88
+ continue
89
+ else:
90
+ return _error_result(
91
+ source_id="",
92
+ error=(
93
+ f"Planner could not produce a valid IR after "
94
+ f"{self._max_retries} attempts. Last error: {previous_error}"
95
+ ),
96
+ )
97
+
98
+ # `ir` is non-None and valid here (guarded by the for/else above)
99
+ assert ir is not None
100
+
101
+ # ---------- dispatch + execute ------------------------------
102
+ try:
103
+ dispatcher = self._dispatcher_factory(catalog)
104
+ executor = dispatcher.pick(ir)
105
+ except Exception as e:
106
+ logger.error("dispatch failed", source_id=ir.source_id, error=str(e))
107
+ return _error_result(source_id=ir.source_id, error=f"dispatch failed: {e}")
108
+
109
+ try:
110
+ return await executor.run(ir)
111
+ except NotImplementedError as e:
112
+ # TabularExecutor placeholder — TAB owner ships PR3-TAB
113
+ logger.warning(
114
+ "executor not yet implemented",
115
+ source_id=ir.source_id,
116
+ error=str(e),
117
+ )
118
+ return _error_result(
119
+ source_id=ir.source_id,
120
+ error="Tabular execution is not yet available — pending PR3-TAB.",
121
+ )
122
+ except Exception as e:
123
+ logger.error("executor crashed", source_id=ir.source_id, error=str(e))
124
+ return _error_result(
125
+ source_id=ir.source_id, error=f"executor failed: {e}"
126
+ )
127
+
128
+
129
+ def _error_result(source_id: str, error: str) -> QueryResult:
130
+ """Build a uniform error QueryResult.
131
+
132
+ `backend` is intentionally empty when the failure happens before an
133
+ executor is picked — the caller can still distinguish via `error`.
134
+ """
135
+ return QueryResult(source_id=source_id, backend="", error=error)