Kafka source code analysis (3): Log management - LogManager
Published: 2019-06-26



1 Entry point

/* start log manager */
// Start the log management module.
logManager = LogManager(config, zkUtils, brokerState, kafkaScheduler, time, brokerTopicStats)
logManager.startup()

2 Startup code

/**
 * Start the background threads to flush logs and do log cleanup
 * (the work runs on the scheduler's background thread pool).
 */
def startup() {
  /* Schedule the cleanup task to delete old logs */
  if (scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention",
                       cleanupLogs _,
                       delay = InitialTaskDelayMs,
                       period = retentionCheckMs,
                       TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher",
                       flushDirtyLogs _,
                       delay = InitialTaskDelayMs,
                       period = flushCheckMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-recovery-point-checkpoint",
                       checkpointRecoveryPointOffsets _,
                       delay = InitialTaskDelayMs,
                       period = flushRecoveryOffsetCheckpointMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-log-start-offset-checkpoint",
                       checkpointLogStartOffsets _,
                       delay = InitialTaskDelayMs,
                       period = flushStartOffsetCheckpointMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-delete-logs",
                       deleteLogs _,
                       delay = InitialTaskDelayMs,
                       period = defaultConfig.fileDeleteDelayMs,
                       TimeUnit.MILLISECONDS)
  }
  if (cleanerConfig.enableCleaner)
    cleaner.startup()
}
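As the code shows, startup() registers five periodic background tasks, each with an initial delay of InitialTaskDelayMs (30 seconds):

  • kafka-log-retention: runs cleanupLogs every retentionCheckMs (log.retention.check.interval.ms) to delete expired or oversized segments.

  • kafka-log-flusher: runs flushDirtyLogs every flushCheckMs (log.flush.scheduler.interval.ms).

  • kafka-recovery-point-checkpoint: runs checkpointRecoveryPointOffsets every flushRecoveryOffsetCheckpointMs (log.flush.offset.checkpoint.interval.ms).

  • kafka-log-start-offset-checkpoint: runs checkpointLogStartOffsets every flushStartOffsetCheckpointMs (log.flush.start.offset.checkpoint.interval.ms).

  • kafka-delete-logs: runs deleteLogs every file.delete.delay.ms to physically remove segments that were already marked for deletion.

Finally, if log.cleaner.enable is true, the LogCleaner (log compaction) threads are started as well.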

3 Core code


3.1 Related configuration

  • log.cleaner.threads, default 1: the number of threads used to clean logs (used for log compaction).

  • log.cleaner.dedupe.buffer.size, default 128 MB: the memory buffer used, when the compact cleanup policy is selected, to deduplicate and sort keys during cleaning (used for log compaction).

  • log.cleaner.io.buffer.load.factor, default 0.9: the load factor of the cleaner's dedupe hash buffer; a smaller factor means fewer hash collisions but higher memory usage (used for log compaction).

  • log.cleaner.io.buffer.size, default 512 KB: the I/O buffer size used when cleaning data (used for log compaction).

  • message.max.bytes, default 1000012 bytes: the maximum size of a single message.

  • log.cleaner.io.max.bytes.per.second: throttles the cleaner's I/O; unthrottled by default (used for log compaction).

  • log.cleaner.backoff.ms: the interval at which the cleaner checks whether any log needs cleaning (used during log compaction), default 15 seconds.

  • log.cleaner.enable: whether to enable the log cleaner; enabled by default.

  • num.recovery.threads.per.data.dir: the number of threads per data directory used for log recovery at startup, default 1.

  • log.flush.scheduler.interval.ms: the interval at which to check whether logs need to be flushed to disk; effectively disabled by default.

  • log.flush.offset.checkpoint.interval.ms: the interval at which partition recovery-point offsets are checkpointed, default 60000 ms.

  • log.retention.check.interval.ms: the interval at which retention checks run on the logs, default 5 minutes. (A server.properties sketch of these defaults follows this list.)
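As a quick reference, this is a sketch of how the settings above would look in a broker's server.properties if the defaults were written out explicitly (the byte values are just the defaults converted to bytes; in practice you would only set the entries you want to override):

# Illustrative only: these are the default values described above.
log.cleaner.enable=true
log.cleaner.threads=1
# 128 MB
log.cleaner.dedupe.buffer.size=134217728
# 512 KB
log.cleaner.io.buffer.size=524288
log.cleaner.io.buffer.load.factor=0.9
log.cleaner.backoff.ms=15000
message.max.bytes=1000012
num.recovery.threads.per.data.dir=1
log.flush.offset.checkpoint.interval.ms=60000
# 5 minutes
log.retention.check.interval.ms=300000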

3.2 Startup: building the LogManager (topic configs read from ZooKeeper)

// The topic-level configs have already been read from ZooKeeper at this point (not covered here).
val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
  dedupeBufferSize = config.logCleanerDedupeBufferSize,
  dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
  ioBufferSize = config.logCleanerIoBufferSize,
  maxMessageSize = config.messageMaxBytes,
  maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
  backOffMs = config.logCleanerBackoffMs,
  enableCleaner = config.logCleanerEnable)
new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
  topicConfigs = topicConfigs,
  defaultConfig = defaultLogConfig,
  cleanerConfig = cleanerConfig,
  ioThreads = config.numRecoveryThreadsPerDataDir,
  flushCheckMs = config.logFlushSchedulerIntervalMs,
  flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
  flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
  retentionCheckMs = config.logCleanupIntervalMs,
  maxPidExpirationMs = config.transactionIdExpirationMs,
  scheduler = kafkaScheduler,
  brokerState = brokerState,
  time = time,
  brokerTopicStats = brokerTopicStats)

3.3 Initialization flow

@threadsafe
class LogManager(val logDirs: Array[File],
                 val topicConfigs: Map[String, LogConfig], // note that this doesn't get updated after creation
                 val defaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 ioThreads: Int,
                 val flushCheckMs: Long,
                 val flushRecoveryOffsetCheckpointMs: Long,
                 val flushStartOffsetCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 val maxPidExpirationMs: Int,
                 scheduler: Scheduler,
                 val brokerState: BrokerState,
                 brokerTopicStats: BrokerTopicStats,
                 time: Time) extends Logging {

  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
  val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
  val LockFile = ".lock"
  val InitialTaskDelayMs = 30 * 1000

  private val logCreationOrDeletionLock = new Object
  private val logs = new Pool[TopicPartition, Log]()
  private val logsToBeDeleted = new LinkedBlockingQueue[Log]()

  // Check that the log directories exist (creating them if missing) and are readable and writable.
  createAndValidateLogDirs(logDirs)

  // Create a .lock file in each directory and lock the directory through it.
  private val dirLocks = lockLogDirs(logDirs)

  // Build the checkpoint map from each directory's recovery-point-offset-checkpoint file;
  // it is used to periodically record each partition's recovery-point offset.
  private val recoveryPointCheckpoints = logDirs.map(dir =>
    (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile)))).toMap
  private val logStartOffsetCheckpoints = logDirs.map(dir =>
    (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile)))).toMap

  // For each directory, create a thread pool whose size is num.recovery.threads.per.data.dir,
  // scan its topic-partition subdirectories, and build a Log instance for each partition using
  // the topic's config from ZooKeeper (or the default config) and the offset recorded for the
  // partition in the checkpoint file. The Log instances are loaded on the thread pool,
  // i.e. the logs are recovered.
  loadLogs()

  // public, so we can access this from kafka.admin.DeleteTopicTest
  val cleaner: LogCleaner =
    if (cleanerConfig.enableCleaner)
      new LogCleaner(cleanerConfig, logDirs, logs, time = time)
    else
      null
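One detail worth calling out is the lockLogDirs(logDirs) step: each data directory is claimed through an OS-level lock on its .lock file, so two broker processes cannot use the same directory. A minimal standalone sketch of that pattern (an illustration only, not Kafka's own file-lock helper; the object and method names are made up):

import java.io.File
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption

object DirLockSketch {
  // Try to claim a data directory by locking its ".lock" file.
  // Returns the open channel (which must stay open to keep the lock),
  // or None if another process already holds the lock.
  def tryLockDir(dir: File): Option[FileChannel] = {
    val channel = FileChannel.open(new File(dir, ".lock").toPath,
      StandardOpenOption.CREATE, StandardOpenOption.WRITE)
    Option(channel.tryLock()) match {
      case Some(_) => Some(channel)          // lock acquired; hold it for the broker's lifetime
      case None    => channel.close(); None  // directory already locked by another process
    }
  }
}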

3.4 Cleaning up expired logs

/**
 * Runs through the log removing segments older than a certain age
 */
private def cleanupExpiredSegments(log: Log): Int = {
  if (log.config.retentionMs < 0)
    return 0
  val startMs = time.milliseconds
  log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
}

This involves another configuration, retention.ms, which controls how long a log is kept. If it is negative, the log never expires and time-based deletion never happens. For example, with retention.ms = 604800000 (7 days), a segment whose last-modified time is 8 days old satisfies startMs - lastModified > retentionMs and becomes deletable.

When the gap between the current time and a segment file's last-modified time exceeds the configured retention time, the segment is deleted. The deletion itself is handled by:

/**
 * Delete any log segments matching the given predicate function,
 * starting with the oldest segment and moving forward until a segment doesn't match.
 * @param predicate A function that takes in a single log segment and returns true iff it is deletable
 * @return The number of segments deleted
 */
def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
  lock synchronized {
    // find any segments that match the user-supplied predicate UNLESS it is the final segment
    // and it is empty (since we would just end up re-creating it)
    val lastEntry = segments.lastEntry
    val deletable =
      if (lastEntry == null) Seq.empty
      else logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastEntry.getValue.baseOffset || s.size > 0))
    val numToDelete = deletable.size
    if (numToDelete > 0) {
      // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
      if (segments.size == numToDelete)
        roll()
      // remove the segments for lookups
      deletable.foreach(deleteSegment(_))
    }
    numToDelete
  }
}

The logic here: the supplied predicate decides which segments are eligible for deletion; those segments are collected into deletable, which is then iterated to delete each one (keeping at least one segment, so roll() is called first if everything is deletable).

private def deleteSegment(segment: LogSegment) {
  info("Scheduling log segment %d for log %s for deletion.".format(segment.baseOffset, name))
  lock synchronized {
    segments.remove(segment.baseOffset)
    asyncDeleteSegment(segment)
  }
}

private def asyncDeleteSegment(segment: LogSegment) {
  segment.changeFileSuffixes("", Log.DeletedFileSuffix)
  def deleteSeg() {
    info("Deleting segment %d from log %s.".format(segment.baseOffset, name))
    segment.delete()
  }
  scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
}

This is an asynchronous file-deletion step controlled by the configuration file.delete.delay.ms, which is how long to wait before a marked segment file is physically removed. The segment's files are first renamed with the Log.DeletedFileSuffix (".deleted"), and the actual delete is then scheduled to run after that delay. A standalone sketch of this rename-then-delete pattern follows.

3.5 Cleaning up oversized logs

/**
 * Runs through the log removing segments until the size of the log
 * is at least logRetentionSize bytes in size
 */
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
  if (log.config.retentionSize < 0 || log.size < log.config.retentionSize)
    return 0
  var diff = log.size - log.config.retentionSize
  def shouldDelete(segment: LogSegment) = {
    if (diff - segment.size >= 0) {
      diff -= segment.size
      true
    } else {
      false
    }
  }
  log.deleteOldSegments(shouldDelete)
}

This code is straightforward: if the log's total size exceeds retention.bytes, the oldest segments are marked for deletion until the log fits back under the limit, and the actual deletion goes through the same deleteOldSegments method described above. A toy illustration of the size predicate follows.
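For intuition, here is a toy, self-contained sketch of how the shouldDelete predicate walks the segments from oldest to newest (the segment sizes and the retention limit are made-up numbers, not Kafka defaults):

object SizeRetentionSketch extends App {
  val segmentSizes  = Seq(1024L, 1024L, 1024L, 1024L) // oldest segment first
  val retentionSize = 2500L
  var diff = segmentSizes.sum - retentionSize         // 1596: how far we are over the limit

  // Keep deleting the oldest segment as long as doing so does not take the log below the limit.
  val toDelete = segmentSizes.takeWhile { size =>
    if (diff - size >= 0) { diff -= size; true } else false
  }
  println(s"segments to delete: ${toDelete.size}")    // prints 1
}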

3.6 Periodically flushing the log's disk buffers

This is driven by the background scheduler, which periodically invokes LogManager's flushDirtyLogs function.

The function iterates over every partition's log and flushes it: using the current end offset, it finds the segments between the last checkpointed (recovery point) offset and the current offset and flushes each of those segments' log and index files. For the log file this calls force() on its file channel; for the index file it calls force() on its memory-mapped buffer.

private def flushDirtyLogs() = {
  debug("Checking for dirty logs to flush...")
  for ((topicAndPartition, log) <- logs) {
    try {
      val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
      debug("Checking if flush is needed on " + topicAndPartition.topic
            + " flush interval " + log.config.flushMs
            + " last flushed " + log.lastFlushTime
            + " time since last flush: " + timeSinceLastFlush)
      if (timeSinceLastFlush >= log.config.flushMs)
        log.flush
    } catch {
      case e: Throwable =>
        error("Error flushing topic " + topicAndPartition.topic, e)
    }
  }
}

3.7 Periodically checkpointing partition offsets

This is likewise driven by the background scheduler, which periodically invokes LogManager's checkpointRecoveryPointOffsets function:

def checkpointRecoveryPointOffsets() {
  this.logDirs.foreach(checkpointLogsInDir)
}

Here, the recovery point (last flushed offset) of every partition stored in each data directory is checkpointed. The function iterates over the directories and writes the offsets of each directory's partitions into that directory's checkpoint file:

The first line is a 0, the version of the checkpoint file format.

The second line is the number of partitions, i.e. how many partitions in this directory had data at checkpoint time.

That many lines follow, each recording a topic, a partition and its offset.
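For example, the recovery-point-offset-checkpoint file of a directory holding two partitions of a hypothetical topic named my-topic (the topic name and offsets below are made up) would look like this:

0
2
my-topic 0 42
my-topic 1 17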

private def checkpointLogsInDir(dir: File): Unit = {
  val recoveryPoints = this.logsByDir.get(dir.toString)
  if (recoveryPoints.isDefined) {
    this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
  }
}

In the LogCleaner instance, log compaction is executed periodically; the work is carried out by CleanerThread threads, which are governed by:

  1. log.cleaner.io.max.bytes.per.second, which throttles the I/O of these threads; unthrottled by default.
  2. log.cleaner.dedupe.buffer.size, default 128 MB, the memory buffer used to deduplicate and sort keys when cleaning with the compact policy.
  3. log.cleaner.threads, default 1, the number of cleaner threads.
  4. log.cleaner.backoff.ms, the interval at which the cleaner checks whether any log needs cleaning, default 15 seconds.


Reposted from: https://my.oschina.net/sunzy/blog/1636202
