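Version: kafka-clients-2.0.1.jar

I wanted to write a plugin that changes KafkaConsumer's consumption logic and filters out some messages based on their headers. To do that, I needed to understand how KafkaConsumer actually fetches and consumes messages, and to confirm whether filtering messages out before they are consumed has any side effects.
The relevant source code follows, explained through inline comments.

The conclusion first: KafkaConsumer keeps the offset it fetches from locally and fetches messages based on that offset. With auto commit enabled, it commits the offset to the broker automatically (in some scenarios it explicitly checks whether a commit is due), so that the offset is not lost on a restart or rebalance. The locally stored offset is updated as soon as messages are fetched, so in the auto-commit case, filtering messages out before they are consumed has no effect.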
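To make the difference between the local position and the committed offset concrete, here is a minimal toy model in plain Java. It does not use the Kafka client at all: `ToyPartitionState`, `fetch`, and `autoCommit` are hypothetical names made up for illustration, and the filter on odd offsets merely stands in for a header-based filter.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these names are hypothetical and are not part of kafka-clients.
class ToyPartitionState {
    long position = 0;    // next offset to fetch, kept in memory by the consumer
    long committed = -1;  // last offset sent to the broker; -1 means nothing committed yet

    // Returns up to max offsets below logEndOffset and advances the position right away,
    // before the application has processed (or filtered) anything.
    List<Long> fetch(long logEndOffset, int max) {
        List<Long> offsets = new ArrayList<>();
        while (position < logEndOffset && offsets.size() < max) {
            offsets.add(position++);
        }
        return offsets;
    }

    // Auto commit snapshots the current position; it does not look at what was processed.
    void autoCommit() {
        committed = position;
    }
}

class PositionVsCommitDemo {
    public static void main(String[] args) {
        ToyPartitionState state = new ToyPartitionState();
        List<Long> batch = state.fetch(5, 10);
        // Drop odd offsets, standing in for a header-based filter.
        batch.removeIf(offset -> offset % 2 == 1);
        state.autoCommit();
        System.out.println("kept=" + batch
                + " position=" + state.position
                + " committed=" + state.committed);
    }
}
```

In this model the filter removes two of the five fetched offsets, yet both the position and the committed offset end up at 5, because neither depends on which records survive the filter.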

Fetching messages

KafkaConsumer#poll

private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) {
    // note: acquire the lightweight lock, which also guards against multi-threaded access,
    //   and check that the consumer is still open (it can be closed)
    acquireAndEnsureOpen();
    try {
        if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative");

        // note: subscriptions (SubscriptionState) tracks the state of the topics this consumer subscribes to (group, offsets, etc.)
        //   this call checks whether there is no subscription and no assigned partition
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        }

        // poll for new data until the timeout expires
        long elapsedTime = 0L;
        do {
            // note: check whether a wakeup was requested (wakeup() was called on this consumer); if so, leave this method
            //   by throwing an exception (this is a loop, and it may keep polling for as long as there are no new messages)
            client.maybeTriggerWakeup();

            final long metadataEnd;
            if (includeMetadataInTimeout) {
                final long metadataStart = time.milliseconds();
                // note: update the partition assignment metadata and the offsets; remainingTimeAtLeastZero computes the time left
                // internally:
                //  1 ConsumerCoordinator.poll polls for coordinator events (heartbeats and auto commits are sent along the way)
                //  2 updateFetchPositions updates the positions (partitions that already have a local position are skipped;
                //    after the update, any partition still missing one falls back to the reset policy, and the position is finally set asynchronously)
                if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {
                    return ConsumerRecords.empty();
                }
                metadataEnd = time.milliseconds();
                elapsedTime += metadataEnd - metadataStart;
            } else {
                while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) {
                    log.warn("Still waiting for metadata");
                }
                metadataEnd = time.milliseconds();
            }

            // note: the actual fetch finally starts here; covered separately below
            final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));

            if (!records.isEmpty()) {
                // note: in short, before returning, send the next round of fetch requests so that we do not block waiting for their responses
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                    client.pollNoWakeup();
                }

                // note: the interceptors run here; records can be modified or filtered at this point,
                //   but watch out for how that interacts with commits
                return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
            final long fetchEnd = time.milliseconds();
            elapsedTime += fetchEnd - metadataEnd;

        } while (elapsedTime < timeoutMs);

        return ConsumerRecords.empty();
    } finally {
        release();
    }
}
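The commit caveat at the interceptor step can be illustrated with a small toy model. This is one plausible reading of that caveat, not something the source spells out: with manual commits, an application that derives the offset to commit from the last record it actually sees would commit too low whenever the tail of a batch is filtered away. `ToyRecord`, `HeaderFilterSketch`, and the header name `skip` are all hypothetical, and no Kafka classes are used.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Toy stand-in for a consumed record; this is not the kafka-clients ConsumerRecord class.
class ToyRecord {
    final long offset;
    final Map<String, String> headers;

    ToyRecord(long offset, Map<String, String> headers) {
        this.offset = offset;
        this.headers = headers;
    }
}

class HeaderFilterSketch {
    // Keeps only records that do not carry the (hypothetical) "skip" header.
    static List<ToyRecord> filter(List<ToyRecord> batch) {
        List<ToyRecord> kept = new ArrayList<>();
        for (ToyRecord r : batch) {
            if (!r.headers.containsKey("skip")) {
                kept.add(r);
            }
        }
        return kept;
    }

    // Offset to commit = last offset of the given batch + 1, or -1 for an empty batch.
    static long nextOffset(List<ToyRecord> batch) {
        return batch.isEmpty() ? -1 : batch.get(batch.size() - 1).offset + 1;
    }

    public static void main(String[] args) {
        List<ToyRecord> batch = new ArrayList<>();
        batch.add(new ToyRecord(10, Collections.emptyMap()));
        batch.add(new ToyRecord(11, Collections.singletonMap("skip", "1")));
        batch.add(new ToyRecord(12, Collections.singletonMap("skip", "1")));

        List<ToyRecord> kept = filter(batch);
        System.out.println("derived from filtered batch: " + nextOffset(kept));
        System.out.println("derived from original batch: " + nextOffset(batch));
    }
}
```

Here the filtered batch yields 11 while the original batch yields 13, so a manual commit based on the filtered view would cause offsets 11 and 12 to be fetched again after a restart. With auto commit this does not arise, since the commit follows the locally stored position.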

The logic of pollForFetches

pollForFetches

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {
    final long startMs = time.milliseconds();
    long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);

    // note: first look for records that have already been fetched; if there are any, return them right away
    //  the fetcher keeps a completedFetches queue that buffers prefetched responses; these are parsed into
    //  nextInLineRecords, which holds the prefetched records
    //    when records are taken from nextInLineRecords, the state is checked first (assigned, paused, position),
    //      and once the records are obtained, the position in subscriptions is updated (to the next offset);
    //      note that nothing has been committed at this point
    // if data is available already, return it immediately
    final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty()) {
        return records;
    }

    // note: no prefetched records, so send fetch requests (nothing actually goes out yet)
    //  find the partition's leader, check that it is available and has no pending request, then read the position
    //  from subscriptions and build a ClientRequest, which is only queued
    //  a listener is registered as well (on success the result is added to the completedFetches queue)
    // send any new fetches (won't resend pending fetches)
    fetcher.sendFetches();

    // We do not want to be stuck blocking in poll if we are missing some positions
    // since the offset lookup may be backing off after a failure

    // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
    // updateAssignmentMetadataIfNeeded before this method.
    if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
        pollTimeout = retryBackoffMs;
    }

    // note: poll and wait; see below
    client.poll(pollTimeout, startMs, () -> {
        // since a fetch might be completed by the background thread, we need this poll condition
        // to ensure that we do not block unnecessarily in poll()
        return !fetcher.hasCompletedFetches();
    });

    // after the long poll, we should check whether the group needs to rebalance
    // prior to returning data so that the group can stabilize faster
    if (coordinator.rejoinNeededOrPending()) {
        return Collections.emptyMap();
    }

    return fetcher.fetchedRecords();
}
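The buffer-first flow of pollForFetches can be sketched with a toy fetcher. This is a heavily simplified model under stated assumptions: `ToyFetcher`, `onResponse`, and `requestsBuilt` are hypothetical, records are plain strings, and there is a single outstanding request rather than one per node. Only the method names `fetchedRecords` and `sendFetches` mirror the source.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model of the buffer-first flow; names mirror the source but the logic is simplified.
class ToyFetcher {
    private final Deque<List<String>> completedFetches = new ArrayDeque<>();
    private boolean requestPending = false;
    int requestsBuilt = 0;

    // Drains everything that has already arrived; returns an empty list if nothing is buffered.
    List<String> fetchedRecords() {
        List<String> out = new ArrayList<>();
        while (!completedFetches.isEmpty()) {
            out.addAll(completedFetches.poll());
        }
        return out;
    }

    // Builds a request only if none is outstanding, so pending fetches are not resent.
    int sendFetches() {
        if (requestPending) {
            return 0;
        }
        requestPending = true;
        requestsBuilt++;
        return 1;
    }

    // Stands in for the listener that enqueues a successful response.
    void onResponse(List<String> records) {
        requestPending = false;
        completedFetches.add(records);
    }
}

class PollForFetchesSketch {
    // One simplified round: return buffered data if any, otherwise build a fetch,
    // run the supplied network step, then read the buffer again.
    static List<String> pollForFetches(ToyFetcher fetcher, Runnable networkPoll) {
        List<String> records = fetcher.fetchedRecords();
        if (!records.isEmpty()) {
            return records;
        }
        fetcher.sendFetches();
        networkPoll.run();
        return fetcher.fetchedRecords();
    }

    public static void main(String[] args) {
        ToyFetcher fetcher = new ToyFetcher();
        List<String> first = pollForFetches(fetcher, () -> fetcher.onResponse(Arrays.asList("a", "b")));
        System.out.println(first + " after " + fetcher.requestsBuilt + " request(s)");
    }
}
```

When data is already buffered, the network step is never run, which is the point of the early return in the real method.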

ConsumerNetworkClient#poll

/**
 * Poll for any network IO.
 * @param timeout timeout in milliseconds
 * @param now current time in milliseconds
 * @param disableWakeup If TRUE disable triggering wake-ups
 */
public void poll(long timeout, long now, PollCondition pollCondition, boolean disableWakeup) {
    // note: fire the callbacks of requests that have already completed (there is a pendingCompletion queue)
    // there may be handlers which need to be invoked if we woke up the previous call to poll
    firePendingCompletedRequests();

    lock.lock();
    try {
        // note: handle connections that were disconnected (the pendingDisconnects queue)
        // Handle async disconnects prior to attempting any sends
        handlePendingDisconnects();

        // note: this is where requests are actually sent; the earlier fetcher.sendFetches() call only built the request
        //  the ClientRequest objects prepared earlier sit in an UnsentRequests (internally a map, key: Node, value: requests)
        //  they are taken out and sent here: kafkaClient.ready -> send
        // send all the requests we can send now
        long pollDelayMs = trySend(now);
        timeout = Math.min(timeout, pollDelayMs);

        // note: this mainly decides whether poll should block (whether the timeout is 0); if nothing is pending completion
        //  and the poll condition says to block (completedFetches is empty), it blocks
        //  client.poll itself reads from and writes to the sockets
        // check whether the poll is still needed by the caller. Note that if the expected completion
        // condition becomes satisfied after the call to shouldBlock() (because of a fired completion
        // handler), the client will be woken up.
        if (pendingCompletion.isEmpty() && (pollCondition == null || pollCondition.shouldBlock())) {
            // if there are no requests in flight, do not block longer than the retry backoff
            if (client.inFlightRequestCount() == 0)
                timeout = Math.min(timeout, retryBackoffMs);
            client.poll(Math.min(maxPollTimeoutMs, timeout), now);
            now = time.milliseconds();
        } else {
            client.poll(0, now);
        }

        // note: check for disconnects; if a node's connection has failed, its requests are removed from unsent
        //  and completed with a disconnected response through their completion handlers
        // handle any disconnects by failing the active requests. note that disconnects must
        // be checked immediately following poll since any subsequent call to client.ready()
        // will reset the disconnect status
        checkDisconnects(now);
        if (!disableWakeup) {
            // trigger wakeups after checking for disconnects so that the callbacks will be ready
            // to be fired on the next call to poll()
            maybeTriggerWakeup();
        }
        // throw InterruptException if this thread is interrupted
        maybeThrowInterruptException();

        // note: send once more; presumably some node connections were not ready the first time
        //  (a node that is not ready starts initializing, and ready() returns false)
        // try again to send requests since buffer space may have been
        // cleared or a connect finished in the poll
        trySend(now);

        // fail requests that couldn't be sent if they have expired
        failExpiredRequests(now);

        // clean unsent requests collection to keep the map from growing indefinitely
        unsent.clean();
    } finally {
        lock.unlock();
    }

    // called without the lock to avoid deadlock potential if handlers need to acquire locks
    firePendingCompletedRequests();
}
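Why trySend is called twice becomes easier to see with a toy model of the unsent buffer. The sketch below is an assumption-laden simplification, not the real ConsumerNetworkClient: `ToyNetworkClient`, `enqueue`, `markReady`, and the string-based nodes and requests are hypothetical, and a connection is assumed to finish during a single `poll()`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of the unsent-request buffer; not the real ConsumerNetworkClient.
class ToyNetworkClient {
    private final Map<String, Deque<String>> unsent = new LinkedHashMap<>();
    private final Set<String> ready = new HashSet<>();
    private final Set<String> connecting = new HashSet<>();
    final List<String> sent = new ArrayList<>();

    void markReady(String node) {
        ready.add(node);
    }

    // Building a request only queues it under its destination node.
    void enqueue(String node, String request) {
        unsent.computeIfAbsent(node, n -> new ArrayDeque<>()).add(request);
    }

    // Sends queued requests for ready nodes; for the others it only starts connecting.
    void trySend() {
        for (Map.Entry<String, Deque<String>> e : unsent.entrySet()) {
            if (ready.contains(e.getKey())) {
                while (!e.getValue().isEmpty()) {
                    sent.add(e.getKey() + ":" + e.getValue().poll());
                }
            } else {
                connecting.add(e.getKey());
            }
        }
    }

    // Stands in for socket IO: connections started earlier finish here (an assumption).
    void poll() {
        ready.addAll(connecting);
        connecting.clear();
    }
}

class TrySendTwiceDemo {
    public static void main(String[] args) {
        ToyNetworkClient client = new ToyNetworkClient();
        client.markReady("node-1");
        client.enqueue("node-1", "fetch");
        client.enqueue("node-2", "fetch");

        client.trySend();   // only node-1 is ready
        System.out.println("after first trySend: " + client.sent);
        client.poll();      // node-2 finishes connecting
        client.trySend();   // the request for node-2 can go out now
        System.out.println("after second trySend: " + client.sent);
    }
}
```

Without the second trySend, the request for the node that connected during the poll would wait until the next call to poll.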

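Auto commit

Offsets are committed so that a restart or rebalance does not lose the local position and leave the consumer unable to fetch the following messages correctly.

The entry point is ConsumerCoordinator#maybeAutoCommitOffsetsAsync

It is triggered mainly along this path:

  • KafkaConsumer#poll fetches messages
  • -> KafkaConsumer#updateAssignmentMetadataIfNeeded
  • -> ConsumerCoordinator#poll -> maybeAutoCommitOffsetsAsync (this also only builds the request and stores it in unsent; it goes out later when messages are fetched)
    public void maybeAutoCommitOffsetsAsync(long now) {
        // check whether the auto-commit interval has elapsed
        if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
            this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
            doAutoCommitOffsetsAsync();
        }
    }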
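The interval check can be exercised in isolation with a toy class. The field names follow the method shown in the walkthrough, but the constructor, the initial deadline of `now + intervalMs`, and the `commitsTriggered` counter are assumptions made for illustration; the real client builds an asynchronous commit request at that point instead of counting.

```java
// Toy model of the interval check; not the real ConsumerCoordinator.
class ToyAutoCommitter {
    private final boolean autoCommitEnabled;
    private final long autoCommitIntervalMs;
    private long nextAutoCommitDeadline;
    int commitsTriggered = 0;

    // Assumption: the first deadline is one interval after creation time.
    ToyAutoCommitter(boolean enabled, long intervalMs, long now) {
        this.autoCommitEnabled = enabled;
        this.autoCommitIntervalMs = intervalMs;
        this.nextAutoCommitDeadline = now + intervalMs;
    }

    void maybeAutoCommitOffsetsAsync(long now) {
        if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
            nextAutoCommitDeadline = now + autoCommitIntervalMs;
            commitsTriggered++; // stands in for doAutoCommitOffsetsAsync()
        }
    }
}

class AutoCommitIntervalDemo {
    public static void main(String[] args) {
        ToyAutoCommitter committer = new ToyAutoCommitter(true, 5000, 0);
        // Each value plays the role of the timestamp of one poll() call.
        long[] pollTimes = {1000, 4999, 5000, 6000, 10000};
        for (long now : pollTimes) {
            committer.maybeAutoCommitOffsetsAsync(now);
        }
        System.out.println("commits triggered: " + committer.commitsTriggered);
    }
}
```

Because the check only runs when poll is called, a consumer that polls rarely also commits rarely, whatever the configured interval is.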
