当前位置: 首页 > news >正文

交易网站怎么做近期重大新闻事件

交易网站怎么做,近期重大新闻事件,深圳微商城网站制作公司,网站还建设 域名可以备案吗背景 FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线,比如这条特殊的记录代表一个完整记录的结束等,本文就来解析下发送punctuated水位线的源码 punctuated 水位线发送源码解析 1.首先KafkaFetcher中的runFetchLoop方法 public…

背景

FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线,比如这条特殊的记录代表一个完整记录的结束等,本文就来解析下发送punctuated水位线的源码

punctuated 水位线发送源码解析

1.首先KafkaFetcher中的runFetchLoop方法

public void runFetchLoop() throws Exception {try {// kick off the actual Kafka consumerconsumerThread.start();while (running) {// this blocks until we get the next records// it automatically re-throws exceptions encountered in the consumer threadfinal ConsumerRecords<byte[], byte[]> records = handover.pollNext();// get the records for each topic partitionfor (KafkaTopicPartitionState<T, TopicPartition> partition :subscribedPartitionStates()) {List<ConsumerRecord<byte[], byte[]>> partitionRecords =records.records(partition.getKafkaPartitionHandle());
// 算子任务消费的每个分区都调用这个方法partitionConsumerRecordsHandler(partitionRecords, partition);}}} finally {// this signals the consumer thread that no more work is to be doneconsumerThread.shutdown();}

2.查看partitionConsumerRecordsHandler方法处理当前算子任务对应的每个分区的水位线

    protected void emitRecordsWithTimestamps(Queue<T> records,KafkaTopicPartitionState<T, KPH> partitionState,long offset,long kafkaEventTimestamp) {// emit the records, using the checkpoint lock to guarantee// atomicity of record emission and offset state updatesynchronized (checkpointLock) {T record;while ((record = records.poll()) != null) {long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);// 发送kafka记录到下游算子sourceContext.collectWithTimestamp(record, timestamp);// this might emit a watermark, so do it after emitting the record// 处理分区的水位线,记录这个分区的水位线,并在满足条件时更新整个算子任务的水位线partitionState.onEvent(record, timestamp);}partitionState.setOffset(offset);}}```3.处理每个分区的水位线```javapublic void onEvent(T event, long timestamp) {watermarkGenerator.onEvent(event, timestamp, immediateOutput);}public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next =wms.checkAndGetNextWatermark(event, eventTimestamp);if (next != null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}其中 output.emitWatermark(new Watermark(next.getTimestamp()));对应方法如下public void emitWatermark(Watermark watermark) {long timestamp = watermark.getTimestamp();// 更新每个分区对应的水位线,并且更新boolean wasUpdated = state.setWatermark(timestamp);// if it's higher than the max watermark so far we might have to update the// combined watermark 这个表明这个算子任务的最低水位线,也就是算子任务级别的水位线,而不是分区级别的了if (wasUpdated && timestamp > combinedWatermark) {updateCombinedWatermark();}}//每个分区水位线的更新如下public boolean setWatermark(long watermark) {this.idle = false;final boolean updated = watermark > this.watermark;this.watermark = Math.max(watermark, this.watermark);return updated;}       

4.最后是发送算子任务级别的水位线的方法

private void updateCombinedWatermark() {long minimumOverAllOutputs = Long.MAX_VALUE;boolean hasOutputs = false;boolean allIdle = true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle = false;}hasOutputs = true;}// if we don't have any outputs minimumOverAllOutputs is not valid, it's still// at its initial Long.MAX_VALUE state and we must not emit thatif (!hasOutputs) {return;}if (allIdle) {underlyingOutput.markIdle();} else if (minimumOverAllOutputs > combinedWatermark) {combinedWatermark = minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}

你可以看这个流程,是不是意味着如果使用Punctuated的方式,是不支持Idle空闲时间的?–答案是的

http://www.hrbkazy.com/news/50600.html

相关文章:

  • 又拍网站怎么做的推广app是什么工作
  • 校园社交网站怎么做中牟网络推广
  • 怎么用外网校内网站做英语信息推广平台有哪些
  • 做网站路由器映射外网ip营销的概念
  • 网站备案登陆用户名是什么关键词挖掘网站
  • xampp安装网站模板抖音搜索引擎优化
  • 织梦做双语网站何鹏seo
  • 网站建设的税收分类编码百度一下你就知道啦
  • 做百度移动端网站优化seo快速排名上首页
  • 资讯文章网站模板百度推广一个月多少钱
  • 大学网页制作与网站建设seo关键词优化报价
  • 运城做网站价格松原今日头条新闻
  • 廊坊建设局网站6淘宝指数查询入口
  • 网站安装系统怎么安装教程视频昆明百度推广开户
  • office文件包里的做网站软件上海推广系统
  • 电商网站在线支付怎么做流量平台
  • 建设网站那家公司好百度首页排名优化服务
  • 西安有哪些网站建设外包公司互联网广告平台排名
  • 大连网站建设佳熙科技公司seo关键词排名优化案例
  • 做企业形象网站百度推广业务员
  • 身份证被别人做网站备案最新热搜榜
  • 网站做担保交易2024年3月新冠高峰
  • 学做粤菜的网站有哪些网络公司网站模板
  • 企业独立建站网站性能优化方法
  • 电商网站源码网站怎么制作教程
  • 物流企业网站建设网络推广优化是干啥的
  • 好看的旅游网站模板下载会计培训班的费用是多少
  • 做带v头像的网站seo搜索引擎优化实训报告
  • 2023广州这俩天有疫情吗新乡百度关键词优化外包
  • 网站横幅背景图片seo网站关键词优化报价