说明

LevelDB是日志型KV存储系统,所有的更新操作(包括插入、更新、删除)都以(key, value)的形式追加至log文件的尾部,这样,系统中更新的访问模式就变成了顺序写,很适合更新频繁的应用场景。

在LevelDB中,磁盘上的数据文件格式如下:

LevelDB详解: Compaction

最终承载KV记录的数据文件(SSTable)在磁盘上被进行了分层,称之为level。levelN+1的数据文件是由levelN的文件处理得到。

LevelDB之所以需要Compaction是有以下几方面原因:

  1. 数据文件中的被删除的KV记录占用的存储空间需要被回收;
  2. 将key存在重合的不同Level的SSTable进行Compaction,可以减少磁盘上的文件数量,提高读取效率。

LevelDB中存在两种类型的Compaction:Minor Compaction和Major Compaction。

Minor Compaction

Major Compaction

Compaction时机

从前面的说明中我们知道,Compaction是对系统的一种优化,那么其必然不能阻塞正常的数据读写流程中。

参考LevelDB的实现,有两个地方可能会启动Compaction任务:

  1. 后台线程的定期任务
  2. 正常的读写流程中判定系统达到了一个临界状态,此时必须要进行Compaction

需要说明的是2,这里进行的Compaction也只是发起一次异步的Compaction任务,不会阻塞正常的读写流程。

启动时

Db_impl.cc::Open()在完成所有的启动准备工作以后,会发起一次Compaction任务,如下:

Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
    ......
    if (s.ok()) {
        impl->DeleteObsoleteFiles();
        // 这里发起一次Compaction任务
        impl->MaybeScheduleCompaction();
    }
    ......
}

疑问:为什么在启动时需要触发一次Compaction任务?

数据写入过程中

在这里还不得不简单说下另外一个函数MakeRoomForWrite。Leveldb在写入数据前需要调用这个函数来确认内存的memtable中有空间可以写入kv记录。

在MakeRoomForWrite函数中:

  1. 先判断是否有后台合并错误,如果有,则啥都不做;如果没有,则执行2;
  2. 如果后台没错误,则判断mem_的大小是是否小于事先定义阈值:如果是,则啥都不做返回,继续插入数据;如果大于事先定义的阈值,则需要进行一次合并;
  3. 如果imm_不为空,代表后台有线程在执行合并,在此等待;
  4. 如果0层文件个数太多,则也需要等待;
  5. 如果都不是以上情况,表示此时memtable空间不足且immu memtable不为空,需要将immune memtable的数据dump至磁盘sstable文件中。 这就是Minor Compaction了,调用MaybeScheduleCompaction()函数执行此事。

说明下为什么会有第4点:因为每进行一次minor compaction,level 0层文件个数可能超过事先定义的值,所以会又进行一次major compcation。而这次major compaction,imm_是空的,所以才会有第4条判断。

Compaction技术原理

Minor Compaction

Minor Compaction的原理比较简单:无非就是将内存immune memtable的数据dump至磁盘上的sstable文件。LevelDB实现是函数DBImpl::CompactMemTable(),这里不再赘述,请自行阅读代码。

Major Compaction

Major Compaction的原理说起来就复杂了,不像Minor Compaction,它因为需要涉及到多个level众多SSTable之间的合并。需要解决的基本问题有2:

  • 选择哪些level的哪些SSTable进行Compaction ?
  • SSTable之间如何进行Compaction ?

选择Compaction对象

在搞清楚第一个问题之前,我们需要注意的一点是:除level0外,每个level内的SSTable之间不会有key的重叠:也就是说,某一个key只会出现在该level(level > 0)内的某个SSTable中。
但是某个key可能出现在多个不同level的SSTable中。因此,我们可以设想,Compaction应该是发生在不同的level之间的SSTable之间。

例如,对于一个记录key = ‘hello’

LevelDB详解: Compaction

其更新记录可能同时存在于level k的SSTable X和level k + 1 SSTable N的两个SSTable文件内。此时,

对level K的某个SSTable S1,Level K+1中能够与它进行Compaction的SSTable必须满足条件:

与S1存在key范围的重合

LevelDB详解: Compaction

如上图所示,对于SSTable X,其key范围为hello ~ world,在level K+1中,SSTable M的key范围为 mine ~ yours,与SSTable X存在key范围的重合,同时SSTable N也是这样。因此,对于 SSTable X,其Compaction的对象是Level K+1的SSTable M和SSTable N。

这里还有另外一种特殊情况:k = 0。前面我们提到过,Level 0的SSTable之间也会存在key范围的重合,因此进行Compaction的时候,不仅需要在level 1寻找可Compaction的SSTable,同时也要在level 0寻找,如下图示:

LevelDB详解: Compaction

关于挑选level K + 1层的可Compaction对象实现位于version_set.cc:PickCompaction()

Compaction* VersionSet::PickCompaction() {
  Compaction* c;
  int level;

  const bool size_compaction = (current_->compaction_score_ >= 1);
  const bool seek_compaction = (current_->file_to_compact_ != NULL);
  if (size_compaction) {
    level = current_->compaction_level_;
    assert(level >= 0);
    assert(level+1 < config::kNumLevels);
    c = new Compaction(options_, level);

    // Pick the first file that comes after compact_pointer_[level]
    for (size_t i = 0; i < current_->files_[level].size(); i++) {
      FileMetaData* f = current_->files_[level][i];
      if (compact_pointer_[level].empty() ||
          icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
        c->inputs_[0].push_back(f);
        break;
      }
    }
    if (c->inputs_[0].empty()) {
      // Wrap-around to the beginning of the key space
      c->inputs_[0].push_back(current_->files_[level][0]);
    }
  } else if (seek_compaction) {
    level = current_->file_to_compact_level_;
    c = new Compaction(options_, level);
    c->inputs_[0].push_back(current_->file_to_compact_);
  } else {
    return NULL;
  }

  c->input_version_ = current_;
  c->input_version_->Ref();

  // Files in level 0 may overlap each other, so pick up all overlapping ones
  if (level == 0) {
    InternalKey smallest, largest;
    GetRange(c->inputs_[0], &smallest, &largest);
    current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
    assert(!c->inputs_[0].empty());
  }

  SetupOtherInputs(c);

  return c;
}

LevelDB中,Compaction操作有两种触发方式:

  • 某一level的文件数太多
  • 某一文件的查找次数超过允许值

因此,便有了代码中的size_compaction和seek_compaction的判断。在进行合并时,将优先考虑文件数过多的情况。

接下来判断如果当前准备Compaction的level为0的话,需要在该层寻找可以进行Compaction的其他SSTable,调用的是函数GetOverlappingInputs。

最后,调用SetupOtherInputs从level K+1的SSTable中寻找可Compaction的SSTable,将其信息保存在Compaction对象中返回,这里就不再赘述其实现了,可自行阅读代码。

Compaction过程

经过上面的过程我们得到了待合并的SSTable,下一步就是合并这些SSTable。

所谓合并,就是去掉垃圾数据,保留有用信息的过程,最终的结果是多个SSTable文件会被压缩成为一个SSTable文件。

由于SSTable中存储的一条条的KV记录(包含特殊的删除记录),因此,对垃圾数据的定义是:

  • 如果该KV记录是一个删除标识
  • 如果该Key存在多条更新记录,那么只需保留最近的一次更新,老的更新全部可以丢弃。

这里的关键问题是:如何判断一个存在多条更新记录的Key,其哪个更新记录是最新的?

这里就需要用到每个记录的Seq了:

  • 不同level之间,可能存在Key值相同的记录,但是记录的Seq不同。最新的数据存放在较低的level中,其对应的seq也一定比level+1中的记录的seq要大,因此当出现相同Key值的记录时,只需要记录第一条记录,后面的都可以丢弃。
  • level 0中也可能存在Key值相同的数据,但其Seq也不同。数据越新,其对应的Seq越大。且level 0中的记录是按照user_key递增,seq递减的方式存储的,相同user_key对应的记录被聚集在一起按照Seq递减的方式存放的。在更高层的Compaction时,只需要处理第一条出现的user_key相同的记录即可,后面的相同user_key的记录都可以丢弃。
  • 删除记录的操作也会在此时完成,删除数据的记录会被直接丢弃,而不会被写入到更高level的文件。

因此合并后的level +1层的文件中不会存在Key值相同的记录。

LevelDB详解: Compaction

不妨看看真正的Compaction实现过程:

Status DBImpl::DoCompactionWork(CompactionState* compact) {
  const uint64_t start_micros = env_->NowMicros();
  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions

  assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
  assert(compact->builder == NULL);
  assert(compact->outfile == NULL);
  if (snapshots_.empty()) {
    compact->smallest_snapshot = versions_->LastSequence();
  } else {
    compact->smallest_snapshot = snapshots_.oldest()->number_;
  }

  mutex_.Unlock();

  Iterator* input = versions_->MakeInputIterator(compact->compaction);
  input->SeekToFirst();
  Status status;
  ParsedInternalKey ikey;
  std::string current_user_key;
  bool has_current_user_key = false;
  SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
  for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
    // Prioritize immutable compaction work
    if (has_imm_.NoBarrier_Load() != NULL) {
      const uint64_t imm_start = env_->NowMicros();
      mutex_.Lock();
      if (imm_ != NULL) {
        CompactMemTable();
        bg_cv_.SignalAll();  // Wakeup MakeRoomForWrite() if necessary
      }
      mutex_.Unlock();
      imm_micros += (env_->NowMicros() - imm_start);
    }

    Slice key = input->key();
    if (compact->compaction->ShouldStopBefore(key) &&
        compact->builder != NULL) {
      status = FinishCompactionOutputFile(compact, input);
      if (!status.ok()) {
        break;
      }
    }
    ......
}

这里的流程很简单:逐个判断参与Compaction的所有SSTable的K/V记录,判断是否是垃圾记录,如果是,丢弃,否则将该记录写入到L+1层的新的SSTable文件中。

这里的核心是如何判断当前的K/V记录是否有效?问题的复杂性在于这些SSTable之间可能存在Key的重复,如果按照SSTable顺序遍历,无法做到一次性判断某个K/V记录是否有效,如下图:

LevelDB详解: Compaction

如果顺序遍历SSTable,那么在读取到Key = ‘hello’这条记录时完全无法判断它到底是不是有效记录,因为我不知道尚未读取的SSTable文件中是否还包含Key相同的记录,怎么办???

解决该问题的奥秘就在上面的VersionSet::MakeInputIterator函数内:

Iterator* VersionSet::MakeInputIterator(Compaction* c) {
  ReadOptions options;
  options.verify_checksums = options_->paranoid_checks;
  options.fill_cache = false;

  const int space = (c->level() == 0 ? c->inputs_[0].size() + 1 : 2);
  Iterator** list = new Iterator*[space];
  int num = 0;
  for (int which = 0; which < 2; which++) {
    if (!c->inputs_[which].empty()) {
      if (c->level() + which == 0) {
        const std::vector<FileMetaData*>& files = c->inputs_[which];
        for (size_t i = 0; i < files.size(); i++) {
          list[num++] = table_cache_->NewIterator(
              options, files[i]->number, files[i]->file_size);
        }
      } else {
        // Create concatenating iterator for the files from this level
        list[num++] = NewTwoLevelIterator(
            new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]),
            &GetFileIterator, table_cache_, options);
      }
    }
  }
  assert(num <= space);
  Iterator* result = NewMergingIterator(&icmp_, list, num);
  delete[] list;
  return result;
}

这里构造Iterator,输入是L和L+1层的SSTable文件。最终返回的是NewMergingIterator,这个MergingIterator其实也是对原始的多个SSTable的Iterator再次封装,我们不妨看一下它的一些API的大概实现:

virtual void SeekToFirst() {
    for (int i = 0; i < n_; i++) {
      children_[i].SeekToFirst();
    }
    FindSmallest();
    direction_ = kForward;
}

virtual void Next() {
    assert(Valid());

    if (direction_ != kForward) {
      for (int i = 0; i < n_; i++) {
        IteratorWrapper* child = &children_[i];
        if (child != current_) {
          child->Seek(key());
          if (child->Valid() &&
              comparator_->Compare(key(), child->key()) == 0) {
            child->Next();
          }
        }
      }
      direction_ = kForward;
    }

    current_->Next();
    FindSmallest();
}

看了上面的实现,我们就很清楚了,原来这个Iterator封装了多个孩子Iterator,而每个孩子Iterator则是用来遍历每个SSTable的K/V记录。

而MergingIterator对所有的孩子Iterator按照key的字母序进行了一个merge排序,每次Next()都会选择到所有孩子Iterator(SSTable)的Key最小的记录。

有了这个MergingIterator,上层的使用者Compaction就可以放心大胆地进行判断了:因为每次选择出的记录都是所有参与Compaction的SSTable中Key最小的,因此重复的Key记录一定会被连续的读取上来,进而很容易判断哪些是无效记录,便可以被丢弃。效果类似于下面:

LevelDB详解: Compaction