当前位置: 首页 > news >正文

建设农家书屋官方网站管理人员课程培训

建设农家书屋官方网站,管理人员课程培训,查信息的网站有哪些,亚马逊品牌备案的网站怎么做一、简介 flink 自定义实时数据源使用流处理比较简单,比如 Kafka、MQ 等,如果使用 MySQL、redis 批处理也比较简单 如果需要定时加载数据作为 flink 数据源使用流处理,比如定时从 mysql 或者 redis 获取一批数据,传入 flink 做处…

一、简介

flink 自定义实时数据源使用流处理比较简单,比如 Kafka、MQ 等,如果使用 MySQL、redis 批处理也比较简单

如果需要定时加载数据作为 flink 数据源使用流处理,比如定时从 mysql 或者 redis 获取一批数据,传入 flink 做处理,如下简单实现

二、pom.xml 文件

注意 flink 好多包从 1.15.0 开始不需要指定 Scala 版本,内部自带
在这里插入图片描述

下面 pom 文件有 flink 两个版本 1.16.0 和 1.12.7(Scala:2.12)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.ye</groupId><artifactId>flink-study</artifactId><version>0.1</version><packaging>jar</packaging><name>Flink Quickstart Job</name><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.16.0</flink.version><!--<flink.version>1.12.7</flink.version>--><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.17.1</log4j.version></properties><repositories><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url><releases><enabled>false</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories><dependencies><!-- Apache Flink dependencies --><!-- These dependencies are provided, because they should not be packaged into the JAR file. --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
<!--<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><!-- Add connector dependencies here. They must be in the default scope (compile). --><!-- Example:<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency>--><!-- Add logging framework, to produce console output when running in the IDE. --><!-- These dependencies are excluded from the application JAR by default. --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency></dependencies><build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration></plugin><!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --><!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><createDependencyReducedPom>false</createDependencyReducedPom><artifactSet><excludes><exclude>org.apache.flink:flink-shaded-force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.ye.DataStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><pluginManagement><plugins><!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --><plugin><groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version><configuration><lifecycleMappingMetadata><pluginExecutions><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><versionRange>[3.1.1,)</versionRange><goals><goal>shade</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution></pluginExecutions></lifecycleMappingMetadata></configuration></plugin></plugins></pluginManagement></build>
</project>

三、自定义数据源

使用 Timer 定时任务(当然也可以使用线程池 Executors)自定义数据源,每过五秒随机生成一串字符串

public class TimerSinkRich extends RichSourceFunction<String> {private  ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();private boolean flag = true;private Timer timer;private TimerTask timerTask;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);timerTask = new TimerTask() {@Overridepublic void run() {// 可以在这块获取 MySQL、redis 等连接并查询数据Random random = new Random();StringBuilder str = new StringBuilder();for (int i = 0; i < 10; i++) {char ranLowLetter = (char) ((random.nextInt(26) + 97));str.append(ranLowLetter);}queue.add(str.toString());}};timer = new Timer();// 延时和执行周期参数可以通过构造方法传递timer.schedule(timerTask,1000,5000);}@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (flag){if(queue.size()>0){ctx.collect(queue.remove());}}}@Overridepublic void cancel() {if(null!=timer) timer.cancel();if(null!=timerTask) timerTask.cancel();// 撤销任务时,flink 默认 30 s(不同 flink 版本可能不同)尝试关闭数据源,关闭失败 TaskManager 不能释放 slot,最终导致失败if(queue.size()<=0) flag = false;}
}

四、flink 加载数据源并启动

public class TimerSinkStreamJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();executionEnvironment.setParallelism(1);DataStreamSource<String> streamSource = executionEnvironment.addSource(new TimerSinkRich());streamSource.print();executionEnvironment.execute("TimerSinkStreamJob 定时任务打印数据");}
}

本地测试成功

在这里插入图片描述

五、上传 flink 集群

1、flink 1.16.0

启动成功
在这里插入图片描述
撤销任务成功
在这里插入图片描述
solt 也成功释放
在这里插入图片描述

2、flink 1.12.7

启动成功
在这里插入图片描述
撤销任务当然也没问题,同样能正常释放 slot
在这里插入图片描述

当然你也可以不要 open() 方法

public class DiySinkRich extends RichSourceFunction<String> {private TimerTask timerTask;private Timer timer;private boolean flag = true;private ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();@Overridepublic void run(SourceFunction.SourceContext<String> ctx) throws Exception {timerTask = new TimerTask() {@Overridepublic void run() {Random random = new Random();StringBuilder str = new StringBuilder();for (int i = 0; i < 10; i++) {char ranLowLetter = (char) ((random.nextInt(26) + 97));str.append(ranLowLetter);}queue.add(str.toString());}};timer = new Timer();timer.schedule(timerTask, 1000, 5000);while (flag) {if (queue.size() > 0) {ctx.collect(queue.remove());}}}@Overridepublic void cancel() {if (timer != null) timer.cancel();if (timerTask != null) timerTask.cancel();if (queue.size() == 0) flag = false;}
}

以上就是 flink 定时加载数据源的简单实例

http://www.hrbkazy.com/news/12368.html

相关文章:

  • 吉林市做网站公司国外常用的seo站长工具
  • 国外设计网站door网络营销策略有哪五种
  • b2c网站建设需要的条件看广告赚钱
  • 柳市做网站建设谷歌chrome官网
  • 漯河网站建设费用真人seo点击平台
  • 泉州最专业微信网站建设开发百度搜索词排名
  • 摄影设计seo的优化流程
  • 现在c 做网站用什么软件seo搜索引擎推广什么意思
  • 网站被挂黑后果镇江网站定制
  • 俄罗斯网站开发惠州seo外包费用
  • php制作网站拼多多关键词排名在哪里看
  • 网站规划和建设方案b站视频怎么快速推广
  • 公安部的网站备案互联网广告投放公司
  • 小企业做网站多少钱搜索关键词排行榜
  • wordpress超详细教程视频教程界首网站优化公司
  • 制作网站需要哪些技术人员重庆seo教程搜索引擎优化
  • wordpress带样式备份seo关键词排名优化工具
  • 网上做打字任务的网站打开网站搜索
  • 网站编程培训机构网址导航推广
  • 沈阳人流哪个医院好安全口碑优化
  • android开发环境搭建好的seo公司营销网
  • 怎么制作黄色网站无锡百度正规公司
  • 龙岩网站设计竞价推广账户托管服务
  • 电信网站备案委托书广告推广宣传
  • ftp怎么做网站的备份网络营销与直播电商专业介绍
  • flash网站轮播广告怎么做百度关键词搜索量
  • 免费搭建网站教程搜索引擎优化是什么意思
  • 医院网站建设情况深圳网站设计公司哪家好
  • 轻松筹网站可以做吗网上推广产品哪个网好
  • 广水网站设计合肥seo管理