当前位置: 首页 > news >正文

会议网站建设方案百度如何投放广告

会议网站建设方案,百度如何投放广告,深圳集团网站建设公司,全网营销代运营公司Kafka与ZooKeeper Apache ZooKeeper是一个基于观察者模式的分布式服务管理框架,即服务注册中心。同时ZooKeeper还具有存储数据的能力。Kafka的每台服务器作为一个broker注册到ZooKeeper,多个broker借助ZooKeeper形成了Kafka集群。同时ZooKeeper会保存一…

Kafka与ZooKeeper

Apache ZooKeeper是一个基于观察者模式的分布式服务管理框架,即服务注册中心。同时ZooKeeper还具有存储数据的能力。Kafka的每台服务器作为一个broker注册到ZooKeeper,多个broker借助ZooKeeper形成了Kafka集群。同时ZooKeeper会保存一些信息,例如消息消费的进度Offset等。另外ZooKeeper还负责Kafka集群中Leader的选举。

使用

下载

从官方下载最新版Kafka:

https://kafka.apache.org/downloads.html

当前最新版本为kafka_2.13-3.4.0。
下载后解压。注意解压路径不能有空格及其他特殊字符。
Kafka中默认自带了ZooKeeper,因此不需要单独下载ZooKeeper。
当然也可不使用Kafka自带的ZooKeeper,注意配置文件保持一致即可。

配置文件

进入Kafka根目录,新建一个 tmp 文件夹,然后在其下创建 zookeeperkafka-logs 文件夹,分别用于存放ZooKeeper的数据和Kafka的日志。
打开 /config/zookeeper.properties ,找到 dataDir=/tmp/zookeeper ,将其修改为新创建的 zookeeper 文件夹。然后在最后添加配置audit.enable=true
打开 /config/server.properties ,找到 log.dirs=/tmp/kafka-logs ,将其修改为新创建的 kafka-logs 文件夹。
注意路径必须使用/,不能使用\
若不修改上述目录,则会在磁盘的根目录下自动创建 zookeeperkafka-logs 文件夹。

配置的一致性

ZooKeeper默认的端口为2181,Kafka默认的端口为9092
若要更改Kafka的端口,则将listeners=PLAINTEXT://:9092配置开放,修改其端口值,同时要加上hostname,即改为:listeners=PLAINTEXT://localhost:9092
Kafka配置中默认指定zookeeper.connect=localhost:2181,若使用非默认配置,需修改该属性。若有多个ZooKeeper地址可使用,隔开。

运行和关闭

运行

Kafka的运行基于ZooKeeper,因此需要在ZooKeeper服务启动后再运行Kafka。
进入Kafka根目录。
首先要启动ZooKeeper。在根目录下打开cmd命令行,输入:

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

然后启动Kafka。在根目录下打开cmd命令行,输入:

.\bin\windows\kafka-server-start.bat .\config\server.properties

Kafka运行后会在根目录下生成 logs 文件夹。

命令行

进入Kafka根目录,打开cmd命令行。

topic

新建:

.\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic1

查看topic列表:

.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

查看单个topic详情:

.\bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic topic1

删除topic:

.\bin\windows\kafka-topics.bat --delete --bootstrap-server localhost:9092 -topic topic1

group

查看group列表:

.\bin\windows\kafka-consumer-groups.bat --list --bootstrap-server localhost:9092

查看group详情:

.\bin\windows\kafka-consumer-groups.bat --describe --bootstrap-server localhost:9092 --group test-consumer-group

ZooKeeper命令行

若运行的是Kafka自带的ZooKeeper,则要进入其命令行:

  1. 进入Kafka根目录,打开cmd命令行。
  2. 输入命令: .\bin\windows\zookeeper-shell.bat localhost:2181

注意地址和端口要正确,否则会提示JLine support is disabled
执行后会打印:

Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is disabledWATCHER::WatchedEvent state:SyncConnected type:None path:null

此时即可输入ZooKeeper命令:

# 查看服务
ls /# 查看0号broker
get /brokers/ids/0

关闭

关闭时,要先关闭Kafka,再关闭ZooKeeper。
关闭Kafka时不能直接关闭命令行窗口,这样可能会导致Kafka无法完成对日志文件的解锁,于是下次启动Kafka时就会因为日志文件被锁而无法启动成功。正确的关闭方式是运行 /bin/windows/kafka-server-stop.bat
关闭ZooKeeper时同样要运行 /bin/windows/zookeeper-server-stop.bat

暴力关闭导致异常

如果暴力关闭了Kafka,再次启动时遇到提示:

ERROR Fatal error during KafkaServer startup. Prepare to shutdown

此时删除根目录下的 logs 文件夹即可。

Spring Boot集成

首先要启动ZooKeeper和Kafka服务。

依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

配置文件

spring:kafka:bootstrap-servers: localhost:9092 #Kafka的地址producer: # 生产者key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: # 消费者key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: test-consumer-group #在Kafka的/config/consumer.properties中查看和修改

以上为最基础的配置。可以根据具体需求进行更加详细的配置。

生产者和消费者

对于Kafka,必须设置生产者和消费者。

  • 生产者: 生产者负责将消息发送给Kafka。有异步和同步两种方式。
  • 消费者: 消费者使用监听的方式接收Kafka的消息。当存在消息时会被及时消费。

生产者调用命令push来发送数据,而消费者调用命令pull来拉取数据。注意消费者的数据不是Kafka主动推送的。

生产者

为了方便测试,定义一个Controller来接收触发信号并将信息发送给Kafka。

@RequestMapping("/producer")
@RestController
public class ProducerController {@AutowiredKafkaTemplate<String, String> kafkaTemplate;// 异步发送@RequestMapping("/register")public String register(User user) {String message = JSON.toJSONString(user);System.out.println("接收到用户信息:" + message);kafkaTemplate.send("register", message);return "OK";}// 同步发送@RequestMapping("/register/sync")public String registerSync(User user) throws Exception {String message = JSON.toJSONString(user);System.out.println("接收到用户信息:" + message);ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("register", message);// 设置等待时间,超出后,不再等待返回SendResult<String, String> result = future.get(3, TimeUnit.SECONDS);return result.getProducerRecord().value();}
}

使用 KafkaTemplate<K, V> 来发送消息。其中send()方法第一个参数指定了topics的名称。
若要使用同步发送,可对kafkaTemplate.send()的返回结果调用get()方法,并在其中设置等待时间。
对于异步发送,发送后会立即收到返回结果;对于同步发送,会一直等待发送结果并返回,直到设置的等待时间耗尽。
对于异步发送,若想得知发送的最终结果,则需要注册一个监听器KafkaTemplate.setProducerListener()来等待回调:

@Configuration
public class KafkaListener {private final static Logger logger = LoggerFactory.getLogger(KafkaListener.class);@AutowiredKafkaTemplate kafkaTemplate;// 配置监听@PostConstructprivate void listener() {kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {@Overridepublic void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {logger.info("ok,message={}", producerRecord.value());}@Overridepublic void onError(ProducerRecord<String, Object> producerRecord, Exception exception) {logger.error("error!message={}", producerRecord.value());}});}
}

消费者

对于消费者,需设置一个监听器来监听指定的topics:

@Component
public class Consumer {@KafkaListener(topics = "register")public void consume(String message) {System.out.println("收到消息:" + message);User user = JSON.parseObject(message, User.class);System.out.println("为 " + user.getName() + " 进行账号注册");}
}

当Kafka收到消息后,监听者的回调方法被触发。

指定分区

Producer发送时指定分区和key值:

// 发送时指定0号分区,key为test
@RequestMapping("/register")
public String register(User user) {String message = JSON.toJSONString(user);System.out.println("收到用户信息:" + message);kafkaTemplate.send("register", 0, "test", message);return "OK";
}

Consumer接收时指定分区:

@KafkaListener(topics = {"register"}, topicPattern = "0")
public void onMessage(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {String message = (String)optional.get();User user = JSON.parseObject(message, User.class);System.out.println("为 " + user.getName() + " 进行账号注册");}
}

编码器和解码器

在yml中配置了编码器和解码器:

spring:kafka:...producer: # 生产者key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: # 消费者key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer...

其中:

  • key-serializer: 生产者key的编码器。
  • value-serializer: 生产者value的编码器。
  • key-deserializer: 消费者key的解码器。
  • value-deserializer: 消费者value的解码器。

编码器和解码器统称序列化,其作用是生产者将消息编码为字节流发送,消费者接收到字节流再进行解码
这里使用的是Kafka提供的字符串编码器StringSerializer和字符串解码器StringDeserializer
Kafka提供了多种编码器,包含: StringSerializerJsonSerializerBytesSerializerIntegerSerializerLongSerializerListSerializerStringOrBytesSerializer等等。对应地也提供了多种解码器,包含StringDeserializerJsonDeserializerBytesDeserializerIntegerDeserializerLongDeserializerListDeserializerStringOrBytesDeserializer等等。
Kafka默认提供的序列化类可满足绝大多数场景。用户也可自定义编码器和解码器。

编码器

编码器需要从Serializer类派生,并实现serialize()方法。
以下为StringSerializer源码:

package org.apache.kafka.common.serialization;public class StringSerializer implements Serializer<String> {private String encoding = StandardCharsets.UTF_8.name();@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";Object encodingValue = configs.get(propertyName);if (encodingValue == null)encodingValue = configs.get("serializer.encoding");if (encodingValue instanceof String)encoding = (String) encodingValue;}@Overridepublic byte[] serialize(String topic, String data) {try {if (data == null)return null;elsereturn data.getBytes(encoding);} catch (UnsupportedEncodingException e) {throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);}}
}

解码器

编码器需要从Deserializer类派生,并实现deserialize()方法。
以下为StringDeserializer源码:

package org.apache.kafka.common.serialization;public class StringDeserializer implements Deserializer<String> {private String encoding = StandardCharsets.UTF_8.name();@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";Object encodingValue = configs.get(propertyName);if (encodingValue == null)encodingValue = configs.get("deserializer.encoding");if (encodingValue instanceof String)encoding = (String) encodingValue;}@Overridepublic String deserialize(String topic, byte[] data) {try {if (data == null)return null;elsereturn new String(data, encoding);} catch (UnsupportedEncodingException e) {throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);}}
}

自定义编码器和解码器

现在希望,生产者和消费者以Object类型来处理消息:

// 编码器类
public class TestSerializer implements Serializer {@Overridepublic byte[] serialize(String topic, Object data) {String json = JSON.toJSONString(data);return json.getBytes();}
}// 解码器类
public class TestDeserializer implements Deserializer {@Overridepublic Object deserialize(String topic, byte[] data) {try {String json = new String(data,"utf-8");return JSON.parse(json);} catch (UnsupportedEncodingException e) {e.printStackTrace();}return null;}}

然后在yml中为value配置自定义的编码器和解码器:

spring:kafka:...producer: # 生产者key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: com.example.test.component.TestSerializerconsumer: # 消费者key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: com.example.test.component.TestDeserializer...

这样就可以使用自定义解码器了。

topic内分区

每一个topic实际是分成多个区(partition)的,这些topic都存储在Kafka内。无论是生产者将消息发送到Kafka还是消费者从Kafka中获取消息,都需要明确告知Kafka分区信息是什么。
生产者发送消息时,消息要发往哪个分区是根据条件来确定的:

  • 若指定分区号,则消息直接发到Kafka的指定分区。
  • 若未指定分区号,但给定了数据key值,则消息可对key值取Hashcode,自动计算分区。
  • 若未指定分区号,且未给定数据key值,则直接轮循分区。(默认方案)
  • 自定义分区策略。

使用指定分区号及Key值的方式,只需要在调用KafkaTemplate.send()时传入对应的参数即可。

自定义分区策略

自定义分区策略,即生产者在将消息发给Kafka前,先经过自定义的分区器进行分区计算,计算出目标分区后再发给Kafka。故而自定义分区器应添加在生产者工程中。
使用自定义分区的流程为:

  1. Partitioner派生,重载其partition()方法。在这里自定义分区逻辑。
  2. 添加一个@Configuration类,在其中更新KafkaTemplate的属性,将自定义的分区器设置进来。
  3. 正常发送消息。

自定义分区器:

public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 自定义分区策略。若key以0开头,则放入分区0;其他放入分区1String keyStr = String.valueOf(key);return keyStr.startsWith("0") ? 0 : 1;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

添加一个配置类来更新KafkaTemplate的属性:

@Configuration
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;KafkaTemplate kafkaTemplate;@PostConstructpublic void setKafkaTemplate() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 将自定义的分区器设置进来props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);this.kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));}public KafkaTemplate getKafkaTemplate(){return kafkaTemplate;}
}

然后正常发送消息即可。

SpringBoot手动提交offset

offset默认是自动计算自动提交的。若要对offset进行手动提交,流程为:

  1. 修改配置,关闭自动提交。
  2. consumer在处理结束时手动提交。

在application.yml中配置了offset自动提交:

enable-auto-commit: true # 是否自动提交offset
auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset,默认单位为ms)

现在添加一个配置类来覆盖yml中的配置:

@Configuration
public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 关闭自动提交configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));/*** AckMode 在ENABLE_AUTO_COMMIT_CONFIG = false时生效。所有取值为:*      RECORD: 每处理一条commit一次*      BATCH(默认): 每次poll的时候批量提交一次,频率取决于每次poll的调用频率*      TIME: 每次间隔ackTime的时间去commit*      COUNT: 累积达到ackCount次的ack去commit*      COUNT_TIME: ackTime或ackCount哪个条件先满足,就commit*      MANUAL: listener负责ack,但是背后也是批量上去*      MANUAL_IMMEDIATE: listner负责ack,每调用一次,就立即commit*/factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}
}

然后在consumer的监听方法中调用Consumer.commitSync()来手动提交offset:

/**
* 消费者1
*/
@KafkaListener(topics = {"register"})
public void consumer1(@Payload String message, ConsumerRecord record, Consumer consumer) {System.out.println("consumer1收到消息并消费:" + message);// 消息消费结束,同步提交偏移量,默认为offset+1consumer.commitSync();System.out.println("consumer1提交位移");}/**
* 消费者2
*/
@KafkaListener(topics = {"register"})
public void consumer2(@Payload String message, ConsumerRecord record, Consumer consumer) {System.out.println("consumer2收到消息并消费:" + message);// 消息消费结束,同步提交偏移量,手动更改为offset+2Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();currentOffset.put(new org.apache.kafka.common.TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 2));consumer.commitSync(currentOffset);System.out.println("consumer2提交位移");}

注意消费者2中手动更改偏移量使用的类为org.apache.kafka.common.TopicPartition。该类这里直接手动引入而没有放在import中,是因为该类与注解@TopicPartition的类org.springframework.kafka.annotation.TopicPartition同名。如果放在import中则会造成冲突。
上述消费者2为了说明手动更改偏移量的操作而令offset+2,这样会导致偏移量与实际不符,实际开发应按实际情况处理。
如果关闭了偏移量自动提交,且在消费者逻辑中没有提交偏移量,则会导致偏移量始终不变,于是每次消费者拉取的消息都是同一条,从而造成消息重复消费。


文章转载自:
http://voracious.rnds.cn
http://mercurous.rnds.cn
http://submatrix.rnds.cn
http://illumination.rnds.cn
http://lignin.rnds.cn
http://imposing.rnds.cn
http://zoologize.rnds.cn
http://opendoc.rnds.cn
http://breathlessly.rnds.cn
http://drudgery.rnds.cn
http://isomerism.rnds.cn
http://weftwise.rnds.cn
http://behtlehem.rnds.cn
http://activize.rnds.cn
http://astigmatometry.rnds.cn
http://tuesdays.rnds.cn
http://principia.rnds.cn
http://sucking.rnds.cn
http://misspell.rnds.cn
http://transcurrence.rnds.cn
http://finsbury.rnds.cn
http://maoist.rnds.cn
http://magnetic.rnds.cn
http://freeminded.rnds.cn
http://yaleman.rnds.cn
http://tranquillo.rnds.cn
http://corndog.rnds.cn
http://convertibility.rnds.cn
http://scranton.rnds.cn
http://spencerian.rnds.cn
http://tonto.rnds.cn
http://oysterage.rnds.cn
http://acari.rnds.cn
http://curcuma.rnds.cn
http://myna.rnds.cn
http://foretold.rnds.cn
http://operatise.rnds.cn
http://deoxygenate.rnds.cn
http://bloated.rnds.cn
http://embden.rnds.cn
http://sector.rnds.cn
http://adagio.rnds.cn
http://battlefront.rnds.cn
http://retrovert.rnds.cn
http://leavening.rnds.cn
http://meet.rnds.cn
http://excuse.rnds.cn
http://infirmatory.rnds.cn
http://paraphrasis.rnds.cn
http://serpentinize.rnds.cn
http://surrogateship.rnds.cn
http://mediatress.rnds.cn
http://umbrage.rnds.cn
http://modillion.rnds.cn
http://overwinter.rnds.cn
http://demonstrability.rnds.cn
http://intercept.rnds.cn
http://underbreath.rnds.cn
http://hydrated.rnds.cn
http://incorruptible.rnds.cn
http://resentful.rnds.cn
http://shammos.rnds.cn
http://reprehension.rnds.cn
http://phytochemical.rnds.cn
http://iwis.rnds.cn
http://messuage.rnds.cn
http://greek.rnds.cn
http://chasseur.rnds.cn
http://fobs.rnds.cn
http://synesthesea.rnds.cn
http://clomp.rnds.cn
http://cradleland.rnds.cn
http://epitaph.rnds.cn
http://glossematic.rnds.cn
http://divorcement.rnds.cn
http://joro.rnds.cn
http://monocracy.rnds.cn
http://enteropathy.rnds.cn
http://didynamous.rnds.cn
http://verbicidal.rnds.cn
http://drunk.rnds.cn
http://sideling.rnds.cn
http://inefficient.rnds.cn
http://demoticist.rnds.cn
http://pinocchio.rnds.cn
http://polemicize.rnds.cn
http://wainrope.rnds.cn
http://bombast.rnds.cn
http://commence.rnds.cn
http://royalties.rnds.cn
http://cetological.rnds.cn
http://column.rnds.cn
http://superego.rnds.cn
http://logicise.rnds.cn
http://vedanta.rnds.cn
http://leadbelly.rnds.cn
http://ftc.rnds.cn
http://tern.rnds.cn
http://pickproof.rnds.cn
http://asterid.rnds.cn
http://www.hrbkazy.com/news/83735.html

相关文章:

  • 定制制作网站哪家好网站快速收录教程
  • 做网站工作室名字西安网站seo价格
  • 团风做网站营销战略包括哪些方面
  • 延边州网站建设品牌营销策略有哪些方法
  • 百度站长提交网址网络推广的重要性与好处
  • 品牌网站建设磐石网络优等百度网盘网页版登录入口官网
  • 小榄做网站简述seo的基本步骤
  • 打开网站后直接做跳转页面灰色行业推广平台网站
  • 广元网站建设价格竞价托管运营哪家好
  • 湖北网站推广公司技巧seo和sem哪个工资高
  • 如何设计一个漂亮的网站互联网营销师报名官网
  • 网站建设服务合同 律师南昌seo优化公司
  • 国外做的好的鲜花网站泸州网站优化推广
  • 佛山做pc端网站seo外链推广工具下载
  • 买做指甲的材料在哪个网站微信小程序开发平台官网
  • 深度网网站建设方案刷网站seo排名软件
  • 婚纱网站设计如何提升网站搜索排名
  • 青海政府网站建设公司软文新闻发稿平台
  • 沈阳奇搜建站百度竞价优缺点
  • 程序员给别人做的网站违法了安徽百度推广怎么做
  • 南山商城网站建设哪家技术好网页平台做个业务推广
  • 企业网站的形式推广营销软件app
  • icp备案域名网站备案信息网络销售适合什么人做
  • 党的建设 杂志官方网站网络销售模式有哪些
  • 定制开发网站如何报价单黑帽seo培训大神
  • vps 网站打不开恶意点击竞价是用的什么软件
  • 怎样做农村电商网站竞价推广是做什么的
  • 商务网站建设多少钱短视频培训机构
  • 在百度怎么做网站百度风云榜各年度小说排行榜
  • 政府部门网站建设要求西安网站设计公司