Backend 服务技术参考¶
概述
TTD Backend 是 Agentic BI 系统的运行时核心,采用 LangGraph Supervisor + Sub-agents 编排模式, 提供 Chat API、智能查询编排、安全防护与多模型 AI 管道。
- 应用入口:
backend/app/main.py - 当前版本:4.0.0
- 包名:
ttd-backend(uv workspace member)
1. 技术基线¶
| 组件 | 版本/规格 | 用途 |
|---|---|---|
| Python | 3.12+ | 运行时语言 |
| FastAPI | 0.115+ | ASGI Web 框架 |
| Uvicorn | latest | ASGI 服务器 |
| LangGraph | ≥ 0.4.0 | 状态图编排、检查点恢复 |
| LangChain | latest | 多模型集成抽象层 |
| PostgreSQL (Aurora) | 15+ | 会话、聊天、配置、语义层存储 |
| psycopg 3 | latest | 异步 PostgreSQL 驱动(raw AI pipeline) |
| SQLAlchemy 2.0 | latest | ORM(Admin 资源管理) |
| Pydantic v2 | latest | 数据校验 + Settings |
| PyJWT + cryptography | latest | JWT 解码 + JWKS 验签 (RS256) |
| structlog | latest | 结构化 JSON 日志 |
| slowapi | latest | 请求限流 |
多模型配置¶
| 模型 | Provider | 适用任务 |
|---|---|---|
deepseek-v3.2 |
DashScope (OpenAI-compatible) | Supervisor, Query Understanding, Planner, Follow-up |
qwen3-max |
DashScope (OpenAI-compatible) | Insights, Visualization, SQL Explanation, Knowledge QA |
qwen3-coder-plus |
DashScope (OpenAI-compatible) | SQL Generation, SQL Repair |
claude-4.5-sonnet |
Anthropic | 可选高质量推理(Admin 配置) |
Embedding 模型¶
| 模型 | Provider | 维度 |
|---|---|---|
text-embedding-v4 |
DashScope | 1024 dim |
2. 应用启动与生命周期¶
应用使用 FastAPI 的 lifespan 上下文管理器进行资源初始化和清理。
启动序列¶
sequenceDiagram
participant App as FastAPI App
participant PG as PostgreSQL Pool
participant IR as Instance Registry
participant GS as Graceful Shutdown
participant MR as Model Registry
participant CP as LangGraph Checkpointer
participant MS as Memory Store
participant MC as Metadata Cache
participant SG as Supervisor Graph
participant SM as Session Manager
App->>App: _configure_logging()
App->>PG: AsyncConnectionPool(aurora_dsn, min=5, max=20)
PG-->>App: pool ready
opt Swarm Mode (TTD_SWARM_MODE=true)
App->>IR: InstanceRegistry(pool, instance_id, hostname)
IR->>PG: CREATE TABLE ttd.agent_instances
IR->>PG: INSERT/UPSERT instance registration
IR->>IR: start_heartbeat(interval=10s)
App->>GS: GracefulShutdown(registry, drain_timeout=120s)
GS->>GS: install_signal_handlers(SIGTERM, SIGINT)
end
App->>MR: ModelRegistry() + load_from_db()
MR-->>App: models loaded
App->>CP: AsyncPostgresSaver.from_conn_string()
CP->>CP: setup() (create checkpoint tables)
CP-->>App: checkpointer ready
App->>MS: AsyncPostgresStore.from_conn_string()
MS-->>App: store ready
App->>MC: MetadataCache(pg_pool)
MC-->>App: cache ready
App->>SG: build_supervisor_graph(checkpointer, store, registry, pool)
SG-->>App: compiled graph
App->>SM: SessionManager(pg_pool)
SM-->>App: manager ready
App->>App: create_task(_cleanup_loop)
Note over App: 🟢 "TTD Chat BI started (instance=xxx, swarm=true)"
关闭序列¶
- Graceful Drain (Swarm mode): 停止接受新请求,等待 in-flight 完成(max 120s)
- 停止 heartbeat + 从
ttd.agent_instances标记为shutdown - 取消后台清理任务
- 关闭 LangGraph Memory Store 连接
- 关闭 LangGraph Checkpointer 连接
- 关闭 PostgreSQL 连接池
- 释放 SQLAlchemy Engine
Middleware 栈¶
| 中间件 | 功能 |
|---|---|
| CORSMiddleware | 跨域请求支持(生产环境收紧到 BETTER_AUTH_URL,拒绝 * + credentials) |
| Rate Limiter (slowapi) | 按 IP 限流,默认 60 req/min/user |
| RateLimitExceeded Handler | 429 响应处理 |
| Prometheus Metrics | /metrics 端点(可选,TTD_PROMETHEUS_ENABLED) |
Router 注册¶
所有 API 路由挂载在 /api/v1 前缀下:
3. API 端点完整列表¶
3.1 Chat BI API(Primary)¶
推荐接口
新客户端应使用 Chat API,它提供完整的对话管理、流式响应和反馈机制。
| 方法 | 路径 | 说明 |
|---|---|---|
GET |
/api/v1/chats |
列出当前用户所有对话 |
POST |
/api/v1/chats |
创建新对话 |
GET |
/api/v1/chats/{id} |
获取对话详情(含消息历史) |
PATCH |
/api/v1/chats/{id} |
更新对话(标题/状态) |
DELETE |
/api/v1/chats/{id} |
删除对话 |
POST |
/api/v1/chats/{id}/messages |
发送消息(非流式响应) |
POST |
/api/v1/chats/{id}/messages/stream |
发送消息(SSE 流式响应) |
POST |
/api/v1/messages/{id}/feedback |
提交消息反馈 |
消息请求示例¶
{
"question": "本月 GMV 同比增长多少?",
"language": "zh",
"options": {
"enable_visualization": true,
"enable_insights": true,
"enable_evaluators": false
}
}
SSE 流式事件格式¶
event: status
data: {"step": "query_understanding", "message": "正在理解您的问题..."}
event: status
data: {"step": "sql_generation", "message": "正在生成查询..."}
event: result
data: {"message_id": "...", "status": "success", ...}
3.2 Legacy API(向后兼容)¶
已废弃
Legacy API 保留用于旧客户端兼容,新开发请使用 Chat API。
| 方法 | 路径 | 说明 |
|---|---|---|
POST |
/api/v1/query |
单次查询(同步) |
GET |
/api/v1/query/{session_id}/stream |
Legacy SSE 流式 |
POST |
/api/v1/sql_execute |
直接 SQL 执行 |
GET |
/api/v1/sessions |
列出会话 |
POST |
/api/v1/feedback |
提交反馈 |
3.3 Admin API¶
| 分组 | 路径前缀 | 说明 |
|---|---|---|
| LLM Models | /api/v1/admin/llms/* |
LLM 模型 CRUD |
| Embedding Models | /api/v1/admin/embedding-models/* |
Embedding 模型管理 |
| Reranker Models | /api/v1/admin/reranker-models/* |
Reranker 模型管理 |
| Knowledge Bases | /api/v1/admin/knowledge-bases/* |
知识库 CRUD |
| Data Sources | /api/v1/admin/datasources/* |
数据源管理 |
| Documents | /api/v1/admin/documents/* |
文档管理 |
| Chunks | /api/v1/admin/chunks/* |
Chunk 浏览 |
| Graph | /api/v1/admin/graph/* |
KG Entity/Relationship |
| Chat Engines | /api/v1/admin/chat-engines/* |
聊天引擎配置 |
| Evaluations | /api/v1/admin/evaluations/* |
评估任务管理 |
| Feedbacks | /api/v1/admin/feedbacks/* |
反馈审核 |
| Uploads | /api/v1/admin/uploads/* |
文件上传 |
| API Keys | /api/v1/admin/api-keys/* |
API Key 管理 |
| Site Settings | /api/v1/admin/site-settings/* |
全局配置 |
| Stats | /api/v1/admin/stats/* |
系统统计 |
| Semantic Plane | /api/v1/admin/semantic-plane/* |
语义层资产浏览 |
| Few-Shot Review | /api/v1/admin/few-shots/* |
动态 Few-Shot 质量审核(审批、驳回、YAML 导出) |
3.4 Utility API¶
| 方法 | 路径 | 说明 |
|---|---|---|
GET |
/api/v1/health |
健康检查 |
GET |
/metrics |
Prometheus 指标(可选) |
4. Agent 编排拓扑¶
4.1 整体架构¶
graph TD
START([START]) --> supervisor
supervisor --> query_understanding_agent
query_understanding_agent --> router
router -->|kpi_lookup| data_retrieval_agent
router -->|nl2sql_query| data_retrieval_agent
router -->|deep_analysis_workflow| planner
router -->|graph_reasoning| graph_rag_agent
router -->|business_knowledge_qa| data_retrieval_agent_kb
router -->|clarification_required| data_retrieval_agent_cl
subgraph "SQL Pipeline"
data_retrieval_agent --> reranker
reranker --> sql_generator
sql_generator --> guardrails_agent
guardrails_agent -->|execute| sql_executor
guardrails_agent -->|corrective_repair| corrective_retrieval
guardrails_agent -->|repair| sql_generator
guardrails_agent -->|reject| END_REJECT([END])
corrective_retrieval --> sql_generator
sql_executor --> result_processor
result_processor --> sql_explainer
end
subgraph "Post-processing"
sql_explainer -->|visualize| visualization_agent
visualization_agent --> insights_agent
sql_explainer -->|evaluate| retrieval_evaluator
retrieval_evaluator --> sql_evaluator
sql_evaluator --> visualization_agent
end
subgraph "Deep Analysis"
planner --> data_retrieval_agent_deep
data_retrieval_agent_deep --> reranker_deep[reranker]
reranker_deep --> sql_generator_deep[sql_generator]
sql_generator_deep --> guardrails_deep[guardrails]
end
insights_agent --> answer_evaluator
answer_evaluator --> END_OK([END])
insights_agent --> END_OK2([END])
graph_rag_agent --> END_GRAPH([END])
data_retrieval_agent_kb --> knowledge_qa_agent
knowledge_qa_agent --> END_KB([END])
data_retrieval_agent_cl --> follow_up_agent
follow_up_agent --> END_CL([END])
4.2 路由决策详解¶
| Route Decision | 触发意图 | 执行路径 |
|---|---|---|
kpi_lookup |
kpi_lookup | data_retrieval → reranker → sql_generator → guardrails → sql_executor → result_processor → sql_explainer |
nl2sql_query |
trend_analysis, comparison, ranking, detail, clarification_reply | data_retrieval → reranker → schema_explorer → sql_generator → guardrails → ... → visualization → insights |
deep_analysis_workflow |
root_cause, report_generation | planner → data_retrieval → reranker → ... → retrieval_evaluator → sql_evaluator → ... → answer_evaluator |
analytical_workflow |
诊断/预测/分群/归因 | ML toolkit + Code Interpreter pipeline |
graph_reasoning |
relationship, graph_exploration | graph_rag_agent |
business_knowledge_qa |
policy_qa | data_retrieval → knowledge_qa_agent |
clarification_required |
ambiguous | data_retrieval → follow_up_agent |
路由升级规则
当 analytical_depth == "complex" 时,nl2sql_query 自动升级为 deep_analysis_workflow。
SQL Cache 快速路径
Router 在 embedding 命中缓存时,直接跳转到 guardrails_agent,跳过检索和生成步骤。
4.3 条件路由函数¶
| 函数 | 位置 | 可能返回值 |
|---|---|---|
_route_after_retrieval |
data_retrieval 后 | rerank, knowledge_qa, no_context, clarify |
_route_after_explorer |
schema_explorer 后 | resolved, no_context |
_route_after_validation |
guardrails 后 | execute, corrective_repair, repair, reject |
_route_after_result |
sql_explainer 后 | visualize, evaluate, insight, done |
_route_after_insights |
insights 后 | evaluate, done |
_route_after_sql_eval |
sql_evaluator 后 | visualize, done |
4.4 全局状态定义 (AgenticBIState)¶
AgenticBIState 是一个 TypedDict(total=False),所有字段可选:
| 字段 | 类型 | 说明 |
|---|---|---|
question |
str |
用户原始问题 |
language |
str |
语言标识 (zh/en) |
user_context |
dict |
用户上下文(roles, domains) |
session_id |
str |
会话 ID |
options |
dict |
管道选项开关 |
| 字段 | 类型 | 说明 |
|---|---|---|
routing_plan |
list[str] |
执行路径节点列表 |
current_agent |
str \| None |
当前执行节点 |
route_decision |
Literal[...] |
路由决策结果 |
execution_plan |
list[dict] |
Planner 输出的执行计划 |
plan_step_index |
int |
当前执行步骤索引 |
| 字段 | 类型 | 说明 |
|---|---|---|
short_term_memory |
list[dict] |
Working memory: 最近 N 轮对话 (LangGraph Checkpointer) |
long_term_context |
dict \| None |
Legacy 兼容字段(向 memory_context 迁移中) |
memory_context |
dict \| None |
四层记忆上下文: profile, episodic, correction (由 MemoryFacade 填充) |
| 字段 | 类型 | 说明 |
|---|---|---|
query_understanding |
dict |
QU 结构化输出 |
rewritten_question |
str |
重写后的问题 |
retrieval_results |
dict |
R/V/G 检索结果 |
graph_context |
dict |
图谱上下文 |
generated_sql |
str |
生成的 SQL |
sql_dialect |
str |
SQL 方言 |
sql_explanation |
str |
SQL 解释 |
execution_result |
dict |
执行结果 |
visualization |
dict |
可视化决策包(含 chart_type, echarts_option, template_ref, candidates, confidence, checkpoint_status, quality) |
visualization_candidates |
list[CandidateInfo] |
Top-k 候选图表列表(含模板 ID、分数、原因、限制) |
visualization_confidence |
float |
推荐置信度(0-1),低于 0.5 触发 HITL |
visualization_checkpoint_status |
CheckpointStatus |
auto_pass / needs_review / blocked |
visualization_quality |
dict |
质量评估结果(passed, suitability_score, issues) |
visualization_feedback |
dict |
用户反馈元数据(用于闭环学习) |
insights |
list[str] |
数据洞察 |
sql_business_context |
dict |
SQL 业务上下文 |
knowledge_answer |
str |
知识问答回答 |
| 字段 | 类型 | 说明 |
|---|---|---|
retrieval_quality |
dict |
检索质量评分 |
sql_quality |
dict |
SQL 质量评分 |
answer_quality |
dict |
回答质量评分 |
| 字段 | 类型 | 说明 |
|---|---|---|
tenant_id |
str |
租户 ID |
governance_overlays |
dict |
治理上下文覆盖 |
region |
str |
区域标识 |
industry_archetype |
str |
行业原型 |
| 字段 | 类型 | 说明 |
|---|---|---|
status |
Literal[...] |
管道终态 |
errors |
list[str] |
错误列表 |
sql_cache_hit |
bool |
是否命中缓存 |
repair_attempt |
int |
修复重试次数 |
clarification_message |
str |
澄清消息 |
clarification_options |
list[dict] |
澄清选项 |
5. Sub-Agent 详解¶
5.1 编排层¶
| Agent | 工厂函数 | 模型 | 说明 |
|---|---|---|---|
| Supervisor | make_supervisor_node(registry) |
embedding | 计算 question embedding,解析治理上下文,初始化 state |
| Query Understanding | make_query_understanding_node(registry) |
deepseek-v3.2 |
结构化分析意图、实体、指标、维度、时间/地域约束 |
| Router | make_router_node(sql_cache) |
— (规则) | 基于 QU 输出映射路由决策,检查 SQL Cache 快速路径 |
| Planner | make_planner_node(registry) |
deepseek-v3.2 |
复杂分析场景的多步执行计划生成 |
5.2 检索与增强¶
| Agent | 工厂函数 | 模型 | 说明 |
|---|---|---|---|
| Data Retrieval | make_data_retrieval_node(retriever) |
embedding | R/V/G 三引擎检索;从 QU 输出提取 search_terms 传入 Engine 1 |
| Reranker | make_reranker_node(registry) |
规则引擎 | 术语绑定加分/降权 + 对检索结果重排序 |
| Corrective Retrieval | make_corrective_retrieval_node(retriever) |
规则引擎 | SQL 失败后的补偿检索,优先按术语绑定路由修复 |
| Schema Explorer | make_schema_explorer_node(registry, cache) |
follow_up |
当检索上下文不足时,探索 schema 结构并提供澄清 |
5.3 SQL 管道¶
| Agent | 工厂函数 | 模型 | 说明 |
|---|---|---|---|
| SQL Generation | make_sql_generation_node(registry) |
qwen3-coder-plus |
基于 R/V/G 上下文组装 prompt,生成安全 SQL |
| Guardrails | make_guardrails_node(executor) |
规则引擎 | 五层校验:语法分析 → 策略检查 → AST 列引用校验 → 术语绑定语义校验 → EXPLAIN 执行计划 |
| SQL Executor | _make_sql_executor_node(executor, cache) |
— | 在数据面执行 SQL,支持 Redshift / pg_mooncake |
| Result Processor | _result_processor_node |
— | 结果格式化、截断、类型转换 |
| SQL Explainer | make_sql_explainer_node(registry) |
qwen3-max |
用自然语言解释 SQL 的业务含义 |
5.4 后处理与知识¶
| Agent | 工厂函数 | 模型 | 说明 |
|---|---|---|---|
| Visualization | make_visualization_node(registry) |
qwen3-max(fallback) |
推荐决策引擎(top-k recommendation + adapter render);LLM 仅用于 repair |
| Insights | make_insights_node(registry) |
qwen3-max |
从查询结果提取业务洞察 |
| Follow-up | make_followup_node(registry) |
deepseek-v3.2 |
意图模糊时生成澄清问题和选项 |
| Knowledge QA | make_knowledge_qa_node(registry) |
qwen3-max |
基于检索的业务知识问答 |
| Graph RAG | _make_graph_rag_node(pool, registry) |
graph_rag |
Apache AGE 图谱推理,探索实体关系 |
| Suggest Followup | make_suggest_followup_node(registry) |
deepseek-v3.2 |
生成推荐的后续问题 |
| Analytical | make_analytical_node(...) |
ML toolkit | 诊断/预测/分群/归因等 ML 分析任务 |
5.5 评估层¶
| Agent | 工厂函数 | 类型 | 说明 |
|---|---|---|---|
| Retrieval Evaluator | make_retrieval_evaluator_node() |
def(同步) |
评估检索召回的覆盖度、相关性 |
| SQL Evaluator | make_sql_evaluator_node() |
def(同步) |
评估 SQL 正确性、性能、安全性 |
| Answer Evaluator | make_answer_evaluator_node() |
def(同步) |
评估最终回答的完整性、准确性 |
评估器设计
评估器为纯规则引擎(def 而非 async def),无 LLM/IO 调用,确保低延迟。
仅在 deep_analysis_workflow 路径中启用。
6. Skills 与 Tools 层¶
Skills 目录:backend/app/skills/
6.1 RAG 四引擎检索¶
文件:skills/rag.py
通过 pgvector 对 PG Supernode 执行四引擎语义搜索(Engine 1+1b+2+D 并行,Engine 3 串行):
| 引擎 | 目标表 | Top-K / 阈值 |
|---|---|---|
| Engine 1 — Relational (R) | semlayer.business_term |
LLM 提取的 search_terms 逐词 ILIKE |
| Engine 1b — Keyword (R+) | semlayer.column_asset (AliasDictionary) |
字典精确匹配 |
| Engine 2 — Vector (V) | 7 张 embedding 表, cosine similarity | threshold=0.65 |
| Engine 3 — Graph (G) | Apache AGE graph traversal | depth=3 |
| Engine D — Dynamic Few-Shot | semlayer.dynamic_few_shot_cache |
top_k=3, sim≥0.95 |
Engine 1 术语匹配策略(LLM-based Query Decomposition):
Engine 1 不再使用正则表达式分词。搜索关键词来源于 Query Understanding 节点已经提取的结构化实体
(entities.terms、metric_candidates、dimension_candidates 等),由 QueryDecomposer skill 统一封装:
# skills/query_decomposer.py
from_query_understanding(qu) # 从 QU 输出提取 search terms,零额外 LLM 调用
QueryDecomposer(registry) # 独立 LLM 调用(当 QU 未运行时的 fallback)
这确保了关键词提取: - 语言无关 — LLM 天然理解中英文混合、缩写、口语化表达 - 领域无关 — 不硬编码任何停用词/分词规则 - 零额外延迟 — 正常流程复用 QU 节点已有的 LLM 输出
术语绑定注入(Term Binding Injection):
Engine 1 命中的术语如果有 mapped_asset_id(绑定到物理列/指标),会被构建为 _term_bindings 视图,
注入 retrieval_results 供后续所有节点共享:
_term_bindings = [
{"term_id": "term_price", "canonical_term": "价格",
"mapped_asset_type": "column", "mapped_asset_id": "col_txn_price"},
]
Engine D 仅返回管理员已 approve 的动态 few-shot 条目(review_status='approved'),
并要求 confidence_score ≥ 0.9 且 fail_count ≤ 3(未超过失效阈值)。
新学习到的条目进入 pending_review 状态,不会自动注入,需管理员在 Admin Panel 审批后方才生效。
6.2 SQL Executor¶
文件:skills/sql_executor.py
| Backend | 用途 | 配置 |
|---|---|---|
| Redshift | 生产环境 | TTD_DATA_PLANE_BACKEND=redshift |
| pg_mooncake | 本地开发/POC | TTD_DATA_PLANE_BACKEND=pg_mooncake |
抽象接口 DataPlaneExecutor:
execute(sql, max_rows, timeout_ms)→{columns, rows, row_count, has_more, execution_time_ms}explain(sql)→{plan, estimated_rows, estimated_cost}
6.3 其他 Skills¶
| Skill | 文件 | 功能 |
|---|---|---|
| Query Decomposer | query_decomposer.py |
LLM 驱动的搜索关键词提取(独立 skill);优先复用 QU 输出 (from_query_understanding),也可独立调用 (QueryDecomposer.extract_search_terms) |
| Reranker | reranker.py |
术语绑定感知重排序:绑定列加分(0.95)、父表提升(+0.25)、无绑定表降权(-0.10);支持 Admin 配置的 Reranker 模型 |
| Corrective Retrieval | corrective_retrieval.py |
SQL 失败后补偿检索:优先检测术语绑定违规并按绑定关系修复,其次基于错误信息通用分析 |
| Visualization | visualization.py |
推荐决策引擎(recommend → select → adapt → validate);含候选排序、置信度评估、HITL checkpoint 触发;LLM 仅保留 repair 辅助 |
| Chart Templates | chart_templates/ |
模板注册表、数据分析器、top-k 推荐引擎(matcher)、adapter 层、渲染器、主题系统 |
| Insights | insights.py |
数据洞察提取 |
| Graph RAG | graph_rag.py |
Apache AGE Cypher 查询 |
| Graph Traversal | graph_traversal.py |
图谱多跳遍历 |
| SQL Business Context | sql_business_context.py |
SQL 业务语义解释 |
| Metadata Cache | metadata_cache.py |
元数据缓存层(TTL: 24h);含 AliasDictionary 用于 Engine R+ 字典匹配 |
| Context Compactor | context_compactor.py |
上下文压缩(token budget: 3000) |
| Governance | governance.py |
治理上下文解析(tenant, region, compliance) |
| Domain Context | domain_context.py |
业务域上下文注入 |
| Column Mapping | column_mapping.py |
列名映射与消歧 |
| SQL Column Validator | sql_column_validator.py |
AST 级列引用静态校验(防幻觉);含 _suggest_closest_column() 模糊推荐;支持 CTE 虚拟列、SELECT alias、* 通配、函数参数、LATERAL JOIN、WITH RECURSIVE;最大递归深度 15 |
| Error Handler | error_handler.py |
管道错误统一处理 |
7. 记忆系统¶
graph LR
subgraph "Personalized Memory (per-user isolated)"
W[Working Memory<br/>LangGraph Checkpointer] --> P[Profile Memory<br/>PostgreSQL]
P --> E[Episodic Memory<br/>PostgreSQL]
E --> C[Correction Memory<br/>PostgreSQL]
end
subgraph "Shared Optimization (not user memory)"
SC[SQL Cache<br/>pgvector]
FS[Dynamic Few-Shot]
MC[Metadata Cache]
end
Q[用户问题] --> W
W -->|会话上下文| Pipeline
P -->|用户偏好/默认值| Pipeline
E -->|历史交互摘要| Pipeline
C -->|已接受纠正| Pipeline
SC -->|语义匹配 SQL| Router
7.1 四层记忆架构 (Four-Layer Memory)¶
| 层 | 存储 | 生命周期 | 用途 |
|---|---|---|---|
| Working | LangGraph Checkpointer (PostgreSQL) | 请求/会话作用域 | 当前对话上下文,最近 N 轮,Context Compaction |
| Profile | ttd.ttd_memory_records (kind=profile) |
长期 (365 天 TTL) | 稳定用户偏好:常用指标、偏好维度、图表风格 |
| Episodic | ttd.ttd_memory_records (kind=episodic) |
中期 (90 天 TTL, 衰减) | 成功交互摘要、已接受澄清、任务成果 |
| Correction | ttd.ttd_memory_records (kind=correction) |
中长期 (180 天 TTL) | 用户批准的纠正:术语消歧、SQL 修复模式 |
7.2 Working Memory (短期上下文)¶
| 属性 | 值 |
|---|---|
| 存储 | LangGraph Checkpointer (PostgreSQL) |
| 窗口 | 最近 5 轮(TTD_SHORT_TERM_WINDOW=5) |
| 格式 | list[dict],每轮包含 user/assistant message |
| 压缩 | Context Compactor(token budget 3000,最近 3 轮原样保留) |
7.3 Profile / Episodic / Correction Memory¶
通过统一的 MemoryFacade 访问,支持路由感知的预算控制和可观测性集成:
# app/memory/facade.py
class MemoryFacade:
async def retrieve(self, *, user_id, query, route_decision, ...) -> MemoryContext
async def write_candidate(self, *, user_id, kind, summary_text, ...) -> str | None
async def upsert(self, *, memory_id, user_id, ...) -> bool
async def suppress(self, *, memory_id, user_id, ...) -> bool
async def delete_for_user(self, *, user_id, ...) -> int
路由预算 (Route Budgets):
| Route | Profile | Episodic | Correction |
|---|---|---|---|
nl2sql_query |
5 | 2 | 5 |
kpi_lookup |
2 | 0 | 1 |
business_knowledge_qa |
3 | 1 | 1 |
deep_analysis_workflow |
8 | 8 | 4 |
7.4 记忆形成 (Memory Formation)¶
记忆候选项从已持久化的 turn 中提取,而非内联于生成过程:
# app/memory/formation.py
class MemoryFormationService:
async def process_turn(self, *, user_id, turn_id, ...) -> list[str]
async def process_feedback(self, *, user_id, rating, ...) -> list[str]
高信号写入策略:仅在有明确偏好信号、被接受的澄清、重复选择模式、或评估器确认的成功交互时写入。
7.5 SQL Cache(共享优化层)¶
| 属性 | 值 |
|---|---|
| 存储 | semlayer.sql_cache 表 (pgvector) |
| 匹配 | cosine similarity > 0.92(TTD_SQL_CACHE_SIMILARITY) |
| TTL | 15 分钟(TTD_SQL_CACHE_TTL_MINUTES) |
| 缓存内容 | 仅 SQL 文本,不缓存执行结果 |
数据新鲜度
SQL Cache 仅缓存生成的 SQL 语句。执行结果始终从数据面实时获取, 确保返回最新数据。
7.6 动态 Few-Shot 缓存 (Dynamic Few-Shot Cache)¶
文件:app/memory/dynamic_few_shot.py
从 SQL 修复成功案例中自动学习 {question, final_sql} 对,作为实时 few-shot 示例补充手工策划的 semantic-plane/few_shots/ 资产。
质量控制工作流(三步审批):
record_success() → review_status = 'pending_review'
(新条目不参与 RAG,等待审核)
↓
Admin 审批 (/admin/few-shots) → approve / reject
↓
search_similar() → 仅返回 approved + sim ≥ 0.95 的条目
| 属性 | 值 |
|---|---|
| 存储 | semlayer.dynamic_few_shot_cache 表 (pgvector + review 状态字段) |
| RAG 注入条件 | review_status='approved' AND similarity ≥ 0.95 AND confidence_score ≥ 0.9 AND fail_count ≤ 3 |
| 自动过期 | 30 天(expires_at = NOW() + INTERVAL '30 days'),或 fail_count > 3 |
| YAML 导出 | 审批通过的条目可导出为与 semantic-plane/few_shots/ 一致的 YAML 格式,便于晋升为正式资产 |
关键方法:
class DynamicFewShotCache:
async def record_success(question, final_sql, *, intent_type, grounded_tables,
question_embedding, confidence) -> None
# 新条目进入 pending_review;已有条目的 review_status 保持不变
async def search_similar(question_embedding, *, top_k=3,
min_confidence=0.9, min_similarity=0.95) -> list[dict]
# 仅返回 approved 条目,供 RAG Engine D 使用
async def approve(entry_id, reviewer) -> bool
async def reject(entry_id, reviewer) -> bool
async def list_by_status(review_status, *, limit, offset) -> tuple[list, int]
async def get_stats() -> dict # pending / approved / rejected 各计数
async def mark_exported(entry_ids) -> int
Admin API 端点(/api/v1/admin/few-shots/):
| 方法 | 路径 | 说明 |
|---|---|---|
GET |
/stats |
各状态数量统计 |
GET |
/ |
按状态分页列表 |
PATCH |
/{id}/review |
审批(approve / reject) |
GET |
/{id}/export |
导出单条 YAML |
GET |
/export/bulk |
批量导出全部 approved 条目为 YAML |
8. 安全架构¶
graph TD
Request[API 请求] --> JWKS[Better Auth JWKS 验签]
JWKS --> Guard[Content Safety Guard]
Guard --> Policy[Policy Enforcement]
Policy --> Pipeline[AI Pipeline]
Pipeline --> RLS[Row-Level Security]
RLS --> Response[响应]
JWKS -->|无 Token + Debug| DevBypass[Dev Mode Bypass]
DevBypass --> Guard
JWKS -->|无 Token + Prod| Reject[401 Unauthorized]
8.1 认证 — Better Auth JWKS 验签¶
文件:app/security/auth.py
TTD Backend 不再维护独立的用户认证逻辑。JWT 由 Next.js 侧的 Better Auth 签发(RS256),Backend 通过 JWKS 公钥本地验签。
| 配置 | 默认值 | 说明 |
|---|---|---|
TTD_BETTER_AUTH_URL |
http://localhost:3000 |
Better Auth 基础 URL(JWKS 端点来源) |
TTD_BETTER_AUTH_ISSUER |
http://localhost:3000 |
JWT iss 校验值 |
TTD_JWT_AUDIENCE |
talk-to-data |
JWT aud 校验值 |
TTD_JWKS_CACHE_TTL_SECONDS |
3600 |
JWKS 公钥缓存时间 |
验证流程:
# PyJWKClient 从 Better Auth JWKS 端点获取公钥
jwks_url = f"{settings.better_auth_url}/api/auth/jwks"
jwks_client = PyJWKClient(jwks_url, cache_jwk_set=True, lifespan=3600)
signing_key = jwks_client.get_signing_key_from_jwt(token)
payload = jwt.decode(
token, signing_key.key,
algorithms=["RS256", "ES256"],
audience=settings.jwt_audience,
issuer=settings.better_auth_issuer,
)
JWT Claims → UserContext 映射:
UserContext(
user_id=payload["sub"], # Better Auth user.id
roles=_extract_roles(payload), # "role" (str) + "roles" (list)
allowed_domains=payload.get("allowed_domains", []),
)
| Claim | 映射 | 说明 |
|---|---|---|
sub |
UserContext.user_id |
Better Auth user.id,与 Chat/Feedback 中的 user_id 一致 |
role / roles |
UserContext.roles |
支持单值 (admin plugin) 和列表形式 |
allowed_domains |
UserContext.allowed_domains |
Phase 1 为空集合 |
Admin 路由保护:
# app/api/admin/router.py
admin_router = APIRouter(prefix="/admin", dependencies=[Depends(require_admin)])
require_admin 检查 UserContext.roles 是否包含 "admin",否则返回 403 Forbidden。
开发模式绕过: 当 TTD_DEBUG=true 且请求无 Bearer Token 时,自动注入:
迁移期 HS256 回退
在 TTD_DEBUG=true 时,若 JWKS 验证失败,系统尝试以旧 TTD_JWT_SECRET (HS256) 验证 Token。
此回退仅用于本地开发过渡,将在迁移完成后移除。
8.2 Content Safety Guard¶
文件:app/security/guard.py
拦截恶意输入的两类模式:
8.3 Policy Enforcement¶
文件:app/security/policy.py
基于 UserContext.allowed_domains 和 llm_exposure_policy 进行:
- Domain 过滤:只保留用户有权限的业务域资产
- PII 过滤:
llm_exposure_policy=hidden的列对 LLM 不可见 - Column Masking:
llm_exposure_policy=masked的列仅提供脱敏样本
8.4 SQL Guardrails(五层校验)¶
| 层 | 检查项 | 拒绝条件 | 自愈策略 |
|---|---|---|---|
| Layer 1 — 语法层 | SQL 解析、多语句检测 | 非 SELECT / 多条语句 | 直接拒绝 |
| Layer 2a — 策略层 | 禁止关键词、LIMIT 检查 | 含 DML 关键词 | 直接拒绝 |
| Layer 2c — 列校验层 | AST 列引用静态校验(sql_column_validator.py) |
引用不存在的列 | 有唯一候选时自动替换;无候选时携带可用列清单触发 corrective_retrieval |
| Layer 2d — 术语绑定校验 | 术语绑定语义校验(_validate_term_bindings()) |
绑定列被引用到错误的表 | 携带正确的 table.column 信息触发 corrective_retrieval |
| Layer 3 — 执行计划层 | EXPLAIN 分析 | 预估行数 > 10M / cost > 100K | 降级重写或拒绝 |
Layer 2d 术语绑定校验详细说明:
- 仅在
retrieval_results包含_term_bindings时激活 - 对每个绑定的列,使用正则检查 SQL 中是否出现
wrong_table.column模式 - 例如:术语"价格"绑定到
fact_transaction.price,但 SQL 写了dim_article.price,则拦截 - 返回
Term binding violation错误,包含正确的表和列名,供 Corrective Retrieval 精准修复 - 不影响未绑定的列——仅对有明确
mapped_asset_id的术语生效
Layer 2c 列校验详细说明:
- 使用
sqlparse进行纯内存 AST 解析,延迟 < 5ms - 构建白名单:从
retrieval_results中提取所有可见列(含alias_terms) - 边界情况处理:CTE 虚拟列、SELECT alias、
*通配、函数参数、LATERAL JOIN、WITH RECURSIVE、最大递归深度 15 层 - 自动列名修正(
_sanitize_column_names()):alias_terms 精确匹配 > difflib 0.8+ 唯一候选 > 不替换(保守策略) - 每次自动修正写入 structlog 审计日志
validation_result 新增字段:
| 字段 | 类型 | 说明 |
|---|---|---|
column_check_pass |
bool |
列校验是否通过 |
column_errors |
list[str] |
未通过时的列错误详情 |
column_check_duration_ms |
float |
列校验耗时(ms) |
8.5 Rate Limiting¶
| 配置 | 默认值 | 说明 |
|---|---|---|
TTD_RATE_LIMIT_PER_USER |
60 | 每用户每分钟请求数 |
TTD_BURST_LIMIT |
100 | 突发允许量 |
9. 多模型注册表¶
文件:app/models/registry.py
9.1 两层配置架构¶
graph TD
Task[任务类型] --> Registry[ModelRegistry]
Registry --> Check{DB 有配置?}
Check -->|是| DB[Admin DB 模型]
Check -->|否| Code[代码默认配置]
DB --> LLM[LangChain Model]
Code --> LLM
- 代码默认 (
_DEFAULT_TASK_MODEL_MAP):硬编码 task → model-id 映射 - Admin DB 覆盖:通过
/admin/llms/*API 配置的模型优先级更高
9.2 Task 路由映射¶
| Task | 默认模型 | 说明 |
|---|---|---|
supervisor |
deepseek-v3.2 |
Supervisor 节点 |
intent_parsing |
deepseek-v3.2 |
意图解析 |
query_understanding |
deepseek-v3.2 |
查询理解 |
planner |
deepseek-v3.2 |
执行计划 |
sql_generation |
qwen3-coder-plus |
SQL 生成 |
sql_repair |
qwen3-coder-plus |
SQL 修复 |
insights |
qwen3-max |
洞察提取 |
visualization |
qwen3-max |
可视化修复辅助(仅 repair fallback 场景,95%+ 由推荐引擎处理) |
sql_explanation |
qwen3-max |
SQL 解释 |
follow_up |
deepseek-v3.2 |
跟进澄清 |
knowledge_qa |
qwen3-max |
知识问答 |
9.3 Fallback 链¶
_DEFAULT_FALLBACK_CHAIN = {
"deepseek-v3.2": ["qwen3-max", "qwen3-coder-plus"],
"qwen3-max": ["deepseek-v3.2", "qwen3-coder-plus"],
"qwen3-coder-plus": ["qwen3-max", "deepseek-v3.2"],
}
当主模型不可用时,按顺序尝试 fallback 模型。
9.4 内置模型配置¶
| Model ID | Provider | Model Name | API |
|---|---|---|---|
deepseek-v3.2 |
openai (compatible) | deepseek-v3 |
DashScope |
qwen3-max |
openai (compatible) | qwen3-max |
DashScope |
qwen3-coder-plus |
openai (compatible) | qwen3-coder-plus |
DashScope |
claude-4.5-sonnet |
anthropic | claude-sonnet-4-20250514 |
Anthropic |
10. Prompt 管理¶
10.1 架构¶
graph LR
DB[(semlayer.prompt_templates)] --> Manager[PromptManager]
Manager --> Cache[In-Process Cache]
Cache --> Assembler[PromptAssembler]
Assembler --> LLM[LLM 调用]
10.2 PromptManager¶
文件:app/prompt/manager.py
- 存储:PostgreSQL
semlayer.prompt_templates表 - 版本控制:每条模板有
version和is_active字段 - A/B 测试:支持
ab_group参数选择不同模板变体 - 缓存:进程内 dict 缓存,支持按
template_id失效
class PromptManager:
async def get_template(self, template_id, *, ab_group=None) -> str | None
@staticmethod
def invalidate_cache(template_id=None) -> None
10.3 PromptAssembler¶
文件:app/prompt/assembler.py
将 R/V/G 检索结果组装为结构化 prompt:
- 注入 SQL 方言指导(Redshift / PostgreSQL 差异)
- 注入表/列白名单(防止 LLM 幻觉表名)
- 注入 JOIN 规则、指标定义、业务规则
- 注入术语绑定约束(
_build_term_binding_context()):当检索到绑定术语时,生成强制指令如「价格」→ 必须使用 gold_hm.fact_transaction.price - 注入性能约束(LIMIT、CTE 模式要求)
- Jinja2 模板渲染
11. 会话管理¶
文件:app/session/manager.py
11.1 数据模型¶
| 表 | Schema | 说明 |
|---|---|---|
ttd.ttd_sessions |
ttd | 会话主表 |
ttd.ttd_turns |
ttd | 对话轮次 |
11.2 SessionManager API¶
class SessionManager:
async def create_session(user_id, *, ttl_hours=24) -> str
async def get_session(session_id) -> dict | None
async def rename_session(session_id, title) -> bool
async def list_sessions(user_id, *, limit=8) -> list[dict]
async def cleanup_expired() -> int
11.3 TTL 与清理¶
| 配置 | 默认值 | 说明 |
|---|---|---|
TTD_SESSION_TTL_HOURS |
24 | 会话过期时间 |
TTD_SESSION_MAX_TURNS |
20 | 单会话最大轮次 |
TTD_CLEANUP_INTERVAL_MINUTES |
60 | 清理循环间隔 |
后台 asyncio.Task 定期执行过期会话清理。
11.4 Auto-Title¶
新对话的标题自动取自第一条用户消息(通过 LATERAL JOIN 查询)。
12. 数据库模型¶
所有 ORM 模型定义在 app/db/models.py,使用 SQLAlchemy 2.0 Mapped Column 语法。
Schema:ttd(与 semlayer 语义层 schema 分离)
12.1 核心业务模型¶
| 模型 | 表名 | 说明 |
|---|---|---|
Chat |
ttd_chats |
对话(含 user_id, title, engine_name, status) |
ChatMessage |
ttd_chat_messages |
消息(含完整管道输出字段) |
Feedback |
ttd_feedback |
用户反馈(rating, comment, correction_sql) |
12.2 模型管理¶
| 模型 | 表名 | 说明 |
|---|---|---|
LLMModel |
ttd_llm_models |
LLM 模型配置 |
EmbeddingModel |
ttd_embedding_models |
Embedding 模型配置 |
RerankerModel |
ttd_reranker_models |
Reranker 模型配置 |
ChatEngine |
ttd_chat_engines |
聊天引擎配置(关联 LLM + KB) |
12.3 知识库系统¶
| 模型 | 表名 | 说明 |
|---|---|---|
KnowledgeBase |
ttd_knowledge_bases |
知识库 |
DataSource |
ttd_data_sources |
数据源(file/web/sitemap/semantic_plane) |
KBDataSource |
ttd_kb_datasources |
KB-DS 关联表 (M:N) |
Document |
ttd_documents |
文档 |
Chunk |
ttd_chunks |
文本块(含 embedding) |
KGEntity |
ttd_kg_entities |
知识图谱实体 |
KGRelationship |
ttd_kg_relationships |
知识图谱关系 |
12.4 评估系统¶
| 模型 | 表名 | 说明 |
|---|---|---|
EvaluationDataset |
ttd_evaluation_datasets |
评估数据集 |
EvaluationTask |
ttd_evaluation_tasks |
评估任务 |
EvaluationTaskItem |
ttd_evaluation_task_items |
评估任务项 |
12.5 系统管理¶
| 模型 | 表名 | 说明 |
|---|---|---|
Upload |
ttd_uploads |
文件上传记录 |
SiteSettings |
ttd_site_settings |
全局站点设置 |
ApiKey |
ttd_api_keys |
API 密钥管理 |
12.6 Enum 类型¶
| Enum | 值 | 用途 |
|---|---|---|
DocIndexStatus |
pending, running, completed, failed | 文档索引状态 |
KGIndexStatus |
not_started, pending, running, completed, failed | KG 索引状态 |
DataSourceType |
file, web_single_page, web_sitemap, semantic_plane | 数据源类型 |
EvaluationStatus |
not_start, evaluating, done, error, cancel | 评估状态 |
FeedbackType |
like, dislike | 反馈类型 |
ChatVisibility |
public, private | 对话可见性 |
13. 审计与可观测性¶
13.1 ObservabilityFacade¶
文件:app/observability/facade.py
ObservabilityFacade 是系统可观测性的统一网关,将 Langfuse(追踪与评分)、AutoMQ(事件流)和 structlog(结构化日志)整合为单一 API。所有 API 请求通过一个 trace 贯穿完整生命周期。
sequenceDiagram
participant R as API Request
participant F as ObservabilityFacade
participant L as Langfuse
participant A as AutoMQ
participant S as structlog
R->>F: start_trace(session_id, user_id)
F->>L: create trace
F->>S: log trace_start
R->>F: start_span("sql_generation")
F->>L: create span under trace
R->>F: emit_event(SESSION_TURN_PERSISTED)
F->>A: publish to topic
R->>F: record_score("sql_quality", 0.92)
F->>L: attach score to trace
R->>F: finalize_trace(status="success")
F->>L: close trace
F->>S: log trace_end
核心方法¶
| 方法 | 说明 |
|---|---|
start_trace(session_id, user_id, ...) |
创建请求级 trace |
finalize_trace(trace_id, status, ...) |
关闭 trace,汇总 token 与状态 |
start_span(trace_id, name, ...) |
创建子 span(Agent 步骤、LLM 调用等) |
end_span(span_id, output, ...) |
关闭 span |
record_score(trace_id, name, value, ...) |
记录评分(评估器/用户反馈) |
emit_event(event_type, payload, ...) |
发送事件至 AutoMQ |
get_langchain_handler(trace_id) |
获取 LangChain callback handler |
记忆事件 (Memory Events)¶
记忆系统(working / profile / episodic / correction 四层)的所有操作均为 first-class 事件,自动关联 trace_id / session_id / turn_id:
memory.retrieve.completed— 检索完成memory.rank.completed— 排序完成memory.write.candidate— 产生候选写入memory.upsert.completed— 写入/更新完成memory.suppress.completed— 记忆抑制memory.expire.completed— 过期清理memory.delete.completed— 显式删除
相关模块:app/observability/events.py(EventType 枚举)、app/observability/spans.py(observe_span 装饰器)、app/observability/automq_exporter.py(AutoMQ 导出)、app/observability/langfuse_adapter.py(Langfuse 适配)、app/observability/datasets.py(评估数据集管理)。
13.2 结构化审计日志¶
文件:app/audit/logger.py
每次 Supervisor 调用产生一条完整审计记录。记忆生命周期数据现通过 ObservabilityFacade 事件流采集(而非直接写入审计日志),审计 JSON 中的 memory 字段仅保留摘要引用:
{
"trace_id": "uuid",
"session_id": "...",
"user_id": "...",
"timestamp": 1700000000.0,
"duration_ms": 1234,
"input": {"question": "...", "language": "zh"},
"orchestration": {"route_decision": "nl2sql_query"},
"intent": {...},
"retrieval": {...},
"generation": {"sql": "...", "model": "qwen3-coder-plus"},
"validation": {"passed": true},
"execution": {"rows": 42, "time_ms": 150},
"memory": {"cache_hit": false, "events_emitted": ["memory.retrieve.completed", "memory.upsert.completed"]},
"models_used": [
{"model": "deepseek-v3.2", "stage": "qu", "input_tokens": 500, "output_tokens": 200}
],
"response": {"status": "success"},
"errors": []
}
完整记忆操作详情可通过 Langfuse trace_id 或 AutoMQ 事件流查询,审计日志仅记录事件类型摘要以控制体积。
13.3 Prometheus 指标¶
当 TTD_PROMETHEUS_ENABLED=true 时,挂载 /metrics 端点。
所有指标包含 instance_id label 以支持多实例监控。
| 指标 | 类型 | 说明 |
|---|---|---|
ttd_pg_pool_size |
Gauge | 连接池状态(idle/used) |
ttd_pg_pool_waiting |
Gauge | 等待连接的请求数 |
ttd_active_graph_invocations |
Gauge | 正在执行的 LangGraph 调用数 |
ttd_agent_pool_size |
Gauge | 在线 Agent Runtime 实例数 |
ttd_sessions_per_instance |
Gauge | 本实例活跃 session 数 |
ttd_inflight_invocations |
Gauge | 本实例 in-flight 调用数 |
ttd_advisory_lock_wait_seconds |
Histogram | PG Advisory Lock 等待耗时 |
ttd_llm_request_duration_seconds |
Histogram | LLM API 调用延迟 |
ttd_pipeline_duration_seconds |
Histogram | 端到端管道执行时间 |
ttd_sql_execution_duration_seconds |
Histogram | 数据面 SQL 执行时间 |
ttd_requests_total |
Counter | API 请求总量 |
ttd_graph_errors_total |
Counter | 图执行错误总量 |
13.4 健康检查¶
GET /api/v1/health 返回服务状态,适用于 K8s liveness/readiness probe。
13.5 错误 Sanitization¶
文件:app/errors.py
技术错误自动转为用户友好的中文消息:
| 技术错误 | 用户可见消息 |
|---|---|
current transaction is aborted |
SQL 执行异常,系统正在自动恢复,请重试 |
statement timeout |
查询执行超时,请尝试缩小查询范围 |
relation XXX does not exist |
查询引用了不存在的表,请换一种方式提问 |
permission denied |
没有权限访问该数据,请联系管理员 |
Model XXX unavailable |
AI 模型服务暂时不可用,请稍后重试 |
14. 配置参考¶
所有环境变量使用 TTD_ 前缀,通过 app/config.py 的 Pydantic Settings 管理。
14.1 服务器¶
| 变量 | 类型 | 默认值 | 说明 |
|---|---|---|---|
TTD_HOST |
str | 0.0.0.0 |
绑定地址 |
TTD_PORT |
int | 8000 |
服务端口 |
TTD_WORKERS |
int | 1 |
Uvicorn worker 数(多实例时固定为 1) |
TTD_DEBUG |
bool | false |
调试模式(自动 reload + 免 JWT) |
14.1a Runtime Pool (Docker Swarm)¶
| 变量 | 类型 | 默认值 | 说明 |
|---|---|---|---|
TTD_SWARM_MODE |
bool | false |
启用 Agent Runtime Pool 模式 |
TTD_INSTANCE_ID |
str | auto-generated | 实例唯一标识(Swarm 自动取 HOSTNAME) |
TTD_HEARTBEAT_INTERVAL |
int | 10 |
心跳间隔(秒) |
TTD_HEARTBEAT_STALE_TIMEOUT |
int | 30 |
多久无心跳标记实例为 offline(秒) |
TTD_DRAIN_TIMEOUT |
int | 120 |
优雅关闭时等待 in-flight 完成的超时(秒) |
TTD_ADVISORY_LOCK_TIMEOUT_MS |
int | 30000 |
PG Advisory Lock 获取超时(毫秒) |
部署模式
- 单实例开发:
TTD_SWARM_MODE=false(默认),无需 HAProxy,task up启动 - 多实例生产:
TTD_SWARM_MODE=true,通过 Docker Swarm + HAProxy 部署, 使用task ha:up一键启动(等价于task deploy:swarm:deploy)
14.2 数据库¶
| 变量 | 类型 | 默认值 | 说明 |
|---|---|---|---|
TTD_AURORA_DSN |
str | postgresql://postgres:postgres@localhost:5435/ttd_db |
PG Supernode 连接串 |
TTD_PG_POOL_MIN |
int | 5 |
连接池最小连接数(per instance) |
TTD_PG_POOL_MAX |
int | 20 |
连接池最大连接数(per instance) |
14.3 数据面¶
| 变量 | 类型 | 默认值 | 说明 |
|---|---|---|---|
TTD_DATA_PLANE_BACKEND |
str | redshift |
数据面后端 (redshift / pg_mooncake) |
TTD_REDSHIFT_HOST |
str | localhost |
Redshift 主机 |
TTD_REDSHIFT_PORT |
int | 5439 |
Redshift 端口 |
TTD_REDSHIFT_DATABASE |
str | ttd_gold |
Redshift 数据库 |
TTD_REDSHIFT_USER |
str | ttd_service |
Redshift 用户 |
TTD_REDSHIFT_PASSWORD |
str | — | Redshift 密码 |
TTD_REDSHIFT_TIMEOUT_MS |
int | 30000 |
Redshift 查询超时 |
TTD_PG_MOONCAKE_DSN |
str | postgresql://...@localhost:5436/ttd_lake |
pg_mooncake 连接串 |
14.4 模型¶
| 变量 | 类型 | 默认值 | 说明 |
|---|---|---|---|
TTD_ANTHROPIC_API_KEY |
str | — | Anthropic API Key |
TTD_DASHSCOPE_API_KEY |
str | — | DashScope API Key |
TTD_DASHSCOPE_BASE_URL |
str | https://dashscope.aliyuncs.com/compatible-mode/v1 |
DashScope Base URL |
TTD_SUPERVISOR_MODEL |
str | deepseek-v3.2 |
Supervisor 模型 |
TTD_SQL_GENERATION_MODEL |
str | qwen3-coder-plus |
SQL 生成模型 |
TTD_INSIGHTS_MODEL |
str | qwen3-max |
Insights 模型 |
TTD_VISUALIZATION_MODEL |
str | qwen3-max |
Visualization 模型 |
TTD_FOLLOWUP_MODEL |
str | deepseek-v3.2 |
Follow-up 模型 |
TTD_EMBEDDING_MODEL |
str | text-embedding-v4 |
Embedding 模型 |
14.5 认证与 API¶
| 变量 | 类型 | 默认值 | 说明 |
|---|---|---|---|
TTD_BETTER_AUTH_URL |
str | http://localhost:3000 |
Better Auth 基础 URL (JWKS 端点来源) |
TTD_BETTER_AUTH_ISSUER |
str | http://localhost:3000 |
JWT issuer 校验值 |
TTD_JWKS_CACHE_TTL_SECONDS |
int | 3600 |
JWKS 公钥缓存时间(秒) |
TTD_JWT_AUDIENCE |
str | talk-to-data |
JWT 受众校验值 |
TTD_JWT_SECRET |
str | change-me-in-production |
旧 HS256 密钥(迁移期回退,将移除) |
TTD_JWT_ALGORITHM |
str | HS256 |
旧 JWT 算法(迁移期回退,将移除) |
TTD_RATE_LIMIT_PER_USER |
int | 60 |
每用户请求限制 |
TTD_BURST_LIMIT |
int | 100 |
突发限制 |
TTD_MAX_QUESTION_LENGTH |
int | 2000 |
问题最大长度 |
TTD_CORS_ORIGINS |
list | ["*"] |
CORS 允许源(生产环境设为 Better Auth URL) |
14.6 会话¶
| 变量 | 类型 | 默认值 | 说明 |
|---|---|---|---|
TTD_SESSION_TTL_HOURS |
int | 24 |
会话 TTL |
TTD_SESSION_MAX_TURNS |
int | 20 |
最大轮次 |
TTD_CONTEXT_WINDOW |
int | 5 |
上下文窗口 |
TTD_CLEANUP_INTERVAL_MINUTES |
int | 60 |
清理间隔 |
14.7 检索¶
| 变量 | 类型 | 默认值 | 说明 |
|---|---|---|---|
TTD_TOP_K_TABLES |
int | 5 |
表检索 Top-K |
TTD_TOP_K_COLUMNS |
int | 20 |
列检索 Top-K |
TTD_TOP_K_METRICS |
int | 5 |
指标检索 Top-K |
TTD_TOP_K_TERMS |
int | 10 |
术语检索 Top-K |
TTD_TOP_K_FEW_SHOTS |
int | 3 |
Few-shot Top-K |
TTD_SIMILARITY_THRESHOLD |
float | 0.65 |
相似度阈值 |
TTD_GRAPH_TRAVERSAL_DEPTH |
int | 3 |
图遍历深度 |
14.8 执行¶
| 变量 | 类型 | 默认值 | 说明 |
|---|---|---|---|
TTD_STATEMENT_TIMEOUT_MS |
int | 30000 |
SQL 超时 |
TTD_MAX_ROWS |
int | 1000 |
最大返回行数 |
TTD_MAX_SCANNED_BYTES |
int | 1073741824 |
最大扫描字节 (1GB) |
14.9 记忆¶
| 变量 | 类型 | 默认值 | 说明 |
|---|---|---|---|
TTD_SHORT_TERM_WINDOW |
int | 5 |
短期记忆窗口 |
TTD_CONTEXT_COMPACTION_TOKEN_BUDGET |
int | 3000 |
上下文压缩 token 预算 |
TTD_CONTEXT_COMPACTION_RECENT_WINDOW |
int | 3 |
保留原样的最近轮数 |
TTD_SQL_CACHE_SIMILARITY |
float | 0.92 |
SQL 缓存匹配阈值 |
TTD_SQL_CACHE_TTL_MINUTES |
int | 15 |
SQL 缓存 TTL |
TTD_METADATA_CACHE_TTL_SECONDS |
int | 86400 |
元数据缓存 TTL (24h) |
TTD_SQL_EXECUTE_CACHE_TTL_SECONDS |
int | 86400 |
SQL 执行缓存 TTL |
14.10 修复与校验¶
| 变量 | 类型 | 默认值 | 说明 |
|---|---|---|---|
TTD_MAX_REPAIR_RETRIES |
int | 1 |
SQL 修复最大重试次数 |
TTD_CIRCUIT_BREAKER_THRESHOLD |
int | 3 |
熔断器阈值 |
TTD_MAX_ESTIMATED_ROWS |
int | 10000000 |
EXPLAIN 预估行数上限 |
TTD_MAX_ESTIMATED_COST |
int | 100000 |
EXPLAIN 预估成本上限 |
TTD_FORBIDDEN_KEYWORDS |
list | [INSERT, UPDATE, DELETE, ...] |
禁止的 SQL 关键词 |
14.11 审计与监控¶
| 变量 | 类型 | 默认值 | 说明 |
|---|---|---|---|
TTD_AUDIT_LOG_LEVEL |
str | INFO |
审计日志级别 |
TTD_AUDIT_ARCHIVE_BUCKET |
str | ttd-audit-archive |
归档 S3 桶 |
TTD_AUDIT_ARCHIVE_AFTER_DAYS |
int | 90 |
归档天数 |
TTD_PROMETHEUS_ENABLED |
bool | true |
是否启用 Prometheus |
TTD_METRICS_PATH |
str | /metrics |
指标端点路径 |
15. 本地开发¶
15.1 环境准备¶
15.2 Docker Compose¶
# 启动本地依赖(PostgreSQL + pg_mooncake)
docker compose up -d
# 环境变量自动设置:
# TTD_DATA_PLANE_BACKEND=pg_mooncake
# TTD_AURORA_DSN=postgresql://postgres:postgres@postgres:5435/ttd_db
# TTD_PG_MOONCAKE_DSN=postgresql://postgres:postgres@pg-mooncake:5432/postgres
15.3 数据库迁移 (Alembic)¶
# 运行所有 pending 迁移
task be:db:migrate # 或 cd backend && uv run alembic upgrade head
# 回滚最近一次迁移
task be:db:rollback
# 创建新迁移
task be:db:revision MESSAGE="add_new_table"
# 查看当前版本
task be:db:current
15.4 开发服务器¶
# 启动(debug 模式,自动 reload,免 JWT)
task be:dev # 或 cd backend && uv run ttd-app
# 服务启动后:
# - API: http://localhost:8000/api/v1
# - Docs: http://localhost:8000/docs (Swagger)
# - Metrics: http://localhost:8000/metrics
15.5 Task 命令参考¶
| 命令 | 说明 |
|---|---|
task be:dev |
启动开发服务器 |
task be:lint |
运行 ruff linter |
task be:lint:fix |
自动修复 lint 问题 |
task be:format |
格式化代码 |
task be:typecheck |
类型检查 (ty) |
task be:test |
运行 pytest |
task be:test:cov |
运行测试 + 覆盖率 |
task be:build:docker |
构建 Backend Docker 镜像 |
task up |
启动所有服务 (data-plane + backend + web) |
task down |
停止所有服务 |
task be:ci |
完整 CI 检查(lint + typecheck + test) |
15.6 项目结构¶
backend/
├── app/
│ ├── main.py # FastAPI 入口 + lifespan + instance registration
│ ├── config.py # Pydantic Settings(TTD_* 环境变量)
│ ├── deps.py # FastAPI 依赖注入 + distributed lock
│ ├── errors.py # 异常层次 + 错误 sanitization
│ ├── schemas.py # API 请求/响应 Pydantic models
│ ├── agents/
│ │ ├── builder.py # build_supervisor_graph() 图构建
│ │ ├── state.py # AgenticBIState TypedDict
│ │ ├── supervisor.py # Supervisor 节点
│ │ ├── embedding.py # Embedding 工具函数
│ │ ├── sub_agents/ # 所有 Sub-agent 实现
│ │ │ ├── query_understanding.py
│ │ │ ├── router.py
│ │ │ ├── planner.py
│ │ │ ├── data_retrieval.py
│ │ │ ├── sql_generation.py
│ │ │ ├── guardrails.py
│ │ │ ├── visualization.py
│ │ │ ├── insights.py
│ │ │ ├── followup.py
│ │ │ ├── knowledge_qa.py
│ │ │ ├── schema_explorer.py
│ │ │ ├── sql_explainer.py
│ │ │ ├── suggest_followup.py
│ │ │ └── analytical.py
│ │ └── evaluators/ # 评估器
│ ├── runtime/ # Agent Runtime Pool 管理
│ │ ├── __init__.py # 模块入口
│ │ ├── registry.py # 实例注册 + heartbeat + stale detection
│ │ └── lifecycle.py # Graceful shutdown + drain
│ ├── api/
│ │ ├── router.py # 路由注册 + limiter
│ │ ├── chats.py # Chat BI API (PG advisory lock)
│ │ ├── query.py # Legacy query
│ │ ├── stream.py # Legacy SSE
│ │ ├── sql_execute.py # Direct SQL
│ │ ├── health.py # Health check
│ │ ├── sessions.py # Sessions
│ │ ├── feedback.py # Feedback
│ │ └── admin/ # Admin APIs (20+ files)
│ ├── skills/ # AI Pipeline 技能
│ │ ├── query_decomposer.py # LLM 驱动搜索关键词提取 + QU 复用
│ │ ├── chart_templates/ # 推荐决策引擎 + ECharts Adapter
│ │ │ ├── adapters/ # Renderer adapter 层 (base, echarts_adapter)
│ │ │ ├── matcher.py # Top-k recommendation engine
│ │ │ └── recommendation.py # Headless decision layer
│ │ └── visualization.py # 主入口 (recommend → adapt → validate)
│ ├── memory/ # 四层记忆系统 (working/profile/episodic/correction + cache)
│ ├── security/ # 认证 + 安全
│ ├── prompt/ # Prompt 管理
│ ├── models/ # 模型注册表
│ ├── audit/ # 审计日志
│ ├── session/ # 会话管理
│ ├── db/ # SQLAlchemy ORM
│ ├── repositories/ # 数据访问层
│ └── services/ # 业务逻辑层
├── alembic/ # 数据库迁移
├── tests/ # 测试
├── Dockerfile
├── docker-compose.yml
├── Taskfile.yml
└── pyproject.toml
16. 生产部署 — Agent Runtime Pool¶
16.1 架构概览¶
生产环境采用 Agent Runtime Pool 模式:多个无状态实例通过 Docker Swarm 编排, HAProxy consistent hashing 实现 session affinity。
graph TB
Client[Web/TUI Client]
LB[HAProxy<br/>Consistent Hash<br/>on X-Chat-ID]
subgraph Swarm["Docker Swarm Cluster"]
R1[Runtime #1<br/>1 worker]
R2[Runtime #2<br/>1 worker]
R3[Runtime #3<br/>1 worker]
RN[Runtime #N<br/>1 worker]
end
PG[(PG Supernode<br/>Checkpointer + Advisory Lock)]
Client -->|X-Chat-ID| LB
LB -->|ketama hash| R1
LB -->|ketama hash| R2
LB -->|ketama hash| R3
LB -->|ketama hash| RN
R1 --> PG
R2 --> PG
R3 --> PG
RN --> PG
16.2 关键设计决策¶
| 决策 | 选择 | 理由 |
|---|---|---|
| 实例间状态共享 | PG Supernode (已有) | 不引入新中间件 |
| 并发控制 | PG Advisory Lock | 跨实例互斥,事务结束自动释放 |
| 路由策略 | Consistent Hash (ketama) | 缓存 warmth + 最小迁移 |
| 单 worker/instance | Docker Swarm replicas | 避免 GIL 竞争、内存膨胀 |
| 实例间通信 | 无 (zero gossip) | 所有共享通过 PG |
16.3 部署命令¶
# 一键启动 Swarm HA(data-plane + HAProxy + agent-runtime pool + web)
task ha:up
# 一键拆除
task ha:down
# 扩缩容 agent-runtime
N=5 task deploy:swarm:scale
# 滚动更新 agent-runtime 镜像
task deploy:swarm:update
# 查看实例状态
task deploy:swarm:status
# 查看日志
task deploy:swarm:logs
网络架构
- 本地模式 (
task up):所有服务共享 bridge 网络ttd-net,容器间通过服务名直连 (e.g.db:5432) - Swarm 模式 (
task ha:up):使用 overlay 网络ttd-overlay(10.10.0.0/16),数据库通过host.docker.internal访问宿主机端口 (5435/5436)
16.4 HAProxy 路由规则¶
| 场景 | 路由策略 | 说明 |
|---|---|---|
有 X-Chat-ID header |
Consistent hash on header | Session affinity |
URL 含 /chats/{id}/ |
Consistent hash on path-extracted id | 自动提取 |
新建对话 POST /chats |
Round-robin | 均匀分配 |
16.5 故障转移¶
- 节点故障: 一致性哈希环自动 rehash,仅 ~1/N sessions 受影响
- 状态恢复: 从 PG checkpoint 恢复,无状态丢失
- Health check: HAProxy
inter 5s, fall 3, rise 2自动摘除/恢复节点
16.6 Graceful Shutdown¶
当 Docker Swarm 发送 SIGTERM(滚动更新或缩容时):
- 停止接受新请求(
GracefulShutdown.is_draining = True) - 等待 in-flight graph invocations 完成(max
TTD_DRAIN_TIMEOUT=120s) - 停止 heartbeat + 标记实例为
shutdown - 进程退出
附录 A:异常层次¶
TTDError # 基类
├── ClarificationNeededError # Follow-up 发现歧义
├── PolicyViolationError # Guardrails 安全策略违规
├── SQLValidationError # SQL 校验失败(syntax/policy/EXPLAIN)
├── RetrievalMissError # R/V/G 三引擎均未命中
└── ModelUnavailableError # 模型 API 不可达且无 fallback
附录 B:设计原则¶
- 配置集中 — 所有配置通过
Settings类管理,环境变量前缀TTD_ - 依赖注入 — FastAPI
Depends()注入 PG pool、Graph、ModelRegistry - 异步优先 — 所有 I/O 使用
async/await,数据库使用AsyncConnectionPool - Fail-Safe — 无法确认安全性时拒绝执行
- Defense in Depth — 安全检查在多层重复(意图→检索→生成→执行)
- Audit Everything — 每个管道步骤产出均被记录
- 工厂模式 — Sub-agent 通过
make_xxx_node(deps)创建,便于测试和注入 - 状态增量 — 节点仅返回需要更新的 state 字段
- 错误传播 — 通过
state["errors"]列表传播,不直接抛异常