中国沙漠建设 志愿者 兵团官方网站网络推广是做什么工作的
中国沙漠建设 志愿者 兵团官方网站,网络推广是做什么工作的,深度网络,马鞍山建站在使用 Flink SQL 进行实时数据处理的过程中#xff0c;双流 Join 是非常常见的操作之一。典型的场景包括分析广告效果#xff08;曝光流订单流实时关联#xff09;、实时推荐#xff08;点击流和商品信息#xff09;等等。然而#xff0c;双流 Join 需要在状态中维护两侧…在使用 Flink SQL 进行实时数据处理的过程中双流 Join 是非常常见的操作之一。典型的场景包括分析广告效果曝光流订单流实时关联、实时推荐点击流和商品信息等等。然而双流 Join 需要在状态中维护两侧全量的历史数据以确保计算结果的准确性。随着作业的持续运行双流 Join 会逐渐带来一些问题运维层面状态过大开发者需要不断加大作业的资源才能维持较高的吞吐。Checkpoint 易超时导致作业不稳定、持续 Failover。状态是 Flink 内部产物排查问题时其内部数据难以探查。开发层面Query 迭代修改后状态难以复用且重启回追代价高。为了解决这些问题Flink 社区在 2.1 引入了新的 Delta Join 算子并在 2.2 对其进行了进一步的扩展。Delta Join 的核心思想是舍弃算子内本地状态冗余的数据存储利用双向 Lookup Join 直接查询源表中的数据而非查询状态中的数据从而复用源表数据。Delta Join 结合流存储 Apache Fluss在阿里巴巴淘宝天猫团队成功落地并且对比双流 Join拥有如下几个优势消除了将近 50 TB 的 双流 Join 状态计算 CU 降低 10 倍作业恢复速度提升 87 %Checkpoint 秒级完成Flink Delta Join 介绍请参考《Delta Join为超大规模流处理实现计算与历史数据解耦》https://developer.aliyun.com/article/169055801双流 Join 实现原理让我们先简单描述 Flink 双流 Join 的工作原理。我们以处理左侧表来的 changelog 数据为例流入的数据主要经过以下三个阶段。1. 通过 Join Key 查询对侧即右侧的状态获取右侧历史上曾经流入该算子的全量数据。2. 使用 Join 条件过滤查询得到的数据并输出。3. 将输入的本条数据存入本侧即左侧的状态中以供后续右侧的数据来临时能正确的匹配数据。之所以要把所有的数据用状态记录下来是因为流计算是没有边界的左侧数据和右侧数据匹配的时间点会存在时间差即使一侧的数据延迟到达也需要保证可以关联上另一侧的数据最终输出。双流 Join 的算法确保了数据的正确性但是其状态会随着时间的推移而无限制增大成为影响作业资源消耗和稳定性的关键因素。虽然目前已有 Interval Join[1]、Lookup Join[2]、State TTL Hint[3] 等手段来缓解或解决该问题但是均面向了特定的业务场景牺牲了一定的功能如 Lookup Join 舍弃了维表侧数据的更新追踪State TTL Hint 放弃匹配超过 TTL 期限的数据。02Delta Join 技术原理从双流 Join 的原理上我们可以观察到状态里记录的全量数据与源表中的数据基本相同那么一个直观的想法是可以复用源表的数据来取代原有的状态。Delta Join 正是基于这个思路它利用了外部存储系统提供的索引能力并不从状态中查找数据而是直接对外部存储发出高效的、基于索引的数据查询以获取匹配的记录。通过这种方式Delta Join 消除了双流 Join 状态与外部系统之间冗余的数据存储。理论推导我们以两路输入为例增量更新 Join 结果的公式为其中A 代表了左表的全量历史数据,代表了左表中的增量数据。B和的定义与此类似。每当我们需要计算 Join 结果的增量部分时我们只需要获取源表中从上次计算到当前时间之间新生成的数据并查询对侧源表中的历史快照数据。因此我们需要1. 感知源表的增量数据2. 访问源表历史快照数据这对源表的物理存储引擎提出了很高的要求存储引擎需要支持快照隔离以确保强一致性语义。然而目前存在以下几个问题1. 目前只有有限的存储支持了快照的概念例如 Paimon、Iceberg、Hudi 等等2. 快照生成的时间间隔为分钟级别无法满足实时处理的要求3. 当指定快照查询数据时快照可能会在存储系统中过期考虑到上述这些问题Flink 2.1 提出了一种满足实时性要求的、最终一致性的 Delta Join 方案。最终一致性语义的 Delta Join最终一致性语义的 Delta Join 并不要求源表的存储引擎支持快照。它总是去查询源表当前最新的数据。其对应的变种公式如下和强一致性 Delta Join 相比最终一致性 Delta Join 多出了一部分额外的中间结果因此这种方法只能确保最终的结果是一致的。以下是双流 Join 和两种语义的 Delta Join 的对比。双流 Join强一致性 Delta Join最终一致性 Delta Join延迟低高低状态大小大小小状态内数据详情两侧输入全量明细数据上一次触发计算的源表快照id等待触发计算的异步队列数据一致性强一致性强一致性最终一致性03Delta Join 算子实现为了提高算子的吞吐在 Delta Join 算子中分别引入了一个 TableAsyncExecutionController 组件和两个双侧完全相同的 DeltaJoinRunner 组件。TableAsyncExecutionController 原理该组件由 FLIP-519 Introduce async lookup key ordered mode[4] 引入其严格限制相同 key 之间的数据必须串型执行而允许不同 key 之间的数据并行处理同时结合异步处理机制大大提高了算子的吞吐能力。该组件的运行原理如下TableAsyncExecutionController 在接收到数据后按照 key 放入 BlockingBuffer 内不同 key 的队列里然后通过 KeyAccountingUnit 检查该 key 是否被抢占、有对应的数据正在执行。如果 key 被抢占直接返回如果 key 未被抢占则抢占该 key 同时 poll 队列数据放入 ActiveBuffer交给后续计算逻辑处理同时注册回调函数在数据处理结束、输出后在 KeyAccountingUnit 内释放该 key去 BlockingBuffer 内拿下一条数据。这套机制保证了相同 key 之间的数据是串行执行的以避免出现分布式乱序问题。该机制在某种程度上是 FLIP-425 Asynchronous Execution Model[5] 的简化版本感兴趣的可以另行研究。在实际场景下Delta Join 算子的吞吐会受到 BlockingBuffer 能允许的最大容量各个 key 的队列大小之和影响当 BlockingBuffer 最大容量过小时即使收到的每个 key 都不一样也会由于无法充分利用异步并行的能力而导致吞吐较小。此时可以适当调整下面的参数来增大 BlockingBuffer 的最大容量。但如果设置的过大BlockingBuffer 会占用比较高的内存同时也可能会给外部存储带来较大的查询压力。// 默认 100table.exec.async-lookup.buffer-capacity: 1000我们可以通过监测 Delta Join 算子内以下几个 metric来判断是否需要调整该参数。aec_blocking_size当前 BlockingBuffer 内被阻塞的所有 key 的队列大小之和。该值越大代表 join key 较为密集考虑开启或增大 delta join cache该值越小但吞吐不佳的情况下考虑增大table.exec.async-lookup.buffer-capacity的值。aec_inflight_size当前 ActiveBuffer 内正在执行计算的数据数量。该值越大代表当前同时请求外部存储集群的数据较多存在请求堆积的情况需要进一步查看外部存储系统是否存在异常或查看是否有相关参数可以提高查询效率该值越小代表 join key 较为密集考虑开启或增大 delta join cache。注当 Fluss 流存储的表作为 Delta Join 的源表时你可以通过 Flink Table Hint[6]在 Fluss 表上配置以下这些关键参数来提高查询效率。client.lookup.queue-sizeclient.lookup.max-batch-sizeclient.lookup.max-inflight-requestsclient.lookup.batch-timeout具体请参考 Fluss Connector Options[7]04DeltaJoinRunner 原理DeltaJoinRunner 是负责执行 Lookup 的组件。由于 Delta Join 算子会处理两侧的数据因此对于不同侧的数据各有一个完全相同的 DeltaJoinRunner 负责 Lookup 对应表的数据。想象一下如果我们对每条数据都要去外部存储进行查询对外部吞吐的压力会非常大算子的吞吐性能完全取决于请求外部系统的吞吐。但如果用普通的 cache 来对 Lookup 的数据进行缓存Lookup 目标表的数据更新消息将无法订阅。为此我们引入了驱动侧仅构建、Lookup 侧仅更新的特殊 cache。DeltaJoinRunner 组件的运行原理如下图例是用于左侧输入流查询右侧源表的 DeltaJoinRunner分别由 LocalCache 和 LookupFetcher 组成。当左侧数据到达时先去 LocalCache 查询是否有 cache。当有 cache 时直接输出当没有 cache 时借助 LookupFetcher 通过右表的 index 查询右表的数据然后将查询回来的数据在 LocalCache 中构建 cache最后输出。同时右表的数据到达时将会查看此 DeltaJoinRunner 中的 LocalCache 是否有 cache。如果没有cache忽略更新如果有 cache更新 cache。该 cache 机制一方面确保了在 join key 较为密集的场景算子的吞吐能够得到巨大的提升同时对外部存储也不会构成很大的查询压力另一方面确保了对侧最新的数据能够更新 cache从而在后续的流程中能被正确地匹配上。该 cache 是一个 LRU 的 cache合理的设置该 cache 的大小是非常必要的。过小的 cache 大小将导致 cache 的命中率受到影响过大的 cache 会占用较多的内存。我们可以通过下面的参数来分别调节左右两侧 cache 的大小甚至是在每条数据 join key 都不相同、cache 基本无用时关闭 cache。// 是否启用cache默认为 truetable.exec.delta-join.cache-enabled: true// 设置用于缓存左表数据的cache大小默认为 10000// 推荐在左表较小、或右流 join key 较为密集时设置较大值table.exec.delta-join.left.cache-size: 10000// 设置用于缓存右表数据的cache大小默认为 10000// 推荐在右表较小、或左流 join key 较为密集时设置较大值table.exec.delta-join.right.cache-size: 10000我们可以通过监测 Delta Join 算子上的 metric来判断是否需要适当增加 cache 的大小。deltaJoin_leftCache_hitRate: 在右流查询左表的场景下缓存左表数据的 cache 的命中率百分比。该值越高越好。deltaJoin_rightCache_hitRate在左流查询右表的场景下缓存右表数据的 cache 的命中率百分比。该值越高越好。注该图来自于“实战”章节 Nexmark q20 变种 query。右表 Auction 表每次都产生不同的id故而deltaJoin_leftCache_hitRate的命中率始终为 0。05实战我们借用 nexmark 数据集[8] 中 q20 的 query略微修改后作为本次实战的样例代码。-- 获取包含相应拍卖信息的出价表INSERT INTO nexmark_q20SELECT auction, bidder, price, channel, url, B.dateTime, B.extra, itemName, description, initialBid, reserve, A.dateTime, expires, seller, category, A.extraFROM bid AS B INNER JOIN auction AS A on B.auction A.id;-- WHERE A.category 10;方式一使用 Docker 环境测试1. 环境准备1类 Unix 操作系统如 Linux、Mac OS X2内存建议至少 4 GB磁盘建议至少 4 GB2. 下载 Docker 镜像在命令行中运行如下命令安装 Docker 测试镜像。docker pull xuyangzzz/delta_join_example:1.0运行如下命令运行该测试镜像进入测试 docker container 的命令行。docker run -p 8081:8081 -p 9123:9123 --name delta_join_example -it xuyangzzz/delta_join_example:1.0 bash3. 运行任务 SQL# 运行 flink 和 fluss 集群./start-flink-fluss.sh # 创建相关表和 delta join 作业./create-tables-and-run-delta-join.sh此时在宿主机localhost:8081或其他绑定的端口即可查看 Flink UI 界面可以看到此时 Delta Join 作业正在运行。4. 插入数据到源表在测试 docker container 中执行下面的命令为源表插入数据。# 在源表插入数据./insert-data.sh5. 观察 Delta Join 作业在宿主机localhost:8081或其他绑定的端口的 flink-ui 界面就可以看到 Delta Join 作业在正常消费数据了。方式二手工搭建环境测试1. 环境准备1运行环境a. 类 Unix 操作系统如 Linux、Mac OS Xb. 内存建议至少 4 GB磁盘建议至少 4 GBc. Java 11 及以上版本且将环境变量JAVA_HOME设置为 Java 的安装目录2准备 Apache Flink 计算引擎a. 下载在 Apache Flink 官方下载网站[9] 下载最新的 Flink 2.2.0 版本并解压。b. 修改相关配置修改 ./conf/config.yaml 文件将 TaskManager numberOfTaskSlots 设置成 4 默认为13准备 Apache Fluss 流存储引擎在 Apache Fluss 官方下载网站[9] 分别下载 Fluss 0.8 版本并解压和适配 Apahce Flink 2.1 的连接器。4准备 Nexmark 源数据生成器下载 Nexmark 项目[10] master 分支在该项目根目录下用 maven-3.8.6 版本执行以下的 maven 命令mvn clean install -DskipTeststrue在./nexmark-flink/target/ 文件夹下将会生成 nexmark-flink-0.3-SNAPSHOT.jar 文件2. 服务启动1启动 Flink将 Fluss 适配 Flink 2.1 的连接器以及 Nexmark 项目生成的 nexmark-flink-0.3-SNAPSHOT.jar 文件放入 Flink 目录的 ./lib 目录下。参考 Flink 本地模式安装文档[11]在 Flink 目录中执行下面的语句启动本地 Standalone 集群。## 请确保在 Flink 目录下执行该语句./bin/start-cluster.sh检查 http://localhost:8081/#/overview 界面是否可正常访问。2启动 Fluss参考 Fluss 部署 Local Cluster 文档[12]在 Fluss 目录下执行下面的语句启动本地集群。## 请确保在 Fluss 目录下执行该语句./bin/local-cluster.sh start3. 运行任务 SQL1创建 Fluss 表将下面的 SQL 代码保存为“prepare_table.sql”文件其中定义了 2 张源表和 1 张结果表。CREATE CATALOG fluss_catalogWITH ( typefluss ,bootstrap.serverslocalhost:9123); USE CATALOG fluss_catalog; CREATE DATABASE IF NOT EXISTS my_db; USE my_db; -- 创建左侧源表CREATE TABLE IF NOT EXISTS fluss_catalog.my_db.bid( auction BIGINT ,bidder BIGINT ,price BIGINT ,channel VARCHAR ,url VARCHAR ,dateTime TIMESTAMP(3) ,extra VARCHAR ,PRIMARY KEY (auction, bidder) NOT ENFORCED)WITH (-- fluss prefix lookup key可用于 index bucket.keyauction-- Flink 2.2 中delta join 仅支持消费不带 delete 操作的 cdc 源表 ,table.delete.behaviorIGNORE); -- 创建右侧源表CREATE TABLE IF NOT EXISTS fluss_catalog.my_db.auction( id BIGINT ,itemName VARCHAR ,description VARCHAR ,initialBid BIGINT ,reserve BIGINT ,dateTime TIMESTAMP(3) ,expires TIMESTAMP(3) ,seller BIGINT ,category BIGINT ,extra VARCHAR ,PRIMARY KEY (id) NOT ENFORCED)WITH (-- Flink 2.2 中delta join 仅支持消费不带 delete 操作的 cdc 源表 table.delete.behaviorIGNORE); -- 创建 delta join 写入的结果表CREATE TABLE IF NOT EXISTS fluss_catalog.my_db.delta_join_sink( auction BIGINT ,bidder BIGINT ,price BIGINT ,channel VARCHAR ,url VARCHAR ,bid_dateTime TIMESTAMP(3) ,bid_extra VARCHAR ,itemName VARCHAR ,description VARCHAR ,initialBid BIGINT ,reserve BIGINT ,auction_dateTime TIMESTAMP(3) ,expires TIMESTAMP(3) ,seller BIGINT ,category BIGINT ,auction_extra VARCHAR ,PRIMARY KEY (auction, bidder) NOT ENFORCED);在 Flink 目录下执行下面的语句创建持久化的表。## 请确保在 Flink 目录下执行该语句## 注意请将 ${your_path} 替换为 prepare_table.sql 实际所在的目录./bin/sql-client.sh -f ${your_path}/prepare_table.sql2启动 Delta Join 作业将下面的 SQL 代码保存为“run_delta_join.sql”文件其中包含了可转化为 delta join 的 q20 变体查询。CREATE CATALOG fluss_catalogWITH ( typefluss ,bootstrap.serverslocalhost:9123); USE CATALOG fluss_catalog; USE my_db; INSERT INTO delta_join_sinkSELECT auction ,bidder ,price ,channel ,url ,B.dateTime ,B.extra ,itemName ,description ,initialBid ,reserve ,A.dateTime ,expires ,seller ,category ,A.extraFROM bid AS BINNER JOIN auction AS AON B.auction A.id;在 Flink 目录下执行下面的语句启动 delta join 作业。## 请确保在 Flink 目录下执行该语句## 注意请将 ${your_path} 替换为 run_delta_join.sql 实际所在的目录./bin/sql-client.sh -f ${your_path}/run_delta_join.sql在 Flink UI 上我们可以看到 Delta Join 作业正常跑起来了。4. 插入数据到源表将下面的 SQL 代码保存为“insert_data.sql”文件其中包含了向两张源表灌入 Nexmark 数据源产生模拟数据的作业。CREATE CATALOG fluss_catalogWITH ( type fluss ,bootstrap.servers localhost:9123); USE CATALOG fluss_catalog; USE my_db; -- nexmark 模拟数据源CREATE TEMPORARY TABLE datagen( event_type int ,person ROW id BIGINT ,name VARCHAR ,emailAddress VARCHAR ,creditCard VARCHAR ,city VARCHAR ,state VARCHAR ,dateTime TIMESTAMP(3) ,extra VARCHAR ,auction ROW id BIGINT ,itemName VARCHAR ,description VARCHAR ,initialBid BIGINT ,reserve BIGINT ,dateTime TIMESTAMP(3) ,expires TIMESTAMP(3) ,seller BIGINT ,category BIGINT ,extra VARCHAR ,bid ROW auction BIGINT ,bidder BIGINT ,price BIGINT ,channel VARCHAR ,url VARCHAR ,dateTime TIMESTAMP(3) ,extra VARCHAR ,dateTime AS CASE WHEN event_type 0 THEN person.dateTime WHEN event_type 1 THEN auction.dateTime ELSE bid.dateTime END ,WATERMARK FOR dateTime AS dateTime - INTERVAL 4 SECOND)WITH ( connector nexmark -- 下面两个参数为每秒数据生成速度 ,first-event.rate 1000 ,next-event.rate 1000 -- 生成的数据总条数过大可能导致 OOM ,events.num 100000 -- 下面三个参数为 Bid/Auction/Persion 三个数据的生成占比 ,person.proportion 2 ,auction.proportion 24 ,bid.proportion 24); CREATE TEMPORARY VIEW auction_viewAS SELECT auction.id ,auction.itemName ,auction.description ,auction.initialBid ,auction.reserve ,dateTime ,auction.expires ,auction.seller ,auction.category ,auction.extraFROM datagenWHERE event_type 1; CREATE TEMPORARY VIEW bid_viewAS SELECT bid.auction ,bid.bidder ,bid.price ,bid.channel ,bid.url ,dateTime ,bid.extraFROM datagenWHERE event_type 2; INSERT INTO bidSELECT *FROM bid_view; INSERT INTO auctionSELECT *FROM auction_view;在 Flink 目录下执行下面的语句启动两个将 nexmark 模拟数据写入源表的作业。## 请确保在 Flink 目录下执行该语句## 注意请将 ${your_path} 替换为 insert_data.sql 实际所在的目录./bin/sql-client.sh -f ${your_path}/insert_data.sql5. 观察 Delta Join 作业重新点击 Flink UI 上的 Delta Join 作业可以看到 Delta Join 作业正常在消费数据了。06现状和未来工作目前 Delta Join 仍然在持续演进中Flink 2.2 已经支持了一些常用的 SQL pattern具体可以参考文档[13]。在未来我们将会持续推进以下几个方向1. 持续完善最终一致性 Delta Join1支持 Left / Right Join2支持消费 Delete3支持级联 Delta Join2. 结合 Paimon/Iceberg/Hudi 等支持快照的存储支持分钟级的强一致性 Delta Join参考文档1. Apache Flink 社区 Delta Join 用户文档 https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/tuning/2. Apache Flink 社区 Delta Join FLIP https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3AIntroduceANewDeltaJoin?srccontextnavpagetreemode3. Apache Fluss (Incubating) 社区 Delta Join 用户文档 https://fluss.apache.org/docs/engine-flink/delta-joins/参考链接[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#interval-joins[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join[3] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#state-ttl-hints[4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-519:Introduceasynclookupkeyorderedmode[5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3AAsynchronousExecutionModel[6] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#dynamic-table-options[7] https://fluss.apache.org/docs/engine-flink/options/#lookup-options[8] https://github.com/nexmark/nexmark/[9] https://flink.apache.org/downloads/[10] https://github.com/nexmark/nexmark/tree/master[11] https://nightlies.apache.org/flink/flink-docs-release-2.2/zh/docs/try-flink/local_installation/#%e6%ad%a5%e9%aa%a4-2%e5%90%af%e5%8a%a8%e9%9b%86%e7%be%a4[12] https://fluss.apache.org/docs/install-deploy/deploying-local-cluster/[13] https://nightlies.apache.org/flink/flink-docs-release-2.2/zh/docs/dev/table/tuning/#%E6%94%AF%E6%8C%81%E7%9A%84%E5%8A%9F%E8%83%BD%E5%92%8C%E9%99%90%E5%88%B6▼ 「FlinkHologres 搭建实时数仓」 ▼复制下方链接或者扫描二维码即可快速体验 “一体化的实时数仓联合解决方案”了解活动详情https://www.aliyun.com/solution/tech-solution/flink-hologres▼ 关注「Apache Flink」 ▼回复 FFA 2024 获取大会资料点击「阅读原文」跳转阿里云实时计算 Flink