## SnapshotExecutor
braft 抽象出了一个 SnapshotExecutor 类,封装了 snapshot 相关的逻辑:
- init: 集群启动时初始化(加载)snapshot
- do_snapshot: 创建 snapshot
- install_snapshot: 根据 leader 发来的 InstallSnapshot RPC 安装 snapshot,实现 raft 算法对 InstallSnapshot 的要求
// Executing Snapshot related stuff
//
// Owns the snapshot work of one raft node:
// - init(): load the latest local snapshot into the state machine at startup
// - do_snapshot(): take a new snapshot of the state machine
// - install_snapshot(): install a snapshot sent by the leader via RPC
class BAIDU_CACHELINE_ALIGNMENT SnapshotExecutor {
DISALLOW_COPY_AND_ASSIGN(SnapshotExecutor);
public:
SnapshotExecutor();
~SnapshotExecutor();
// Loads the latest local snapshot (if one exists) into the state machine.
// Returns 0 on success (including "no snapshot to load") and -1 on failure.
int init(const SnapshotExecutorOptions& options);
// Return the owner NodeImpl
NodeImpl* node() const { return _node; }
// Start to snapshot StateMachine, and |done| is called after the execution
// finishes or fails.
void do_snapshot(Closure* done);
// Install a snapshot as requested by the InstallSnapshot RPC from the leader.
// After the installation succeeds (StateMachine is reset with the snapshot)
// or fails, |done| is called to send the response.
//
// Errors:
// - Term mismatch: happens when interrupt_downloading_snapshot was called
//   before install_snapshot, indicating that this RPC was issued by the
//   old leader.
// - Interrupted: happens when interrupt_downloading_snapshot is called or
//   a new RPC with the same or a newer snapshot arrives.
// - Busy: the state machine is saving or loading a snapshot.
void install_snapshot(brpc::Controller* controller,
const InstallSnapshotRequest* request,
InstallSnapshotResponse* response,
google::protobuf::Closure* done);
private:
// Completion closures that call back into the private hooks below.
friend class SaveSnapshotDone;
friend class FirstSnapshotLoadDone;
friend class InstallSnapshotDone;
// Called when the state machine finished (or failed) loading a snapshot.
void on_snapshot_load_done(const butil::Status& st);
// Called when the state machine finished (or failed) saving a snapshot.
int on_snapshot_save_done(const butil::Status& st,
const SnapshotMeta& meta,
SnapshotWriter* writer);
// Arguments of one in-flight install_snapshot RPC. They are kept so that
// the RPC can be answered (via cntl/response/done) once the snapshot has
// been loaded.
struct DownloadingSnapshot {
const InstallSnapshotRequest* request;
InstallSnapshotResponse* response;
brpc::Controller* cntl;
google::protobuf::Closure* done;
};
int register_downloading_snapshot(DownloadingSnapshot* ds);
int parse_install_snapshot_request(
const InstallSnapshotRequest* request,
SnapshotMeta* meta);
void load_downloading_snapshot(DownloadingSnapshot* ds,
const SnapshotMeta& meta);
void report_error(int error_code, const char* fmt, ...);
// Locked while the loading state and last-snapshot fields are updated
// (see on_snapshot_load_done); presumably guards the other state below too.
raft_mutex_t _mutex;
// Term/index of the most recently saved or loaded snapshot.
int64_t _last_snapshot_term;
int64_t _last_snapshot_index;
int64_t _term;
// True while a snapshot is being saved / loaded; install_snapshot reports
// Busy in that case.
bool _saving_snapshot;
bool _loading_snapshot;
bool _stopped;
bool _usercode_in_pthread;
SnapshotStorage* _snapshot_storage;
SnapshotCopier* _cur_copier;
FSMCaller* _fsm_caller;
NodeImpl* _node;
LogManager* _log_manager;
// The ownership of _downloading_snapshot is a little messy:
// - Before we start to replace the FSM with the downloaded one, the
//   ownership belongs to the downloading thread.
// - After we push the load task to FSMCaller, the ownership belongs to the
//   closure which is called after the Snapshot replaces FSM.
butil::atomic<DownloadingSnapshot*> _downloading_snapshot;
// Meta of the snapshot currently being loaded (filled by init() through
// SnapshotReader::load_meta, consumed by on_snapshot_load_done).
SnapshotMeta _loading_snapshot_meta;
// Counts in-flight snapshot jobs: init() adds a count before loading and
// on_snapshot_load_done() signals it.
bthread::CountdownEvent _running_jobs;
scoped_refptr<SnapshotThrottle> _snapshot_throttle;
};
## 初始化 snapshot
集群启动时,需要从持久化存储中加载 snapshot 来初始化集群,主要是加载 snapshot 的 meta 部分:
- last_index(last_included_index)
- last_term(last_included_term)
- 成员配置
// vim braft/snapshot_executor.cpp +340
int SnapshotExecutor::init(const SnapshotExecutorOptions& options) {
//...
SnapshotReader* reader = _snapshot_storage->open();
if (reader == NULL) {
return 0;
}
if (reader->load_meta(&_loading_snapshot_meta) != 0) {
LOG(ERROR) << "Fail to load meta from `" << options.uri << "'";
_snapshot_storage->close(reader);
return -1;
}
_loading_snapshot = true;
_running_jobs.add_count(1);
// Load snapshot ater startup
FirstSnapshotLoadDone done(this, reader);
CHECK_EQ(0, _fsm_caller->on_snapshot_load(&done));
done.wait_for_run();
}
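通过 _snapshot_storage->open() 得到 SnapshotReader 并读取 meta,然后调用 _fsm_caller->on_snapshot_load() 让状态机加载 snapshot。这里以 FirstSnapshotLoadDone 作为完成回调,并通过 done.wait_for_run() 同步等待加载结束;具体的初始化逻辑在 on_snapshot_load_done 中完成。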
// vim braft/snapshot_executor.cpp +247
// Completion hook for loading a snapshot into the state machine (presumably
// invoked by the FirstSnapshotLoadDone / InstallSnapshotDone closures, which
// are friends of SnapshotExecutor).
//
// On success it records the snapshot's last included index/term and hands
// the meta to the log manager. In all cases it then refreshes the node's
// configuration and clears the loading state. If an install_snapshot RPC is
// pending, it answers that RPC with |st|. Finally it signals _running_jobs.
void SnapshotExecutor::on_snapshot_load_done(const butil::Status& st) {
std::unique_lock<raft_mutex_t> lck(_mutex);
CHECK(_loading_snapshot);
// Pending install_snapshot RPC, if any (NULL e.g. for the startup load).
DownloadingSnapshot* m = _downloading_snapshot.load(butil::memory_order_relaxed);
if (st.ok()) {
_last_snapshot_index = _loading_snapshot_meta.last_included_index();
_last_snapshot_term = _loading_snapshot_meta.last_included_term();
// Also publishes the snapshot's configuration to the log manager.
_log_manager->set_snapshot(&_loading_snapshot_meta);
}
std::stringstream ss;
if (_node) {
ss << "node " << _node->node_id() << ' ';
}
ss << "snapshot_load_done, "
<< _loading_snapshot_meta.ShortDebugString();
LOG(INFO) << ss.str();
// Release our mutex before calling into the node, which takes the node's
// own mutex (presumably to avoid holding both locks at once).
lck.unlock();
if (_node) {
// FIXME: race with set_peer, not sure if this is fine
_node->update_configuration_after_installing_snapshot();
}
lck.lock();
_loading_snapshot = false;
_downloading_snapshot.store(NULL, butil::memory_order_release);
lck.unlock();
// Respond to the pending RPC outside the lock; this function then frees it.
if (m) {
// Respond RPC
if (!st.ok()) {
m->cntl->SetFailed(st.error_code(), "%s", st.error_cstr());
} else {
m->response->set_success(true);
}
m->done->Run();
delete m;
}
// Balances the add_count() done before the load was started (see init()).
_running_jobs.signal();
}
加载成功时,首先更新 _last_snapshot_index 和 _last_snapshot_term,然后调用 _log_manager->set_snapshot(),用 snapshot meta 中的成员配置初始化配置信息。
// vim braft/log_manager.cpp +622
// Records |meta| as the latest snapshot of the log:
// - publishes the snapshot's configuration (peers / old_peers) to the
//   configuration manager;
// - advances _last_snapshot_id, and _applied_id if it lags behind;
// - then trims or resets the log, depending on whether the log still holds
//   last_included_index with the same term.
// Snapshots that are not newer than the current one are ignored.
void LogManager::set_snapshot(const SnapshotMeta* meta) {
BRAFT_VLOG << "Set snapshot last_included_index="
<< meta->last_included_index()
<< " last_included_term=" << meta->last_included_term();
std::unique_lock<raft_mutex_t> lck(_mutex);
// Ignore stale or duplicate snapshots.
if (meta->last_included_index() <= _last_snapshot_id.index) {
return;
}
// Rebuild the configuration entry stored in the snapshot meta.
Configuration conf;
for (int i = 0; i < meta->peers_size(); ++i) {
conf.add_peer(meta->peers(i));
}
Configuration old_conf;
for (int i = 0; i < meta->old_peers_size(); ++i) {
old_conf.add_peer(meta->old_peers(i));
}
ConfigurationEntry entry;
entry.id = LogId(meta->last_included_index(), meta->last_included_term());
entry.conf = conf;
entry.old_conf = old_conf;
_config_manager->set_snapshot(entry);
// Term of last_included_index in the current log; 0 is handled below as
// "index not present in the log" (see the comment in that branch).
int64_t term = unsafe_get_term(meta->last_included_index());
const LogId last_but_one_snapshot_id = _last_snapshot_id;
_last_snapshot_id.index = meta->last_included_index();
_last_snapshot_id.term = meta->last_included_term();
if (_last_snapshot_id > _applied_id) {
_applied_id = _last_snapshot_id;
}
// NOTICE: not to update disk_id here as we are not sure if this node really
// has these logs on disk storage. Just leave disk_id as it was, which can keep
// these logs in memory all the time until they are flushed to disk. By this
// way we can avoid some corner cases which failed to get logs.
if (term == 0) {
// last_included_index is larger than last_index
// FIXME: what if last_included_index is less than first_index?
_virtual_first_log_id = _last_snapshot_id;
truncate_prefix(meta->last_included_index() + 1, lck);
return;
} else if (term == meta->last_included_term()) {
// Truncating log to the index of the last snapshot.
// We don't truncate log before the latest snapshot immediately since
// some log around last_snapshot_index is probably needed by some
// followers
if (last_but_one_snapshot_id.index > 0) {
// We have last snapshot index
_virtual_first_log_id = last_but_one_snapshot_id;
truncate_prefix(last_but_one_snapshot_id.index + 1, lck);
}
return;
} else {
// The log holds a different term at last_included_index; it presumably
// conflicts with the snapshot, so the log restarts right after it.
// TODO: check the result of reset.
_virtual_first_log_id = _last_snapshot_id;
reset(meta->last_included_index() + 1, lck);
return;
}
// Unreachable: every branch above returns. Kept as a defensive assertion.
CHECK(false) << "Cannot reach here";
}
随后 node 在 update_configuration_after_installing_snapshot() 中从 log manager 取出最新的成员配置,作为当前集群的配置(_conf)使用。
// vim braft/node.cpp +2637
// Refreshes the node's current configuration (_conf) from the log manager's
// latest configuration entry, e.g. after a snapshot has been loaded.
// The node mutex is held while _conf is updated.
void NodeImpl::update_configuration_after_installing_snapshot() {
    BAIDU_SCOPED_LOCK(_mutex);
    // The flag tells whether _conf was replaced; callers here don't need it.
    const bool conf_changed = _log_manager->check_and_set_configuration(&_conf);
    (void)conf_changed;
}
// vim braft/log_manager.cpp +786
// Copies the configuration manager's last configuration entry into |current|
// when their ids differ.
//
// Returns true if |current| was overwritten, and false if it was already up
// to date or |current| is NULL (which also trips a CHECK).
bool LogManager::check_and_set_configuration(ConfigurationEntry* current) {
    if (!current) {
        CHECK(false) << "current should not be NULL";
        return false;
    }
    BAIDU_SCOPED_LOCK(_mutex);
    const ConfigurationEntry& latest = _config_manager->last_configuration();
    const bool outdated = (current->id != latest.id);
    if (outdated) {
        *current = latest;
    }
    return outdated;
}
## 创建 snapshot
do_snapshot
├── NodeImpl::handle_snapshot_timeout [vim src/braft/node.cpp +342]
│ └── SnapshotTimer::run [vim src/braft/node.cpp +3648]
├── NodeImpl::bootstrap [vim src/braft/node.cpp +386]
│ └── bootstrap [vim src/braft/raft.cpp +324]
├── NodeImpl::snapshot [vim src/braft/node.cpp +938]
│ ├── snapshot [vim tools/braft_cli.cpp +140]
│ ├── run_command [vim tools/braft_cli.cpp +184]
│ ├── CliService::CallMethod [vim build/braft/cli.pb.cc +4341]
│ ├── CliService_Stub::add_peer [vim build/braft/cli.pb.cc +4471]
│ ├── CliServiceImpl::snapshot [vim src/braft/cli_service.cpp +176]
│ ├── snapshot [vim src/braft/cli.cpp +176]
│ └── Node::snapshot [vim src/braft/raft.cpp +205]
└── NodeImpl::do_snapshot [vim src/braft/node.cpp +942]
├── NodeImpl::handle_snapshot_timeout [vim src/braft/node.cpp +342]
├── NodeImpl::bootstrap [vim src/braft/node.cpp +386]
└── NodeImpl::snapshot [vim src/braft/node.cpp +938]
触发 snapshot 的点
- SnapshotTimer::run 定时器定期触发
- NodeImpl::bootstrap 启动时,如果 `options.last_log_index > 0`,可以看作给 braft 一个 hint,指示做一次 snapshot
- NodeImpl::snapshot 由 CLI 服务(CliServiceImpl::snapshot)调用,可以通过 braft_cli 手动触发