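Table of contents
- 1. Environment setup
- 1.1 docker-compose.yml configuration
- 1.2 Common docker-compose commands
- 1.3 Container status after startup
- 2. MySQL binlog configuration
- 2.1 Enabling binlog through the docker-compose command
- 2.2 Creating the canal user and checking that binlog is enabled
- 3. canal configuration files
- 3.1 Complete canal.properties
- 3.2 Complete instance.properties
- 3.3 Checking that the container configuration matches the host
- 3.4 Opening the required ports in the firewall
- 4. Code implementation
- 4.1 Required pom dependencies
- 4.2 Complete pom.xml
- 4.3 application.yml configuration
- 4.4 Complete application.yml
- 4.5 RabbitConstants: basic constants
- 4.6 CanalMqConfigure: queue and exchange configuration
- 4.7 CanalConsumer: the consumer
- 5. Running and testing
1. Environment setup
1.1 docker-compose.yml configuration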
version: '3.8'
services:
  redis:
    container_name: redis
    image: redis:6.2.7
    restart: always
    networks:
      - app_net
    ports:
      - "6379:6379"
    volumes:
      - /usr/local/docker/redis/data:/data
      - /usr/local/docker/redis/config/redis.conf:/usr/local/redis/config/redis.conf
      - /usr/local/docker/redis/logs:/logs
    command: [ "redis-server","/usr/local/redis/config/redis.conf" ]
  mysql:
    container_name: mysql
    image: mysql:8.0.30
    restart: always
    networks:
      - app_net
    ports:
      - "3306:3306"
    volumes:
      - /usr/local/docker/mysql/data:/var/lib/mysql
      - /usr/local/docker/mysql/config:/etc/mysql/conf.d
    environment:
      MYSQL_ROOT_PASSWORD: root
      TZ: Asia/Shanghai
    command:
      --default-authentication-plugin=mysql_native_password
      --character-set-server=utf8mb4
      --collation-server=utf8mb4_general_ci
      --explicit_defaults_for_timestamp=true
      --lower_case_table_names=1
      --log-bin=/var/lib/mysql/mysql-bin
      --server-id=1
      --binlog-format=ROW
      --expire_logs_days=7
      --max_binlog_size=500M
  canal:
    image: canal/canal-server:v1.1.5
    container_name: canal
    restart: always
    ports:
      - "11110:11110"
      - "11111:11111"
      - "11112:11112"
    volumes:
      - /usr/local/docker/canal/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
      - /usr/local/docker/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
      - /usr/local/docker/canal/logs:/home/admin/canal-server/logs
    networks:
      - app_net
    depends_on:
      - mysql
      - rabbitmq
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    restart: always
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - /usr/local/docker/rabbitmq/data/:/var/lib/rabbitmq/
      - /usr/local/docker/rabbitmq/log/:/var/log/rabbitmq/
    environment:
      - RABBITMQ_DEFAULT_USER=guest
      - RABBITMQ_DEFAULT_PASS=guest
    networks:
      - app_net
networks:
  app_net:
    driver: bridge
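1.2 Common docker-compose commands
# Start the services from the compose file in the background
docker-compose up -d [service]
# Stop the containers started by up and remove the network
docker-compose down
# Run a command inside a running service container (for example bash)
docker-compose exec [service] [command]
# List the containers of the project
docker-compose ps [service]
# Restart the containers of the project
docker-compose restart [service]
# Remove the stopped containers of the project
docker-compose rm -f [service]
# Start the containers of the project (or a specific service)
docker-compose start [service]
# Stop the containers of the project (or a specific service)
docker-compose stop [service]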
1.3 Container status after startup
[root@lavm-13jmyj9ugf docker]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
0d4260bc557b canal/canal-server:v1.1.5 "/alidata/bin/main.s…" 38 minutes ago Up 38 minutes 9100/tcp, 0.0.0.0:11110-11112->11110-11112/tcp, :::11110-11112->11110-11112/tcp canal
c66b3f1f13a9 mysql:8.0.30 "docker-entrypoint.s…" 38 minutes ago Up 38 minutes 0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp mysql
645e27bd4001 rabbitmq:3-management "docker-entrypoint.s…" 5 hours ago Up 49 minutes 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp rabbitmq
f55d42cbbd8e redis:6.2.7 "docker-entrypoint.s…" 3 days ago Up 49 minutes 0.0.0.0:6379->6379/tcp, :::6379->6379/tcp redis
2. MySQL binlog configuration
2.1 Enabling binlog through the docker-compose command
--log-bin=/var/lib/mysql/mysql-bin
--server-id=1
--binlog-format=ROW
--expire_logs_days=7
--max_binlog_size=500M
2.2 Creating the canal user and checking that binlog is enabled
mysql> CREATE USER canal IDENTIFIED BY 'canal';
Query OK, 0 rows affected (0.05 sec)

mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.05 sec)

mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.05 sec)

mysql> select * from mysql.user where User = 'canal';
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
| Host | User | Select_priv | Insert_priv | Update_priv | Delete_priv | Create_priv | Drop_priv | Reload_priv | Shutdown_priv | Process_priv | File_priv | Grant_priv | References_priv | Index_priv | Alter_priv | Show_db_priv | Super_priv | Create_tmp_table_priv | Lock_tables_priv | Execute_priv | Repl_slave_priv | Repl_client_priv | Create_view_priv | Show_view_priv | Create_routine_priv | Alter_routine_priv | Create_user_priv | Event_priv | Trigger_priv | Create_tablespace_priv | ssl_type | ssl_cipher | x509_issuer | x509_subject | max_questions | max_updates | max_connections | max_user_connections | plugin | authentication_string | password_expired | password_last_changed | password_lifetime | account_locked | Create_role_priv | Drop_role_priv | Password_reuse_history | Password_reuse_time | Password_require_current | User_attributes |
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
| % | canal | Y | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | Y | Y | N | N | N | N | N | N | N | N | | | | | 0 | 0 | 0 | 0 | mysql_native_password | *E3619321C1A937C46A0D8BD1DAC39F93B27D4458 | N | 2025-03-10 11:53:49 | NULL | N | N | N | NULL | NULL | NULL | NULL |
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
1 row in set (0.08 sec)

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.30 sec)
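The same check can be made from application code before relying on canal. The following is a minimal sketch, assuming the server variables have already been read into a map (for example from `SHOW VARIABLES` over JDBC); the class and method names are hypothetical and not part of canal or MySQL.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: validates MySQL server variables that were read
// elsewhere (e.g. from `SHOW VARIABLES` over JDBC).
final class BinlogPreflight {

    // Returns human-readable problems; an empty list means binlog is
    // enabled and uses the ROW format configured in section 2.1.
    static List<String> problems(Map<String, String> vars) {
        List<String> out = new ArrayList<>();
        if (!"ON".equalsIgnoreCase(vars.get("log_bin"))) {
            out.add("log_bin is not ON");
        }
        if (!"ROW".equalsIgnoreCase(vars.get("binlog_format"))) {
            out.add("binlog_format is not ROW");
        }
        return out;
    }

    public static void main(String[] args) {
        // Values as they would appear with the docker-compose command above.
        System.out.println(problems(Map.of("log_bin", "ON", "binlog_format", "ROW")));
    }
}
```

Returning a list of problems rather than a boolean makes it easy to log exactly which setting is wrong.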
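3. canal configuration files
canal.properties
Key settings:
- canal.serverMode=rabbitMQ: selects RabbitMQ as the delivery mode.
- rabbitmq.host=rabbitmq: canal and RabbitMQ are on the same Docker network, so the container name can be used as the host.
- rabbitmq.queue, rabbitmq.routingKey, rabbitmq.exchange: the RabbitMQ queue, routing key and exchange that the consumer later declares and listens on.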
#################################################
######### common argument #############
#################################################
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ: supported server modes (direct TCP or an MQ); RabbitMQ is used here
canal.serverMode=rabbitMQ
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host=rabbitmq
rabbitmq.virtual.host=/
rabbitmq.exchange=canal-exchange
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.queue=canal-queue
rabbitmq.routingKey=canal-routing-key
rabbitmq.deliveryMode=
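A missing or empty RabbitMQ key is easy to overlook in a long properties file. The following is a minimal sketch of a pre-flight check using only `java.util.Properties`; the class name and the choice of which keys count as required are assumptions made for illustration, based on the excerpt above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for the canal.properties excerpt above.
final class CanalPropertiesCheck {

    // Keys this article relies on; treating them as required is an assumption.
    private static final String[] REQUIRED = {
        "rabbitmq.host", "rabbitmq.exchange", "rabbitmq.queue", "rabbitmq.routingKey"
    };

    // Returns the keys that are missing, blank, or (for serverMode) not rabbitMQ.
    static List<String> invalidKeys(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> invalid = new ArrayList<>();
        if (!"rabbitMQ".equalsIgnoreCase(props.getProperty("canal.serverMode"))) {
            invalid.add("canal.serverMode");
        }
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.isBlank()) {
                invalid.add(key);
            }
        }
        return invalid;
    }

    public static void main(String[] args) {
        String text = "canal.serverMode=rabbitMQ\n"
                + "rabbitmq.host=rabbitmq\n"
                + "rabbitmq.exchange=canal-exchange\n"
                + "rabbitmq.queue=canal-queue\n"
                + "rabbitmq.routingKey=canal-routing-key\n";
        System.out.println(invalidKeys(text));
    }
}
```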
instance.properties
Key settings:
- canal.instance.master.address: database address
- canal.instance.dbUsername: database user name
- canal.instance.dbPassword: database password
- canal.mq.topic: RabbitMQ routing key
# Database address; the container name mysql can be used instead of an IP because canal and mysql are on the same network
canal.instance.master.address=mysql:3306
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
# mq config
canal.mq.topic=canal-routing-key
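In this setup `canal.mq.topic` serves as the RabbitMQ routing key, so it has to stay in sync with `rabbitmq.routingKey` in canal.properties (both are `canal-routing-key` here). The following is a small sketch that compares the two values; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical consistency check between canal.properties and instance.properties.
final class RoutingKeyCheck {

    private static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // True only when canal.mq.topic is set and equals rabbitmq.routingKey.
    static boolean topicMatchesRoutingKey(String canalProps, String instanceProps) {
        String routingKey = parse(canalProps).getProperty("rabbitmq.routingKey");
        String topic = parse(instanceProps).getProperty("canal.mq.topic");
        return topic != null && topic.equals(routingKey);
    }

    public static void main(String[] args) {
        String canal = "rabbitmq.routingKey=canal-routing-key\n";
        String instance = "canal.mq.topic=canal-routing-key\n";
        System.out.println(topicMatchesRoutingKey(canal, instance));
    }
}
```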
3.1 Complete canal.properties
#################################################
######### common argument #############
#################################################
canal.ip=
canal.register.ip=
canal.port=11111
canal.metrics.pull.port=11112
canal.admin.port=11110
canal.admin.user=admin
canal.admin.passwd=
canal.zkServers=
canal.zookeeper.flush.period=1000
canal.withoutNetty=false
canal.serverMode=rabbitMQ
canal.file.data.dir=${canal.conf.dir}
canal.file.flush.period=1000
canal.instance.memory.buffer.size=16384
canal.instance.memory.buffer.memunit=1024
canal.instance.memory.batch.mode=MEMSIZE
canal.instance.memory.rawEntry=true
canal.instance.detecting.enable=false
canal.instance.detecting.sql=select 1
canal.instance.detecting.interval.time=3
canal.instance.detecting.retry.threshold=3
canal.instance.detecting.heartbeatHaEnable=false
canal.instance.transaction.size=1024
canal.instance.fallbackIntervalInSeconds=60
canal.instance.network.receiveBufferSize=16384
canal.instance.network.sendBufferSize=16384
canal.instance.network.soTimeout=30
canal.instance.filter.druid.ddl=true
canal.instance.filter.query.dcl=false
canal.instance.filter.query.dml=false
canal.instance.filter.query.ddl=false
canal.instance.filter.table.error=false
canal.instance.filter.rows=false
canal.instance.filter.transaction.entry=false
canal.instance.filter.dml.insert=false
canal.instance.filter.dml.update=false
canal.instance.filter.dml.delete=false
canal.instance.binlog.format=ROW,STATEMENT,MIXED
canal.instance.binlog.image=FULL,MINIMAL,NOBLOB
canal.instance.get.ddl.isolation=false
canal.instance.parser.parallel=true
canal.instance.parser.parallelThreadSize = 16
canal.instance.parser.parallelBufferSize=256
canal.instance.tsdb.enable=true
canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal
canal.instance.tsdb.snapshot.interval=24
canal.instance.tsdb.snapshot.expire=360
#################################################
######### destinations #############
#################################################
canal.destinations=example
canal.conf.dir=../conf
canal.auto.scan=true
canal.auto.scan.interval=5
canal.auto.reset.latest.pos.mode=false
canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
canal.instance.global.mode=spring
canal.instance.global.lazy=false
canal.instance.global.manager.address=${canal.admin.manager}
canal.instance.global.spring.xml=classpath:spring/file-instance.xml
##################################################
######### MQ Properties #############
##################################################
canal.aliyun.accessKey=
canal.aliyun.secretKey=
canal.aliyun.uid=
canal.mq.flatMessage=true
canal.mq.canalBatchSize=50
canal.mq.canalGetTimeout=100
canal.mq.accessChannel=local
canal.mq.database.hash=true
canal.mq.send.thread.size=30
canal.mq.build.thread.size=8
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers=127.0.0.1:9092
kafka.acks=all
kafka.compression.type=none
kafka.batch.size=16384
kafka.linger.ms=1
kafka.max.request.size=1048576
kafka.buffer.memory=33554432
kafka.max.in.flight.requests.per.connection=1
kafka.retries=0
kafka.kerberos.enable=false
kafka.kerberos.krb5.file=../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file=../conf/kerberos/jaas.conf
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group=test
rocketmq.enable.message.trace=false
rocketmq.customized.trace.topic=
rocketmq.namespace=
rocketmq.namesrv.addr=127.0.0.1:9876
rocketmq.retry.times.when.send.failed=0
rocketmq.vip.channel.enabled=false
rocketmq.tag=
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host=rabbitmq
rabbitmq.virtual.host=/
rabbitmq.exchange=canal-exchange
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.queue=canal-queue
rabbitmq.routingKey=canal-routing-key
rabbitmq.deliveryMode=
##################################################
######### Pulsar #############
##################################################
pulsarmq.serverUrl=
pulsarmq.roleToken=
pulsarmq.topicTenantPrefix=
3.2 Complete instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# position info
canal.instance.master.address=mysql:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# multi stream for polardbx
canal.instance.multi.stream.on=false

# ssl
#canal.instance.master.sslMode=DISABLED
#canal.instance.master.tlsVersions=
#canal.instance.master.trustCertificateKeyStoreType=
#canal.instance.master.trustCertificateKeyStoreUrl=
#canal.instance.master.trustCertificateKeyStorePassword=
#canal.instance.master.clientCertificateKeyStoreType=
#canal.instance.master.clientCertificateKeyStoreUrl=
#canal.instance.master.clientCertificateKeyStorePassword=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=canal-routing-key
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
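3.3 Checking that the container configuration matches the host
Open a shell inside the container: docker exec -it canal bash
Check that the configuration files match the ones on the host: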
cat /home/admin/canal-server/conf/canal.properties
cat /home/admin/canal-server/conf/example/instance.properties
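Comparing long files by eye is error-prone. The following is a minimal sketch that compares two files by SHA-256 digest, assuming the container's copy has first been copied out to the host (for example with `docker cp`); the class name and the file paths passed on the command line are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: checks that two configuration files have identical content.
final class ConfigDigest {

    // Hex-encoded SHA-256 digest of the file's bytes.
    static String sha256(Path file) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256").digest(Files.readAllBytes(file));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    static boolean sameContent(Path hostCopy, Path containerCopy) {
        return sha256(hostCopy).equals(sha256(containerCopy));
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.out.println("usage: ConfigDigest <host-file> <file-copied-from-container>");
            return;
        }
        System.out.println(sameContent(Path.of(args[0]), Path.of(args[1])) ? "identical" : "different");
    }
}
```

The hex digest can also be compared against the output of `sha256sum` run inside the container, which avoids copying the file out.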
3.4 Opening the required ports in the firewall
- canal: 11110, 11111, 11112
- mysql: 3306
- redis: 6379
- RabbitMQ: 15672, 5672
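Whether these ports are actually reachable can be probed from the machine that will run the application. The following is a minimal sketch using a plain TCP connect with a timeout; the class name and the default host `localhost` are assumptions, and a successful connect only shows that the port accepts connections, not that the service behind it is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical probe: tries a TCP connect to each port listed above.
final class PortProbe {

    // True if a TCP connection to host:port succeeds within the timeout.
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        int[] ports = {11110, 11111, 11112, 3306, 6379, 5672, 15672};
        for (int port : ports) {
            System.out.printf("%s:%d open=%b%n", host, port, isOpen(host, port, 1000));
        }
    }
}
```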
4. Code implementation
4.1 Required pom dependencies
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.15</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<!-- Spring Boot MQ dependency -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- canal -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
4.2 Complete pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.neo</groupId>
    <artifactId>code-repository</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.15</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <name>code-repository</name>

    <properties>
        <java.version>17</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hutool.version>5.8.20</hutool.version>
        <mysql.version>8.0.30</mysql.version>
        <mybatis-plus.version>3.5.3.1</mybatis-plus.version>
        <redis.version>3.1.0</redis.version>
        <druid.version>1.2.16</druid.version>
        <fastjson.version>1.2.83</fastjson.version>
        <sa-token.version>1.37.0</sa-token.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>${hutool.version}</version>
        </dependency>
        <dependency>
            <groupId>cn.dev33</groupId>
            <artifactId>sa-token-spring-boot-starter</artifactId>
            <version>${sa-token.version}</version>
        </dependency>
        <dependency>
            <groupId>cn.dev33</groupId>
            <artifactId>sa-token-redis-jackson</artifactId>
            <version>${sa-token.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
4.3 application.yml configuration
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
4.4 Complete application.yml
server:
  port: 8088
  servlet:
    context-path: /api
spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/db_v1?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: root
    druid:
      initial-size: 5
      min-idle: 5
      max-active: 20
      max-wait: 60000
      validation-query: SELECT 1
      test-while-idle: true
      stat-view-servlet:
        enabled: true
        url-pattern: /druid/*
        login-username: admin
        login-password: admin123
      web-stat-filter:
        enabled: true
        url-pattern: /*
        exclusions: "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*"
      filter:
        stat:
          enabled: true
          log-slow-sql: true
          slow-sql-millis: 1000
        wall:
          enabled: true
          config:
            drop-table-allow: false
  redis:
    host: localhost
    port: 6379
    password: 123456
    database: 0
    timeout: 5000
    lettuce:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0
  mail:
    host: smtp.aliyun.com
    username:
    password:
    port: 25
    properties:
      mail:
        smtp:
          auth: true
          starttls:
            enable: true
            required: true
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
mybatis-plus:
  mapper-locations: classpath*:mapper/*_Mapper.xml
  global-config:
    db-config:
      logic-delete-field: delFlag
      logic-delete-value: 1
      logic-not-delete-value: 0
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
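4.5 RabbitConstants: basic constants
public interface RabbitConstants {

    // Names must match rabbitmq.queue / rabbitmq.exchange / rabbitmq.routingKey in canal.properties
    interface Canal {
        String QUEUE = "canal-queue";
        String EXCHANGE = "canal-exchange";
        String ROUTING = "canal-routing-key";
    }

    // Values of the "type" field in canal messages
    interface EventType {
        String INSERT = "INSERT";
        String UPDATE = "UPDATE";
        String DELETE = "DELETE";
    }
}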
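4.6 CanalMqConfigure: queue and exchange configuration
import com.neo.core.constant.RabbitConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CanalMqConfigure {

    // Durable queue that receives the canal messages
    @Bean
    public Queue queue() {
        return new Queue(RabbitConstants.Canal.QUEUE, true);
    }

    // Durable direct exchange that is not auto-deleted
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(RabbitConstants.Canal.EXCHANGE, true, false);
    }

    // Bind the queue to the exchange with the canal routing key
    @Bean
    public Binding bindingCanal() {
        return BindingBuilder.bind(queue()).to(directExchange()).with(RabbitConstants.Canal.ROUTING);
    }
}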
4.7 CanalConsumer: the consumer
package com.neo.core.canal;

import com.neo.core.constant.RabbitConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

@Slf4j
@Component
@RabbitListener(queues = RabbitConstants.Canal.QUEUE)
public class CanalConsumer {

    @RabbitHandler
    public void execute(Map<String, Object> msg) {
        // Log the raw message delivered by canal
        log.info("canal消息监听事件触发,消息内容:{}", msg);
        // Skip DDL events; a missing flag is treated as non-DDL instead of failing on unboxing
        boolean isDdl = Boolean.TRUE.equals(msg.get("isDdl"));
        if (isDdl) {
            return;
        }
        String database = (String) msg.get("database");
        String table = (String) msg.get("table");
        String type = (String) msg.get("type");
        // Row data after the change; available for the business logic below
        List<?> data = (List<?>) msg.get("data");
        log.info("database:{}.table:{}", database, table);
        if (RabbitConstants.EventType.INSERT.equalsIgnoreCase(type)) {
            System.out.println("INSERT");
        } else if (RabbitConstants.EventType.UPDATE.equalsIgnoreCase(type)) {
            System.out.println("UPDATE");
        } else if (RabbitConstants.EventType.DELETE.equalsIgnoreCase(type)) {
            System.out.println("DELETE");
        } else {
            // other event types
        }
    }
}
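For UPDATE events the message carries both `data` (the rows after the change) and `old` (the previous values of the changed columns). The following is a minimal, framework-free sketch of turning those two lists into a before/after summary that an UPDATE branch could use; the class name, method name and the `rowIndex.column` key format are hypothetical, and the message layout is assumed to be the one shown in the log output in section 5.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: summarises which columns an UPDATE message changed.
final class UpdateDiff {

    // Returns "rowIndex.column" -> "before -> after" for every column listed
    // in `old`; returns an empty map for anything that is not an UPDATE.
    static Map<String, String> changedColumns(Map<String, Object> msg) {
        Map<String, String> changes = new LinkedHashMap<>();
        if (!"UPDATE".equalsIgnoreCase(String.valueOf(msg.get("type")))) {
            return changes;
        }
        List<?> data = (List<?>) msg.get("data");
        List<?> old = (List<?>) msg.get("old");
        if (data == null || old == null) {
            return changes;
        }
        // Assumption: old.get(i) describes the same row as data.get(i).
        int rows = Math.min(data.size(), old.size());
        for (int i = 0; i < rows; i++) {
            Map<?, ?> after = (Map<?, ?>) data.get(i);
            Map<?, ?> before = (Map<?, ?>) old.get(i);
            for (Map.Entry<?, ?> entry : before.entrySet()) {
                Object column = entry.getKey();
                changes.put(i + "." + column, entry.getValue() + " -> " + after.get(column));
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<String, Object> msg = new LinkedHashMap<>();
        msg.put("type", "UPDATE");
        msg.put("data", List.of(Map.of("id", "2", "account", "test1")));
        msg.put("old", List.of(Map.of("account", "test")));
        System.out.println(changedColumns(msg)); // prints {0.account=test -> test1}
    }
}
```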
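5. Running and testing
When data in MySQL changes, canal publishes the change to RabbitMQ and the listener on canal-queue is triggered; the actual business handling can then be implemented in the consumer as needed.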
25-03-10.16:51:28.431 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - canal消息监听事件触发,消息内容:{data=[{id=2, account=test, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test1, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596660000, id=5, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=null, pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596660932, type=DELETE}
25-03-10.16:51:28.431 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - database:db_v1.table:sys_user
DELETE
25-03-10.16:51:31.523 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - canal消息监听事件触发,消息内容:{data=[{id=2, account=test, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596663000, id=6, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=null, pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596664038, type=INSERT}
25-03-10.16:51:31.523 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - database:db_v1.table:sys_user
INSERT
25-03-10.16:51:36.030 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - canal消息监听事件触发,消息内容:{data=[{id=2, account=test1, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596668000, id=7, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=[{account=test}], pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596668545, type=UPDATE}
25-03-10.16:51:36.030 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - database:db_v1.table:sys_user
UPDATE
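Each message in the log above also carries two timestamps, `es` and `ts`. The following is a small sketch that derives an approximate pipeline delay from them, assuming `es` is the time the change was written to the binlog and `ts` the time canal built the message, both in epoch milliseconds; the class name is hypothetical, and since `es` has second resolution in these logs the result is only approximate.

```java
import java.util.Map;

// Hypothetical helper for the es/ts fields seen in the log output.
final class CanalLag {

    // Assumption: es = binlog event time, ts = time canal produced the
    // message, both epoch milliseconds. Returns ts - es.
    static long lagMillis(Map<String, Object> msg) {
        long es = ((Number) msg.get("es")).longValue();
        long ts = ((Number) msg.get("ts")).longValue();
        return ts - es;
    }

    public static void main(String[] args) {
        // Values taken from the DELETE message in the log above.
        Map<String, Object> msg = Map.of("es", 1741596660000L, "ts", 1741596660932L);
        System.out.println(lagMillis(msg) + " ms"); // prints 932 ms
    }
}
```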
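This completes the RabbitMQ + Canal data-consistency setup: environment configuration, code implementation, and a test run showing that inserts, updates and deletes in MySQL reach the consumer, which other systems can use to stay in sync with the database.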