基于 Doris 构建数仓 ODS 层的具体步骤和操作实例,结合多个技术文档和实践经验整理而成:
一、数据源接入与存储设计
1. 数据源识别与接入方式
- 数据源类型:通常包括业务数据库(MySQL/Oracle)、日志文件(JSON/CSV)、Kafka 实时流等。
- 接入工具:
- 实时/增量数据:通过 Flink CDC 或 Kafka + Doris Routine Load 实时同步。
- 批量数据:使用 Doris 的 Broker Load 或 INSERT INTO SELECT 批量导入。
2. 表结构设计
核心原则:
- 保留原始数据:ODS 层不进行复杂加工,仅存储原始数据。
- 分区与分桶:按时间分区(如 dt 字段),分桶键选择高频查询字段(如事件类型)。
- 字段类型优化:根据数据特征选择合适类型(如 VARCHAR(256) 避免空间浪费)。
建表示例(日志数据场景):
CREATE TABLE ods_event_log (
distinct_id VARCHAR(256) NOT NULL COMMENT '用户ID',
event VARCHAR(256) COMMENT '事件类型',
ip VARCHAR(64) COMMENT 'IP地址',
properties STRING COMMENT 'JSON格式事件属性',
dt DATE COMMENT '事件日期'
)
ENGINE=OLAP
DUPLICATE KEY(distinct_id, event, dt)
PARTITION BY RANGE(dt) (
FROM ("2023-01-01") TO ("2025-12-31") INTERVAL 1 DAY
)
DISTRIBUTED BY HASH(event) BUCKETS 8
PROPERTIES (
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.end" = "3"
);
二、数据导入与实时同步
1. Kafka 实时数据接入
通过 Routine Load 实现 Kafka 到 Doris 的持续同步:
CREATE ROUTINE LOAD ods.kafka_to_event_log ON ods_event_log
COLUMNS(
distinct_id, event, ip, properties, dt,
_tmp_time = FROM_UNIXTIME(time/1000), -- 时间戳转换
dt = DATE(_tmp_time) -- 动态生成分区字段
)
PROPERTIES (
"desired_concurrent_number" = "3",
"max_batch_interval" = "10"
)
FROM KAFKA (
"kafka_broker_list" = "kafka-host:9092",
"kafka_topic" = "user_events",
"property.group.id" = "doris_consumer"
);
2. 全量数据初始化
使用 Broker Load 导入历史数据:
LOAD LABEL ods.init_customer_data (
DATA INFILE("hdfs://path/customer_data.csv")
INTO TABLE ods.customer_info
COLUMNS TERMINATED BY ","
(customer_id, name, age, dt)
)
WITH BROKER "hdfs_broker"
PROPERTIES (
"timeout" = "3600"
);
三、数据质量管理
1. 脏数据处理
- 动态分区兜底:为异常时间数据设置默认分区:
ALTER TABLE ods_event_log ADD PARTITION p_overflow VALUES LESS THAN ("2100-01-01");
- 字段容错:使用 STRING 类型存储不确定格式的 JSON 字段(如 properties)。
2. 数据校验
- 数据量核对:通过 SHOW LOAD 查看导入状态,或执行 SELECT COUNT(*) 比对源端与目标端数据量。
- 监控告警:利用 Doris 的审计日志和 Prometheus 监控导入延迟及错误率。
四、分层策略与优化
1. 全量与增量分离
- 全量表:存储历史快照,按业务主键分区(如 customer_id)。
- 增量表:按时间分区(如 dt),通过 WHERE dt >= '2025-03-01' 过滤增量数据。
2. 性能优化
- 动态分区:自动创建未来 3 天分区,避免手动维护。
- 前缀索引:将高频查询字段(如 event)放在 Duplicate Key 前列,加速查询。
五、操作实例:从 MySQL 同步到 ODS
- 创建 ODS 表:
CREATE TABLE ods.order_info (
order_id BIGINT,
customer_id BIGINT,
amount DECIMAL(10,2),
order_time DATETIME,
dt DATE
) DUPLICATE KEY(order_id)
PARTITION BY RANGE(dt) (...);
- 通过 Flink CDC 实时同步:
-- Flink SQL 示例
CREATE TABLE mysql_orders (
order_id BIGINT,
...
) WITH ('connector' = 'mysql-cdc', ...);
CREATE TABLE doris_ods (
dt = DATE_FORMAT(order_time, 'yyyy-MM-dd'),
...
) WITH ('connector' = 'doris', ...);
INSERT INTO doris_ods SELECT * FROM mysql_orders;
总结
ODS 层建设的核心是 轻加工、保原始、重同步,需结合 Doris 的动态分区、Routine Load 等特性实现高效数据接入。实时场景优先选择 Kafka + Flink 链路,批量场景可搭配 Broker Load。数据质量需通过分区兜底、字段容错和监控告警保障。