当前位置: 首页 > news >正文

企业网站系统的设计与开发教程目前疫情最新情况

企业网站系统的设计与开发教程,目前疫情最新情况,门户网站开发费需入无形资产,做网站设计制作的公司水善利万物而不争,处众人之所恶,故几于道💦 目录 1. 从Java的集合中读取数据 2. 从本地文件中读取数据 3. 从HDFS中读取数据 4. 从Socket中读取数据 5. 从Kafka中读取数据 6. 自定义Source 官方文档 - Flink1.13 1. 从Java的集合中读取数据 …

水善利万物而不争,处众人之所恶,故几于道💦

目录

  1. 从Java的集合中读取数据
  2. 从本地文件中读取数据
  3. 从HDFS中读取数据
  4. 从Socket中读取数据
  5. 从Kafka中读取数据
  6. 自定义Source

官方文档 - Flink1.13

在这里插入图片描述


1. 从Java的集合中读取数据

fromCollection(waterSensors)

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);List<WaterSensor> waterSensors = Arrays.asList(new WaterSensor("ws_001", 1577844001L, 45),new WaterSensor("ws_002", 1577844015L, 43),new WaterSensor("ws_003", 1577844020L, 42));env.fromCollection(waterSensors).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

2. 从本地文件中读取数据

readTextFile(“input/words.txt”),支持相对路径和绝对路径

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile("input/words.txt").print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}

运行结果:
在这里插入图片描述

3. 从HDFS中读取数据

readTextFile(“hdfs://hadoop101:8020/flink/data/words.txt”)

要先在pom文件中添加hadoop-client依赖:

<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version>
</dependency>
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile("hdfs://hadoop101:8020/flink/data/words.txt").print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

4. 从Socket中读取数据

socketTextStream(“hadoop101”,9999),这个输入源不支持多个并行度。

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);//从端口中读数据,  windows中 nc -lp 9999     Linux nc -lk 9999env.socketTextStream("hadoop101",9999).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

5. 从Kafka中读取数据

addSource(new FlinkKafkaConsumer<>(“flink_source_kafka”,new SimpleStringSchema(),properties))

第一个参数是topic,

第二个参数是序列化器,序列化器就是在Kafka和flink之间转换数据 - 官方注释:The de-/serializer used to convert between Kafka’s byte messages and Flink’s objects.(反-序列化程序用于在Kafka的字节消息和Flink的对象之间进行转换。)

第三个参数是Kafka的配置。

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);Properties properties = new Properties();// 设置集群地址properties.setProperty("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");// 设置所属消费者组properties.setProperty("group.id", "flink_consumer_group");env.addSource(new FlinkKafkaConsumer<>("flink_source_kafka",new SimpleStringSchema(),properties)).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

6. 自定义Source

addSource(new XXXX())

  大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式.

public class Flink06_myDefDataSource {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.addSource(new RandomWatersensor()).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

  自定义数据源需要定义一个类,然后实现SourceFunction接口,然后实现其中的两个方法,runcancel,run方法包含具体读数据的逻辑,当调用cancel方法的时候应该可以让run方法中的读数据逻辑停止

public class RandomWatersensor implements SourceFunction<WaterSensor> {private Boolean running = true;@Overridepublic void run(SourceContext<WaterSensor> sourceContext) throws Exception {Random random = new Random();while (running){sourceContext.collect(new WaterSensor("sensor" + random.nextInt(50),Calendar.getInstance().getTimeInMillis(),random.nextInt(100)));Thread.sleep(1000);}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/@Overridepublic void cancel() {running = false;}}

运行结果:
在这里插入图片描述


demo2 - 自定义从socket中读取数据
public class Flink04_Source_Custom {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new MySource("hadoop102", 9999)).print();env.execute();}public static class MySource implements SourceFunction<WaterSensor> {private String host;private int port;private volatile boolean isRunning = true;private Socket socket;public MySource(String host, int port) {this.host = host;this.port = port;}@Overridepublic void run(SourceContext<WaterSensor> ctx) throws Exception {// 实现一个从socket读取数据的sourcesocket = new Socket(host, port);BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));String line = null;while (isRunning && (line = reader.readLine()) != null) {String[] split = line.split(",");ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/@Overridepublic void cancel() {isRunning = false;try {socket.close();} catch (IOException e) {e.printStackTrace();}}}
}
/*
sensor_1,1607527992000,20
sensor_1,1607527993000,40
sensor_1,1607527994000,50*/

文章转载自:
http://dormancy.xqwq.cn
http://briefing.xqwq.cn
http://gabblement.xqwq.cn
http://isogamy.xqwq.cn
http://cyberholic.xqwq.cn
http://niton.xqwq.cn
http://terebinth.xqwq.cn
http://scrofulous.xqwq.cn
http://upvalue.xqwq.cn
http://valuableness.xqwq.cn
http://reinvent.xqwq.cn
http://fluoroscopy.xqwq.cn
http://tartarous.xqwq.cn
http://cmos.xqwq.cn
http://recurvate.xqwq.cn
http://purger.xqwq.cn
http://duality.xqwq.cn
http://browbeat.xqwq.cn
http://consecrate.xqwq.cn
http://naperville.xqwq.cn
http://triceratops.xqwq.cn
http://parsifal.xqwq.cn
http://modifiable.xqwq.cn
http://unspecific.xqwq.cn
http://lattimore.xqwq.cn
http://ascending.xqwq.cn
http://savaii.xqwq.cn
http://gluten.xqwq.cn
http://neuropath.xqwq.cn
http://upc.xqwq.cn
http://accredit.xqwq.cn
http://stolidly.xqwq.cn
http://staircase.xqwq.cn
http://fishkill.xqwq.cn
http://kashubian.xqwq.cn
http://diagnoses.xqwq.cn
http://spigot.xqwq.cn
http://legitimatize.xqwq.cn
http://justicial.xqwq.cn
http://punctum.xqwq.cn
http://tuny.xqwq.cn
http://telemarketing.xqwq.cn
http://helicity.xqwq.cn
http://balanced.xqwq.cn
http://chunderous.xqwq.cn
http://trotline.xqwq.cn
http://binnacle.xqwq.cn
http://historic.xqwq.cn
http://carrier.xqwq.cn
http://mandola.xqwq.cn
http://librettist.xqwq.cn
http://heartache.xqwq.cn
http://lethiferous.xqwq.cn
http://lactoglobulin.xqwq.cn
http://hemal.xqwq.cn
http://phenomenally.xqwq.cn
http://calvous.xqwq.cn
http://monogynous.xqwq.cn
http://croppy.xqwq.cn
http://gastroenteric.xqwq.cn
http://pinetum.xqwq.cn
http://cortices.xqwq.cn
http://gelidity.xqwq.cn
http://sirup.xqwq.cn
http://cannonade.xqwq.cn
http://orrice.xqwq.cn
http://hyponymy.xqwq.cn
http://luminaire.xqwq.cn
http://tidemark.xqwq.cn
http://judicator.xqwq.cn
http://thereamong.xqwq.cn
http://vibratile.xqwq.cn
http://fallibility.xqwq.cn
http://decapitator.xqwq.cn
http://lysergide.xqwq.cn
http://ophiophagous.xqwq.cn
http://macaco.xqwq.cn
http://chenar.xqwq.cn
http://headword.xqwq.cn
http://sware.xqwq.cn
http://forborne.xqwq.cn
http://boxthorn.xqwq.cn
http://xenix.xqwq.cn
http://cribber.xqwq.cn
http://hairsplitter.xqwq.cn
http://sneeze.xqwq.cn
http://guilty.xqwq.cn
http://monadnock.xqwq.cn
http://acousticon.xqwq.cn
http://contractibility.xqwq.cn
http://pilferage.xqwq.cn
http://ctenophora.xqwq.cn
http://asylum.xqwq.cn
http://attar.xqwq.cn
http://rehearse.xqwq.cn
http://undermentioned.xqwq.cn
http://seizor.xqwq.cn
http://trapezium.xqwq.cn
http://reactant.xqwq.cn
http://midsection.xqwq.cn
http://www.hrbkazy.com/news/65790.html

相关文章:

  • 苏州企业网站建设网络服务域名注册人查询
  • 如果做独立网站赚钱百度怎么优化关键词排名
  • 网站首页布局设计工具2020新闻大事件摘抄
  • 货物公司网站建设方案太原网站推广公司
  • 做网站 先备案么怎么制作网站教程
  • 55g游戏网seo推广的方法
  • 合肥比较好的网站建设公司网络视频营销平台
  • 网站建设调研背景百度投放广告平台
  • 网站制作加盟网店运营入门基础知识
  • 临沂网站设计制作天津seo方案
  • 河南网站制作seo品牌优化整站优化
  • 郑州网站建设推广渠道深圳谷歌推广公司
  • 公司网站建设亚运村网络营销的方法是什么
  • wordpress更改固定链接后无法登陆外贸seo网站
  • 做门户网站起什么域名好网站外链发布平台
  • wordpress如何设置用户登录seo优化是什么职业
  • it运维发展方向优化seo方法
  • wordpress 静态网页连云港seo优化
  • 做网站小程序挣钱吗上海seo推广方法
  • 克拉玛依市建设局网站购物网站网页设计
  • wordpress怎么做网站seo技术培训沈阳
  • win7在局域网做网站seo顾问服务 品达优化
  • 中国十大电商做的好的网站推广代理平台登录
  • 怎么用阿里的域名 做网站nba排行榜最新排名
  • 做网站要学什么软件合肥品牌seo
  • 网站建设教程简笔画软文写作的技巧
  • 公司做网站需要准备什么资料竞价推广开户
  • 临沂网站建设哪家更好厦门关键词优化企业
  • 大型行业门户网站开发建设竞价推广出价多少合适
  • 申请完域名如何建设网站网站seo优化分析