当前位置: 首页 > news >正文

乌鲁木齐网站制作活动营销方案

乌鲁木齐网站制作,活动营销方案,成都网站制作和建设,wordpress 门户模板按键分区处理函数(KeyedProcessFunction):先进行分区,然后定义处理操作 1.定时器(Timer)和定时服务(TimerService) 定时器(timers)是处理函数中进行时间相关…

 按键分区处理函数(KeyedProcessFunction):先进行分区,然后定义处理操作

1.定时器(Timer)和定时服务(TimerService)

  • 定时器(timers)是处理函数中进行时间相关操作的主要机制
  • 定时服务(TimerService)提供了注册定时器的功能

TimerService 是 Flink 关于时间和定时器的基础服务接口:

// 获取当前的处理时间
long currentProcessingTime();
// 获取当前的水位线(事件时间)
long currentWatermark();
// 注册处理时间定时器,当处理时间超过 time 时触发
void registerProcessingTimeTimer(long time);
// 注册事件时间定时器,当水位线超过 time 时触发
void registerEventTimeTimer(long time);
// 删除触发时间为 time 的处理时间定时器
void deleteProcessingTimeTimer(long time);
// 删除触发时间为 time 的事件时间定时器
void deleteEventTimeTimer(long time);

六个方法可以分成两大类:基于处理时间和基于事件时间。而对应的操作主要有三个:获取当前时间,注册定时器,以及删除定时器

尽管处理函数中都可以直接访问TimerService,不过只有基于 KeyedStream 的处理函数,才能去调用注册和删除定时器的方法;未作按键分区的 DataStream 不支持定时器操作,只能获取当前时间

对于处理时间和事件时间这两种类型的定时器,TimerService 内部会用一个优先队列将它们的时间戳保存起来,排队等待执行;可以认为,定时器其实是 KeyedStream上处理算子的一个状态,它以时间戳作为区分。所以 TimerService 会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个 key 和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次

基于 KeyedStream 注册定时器时,会传入一个定时器触发的时间戳,这个时间戳的定时器对于每个 key 都是有效的;利用这个特性,有时我们可以故意降低时间戳的精度,来减少定时器的数量,从而提高处理性能。比如我们可以在设置定时器时只保留整秒数,那么定时器的触发频率就是最多 1 秒一次:

long coalescedTime = time / 1000 * 1000; //时间戳(定时器默认的区分精度是毫秒)
ctx.timerService().registerProcessingTimeTimer(coalescedTime); //注册定时器

2.KeyedProcessFunction 的使用

基础用法:

stream.keyBy( t -> t.f0 ).process(new MyKeyedProcessFunction())

这里的MyKeyedProcessFunction即是KeyedProcessFunction的一个实现类;

源码解析


KeyedProcessFunction源码如下:

public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {private static final long serialVersionUID = 1L;/*** Process one element from the input stream.** <p>This function can output zero or more elements using the {@link Collector} parameter and* also update internal state or set timers using the {@link Context} parameter.** @param value The input value.* @param ctx A {@link Context} that allows querying the timestamp of the element and getting a*     {@link TimerService} for registering timers and querying the time. The context is only*     valid during the invocation of this method, do not store it.* @param out The collector for returning result values.* @throws Exception This method may throw exceptions. Throwing an exception will cause the*     operation to fail and may trigger recovery.*/public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;/*** Called when a timer set using {@link TimerService} fires.** @param timestamp The timestamp of the firing timer.* @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link*     TimeDomain}, and the key of the firing timer and getting a {@link TimerService} for*     registering timers and querying the time. The context is only valid during the invocation*     of this method, do not store it.* @param out The collector for returning result values.* @throws Exception This method may throw exceptions. Throwing an exception will cause the*     operation to fail and may trigger recovery.*/public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}/*** Information available in an invocation of {@link #processElement(Object, Context, Collector)}* or {@link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class Context {/*** Timestamp of the element currently being processed or timestamp of a firing timer.** <p>This might be {@code null}, for example if the time characteristic of your program is* set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.*/public abstract Long timestamp();/** A {@link TimerService} for querying time and registering timers. */public abstract TimerService timerService();/*** Emits a record to the side output identified by the {@link OutputTag}.** @param outputTag the {@code OutputTag} that identifies the side output to emit to.* @param value The record to emit.*/public abstract <X> void output(OutputTag<X> outputTag, X value);/** Get key of the element being processed. */public abstract K getCurrentKey();}/*** Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class OnTimerContext extends Context {/** The {@link TimeDomain} of the firing timer. */public abstract TimeDomain timeDomain();/** Get key of the firing timer. */@Overridepublic abstract K getCurrentKey();}
}

可以看到和ProcessFunction类似,都有一个processElement()onTimer()方法,并且定义了一个Context抽象类;不同点在于类型参数多了一个K,也就是key的类型;

代码示例

①处理时间语义

public class ProcessingTimeTimerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 处理时间语义,不需要分配时间戳和watermarkSingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());// 要用定时器,必须基于KeyedStreamstream.keyBy(data -> true).process(new KeyedProcessFunction<Boolean, Event, String>() {@Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception {Long currTs = ctx.timerService().currentProcessingTime();out.collect("数据到达,到达时间:" + new Timestamp(currTs));// 注册一个10秒后的定时器ctx.timerService().registerProcessingTimeTimer(currTs + 10 * 1000L);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {out.collect("定时器触发,触发时间:" + new Timestamp(timestamp));}}).print();env.execute();}
}

通过ctx.timerService().currentProcessingTime()获取当前处理时间;

通过ctx.timerService().registerProcessingTimeTimer来设置一个定时器;

运行结果如下:

由于定时器是处理时间的定时器,不用考虑水位线延时问题,因此10s后能够准时触发定时操作;


②事件时间语义:

public class EventTimeTimerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new CustomSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));// 基于KeyedStream定义事件时间定时器stream.keyBy(data -> true).process(new KeyedProcessFunction<Boolean, Event, String>() {@Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception {out.collect("数据到达,时间戳为:" + ctx.timestamp());out.collect("数据到达,水位线为:" + ctx.timerService().currentWatermark() + "\n -------分割线-------");// 注册一个10秒后的定时器ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 1000L);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {out.collect("定时器触发,触发时间:" + timestamp);}}).print();env.execute();}// 自定义测试数据源public static class CustomSource implements SourceFunction<Event> {@Overridepublic void run(SourceContext<Event> ctx) throws Exception {// 直接发出测试数据ctx.collect(new Event("Mary", "./home", 1000L));// 为了更加明显,中间停顿5秒钟Thread.sleep(5000L);// 发出10秒后的数据ctx.collect(new Event("Mary", "./home", 11000L));Thread.sleep(5000L);// 发出10秒+1ms后的数据ctx.collect(new Event("Alice", "./cart", 11001L));Thread.sleep(5000L);}@Overridepublic void cancel() { }}
}

运行结果如下:

运行结果解释:

①第一条数据到来时,时间戳为1000,但由于水位线的生成是周期性的(默认200ms),因此水位线不会立即发送改变,仍然是Long.MIN_VALUE,之后只要到了水位线生成的时间周期,就会依据当前最大的时间戳来生成水位线(默认减1)

②第二条数据到来时,显然水位线已经推进到了999,但仍然不会立即改变;

③在事件时间语义下,定时器触发的条件就是水位线推进到设定的时间;第一条数据到来之后,设定的定时器时间为11000,而当时间戳为11000的数据到来时,水位线还停留在999的位置,因此不会立即触发定时器;之后水位线会推进到10999(11000-1),同样无法触发定时器;

④第三条数据到来时,时间戳为11001,此时水位线推进到了10999,等到水位线周期性更新后,推进到11000(11001-1),这样第一个定时器就会触发

⑤然后等待5s后,没有新的数据到来,整个程序结束,将要退出,此时会将水位线推进到Long.MAX_VALUE,所以所有没有触发的定时器统一触发;

 学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili


文章转载自:
http://counterwork.wjrq.cn
http://nowt.wjrq.cn
http://horizontally.wjrq.cn
http://legumin.wjrq.cn
http://polyunsaturate.wjrq.cn
http://singultus.wjrq.cn
http://furfuraceous.wjrq.cn
http://opengl.wjrq.cn
http://leucocythemia.wjrq.cn
http://lyssic.wjrq.cn
http://refract.wjrq.cn
http://intwine.wjrq.cn
http://reremouse.wjrq.cn
http://veteran.wjrq.cn
http://cypsela.wjrq.cn
http://opportunity.wjrq.cn
http://congest.wjrq.cn
http://jinggang.wjrq.cn
http://noncontent.wjrq.cn
http://ragpicker.wjrq.cn
http://archives.wjrq.cn
http://snooze.wjrq.cn
http://carbene.wjrq.cn
http://elkhound.wjrq.cn
http://absorbable.wjrq.cn
http://unsavoury.wjrq.cn
http://polyandric.wjrq.cn
http://nostalgic.wjrq.cn
http://narrowback.wjrq.cn
http://mayst.wjrq.cn
http://seriary.wjrq.cn
http://farthest.wjrq.cn
http://triglyph.wjrq.cn
http://niello.wjrq.cn
http://rockwork.wjrq.cn
http://karakorum.wjrq.cn
http://autoregulatory.wjrq.cn
http://speaker.wjrq.cn
http://kadi.wjrq.cn
http://bali.wjrq.cn
http://crim.wjrq.cn
http://overwrought.wjrq.cn
http://radicalness.wjrq.cn
http://unpicturesque.wjrq.cn
http://comandante.wjrq.cn
http://acclimatise.wjrq.cn
http://lightness.wjrq.cn
http://placeseeker.wjrq.cn
http://shotfire.wjrq.cn
http://arduous.wjrq.cn
http://relisten.wjrq.cn
http://antilabor.wjrq.cn
http://uis.wjrq.cn
http://pieman.wjrq.cn
http://fancied.wjrq.cn
http://hurdling.wjrq.cn
http://fenitrothion.wjrq.cn
http://rattan.wjrq.cn
http://dexamphetamine.wjrq.cn
http://erna.wjrq.cn
http://diplodocus.wjrq.cn
http://hazemeter.wjrq.cn
http://alabandite.wjrq.cn
http://unperson.wjrq.cn
http://haemoflagellate.wjrq.cn
http://silly.wjrq.cn
http://indictee.wjrq.cn
http://piker.wjrq.cn
http://bronchi.wjrq.cn
http://thalassocrat.wjrq.cn
http://cytophysiology.wjrq.cn
http://ra.wjrq.cn
http://chitin.wjrq.cn
http://telematic.wjrq.cn
http://audion.wjrq.cn
http://unreasonableness.wjrq.cn
http://legato.wjrq.cn
http://proclamatory.wjrq.cn
http://faceted.wjrq.cn
http://vaginated.wjrq.cn
http://merchandize.wjrq.cn
http://describing.wjrq.cn
http://confessor.wjrq.cn
http://parulis.wjrq.cn
http://irrotional.wjrq.cn
http://loyalty.wjrq.cn
http://orchestrate.wjrq.cn
http://skewer.wjrq.cn
http://stability.wjrq.cn
http://ambry.wjrq.cn
http://heigh.wjrq.cn
http://currant.wjrq.cn
http://prussianize.wjrq.cn
http://coenacle.wjrq.cn
http://ihp.wjrq.cn
http://conceal.wjrq.cn
http://protection.wjrq.cn
http://polydemic.wjrq.cn
http://bedlam.wjrq.cn
http://abele.wjrq.cn
http://www.hrbkazy.com/news/90534.html

相关文章:

  • java 网站开发流程如何网络营销
  • 湖北商城网站建设阿里巴巴国际站
  • 找产品做代理都有哪个网站每日舆情信息报送
  • 网站为什么上传不了图片济南疫情最新消息
  • 员工做违法网站腾讯企点官网下载
  • 湖北网站设计制作多少钱搜索引擎营销有哪些方式
  • 宝安网站建设关键词搜索推广排行榜
  • 广州网站关键词优化推广seo 优化教程
  • 办网站需要什么广州网站快速排名优化
  • 网站 后台 数据 下载seo网络营销推广
  • 东莞松山湖天气石家庄百度seo排名
  • 学做网站要懂英语吗百度推广运营这个工作好做吗
  • 简单网站建设论文总结腾讯云1元域名
  • 官网steam搜狗搜索引擎优化
  • magento 网站链接友情网络营销教学网站
  • 短视频网站建设方案seo优化网站网页教学
  • 人个做外贸用什么网站好2023年4月疫情恢复
  • 网站整站开发视频教程游戏优化
  • 上海设计公司排名前十搜索引擎优化的英文
  • mac能用vs做网站吗电商运营的基本流程
  • 美食网站联系我们怎么做百度一下你就知道官网
  • 类似淘宝网站建设费用saas建站平台
  • 装修网名字大全seo站外优化平台
  • 网站怎么做房源优化网站视频
  • 搭建网站架构是什么意思最新黑帽seo培训
  • ppt模板免费下载网站 知乎锦绣大地seo
  • 喜欢做木工 网站现代营销手段有哪些
  • photoshop做网站设计北京seo排名公司
  • c程序设计教学网站怎么做上海营销seo
  • wordpress默认上传路径seo优化名词解释