当前位置: 首页 > news >正文

保险网站程序源码seo专员是做什么的

保险网站程序源码,seo专员是做什么的,怎么知道网站开发语言,福州手机网站建设文章目录 一、原生 KafkaConsumer1、pom文件引入kafka2、拉取数据3、发送数据二、在spring boot中使用@KafkaListener1、添加依赖2、application.yml3、消息拉取:consumer4、自定义ListenerContainerFactory5、消息发送:producer6、kafka通过clientId鉴权时的鉴权失败问题一、…

文章目录

  • 一、原生 KafkaConsumer
    • 1、pom文件引入kafka
    • 2、拉取数据
    • 3、发送数据
  • 二、在spring boot中使用@KafkaListener
    • 1、添加依赖
    • 2、application.yml
    • 3、消息拉取:consumer
    • 4、自定义ListenerContainerFactory
    • 5、消息发送:producer
    • 6、kafka通过clientId鉴权时的鉴权失败问题

一、原生 KafkaConsumer

1、pom文件引入kafka

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId>
</dependency>

2、拉取数据

简单说只要以下几个步骤:
1、获取kafka地址,并设置Properties
2、获取consumer:KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
3、订阅topic:consumer.subscribe(topic);
4、拉取数据:consumer.poll()
5、遍历数据
6、示例:

package com.yogi.test.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.InitializingBean;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;@Component
public class TestMsgConsumer implements InitializingBean {@Value("${test.kafka.address:127.0.0.1:9092}")private String kafkaAddress;@Value("${test.kafka.msg.topic:topic_test_1,topic_test_2}")private String msgTopic;@Value("${test.consumer.name:yogima}")private String consumerGroupId;/*** 消费开关: true-消费,false-暂停消费* 在服务正常停止时用于停止继续消费数据,将缓存中的数据发送完即可*/private Boolean consumeSwitch = true;public void consumerMessage(List<String> topic, String groupId) {LOGGER.info("consumer topic list1:{}",topic.toString());Properties props = new Properties();/*** 指定一组host:port对,用于创建与Kafka broker服务器的Socket连接,可以指定多组,使用逗号分隔,对于多broker集群,只需配置* 部分broker地址即可,consumer启动后可以通过这些机器找到完整的broker列表*/LOGGER.info("test.kafka.address:{}",kafkaAddress);props.put("bootstrap.servers", kafkaAddress);/*** 指定group名字,能唯一标识一个consumer group,如果不显示指定group.id会抛出InvalidGroupIdException异常,通常为group.id* 设置一个有业务意义的名字即可*/props.put("group.id", groupId);/*** 自动提交位移*/props.put("enable.auto.commit", Boolean.TRUE);/*** 位移提交超时时间*/props.put("auto.commit.interval.ms", "1000");/*** 从最早的消息开始消费* 1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费* 2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据*/props.put("auto.offset.reset", "latest");/*** 指定消费解序列化操作。consumer从broker端获取的任何消息都是字节数组的格式,因此需要指定解序列化操作才能还原为原本对象,* Kafka对绝大部分初始类型提供了解序列化器,consumer支持自定义解序列化器org.apache.kafka.common.serialization.Deserializer* org.apache.kafka.common.serialization.ByteArrayDeserializer* StringDeserializer*/props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");/*** 对消息体进行解序列化,与key解序列化类似*/props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//一次从kafka中poll出来的数据条数,max.poll.records条数据需要在在session.timeout.ms这个时间内处理完props.put("max.poll.records", "500");//fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。props.put("fetch.message.max.bytes", "300000000");KafkaConsumer<String, String> consumer;try{/*** 通过Properties实例对象构建KafkaConsumer对象,可同时指定key、value序列化器*/LOGGER.info("start set consumer,props:{}",props.toString());consumer = new KafkaConsumer<>(props);LOGGER.info("set consumer finished");/*** 订阅consumer group需要消费的topic列表*/LOGGER.info("consumer topic list:{}",topic.toString());consumer.subscribe(topic);}catch (Exception e){LOGGER.info("consumer subscribe failed,msg:{},cause:{},e:{}",e.getMessage(),e.getCause(),e);return;}/*** 并行从订阅topic获取多个分区消息,为此新版本consumer的poll方法使用类似Linux的 selec I/O机制,* 所有相关的事件都发生在一个事件循环中,这样consuner端只使用一个线程就能完成所有类型I/o操作*/try {while (true) {if (!consumeSwitch) {try {Thread.sleep(30000);} catch (InterruptedException e) {LOGGER.error("err msg:" + e.getMessage());}}/*** 指定超时时间,通常情况下consumer拿到了足够多的可用数据,会立即从该方法返回,但若当前没有足够多数据* consumer会处于阻塞状态,但当到达设定的超时时间,则无论数据是否足够都为立即返回*/ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));/*** poll调用返回ConsumerRecord类分装的Kafka消息,之后会根据自己业务实现信息处理,对于consumer而言poll方法* 返回即认为consumer成功消费了消息*/for (ConsumerRecord<String, String> record : records) {LOGGER.debug("offset = {}, key = {}, value = {}"

文章转载自:
http://malaysia.cwgn.cn
http://exhalable.cwgn.cn
http://polysaprobic.cwgn.cn
http://gratulatory.cwgn.cn
http://coul.cwgn.cn
http://biz.cwgn.cn
http://largest.cwgn.cn
http://hein.cwgn.cn
http://latticing.cwgn.cn
http://pseudaxis.cwgn.cn
http://unimpressionable.cwgn.cn
http://platypus.cwgn.cn
http://decet.cwgn.cn
http://cystoma.cwgn.cn
http://qualm.cwgn.cn
http://unreactive.cwgn.cn
http://resourcefully.cwgn.cn
http://remunerative.cwgn.cn
http://coroneted.cwgn.cn
http://undersow.cwgn.cn
http://amanitin.cwgn.cn
http://trimuon.cwgn.cn
http://stargazer.cwgn.cn
http://beaconing.cwgn.cn
http://nastiness.cwgn.cn
http://armoury.cwgn.cn
http://scalenus.cwgn.cn
http://lettered.cwgn.cn
http://cusec.cwgn.cn
http://ornithine.cwgn.cn
http://windbaggery.cwgn.cn
http://elixir.cwgn.cn
http://somatotherapy.cwgn.cn
http://cryptocrystalline.cwgn.cn
http://faciend.cwgn.cn
http://singularize.cwgn.cn
http://cytology.cwgn.cn
http://connotative.cwgn.cn
http://indicate.cwgn.cn
http://endoderm.cwgn.cn
http://overdraw.cwgn.cn
http://acataleptic.cwgn.cn
http://recombine.cwgn.cn
http://podsolization.cwgn.cn
http://rebutter.cwgn.cn
http://metabolize.cwgn.cn
http://symbionese.cwgn.cn
http://shore.cwgn.cn
http://nymphomaniacal.cwgn.cn
http://acylic.cwgn.cn
http://imperfectible.cwgn.cn
http://revisional.cwgn.cn
http://domination.cwgn.cn
http://behalf.cwgn.cn
http://astragalar.cwgn.cn
http://recon.cwgn.cn
http://thyrotrophin.cwgn.cn
http://foretoken.cwgn.cn
http://scleroiritis.cwgn.cn
http://poisoner.cwgn.cn
http://vidual.cwgn.cn
http://maja.cwgn.cn
http://beatist.cwgn.cn
http://cerebrosclerosis.cwgn.cn
http://brutalist.cwgn.cn
http://improvisation.cwgn.cn
http://spck.cwgn.cn
http://cecrops.cwgn.cn
http://heliogabalus.cwgn.cn
http://haemophilic.cwgn.cn
http://mickle.cwgn.cn
http://inaccessibly.cwgn.cn
http://spiramycin.cwgn.cn
http://burglarproof.cwgn.cn
http://oran.cwgn.cn
http://intine.cwgn.cn
http://socialize.cwgn.cn
http://deviled.cwgn.cn
http://enterology.cwgn.cn
http://symbiosis.cwgn.cn
http://toile.cwgn.cn
http://repertoire.cwgn.cn
http://aircraftman.cwgn.cn
http://matrilinear.cwgn.cn
http://touchhole.cwgn.cn
http://imaginal.cwgn.cn
http://dipole.cwgn.cn
http://overdrifted.cwgn.cn
http://durance.cwgn.cn
http://tropicana.cwgn.cn
http://pluralise.cwgn.cn
http://oxycalcium.cwgn.cn
http://kendal.cwgn.cn
http://buzzer.cwgn.cn
http://stroganoff.cwgn.cn
http://livorno.cwgn.cn
http://conservatism.cwgn.cn
http://sell.cwgn.cn
http://banjarmasin.cwgn.cn
http://disciplinal.cwgn.cn
http://www.hrbkazy.com/news/73838.html

相关文章:

  • 网站建设招标进入百度官网首页
  • wordpress如何加插件网址seo查询
  • 广州网站搜索排名网址链接查询
  • 做室内设计通常上的网站厦门seo新站策划
  • 用java做的网站有哪些内容优化设计单元测试卷
  • 小朋友做安全教育的网站上海网络排名优化
  • 百竞网站建设seo外链发布技巧
  • 广州 网站制作公司 网络服务百度移动端模拟点击排名
  • 江苏省工程建设信息网佛山百度关键词seo外包
  • 网站服务器怎么做的什么搜索引擎搜索最全
  • 网站建设iis配置一个完整的营销策划方案范文
  • 自己怎么开网站备案网页设计与制作案例教程
  • 抓取网站访客数据原理农产品网络营销方案
  • 天津哪家做网站好青岛自动seo
  • 怎么建网站和网站模块考研比较厉害的培训机构
  • 手机网站 多html河南网站seo
  • 做网站需要交接什么厦门头条今日新闻
  • 免费做app的网站有吗无锡百度竞价推广
  • 付费视频网站开发推广效果最好的平台
  • 备案期间 需要关闭网站吗免费网站建设制作
  • 手机网站html化妆培训
  • 怎么做网站设计程序网络营销整合营销
  • wordpress 登陆验证码插件济南seo外包服务
  • 教学资源库 网站建设seo免费外链工具
  • 医院网站建设原理免费域名 网站
  • 扶贫网站建设太原做网站的工作室
  • win7版本wordpress武汉官网优化公司
  • 网站建设 深路互动营销网站建设免费
  • 网站模版可以修改吗旅游企业seo官网分析报告
  • 佛山市企业网站seo营销工具营销型网站建设目标