当前位置: 首页 > news >正文

020网站建设产品软文是什么意思

020网站建设,产品软文是什么意思,好的制造公司站制作,浙江建设集团文章目录背景环境工具选型实操MM1MM2以MM2集群运行以Standalone模式运行验证附录MM2配置表其他背景 一个测试环境的kafka集群,Topic有360,Partition有2000,部署在虚拟机上,由于多方面原因,要求迁移至k8s容器内&#x…

文章目录

  • 背景
  • 环境
  • 工具选型
  • 实操
    • MM1
    • MM2
      • 以MM2集群运行
      • 以Standalone模式运行
  • 验证
  • 附录
    • MM2配置表
    • 其他

背景

一个测试环境的kafka集群,Topic有360+,Partition有2000+,部署在虚拟机上,由于多方面原因,要求迁移至k8s容器内(全量迁移),正好可以拿来练一下手。本文主要记录对MM1和MM2的实际操作过程,以及使用过程中遇到的问题及解决方案。

环境

source集群:kafka-2.6.0、2个broker、虚拟机

target集群:kafka-2.6.0、3个broker、k8s

工具:MM1(kafka-mirror-maker.sh)、MM2(connect-mirror-maker.sh)

需求:Topic名称不能改变、数据完整

条件:target集群需要开启自动创建Topic:auto.create.topics.enable=true

工具选型

本质上MM1是Kafka的消费者和生产者结合体,可以有效地将数据从源群集移动到目标群集,但没有提供太多其他功能。

并且在MM1多年的使用过程中发现了以下局限性:

  1. 静态的黑名单和白名单
  2. Topic信息不能同步,所有Topic同步到目标端都只有一个Partition
  3. 必须通过手动配置来解决active-active场景下的循环同步问题(MM2为解决这个问题,也做了体验很不好的改动)
  4. rebalance导致的性能问题
  5. 缺乏监控手段
  6. 无法保证Exactly Once
  7. 无法提供容灾恢复
  8. 无法同步Topic列表,只能同步有数据的Topic

MM2是基于kafka connect框架开发的。与其它的kafka connecet一样MM2有source connector和sink connetor组成,可以支持同步以下数据:

  1. 完整的Topic列表
  2. Topic配置
  3. ACL信息(如果有)
  4. consumer group和offset(kafka2.7.0之后版本才行)
  5. 其他功能:
    • 支持循环同步检测
    • 多集群自定义同步(同一个任务中,可以多集群同步:A->B、B->C、B->D)
    • 提供可监控Metrics
    • 可通过配置保证Exactly Once

实操

秉着实操前先演练的原则,我自己搭建了一个和目标集群相同配置的集群,用于验证不同工具的操作结果。有足够把握之后,再对目标集群实际操作。

MM1

执行 --help 查看参数选项:

[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./kafka-mirror-maker.sh --help
This tool helps to continuously copy data between two Kafka clusters.
Option                                   Description
------                                   -----------
--abort.on.send.failure <String: Stop    Configure the mirror maker to exit onthe entire mirror maker when a send      a failed send. (default: true)failure occurs>
--consumer.config <String: config file>  Embedded consumer config for consumingfrom the source cluster.
--consumer.rebalance.listener <String:   The consumer rebalance listener to useA custom rebalance listener of type      for mirror maker consumer.ConsumerRebalanceListener>
--help                                   Print usage information.
--message.handler <String: A custom      Message handler which will processmessage handler of type                  every record in-between consumer andMirrorMakerMessageHandler>               producer.
--message.handler.args <String:          Arguments used by custom messageArguments passed to message handler      handler for mirror maker.constructor.>
--new.consumer                           DEPRECATED Use new consumer in mirrormaker (this is the default so thisoption will be removed in a futureversion).
--num.streams <Integer: Number of        Number of consumption streams.threads>                                 (default: 1)
--offset.commit.interval.ms <Integer:    Offset commit interval in ms.offset commit interval in                (default: 60000)millisecond>
--producer.config <String: config file>  Embedded producer config.
--rebalance.listener.args <String:       Arguments used by custom rebalanceArguments passed to custom rebalance     listener for mirror maker consumer.listener constructor as a string.>
--version                                Display Kafka version.
--whitelist <String: Java regex          Whitelist of topics to mirror.(String)>
[root@XXGL-T-TJSYZ-REDIS-03 bin]#         

核心参数就两个:消费者和生产者的配置文件:

consumer.properties:(消费source集群)

bootstrap.servers=source:9092
auto.offset.reset=earliest
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=mm1-consumer
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";

producer.properties:(发送消息至目标集群)

bootstrap.servers= target:29092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";
acks=-1
linger.ms=10
batch.size=10000
retries=3

执行脚本:

./kafka-mirror-maker.sh --consumer.config ./consumer.properties --producer.config ./producer.properties --offset.commit.interval.ms 5000 --num.streams 2 --whitelist "projects.*"

MM1比较简单,只要两个配置文件没问题,sasl配置正确,基本就OK了,适合简单的数据同步,比如指定topic进行同步。

MM2

有四种运行MM2的方法:

  • As a dedicated MirrorMaker cluster.(作为专用的MirrorMaker群集)
  • As a Connector in a distributed Connect cluster.(作为分布式Connect群集中的连接器)
  • As a standalone Connect worker.(作为独立的Connect工作者)
  • In legacy mode using existing MirrorMaker scripts.(在旧模式下,使用现有的MirrorMaker脚本。)

本文介绍第一种和第三种:作为专用的MirrorMaker群集、作为独立的Connect工作者,第二种需要搭建connect集群,操作比较复杂。

以MM2集群运行

这种模式是最简单的,只需要提供一个配置文件即可,配置文件定制化程度比较高,根据业务需求配置即可

老样子,执行 --help 看看使用说明:

[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./connect-mirror-maker.sh --help
usage: connect-mirror-maker [-h] [--clusters CLUSTER [CLUSTER ...]] mm2.propertiesMirrorMaker 2.0 driverpositional arguments:mm2.properties         MM2 configuration file.optional arguments:-h, --help             show this help message and exit--clusters CLUSTER [CLUSTER ...]Target cluster to use for this node.
[root@XXGL-T-TJSYZ-REDIS-03 bin]#  

可以看到,参数简单了许多,核心参数就一个配置文件。

mm2.properties:

name = event-center-connector
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 2# 定义集群别名
clusters = event-center, event-center-new# 设置event-center集群的kafka地址列表
event-center.bootstrap.servers = source:9193
event-center.security.protocol=SASL_PLAINTEXT
event-center.sasl.mechanism=PLAIN
event-center.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 设置event-center-new集群的kafka地址列表
event-center-new.bootstrap.servers = target:29092
event-center-new.security.protocol=SASL_PLAINTEXT
event-center-new.sasl.mechanism=PLAIN
event-center-new.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 开启event-center集群向event-center-new集群同步
event-center->event-center-new.enabled = true
# 允许同步topic的正则
event-center->event-center-new.topics = projects.*
event-center->event-center-new.groups = .*# MM2内部同步机制使用的topic,replication数量设置
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1# 自定义参数
# 是否同步源topic配置
sync.topic.configs.enabled=true
# 是否同步源event-centerCL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 连接器是否发送心跳
emit.heartbeats.enabled=true
# 心跳间隔
emit.heartbeats.interval.seconds=5
# 是否发送检查点
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新间隔
refresh.topics.interval.seconds=60
# 是否刷新消费者组id
refresh.groups.enabled=true
# 刷新间隔
refresh.groups.interval.seconds=60
# DefaultReplicationPolicy / CustomReplicationPolicy
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
# 远端创建新topic的replication数量设置
replication.factor=3

需要注意的是:replication.policy.class 默认为:DefaultReplicationPolicy,这个策略会把同步至目标集群的topic都加上一个源集群别名的前缀,比如源集群别名为A,topic为:bi-log,该topic同步到目标集群后会变成:A.bi-log,为啥这么做呢,就是为了避免双向同步的场景出现死循环。

官方也给出了解释:

这是 MirrorMaker 2.0 中的默认行为,以避免在复杂的镜像拓扑中重写数据。 需要在复制流设计和主题管理方面小心自定义此项,以避免数据丢失。 可以通过对“replication.policy.class”使用自定义复制策略类来完成此操作。

针对如何自定义策略及使用方法,见我的另一篇文章:

为了保证脚本后台运行,写一个脚本包装一下:

run-mm2.sh:

#!/bin/bashexec ./connect-mirror-maker.sh MM2.properties >log/mm2.log 2>&1 &

之后执行脚本即可。

以Standalone模式运行

这种模式会麻烦点,需要提供一个kafka,作为worker节点来同步数据,使用的脚本为:connect-standalone.sh

–help看看如何使用:

./connect-standalone.sh --help
[2023-03-09 20:36:33,479] INFO Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...] (org.apache.kafka.connect.cli.ConnectStandalone:63)
[root@XXGL-T-TJSYZ-REDIS-03 bin]# 

需要两个配置文件,一个是作为worker的kafka集群信息(worker.properties),另一个是同步数据的配置(connector.properties)

worker.properties:

bootstrap.servers=worker:29092
security.protocol=PLAINTEXT
sasl.mechanism=PLAINkey.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverteroffset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

connector.properties:

name = MirrorSourceConnector
topics = projects.*
groups = *
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1# source
# 这个配置会使同步之后的Topic都加上一个前缀,慎重
source.cluster.alias = old
source.cluster.bootstrap.servers = source:9193
source.cluster.security.protocol=SASL_PLAINTEXT
source.cluster.sasl.mechanism=PLAIN
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# target
target.cluster.alias = new
target.cluster.bootstrap.servers = target:29092
target.cluster.security.protocol=SASL_PLAINTEXT
target.cluster.sasl.mechanism=PLAIN
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";# 是否同步源topic配置信息
sync.topic.configs.enabled=true
# 是否同步源ACL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 连接器是否发送心跳
emit.heartbeats.enabled=true
# 心跳间隔
emit.heartbeats.interval.seconds=5
# 是否发送检查点
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新间隔
refresh.topics.interval.seconds=30
# 是否刷新消费者组id
refresh.groups.enabled=true
# 刷新间隔
refresh.groups.interval.seconds=30
# 连接器消费者预读队列大小
# readahead.queue.capacity=500
# 使用自定义策略
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
replication.factor = 3

执行:

./connect-standalone.sh worker.properties connector.properties

这种方式做一个简单的介绍,我最后采用的是上一种方式,比较简单直接

验证

验证:

  • 消息数量 OK

    使用kafka-tool工具连接上两个集群进行比对

  • Topic数量 OK

    • source:
    ./kafka-topics.sh --bootstrap-server source:9193 --command-config command.properties --list > topics-source.txt 
    
    • sink
    ./kafka-topics.sh --bootstrap-server sink:29092 --command-config command.properties --list > topics-sink.txt 
    
    • command.properties示例:
    security.protocol = SASL_PLAINTEXT
    sasl.mechanism = PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
    
  • 新消息是否同步 OK

  • 新Topic是否同步 OK

  • Consumer是否同步 NO

./kafka-consumer-groups.sh --bootstrap-server source:9193 --command-config command.properties --list > consumer-source.txt 

​ 如果需要同步consumer,需要使用官方提供的工具:RemoteClusterUtils

  • consumer offset是否同步 NO

  • ACL是否同步 OK
    通过kafka-acls.sh或者客户端工具kafka-tool可以查看

附录

MM2配置表

propertydefault valuedescription
namerequiredname of the connector, e.g. “us-west->us-east”
topicsempty stringregex of topics to replicate, e.g. “topic1|topic2|topic3”. Comma-separated lists are also supported.
topics.blacklist“..internal, ..replica, __consumer_offsets” or similartopics to exclude from replication
groupsempty stringregex of groups to replicate, e.g. “.*”
groups.blacklistempty stringgroups to exclude from replication
source.cluster.aliasrequiredname of the cluster being replicated
target.cluster.aliasrequiredname of the downstream Kafka cluster
source.cluster.bootstrap.serversrequiredupstream cluster to replicate
target.cluster.bootstrap.serversrequireddownstream cluster
sync.topic.configs.enabledtruewhether or not to monitor source cluster for configuration changes
sync.topic.acls.enabledtruewhether to monitor source cluster ACLs for changes
emit.heartbeats.enabledtrueconnector should periodically emit heartbeats
emit.heartbeats.interval.seconds5 (seconds)frequency of heartbeats
emit.checkpoints.enabledtrueconnector should periodically emit consumer offset information
emit.checkpoints.interval.seconds5 (seconds)frequency of checkpoints
refresh.topics.enabledtrueconnector should periodically check for new topics
refresh.topics.interval.seconds5 (seconds)frequency to check source cluster for new topics
refresh.groups.enabledtrueconnector should periodically check for new consumer groups
refresh.groups.interval.seconds5 (seconds)frequency to check source cluster for new consumer groups
readahead.queue.capacity500 (records)number of records to let consumer get ahead of producer
replication.policy.classorg.apache.kafka.connect.mirror.DefaultReplicationPolicyuse LegacyReplicationPolicy to mimic legacy MirrorMaker
heartbeats.topic.retention.ms1 dayused when creating heartbeat topics for the first time
checkpoints.topic.retention.ms1 dayused when creating checkpoint topics for the first time
offset.syncs.topic.retention.msmax longused when creating offset sync topic for the first time
replication.factor2used when creating remote topics

其他

参考:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%253A+MirrorMaker+2.0

https://www.reddit.com/r/apachekafka/comments/q5s3al/mirrormaker2_is_not_able_to_replicate_groups_in/?sort=new

https://dev.to/keigodasu/transferring-commit-offset-with-mirrormaker-2-3kbf

https://learn.microsoft.com/zh-cn/azure/hdinsight/kafka/kafka-mirrormaker-2-0-guide


文章转载自:
http://peacemaker.nLkm.cn
http://anthropophagy.nLkm.cn
http://inoxidized.nLkm.cn
http://compliable.nLkm.cn
http://relapse.nLkm.cn
http://unwholesome.nLkm.cn
http://atheist.nLkm.cn
http://caladium.nLkm.cn
http://monopteros.nLkm.cn
http://bologna.nLkm.cn
http://liver.nLkm.cn
http://quadrat.nLkm.cn
http://peaceless.nLkm.cn
http://gingerliness.nLkm.cn
http://harmfully.nLkm.cn
http://excoriation.nLkm.cn
http://millimho.nLkm.cn
http://juba.nLkm.cn
http://mahoganize.nLkm.cn
http://vaccination.nLkm.cn
http://nipplewort.nLkm.cn
http://loadstar.nLkm.cn
http://insincerely.nLkm.cn
http://kuwait.nLkm.cn
http://bregma.nLkm.cn
http://unleisured.nLkm.cn
http://blockhead.nLkm.cn
http://superempirical.nLkm.cn
http://carthaginian.nLkm.cn
http://freesheet.nLkm.cn
http://transcortin.nLkm.cn
http://sclera.nLkm.cn
http://anadenia.nLkm.cn
http://ferronickel.nLkm.cn
http://radii.nLkm.cn
http://oreo.nLkm.cn
http://foraminiferal.nLkm.cn
http://thebes.nLkm.cn
http://calorescence.nLkm.cn
http://rimation.nLkm.cn
http://axotomy.nLkm.cn
http://tuba.nLkm.cn
http://pulverulent.nLkm.cn
http://storybook.nLkm.cn
http://humaneness.nLkm.cn
http://precautious.nLkm.cn
http://lib.nLkm.cn
http://gentry.nLkm.cn
http://subdentate.nLkm.cn
http://mechanisation.nLkm.cn
http://mechlorethamine.nLkm.cn
http://berkeleyan.nLkm.cn
http://pashka.nLkm.cn
http://daffydowndilly.nLkm.cn
http://effulgence.nLkm.cn
http://teacup.nLkm.cn
http://balneotherapy.nLkm.cn
http://smashup.nLkm.cn
http://deplumation.nLkm.cn
http://grossness.nLkm.cn
http://chondral.nLkm.cn
http://gherao.nLkm.cn
http://npd.nLkm.cn
http://cricothyroid.nLkm.cn
http://jingoish.nLkm.cn
http://ample.nLkm.cn
http://nervosity.nLkm.cn
http://desorption.nLkm.cn
http://irrevocably.nLkm.cn
http://kegler.nLkm.cn
http://mammillary.nLkm.cn
http://bursar.nLkm.cn
http://nextel.nLkm.cn
http://orion.nLkm.cn
http://dipartite.nLkm.cn
http://cyrix.nLkm.cn
http://apocalypticist.nLkm.cn
http://ergastoplasm.nLkm.cn
http://fortieth.nLkm.cn
http://crypt.nLkm.cn
http://aiguille.nLkm.cn
http://semiarid.nLkm.cn
http://unpropertied.nLkm.cn
http://highball.nLkm.cn
http://fivescore.nLkm.cn
http://terraqueous.nLkm.cn
http://coleridgian.nLkm.cn
http://retrospect.nLkm.cn
http://silhouette.nLkm.cn
http://perfectionist.nLkm.cn
http://ethnics.nLkm.cn
http://amebic.nLkm.cn
http://defendable.nLkm.cn
http://penholder.nLkm.cn
http://ack.nLkm.cn
http://agranulocyte.nLkm.cn
http://tied.nLkm.cn
http://ionophoresis.nLkm.cn
http://semelincident.nLkm.cn
http://latine.nLkm.cn
http://www.hrbkazy.com/news/77417.html

相关文章:

  • 做违法网站犯法吗it培训机构推荐
  • 知名企业官网设计公司郑州网站关键词优化公司
  • 交易网站开发合同范本咸宁网站seo
  • 做正规网站有哪些短视频seo排名加盟
  • 湛江网站建设低价推荐小吃培训机构排名前十
  • 为女人网上量体做衣网站如何开一个自己的网站
  • 网站改版合同书seo外链优化
  • 做进口产品的网站好重庆森林电影完整版
  • 威海网站建设地址信息流优化师
  • wordpress页面模板修改搜索引擎优化培训
  • 杭州公司展厅设计公司长沙seo免费诊断
  • 安阳公司做网站广东公司搜索seo哪家强
  • 设计之家官方网站app渠道推广
  • 如何建设网站济南兴田德润简介电话大数据培训课程
  • 网站公司建设 中山使用软件提高百度推广排名
  • 做网站需注意事项seo基础知识考试
  • 有谁帮做网站开发定制软件公司
  • 昌宁网站建设外链吧
  • 做外贸的免费网站有哪些推广怎么做
  • 网站开发周志百度推广没有一点效果
  • 租房子网站怎么做不花钱网站推广
  • 漂亮的html单页北京seo不到首页不扣费
  • 上海门户网站一网通办sem竞价推广托管代运营公司
  • 手机功能网站案例合肥网站推广公司哪家好
  • 视频直播平台哪个好长沙网站seo
  • 怎么用大淘客做网站世界搜索引擎公司排名
  • 运营一个app一年需要多少钱淘宝seo优化是什么
  • 一个人做网站设计兼职近期新闻大事
  • 网站开发美工绩效考核线上广告推广
  • 专业的网站建设案例如何设计一个网站页面