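When KafkaProducer sends a message via its send method, it first appends the message to a component called RecordAccumulator. RecordAccumulator, also known as the message accumulator, can be thought of as KafkaProducer's message buffer. Its main job is to cache messages in batches so that the Sender thread can send them in bulk, which reduces network overhead and improves performance.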

    // KafkaProducer.java
    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp,
            serializedKey, serializedValue, interceptCallback, remainingWaitMs);




    // RecordAccumulator.java
    public final class RecordAccumulator {

        private volatile boolean closed;
        private final AtomicInteger flushesInProgress;
        private final AtomicInteger appendsInProgress;
        private final int batchSize;
        private final CompressionType compression;
        private final long lingerMs;
        private final long retryBackoffMs;
        // Buffer pool that hands out the ByteBuffers backing each batch
        private final BufferPool free;
        private final Time time;
        // Map from each partition to its queue of message batches
        private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
        private final IncompleteRecordBatches incomplete;
        private final Set<TopicPartition> muted;
        private int drainIndex;

        public RecordAccumulator(int batchSize, long totalSize, CompressionType compression,
                                 long lingerMs, long retryBackoffMs, Metrics metrics, Time time) {
            this.drainIndex = 0;
            this.closed = false;
            this.flushesInProgress = new AtomicInteger(0);
            this.appendsInProgress = new AtomicInteger(0);
            this.batchSize = batchSize;
            this.compression = compression;
            this.lingerMs = lingerMs;
            this.retryBackoffMs = retryBackoffMs;
            this.batches = new CopyOnWriteMap<>();
            String metricGrpName = "producer-metrics";
            // Create the internal BufferPool
            this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
            this.incomplete = new IncompleteRecordBatches();
            this.muted = new HashSet<>();
            this.time = time;
            registerMetrics(metrics, metricGrpName);
        }
    }


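  • BufferPool: a pool of ByteBuffers that caps how much memory the message cache may use; message data is ultimately written into its ByteBuffers;
  • CopyOnWriteMap: a "copy-on-write" Map holding the mapping from each partition to its batches, <TopicPartition, Deque<RecordBatch>>. Access to it is highly concurrent and overwhelmingly reads (a partition's deque is created once, then looked up on every append), which is exactly the workload copy-on-write suits.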
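To illustrate why copy-on-write fits this read-heavy workload, here is a minimal Java sketch. The class SimpleCopyOnWriteMap is hypothetical and is not the source of Kafka's CopyOnWriteMap; it only shows the idea: readers access an immutable snapshot without taking a lock, while each write copies the whole map and publishes the copy through a volatile field. Copying is expensive, but it only happens when a partition is seen for the first time.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the copy-on-write idea (not Kafka's CopyOnWriteMap).
class SimpleCopyOnWriteMap<K, V> {
    // volatile so readers always see the most recently published snapshot
    private volatile Map<K, V> snapshot = Collections.emptyMap();

    // Lock-free read: safe because a published snapshot is never mutated
    V get(K key) {
        return snapshot.get(key);
    }

    // Writers serialize on this object, copy the snapshot, modify the copy,
    // then publish it with a single volatile write
    synchronized V putIfAbsent(K key, V value) {
        V existing = snapshot.get(key);
        if (existing != null) {
            return existing;
        }
        Map<K, V> copy = new HashMap<>(snapshot);
        copy.put(key, value);
        snapshot = Collections.unmodifiableMap(copy);
        return null;
    }

    int size() {
        return snapshot.size();
    }
}
```

Because lookups vastly outnumber insertions here, paying a full copy per insertion in exchange for lock-free reads is a good trade.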

1.1 BufferPool


    // BufferPool.java
    public final class BufferPool {
        // Total pool size: 32MB by default, controlled by buffer.memory
        private final long totalMemory;

        // Batch size, i.e. the size of one pooled ByteBuffer: 16KB by default,
        // controlled by batch.size
        private final int poolableSize;

        private final ReentrantLock lock;
        // Free list: released ByteBuffers of poolableSize, ready for reuse
        private final Deque<ByteBuffer> free;
        private final Deque<Condition> waiters;
        // Memory not yet allocated as any ByteBuffer
        private long availableMemory;
        private final Metrics metrics;
        private final Time time;
        private final Sensor waitTime;

        public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
            this.poolableSize = poolableSize;
            this.lock = new ReentrantLock();
            this.free = new ArrayDeque<ByteBuffer>();
            this.waiters = new ArrayDeque<Condition>();
            this.totalMemory = memory;
            this.availableMemory = memory;
            this.metrics = metrics;
            this.time = time;
            this.waitTime = this.metrics.sensor("bufferpool-wait-time");
            MetricName metricName = metrics.metricName("bufferpool-wait-ratio",
                                                       metricGrpName,
                                                       "The fraction of time an appender waits for space allocation.");
            this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
        }
        //...
    }


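The ByteBuffers in the free deque are buffers that have already been released and can be reused, so the number of buffers times poolableSize (16KB by default) is the memory cached in the pool, while availableMemory is the memory not yet allocated as any ByteBuffer. Their sum is the space still available for new allocations, at most totalMemory (32MB by default); the rest is held by batches in use. Each time a batch-sized buffer is carved out of unallocated memory, availableMemory drops by batchSize, e.g. from 32MB to 32MB - 16KB.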
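The accounting can be checked with a small worked example. The class PoolAccounting below is hypothetical: it tracks only the two quantities described above, availableMemory and the number of pooled 16KB buffers, using the default sizes, and it ignores locking and blocking.

```java
// Hypothetical illustration of BufferPool's memory accounting with the default
// sizes; not Kafka code.
class PoolAccounting {
    static final long TOTAL_MEMORY = 32L * 1024 * 1024; // buffer.memory = 32MB
    static final int POOLABLE_SIZE = 16 * 1024;          // batch.size = 16KB

    // Memory not yet carved into any ByteBuffer
    long availableMemory = TOTAL_MEMORY;
    // Number of released 16KB buffers sitting in the free deque
    int freeCount = 0;

    // Allocate one batch-sized buffer: reuse a pooled buffer if there is one,
    // otherwise carve it out of availableMemory
    void allocateBatch() {
        if (freeCount > 0) {
            freeCount--;
        } else if (availableMemory >= POOLABLE_SIZE) {
            availableMemory -= POOLABLE_SIZE;
        } else {
            throw new IllegalStateException("pool exhausted; the real BufferPool would block here");
        }
    }

    // Return a batch-sized buffer to the free deque
    void releaseBatch() {
        if (usedMemory() < POOLABLE_SIZE) {
            throw new IllegalStateException("nothing to release");
        }
        freeCount++;
    }

    // Space still usable for new allocations
    long freeMemory() {
        return availableMemory + (long) freeCount * POOLABLE_SIZE;
    }

    // Space currently held by batches in use
    long usedMemory() {
        return TOTAL_MEMORY - freeMemory();
    }
}
```

After three allocations and one release, availableMemory is 32MB - 48KB, one buffer sits in the free deque, and usedMemory() reports 32KB (the two batches still in use). A fourth allocation then reuses the pooled buffer without touching availableMemory.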


    // BufferPool.java
    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        if (size > this.totalMemory)
            throw new IllegalArgumentException("Attempt to allocate " + size
                                               + " bytes, but there is a hard limit of "
                                               + this.totalMemory
                                               + " on memory allocations.");

        this.lock.lock();
        try {
            // Fast path: the request is exactly poolableSize and a pooled buffer is free
            if (size == poolableSize && !this.free.isEmpty())
                return this.free.pollFirst();

            // Total free space = unallocated memory + memory held by pooled buffers
            int freeListSize = this.free.size() * this.poolableSize;
            if (this.availableMemory + freeListSize >= size) {
                // Release pooled buffers until availableMemory alone covers the request
                freeUp(size);
                this.availableMemory -= size;
                lock.unlock();
                return ByteBuffer.allocate(size);
            } else {
                // Not enough memory: wait for space to be freed
                //...
            }
        } finally {
            if (lock.isHeldByCurrentThread())
                lock.unlock();
        }
    }
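A single-threaded sketch of the same allocate logic, together with a matching deallocate, shows how pooled buffers and availableMemory interact. Assumptions: SimpleBufferPool is a hypothetical class, there is no lock or waiter queue, and allocate returns null where Kafka's BufferPool would block for up to maxTimeToBlockMs. freeUp hands pooled buffers back to availableMemory until the request fits, and deallocate pools only buffers of exactly poolableSize.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, single-threaded and non-blocking sketch of BufferPool's
// allocate/deallocate bookkeeping.
class SimpleBufferPool {
    private final long totalMemory;
    private final int poolableSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private long availableMemory;

    SimpleBufferPool(long totalMemory, int poolableSize) {
        this.totalMemory = totalMemory;
        this.poolableSize = poolableSize;
        this.availableMemory = totalMemory;
    }

    ByteBuffer allocate(int size) {
        if (size > totalMemory)
            throw new IllegalArgumentException("size exceeds total pool memory");
        // Fast path: reuse a pooled buffer of exactly poolableSize
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst();
        long freeListSize = (long) free.size() * poolableSize;
        if (availableMemory + freeListSize >= size) {
            freeUp(size);
            availableMemory -= size;
            return ByteBuffer.allocate(size);
        }
        return null; // the real pool would block here until memory is released
    }

    // Drop pooled buffers, returning their memory to availableMemory,
    // until the unallocated memory can cover the request
    private void freeUp(int size) {
        while (!free.isEmpty() && availableMemory < size)
            availableMemory += free.pollLast().capacity();
    }

    // Standard-size buffers go back to the free list; other sizes just return their memory
    void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.add(buffer);
        } else {
            availableMemory += buffer.capacity();
        }
    }

    long availableMemory() {
        return availableMemory;
    }

    int pooledCount() {
        return free.size();
    }
}
```

One consequence of this design: only batch.size buffers are recycled, so a workload dominated by oversized messages keeps allocating fresh ByteBuffers and gains little from the pool.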

1.2 RecordBatch

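RecordAccumulator groups messages by partition, packing the messages of the same partition into RecordBatch objects. Each RecordBatch may hold multiple messages, which are laid out compactly in memory in a fixed format: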

    // RecordBatch.java
    public final class RecordBatch {

        final long createdMs;
        final TopicPartition topicPartition;
        final ProduceRequestResult produceFuture;

        private final List<Thunk> thunks = new ArrayList<>();

        // In-memory records builder. This is the key piece: it is what actually
        // writes the messages into the buffer
        private final MemoryRecordsBuilder recordsBuilder;

        volatile int attempts;
        int recordCount;
        int maxRecordSize;
        long drainedMs;
        long lastAttemptMs;
        long lastAppendTime;
        private String expiryErrorMessage;
        private AtomicBoolean completed;
        private boolean retry;

        public RecordBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) {
            this.createdMs = now;
            this.lastAttemptMs = now;
            this.recordsBuilder = recordsBuilder;
            this.topicPartition = tp;
            this.lastAppendTime = createdMs;
            this.produceFuture = new ProduceRequestResult(topicPartition);
            this.completed = new AtomicBoolean();
        }

        // Append a message to this batch in memory
        public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value,
                                              Callback callback, long now) {
            // Not enough room left in this batch
            if (!recordsBuilder.hasRoomFor(key, value)) {
                return null;
            } else {
                // Append the message to memory through MemoryRecordsBuilder
                long checksum = this.recordsBuilder.append(timestamp, key, value);
                this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
                this.lastAppendTime = now;
                FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                       timestamp, checksum,
                                                                       key == null ? -1 : key.length,
                                                                       value == null ? -1 : value.length);
                if (callback != null)
                    thunks.add(new Thunk(callback, future));
                this.recordCount++;
                return future;
            }
        }

        //...
    }
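The essential contract of tryAppend is: check for room, write the record, otherwise return null so the caller opens a new batch. The hypothetical SimpleBatch below reproduces just that contract on a fixed-size ByteBuffer. Its record layout ([keyLen][key][valueLen][value]) is a deliberate simplification and is not Kafka's message format.

```java
import java.nio.ByteBuffer;

// Hypothetical, simplified stand-in for RecordBatch plus MemoryRecordsBuilder.
// Only the "check room, then append, else return null" logic mirrors RecordBatch.tryAppend.
class SimpleBatch {
    private final ByteBuffer buffer;
    private int recordCount;

    SimpleBatch(int capacity) {
        this.buffer = ByteBuffer.allocate(capacity);
    }

    // Bytes a record occupies in this simplified layout (4-byte length prefixes)
    static int recordSize(byte[] key, byte[] value) {
        return 4 + (key == null ? 0 : key.length) + 4 + (value == null ? 0 : value.length);
    }

    boolean hasRoomFor(byte[] key, byte[] value) {
        return buffer.remaining() >= recordSize(key, value);
    }

    // Returns the record's index within the batch (comparable to the relative
    // offset given to FutureRecordMetadata), or null when the batch is full
    Integer tryAppend(byte[] key, byte[] value) {
        if (!hasRoomFor(key, value))
            return null;
        writeField(key);
        writeField(value);
        return recordCount++;
    }

    // Length-prefixed field; a null field is written as length -1 with no bytes
    private void writeField(byte[] bytes) {
        if (bytes == null) {
            buffer.putInt(-1);
        } else {
            buffer.putInt(bytes.length);
            buffer.put(bytes);
        }
    }

    int recordCount() {
        return recordCount;
    }

    int sizeInBytes() {
        return buffer.position();
    }
}
```

With a 32-byte batch, a record with a 1-byte key and a 5-byte value takes 14 bytes, so two records fit and the third tryAppend returns null.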

As shown, appending is ultimately performed by MemoryRecordsBuilder: each message is written into the allocated ByteBuffer in the format crc | magic | attributes | timestamp | ...:

    // MemoryRecordsBuilder.java
    public long append(long timestamp, byte[] key, byte[] value) {
        return appendWithOffset(lastOffset < 0 ? baseOffset : lastOffset + 1, timestamp, key, value);
    }

    public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) {
        try {
            if (lastOffset >= 0 && offset <= lastOffset)
                throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset));

            int size = Record.recordSize(magic, key, value);

            // Write the LogEntry header (offset + size); appendStream is an output
            // stream built on top of the underlying ByteBuffer
            LogEntry.writeHeader(appendStream, toInnerOffset(offset), size);

            if (timestampType == TimestampType.LOG_APPEND_TIME)
                timestamp = logAppendTime;
            // Write the record body (crc | magic | attributes | timestamp | key | value); returns the CRC
            long crc = Record.write(appendStream, magic, timestamp, key, value, CompressionType.NONE, timestampType);
            recordWritten(offset, timestamp, size + Records.LOG_OVERHEAD);
            return crc;
        } catch (IOException e) {
            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
        }
    }
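To make the byte layout concrete, the sketch below writes one uncompressed log entry in the v1 (magic = 1) message format of Kafka 0.10.x: a 12-byte log overhead (offset + size) followed by a 22-byte record header plus key and value, with the CRC computed over everything from magic to the end of the value. LogEntryV1 is a hypothetical helper written from the format description, not Kafka's Record.write; it assumes no compression and a CreateTime timestamp.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical helper: one uncompressed v1 log entry laid out as
// offset(8) | size(4) | crc(4) | magic(1) | attributes(1) | timestamp(8)
//   | keyLen(4) | key | valueLen(4) | value
// A null key or value is encoded with length -1.
class LogEntryV1 {
    static final int LOG_OVERHEAD = 12;    // offset + size
    static final int RECORD_OVERHEAD = 22; // crc + magic + attributes + timestamp + keyLen + valueLen

    static ByteBuffer write(long offset, long timestamp, byte[] key, byte[] value) {
        int keyLen = key == null ? 0 : key.length;
        int valueLen = value == null ? 0 : value.length;
        int recordSize = RECORD_OVERHEAD + keyLen + valueLen;
        ByteBuffer buf = ByteBuffer.allocate(LOG_OVERHEAD + recordSize);
        buf.putLong(offset);
        buf.putInt(recordSize);            // size excludes the 12-byte log overhead
        int crcPos = buf.position();
        buf.putInt(0);                     // CRC placeholder, filled in below
        buf.put((byte) 1);                 // magic = 1
        buf.put((byte) 0);                 // attributes: no compression, CreateTime
        buf.putLong(timestamp);
        putBytes(buf, key);
        putBytes(buf, value);
        // CRC32 over magic .. end of value
        CRC32 crc = new CRC32();
        int bodyStart = crcPos + 4;
        crc.update(buf.array(), bodyStart, buf.position() - bodyStart);
        buf.putInt(crcPos, (int) crc.getValue());
        buf.flip();
        return buf;
    }

    private static void putBytes(ByteBuffer buf, byte[] bytes) {
        if (bytes == null) {
            buf.putInt(-1);
        } else {
            buf.putInt(bytes.length);
            buf.put(bytes);
        }
    }
}
```

For a 1-byte key and a 5-byte value the entry is 12 + 22 + 6 = 40 bytes, which matches how a new batch is sized from Records.LOG_OVERHEAD plus Record.recordSize(key, value).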



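When KafkaProducer sends a message, it internally calls RecordAccumulator.append. The message is appended to a double-ended queue (Deque) inside RecordAccumulator, and multiple messages are packed into one batch, a RecordBatch: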

2.1 Overall Flow


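  1. First, based on the message's partition, find the existing Deque<RecordBatch> in the CopyOnWriteMap, or create a new one;
  2. The buffer behind each RecordBatch is 16KB by default (batch.size). If a message is larger than that, it is placed in a batch of its own with a custom-sized buffer;
  3. Otherwise, the message is appended to the batch at the tail of the Deque if that batch still has room, so multiple messages share one batch; if it does not, a new batch is allocated and enqueued.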
    // RecordAccumulator.java
    public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value,
                                     Callback callback, long maxTimeToBlock) throws InterruptedException {
        appendsInProgress.incrementAndGet();
        try {
            // 1. Get the deque for this partition from the internal CopyOnWriteMap, or create one
            Deque<RecordBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                // Try to append to the deque; fails if there is no batch or the tail batch lacks room
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null)
                    // Append succeeded
                    return appendResult;
            }

            // 2. Reaching here means the deque has no batch at its tail, or the tail batch is out of room.
            // Size of the new batch: batchSize, or larger if this single record needs more
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            // Take a ByteBuffer from the BufferPool; blocks (up to maxTimeToBlock) if memory is short
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                // Double check: another thread may have created a batch while this one was
                // allocating outside the lock, so try appending again
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null) {
                    // Appended to the other thread's batch: return the unused buffer to the pool
                    free.deallocate(buffer);
                    return appendResult;
                }

                // 3. Reaching here means no existing batch can take the message, so create a new one.
                // MemoryRecordsBuilder does the actual writing into the ByteBuffer
                MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, compression, TimestampType.CREATE_TIME, this.batchSize);
                // Create a RecordBatch holding the MemoryRecordsBuilder, and enqueue it
                RecordBatch batch = new RecordBatch(tp, recordsBuilder, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
                dq.addLast(batch);
                incomplete.add(batch);
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
            }
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }
    // RecordAccumulator.java
    // Get the existing Deque<RecordBatch> for a partition, or create one
    private Deque<RecordBatch> getOrCreateDeque(TopicPartition tp) {
        // Look it up in the internal CopyOnWriteMap
        Deque<RecordBatch> d = this.batches.get(tp);
        if (d != null)
            return d;
        // Not present: create one. putIfAbsent keeps whichever deque won a race
        // with another thread
        d = new ArrayDeque<>();
        Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
        if (previous == null)
            return d;
        else
            return previous;
    }

    // Try to append a message to the Deque<RecordBatch>
    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
        // Peek at the batch at the tail of the deque
        RecordBatch last = deque.peekLast();
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
            if (future == null)
                // No room: close the tail batch to further appends
                last.close();
            else
                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
        }
        return null;
    }
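The three steps above can be condensed into a single-threaded sketch. Assumptions: SketchAccumulator and its nested types are hypothetical; a batch is modeled only by its capacity and used bytes; memory comes from the heap instead of a BufferPool, so there is no allocate/deallocate double check; and ConcurrentHashMap.computeIfAbsent stands in for getOrCreateDeque on a CopyOnWriteMap. The returned flags mirror RecordAppendResult: batchIsFull is true when the deque holds more than one batch or the tail batch is full, and newBatchCreated reports whether a batch was allocated.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical single-threaded sketch of RecordAccumulator.append's decision flow.
class SketchAccumulator {
    // A batch is modeled only by its capacity and the bytes already used
    static final class Batch {
        final int capacity;
        int used;

        Batch(int capacity) {
            this.capacity = capacity;
        }

        // Returns false when the record does not fit (like RecordBatch.tryAppend returning null)
        boolean tryAppend(int recordSize) {
            if (capacity - used < recordSize)
                return false;
            used += recordSize;
            return true;
        }

        boolean isFull() {
            return used >= capacity;
        }
    }

    // Mirrors the two flags of RecordAppendResult
    static final class Result {
        final boolean batchIsFull;
        final boolean newBatchCreated;

        Result(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    private final int batchSize;
    // Stand-in for the CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>
    private final Map<String, Deque<Batch>> batches = new ConcurrentHashMap<>();

    SketchAccumulator(int batchSize) {
        this.batchSize = batchSize;
    }

    Result append(String partition, int recordSize) {
        // 1. Get or create the deque for this partition
        Deque<Batch> dq = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        // 2. Try the batch at the tail of the deque
        Batch last = dq.peekLast();
        if (last != null && last.tryAppend(recordSize))
            return new Result(dq.size() > 1 || last.isFull(), false);
        // 3. No usable batch: size the new one as max(batchSize, recordSize),
        //    so an oversized record gets a dedicated, custom-sized batch
        Batch batch = new Batch(Math.max(batchSize, recordSize));
        batch.tryAppend(recordSize);
        dq.addLast(batch);
        return new Result(dq.size() > 1 || batch.isFull(), true);
    }

    Deque<Batch> deque(String partition) {
        return batches.get(partition);
    }
}
```

With batchSize = 100, three 40-byte records fill the first batch with two records and open a second batch for the third; batchIsFull turns true at that point because the deque now holds two batches, which is the producer's cue that a batch is ready for the Sender. A following 250-byte record gets a dedicated 250-byte batch.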


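The producer settings involved:

  • max.request.size: the maximum size of a single request, which also caps the size of a single message; default 1MB;
  • batch.size: the size of each RecordBatch buffer; default 16KB;
  • buffer.memory: the total size of the message buffer (the BufferPool); default 32MB.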
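For reference, the three settings can be set explicitly as producer properties. The keys and default values below are the documented ones; the helper class ProducerBufferConfig is hypothetical, and plain java.util.Properties is used so the snippet runs without kafka-clients on the classpath (in a real application the same Properties object would be passed to the KafkaProducer constructor).

```java
import java.util.Properties;

// Hypothetical helper: the producer settings above, set to their default values.
class ProducerBufferConfig {
    static Properties defaults() {
        Properties props = new Properties();
        props.put("max.request.size", "1048576"); // 1MB
        props.put("batch.size", "16384");         // 16KB per RecordBatch buffer
        props.put("buffer.memory", "33554432");   // 32MB BufferPool
        return props;
    }

    // How many full-size batches the pool can back at the same time
    static long maxPooledBatches(Properties props) {
        long memory = Long.parseLong(props.getProperty("buffer.memory"));
        int batchSize = Integer.parseInt(props.getProperty("batch.size"));
        return memory / batchSize;
    }
}
```

With the defaults, maxPooledBatches returns 32MB / 16KB = 2048, the number of full-size batches that can exist at once before BufferPool.allocate has to wait for memory.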




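  • RecordAccumulator caches messages per partition, using a CopyOnWriteMap to hold the partition-to-Deque mapping;
  • RecordAccumulator packs multiple messages into a RecordBatch so that the Sender thread can later send them batch by batch, reducing network overhead and raising overall throughput;
  • RecordAccumulator has an internal BufferPool that hands out fixed-size ByteBuffers (batch.size, 16KB by default) and recycles them after use. Each RecordBatch writes its messages into one ByteBuffer; a message larger than the default 16KB gets a dedicated ByteBuffer sized to fit it;
  • Messages are finally written into the underlying ByteBuffer in a compact binary format: offset | size | crc | magic | attributes | timestamp | key size | key | value size | value.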

