OpenMetadata 完整训练手册

目标读者: AI/Data Platform Engineer,具备 Python、SQL、Spark 基础
学习目标: 从零开始,系统掌握 OpenMetadata 的核心功能,并能在生产级数据平台(如 FinLakehouse)中独立运用
预计时间: 约 12–16 小时(分模块完成)

为什么要学 OpenMetadata?

在现代数据平台中,数据治理是区分"会用工具的工程师"和"能设计平台的架构师"的关键分水岭。

OpenMetadata 是目前开源社区中功能最完整的元数据管理平台,它解决了一个核心问题:当数据资产越来越多,谁能告诉我——这张表是什么意思?它从哪来?它是否可信?

对于你的职业定位(Regulated AI Platform Engineer,目标 Frankfurt Fintech),OpenMetadata 的价值在于:

  • 它原生支持数据血缘(Lineage),这是 DORA 和 BaFin 合规审计的技术基础
  • 它提供 API-first 架构,可与 Dagster、Airflow、MLflow 深度集成
  • 它是 Databricks Unity Catalog 的开源替代品之一,掌握它可以强化你的"平台工程师"叙事

模块一:理解 OpenMetadata 的架构与核心概念

1.1 OpenMetadata 是什么?

OpenMetadata 是一个开放标准的元数据平台,它的核心思想是:把所有数据资产(表、仪表板、Pipeline、ML 模型、API)统一抽象成"实体(Entity)",并通过一套标准 API 进行管理。

你可以把它想象成数据资产的"GitHub":就像 GitHub 管理代码,OpenMetadata 管理数据的"定义、血缘和质量"。

1.2 核心架构组件

OpenMetadata 由四个主要部分构成:

① OpenMetadata Server(核心 API 服务)
这是整个平台的大脑,基于 Java 开发,提供 RESTful API。所有的元数据读写都经过这里。它背后使用 MySQL/PostgreSQL 作为持久化存储,Elasticsearch 作为搜索引擎。

② Ingestion Framework(摄取框架)
这是一个 Python 框架,负责从各种数据源(Postgres、Snowflake、Spark、Airflow 等)自动抓取元数据。它不抓实际数据,只抓"关于数据的数据"——表结构、字段类型、行数统计、血缘关系等。

③ UI(Web 界面)
React 前端,提供搜索、浏览、编辑、标注元数据的可视化界面。数据工程师和业务人员都通过这里访问数据目录。

④ Connectors(连接器)
OpenMetadata 内置了 80+ 个连接器,覆盖主流数据库、Data Lake、BI 工具、Pipeline 工具。每个 Connector 本质上是一个实现了标准接口的 Python 类。

1.3 核心概念:Entity(实体)

OpenMetadata 中一切皆为 Entity。常见的 Entity 类型有:

  • Table:数据库表或 Delta/Iceberg 表
  • Pipeline:Airflow DAG 或 Dagster Job
  • Dashboard:Grafana/Superset 仪表板
  • MlModel:MLflow 中注册的模型
  • Database / DatabaseSchema:层级组织容器
  • Glossary / GlossaryTerm:业务术语表

每个 Entity 都有唯一的 FQN(Fully Qualified Name),格式如:service_name.database.schema.table_name。这是你在 API 调用中引用任何资产的标准方式。

1.4 核心概念:Lineage(血缘)

血缘描述的是数据的"因果链":这张表的数据从哪里来,经过了哪些转换,最终流向了哪里。在 OpenMetadata 中,血缘是一个有向无环图(DAG),节点是 Entity,边是转换关系。

思维提升: 血缘不只是一个"好看的功能",它是合规的技术证明。当监管机构问"这个风控模型的特征数据来源于哪里",血缘图就是你的答案。这就是为什么 FinLakehouse 需要它。


模块二:本地环境搭建

2.1 使用 Docker Compose 快速启动

OpenMetadata 官方提供了完整的 Docker Compose 配置,这是最快的本地实验方式。

前置要求:

  • Docker Desktop(内存至少分配 6GB)
  • Docker Compose v2.x

步骤一:下载官方配置文件

# 创建工作目录
mkdir openmetadata-local && cd openmetadata-local

# 下载官方 docker-compose 文件
curl -sL https://github.com/open-metadata/OpenMetadata/releases/latest/download/docker-compose.yml \
  -o docker-compose.yml

步骤二:启动服务

docker compose up -d

# 查看所有容器状态,等待全部变为 healthy
docker compose ps

启动过程需要 3–5 分钟,因为要初始化数据库 schema 和 Elasticsearch 索引。

步骤三:访问 UI

打开浏览器访问 http://localhost:8585,默认账号密码为:

  • Username: admin
  • Password: admin

进入 UI 后,你会看到一个空的数据目录——这是正常的,我们接下来会填充它。

2.2 理解启动时发生了什么

当你运行 docker compose up 时,后台发生了以下事情:

  1. MySQL 容器启动,OpenMetadata Server 运行数据库迁移脚本,创建所有元数据表
  2. Elasticsearch 容器启动,OpenMetadata Server 创建搜索索引
  3. openmetadata-server 容器启动,加载默认配置(包含内置的数据类型分类、默认角色等)
  4. ingestion 容器启动,这是运行 Ingestion Pipeline 的工作节点

练习 2.1: 运行 docker compose logs openmetadata_server | grep "Server started" 确认服务成功启动,然后在浏览器中探索 UI 的各个菜单。


模块三:第一个 Ingestion Pipeline——连接 PostgreSQL

这是最重要的实践模块。我们会把一个 PostgreSQL 数据库接入 OpenMetadata,让它自动发现所有的表和字段。

3.1 准备测试数据库

如果你的 SoloLakehouse 环境中已有 PostgreSQL(比如 Hive Metastore 使用的那个),可以直接使用。否则,用以下命令启动一个测试实例:

# 启动测试 PostgreSQL,并预加载一些测试数据
docker run -d \
  --name pg-test \
  --network openmetadata-local_app_net \
  -e POSTGRES_PASSWORD=test123 \
  -e POSTGRES_DB=finlakehouse_demo \
  postgres:15

# 进入容器,创建一些示例表
docker exec -it pg-test psql -U postgres -d finlakehouse_demo -c "
CREATE SCHEMA IF NOT EXISTS bronze;
CREATE SCHEMA IF NOT EXISTS silver;

CREATE TABLE bronze.raw_transactions (
  tx_id VARCHAR(50) PRIMARY KEY,
  account_id VARCHAR(50),
  amount DECIMAL(18,2),
  currency VARCHAR(3),
  tx_timestamp TIMESTAMP,
  source_system VARCHAR(50)
);

CREATE TABLE silver.cleaned_transactions (
  tx_id VARCHAR(50) PRIMARY KEY,
  account_id VARCHAR(50),
  amount_eur DECIMAL(18,2),
  tx_date DATE,
  is_flagged BOOLEAN,
  processed_at TIMESTAMP
);
"

这模拟了你 FinLakehouse 中的 Bronze→Silver 数据分层架构。

3.2 在 UI 中创建 Database Service

操作路径: Settings → Services → Databases → Add New Service

在 UI 中按以下步骤配置:

  1. 选择 Service Type:PostgreSQL
  2. Service Name:finlakehouse-postgres
  3. 填写连接信息:
    • Host:pg-test(Docker 网络内的主机名)
    • Port:5432
    • Username:postgres
    • Password:test123
    • Database:finlakehouse_demo

点击 Test Connection 验证连通性,成功后点击 Save

3.3 配置并运行 Metadata Ingestion

Service 创建后,点击进入该 Service,选择 Ingestion 标签页,点击 Add Ingestion

选择 Metadata 类型(这会抓取表结构、字段等基础元数据)。

配置关键选项:

  • Schema Filter Patternbronze|silver(只摄取这两个 schema)
  • Include Views:开启
  • Mark Deleted Tables:开启(当源库中表被删除时,OpenMetadata 中标记为已删除而非直接删除,保留历史)

点击 Run Now 执行摄取。

练习 3.1: 摄取完成后,在 Explore → Tables 中搜索 raw_transactions,查看它的 Schema、字段类型和 Sample Data。

3.4 理解摄取背后发生了什么

Ingestion Container
       │
       ▼
PostgreSQL Connector
  (连接 PG,执行 information_schema 查询)
       │
       ▼
Entity Builder
  (把查询结果转换为 OpenMetadata 的 Table Entity JSON)
       │
       ▼
OpenMetadata REST API
  (PUT /api/v1/tables  →  存入 MySQL + Elasticsearch)

关键洞察:Connector 不读取你的实际业务数据,它只读取 information_schemapg_catalog 等系统表中的元数据。这意味着你可以安全地在生产库上运行摄取,而不会有数据泄漏风险。


模块四:数据血缘(Lineage)管理

血缘管理是 OpenMetadata 最核心的企业级功能,也是你在 FinLakehouse 场景下最需要掌握的能力。

4.1 手动添加血缘关系(UI 方式)

操作路径: 找到目标表 → Lineage 标签页 → Edit Lineage

在可视化编辑器中,你可以:

  1. 点击 + 添加上游(Upstream)或下游(Downstream)节点
  2. 搜索并选择相关的 Table 或 Pipeline
  3. 可以在连线上点击,添加具体的字段级血缘(Column-level Lineage)

练习 4.1: 手动创建以下血缘关系:

bronze.raw_transactions → silver.cleaned_transactions

4.2 通过 Python API 自动添加血缘(推荐方式)

手动添加血缘只适合演示,生产中应该在 Pipeline 代码中自动上报血缘。

首先安装 OpenMetadata Python SDK:

pip install "openmetadata-ingestion[postgres]"

然后编写血缘上报代码:

from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
    OpenMetadataJWTClientConfig,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
    AuthProvider,
)

# 配置连接
server_config = OpenMetadataConnection(
    hostPort="http://localhost:8585/api",
    authProvider=AuthProvider.openmetadata,
    securityConfig=OpenMetadataJWTClientConfig(
        jwtToken="YOUR_JWT_TOKEN"  # 从 Settings → Bots 获取
    ),
)

metadata = OpenMetadata(server_config)

# 获取源表和目标表的 Entity 引用
# FQN 格式: service.database.schema.table
source_table = metadata.get_by_name(
    entity=Table,
    fqn="finlakehouse-postgres.finlakehouse_demo.bronze.raw_transactions"
)

target_table = metadata.get_by_name(
    entity=Table,
    fqn="finlakehouse-postgres.finlakehouse_demo.silver.cleaned_transactions"
)

# 构建血缘请求
lineage_request = AddLineageRequest(
    edge=EntitiesEdge(
        fromEntity=EntityReference(id=source_table.id, type="table"),
        toEntity=EntityReference(id=target_table.id, type="table"),
        lineageDetails=LineageDetails(
            # 可选:记录列级血缘
            # columnsLineage=[...]
            description="Bronze→Silver ETL: currency normalization and deduplication"
        )
    )
)

# 提交血缘
metadata.add_lineage(lineage_request)
print("Lineage added successfully!")

思维提升: 注意这个代码的本质是在 Pipeline 执行时"自我报告"血缘。更高级的做法是让 Dagster/Airflow 的 Hook 自动在每次 Job 成功后调用此 API,实现血缘的完全自动化。这就是"Platform Engineering"和"写脚本"的区别——前者设计系统让血缘自动流动,后者手动维护。

4.3 Pipeline 血缘:连接 Airflow

OpenMetadata 提供了 Airflow 的原生集成。在你的 DAG 中添加以下配置,血缘会在 DAG 运行时自动上报:

from airflow import DAG
from airflow.operators.python import PythonOperator
from openmetadata_managed_apis.workflows.ingestion.airflow_lineage_backend import (
    OpenMetadataLineageBackend
)

# airflow.cfg 中配置(或通过环境变量):
# [lineage]
# backend = openmetadata_managed_apis.workflows.ingestion.airflow_lineage_backend.OpenMetadataLineageBackend

with DAG("bronze_to_silver", ...) as dag:
    
    @dag.task(
        # 在 task 上声明血缘
        inlets=[{"tables": ["finlakehouse-postgres.finlakehouse_demo.bronze.raw_transactions"]}],
        outlets=[{"tables": ["finlakehouse-postgres.finlakehouse_demo.silver.cleaned_transactions"]}]
    )
    def transform_transactions():
        # 你的 ETL 逻辑
        pass

模块五:数据质量(Data Quality)管理

数据质量是 OpenMetadata 的另一个杀手级功能,特别适合 Fintech 合规场景(BaFin 要求数据有可验证的质量记录)。

5.1 核心概念:Test Suite 和 Test Case

OpenMetadata 的数据质量体系分两层:

Test Case(测试用例) 是最小单元,例如:"字段 amount 的值必须大于 0"、"表 raw_transactions 每天新增行数不少于 1000"。

Test Suite(测试套件) 是 Test Case 的集合,绑定到一张表上,定期执行。

5.2 通过 UI 创建数据质量测试

操作路径: 找到目标表 → Quality 标签页 → Add Test

选择 Test 类型,OpenMetadata 内置了以下常用测试:

  • columnValuesToBeNotNull:字段不允许为空
  • columnValuesToBeBetween:字段值在某个范围内
  • columnValuesToBeUnique:字段值唯一
  • tableRowCountToBeBetween:表的行数在某个范围内
  • columnValueLengthsToBeBetween:字段字符串长度范围

练习 5.1:bronze.raw_transactions 创建以下测试:

  1. tx_id 字段不允许为空(columnValuesToBeNotNull
  2. amount 字段值必须大于 0(columnValuesToBeBetween,min=0)
  3. 表行数在 0 到 1,000,000 之间

5.3 通过 Python SDK 创建测试(自动化方式)

from metadata.generated.schema.tests.basic import TestCaseResult, TestResultValue
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
from metadata.generated.schema.tests.testDefinition import TestPlatform

# 创建 Test Suite
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest

test_suite = metadata.create_or_update(
    CreateTestSuiteRequest(
        name="finlakehouse_bronze_quality",
        description="Data quality tests for Bronze layer tables",
    )
)

# 创建 Test Case:amount 必须大于 0
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest

test_case = metadata.create_or_update(
    CreateTestCaseRequest(
        name="amount_must_be_positive",
        entityLink="<#E::table::finlakehouse-postgres.finlakehouse_demo.bronze.raw_transactions::columns::amount>",
        testDefinition="columnValuesToBeBetween",
        testSuite=test_suite.fullyQualifiedName,
        parameterValues=[
            TestCaseParameterValue(name="minValue", value="0"),
        ],
    )
)

print(f"Test Case created: {test_case.name}")

entityLink 是 OpenMetadata 中引用实体特定部分的方式,格式如下:

  • 引用整张表:<#E::table::service.db.schema.table>
  • 引用具体字段:<#E::table::service.db.schema.table::columns::column_name>

这个格式设计的很精妙——它把"实体类型"和"路径"都编码进一个字符串,使得 API 可以通用地处理各种 Entity 的引用,而不需要为每种资产类型设计不同的请求体。


模块六:业务术语表(Glossary)与标签(Tags)

6.1 为什么业务术语表对 Fintech 很重要

在金融领域,同一个概念可能有不同的技术实现。例如"交易金额"可能在多个系统中叫做 amounttx_valuetransaction_amount。业务术语表(Glossary)建立了业务语言和技术字段之间的映射,这正是 BaFin 文档要求的:数据定义必须清晰、一致、可追溯。

6.2 创建 Glossary

操作路径: Govern → Glossary → Add Glossary

创建一个名为 FinancialDomain 的术语表,然后添加以下 Term:

  • Transaction:A record of a financial exchange between two parties
  • Account:A financial record maintained for a customer
  • Flag:Indicator that a transaction requires compliance review

6.3 将 Glossary Term 关联到表字段

进入 silver.cleaned_transactions 表,点击字段 is_flagged 旁边的编辑图标,在 Tags 中搜索并添加 Glossary Term Flag

完成后,任何人搜索 Flag 这个业务术语,都能找到这张表的这个字段。这实现了从业务语义反向查找技术资产的能力。

6.4 通过 Python API 批量打标签

from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.type.tagLabel import TagLabel, TagSource, LabelType, State

# 给字段批量添加标签
from metadata.generated.schema.entity.data.table import Column

# 获取表
table = metadata.get_by_name(
    entity=Table,
    fqn="finlakehouse-postgres.finlakehouse_demo.silver.cleaned_transactions",
    fields=["columns", "tags"]
)

# 使用 patch 更新字段标签
# 这是更新已有 Entity 属性的标准方式
metadata.patch_column_tags(
    table=table,
    column_fqn="finlakehouse-postgres.finlakehouse_demo.silver.cleaned_transactions.account_id",
    tag_labels=[
        TagLabel(
            tagFQN="PII.Sensitive",   # OpenMetadata 内置的 PII 分类标签
            labelType=LabelType.Automated,
            state=State.Suggested,
            source=TagSource.Classification,
        )
    ]
)

思维提升: 注意 LabelType.Automatedstate=State.Suggested——这意味着这个标签是系统自动建议的,需要人工确认。这种设计模式(机器发现 + 人工审核)是企业级数据治理的标准范式。在 FinLakehouse 中,你可以构建一个自动 PII 检测器,扫描字段名和示例数据,自动建议敏感标签,然后等待数据负责人审核。


模块七:REST API 深度使用

OpenMetadata 是 API-first 平台,所有 UI 操作背后都是 REST API。掌握 API 使你能构建自定义自动化。

7.1 获取 JWT Token

操作路径: Settings → Bots → ingestion-bot → Token

复制 JWT Token,这是调用 API 的凭证。在生产环境中,应将其存储为 Secret(Kubernetes Secret 或 Vault)。

7.2 探索 API 文档

访问 http://localhost:8585/docs,这是完整的 Swagger UI。所有 API 都在这里有文档。

重要的 API 端点有:

GET  /api/v1/tables?database=<fqn>         # 列出某数据库下的所有表
GET  /api/v1/tables/<id>/lineage           # 获取某表的血缘图
POST /api/v1/lineage                        # 添加血缘关系
GET  /api/v1/tables/<id>/dataQuality       # 获取数据质量测试结果
PATCH /api/v1/tables/<id>                  # 更新表的元数据(描述、标签等)
GET  /api/v1/search/query?q=<keyword>      # 全文搜索

7.3 用 curl 快速调用 API

# 设置变量
OMETA_HOST="http://localhost:8585"
JWT_TOKEN="your_jwt_token_here"

# 搜索包含 "transaction" 的表
curl -s \
  -H "Authorization: Bearer $JWT_TOKEN" \
  "$OMETA_HOST/api/v1/search/query?q=transaction&index=table_search_index" \
  | python3 -m json.tool | head -50

# 获取特定表的详情(包含字段信息)
TABLE_FQN="finlakehouse-postgres.finlakehouse_demo.bronze.raw_transactions"
curl -s \
  -H "Authorization: Bearer $JWT_TOKEN" \
  "$OMETA_HOST/api/v1/tables/name/$TABLE_FQN?fields=columns,tags,followers" \
  | python3 -m json.tool

7.4 构建一个迷你"数据目录查询工具"

以下是一个实用的 Python 脚本,封装了常用的 API 操作:

# ometa_client.py — 你的 OpenMetadata 工具包
import requests
import json
from typing import Optional, List, Dict

class OMetaClient:
    """
    OpenMetadata REST API 客户端
    封装常用操作,供 FinLakehouse 其他组件调用
    """
    
    def __init__(self, host: str, jwt_token: str):
        self.base_url = f"{host}/api/v1"
        self.headers = {
            "Authorization": f"Bearer {jwt_token}",
            "Content-Type": "application/json"
        }
    
    def search_tables(self, keyword: str, limit: int = 10) -> List[Dict]:
        """全文搜索表"""
        resp = requests.get(
            f"{self.base_url}/search/query",
            params={"q": keyword, "index": "table_search_index", "from": 0, "size": limit},
            headers=self.headers
        )
        resp.raise_for_status()
        hits = resp.json().get("hits", {}).get("hits", [])
        return [h["_source"] for h in hits]
    
    def get_table_lineage(self, table_fqn: str, depth: int = 2) -> Dict:
        """获取表的血缘图(上下游各 depth 层)"""
        # 先通过 FQN 获取 table id
        table_resp = requests.get(
            f"{self.base_url}/tables/name/{table_fqn}",
            headers=self.headers
        )
        table_resp.raise_for_status()
        table_id = table_resp.json()["id"]
        
        # 获取血缘
        lineage_resp = requests.get(
            f"{self.base_url}/lineage/table/{table_id}",
            params={"upstreamDepth": depth, "downstreamDepth": depth},
            headers=self.headers
        )
        lineage_resp.raise_for_status()
        return lineage_resp.json()
    
    def update_table_description(self, table_fqn: str, description: str) -> Dict:
        """更新表描述(使用 PATCH 请求)"""
        # 先获取当前版本
        table_resp = requests.get(
            f"{self.base_url}/tables/name/{table_fqn}",
            headers=self.headers
        )
        table_resp.raise_for_status()
        
        # 使用 JSON Patch 格式更新
        patch_body = [
            {"op": "add", "path": "/description", "value": description}
        ]
        patch_headers = {**self.headers, "Content-Type": "application/json-patch+json"}
        table_id = table_resp.json()["id"]
        
        update_resp = requests.patch(
            f"{self.base_url}/tables/{table_id}",
            data=json.dumps(patch_body),
            headers=patch_headers
        )
        update_resp.raise_for_status()
        return update_resp.json()
    
    def list_tables_by_tag(self, tag_fqn: str) -> List[Dict]:
        """查找所有带有特定标签的表"""
        resp = requests.get(
            f"{self.base_url}/search/query",
            params={
                "q": f"tags.tagFQN:{tag_fqn}",
                "index": "table_search_index",
                "size": 50
            },
            headers=self.headers
        )
        resp.raise_for_status()
        hits = resp.json().get("hits", {}).get("hits", [])
        return [h["_source"] for h in hits]


# 使用示例
if __name__ == "__main__":
    client = OMetaClient(
        host="http://localhost:8585",
        jwt_token="YOUR_TOKEN"
    )
    
    # 搜索所有 transaction 相关的表
    tables = client.search_tables("transaction")
    print(f"Found {len(tables)} tables:")
    for t in tables:
        print(f"  - {t['fullyQualifiedName']}: {t.get('description', 'No description')}")
    
    # 查找所有 PII 标签的表
    pii_tables = client.list_tables_by_tag("PII.Sensitive")
    print(f"\nTables with PII data: {len(pii_tables)}")

练习 7.1: 运行这个脚本,然后扩展它,添加一个 generate_compliance_report() 方法,输出所有 PII 表的名称、负责人和最后修改时间。这正是 BaFin 审计时需要的报告格式。


模块八:自定义 Connector 开发

当你的数据源是内部系统或 OpenMetadata 暂不支持的工具时,就需要开发自定义 Connector。这是区分"会用"和"能扩展"的关键能力。

8.1 Connector 的工作原理

每个 Connector 都继承自 Source 基类,需要实现以下方法:

prepare()        → 初始化连接
yield_create_request_database_service()  → 创建 Service
yield_database()       → 产生 Database Entity
yield_database_schema()  → 产生 Schema Entity
yield_table()          → 产生 Table Entity(最核心)
close()          → 关闭连接清理资源

yield_* 是 Python Generator 模式——Connector 不是一次性返回所有数据,而是按需产生 Entity。这使得处理大型数据仓库(数千张表)时内存效率极高。

8.2 为 Delta Lake 表开发 Connector(简化版)

以下展示如何为你的 MinIO + Delta Lake 表开发一个基础 Connector:

# custom_delta_connector.py

from typing import Iterable, Optional
from metadata.ingestion.api.source import Source
from metadata.generated.schema.entity.data.table import (
    Column, DataType, Table, TableType
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import CreateDatabaseSchemaRequest
from delta import DeltaTable
import pyarrow as pa
from pyspark.sql import SparkSession

class DeltaLakeConnector(Source):
    """
    自定义 Delta Lake Connector
    从 MinIO 上的 Delta 表读取元数据,注入 OpenMetadata
    
    设计思路:
    - 通过 Spark 读取 Delta 表的 schema(不读实际数据)
    - 通过 Delta Log 获取 commit 历史和统计信息
    - 将这些转换为 OpenMetadata 的 Table Entity
    """
    
    def __init__(self, config, metadata):
        super().__init__()
        self.config = config
        self.metadata = metadata
        self.spark = None
    
    def prepare(self):
        """初始化 Spark Session,配置连接到 MinIO"""
        self.spark = (
            SparkSession.builder
            .appName("OpenMetadata-DeltaIngestion")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.hadoop.fs.s3a.endpoint", self.config.minio_endpoint)
            .config("spark.hadoop.fs.s3a.access.key", self.config.access_key)
            .config("spark.hadoop.fs.s3a.secret.key", self.config.secret_key)
            .getOrCreate()
        )
    
    def _pyarrow_to_ometa_type(self, pa_type) -> DataType:
        """将 PyArrow 数据类型映射到 OpenMetadata 数据类型"""
        type_map = {
            pa.int32(): DataType.INT,
            pa.int64(): DataType.BIGINT,
            pa.float32(): DataType.FLOAT,
            pa.float64(): DataType.DOUBLE,
            pa.string(): DataType.VARCHAR,
            pa.bool_(): DataType.BOOLEAN,
            pa.date32(): DataType.DATE,
            pa.timestamp("us"): DataType.TIMESTAMP,
        }
        return type_map.get(pa_type, DataType.UNKNOWN)
    
    def yield_table(self, table_path: str, table_name: str, schema_name: str) -> Iterable[CreateTableRequest]:
        """读取 Delta 表的 schema,产生 Table Entity"""
        
        # 读取 Delta 表(只读 schema,不读数据)
        delta_table = DeltaTable.forPath(self.spark, table_path)
        arrow_schema = delta_table.toDF().schema
        
        # 获取 Delta Log 中的统计信息
        history = delta_table.history(1).collect()[0]
        row_count = delta_table.toDF().count()  # 生产中应从统计文件读取
        
        # 构建字段列表
        columns = []
        for i, field in enumerate(arrow_schema.fields):
            columns.append(
                Column(
                    name=field.name,
                    dataType=self._pyarrow_to_ometa_type(field.dataType),
                    description=f"Column {field.name}",
                    ordinalPosition=i + 1,
                )
            )
        
        yield CreateTableRequest(
            name=table_name,
            databaseSchema=f"finlakehouse-delta.{schema_name}",
            tableType=TableType.Regular,
            columns=columns,
            description=f"Delta table at {table_path}. Last modified: {history['timestamp']}",
        )
    
    def close(self):
        if self.spark:
            self.spark.stop()

练习 8.1: 思考以下扩展方向:如果你想在 Connector 中自动检测字段是否可能包含 PII(比如字段名包含 emailphonename),应该在哪个方法里添加这个逻辑?应该把 PII 标签设置为 Suggested 还是 Confirmed?为什么?


模块九:与 Dagster 集成

9.1 集成原理

Dagster 和 OpenMetadata 的集成通过两种方式实现:

方式一:使用 Dagster 的 OpenMetadata IO Manager 在 Asset 执行后,自动将 Asset 的元数据同步到 OpenMetadata。

方式二:在 Op/Asset 中显式调用 OpenMetadata API 在 Dagster Job 完成后,调用 OpenMetadata Python SDK 上报血缘和统计信息。

推荐方式二,因为它更灵活,你对血缘的粒度有完全控制。

9.2 Dagster Asset 中的血缘上报

# dagster_with_ometa.py

from dagster import asset, AssetExecutionContext, OpExecutionContext
from ometa_client import OMetaClient  # 我们在模块七写的客户端

# 初始化 OpenMetadata 客户端(生产中从 Secrets Manager 获取 token)
ometa = OMetaClient(
    host="http://openmetadata:8585",
    jwt_token="YOUR_TOKEN"
)

@asset(
    group_name="bronze_layer",
    description="Ingest raw transactions from source system"
)
def raw_transactions(context: AssetExecutionContext):
    """Bronze 层原始数据摄取"""
    
    # 实际 ETL 逻辑
    df = load_from_source_system()
    df.write.format("delta").save("s3a://bronze/transactions/")
    
    # 上报元数据到 OpenMetadata
    row_count = df.count()
    context.add_output_metadata({"row_count": row_count})
    
    # 更新 OpenMetadata 中的表描述和统计
    ometa.update_table_description(
        table_fqn="finlakehouse-delta.bronze.raw_transactions",
        description=f"Last loaded {row_count} rows at {context.run.start_time}"
    )
    
    return df

@asset(
    group_name="silver_layer",
    deps=["raw_transactions"],
    description="Clean and normalize raw transactions"
)
def cleaned_transactions(context: AssetExecutionContext, raw_transactions):
    """Silver 层清洗逻辑"""
    
    df = transform(raw_transactions)
    df.write.format("delta").save("s3a://silver/transactions/")
    
    # 上报血缘:告诉 OpenMetadata 这个 Asset 的数据来自哪里
    ometa.add_lineage(
        from_fqn="finlakehouse-delta.bronze.raw_transactions",
        to_fqn="finlakehouse-delta.silver.cleaned_transactions",
        description=f"Dagster job: cleaned_transactions run {context.run_id}"
    )
    
    return df

9.3 核心思维:把元数据当作"一等公民"

在初级工程师眼中,ETL 代码 = 数据转换逻辑。在平台工程师眼中,ETL 代码 = 数据转换逻辑 + 元数据更新 + 质量检查上报 + 血缘注册

这个差别,就是你在简历上写"设计并实现了数据治理闭环"和"开发了 ETL Pipeline"的区别,也是薪资差距的来源。


模块十:生产级部署与安全

10.1 Kubernetes 部署(Helm Chart)

在生产环境中,OpenMetadata 应通过 Helm 部署在 Kubernetes 上:

# 添加 OpenMetadata Helm 仓库
helm repo add open-metadata https://helm.open-metadata.org/
helm repo update

# 创建 values.yaml(关键配置)
cat > ometa-values.yaml << 'EOF'
openmetadata:
  config:
    authentication:
      provider: "okta"  # 或 "azure",对接企业 SSO
      publicKeys:
        - "https://your-okta-domain/oauth2/v1/keys"
    
    authorizer:
      adminPrincipals:
        - "admin@yourcompany.com"
    
    database:
      # 外部 PostgreSQL(不使用内置 MySQL)
      driverClass: "org.postgresql.Driver"
      url: "jdbc:postgresql://your-pg-host:5432/openmetadata_db"
      user: "ometa_user"
      password:
        secretRef: "ometa-db-secret"
        secretKey: "password"

# 持久化存储
elasticsearch:
  enabled: true
  persistence:
    storageClass: "standard"
    size: "50Gi"
EOF

helm install openmetadata open-metadata/openmetadata \
  --namespace data-platform \
  --values ometa-values.yaml

10.2 安全配置:基于角色的访问控制(RBAC)

OpenMetadata 内置了细粒度的 RBAC 系统,关键角色有:

  • DataConsumer:只能查看和搜索,不能编辑
  • DataSteward:可以编辑元数据、管理术语表
  • DataEngineer:可以运行 Ingestion
  • Admin:完全控制

在 Fintech 场景下,你应该为不同团队分配不同角色:合规团队是 DataSteward,分析师是 DataConsumer,数据工程团队是 DataEngineer。

10.3 数据访问策略(Data Access Control)

OpenMetadata 支持定义策略来控制谁能看到哪些数据:

{
  "name": "PII-Data-Access-Policy",
  "rules": [
    {
      "name": "deny-pii-for-non-compliance",
      "effect": "DENY",
      "operations": ["ViewAll"],
      "resources": ["table"],
      "condition": "matchAnyTag('PII.Sensitive') && !inAnyTeam('compliance-team')"
    }
  ]
}

这个策略确保:只有合规团队成员才能访问被标记为 PII.Sensitive 的数据表。这是 GDPR 和 BaFin 数据访问控制要求的技术实现。


模块十一:综合实战项目

完成前面所有模块后,通过这个综合项目验证你的掌握程度。

项目:为 FinLakehouse 构建完整数据目录

目标: 建立一个覆盖 Bronze→Silver→Gold 全链路的数据目录,包含元数据、血缘、质量测试和 PII 标记。

步骤一:环境准备(30 分钟)

启动你的本地 OpenMetadata + PostgreSQL 环境,创建包含以下三层表的数据库结构:

  • Bronze: raw_transactions, raw_accounts, raw_market_data
  • Silver: cleaned_transactions, customer_360
  • Gold: risk_score_features, daily_pnl_report

步骤二:注册所有数据资产(1 小时)

编写自动化脚本,通过 Ingestion API 或手动配置,将所有表注册到 OpenMetadata。为每张表写好业务描述(用英文,模拟实际工作场景)。

步骤三:建立完整血缘(1 小时)

用 Python SDK 建立以下血缘链:

raw_transactions + raw_accounts → cleaned_transactions → customer_360 → risk_score_features
raw_market_data → daily_pnl_report

步骤四:配置数据质量测试(1 小时)

为每层的核心表创建至少 3 个 Test Case,覆盖:

  • Null 检查(关键字段不允许为空)
  • 范围检查(金额类字段合理范围)
  • 唯一性检查(主键唯一)

步骤五:PII 标记与合规文档(30 分钟)

识别所有包含 PII 数据的字段(账户 ID、姓名、金额等),通过 API 批量添加 PII.Sensitive 标签,并使用 OMetaClient.list_tables_by_tag() 生成一份 PII 资产清单(模拟合规审计报告)。

验收标准:

  • 所有表在 OpenMetadata UI 中可搜索且有描述
  • 血缘图可以从 raw_transactions 一路追踪到 risk_score_features
  • 所有数据质量测试运行通过
  • PII 资产报告能够列出所有敏感字段

总结:OpenMetadata 能力地图

Level 1 - 基础使用(本手册 1-5 模块)
├── 部署 OpenMetadata 本地环境
├── 配置 Database/Pipeline Service
├── 运行 Metadata Ingestion
├── 手动管理血缘
└── 配置数据质量测试

Level 2 - 工程化(本手册 6-8 模块)
├── 通过 Python SDK 自动化所有操作
├── 在 Dagster/Airflow 中集成血缘上报
├── 构建业务术语表和 PII 标记自动化
└── 开发自定义 Connector

Level 3 - 平台架构(本手册 9-11 模块)
├── Kubernetes 生产级部署
├── 企业 SSO 和 RBAC 配置
├── 数据访问控制策略
└── 构建端到端治理闭环

附录:常用命令速查

# 启动本地环境
docker compose up -d

# 查看摄取日志
docker compose logs -f ingestion

# 重启服务(更新配置后)
docker compose restart openmetadata_server

# 备份 OpenMetadata 数据库(生产环境必做)
docker exec -it openmetadata_mysql mysqldump \
  -u openmetadata -popenmetadata openmetadata_db > backup_$(date +%Y%m%d).sql

# 通过 CLI 触发 Ingestion(不需要 UI)
metadata ingest -c my_postgres_config.yaml
# Python SDK 快速参考

from metadata.ingestion.ometa.ometa_api import OpenMetadata

# 常用 Entity 操作
metadata.get_by_name(entity=Table, fqn="service.db.schema.table")
metadata.get_by_id(entity=Table, entity_id="uuid")
metadata.list_entities(entity=Table)
metadata.create_or_update(CreateTableRequest(...))
metadata.patch_description(entity=Table, source=table, description="text")
metadata.add_lineage(AddLineageRequest(...))
metadata.delete(entity=Table, entity_id="uuid", hard_delete=False)

手册版本:2025 年版 | 基于 OpenMetadata 1.3.x
作者:FinLakehouse Platform Documentation