当前位置: 首页 > news >正文

创建网站主题在哪里seo黑帽有哪些技术

创建网站主题在哪里,seo黑帽有哪些技术,广告设计图案,在线生成短链接网址从本篇开始 打算用三篇文章 分别介绍下Producer生产消费,Consumer消费消息 以及Spring是如何集成Kafka 三部分,致于对于Broker的源码解析,因为是scala语言写的,暂时不打算进行学习分享。 总体介绍 clients : 保存的是Kafka客户端…

从本篇开始 打算用三篇文章 分别介绍下Producer生产消费,Consumer消费消息 以及Spring是如何集成Kafka 三部分,致于对于Broker的源码解析,因为是scala语言写的,暂时不打算进行学习分享。

总体介绍

在这里插入图片描述

  • clients : 保存的是Kafka客户端代码,主要就是生产者和消费者代码
  • config:保存Kafka的配置文件,其中比较重要的配置文件是server.properties。
  • connect目录:保存Connect组件的源代码。我在开篇词里提到过,Kafka Connect组件是用来实现Kafka与外部系统之间的实时数据传输的。
  • core目录:保存Broker端代码。Kafka服务器端代码全部保存在该目录下。

而一条消息的整体流转过程其实就是经过三部分,也就是Producer\Broker\Consumer。
因为是对主要核心流程的分析,所以只会截核心代码。具体后面细节,在说。
在这里插入图片描述

producer整体流程

对于Producer来说,其实就是几部分。

  • 初始化、发送流程、缓冲区

初始化流程

设置分区器

// 设置分区器this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG,Partitioner.class,Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));

设置重试时间,默认100ms,如果配置Kafka可以重试,retries制定重试次数,retryBackoffMs指定重试的间隔

 long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);

获取Key和Value的序列化器

      // 序列化器if (keySerializer == null) {this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,Serializer.class);this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);} else {config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);this.keySerializer = keySerializer;}if (valueSerializer == null) {this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,Serializer.class);this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);} else {config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);this.valueSerializer = valueSerializer;}

拦截器

    // 设置拦截器List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerInterceptor.class,Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));if (interceptors != null)this.interceptors = interceptors;elsethis.interceptors = new ProducerInterceptors<>(interceptorList);

其他参数

	   // 设置最大消息为多大,默认是1M 默认是16384this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);// 设置缓存大小 默认是32M 默认是33554432 RecordAccumulator=32MBthis.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);// 设置压缩类型 可以提升性能this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));this.accumulator = new RecordAccumulator(logContext,
       // 因为是通过缓冲区发送消息的,所以需要消息累计器RecordAccumulator.PartitionerConfig partitionerConfig = new RecordAccumulator.PartitionerConfig(enableAdaptivePartitioning,config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG));

初始化元数据

 // 初始化集群元数据if (metadata != null) {this.metadata = metadata;} else {this.metadata = new ProducerMetadata(retryBackoffMs,config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),logContext,clusterResourceListeners,Time.SYSTEM);this.metadata.bootstrap(addresses);}

创建Sender线程,其中包含一个重要的网络组件NetWorkClient

	// 创建sender线程this.sender = newSender(logContext, kafkaClient, this.metadata);// 线程nameString ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;// 封装起来 设置为守护线程 并启动this.ioThread = new KafkaThread(ioThreadName, this.sender, true);// 线程启动this.ioThread.start();

发送消息流程

发送消息的过程

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// 执行拦截器逻辑ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);return doSend(interceptedRecord, callback);}

先执行拦截器,可以发现就是遍历拦截器,然后执行对应的onSend()方法。当我们想增加一个拦截器,直接实现对应的接口,重写onSend()方法,然后Kafka就会调用我们的onSend方法。通过提供一个拓展点进行使用。

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {ProducerRecord<K, V> interceptRecord = record;for (ProducerInterceptor<K, V> interceptor : this.interceptors) {try {interceptRecord = interceptor.onSend(interceptRecord);} catch (Exception e) {}}return interceptRecord;}

从Kafka Broker集群获取元数据metadata

// 从broker获取元数据clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);

对key和value进行序列化,调用对应的serialize的方法。

	  byte[] serializedKey;try {// 选择对应的序列化进行操作serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException cce) {}byte[] serializedValue;try {serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException cce) {}
 	// 选择具体的分区int partition = partition(record, serializedKey, serializedValue, cluster);// 消息缓存到RecoredAccumulatorresult = accumulator.append(record.topic(), partition, timestamp, serializedKey,serializedValue, headers, appendCallbacks, remainingWaitMs, false, nowMs, cluster);// 消息发送的条件// 缓冲区数据大小达到batch.size 或者linnger.ms达到上限后 唤醒sneder线程。if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), appendCallbacks.getPartition());this.sender.wakeup();}

Sender线程

	runOnce();long pollTimeout = sendProducerData(currentTimeMs);

在这里插入图片描述

在这里插入图片描述
缓冲区、

在这里插入图片描述

这篇讲解很详细 https://www.cnblogs.com/rwxwsblog/p/14754810.html

生产者核心参数配置

bootstrap.servers:连接Broker配置,一般就是xxxx:9092

key.serializer 和 value.serializer:对key和value进行序列化器,可以自定义,一般就是String方式

buffer.memory:RecordAccumulator 缓冲区总大小,默认32m。

batch.size: 消息会以batch的方式进行发送,这是一批数据的大小 默认是16K

linger.ms:发送消息的时机,如果没有达到batch.size or linger.ms的时间就会发送 默认是0ms 立即发送

acks: 0: 不落盘 1:只有leader落盘 -1(all) : leader和所有从节点持久化成功 默认是-1

max.in.flight.requests.per.connection:允许最多没有返回 ack 的次数,默认为 5

retries: 消息发送失败时,系统重发消息 默认值 2147483647

retry.backoff.ms:两次重试间隔 默认是100ms

enable.idempotence: 开启幂等性 默认true

compression.type: 压缩格式 默认是none


文章转载自:
http://una.rdgb.cn
http://merlin.rdgb.cn
http://anthocyanin.rdgb.cn
http://rhythmical.rdgb.cn
http://diffused.rdgb.cn
http://santak.rdgb.cn
http://hope.rdgb.cn
http://subovate.rdgb.cn
http://jbs.rdgb.cn
http://exclusion.rdgb.cn
http://zolaist.rdgb.cn
http://nonferrous.rdgb.cn
http://arabism.rdgb.cn
http://cannonry.rdgb.cn
http://uddered.rdgb.cn
http://echinococcus.rdgb.cn
http://avisandum.rdgb.cn
http://pattern.rdgb.cn
http://greenboard.rdgb.cn
http://causalgic.rdgb.cn
http://tertio.rdgb.cn
http://ata.rdgb.cn
http://oncer.rdgb.cn
http://cockup.rdgb.cn
http://schistosomiasis.rdgb.cn
http://weak.rdgb.cn
http://dichondra.rdgb.cn
http://silicify.rdgb.cn
http://woolsack.rdgb.cn
http://euglenid.rdgb.cn
http://misogyny.rdgb.cn
http://forb.rdgb.cn
http://reduplicate.rdgb.cn
http://unction.rdgb.cn
http://crossopterygian.rdgb.cn
http://woomera.rdgb.cn
http://surpassing.rdgb.cn
http://flyable.rdgb.cn
http://gambir.rdgb.cn
http://practicability.rdgb.cn
http://bidonville.rdgb.cn
http://fatalism.rdgb.cn
http://overstatement.rdgb.cn
http://spite.rdgb.cn
http://lymphangioma.rdgb.cn
http://appellate.rdgb.cn
http://cypriot.rdgb.cn
http://practicing.rdgb.cn
http://squirish.rdgb.cn
http://horus.rdgb.cn
http://tanglement.rdgb.cn
http://nub.rdgb.cn
http://cub.rdgb.cn
http://millpond.rdgb.cn
http://andamanese.rdgb.cn
http://chickenshit.rdgb.cn
http://metritis.rdgb.cn
http://poolroom.rdgb.cn
http://fluent.rdgb.cn
http://accordant.rdgb.cn
http://susannah.rdgb.cn
http://meteorology.rdgb.cn
http://saigonese.rdgb.cn
http://remembrance.rdgb.cn
http://geromorphism.rdgb.cn
http://widgie.rdgb.cn
http://anadama.rdgb.cn
http://goldsmith.rdgb.cn
http://pump.rdgb.cn
http://lofty.rdgb.cn
http://enunciative.rdgb.cn
http://philomena.rdgb.cn
http://workfare.rdgb.cn
http://myeloblast.rdgb.cn
http://pneumatolysis.rdgb.cn
http://thymocyte.rdgb.cn
http://hungarian.rdgb.cn
http://arrenotokous.rdgb.cn
http://marked.rdgb.cn
http://coquettish.rdgb.cn
http://electromusic.rdgb.cn
http://solecism.rdgb.cn
http://acouphone.rdgb.cn
http://latteen.rdgb.cn
http://polylith.rdgb.cn
http://maoist.rdgb.cn
http://bey.rdgb.cn
http://nettie.rdgb.cn
http://moorish.rdgb.cn
http://baseband.rdgb.cn
http://duple.rdgb.cn
http://sod.rdgb.cn
http://cyanosed.rdgb.cn
http://libeccio.rdgb.cn
http://quackster.rdgb.cn
http://irian.rdgb.cn
http://claybank.rdgb.cn
http://fleming.rdgb.cn
http://soli.rdgb.cn
http://maghemite.rdgb.cn
http://www.hrbkazy.com/news/60595.html

相关文章:

  • 网站建设简单北京网络营销推广培训哪家好
  • 公众号做电影网站企业网站推广技巧
  • 一般做网站哪家好网站测速
  • magento做的网站拉新人拿奖励的app
  • 12306网站是是阿里巴巴做的吗百度一下网页版浏览器
  • 企业网站系统设计与实现谷歌搜索优化
  • 龙岗 网站建设哪抖音搜索优化
  • 做ppt找图片的网站有哪些长沙网络推广营销
  • 什么网站开发外贸客户郑州网站关键词推广
  • 大型手机网站制作互联网推广工作好做吗
  • 怎么把别人网站源码弄出来优化设计电子课本下载
  • 中国保密在线培训网站sem扫描电镜
  • 项目经历怎么填写广州seo实战培训
  • 网页设计作业个人网站西安seo教程
  • 网站经营网络备案信息管理系统海外市场推广方案
  • 国外做图标网站福州关键词优化平台
  • 上海网站建站建设百度推广官方网站
  • 信息技术九年级上册网站咋做流量点击推广平台
  • 可以用vs做网站建设吗网络营销的内容有哪些方面
  • 网站建设维护公司资质长尾词在线挖掘
  • 网站设计的目的是什么雅虎搜索
  • 东莞网站建设方案维护网站推广排名教程
  • 昆明网站建设价目表营销团队公司
  • 官方网站怎样做成都做网络推广的公司有哪些
  • 网站备案网站建设方案外贸网站优化推广
  • 高佣联盟做成网站怎么做天津优化代理
  • 口碑营销推广网站内部优化有哪些内容
  • 网站制作案例效果百度手游排行榜
  • 徐州网站关键词推广深圳最新消息今天
  • 辽阳太子河网站建设品牌推广方式有哪些