跳转至

Data Plane(数据执行面)

1. 概述

Data Plane 提供 NL2SQL 系统的 运行时 SQL 执行环境。在生产环境中使用 AWS Redshift Serverless;在本地开发中使用一套 Docker Stack 进行模拟:

环境 执行引擎 数据存储
生产 AWS Redshift Serverless S3 (Iceberg / Spectrum)
本地 pg_mooncake (PostgreSQL + columnstore) MinIO (S3-compatible)

本地 Stack 同时承载 PG Supernode(Aurora PostgreSQL 替代品),用于存储 Semantic Layer 元数据(semlayer schema)以及 Apache AGE 图 + pgvector embedding。

设计原则

本地开发尽可能还原生产拓扑,让 SQL 层面做到 write once, run anywhere —— 同一份 semantic-plane YAML 定义的 SQL 既能跑在 Redshift,也能在 pg_mooncake 上验证。


2. 架构

graph TB
    subgraph "Docker Compose Stack (data-plane/)"
        direction TB

        subgraph "PG Supernode (port 5435)"
            PG[PostgreSQL 18<br/>Apache AGE 1.7 + pgvector 0.8.2]
        end

        subgraph "pg_mooncake (port 5436)"
            PGMOONCAKE[mooncakelabs/pg_mooncake:latest<br/>PostgreSQL + columnstore]
        end

        subgraph "Object Storage"
            MINIO[MinIO<br/>port 9000 API / 9001 Console]
            INIT[minio-init<br/>Creates talktodata bucket]
        end

        PGMOONCAKE -->|S3 API| MINIO
        INIT -->|mc mb| MINIO
    end

    BE[Backend Service] -->|psycopg DSN :5435| PG
    BE -->|psycopg DSN :5436| PGMOONCAKE

    style PG fill:#326CE5,color:#fff
    style PGMOONCAKE fill:#FF6D00,color:#fff
    style MINIO fill:#C72C48,color:#fff

服务端口一览

服务 容器名 Host 端口 用途
PostgreSQL (AGE + pgvector) db 5435 semlayer schema — 元数据 + 图 + embedding
pg_mooncake pg-mooncake 5436 gold_hm schema — 列存数据表
MinIO API minio 9000 (localhost only) S3-compatible 对象存储
MinIO Console minio 9001 (localhost only) Web 管理界面
minio-init minio-init 一次性初始化容器

3. 容器配置

3.1 PostgreSQL (PG Supernode)

data-plane/docker-compose.yml (摘要)
postgres:
  build:
    context: ./docker/postgres
    dockerfile: Dockerfile
  image: age-pgvector:local
  container_name: db
  environment:
    POSTGRES_DB: ttd_db
    POSTGRES_USER: postgres
    POSTGRES_PASSWORD: postgres
  ports:
    - "5435:5432"
  volumes:
    - pgdata:/var/lib/postgresql
    - ./sql/001_create_schema.sql:/docker-entrypoint-initdb.d/001_create_schema.sql:ro

自定义镜像构建docker/postgres/Dockerfile):

FROM apache/age:release_PG18_1.7.0

# 在 AGE 基础镜像上编译 pgvector v0.8.2
ADD https://github.com/pgvector/pgvector/archive/refs/tags/v0.8.2.tar.gz /tmp/pgvector.tar.gz
RUN apt-get update && apt-get install -y build-essential postgresql-server-dev-18 \
    && mkdir /tmp/pgvector && tar xzf /tmp/pgvector.tar.gz -C /tmp/pgvector --strip-components=1 \
    && cd /tmp/pgvector && make OPTFLAGS="" && make install \
    && rm -rf /tmp/pgvector /tmp/pgvector.tar.gz

镜像名称

首次 docker compose up 时会自动 build 并标记为 age-pgvector:local,后续启动直接复用。


3.2 pg_mooncake

data-plane/docker-compose.yml (摘要)
pg-mooncake:
  image: mooncakelabs/pg_mooncake:latest
  container_name: pg-mooncake
  ports:
    - "5436:5432"
  volumes:
    - ./docker/pg-mooncake/init-mooncake.sql:/docker-entrypoint-initdb.d/100_init_mooncake.sql:ro
    - ./sql/100_hm_create_schema.sql:/docker-entrypoint-initdb.d/200_hm_create_schema.sql:ro
  environment:
    POSTGRES_DB: ttd_lake
    POSTGRES_USER: postgres
    POSTGRES_PASSWORD: postgres

官方 Docker 镜像

pg_mooncake 使用 mooncake-labs 官方发布的 Docker 镜像,无需本地构建。 单容器模型——PostgreSQL + columnstore 扩展集成在同一镜像中。

初始化脚本 (docker/pg-mooncake/init-mooncake.sql) 核心流程:

  1. 创建 pg_mooncake 扩展
  2. 创建 gold_hm schema
  3. 创建源表(_src 后缀)并通过 mooncake.create_table() 创建列存镜像

3.3 MinIO

data-plane/docker-compose.yml (摘要)
minio:
  image: minio/minio:latest
  container_name: minio
  command: server /data --console-address ":9001"
  ports:
    - "127.0.0.1:9000:9000"   # S3 API
    - "127.0.0.1:9001:9001"   # Console
  environment:
    MINIO_ROOT_USER: minioadmin
    MINIO_ROOT_PASSWORD: minioadmin

3.4 minio-init

data-plane/docker-compose.yml (摘要)
minio-init:
  image: minio/mc:latest
  container_name: minio-init
  entrypoint: ["/bin/sh", "/init-s3.sh"]
  volumes:
    - ./docker/pg-mooncake/init-s3.sh:/init-s3.sh:ro
  depends_on:
    minio:
      condition: service_healthy

init-s3.sh 内容:

mc alias set local http://minio:9000 minioadmin minioadmin
mc mb --ignore-existing local/talktodata

4. DDL Schema

4.1 semlayer Schema(端口 5435 / ttd_db)

存储三层治理模型的所有元数据。由 001_create_schema.sql 在容器首次启动时自动创建。

核心表

# 表名 用途 主键
1 data_domain 业务域定义 domain_id
2 table_asset 表资产定义 table_id
3 column_asset 列资产定义 column_id
4 metric_asset 指标资产定义 metric_id
5 join_rule 表间关联规则 join_rule_id
6 business_term 业务术语 (Layer 1+2) term_id
7 term_relationship 术语关系 (Layer 2) relationship_id
8 term_usage_audit 术语使用统计 (Layer 2) audit_id
9 few_shot_example NL2SQL 示例对 example_id
10 change_log 变更日志 log_id

扩展组件

组件 表/Index 用途
Apache AGE Graph ttd_governance 元数据关系图(Cypher 查询)
pgvector Embedding 8 张 *_embedding 语义向量搜索 (HNSW, cosine)
SQL Cache sql_cache 问题→SQL 语义缓存
Column Value Cache column_value_cache WHERE 子句生成辅助

Embedding 表映射

Embedding 表 关联主表 ID 列
table_asset_embedding table_asset table_id
column_asset_embedding column_asset column_id
metric_asset_embedding metric_asset metric_id
business_term_embedding business_term term_id
few_shot_embedding few_shot_example example_id
join_rule_embedding join_rule join_rule_id
business_rule_embedding business_rule rule_id
business_context_embedding business_context context_id

所有 embedding 表使用 vector(1024) 类型 + HNSW 索引 (vector_cosine_ops, m=16, ef_construction=128)。


4.2 gold_hm Schema(端口 5436 / ttd_lake)

H&M 时尚推荐数据集,以 columnstore mirror 形式存储。源表使用 _src 后缀,通过 mooncake.create_table() 创建列存镜像保持规范表名。

dim_article_src — 商品维度源表

CREATE TABLE IF NOT EXISTS gold_hm.dim_article_src (
    article_id                  VARCHAR(16),
    product_code                VARCHAR(16),
    prod_name                   TEXT,
    product_type_no             INTEGER,
    product_type_name           VARCHAR(256),
    product_group_name          VARCHAR(256),
    graphical_appearance_no     INTEGER,
    graphical_appearance_name   VARCHAR(256),
    colour_group_code           VARCHAR(8),
    colour_group_name           VARCHAR(128),
    perceived_colour_value_id   INTEGER,
    perceived_colour_value_name VARCHAR(128),
    perceived_colour_master_id  INTEGER,
    perceived_colour_master_name VARCHAR(128),
    department_no               INTEGER,
    department_name             VARCHAR(256),
    index_code                  VARCHAR(8),
    index_name                  VARCHAR(256),
    index_group_no              INTEGER,
    index_group_name            VARCHAR(256),
    section_no                  INTEGER,
    section_name                VARCHAR(256),
    garment_group_no            INTEGER,
    garment_group_name          VARCHAR(256),
    detail_desc                 TEXT,
    image_url                   TEXT          -- 派生字段
);

-- 列存镜像(查询时使用此表名)
SELECT mooncake.create_table('gold_hm.dim_article', 'gold_hm.dim_article_src');

dim_customer_src — 客户维度源表

CREATE TABLE IF NOT EXISTS gold_hm.dim_customer_src (
    customer_id         VARCHAR(64),
    fn                  REAL,
    active              REAL,
    club_member_status  VARCHAR(32),
    fashion_news_frequency VARCHAR(32),
    age                 SMALLINT,
    postal_code         VARCHAR(128)
);

-- 列存镜像
SELECT mooncake.create_table('gold_hm.dim_customer', 'gold_hm.dim_customer_src');

fact_transaction_src — 交易事实源表

CREATE TABLE IF NOT EXISTS gold_hm.fact_transaction_src (
    t_dat               DATE,
    customer_id         VARCHAR(64),
    article_id          VARCHAR(16),
    price               DECIMAL(18,8),
    sales_channel_id    SMALLINT
);

-- 列存镜像
SELECT mooncake.create_table('gold_hm.fact_transaction', 'gold_hm.fact_transaction_src');

查询使用镜像表名

数据加载写入 *_src 源表,查询时使用镜像表名(如 gold_hm.dim_article),与 Redshift 生产环境保持一致。


5. 语义元数据发布(Semantic Plane Publishing)

脚本路径:data-plane/scripts/publish_semantic_plane.py

5.1 工作流

flowchart LR
    A[semantic-plane/ YAML] --> B["ttd-publish-sp<br/>(task dp:publish-sp)"]
    B --> C[S3 Bucket]
    C --> D[Admin Backend<br/>auto-sync]
    D --> E[R+V+G Pipeline]

元数据摄入现由 Admin Backend 全面管理。本地开发者只需将 YAML 发布到 S3,Backend 自动检测变更并执行 R/V/G pipeline。

5.2 发布命令

# 发布 semantic-plane/ YAML 到 S3
task dp:publish-sp

5.3 Backend 自动同步

  • Admin UI 中配置 Semantic Plane datasource(Knowledge Base → S3 URI)
  • Backend 定期轮询 S3 manifest hash,检测变更后自动导入
  • 导入流程在 Backend 的 LangGraph semantic_plane agent 中执行完整 R/V/G pipeline
  • S3 快照中缺失的资产被标记 deleted_at(软删除),不再参与检索

全局单例

Semantic datasource 是全局单例——系统范围内仅存在一个活跃的语义数据源。


6. H&M 数据加载

脚本路径:data-plane/scripts/load_hm_data.py

6.1 流程

flowchart LR
    A[H&M CSV Files] --> B[psycopg COPY FROM STDIN]
    B --> C[pg_mooncake 源表 _src]
    C --> D[columnstore 镜像]

6.2 详细步骤

  1. COPY 数据到源表

    COPY gold_hm.dim_article_src(col1, col2, ...) FROM STDIN
        WITH (format 'csv', header true);
    
    数据通过 psycopg 的 copy_expert 以流式方式加载。

  2. 列存镜像自动同步 源表写入后,mooncake columnstore 镜像自动保持同步,查询时使用镜像表名。

  3. 派生 image_url 列

    UPDATE gold_hm.dim_article_src
    SET image_url = '/images/' || SUBSTRING(article_id, 1, 3) || '/' || article_id || '.jpg'
    WHERE image_url IS NULL;
    

  4. 上传商品图片(可选)

    • aws s3 sync images/ s3://talktodata/images/

6.3 CLI 用法

uv run --package ttd-data-plane data-plane/scripts/load_hm_data.py \
  --data-dir ~/opt/dataset/hm-personalized-fashion-recommendations
参数 说明 默认值
--data-dir H&M 数据集本地目录 ~/opt/dataset/hm-personalized-fashion-recommendations
--dsn pg_mooncake 连接串 TTD_PG_MOONCAKE_DSNlocalhost:5436

7. 与 Backend 集成

7.1 配置切换

Backend 通过环境变量 TTD_DATA_PLANE_BACKEND 选择 SQL 执行引擎:

.env
# 生产模式
TTD_DATA_PLANE_BACKEND=redshift

# 本地开发模式
TTD_DATA_PLANE_BACKEND=pg_mooncake

7.2 连接配置

TTD_PG_MOONCAKE_DSN=postgresql://postgres:postgres@localhost:5436/ttd_lake
TTD_REDSHIFT_HOST=xxx.redshift-serverless.amazonaws.com
TTD_REDSHIFT_PORT=5439
TTD_REDSHIFT_DATABASE=ttd_gold
TTD_REDSHIFT_USER=ttd_service
TTD_REDSHIFT_PASSWORD=xxx

7.3 PG Supernode 连接

Backend 同时连接 PG Supernode 读取元数据和执行语义搜索:

TTD_AURORA_DSN=postgresql://postgres:postgres@localhost:5435/ttd_db

7.4 SQL 兼容性

semantic-plane YAML 中的 SQL 定义使用标准 ANSI SQL 语法,兼容 Redshift 和 pg_mooncake 两种方言。关键约束:

  • 避免 Redshift-only 函数(如 GETDATE(),改用 CURRENT_DATE
  • 避免 PostgreSQL-only 函数(如 generate_series()
  • 查询使用镜像表名(如 gold_hm.dim_article),不使用 _src 后缀

8. 常用命令

Task 命令

命令 说明
task dp:up 启动所有 data-plane 容器
task dp:up:obs 启动 data-plane + Observability (AutoMQ)
task dp:down 停止所有 data-plane 容器
task dp:down:obs 停止 data-plane + Observability
task dp:build:docker 构建 Data Plane 自定义镜像 (age-pgvector)
task dp:logs 查看容器日志(tail -f)
task dp:ps 显示容器运行状态
task dp:publish-sp 发布 semantic-plane YAML 到 S3
task dp:load-hm 加载 H&M 数据集到 pg_mooncake
task dp:psql 连接 PG Supernode (port 5435)
task dp:psql:mooncake 连接 pg_mooncake (port 5436)

常用 psql 查询

# 检查元数据表行数
task dp:psql -- -c "SELECT 'table_asset' as t, count(*) FROM semlayer.table_asset
UNION ALL SELECT 'column_asset', count(*) FROM semlayer.column_asset
UNION ALL SELECT 'metric_asset', count(*) FROM semlayer.metric_asset;"

# 检查 embedding 覆盖率
task dp:psql -- -c "SELECT count(*) FROM semlayer.table_asset_embedding;"

# 查看 AGE 图节点数
task dp:psql -- -c "LOAD 'age'; SET search_path = ag_catalog, public;
SELECT * FROM cypher('ttd_governance', \$\$MATCH (n) RETURN labels(n), count(*)\$\$) AS (label agtype, cnt agtype);"

# 查询 pg_mooncake 列存表行数
task dp:psql:mooncake -- -c "SELECT count(*) FROM gold_hm.fact_transaction;"

9. 故障排查

pg_mooncake 连接失败

connection refused: localhost:5436

解决:确认 pg_mooncake 容器已启动 (task dp:ps);检查端口绑定 5436

MinIO 连接超时

Could not connect to the endpoint URL: "http://localhost:9000"

解决:确认 MinIO 容器已启动 (task dp:ps);检查端口绑定 127.0.0.1:9000

AGE 扩展不可用

ERROR: could not open extension control file: "age"

解决:确认使用的是自定义 age-pgvector:local 镜像而非标准 PostgreSQL 镜像。

Embedding 生成失败

DASHSCOPE_API_KEY not set — cannot generate embeddings.

解决:在 .env 中配置 TTD_DASHSCOPE_API_KEY=sk-xxx,或使用 --skip-embedding 跳过。


10. 目录结构

data-plane/
├── docker-compose.yml          # Docker 编排(4 services)
├── docker/
│   ├── postgres/
│   │   └── Dockerfile          # Apache AGE + pgvector 自定义镜像
│   └── pg-mooncake/
│       ├── init-mooncake.sql          # pg_mooncake 初始化(扩展 + schema + 镜像)
│       └── init-s3.sh                 # MinIO bucket 创建
├── sql/
│   ├── 001_create_schema.sql   # semlayer 完整 DDL (17 tables + indices)
│   └── 100_hm_create_schema.sql # gold_hm 列存表 DDL
├── scripts/
│   ├── publish_semantic_plane.py # 发布 semantic-plane YAML 到 S3
│   └── load_hm_data.py          # H&M 数据加载脚本
├── pyproject.toml               # ttd-data-plane 包定义
├── Taskfile.yml                 # Task 命令定义
└── README.md