工作中有对Kafka重度使用的项目,其实很早就想写这一篇,但是当时也看到过一些类似的文章以及有一些其他的分享内容,所以一直拖到现在。对Kafka的使用聚焦在消费端,所以详细聊一下Kafka consumer消费过程中是如何拉取数据的。
本篇讨论的版本为Java客户端v2.8。
Poll方法Kafka在消息消费上的实现是poll模型,消费者需要主动向broker拉取数据,Kafka consumer需要不停的调用KafkaConsumer#poll(java.time.Duration)方法。Kafka作为一个成熟复杂的消息系统,为了平衡完备的功能和易用的客户端,因而将consumer设计为这种单线程持续调用poll方法的形式(但是Kafka并没有限制获取数据后要如何消费,可以参考KafkaConsumer这个类上长长的javadoc描述的最后一部分),poll方法内部做了很多额外的工作,最大程度减少了客户端使用的复杂度。那么如果抛开其他有的没的,是不是poll方法内部就是简单地发送请求给broker然后等待获取响应的消息数据最后返回给调用方呢?在没有深入了解之前,很容易产生这样的想法,这也是这个方法给人最直观的感受,但是事实并非如此。
但是无论如何数据都是从poll方法调用后返回的,除了拉取数据之外的其他内容同样不可忽视,只不过不是本篇所述的重点,所以就以一张图和原代码展示一下poll方法的基本调用过程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 @Override public ConsumerRecords<K, V> poll (final Duration timeout) { return poll(time.timer(timeout), true ); } private ConsumerRecords<K, V> poll (final Timer timer, final boolean includeMetadataInTimeout) { acquireAndEnsureOpen(); try { this .kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs()); if (this .subscriptions.hasNoSubscriptionOrUserAssignment()) { throw new IllegalStateException ("Consumer is not subscribed to any topics or assigned any partitions" ); } do { client.maybeTriggerWakeup(); if (includeMetadataInTimeout) { updateAssignmentMetadataIfNeeded(timer, false ); } else { while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true )) { log.warn("Still waiting for metadata" ); } } final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer); if (!records.isEmpty()) { if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) { client.transmitSends(); } return this .interceptors.onConsume(new ConsumerRecords <>(records)); } } while (timer.notExpired()); return ConsumerRecords.empty(); } finally { release(); this .kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs()); } }
Fetcherpoll方法的最外层其实写的非常紧凑清晰,数据的拉取相关操作都在它的pollForFetches方法中。这个方法的代码抛开进一步的封装调用也很简短,把英文注释翻译了直接贴出来:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches (Timer timer) { long pollTimeout = coordinator == null ? timer.remainingMs() : Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs()); final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); if (!records.isEmpty()) { return records; } fetcher.sendFetches(); if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) { pollTimeout = retryBackoffMs; } log.trace("Polling for fetches with timeout {}" , pollTimeout); Timer pollTimer = time.timer(pollTimeout); client.poll(pollTimer, () -> { return !fetcher.hasAvailableFetches(); }); timer.update(pollTimer.currentTimeMs()); return fetcher.fetchedRecords(); }
整个消息拉取过程简述如下:
调用 fetcher.fetchRecords()
判断调用是否有数据返回,如果有数据直接返回数据
如果没有数据,调用 fetcher.sendFetches()
调用networkClient.poll,调用返回条件为收到fetch请求响应数据或者等待超时
再次调用 fetcher.fetchRecords()
可以看到消息拉取的处理都是由org.apache.kafka.clients.consumer.internals.Fetcher这个类对象来处理,并且核心方法只有两个:fetchRecords 和 sendFetches,这两个方法代码看起来会多一些,但是同样很清晰,所以还是直接贴出来跟这源码走一遍。
sendFetches1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;public synchronized int sendFetches () { sensors.maybeUpdateAssignment(subscriptions); Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests(); for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) { final Node fetchTarget = entry.getKey(); final FetchSessionHandler.FetchRequestData data = entry.getValue(); final FetchRequest.Builder request = FetchRequest.Builder .forConsumer(this .maxWaitMs, this .minBytes, data.toSend()) .isolationLevel(isolationLevel) .setMaxBytes(this .maxBytes) .metadata(data.metadata()) .toForget(data.toForget()) .rackId(clientRackId); log.debug("Sending {} {} to broker {}" , isolationLevel, data, fetchTarget); RequestFuture<ClientResponse> future = client.send(fetchTarget, request); this .nodesWithPendingFetchRequests.add(entry.getKey().id()); future.addListener(new RequestFutureListener <ClientResponse>() { @Override public void onSuccess (ClientResponse resp) { synchronized (Fetcher.this ) { try { @SuppressWarnings("unchecked") FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody(); FetchSessionHandler handler = sessionHandler(fetchTarget.id()); if (handler == null ) { log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response." , fetchTarget.id()); return ; } if (!handler.handleResponse(response)) { return ; } Set<TopicPartition> partitions = new HashSet <>(response.responseData().keySet()); FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator (sensors, partitions); for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) { TopicPartition partition = entry.getKey(); FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition); if (requestData == null ) { String message; if (data.metadata().isFull()) { message = MessageFormatter.arrayFormat( "Response for missing full request partition: partition={}; metadata={}" , new Object []{partition, data.metadata()}).getMessage(); } else { message = MessageFormatter.arrayFormat( "Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}" , new Object []{partition, data.metadata(), data.toSend(), data.toForget()}).getMessage(); } throw new IllegalStateException (message); } else { long fetchOffset = requestData.fetchOffset; FetchResponse.PartitionData<Records> partitionData = entry.getValue(); log.debug("Fetch {} at offset {} for partition {} returned fetch data {}" , isolationLevel, fetchOffset, partition, partitionData); Iterator<? extends RecordBatch > batches = partitionData.records().batches().iterator(); short responseVersion = resp.requestHeader().apiVersion(); completedFetches.add(new CompletedFetch (partition, partitionData, metricAggregator, batches, fetchOffset, responseVersion)); } } sensors.fetchLatency.record(resp.requestLatencyMs()); } finally { nodesWithPendingFetchRequests.remove(fetchTarget.id()); } } } @Override public void onFailure (RuntimeException e) { synchronized (Fetcher.this ) { try { FetchSessionHandler handler = sessionHandler(fetchTarget.id()); if (handler != null ) { handler.handleError(e); } } finally { nodesWithPendingFetchRequests.remove(fetchTarget.id()); } } } }); } return fetchRequestMap.size(); }
总的来说这个方法就是在做发送请求的工作,根据订阅的分片信息,给所有相关的broker发送fetch请求。
fetchRecords1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 private CompletedFetch nextInLineFetch = null ;public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords () { Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap <>(); Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque <>(); int recordsRemaining = maxPollRecords; try { while (recordsRemaining > 0 ) { if (nextInLineFetch == null || nextInLineFetch.isConsumed) { CompletedFetch records = completedFetches.peek(); if (records == null ) break ; if (records.notInitialized()) { try { nextInLineFetch = initializeCompletedFetch(records); } catch (Exception e) { FetchResponse.PartitionData<Records> partition = records.partitionData; if (fetched.isEmpty() && (partition.records() == null || partition.records().sizeInBytes() == 0 )) { completedFetches.poll(); } throw e; } } else { nextInLineFetch = records; } completedFetches.poll(); } else if (subscriptions.isPaused(nextInLineFetch.partition)) { log.debug("Skipping fetching records for assigned partition {} because it is paused" , nextInLineFetch.partition); pausedCompletedFetches.add(nextInLineFetch); nextInLineFetch = null ; } else { List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining); if (!records.isEmpty()) { TopicPartition partition = nextInLineFetch.partition; List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition); if (currentRecords == null ) { fetched.put(partition, records); } else { List<ConsumerRecord<K, V>> newRecords = new ArrayList <>(records.size() + currentRecords.size()); newRecords.addAll(currentRecords); newRecords.addAll(records); fetched.put(partition, newRecords); } recordsRemaining -= records.size(); } } } } catch (KafkaException e) { if (fetched.isEmpty()) throw e; } finally { completedFetches.addAll(pausedCompletedFetches); } return fetched; }
可以看到该方法主要是从响应缓冲区中取出数据,进行解析、相关数据处理,返回解析好的ConsumerRecord。
线程模型概述看下来这个线程模型核心逻辑并不复杂。如果只论请求响应,可以描述为根据订阅情况给各个broker并发发送请求,异步接收请求(底层使用NIO)将收到的数据写入响应缓冲区链表,注意对于每个broker不会同时发起多个请求。从整个调用过程来看:消费的时候首先查看缓冲区(线程安全的链表)中是否还有未消费的数据,如果有则直接解码处理后返回(同时异步发起下一轮请求),如果没有则根据订阅情况针对所有相关broker节点并发做异步请求,异步响应结果都会存入缓冲区;消费线程会等待直到缓冲区有任何可用的数据或者超时;循环解析缓冲链表中的数据,返回不超过配置数量(max.poll.records)的消息。
相关参数和数据绝大部分相关参数都可以在fetch请求构建时看到(观察 FetchRequest.Builder),加上poll方法的一个经典配置项,涉及的相关参数主要有如下:
fetch.min.bytes: 在响应fetch请求时服务器应返回的最小数据量。如果没有足够的数据可用,请求将等待累积相应数据量后再作响应。默认设置为1字节意味着只要有一个字节的数据可用,或者请求等待超时,fetch请求就会得到响应。将其设置为大于1的值将导致服务器等待累计更大的数据量,这样可以稍微提高服务器的吞吐量,但也会增加一些延迟。
fetch.max.bytes: 服务器响应fetch请求时应返回的最大数据量。数据记录由使用者分批获取,如果第一个非空分区中的第一个记录集合大于其值,则仍将返回记录集合,以确保使用者能够正常执行下去。因此,这不是一个绝对的最大值。broker接受的最大记录集合大小通过 message.max.bytes (broker config) 或 max.message.bytes (topic config)定义。请注意,使用者会并行执行多个读取操作。
fetch.max.wait.ms: 如果没有足够的数据立即满足 fetch.min.bytes 提供的要求,服务器在响应fetch请求之前将阻塞的最大时间。
max.partition.fetch.bytes: 服务器返回的每个分区的最大数据量。记录由使用者分批获取。如果提取的第一个非空分区中的第一个记录批处理大于此限制,则仍将返回该批处理,以确保使用者能够正常执行下去。
max.poll.records: 单次调用 poll() 返回的最大记录数。
在本人的实际的项目应用中,总消费量最高可以达到十万每秒的级别,在这种消费速度下如果不做任何人工配置全部采用默认值,每次fetch请求基本上还是只会返回1条消息记录,此时单机fetch请求的QPS可以达到4,5k。在消费速度和生产速度匹配的情况下Kafka的消费延迟(从生产方发送到消费方收到这个数据)能够维持在一个比较低的水平(几十毫秒以内),这也可以反映出Kafka的吞吐量确实相当不错,上限很高(处理日志的集群会达到更高的数量级),同时延迟也能够在这个量级下得到保证。此外,如果想要降低CPU、提升吞吐量、能够接受一定的延迟提升,Kafka也提供了上述相关配置进行更改,满足不同情况下的使用需求。