Rifqi Hafizuddin committed on
Commit
f31f673
·
1 Parent(s): efc0c0a

[KM-553] initialize shared contract

REPO_CONTEXT.md ADDED
@@ -0,0 +1,448 @@
1
+ # Repo Context — Agentic Service Data Eyond Catalog
2
+
3
+ Orientation file for future Claude Code sessions. Cross-reference `ARCHITECTURE.md` for the full design rationale and decision log.
4
+
5
+ ---
6
+
7
+ ## TL;DR
8
+
9
+ FastAPI multi-agent backend for data analysis. Users upload documents and register databases / tabular files; they ask natural-language questions and get answers grounded in their data, streamed via SSE.
10
+
11
+ The architecture has two paths:
12
+
13
+ - **Unstructured** (PDF, DOCX, TXT) — dense similarity over prose chunks (PGVector).
14
+ - **Structured** (databases, XLSX, CSV, Parquet) — a per-user **data catalog** describes what tables/columns exist; an LLM produces a **JSON IR** of intent; a deterministic Python compiler turns the IR into SQL or pandas; the executor runs it.
15
+
16
+ The LLM produces *intent*, not query syntax. Deterministic code does the rest.
17
+
18
+ The repo is a **scaffold** — folder structure and contracts (Pydantic shapes, ABC signatures, docstrings) are in place; most module bodies raise `NotImplementedError`. See *Implementation status* below.
19
+
20
+ ---
21
+
22
+ ## Stack
23
+
24
+ - Python 3.12, FastAPI 0.115, uvicorn, sse-starlette
25
+ - Async SQLAlchemy 2.0 + asyncpg (Postgres), psycopg3 (PGVector multi-statement workaround)
26
+ - LangChain 0.3 + langchain-postgres (PGVector) + langchain-openai (Azure OpenAI GPT-4o + embeddings)
27
+ - LangGraph 0.2 + langgraph-checkpoint-postgres
28
+ - Redis 5 (response + retrieval cache)
29
+ - Azure Blob Storage (uploads + Parquet)
30
+ - pandas, pyarrow, polars-ready (deferred), sqlglot, pydantic v2, structlog, slowapi, langfuse
31
+ - presidio-analyzer + spaCy `en_core_web_lg` (PII), pytesseract + pdf2image (PDF OCR)
32
+ - DB connectors: psycopg2, pymysql, pymssql, sqlalchemy-bigquery, snowflake-sqlalchemy
33
+
34
+ Run: `uv run --no-sync uvicorn main:app --host 0.0.0.0 --port 7860`. On Windows use `uv run --no-sync python run.py` (sets `WindowsSelectorEventLoopPolicy` for psycopg3 async).
35
+
36
+ ---
37
+
38
+ ## Top-level layout
39
+
40
+ ```
41
+ main.py — FastAPI app + middleware + router wiring + init_db() on startup
42
+ run.py — Windows-safe local entry point
43
+ ARCHITECTURE.md — design intent (source of truth for shape + invariants)
44
+ README.md
45
+ Dockerfile — python:3.12-slim, installs spaCy en_core_web_lg, tesseract, poppler
46
+ pyproject.toml / uv.lock
47
+ scripts/ — backfill scripts (build_initial_catalogs, enrich_all_sources)
48
+ src/ — all application code
49
+ ```
50
+
51
+ ---
52
+
53
+ ## src/ map
54
+
55
+ ### Core data shapes (only files with real content)
56
+
57
+ | Path | Role |
58
+ |---|---|
59
+ | `catalog/models.py` | Pydantic: `Catalog → Source[] → Table[] → Column[]` |
60
+ | `query/ir/models.py` | `QueryIR` (select / filters / group_by / order_by / limit) |
61
+ | `query/ir/operators.py` | `ALLOWED_FILTER_OPS`, `ALLOWED_AGG_FNS`, `LIMIT_HARD_CAP=10000` |
62
+ | `security/pii_patterns.py` | name patterns + email/phone regex for PII detection |
63
+
64
+ ### Catalog — identity layer for structured sources (Cs ∪ Ct)
65
+
66
+ | Path | Role |
67
+ |---|---|
68
+ | `catalog/introspect/base.py` | `BaseIntrospector.introspect(location_ref) -> Source` |
69
+ | `catalog/introspect/database.py` | `information_schema` + ~100 row sample → draft Source |
70
+ | `catalog/introspect/tabular.py` | Parquet/CSV/XLSX header reader + sample (one Table per sheet for XLSX) |
71
+ | `catalog/enricher.py` | one LLM call per source — adds AI descriptions at source/table/column |
72
+ | `catalog/validator.py` | invariants beyond Pydantic shape (unique IDs, FK refs) |
73
+ | `catalog/store.py` | persist as Postgres `jsonb` row keyed by user_id (`get/upsert/delete`) |
74
+ | `catalog/reader.py` | load + filter catalog by source_hint (returns full catalog for ≤50 tables) |
75
+ | `catalog/pii_detector.py` | flag PII columns at ingestion → suppresses `sample_values` |
76
+
77
+ ### Query — catalog-driven structured path
78
+
79
+ | Path | Role |
80
+ |---|---|
81
+ | `query/service.py` | `QueryService.run(user_id, question, catalog) -> QueryResult` (top-level) |
82
+ | `query/planner/service.py` | LLM call: question + catalog → QueryIR (structured output) |
83
+ | `query/planner/prompt.py` | renders catalog into the planner prompt |
84
+ | `query/ir/validator.py` | catalog-aware IR validation: column_ids exist, ops whitelisted, value_type matches data_type, limit ≤ cap |
85
+ | `query/compiler/base.py` | `BaseCompiler.compile(ir) -> object` |
86
+ | `query/compiler/sql.py` | IR → `(sql, params)`; identifiers from catalog, values parameterized |
87
+ | `query/compiler/pandas.py` | IR → callable that runs against a DataFrame |
88
+ | `query/executor/base.py` | `BaseExecutor.run(ir) -> QueryResult` (uniform across backends) |
89
+ | `query/executor/db.py` | runs compiled SQL via asyncpg/pymysql in read-only txn (sqlglot second-line defence) |
90
+ | `query/executor/tabular.py` | runs pandas/polars chain on a Parquet file (eager pandas → pyarrow pushdown → polars lazy by file size) |
91
+ | `query/executor/dispatcher.py` | picks DB vs Tabular executor based on `source.source_type` of the IR's source |
92
+
93
+ ### Retrieval — unstructured path (Cu)
94
+
95
+ | Path | Role |
96
+ |---|---|
97
+ | `retrieval/document.py` | `DocumentRetriever` over PGVector chunks |
98
+ | `retrieval/router.py` | dispatches the `unstructured` route (the `chat` and `structured` routes do not pass through here) |
99
+
100
+ ### Agents — the four LLM call sites
101
+
102
+ | Path | Role |
103
+ |---|---|
104
+ | `agents/intent_router.py` | classify message → `needs_search`, `source_hint ∈ {chat, unstructured, structured}` |
105
+ | `agents/chatbot.py` | final answer formation (receives Cu chunks or QueryResult); SSE-streamed |
106
+
107
+ (`CatalogEnricher` and `QueryPlanner` are the other two LLM call sites; they live under `catalog/` and `query/planner/` respectively.)
108
+
109
+ ### Pipelines — ingestion coordinators
110
+
111
+ | Path | Role |
112
+ |---|---|
113
+ | `pipeline/orchestrator.py` | top-level: routes uploads / DB connects to the right pipeline |
114
+ | `pipeline/structured_pipeline.py` | DB / tabular: introspect → enrich → validate → store |
115
+ | `pipeline/document_pipeline.py` | unstructured: extract → chunk → embed → PGVector |
116
+ | `pipeline/triggers.py` | event entry points called by API routes (`on_document_uploaded`, `on_db_registered`, …) |
117
+
118
+ ### Security — cross-cutting
119
+
120
+ | Path | Role |
121
+ |---|---|
122
+ | `security/auth.py` | bcrypt password hash/verify, JWT encode/decode, get_user |
123
+ | `security/credentials.py` | Fernet encrypt/decrypt for stored DB credentials |
124
+ | `security/pii_patterns.py` | (already listed) |
125
+
126
+ ### API + infra + config
127
+
128
+ | Path | Role |
129
+ |---|---|
130
+ | `api/v1/*.py` | FastAPI routers — thin endpoints delegating to `pipeline/triggers` and `query/service` |
131
+ | `models/api/{catalog,chat,document}.py` | request/response Pydantic models |
132
+ | `db/postgres/connection.py` | two async engines: `engine` (app) and `_pgvector_engine` (PGVector) |
133
+ | `db/postgres/init_db.py` | startup: creates `vector` extension, all tables, HNSW + GIN indexes |
134
+ | `db/postgres/models.py` | SQLAlchemy app tables (users, rooms, chat messages, …) |
135
+ | `db/postgres/vector_store.py` | shared PGVector instance (collection `document_embeddings`) |
136
+ | `db/redis/connection.py` | async Redis client |
137
+ | `storage/az_blob/az_blob.py` | Azure Blob async wrapper (uploads + Parquet) |
138
+ | `middlewares/{cors,logging,rate_limit}.py` | CORS allow-all (POC), structlog JSON, slowapi |
139
+ | `observability/langfuse/langfuse.py` | trace helper |
140
+ | `config/settings.py` | pydantic-settings; `.env` uses double-underscore aliases |
141
+ | `config/env_constant.py` | env file path constant |
142
+ | `config/prompts/*.md` | prompt templates: `intent_router`, `catalog_enricher`, `query_planner`, `chatbot_system`, `guardrails` |
143
+
144
+ ---
145
+
146
+ ## Core architectural decisions
147
+
148
+ 1. **Catalog as primary context, not retrieval.** For ≤50 tables (typical), the entire catalog is rendered into the planner prompt verbatim (~3–5k tokens). No vector search, no BM25, no top-k for structured data. Catalog-level retrieval (BM25 + table-level vectors with RRF) is the *deferred* upgrade for users with hundreds of tables.
149
+
150
+ 2. **JSON IR over raw SQL.** The planner LLM emits a Pydantic-validated intent, never a SQL string. The compiler is deterministic Python. Benefits: validatable before execution, dialect-portable (one IR → SQL of any dialect / pandas / polars), cheaper tokens, trivially testable without an LLM, and no invalid SQL syntax from the LLM, since it never writes SQL.
151
+
152
+ 3. **Deterministic compiler, not LLM SQL writer.** All actual query construction happens in pure code. Compiler bugs are reproducible and fixable. Same IR → same query.
153
+
154
+ 4. **Pipeline stage isolation.** Each stage (`IntentRouter`, `CatalogReader`, `QueryPlanner`, `IRValidator`, `QueryCompiler`, `QueryExecutor`, `ChatbotAgent`) is its own module with typed input and typed output. No god classes.
155
+
156
+ 5. **Minimal LLM surface.** Only four LLM call sites in the system:
157
+ - `CatalogEnricher` — once per source, **at ingestion** (not query time)
158
+ - `IntentRouter` — once per user message
159
+ - `QueryPlanner` — once per structured query
160
+ - `ChatbotAgent` — once per answer (formatting)
161
+
162
+ 6. **Three-way routing**: `chat` / `unstructured` / `structured`. The router commits to one path. Cross-source questions ("compare DB sales vs uploaded customer file") are handled inside the structured path because the planner sees Cs ∪ Ct in one prompt. **DB vs tabular is not a routing concern** — it's a per-source attribute (`source_type`) that only matters at execution time.
163
+
164
+ 7. **Stable IDs.** `source_id`, `table_id`, `column_id` are stable internal references. Renaming a column in the source DB does not invalidate cached IRs.
165
+
166
+ 8. **PII suppression at the boundary.** Columns flagged with `pii_flag=true` have `sample_values: null`, so real PII never enters LLM prompts. Auto-detected at ingestion via name patterns + value regex (`security/pii_patterns.py`). When in doubt, flag: a false positive only withholds sample values from the prompt, while a false negative leaks data.
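
To make decisions 2 and 3 concrete, here is a minimal sketch of a deterministic compile step, assuming a Postgres-style dialect with `$n` placeholders. It is not the repo's `SqlCompiler`: the `compile_select` and `quote_ident` names, the `names` lookup dict (standing in for the catalog), and the reduced operator set are all hypothetical. It handles only plain column selects and comparison filters.

```python
from typing import Any

# Hypothetical subset of the operator whitelist; the real one lives in
# query/ir/operators.py and may differ.
ALLOWED_FILTER_OPS = {"=", "!=", "<", "<=", ">", ">="}
LIMIT_HARD_CAP = 10000  # value quoted in this document


def quote_ident(name: str) -> str:
    """Quote an identifier Postgres-style, doubling embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def compile_select(ir: dict[str, Any], names: dict[str, str]) -> tuple[str, list[Any]]:
    """Compile a single-table IR dict into (sql, params).

    `names` maps stable IDs (table_id / column_id) to real identifiers,
    standing in for a catalog lookup (an assumption of this sketch).
    """
    params: list[Any] = []
    cols = ", ".join(quote_ident(names[s["column_id"]]) for s in ir["select"])
    sql = f"SELECT {cols} FROM {quote_ident(names[ir['table_id']])}"

    clauses = []
    for flt in ir.get("filters", []):
        if flt["op"] not in ALLOWED_FILTER_OPS:
            raise ValueError(f"operator not allowed: {flt['op']}")
        params.append(flt["value"])
        # Values never appear in the SQL text; only a numbered placeholder does.
        clauses.append(f"{quote_ident(names[flt['column_id']])} {flt['op']} ${len(params)}")
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)

    # Clamp the requested limit to the hard cap.
    limit = min(int(ir.get("limit", LIMIT_HARD_CAP)), LIMIT_HARD_CAP)
    return f"{sql} LIMIT {limit}", params
```

Because the function has no hidden state, calling it twice with the same IR yields the same `(sql, params)` pair, which is the property golden tests would rely on.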
167
+
168
+ ---
169
+
170
+ ## End-to-end flows
171
+
172
+ ### Ingestion (when user uploads a file or connects a DB)
173
+
174
+ ```
175
+ source upload / DB connect
176
+
177
+ ├── unstructured (pdf/docx/txt)
178
+ │ → DocumentPipeline: extract → chunk → embed → PGVector
179
+
180
+ └── structured (DB schema or tabular file)
181
+ → introspect (information_schema or file headers + sample rows)
182
+ → CatalogEnricher (1 LLM call per source — AI descriptions)
183
+ → CatalogValidator (Pydantic + unique-IDs + FK refs)
184
+ → CatalogStore.upsert(user_id jsonb row)
185
+ ```
186
+
187
+ ### Query (per user message)
188
+
189
+ ```
190
+ user message
191
+
192
+ → Redis cache check (24h TTL) ── miss ─→ continue
193
+
194
+ → IntentRouter LLM → needs_search? source_hint?
195
+
196
+ ├── chat → ChatbotAgent → SSE stream
197
+ ├── unstructured → DocumentRetriever (Cu) → ChatbotAgent → SSE stream
198
+ └── structured →
199
+ CatalogReader.read(user_id, "structured") # full Cs ∪ Ct
200
+
201
+ QueryPlanner LLM(question, catalog) → QueryIR
202
+
203
+ IRValidator.validate(ir, catalog)
204
+ (source_id ∈ catalog, table_id ∈ source, column_ids ∈ table,
205
+ ops/aggs whitelisted, value_type matches data_type, limit ≤ 10000)
206
+ fail → re-prompt planner with error context (max 3 retries)
207
+
208
+ ExecutorDispatcher.pick(ir) # by source.source_type
209
+ ├─ DbExecutor → SqlCompiler → sqlglot guard → asyncpg/pymysql
210
+ │ (read-only txn, 30s timeout)
211
+ └─ TabularExecutor → PandasCompiler → eager pandas (≤100 MB)
212
+ or pyarrow pushdown (100 MB–1 GB)
213
+ or polars lazy scan (>1 GB)
214
+
215
+ QueryResult
216
+
217
+ ChatbotAgent → SSE stream
218
+ ```
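
The validate-then-re-prompt step in the structured branch can be sketched as a small loop. This is an illustration under assumptions, not the repo's code: `plan_with_retries`, `IRValidationError`, and the `plan` / `validate` callables are hypothetical stand-ins for the planner LLM call and the IR validator, and "max 3 retries" is read here as one initial attempt plus up to three re-prompts.

```python
from typing import Any, Callable, Optional

MAX_RETRIES = 3  # "max 3 retries" from the flow above


class IRValidationError(Exception):
    """Hypothetical error type carrying a message fit for a retry prompt."""


def plan_with_retries(
    question: str,
    plan: Callable[[str, Optional[str]], dict[str, Any]],
    validate: Callable[[dict[str, Any]], None],
) -> dict[str, Any]:
    """Call the planner, validate, and re-prompt with the error on failure.

    `plan(question, error_context)` stands in for the planner LLM call and
    `validate(ir)` for the IR validator; both are placeholders.
    """
    error_context: Optional[str] = None
    for _ in range(1 + MAX_RETRIES):  # one initial attempt plus the retries
        ir = plan(question, error_context)
        try:
            validate(ir)
            return ir
        except IRValidationError as exc:
            # Feed the validator's message back into the next planner call.
            error_context = str(exc)
    raise IRValidationError(
        f"planner failed after {MAX_RETRIES} retries: {error_context}"
    )
```

Passing the validator's message back verbatim is why the ownership table asks both engineers to agree on exact error wording.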
219
+
220
+ ---
221
+
222
+ ## Catalog schema (per-user `jsonb` row)
223
+
224
+ ```
225
+ Catalog
226
+ ├── user_id, schema_version, generated_at
227
+ └── sources[]
228
+ └── Source { source_id, source_type, name, description, location_ref, updated_at }
229
+ └── tables[]
230
+ └── Table { table_id, name, description, row_count }
231
+ └── columns[]
232
+ └── Column { column_id, name, data_type, description,
233
+ nullable, pii_flag, sample_values[]|null, stats|null }
234
+ ```
235
+
236
+ `source_type ∈ {schema, tabular, unstructured}`.
237
+ `data_type ∈ {int, decimal, string, datetime, date, bool, json}`.
238
+
239
+ Deferred Column fields (add when justified): `description_human`, `synonyms[]`, `tags[]`, `primary_key`, `foreign_keys`, `unit`, `semantic_type`, `example_questions[]`, `schema_hash`, `enrichment_status`.
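
As a rough picture of the hierarchy above, the following sketch mirrors it with stdlib dataclasses. The repo uses Pydantic models in `catalog/models.py`; this version is an assumption-laden stand-in that omits `stats`, timestamps, and some descriptions. The `__post_init__` hook is one hypothetical way to enforce the rule that flagged columns carry no samples.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class Column:
    column_id: str
    name: str
    data_type: str  # one of: int, decimal, string, datetime, date, bool, json
    description: str = ""
    nullable: bool = True
    pii_flag: bool = False
    sample_values: Optional[list[Any]] = None

    def __post_init__(self) -> None:
        # Flagged columns never carry samples, so real PII cannot reach a prompt.
        if self.pii_flag:
            self.sample_values = None


@dataclass
class Table:
    table_id: str
    name: str
    row_count: int
    columns: list[Column] = field(default_factory=list)
    description: str = ""


@dataclass
class Source:
    source_id: str
    source_type: str  # schema | tabular | unstructured
    name: str
    location_ref: str
    tables: list[Table] = field(default_factory=list)


@dataclass
class Catalog:
    user_id: str
    schema_version: str
    sources: list[Source] = field(default_factory=list)
```

For example, `Column("c1", "email", "string", pii_flag=True, sample_values=["a@b.com"])` ends up with `sample_values` set to `None`.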
240
+
241
+ ---
242
+
243
+ ## JSON IR schema
244
+
245
+ ```jsonc
246
+ {
247
+ "ir_version": "1.0",
248
+ "source_id": "...",
249
+ "table_id": "...",
250
+ "select": [
251
+ {"kind": "column", "column_id": "...", "alias": "..."},
252
+ {"kind": "agg", "fn": "count|count_distinct|sum|avg|min|max",
253
+ "column_id": "...?", "alias": "..."}
254
+ ],
255
+ "filters": [
256
+ {"column_id": "...",
257
+ "op": "= | != | < | <= | > | >= | in | not_in | is_null | is_not_null | like | between",
258
+ "value": ...,
259
+ "value_type": "int|decimal|string|datetime|date|bool"}
260
+ ],
261
+ "group_by": ["column_id", ...],
262
+ "order_by": [{"column_id": "...", "dir": "asc|desc"}],
263
+ "limit": 100
264
+ }
265
+ ```
266
+
267
+ Single-table only in v1. `having`, `offset`, boolean filter trees, `distinct`, joins, window functions are deferred until user demand proves the limitation.
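
A catalog-aware check of this shape might look like the sketch below. It is a simplified illustration, not the contents of `query/ir/validator.py`: `validate_ir` and the `columns` mapping (column_id to data_type, standing in for a catalog lookup) are hypothetical, `group_by` and `order_by` are not checked, and skipping the `value_type` comparison for null checks is an assumption.

```python
from typing import Any

# Whitelists copied from the schema comment above; the authoritative
# values live in query/ir/operators.py.
ALLOWED_FILTER_OPS = {"=", "!=", "<", "<=", ">", ">=", "in", "not_in",
                      "is_null", "is_not_null", "like", "between"}
ALLOWED_AGG_FNS = {"count", "count_distinct", "sum", "avg", "min", "max"}
LIMIT_HARD_CAP = 10000


def validate_ir(ir: dict[str, Any], columns: dict[str, str]) -> list[str]:
    """Return a list of human-readable problems; an empty list means valid."""
    errors: list[str] = []
    for item in ir.get("select", []):
        if item["kind"] == "agg" and item["fn"] not in ALLOWED_AGG_FNS:
            errors.append(f"unknown aggregate: {item['fn']}")
        col = item.get("column_id")  # optional for aggregates such as count
        if col is not None and col not in columns:
            errors.append(f"unknown column_id in select: {col}")
    for flt in ir.get("filters", []):
        col = flt["column_id"]
        if col not in columns:
            errors.append(f"unknown column_id in filter: {col}")
        elif (flt["op"] not in ("is_null", "is_not_null")
              and flt.get("value_type") != columns[col]):
            errors.append(
                f"value_type {flt.get('value_type')} does not match "
                f"{columns[col]} for {col}"
            )
        if flt["op"] not in ALLOWED_FILTER_OPS:
            errors.append(f"operator not allowed: {flt['op']}")
    if ir.get("limit", 0) > LIMIT_HARD_CAP:
        errors.append(f"limit exceeds {LIMIT_HARD_CAP}")
    return errors
```

Collecting every problem in one pass, rather than stopping at the first, gives a retry prompt more to work with in a single round trip.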
268
+
269
+ ---
270
+
271
+ ## Implementation status
272
+
273
+ `raise NotImplementedError` everywhere except the four files listed under *Core data shapes*. Every stub has a docstring describing inputs, outputs, and rules — those are the contract. When implementing, fill in the body; don't change the signature without updating `ARCHITECTURE.md`.
274
+
275
+ Per `ARCHITECTURE.md §9`, the initial PR ships:
276
+
277
+ | Item | Status |
278
+ |---|---|
279
+ | Catalog Pydantic models | ✓ done (`catalog/models.py`) |
280
+ | JSON IR Pydantic models | ✓ done (`query/ir/models.py`) |
281
+ | Catalog ingestion (introspect → enrich → validate → store) | stubs |
282
+ | `IntentRouter` (3-way `source_hint`) | stub |
283
+ | `CatalogReader` | stub |
284
+ | `QueryPlanner` LLM call | stub |
285
+ | IR validator | stub |
286
+
287
+ **Output of PR 1**: a validated `QueryIR` object. Execution lands in PR 2 (compiler), PR 3 (executors), PR 4 (retry/self-correction), PR 5 (eval harness), PR 6 (auto PII tagging). Joins, schema drift detection, hybrid catalog search are explicitly later.
288
+
289
+ ---
290
+
291
+ ## Team — division of work
292
+
293
+ The service is built by two engineers; many modules are source-type-agnostic and shared.
294
+
295
+ - **DB** owns SQL paths: introspection, SQL compiler, DB executor, credential storage.
296
+ - **TAB** owns tabular paths: CSV/XLSX/Parquet introspection, pandas compiler, tabular executor, blob/Parquet plumbing.
297
+ - **B** = both — shared contracts and source-type-agnostic plumbing. Pair-program or split with explicit hand-off.
298
+
299
+ ### Step-by-step ownership
300
+
301
+ | # | Step | File / area | Owner | Notes |
302
+ |---|---|---|---|---|
303
+ | 0 | **Lock contracts before coding** | — | B | See "Decisions to lock" below; block until aligned |
304
+ | 1 | Catalog Pydantic models | `catalog/models.py` | B | Already done; only touch if both agree |
305
+ | 2 | IR Pydantic models | `query/ir/models.py` | B | Already done; joins/window fns require joint sign-off |
306
+ | 3 | IR operator whitelists | `query/ir/operators.py` | B | Already done; both compilers rely on these |
307
+ | 4 | PII patterns / regex | `security/pii_patterns.py` | B | Already done; extend together as gaps appear |
308
+ | **Ingestion — introspection** | | | | |
309
+ | 5 | DB introspector (information_schema, sample, FKs) | `catalog/introspect/database.py` | DB | Use SQLAlchemy `inspect()`; dialect-aware quoting |
310
+ | 6 | Tabular introspector (CSV/XLSX/Parquet headers + sample) | `catalog/introspect/tabular.py` | TAB | Each XLSX sheet → one Table |
311
+ | 7 | `BaseIntrospector` ABC | `catalog/introspect/base.py` | B | Confirm signature returns the same `Source` shape |
312
+ | **Ingestion — shared catalog plumbing** | | | | |
313
+ | 8 | Catalog enricher + prompt | `catalog/enricher.py`, `config/prompts/catalog_enricher.md` | B | Whoever picks it up first; the other reviews. Prompt must work uniformly across source types |
314
+ | 9 | Catalog validator | `catalog/validator.py` | B | Type-agnostic |
315
+ | 10 | Catalog store (Postgres jsonb) | `catalog/store.py` | B | Recommend DB (Postgres expertise) |
316
+ | 11 | Catalog reader | `catalog/reader.py` | B | Type-agnostic |
317
+ | 12 | PII detector | `catalog/pii_detector.py` | B | Either; uses `pii_patterns.py` |
318
+ | **Ingestion — pipelines** | | | | |
319
+ | 13 | Structured pipeline (introspect → enrich → validate → store) | `pipeline/structured_pipeline.py` | B | Pair on this — calls both introspectors via dispatcher |
320
+ | 14 | Triggers (`on_db_registered`, `on_tabular_uploaded`) | `pipeline/triggers.py` | B | Each owns their trigger function |
321
+ | 15 | Ingestion orchestrator | `pipeline/orchestrator.py` | B | Routes by source_type; pair |
322
+ | 16 | Document pipeline (PDF/DOCX/TXT) | `pipeline/document_pipeline.py` | TAB | Tabular-adjacent (file uploads) |
323
+ | **Query — shared spine** | | | | |
324
+ | 17 | IR validator (catalog-aware) | `query/ir/validator.py` | B | Recommend DB; both must agree on exact error messages so retry-prompt is consistent |
325
+ | 18 | Planner LLM service | `query/planner/service.py` | B | Type-agnostic |
326
+ | 19 | Planner prompt (catalog → text) | `query/planner/prompt.py`, `config/prompts/query_planner.md` | B | **Pair-program**. Must describe DB tables and tabular files in one consistent format |
327
+ | 20 | Intent router (chat/unstructured/structured) | `agents/intent_router.py`, `config/prompts/intent_router.md` | B | Type-agnostic |
328
+ | 21 | Executor base + `QueryResult` | `query/executor/base.py` | B | Lock the shape before either implements an executor |
329
+ | 22 | Executor dispatcher | `query/executor/dispatcher.py` | B | Reads `source.source_type` from catalog; pair |
330
+ | 23 | Compiler base ABC | `query/compiler/base.py` | B | Already done |
331
+ | 24 | Top-level QueryService | `query/service.py` | B | Wires planner → validator → compiler → executor; pair |
332
+ | **Query — DB path** | | | | |
333
+ | 25 | SQL compiler (IR → SQL + params, per dialect) | `query/compiler/sql.py` | DB | Identifiers from catalog (quoted), values parameterized |
334
+ | 26 | DB executor (asyncpg/pymysql, sqlglot guard, RO txn, 30s timeout) | `query/executor/db.py` | DB | |
335
+ | 27 | Credential encryption (Fernet) | `security/credentials.py` | DB | Needed for stored user DB creds |
336
+ | 28 | User-DB connection management | helper in pipelines | DB | engine_scope context manager pattern |
337
+ | **Query — Tabular path** | | | | |
338
+ | 29 | Pandas compiler (IR → callable on DataFrame) | `query/compiler/pandas.py` | TAB | Same IR, different backend |
339
+ | 30 | Tabular executor (eager pandas first; pyarrow / polars later) | `query/executor/tabular.py` | TAB | Initial scope: eager pandas only |
340
+ | 31 | Parquet upload/download + Azure Blob wrapper | `storage/az_blob/az_blob.py` (+ helper) | TAB | XLSX sheet → one Parquet per sheet (deterministic blob name) |
341
+ | **Agents + chat** | | | | |
342
+ | 32 | Chatbot agent + prompt | `agents/chatbot.py`, `config/prompts/chatbot_system.md` | B | Receives QueryResult or Cu chunks |
343
+ | 33 | Guardrails prompt | `config/prompts/guardrails.md` | B | |
344
+ | **API surface** | | | | |
345
+ | 34 | DB client endpoints (register/ingest/list/delete) | `api/v1/db_client.py` | DB | |
346
+ | 35 | Document/tabular upload endpoints | `api/v1/document.py` | TAB | |
347
+ | 36 | Chat stream endpoint (SSE) | `api/v1/chat.py` | B | Dispatches both paths; pair |
348
+ | 37 | Room / users endpoints | `api/v1/room.py`, `api/v1/users.py` | B | Whoever has bandwidth |
349
+ | **Tests + eval** | | | | |
350
+ | 38 | DB compiler golden tests (IR → SQL fixtures) | `tests/query/compiler/test_sql.py` | DB | Pure-Python, no LLM |
351
+ | 39 | Pandas compiler golden tests (IR → expected DataFrame) | `tests/query/compiler/test_pandas.py` | TAB | Pure-Python, no LLM |
352
+ | 40 | IR validator tests (catalog × IR error matrix) | `tests/query/ir/test_validator.py` | B | Each contributes test cases for their source type |
353
+ | 41 | Planner eval (golden question → IR examples) | `tests/query/planner/` | B | Each contributes ~10 question→IR examples |
354
+ | 42 | E2E smoke tests | `tests/e2e/` | B | Pair |
355
+
356
+ ### Decisions to lock before coding
357
+
358
+ If made unilaterally these create silent contract drift. Lock them in a 30-min sync first.
359
+
360
+ | Decision | Why it matters | Recommended call |
361
+ |---|---|---|
362
+ | `QueryResult` shape (current scaffold: `source_id, backend, rows, row_count, truncated, elapsed_ms, error`) | Both executors return this; chatbot consumes it | Lock as-is unless either side needs more (e.g. `column_types` for formatting) |
363
+ | `Source.location_ref` format (`az_blob://...` vs `dbclient://{id}` etc.) | Dispatcher and executors both parse this | Pick a convention now; document in `catalog/models.py` docstring |
364
+ | Where do user DB credentials live? | DB executor needs creds to run queries; Source has `location_ref` but creds are encrypted separately | Recommend: `location_ref="dbclient://{client_id}"`; executor looks up creds by ID |
365
+ | How does dispatcher pick the executor? | Routes by `source.source_type` — but where does dispatcher get it (catalog reload, or IR carries it)? | Recommend: dispatcher takes `(Catalog, IR)`, looks up source by `IR.source_id` |
366
+ | Joins in v1 IR? | Excluded per ARCHITECTURE.md §7. DB path is most affected — real DB use often needs joins. | Recommend: ship single-table; revisit in PR 2. **DB owner must accept the constraint or push back early** |
367
+ | Planner prompt — render tabular vs DB sources uniformly | If described differently, planner gets confused | Pair-program. Render both as `Table: name (n rows) — Columns: ...` regardless of source_type |
368
+ | Error contract — raise or return `QueryResult.error`? | Both executors must behave the same so chatbot branches consistently | Recommend: never raise from `executor.run()`; populate `QueryResult.error` |
369
+ | PII handling for tabular `sample_values` | DB samples come from rows sampled during introspection; tabular samples come from file reads. The same `pii_flag` rule must apply on both sides | Confirm tabular introspector calls `pii_detector` |
370
+ | Catalog refresh trigger (open question §3) | Affects both pipelines symmetrically | Default: rebuild on every upload/connect; defer auto-refresh |
371
+ | `updated_at` semantics — per-Source vs per-Catalog | Affects how each pipeline writes | Recommend: per-Source `updated_at` + Catalog-level `generated_at` |
372
+ | Dialect support scope for v1 | DB compiler must implement at least one dialect well | Recommend: Postgres first (matches app DB); MySQL second |
373
+ | Test-fixture format for golden IRs | Both compilers test against golden IR → expected output | Recommend: shared `tests/fixtures/golden_irs.json`; each side adds expected SQL or DataFrame |
374
+ | Logging conventions | structlog is already in place; both should log the same fields | Quick agreement: log `source_id`, `table_id`, `ir_version`, `elapsed_ms` |
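
Two of the recommended calls above, the "never raise" error contract and the `dbclient://{client_id}` convention, can be sketched together. The field names follow the scaffold shape quoted in the table, but the field types, `run_safely`, and `parse_location_ref` are hypothetical and only illustrate one way the contract could be honoured.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class QueryResult:
    # Field names follow the scaffold shape quoted in the table above;
    # the types are assumptions.
    source_id: str
    backend: str
    rows: list[dict[str, Any]] = field(default_factory=list)
    row_count: int = 0
    truncated: bool = False
    elapsed_ms: float = 0.0
    error: Optional[str] = None


def parse_location_ref(location_ref: str) -> tuple[str, str]:
    """Split 'scheme://rest' into (scheme, rest)."""
    scheme, sep, rest = location_ref.partition("://")
    if not sep or not scheme or not rest:
        raise ValueError(f"malformed location_ref: {location_ref!r}")
    return scheme, rest


def run_safely(source_id: str, backend: str,
               fetch: Callable[[], list[dict[str, Any]]]) -> QueryResult:
    """Run `fetch` and fold any exception into QueryResult.error."""
    start = time.perf_counter()
    try:
        rows = fetch()
        result = QueryResult(source_id, backend, rows=rows, row_count=len(rows))
    except Exception as exc:  # deliberate catch-all: the contract is "never raise"
        result = QueryResult(source_id, backend, error=f"{type(exc).__name__}: {exc}")
    result.elapsed_ms = (time.perf_counter() - start) * 1000
    return result
```

With this shape, the chatbot only ever branches on `result.error`, regardless of which executor produced the result.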
375
+
376
+ ### Working rhythm (suggested)
377
+
378
+ 1. **Day 1** — 30-min sync to lock the decisions table. PR any contract/docstring changes that fall out.
379
+ 2. **Week 1** — both build introspectors + agree on the planner prompt format. PR in parallel; review each other's.
380
+ 3. **Week 2** — DB builds SQL compiler + DB executor; TAB builds pandas compiler + tabular executor. Both write golden tests against shared IR fixtures.
381
+ 4. **Week 3** — pair on dispatcher, QueryService, and chat endpoint integration. End-to-end smoke test.
382
+ 5. **Ongoing** — short daily standup, mostly to flag IR-shape questions and catalog-field additions *before* either side implements against an unconfirmed contract.
383
+
384
+ Biggest risk: **silent contract drift**, where one side adds a `QueryResult` field or assumes a new IR op exists, the other ships without it, and integration breaks at the dispatcher. The step 0 contract lock and the shared golden-IR fixtures are what prevent that.
385
+
386
+ ### Onboarding to Claude Code
387
+
388
+ If you're new to Claude Code, before you start:
389
+
390
+ 1. Read `ARCHITECTURE.md` end-to-end (~10 min) — this is the source of truth.
391
+ 2. Skim this file (`REPO_CONTEXT.md`) — find your section in the ownership table.
392
+ 3. Read your owned files' docstrings — every stub explains its contract.
393
+ 4. Open Claude Code in this repo. When you ask Claude to implement a stub:
394
+ - Reference the file path + the contract it should follow
395
+ - Point it at `ARCHITECTURE.md` section if relevant (e.g. §7 for IR validation)
396
+ - Ask it to write the test first (golden IR fixtures), then the implementation
397
+ - Always review the diff — don't auto-accept
398
+
399
+ Useful slash commands while working: `/review` (PR review), `/security-review` (audit pending changes).
400
+
401
+ ---
402
+
403
+ ## Conventions & gotchas
404
+
405
+ - **Async event loop on Windows**: `run.py` sets `WindowsSelectorEventLoopPolicy` because psycopg3 async needs it. Don't call `uvicorn` directly on Windows.
406
+ - **Two Postgres engines**: `engine` (app tables) and `_pgvector_engine` (asyncpg with `prepared_statement_cache_size=0`) — the latter is required because PGVector emits `advisory_lock + CREATE EXTENSION` as a multi-statement string and asyncpg rejects multi-statement prepared queries. `init_db.py` creates the extension explicitly so `PGVector(create_extension=False)` skips that path.
407
+ - **Read-only at every layer for user DBs**: IR validation + compiler whitelists + sqlglot SELECT-only check + read-only DB credentials + LIMIT enforcement + 30s timeout. Six layers; no single point of failure.
408
+ - **Identifiers vs values**: identifiers (table/column names) come from the catalog and are inlined as quoted identifiers — they were verified at validation time so this is safe. Values from `IR.filters` are *always* parameterized, never inlined as strings.
409
+ - **Credential encryption**: Fernet via `dataeyond__db__credential__key` env var; lives in `security/credentials.py`. Sensitive fields = `{"password", "service_account_json"}`.
410
+ - **Settings env-var aliases**: `.env` uses double-underscore names (`azureai__api_key__4o`); `Settings` exposes them as `azureai_api_key_4o` via `Field(alias=...)`. Mind both forms when adding settings.
411
+ - **Prompts**: `src/config/prompts/*.md` — most are placeholders ("to be written"). The system prompt + few-shots for each LLM call site live here, not inline in the agent code.
412
+ - **No tests yet**: pytest-asyncio + ruff + mypy are in dev deps; create `tests/` when implementing PR 1. The IR validator and compiler should be the first targets — both are deterministic and testable without an LLM.
413
+
414
+ ---
415
+
416
+ ## Recommended reading order
417
+
418
+ 1. `ARCHITECTURE.md` — design intent (the source of truth)
419
+ 2. `src/catalog/models.py` + `src/query/ir/models.py` — the two data shapes everything else moves between
420
+ 3. `src/query/ir/operators.py` + `src/security/pii_patterns.py` — the explicit whitelists / patterns
421
+ 4. Skim every `__init__.py`-level docstring under `src/catalog/`, `src/query/`, `src/agents/`, `src/pipeline/` — each describes the contract its module enforces
422
+ 5. `main.py` + `src/db/postgres/{connection,init_db}.py` — runtime bootstrap
423
+ 6. `ARCHITECTURE.md §10` — five open questions that haven't been decided yet
424
+
425
+ ---
426
+
427
+ ## Open questions (unresolved)
428
+
429
+ From `ARCHITECTURE.md §10`:
430
+
431
+ 1. Catalog storage shape — JSON file per user vs Postgres `jsonb` row?
432
+ 2. Should the catalog also list unstructured files (with descriptions only) so the router has a unified view?
433
+ 3. Catalog refresh trigger — explicit "rebuild" button, on every upload, or background TTL?
434
+ 4. Confirm joins are out of initial IR scope?
435
+ 5. PII handling for `sample_values` — mask, synthesize, or skip?
436
+
437
+ Settle these as PRs land — most won't block PR 1.
438
+
439
+ ---
440
+
441
+ ## Glossary
442
+
443
+ - **Cu** — unstructured context (prose chunks)
444
+ - **Cs** — schema context (DB tables/columns from catalog)
445
+ - **Ct** — tabular context (file sheets/columns from catalog)
446
+ - **IR** — intermediate representation (the JSON query shape)
447
+ - **PII** — personally identifiable information
448
+ - **ABC** — abstract base class
src/catalog/introspect/database.py CHANGED
@@ -3,14 +3,210 @@
3
  Reads information_schema for tables/columns/types, samples ~100 rows per table
4
  for `sample_values` and basic stats. Does NOT generate descriptions
5
  (that happens in CatalogEnricher).
6
  """
7
 
8
- from ..models import Source
9
  from .base import BaseIntrospector
10
 
11
 
12
  class DatabaseIntrospector(BaseIntrospector):
13
  """Connect to user DB → read information_schema → sample 100 rows/table."""
14
 
15
  async def introspect(self, location_ref: str) -> Source:
16
- raise NotImplementedError
3
  Reads information_schema for tables/columns/types, samples ~100 rows per table
4
  for `sample_values` and basic stats. Does NOT generate descriptions
5
  (that happens in CatalogEnricher).
6
+
7
+ Reuses Phase 1 utilities (`database_client_service`, `db_credential_encryption`,
8
+ `db_pipeline_service.engine_scope`, `extractor.get_schema/profile_column/get_row_count`)
9
+ to avoid reimplementation. The cleanup PR will move those into `security/` and
10
+ `pipeline/db_pipeline/` respectively.
11
  """
12
 
13
+ import asyncio
14
+ import hashlib
15
+ from datetime import UTC, datetime
16
+ from decimal import Decimal
17
+ from typing import Any
18
+
19
+ from src.database_client.database_client_service import database_client_service
20
+ from src.db.postgres.connection import AsyncSessionLocal
21
+ from src.middlewares.logging import get_logger
22
+ from src.pipeline.db_pipeline import db_pipeline_service
23
+ from src.pipeline.db_pipeline.extractor import (
24
+ get_row_count,
25
+ get_schema,
26
+ profile_column,
27
+ )
28
+ from src.utils.db_credential_encryption import decrypt_credentials_dict
29
+
30
+ from ..models import Column, ColumnStats, DataType, Source, Table
31
+ from ..pii_detector import PIIDetector
32
  from .base import BaseIntrospector
33
 
34
+ logger = get_logger("db_introspector")
35
+
36
+ _DBCLIENT_PREFIX = "dbclient://"
37
+
38
+
39
+ def _stable_id(prefix: str, *parts: str) -> str:
40
+ """Deterministic short ID derived from the joined parts. The ID depends only
41
+ on `parts`, so later edits to an entry's `name` field leave it unchanged and cached IRs keep resolving.
42
+
43
+ Hash is non-cryptographic (identifier only).
44
+ """
45
+ h = hashlib.sha1(
46
+ "/".join(parts).encode("utf-8"), usedforsecurity=False
47
+ ).hexdigest()[:12]
48
+ return f"{prefix}{h}"
49
+
50
+
51
+ def _map_sql_type(sql_type: str) -> DataType:
52
+ """Map a stringified SQLAlchemy type to a Catalog DataType.
53
+
54
+ Matches on substring of the SQLAlchemy type repr (e.g. 'INTEGER',
55
+ 'TIMESTAMP', 'BOOLEAN'). Conservative — unknowns fall back to "string"
56
+ so the column is at least addressable.
57
+ """
58
+ s = sql_type.upper()
59
+ if "INT" in s:
60
+ return "int"
61
+ if "FLOAT" in s or "NUMERIC" in s or "DECIMAL" in s or "REAL" in s or "DOUBLE" in s:
62
+ return "decimal"
63
+ if "BOOL" in s:
64
+ return "bool"
65
+ if "TIMESTAMP" in s or "DATETIME" in s:
66
+ return "datetime"
67
+ if "DATE" in s:
68
+ return "date"
69
+ if "JSON" in s:
70
+ return "json"
71
+ return "string"
72
+
73
+
74
+ def _normalize(v: Any) -> Any:
75
+ """Coerce non-JSON-native scalars (Decimal, numpy, datetime) to types
76
+ that survive the jsonb round-trip when the catalog is persisted.
77
+ """
78
+ if v is None:
79
+ return None
80
+ if isinstance(v, Decimal):
81
+ return float(v)
82
+ try:
83
+ import numpy as np
84
+
85
+ if isinstance(v, np.generic):
86
+ return v.item()
87
+ except ImportError:
88
+ pass
89
+ if isinstance(v, datetime):
90
+ return v.isoformat()
91
+ return v
92
+
93
 
94
  class DatabaseIntrospector(BaseIntrospector):
95
  """Connect to user DB → read information_schema → sample 100 rows/table."""
96
 
97
+ def __init__(self) -> None:
98
+ self._pii = PIIDetector()
99
+
100
  async def introspect(self, location_ref: str) -> Source:
101
+ if not location_ref.startswith(_DBCLIENT_PREFIX):
102
+ raise ValueError(
103
+ f"DatabaseIntrospector expects 'dbclient://...' location_ref, "
104
+ f"got {location_ref!r}"
105
+ )
106
+ client_id = location_ref[len(_DBCLIENT_PREFIX):]
107
+ if not client_id:
108
+ raise ValueError("location_ref is missing client_id after 'dbclient://'")
109
+
110
+ async with AsyncSessionLocal() as session:
111
+ client = await database_client_service.get(session, client_id)
112
+ if client is None:
113
+ raise ValueError(f"DatabaseClient {client_id!r} not found")
114
+
115
+ creds = decrypt_credentials_dict(client.credentials)
116
+ logger.info(
117
+ "introspecting db source",
118
+ client_id=client_id,
119
+ db_type=client.db_type,
120
+ name=client.name,
121
+ )
122
+
123
+ # SQLAlchemy inspect() + pandas read_sql are synchronous — run in a
124
+ # threadpool so the event loop stays free.
125
+ tables: list[Table] = await asyncio.to_thread(
126
+ self._introspect_sync, client.db_type, creds
127
+ )
128
+
129
+ return Source(
130
+ source_id=client_id,
131
+ source_type="schema",
132
+ name=client.name,
133
+ description="",
134
+ location_ref=location_ref,
135
+ updated_at=datetime.now(UTC),
136
+ tables=tables,
137
+ )
138
+
139
+ def _introspect_sync(self, db_type: str, creds: dict) -> list[Table]:
140
+ with db_pipeline_service.engine_scope(db_type, creds) as engine:
141
+ schema = get_schema(engine)
142
+ tables: list[Table] = []
143
+ for table_name, cols in schema.items():
144
+ try:
145
+ row_count = get_row_count(engine, table_name)
146
+ except Exception as e:
147
+ logger.error(
148
+ "row_count failed; skipping table",
149
+ table=table_name,
150
+ error=str(e),
151
+ )
152
+ continue
153
+
154
+ columns: list[Column] = []
155
+ for col in cols:
156
+ try:
157
+ profile = profile_column(
158
+ engine,
159
+ table_name,
160
+ col["name"],
161
+ col.get("is_numeric", False),
162
+ row_count,
163
+ )
164
+ except Exception as e:
165
+ logger.error(
166
+ "profile_column failed; skipping column",
167
+ table=table_name,
168
+ column=col["name"],
169
+ error=str(e),
170
+ )
171
+ continue
172
+ columns.append(self._to_column(table_name, col, profile))
173
+
174
+ tables.append(
175
+ Table(
176
+ table_id=_stable_id("t_", table_name),
177
+ name=table_name,
178
+ description="",
179
+ row_count=row_count,
180
+ columns=columns,
181
+ )
182
+ )
183
+ return tables
184
+
185
+ def _to_column(
186
+ self, table_name: str, col: dict[str, Any], profile: dict[str, Any]
187
+ ) -> Column:
188
+ name = col["name"]
189
+ sample_values: list[Any] | None = [
190
+ _normalize(v) for v in (profile.get("sample_values") or [])
191
+ ] or None
192
+
193
+ column = Column(
194
+ column_id=_stable_id("c_", table_name, name),
195
+ name=name,
196
+ data_type=_map_sql_type(str(col["type"])),
197
+ description="",
198
+ nullable=True, # nullable not surfaced by extractor; default permissive
199
+ pii_flag=False,
200
+ sample_values=sample_values,
201
+ stats=ColumnStats(
202
+ min=_normalize(profile.get("min")),
203
+ max=_normalize(profile.get("max")),
204
+ distinct_count=profile.get("distinct_count"),
205
+ ),
206
+ )
207
+ if self._pii.detect(column):
208
+ return column.model_copy(update={"pii_flag": True, "sample_values": None})
209
+ return column
210
+
211
+
212
+ database_introspector = DatabaseIntrospector()
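Review note on `_map_sql_type`: because it matches substrings, any type name that merely contains `INT` is classified as `int`. The sketch below copies the substring checks from the diff and contrasts them with a hypothetical token-based variant (`map_sql_type_tokens` and `_INT_TYPE` are illustrative names, not part of this commit). It assumes the extractor's type strings look like `INTEGER`, `INTERVAL` or `NUMERIC(10, 2)`, which this diff does not confirm.

```python
import re


def map_sql_type_substring(sql_type: str) -> str:
    """Same substring checks as `_map_sql_type` in the diff."""
    s = sql_type.upper()
    if "INT" in s:
        return "int"
    if any(k in s for k in ("FLOAT", "NUMERIC", "DECIMAL", "REAL", "DOUBLE")):
        return "decimal"
    if "BOOL" in s:
        return "bool"
    if "TIMESTAMP" in s or "DATETIME" in s:
        return "datetime"
    if "DATE" in s:
        return "date"
    if "JSON" in s:
        return "json"
    return "string"


# Hypothetical alternative: only whole integer type names count as "int".
_INT_TYPE = re.compile(r"\b(?:TINY|SMALL|MEDIUM|BIG)?INT(?:EGER)?\d*\b")


def map_sql_type_tokens(sql_type: str) -> str:
    s = sql_type.upper()
    if _INT_TYPE.search(s):
        return "int"
    if "INT" in s:
        # Contains "INT" without being an integer type (INTERVAL, POINT, ...).
        return "string"
    return map_sql_type_substring(s)


for type_name in ("BIGINT", "INTERVAL", "POINT", "NUMERIC(10, 2)"):
    print(type_name, map_sql_type_substring(type_name), map_sql_type_tokens(type_name))
```

Whether this matters depends on which column types the connected databases actually expose; the fallback to `"string"` keeps such columns addressable either way.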
src/catalog/models.py CHANGED
@@ -1,6 +1,25 @@
1
  """Pydantic models for the per-user data catalog (Cs + Ct).
2
 
3
  See ARCHITECTURE.md §6 for the full schema definition.
4
  """
5
 
6
  from datetime import datetime
@@ -8,7 +27,6 @@ from typing import Any, Literal
8
 
9
  from pydantic import BaseModel, Field
10
 
11
-
12
  SourceType = Literal["schema", "tabular", "unstructured"]
13
  DataType = Literal["int", "decimal", "string", "datetime", "date", "bool", "json"]
14
 
 
1
  """Pydantic models for the per-user data catalog (Cs + Ct).
2
 
3
  See ARCHITECTURE.md §6 for the full schema definition.
4
+
5
+ Source.location_ref URI scheme
6
+ ------------------------------
7
+ A `Source` is uniquely addressable by `location_ref`; introspectors and
8
+ executors parse it to find the underlying data:
9
+
10
+ schema sources → "dbclient://{database_client_id}"
11
+ Resolves via `database_client_service.get(...)` which
12
+ returns a `DatabaseClient` row whose Fernet-encrypted
13
+ credentials are decrypted at runtime.
14
+
15
+ tabular sources → "az_blob://{user_id}/{document_id}"
16
+ The Source aggregates one or more sheets as Tables;
17
+ each per-sheet Parquet blob is named via
18
+ `parquet_service.parquet_blob_name(user_id, document_id, sheet_name)`,
19
+ so executors derive the per-Table blob path from
20
+ `Source.location_ref` plus `Table.name`.
21
+
22
+ unstructured → reserved (deferred — see ARCHITECTURE.md §10 q2).
23
  """
24
 
25
  from datetime import datetime
 
27
 
28
  from pydantic import BaseModel, Field
29
 
 
30
  SourceType = Literal["schema", "tabular", "unstructured"]
31
  DataType = Literal["int", "decimal", "string", "datetime", "date", "bool", "json"]
32
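The `location_ref` scheme described in the docstring can be parsed with plain string operations. This is a minimal sketch under the assumption that identifiers contain no `/`; `DbClientRef`, `BlobRef` and `parse_location_ref` are hypothetical names that do not appear in the commit.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class DbClientRef:
    client_id: str


@dataclass(frozen=True)
class BlobRef:
    user_id: str
    document_id: str


def parse_location_ref(location_ref: str) -> DbClientRef | BlobRef:
    """Split a location_ref into its scheme-specific parts."""
    scheme, sep, rest = location_ref.partition("://")
    if not sep or not rest:
        raise ValueError(f"malformed location_ref: {location_ref!r}")
    if scheme == "dbclient":
        return DbClientRef(client_id=rest)
    if scheme == "az_blob":
        user_id, slash, document_id = rest.partition("/")
        if not slash or not user_id or not document_id:
            raise ValueError(f"expected az_blob://user_id/document_id, got {location_ref!r}")
        return BlobRef(user_id=user_id, document_id=document_id)
    raise ValueError(f"unsupported location_ref scheme: {scheme!r}")


print(parse_location_ref("dbclient://client-123"))
print(parse_location_ref("az_blob://user-1/doc-9"))
```

A shared parser of this kind would keep introspectors and executors from each re-implementing the prefix checks.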
 
src/catalog/pii_detector.py CHANGED
@@ -1,16 +1,39 @@
1
  """PII auto-detection for catalog columns.
2
 
3
  When pii_flag is set True, sample_values is forced to None so real PII
4
- never enters LLM prompts.
5
-
6
- Patterns live in src/security/pii_patterns.py.
7
  """
8
 
 
 
9
  from .models import Column
10
 
11
 
12
  class PIIDetector:
13
- Marks columns as pii_flag=True when name/values look sensitive."""
14
 
15
  def detect(self, column: Column) -> bool:
16
- raise NotImplementedError
1
  """PII auto-detection for catalog columns.
2
 
3
  When pii_flag is set True, sample_values is forced to None so real PII
4
+ never enters LLM prompts. Patterns live in src/security/pii_patterns.py.
 
 
5
  """
6
 
7
+ from src.security.pii_patterns import EMAIL_REGEX, PHONE_REGEX, PII_NAME_PATTERNS
8
+
9
  from .models import Column
10
 
11
 
12
  class PIIDetector:
13
+ """Marks columns as pii_flag=True when name or sampled values look sensitive.
14
+
15
+ Bias is intentional: false positives hide harmless sample values,
16
+ false negatives leak data. When unsure, flag.
17
+ """
18
 
19
  def detect(self, column: Column) -> bool:
20
+ if self._name_matches(column.name):
21
+ return True
22
+ if column.sample_values and self._values_match(column.sample_values):
23
+ return True
24
+ return False
25
+
26
+ @staticmethod
27
+ def _name_matches(name: str) -> bool:
28
+ lowered = name.lower()
29
+ return any(pat in lowered for pat in PII_NAME_PATTERNS)
30
+
31
+ @staticmethod
32
+ def _values_match(values: list) -> bool:
33
+ for v in values:
34
+ if v is None:
35
+ continue
36
+ s = str(v)
37
+ if EMAIL_REGEX.match(s) or PHONE_REGEX.match(s):
38
+ return True
39
+ return False
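Review note on `_values_match`: `re.Pattern.match` only tests at the start of the string, so a sample value with leading text is not flagged. The sketch below uses hypothetical stand-ins for `PII_NAME_PATTERNS` and `EMAIL_REGEX` (the real definitions live in `src/security/pii_patterns.py` and are not shown in this diff) to compare `match` with `search`.

```python
import re

# Hypothetical stand-ins for the real patterns in src/security/pii_patterns.py.
PII_NAME_PATTERNS = ("email", "phone", "ssn")
EMAIL_REGEX = re.compile(r"[^@\s]+@[^@\s]+\.[^@\s]+")


def name_matches(name: str) -> bool:
    """Substring check on the lowercased column name, as in the diff."""
    lowered = name.lower()
    return any(pat in lowered for pat in PII_NAME_PATTERNS)


def values_match(values: list, *, anywhere: bool = False) -> bool:
    """Return True if any sampled value looks like an email address.

    anywhere=False mirrors the diff (pattern.match, anchored at the start);
    anywhere=True uses pattern.search and also catches embedded addresses.
    """
    for v in values:
        if v is None:
            continue
        s = str(v)
        hit = EMAIL_REGEX.search(s) if anywhere else EMAIL_REGEX.match(s)
        if hit:
            return True
    return False


samples = ["contact: jane@example.com", None]
print(values_match(samples), values_match(samples, anywhere=True))
```

Given the stated bias ("when unsure, flag"), `search` may be the closer fit, provided the real regexes are not already written to tolerate leading text.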
src/catalog/reader.py CHANGED
@@ -4,20 +4,37 @@ For typical users (≤50 tables), returns the FULL catalog with no slicing.
4
  Catalog-level search is added later if catalog grows past the limit.
5
  """
6
 
 
7
  from typing import Literal
8
 
9
  from .models import Catalog
10
  from .store import CatalogStore
11
 
12
-
13
  SourceHint = Literal["chat", "unstructured", "structured"]
14
 
15
 
16
  class CatalogReader:
17
- Loads the user's catalog and filters by source_hint."""
18
 
19
  def __init__(self, store: CatalogStore) -> None:
20
  self._store = store
21
 
22
  async def read(self, user_id: str, source_hint: SourceHint) -> Catalog:
23
- raise NotImplementedError
4
  Catalog-level search is added later if catalog grows past the limit.
5
  """
6
 
7
+ from datetime import UTC, datetime
8
  from typing import Literal
9
 
10
  from .models import Catalog
11
  from .store import CatalogStore
12
 
 
13
  SourceHint = Literal["chat", "unstructured", "structured"]
14
 
15
 
16
  class CatalogReader:
17
+ """Loads the user's catalog and filters by source_hint.
18
+
19
+ On miss, returns an empty Catalog (never raises) — query path is
20
+ responsible for handling "no data registered yet" gracefully.
21
+ Returned Catalog is always a copy; the underlying stored catalog
22
+ is never mutated.
23
+ """
24
 
25
  def __init__(self, store: CatalogStore) -> None:
26
  self._store = store
27
 
28
  async def read(self, user_id: str, source_hint: SourceHint) -> Catalog:
29
+ catalog = await self._store.get(user_id)
30
+ if catalog is None:
31
+ return Catalog(user_id=user_id, generated_at=datetime.now(UTC))
32
+
33
+ if source_hint == "chat":
34
+ filtered: list = []
35
+ elif source_hint == "structured":
36
+ filtered = [s for s in catalog.sources if s.source_type in {"schema", "tabular"}]
37
+ else: # "unstructured"
38
+ filtered = [s for s in catalog.sources if s.source_type == "unstructured"]
39
+
40
+ return catalog.model_copy(update={"sources": filtered})
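The `source_hint` filtering in `CatalogReader.read` can be illustrated without Pydantic or a database. The sketch below uses small frozen dataclasses as stand-ins for `Catalog` and `Source` (only the fields needed here) and a hypothetical helper `filter_sources`; it is meant to show the filtering and copy-on-read behaviour, not the real models.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Source:
    source_id: str
    source_type: str  # "schema" | "tabular" | "unstructured"


@dataclass(frozen=True)
class Catalog:
    user_id: str
    sources: tuple = ()


def filter_sources(catalog: Catalog, source_hint: str) -> Catalog:
    """Return a copy of the catalog restricted to the hinted source types."""
    if source_hint == "chat":
        wanted = set()  # chat turns need no catalog context
    elif source_hint == "structured":
        wanted = {"schema", "tabular"}
    else:  # "unstructured"
        wanted = {"unstructured"}
    kept = tuple(s for s in catalog.sources if s.source_type in wanted)
    return replace(catalog, sources=kept)


full = Catalog(
    user_id="u1",
    sources=(
        Source("db1", "schema"),
        Source("sheet1", "tabular"),
        Source("pdf1", "unstructured"),
    ),
)
structured = filter_sources(full, "structured")
print([s.source_id for s in structured.sources])
```

As in the diff, the stored catalog is left untouched and the caller receives a filtered copy.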
src/catalog/store.py CHANGED
@@ -4,17 +4,63 @@ Storage shape: one row per user in a `catalogs` table with columns
4
  (user_id PK, data jsonb, schema_version, generated_at, updated_at).
5
  """
6

7
  from .models import Catalog
8
 
 
 
9
 
10
  class CatalogStore:
11
- Read/write catalogs keyed by user_id."""
12
 
13
  async def get(self, user_id: str) -> Catalog | None:
14
- raise NotImplementedError
15
 
16
  async def upsert(self, catalog: Catalog) -> None:
17
- raise NotImplementedError
18
 
19
  async def delete(self, user_id: str) -> None:
20
- raise NotImplementedError
4
  (user_id PK, data jsonb, schema_version, generated_at, updated_at).
5
  """
6
 
7
+ from sqlalchemy import delete, select
8
+ from sqlalchemy.dialects.postgresql import insert
9
+
10
+ from src.db.postgres.connection import AsyncSessionLocal
11
+ from src.db.postgres.models import Catalog as CatalogRow
12
+ from src.middlewares.logging import get_logger
13
+
14
  from .models import Catalog
15
 
16
+ logger = get_logger("catalog_store")
17
+
18
 
19
  class CatalogStore:
20
+ """Read/write catalogs keyed by user_id.
21
+
22
+ Each method opens its own AsyncSession. Callers needing transactional
23
+ coordination across multiple stores can be refactored to accept an
24
+ explicit AsyncSession in a later PR.
25
+ """
26
 
27
  async def get(self, user_id: str) -> Catalog | None:
28
+ async with AsyncSessionLocal() as session:
29
+ result = await session.execute(
30
+ select(CatalogRow.data).where(CatalogRow.user_id == user_id)
31
+ )
32
+ row = result.scalar_one_or_none()
33
+ if row is None:
34
+ return None
35
+ return Catalog.model_validate(row)
36
 
37
  async def upsert(self, catalog: Catalog) -> None:
38
+ payload = catalog.model_dump(mode="json")
39
+ async with AsyncSessionLocal() as session:
40
+ stmt = insert(CatalogRow).values(
41
+ user_id=catalog.user_id,
42
+ data=payload,
43
+ schema_version=catalog.schema_version,
44
+ generated_at=catalog.generated_at,
45
+ )
46
+ stmt = stmt.on_conflict_do_update(
47
+ index_elements=[CatalogRow.user_id],
48
+ set_={
49
+ "data": stmt.excluded.data,
50
+ "schema_version": stmt.excluded.schema_version,
51
+ "generated_at": stmt.excluded.generated_at,
52
+ },
53
+ )
54
+ await session.execute(stmt)
55
+ await session.commit()
56
+ logger.info(
57
+ "catalog upserted",
58
+ user_id=catalog.user_id,
59
+ sources=len(catalog.sources),
60
+ )
61
 
62
  async def delete(self, user_id: str) -> None:
63
+ async with AsyncSessionLocal() as session:
64
+ await session.execute(delete(CatalogRow).where(CatalogRow.user_id == user_id))
65
+ await session.commit()
66
+ logger.info("catalog deleted", user_id=user_id)
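Review note on `upsert`: the `set_` dictionary lists `data`, `schema_version` and `generated_at`, but not `updated_at`. SQLAlchemy's documentation for `on_conflict_do_update` states that `Column.onupdate` values are not applied to an ON CONFLICT update unless they are given explicitly in `set_`, so `updated_at` may stay unset after an upsert. The sketch below shows the same effect with the standard-library `sqlite3` module as a stand-in for Postgres (SQLite 3.24 or newer is assumed for `ON CONFLICT ... DO UPDATE`); the table layout and helper names are illustrative only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE catalogs ("
    " user_id TEXT PRIMARY KEY,"
    " data TEXT NOT NULL,"
    " updated_at TEXT)"
)


def upsert(user_id: str, data: str, now: str, *, touch_updated_at: bool) -> None:
    """Insert or update one row; only listed columns change on conflict."""
    assignments = ["data = excluded.data"]
    if touch_updated_at:
        assignments.append("updated_at = excluded.updated_at")
    conn.execute(
        "INSERT INTO catalogs (user_id, data, updated_at) VALUES (?, ?, ?) "
        "ON CONFLICT(user_id) DO UPDATE SET " + ", ".join(assignments),
        (user_id, data, now),
    )


def row(user_id: str):
    cur = conn.execute(
        "SELECT data, updated_at FROM catalogs WHERE user_id = ?", (user_id,)
    )
    return cur.fetchone()


upsert("u1", "v1", "t0", touch_updated_at=False)  # plain insert
upsert("u1", "v2", "t1", touch_updated_at=False)  # conflict: data changes only
stale = row("u1")
upsert("u1", "v3", "t2", touch_updated_at=True)   # conflict: timestamp refreshed
fresh = row("u1")
print(stale, fresh)
```

In the SQLAlchemy version, the equivalent change would be an extra `updated_at` entry in `set_`, for example one based on `func.now()`.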
src/catalog/validator.py CHANGED
@@ -21,4 +21,29 @@ class CatalogValidator:
21
  """
22
 
23
  def validate(self, catalog: Catalog) -> None:
24
- raise NotImplementedError
21
  """
22
 
23
  def validate(self, catalog: Catalog) -> None:
24
+ seen_sources: set[str] = set()
25
+ for source in catalog.sources:
26
+ if source.source_id in seen_sources:
27
+ raise CatalogValidationError(
28
+ f"duplicate source_id {source.source_id!r} in catalog "
29
+ f"for user_id={catalog.user_id!r}"
30
+ )
31
+ seen_sources.add(source.source_id)
32
+
33
+ seen_tables: set[str] = set()
34
+ for table in source.tables:
35
+ if table.table_id in seen_tables:
36
+ raise CatalogValidationError(
37
+ f"duplicate table_id {table.table_id!r} in source "
38
+ f"{source.source_id!r}"
39
+ )
40
+ seen_tables.add(table.table_id)
41
+
42
+ seen_columns: set[str] = set()
43
+ for column in table.columns:
44
+ if column.column_id in seen_columns:
45
+ raise CatalogValidationError(
46
+ f"duplicate column_id {column.column_id!r} in table "
47
+ f"{table.table_id!r} (source {source.source_id!r})"
48
+ )
49
+ seen_columns.add(column.column_id)
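The uniqueness rules enforced by `CatalogValidator.validate` are scoped: `source_id` per catalog, `table_id` per source, `column_id` per table. The sketch below restates that with plain dictionaries instead of the Pydantic models; `_check_unique` is a hypothetical helper, and unlike the diff it checks all source IDs before descending into tables.

```python
class CatalogValidationError(Exception):
    """Raised when an ID is repeated within its scope."""


def _check_unique(ids, what: str, scope: str) -> None:
    seen = set()
    for item_id in ids:
        if item_id in seen:
            raise CatalogValidationError(f"duplicate {what} {item_id!r} in {scope}")
        seen.add(item_id)


def validate(catalog: dict) -> None:
    sources = catalog["sources"]
    _check_unique((s["source_id"] for s in sources), "source_id", "catalog")
    for s in sources:
        scope = f"source {s['source_id']!r}"
        _check_unique((t["table_id"] for t in s["tables"]), "table_id", scope)
        for t in s["tables"]:
            _check_unique(
                (c["column_id"] for c in t["columns"]),
                "column_id",
                f"table {t['table_id']!r} ({scope})",
            )


ok_catalog = {
    "sources": [
        {
            "source_id": "db1",
            "tables": [
                {"table_id": "t_a", "columns": [{"column_id": "c_1"}]},
                # The same column_id in a different table is allowed.
                {"table_id": "t_b", "columns": [{"column_id": "c_1"}]},
            ],
        }
    ]
}
bad_catalog = {
    "sources": [
        {
            "source_id": "db1",
            "tables": [
                {"table_id": "t_a", "columns": [{"column_id": "c_1"}, {"column_id": "c_1"}]},
            ],
        }
    ]
}
validate(ok_catalog)  # passes silently
```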
src/db/postgres/init_db.py CHANGED
@@ -3,6 +3,7 @@
3
  from sqlalchemy import text
4
  from src.db.postgres.connection import engine, Base
5
  from src.db.postgres.models import (
 
6
  ChatMessage,
7
  DatabaseClient,
8
  Document,
 
3
  from sqlalchemy import text
4
  from src.db.postgres.connection import engine, Base
5
  from src.db.postgres.models import (
6
+ Catalog,
7
  ChatMessage,
8
  DatabaseClient,
9
  Document,
src/db/postgres/models.py CHANGED
@@ -96,4 +96,20 @@ class DatabaseClient(Base):
96
  status = Column(String, nullable=False, default="active") # active | inactive
97
  created_at = Column(DateTime(timezone=True), server_default=func.now())
98
  updated_at = Column(DateTime(timezone=True), onupdate=func.now())
99
 
 
96
  status = Column(String, nullable=False, default="active") # active | inactive
97
  created_at = Column(DateTime(timezone=True), server_default=func.now())
98
  updated_at = Column(DateTime(timezone=True), onupdate=func.now())
99
+
100
+
101
+ class Catalog(Base):
102
+ """Per-user data catalog stored as a single jsonb row.
103
+
104
+ `data` holds the full Pydantic Catalog (src/catalog/models.py:Catalog)
105
+ serialized via `model_dump(mode="json")`. Read path uses
106
+ `Catalog.model_validate(...)` to rehydrate.
107
+ """
108
+ __tablename__ = "catalogs"
109
+
110
+ user_id = Column(String, primary_key=True)
111
+ data = Column(JSONB, nullable=False)
112
+ schema_version = Column(String, nullable=False, default="1.0")
113
+ generated_at = Column(DateTime(timezone=True), server_default=func.now())
114
+ updated_at = Column(DateTime(timezone=True), onupdate=func.now())
115
 
src/query/ir/models.py CHANGED
@@ -10,7 +10,6 @@ from typing import Any, Literal
10
 
11
  from pydantic import BaseModel, Field
12
 
13
-
14
  FilterOp = Literal[
15
  "=", "!=", "<", "<=", ">", ">=",
16
  "in", "not_in", "is_null", "is_not_null",
 
10
 
11
  from pydantic import BaseModel, Field
12
 
 
13
  FilterOp = Literal[
14
  "=", "!=", "<", "<=", ">", ">=",
15
  "in", "not_in", "is_null", "is_not_null",
src/query/ir/operators.py CHANGED
@@ -12,6 +12,16 @@ ALLOWED_AGG_FNS = frozenset({
12
 
13
  LIMIT_HARD_CAP = 10_000
14
 
15
- # Type compatibility: which value_types are valid for each column data_type.
16
- # To be filled with the explicit matrix when validator.py is implemented.
17
- TYPE_COMPATIBILITY: dict[str, frozenset[str]] = {}
12
 
13
  LIMIT_HARD_CAP = 10_000
14
 
15
+ # Type compatibility: which value_types may appear in a FilterClause when the
16
+ # referenced column has the given data_type. Numeric types are mutually
17
+ # compatible (decimal literal against int column is fine). Date/datetime accept
18
+ # string so the planner can emit ISO-8601 literals without mode juggling.
19
+ TYPE_COMPATIBILITY: dict[str, frozenset[str]] = {
20
+ "int": frozenset({"int", "decimal"}),
21
+ "decimal": frozenset({"int", "decimal"}),
22
+ "string": frozenset({"string"}),
23
+ "datetime": frozenset({"datetime", "date", "string"}),
24
+ "date": frozenset({"date", "datetime", "string"}),
25
+ "bool": frozenset({"bool"}),
26
+ "json": frozenset({"string"}),
27
+ }
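To make the matrix concrete, the sketch below copies `TYPE_COMPATIBILITY` from the diff and wraps it in a hypothetical helper, `filter_value_ok`, that applies the same lookup the IR validator performs, including the exemption for `is_null` / `is_not_null`, which carry no value.

```python
TYPE_COMPATIBILITY: dict[str, frozenset[str]] = {
    "int": frozenset({"int", "decimal"}),
    "decimal": frozenset({"int", "decimal"}),
    "string": frozenset({"string"}),
    "datetime": frozenset({"datetime", "date", "string"}),
    "date": frozenset({"date", "datetime", "string"}),
    "bool": frozenset({"bool"}),
    "json": frozenset({"string"}),
}

NULLARY_FILTER_OPS = frozenset({"is_null", "is_not_null"})


def filter_value_ok(column_data_type: str, op: str, value_type: str) -> bool:
    """True if a filter literal of value_type may be compared with the column."""
    if op in NULLARY_FILTER_OPS:
        return True  # no literal to type-check
    # Unknown column types allow nothing, matching the validator's default.
    return value_type in TYPE_COMPATIBILITY.get(column_data_type, frozenset())


print(filter_value_ok("int", ">", "decimal"))    # numeric types are interchangeable
print(filter_value_ok("date", ">=", "string"))   # ISO-8601 literal against a date column
print(filter_value_ok("string", "=", "int"))     # rejected
```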
src/query/ir/validator.py CHANGED
@@ -1,11 +1,20 @@
1
  """IRValidator — checks a QueryIR against a user's catalog.
2
 
3
- See ARCHITECTURE.md §7 for the validation rules.
4
- On failure, the planner is re-prompted with the error context (max 3 retries).
 
5
  """
6
 
7
- from ...catalog.models import Catalog
8
  from .models import QueryIR
9
 
10
 
11
  class IRValidationError(Exception):
@@ -20,9 +29,101 @@ class IRValidator:
20
  - table_id belongs to that source
21
  - every column_id exists in that table
22
  - every agg.fn and filter.op is whitelisted (see operators.py)
23
- - value_type consistent with column.data_type
24
- - limit positive int, ≤ hard cap
25
  """
26
 
27
  def validate(self, ir: QueryIR, catalog: Catalog) -> None:
28
- raise NotImplementedError
1
  """IRValidator — checks a QueryIR against a user's catalog.
2
 
3
+ See ARCHITECTURE.md §7 for the validation rules. On failure, the planner
4
+ is re-prompted with the error context (max 3 retries) — error messages
5
+ must therefore be specific enough that the LLM can self-correct.
6
  """
7
 
8
+ from ...catalog.models import Catalog, Column, Source, Table
9
  from .models import QueryIR
10
+ from .operators import (
11
+ ALLOWED_AGG_FNS,
12
+ ALLOWED_FILTER_OPS,
13
+ LIMIT_HARD_CAP,
14
+ TYPE_COMPATIBILITY,
15
+ )
16
+
17
+ _NULLARY_FILTER_OPS = frozenset({"is_null", "is_not_null"})
18
 
19
 
20
  class IRValidationError(Exception):
 
29
  - table_id belongs to that source
30
  - every column_id exists in that table
31
  - every agg.fn and filter.op is whitelisted (see operators.py)
32
+ - value_type consistent with column.data_type (TYPE_COMPATIBILITY)
33
+ - limit positive int, ≤ LIMIT_HARD_CAP
34
  """
35
 
36
  def validate(self, ir: QueryIR, catalog: Catalog) -> None:
37
+ source = self._find_source(catalog, ir.source_id)
38
+ table = self._find_table(source, ir.table_id)
39
+ columns_by_id: dict[str, Column] = {c.column_id: c for c in table.columns}
40
+
41
+ select_aliases: set[str] = set()
42
+ for i, item in enumerate(ir.select):
43
+ where = f"select[{i}]"
44
+ if item.kind == "column":
45
+ self._require_column(columns_by_id, item.column_id, where)
46
+ else: # "agg"
47
+ if item.fn not in ALLOWED_AGG_FNS:
48
+ raise IRValidationError(
49
+ f"{where}.fn: must be in {sorted(ALLOWED_AGG_FNS)}, "
50
+ f"got {item.fn!r}"
51
+ )
52
+ if item.column_id is not None:
53
+ self._require_column(columns_by_id, item.column_id, where)
54
+ elif item.fn != "count":
55
+ raise IRValidationError(
56
+ f"{where}.fn={item.fn!r} requires a column_id "
57
+ "(only 'count' may omit it for COUNT(*))"
58
+ )
59
+ if item.alias:
60
+ select_aliases.add(item.alias)
61
+
62
+ for i, f in enumerate(ir.filters):
63
+ where = f"filters[{i}]"
64
+ col = self._require_column(columns_by_id, f.column_id, where)
65
+ if f.op not in ALLOWED_FILTER_OPS:
66
+ raise IRValidationError(
67
+ f"{where}.op: must be in {sorted(ALLOWED_FILTER_OPS)}, "
68
+ f"got {f.op!r}"
69
+ )
70
+ if f.op not in _NULLARY_FILTER_OPS:
71
+ allowed = TYPE_COMPATIBILITY.get(col.data_type, frozenset())
72
+ if f.value_type not in allowed:
73
+ raise IRValidationError(
74
+ f"{where}: value_type {f.value_type!r} incompatible with "
75
+ f"column.data_type {col.data_type!r} "
76
+ f"(allowed: {sorted(allowed)})"
77
+ )
78
+
79
+ for i, col_id in enumerate(ir.group_by):
80
+ self._require_column(columns_by_id, col_id, f"group_by[{i}]")
81
+
82
+ for i, ob in enumerate(ir.order_by):
83
+ if ob.column_id not in columns_by_id and ob.column_id not in select_aliases:
84
+ raise IRValidationError(
85
+ f"order_by[{i}].column_id: {ob.column_id!r} not found in table "
86
+ f"{ir.table_id!r} columns or select aliases "
87
+ f"(known columns: {sorted(columns_by_id.keys())}, "
88
+ f"aliases: {sorted(select_aliases)})"
89
+ )
90
+
91
+ if ir.limit is not None:
92
+ if ir.limit <= 0:
93
+ raise IRValidationError(f"limit must be positive, got {ir.limit}")
94
+ if ir.limit > LIMIT_HARD_CAP:
95
+ raise IRValidationError(
96
+ f"limit {ir.limit} exceeds hard cap {LIMIT_HARD_CAP}"
97
+ )
98
+
99
+ @staticmethod
100
+ def _find_source(catalog: Catalog, source_id: str) -> Source:
101
+ for s in catalog.sources:
102
+ if s.source_id == source_id:
103
+ return s
104
+ raise IRValidationError(
105
+ f"source_id {source_id!r} not in catalog "
106
+ f"(known: {[s.source_id for s in catalog.sources]})"
107
+ )
108
+
109
+ @staticmethod
110
+ def _find_table(source: Source, table_id: str) -> Table:
111
+ for t in source.tables:
112
+ if t.table_id == table_id:
113
+ return t
114
+ raise IRValidationError(
115
+ f"table_id {table_id!r} not in source {source.source_id!r} "
116
+ f"(known: {[t.table_id for t in source.tables]})"
117
+ )
118
+
119
+ @staticmethod
120
+ def _require_column(
121
+ columns_by_id: dict[str, Column], col_id: str, where: str
122
+ ) -> Column:
123
+ col = columns_by_id.get(col_id)
124
+ if col is None:
125
+ raise IRValidationError(
126
+ f"{where}.column_id: {col_id!r} not in table "
127
+ f"(known: {sorted(columns_by_id.keys())})"
128
+ )
129
+ return col
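The module docstring says a failed validation re-prompts the planner with the error context, with at most 3 retries. That loop is not part of this diff; the sketch below shows one way it could look. `plan_with_retries`, the `plan` and `validate` callables, and the reading of "max 3 retries" as one initial attempt plus three retries are all assumptions.

```python
from __future__ import annotations

from typing import Callable


class IRValidationError(Exception):
    """Stand-in for the exception raised by IRValidator."""


MAX_RETRIES = 3  # assumed meaning of "max 3 retries" in the docstring


def plan_with_retries(
    plan: Callable[[str, str | None], dict],
    validate: Callable[[dict], None],
    question: str,
) -> dict:
    """Ask the planner for an IR, feeding validation errors back on failure."""
    error_context: str | None = None
    for _ in range(1 + MAX_RETRIES):
        ir = plan(question, error_context)
        try:
            validate(ir)
        except IRValidationError as exc:
            error_context = str(exc)  # specific message lets the planner self-correct
            continue
        return ir
    raise IRValidationError(
        f"planner failed after {MAX_RETRIES} retries: {error_context}"
    )


# Fake planner and validator, for illustration only.
calls: list = []


def fake_plan(question: str, error_context: str | None) -> dict:
    calls.append(error_context)
    return {"limit": 0} if error_context is None else {"limit": 10}


def fake_validate(ir: dict) -> None:
    if ir["limit"] <= 0:
        raise IRValidationError(f"limit must be positive, got {ir['limit']}")


result = plan_with_retries(fake_plan, fake_validate, "top customers?")
print(result, calls)
```

This is also why the validator's messages list the known IDs and allowed values: the string is the only feedback the planner receives.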