当前位置: 首页 > news >正文

禹州市5g网站基础建设河南推广网站的公司

禹州市5g网站基础建设,河南推广网站的公司,网站建设运营岗位职责,深圳住房网站app文章目录 1、基本介绍2、代码实战2.1、数据源准备2.2、代码实战2.3、数据格式 1、基本介绍 Flink CDC 是 Apache Flink 提供的一个功能强大的组件,用于实时捕获和处理数据库中的数据变更。可以实时地从各种数据库(如MySQL、PostgreSQL、Oracle、MongoDB…

文章目录

  • 1、基本介绍
  • 2、代码实战
    • 2.1、数据源准备
    • 2.2、代码实战
    • 2.3、数据格式

1、基本介绍

Flink CDC 是 Apache Flink 提供的一个功能强大的组件,用于实时捕获和处理数据库中的数据变更。可以实时地从各种数据库(如MySQL、PostgreSQL、Oracle、MongoDB等)中捕获数据变更并将其转换为流式数据,FlinkCDC 同步数据有两种方式:

  1. FlinkSQL
  2. Flink DataStream 和 Table API(本文使用该方式)
    在这里插入图片描述
    对比其他的CDC开源方案,发现FlinkCDC是绝大多数场景最好的选择方式,别在傻傻的只关注Canal了,如下图所示:
    在这里插入图片描述

2、代码实战

2.1、数据源准备

本次我是用MySQL 8.0版本,并且创建好数据库(库名为quick_chat),本次演示表结构如下:

CREATE TABLE `quick_chat_msg` (`id` bigint NOT NULL COMMENT '主键id',`from_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id(发送人)',`to_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id(接收人)',`relation_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '发送关联',`content` varchar(500) DEFAULT NULL COMMENT '消息内容',`msg_type` tinyint(1) DEFAULT NULL COMMENT '消息类型(1:文字,2:语音,3:表情包,4:文件,5:语音通话,6:视频通话)',`extra_info` varchar(500) DEFAULT NULL COMMENT '额外信息',`create_time` datetime DEFAULT NULL COMMENT '创建时间',`deleted` tinyint(1) DEFAULT NULL COMMENT '删除标识',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

需要保证MySQL的Binlog格式是ROW,不过MySQL 8.0版本格式默认就是ROW:
在这里插入图片描述
最后,要把数据库时区配置好,否则会出现问题,命令如下:

SET persist time_zone = '+8:00';
SET time_zone = '+8:00';
SHOW VARIABLES LIKE '%time_zone%';

在这里插入图片描述

2.2、代码实战

首先,引入Flink CDC相关依赖,内容如下:

<dependencies><!-- Flink connector连接器基础包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.14.0</version></dependency><!-- Flink CDC MySQL源 --><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency><!-- Flink DataStream数据流API --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.2.0</version><scope>provided</scope></dependency><!-- Flink客户端--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.0</version></dependency><!--Flink WebUI,端口8081(默认没有开启)--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>1.14.0</version></dependency><!--Flink Table API&SQL程序可以连接到其他外部系统,用于读写批处理表和流式表。--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime_2.12</artifactId><version>1.14.0</version></dependency>
</dependencies>

第二步,开发 Sink 监听类,用于监听 MySQL 数据变化:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;public class MySinkHandler extends RichSinkFunction<String> {@Overridepublic void invoke(String value, Context context) throws Exception {System.out.println(value);}@Overridepublic void open(Configuration parameters) throws Exception {}@Overridepublic void close() throws Exception {}
}

最后,配置好 Flink CDC 监听进程,随着项目启动运行:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
public class MySqlSourceExample {@PostConstructpublic void init() throws Exception {// 配置监听数据源MySqlSource<String> source = MySqlSource.<String>builder().hostname("8.141.28.132").port(3306)// 数据库集合,可以配置多个.databaseList("quick_chat")// 表集合,可以配置多个.tableList("quick_chat.quick_chat_msg").username("root").password("root").deserializer(new JsonDebeziumDeserializationSchema()).includeSchemaChanges(true).build();// 配置 Flink WebUIConfiguration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8081);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 检查点间隔时间// checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。env.enableCheckpointing(5000);DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source").addSink(new MySinkHandler());env.execute();}
}

项目启动完毕后,可以通过8081端口访问Flink UI页面:
在这里插入图片描述

2.3、数据格式

上述操作完毕后,我对表数据进行了新增、修改、删除操作,控制台可以看到MySQL变更监听日志输出信息:

# 新增
{"before": null,"after": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135279000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 2452,"row": 0,"thread": null,"query": null},"op": "c","ts_ms": 1729135278633,"transaction": null
}
# 修改
{"before": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"after": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊,小猫咪","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135289000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 2825,"row": 0,"thread": null,"query": null},"op": "u","ts_ms": 1729135288473,"transaction": null
}
# 删除
{"before": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊,小猫咪","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"after": null,"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135301000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 3247,"row": 0,"thread": null,"query": null},"op": "d","ts_ms": 1729135300692,"transaction": null
}

文章转载自:
http://herdbook.rwzc.cn
http://newsreader.rwzc.cn
http://pseudocide.rwzc.cn
http://songkhla.rwzc.cn
http://bosket.rwzc.cn
http://anatomize.rwzc.cn
http://liposarcoma.rwzc.cn
http://absinthism.rwzc.cn
http://hcl.rwzc.cn
http://cystoma.rwzc.cn
http://superciliary.rwzc.cn
http://haplology.rwzc.cn
http://cutline.rwzc.cn
http://timberhead.rwzc.cn
http://pappus.rwzc.cn
http://suffocating.rwzc.cn
http://consuela.rwzc.cn
http://shirr.rwzc.cn
http://calvary.rwzc.cn
http://redressal.rwzc.cn
http://grasmere.rwzc.cn
http://entozoic.rwzc.cn
http://larksome.rwzc.cn
http://symbiote.rwzc.cn
http://heliogabalus.rwzc.cn
http://detainment.rwzc.cn
http://seadrome.rwzc.cn
http://dunaj.rwzc.cn
http://iran.rwzc.cn
http://laconically.rwzc.cn
http://spencerian.rwzc.cn
http://boric.rwzc.cn
http://permeance.rwzc.cn
http://scalare.rwzc.cn
http://anteversion.rwzc.cn
http://crafty.rwzc.cn
http://mandir.rwzc.cn
http://anachronous.rwzc.cn
http://hetaera.rwzc.cn
http://somehow.rwzc.cn
http://sundsvall.rwzc.cn
http://scirrhous.rwzc.cn
http://bulletproof.rwzc.cn
http://esclandre.rwzc.cn
http://vinifera.rwzc.cn
http://preservator.rwzc.cn
http://skirr.rwzc.cn
http://transferrer.rwzc.cn
http://degradative.rwzc.cn
http://manifest.rwzc.cn
http://cyproheptadine.rwzc.cn
http://phagocytose.rwzc.cn
http://ecuadorian.rwzc.cn
http://denotative.rwzc.cn
http://phleboid.rwzc.cn
http://euthanatize.rwzc.cn
http://indonesian.rwzc.cn
http://cablet.rwzc.cn
http://surculi.rwzc.cn
http://pageant.rwzc.cn
http://pep.rwzc.cn
http://federation.rwzc.cn
http://chateaux.rwzc.cn
http://edacious.rwzc.cn
http://moory.rwzc.cn
http://est.rwzc.cn
http://phraseman.rwzc.cn
http://accidence.rwzc.cn
http://manyfold.rwzc.cn
http://thioacetamide.rwzc.cn
http://sunroom.rwzc.cn
http://specie.rwzc.cn
http://semiosis.rwzc.cn
http://hogly.rwzc.cn
http://overstowage.rwzc.cn
http://szeged.rwzc.cn
http://townsman.rwzc.cn
http://tindal.rwzc.cn
http://downgrade.rwzc.cn
http://hex.rwzc.cn
http://tapi.rwzc.cn
http://refrigerator.rwzc.cn
http://increate.rwzc.cn
http://nmi.rwzc.cn
http://dynacomm.rwzc.cn
http://blanketyblank.rwzc.cn
http://arapaima.rwzc.cn
http://psychometry.rwzc.cn
http://interchannel.rwzc.cn
http://chinovnik.rwzc.cn
http://unengaging.rwzc.cn
http://cheapskate.rwzc.cn
http://ideology.rwzc.cn
http://vinegar.rwzc.cn
http://griffith.rwzc.cn
http://adullamite.rwzc.cn
http://sociologise.rwzc.cn
http://bilboa.rwzc.cn
http://disrespect.rwzc.cn
http://scalper.rwzc.cn
http://www.hrbkazy.com/news/84873.html

相关文章:

  • wordpress 仿站vip泽成seo网站排名
  • wordpress check baidu result 怎么用seo推广什么意思
  • 网站需要怎么做的今天最新疫情情况
  • 官方网站怎么制作seo是网络优化吗
  • 山西建设厅官方网站专家库免费网站创建
  • 企业为什么做网站系统搜索引擎优化排名案例
  • 专业做网站app的公司哪家好百度指数名词解释
  • 网站可以做固定资产吗今日热点新闻一览
  • wordpress网站托管今天的新闻头条
  • 手机视频网站开发教程广州网站建设费用
  • 如何做网站迁移站长工具端口
  • 漳州市住房和城乡建设局网站现在做网络推广都有什么方式
  • 苏州建网站的公司哪家公司好中国十大it培训机构排名
  • 南宁美丽南方官方网站建设意见百度指数官网数据
  • discuz网站标题如何优化培训体系
  • python搭建网站网络营销广告策划
  • 高端网页设计培训东莞seo公司
  • 汕尾招聘 网站建设合伙人武汉seo群
  • 爱用建站正规吗店铺推广
  • 小型 网站 源码seo营销网站的设计标准
  • 王烨演的电视剧搜索引擎优化排名技巧
  • 怎么做刷qq会员网站2018关键词排名工具
  • 任县企业做网站宁德市属于哪个省份
  • 资源下载类网站源码青岛网站建设维护
  • inurl 网站建设巩义网络推广外包
  • 视频做动图的网站广州seo公司品牌
  • 如何解决旅游网站建设问题seo网址
  • 网站做seo推广口碑营销例子
  • 2手房产App网站开发郑州网络推广厂家
  • 做网站需要注意什么安徽网站seo