跳转至

Backend 服务技术参考

概述

TTD Backend 是 Agentic BI 系统的运行时核心,采用 LangGraph Supervisor + Sub-agents 编排模式, 提供 Chat API、智能查询编排、安全防护与多模型 AI 管道。

  • 应用入口:backend/app/main.py
  • 当前版本:4.0.0
  • 包名:ttd-backend(uv workspace member)

1. 技术基线

组件 版本/规格 用途
Python 3.12+ 运行时语言
FastAPI 0.115+ ASGI Web 框架
Uvicorn latest ASGI 服务器
LangGraph ≥ 0.4.0 状态图编排、检查点恢复
LangChain latest 多模型集成抽象层
PostgreSQL (Aurora) 15+ 会话、聊天、配置、语义层存储
psycopg 3 latest 异步 PostgreSQL 驱动(raw AI pipeline)
SQLAlchemy 2.0 latest ORM(Admin 资源管理)
Pydantic v2 latest 数据校验 + Settings
PyJWT + cryptography latest JWT 解码 + JWKS 验签 (RS256)
structlog latest 结构化 JSON 日志
slowapi latest 请求限流

多模型配置

模型 Provider 适用任务
deepseek-v3.2 DashScope (OpenAI-compatible) Supervisor, Query Understanding, Planner, Follow-up
qwen3-max DashScope (OpenAI-compatible) Insights, Visualization, SQL Explanation, Knowledge QA
qwen3-coder-plus DashScope (OpenAI-compatible) SQL Generation, SQL Repair
claude-4.5-sonnet Anthropic 可选高质量推理(Admin 配置)

Embedding 模型

模型 Provider 维度
text-embedding-v4 DashScope 1024 dim

2. 应用启动与生命周期

应用使用 FastAPI 的 lifespan 上下文管理器进行资源初始化和清理。

启动序列

sequenceDiagram
    participant App as FastAPI App
    participant PG as PostgreSQL Pool
    participant IR as Instance Registry
    participant GS as Graceful Shutdown
    participant MR as Model Registry
    participant CP as LangGraph Checkpointer
    participant MS as Memory Store
    participant MC as Metadata Cache
    participant SG as Supervisor Graph
    participant SM as Session Manager

    App->>App: _configure_logging()
    App->>PG: AsyncConnectionPool(aurora_dsn, min=5, max=20)
    PG-->>App: pool ready

    opt Swarm Mode (TTD_SWARM_MODE=true)
        App->>IR: InstanceRegistry(pool, instance_id, hostname)
        IR->>PG: CREATE TABLE ttd.agent_instances
        IR->>PG: INSERT/UPSERT instance registration
        IR->>IR: start_heartbeat(interval=10s)
        App->>GS: GracefulShutdown(registry, drain_timeout=120s)
        GS->>GS: install_signal_handlers(SIGTERM, SIGINT)
    end

    App->>MR: ModelRegistry() + load_from_db()
    MR-->>App: models loaded
    App->>CP: AsyncPostgresSaver.from_conn_string()
    CP->>CP: setup() (create checkpoint tables)
    CP-->>App: checkpointer ready
    App->>MS: AsyncPostgresStore.from_conn_string()
    MS-->>App: store ready
    App->>MC: MetadataCache(pg_pool)
    MC-->>App: cache ready
    App->>SG: build_supervisor_graph(checkpointer, store, registry, pool)
    SG-->>App: compiled graph
    App->>SM: SessionManager(pg_pool)
    SM-->>App: manager ready
    App->>App: create_task(_cleanup_loop)
    Note over App: 🟢 "TTD Chat BI started (instance=xxx, swarm=true)"

关闭序列

  1. Graceful Drain (Swarm mode): 停止接受新请求,等待 in-flight 完成(max 120s)
  2. 停止 heartbeat + 从 ttd.agent_instances 标记为 shutdown
  3. 取消后台清理任务
  4. 关闭 LangGraph Memory Store 连接
  5. 关闭 LangGraph Checkpointer 连接
  6. 关闭 PostgreSQL 连接池
  7. 释放 SQLAlchemy Engine

Middleware 栈

中间件 功能
CORSMiddleware 跨域请求支持(生产环境收紧到 BETTER_AUTH_URL,拒绝 * + credentials)
Rate Limiter (slowapi) 按 IP 限流,默认 60 req/min/user
RateLimitExceeded Handler 429 响应处理
Prometheus Metrics /metrics 端点(可选,TTD_PROMETHEUS_ENABLED

Router 注册

所有 API 路由挂载在 /api/v1 前缀下:

app.include_router(api_router, prefix="/api/v1")

3. API 端点完整列表

3.1 Chat BI API(Primary)

推荐接口

新客户端应使用 Chat API,它提供完整的对话管理、流式响应和反馈机制。

方法 路径 说明
GET /api/v1/chats 列出当前用户所有对话
POST /api/v1/chats 创建新对话
GET /api/v1/chats/{id} 获取对话详情(含消息历史)
PATCH /api/v1/chats/{id} 更新对话(标题/状态)
DELETE /api/v1/chats/{id} 删除对话
POST /api/v1/chats/{id}/messages 发送消息(非流式响应)
POST /api/v1/chats/{id}/messages/stream 发送消息(SSE 流式响应)
POST /api/v1/messages/{id}/feedback 提交消息反馈

消息请求示例

{
  "question": "本月 GMV 同比增长多少?",
  "language": "zh",
  "options": {
    "enable_visualization": true,
    "enable_insights": true,
    "enable_evaluators": false
  }
}

SSE 流式事件格式

event: status
data: {"step": "query_understanding", "message": "正在理解您的问题..."}

event: status
data: {"step": "sql_generation", "message": "正在生成查询..."}

event: result
data: {"message_id": "...", "status": "success", ...}

3.2 Legacy API(向后兼容)

已废弃

Legacy API 保留用于旧客户端兼容,新开发请使用 Chat API。

方法 路径 说明
POST /api/v1/query 单次查询(同步)
GET /api/v1/query/{session_id}/stream Legacy SSE 流式
POST /api/v1/sql_execute 直接 SQL 执行
GET /api/v1/sessions 列出会话
POST /api/v1/feedback 提交反馈

3.3 Admin API

分组 路径前缀 说明
LLM Models /api/v1/admin/llms/* LLM 模型 CRUD
Embedding Models /api/v1/admin/embedding-models/* Embedding 模型管理
Reranker Models /api/v1/admin/reranker-models/* Reranker 模型管理
Knowledge Bases /api/v1/admin/knowledge-bases/* 知识库 CRUD
Data Sources /api/v1/admin/datasources/* 数据源管理
Documents /api/v1/admin/documents/* 文档管理
Chunks /api/v1/admin/chunks/* Chunk 浏览
Graph /api/v1/admin/graph/* KG Entity/Relationship
Chat Engines /api/v1/admin/chat-engines/* 聊天引擎配置
Evaluations /api/v1/admin/evaluations/* 评估任务管理
Feedbacks /api/v1/admin/feedbacks/* 反馈审核
Uploads /api/v1/admin/uploads/* 文件上传
API Keys /api/v1/admin/api-keys/* API Key 管理
Site Settings /api/v1/admin/site-settings/* 全局配置
Stats /api/v1/admin/stats/* 系统统计
Semantic Plane /api/v1/admin/semantic-plane/* 语义层资产浏览
Few-Shot Review /api/v1/admin/few-shots/* 动态 Few-Shot 质量审核(审批、驳回、YAML 导出)

3.4 Utility API

方法 路径 说明
GET /api/v1/health 健康检查
GET /metrics Prometheus 指标(可选)

4. Agent 编排拓扑

4.1 整体架构

graph TD
    START([START]) --> supervisor
    supervisor --> query_understanding_agent
    query_understanding_agent --> router

    router -->|kpi_lookup| data_retrieval_agent
    router -->|nl2sql_query| data_retrieval_agent
    router -->|deep_analysis_workflow| planner
    router -->|graph_reasoning| graph_rag_agent
    router -->|business_knowledge_qa| data_retrieval_agent_kb
    router -->|clarification_required| data_retrieval_agent_cl

    subgraph "SQL Pipeline"
        data_retrieval_agent --> reranker
        reranker --> sql_generator
        sql_generator --> guardrails_agent
        guardrails_agent -->|execute| sql_executor
        guardrails_agent -->|corrective_repair| corrective_retrieval
        guardrails_agent -->|repair| sql_generator
        guardrails_agent -->|reject| END_REJECT([END])
        corrective_retrieval --> sql_generator
        sql_executor --> result_processor
        result_processor --> sql_explainer
    end

    subgraph "Post-processing"
        sql_explainer -->|visualize| visualization_agent
        visualization_agent --> insights_agent
        sql_explainer -->|evaluate| retrieval_evaluator
        retrieval_evaluator --> sql_evaluator
        sql_evaluator --> visualization_agent
    end

    subgraph "Deep Analysis"
        planner --> data_retrieval_agent_deep
        data_retrieval_agent_deep --> reranker_deep[reranker]
        reranker_deep --> sql_generator_deep[sql_generator]
        sql_generator_deep --> guardrails_deep[guardrails]
    end

    insights_agent --> answer_evaluator
    answer_evaluator --> END_OK([END])
    insights_agent --> END_OK2([END])

    graph_rag_agent --> END_GRAPH([END])

    data_retrieval_agent_kb --> knowledge_qa_agent
    knowledge_qa_agent --> END_KB([END])

    data_retrieval_agent_cl --> follow_up_agent
    follow_up_agent --> END_CL([END])

4.2 路由决策详解

Route Decision 触发意图 执行路径
kpi_lookup kpi_lookup data_retrieval → reranker → sql_generator → guardrails → sql_executor → result_processor → sql_explainer
nl2sql_query trend_analysis, comparison, ranking, detail, clarification_reply data_retrieval → reranker → schema_explorer → sql_generator → guardrails → ... → visualization → insights
deep_analysis_workflow root_cause, report_generation planner → data_retrieval → reranker → ... → retrieval_evaluator → sql_evaluator → ... → answer_evaluator
analytical_workflow 诊断/预测/分群/归因 ML toolkit + Code Interpreter pipeline
graph_reasoning relationship, graph_exploration graph_rag_agent
business_knowledge_qa policy_qa data_retrieval → knowledge_qa_agent
clarification_required ambiguous data_retrieval → follow_up_agent

路由升级规则

analytical_depth == "complex" 时,nl2sql_query 自动升级为 deep_analysis_workflow

SQL Cache 快速路径

Router 在 embedding 命中缓存时,直接跳转到 guardrails_agent,跳过检索和生成步骤。

4.3 条件路由函数

函数 位置 可能返回值
_route_after_retrieval data_retrieval 后 rerank, knowledge_qa, no_context, clarify
_route_after_explorer schema_explorer 后 resolved, no_context
_route_after_validation guardrails 后 execute, corrective_repair, repair, reject
_route_after_result sql_explainer 后 visualize, evaluate, insight, done
_route_after_insights insights 后 evaluate, done
_route_after_sql_eval sql_evaluator 后 visualize, done

4.4 全局状态定义 (AgenticBIState)

AgenticBIState 是一个 TypedDict(total=False),所有字段可选:

字段 类型 说明
question str 用户原始问题
language str 语言标识 (zh/en)
user_context dict 用户上下文(roles, domains)
session_id str 会话 ID
options dict 管道选项开关
字段 类型 说明
routing_plan list[str] 执行路径节点列表
current_agent str \| None 当前执行节点
route_decision Literal[...] 路由决策结果
execution_plan list[dict] Planner 输出的执行计划
plan_step_index int 当前执行步骤索引
字段 类型 说明
short_term_memory list[dict] Working memory: 最近 N 轮对话 (LangGraph Checkpointer)
long_term_context dict \| None Legacy 兼容字段(向 memory_context 迁移中)
memory_context dict \| None 四层记忆上下文: profile, episodic, correction (由 MemoryFacade 填充)
字段 类型 说明
query_understanding dict QU 结构化输出
rewritten_question str 重写后的问题
retrieval_results dict R/V/G 检索结果
graph_context dict 图谱上下文
generated_sql str 生成的 SQL
sql_dialect str SQL 方言
sql_explanation str SQL 解释
execution_result dict 执行结果
visualization dict 可视化决策包(含 chart_type, echarts_option, template_ref, candidates, confidence, checkpoint_status, quality
visualization_candidates list[CandidateInfo] Top-k 候选图表列表(含模板 ID、分数、原因、限制)
visualization_confidence float 推荐置信度(0-1),低于 0.5 触发 HITL
visualization_checkpoint_status CheckpointStatus auto_pass / needs_review / blocked
visualization_quality dict 质量评估结果(passed, suitability_score, issues
visualization_feedback dict 用户反馈元数据(用于闭环学习)
insights list[str] 数据洞察
sql_business_context dict SQL 业务上下文
knowledge_answer str 知识问答回答
字段 类型 说明
retrieval_quality dict 检索质量评分
sql_quality dict SQL 质量评分
answer_quality dict 回答质量评分
字段 类型 说明
tenant_id str 租户 ID
governance_overlays dict 治理上下文覆盖
region str 区域标识
industry_archetype str 行业原型
字段 类型 说明
status Literal[...] 管道终态
errors list[str] 错误列表
sql_cache_hit bool 是否命中缓存
repair_attempt int 修复重试次数
clarification_message str 澄清消息
clarification_options list[dict] 澄清选项

5. Sub-Agent 详解

5.1 编排层

Agent 工厂函数 模型 说明
Supervisor make_supervisor_node(registry) embedding 计算 question embedding,解析治理上下文,初始化 state
Query Understanding make_query_understanding_node(registry) deepseek-v3.2 结构化分析意图、实体、指标、维度、时间/地域约束
Router make_router_node(sql_cache) — (规则) 基于 QU 输出映射路由决策,检查 SQL Cache 快速路径
Planner make_planner_node(registry) deepseek-v3.2 复杂分析场景的多步执行计划生成

5.2 检索与增强

Agent 工厂函数 模型 说明
Data Retrieval make_data_retrieval_node(retriever) embedding R/V/G 三引擎检索;从 QU 输出提取 search_terms 传入 Engine 1
Reranker make_reranker_node(registry) 规则引擎 术语绑定加分/降权 + 对检索结果重排序
Corrective Retrieval make_corrective_retrieval_node(retriever) 规则引擎 SQL 失败后的补偿检索,优先按术语绑定路由修复
Schema Explorer make_schema_explorer_node(registry, cache) follow_up 当检索上下文不足时,探索 schema 结构并提供澄清

5.3 SQL 管道

Agent 工厂函数 模型 说明
SQL Generation make_sql_generation_node(registry) qwen3-coder-plus 基于 R/V/G 上下文组装 prompt,生成安全 SQL
Guardrails make_guardrails_node(executor) 规则引擎 五层校验:语法分析 → 策略检查 → AST 列引用校验 → 术语绑定语义校验 → EXPLAIN 执行计划
SQL Executor _make_sql_executor_node(executor, cache) 在数据面执行 SQL,支持 Redshift / pg_mooncake
Result Processor _result_processor_node 结果格式化、截断、类型转换
SQL Explainer make_sql_explainer_node(registry) qwen3-max 用自然语言解释 SQL 的业务含义

5.4 后处理与知识

Agent 工厂函数 模型 说明
Visualization make_visualization_node(registry) qwen3-max(fallback) 推荐决策引擎(top-k recommendation + adapter render);LLM 仅用于 repair
Insights make_insights_node(registry) qwen3-max 从查询结果提取业务洞察
Follow-up make_followup_node(registry) deepseek-v3.2 意图模糊时生成澄清问题和选项
Knowledge QA make_knowledge_qa_node(registry) qwen3-max 基于检索的业务知识问答
Graph RAG _make_graph_rag_node(pool, registry) graph_rag Apache AGE 图谱推理,探索实体关系
Suggest Followup make_suggest_followup_node(registry) deepseek-v3.2 生成推荐的后续问题
Analytical make_analytical_node(...) ML toolkit 诊断/预测/分群/归因等 ML 分析任务

5.5 评估层

Agent 工厂函数 类型 说明
Retrieval Evaluator make_retrieval_evaluator_node() def(同步) 评估检索召回的覆盖度、相关性
SQL Evaluator make_sql_evaluator_node() def(同步) 评估 SQL 正确性、性能、安全性
Answer Evaluator make_answer_evaluator_node() def(同步) 评估最终回答的完整性、准确性

评估器设计

评估器为纯规则引擎(def 而非 async def),无 LLM/IO 调用,确保低延迟。 仅在 deep_analysis_workflow 路径中启用。


6. Skills 与 Tools 层

Skills 目录:backend/app/skills/

6.1 RAG 四引擎检索

文件:skills/rag.py

通过 pgvector 对 PG Supernode 执行四引擎语义搜索(Engine 1+1b+2+D 并行,Engine 3 串行):

引擎 目标表 Top-K / 阈值
Engine 1 — Relational (R) semlayer.business_term LLM 提取的 search_terms 逐词 ILIKE
Engine 1b — Keyword (R+) semlayer.column_asset (AliasDictionary) 字典精确匹配
Engine 2 — Vector (V) 7 张 embedding 表, cosine similarity threshold=0.65
Engine 3 — Graph (G) Apache AGE graph traversal depth=3
Engine D — Dynamic Few-Shot semlayer.dynamic_few_shot_cache top_k=3, sim≥0.95

Engine 1 术语匹配策略(LLM-based Query Decomposition)

Engine 1 不再使用正则表达式分词。搜索关键词来源于 Query Understanding 节点已经提取的结构化实体 (entities.termsmetric_candidatesdimension_candidates 等),由 QueryDecomposer skill 统一封装:

# skills/query_decomposer.py
from_query_understanding(qu)  # 从 QU 输出提取 search terms,零额外 LLM 调用
QueryDecomposer(registry)     # 独立 LLM 调用(当 QU 未运行时的 fallback)

这确保了关键词提取: - 语言无关 — LLM 天然理解中英文混合、缩写、口语化表达 - 领域无关 — 不硬编码任何停用词/分词规则 - 零额外延迟 — 正常流程复用 QU 节点已有的 LLM 输出

术语绑定注入(Term Binding Injection)

Engine 1 命中的术语如果有 mapped_asset_id(绑定到物理列/指标),会被构建为 _term_bindings 视图, 注入 retrieval_results 供后续所有节点共享:

_term_bindings = [
    {"term_id": "term_price", "canonical_term": "价格",
     "mapped_asset_type": "column", "mapped_asset_id": "col_txn_price"},
]

Engine D 仅返回管理员已 approve 的动态 few-shot 条目(review_status='approved'), 并要求 confidence_score ≥ 0.9fail_count ≤ 3(未超过失效阈值)。 新学习到的条目进入 pending_review 状态,不会自动注入,需管理员在 Admin Panel 审批后方才生效。

6.2 SQL Executor

文件:skills/sql_executor.py

Backend 用途 配置
Redshift 生产环境 TTD_DATA_PLANE_BACKEND=redshift
pg_mooncake 本地开发/POC TTD_DATA_PLANE_BACKEND=pg_mooncake

抽象接口 DataPlaneExecutor

  • execute(sql, max_rows, timeout_ms){columns, rows, row_count, has_more, execution_time_ms}
  • explain(sql){plan, estimated_rows, estimated_cost}

6.3 其他 Skills

Skill 文件 功能
Query Decomposer query_decomposer.py LLM 驱动的搜索关键词提取(独立 skill);优先复用 QU 输出 (from_query_understanding),也可独立调用 (QueryDecomposer.extract_search_terms)
Reranker reranker.py 术语绑定感知重排序:绑定列加分(0.95)、父表提升(+0.25)、无绑定表降权(-0.10);支持 Admin 配置的 Reranker 模型
Corrective Retrieval corrective_retrieval.py SQL 失败后补偿检索:优先检测术语绑定违规并按绑定关系修复,其次基于错误信息通用分析
Visualization visualization.py 推荐决策引擎(recommend → select → adapt → validate);含候选排序、置信度评估、HITL checkpoint 触发;LLM 仅保留 repair 辅助
Chart Templates chart_templates/ 模板注册表、数据分析器、top-k 推荐引擎(matcher)、adapter 层、渲染器、主题系统
Insights insights.py 数据洞察提取
Graph RAG graph_rag.py Apache AGE Cypher 查询
Graph Traversal graph_traversal.py 图谱多跳遍历
SQL Business Context sql_business_context.py SQL 业务语义解释
Metadata Cache metadata_cache.py 元数据缓存层(TTL: 24h);含 AliasDictionary 用于 Engine R+ 字典匹配
Context Compactor context_compactor.py 上下文压缩(token budget: 3000)
Governance governance.py 治理上下文解析(tenant, region, compliance)
Domain Context domain_context.py 业务域上下文注入
Column Mapping column_mapping.py 列名映射与消歧
SQL Column Validator sql_column_validator.py AST 级列引用静态校验(防幻觉);含 _suggest_closest_column() 模糊推荐;支持 CTE 虚拟列、SELECT alias、* 通配、函数参数、LATERAL JOIN、WITH RECURSIVE;最大递归深度 15
Error Handler error_handler.py 管道错误统一处理

7. 记忆系统

graph LR
    subgraph "Personalized Memory (per-user isolated)"
        W[Working Memory<br/>LangGraph Checkpointer] --> P[Profile Memory<br/>PostgreSQL]
        P --> E[Episodic Memory<br/>PostgreSQL]
        E --> C[Correction Memory<br/>PostgreSQL]
    end

    subgraph "Shared Optimization (not user memory)"
        SC[SQL Cache<br/>pgvector]
        FS[Dynamic Few-Shot]
        MC[Metadata Cache]
    end

    Q[用户问题] --> W
    W -->|会话上下文| Pipeline
    P -->|用户偏好/默认值| Pipeline
    E -->|历史交互摘要| Pipeline
    C -->|已接受纠正| Pipeline
    SC -->|语义匹配 SQL| Router

7.1 四层记忆架构 (Four-Layer Memory)

存储 生命周期 用途
Working LangGraph Checkpointer (PostgreSQL) 请求/会话作用域 当前对话上下文,最近 N 轮,Context Compaction
Profile ttd.ttd_memory_records (kind=profile) 长期 (365 天 TTL) 稳定用户偏好:常用指标、偏好维度、图表风格
Episodic ttd.ttd_memory_records (kind=episodic) 中期 (90 天 TTL, 衰减) 成功交互摘要、已接受澄清、任务成果
Correction ttd.ttd_memory_records (kind=correction) 中长期 (180 天 TTL) 用户批准的纠正:术语消歧、SQL 修复模式

7.2 Working Memory (短期上下文)

属性
存储 LangGraph Checkpointer (PostgreSQL)
窗口 最近 5 轮(TTD_SHORT_TERM_WINDOW=5
格式 list[dict],每轮包含 user/assistant message
压缩 Context Compactor(token budget 3000,最近 3 轮原样保留)
# app/memory/short_term.py
def trim_history(messages, *, window=5):
    """保留系统消息 + 最近 window 轮对话。"""

7.3 Profile / Episodic / Correction Memory

通过统一的 MemoryFacade 访问,支持路由感知的预算控制和可观测性集成:

# app/memory/facade.py
class MemoryFacade:
    async def retrieve(self, *, user_id, query, route_decision, ...) -> MemoryContext
    async def write_candidate(self, *, user_id, kind, summary_text, ...) -> str | None
    async def upsert(self, *, memory_id, user_id, ...) -> bool
    async def suppress(self, *, memory_id, user_id, ...) -> bool
    async def delete_for_user(self, *, user_id, ...) -> int

路由预算 (Route Budgets):

Route Profile Episodic Correction
nl2sql_query 5 2 5
kpi_lookup 2 0 1
business_knowledge_qa 3 1 1
deep_analysis_workflow 8 8 4

7.4 记忆形成 (Memory Formation)

记忆候选项从已持久化的 turn 中提取,而非内联于生成过程:

# app/memory/formation.py
class MemoryFormationService:
    async def process_turn(self, *, user_id, turn_id, ...) -> list[str]
    async def process_feedback(self, *, user_id, rating, ...) -> list[str]

高信号写入策略:仅在有明确偏好信号、被接受的澄清、重复选择模式、或评估器确认的成功交互时写入。

7.5 SQL Cache(共享优化层)

属性
存储 semlayer.sql_cache 表 (pgvector)
匹配 cosine similarity > 0.92(TTD_SQL_CACHE_SIMILARITY
TTL 15 分钟(TTD_SQL_CACHE_TTL_MINUTES
缓存内容 仅 SQL 文本,不缓存执行结果

数据新鲜度

SQL Cache 仅缓存生成的 SQL 语句。执行结果始终从数据面实时获取, 确保返回最新数据。

7.6 动态 Few-Shot 缓存 (Dynamic Few-Shot Cache)

文件:app/memory/dynamic_few_shot.py

从 SQL 修复成功案例中自动学习 {question, final_sql} 对,作为实时 few-shot 示例补充手工策划的 semantic-plane/few_shots/ 资产。

质量控制工作流(三步审批):

record_success()           →  review_status = 'pending_review'
                               (新条目不参与 RAG,等待审核)
Admin 审批 (/admin/few-shots)  →  approve / reject
search_similar()           →  仅返回 approved + sim ≥ 0.95 的条目
属性
存储 semlayer.dynamic_few_shot_cache 表 (pgvector + review 状态字段)
RAG 注入条件 review_status='approved' AND similarity ≥ 0.95 AND confidence_score ≥ 0.9 AND fail_count ≤ 3
自动过期 30 天(expires_at = NOW() + INTERVAL '30 days'),或 fail_count > 3
YAML 导出 审批通过的条目可导出为与 semantic-plane/few_shots/ 一致的 YAML 格式,便于晋升为正式资产

关键方法:

class DynamicFewShotCache:
    async def record_success(question, final_sql, *, intent_type, grounded_tables,
                             question_embedding, confidence) -> None
    # 新条目进入 pending_review;已有条目的 review_status 保持不变

    async def search_similar(question_embedding, *, top_k=3,
                             min_confidence=0.9, min_similarity=0.95) -> list[dict]
    # 仅返回 approved 条目,供 RAG Engine D 使用

    async def approve(entry_id, reviewer) -> bool
    async def reject(entry_id, reviewer) -> bool
    async def list_by_status(review_status, *, limit, offset) -> tuple[list, int]
    async def get_stats() -> dict  # pending / approved / rejected 各计数
    async def mark_exported(entry_ids) -> int

Admin API 端点(/api/v1/admin/few-shots/):

方法 路径 说明
GET /stats 各状态数量统计
GET / 按状态分页列表
PATCH /{id}/review 审批(approve / reject)
GET /{id}/export 导出单条 YAML
GET /export/bulk 批量导出全部 approved 条目为 YAML

8. 安全架构

graph TD
    Request[API 请求] --> JWKS[Better Auth JWKS 验签]
    JWKS --> Guard[Content Safety Guard]
    Guard --> Policy[Policy Enforcement]
    Policy --> Pipeline[AI Pipeline]
    Pipeline --> RLS[Row-Level Security]
    RLS --> Response[响应]

    JWKS -->|无 Token + Debug| DevBypass[Dev Mode Bypass]
    DevBypass --> Guard
    JWKS -->|无 Token + Prod| Reject[401 Unauthorized]

8.1 认证 — Better Auth JWKS 验签

文件:app/security/auth.py

TTD Backend 不再维护独立的用户认证逻辑。JWT 由 Next.js 侧的 Better Auth 签发(RS256),Backend 通过 JWKS 公钥本地验签。

配置 默认值 说明
TTD_BETTER_AUTH_URL http://localhost:3000 Better Auth 基础 URL(JWKS 端点来源)
TTD_BETTER_AUTH_ISSUER http://localhost:3000 JWT iss 校验值
TTD_JWT_AUDIENCE talk-to-data JWT aud 校验值
TTD_JWKS_CACHE_TTL_SECONDS 3600 JWKS 公钥缓存时间

验证流程:

# PyJWKClient 从 Better Auth JWKS 端点获取公钥
jwks_url = f"{settings.better_auth_url}/api/auth/jwks"
jwks_client = PyJWKClient(jwks_url, cache_jwk_set=True, lifespan=3600)
signing_key = jwks_client.get_signing_key_from_jwt(token)

payload = jwt.decode(
    token, signing_key.key,
    algorithms=["RS256", "ES256"],
    audience=settings.jwt_audience,
    issuer=settings.better_auth_issuer,
)

JWT Claims → UserContext 映射:

UserContext(
    user_id=payload["sub"],          # Better Auth user.id
    roles=_extract_roles(payload),    # "role" (str) + "roles" (list)
    allowed_domains=payload.get("allowed_domains", []),
)
Claim 映射 说明
sub UserContext.user_id Better Auth user.id,与 Chat/Feedback 中的 user_id 一致
role / roles UserContext.roles 支持单值 (admin plugin) 和列表形式
allowed_domains UserContext.allowed_domains Phase 1 为空集合

Admin 路由保护:

# app/api/admin/router.py
admin_router = APIRouter(prefix="/admin", dependencies=[Depends(require_admin)])

require_admin 检查 UserContext.roles 是否包含 "admin",否则返回 403 Forbidden

开发模式绕过:TTD_DEBUG=true 且请求无 Bearer Token 时,自动注入:

UserContext(user_id="dev-user", roles=["admin"], allowed_domains=[])

迁移期 HS256 回退

TTD_DEBUG=true 时,若 JWKS 验证失败,系统尝试以旧 TTD_JWT_SECRET (HS256) 验证 Token。 此回退仅用于本地开发过渡,将在迁移完成后移除。

8.2 Content Safety Guard

文件:app/security/guard.py

拦截恶意输入的两类模式:

TOPIC_DENY_PATTERNS = [
    r"\b(DROP|DELETE|INSERT|UPDATE|ALTER|CREATE|TRUNCATE|GRANT|REVOKE)\b",
    r"ignore\s+(previous|above)\s+instructions",
    r"system\s*prompt",
]
INJECTION_PATTERNS = [
    r";\s*(DROP|DELETE|INSERT|UPDATE)",
    r"UNION\s+SELECT",
]

8.3 Policy Enforcement

文件:app/security/policy.py

基于 UserContext.allowed_domainsllm_exposure_policy 进行:

  • Domain 过滤:只保留用户有权限的业务域资产
  • PII 过滤llm_exposure_policy=hidden 的列对 LLM 不可见
  • Column Maskingllm_exposure_policy=masked 的列仅提供脱敏样本

8.4 SQL Guardrails(五层校验)

检查项 拒绝条件 自愈策略
Layer 1 — 语法层 SQL 解析、多语句检测 非 SELECT / 多条语句 直接拒绝
Layer 2a — 策略层 禁止关键词、LIMIT 检查 含 DML 关键词 直接拒绝
Layer 2c — 列校验层 AST 列引用静态校验(sql_column_validator.py 引用不存在的列 有唯一候选时自动替换;无候选时携带可用列清单触发 corrective_retrieval
Layer 2d — 术语绑定校验 术语绑定语义校验(_validate_term_bindings() 绑定列被引用到错误的表 携带正确的 table.column 信息触发 corrective_retrieval
Layer 3 — 执行计划层 EXPLAIN 分析 预估行数 > 10M / cost > 100K 降级重写或拒绝

Layer 2d 术语绑定校验详细说明:

  • 仅在 retrieval_results 包含 _term_bindings 时激活
  • 对每个绑定的列,使用正则检查 SQL 中是否出现 wrong_table.column 模式
  • 例如:术语"价格"绑定到 fact_transaction.price,但 SQL 写了 dim_article.price,则拦截
  • 返回 Term binding violation 错误,包含正确的表和列名,供 Corrective Retrieval 精准修复
  • 不影响未绑定的列——仅对有明确 mapped_asset_id 的术语生效

Layer 2c 列校验详细说明:

  • 使用 sqlparse 进行纯内存 AST 解析,延迟 < 5ms
  • 构建白名单:从 retrieval_results 中提取所有可见列(含 alias_terms
  • 边界情况处理:CTE 虚拟列、SELECT alias、* 通配、函数参数、LATERAL JOIN、WITH RECURSIVE、最大递归深度 15 层
  • 自动列名修正(_sanitize_column_names():alias_terms 精确匹配 > difflib 0.8+ 唯一候选 > 不替换(保守策略)
  • 每次自动修正写入 structlog 审计日志

validation_result 新增字段:

字段 类型 说明
column_check_pass bool 列校验是否通过
column_errors list[str] 未通过时的列错误详情
column_check_duration_ms float 列校验耗时(ms)

8.5 Rate Limiting

配置 默认值 说明
TTD_RATE_LIMIT_PER_USER 60 每用户每分钟请求数
TTD_BURST_LIMIT 100 突发允许量

9. 多模型注册表

文件:app/models/registry.py

9.1 两层配置架构

graph TD
    Task[任务类型] --> Registry[ModelRegistry]
    Registry --> Check{DB 有配置?}
    Check -->|是| DB[Admin DB 模型]
    Check -->|否| Code[代码默认配置]
    DB --> LLM[LangChain Model]
    Code --> LLM
  1. 代码默认 (_DEFAULT_TASK_MODEL_MAP):硬编码 task → model-id 映射
  2. Admin DB 覆盖:通过 /admin/llms/* API 配置的模型优先级更高

9.2 Task 路由映射

Task 默认模型 说明
supervisor deepseek-v3.2 Supervisor 节点
intent_parsing deepseek-v3.2 意图解析
query_understanding deepseek-v3.2 查询理解
planner deepseek-v3.2 执行计划
sql_generation qwen3-coder-plus SQL 生成
sql_repair qwen3-coder-plus SQL 修复
insights qwen3-max 洞察提取
visualization qwen3-max 可视化修复辅助(仅 repair fallback 场景,95%+ 由推荐引擎处理)
sql_explanation qwen3-max SQL 解释
follow_up deepseek-v3.2 跟进澄清
knowledge_qa qwen3-max 知识问答

9.3 Fallback 链

_DEFAULT_FALLBACK_CHAIN = {
    "deepseek-v3.2": ["qwen3-max", "qwen3-coder-plus"],
    "qwen3-max": ["deepseek-v3.2", "qwen3-coder-plus"],
    "qwen3-coder-plus": ["qwen3-max", "deepseek-v3.2"],
}

当主模型不可用时,按顺序尝试 fallback 模型。

9.4 内置模型配置

Model ID Provider Model Name API
deepseek-v3.2 openai (compatible) deepseek-v3 DashScope
qwen3-max openai (compatible) qwen3-max DashScope
qwen3-coder-plus openai (compatible) qwen3-coder-plus DashScope
claude-4.5-sonnet anthropic claude-sonnet-4-20250514 Anthropic

10. Prompt 管理

10.1 架构

graph LR
    DB[(semlayer.prompt_templates)] --> Manager[PromptManager]
    Manager --> Cache[In-Process Cache]
    Cache --> Assembler[PromptAssembler]
    Assembler --> LLM[LLM 调用]

10.2 PromptManager

文件:app/prompt/manager.py

  • 存储:PostgreSQL semlayer.prompt_templates
  • 版本控制:每条模板有 versionis_active 字段
  • A/B 测试:支持 ab_group 参数选择不同模板变体
  • 缓存:进程内 dict 缓存,支持按 template_id 失效
class PromptManager:
    async def get_template(self, template_id, *, ab_group=None) -> str | None
    @staticmethod
    def invalidate_cache(template_id=None) -> None

10.3 PromptAssembler

文件:app/prompt/assembler.py

将 R/V/G 检索结果组装为结构化 prompt:

  • 注入 SQL 方言指导(Redshift / PostgreSQL 差异)
  • 注入表/列白名单(防止 LLM 幻觉表名)
  • 注入 JOIN 规则、指标定义、业务规则
  • 注入术语绑定约束_build_term_binding_context()):当检索到绑定术语时,生成强制指令如 「价格」→ 必须使用 gold_hm.fact_transaction.price
  • 注入性能约束(LIMIT、CTE 模式要求)
  • Jinja2 模板渲染

11. 会话管理

文件:app/session/manager.py

11.1 数据模型

Schema 说明
ttd.ttd_sessions ttd 会话主表
ttd.ttd_turns ttd 对话轮次

11.2 SessionManager API

class SessionManager:
    async def create_session(user_id, *, ttl_hours=24) -> str
    async def get_session(session_id) -> dict | None
    async def rename_session(session_id, title) -> bool
    async def list_sessions(user_id, *, limit=8) -> list[dict]
    async def cleanup_expired() -> int

11.3 TTL 与清理

配置 默认值 说明
TTD_SESSION_TTL_HOURS 24 会话过期时间
TTD_SESSION_MAX_TURNS 20 单会话最大轮次
TTD_CLEANUP_INTERVAL_MINUTES 60 清理循环间隔

后台 asyncio.Task 定期执行过期会话清理。

11.4 Auto-Title

新对话的标题自动取自第一条用户消息(通过 LATERAL JOIN 查询)。


12. 数据库模型

所有 ORM 模型定义在 app/db/models.py,使用 SQLAlchemy 2.0 Mapped Column 语法。

Schema:ttd(与 semlayer 语义层 schema 分离)

12.1 核心业务模型

模型 表名 说明
Chat ttd_chats 对话(含 user_id, title, engine_name, status)
ChatMessage ttd_chat_messages 消息(含完整管道输出字段)
Feedback ttd_feedback 用户反馈(rating, comment, correction_sql)

12.2 模型管理

模型 表名 说明
LLMModel ttd_llm_models LLM 模型配置
EmbeddingModel ttd_embedding_models Embedding 模型配置
RerankerModel ttd_reranker_models Reranker 模型配置
ChatEngine ttd_chat_engines 聊天引擎配置(关联 LLM + KB)

12.3 知识库系统

模型 表名 说明
KnowledgeBase ttd_knowledge_bases 知识库
DataSource ttd_data_sources 数据源(file/web/sitemap/semantic_plane)
KBDataSource ttd_kb_datasources KB-DS 关联表 (M:N)
Document ttd_documents 文档
Chunk ttd_chunks 文本块(含 embedding)
KGEntity ttd_kg_entities 知识图谱实体
KGRelationship ttd_kg_relationships 知识图谱关系

12.4 评估系统

模型 表名 说明
EvaluationDataset ttd_evaluation_datasets 评估数据集
EvaluationTask ttd_evaluation_tasks 评估任务
EvaluationTaskItem ttd_evaluation_task_items 评估任务项

12.5 系统管理

模型 表名 说明
Upload ttd_uploads 文件上传记录
SiteSettings ttd_site_settings 全局站点设置
ApiKey ttd_api_keys API 密钥管理

12.6 Enum 类型

Enum 用途
DocIndexStatus pending, running, completed, failed 文档索引状态
KGIndexStatus not_started, pending, running, completed, failed KG 索引状态
DataSourceType file, web_single_page, web_sitemap, semantic_plane 数据源类型
EvaluationStatus not_start, evaluating, done, error, cancel 评估状态
FeedbackType like, dislike 反馈类型
ChatVisibility public, private 对话可见性

13. 审计与可观测性

13.1 ObservabilityFacade

文件:app/observability/facade.py

ObservabilityFacade 是系统可观测性的统一网关,将 Langfuse(追踪与评分)、AutoMQ(事件流)和 structlog(结构化日志)整合为单一 API。所有 API 请求通过一个 trace 贯穿完整生命周期。

sequenceDiagram
    participant R as API Request
    participant F as ObservabilityFacade
    participant L as Langfuse
    participant A as AutoMQ
    participant S as structlog

    R->>F: start_trace(session_id, user_id)
    F->>L: create trace
    F->>S: log trace_start
    R->>F: start_span("sql_generation")
    F->>L: create span under trace
    R->>F: emit_event(SESSION_TURN_PERSISTED)
    F->>A: publish to topic
    R->>F: record_score("sql_quality", 0.92)
    F->>L: attach score to trace
    R->>F: finalize_trace(status="success")
    F->>L: close trace
    F->>S: log trace_end

核心方法

方法 说明
start_trace(session_id, user_id, ...) 创建请求级 trace
finalize_trace(trace_id, status, ...) 关闭 trace,汇总 token 与状态
start_span(trace_id, name, ...) 创建子 span(Agent 步骤、LLM 调用等)
end_span(span_id, output, ...) 关闭 span
record_score(trace_id, name, value, ...) 记录评分(评估器/用户反馈)
emit_event(event_type, payload, ...) 发送事件至 AutoMQ
get_langchain_handler(trace_id) 获取 LangChain callback handler

记忆事件 (Memory Events)

记忆系统(working / profile / episodic / correction 四层)的所有操作均为 first-class 事件,自动关联 trace_id / session_id / turn_id:

  • memory.retrieve.completed — 检索完成
  • memory.rank.completed — 排序完成
  • memory.write.candidate — 产生候选写入
  • memory.upsert.completed — 写入/更新完成
  • memory.suppress.completed — 记忆抑制
  • memory.expire.completed — 过期清理
  • memory.delete.completed — 显式删除

相关模块:app/observability/events.py(EventType 枚举)、app/observability/spans.py(observe_span 装饰器)、app/observability/automq_exporter.py(AutoMQ 导出)、app/observability/langfuse_adapter.py(Langfuse 适配)、app/observability/datasets.py(评估数据集管理)。

13.2 结构化审计日志

文件:app/audit/logger.py

每次 Supervisor 调用产生一条完整审计记录。记忆生命周期数据现通过 ObservabilityFacade 事件流采集(而非直接写入审计日志),审计 JSON 中的 memory 字段仅保留摘要引用:

{
  "trace_id": "uuid",
  "session_id": "...",
  "user_id": "...",
  "timestamp": 1700000000.0,
  "duration_ms": 1234,
  "input": {"question": "...", "language": "zh"},
  "orchestration": {"route_decision": "nl2sql_query"},
  "intent": {...},
  "retrieval": {...},
  "generation": {"sql": "...", "model": "qwen3-coder-plus"},
  "validation": {"passed": true},
  "execution": {"rows": 42, "time_ms": 150},
  "memory": {"cache_hit": false, "events_emitted": ["memory.retrieve.completed", "memory.upsert.completed"]},
  "models_used": [
    {"model": "deepseek-v3.2", "stage": "qu", "input_tokens": 500, "output_tokens": 200}
  ],
  "response": {"status": "success"},
  "errors": []
}

完整记忆操作详情可通过 Langfuse trace_id 或 AutoMQ 事件流查询,审计日志仅记录事件类型摘要以控制体积。

13.3 Prometheus 指标

TTD_PROMETHEUS_ENABLED=true 时,挂载 /metrics 端点。

所有指标包含 instance_id label 以支持多实例监控。

指标 类型 说明
ttd_pg_pool_size Gauge 连接池状态(idle/used)
ttd_pg_pool_waiting Gauge 等待连接的请求数
ttd_active_graph_invocations Gauge 正在执行的 LangGraph 调用数
ttd_agent_pool_size Gauge 在线 Agent Runtime 实例数
ttd_sessions_per_instance Gauge 本实例活跃 session 数
ttd_inflight_invocations Gauge 本实例 in-flight 调用数
ttd_advisory_lock_wait_seconds Histogram PG Advisory Lock 等待耗时
ttd_llm_request_duration_seconds Histogram LLM API 调用延迟
ttd_pipeline_duration_seconds Histogram 端到端管道执行时间
ttd_sql_execution_duration_seconds Histogram 数据面 SQL 执行时间
ttd_requests_total Counter API 请求总量
ttd_graph_errors_total Counter 图执行错误总量

13.4 健康检查

GET /api/v1/health 返回服务状态,适用于 K8s liveness/readiness probe。

13.5 错误 Sanitization

文件:app/errors.py

技术错误自动转为用户友好的中文消息:

技术错误 用户可见消息
current transaction is aborted SQL 执行异常,系统正在自动恢复,请重试
statement timeout 查询执行超时,请尝试缩小查询范围
relation XXX does not exist 查询引用了不存在的表,请换一种方式提问
permission denied 没有权限访问该数据,请联系管理员
Model XXX unavailable AI 模型服务暂时不可用,请稍后重试

14. 配置参考

所有环境变量使用 TTD_ 前缀,通过 app/config.py 的 Pydantic Settings 管理。

14.1 服务器

变量 类型 默认值 说明
TTD_HOST str 0.0.0.0 绑定地址
TTD_PORT int 8000 服务端口
TTD_WORKERS int 1 Uvicorn worker 数(多实例时固定为 1)
TTD_DEBUG bool false 调试模式(自动 reload + 免 JWT)

14.1a Runtime Pool (Docker Swarm)

变量 类型 默认值 说明
TTD_SWARM_MODE bool false 启用 Agent Runtime Pool 模式
TTD_INSTANCE_ID str auto-generated 实例唯一标识(Swarm 自动取 HOSTNAME)
TTD_HEARTBEAT_INTERVAL int 10 心跳间隔(秒)
TTD_HEARTBEAT_STALE_TIMEOUT int 30 多久无心跳标记实例为 offline(秒)
TTD_DRAIN_TIMEOUT int 120 优雅关闭时等待 in-flight 完成的超时(秒)
TTD_ADVISORY_LOCK_TIMEOUT_MS int 30000 PG Advisory Lock 获取超时(毫秒)

部署模式

  • 单实例开发TTD_SWARM_MODE=false(默认),无需 HAProxy,task up 启动
  • 多实例生产TTD_SWARM_MODE=true,通过 Docker Swarm + HAProxy 部署, 使用 task ha:up 一键启动(等价于 task deploy:swarm:deploy

14.2 数据库

变量 类型 默认值 说明
TTD_AURORA_DSN str postgresql://postgres:postgres@localhost:5435/ttd_db PG Supernode 连接串
TTD_PG_POOL_MIN int 5 连接池最小连接数(per instance)
TTD_PG_POOL_MAX int 20 连接池最大连接数(per instance)

14.3 数据面

变量 类型 默认值 说明
TTD_DATA_PLANE_BACKEND str redshift 数据面后端 (redshift / pg_mooncake)
TTD_REDSHIFT_HOST str localhost Redshift 主机
TTD_REDSHIFT_PORT int 5439 Redshift 端口
TTD_REDSHIFT_DATABASE str ttd_gold Redshift 数据库
TTD_REDSHIFT_USER str ttd_service Redshift 用户
TTD_REDSHIFT_PASSWORD str Redshift 密码
TTD_REDSHIFT_TIMEOUT_MS int 30000 Redshift 查询超时
TTD_PG_MOONCAKE_DSN str postgresql://...@localhost:5436/ttd_lake pg_mooncake 连接串

14.4 模型

变量 类型 默认值 说明
TTD_ANTHROPIC_API_KEY str Anthropic API Key
TTD_DASHSCOPE_API_KEY str DashScope API Key
TTD_DASHSCOPE_BASE_URL str https://dashscope.aliyuncs.com/compatible-mode/v1 DashScope Base URL
TTD_SUPERVISOR_MODEL str deepseek-v3.2 Supervisor 模型
TTD_SQL_GENERATION_MODEL str qwen3-coder-plus SQL 生成模型
TTD_INSIGHTS_MODEL str qwen3-max Insights 模型
TTD_VISUALIZATION_MODEL str qwen3-max Visualization 模型
TTD_FOLLOWUP_MODEL str deepseek-v3.2 Follow-up 模型
TTD_EMBEDDING_MODEL str text-embedding-v4 Embedding 模型

14.5 认证与 API

变量 类型 默认值 说明
TTD_BETTER_AUTH_URL str http://localhost:3000 Better Auth 基础 URL (JWKS 端点来源)
TTD_BETTER_AUTH_ISSUER str http://localhost:3000 JWT issuer 校验值
TTD_JWKS_CACHE_TTL_SECONDS int 3600 JWKS 公钥缓存时间(秒)
TTD_JWT_AUDIENCE str talk-to-data JWT 受众校验值
TTD_JWT_SECRET str change-me-in-production 旧 HS256 密钥(迁移期回退,将移除)
TTD_JWT_ALGORITHM str HS256 旧 JWT 算法(迁移期回退,将移除)
TTD_RATE_LIMIT_PER_USER int 60 每用户请求限制
TTD_BURST_LIMIT int 100 突发限制
TTD_MAX_QUESTION_LENGTH int 2000 问题最大长度
TTD_CORS_ORIGINS list ["*"] CORS 允许源(生产环境设为 Better Auth URL)

14.6 会话

变量 类型 默认值 说明
TTD_SESSION_TTL_HOURS int 24 会话 TTL
TTD_SESSION_MAX_TURNS int 20 最大轮次
TTD_CONTEXT_WINDOW int 5 上下文窗口
TTD_CLEANUP_INTERVAL_MINUTES int 60 清理间隔

14.7 检索

变量 类型 默认值 说明
TTD_TOP_K_TABLES int 5 表检索 Top-K
TTD_TOP_K_COLUMNS int 20 列检索 Top-K
TTD_TOP_K_METRICS int 5 指标检索 Top-K
TTD_TOP_K_TERMS int 10 术语检索 Top-K
TTD_TOP_K_FEW_SHOTS int 3 Few-shot Top-K
TTD_SIMILARITY_THRESHOLD float 0.65 相似度阈值
TTD_GRAPH_TRAVERSAL_DEPTH int 3 图遍历深度

14.8 执行

变量 类型 默认值 说明
TTD_STATEMENT_TIMEOUT_MS int 30000 SQL 超时
TTD_MAX_ROWS int 1000 最大返回行数
TTD_MAX_SCANNED_BYTES int 1073741824 最大扫描字节 (1GB)

14.9 记忆

变量 类型 默认值 说明
TTD_SHORT_TERM_WINDOW int 5 短期记忆窗口
TTD_CONTEXT_COMPACTION_TOKEN_BUDGET int 3000 上下文压缩 token 预算
TTD_CONTEXT_COMPACTION_RECENT_WINDOW int 3 保留原样的最近轮数
TTD_SQL_CACHE_SIMILARITY float 0.92 SQL 缓存匹配阈值
TTD_SQL_CACHE_TTL_MINUTES int 15 SQL 缓存 TTL
TTD_METADATA_CACHE_TTL_SECONDS int 86400 元数据缓存 TTL (24h)
TTD_SQL_EXECUTE_CACHE_TTL_SECONDS int 86400 SQL 执行缓存 TTL

14.10 修复与校验

变量 类型 默认值 说明
TTD_MAX_REPAIR_RETRIES int 1 SQL 修复最大重试次数
TTD_CIRCUIT_BREAKER_THRESHOLD int 3 熔断器阈值
TTD_MAX_ESTIMATED_ROWS int 10000000 EXPLAIN 预估行数上限
TTD_MAX_ESTIMATED_COST int 100000 EXPLAIN 预估成本上限
TTD_FORBIDDEN_KEYWORDS list [INSERT, UPDATE, DELETE, ...] 禁止的 SQL 关键词

14.11 审计与监控

变量 类型 默认值 说明
TTD_AUDIT_LOG_LEVEL str INFO 审计日志级别
TTD_AUDIT_ARCHIVE_BUCKET str ttd-audit-archive 归档 S3 桶
TTD_AUDIT_ARCHIVE_AFTER_DAYS int 90 归档天数
TTD_PROMETHEUS_ENABLED bool true 是否启用 Prometheus
TTD_METRICS_PATH str /metrics 指标端点路径

15. 本地开发

15.1 环境准备

cd backend
cp ../.env.example .env   # 填写 TTD_DASHSCOPE_API_KEY 等
uv sync                   # 安装依赖

15.2 Docker Compose

# 启动本地依赖(PostgreSQL + pg_mooncake)
docker compose up -d

# 环境变量自动设置:
# TTD_DATA_PLANE_BACKEND=pg_mooncake
# TTD_AURORA_DSN=postgresql://postgres:postgres@postgres:5435/ttd_db
# TTD_PG_MOONCAKE_DSN=postgresql://postgres:postgres@pg-mooncake:5432/postgres

15.3 数据库迁移 (Alembic)

# 运行所有 pending 迁移
task be:db:migrate     # 或 cd backend && uv run alembic upgrade head

# 回滚最近一次迁移
task be:db:rollback

# 创建新迁移
task be:db:revision MESSAGE="add_new_table"

# 查看当前版本
task be:db:current

15.4 开发服务器

# 启动(debug 模式,自动 reload,免 JWT)
task be:dev            # 或 cd backend && uv run ttd-app

# 服务启动后:
# - API: http://localhost:8000/api/v1
# - Docs: http://localhost:8000/docs (Swagger)
# - Metrics: http://localhost:8000/metrics

15.5 Task 命令参考

命令 说明
task be:dev 启动开发服务器
task be:lint 运行 ruff linter
task be:lint:fix 自动修复 lint 问题
task be:format 格式化代码
task be:typecheck 类型检查 (ty)
task be:test 运行 pytest
task be:test:cov 运行测试 + 覆盖率
task be:build:docker 构建 Backend Docker 镜像
task up 启动所有服务 (data-plane + backend + web)
task down 停止所有服务
task be:ci 完整 CI 检查(lint + typecheck + test)

15.6 项目结构

backend/
├── app/
│   ├── main.py              # FastAPI 入口 + lifespan + instance registration
│   ├── config.py            # Pydantic Settings(TTD_* 环境变量)
│   ├── deps.py              # FastAPI 依赖注入 + distributed lock
│   ├── errors.py            # 异常层次 + 错误 sanitization
│   ├── schemas.py           # API 请求/响应 Pydantic models
│   ├── agents/
│   │   ├── builder.py       # build_supervisor_graph() 图构建
│   │   ├── state.py         # AgenticBIState TypedDict
│   │   ├── supervisor.py    # Supervisor 节点
│   │   ├── embedding.py     # Embedding 工具函数
│   │   ├── sub_agents/      # 所有 Sub-agent 实现
│   │   │   ├── query_understanding.py
│   │   │   ├── router.py
│   │   │   ├── planner.py
│   │   │   ├── data_retrieval.py
│   │   │   ├── sql_generation.py
│   │   │   ├── guardrails.py
│   │   │   ├── visualization.py
│   │   │   ├── insights.py
│   │   │   ├── followup.py
│   │   │   ├── knowledge_qa.py
│   │   │   ├── schema_explorer.py
│   │   │   ├── sql_explainer.py
│   │   │   ├── suggest_followup.py
│   │   │   └── analytical.py
│   │   └── evaluators/      # 评估器
│   ├── runtime/             # Agent Runtime Pool 管理
│   │   ├── __init__.py      # 模块入口
│   │   ├── registry.py      # 实例注册 + heartbeat + stale detection
│   │   └── lifecycle.py     # Graceful shutdown + drain
│   ├── api/
│   │   ├── router.py        # 路由注册 + limiter
│   │   ├── chats.py         # Chat BI API (PG advisory lock)
│   │   ├── query.py         # Legacy query
│   │   ├── stream.py        # Legacy SSE
│   │   ├── sql_execute.py   # Direct SQL
│   │   ├── health.py        # Health check
│   │   ├── sessions.py      # Sessions
│   │   ├── feedback.py      # Feedback
│   │   └── admin/           # Admin APIs (20+ files)
│   ├── skills/              # AI Pipeline 技能
│   │   ├── query_decomposer.py  # LLM 驱动搜索关键词提取 + QU 复用
│   │   ├── chart_templates/ # 推荐决策引擎 + ECharts Adapter
│   │   │   ├── adapters/   # Renderer adapter 层 (base, echarts_adapter)
│   │   │   ├── matcher.py  # Top-k recommendation engine
│   │   │   └── recommendation.py  # Headless decision layer
│   │   └── visualization.py # 主入口 (recommend → adapt → validate)
│   ├── memory/              # 四层记忆系统 (working/profile/episodic/correction + cache)
│   ├── security/            # 认证 + 安全
│   ├── prompt/              # Prompt 管理
│   ├── models/              # 模型注册表
│   ├── audit/               # 审计日志
│   ├── session/             # 会话管理
│   ├── db/                  # SQLAlchemy ORM
│   ├── repositories/        # 数据访问层
│   └── services/            # 业务逻辑层
├── alembic/                 # 数据库迁移
├── tests/                   # 测试
├── Dockerfile
├── docker-compose.yml
├── Taskfile.yml
└── pyproject.toml

16. 生产部署 — Agent Runtime Pool

16.1 架构概览

生产环境采用 Agent Runtime Pool 模式:多个无状态实例通过 Docker Swarm 编排, HAProxy consistent hashing 实现 session affinity。

graph TB
    Client[Web/TUI Client]
    LB[HAProxy<br/>Consistent Hash<br/>on X-Chat-ID]

    subgraph Swarm["Docker Swarm Cluster"]
        R1[Runtime #1<br/>1 worker]
        R2[Runtime #2<br/>1 worker]
        R3[Runtime #3<br/>1 worker]
        RN[Runtime #N<br/>1 worker]
    end

    PG[(PG Supernode<br/>Checkpointer + Advisory Lock)]

    Client -->|X-Chat-ID| LB
    LB -->|ketama hash| R1
    LB -->|ketama hash| R2
    LB -->|ketama hash| R3
    LB -->|ketama hash| RN
    R1 --> PG
    R2 --> PG
    R3 --> PG
    RN --> PG

16.2 关键设计决策

决策 选择 理由
实例间状态共享 PG Supernode (已有) 不引入新中间件
并发控制 PG Advisory Lock 跨实例互斥,事务结束自动释放
路由策略 Consistent Hash (ketama) 缓存 warmth + 最小迁移
单 worker/instance Docker Swarm replicas 避免 GIL 竞争、内存膨胀
实例间通信 无 (zero gossip) 所有共享通过 PG

16.3 部署命令

# 一键启动 Swarm HA(data-plane + HAProxy + agent-runtime pool + web)
task ha:up

# 一键拆除
task ha:down

# 扩缩容 agent-runtime
N=5 task deploy:swarm:scale

# 滚动更新 agent-runtime 镜像
task deploy:swarm:update

# 查看实例状态
task deploy:swarm:status

# 查看日志
task deploy:swarm:logs

网络架构

  • 本地模式 (task up):所有服务共享 bridge 网络 ttd-net,容器间通过服务名直连 (e.g. db:5432)
  • Swarm 模式 (task ha:up):使用 overlay 网络 ttd-overlay (10.10.0.0/16),数据库通过 host.docker.internal 访问宿主机端口 (5435/5436)

16.4 HAProxy 路由规则

场景 路由策略 说明
X-Chat-ID header Consistent hash on header Session affinity
URL 含 /chats/{id}/ Consistent hash on path-extracted id 自动提取
新建对话 POST /chats Round-robin 均匀分配

16.5 故障转移

  • 节点故障: 一致性哈希环自动 rehash,仅 ~1/N sessions 受影响
  • 状态恢复: 从 PG checkpoint 恢复,无状态丢失
  • Health check: HAProxy inter 5s, fall 3, rise 2 自动摘除/恢复节点

16.6 Graceful Shutdown

当 Docker Swarm 发送 SIGTERM(滚动更新或缩容时):

  1. 停止接受新请求(GracefulShutdown.is_draining = True
  2. 等待 in-flight graph invocations 完成(max TTD_DRAIN_TIMEOUT=120s
  3. 停止 heartbeat + 标记实例为 shutdown
  4. 进程退出

附录 A:异常层次

TTDError                          # 基类
├── ClarificationNeededError      # Follow-up 发现歧义
├── PolicyViolationError          # Guardrails 安全策略违规
├── SQLValidationError            # SQL 校验失败(syntax/policy/EXPLAIN)
├── RetrievalMissError            # R/V/G 三引擎均未命中
└── ModelUnavailableError         # 模型 API 不可达且无 fallback

附录 B:设计原则

  1. 配置集中 — 所有配置通过 Settings 类管理,环境变量前缀 TTD_
  2. 依赖注入 — FastAPI Depends() 注入 PG pool、Graph、ModelRegistry
  3. 异步优先 — 所有 I/O 使用 async/await,数据库使用 AsyncConnectionPool
  4. Fail-Safe — 无法确认安全性时拒绝执行
  5. Defense in Depth — 安全检查在多层重复(意图→检索→生成→执行)
  6. Audit Everything — 每个管道步骤产出均被记录
  7. 工厂模式 — Sub-agent 通过 make_xxx_node(deps) 创建,便于测试和注入
  8. 状态增量 — 节点仅返回需要更新的 state 字段
  9. 错误传播 — 通过 state["errors"] 列表传播,不直接抛异常