/** * 使⽤Java的NIO * 循环检查是否有新的连接尝试 * 轮询的⽅式将请求交给各个processor来处理。 */ def run() { serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) startupComplete() try { var currentProcessor = 0 while (isRunning) { try { val ready = nioSelector.select(500) if (ready > 0) { val keys = nioSelector.selectedKeys() val iter = keys.iterator() while (iter.hasNext && isRunning) { try { val key = iter.next iter.remove() if (key.isAcceptable) // 指定⼀个processor处理请求 accept(key, processors(currentProcessor)) else throw new IllegalStateException("Unrecognized key state for acceptor thread.") // round robin to the next processor thread // 通过轮询的⽅式找到下⼀个processor线程 currentProcessor = (currentProcessor + 1) % processors.length } catch { case e: Throwable => error("Error while accepting connection", e) } } } } catch { // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due // to a select operation on a specific channel or a bad request. We don't want // the broker to stop responding to requests from other clients in these scenarios. case e: ControlThrowable => throw e case e: Throwable => error("Error occurred", e) } } } finally { debug("Closing server socket and selector.") swallowError(serverChannel.close()) swallowError(nioSelector.close()) shutdownComplete() } }
/** * 处理所有请求的顶级⽅法,使⽤模式匹配,交给具体的api来处理 */ def handle(request: RequestChannel.Request) { try { trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" + s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}") request.header.apiKey match { case ApiKeys.PRODUCE => handleProduceRequest(request) case ApiKeys.FETCH => handleFetchRequest(request) case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request) case ApiKeys.METADATA => handleTopicMetadataRequest(request) case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request) case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request) case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request) case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request) case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request) case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request) case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request) case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request) case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request) case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request) case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request) case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request) case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request) case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request) case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request) case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request) case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request) case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request) case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request) case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request) case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request) case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request) case ApiKeys.END_TXN => handleEndTxnRequest(request) case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request) case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request) case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request) case ApiKeys.CREATE_ACLS => handleCreateAcls(request) case ApiKeys.DELETE_ACLS => handleDeleteAcls(request) case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request) case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request) case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request) case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request) case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request) case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request) } } catch { case e: FatalExitError => throw e case e: Throwable => handleError(request, e) } finally { request.apiLocalCompleteTimeNanos = time.nanoseconds } }