kafka管控推荐使用 滴滴开源 的 Kafka运维管控平台 更符合国人的操作习惯 ,
更强大的管控能力 ,更高效的问题定位能力 、更便捷的集群运维能力 、更专业的资源治理 、 更友好的运维生态
客户端发起请求
我们在分析消费者的时候, 有看到调用FindCoordinatorRequest的请求
1 2 3 4 5 6 7 8 9 10 11 12 13
| private RequestFuture<Void> sendFindCoordinatorRequest(Node node) { log.debug("Sending FindCoordinator request to broker {}", node); FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder( new FindCoordinatorRequestData() .setKeyType(CoordinatorType.GROUP.id()) .setKey(this.rebalanceConfig.groupId)); return client.send(node, requestBuilder) .compose(new FindCoordinatorResponseHandler()); }
|
Broker处理请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
|
def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = { val findCoordinatorRequest = request.body[FindCoordinatorRequest] if (findCoordinatorRequest.data.keyType == CoordinatorType.GROUP.id && !authorize(request.context, DESCRIBE, GROUP, findCoordinatorRequest.data.key)) sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception) else if (findCoordinatorRequest.data.keyType == CoordinatorType.TRANSACTION.id && !authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key)) sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) else { val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { case CoordinatorType.GROUP => val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key) val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName) (partition, metadata)
case CoordinatorType.TRANSACTION => val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key) val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName) (partition, metadata)
case _ => throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request") }
def createResponse(requestThrottleMs: Int): AbstractResponse = { def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = { new FindCoordinatorResponse( new FindCoordinatorResponseData() .setErrorCode(error.code) .setErrorMessage(error.message) .setNodeId(node.id) .setHost(node.host) .setPort(node.port) .setThrottleTimeMs(requestThrottleMs)) } val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) { createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) } else { val coordinatorEndpoint = topicMetadata.partitions.asScala .find(_.partitionIndex == partition) .filter(_.leaderId != MetadataResponse.NO_LEADER_ID) .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId)) .flatMap(_.getNode(request.context.listenerName)) .filterNot(_.isEmpty)
coordinatorEndpoint match { case Some(endpoint) => createFindCoordinatorResponse(Errors.NONE, endpoint) case _ => createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) } } trace("Sending FindCoordinator response %s for correlation id %d to client %s." .format(responseBody, request.header.correlationId, request.header.clientId)) responseBody } sendResponseMaybeThrottle(request, createResponse) } }
|
简单校验
根据协调器类型判断是否有被授权。协调器类型有 GROUP((byte) 0), TRANSACTION((byte) 1)
两种
获取分区号和元信息
这里的接口分两种情况,一个是协调列席为GROUP 一个是 TRANSACTION
他们的处理逻辑都是一样的,只是处理的Topic不一样
GROUP 对应的Topic是 __consumer_offsets
TRANSACTION 对应的Topic是__transaction_state
这里我们主要分析一下 GROUP的情况
- 去zk获取
/brokers/topic/__consumer_offsets
数据 找到消费者Topic的分区总数。默认是50. (由offsets.topic.num.partitions
控制)找到分区数之和后, 则计算 Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
(groupID按分区数取模运算)获取到了分区号partition
;
- 然后接着获取该Topic的元信息, 这里需要注意的是 去获取元信息应该走的是什么 监听协议(listenerName) 呢?这个主要是看当前处理请求的Broker是通过哪个入口来的。比如说该Broker有两个监听口,
listeners = INTER://xxx.xx.xx.100:9091, OUTSIDE://xxx.xx.xx.101:9092
.如果客户端发起请求的时候是对xxx.xx.xx.101:9092
发起的请求,那么这个对应的监听器就是 OUTSIDE
. 那么Broker去获取__consumer_offsets
元信息发起请求的时候也是会用的 OUTSIDE
协议。
- 如果发现没有这个Topic的元信息,则需要去创建
__consumer_offsets
Topic 。 注意:创建这个Topic的的几个特殊属性:
属性 |
值 |
描述 |
cleanup.policy |
compact |
日志清理策略为 :紧缩 |
segment.bytes |
10010241024 |
一个日志段的大小 |
compression.type |
producer |
压缩类型 为跟生产者保持一致 |
构建返回数据 createResponse
这里才是真正的找到协调器的主要逻辑, 这里的判断逻辑是
上面我们获取到的分区号是partition
, 我们同样获取到了__consumer_offsets
的元信息Metadata。
那我们就可以获取到这个分区号, 并且就能够找到该分区的LeaderId所属在哪个Broker上。
知道了哪个Broker, 那我们就能够获取到对应的EndPoint, 一个Broker可能同时有多个EndPoint(配置了多个监听器),那么我们应该使用哪个EndPoint呢?
这个的判断逻辑与上面说过的一样,客户端发起请求时候的监听器是哪个,那么这里就应该用哪个监听器。

注意:如果找到的分区Leader不存在 那么这个协调器就不存在
然后会返回异常:
1 2 3
| The coordinator is not available
|
问题
- 如果客户端走的外网监听器访问的集群,那么在客户端发起请求之后到集群内部,触发内部调用链的请求,那么内部这个调用链是用什么监听器访问的呢?
从客户端 -> Broker -> 其他Broker. 这是一个调用链路,从最开始用的是什么监听器那么这条链路上都是用的这个监听器!具体请看:多网络情况下,Kafka客户端如何选择合适的网络发起请求

作者石臻臻,工作8年的互联网老兵,丰富的开发和管理经验,全网「 粉丝数4万 」,
先后从事 「 电商 」、「 中间件 」、「 大数据」 等工作
现在任职于「 滴滴技术专家 」岗位,从事开源建设工作
目前在维护 个人公众号「 石臻臻的杂货铺 」 ; 关注公众号会有「 日常送书活动 」;
欢迎进「 高质量 」 「 滴滴开源技术答疑群 」 , 群内每周技术专家轮流值班答疑
可帮忙「 内推 」一二线大厂
