当前位置: 首页 > news >正文

蔚县网站建设广东近期新闻

蔚县网站建设,广东近期新闻,视频网站中滑动列表怎么做的,徐州建筑工程招投标网站一、说明 基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行. Context和OnTimerContext所持有的TimerService对象拥有以下方法: currentProcessingTime(): Long 返回当前处理时间 currentWatermark(): Long 返回当前watermark的时间戳 registe…

一、说明

基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行.
Context和OnTimerContext所持有的TimerService对象拥有以下方法:
currentProcessingTime(): Long 返回当前处理时间
currentWatermark(): Long 返回当前watermark的时间戳
registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。
registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。

二、基于处理时间的定时器

package com.lyh.flink08;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;public class ProcessTime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999).map(line -> {String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));});stream.keyBy(WaterSensor::getId).process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value,Context ctx,Collector<String> out) throws Exception {ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);out.collect(value.toString());}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {System.out.println(timestamp);out.collect("wo be chu fa le ");}}).print();env.execute();}
}

三、基于事件时间的定时器

package com.lyh.flink08;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;import java.time.Duration;public class EventTime_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999).map(line -> {String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));});WatermarkStrategy<WaterSensor> wms = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element,recordTimestamp) -> element.getTs() * 1000);stream.assignTimestampsAndWatermarks(wms).keyBy(WaterSensor::getId).process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value,Context ctx,Collector<String> out) throws Exception {System.out.println(ctx.timestamp());ctx.timerService().registerProcessingTimeTimer(ctx.timestamp()+5000);out.collect(value.toString());}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {System.out.println("定时器被触发了");}}).print();env.execute();}
}
http://www.hrbkazy.com/news/28386.html

相关文章:

  • 广州商城网站建设报价百度seo技术
  • 网站选项按钮淘宝推广公司
  • 如何建设情趣用品网站适合交换友情链接的是
  • 网站里可以添加视频做背景吗软文广告经典案例300字
  • 找券网站怎么做马鞍山seo
  • wordpress mono-lab淘宝seo什么意思
  • 如何建好一个网站网络营销和直播电商专业学什么
  • 建站公司获客成本无锡百度推广平台
  • 9w域名上海优化外包公司排名
  • 大连集团网站建设国内搜索引擎排名第一的是
  • 建设银行网站怎么设置转账额度十大营销策略
  • asp网站制作软件网站建设报价明细表
  • 网站建设的大公司排名万词霸屏百度推广seo
  • 建网站 域名营销团队找产品合作
  • 上海网站建设升千网推软文推广平台
  • 杭州本地网站有哪些奉化云优化seo
  • 无锡企业网站排名分销平台
  • 南京明辉建设集团有限公司网站seo教程培训
  • 网站建设资料总结如何在百度发布文章
  • 东莞网站推广团队广告关键词
  • 做网站一定要注册公司吗附近成人电脑培训班
  • 建行企业网站百度联系电话
  • 设计导航网站 左侧菜单栏外链屏蔽逐步解除
  • 做网站开发需要学什么软件百度普通版下载
  • 徐州编程培训机构海南seo
  • 国外专门做图像增强的网站长春网站优化页面
  • termux wordpress关键词排名优化教程
  • 网站建设好弄吗西安seo霸屏
  • 红色logo做网站如何利用网络广告进行推广
  • 做网站服务器是什么杭州百度竞价推广公司