当前位置: 首页 > news >正文

泉州网站制作google框架三件套

泉州网站制作,google框架三件套,邢台列表网,h5响应式音乐网站模板增量聚合函数 ——指窗口每进入一条数据就计算一次 例如:要计算数字之和,进去一个12 计算结果为20, 再进入一个7 ——结果为27 reduce aggregate(aggregateFunction) package com.bigdata.day04;public class _04_agg函数 {public static …

增量聚合函数

——指窗口每进入一条数据就计算一次

例如:要计算数字之和,进去一个12 计算结果为20, 再进入一个7 ——结果为27

 reduce

aggregate(aggregateFunction)

package com.bigdata.day04;public class _04_agg函数 {public static final Tuple3[] ENGLISH = new Tuple3[] {Tuple3.of("class1", "张三", 100L),Tuple3.of("class1", "李四", 40L),Tuple3.of("class1", "王五", 60L),Tuple3.of("class2", "赵六", 20L),Tuple3.of("class2", "小七", 30L),Tuple3.of("class2", "小八", 50L),};public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);// 此时我要获取每个班级的平均成绩// 输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)// IN——Tuple3<String, String, Long>// ACC——Tuple3<String, Integer,Long> 第一个是班级(key)第二个是数量,第三个是总的成绩// OUT —— Tuple2<String,Double> 第一个是班级 第二个是平均成绩dataStreamSource.countWindowAll(3).aggregate(new AggregateFunction<Tuple3<String, String, Long>, Tuple3<String, Integer,Long>, Tuple2<String,Double>>() {// 初始化一个 累加器@Overridepublic Tuple3<String, Integer, Long> createAccumulator() {return Tuple3.of(null,0,0L);}// 累加器和输入的值进行累加// Tuple3<String, String, Long> value 第一个是传入的值// Tuple3<String, Integer, Long> accumulator 第二个是累加器的值@Overridepublic Tuple3<String, Integer, Long> add(Tuple3<String, String, Long> value, Tuple3<String, Integer, Long> accumulator) {return Tuple3.of(value.f0,accumulator.f1+1,accumulator.f2+value.f2);}// 获取结果——在不同节点的结果进行汇总后实现@Overridepublic Tuple2<String, Double> getResult(Tuple3<String, Integer, Long> accumulator) {return Tuple2.of(accumulator.f0, (double) accumulator.f2 / accumulator.f1);}// 由于flink是分布式,所以在别的节点也会进行累加 ,该方法是不同节点的结果进行汇总// 即累加器之间的累加@Overridepublic Tuple3<String, Integer, Long> merge(Tuple3<String, Integer, Long> a, Tuple3<String, Integer, Long> b) {return Tuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

sum()

min()

max()

 全量聚合函数

指在窗口触发的时候才会对窗口内的所有数据进行一次计算(等窗口的数据到齐,才开始进行聚合计算,可实现对窗口内的数据进行排序等需求)

全量聚合函数比较简单,但是会将所有的数据存放在内存中,因此会占用大量的内存空间

apply 

package com.bigdata.day04;public class _05_app函数 {public static final Tuple3[] ENGLISH = new Tuple3[] {Tuple3.of("class1", "张三", 100L),Tuple3.of("class1", "李四", 40L),Tuple3.of("class1", "王五", 60L),Tuple3.of("class2", "赵六", 20L),Tuple3.of("class2", "小七", 30L),Tuple3.of("class2", "小八", 50L),};public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);//2. source-加载数据dataStreamSource.countWindowAll(3).apply(new AllWindowFunction<Tuple3<String,String,Long>, Tuple2<String,Double>, GlobalWindow>() {@Overridepublic void apply(GlobalWindow window, Iterable<Tuple3<String, String, Long>> values, Collector<Tuple2<String,Double>> out) throws Exception {Long sum = 0L;int length = 0;String key = null;for (Tuple3<String, String, Long> value : values) {sum += value.f2;length++;key = value.f0;}out.collect(Tuple2.of(key,(double) sum/length));}}).print();env.execute();}
}// 总结// 接口
new AllWindowFunction<Tuple3<String,String,Long>, Tuple2<String,Double>, GlobalWindow>()
GlobalWindow 窗口对象  Tuple3<String,String,Long> 传入的值  Tuple2<String,Double> 结果// 重写的方法
public void apply(GlobalWindow window, Iterable<Tuple3<String, String, Long>> values, Collector<Tuple2<String,Double>> out) Iterable<Tuple3<String, String, Long>> values 传入值的迭代器 进行遍历
Collector<Tuple2<String,Double>> out 收集器 调用collect方法收集即可
window 窗口对象//使用窗口对象我们可以拿到窗口的起始时间long start = window.getStart();long end = window.getEnd();

process

使用方式一:在connect合流之后对两个类型不同的流进行处理

使用方式二:在分流的时候使用,可以通过context.output方法对每个数据添加一个标签

 使用方式一
new CoProcessFunction<Long, String, String>()  
// 第一个泛型是第一个流的类型 第二个泛型是第二个流的类型  第三个泛型是合并后流的类型@Overridepublic void processElement1(Long l, CoProcessFunction<Long, String, String>.Context context, Collector<String> collector) throws Exception {// Long 是数据类型 结果使用collector中的collect 收集collector.collect(String.valueOf(l));}@Overridepublic void processElement2(String s, CoProcessFunction<Long, String, String>.Context context, Collector<String> collector) throws Exception {//  String 是数据类型 结果使用collector中的collect 收集collector.collect(s);}
使用方式二
此时使用的是context中的context.output(odd, element); 方法
odd 是标签
element 是元素OutputTag<Long> odd = new OutputTag<>("奇数",TypeInformation.of(Long.class));
OutputTag<Long> even = new OutputTag<>("偶数", TypeInformation.of(Long.class));

http://www.hrbkazy.com/news/46779.html

相关文章:

  • 威海做网站哪家好win10优化大师免费版
  • 福州网站设计公司seo公司运营
  • 免费做网站教程营销广告
  • 招聘网站建设方案模板引擎搜索是什么意思
  • 丽水建设局网站文件实时新闻热点
  • 免费个人网站空间web前端培训费用大概多少
  • ppt做的最好的网站app开发公司
  • 临朐网站建设建站百度关键词优化手段
  • 深圳建设网站排名网站设计培训
  • 用asp.net做校园网站怎样做自己的网站
  • dw制作简单网站模板下载地址bt磁力猪
  • wordpress纯静态网站免费发布产品的平台
  • 成都网站建设定制开发服务临沂网站建设优化
  • 网站换模板要怎么做网站前期推广
  • 做网站的心得陕西优化疫情防控措施
  • 多语言网站制作2021年新闻摘抄
  • 有做soho网站的吗重庆排名优化整站优化
  • 单页的网站怎么做的怎样策划一个营销型网站
  • 淘客怎么做网站单页备案查询
  • 嘉兴的网站设计公司有哪些东莞网站建设推广哪家好
  • 阜宁做网站的公司电话湖人最新消息
  • 网络设计报告提纲范文爱站seo工具包
  • 东莞南城网站开发公司电话十堰seo排名公司
  • 安徽省建设厅网站怎么进不去好的seo平台
  • 引擎搜索个人做seo怎么赚钱
  • 东营哪里有做网络推广的网络seo营销推广
  • 用jsp加点mvc做网站怎么样北京seo优化厂家
  • 网站建设公司做销售好不好百度导航下载2021最新版
  • 没有网站没有推广如何做外贸关键词分为哪三类
  • 怎么做网站源代码seo的基本工作内容