iis服务器的默认网站wordpress插件 评分
iis服务器的默认网站,wordpress插件 评分,dyndns如何申请免费域名,wordpress 文章tag标签目录 1. 架构规划
1.1 硬件与系统信息
1.2节点分配
1.3 依赖组件 (CDH)
2. 操作系统基础配置 (所有节点)
2.1 检查 CPU AVX2 指令集
2.2 操作系统参数优化 (核心稳定性保障)
2.3 配置 Hosts 映射
2.4 创建目录与授权
3. Doris 安装与环境集成 (Bigdata 用户)
3.1 解决…目录1. 架构规划1.1 硬件与系统信息1.2节点分配1.3 依赖组件 (CDH)2. 操作系统基础配置 (所有节点)2.1 检查 CPU AVX2 指令集2.2 操作系统参数优化 (核心稳定性保障)2.3 配置 Hosts 映射2.4 创建目录与授权3. Doris 安装与环境集成 (Bigdata 用户)3.1 解决 JDK 版本冲突 (关键)3.2 解压 Doris 安装包3.3 集成 CDH 配置文件4. FE (Frontend) 部署4.1 修改 fe.conf4.2 启动与组网5. BE (Backend) 部署5.1 修改 be.conf5.2 启动与注册6. Paimon 数据湖集成 (最终验证方案)6.1 创建 Catalog SQL (生产推荐)6.2 验证查询6.2.1 切换 Catalog 与 数据库6.2.2 查看表列表6.2.3 数据查询验证6.3 方案优势总结 (Why HMS?)7. 部署过程问题排查记录 (Troubleshooting)8. 基础测试(增删改查)8.1 插入数据8.2 更新数据8.3 删除数据9. Paimon外表数据写入Doris内表基础测试9.1 Paimon数据准备9.2 测试代码9.3 验证数据1. 项目背景与技术升级随着业务数据量的激增及对实时性要求的提高现有Paimon OLAP架构面临着更高的性能挑战。尽管早期引入的 Doris 2.x 版本成功解决了大字段存储痛点但为了进一步挖掘数据价值追求极致的查询响应速度与更低的资源消耗项目组计划探索Apache Doris 4.0.1官方最新版本的架构潜力。新版本在异步物化视图、全新的查询优化器Nereids以及湖仓读取管线上进行了颠覆性升级。本次选型调整旨在验证新一代引擎在处理超大规模复杂数据时的性能边界评估其是否能作为下一代核心计算单元进一步提升数仓的流批处理能力。2. 测试目标本次测试旨在对Doris 4.0.1 Paimon 1.1.1进行前瞻性的深度集成验证重点评估新版本特性带来的收益具体包括极速读取性能验证新版本针对 Paimon 格式的 Native Reader 优化效果评估在海量数据扫描场景下的 IO 吞吐提升幅度。复杂计算与回写效能在保留大字段处理优势的前提下对比测试新优化器在执行复杂 ETL 逻辑时的计算加速比以及高并发场景下的数据回写稳定性。3.补充说明Paimon官网支持Doris 2.0.6 及以上版本。对应网址Doris | Apache PaimonDoirs官网支持对Paimon的查询使用 Doris 的分布式计算引擎直接访问 Paimon 数据以实现查询加速数据集成读取Paimon数据并将其写入Doris内部表或使用Doris计算引擎执行ZeroETL不支持数据写回Paimon。对应网址Paimon Catalog - Apache Doris支持Paimon版本为1.0.0调研架构图如下1. 架构规划1.1 硬件与系统信息操作系统: CentOS 7 (CDH 6.3.2 环境混合部署)节点配置:CPU: 10核内存: 14GB (资源紧缺需精细调优)存储: 400GB SSD部署用户:bigdataJava 环境: Doris 4.0 需独立安装 JDK 17这里我安装部署之后的路径为/home/bigdata/doris/jdk-17.0.2Doris官方文档软硬件环境检查 - Apache Doris1.2节点分配前置组件分配IP主机名角色版本10.x.xx.201-10.x.xx.205 10.x.xx.215 10.x.xx.149 10.x.xx.151 10.x.xx.156 10.x.xx.157 10.x.xx.167 10.x.xx.206nd1-nd5 nd6 nd11 nd12 nd13 nd14 nd15 nd16CDH6.3.210.x.xx.201-10.x.xx.205nd1-nd5Paimon1.1.1采用 3 节点混合部署 (FE BE 同节点) 的高可用架构。IP 地址主机名角色规划操作系统用户备注10.x.xx.157nd14FE (Master)BEbigdata引导节点10.x.xx.167nd15FE (Follower)BEbigdata高可用节点10.x.xx.206nd16FE (Follower)BEbigdata高可用节点1.3 依赖组件 (CDH)Hive Metastore:nd1 (10.x.xx.201),nd3 (10.x.xx.203)HDFS NameNode: HA 模式 (Active 节点假设为 nd1)CDH Java 版本: JDK 1.8 (与 Doris 4.0 不兼容需独立安装 JDK 17)2. 操作系统基础配置 (所有节点)执行对象所有 3 台机器nd14、nd15、nd16上执行以下操作。2.1 检查 CPU AVX2 指令集Doris 4.0 强依赖 AVX2。cat /proc/cpuinfo | grep avx2有输出继续下一步。无输出您需要下载 Doris 的x64-noavx2版本安装包否则 BE 启动会报错Illegal instruction。2.2 操作系统参数优化 (核心稳定性保障)针对 14GB 小内存环境必须关闭 Swap 并调大文件句柄。# 1. 永久关闭 Swap (防止内存吃紧时拖死机器) # 临时关闭,这里我使用的临时关闭可根据实际环境选择是否永久关闭 swapoff -a # 永久关闭 sed -i /swap/s/^/#/ /etc/fstab # 2. 修改内核参数 sudo vi /etc/sysctl.conf # 添加 vm.max_map_count2000000 vm.swappiness0 # 生效 sysctl -p# 3. 修改资源限制 vi /etc/security/limits.conf # 添加 * soft nofile 65536 * hard nofile 65536 * soft nproc 65536 * hard nproc 65536注意修改 limits 后必须退出 SSH 重新登录bigdata用户才能生效。注意由于原始的配置均为65535Doris 官方推荐 65536 是为了取个整2的16次方但实际上 65535 对于 Doris 来说没有任何区别。只要这个数值大于 60000Doris 就能非常稳定地运行。因此上述配置可以不用进行配置。2.3 配置 Hosts 映射确保 Doris 节点能解析自身以及 CDH 的节点。由于这里我是在CDH集群节点上选择的节点搭建相应的配置均有因此也可不用进行配置。vi /etc/hosts # Doris 集群 10.x.xx.157 nd14 10.x.xx.167 nd15 10.x.xx.206 nd16 # CDH 依赖 (必须包含 hive-site.xml 中配置的主机名) 10.x.xx.201 nd1 10.x.xx.203 nd32.4 创建目录与授权mkdir -p /home/bigdata/doris mkdir -p /home/bigdata/doris/doris-meta # FE 元数据 mkdir -p /home/bigdata/doris/doris-storage # BE 数据存储 # 移交权限给 bigdata chown -R bigdata:bigdata /home/bigdata/doris3. Doris 安装与环境集成 (Bigdata 用户)执行对象所有 3 台机器执行用户bigdata3.1 解决 JDK 版本冲突 (关键)痛点CDH 环境是 JDK 8但 Doris 4.0.1 强制要求 JDK 17。方案下载免安装版 JDK 17仅供 Doris 内部使用不影响系统环境变量。# 1. 上传或下载 JDK 17 到 /home/bigdata/doris/ 目录 cd /home/bigdata/doris/ # 假设已下载 openjdk-17.0.2_linux-x64_bin.tar.gz # wget https://download.java.net/java/GA/jdk17.0.2/dfd4a8d0985749f896bed50d7138ee7f/8/GPL/openjdk-17.0.2_linux-x64_bin.tar.gz tar -zxvf openjdk-17.0.2_linux-x64_bin.tar.gz # 2. 记录路径 (后续配置要用) # 路径为: /home/bigdata/doris/jdk-17.0.2也可使用网址下载然后上传https://download.java.net/java/GA/jdk17.0.2/dfd4a8d0985749f896bed50d7138ee7f/8/GPL/openjdk-17.0.2_linux-x64_bin.tar.gz下载压缩包成功的截图如下解压之后的结果如下3.2 解压 Doris 安装包cd /home/bigdata/doris tar -zxvf apache-doris-4.0.1-bin-x64.tar.gz cd apache-doris-4.0.1-bin-x64 mv fe /home/bigdata/doris/ mv be /home/bigdata/doris/3.3 集成 CDH 配置文件在nd14上操作将 nd1上CDH 集群的配置文件拉取到 Doris 配置目录。mkdir -p /home/bigdata/doris/conf/cdh_conf/ # 从 CDH 节点 (201) 拷贝 # 拷贝 Hadoop 配置文件 (core-site.xml 和 hdfs-site.xml) scp root10.x.xx.201:/etc/hadoop/conf/core-site.xml /home/bigdata/doris/conf/cdh_conf/ scp root10.x.xx.201:/etc/hadoop/conf/hdfs-site.xml /home/bigdata/doris/conf/cdh_conf/ # 拷贝 Hive 配置文件 (hive-site.xml) scp root10.x.xx.201:/etc/hive/conf/hive-site.xml /home/bigdata/doris/conf/cdh_conf/验证文件ls -l /home/bigdata/doris/conf/cdh_conf/分发到其他节点 (nd15, nd16)将这 3 个文件放到 Doris 2台机器另外2台的统一目录例如/home/bigdata/doris/conf/cdh_conf/。# 1. 确保目标机器也有这个目录 ssh bigdatand15 mkdir -p /home/bigdata/doris/conf/cdh_conf/ ssh bigdatand16 mkdir -p /home/bigdata/doris/conf/cdh_conf/# 2. 发送文件给 nd15 scp /home/bigdata/doris/conf/cdh_conf/* bigdatand15:/home/bigdata/doris/conf/cdh_conf/# 3. 发送文件给 nd16 scp /home/bigdata/doris/conf/cdh_conf/* bigdatand16:/home/bigdata/doris/conf/cdh_conf/4. FE (Frontend) 部署4.1 修改fe.conf操作对象3 台机器。vi /home/bigdata/doris/fe/conf/fe.conf# 1. 元数据目录 meta_dir /home/bigdata/doris/doris-meta # 2. 绑定网段 priority_networks 10.8.15.0/24 # 3. 【关键】指定 JDK 17 路径 (解决 The jdk_version is 8, must be 17 报错) JAVA_HOME /home/bigdata/doris/jdk-17.0.2 # 4. 内存优化 (G1GC 4G堆内存) # 14G 内存机器给 FE 分配 4G预留资源给 BE JAVA_OPTS -Xmx4096m -Xms4096m -XX:UseG1GC -XX:MaxGCPauseMillis200 -Xlog:gc*:$DORIS_HOME/log/fe.gc.log:time,uptimemillis,level,tags4.2 启动与组网1. 启动 Master (10.x.xx.157)/home/bigdata/doris/fe/bin/start_fe.sh --daemon # 停止服务 # /home/bigdata/doris/fe/bin/stop_fe.sh --daemon2. 启动 Followers (10.x.xx.167 和 206)首次启动需指定 Helper/home/bigdata/doris/fe/bin/start_fe.sh --helper 10.x.xx.157:9010 --daemon查看日志确认是否启动成功tail -f /home/bigdata/doris/fe/log/fe.log由上述截图可以得出receive report from be ...:这是最关键的信号。说明 FE 收到了 BE 节点发来的心跳汇报。type: DISK: BE 正在汇报磁盘使用情况。type: CPU: BE 正在汇报 CPU 负载。这意味着FE 和 BE 之间的网络是通的且 BE 进程已存活。BeLoadRebalancer ... get number of low load paths ...:这是 Doris 的均衡调度器在工作。它正在检查是否需要在节点间迁移数据副本。isUrgent false: 表示当前没有紧急的均衡任务正常因为是新集群没有数据倾斜。BinlogManager.gc() ... no gc binlog:这是清理过期的 Binlog 日志属于正常的后台维护任务。3. 注册节点在 157 上使用 MySQL 客户端连接mysql -h 10.x.xx.157 -P 9030 -u root执行 SQLALTER SYSTEM ADD FOLLOWER 10.x.xx.167:9010; ALTER SYSTEM ADD FOLLOWER 10.x.xx.206:9010;SHOW PROC /frontends;验证SHOW PROC /frontends; 确保 3 节点 Alivetrue5. BE (Backend) 部署5.1 修改be.conf操作对象3 台机器。vi /home/bigdata/doris/be/conf/be.conf# 1. 绑定网段 priority_networks 10.8.15.0/24 # 2. 数据目录 storage_root_path /home/bigdata/doris/doris-storage # 3. 【关键】指定 JDK 17 (Paimon 插件依赖) JAVA_HOME /home/bigdata/doris/jdk-17.0.2 # 4. 【关键】端口避让 (CDH NodeManager 占用 8040改为 18040) webserver_port 180405.2 启动与注册1. 启动所有 BE/home/bigdata/doris/be/bin/start_be.sh --daemon # 停止服务 # /home/bigdata/doris/be/bin/stop_be.sh --daemon查看日志确认是否启动成功tail -f /home/bigdata/doris/be/log/be.log由上面截图可以得出success to build all report tablets info:这是最重要的信号。说明 BE 已经成功扫描了本地的数据分片Tablets并准备好向 FE 汇报状态。tablet_count15: 说明它已经管理了一些内部表通常是 Doris 的系统表。Scheduled(every 10s) WAL info:预写日志WAL管理器正在例行检查用于保障数据写入的可靠性。状态正常。query for dictionary status, return 0 rows:这是 BE 内部的字典缓存查询没有报错属于正常的心跳或检测机制。集群状态总结FE 日志: 正常接收 BE 汇报无报错。BE 日志: 正常维护存储定期汇报无报错。进程: FE 和 BE 均已存活。2. 注册 BE在 157 的 MySQL 客户端执行ALTER SYSTEM ADD BACKEND 10.x.xx.157:9050; ALTER SYSTEM ADD BACKEND 10.x.xx.167:9050; ALTER SYSTEM ADD BACKEND 10.x.xx.206:9050;SHOW PROC /backends;验证SHOW PROC /backends; 确保 3 节点 Alivetrue6. Paimon 数据湖集成 (最终验证方案)背景 在对接 CDH 6.3.2 (Hive 2.1.1) 时Doris 默认的 Hive 3 客户端协议会触发Invalid method name报错。 在 Doris 2.x 版本中为了规避此问题通常被迫使用filesystem模式或者在配置了HA模式下hive.metastore.uris参数只设置一个地址当其中一个节点挂掉之后需要手动进行切换。但在 Doris 4.0.1 版本中内核优化了对多版本协议的支持完全修复了 HA 模式下版本参数失效的问题。解决方案 采用HMS (Hive Metastore) 模式配置双节点 HA并通过显式指定hive.version进行协议降级。这是兼顾高性能支持 CBO 优化、高可用与数据一致性支持 ACID的最佳实践方案。6.1 创建 Catalog SQL (生产推荐)在 Doris 4.0.1 中可以放心使用 HA 配置。在 Doris MySQL 客户端执行DROP CATALOG IF EXISTS paimon_catalog; CREATE CATALOG paimon_catalog PROPERTIES ( -- 1. 指定 Catalog 类型为 Paimon type paimon, -- 2. 【核心配置】使用 hms 模式通过 Hive Metastore 管理元数据 -- 相比 filesystem 模式支持权限控制、ACID 事务读取和查询优化 paimon.catalog.type hms, -- 3. 【高可用配置】配置多个 Metastore 地址 (Doris 4.0 已修复 HA 兼容性问题) hive.metastore.uris thrift://nd1:9083,thrift://nd3:9083, -- 4. 【关键兼容修复】 -- 强制指定 Hive 版本为 2.1.1 (对应 CDH 6.3.2)禁用高版本不兼容的 API 调用 hive.version 2.1.1, -- 5. 指定数仓物理路径 warehouse hdfs://nd1:8020/user/hive/warehouse, -- 6. 加载 Hadoop 配置文件 (用于读取 HDFS HA、Kerberos 等配置) hadoop.conf.dir /home/bigdata/doris/conf/cdh_conf/, hadoop.username hdfs );注意此处paimon.catalog.type必须填写为hms不可使用hive旧版写法或filesystem。6.2 验证查询6.2.1 切换 Catalog 与 数据库SWITCH paimon_catalog; SHOW DATABASES;结果如下6.2.2 查看表列表USE ods; SHOW TABLES;结果如下6.2.3 数据查询验证SELECT * FROM t_admin_division_code LIMIT 5;结果如下6.3 方案优势总结 (Why HMS?)相比之前的 Filesystem 方案当前方案具有显著优势高可用性 (HA)配置了nd1和nd3双 Metastore 节点任意单点故障不影响 Doris 业务查询。性能优化 (CBO)Doris 可以从 HMS 获取表的行数、文件大小等统计信息生成更优的 Join 执行计划。数据准确性HMS 模式能正确识别 Paimon/Hive 的 ACID 事务状态避免读取到未提交或已删除的脏数据。运维规范统一通过 Metastore 管理元数据符合数仓建设标准。对于Doris 2.1.10的所有方案均可以在Doris 4.0.1里面进行查看相应的截图如下方案一【Doris 2.1.10】DROP CATALOG IF EXISTS paimon_catalog; CREATE CATALOG paimon_catalog PROPERTIES ( type paimon, paimon.catalog.type hms, hive.metastore.uris thrift://nd1:9083,thrift://nd3:9083, warehouse hdfs://nd1:8020/user/hive/warehouse, hadoop.conf.dir /home/bigdata/doris/conf/cdh_conf/, hadoop.username hdfs, -- 【关键修复】显式指定 Hive 版本禁止调用 Hive 3 的新 API hive.version 2.1.1 ); SWITCH paimon_catalog; USE ods; SELECT * FROM t_admin_division_code LIMIT 5;方案二【Doris 2.1.10】DROP CATALOG IF EXISTS paimon_catalog; CREATE CATALOG paimon_catalog PROPERTIES ( type paimon, paimon.catalog.type hms, hive.metastore.uris thrift://nd1:9083,thrift://nd3:9083, warehouse hdfs://nd1:8020/user/hive/warehouse, hadoop.conf.dir /home/bigdata/doris/conf/cdh_conf/, hadoop.username hdfs, -- 【关键修复】显式指定 Hive 版本禁止调用 Hive 3 的新 API hive.version 1.1.0 ); SWITCH paimon_catalog; USE ods; SELECT * FROM t_admin_division_code LIMIT 5;方案三【Doris 2.1.10】DROP CATALOG IF EXISTS paimon_catalog; CREATE CATALOG paimon_catalog PROPERTIES ( type paimon, paimon.catalog.type filesystem, -- 直接指向 HDFS 上的数仓根目录 (注意如果 nameservice 未解析直接写 active namenode 地址) warehouse hdfs://nd1:8020/user/hive/warehouse, hadoop.conf.dir /home/bigdata/doris/conf/cdh_conf/, hadoop.username hdfs ); SWITCH paimon_catalog; USE ods; SELECT * FROM t_admin_division_code LIMIT 5;方案四【Doris 2.1.10】DROP CATALOG IF EXISTS paimon_catalog; CREATE CATALOG paimon_catalog PROPERTIES ( type paimon, -- 【这里是修改点】将 hive 改为 hms paimon.catalog.type hms, -- 指定 HMS 地址 hive.metastore.uris thrift://nd1:9083, -- 【核心兼容配置】保持不变解决 CDH 兼容性 hive.version 2.1.1, -- 数仓路径 warehouse hdfs://nd1:8020/user/hive/warehouse, -- 复用配置文件 hadoop.conf.dir /home/bigdata/doris/conf/cdh_conf/, hadoop.username hdfs ); SWITCH paimon_catalog; USE ods; SELECT * FROM t_admin_division_code LIMIT 5;7. 部署过程问题排查记录 (Troubleshooting)在本次部署中我们遇到了以下关键问题并已解决#问题现象报错关键信息根本原因解决方案1FE 启动失败The jdk_version is 8, must be 17CDH 环境默认是 JDK 8SelectDB 4.0 强制要求 JDK 17。下载解压 JDK 17在fe.conf和be.conf中显式配置JAVA_HOME指向新 JDK 路径。2权限拒绝fe.out: Permission denied曾使用 root 用户启动过 Doris导致日志文件归属变为 root切回 bigdata 后无法写入。使用 root 执行chown -R bigdata:bigdata /home/bigdata/doris修复所有权并杀掉残留的 root 进程。3端口冲突tcp listen failed, errno98(8040)CDH 的 YARN NodeManager 占用了 8040 端口。修改be.conf设置webserver_port 18040。5内存隐患(无报错但在高负载下可能死机)14GB 内存下BE 默认尝试占用 90% 内存会挤压 FE 和 OS 空间。在be.conf中强制设置mem_limit 60%。8. 基础测试(增删改查)将下述测试代码保存为doris4_test.py【python解释器3.8.20、windows系统11】# -*- coding: utf-8 -*- import pymysql import random import time import logging import functools from sshtunnel import SSHTunnelForwarder # 配置信息 # 1. SSH 连接配置 (连接到 Doris FE 所在的服务器) # 您提供的 IP 列表: 10.x.xx.157, 10.x.xx.167, 10.x.xx.206 SSH_HOST 10.x.xx.157 # 选一个 FE 节点 (Master) SSH_PORT 22 SSH_USER xxxxxx # CDH 环境通常使用 bigdata 用户 SSH_PASSWORD xxxxxxxxxxxxxxxxxxxxx # 您提供的密码 # 2. Doris 数据库连接配置 DORIS_LOCAL_HOST 127.0.0.1 # 隧道建立后对本地而言就是连本机 DORIS_QUERY_PORT 9030 # Doris MySQL 协议查询端口 DORIS_DB_USER root DORIS_DB_PWD # 默认无密码 DB_NAME doris4_test_db TABLE_NAME user_profile_v4 LOG_FILE doris4_test.log # 日志与工具模块 def setup_logger(): logger logging.getLogger(Doris4Tester) logger.setLevel(logging.INFO) if logger.hasHandlers(): logger.handlers.clear() formatter logging.Formatter(%(asctime)s - %(levelname)s - %(message)s) # 文件日志 file_handler logging.FileHandler(LOG_FILE, modew, encodingutf-8) file_handler.setFormatter(formatter) # 控制台日志 console_handler logging.StreamHandler() console_handler.setFormatter(formatter) logger.addHandler(file_handler) logger.addHandler(console_handler) return logger logger setup_logger() def measure_time(func): 装饰器用于记录函数执行耗时 functools.wraps(func) def wrapper(*args, **kwargs): start_time time.time() logger.info(f正在执行: [{func.__name__}] ...) try: result func(*args, **kwargs) duration time.time() - start_time logger.info(f执行完成: [{func.__name__}] | 耗时: {duration:.4f} 秒) return result except Exception as e: duration time.time() - start_time logger.error(f执行失败: [{func.__name__}] | 耗时: {duration:.4f} 秒 | 错误: {e}) raise e return wrapper # Doris 4.0 核心业务逻辑 measure_time def init_db_and_table(cursor): 初始化数据库和表 (适配 Doris 4.0 Unique Key 模型) cursor.execute(fCREATE DATABASE IF NOT EXISTS {DB_NAME}) cursor.execute(fUSE {DB_NAME}) # Doris 4.0 建表最佳实践 # 1. 使用 Unique Key 模型 # 2. 开启 enable_unique_key_merge_on_write (MoW) 实现类 MySQL 的高性能更新 # 3. replication_num 1 (测试环境节省资源生产建议 3) create_sql f CREATE TABLE IF NOT EXISTS {TABLE_NAME} ( user_id INT COMMENT 用户ID, username VARCHAR(50) COMMENT 用户名, age INT COMMENT 年龄, city VARCHAR(20) COMMENT 城市, balance DECIMAL(10, 2) COMMENT 余额, create_time DATETIME(3) COMMENT 创建时间(毫秒精度) ) UNIQUE KEY(user_id) DISTRIBUTED BY HASH(user_id) BUCKETS 1 PROPERTIES ( replication_num 1, enable_unique_key_merge_on_write true, store_row_column true ); # store_row_columntrue 是 4.0 特性优化部分列更新和点查性能 cursor.execute(create_sql) # 每次测试前清空表保证环境纯净 cursor.execute(fTRUNCATE TABLE {TABLE_NAME}) logger.info(f数据库 {DB_NAME} 和表 {TABLE_NAME} (Unique Key MoW) 已初始化) measure_time def insert_data(cursor, count10): 批量插入数据 logger.info(f准备插入 {count} 条数据...) data_list [] cities [Beijing, Shanghai, Guangzhou, Shenzhen, Chengdu] for i in range(1, count 1): user_id i username fUser_{i:03d} age random.randint(20, 60) city random.choice(cities) balance round(random.uniform(100.0, 5000.0), 2) # 模拟当前时间 create_time time.strftime(%Y-%m-%d %H:%M:%S) data_list.append((user_id, username, age, city, balance, create_time)) sql fINSERT INTO {TABLE_NAME} VALUES (%s, %s, %s, %s, %s, %s) cursor.executemany(sql, data_list) logger.info(f成功插入 {cursor.rowcount} 条数据) measure_time def update_data(cursor): 随机更新数据 (Doris 4.0 MoW 模型支持高效 UPDATE) # 1. 先查出所有 ID cursor.execute(fSELECT user_id FROM {TABLE_NAME}) all_ids [row[user_id] for row in cursor.fetchall()] if not all_ids: logger.warning(表中无数据跳过更新) return # 2. 随机选 3 个 ID 更新 target_ids random.sample(all_ids, min(len(all_ids), 3)) logger.info(f随机选中 ID 进行更新: {target_ids}) for uid in target_ids: new_balance 9999.99 new_city Updated_City # 标准 MySQL 更新语法 sql fUPDATE {TABLE_NAME} SET balance %s, city %s WHERE user_id %s cursor.execute(sql, (new_balance, new_city, uid)) logger.info(f - 已更新 ID{uid}: Balance设为 {new_balance}, City设为 {new_city}) measure_time def delete_data(cursor): 随机删除数据 # 1. 先查出所有 ID cursor.execute(fSELECT user_id FROM {TABLE_NAME}) all_ids [row[user_id] for row in cursor.fetchall()] if not all_ids: logger.warning(表中无数据跳过删除) return # 2. 随机选 2 个 ID 删除 target_ids random.sample(all_ids, min(len(all_ids), 2)) logger.info(f随机选中 ID 进行删除: {target_ids}) # 使用 IN 语法批量删除 format_strings ,.join([%s] * len(target_ids)) sql fDELETE FROM {TABLE_NAME} WHERE user_id IN ({format_strings}) cursor.execute(sql, tuple(target_ids)) logger.info(f - 删除操作完成受影响行数: {cursor.rowcount}) measure_time def query_final_result(cursor): 查询并展示最终结果 sql fSELECT * FROM {TABLE_NAME} ORDER BY user_id cursor.execute(sql) results cursor.fetchall() logger.info(- * 50) logger.info(f最终表数据 (总行数: {len(results)}):) logger.info(f{ID:5} {Name:10} {Age:5} {City:15} {Balance:10}) logger.info(- * 50) for row in results: # 这里的 row 是字典因为 connect 时指定了 DictCursor logger.info(f{row[user_id]:5} {row[username]:10} {row[age]:5} {row[city]:15} {row[balance]:10}) logger.info(- * 50) # 主程序入口 def main(): ssh_tunnel None db_conn None try: logger.info( 1. 正在建立 SSH 隧道 ...) # 配置 SSH 隧道本地随机端口 - SSH(10.8.15.157) - Doris FE(127.0.0.1:9030) ssh_tunnel SSHTunnelForwarder( (SSH_HOST, SSH_PORT), ssh_usernameSSH_USER, ssh_passwordSSH_PASSWORD, remote_bind_address(DORIS_LOCAL_HOST, DORIS_QUERY_PORT) ) ssh_tunnel.start() logger.info(f SSH 隧道建立成功! 本地映射端口: {ssh_tunnel.local_bind_port}) logger.info( 2. 正在连接 Doris 4.0.1 数据库 ...) db_conn pymysql.connect( host127.0.0.1, # 连接本机 portssh_tunnel.local_bind_port, # 隧道端口 userDORIS_DB_USER, passwordDORIS_DB_PWD, charsetutf8mb4, cursorclasspymysql.cursors.DictCursor, # 返回字典格式数据方便读取 autocommitTrue # 开启自动提交这对 Doris 很重要 ) cursor db_conn.cursor() # --- 执行测试步骤 --- logger.info( 3. 开始执行 CRUD 测试流程) # 1. 建库建表 init_db_and_table(cursor) # 2. 插入 10 条数据 insert_data(cursor, count10) time.sleep(2) # 稍作等待确保数据版本提交(虽然4.0很快但为了演示效果) # 3. 随机更新 update_data(cursor) time.sleep(1) # 4. 随机删除 delete_data(cursor) time.sleep(1) # 5. 查询结果 query_final_result(cursor) logger.info( 4. 测试全部通过Doris 4.0.1 运行正常。) except Exception as e: logger.error(f❌ 测试过程中发生错误: {e}) import traceback logger.error(traceback.format_exc()) finally: # 资源清理 if db_conn: db_conn.close() logger.info(数据库连接已关闭) if ssh_tunnel: ssh_tunnel.stop() logger.info(SSH 隧道已关闭) if __name__ __main__: main()对应log文件记录如下2025-12-09 11:46:53,544 - INFO - 1. 正在建立 SSH 隧道 ... 2025-12-09 11:46:53,830 - INFO - SSH 隧道建立成功! 本地映射端口: 64441 2025-12-09 11:46:53,831 - INFO - 2. 正在连接 Doris 4.0.1 数据库 ... 2025-12-09 11:46:54,028 - INFO - 3. 开始执行 CRUD 测试流程 2025-12-09 11:46:54,028 - INFO - 正在执行: [init_db_and_table] ... 2025-12-09 11:46:54,044 - INFO - 数据库 doris4_test_db 和表 user_profile_v4 (Unique Key MoW) 已初始化 2025-12-09 11:46:54,044 - INFO - 执行完成: [init_db_and_table] | 耗时: 0.0165 秒 2025-12-09 11:46:54,045 - INFO - 正在执行: [insert_data] ... 2025-12-09 11:46:54,045 - INFO - 准备插入 100 条数据... 2025-12-09 11:46:54,188 - INFO - 成功插入 100 条数据 2025-12-09 11:46:54,188 - INFO - 执行完成: [insert_data] | 耗时: 0.1437 秒 2025-12-09 11:46:56,189 - INFO - 正在执行: [update_data] ... 2025-12-09 11:46:56,224 - INFO - 随机选中 ID 进行更新: [46, 71, 41] 2025-12-09 11:46:56,275 - INFO - - 已更新 ID46: Balance设为 9999.99, City设为 Updated_City 2025-12-09 11:46:56,326 - INFO - - 已更新 ID71: Balance设为 9999.99, City设为 Updated_City 2025-12-09 11:46:56,387 - INFO - - 已更新 ID41: Balance设为 9999.99, City设为 Updated_City 2025-12-09 11:46:56,388 - INFO - 执行完成: [update_data] | 耗时: 0.1991 秒 2025-12-09 11:46:57,388 - INFO - 正在执行: [delete_data] ... 2025-12-09 11:46:57,405 - INFO - 随机选中 ID 进行删除: [58, 50] 2025-12-09 11:46:57,457 - INFO - - 删除操作完成受影响行数: 2 2025-12-09 11:46:57,457 - INFO - 执行完成: [delete_data] | 耗时: 0.0691 秒 2025-12-09 11:46:58,458 - INFO - 正在执行: [query_final_result] ... 2025-12-09 11:46:58,483 - INFO - -------------------------------------------------- 2025-12-09 11:46:58,483 - INFO - 最终表数据 (总行数: 98): 2025-12-09 11:46:58,484 - INFO - ID Name Age City Balance 2025-12-09 11:46:58,484 - INFO - -------------------------------------------------- 2025-12-09 11:46:58,484 - INFO - 1 User_001 32 Guangzhou 4987.09 2025-12-09 11:46:58,484 - INFO - 2 User_002 55 Shanghai 4799.23 2025-12-09 11:46:58,484 - INFO - 3 User_003 29 Guangzhou 3795.66 2025-12-09 11:46:58,484 - INFO - 4 User_004 21 Shanghai 1065.80 2025-12-09 11:46:58,484 - INFO - 5 User_005 48 Beijing 3250.84 2025-12-09 11:46:58,484 - INFO - 6 User_006 33 Shenzhen 2121.31 2025-12-09 11:46:58,484 - INFO - 7 User_007 39 Guangzhou 1792.82 2025-12-09 11:46:58,485 - INFO - 8 User_008 21 Chengdu 350.23 2025-12-09 11:46:58,485 - INFO - 9 User_009 59 Chengdu 4108.27 2025-12-09 11:46:58,485 - INFO - 10 User_010 57 Shanghai 2674.18 2025-12-09 11:46:58,485 - INFO - 11 User_011 43 Guangzhou 4362.53 2025-12-09 11:46:58,485 - INFO - 12 User_012 24 Chengdu 845.92 2025-12-09 11:46:58,485 - INFO - 13 User_013 47 Shenzhen 3597.69 2025-12-09 11:46:58,485 - INFO - 14 User_014 53 Shenzhen 1159.92 2025-12-09 11:46:58,485 - INFO - 15 User_015 48 Shanghai 4159.49 2025-12-09 11:46:58,485 - INFO - 16 User_016 30 Guangzhou 3863.11 2025-12-09 11:46:58,485 - INFO - 17 User_017 30 Guangzhou 2356.23 2025-12-09 11:46:58,485 - INFO - 18 User_018 43 Beijing 4038.47 2025-12-09 11:46:58,485 - INFO - 19 User_019 33 Shenzhen 3465.97 2025-12-09 11:46:58,486 - INFO - 20 User_020 33 Guangzhou 2548.67 2025-12-09 11:46:58,486 - INFO - 21 User_021 47 Shanghai 2104.14 2025-12-09 11:46:58,486 - INFO - 22 User_022 44 Beijing 4932.08 2025-12-09 11:46:58,486 - INFO - 23 User_023 60 Guangzhou 2993.93 2025-12-09 11:46:58,486 - INFO - 24 User_024 57 Chengdu 1831.64 2025-12-09 11:46:58,486 - INFO - 25 User_025 26 Shenzhen 2478.94 2025-12-09 11:46:58,486 - INFO - 26 User_026 31 Shanghai 4901.88 2025-12-09 11:46:58,486 - INFO - 27 User_027 42 Shanghai 1422.68 2025-12-09 11:46:58,486 - INFO - 28 User_028 46 Shenzhen 2586.16 2025-12-09 11:46:58,486 - INFO - 29 User_029 31 Guangzhou 3395.99 2025-12-09 11:46:58,486 - INFO - 30 User_030 50 Shanghai 657.19 2025-12-09 11:46:58,486 - INFO - 31 User_031 51 Shenzhen 4879.95 2025-12-09 11:46:58,487 - INFO - 32 User_032 58 Guangzhou 1523.34 2025-12-09 11:46:58,487 - INFO - 33 User_033 48 Shanghai 2711.63 2025-12-09 11:46:58,487 - INFO - 34 User_034 38 Shanghai 1920.85 2025-12-09 11:46:58,487 - INFO - 35 User_035 31 Shanghai 1700.61 2025-12-09 11:46:58,487 - INFO - 36 User_036 56 Chengdu 2682.61 2025-12-09 11:46:58,487 - INFO - 37 User_037 55 Shenzhen 1431.78 2025-12-09 11:46:58,487 - INFO - 38 User_038 31 Shenzhen 4727.04 2025-12-09 11:46:58,487 - INFO - 39 User_039 32 Chengdu 3227.39 2025-12-09 11:46:58,487 - INFO - 40 User_040 43 Shenzhen 3663.79 2025-12-09 11:46:58,487 - INFO - 41 User_041 50 Updated_City 9999.99 2025-12-09 11:46:58,487 - INFO - 42 User_042 43 Guangzhou 1648.04 2025-12-09 11:46:58,487 - INFO - 43 User_043 35 Shanghai 3318.13 2025-12-09 11:46:58,487 - INFO - 44 User_044 48 Chengdu 2464.68 2025-12-09 11:46:58,488 - INFO - 45 User_045 28 Shenzhen 4477.58 2025-12-09 11:46:58,488 - INFO - 46 User_046 31 Updated_City 9999.99 2025-12-09 11:46:58,488 - INFO - 47 User_047 59 Shenzhen 3873.40 2025-12-09 11:46:58,488 - INFO - 48 User_048 55 Beijing 4772.47 2025-12-09 11:46:58,488 - INFO - 49 User_049 50 Shenzhen 1199.26 2025-12-09 11:46:58,488 - INFO - 51 User_051 59 Beijing 1975.62 2025-12-09 11:46:58,488 - INFO - 52 User_052 52 Beijing 309.98 2025-12-09 11:46:58,488 - INFO - 53 User_053 34 Shenzhen 1315.21 2025-12-09 11:46:58,488 - INFO - 54 User_054 40 Guangzhou 4976.19 2025-12-09 11:46:58,488 - INFO - 55 User_055 59 Shenzhen 2495.20 2025-12-09 11:46:58,488 - INFO - 56 User_056 38 Shanghai 2183.50 2025-12-09 11:46:58,489 - INFO - 57 User_057 47 Shanghai 3532.53 2025-12-09 11:46:58,489 - INFO - 59 User_059 29 Guangzhou 3959.38 2025-12-09 11:46:58,489 - INFO - 60 User_060 57 Shenzhen 2794.60 2025-12-09 11:46:58,489 - INFO - 61 User_061 44 Guangzhou 1043.38 2025-12-09 11:46:58,489 - INFO - 62 User_062 44 Beijing 1445.02 2025-12-09 11:46:58,489 - INFO - 63 User_063 34 Chengdu 2018.03 2025-12-09 11:46:58,489 - INFO - 64 User_064 30 Beijing 1325.72 2025-12-09 11:46:58,489 - INFO - 65 User_065 60 Shenzhen 2405.65 2025-12-09 11:46:58,489 - INFO - 66 User_066 24 Shanghai 1521.32 2025-12-09 11:46:58,489 - INFO - 67 User_067 47 Beijing 3320.86 2025-12-09 11:46:58,489 - INFO - 68 User_068 39 Shenzhen 2205.58 2025-12-09 11:46:58,490 - INFO - 69 User_069 43 Beijing 4372.42 2025-12-09 11:46:58,490 - INFO - 70 User_070 48 Guangzhou 1719.70 2025-12-09 11:46:58,490 - INFO - 71 User_071 55 Updated_City 9999.99 2025-12-09 11:46:58,490 - INFO - 72 User_072 58 Shanghai 4531.46 2025-12-09 11:46:58,490 - INFO - 73 User_073 47 Shanghai 1505.61 2025-12-09 11:46:58,490 - INFO - 74 User_074 30 Beijing 1342.90 2025-12-09 11:46:58,490 - INFO - 75 User_075 31 Shanghai 1321.63 2025-12-09 11:46:58,490 - INFO - 76 User_076 60 Chengdu 2761.57 2025-12-09 11:46:58,490 - INFO - 77 User_077 33 Shanghai 3493.49 2025-12-09 11:46:58,490 - INFO - 78 User_078 37 Shanghai 254.65 2025-12-09 11:46:58,490 - INFO - 79 User_079 29 Beijing 2223.87 2025-12-09 11:46:58,491 - INFO - 80 User_080 46 Chengdu 4061.74 2025-12-09 11:46:58,491 - INFO - 81 User_081 26 Shanghai 1116.46 2025-12-09 11:46:58,491 - INFO - 82 User_082 22 Guangzhou 3541.36 2025-12-09 11:46:58,491 - INFO - 83 User_083 43 Beijing 1379.45 2025-12-09 11:46:58,491 - INFO - 84 User_084 47 Beijing 4405.15 2025-12-09 11:46:58,491 - INFO - 85 User_085 22 Guangzhou 328.10 2025-12-09 11:46:58,491 - INFO - 86 User_086 37 Shanghai 1957.66 2025-12-09 11:46:58,491 - INFO - 87 User_087 44 Shenzhen 4303.43 2025-12-09 11:46:58,491 - INFO - 88 User_088 24 Shenzhen 2125.09 2025-12-09 11:46:58,491 - INFO - 89 User_089 57 Shenzhen 3759.54 2025-12-09 11:46:58,491 - INFO - 90 User_090 45 Guangzhou 4788.86 2025-12-09 11:46:58,492 - INFO - 91 User_091 46 Beijing 683.87 2025-12-09 11:46:58,492 - INFO - 92 User_092 56 Chengdu 1321.36 2025-12-09 11:46:58,492 - INFO - 93 User_093 36 Shanghai 4819.07 2025-12-09 11:46:58,492 - INFO - 94 User_094 28 Beijing 4583.63 2025-12-09 11:46:58,492 - INFO - 95 User_095 29 Shenzhen 2064.74 2025-12-09 11:46:58,492 - INFO - 96 User_096 52 Beijing 3397.31 2025-12-09 11:46:58,492 - INFO - 97 User_097 33 Shanghai 3227.49 2025-12-09 11:46:58,492 - INFO - 98 User_098 27 Beijing 1809.42 2025-12-09 11:46:58,492 - INFO - 99 User_099 41 Chengdu 2890.17 2025-12-09 11:46:58,492 - INFO - 100 User_100 41 Chengdu 3228.25 2025-12-09 11:46:58,493 - INFO - -------------------------------------------------- 2025-12-09 11:46:58,493 - INFO - 执行完成: [query_final_result] | 耗时: 0.0348 秒 2025-12-09 11:46:58,493 - INFO - 4. 测试全部通过Doris 4.0.1 运行正常。 2025-12-09 11:46:58,493 - INFO - 数据库连接已关闭 2025-12-09 11:46:58,512 - INFO - SSH 隧道已关闭去nd14执行下述语句进行查看mysql -h 10.x.xx.157 -P 9030 -urootshow databases; use doris4_test_db; show tables;select * from user_profile_v4;8.1 插入数据这里展示部分数据可以看出插入了100条测试数据8.2 更新数据控制台打印结果终端查看结果8.3 删除数据控制台打印结果终端验证结果9. Paimon外表数据写入Doris内表基础测试9.1 Paimon数据准备对于Paimon外表数据写入Doris内表基础测试需要提前在Flink SQL会话里面创建Paimon表并插入测试数据-- 1. 创建 Flink 端的 Paimon Catalog CREATE CATALOG paimon_catalog WITH ( type paimon, warehouse hdfs:///user/hive/warehouse, metastore hive, hive-conf-dir /etc/hive/conf.cloudera.hive ); -- 2. 切换 Catalog 和 Database USE CATALOG my_paimon; CREATE DATABASE IF NOT EXISTS ods; USE ods; -- 3. 创建 Paimon 表 (源表) -- 这是一个记录用户行为的日志表 CREATE TABLE IF NOT EXISTS paimon_source_event ( user_id INT, item_id INT, behavior STRING, dt STRING, ts TIMESTAMP(3), PRIMARY KEY (dt, user_id, item_id) NOT ENFORCED ) PARTITIONED BY (dt) WITH ( bucket 1, file.format parquet ); -- 4. 写入测试数据 (Batch 模式写入) INSERT INTO paimon_source_event VALUES (1001, 501, click, 2025-12-12, TIMESTAMP 2025-12-12 10:00:00.123), (1002, 502, view, 2025-12-12, TIMESTAMP 2025-12-12 10:05:00.456), (1003, 501, buy, 2025-12-12, TIMESTAMP 2025-12-12 10:10:00.789), (1001, 503, view, 2025-12-13, TIMESTAMP 2025-12-13 11:00:00.000), (1004, 501, click, 2025-12-13, TIMESTAMP 2025-12-13 11:05:00.000);9.2 测试代码将下述测试代码保存为doris4_paimon_tel_test.py【python解释器3.8.20、windows系统11】# -*- coding: utf-8 -*- import pymysql import time import logging import functools from sshtunnel import SSHTunnelForwarder # 配置信息 # 1. SSH 连接信息 (使用 Doris 4.0 所在的节点 IP) # 参考 doris4_test.py 的配置 SSH_HOST 10.x.xx.157 # Doris 4.0 FE Master SSH_PORT 22 SSH_USER xxxxxx SSH_PASSWORD xxxxxxxxxxxxxxxxx # 2. Doris 数据库连接信息 DORIS_LOCAL_HOST 127.0.0.1 DORIS_QUERY_PORT 9030 DORIS_DB_USER root DORIS_DB_PWD # 3. Paimon Catalog 配置 # 【重要提示】: 请确保 Doris 4.0 的节点上该路径下也有 core-site.xml/hdfs-site.xml CATALOG_PROPS { type: paimon, paimon.catalog.type: hms, hive.metastore.uris: thrift://nd1:9083,thrift://nd3:9083, # HMS 地址保持不变 hive.version: 2.1.1, # 保持兼容性配置 warehouse: hdfs://nd1:8020/user/hive/warehouse, hadoop.conf.dir: /home/bigdata/doris/conf/cdh_conf/, hadoop.username: hdfs } # 4. 业务配置 PAIMON_CATALOG_NAME paimon_catalog PAIMON_DB ods PAIMON_TABLE paimon_source_event # 使用新的数据库名区分测试 DORIS_TEST_DB doris4_paimon_test_db DORIS_TARGET_TABLE doris4_target_event_sink LOG_FILE doris4_paimon_etl_report.log # 日志与工具模块 def setup_logger(): logger logging.getLogger(Doris4PaimonTester) logger.setLevel(logging.INFO) if logger.hasHandlers(): logger.handlers.clear() formatter logging.Formatter(%(asctime)s - %(levelname)s - %(message)s) file_handler logging.FileHandler(LOG_FILE, modew, encodingutf-8) file_handler.setFormatter(formatter) console_handler logging.StreamHandler() console_handler.setFormatter(formatter) logger.addHandler(file_handler) logger.addHandler(console_handler) return logger logger setup_logger() def measure_time(func): functools.wraps(func) def wrapper(*args, **kwargs): start_time time.time() logger.info(f正在执行: [{func.__name__}] ...) try: result func(*args, **kwargs) duration time.time() - start_time logger.info(f执行完成: [{func.__name__}] | 耗时: {duration:.4f} 秒) return result except Exception as e: duration time.time() - start_time logger.error(f执行失败: [{func.__name__}] | 耗时: {duration:.4f} 秒 | 错误: {e}) raise e return wrapper # 核心测试逻辑 measure_time def init_doris_catalog(cursor): 在 Doris 4.0 中初始化 Paimon Catalog logger.info(f正在初始化 Doris Catalog: {PAIMON_CATALOG_NAME} ...) # 删除旧的 Catalog (如果存在) cursor.execute(fDROP CATALOG IF EXISTS {PAIMON_CATALOG_NAME}) # 拼接创建语句 props_str ,\n.join([f{k} {v} for k, v in CATALOG_PROPS.items()]) create_sql f CREATE CATALOG {PAIMON_CATALOG_NAME} PROPERTIES ( {props_str} ); logger.info(发送 Create Catalog 请求...) cursor.execute(create_sql) logger.info(Catalog 创建成功) time.sleep(2) # 稍微等待元数据同步 measure_time def check_paimon_source(cursor): 验证 Doris 是否能通过 Catalog 读取 Paimon 数据 logger.info(f检查 Paimon 数据源: {PAIMON_CATALOG_NAME}.{PAIMON_DB}.{PAIMON_TABLE}) # 1. 切换到 Paimon Catalog cursor.execute(fSWITCH {PAIMON_CATALOG_NAME}) # 2. 检查表是否存在 cursor.execute(fUSE {PAIMON_DB}) cursor.execute(SHOW TABLES) # Doris 不同版本 fetchall 返回结构可能微调这里做通用处理 tables [list(row.values())[0] for row in cursor.fetchall()] if PAIMON_TABLE not in tables: logger.warning(f当前 Catalog 下的表: {tables}) raise Exception(fPaimon 表 {PAIMON_TABLE} 未找到) # 3. 预览数据 sql fSELECT * FROM {PAIMON_TABLE} ORDER BY dt, user_id LIMIT 5 cursor.execute(sql) results cursor.fetchall() logger.info(fPaimon 数据预览 (前5条):) for row in results: logger.info(row) if not results: raise Exception(Paimon 表为空请先在 Flink 端写入数据) return len(results) measure_time def create_doris_target_table(cursor): 创建 Doris 4.0 内部表 (Unique Key MoW 模型) cursor.execute(SWITCH internal) cursor.execute(fCREATE DATABASE IF NOT EXISTS {DORIS_TEST_DB}) cursor.execute(fUSE {DORIS_TEST_DB}) # 注意Unique Key 模型要求 Key 字段必须排在 Value 字段前面 # Key: user_id, item_id, dt # Value: behavior, ts create_sql f CREATE TABLE IF NOT EXISTS {DORIS_TARGET_TABLE} ( user_id INT COMMENT 用户ID, item_id INT COMMENT 商品ID, dt VARCHAR(20) COMMENT 日期分区, behavior VARCHAR(50) COMMENT 行为类型, ts DATETIME(3) COMMENT 时间戳 ) UNIQUE KEY(user_id, item_id, dt) PARTITION BY LIST(dt) ( PARTITION p20251212 VALUES IN (2025-12-12), PARTITION p20251213 VALUES IN (2025-12-13) ) DISTRIBUTED BY HASH(user_id) BUCKETS 1 PROPERTIES ( replication_num 1, enable_unique_key_merge_on_write true, store_row_column true ); # store_row_column true 是 4.0 的特性用于优化点查 cursor.execute(create_sql) cursor.execute(fTRUNCATE TABLE {DORIS_TARGET_TABLE}) logger.info(fDoris 4.0 内表 {DORIS_TARGET_TABLE} 已准备就绪) measure_time def execute_etl_paimon_to_doris(cursor): 执行 INSERT INTO ... SELECT ... logger.info( 开始执行从 Paimon 到 Doris 4.0 的数据导入 (ETL) ) # 显式指定字段顺序防止 select * 顺序不一致 etl_sql f INSERT INTO internal.{DORIS_TEST_DB}.{DORIS_TARGET_TABLE} (user_id, item_id, dt, behavior, ts) SELECT user_id, item_id, dt, behavior, ts FROM {PAIMON_CATALOG_NAME}.{PAIMON_DB}.{PAIMON_TABLE} cursor.execute(etl_sql) logger.info(ETL SQL 提交完毕) measure_time def verify_data_consistency(cursor): 验证数据一致性 logger.info( 开始数据一致性校验 ) # 1. Paimon 源数据量 cursor.execute(fSELECT count(*) as cnt FROM {PAIMON_CATALOG_NAME}.{PAIMON_DB}.{PAIMON_TABLE}) paimon_count cursor.fetchone()[cnt] # 2. Doris 目标数据量 cursor.execute(fSELECT count(*) as cnt FROM internal.{DORIS_TEST_DB}.{DORIS_TARGET_TABLE}) doris_count cursor.fetchone()[cnt] logger.info(fPaimon 源表行数: {paimon_count}) logger.info(fDoris 目标表行数: {doris_count}) if paimon_count doris_count: logger.info(✅ 测试通过数据条数一致) else: logger.error(❌ 测试失败数据条数不一致) # 3. 抽样展示 cursor.execute(fSELECT * FROM internal.{DORIS_TEST_DB}.{DORIS_TARGET_TABLE} ORDER BY dt, user_id LIMIT 3) rows cursor.fetchall() logger.info(Doris 内表数据抽样:) for row in rows: logger.info(row) # 主流程 def main_process(): server None conn None try: logger.info( 1. 正在建立 SSH 隧道 (连接至 Doris 4.0) ...) # 建立隧道本地 - SSH(10.8.15.157) - Doris FE(127.0.0.1:9030) server SSHTunnelForwarder( (SSH_HOST, SSH_PORT), ssh_usernameSSH_USER, ssh_passwordSSH_PASSWORD, remote_bind_address(DORIS_LOCAL_HOST, DORIS_QUERY_PORT) ) server.start() logger.info(f SSH 隧道建立成功! 本地端口: {server.local_bind_port}) logger.info( 2. 连接 Doris 数据库 ...) conn pymysql.connect( host127.0.0.1, portserver.local_bind_port, userDORIS_DB_USER, passwordDORIS_DB_PWD, charsetutf8mb4, cursorclasspymysql.cursors.DictCursor, autocommitTrue ) cursor conn.cursor() # --- 测试步骤 --- # 步骤 1: 初始化 Catalog init_doris_catalog(cursor) # 步骤 2: 确认源端 Paimon 可读 check_paimon_source(cursor) # 步骤 3: 准备 Doris 4.0 目标表 create_doris_target_table(cursor) # 步骤 4: 执行导入 execute_etl_paimon_to_doris(cursor) # Doris 4.0 导入通常非常快但仍建议稍微sleep time.sleep(2) # 步骤 5: 校验结果 verify_data_consistency(cursor) logger.info( Doris 4.0 与 Paimon 集成测试全部完成 ) except Exception as e: logger.error(f主流程发生错误: {e}) import traceback logger.error(traceback.format_exc()) finally: if conn: conn.close() logger.info(数据库连接已关闭) if server: server.stop() logger.info(SSH 隧道已关闭) if __name__ __main__: main_process()对应的log文件内容如下2025-12-12 15:55:44,709 - INFO - 1. 正在建立 SSH 隧道 (连接至 Doris 4.0) ... 2025-12-12 15:55:45,144 - INFO - SSH 隧道建立成功! 本地端口: 60651 2025-12-12 15:55:45,144 - INFO - 2. 连接 Doris 数据库 ... 2025-12-12 15:55:45,353 - INFO - 正在执行: [init_doris_catalog] ... 2025-12-12 15:55:45,354 - INFO - 正在初始化 Doris Catalog: paimon_catalog ... 2025-12-12 15:55:45,359 - INFO - 发送 Create Catalog 请求... 2025-12-12 15:55:45,437 - INFO - Catalog 创建成功 2025-12-12 15:55:47,438 - INFO - 执行完成: [init_doris_catalog] | 耗时: 2.0848 秒 2025-12-12 15:55:47,444 - INFO - 正在执行: [check_paimon_source] ... 2025-12-12 15:55:47,445 - INFO - 检查 Paimon 数据源: paimon_catalog.ods.paimon_source_event 2025-12-12 15:55:48,296 - INFO - Paimon 数据预览 (前5条): 2025-12-12 15:55:48,296 - INFO - {user_id: 1001, item_id: 501, behavior: click, dt: 2025-12-12, ts: datetime.datetime(2025, 12, 12, 10, 0, 0, 123000)} 2025-12-12 15:55:48,296 - INFO - {user_id: 1002, item_id: 502, behavior: view, dt: 2025-12-12, ts: datetime.datetime(2025, 12, 12, 10, 5, 0, 456000)} 2025-12-12 15:55:48,296 - INFO - {user_id: 1003, item_id: 501, behavior: buy, dt: 2025-12-12, ts: datetime.datetime(2025, 12, 12, 10, 10, 0, 789000)} 2025-12-12 15:55:48,296 - INFO - {user_id: 1001, item_id: 503, behavior: view, dt: 2025-12-13, ts: datetime.datetime(2025, 12, 13, 11, 0)} 2025-12-12 15:55:48,297 - INFO - {user_id: 1004, item_id: 501, behavior: click, dt: 2025-12-13, ts: datetime.datetime(2025, 12, 13, 11, 5)} 2025-12-12 15:55:48,297 - INFO - 执行完成: [check_paimon_source] | 耗时: 0.8523 秒 2025-12-12 15:55:48,297 - INFO - 正在执行: [create_doris_target_table] ... 2025-12-12 15:55:48,345 - INFO - Doris 4.0 内表 doris4_target_event_sink 已准备就绪 2025-12-12 15:55:48,345 - INFO - 执行完成: [create_doris_target_table] | 耗时: 0.0479 秒 2025-12-12 15:55:48,345 - INFO - 正在执行: [execute_etl_paimon_to_doris] ... 2025-12-12 15:55:48,345 - INFO - 开始执行从 Paimon 到 Doris 4.0 的数据导入 (ETL) 2025-12-12 15:55:48,515 - INFO - ETL SQL 提交完毕 2025-12-12 15:55:48,516 - INFO - 执行完成: [execute_etl_paimon_to_doris] | 耗时: 0.1707 秒 2025-12-12 15:55:50,517 - INFO - 正在执行: [verify_data_consistency] ... 2025-12-12 15:55:50,517 - INFO - 开始数据一致性校验 2025-12-12 15:55:51,382 - INFO - Paimon 源表行数: 5 2025-12-12 15:55:51,384 - INFO - Doris 目标表行数: 5 2025-12-12 15:55:51,385 - INFO - ✅ 测试通过数据条数一致 2025-12-12 15:55:51,438 - INFO - Doris 内表数据抽样: 2025-12-12 15:55:51,438 - INFO - {user_id: 1001, item_id: 501, dt: 2025-12-12, behavior: click, ts: datetime.datetime(2025, 12, 12, 10, 0, 0, 123000)} 2025-12-12 15:55:51,438 - INFO - {user_id: 1002, item_id: 502, dt: 2025-12-12, behavior: view, ts: datetime.datetime(2025, 12, 12, 10, 5, 0, 456000)} 2025-12-12 15:55:51,439 - INFO - {user_id: 1003, item_id: 501, dt: 2025-12-12, behavior: buy, ts: datetime.datetime(2025, 12, 12, 10, 10, 0, 789000)} 2025-12-12 15:55:51,439 - INFO - 执行完成: [verify_data_consistency] | 耗时: 0.9222 秒 2025-12-12 15:55:51,439 - INFO - Doris 4.0 与 Paimon 集成测试全部完成 2025-12-12 15:55:51,440 - INFO - 数据库连接已关闭 2025-12-12 15:55:51,444 - INFO - SSH 隧道已关闭9.3 验证数据去Doris终端验证数据结果如下mysql -h 10.x.xx.157 -P 9030 -uroot执行下述sqlSWITCH internal; SHOW DATABASES; USE doris4_paimon_test_db; SHOW TABLES;SELECT * FROM doris4_target_event_sink ORDER BY user_id;也可以在同一个查询窗口中直接对比两边的数量不需要反复 SWITCH-- 这里的 internal 和 paimon_catalog 是 Catalog 名称 SELECT (SELECT count(*) FROM internal.doris4_paimon_test_db.doris4_target_event_sink) as doris_count, (SELECT count(*) FROM paimon_catalog.ods.paimon_source_event) as paimon_count;