当前位置: 首页 > news >正文

大型门户网站建设价格首码项目推广平台

大型门户网站建设价格,首码项目推广平台,湖南企业竞价优化首选,互助平台网站制作背景 当flink消费kafka的消息时,我们经常会用到FlinkKafkaConsumer进行水位线的发送,本文就从源码看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位线发送的流程 FlinkKafkaConsumer水位线发送 1.首先从Fetcher类开始&#xff0c…

背景

当flink消费kafka的消息时,我们经常会用到FlinkKafkaConsumer进行水位线的发送,本文就从源码看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位线发送的流程

FlinkKafkaConsumer水位线发送

1.首先从Fetcher类开始,创建Fetcher类的时候会构建一个周期性的水位线发送线程并启动

        // if we have periodic watermarks, kick off the interval schedulerif (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {PeriodicWatermarkEmitter<T, KPH> periodicEmitter =new PeriodicWatermarkEmitter<>(checkpointLock,subscribedPartitionStates,watermarkOutputMultiplexer,processingTimeProvider,autoWatermarkInterval);periodicEmitter.start();}

2.随后,PeriodicWatermarkEmitter中注册处理时间定时器,周期性执行

        public void start() {timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);}@Overridepublic void onProcessingTime(long timestamp) {synchronized (checkpointLock) {for (KafkaTopicPartitionState<?, ?> state : allPartitions) {// 这里当前算子任务消费的kafka 分区分别记录每个分区的水位值state.onPeriodicEmit();}//这里当前算子会把自己消费的kafka分区的所有水位线取最小值后当成当前算子任务自身的水位线发送出去,注意这里是当前算子任务级别的watermarkOutputMultiplexer.onPeriodicEmit();}// schedule the next watermarktimerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);}}

3.对应state.onPeriodicEmit();记录每个kafka分区的水位线方法

    @Overridepublic void onPeriodicEmit(WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next = wms.getCurrentWatermark();if (next != null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}
其中 WatermarkOutput output.emitWatermark(new Watermark(next.getTimestamp()))代码如下:public DeferredOutput(OutputState state) {this.state = state;}@Overridepublic void emitWatermark(Watermark watermark) {state.setWatermark(watermark.getTimestamp());}
所以这里最终效果只是对应state(kafka分区[注意,一个算子任务有可能消费好几个kafka分区])上设置了水位线/*** Returns true if the watermark was advanced, that is if the new watermark is larger than* the previous one.** <p>Setting a watermark will clear the idleness flag.*/public boolean setWatermark(long watermark) {this.idle = false;final boolean updated = watermark > this.watermark;// 这里也可以看出来,即使代码里面发送了更小值的水位线,水位线也不会回退this.watermark = Math.max(watermark, this.watermark);return updated;}        

4.对应算子任务组合当前任务消费的所有分区水位线的方法

private void updateCombinedWatermark() {long minimumOverAllOutputs = Long.MAX_VALUE;boolean hasOutputs = false;boolean allIdle = true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle = false;}hasOutputs = true;}// if we don't have any outputs minimumOverAllOutputs is not valid, it's still// at its initial Long.MAX_VALUE state and we must not emit that// 如果算子任务不消费任何分区,它不会发出任何水位线,这里是不是就是kafka消费者要小于kafka主题的原因所在???if (!hasOutputs) {return;}if (allIdle) {// 如果当前算子任务处于空闲时间,标识空闲,以便后续算子可以继续推进underlyingOutput.markIdle();} else if (minimumOverAllOutputs > combinedWatermark) {combinedWatermark = minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}```
http://www.hrbkazy.com/news/17258.html

相关文章:

  • 如何优化公司的网站郑州seo优化服务
  • wordpress内存使用搜索引擎优化seo优惠
  • php网站整合discuz如何在百度上添加店铺的位置
  • jsp做网站步骤东莞网络优化哪家公司好
  • 手机怎么建立自己的网站营销策划推广
  • 免费的企业网站源码自媒体视频发布平台
  • 然后做网站论坛推广的特点
  • 佳木斯做网站的公司网店如何引流与推广
  • 网站的站点地图设计优化大师有必要花钱吗
  • 蒙特网设计公司汉中seo培训
  • 昆明网站建设猫咪深圳专门做seo的公司
  • 宝安网站建设知名的搜索引擎优化
  • 网站在线咨询怎么做百度人工客服电话多少
  • 像wordpress一样的网站吗深圳网站优化排名
  • 深圳企业建站高性价比的选择中国seo关键词优化工具
  • 潍坊做网站公司潍坊网络公司sem账户托管公司
  • ajax数据库网页网站设计鹤壁网络推广哪家好
  • wordpress美女站主题谷歌浏览器下载手机版中文
  • 淄博住房和城乡建设局网站怎么做app推广和宣传
  • wordpress首页搭建搜索广告优化
  • 中小企业网站官网网络营销案例分析论文
  • 怎么把自己做的网站上传到网上百度营销登录入口
  • 如何做网站的seo优化网络营销方案如何写
  • python网站开发论文经典广告推广词
  • 南冒网站建设制作推广公司东莞网络推广优化排名
  • 网站推广计划seo关键词排名优化官网
  • 怎么查询网站是什么时候做的网络宣传渠道有哪些
  • 东莞网站建设推广有哪些新区快速seo排名
  • 做本地网站需要什么资质你对网络营销的理解
  • 网站页面布局分析网络推广教程