工作中有对Kafka重度使用的项目,其实很早就想写这一篇,但是当时也看到过一些类似的文章以及有一些其他的分享内容,所以一直拖到现在。对Kafka的使用聚焦在消费端,所以详细聊一下Kafka consumer消费过程中是如何拉取数据的。

本篇讨论的版本为Java客户端v2.8。

Poll方法

Kafka在消息消费上的实现是poll模型,消费者需要主动向broker拉取数据,Kafka consumer需要不停的调用KafkaConsumer#poll(java.time.Duration)方法。Kafka作为一个成熟复杂的消息系统,为了平衡完备的功能和易用的客户端,因而将consumer设计为这种单线程持续调用poll方法的形式(但是Kafka并没有限制获取数据后要如何消费,可以参考KafkaConsumer这个类上长长的javadoc描述的最后一部分),poll方法内部做了很多额外的工作,最大程度减少了客户端使用的复杂度。那么如果抛开其他有的没的,是不是poll方法内部就是简单地发送请求给broker然后等待获取响应的消息数据最后返回给调用方呢?在没有深入了解之前,很容易产生这样的想法,这也是这个方法给人最直观的感受,但是事实并非如此。

但是无论如何数据都是从poll方法调用后返回的,除了拉取数据之外的其他内容同样不可忽视,只不过不是本篇所述的重点,所以就以一张图和原代码展示一下poll方法的基本调用过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Override
public ConsumerRecords<K, V> poll(final Duration timeout) {
return poll(time.timer(timeout), true);
}




private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {

acquireAndEnsureOpen();
try {

this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());


if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}

do {
client.maybeTriggerWakeup();


if (includeMetadataInTimeout) {

updateAssignmentMetadataIfNeeded(timer, false);
} else {
while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
log.warn("Still waiting for metadata");
}
}


final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
if (!records.isEmpty()) {







if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
client.transmitSends();
}


return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
} while (timer.notExpired());


return ConsumerRecords.empty();
} finally {

release();
this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
}
}

Fetcher

poll方法的最外层其实写的非常紧凑清晰,数据的拉取相关操作都在它的pollForFetches方法中。这个方法的代码抛开进一步的封装调用也很简短,把英文注释翻译了直接贴出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {

long pollTimeout = coordinator == null ? timer.remainingMs() :
Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());



final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty()) {
return records;
}



fetcher.sendFetches();






if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
pollTimeout = retryBackoffMs;
}

log.trace("Polling for fetches with timeout {}", pollTimeout);


Timer pollTimer = time.timer(pollTimeout);
client.poll(pollTimer, () -> {


return !fetcher.hasAvailableFetches();
});
timer.update(pollTimer.currentTimeMs());


return fetcher.fetchedRecords();
}

整个消息拉取过程简述如下:

  1. 调用 fetcher.fetchRecords()
  2. 判断调用是否有数据返回,如果有数据直接返回数据
  3. 如果没有数据,调用 fetcher.sendFetches()
  4. 调用networkClient.poll,调用返回条件为收到fetch请求响应数据或者等待超时
  5. 再次调用 fetcher.fetchRecords()

可以看到消息拉取的处理都是由org.apache.kafka.clients.consumer.internals.Fetcher这个类对象来处理,并且核心方法只有两个:fetchRecords 和 sendFetches,这两个方法代码看起来会多一些,但是同样很清晰,所以还是直接贴出来跟这源码走一遍。

sendFetches

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119

private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;





public synchronized int sendFetches() {


sensors.maybeUpdateAssignment(subscriptions);



Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
final Node fetchTarget = entry.getKey();
final FetchSessionHandler.FetchRequestData data = entry.getValue();

final FetchRequest.Builder request = FetchRequest.Builder
.forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
.isolationLevel(isolationLevel)
.setMaxBytes(this.maxBytes)
.metadata(data.metadata())
.toForget(data.toForget())
.rackId(clientRackId);

log.debug("Sending {} {} to broker {}", isolationLevel, data, fetchTarget);


RequestFuture<ClientResponse> future = client.send(fetchTarget, request);







this.nodesWithPendingFetchRequests.add(entry.getKey().id());

future.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) {
synchronized (Fetcher.this) {
try {
@SuppressWarnings("unchecked")
FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
FetchSessionHandler handler = sessionHandler(fetchTarget.id());
if (handler == null) {
log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
fetchTarget.id());
return;
}
if (!handler.handleResponse(response)) {
return;
}

Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);

for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition);
if (requestData == null) {
String message;
if (data.metadata().isFull()) {
message = MessageFormatter.arrayFormat(
"Response for missing full request partition: partition={}; metadata={}",
new Object[]{partition, data.metadata()}).getMessage();
} else {
message = MessageFormatter.arrayFormat(
"Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}",
new Object[]{partition, data.metadata(), data.toSend(), data.toForget()}).getMessage();
}


throw new IllegalStateException(message);
} else {
long fetchOffset = requestData.fetchOffset;
FetchResponse.PartitionData<Records> partitionData = entry.getValue();

log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
isolationLevel, fetchOffset, partition, partitionData);

Iterator<? extends RecordBatch> batches = partitionData.records().batches().iterator();
short responseVersion = resp.requestHeader().apiVersion();


completedFetches.add(new CompletedFetch(partition, partitionData,
metricAggregator, batches, fetchOffset, responseVersion));
}
}


sensors.fetchLatency.record(resp.requestLatencyMs());
} finally {
nodesWithPendingFetchRequests.remove(fetchTarget.id());
}
}
}

@Override
public void onFailure(RuntimeException e) {
synchronized (Fetcher.this) {
try {
FetchSessionHandler handler = sessionHandler(fetchTarget.id());
if (handler != null) {
handler.handleError(e);
}
} finally {
nodesWithPendingFetchRequests.remove(fetchTarget.id());
}
}
}
});

}
return fetchRequestMap.size();
}

总的来说这个方法就是在做发送请求的工作,根据订阅的分片信息,给所有相关的broker发送fetch请求。

fetchRecords

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89

private CompletedFetch nextInLineFetch = null;











public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>();

int recordsRemaining = maxPollRecords;

try {

while (recordsRemaining > 0) {
if (nextInLineFetch == null || nextInLineFetch.isConsumed) {
CompletedFetch records = completedFetches.peek();
if (records == null) break;

if (records.notInitialized()) {
try {

nextInLineFetch = initializeCompletedFetch(records);
} catch (Exception e) {





FetchResponse.PartitionData<Records> partition = records.partitionData;
if (fetched.isEmpty() && (partition.records() == null || partition.records().sizeInBytes() == 0)) {
completedFetches.poll();
}
throw e;
}
} else {
nextInLineFetch = records;
}
completedFetches.poll();
} else if (subscriptions.isPaused(nextInLineFetch.partition)) {


log.debug("Skipping fetching records for assigned partition {} because it is paused", nextInLineFetch.partition);
pausedCompletedFetches.add(nextInLineFetch);
nextInLineFetch = null;
} else {


List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining);


if (!records.isEmpty()) {
TopicPartition partition = nextInLineFetch.partition;
List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
if (currentRecords == null) {
fetched.put(partition, records);
} else {




List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
newRecords.addAll(currentRecords);
newRecords.addAll(records);
fetched.put(partition, newRecords);
}

recordsRemaining -= records.size();
}
}
}
} catch (KafkaException e) {
if (fetched.isEmpty())
throw e;
} finally {


completedFetches.addAll(pausedCompletedFetches);
}

return fetched;
}

可以看到该方法主要是从响应缓冲区中取出数据,进行解析、相关数据处理,返回解析好的ConsumerRecord。

线程模型概述

看下来这个线程模型核心逻辑并不复杂。如果只论请求响应,可以描述为根据订阅情况给各个broker并发发送请求,异步接收请求(底层使用NIO)将收到的数据写入响应缓冲区链表,注意对于每个broker不会同时发起多个请求。从整个调用过程来看:消费的时候首先查看缓冲区(线程安全的链表)中是否还有未消费的数据,如果有则直接解码处理后返回(同时异步发起下一轮请求),如果没有则根据订阅情况针对所有相关broker节点并发做异步请求,异步响应结果都会存入缓冲区;消费线程会等待直到缓冲区有任何可用的数据或者超时;循环解析缓冲链表中的数据,返回不超过配置数量(max.poll.records)的消息。

相关参数和数据

绝大部分相关参数都可以在fetch请求构建时看到(观察 FetchRequest.Builder),加上poll方法的一个经典配置项,涉及的相关参数主要有如下:

  • fetch.min.bytes:在响应fetch请求时服务器应返回的最小数据量。如果没有足够的数据可用,请求将等待累积相应数据量后再作响应。默认设置为1字节意味着只要有一个字节的数据可用,或者请求等待超时,fetch请求就会得到响应。将其设置为大于1的值将导致服务器等待累计更大的数据量,这样可以稍微提高服务器的吞吐量,但也会增加一些延迟。
  • fetch.max.bytes:服务器响应fetch请求时应返回的最大数据量。数据记录由使用者分批获取,如果第一个非空分区中的第一个记录集合大于其值,则仍将返回记录集合,以确保使用者能够正常执行下去。因此,这不是一个绝对的最大值。broker接受的最大记录集合大小通过 message.max.bytes (broker config) 或 max.message.bytes (topic config)定义。请注意,使用者会并行执行多个读取操作。
  • fetch.max.wait.ms:如果没有足够的数据立即满足 fetch.min.bytes 提供的要求,服务器在响应fetch请求之前将阻塞的最大时间。
  • max.partition.fetch.bytes:服务器返回的每个分区的最大数据量。记录由使用者分批获取。如果提取的第一个非空分区中的第一个记录批处理大于此限制,则仍将返回该批处理,以确保使用者能够正常执行下去。
  • max.poll.records:单次调用 poll() 返回的最大记录数。

在本人的实际的项目应用中,总消费量最高可以达到十万每秒的级别,在这种消费速度下如果不做任何人工配置全部采用默认值,每次fetch请求基本上还是只会返回1条消息记录,此时单机fetch请求的QPS可以达到4,5k。在消费速度和生产速度匹配的情况下Kafka的消费延迟(从生产方发送到消费方收到这个数据)能够维持在一个比较低的水平(几十毫秒以内),这也可以反映出Kafka的吞吐量确实相当不错,上限很高(处理日志的集群会达到更高的数量级),同时延迟也能够在这个量级下得到保证。此外,如果想要降低CPU、提升吞吐量、能够接受一定的延迟提升,Kafka也提供了上述相关配置进行更改,满足不同情况下的使用需求。