
braft: executing snapshots


SnapshotExecutor

braft introduces a SnapshotExecutor abstraction that encapsulates all snapshot-related logic:

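  • init: loads the existing snapshot when the cluster starts
  • do_snapshot: creates a snapshot
  • install_snapshot: installs a snapshot sent by the leader, as required by the Raft algorithm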

// Executing Snapshot related stuff
class BAIDU_CACHELINE_ALIGNMENT SnapshotExecutor {
    DISALLOW_COPY_AND_ASSIGN(SnapshotExecutor);
public:
    SnapshotExecutor();
    ~SnapshotExecutor();

    int init(const SnapshotExecutorOptions& options);

    // Return the owner NodeImpl
    NodeImpl* node() const { return _node; }

    // Start to snapshot StateMachine, and |done| is called after the execution
    // finishes or fails.
    void do_snapshot(Closure* done);

    // Install snapshot according to the very RPC from leader
    // After the installing succeeds (StateMachine is reset with the snapshot)
    // or fails, done will be called to respond
    // 
    // Errors:
    //  - Term mismatches: which happens when interrupt_downloading_snapshot was
    //    called before install_snapshot, indicating that this RPC was issued by
    //    the old leader.
    //  - Interrupted: happens when interrupt_downloading_snapshot is called or
    //    a new RPC with the same or newer snapshot arrives
    //  - Busy: the state machine is saving or loading snapshot
    void install_snapshot(brpc::Controller* controller,
                          const InstallSnapshotRequest* request,
                          InstallSnapshotResponse* response,
                          google::protobuf::Closure* done);

  
private:
friend class SaveSnapshotDone;
friend class FirstSnapshotLoadDone;
friend class InstallSnapshotDone;

    void on_snapshot_load_done(const butil::Status& st);
    int on_snapshot_save_done(const butil::Status& st,
                              const SnapshotMeta& meta, 
                              SnapshotWriter* writer);

    struct DownloadingSnapshot {
        const InstallSnapshotRequest* request;
        InstallSnapshotResponse* response;
        brpc::Controller* cntl;
        google::protobuf::Closure* done;
    };

    int register_downloading_snapshot(DownloadingSnapshot* ds);
    int parse_install_snapshot_request(
            const InstallSnapshotRequest* request,
            SnapshotMeta* meta);
    void load_downloading_snapshot(DownloadingSnapshot* ds,
                                  const SnapshotMeta& meta);
    void report_error(int error_code, const char* fmt, ...);

    raft_mutex_t _mutex;
    int64_t _last_snapshot_term;
    int64_t _last_snapshot_index;
    int64_t _term;
    bool _saving_snapshot;
    bool _loading_snapshot;
    bool _stopped;
    bool _usercode_in_pthread;
    SnapshotStorage* _snapshot_storage;
    SnapshotCopier* _cur_copier;
    FSMCaller* _fsm_caller;
    NodeImpl* _node;
    LogManager* _log_manager;
    // The ownership of _downloading_snapshot is a little messy:
    // - Before we start to replace the FSM with the downloaded one. The
    //   ownership belongs to the downloading thread
    // - After we push the load task to FSMCaller, the ownership belongs to the
    //   closure which is called after the Snapshot replaces FSM
    butil::atomic<DownloadingSnapshot*> _downloading_snapshot;
    SnapshotMeta _loading_snapshot_meta;
    bthread::CountdownEvent _running_jobs;
    scoped_refptr<SnapshotThrottle> _snapshot_throttle;
};
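
The error list in the install_snapshot comment can be read as a series of admission checks. Below is a minimal sketch of two of them (term mismatch and busy), assuming a heavily simplified model: `InstallCheck`, `ExecutorState` and `admit_install` are hypothetical names that do not exist in braft, and the real code also handles the stopped state and in-flight downloads.

```cpp
#include <cstdint>

// Hypothetical, simplified model of the admission checks described in the
// install_snapshot comment. None of these names exist in braft.
enum class InstallCheck { kOk, kTermMismatch, kBusy };

struct ExecutorState {
    int64_t term = 0;               // mirrors _term
    bool saving_snapshot = false;   // mirrors _saving_snapshot
    bool loading_snapshot = false;  // mirrors _loading_snapshot
};

// Decide whether an InstallSnapshot RPC carrying |request_term| may proceed.
inline InstallCheck admit_install(const ExecutorState& s, int64_t request_term) {
    if (request_term != s.term) {
        // The RPC belongs to a different term than the one we track.
        return InstallCheck::kTermMismatch;
    }
    if (s.saving_snapshot || s.loading_snapshot) {
        // The state machine is already saving or loading a snapshot.
        return InstallCheck::kBusy;
    }
    return InstallCheck::kOk;
}
```

The "Interrupted" case is left out on purpose: it depends on comparing the new request with the download already in progress, which this sketch does not model.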

Initializing the snapshot

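When the cluster starts, the snapshot has to be loaded from persistent storage to initialize the node. What gets loaded at this point is mainly the snapshot meta: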

  • last_index
  • last_term
  • membership configuration

// vim braft/snapshot_executor.cpp +340
int SnapshotExecutor::init(const SnapshotExecutorOptions& options) {
    //...
    SnapshotReader* reader = _snapshot_storage->open();
    if (reader == NULL) {
        return 0;
    }
    if (reader->load_meta(&_loading_snapshot_meta) != 0) {
        LOG(ERROR) << "Fail to load meta from `" << options.uri << "'";
        _snapshot_storage->close(reader);
        return -1;
    }
    _loading_snapshot = true;
    _running_jobs.add_count(1);
    // Load snapshot after startup
    FirstSnapshotLoadDone done(this, reader);
    CHECK_EQ(0, _fsm_caller->on_snapshot_load(&done));
    done.wait_for_run();
    //...
}

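The meta is read through the SnapshotReader returned by _snapshot_storage->open(), and FirstSnapshotLoadDone then drives the initialization. init blocks in done.wait_for_run() until the load has completed.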
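
The "hand a closure to another component, then block until it has run" pattern used by init can be sketched with standard C++ primitives. This is only an illustration: `WaitableDone` is a hypothetical stand-in, and braft's FirstSnapshotLoadDone is built on bthread primitives rather than `std::mutex` and `std::condition_variable`.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for a "done" closure that the caller can block on.
class WaitableDone {
public:
    // Called by whoever performs the load, once it has finished.
    void run(bool ok) {
        {
            std::lock_guard<std::mutex> lk(mu_);
            ok_ = ok;
            has_run_ = true;
        }
        cv_.notify_all();
    }

    // Blocks until run() has been called, then reports its result.
    bool wait_for_run() {
        std::unique_lock<std::mutex> lk(mu_);
        cv_.wait(lk, [this] { return has_run_; });
        return ok_;
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    bool has_run_ = false;
    bool ok_ = false;
};
```

In a real setting run() would be invoked from another thread (the role FSMCaller plays here) while the initializing thread sits in wait_for_run(). Because the object lives on the caller's stack, the caller must not return before the closure has run, which is exactly what the wait guarantees.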

// vim braft/snapshot_executor.cpp +247
void SnapshotExecutor::on_snapshot_load_done(const butil::Status& st) {
    std::unique_lock<raft_mutex_t> lck(_mutex);

    CHECK(_loading_snapshot);
    DownloadingSnapshot* m = _downloading_snapshot.load(butil::memory_order_relaxed);

    if (st.ok()) {
        _last_snapshot_index = _loading_snapshot_meta.last_included_index();
        _last_snapshot_term = _loading_snapshot_meta.last_included_term();
        _log_manager->set_snapshot(&_loading_snapshot_meta);
    }
    std::stringstream ss;
    if (_node) {
        ss << "node " << _node->node_id() << ' ';
    }
    ss << "snapshot_load_done, "
              << _loading_snapshot_meta.ShortDebugString();
    LOG(INFO) << ss.str();
    lck.unlock();
    if (_node) {
        // FIXME: race with set_peer, not sure if this is fine
        _node->update_configuration_after_installing_snapshot();
    }
    lck.lock();
    _loading_snapshot = false;
    _downloading_snapshot.store(NULL, butil::memory_order_release);
    lck.unlock();
    if (m) {
        // Respond RPC
        if (!st.ok()) {
            m->cntl->SetFailed(st.error_code(), "%s", st.error_cstr());
        } else {
            m->response->set_success(true);
        }
        m->done->Run();
        delete m;
    }
    _running_jobs.signal();
}

On success, _last_snapshot_index and _last_snapshot_term are set first, and _log_manager then restores the membership configuration from the snapshot meta.

// vim braft/log_manager.cpp +622
void LogManager::set_snapshot(const SnapshotMeta* meta) {
    BRAFT_VLOG << "Set snapshot last_included_index="
              << meta->last_included_index()
              << " last_included_term=" <<  meta->last_included_term();
    std::unique_lock<raft_mutex_t> lck(_mutex);
    if (meta->last_included_index() <= _last_snapshot_id.index) {
        return;
    }
    Configuration conf;
    for (int i = 0; i < meta->peers_size(); ++i) {
        conf.add_peer(meta->peers(i));
    }
    Configuration old_conf;
    for (int i = 0; i < meta->old_peers_size(); ++i) {
        old_conf.add_peer(meta->old_peers(i));
    }
    ConfigurationEntry entry;
    entry.id = LogId(meta->last_included_index(), meta->last_included_term());
    entry.conf = conf;
    entry.old_conf = old_conf;
    _config_manager->set_snapshot(entry);
    int64_t term = unsafe_get_term(meta->last_included_index());

    const LogId last_but_one_snapshot_id = _last_snapshot_id;
    _last_snapshot_id.index = meta->last_included_index();
    _last_snapshot_id.term = meta->last_included_term();
    if (_last_snapshot_id > _applied_id) {
        _applied_id = _last_snapshot_id;
    }
    // NOTICE: not to update disk_id here as we are not sure if this node really
    // has these logs on disk storage. Just leave disk_id as it was, which can keep
    // these logs in memory all the time until they are flushed to disk. By this 
    // way we can avoid some corner cases which failed to get logs.
    
    if (term == 0) {
        // last_included_index is larger than last_index
        // FIXME: what if last_included_index is less than first_index?
        _virtual_first_log_id = _last_snapshot_id;
        truncate_prefix(meta->last_included_index() + 1, lck);
        return;
    } else if (term == meta->last_included_term()) {
        // Truncating log to the index of the last snapshot.
        // We don't truncate log before the latest snapshot immediately since
        // some log around last_snapshot_index is probably needed by some
        // followers
        if (last_but_one_snapshot_id.index > 0) {
            // We have last snapshot index
            _virtual_first_log_id = last_but_one_snapshot_id;
            truncate_prefix(last_but_one_snapshot_id.index + 1, lck);
        }
        return;
    } else {
        // TODO: check the result of reset.
        _virtual_first_log_id = _last_snapshot_id;
        reset(meta->last_included_index() + 1, lck);
        return;
    }
    CHECK(false) << "Cannot reach here";
}
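
The three branches at the end of set_snapshot are easier to compare side by side as a pure decision function. A minimal sketch under stated assumptions: `LogAction`, `Decision` and `decide_log_action` are hypothetical names, and `local_term` stands for the value returned by unsafe_get_term(last_included_index).

```cpp
#include <cstdint>

// Hypothetical labels for the outcomes of the branches in set_snapshot.
enum class LogAction {
    kTruncateToSnapshot,      // term == 0: no local entry at the snapshot index
    kTruncateToPrevSnapshot,  // terms match: drop logs up to the previous snapshot
    kKeepAll,                 // terms match, but there is no previous snapshot
    kResetLog                 // terms differ: local log conflicts with the snapshot
};

struct Decision {
    LogAction action;
    int64_t first_index_kept;  // 0 when nothing is truncated
};

// |local_term| plays the role of unsafe_get_term(last_included_index).
inline Decision decide_log_action(int64_t local_term,
                                  int64_t snapshot_index,
                                  int64_t snapshot_term,
                                  int64_t prev_snapshot_index) {
    if (local_term == 0) {
        return {LogAction::kTruncateToSnapshot, snapshot_index + 1};
    }
    if (local_term == snapshot_term) {
        if (prev_snapshot_index > 0) {
            // Logs between the two snapshots stay available for followers.
            return {LogAction::kTruncateToPrevSnapshot, prev_snapshot_index + 1};
        }
        return {LogAction::kKeepAll, 0};
    }
    return {LogAction::kResetLog, snapshot_index + 1};
}
```

For example, with a previous snapshot at index 50 and a new one at index 100 whose term matches the local log, only entries up to 50 are dropped, so entries 51 to 100 remain available to followers that are slightly behind.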

The node then adopts the configuration restored from the snapshot as the current cluster configuration and starts with it.

// vim braft/node.cpp +2637
void NodeImpl::update_configuration_after_installing_snapshot() {
    BAIDU_SCOPED_LOCK(_mutex);
    _log_manager->check_and_set_configuration(&_conf);
}

// vim braft/log_manager.cpp +786
bool LogManager::check_and_set_configuration(ConfigurationEntry* current) {
    if (current == NULL) {
        CHECK(false) << "current should not be NULL";
        return false;
    }
    BAIDU_SCOPED_LOCK(_mutex);

    const ConfigurationEntry& last_conf = _config_manager->last_configuration();
    if (current->id != last_conf.id) {
        *current = last_conf;
        return true;
    }
    return false;
}
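
check_and_set_configuration is a "copy only if the id changed" update. The idea can be sketched without any braft types; `SketchLogId`, `SketchConfEntry` and `sync_configuration` are hypothetical simplifications, and the locking done by the real code is omitted.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical, simplified counterparts of LogId and ConfigurationEntry.
struct SketchLogId {
    int64_t index = 0;
    int64_t term = 0;
};

inline bool operator!=(const SketchLogId& a, const SketchLogId& b) {
    return a.index != b.index || a.term != b.term;
}

struct SketchConfEntry {
    SketchLogId id;
    std::vector<std::string> peers;
};

// Overwrites |*current| with |latest| only when their ids differ.
// Returns true if |*current| was changed.
inline bool sync_configuration(const SketchConfEntry& latest,
                               SketchConfEntry* current) {
    if (current == nullptr) {
        return false;
    }
    if (current->id != latest.id) {
        *current = latest;
        return true;
    }
    return false;
}
```

Comparing ids instead of peer lists keeps the check cheap, and calling it repeatedly is harmless: the second call with the same latest entry reports no change.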

Creating a snapshot

  do_snapshot
  ├── NodeImpl::handle_snapshot_timeout [vim src/braft/node.cpp +342]
  │   └── SnapshotTimer::run    [vim src/braft/node.cpp +3648]
  ├── NodeImpl::bootstrap       [vim src/braft/node.cpp +386]
  │   └── bootstrap     [vim src/braft/raft.cpp +324]
  ├── NodeImpl::snapshot        [vim src/braft/node.cpp +938]
  │   ├── snapshot      [vim tools/braft_cli.cpp +140]
  │   ├── run_command   [vim tools/braft_cli.cpp +184]
  │   ├── CliService::CallMethod        [vim build/braft/cli.pb.cc +4341]
  │   ├── CliService_Stub::add_peer     [vim build/braft/cli.pb.cc +4471]
  │   ├── CliServiceImpl::snapshot      [vim src/braft/cli_service.cpp +176]
  │   ├── snapshot      [vim src/braft/cli.cpp +176]
  │   └── Node::snapshot        [vim src/braft/raft.cpp +205]
  └── NodeImpl::do_snapshot     [vim src/braft/node.cpp +942]
      ├── NodeImpl::handle_snapshot_timeout     [vim src/braft/node.cpp +342]
      ├── NodeImpl::bootstrap   [vim src/braft/node.cpp +386]
      └── NodeImpl::snapshot    [vim src/braft/node.cpp +938]

A snapshot is triggered from three places:

  • SnapshotTimer::run: fired periodically by a timer
  • NodeImpl::bootstrap: at bootstrap, options.last_log_index > 0 acts as a hint telling braft to take a snapshot
  • NodeImpl::snapshot: exposed through the RPC service and triggered manually from the cli
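
The three triggers above all end up in a single do_snapshot entry point. The following sketch models only that funnel: `SnapshotTriggers` and its methods are hypothetical names, and the "snapshot" itself is reduced to recording which trigger fired.

```cpp
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model: every trigger funnels into the same do_snapshot().
class SnapshotTriggers {
public:
    using Done = std::function<void(bool ok)>;

    // Periodic timer fired (SnapshotTimer::run in the call tree).
    void on_timer() { do_snapshot("timer", nullptr); }

    // Bootstrap: a positive last_log_index hints that a snapshot is wanted.
    void on_bootstrap(int64_t last_log_index) {
        if (last_log_index > 0) {
            do_snapshot("bootstrap", nullptr);
        }
    }

    // Manual request, e.g. from a CLI; the caller is told the outcome.
    void on_cli_request(Done done) { do_snapshot("cli", std::move(done)); }

    const std::vector<std::string>& history() const { return history_; }

private:
    void do_snapshot(const std::string& source, Done done) {
        // A real implementation would save the state machine here.
        history_.push_back(source);
        if (done) {
            done(true);  // |done| runs whether the snapshot succeeds or fails
        }
    }

    std::vector<std::string> history_;
};
```

Calling on_bootstrap(0) records nothing, while on_bootstrap(7), on_timer() and on_cli_request(...) each append one entry to the history.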
