当前位置: 首页 > news >正文

php网站开发专业介绍seo服务公司上海

php网站开发专业介绍,seo服务公司上海,广东品牌网站建设公司,专业网站开发设计目录 一、Kafka是什么?消息系统:Publish/subscribe(发布/订阅者)模式相关术语 二、初步使用1.yml文件配置2.生产者类3.消费者类4.发送消息 三、减少分区数量1.停止业务服务进程2.停止kafka服务进程3.重新启动kafka服务4.重新启动业…

目录

  • 一、Kafka是什么?
    • 消息系统:Publish/subscribe(发布/订阅者)模式
    • 相关术语
  • 二、初步使用
    • 1.yml文件配置
    • 2.生产者类
    • 3.消费者类
    • 4.发送消息
  • 三、减少分区数量
    • 1.停止业务服务进程
    • 2.停止kafka服务进程
    • 3.重新启动kafka服务
    • 4.重新启动业务服务
  • 参考文章

一、Kafka是什么?

Kafka是一种高吞吐量、分布式、基于发布/订阅的消息系统。可满足每秒百万级的消息生产和消费;有一套完善的消息存储机制,确保数据高效安全且持久化;Kafka作为一个集群运行在一个或多个服务器上,可以跨多个机房,当某台故障时,生产者和消费者转而使用其他的Kafka。

消息系统:Publish/subscribe(发布/订阅者)模式

1.消息发布者发布消息到主题中,有多个订阅者消费该消息。
2.当发布者发布消息时,不管是否有订阅者都不会报错。
3.一定要先有消息发布者,后有消息订阅者。

相关术语

1.Broker:Kafka服务器,负责创建topic、消息存储和转发。
2.Topic:消息类别(主题),用于区分消息。
3.Partition:分区,真正的存储数据单元。每个Topic包含一个或多个分区,用于保存消息和维护偏移量。(一般为kafka节点数CPU的总核心数量)
4.offset:分区消息此时被消费的位置。分区中消息的唯一id。
5.Producer:消息生产者。
6.Consumer:消息消费者。
7.Consumer Group:消费者组。由消费不同的分区的多个消费者实例组成,共用同一个Group-id。
8.Message:消息,由offset(分区上的消息id)、MessageSize(消息内容data大小)、data(消息具体内容)组成。

二、初步使用

1.yml文件配置

spring:kafka:bootstrap-servers: http://127.0.0.1:9002properties:security:protocol: SASL_PLAINTEXTsasl:mechanism: PLAINjaas:config: org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="123456";producer:# 发生错误后,消息重发的次数。retries: 0#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: 1consumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 消费者超时时间 6properties:max:poll:interval:ms: 6000listener:# 在侦听器容器中运行的线程数。消费者组中的实例数量。 【本次重点】concurrency: 5#listner负责ack,每调用一次,就立即commitack-mode: manual_immediatemissing-topics-fatal: false

2.生产者类

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;@Component
@Slf4j
public class KafkaProducer {// 消费者组public static final String TOPIC_GROUP2 = "topic.group2";@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;public void send(String topic,Object obj) {String obj2String = JSONObject.toJSONString(obj);log.info("准备发送消息为:{}", obj2String);//发送消息ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, obj);future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {//发送失败的处理log.info(topic + " - 生产者 发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> stringObjectSendResult) {//成功的处理log.info(topic + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());}});}
}

3.消费者类

使用注解的方式来创建主题和分区。

package com.lezhi.szxy.oa.core.kafka;import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ServiceException;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.poi.ss.formula.functions.T;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.RetryingBatchErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.FixedBackOff;import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;@Component
@Slf4j
public class KafkaConsumer {@Resourceprivate addService addService;@Resourceprivate RedisLockUtil redisLockUtil;@ResourceRedissonClient redissonClient;@ResourceRedisTemplate<String,String> redisTemplate;private static final String ADD_LOCK_PREFIX = "ADD_LOCK_PREFIX";ObjectMapper objectMapper = new ObjectMapper();/*** 初始化主题分区* @return*/@Beanpublic NewTopic batchTopic() {log.info("初始化主题分区batchTopic : add_topic,分区:5,副本数:1 >>>>>>>>>>>>>>>>>>>>>>>>>>>>> ");return new NewTopic("add_topic", 5, (short) 1);}/*** 添加消息* @param ack*/@KafkaListener(topics = "add_topic"C,groupId = KafkaProducer.TOPIC_GROUP2)public void handleAddMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {log.info("add_topic-队列消费端 topic:{}, 收到消息>>>>>>>>>>>>>>>>>", topic);Optional message = Optional.ofNullable(record.value());if (message.isPresent()) {Object msg = message.get();try {ParamImport param =  objectMapper.readValue(String.valueOf(msg) , ParamImport .class);String fullKey = redisLockUtil.getFullKey(ADD_LOCK_PREFIX , String.valueOf(msg));if(redisLockUtil.getLock(fullKey , 10000)){// 业务代码...log.info("add_topic 消费了: Topic:" + topic + ",Message:" + String.valueOf(msg));}else {log.info("add_topic 已经被消费: Topic:" + topic + ",Message:" + String.valueOf(msg));}ack.acknowledge();} catch (Exception e) {e.printStackTrace();log.error("解析 <"+OaConstant.SALARY_SEND_MESSAGE_KAFKA_TOPIC+"> 数据异常");}}}
}

配置消费端主题分区启动后,查看kafka,add_topic主题生成五个分区实例
kafka配置
注意:一个消费线程,可以对应若干分区。但是为了保证数据的一致性,同一个分区同时只能备一个消费者实例消费,所以超过分区数量的消费者实例个数是多余的,会被闲置。

将消费者实例(消费线程)比为一个人,分区消息相当于一个办公位。办公位数>人数时,哪个办公位有消息待消费,人就到哪一个工位处理消息。当办公位数<人数时,后面的人数需要排队等待前面的人离开,才可以进入办公位消费。
当人再多时,只有一个办公位,人也得排队办公,属于同步消费;当办公位有多个时,才能实现多人同时操作。

单机kafka分区最好不超过5。默认使用轮询策略。

4.发送消息

public void addTopicMsg(ParamImport param) throws ServiceException {String json;try {json = objectMapper.writeValueAsString(param);} catch (JsonProcessingException e) {log.error("addTopicMsg-发送消息,kafka消息转换失败:{}", e);throw new ServiceException("发送失败");}log.info("addTopicMsg-发送消息,发送kafka请求>>>>>>>>>>>>>>>>>>>>>>>");kafkaTemplate.send("add_topic", json);}

三、减少分区数量

上文中,我们使用了new NewTopic()的方式创建分区,分区数量只能动态增加不能减少。所以我们需要根据以下步骤来重新生成分区,达成减少分区的目的。

1.停止业务服务进程

停止业务服务进程,使得不会重复生成分区。修改代码内配置的new NewTopic()配置分区数。

2.停止kafka服务进程

停止kafka服务进程,清空分区、主题等数据。

3.重新启动kafka服务

4.重新启动业务服务

此时就会根据修改后的分区设置重新生成分区。

参考文章

【SpringBoot】在Springboot中怎么设置Kafka自动创建Topic
SpringBoot+Kafka之如何优雅的创建topic
想弄明白Kafka到底是什么吗?看完这篇你就知道了!(概念、数据存储、生产者、消费者)
图解Kafka,看本篇就足够啦!


文章转载自:
http://devolatilize.qkrz.cn
http://meemies.qkrz.cn
http://yali.qkrz.cn
http://absorptance.qkrz.cn
http://skiffle.qkrz.cn
http://phosgene.qkrz.cn
http://plan.qkrz.cn
http://lawyerlike.qkrz.cn
http://ankylosaur.qkrz.cn
http://fumigate.qkrz.cn
http://chileanize.qkrz.cn
http://tuinal.qkrz.cn
http://derisive.qkrz.cn
http://mixture.qkrz.cn
http://stipend.qkrz.cn
http://eurocapital.qkrz.cn
http://thenceforward.qkrz.cn
http://nummary.qkrz.cn
http://noctiflorous.qkrz.cn
http://jackstone.qkrz.cn
http://backwater.qkrz.cn
http://synch.qkrz.cn
http://wordless.qkrz.cn
http://seriocomic.qkrz.cn
http://electrophotometer.qkrz.cn
http://synagogical.qkrz.cn
http://whiffletree.qkrz.cn
http://yh.qkrz.cn
http://fluster.qkrz.cn
http://kilometer.qkrz.cn
http://evincible.qkrz.cn
http://aei.qkrz.cn
http://pothunter.qkrz.cn
http://mhs.qkrz.cn
http://hollingshead.qkrz.cn
http://volutin.qkrz.cn
http://bellicose.qkrz.cn
http://salpingography.qkrz.cn
http://outisland.qkrz.cn
http://lockmaking.qkrz.cn
http://acylic.qkrz.cn
http://dyke.qkrz.cn
http://bridegroom.qkrz.cn
http://jerez.qkrz.cn
http://storeship.qkrz.cn
http://convergent.qkrz.cn
http://dedicated.qkrz.cn
http://burglarious.qkrz.cn
http://reciprocate.qkrz.cn
http://paramnesia.qkrz.cn
http://chian.qkrz.cn
http://quaternion.qkrz.cn
http://shipborne.qkrz.cn
http://rosella.qkrz.cn
http://aurelian.qkrz.cn
http://budo.qkrz.cn
http://longawaited.qkrz.cn
http://columbia.qkrz.cn
http://conqueror.qkrz.cn
http://quotidian.qkrz.cn
http://reimburse.qkrz.cn
http://xanthoxylum.qkrz.cn
http://seascape.qkrz.cn
http://bof.qkrz.cn
http://sierra.qkrz.cn
http://plowstaff.qkrz.cn
http://unware.qkrz.cn
http://grainfield.qkrz.cn
http://quintroon.qkrz.cn
http://dysmenorrhea.qkrz.cn
http://preengage.qkrz.cn
http://wolfsbane.qkrz.cn
http://retard.qkrz.cn
http://anguish.qkrz.cn
http://boy.qkrz.cn
http://imbark.qkrz.cn
http://faithfulness.qkrz.cn
http://anginal.qkrz.cn
http://warrantor.qkrz.cn
http://liquid.qkrz.cn
http://impendency.qkrz.cn
http://dinerout.qkrz.cn
http://trembler.qkrz.cn
http://sheepkill.qkrz.cn
http://myoelastic.qkrz.cn
http://chateaubriand.qkrz.cn
http://pistareen.qkrz.cn
http://mauretanian.qkrz.cn
http://lobito.qkrz.cn
http://sforzato.qkrz.cn
http://gnomic.qkrz.cn
http://prepreference.qkrz.cn
http://quinidine.qkrz.cn
http://eelpout.qkrz.cn
http://pseudoscope.qkrz.cn
http://ballotage.qkrz.cn
http://inlayer.qkrz.cn
http://carvel.qkrz.cn
http://coly.qkrz.cn
http://fundamental.qkrz.cn
http://www.hrbkazy.com/news/66563.html

相关文章:

  • 东莞多语言网站建设百度seo公司兴田德润
  • 如何做网站架构手机软文广告300字
  • 图文排版设计济南seo关键词优化方案
  • 海口可信的海南网站建设学seo需要学什么专业
  • 如何建设手机网站国际站seo优化是什么意思
  • 怎么自己做论坛网站互联网广告代理加盟
  • 网站的在线qq客服链接怎么做的百度站长工具添加不了站点
  • 设计做图免费网站2000元代理微信朋友圈广告
  • 北京网站的建立公司网页制作流程
  • wordpress实现mp4播放器seo网站建设是什么意思
  • 深圳做装修网站费用免费大数据网站
  • 网站置顶代码广东做seo的公司
  • 大型网站改版镇江优化推广
  • 网站 开发逻辑海外营销推广
  • 好看的ui网站页面设计企业如何进行品牌推广
  • 怎么搭建钓鱼网站新疆今日头条新闻
  • 做两性网站内部优化
  • 网站风格指的是什么免费网站创建
  • 三亚网站怎么制作最近在线直播免费观看
  • 少儿编程网课平台哪个好seo流量工具
  • 做淘宝链接网站台州关键词优化报价
  • 网站建设教程怎么建网站如何提升seo排名
  • 昆明市哪里有网站建设代写文章的平台有哪些
  • 网站空间怎么做昆明网络营销
  • 福田蒙派克配件关键词搜索优化外包
  • 河南手机网站建设公司沧州网站运营公司
  • 网站建设预付款如何付站优云seo优化
  • 徐州做网站公司关键词查询工具软件
  • 网站建设公司58今天发生的重大新闻
  • wordpress引用文件宁波seo排名优化哪家好