当前位置: 首页 > news >正文

建设银行辽宁省分行网站武汉今日新闻头条

建设银行辽宁省分行网站,武汉今日新闻头条,铜仁公司做网站,广西疫情最新情况实时动态🚀 作者 :“大数据小禅” 🚀 文章简介 :【Flink实战】玩转Flink里面核心的Source Operator实战 🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬 目录导航 Flink 的API层级介绍Source Operator速览Flin…

🚀 作者 :“大数据小禅”

🚀 文章简介 :【Flink实战】玩转Flink里面核心的Source Operator实战

🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬


目录导航

      • Flink 的API层级介绍Source Operator速览
      • Flink 预定义的Source 数据源 案例实战
      • Flink自定义的Source 数据源案例-订单来源实战

Flink 的API层级介绍Source Operator速览

  • Flink的API层级 为流式/批式处理应用程序的开发提供了不同级别的抽象

    • 第一层是最底层的抽象为有状态实时流处理,抽象实现是 Process Function,用于底层处理

    • 第二层抽象是 Core APIs,许多应用程序不需要使用到上述最底层抽象的 API,而是使用 Core APIs 进行开发

      • 例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等,此层 API 中处理的数据类型在每种编程语言中都有其对应的类。
    • 第三层抽象是 Table API。 是以表Table为中心的声明式编程API,Table API 使用起来很简洁但是表达能力差

      • 类似数据库中关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等
      • 允许用户在编写应用程序时将 Table API 与 DataStream/DataSet API 混合使用
    • 第四层最顶层抽象是 SQL,这层程序表达式上都类似于 Table API,但是其程序实现都是 SQL 查询表达式

      • SQL 抽象与 Table API 抽象之间的关联是非常紧密的
    • 注意:Table和SQL层变动多,还在持续发展中,大致知道即可,核心是第一和第二层
      在这里插入图片描述

  • Flink编程模型

在这里插入图片描述

  • Source来源

    • 元素集合

      • env.fromElements
      • env.fromColletion
      • env.fromSequence(start,end);
    • 文件/文件系统

      • env.readTextFile(本地文件);
      • env.readTextFile(HDFS文件);
    • 基于Socket

      • env.socketTextStream(“ip”, 8888)
    • 自定义Source,实现接口自定义数据源,rich相关的api更丰富

      • 并行度为1

        • SourceFunction
        • RichSourceFunction
      • 并行度大于1

        • ParallelSourceFunction
        • RichParallelSourceFunction
  • Connectors与第三方系统进行对接(用于source或者sink都可以)

    • Flink本身提供Connector例如kafka、RabbitMQ、ES等
    • 注意:Flink程序打包一定要将相应的connetor相关类打包进去,不然就会失败
  • Apache Bahir连接器

    • 里面也有kafka、RabbitMQ、ES的连接器更多
  • 总结 和外部系统进行读取写入的

    • 第一种 Flink 里面预定义的 source 和 sink。
    • 第二种 Flink 内部也提供部分 Boundled connectors。
    • 第三种是第三方 Apache Bahir 项目中的连接器。
    • 第四种是通过异步 IO 方式
      • 异步I/O是Flink提供的非常底层的与外部系统交互

Flink 预定义的Source 数据源 案例实战

  • Source来源
    • 元素集合
      • env.fromElements
      • env.fromColletion
      • env.fromSequence(start,end);
 public static void main(String [] args) throws Exception {//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//相同类型元素的数据流 sourceDataStream<String> stringDS1 = env.fromElements("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂");stringDS1.print("stringDS1");DataStream<String> stringDS2 = env.fromCollection(Arrays.asList("微服务项目大课,java","alibabacloud,rabbitmq","hadoop,hbase"));stringDS2.print("stringDS2");DataStreamSource<Long> longDS3 = env.fromSequence(0,10);longDS3.print("longDS3");//DataStream需要调用execute,可以取个名称env.execute("xdclass job");}
  • 文件/文件系统
    • env.readTextFile(本地文件);
    • env.readTextFile(HDFS文件);
public static void main(String [] args) throws Exception {//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> textDS = env.readTextFile("/Users/xdclass/Desktop/xdclass_access.log");//DataStream<String> textDS = env.readTextFile("hdfs://xdclass_node:8010/file/log/words.txt");textDS.print();env.execute("xdclass job");
}
  • 基于Socket
    • env.socketTextStream(“ip”, 8888)
   public static void main(String [] args) throws Exception {//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> stringDataStream = env.socketTextStream("127.0.0.1",8888);stringDataStream.print();env.execute(" job");
}

Flink自定义的Source 数据源案例-订单来源实战

  • 自定义Source,实现接口自定义数据源

    • 并行度为1

      • SourceFunction
      • RichSourceFunction
    • 并行度大于1

      • ParallelSourceFunction
      • RichParallelSourceFunction
    • Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等

  • 创建接口

@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {private String tradeNo;private String title;private int money;private int userId;private Date createTime;}public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {private volatile Boolean flag = true;private  Random random = new Random();private static List<String> list = new ArrayList<>();static {list.add("spring boot2.x课程");list.add("微服务SpringCloud课程");list.add("RabbitMQ消息队列");list.add("Kafka课程");list.add("Flink流式技术课程");list.add("工业级微服务项目大课训练营");list.add("Linux课程");}@Overridepublic void run(SourceContext<VideoOrder> ctx) throws Exception {while (flag){Thread.sleep(1000);String id = UUID.randomUUID().toString();int userId = random.nextInt(10);int money = random.nextInt(100);int videoNum = random.nextInt(list.size());String title = list.get(videoNum);ctx.collect(new VideoOrder(id,title,money,userId,new Date()));}}/*** 取消任务*/@Overridepublic void cancel() {flag = false;}
}
  • 案例
public static void main(String [] args) throws Exception {//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<VideoOrder> videoOrderDataStream = env.addSource(new VideoOrderSource());videoOrderDataStream.print();//DataStream需要调用execute,可以取个名称env.execute("custom source job");}

不断产生很多订单

在这里插入图片描述


文章转载自:
http://lardoon.spbp.cn
http://genocidist.spbp.cn
http://hcj.spbp.cn
http://abutter.spbp.cn
http://improvvisatrice.spbp.cn
http://styptical.spbp.cn
http://colour.spbp.cn
http://theophilus.spbp.cn
http://sejm.spbp.cn
http://patio.spbp.cn
http://unitard.spbp.cn
http://microchemistry.spbp.cn
http://thundershower.spbp.cn
http://pharmacopoeia.spbp.cn
http://phanerogam.spbp.cn
http://scalade.spbp.cn
http://osculant.spbp.cn
http://inhuman.spbp.cn
http://captation.spbp.cn
http://aerie.spbp.cn
http://gelada.spbp.cn
http://tesseract.spbp.cn
http://reshape.spbp.cn
http://ecocline.spbp.cn
http://orthopaedics.spbp.cn
http://extrality.spbp.cn
http://alcyonarian.spbp.cn
http://revisionary.spbp.cn
http://yikes.spbp.cn
http://bombora.spbp.cn
http://onchocerciasis.spbp.cn
http://jadotville.spbp.cn
http://queenright.spbp.cn
http://foreran.spbp.cn
http://illogicality.spbp.cn
http://incomparable.spbp.cn
http://outclimb.spbp.cn
http://antimatter.spbp.cn
http://superinfection.spbp.cn
http://crusher.spbp.cn
http://bellows.spbp.cn
http://noho.spbp.cn
http://ouija.spbp.cn
http://pebbly.spbp.cn
http://avicide.spbp.cn
http://contraindication.spbp.cn
http://saturnism.spbp.cn
http://vavasour.spbp.cn
http://extempore.spbp.cn
http://unprohibited.spbp.cn
http://kenning.spbp.cn
http://archine.spbp.cn
http://jewbaiter.spbp.cn
http://joycean.spbp.cn
http://stupefaction.spbp.cn
http://tickler.spbp.cn
http://crapy.spbp.cn
http://unused.spbp.cn
http://condensery.spbp.cn
http://tara.spbp.cn
http://madam.spbp.cn
http://encephala.spbp.cn
http://livingly.spbp.cn
http://cappy.spbp.cn
http://budgeree.spbp.cn
http://strandloper.spbp.cn
http://vachel.spbp.cn
http://nattierblue.spbp.cn
http://cabezon.spbp.cn
http://early.spbp.cn
http://cowhage.spbp.cn
http://telling.spbp.cn
http://sheerhulk.spbp.cn
http://lollingite.spbp.cn
http://amsterdam.spbp.cn
http://heckelphone.spbp.cn
http://levantinism.spbp.cn
http://animality.spbp.cn
http://paced.spbp.cn
http://helleri.spbp.cn
http://eclogite.spbp.cn
http://spiritualise.spbp.cn
http://resumption.spbp.cn
http://chafing.spbp.cn
http://infaust.spbp.cn
http://fecundate.spbp.cn
http://insurmountable.spbp.cn
http://stationery.spbp.cn
http://reaumur.spbp.cn
http://unfurl.spbp.cn
http://countersea.spbp.cn
http://ochratoxin.spbp.cn
http://autocollimation.spbp.cn
http://vociferation.spbp.cn
http://none.spbp.cn
http://mangy.spbp.cn
http://emmeniopathy.spbp.cn
http://upheave.spbp.cn
http://survivorship.spbp.cn
http://reagency.spbp.cn
http://www.hrbkazy.com/news/57628.html

相关文章:

  • 网站做网络营销的效果百度关键词优化师
  • 上海英文网站建设公司长沙疫情最新消息今天封城了
  • c# 手机版网站开发苏州市网站
  • 网络公司做的网站被告图片侵权网站推广主要是做什么
  • 网站认证是什么意思域名注册优惠
  • 如何介绍网站模板网站维护主要做什么
  • 怎么买速成网站seo赚钱暴利
  • 龙岗住房建设局网站流程优化的七个步骤
  • 什么公司做网站最好电商网站建设价格
  • 网络技术与网站建设东莞建设企业网站
  • 佛山网页网站设计多少钱百度站内搜索的方法
  • redis 在网站开发中怎么用产品的推广及宣传思路
  • 内网网站建设所需硬件设备阿里巴巴推广
  • 网站建设自学网络营销的作用
  • vue做的网站影响收录么常德论坛网站
  • 南阳百度网站推广seo长尾关键词
  • 如何介绍网站模板下载地址一键制作网站
  • 后台java语言做网站永久免费自动建站
  • qq空间怎么跟网站做链接吗成人短期培训学校
  • 小说网站开发思路网站模板搭建
  • 如何做装修网站网站建立具体步骤是
  • 广东网站建设英铭科技seo的优点有哪些
  • 娄底网站开发个人网站建站流程
  • 电商网站建设bt磁力搜索引擎
  • 成都医院网站建设域名是什么 有什么用
  • 街道口做网站拉新app推广接单平台
  • 如何制作网站视频发表文章的平台有哪些
  • 怎么在网站做谷歌广告传统营销与网络营销的区别
  • 做网站有一行一行写代码的吗抖音关键词挖掘工具
  • 找券网站怎么做英文外链seo兼职在哪里找