Skip to main content
  1. Posts/

Leveldb 源码分析 04: Write 操作

·6 mins·
leveldb source-code
Tech Enthusiast running out of Coffee.
Table of Contents

leveldb 归一化写接口, 统一由 DBimpl::Write 处理,写操作关键点

  • 写操作提供的接口是线程安全的,内部使用 mutex 保护
  • 允许多个线程并发写操作,但只有一个写操作进行,该写操作可以合并其他线程的写操作
  • 提供 WriteBatch 对象来支持批量写
  • 写操作会根据条件调度 compaction

WriteBatch 对象
#

WriteBatch 表示一组 key/value,格式为

// vim db/write_batch.cc +5

// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]
// record :=
//    kTypeValue varstring varstring         |
//    kTypeDeletion varstring
// varstring :=
//    len: varint32
//    data: uint8[len]

Put

  • 记录数据对的数量
  • 记录数据操作类型
  • 编码 key 和 value
// vim db/write_batch.cc +96

void WriteBatch::Put(const Slice& key, const Slice& value) {
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  rep_.push_back(static_cast<char>(kTypeValue));
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}

Delete

  • 删除作为一个记录
  • 只需要编码 key
// vim db/write_batch.cc +105

void WriteBatch::Delete(const Slice& key) {
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  rep_.push_back(static_cast<char>(kTypeDeletion));
  PutLengthPrefixedSlice(&rep_, key);
}

写入流程
#

step1. 获取 mutex 同时将 writer 加入队列,如果没有被其他线程合并,或者 writer 不是队列第一个元素则等待,如果被唤醒并且其他线程合并了写操作,则返回写入状态。

// vim db/db_impl.cc +1200
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Writer w(&mutex_);
  w.batch = updates;
  w.sync = options.sync;
  w.done = false;


  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }

step2. MakeRoomForWrite 是写操作的核心方法,该方法会进行

  • 写入速率限制,因为 level-0 如果 sstable 文件太多会影响查询,因此如果 level-0 文件达到 kL0_SlowdownWritesTrigger 限制,写入会放缓 。

  • 如果 memtable 到达限制,则会调度 minor compaction,如果正在 compaction 合并,会停止写入,需要等待合并结束

  • 如果 level-0 文件太多,超过 kL0_StopWritesTrigger (默认12) 会停止写入等待 major 合并结束(major 在 level-0 文件为4时就进行了)

step4. 写入

  • 分配全局唯一的 LSN
  • BuildBatchGroup 合并写队列中的写操作,当 BuildBatchGroup 返回时,last_writer 之前的 WriteBatch 对象都被合并了
  • 释放锁,其他线程这个时候可以进入 Write 了,因为锁保护的是 SN 和写队列
  • 写 wal
  • 写 memtable
// vim db/db_impl.cc +1215

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(updates == nullptr);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(write_batch);


    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(write_batch, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (write_batch == tmp_batch_) tmp_batch_->Clear();


    versions_->SetLastSequence(last_sequence);
  }

step5. 唤醒队列中等待的 writer

// vim db/db_impl.cc +1254
  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

step7. 唤醒写队列中第一个 writer 继续执行执行写入

// vim db/db_impl.cc +1265
  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }


  return status;

MakeRoomForWrite
#

  • 控制写入速度
  • 触发 compaction

正常的写入(force = fasle)允许限制写入速度

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;

减少写入速度

  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (allow_delay && versions_->NumLevelFiles(0) >=
                                  config::kL0_SlowdownWritesTrigger) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance.  Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
	} 

不需要触发 minor compaction, 结束

	else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      break;
	}

等待 minor 或者 major compaction 结束

    else if (imm_ != nullptr) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n");
      background_work_finished_signal_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      background_work_finished_signal_.Wait();
    } 

进行 minor compaction

else {
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = nullptr;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }


      delete log_;


      s = logfile_->Close();
      if (!s.ok()) {
        // We may have lost some data written to the previous log file.
        // Switch to the new log file anyway, but record as a background
        // error so we do not attempt any more writes.
        //
        // We could perhaps attempt to save the memtable corresponding
        // to log file and suppress the error if that works, but that
        // would add more complexity in a critical code path.
        RecordBackgroundError(s);
      }
      delete logfile_;


      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.store(true, std::memory_order_release);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;  // Do not force another compaction if have room
      MaybeScheduleCompaction();
    }
  }
  return s;
}

BuildBatchGroup
#

  • 设置 batch 最大值
    • 默认 max_size = 1MB
    • 如果 WriteBatch 过小(小于 128 KB),就会将 max_size 设置为 size + 128KB,目的是减少小数据量写入的延迟
  • batch 在一起的写操作要么都是 sync 的, 要么是非 sync 的.
    • 所以在 batch 时, 第一个是 sync 或者非 sync 的将决定后续合并哪一类写操作.
    • 如果多个 WriteBatch sync 状态不一致则停止 batch
// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-null batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  Writer* first = writers_.front();
  WriteBatch* result = first->batch;
  assert(result != nullptr);


  size_t size = WriteBatchInternal::ByteSize(first->batch);


  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128 << 10)) {
    max_size = size + (128 << 10);
  }


  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();
  ++iter;  // Advance past "first"
  for (; iter != writers_.end(); ++iter) {
    Writer* w = *iter;
    if (w->sync && !first->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }


    if (w->batch != nullptr) {
      size += WriteBatchInternal::ByteSize(w->batch);
      if (size > max_size) {
        // Do not make batch too big
        break;
      }


      // Append to *result
      if (result == first->batch) {
        // Switch to temporary batch instead of disturbing caller's batch
        result = tmp_batch_;
        assert(WriteBatchInternal::Count(result) == 0);
        WriteBatchInternal::Append(result, first->batch);
      }
      WriteBatchInternal::Append(result, w->batch);
    }
    *last_writer = w;
  }
  return result;
}

Related

Leveldb 源码分析 01: 基本元素
·3 mins
leveldb source-code
Leveldb 源码分析 02: Memtable
·14 mins
leveldb source-code
Leveldb 源码分析 02: 文件抽象
·1 min
leveldb source-code
Leveldb 源码分析 03: WAL
·8 mins
leveldb source-code
Leveldb 源码分析 05: SSTable
·12 mins
leveldb source-code
Leveldb 源码分析 06: Compaction
·17 mins
leveldb 源码分析