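Having written about the consumer, it is time to cover the producer. I should probably write about the network model and the broker side too, so that my knowledge does not end up lopsided; I have been burned by that before. This post still focuses on a single point, but there is plenty to say about the producer. As in the post on how the consumer fetches data, the emphasis here is on how data gets sent, because understanding the threading model is always the most important part. Transactions are out of scope; I will show the overall logic and then walk through a few core methods.

This post is based on version 2.8 of the Java client.

Runtime overview

Consumers come in poll and push flavors, but the producer's basic mode is much plainer: it always pushes data to the server. Even so, a Kafka producer does not issue one network request per API call; it does a good deal of work behind the scenes for the sake of performance. First, the overall flow of a running Kafka producer:

In terms of threads, the main participants are the calling thread that invokes send and the Sender thread. Network I/O, which the diagram does not show, is driven by the Sender thread itself through client.poll rather than by a separate thread. The whole thing is a producer-consumer model: the calling thread is the "producer", the Sender thread is the "consumer", and they exchange data through the Accumulator, an object that holds N queues. The core of the design is batching, which raises throughput and efficiency.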
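The hand-off between the calling threads and the Sender thread can be sketched with plain JDK classes. This is only an illustration of the model: `HandoffSketch` and its methods are hypothetical names, records are plain strings, and the sender busy-loops where the real one would wait in a network poll.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the accumulator: one deque per partition id.
class HandoffSketch {
    private final Map<Integer, Deque<String>> queues = new ConcurrentHashMap<>();

    // Called by application threads (the "producers" of the model).
    void append(int partition, String record) {
        Deque<String> dq = queues.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (dq) {
            dq.addLast(record);
        }
    }

    // Called by the single sender thread (the "consumer" of the model).
    List<String> drainAll() {
        List<String> out = new ArrayList<>();
        for (Deque<String> dq : queues.values()) {
            synchronized (dq) {
                while (!dq.isEmpty()) {
                    out.add(dq.pollFirst());
                }
            }
        }
        return out;
    }

    // Runs `callers` appending threads against one sender thread and
    // returns how many records the sender saw.
    static int run(int callers, int perCaller) {
        HandoffSketch acc = new HandoffSketch();
        int expected = callers * perCaller;
        List<String> sent = new ArrayList<>();
        Thread sender = new Thread(() -> {
            while (sent.size() < expected) {
                sent.addAll(acc.drainAll()); // a real sender would issue network requests here
                Thread.yield();
            }
        });
        sender.start();
        List<Thread> threads = new ArrayList<>();
        for (int c = 0; c < callers; c++) {
            final int id = c;
            Thread t = new Thread(() -> {
                for (int i = 0; i < perCaller; i++) {
                    acc.append(i % 3, "caller" + id + "-" + i);
                }
            });
            threads.add(t);
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sent.size();
    }
}
```

Calling `HandoffSketch.run(4, 250)` starts four appending threads and one draining thread; every appended record ends up in the sender's list.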

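The send method

To send data, an application calls the Kafka producer's send method. It is the facade the application interacts with; everything else described in this post is machinery running behind it. Its signature is: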

Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

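This is clearly an asynchronous call, which ties in closely with Kafka's network model. It can be made synchronous simply by calling get on the returned Future. The API is very concise and the code is, as usual, well written. Next, a rough walk through the core method: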

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // Fail fast if the producer has already been closed.
        throwIfProducerClosed();

        long nowMs = time.milliseconds();

        // Make sure metadata for the topic is available; this can block for up to maxBlockTimeMs.
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        nowMs += clusterAndWaitTime.waitedOnMetadataMs;
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;

        // Serialize the key and the value.
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }

        // Choose the target partition.
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();

        // Validate the estimated record size.
        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
        if (log.isTraceEnabled()) {
            log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        }

        // Wrap the user callback so that interceptors are invoked on completion.
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

        if (transactionManager != null && transactionManager.isTransactional()) {
            transactionManager.failIfNotReadyForSend();
        }

        // Append to the accumulator. With abortOnNewBatch = true the call returns early
        // instead of creating a new batch, so the partitioner can pick a partition again.
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

        if (result.abortForNewBatch) {
            int prevPartition = partition;
            partitioner.onNewBatch(record.topic(), cluster, prevPartition);
            partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            if (log.isTraceEnabled()) {
                log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
            }
            // The partition may have changed, so the callback is rebuilt for the new tp.
            interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

            result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
        }

        if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);

        // Wake the Sender if a batch is full or a new batch was created.
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }

        return result.future;

        // Exception handling: record the error and notify the interceptors.
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (KafkaException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // Interceptors are notified about every exception, since onSend has already been called.
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}

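The method is not particularly heavy; or rather, the heavy parts are skipped here or deferred to later sections. It is straightforward: serialize the record, decide which partition it goes to, then hand the data to the intermediary, the Accumulator.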
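The three steps can be condensed into a small stand-alone sketch. Everything here is hypothetical illustration code: `SendPathSketch` is not a Kafka class, the "accumulator" is a plain map, and `Arrays.hashCode` merely stands in for the hash used by Kafka's default partitioner (murmur2), whose handling of keyless records also differs.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SendPathSketch {
    // Stand-in for the accumulator: serialized values grouped by "topic-partition".
    final Map<String, List<byte[]>> buffered = new HashMap<>();

    // Step 1: serialize (this sketch only handles UTF-8 strings).
    static byte[] serialize(String s) {
        return s == null ? null : s.getBytes(StandardCharsets.UTF_8);
    }

    // Step 2: choose a partition from the serialized key.
    // Assumption: Arrays.hashCode is a placeholder, not the hash Kafka uses.
    static int choosePartition(byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            return 0; // keyless records are simplified to partition 0 here
        }
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // Step 3: hand the bytes to the buffer and return the chosen partition.
    int send(String topic, String key, String value, int numPartitions) {
        byte[] k = serialize(key);
        byte[] v = serialize(value);
        int partition = choosePartition(k, numPartitions);
        buffered.computeIfAbsent(topic + "-" + partition, tp -> new ArrayList<>()).add(v);
        return partition;
    }
}
```

Two records with the same key land in the same partition's buffer, which is the property that keyed partitioning is meant to provide.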

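The Sender thread

The send method does not itself transmit any message; that work is done by the Sender thread. KafkaProducer has two related members, ioThread and sender. The thread is started in the producer's constructor, and in close it is either waited on for a graceful shutdown or forced to close. The key code: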

@Override
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // Main loop, runs until close is called.
    while (running) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    // Shutdown handling is omitted from this excerpt.
}

void runOnce() {
    if (transactionManager != null) {
        // Transaction handling is omitted from this excerpt.
    }

    long currentTimeMs = time.milliseconds();
    long pollTimeout = sendProducerData(currentTimeMs);
    client.poll(pollTimeout, currentTimeMs);
}

private long sendProducerData(long now) {
    Cluster cluster = metadata.fetch();

    // Get the nodes that have data ready to send.
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // If any partition leader is unknown, force a metadata update.
    if (!result.unknownLeaderTopics.isEmpty()) {
        // Add the topics to the metadata again so that they are included in the update.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic, now);

        log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                result.unknownLeaderTopics);
        this.metadata.requestUpdate();
    }

    // Remove any node the network client is not ready to send to.
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        }
    }

    // Take the batches to send out of the accumulator, grouped by node id.
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    addToInflightBatches(batches);
    if (guaranteeMessageOrder) {
        // Mute all drained partitions so that nothing else is sent to them in the meantime.
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    // Collect expired batches, both in flight and still in the accumulator.
    accumulator.resetNextBatchExpiryTime();
    List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
    expiredBatches.addAll(expiredInflightBatches);

    // Fail every expired batch with a TimeoutException.
    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    for (ProducerBatch expiredBatch : expiredBatches) {
        String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
        failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
        if (transactionManager != null && expiredBatch.inRetry()) {
            // The batch has been sent before, so its sequence number is marked as unresolved.
            transactionManager.markSequenceUnresolved(expiredBatch);
        }
    }

    sensors.updateProduceRequestMetrics(batches);

    // Compute the poll timeout: the smallest of the next ready-check delay, the delay of the
    // nodes that are not ready, and the time until the next batch expires.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
    pollTimeout = Math.max(pollTimeout, 0);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        // Some node is ready and has sendable data: poll with a timeout of 0
        // so that the loop can come straight back and try to send more.
        pollTimeout = 0;
    }

    sendProduceRequests(batches, now);
    return pollTimeout;
}

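The two core operations here are both Accumulator calls, ready and drain. Put plainly: first find the nodes that data can be sent to, then take the relevant data out of the accumulator, and send a request to each node (asynchronously).

Under normal conditions the Sender thread is an endless loop, and the wait between iterations comes mainly from the call to NetworkClient.poll(timeout, now). Computing this timeout is slightly involved:

  1. Over all batches that are not yet sendable, take the minimum of each batch's remaining wait: lingerMs (or retryBackoffMs while a retry is backing off) minus the time the batch has already waited. Call this nextReadyCheckDelayMs.
  2. Over all nodes that are not ready, take the minimum of each node's pollDelayMs. Call this notReadyTimeout.
  3. The accumulator keeps a numeric member nextExpiryTimeMs, presumably the expiry time of the batch that will expire soonest. Subtract the current time from it and call the result nextExpiryDurationMs.
  4. pollTimeout is the smallest of nextReadyCheckDelayMs, notReadyTimeout and nextExpiryDurationMs, floored at 0. If any node is ready and has data to send, pollTimeout is set to 0 instead.

So in essence the configured linger.ms serves as the buffering time for messages.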
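The timeout arithmetic can be isolated into a pure function. This is a sketch that mirrors the tail of sendProducerData shown earlier; the class name `PollTimeoutSketch` and the `anyNodeReady` parameter are made up for the illustration.

```java
class PollTimeoutSketch {
    // Mirrors the arithmetic at the end of sendProducerData:
    // smallest of the three delays, floored at 0, or 0 if a node is ready to send.
    static long pollTimeout(long nextReadyCheckDelayMs,
                            long notReadyTimeout,
                            long nextExpiryTimeMs,
                            long nowMs,
                            boolean anyNodeReady) {
        long timeout = Math.min(nextReadyCheckDelayMs, notReadyTimeout);
        timeout = Math.min(timeout, nextExpiryTimeMs - nowMs);
        timeout = Math.max(timeout, 0);
        return anyNodeReady ? 0 : timeout;
    }
}
```

For example, with a batch that still has 5 ms of linger left, no unready nodes (notReadyTimeout stays at Long.MAX_VALUE) and an expiry two minutes away, the Sender blocks in poll for at most 5 ms.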

Accumulator

As described above, the Accumulator is the core medium for batched sending. Here is the class comment:

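/**
 * This class acts as a queue that accumulates records into {@link MemoryRecords}
 * instances to be sent to the server.
 * <p>
 * The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
 * this behavior is explicitly disabled.
 */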
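Two points matter most: the buffer queues and memory allocation. Memory allocation is left aside for now. As for the buffer queues, there is one queue per TopicPartition, held in a ConcurrentMap. Each queue is an ArrayDeque whose elements are ProducerBatch objects, which, as the name says, represent a batch of message data. Every access to a queue is wrapped in synchronized, a simple way of dealing with concurrency. The three main methods mentioned above are covered in turn below.

The append method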

public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock,
                                 boolean abortOnNewBatch,
                                 long nowMs) throws InterruptedException {
    // Track the number of appending threads so that in-progress appends are not missed.
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // Look up the queue for this partition, creating it if necessary.
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);

        // First attempt: try to append to the last batch in the queue.
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");

            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
            if (appendResult != null)
                return appendResult;
        }

        // No batch could take the record, so a new batch is needed.
        if (abortOnNewBatch) {
            // Return a result that tells the caller to run the partitioner again and retry.
            return new RecordAppendResult(null, false, false, true);
        }

        // Allocate a buffer of at least batchSize; this may block while memory is exhausted.
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);

        buffer = free.allocate(size, maxTimeToBlock);

        // Refresh the current time, since the allocation may have blocked.
        nowMs = time.milliseconds();

        synchronized (dq) {
            // Check again that the producer was not closed after the lock was acquired.
            if (closed)
                throw new KafkaException("Producer closed while send in progress");

            // Second attempt: another thread may have created a batch in the meantime.
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
            if (appendResult != null) {
                // Someone else found room; the buffer is released in the finally block.
                return appendResult;
            }

            // Create a new batch on top of the buffer and write the record into it.
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
            FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                    callback, nowMs));

            dq.addLast(batch);
            incomplete.add(batch);

            // The batch now owns the buffer, so it must not be deallocated in the finally block.
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}

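ProducerBatch's internals mostly convert and write data and perform some validation; they are not detailed here. In short, append writes data into a batch in the queue. If the batch at the tail of the queue is full or cannot take the record, a new batch is created, the record is written into it, and the batch is added to the queue. Creating a batch involves allocating memory from a bounded pool, which keeps the buffer from growing without limit and running out of memory.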
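A simplified model of that behavior, under loud assumptions: records are reduced to a byte count, `Batch` is a hypothetical stand-in for ProducerBatch, the memory pool is left out entirely, and the return value only mimics the "batch is full or new batch created" condition that doSend uses to wake the Sender.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class AppendSketch {
    // Hypothetical stand-in for ProducerBatch with a fixed byte capacity.
    static final class Batch {
        final int capacity;
        int used = 0;
        int records = 0;

        Batch(int capacity) {
            this.capacity = capacity;
        }

        // The first record is always accepted; later ones only if they still fit.
        boolean tryAppend(int recordSize) {
            if (records > 0 && used + recordSize > capacity) {
                return false;
            }
            used += recordSize;
            records++;
            return true;
        }

        boolean isFull() {
            return used >= capacity;
        }
    }

    final Deque<Batch> dq = new ArrayDeque<>();
    final int batchSize;

    AppendSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Returns true when the sender should be woken:
    // either some batch is full or a new batch was just created.
    boolean append(int recordSize) {
        synchronized (dq) {
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(recordSize)) {
                return dq.size() > 1 || last.isFull();
            }
            // No room in the tail batch: create one large enough for this record.
            Batch batch = new Batch(Math.max(batchSize, recordSize));
            batch.tryAppend(recordSize);
            dq.addLast(batch);
            return true;
        }
    }
}
```

With a batch size of 100, three 40-byte records produce two batches: the first two records share one batch and the third one opens a new batch.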

The ready method

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();

    // True if some thread is blocked waiting for buffer memory.
    boolean exhausted = this.free.queued() > 0;

    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
        Deque<ProducerBatch> deque = entry.getValue();

        synchronized (deque) {
            // Only the batch at the head of the queue is examined.
            ProducerBatch batch = deque.peekFirst();
            if (batch != null) {
                TopicPartition part = entry.getKey();
                Node leader = cluster.leaderFor(part);
                if (leader == null) {
                    // There is data for a partition whose leader is unknown;
                    // remember the topic so that a metadata update can be requested.
                    unknownLeaderTopics.add(part.topic());
                } else if (!readyNodes.contains(leader) && !isMuted(part)) {
                    long waitedTimeMs = batch.waitedTimeMs(nowMs);
                    // A retried batch that has not waited retryBackoffMs yet is backing off.
                    boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;

                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    boolean full = deque.size() > 1 || batch.isFull();
                    boolean expired = waitedTimeMs >= timeToWaitMs;

                    boolean sendable = full || expired || exhausted || closed || flushInProgress();

                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // Not sendable yet: track the shortest remaining wait
                        // as the delay before the next ready check.
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }

    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

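The method returns three things:

  1. The nodes that data can be sent to according to the application-level logic. Not every node is in a sendable state, and these nodes are checked again later at the network connection level.
  2. The delay until the next ready check. This value feeds into the pollTimeout computed in the Sender thread's loop.
  3. The set of topics that have a partition with an unknown leader. It is used to refresh the cluster metadata, because data cannot be sent to a partition whose leader is unknown.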
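The per-batch decision inside ready can be reduced to two pure functions. This sketch follows the boolean logic of the method above; `ReadySketch` is a hypothetical name, and the closed and flush-in-progress flags are merged into a single `closedOrFlushing` parameter for brevity.

```java
class ReadySketch {
    // True if the head batch of a partition should be sent now.
    static boolean sendable(long waitedMs, int attempts, long lingerMs, long retryBackoffMs,
                            boolean full, boolean exhausted, boolean closedOrFlushing) {
        boolean backingOff = attempts > 0 && waitedMs < retryBackoffMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        boolean expired = waitedMs >= timeToWaitMs;
        boolean sendable = full || expired || exhausted || closedOrFlushing;
        return sendable && !backingOff;
    }

    // Remaining wait of a batch that is not sendable yet; feeds nextReadyCheckDelayMs.
    static long timeLeftMs(long waitedMs, int attempts, long lingerMs, long retryBackoffMs) {
        boolean backingOff = attempts > 0 && waitedMs < retryBackoffMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        return Math.max(timeToWaitMs - waitedMs, 0);
    }
}
```

Note how a batch in retry backoff is never sent early, even when it is full, whereas a fresh batch is sent as soon as it is full or linger.ms has elapsed.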

The drain method

public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();

    // Collect the batches to send for each node, keyed by node id.
    Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
        batches.put(node.id(), ready);
    }
    return batches;
}

private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
    int size = 0;
    // The partitions for which this node is the leader.
    List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
    List<ProducerBatch> ready = new ArrayList<>();

    // drainIndex persists across calls, so draining does not always start at the same partition.
    int start = drainIndex = drainIndex % parts.size();

    do {
        PartitionInfo part = parts.get(drainIndex);
        TopicPartition tp = new TopicPartition(part.topic(), part.partition());
        this.drainIndex = (this.drainIndex + 1) % parts.size();

        // Skip muted partitions (used when message order must be guaranteed).
        if (isMuted(tp))
            continue;

        Deque<ProducerBatch> deque = getDeque(tp);
        if (deque == null)
            continue;

        synchronized (deque) {
            // Only the batch at the head of the queue is a candidate.
            ProducerBatch first = deque.peekFirst();
            if (first == null)
                continue;

            // Skip batches that are still in retry backoff.
            boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;

            if (backoff)
                continue;

            if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                // The request would exceed maxSize, so stop here. A single batch larger than
                // maxSize is still sent on its own, because ready is empty in that case.
                break;
            } else {
                if (shouldStopDrainBatchesForPartition(first, tp))
                    break;

                boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
                ProducerIdAndEpoch producerIdAndEpoch =
                        transactionManager != null ? transactionManager.producerIdAndEpoch() : null;

                ProducerBatch batch = deque.pollFirst();

                if (producerIdAndEpoch != null && !batch.hasSequence()) {
                    // Producer state and sequence handling is omitted from this excerpt.
                }

                batch.close();
                size += batch.records().sizeInBytes();
                ready.add(batch);

                batch.drained(now);
            }
        }
    } while (start != drainIndex);
    return ready;
}

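This method shows how the Sender takes pending data out of the accumulator. In short: once the sendable nodes are known, look up the partitions each node leads, take at most one batch from each of those partitions' queues, and the result is the list of batches to send to each node.

These batches are then wrapped in request objects and sent asynchronously to their nodes. That ends one round of the Sender logic; once the network client's poll returns, at the latest after pollTimeout, the next round begins.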
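The round-robin traversal with a persistent drainIndex can be tried out in isolation. In this sketch a batch is reduced to its size in bytes and each partition led by the node is a deque of such sizes; `DrainSketch` is a hypothetical name, and muting, backoff and transactional checks are left out.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class DrainSketch {
    // Persists across calls so that draining does not always start at partition 0.
    int drainIndex = 0;

    // queues.get(i) holds the sizes of the batches waiting for partition i of one node.
    List<Integer> drainOneNode(List<Deque<Integer>> queues, int maxSize) {
        List<Integer> ready = new ArrayList<>();
        if (queues.isEmpty()) {
            return ready; // guard added for the sketch
        }
        int size = 0;
        int start = drainIndex = drainIndex % queues.size();
        do {
            Deque<Integer> dq = queues.get(drainIndex);
            drainIndex = (drainIndex + 1) % queues.size();
            Integer first = dq.peekFirst();
            if (first == null) {
                continue; // nothing queued for this partition
            }
            if (size + first > maxSize && !ready.isEmpty()) {
                break; // request would grow past maxSize
            }
            size += dq.pollFirst(); // at most one batch per partition per call
            ready.add(first);
        } while (start != drainIndex);
        return ready;
    }
}
```

With two partitions holding batches of sizes [10, 20] and [30], the first call returns [10, 30] and leaves the 20-byte batch for the next round, which illustrates the "at most one batch per partition" rule.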

Related parameters

The core parameters can be found in the Accumulator's constructor. The main ones are:

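  • linger.ms: as described above, this value affects how fast the Sender thread loops. The larger it is, the longer each iteration may wait, which lets more messages be merged into one request to the server, reducing network I/O and raising throughput. Keep in mind, though, that the Sender's wait is not a fixed value and can be cut short by a wakeup call.
  • retry.backoff.ms: how long to wait before a retry.
  • delivery.timeout.ms: because send does not transmit data immediately but writes it to a buffer queue first, this timeout ensures that data does not sit unsent forever. An expired batch is no longer sent, and the failure is reported through the Future returned by send.
  • batch.size: limits the size of the data in each batch. When a batch fills up during a write, a new batch is created. If the value is too small, more batches are created and more requests have to be sent, which lowers throughput. If it is too large, memory is wasted, because each new batch allocates a buffer of this size up front.
  • buffer.memory: the maximum amount of memory the buffer may use, which prevents the buffer from exhausting memory.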
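As a starting point for experiments, the parameters above can be collected in a java.util.Properties object of the kind that is handed to the producer's constructor. The values below are illustrative assumptions rather than recommendations, the broker address is a placeholder, and `ProducerBatchingConfigSketch` is a made-up name.

```java
import java.util.Properties;

class ProducerBatchingConfigSketch {
    // Builds the batching-related settings discussed above; all values are examples.
    static Properties batchingProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("linger.ms", "20");                     // wait up to 20 ms to fill a batch
        props.put("batch.size", "32768");                 // 32 KiB per batch
        props.put("buffer.memory", "67108864");           // 64 MiB for the whole buffer
        props.put("retry.backoff.ms", "100");             // pause before a retry
        props.put("delivery.timeout.ms", "120000");       // give up on a record after 2 minutes
        return props;
    }
}
```

Raising linger.ms and batch.size together trades a little latency for fewer and larger requests; buffer.memory then has to be large enough to hold the batches of all active partitions.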