/**
 * Startup excerpt: decides the disk-failure policy and starts the
 * background handler thread that reacts to offline log directories.
 */
def startup(): Unit = {
  // ... (elided) ...
  // When the inter-broker protocol (IBP) version is < 1.0, any log-dir
  // failure must halt the whole broker rather than take just that
  // directory offline (older protocol cannot represent per-dir failure).
  val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_1_0_IV0
  logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)
  logDirFailureHandler.start()
}

/**
 * Background thread that drains `logDirFailureChannel` and reacts to each
 * newly failed log directory.
 *
 * @param name                   thread name passed to [[ShutdownableThread]]
 * @param haltBrokerOnDirFailure when true, halt the JVM on the first
 *                               offline directory instead of handling it
 */
private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean)
  extends ShutdownableThread(name) {

  override def doWork(): Unit = {
    // Blocks until a failed log dir is published on the offline-dir queue.
    val newOfflineLogDir = logDirFailureChannel.takeNextOfflineLogDir()
    if (haltBrokerOnDirFailure) {
      fatal(s"Halting broker because dir $newOfflineLogDir is offline")
      Exit.halt(1)
    }
    handleLogDirFailure(newOfflineLogDir)
  }
}
// (extraction artifact removed: stray listing line numbers "1 2 3 ... 11")
/**
 * Stops serving replicas that live in the failed log directory.
 *
 * @param dir                absolute path of the offline log directory
 * @param sendZkNotification whether to propagate the event to other brokers
 *                           via ZooKeeper; disabled only by unit tests
 */
def handleLogDirFailure(dir: String, sendZkNotification: Boolean = true): Unit = {
  // ... (elided) ...
  logManager.handleLogDirFailure(dir)
  if (sendZkNotification)
    zkClient.propagateLogDirEvent(localBrokerId)
  warn(s"Stopped serving replicas in dir $dir")
}
/**
 * Lock all the given directories.
 *
 * A directory whose `.lock` file cannot be opened (IOException) is reported
 * to `logDirFailureChannel` and skipped, so the result may contain fewer
 * locks than `dirs` has entries. A lock that exists but is held by another
 * process raises a [[KafkaException]] instead — that is a fatal
 * misconfiguration, not a disk error.
 *
 * @param dirs log directories to lock
 * @return the successfully acquired file locks
 */
private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
  dirs.flatMap { dir =>
    try {
      val lock = new FileLock(new File(dir, LockFile))
      if (!lock.tryLock())
        throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParent +
          ". A Kafka instance in another process or thread is using this directory.")
      Some(lock)
    } catch {
      case e: IOException =>
        logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath,
          s"Disk error while locking directory $dir", e)
        None
    }
  }
}

/**
 * Records a log directory as offline and, on the first report only,
 * enqueues it for the LogDirFailureHandler thread to consume.
 *
 * @param logDir absolute path of the failed log directory
 * @param msg    error message (by-name; evaluated here when logged)
 * @param e      the IOException that marked the directory as failed
 */
def maybeAddOfflineLogDir(logDir: String, msg: => String, e: IOException): Unit = {
  error(msg, e)
  // putIfAbsent returns null only on the first report of this dir,
  // so each offline directory is enqueued exactly once.
  if (offlineLogDirs.putIfAbsent(logDir, logDir) == null)
    offlineLogDirQueue.add(logDir)
}