当前位置: 首页 > news >正文

几何印花图案设计网站什么是网络营销?

几何印花图案设计网站,什么是网络营销?,网站正在建设中 htmll,icp备案查看网站内容吗文章目录 在RuntimeContext 中声明键值分区状态通过ListCheckPonitend 接口实现算子列表状态使用CheckpointedFunction接口接收检查点完成通知参考文档 在RuntimeContext 中声明键值分区状态 Flink为键值分区状态(Keyed State)提供了几种不同的原语&…

文章目录

  • 在RuntimeContext 中声明键值分区状态
  • 通过ListCheckPonitend 接口实现算子列表状态
  • 使用CheckpointedFunction接口
  • 接收检查点完成通知
  • 参考文档

在RuntimeContext 中声明键值分区状态

Flink为键值分区状态(Keyed State)提供了几种不同的原语(数据类型)。这是因为不同的算法和操作可能需要管理不同类型的状态。其中一些原语包括:

  1. ValueState: 这种状态类型用于存储单个的,可能更新的值。常见的用途包括存储计数器或聚合。

  2. ListState: 这种状态用于存储一组元素(通常是元素的长列表)。借助此状态,可以简单地追加元素和迭代所有元素。

  3. ReducingStateAggregatingState<IN, OUT>: 这两种状态都用于合并元素,通常在窗口操作中使用。

    • ReducingState:将添加的元素与现有元素通过reduce函数进行合并,最后只会保留一个元素,即合并的结果。

    • AggregatingState:与ReducingState类似,但是其可以存储转换后的聚合结果,而不是输入元素。

  4. MapState<UK, UV>: 这种状态类型存储一个key-value映射。

要使用某一类型的 keyed state,需要提供一个 StateDescriptor,用于声明状态的名称和类型。然后可以通过 RuntimeContext 获取状态。

这些状态类型都是接口,并将存储后端(Flink提供了内存和RocksDB两种用于存储状态的后端)的具体实现细节隔离出来,因此用户可以不用关心状态是如何存储和访问的。

Flink 的键控状态使我们能够通过简单的API调用,就能够很自然地处理键控数据流,我们只需要关心特定键的当前事件和状态,Flink 框架会自动地处理状态的分布式存储和故障恢复等

我们需要了解在 Flink 中,RuntimeContext 提供访问在运行期间的任务 (比如 Map、Reduce 或 Filter function) 可以访问的上下文信息,例如任务的并行度,任务名称,任务 ID,输入和输出信号等。此外,RuntimeContext 还为用户代码提供了生成和维护分布式累加器和键值状态的方法。

在 Apache Flink 中,键值状态(Keyed State)是一种类型的状态,它是以 key 为中心的。每一个 key 都可以对应一个状态。我们可以在 Flink 算子的open()方法中通过 RuntimeContext 获取和初始化它。

举个例子,假设我们正在构建一个实时的网络游戏分析系统,我们可能关注每位玩家的实时得分,这个得分基于他们在游戏中执行的动作(例如完成一项任务,击败一个敌人等)。在这个场景中,每个玩家的ID就是一个 "键",同时他们的游戏得分就是与键关联的 "状态"。当玩家在游戏中执行动作时,我们需要调整他们的分数状态

然后,我们的 Flink 代码可以定义一个 RichMapFunction 来维护每个玩家的分数状态:

public class PlayerScoreFunction extends RichMapFunction<GameEvent, Tuple2<String, Long>> {// 定义键控状态private transient ValueState<Long> scoreState;@Overridepublic void open(Configuration params) throws Exception {ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("playerScore", // 状态的名称TypeInformation.of(new TypeHint<Long>() {}),0L); // 默认值scoreState = getRuntimeContext().getState(descriptor);}@Overridepublic Tuple2<String, Long> map(GameEvent gameEvent) throws Exception {// update the statelong currentScore = scoreState.value();currentScore += gameEvent.getScore();scoreState.update(currentScore);// return the updated scorereturn new Tuple2<>(gameEvent.getPlayerId(), currentScore);}
}

在这个例子中,PlayerScoreFunction 接收 GameEvent 流,这是玩家在游戏中的各种动作生成的事件。我们将玩家的 ID 作为键来处理这个流。通过 getRuntimeContext().getState(descriptor) 我们获得了状态。然后我们在每次新的 GameEvent 到来时,根据事件中的分数增量用 scoreState.update(currentScore) 更新状态,然后将更新后的得分以及玩家的 ID 一起输出给下一个算子,例如,连接到实时的游戏分数仪表盘,将每个玩家的最新得分显示给观众看。

通过ListCheckPonitend 接口实现算子列表状态

算子状态(Operator State)在流处理系统(比如 Apache Flink)中,是一种特殊类型的状态,针对的是整个算子,而不是特定的键值。它存储的是某一特定算子的所有记录的全局信息。

算子状态的维护主要包括以下步骤:

  1. 定义算子状态:首先,我们需要在处理函数中定义一个或多个算子状态。我们可以指定算子状态的名字,并定义它存储的数据类型。

  2. 读取和写入算子状态:一旦定义了算子状态,我们就可以在流处理函数中对它进行读取和写入。读取算子状态通常在需要根据状态信息做出处理决策时进行。写入算子状态通常在我们需要更新状态信息时进行。

  3. 保持状态一致:为了保持状态的一致性,我们需要定期将算子状态进行快照(Snapshot)并保存到远程存储系统中。在系统中断后,我们可以从最新的快照恢复算子状态。

  4. 状态恢复:在系统中断后,我们可以使用保存的快照恢复算子状态,恢复流处理的执行。

维护算子状态的方法可能会根据具体的流处理系统有所不同,但基本原理是相同的。这四步是维护算子状态的基本过程。

在 Flink 中,ListState 是 CheckpointedState 的一种。ListState 可以为每一条数据保存不止一个值,也就是说,所有的数据都会添加到该状态中。在故障恢复时,这些元素按添加的顺序重放。我们从 CheckpointedFunctionListCheckpointed 接口的抽象类型继承,然后实现 snapshotStaterestoreState 方法,以完成状态恢复。

具体来说,如果我们想使用 ListCheckpointed 接口实现算子列表状态,可以参考以下的代码:

我们每次接收到未序列化的 String 类型的数值,就把它转成 Integer 类型存储在一个列表(List)中。在每个 Checkpoint 操作当中,通过 snapshotState 方法进行状态的快照并返回。当故障发生后,Flink 会调用 restoreState 方法将状态恢复回来。

如果算子是并行的,Flink 会为每一个子任务调用 restoreState 方法,并在算子的每个子任务中创建一个新的列表状态实例。在故障后进行状态恢复时,Flink 将提取快照并将其分发到每个子任务。

public class ListStateFunction extends RichMapFunction<String, Integer> implements ListCheckpointed<Integer> {private List<Integer> bufferElements;public ListStateFunction(){this.bufferElements = new ArrayList<>();}@Overridepublic Integer map(String value) throws Exception {int parsedValue = Integer.parseInt(value);bufferElements.add(parsedValue);return bufferElements.size();}// 每次 checkpoint 时,将缓存的元素进行快照@Overridepublic List<Integer> snapshotState(long checkpointId, long timestamp)  {return this.bufferElements;}// 从存储中恢复状态@Overridepublic void restoreState(List<Integer> state) {this.bufferElements.addAll(state);}
}

使用 ListCheckpointed 还是 CheckpointedFunction 取决于特定的需求和上下文,两者在功能上是相似的,但 CheckpointedFunction 提供了更多的灵活性,可以让你自己决定如何存储和恢复状态以及存储于哪种类型的状态后端。

使用CheckpointedFunction接口

Apache Flink提供了一个特殊的接口CheckpointedFunction,可以在自定义函数中使用它来操作和管理算子状态。这个接口会在检查点(checkpoint)操作时触发,允许访问和编辑操作员状态。

h使用CheckpointedFunction的例子:

public class CountWithCheckpoint implements CheckpointedFunction, MapFunction<Long, Long> {private transient ValueState<Long> counter;@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("counter", TypeInformation.of(new TypeHint<Long>() {}));counter = getRuntimeContext().getState(descriptor);}@Overridepublic Long map(Long value) throws Exception {Long currentCount = counter.value();Long newCount = currentCount == null ? 1L : currentCount + 1;counter.update(newCount);return newCount;}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {counter.clear();}
}

此示例创建一个计数但在每个检查点清空的函数。initializeState()方法会在各种生命周期事件(例如,开始和恢复)时调用并初始化状态变量。然后在map()方法中,状态被更新。snapshotState()在checkpoint操作时触发,这里我们仅清空状态,无任何持久化操作。

在操作和维护算子状态时,我们需要考虑状态的一致性和恢复,以处理可能的故障和中断。实际中可能会对snapshotState()方法更复杂的逻辑,比如将状态存储至远端。

接收检查点完成通知

在Apache Flink中,当所有任务成功从接头位置创建检查点后,作业管理器将坐标控制条以通知所有任务检查点的成功完成。然后,所有任务都会得到一个新的检查点的完成通知。

如果要接收这样的通知并对其做出反应,可以让你的RichFunction实现CheckpointListener接口。以下是一个基本示例:

函数使用ListState进行状态管理,每个接收到的元素都会被添加到状态中。并且,我们实现了notifyCheckpointComplete(long checkpointId)函数,以便在每次成功完成检查点后接收到通知。这个函数里你可以进行一些操作如清除状态、更新外部系统等。

触发的notifyCheckpointComplete方法是在下一次checkpoint发生在Task周的快照操作之前,具体的实现要根据你的检查点配置和故障恢复能力进行规划。

public class MyFunction extends RichMapFunction<Long, Long> implements CheckpointListener {private transient ListState<Long> checkpointedState;@Overridepublic void open(Configuration parameters) throws Exception {ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("state", Long.class);checkpointedState = getRuntimeContext().getListState(descriptor);}@Overridepublic Long map(Long value) throws Exception {checkpointedState.add(value);return value;}@Overridepublic void notifyCheckpointComplete(long checkpointId) throws Exception {// 监听到检查点成功完成的通知,此处可以进行相关逻辑处理}
}

参考文档

  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/
http://www.hrbkazy.com/news/16041.html

相关文章:

  • 南昌制作网站软件教育培训网站模板
  • 重庆永川网站建设价格广东云浮疫情最新情况
  • 做计算机题的网站链接优化方法
  • 360未经证实的网站如何做百度世界排名
  • 网站建设那家公司好遵义网站seo
  • 如何查看网站页面大小快速排名工具免费查询
  • 范县网站建设公司站长统计 网站统计
  • 金湖县网站建设怎么上百度搜索
  • 高校网络网站建设意义及措施湖南seo网站开发
  • 兰州做网站价格网站流量统计分析报告
  • 泉州网站建设武汉seo网站推广培训
  • 盱眙在仕德伟做网站的有几家百度最新财报
  • 网站开发图标下载抖音关键词排名
  • 荥阳网站推广seo引擎优化怎么做
  • 微信网站开发制作公司搜索引擎营销有哪些
  • 邯郸做移动网站的公司快速网络推广
  • 做天猫转让网站友情链接交换方式有哪些
  • 商会网站制作武汉电脑培训学校有哪些
  • WordPress小说漫画主题国外独立站seo外链平台
  • 惠州做网站的公司有哪些seo人员培训
  • 创意网站建设设计seo整站优化推广
  • 商贸信息网站互联网项目推广
  • 网站做加QQ群链接百度网站禁止访问怎么解除
  • 个人网站备案网址导航制作网页用什么软件
  • 重庆网站seo公司哪家好海南seo代理加盟供应商
  • 中国建设网站培训通知想建立自己的网站怎么建立
  • 如何删除网站的信息吗bt种子磁力搜索
  • 深圳 手机网站建设免费的网络推广渠道
  • 怎么用flash做游戏下载网站快速排名刷
  • 怎样看出一个网站是那个公司做的nba最新交易一览表