Data Plane(数据执行面)¶
1. 概述¶
Data Plane 提供 NL2SQL 系统的 运行时 SQL 执行环境。在生产环境中使用 AWS Redshift Serverless;在本地开发中使用一套 Docker Stack 进行模拟:
| 环境 | 执行引擎 | 数据存储 |
|---|---|---|
| 生产 | AWS Redshift Serverless | S3 (Iceberg / Spectrum) |
| 本地 | pg_mooncake (PostgreSQL + columnstore) | MinIO (S3-compatible) |
本地 Stack 同时承载 PG Supernode(Aurora PostgreSQL 替代品),用于存储 Semantic Layer 元数据(semlayer schema)以及 Apache AGE 图 + pgvector embedding。
设计原则
本地开发尽可能还原生产拓扑,让 SQL 层面做到 write once, run anywhere —— 同一份 semantic-plane YAML 定义的 SQL 既能跑在 Redshift,也能在 pg_mooncake 上验证。
2. 架构¶
graph TB
subgraph "Docker Compose Stack (data-plane/)"
direction TB
subgraph "PG Supernode (port 5435)"
PG[PostgreSQL 18<br/>Apache AGE 1.7 + pgvector 0.8.2]
end
subgraph "pg_mooncake (port 5436)"
PGMOONCAKE[mooncakelabs/pg_mooncake:latest<br/>PostgreSQL + columnstore]
end
subgraph "Object Storage"
MINIO[MinIO<br/>port 9000 API / 9001 Console]
INIT[minio-init<br/>Creates talktodata bucket]
end
PGMOONCAKE -->|S3 API| MINIO
INIT -->|mc mb| MINIO
end
BE[Backend Service] -->|psycopg DSN :5435| PG
BE -->|psycopg DSN :5436| PGMOONCAKE
style PG fill:#326CE5,color:#fff
style PGMOONCAKE fill:#FF6D00,color:#fff
style MINIO fill:#C72C48,color:#fff
服务端口一览¶
| 服务 | 容器名 | Host 端口 | 用途 |
|---|---|---|---|
| PostgreSQL (AGE + pgvector) | db |
5435 |
semlayer schema — 元数据 + 图 + embedding |
| pg_mooncake | pg-mooncake |
5436 |
gold_hm schema — 列存数据表 |
| MinIO API | minio |
9000 (localhost only) |
S3-compatible 对象存储 |
| MinIO Console | minio |
9001 (localhost only) |
Web 管理界面 |
| minio-init | minio-init |
— | 一次性初始化容器 |
3. 容器配置¶
3.1 PostgreSQL (PG Supernode)¶
postgres:
build:
context: ./docker/postgres
dockerfile: Dockerfile
image: age-pgvector:local
container_name: db
environment:
POSTGRES_DB: ttd_db
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
ports:
- "5435:5432"
volumes:
- pgdata:/var/lib/postgresql
- ./sql/001_create_schema.sql:/docker-entrypoint-initdb.d/001_create_schema.sql:ro
自定义镜像构建(docker/postgres/Dockerfile):
FROM apache/age:release_PG18_1.7.0
# 在 AGE 基础镜像上编译 pgvector v0.8.2
ADD https://github.com/pgvector/pgvector/archive/refs/tags/v0.8.2.tar.gz /tmp/pgvector.tar.gz
RUN apt-get update && apt-get install -y build-essential postgresql-server-dev-18 \
&& mkdir /tmp/pgvector && tar xzf /tmp/pgvector.tar.gz -C /tmp/pgvector --strip-components=1 \
&& cd /tmp/pgvector && make OPTFLAGS="" && make install \
&& rm -rf /tmp/pgvector /tmp/pgvector.tar.gz
镜像名称
首次 docker compose up 时会自动 build 并标记为 age-pgvector:local,后续启动直接复用。
3.2 pg_mooncake¶
pg-mooncake:
image: mooncakelabs/pg_mooncake:latest
container_name: pg-mooncake
ports:
- "5436:5432"
volumes:
- ./docker/pg-mooncake/init-mooncake.sql:/docker-entrypoint-initdb.d/100_init_mooncake.sql:ro
- ./sql/100_hm_create_schema.sql:/docker-entrypoint-initdb.d/200_hm_create_schema.sql:ro
environment:
POSTGRES_DB: ttd_lake
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
官方 Docker 镜像
pg_mooncake 使用 mooncake-labs 官方发布的 Docker 镜像,无需本地构建。 单容器模型——PostgreSQL + columnstore 扩展集成在同一镜像中。
初始化脚本 (docker/pg-mooncake/init-mooncake.sql) 核心流程:
- 创建
pg_mooncake扩展 - 创建
gold_hmschema - 创建源表(
_src后缀)并通过mooncake.create_table()创建列存镜像
3.3 MinIO¶
minio:
image: minio/minio:latest
container_name: minio
command: server /data --console-address ":9001"
ports:
- "127.0.0.1:9000:9000" # S3 API
- "127.0.0.1:9001:9001" # Console
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
3.4 minio-init¶
minio-init:
image: minio/mc:latest
container_name: minio-init
entrypoint: ["/bin/sh", "/init-s3.sh"]
volumes:
- ./docker/pg-mooncake/init-s3.sh:/init-s3.sh:ro
depends_on:
minio:
condition: service_healthy
init-s3.sh 内容:
4. DDL Schema¶
4.1 semlayer Schema(端口 5435 / ttd_db)¶
存储三层治理模型的所有元数据。由 001_create_schema.sql 在容器首次启动时自动创建。
核心表¶
| # | 表名 | 用途 | 主键 |
|---|---|---|---|
| 1 | data_domain |
业务域定义 | domain_id |
| 2 | table_asset |
表资产定义 | table_id |
| 3 | column_asset |
列资产定义 | column_id |
| 4 | metric_asset |
指标资产定义 | metric_id |
| 5 | join_rule |
表间关联规则 | join_rule_id |
| 6 | business_term |
业务术语 (Layer 1+2) | term_id |
| 7 | term_relationship |
术语关系 (Layer 2) | relationship_id |
| 8 | term_usage_audit |
术语使用统计 (Layer 2) | audit_id |
| 9 | few_shot_example |
NL2SQL 示例对 | example_id |
| 10 | change_log |
变更日志 | log_id |
扩展组件¶
| 组件 | 表/Index | 用途 |
|---|---|---|
| Apache AGE Graph | ttd_governance |
元数据关系图(Cypher 查询) |
| pgvector Embedding | 8 张 *_embedding 表 |
语义向量搜索 (HNSW, cosine) |
| SQL Cache | sql_cache |
问题→SQL 语义缓存 |
| Column Value Cache | column_value_cache |
WHERE 子句生成辅助 |
Embedding 表映射¶
| Embedding 表 | 关联主表 | ID 列 |
|---|---|---|
table_asset_embedding |
table_asset |
table_id |
column_asset_embedding |
column_asset |
column_id |
metric_asset_embedding |
metric_asset |
metric_id |
business_term_embedding |
business_term |
term_id |
few_shot_embedding |
few_shot_example |
example_id |
join_rule_embedding |
join_rule |
join_rule_id |
business_rule_embedding |
business_rule |
rule_id |
business_context_embedding |
business_context |
context_id |
所有 embedding 表使用 vector(1024) 类型 + HNSW 索引 (vector_cosine_ops, m=16, ef_construction=128)。
4.2 gold_hm Schema(端口 5436 / ttd_lake)¶
H&M 时尚推荐数据集,以 columnstore mirror 形式存储。源表使用 _src 后缀,通过 mooncake.create_table() 创建列存镜像保持规范表名。
dim_article_src — 商品维度源表¶
CREATE TABLE IF NOT EXISTS gold_hm.dim_article_src (
article_id VARCHAR(16),
product_code VARCHAR(16),
prod_name TEXT,
product_type_no INTEGER,
product_type_name VARCHAR(256),
product_group_name VARCHAR(256),
graphical_appearance_no INTEGER,
graphical_appearance_name VARCHAR(256),
colour_group_code VARCHAR(8),
colour_group_name VARCHAR(128),
perceived_colour_value_id INTEGER,
perceived_colour_value_name VARCHAR(128),
perceived_colour_master_id INTEGER,
perceived_colour_master_name VARCHAR(128),
department_no INTEGER,
department_name VARCHAR(256),
index_code VARCHAR(8),
index_name VARCHAR(256),
index_group_no INTEGER,
index_group_name VARCHAR(256),
section_no INTEGER,
section_name VARCHAR(256),
garment_group_no INTEGER,
garment_group_name VARCHAR(256),
detail_desc TEXT,
image_url TEXT -- 派生字段
);
-- 列存镜像(查询时使用此表名)
SELECT mooncake.create_table('gold_hm.dim_article', 'gold_hm.dim_article_src');
dim_customer_src — 客户维度源表¶
CREATE TABLE IF NOT EXISTS gold_hm.dim_customer_src (
customer_id VARCHAR(64),
fn REAL,
active REAL,
club_member_status VARCHAR(32),
fashion_news_frequency VARCHAR(32),
age SMALLINT,
postal_code VARCHAR(128)
);
-- 列存镜像
SELECT mooncake.create_table('gold_hm.dim_customer', 'gold_hm.dim_customer_src');
fact_transaction_src — 交易事实源表¶
CREATE TABLE IF NOT EXISTS gold_hm.fact_transaction_src (
t_dat DATE,
customer_id VARCHAR(64),
article_id VARCHAR(16),
price DECIMAL(18,8),
sales_channel_id SMALLINT
);
-- 列存镜像
SELECT mooncake.create_table('gold_hm.fact_transaction', 'gold_hm.fact_transaction_src');
查询使用镜像表名
数据加载写入 *_src 源表,查询时使用镜像表名(如 gold_hm.dim_article),与 Redshift 生产环境保持一致。
5. 语义元数据发布(Semantic Plane Publishing)¶
脚本路径:data-plane/scripts/publish_semantic_plane.py
5.1 工作流¶
flowchart LR
A[semantic-plane/ YAML] --> B["ttd-publish-sp<br/>(task dp:publish-sp)"]
B --> C[S3 Bucket]
C --> D[Admin Backend<br/>auto-sync]
D --> E[R+V+G Pipeline]
元数据摄入现由 Admin Backend 全面管理。本地开发者只需将 YAML 发布到 S3,Backend 自动检测变更并执行 R/V/G pipeline。
5.2 发布命令¶
5.3 Backend 自动同步¶
- Admin UI 中配置 Semantic Plane datasource(Knowledge Base → S3 URI)
- Backend 定期轮询 S3 manifest hash,检测变更后自动导入
- 导入流程在 Backend 的 LangGraph
semantic_planeagent 中执行完整 R/V/G pipeline - S3 快照中缺失的资产被标记
deleted_at(软删除),不再参与检索
全局单例
Semantic datasource 是全局单例——系统范围内仅存在一个活跃的语义数据源。
6. H&M 数据加载¶
脚本路径:data-plane/scripts/load_hm_data.py
6.1 流程¶
flowchart LR
A[H&M CSV Files] --> B[psycopg COPY FROM STDIN]
B --> C[pg_mooncake 源表 _src]
C --> D[columnstore 镜像]
6.2 详细步骤¶
-
COPY 数据到源表
数据通过 psycopg 的copy_expert以流式方式加载。 -
列存镜像自动同步 源表写入后,mooncake columnstore 镜像自动保持同步,查询时使用镜像表名。
-
派生 image_url 列
-
上传商品图片(可选)
aws s3 sync images/ s3://talktodata/images/
6.3 CLI 用法¶
uv run --package ttd-data-plane data-plane/scripts/load_hm_data.py \
--data-dir ~/opt/dataset/hm-personalized-fashion-recommendations
| 参数 | 说明 | 默认值 |
|---|---|---|
--data-dir |
H&M 数据集本地目录 | ~/opt/dataset/hm-personalized-fashion-recommendations |
--dsn |
pg_mooncake 连接串 | TTD_PG_MOONCAKE_DSN 或 localhost:5436 |
7. 与 Backend 集成¶
7.1 配置切换¶
Backend 通过环境变量 TTD_DATA_PLANE_BACKEND 选择 SQL 执行引擎:
7.2 连接配置¶
7.3 PG Supernode 连接¶
Backend 同时连接 PG Supernode 读取元数据和执行语义搜索:
7.4 SQL 兼容性¶
semantic-plane YAML 中的 SQL 定义使用标准 ANSI SQL 语法,兼容 Redshift 和 pg_mooncake 两种方言。关键约束:
- 避免 Redshift-only 函数(如
GETDATE(),改用CURRENT_DATE) - 避免 PostgreSQL-only 函数(如
generate_series()) - 查询使用镜像表名(如
gold_hm.dim_article),不使用_src后缀
8. 常用命令¶
Task 命令¶
| 命令 | 说明 |
|---|---|
task dp:up |
启动所有 data-plane 容器 |
task dp:up:obs |
启动 data-plane + Observability (AutoMQ) |
task dp:down |
停止所有 data-plane 容器 |
task dp:down:obs |
停止 data-plane + Observability |
task dp:build:docker |
构建 Data Plane 自定义镜像 (age-pgvector) |
task dp:logs |
查看容器日志(tail -f) |
task dp:ps |
显示容器运行状态 |
task dp:publish-sp |
发布 semantic-plane YAML 到 S3 |
task dp:load-hm |
加载 H&M 数据集到 pg_mooncake |
task dp:psql |
连接 PG Supernode (port 5435) |
task dp:psql:mooncake |
连接 pg_mooncake (port 5436) |
常用 psql 查询¶
# 检查元数据表行数
task dp:psql -- -c "SELECT 'table_asset' as t, count(*) FROM semlayer.table_asset
UNION ALL SELECT 'column_asset', count(*) FROM semlayer.column_asset
UNION ALL SELECT 'metric_asset', count(*) FROM semlayer.metric_asset;"
# 检查 embedding 覆盖率
task dp:psql -- -c "SELECT count(*) FROM semlayer.table_asset_embedding;"
# 查看 AGE 图节点数
task dp:psql -- -c "LOAD 'age'; SET search_path = ag_catalog, public;
SELECT * FROM cypher('ttd_governance', \$\$MATCH (n) RETURN labels(n), count(*)\$\$) AS (label agtype, cnt agtype);"
# 查询 pg_mooncake 列存表行数
task dp:psql:mooncake -- -c "SELECT count(*) FROM gold_hm.fact_transaction;"
9. 故障排查¶
pg_mooncake 连接失败¶
解决:确认 pg_mooncake 容器已启动 (task dp:ps);检查端口绑定 5436。
MinIO 连接超时¶
解决:确认 MinIO 容器已启动 (task dp:ps);检查端口绑定 127.0.0.1:9000。
AGE 扩展不可用¶
解决:确认使用的是自定义 age-pgvector:local 镜像而非标准 PostgreSQL 镜像。
Embedding 生成失败¶
解决:在 .env 中配置 TTD_DASHSCOPE_API_KEY=sk-xxx,或使用 --skip-embedding 跳过。
10. 目录结构¶
data-plane/
├── docker-compose.yml # Docker 编排(4 services)
├── docker/
│ ├── postgres/
│ │ └── Dockerfile # Apache AGE + pgvector 自定义镜像
│ └── pg-mooncake/
│ ├── init-mooncake.sql # pg_mooncake 初始化(扩展 + schema + 镜像)
│ └── init-s3.sh # MinIO bucket 创建
├── sql/
│ ├── 001_create_schema.sql # semlayer 完整 DDL (17 tables + indices)
│ └── 100_hm_create_schema.sql # gold_hm 列存表 DDL
├── scripts/
│ ├── publish_semantic_plane.py # 发布 semantic-plane YAML 到 S3
│ └── load_hm_data.py # H&M 数据加载脚本
├── pyproject.toml # ttd-data-plane 包定义
├── Taskfile.yml # Task 命令定义
└── README.md