从 HBase 迁移到 OceanBase:Flink 实时数据写入的终极方案

你正在为 HBase 运维成本高昂而发愁吗?

你是否在寻找实时数据流的最佳落地存储方案?

Flink + OceanBase OBKV-HBase,可能是你一直在等的答案!

为什么你需要关注这个方案?

场景一:HBase 迁移的”最后一公里”

如果你的团队正在考虑从 HBase 迁移到 OceanBase,但担心:现有的 Flink 实时数据流如何无缝迁移?HBase API 兼容性如何保证?迁移后吞吐、延迟和稳定性能否满足线上要求?

这个方案正是为你准备的! OceanBase OBKV-HBase 完全兼容 HBase API,配合 Flink 连接器,可让你平滑的进行迁移。

场景二:实时数仓建设中的”数据入湖”

如果你正在构建实时数仓,需要将 Kafka 中的实时数据流写入宽表存储、支持高并发写入并满足近实时查询与分析需求、保证数据一致性和可靠性。

Flink + OceanBase 组合能完美解决你的痛点! Flink 负责流式处理,OceanBase 负责在线存储与查询。

场景三:AI/LLM 应用中的数据存储

在 AI 时代,OceanBase 的 HBase 模式天然支持灵活的表结构,配合 Flink 的实时处理能力,可以轻松构建 AI 应用的数据底座。

核心优势:为什么选择 OceanBase OBKV-HBase?

🚀 兼容 HBase API

对于熟悉 HBase 的开发者,迁移成本几乎为零!只需修改连接配置即可无缝切换。

⚡ 高性能实时写入

支持缓冲批量写入:批处理场景高吞吐数据导入,实时场景延迟低至毫秒级。

🔄 流批一体

既支持 Flink 的流式写入,也支持批处理写入,一套系统满足多种需求。

🛡️ 企业级可靠性

完善的容错机制,OceanBase 原生高可用,支持多副本高可用与强一致能力。

快速开始

OBKV-HBase 是 OceanBase 提供的兼容 HBase 接口的宽表数据库。Flink OBKV HBase 连接器(flink-connector-obkv-hbase)基于 obkv-hbase-client-java 实现,支持通过 Flink SQL 将数据实时写入 OceanBase 的 HBase 模式表。

步骤 1:创建 HBase 表

在 OceanBase 中,HBase 表通过命名规范映射:每个 column family 对应一张物理表,格式为 hbase_table_name$family_name

1
2
3
4
5
6
7
CREATE TABLE `user_info$basic` (
`K` varbinary(1024) NOT NULL, -- rowkey
`Q` varbinary(256) NOT NULL, -- qualifier
`T` bigint(20) NOT NULL, -- timestamp
`V` varbinary(1024) DEFAULT NULL, -- value
PRIMARY KEY (`K`, `Q`, `T`)
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CREATE TABLE user_info_sink (
rowkey STRING,
basic ROW<name STRING, age INT>,
contact ROW<email STRING, phone STRING>,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'obkv-hbase',
'odp-mode' = 'true',
'odp-ip' = '127.0.0.1',
'odp-port' = '2885',
'schema-name' = 'your_database',
'table-name' = 'user_info',
'username' = 'user@tenant#cluster',
'password' = 'your_password'
);

步骤 5-6:写入测试数据并验证

1
2
3
INSERT INTO user_info_sink VALUES
('user001', ROW('Alice', 25), ROW('[email protected]', '13800138000')),
('user002', ROW('Bob', 30), ROW('[email protected]', '13900139000'));

配置参数

必需参数

  • connector: 固定值 obkv-hbase
  • username: 格式 user@tenant#cluster
  • password: 用户密码
  • schema-name: 数据库名
  • table-name: HBase 表名

连接模式

Config URL 模式(直连):需要 url、sys.username、sys.password
ODP 模式(代理):设置 odp-mode=true、odp-ip、odp-port

性能调优

  • sync-write: 是否同步写入(默认 false,推荐使用缓冲区)
  • buffer-flush.buffer-size: 缓冲区大小(默认 1000 行)
  • buffer-flush.interval: 刷新间隔(默认 1s)
  • max-retries: 最大重试次数(默认 3)

使用示例

从 Kafka 实时写入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CREATE TABLE kafka_source (
user_id STRING, user_name STRING, age INT,
email STRING, phone STRING, event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);

INSERT INTO user_hbase_sink
SELECT user_id,
ROW(user_name, age) AS profile,
ROW(email, phone) AS contact
FROM kafka_source;

最佳实践

性能优化

  • 高吞吐批处理:buffer-size=5000, interval=5s
  • 低延迟实时:buffer-size=500, interval=500ms
  • 超低延迟:sync-write=true

错误排查

  • 连接失败:检查网络、验证用户名密码、确认 ODP 配置
  • 写入失败:检查 Flink 日志、验证表结构、确认权限
  • 性能慢:增大缓冲区、提高并行度、检查集群负载

参考信息

  • OceanBase 官方文档:https://www.oceanbase.com/docs
  • Flink 官方文档:https://nightlies.apache.org/flink/flink-docs-stable/
  • GitHub:https://github.com/oceanbase/flink-connector-oceanbase
  • obkv-hbase-client-java:https://github.com/oceanbase/obkv-hbase-client-java

感谢生态团队孙朝阳老师、赵明远老师的专业指导。