竞价是什么意思,网站seo优化建议,上海网站制作培训班,北京seo设计公司前面我们已经了解了 Flink 几个核心概念#xff0c;分别是时间、Watermark 已经窗口。今天我们来一起了解下 Flink 是怎么进行多个流的 Join 的。我们今天从两个流的 Join 来入手#xff0c;扩展到多个流也是一样的道理。Flink 中的 Join 可以分为两种#xff1a;Window Joi…前面我们已经了解了 Flink 几个核心概念分别是时间、Watermark 已经窗口。今天我们来一起了解下 Flink 是怎么进行多个流的 Join 的。我们今天从两个流的 Join 来入手扩展到多个流也是一样的道理。Flink 中的 Join 可以分为两种Window Join 和 Interval Join。Window JoinWindow Join 是将两个流中在相同窗口中且有相同 key 的元素进行关联。关联后可以使用 JoinFunction 和 FlatJoinFunction 进行处理。Window Join 可以根据窗口类型分为三种Tumbling Window Join、Sliding Window Join 和 Session Window Join。Tumbling Window Join首先来看Tumbling Window Join其实就是对应的使用滚动窗口进行 Join。TumblingWindowJoin具体使用方法如下DataStreamTuple2String, Double result source1.join(source2).where(record - record.f0).equalTo(record - record.f0).window(TumblingEventTimeWindows.of(Time.seconds(2L))).apply(new JoinFunctionTuple2String, Double, Tuple2String, Double, Tuple2String, Double() {Overridepublic Tuple2String, Double join(Tuple2String, Double record1, Tuple2String, Double record2) throws Exception {return Tuple2.of(record1.f0, record1.f1);}});其中 source1 和 source2 分别代表两个流where 为 source1 的 join key 提取方法equalTo 为 source2 的 join key 提取方法最后join 好之后的数据通过 JoinFunction 来处理。Sliding Window JoinSliding Window Join 和 Tumbling Window Join 的用法基本一致只是将窗口指定为滑动窗口。SlidingWindowJoinSession Window JoinSession Window Join 也类似只是指定的窗口不同具体的处理流程都是一样的这里也不过多解释。Interval JoinInterval Join 是将两个流中 key 相同且一个流的 timestamp 处于另一个流的 timestamp 上下波动范围内。假设我们有两个流 a 和 bInterval Join可以表达为b.timestamp ∈ [a.timestamp lowerBound; a.timestamp upperBound] 或 a.timestamp lowerBound b.timestamp a.timestamp upperBound。需要注意的是目前 Interval Join 仅支持 event time。IntervalJoin它的使用方法也很简单只需要定义上下偏移量以及处理函数即可。DataStreamTuple2String, Double intervalJoinResult source1.keyBy(record - record.f0).intervalJoin(source2.keyBy(record - record.f0)).between(Time.seconds(-2), Time.seconds(2)).process(new ProcessJoinFunctionTuple2String, Double, Tuple2String, Double, Tuple2String, Double() {Overridepublic void processElement(Tuple2String, Double record1, Tuple2String, Double record2, ProcessJoinFunctionTuple2String, Double, Tuple2String, Double, Tuple2String, Double.Context context, CollectorTuple2String, Double out) throws Exception {out.collect(Tuple2.of(record1.f0, record1.f1 record2.f1));}});CoGroup前面介绍的两种 Join 都是 inner join那么 Flink 有没有办法支持 left join 呢答案是肯定的我们可以使用 coGroup 来实现。coGroup 的通用用法如下stream.coGroup(otherStream).where(KeySelector).equalTo(KeySelector).window(WindowAssigner).apply(CoGroupFunction);我们通过自定义 CoGroupFunction 来实现 left join。private static class LeftJoinFunction implements CoGroupFunctionTuple2String, Double, Tuple2String, Double, Tuple2String, Double {Overridepublic void coGroup(IterableTuple2String, Double iterable1, IterableTuple2String, Double iterable2, CollectorTuple2String, Double collector) throws Exception {for (Tuple2String, Double record1 : iterable1) {boolean match false;for (Tuple2String, Double record2 : iterable2) {match true;collector.collect(Tuple2.of(record1.f0, record1.f1 record2.f1));}if (!match) {System.out.println(没有join的元素 key: record1.f0);collector.collect(Tuple2.of(record1.f0, record1.f1));}}}}在 coGroupFunction 中需要实现 coGroup 方法方法的参数包括两个输入流的 Iterable 和输出的 collector。如果第二个流中没有匹配的元素那么就直接输出第一个流的元素。总结最后来总结一下Flink 中有两种 Join 方法分别为 Window Join 和 Interval JoinWindow Join 是依赖窗口来执行对窗口内的元素进行 joinInterval Join 不依赖窗口是根据 event time 的范围来进行 join。最后还介绍了 CoGroup我们可以使用 CoGroup 来实现 left join 和 right join。