OpenMetadata 完整训练手册
目标读者: AI/Data Platform Engineer,具备 Python、SQL、Spark 基础
学习目标: 从零开始,系统掌握 OpenMetadata 的核心功能,并能在生产级数据平台(如 FinLakehouse)中独立运用
预计时间: 约 12–16 小时(分模块完成)
为什么要学 OpenMetadata?
在现代数据平台中,数据治理是区分"会用工具的工程师"和"能设计平台的架构师"的关键分水岭。
OpenMetadata 是目前开源社区中功能最完整的元数据管理平台,它解决了一个核心问题:当数据资产越来越多,谁能告诉我——这张表是什么意思?它从哪来?它是否可信?
对于你的职业定位(Regulated AI Platform Engineer,目标 Frankfurt Fintech),OpenMetadata 的价值在于:
- 它原生支持数据血缘(Lineage),这是 DORA 和 BaFin 合规审计的技术基础
- 它提供 API-first 架构,可与 Dagster、Airflow、MLflow 深度集成
- 它是 Databricks Unity Catalog 的开源替代品之一,掌握它可以强化你的"平台工程师"叙事
模块一:理解 OpenMetadata 的架构与核心概念
1.1 OpenMetadata 是什么?
OpenMetadata 是一个开放标准的元数据平台,它的核心思想是:把所有数据资产(表、仪表板、Pipeline、ML 模型、API)统一抽象成"实体(Entity)",并通过一套标准 API 进行管理。
你可以把它想象成数据资产的"GitHub":就像 GitHub 管理代码,OpenMetadata 管理数据的"定义、血缘和质量"。
1.2 核心架构组件
OpenMetadata 由四个主要部分构成:
① OpenMetadata Server(核心 API 服务)
这是整个平台的大脑,基于 Java 开发,提供 RESTful API。所有的元数据读写都经过这里。它背后使用 MySQL/PostgreSQL 作为持久化存储,Elasticsearch 作为搜索引擎。
② Ingestion Framework(摄取框架)
这是一个 Python 框架,负责从各种数据源(Postgres、Snowflake、Spark、Airflow 等)自动抓取元数据。它不抓实际数据,只抓"关于数据的数据"——表结构、字段类型、行数统计、血缘关系等。
③ UI(Web 界面)
React 前端,提供搜索、浏览、编辑、标注元数据的可视化界面。数据工程师和业务人员都通过这里访问数据目录。
④ Connectors(连接器)
OpenMetadata 内置了 80+ 个连接器,覆盖主流数据库、Data Lake、BI 工具、Pipeline 工具。每个 Connector 本质上是一个实现了标准接口的 Python 类。
1.3 核心概念:Entity(实体)
OpenMetadata 中一切皆为 Entity。常见的 Entity 类型有:
Table:数据库表或 Delta/Iceberg 表Pipeline:Airflow DAG 或 Dagster JobDashboard:Grafana/Superset 仪表板MlModel:MLflow 中注册的模型Database/DatabaseSchema:层级组织容器Glossary/GlossaryTerm:业务术语表
每个 Entity 都有唯一的 FQN(Fully Qualified Name),格式如:service_name.database.schema.table_name。这是你在 API 调用中引用任何资产的标准方式。
1.4 核心概念:Lineage(血缘)
血缘描述的是数据的"因果链":这张表的数据从哪里来,经过了哪些转换,最终流向了哪里。在 OpenMetadata 中,血缘是一个有向无环图(DAG),节点是 Entity,边是转换关系。
思维提升: 血缘不只是一个"好看的功能",它是合规的技术证明。当监管机构问"这个风控模型的特征数据来源于哪里",血缘图就是你的答案。这就是为什么 FinLakehouse 需要它。
模块二:本地环境搭建
2.1 使用 Docker Compose 快速启动
OpenMetadata 官方提供了完整的 Docker Compose 配置,这是最快的本地实验方式。
前置要求:
- Docker Desktop(内存至少分配 6GB)
- Docker Compose v2.x
步骤一:下载官方配置文件
# 创建工作目录
mkdir openmetadata-local && cd openmetadata-local
# 下载官方 docker-compose 文件
curl -sL https://github.com/open-metadata/OpenMetadata/releases/latest/download/docker-compose.yml \
-o docker-compose.yml
步骤二:启动服务
docker compose up -d
# 查看所有容器状态,等待全部变为 healthy
docker compose ps
启动过程需要 3–5 分钟,因为要初始化数据库 schema 和 Elasticsearch 索引。
步骤三:访问 UI
打开浏览器访问 http://localhost:8585,默认账号密码为:
- Username:
admin - Password:
admin
进入 UI 后,你会看到一个空的数据目录——这是正常的,我们接下来会填充它。
2.2 理解启动时发生了什么
当你运行 docker compose up 时,后台发生了以下事情:
- MySQL 容器启动,OpenMetadata Server 运行数据库迁移脚本,创建所有元数据表
- Elasticsearch 容器启动,OpenMetadata Server 创建搜索索引
- openmetadata-server 容器启动,加载默认配置(包含内置的数据类型分类、默认角色等)
- ingestion 容器启动,这是运行 Ingestion Pipeline 的工作节点
练习 2.1: 运行 docker compose logs openmetadata_server | grep "Server started" 确认服务成功启动,然后在浏览器中探索 UI 的各个菜单。
模块三:第一个 Ingestion Pipeline——连接 PostgreSQL
这是最重要的实践模块。我们会把一个 PostgreSQL 数据库接入 OpenMetadata,让它自动发现所有的表和字段。
3.1 准备测试数据库
如果你的 SoloLakehouse 环境中已有 PostgreSQL(比如 Hive Metastore 使用的那个),可以直接使用。否则,用以下命令启动一个测试实例:
# 启动测试 PostgreSQL,并预加载一些测试数据
docker run -d \
--name pg-test \
--network openmetadata-local_app_net \
-e POSTGRES_PASSWORD=test123 \
-e POSTGRES_DB=finlakehouse_demo \
postgres:15
# 进入容器,创建一些示例表
docker exec -it pg-test psql -U postgres -d finlakehouse_demo -c "
CREATE SCHEMA IF NOT EXISTS bronze;
CREATE SCHEMA IF NOT EXISTS silver;
CREATE TABLE bronze.raw_transactions (
tx_id VARCHAR(50) PRIMARY KEY,
account_id VARCHAR(50),
amount DECIMAL(18,2),
currency VARCHAR(3),
tx_timestamp TIMESTAMP,
source_system VARCHAR(50)
);
CREATE TABLE silver.cleaned_transactions (
tx_id VARCHAR(50) PRIMARY KEY,
account_id VARCHAR(50),
amount_eur DECIMAL(18,2),
tx_date DATE,
is_flagged BOOLEAN,
processed_at TIMESTAMP
);
"
这模拟了你 FinLakehouse 中的 Bronze→Silver 数据分层架构。
3.2 在 UI 中创建 Database Service
操作路径: Settings → Services → Databases → Add New Service
在 UI 中按以下步骤配置:
- 选择 Service Type:PostgreSQL
- Service Name:
finlakehouse-postgres - 填写连接信息:
- Host:
pg-test(Docker 网络内的主机名) - Port:
5432 - Username:
postgres - Password:
test123 - Database:
finlakehouse_demo
- Host:
点击 Test Connection 验证连通性,成功后点击 Save。
3.3 配置并运行 Metadata Ingestion
Service 创建后,点击进入该 Service,选择 Ingestion 标签页,点击 Add Ingestion。
选择 Metadata 类型(这会抓取表结构、字段等基础元数据)。
配置关键选项:
- Schema Filter Pattern:
bronze|silver(只摄取这两个 schema) - Include Views:开启
- Mark Deleted Tables:开启(当源库中表被删除时,OpenMetadata 中标记为已删除而非直接删除,保留历史)
点击 Run Now 执行摄取。
练习 3.1: 摄取完成后,在 Explore → Tables 中搜索 raw_transactions,查看它的 Schema、字段类型和 Sample Data。
3.4 理解摄取背后发生了什么
Ingestion Container
│
▼
PostgreSQL Connector
(连接 PG,执行 information_schema 查询)
│
▼
Entity Builder
(把查询结果转换为 OpenMetadata 的 Table Entity JSON)
│
▼
OpenMetadata REST API
(PUT /api/v1/tables → 存入 MySQL + Elasticsearch)
关键洞察:Connector 不读取你的实际业务数据,它只读取 information_schema、pg_catalog 等系统表中的元数据。这意味着你可以安全地在生产库上运行摄取,而不会有数据泄漏风险。
模块四:数据血缘(Lineage)管理
血缘管理是 OpenMetadata 最核心的企业级功能,也是你在 FinLakehouse 场景下最需要掌握的能力。
4.1 手动添加血缘关系(UI 方式)
操作路径: 找到目标表 → Lineage 标签页 → Edit Lineage
在可视化编辑器中,你可以:
- 点击 + 添加上游(Upstream)或下游(Downstream)节点
- 搜索并选择相关的 Table 或 Pipeline
- 可以在连线上点击,添加具体的字段级血缘(Column-level Lineage)
练习 4.1: 手动创建以下血缘关系:
bronze.raw_transactions → silver.cleaned_transactions
4.2 通过 Python API 自动添加血缘(推荐方式)
手动添加血缘只适合演示,生产中应该在 Pipeline 代码中自动上报血缘。
首先安装 OpenMetadata Python SDK:
pip install "openmetadata-ingestion[postgres]"
然后编写血缘上报代码:
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
AuthProvider,
)
# 配置连接
server_config = OpenMetadataConnection(
hostPort="http://localhost:8585/api",
authProvider=AuthProvider.openmetadata,
securityConfig=OpenMetadataJWTClientConfig(
jwtToken="YOUR_JWT_TOKEN" # 从 Settings → Bots 获取
),
)
metadata = OpenMetadata(server_config)
# 获取源表和目标表的 Entity 引用
# FQN 格式: service.database.schema.table
source_table = metadata.get_by_name(
entity=Table,
fqn="finlakehouse-postgres.finlakehouse_demo.bronze.raw_transactions"
)
target_table = metadata.get_by_name(
entity=Table,
fqn="finlakehouse-postgres.finlakehouse_demo.silver.cleaned_transactions"
)
# 构建血缘请求
lineage_request = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(id=source_table.id, type="table"),
toEntity=EntityReference(id=target_table.id, type="table"),
lineageDetails=LineageDetails(
# 可选:记录列级血缘
# columnsLineage=[...]
description="Bronze→Silver ETL: currency normalization and deduplication"
)
)
)
# 提交血缘
metadata.add_lineage(lineage_request)
print("Lineage added successfully!")
思维提升: 注意这个代码的本质是在 Pipeline 执行时"自我报告"血缘。更高级的做法是让 Dagster/Airflow 的 Hook 自动在每次 Job 成功后调用此 API,实现血缘的完全自动化。这就是"Platform Engineering"和"写脚本"的区别——前者设计系统让血缘自动流动,后者手动维护。
4.3 Pipeline 血缘:连接 Airflow
OpenMetadata 提供了 Airflow 的原生集成。在你的 DAG 中添加以下配置,血缘会在 DAG 运行时自动上报:
from airflow import DAG
from airflow.operators.python import PythonOperator
from openmetadata_managed_apis.workflows.ingestion.airflow_lineage_backend import (
OpenMetadataLineageBackend
)
# airflow.cfg 中配置(或通过环境变量):
# [lineage]
# backend = openmetadata_managed_apis.workflows.ingestion.airflow_lineage_backend.OpenMetadataLineageBackend
with DAG("bronze_to_silver", ...) as dag:
@dag.task(
# 在 task 上声明血缘
inlets=[{"tables": ["finlakehouse-postgres.finlakehouse_demo.bronze.raw_transactions"]}],
outlets=[{"tables": ["finlakehouse-postgres.finlakehouse_demo.silver.cleaned_transactions"]}]
)
def transform_transactions():
# 你的 ETL 逻辑
pass
模块五:数据质量(Data Quality)管理
数据质量是 OpenMetadata 的另一个杀手级功能,特别适合 Fintech 合规场景(BaFin 要求数据有可验证的质量记录)。
5.1 核心概念:Test Suite 和 Test Case
OpenMetadata 的数据质量体系分两层:
Test Case(测试用例) 是最小单元,例如:"字段 amount 的值必须大于 0"、"表 raw_transactions 每天新增行数不少于 1000"。
Test Suite(测试套件) 是 Test Case 的集合,绑定到一张表上,定期执行。
5.2 通过 UI 创建数据质量测试
操作路径: 找到目标表 → Quality 标签页 → Add Test
选择 Test 类型,OpenMetadata 内置了以下常用测试:
columnValuesToBeNotNull:字段不允许为空columnValuesToBeBetween:字段值在某个范围内columnValuesToBeUnique:字段值唯一tableRowCountToBeBetween:表的行数在某个范围内columnValueLengthsToBeBetween:字段字符串长度范围
练习 5.1: 为 bronze.raw_transactions 创建以下测试:
tx_id字段不允许为空(columnValuesToBeNotNull)amount字段值必须大于 0(columnValuesToBeBetween,min=0)- 表行数在 0 到 1,000,000 之间
5.3 通过 Python SDK 创建测试(自动化方式)
from metadata.generated.schema.tests.basic import TestCaseResult, TestResultValue
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
from metadata.generated.schema.tests.testDefinition import TestPlatform
# 创建 Test Suite
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
test_suite = metadata.create_or_update(
CreateTestSuiteRequest(
name="finlakehouse_bronze_quality",
description="Data quality tests for Bronze layer tables",
)
)
# 创建 Test Case:amount 必须大于 0
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
test_case = metadata.create_or_update(
CreateTestCaseRequest(
name="amount_must_be_positive",
entityLink="<#E::table::finlakehouse-postgres.finlakehouse_demo.bronze.raw_transactions::columns::amount>",
testDefinition="columnValuesToBeBetween",
testSuite=test_suite.fullyQualifiedName,
parameterValues=[
TestCaseParameterValue(name="minValue", value="0"),
],
)
)
print(f"Test Case created: {test_case.name}")
5.4 理解 entityLink 格式
entityLink 是 OpenMetadata 中引用实体特定部分的方式,格式如下:
- 引用整张表:
<#E::table::service.db.schema.table> - 引用具体字段:
<#E::table::service.db.schema.table::columns::column_name>
这个格式设计的很精妙——它把"实体类型"和"路径"都编码进一个字符串,使得 API 可以通用地处理各种 Entity 的引用,而不需要为每种资产类型设计不同的请求体。
模块六:业务术语表(Glossary)与标签(Tags)
6.1 为什么业务术语表对 Fintech 很重要
在金融领域,同一个概念可能有不同的技术实现。例如"交易金额"可能在多个系统中叫做 amount、tx_value、transaction_amount。业务术语表(Glossary)建立了业务语言和技术字段之间的映射,这正是 BaFin 文档要求的:数据定义必须清晰、一致、可追溯。
6.2 创建 Glossary
操作路径: Govern → Glossary → Add Glossary
创建一个名为 FinancialDomain 的术语表,然后添加以下 Term:
- Transaction:A record of a financial exchange between two parties
- Account:A financial record maintained for a customer
- Flag:Indicator that a transaction requires compliance review
6.3 将 Glossary Term 关联到表字段
进入 silver.cleaned_transactions 表,点击字段 is_flagged 旁边的编辑图标,在 Tags 中搜索并添加 Glossary Term Flag。
完成后,任何人搜索 Flag 这个业务术语,都能找到这张表的这个字段。这实现了从业务语义反向查找技术资产的能力。
6.4 通过 Python API 批量打标签
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.type.tagLabel import TagLabel, TagSource, LabelType, State
# 给字段批量添加标签
from metadata.generated.schema.entity.data.table import Column
# 获取表
table = metadata.get_by_name(
entity=Table,
fqn="finlakehouse-postgres.finlakehouse_demo.silver.cleaned_transactions",
fields=["columns", "tags"]
)
# 使用 patch 更新字段标签
# 这是更新已有 Entity 属性的标准方式
metadata.patch_column_tags(
table=table,
column_fqn="finlakehouse-postgres.finlakehouse_demo.silver.cleaned_transactions.account_id",
tag_labels=[
TagLabel(
tagFQN="PII.Sensitive", # OpenMetadata 内置的 PII 分类标签
labelType=LabelType.Automated,
state=State.Suggested,
source=TagSource.Classification,
)
]
)
思维提升: 注意 LabelType.Automated 和 state=State.Suggested——这意味着这个标签是系统自动建议的,需要人工确认。这种设计模式(机器发现 + 人工审核)是企业级数据治理的标准范式。在 FinLakehouse 中,你可以构建一个自动 PII 检测器,扫描字段名和示例数据,自动建议敏感标签,然后等待数据负责人审核。
模块七:REST API 深度使用
OpenMetadata 是 API-first 平台,所有 UI 操作背后都是 REST API。掌握 API 使你能构建自定义自动化。
7.1 获取 JWT Token
操作路径: Settings → Bots → ingestion-bot → Token
复制 JWT Token,这是调用 API 的凭证。在生产环境中,应将其存储为 Secret(Kubernetes Secret 或 Vault)。
7.2 探索 API 文档
访问 http://localhost:8585/docs,这是完整的 Swagger UI。所有 API 都在这里有文档。
重要的 API 端点有:
GET /api/v1/tables?database=<fqn> # 列出某数据库下的所有表
GET /api/v1/tables/<id>/lineage # 获取某表的血缘图
POST /api/v1/lineage # 添加血缘关系
GET /api/v1/tables/<id>/dataQuality # 获取数据质量测试结果
PATCH /api/v1/tables/<id> # 更新表的元数据(描述、标签等)
GET /api/v1/search/query?q=<keyword> # 全文搜索
7.3 用 curl 快速调用 API
# 设置变量
OMETA_HOST="http://localhost:8585"
JWT_TOKEN="your_jwt_token_here"
# 搜索包含 "transaction" 的表
curl -s \
-H "Authorization: Bearer $JWT_TOKEN" \
"$OMETA_HOST/api/v1/search/query?q=transaction&index=table_search_index" \
| python3 -m json.tool | head -50
# 获取特定表的详情(包含字段信息)
TABLE_FQN="finlakehouse-postgres.finlakehouse_demo.bronze.raw_transactions"
curl -s \
-H "Authorization: Bearer $JWT_TOKEN" \
"$OMETA_HOST/api/v1/tables/name/$TABLE_FQN?fields=columns,tags,followers" \
| python3 -m json.tool
7.4 构建一个迷你"数据目录查询工具"
以下是一个实用的 Python 脚本,封装了常用的 API 操作:
# ometa_client.py — 你的 OpenMetadata 工具包
import requests
import json
from typing import Optional, List, Dict
class OMetaClient:
"""
OpenMetadata REST API 客户端
封装常用操作,供 FinLakehouse 其他组件调用
"""
def __init__(self, host: str, jwt_token: str):
self.base_url = f"{host}/api/v1"
self.headers = {
"Authorization": f"Bearer {jwt_token}",
"Content-Type": "application/json"
}
def search_tables(self, keyword: str, limit: int = 10) -> List[Dict]:
"""全文搜索表"""
resp = requests.get(
f"{self.base_url}/search/query",
params={"q": keyword, "index": "table_search_index", "from": 0, "size": limit},
headers=self.headers
)
resp.raise_for_status()
hits = resp.json().get("hits", {}).get("hits", [])
return [h["_source"] for h in hits]
def get_table_lineage(self, table_fqn: str, depth: int = 2) -> Dict:
"""获取表的血缘图(上下游各 depth 层)"""
# 先通过 FQN 获取 table id
table_resp = requests.get(
f"{self.base_url}/tables/name/{table_fqn}",
headers=self.headers
)
table_resp.raise_for_status()
table_id = table_resp.json()["id"]
# 获取血缘
lineage_resp = requests.get(
f"{self.base_url}/lineage/table/{table_id}",
params={"upstreamDepth": depth, "downstreamDepth": depth},
headers=self.headers
)
lineage_resp.raise_for_status()
return lineage_resp.json()
def update_table_description(self, table_fqn: str, description: str) -> Dict:
"""更新表描述(使用 PATCH 请求)"""
# 先获取当前版本
table_resp = requests.get(
f"{self.base_url}/tables/name/{table_fqn}",
headers=self.headers
)
table_resp.raise_for_status()
# 使用 JSON Patch 格式更新
patch_body = [
{"op": "add", "path": "/description", "value": description}
]
patch_headers = {**self.headers, "Content-Type": "application/json-patch+json"}
table_id = table_resp.json()["id"]
update_resp = requests.patch(
f"{self.base_url}/tables/{table_id}",
data=json.dumps(patch_body),
headers=patch_headers
)
update_resp.raise_for_status()
return update_resp.json()
def list_tables_by_tag(self, tag_fqn: str) -> List[Dict]:
"""查找所有带有特定标签的表"""
resp = requests.get(
f"{self.base_url}/search/query",
params={
"q": f"tags.tagFQN:{tag_fqn}",
"index": "table_search_index",
"size": 50
},
headers=self.headers
)
resp.raise_for_status()
hits = resp.json().get("hits", {}).get("hits", [])
return [h["_source"] for h in hits]
# 使用示例
if __name__ == "__main__":
client = OMetaClient(
host="http://localhost:8585",
jwt_token="YOUR_TOKEN"
)
# 搜索所有 transaction 相关的表
tables = client.search_tables("transaction")
print(f"Found {len(tables)} tables:")
for t in tables:
print(f" - {t['fullyQualifiedName']}: {t.get('description', 'No description')}")
# 查找所有 PII 标签的表
pii_tables = client.list_tables_by_tag("PII.Sensitive")
print(f"\nTables with PII data: {len(pii_tables)}")
练习 7.1: 运行这个脚本,然后扩展它,添加一个 generate_compliance_report() 方法,输出所有 PII 表的名称、负责人和最后修改时间。这正是 BaFin 审计时需要的报告格式。
模块八:自定义 Connector 开发
当你的数据源是内部系统或 OpenMetadata 暂不支持的工具时,就需要开发自定义 Connector。这是区分"会用"和"能扩展"的关键能力。
8.1 Connector 的工作原理
每个 Connector 都继承自 Source 基类,需要实现以下方法:
prepare() → 初始化连接
yield_create_request_database_service() → 创建 Service
yield_database() → 产生 Database Entity
yield_database_schema() → 产生 Schema Entity
yield_table() → 产生 Table Entity(最核心)
close() → 关闭连接清理资源
yield_* 是 Python Generator 模式——Connector 不是一次性返回所有数据,而是按需产生 Entity。这使得处理大型数据仓库(数千张表)时内存效率极高。
8.2 为 Delta Lake 表开发 Connector(简化版)
以下展示如何为你的 MinIO + Delta Lake 表开发一个基础 Connector:
# custom_delta_connector.py
from typing import Iterable, Optional
from metadata.ingestion.api.source import Source
from metadata.generated.schema.entity.data.table import (
Column, DataType, Table, TableType
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import CreateDatabaseSchemaRequest
from delta import DeltaTable
import pyarrow as pa
from pyspark.sql import SparkSession
class DeltaLakeConnector(Source):
"""
自定义 Delta Lake Connector
从 MinIO 上的 Delta 表读取元数据,注入 OpenMetadata
设计思路:
- 通过 Spark 读取 Delta 表的 schema(不读实际数据)
- 通过 Delta Log 获取 commit 历史和统计信息
- 将这些转换为 OpenMetadata 的 Table Entity
"""
def __init__(self, config, metadata):
super().__init__()
self.config = config
self.metadata = metadata
self.spark = None
def prepare(self):
"""初始化 Spark Session,配置连接到 MinIO"""
self.spark = (
SparkSession.builder
.appName("OpenMetadata-DeltaIngestion")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.hadoop.fs.s3a.endpoint", self.config.minio_endpoint)
.config("spark.hadoop.fs.s3a.access.key", self.config.access_key)
.config("spark.hadoop.fs.s3a.secret.key", self.config.secret_key)
.getOrCreate()
)
def _pyarrow_to_ometa_type(self, pa_type) -> DataType:
"""将 PyArrow 数据类型映射到 OpenMetadata 数据类型"""
type_map = {
pa.int32(): DataType.INT,
pa.int64(): DataType.BIGINT,
pa.float32(): DataType.FLOAT,
pa.float64(): DataType.DOUBLE,
pa.string(): DataType.VARCHAR,
pa.bool_(): DataType.BOOLEAN,
pa.date32(): DataType.DATE,
pa.timestamp("us"): DataType.TIMESTAMP,
}
return type_map.get(pa_type, DataType.UNKNOWN)
def yield_table(self, table_path: str, table_name: str, schema_name: str) -> Iterable[CreateTableRequest]:
"""读取 Delta 表的 schema,产生 Table Entity"""
# 读取 Delta 表(只读 schema,不读数据)
delta_table = DeltaTable.forPath(self.spark, table_path)
arrow_schema = delta_table.toDF().schema
# 获取 Delta Log 中的统计信息
history = delta_table.history(1).collect()[0]
row_count = delta_table.toDF().count() # 生产中应从统计文件读取
# 构建字段列表
columns = []
for i, field in enumerate(arrow_schema.fields):
columns.append(
Column(
name=field.name,
dataType=self._pyarrow_to_ometa_type(field.dataType),
description=f"Column {field.name}",
ordinalPosition=i + 1,
)
)
yield CreateTableRequest(
name=table_name,
databaseSchema=f"finlakehouse-delta.{schema_name}",
tableType=TableType.Regular,
columns=columns,
description=f"Delta table at {table_path}. Last modified: {history['timestamp']}",
)
def close(self):
if self.spark:
self.spark.stop()
练习 8.1: 思考以下扩展方向:如果你想在 Connector 中自动检测字段是否可能包含 PII(比如字段名包含 email、phone、name),应该在哪个方法里添加这个逻辑?应该把 PII 标签设置为 Suggested 还是 Confirmed?为什么?
模块九:与 Dagster 集成
9.1 集成原理
Dagster 和 OpenMetadata 的集成通过两种方式实现:
方式一:使用 Dagster 的 OpenMetadata IO Manager 在 Asset 执行后,自动将 Asset 的元数据同步到 OpenMetadata。
方式二:在 Op/Asset 中显式调用 OpenMetadata API 在 Dagster Job 完成后,调用 OpenMetadata Python SDK 上报血缘和统计信息。
推荐方式二,因为它更灵活,你对血缘的粒度有完全控制。
9.2 Dagster Asset 中的血缘上报
# dagster_with_ometa.py
from dagster import asset, AssetExecutionContext, OpExecutionContext
from ometa_client import OMetaClient # 我们在模块七写的客户端
# 初始化 OpenMetadata 客户端(生产中从 Secrets Manager 获取 token)
ometa = OMetaClient(
host="http://openmetadata:8585",
jwt_token="YOUR_TOKEN"
)
@asset(
group_name="bronze_layer",
description="Ingest raw transactions from source system"
)
def raw_transactions(context: AssetExecutionContext):
"""Bronze 层原始数据摄取"""
# 实际 ETL 逻辑
df = load_from_source_system()
df.write.format("delta").save("s3a://bronze/transactions/")
# 上报元数据到 OpenMetadata
row_count = df.count()
context.add_output_metadata({"row_count": row_count})
# 更新 OpenMetadata 中的表描述和统计
ometa.update_table_description(
table_fqn="finlakehouse-delta.bronze.raw_transactions",
description=f"Last loaded {row_count} rows at {context.run.start_time}"
)
return df
@asset(
group_name="silver_layer",
deps=["raw_transactions"],
description="Clean and normalize raw transactions"
)
def cleaned_transactions(context: AssetExecutionContext, raw_transactions):
"""Silver 层清洗逻辑"""
df = transform(raw_transactions)
df.write.format("delta").save("s3a://silver/transactions/")
# 上报血缘:告诉 OpenMetadata 这个 Asset 的数据来自哪里
ometa.add_lineage(
from_fqn="finlakehouse-delta.bronze.raw_transactions",
to_fqn="finlakehouse-delta.silver.cleaned_transactions",
description=f"Dagster job: cleaned_transactions run {context.run_id}"
)
return df
9.3 核心思维:把元数据当作"一等公民"
在初级工程师眼中,ETL 代码 = 数据转换逻辑。在平台工程师眼中,ETL 代码 = 数据转换逻辑 + 元数据更新 + 质量检查上报 + 血缘注册。
这个差别,就是你在简历上写"设计并实现了数据治理闭环"和"开发了 ETL Pipeline"的区别,也是薪资差距的来源。
模块十:生产级部署与安全
10.1 Kubernetes 部署(Helm Chart)
在生产环境中,OpenMetadata 应通过 Helm 部署在 Kubernetes 上:
# 添加 OpenMetadata Helm 仓库
helm repo add open-metadata https://helm.open-metadata.org/
helm repo update
# 创建 values.yaml(关键配置)
cat > ometa-values.yaml << 'EOF'
openmetadata:
config:
authentication:
provider: "okta" # 或 "azure",对接企业 SSO
publicKeys:
- "https://your-okta-domain/oauth2/v1/keys"
authorizer:
adminPrincipals:
- "admin@yourcompany.com"
database:
# 外部 PostgreSQL(不使用内置 MySQL)
driverClass: "org.postgresql.Driver"
url: "jdbc:postgresql://your-pg-host:5432/openmetadata_db"
user: "ometa_user"
password:
secretRef: "ometa-db-secret"
secretKey: "password"
# 持久化存储
elasticsearch:
enabled: true
persistence:
storageClass: "standard"
size: "50Gi"
EOF
helm install openmetadata open-metadata/openmetadata \
--namespace data-platform \
--values ometa-values.yaml
10.2 安全配置:基于角色的访问控制(RBAC)
OpenMetadata 内置了细粒度的 RBAC 系统,关键角色有:
DataConsumer:只能查看和搜索,不能编辑DataSteward:可以编辑元数据、管理术语表DataEngineer:可以运行 IngestionAdmin:完全控制
在 Fintech 场景下,你应该为不同团队分配不同角色:合规团队是 DataSteward,分析师是 DataConsumer,数据工程团队是 DataEngineer。
10.3 数据访问策略(Data Access Control)
OpenMetadata 支持定义策略来控制谁能看到哪些数据:
{
"name": "PII-Data-Access-Policy",
"rules": [
{
"name": "deny-pii-for-non-compliance",
"effect": "DENY",
"operations": ["ViewAll"],
"resources": ["table"],
"condition": "matchAnyTag('PII.Sensitive') && !inAnyTeam('compliance-team')"
}
]
}
这个策略确保:只有合规团队成员才能访问被标记为 PII.Sensitive 的数据表。这是 GDPR 和 BaFin 数据访问控制要求的技术实现。
模块十一:综合实战项目
完成前面所有模块后,通过这个综合项目验证你的掌握程度。
项目:为 FinLakehouse 构建完整数据目录
目标: 建立一个覆盖 Bronze→Silver→Gold 全链路的数据目录,包含元数据、血缘、质量测试和 PII 标记。
步骤一:环境准备(30 分钟)
启动你的本地 OpenMetadata + PostgreSQL 环境,创建包含以下三层表的数据库结构:
- Bronze:
raw_transactions,raw_accounts,raw_market_data - Silver:
cleaned_transactions,customer_360 - Gold:
risk_score_features,daily_pnl_report
步骤二:注册所有数据资产(1 小时)
编写自动化脚本,通过 Ingestion API 或手动配置,将所有表注册到 OpenMetadata。为每张表写好业务描述(用英文,模拟实际工作场景)。
步骤三:建立完整血缘(1 小时)
用 Python SDK 建立以下血缘链:
raw_transactions + raw_accounts → cleaned_transactions → customer_360 → risk_score_features
raw_market_data → daily_pnl_report
步骤四:配置数据质量测试(1 小时)
为每层的核心表创建至少 3 个 Test Case,覆盖:
- Null 检查(关键字段不允许为空)
- 范围检查(金额类字段合理范围)
- 唯一性检查(主键唯一)
步骤五:PII 标记与合规文档(30 分钟)
识别所有包含 PII 数据的字段(账户 ID、姓名、金额等),通过 API 批量添加 PII.Sensitive 标签,并使用 OMetaClient.list_tables_by_tag() 生成一份 PII 资产清单(模拟合规审计报告)。
验收标准:
- 所有表在 OpenMetadata UI 中可搜索且有描述
- 血缘图可以从
raw_transactions一路追踪到risk_score_features - 所有数据质量测试运行通过
- PII 资产报告能够列出所有敏感字段
总结:OpenMetadata 能力地图
Level 1 - 基础使用(本手册 1-5 模块)
├── 部署 OpenMetadata 本地环境
├── 配置 Database/Pipeline Service
├── 运行 Metadata Ingestion
├── 手动管理血缘
└── 配置数据质量测试
Level 2 - 工程化(本手册 6-8 模块)
├── 通过 Python SDK 自动化所有操作
├── 在 Dagster/Airflow 中集成血缘上报
├── 构建业务术语表和 PII 标记自动化
└── 开发自定义 Connector
Level 3 - 平台架构(本手册 9-11 模块)
├── Kubernetes 生产级部署
├── 企业 SSO 和 RBAC 配置
├── 数据访问控制策略
└── 构建端到端治理闭环
附录:常用命令速查
# 启动本地环境
docker compose up -d
# 查看摄取日志
docker compose logs -f ingestion
# 重启服务(更新配置后)
docker compose restart openmetadata_server
# 备份 OpenMetadata 数据库(生产环境必做)
docker exec -it openmetadata_mysql mysqldump \
-u openmetadata -popenmetadata openmetadata_db > backup_$(date +%Y%m%d).sql
# 通过 CLI 触发 Ingestion(不需要 UI)
metadata ingest -c my_postgres_config.yaml
# Python SDK 快速参考
from metadata.ingestion.ometa.ometa_api import OpenMetadata
# 常用 Entity 操作
metadata.get_by_name(entity=Table, fqn="service.db.schema.table")
metadata.get_by_id(entity=Table, entity_id="uuid")
metadata.list_entities(entity=Table)
metadata.create_or_update(CreateTableRequest(...))
metadata.patch_description(entity=Table, source=table, description="text")
metadata.add_lineage(AddLineageRequest(...))
metadata.delete(entity=Table, entity_id="uuid", hard_delete=False)
手册版本:2025 年版 | 基于 OpenMetadata 1.3.x
作者:FinLakehouse Platform Documentation