private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
    // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
    produceFuture.set(baseOffset, logAppendTime, exception);

    // execute callbacks
    for (Thunk thunk : thunks) {
        try {
            if (exception == null) {
                RecordMetadata metadata = thunk.future.value();
                if (thunk.callback != null)
                    thunk.callback.onCompletion(metadata, null);
            } else {
                if (thunk.callback != null)
                    thunk.callback.onCompletion(null, exception);
            }
        } catch (Exception e) {
            log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
        }
    }
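For reference, thunk.callback above is simply the Callback the application passes to KafkaProducer.send(). Below is a minimal sketch of how such a callback is registered and later invoked by completeFutureAndFireCallbacks; the topic name and bootstrap address are placeholders, not values from this article.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CallbackDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The lambda below becomes thunk.callback; exactly one of (metadata, exception) is non-null.
            producer.send(new ProducerRecord<>("Topic4", "key", "value"), (metadata, exception) -> {
                if (exception == null)
                    System.out.println("appended at offset " + metadata.offset());
                else
                    exception.printStackTrace();
            });
        } // close() waits for outstanding sends, so the callback fires before the program exits
    }
}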
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    try {
        // check if we have a free buffer of the right size pooled
        if (size == poolableSize && !this.free.isEmpty()) {
            return this.free.pollFirst();
        }

        // now check if the request is immediately satisfiable with the
        // memory on hand or if we need to block
        int freeListSize = freeSize() * this.poolableSize;
        if (this.nonPooledAvailableMemory + freeListSize >= size) {
            // omitted...
        } else {
            // not enough memory: the caller will block here
            int accumulated = 0;
            Condition moreMemory = this.lock.newCondition();
            try {
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                this.waiters.addLast(moreMemory);
                // loop over and over until we have a buffer or have reserved
                // enough memory to allocate one
                // When memory is insufficient the thread blocks; whenever memory is released it is woken up
                // and the allocation continues. The released memory may still be smaller than the requested
                // size, in which case the thread blocks again and waits for the next release, repeating this
                // loop until enough memory has been accumulated.
                // The maximum blocking time is maxTimeToBlockMs. Note that even if the loop is woken up and
                // resumes several times, the clock starts at the first wait: once the accumulated waiting
                // time exceeds this limit, the allocation times out.
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    boolean waitingTimeElapsed;
                    try {
                        System.out.println("allocate is about to block, waiting for memory to be released, remaining wait time = " + remainingTimeToBlockNs);
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                    } finally {
                        long endWaitNs = time.nanoseconds();
                        timeNs = Math.max(0L, endWaitNs - startWaitNs);
                        recordWaitTime(timeNs);
                    }

                    if (this.closed)
                        throw new KafkaException("Producer closed while allocating memory");

                    if (waitingTimeElapsed) {
                        this.metrics.sensor("buffer-exhausted-records").record();
                        throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time "
                            + maxTimeToBlockMs + " ms.");
                    }

                    remainingTimeToBlockNs -= timeNs;

                    // check if we can satisfy this request from the free list,
                    // otherwise allocate memory
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        // just grab a buffer from the free list
                        // take the first ByteBuffer from the pool and return it
                        buffer = this.free.pollFirst();
                        accumulated = size;
                        System.out.println("allocate woken up, returning and allocating the first buffer from the pool: size = " + size);
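The blocking path above is what eventually surfaces to the application as a BufferExhaustedException when the accumulator stays full for too long. As a rough guide, the pool size, the poolable buffer size and the maximum blocking time map onto the following producer settings; the values shown are the current defaults and are listed purely for illustration.

Properties props = new Properties();
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // total memory managed by the BufferPool (32 MB)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // poolableSize: buffers of this size are recycled via the free list
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000L);     // upper bound on the maxTimeToBlockMs passed into allocate()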
[2022-04-25 13:42:23,457] WARN [Producer clientId=console-producer] Got error produce response with correlation id 47 on topic-partition Topic4-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
[2022-04-25 13:42:54,604] WARN [Producer clientId=console-producer] Received invalid metadata error in produce request on partition Topic4-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
Exception source code
Sender#completeBatch
if (error.exception() instanceof InvalidMetadataException) {
    if (error.exception() instanceof UnknownTopicOrPartitionException) {
        log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
                "topic-partition may not exist or the user may not have Describe access to it",
            batch.topicPartition);
    } else {
        log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " +
                "to request metadata update now", batch.topicPartition,
            error.exception(response.errorMessage).toString());
    }
    metadata.requestUpdate();
}
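This branch is what produces the second WARN line: NetworkException extends InvalidMetadataException, so the Sender logs the warning and asks for a metadata refresh. Because the error is retriable, the batch is also re-enqueued as long as retry attempts remain, which is where the "retrying (2 attempts left)" in the first line comes from. The settings below govern that retry behaviour; the values are only examples, continuing the props sketch above.

props.put(ProducerConfig.RETRIES_CONFIG, 3);               // with 3 retries, the first failure logs "2 attempts left"
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100L);   // pause between retries of the same batch
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); // how long the Sender waits for a broker response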
Exception cause
The reasons for the two warning log lines above are, respectively:
Received invalid metadata error in produce request on partition Topic4-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
if (!expiredBatches.isEmpty())
    log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
    String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
        + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
    failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
    if (transactionManager != null && expiredBatch.inRetry()) {
        // This ensures that no new batches are drained until the current in flight batches are fully resolved.
        transactionManager.markSequenceUnresolved(expiredBatch);
    }
}
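When a batch sits in the accumulator longer than the delivery timeout, the code above fails it with the TimeoutException built from errorMessage, and that exception is what reaches the user callback through completeFutureAndFireCallbacks shown earlier. The expiry window is bounded by the settings sketched below; again these are illustrative values continuing the same props sketch, not a recommendation.

props.put(ProducerConfig.LINGER_MS_CONFIG, 5L);                 // how long a batch may wait for more records before draining
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);   // total time allowed from send() until success or expiry (default 2 min)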