Skip to main content
  1. Posts/

mmap-sync: cloudflare 对于低延迟 IPC 的解决方案

·8 mins·
Tech Enthusiast running out of Coffee.
Table of Contents

背景
#

Cloudflare的机器学习检测在保障网络安全和完整性方面起着关键作用,每秒处理超过4600万个HTTP请求,峰值时可达到超过6300万个请求。机器学习模型推理是系统延迟的一个组成部分,需要从请求中提取和准备特征,并通过CatBoost模型生成检测结果。

原先的设计使用Gagarin作为Go语言的特征服务平台,通过Unix域套接字提供机器学习特征。然而,随着特征数量和请求量的增加,缓存命中率下降,导致延迟增加。在高峰时段,延迟进一步恶化,导致性能不佳、资源利用率低下以及机器学习特征的可用性下降。

Fig1

为了改进系统性能,他们探索了多种解决方案,包括优化现有的Gagarin平台、考虑使用Quicksilver、增加多层缓存、对Unix套接字进行分片,以及切换到RPC等。

但尝试的这些方案都面临一些问题

  • 高尾延迟:系统在高峰期遇到了尾延迟增加的问题,这是由于Unix套接字上的CPU争用和Lua垃圾收集器引起的。

  • 资源利用率不佳:系统的CPU和内存利用率没有被充分优化,导致在服务器上运行其他服务时剩余资源有限。

  • 机器学习特征的可用性下降:由于memcached超时,机器学习特征的可用性下降,从而增加了一部分请求的误报或漏报的可能性。

  • 可扩展性限制:随着添加更多的机器学习特征,系统接近了可扩展性的极限。

采用基于第一性原则的设计方法,他们提出了一个问题:“操作系统提供的最有效的低级数据传输方法是什么?” 他们使用 ipc-bench 评估了 Linux 生态系统中各种 IPC 通信的性能基准,测量方式是通过在两个进程之间发送 ping-pong 消息

IPC method Avg duration, μs Avg throughput, msg/s
eventfd (bi-directional) 9.456 105,533
TCP sockets 8.74 114,143
Unix domain sockets 5.609 177,573
FIFOs (named pipes) 5.432 183,388
Pipe 4.733 210,369
Message Queue 4.396 226,421
Unix Signals 2.45 404,844
Shared Memory 0.598 1,616,014
Memory-Mapped Files 0.503 1,908,613

根据评估,他们发现Unix套接字虽然可以处理同步问题,但不是最快的IPC方法。最快的IPC机制是共享内存和内存映射文件。这两种方法具有类似的性能,前者使用特定的tmpfs卷在/dev/shm中,利用专门的系统调用,而后者可以存储在任何卷中,包括tmpfs或HDD/SDD。

mmap 并没有提供像 unix 套接字那样提供数据同步功能,因此面临几个关键问题

  • 特征获取和更新:需要有效地从文件中获取和更新数百个浮点特征数组,这在处理文件数据时是一个挑战。
  • 数据同步和并发更新:由于多个进程同时访问数据,他们需要确保安全、并发且频繁的更新操作。
  • CPU争用:需要解决之前在Unix套接字上遇到的CPU争用问题,以改善性能。
  • 可扩展性和未来增加特征的支持:需要确保系统能够有效地支持将来更多的维度和特征的添加,以满足不断增长的需求。

根据以上问题,对 mmap 做了一些增强

最终 cloudflare 开发出了 mmap-sync 这个库,并将其开源。

使用 mmap-sync 之后,整个系统架构进行了重构

Fig 3.

  • bliss 使用 rust 实现,作为 sidecar 运行,为大数据量和高 I/O 操作进行批处理设计,使用 tokio
  • bliss lib 使用 mmap-sync 获取机器学习特征数据,还进行了一些增强
    • 预分配数据结构,整个运行时无 alloc
    • SIMD 优化
    • 编译器参数调优
      [profile.release]
      codegen-units = 1
      debug = true
      lto = "fat"
      opt-level = 3
      

优化之后的数据对比

Fig 4.

机器学习特征提取延迟减少了几个数量级

Latency metric Before (μs) After (μs) Change
p50 532 9 -98.30% or x59
p99 9510 18 -99.81% or x528
p999 16000 29 -99.82% or x551

left-right 基础
#

  • left-right 提出了一种新颖的并发控制机制
  • 单个 writer 多个 reader
    • reader 是 wait- free 的, 且不需要 gc
    • reader 可以任意数量扩展,对并发读友好
    • writer 需要 lock

结构
#

  • left-right 要求保留两个数据副本, 称为 left_instance 和 right_instance.
  • 读和写在不同的 instance 上执行, 并使用 version 定序.
    • version 取值只有 0 和 1.
    • left-right 使用一个二维数组 read_indicator[][] 为每个 reader 记录 version.
              +----------------+
              |   left_right   |
              +----------------+
                       |
        left           |           right
          +------------+-------------+
          |                          |
          v                          v
 +-----------------+        +-----------------+
 |  left_instance  |        | right_instance  |
 +-----------------+        +-----------------+


+----------------------+----------------------+
| read_indicator[0][0] | read_indicator[1][0] |------->  thread_id = 0
+----------------------+----------------------+
| read_indicator[0][1] | read_indicator[1][1] |
+----------------------+----------------------+
| read_indicator[0][2] | read_indicator[1][2] |
+----------------------+----------------------+
| read_indicator[0][3] | read_indicator[1][3] |
+----------------------+----------------------+
            ^                      ^
          0 |                      | 1
            +----------+-----------+
                       |
                       |
              +-----------------+
              |  version_index  |
              +-----------------+

算法
#

reader 通过 containts() 读取

  • 读是 wait-free 的
  • 首先读取 versionIndex 的值,versionIndex 是一个原子变量,取值为 0 或 1
  • arrive() 设置当前 reader 使用 versionIndex
  • 根据 versionIndex 的指示读取 left 或者 right 实例
  • depart() 设置 reader 离开 versionIndex

Algorithm 1
arrive() 设置 version, depart
Algorithm 2

writer 通过 modify() 修改

  • 写需要 lock 保护,但是这个 lock 不一定是 mutex,可以是其他实现,这意味着 writer 可以进一步优化为 lock-free 或者 wait-free 的
  • writer 首先读取 left_right 指向的实例,并左右切换:如果是 left_instance,则修改 right_instance
  • 发布 versionIndex 变量,如果被 reader 持有,需要等待

Algorithm 3

算法可以证明对于任意数量的线程总是 wait-free 的

  • 算法1 指出读取操作没有循环等待,总是在常熟步骤完成,因此可以保证读是 wait-free 的
  • 对于写操作有一个循环,writer 需要等待上一个 reader 的进度,但由于 reader 是 wait-free 的,所以 reader 不会饿死 writer
  • 对于写操作,还需要一个 lock,这意味着写-写可能会饥饿,但这不在 left-right 的保证之内,同时如果 writer 的 lock 是 wait-free 的,则意味着 writer 在整体是 wait-free 的

left-right 内存顺序是可线性化的, 通过以上算法, 直觉上也很容易推理. 论文将单个 writer 多个 reader 建模为一个全状态的状态机进行的证明》

图2

实现
#

结构
#

  • Synchronizer 是 mmap-sync 的接口,提供 mmap 读写,内部由 StateContainerDataCotainer 组成

    • StateContainer 包含一个 mmap 文件,记录 State 运行时会解释为 State
      • State 布局遵循 left-right 的结构,包括一个 version 原子变量和一组 left-right reader
      • reader 并没有使用线程 id,使用原子变量表示多少个 reader,读取是递增,读取结束递减
    • DataContainer 遵循 left-right ,包括 version 和 left-right 两个 mmap read-only实例文件
  • InstanceVersion 就是 left-right 的 versionIndex, 不过编码上做了一些改动: 64-bit 无符号整数表示,idx (0 OR 1) | data size (<549GB) | checksum

    /// `InstanceVersion` represents data instance and consists of the following components:
    /// - data idx (0 or 1)   - 1 bit
    /// - data size (<549 GB) - 39 bits
    /// - data checksum       - 24 bits
    #[derive(Clone, Copy, Debug, PartialEq)]
    pub(crate) struct InstanceVersion(u64);
    
    const DATA_SIZE_BITS: usize = 39;
    const DATA_CHECKSUM_BITS: usize = 24;
    
classDiagram class State { version: AtomicU64 idx_readers: [AtomicU32, 2] } class StateContainer { base_path: String mmap: Option~MampMut~ state() ResultResult<&mut State, SynchronizerError> } class DataContainer { base_path: String verion: Option~InstanceVersion~ idx_mmaps: [Option~Mamp~;2] } class InstanceVersion { 0: u64 } class Synchronizer { state_container: StateContainer data_container: DataContainer } class ReadGuard~'a~ { state: &'a mut State version: InstanceVersion } DataContainer o-- InstanceVersion StateContainer <.. State Synchronizer o-- StateContainer Synchronizer o-- DataContainer ReadGuard o-- State ReadGuard o-- InstanceVersion

write
#

写入过程遵循 left-right 算法, 略微不同的是增加了 writer 等待 reader 释放版本时的 timeout 限制

  • 序列化数据
  • 获取或新建 state
  • 获取下一个版本
  • 在新版本的实例上写入数据,然后切换版本
// vim src/synchronizer.rs + 73

/// Writes a given `entity` into the next available data file.
///
/// Returns the number of bytes written to the data file and a boolean flag, for diagnostic
/// purposes, indicating whether the reader counter was reset due to a reader exiting without
/// decrementing it.
///
/// # Parameters
/// - `entity`: The entity to be written to the data file.
/// - `grace_duration`: The maximum period to wait for readers to finish before resetting the
///                     reader count to 0. This handles scenarios where a reader process has
///                     crashed or exited abnormally, failing to decrement the reader count.
///                     After the `grace_duration` has elapsed, if there are still active
///                     readers, the reader count is reset to 0 to restore synchronization state.
///
/// # Returns
/// A result containing a tuple of the number of bytes written and a boolean indicating whether
/// the reader count was reset, or a `SynchronizerError` if the operation fails.
pub fn write<T>(
    &mut self,
    entity: &T,
    grace_duration: Duration,
) -> Result<(usize, bool), SynchronizerError>
where
    T: Serialize<DefaultSerializer>,
    T::Archived: for<'b> CheckBytes<DefaultValidator<'b>>,
{
    // serialize given entity into bytes
    let mut serializer = DefaultSerializer::default();
    let _ = serializer
        .serialize_value(entity)
        .map_err(|_| FailedEntityWrite)?;
    let data = serializer.into_serializer().into_inner();

    // ensure that serialized bytes can be deserialized back to `T` struct successfully
    check_archived_root::<T>(&data).map_err(|_| FailedEntityRead)?;

    // fetch current state from mapped memory
    let state = self.state_container.state(true)?;

    // calculate data checksum
    let mut hasher = SeaHasher::new();
    hasher.write(&data);
    let checksum = hasher.finish();

    // acquire next available data file idx and write data to it
    let (new_idx, reset) = state.acquire_next_idx(grace_duration);
    let new_version = InstanceVersion::new(new_idx, data.len(), checksum)?;
    let size = self.data_container.write(&data, new_version)?;

    // switch readers to new version
    state.switch_version(new_version);

    Ok((size, reset))
}

State 保存在 mmap 文件中,调用 state() 方法将其解释为 State,如果文件不存在则初始化

// vim src/state.rs +117

    /// Fetch state from existing memory mapped file or create new one
    #[inline]
    pub(crate) fn state(&mut self, create: bool) -> Result<&mut State, SynchronizerError> {
        if self.mmap.is_none() {
            let state_file = OpenOptions::new()
                .read(true)
                .write(true)
                .create(create)
                .mode(0o660) // set file mode to allow read/write from owner/group only
                .open(&self.state_path)
                .map_err(FailedStateRead)?;

            let mut need_init = false;
            // Reset state file size to match exactly `STATE_SIZE`
            if state_file.metadata().map_err(FailedStateRead)?.len() as usize != STATE_SIZE {
                state_file
                    .set_len(STATE_SIZE as u64)
                    .map_err(FailedStateRead)?;
                need_init = true;
            }

            let mut mmap = unsafe { MmapMut::map_mut(&state_file).map_err(FailedStateRead)? };
            if need_init {
                // Create new state and write it to mapped memory
                let new_state = State::default();
                unsafe {
                    mmap.as_mut_ptr()
                        .copy_from((&new_state as *const State) as *const u8, STATE_SIZE)
                };
            };

            self.mmap = Some(mmap);
        }
        Ok(unsafe { &mut *(self.mmap.as_ref().unwrap().as_ptr() as *mut State) })
    }

acquire_next_id 获取下一个要写入的版本 next_idx, 然后等待该版本上的所有 reader 释放, 如果超时则退出

// vim src/state.rs +45

/// Acquire next `idx` of the state for writing
#[inline]
pub(crate) fn acquire_next_idx(&self, grace_duration: Duration) -> (usize, bool) {
    // calculate `next_idx` to acquire, in case of uninitialized version use 0
    let next_idx = match InstanceVersion::try_from(self.version.load(Ordering::SeqCst)) {
        Ok(version) => (version.idx() + 1) % 2,
        Err(_) => 0,
    };

    // check number of readers using `next_idx`
    let num_readers = &self.idx_readers[next_idx];

    // wait until either no more readers left for `next_idx` or grace period has expired
    let grace_expiring_at = Instant::now().add(grace_duration);
    let mut reset = false;
    while num_readers.load(Ordering::SeqCst) > 0 {
        // we should reach here only when one of the readers dies without decrement
        if Instant::now().gt(&grace_expiring_at) {
            // reset number of readers after expired grace period
            num_readers.store(0, Ordering::SeqCst);
            reset = true;
            break;
        } else {
            thread::sleep(SLEEP_DURATION);
        }
    }

    (next_idx, reset)
}

data_container write 写入数据,根据 version 选择写入哪个 mmap 文件

// vim src/data.rs +24

/// Write `data` into mapped data file with given `version`
pub(crate) fn write(
    &mut self,
    data: &[u8],
    version: InstanceVersion,
) -> Result<usize, SynchronizerError> {
    let data_file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .mode(0o640) // set file mode to allow read/write from owner, read from group only
        .open(version.path(&self.base_path))
        .map_err(FailedDataWrite)?;

    // grow data file when its current length exceeded
    let data_len = data.len() as u64;
    if data_len > data_file.metadata().map_err(FailedDataWrite)?.len() {
        data_file.set_len(data_len).map_err(FailedDataWrite)?;
    }

    // copy data to mapped file and ensure it's been flushed
    let mut mmap = unsafe { MmapMut::map_mut(&data_file).map_err(FailedDataWrite)? };
    mmap[..data.len()].copy_from_slice(data);
    mmap.flush().map_err(FailedDataWrite)?;

    Ok(data.len())
}

read
#

read 遵循 left-right 算法

  • 获取要读取的版本
  • 递增当前版本的 reader 数量
  • 读取数据进行 zero-copy 的序列化
  • 返回 guard,RAII 手法,析构时会递减这个版本的 reader
// vim src/synchronizer.rs +159

/// Reads and returns an `entity` struct from mapped memory wrapped in `ReadGuard`.
///
/// # Safety
///
/// This method is marked as unsafe due to the potential for memory corruption if the returned
/// result is used beyond the `grace_duration` set in the `write` method. The caller must ensure
/// the `ReadGuard` (and any references derived from it) are dropped before this time period
/// elapses to ensure safe operation.
///
/// Additionally, the use of `unsafe` here is related to the internal use of the
/// `rkyv::archived_root` function, which has its own safety considerations. Particularly, it
/// assumes the byte slice provided to it accurately represents an archived object, and that the
/// root of the object is stored at the end of the slice.
pub unsafe fn read<T>(&mut self) -> Result<ReadResult<T>, SynchronizerError>
where
    T: Archive,
    T::Archived: for<'b> CheckBytes<DefaultValidator<'b>>,
{
    // fetch current state from mapped memory
    let state = self.state_container.state(false)?;

    // fetch current version
    let version = state.version()?;

    // create and lock state guard for reading
    let guard = ReadGuard::new(state, version)?;

    // fetch data for current version from mapped memory
    let (data, switched) = self.data_container.data(version)?;

    // fetch entity from data using zero-copy deserialization
    let entity = archived_root::<T>(data);

    Ok(ReadResult::new(guard, entity, switched))
}

version 读取 state 的版本

// vim src/state.rs +33

    /// Return state's current instance version
    #[inline]
    pub(crate) fn version(&self) -> Result<InstanceVersion, SynchronizerError> {
        self.version.load(Ordering::SeqCst).try_into()
    }

ReadGuard 持有当前读取数据的版本,构造时递增 reader 数量,析构时递减版本上的 reader 数量

// vim src/guard.rs +22

/// An RAII implementation of a “scoped read lock” of a `State`
pub(crate) struct ReadGuard<'a> {
    state: &'a mut State,
    version: InstanceVersion,
}

impl<'a> ReadGuard<'a> {
    /// Creates new `ReadGuard` with specified parameters
    pub(crate) fn new(
        state: &'a mut State,
        version: InstanceVersion,
    ) -> Result<Self, SynchronizerError> {
        state.rlock(version);
        Ok(ReadGuard { version, state })
    }
}
// vim src/state.rs +39

    /// Locks given `version` of the state for reading
    #[inline]
    pub(crate) fn rlock(&mut self, version: InstanceVersion) {
        self.idx_readers[version.idx()].fetch_add(1, Ordering::SeqCst);
    }

// vim src/state.rs +75

    /// Unlocks given `version` from reading
    #[inline]
    pub(crate) fn runlock(&mut self, version: InstanceVersion) {
        self.idx_readers[version.idx()].fetch_sub(1, Ordering::SeqCst);
    }

Related

Leveldb 源码分析 05: SSTable
·12 mins
leveldb source-code
Leveldb 源码分析 01: 基本元素
·3 mins
leveldb source-code
Leveldb 源码分析 02: Memtable
·14 mins
leveldb source-code
Leveldb 源码分析 02: 文件抽象
·1 min
leveldb source-code
Leveldb 源码分析 03: WAL
·8 mins
leveldb source-code
Leveldb 源码分析 04: Write 操作
·6 mins
leveldb source-code