乐趣区

关于rust:Datenlord-Rust实现RDMA异步编程二async-Rust-封装-UCX-通信库

UCX 是一个高性能网络通信库,它作为 MPI 所依赖的通信模块之一在高性能计算畛域失去宽泛的应用。UCX 应用 C 语言编写,为了在 Rust 我的项目中应用它,咱们须要将它的 C 接口包装成 Rust 库。在这个过程中咱们充分利用了 Rust 的杀手级个性—— async-await 协程来包装异步 IO 接口,从而极大升高了利用的编程复杂度。去年咱们用 Rust 实现的高性能分布式文件系统 MadFS,底层就应用了咱们本人包装过的 UCX 作为通信模块,它在大规模 RDMA 网络上展现出了良好的性能。UCX 官网在得悉这一音讯后也很开心地宣传了咱们这个我的项目 : )

本文首先会介绍一下 UCX 通信库的性能和编程模型,而后介绍咱们用 async Rust 封装 UCX 的过程,具体代码能够参考 GitHub 仓库:async-ucx。值得注意的是,这里介绍的 IO 模型和封装异步 IO 的办法是通用的,能够实用到其它 IO 库当中。

UCX 通信接口简介

UCX 的全称是 Unified Communication X。正如它名字所展现的,UCX 旨在提供一个对立的形象通信接口,可能适配任何通信设施,并反对各种利用的需要。
下图是 UCX 官网提供的架构图:

能够看到,UCX 整体分为两层:下层的 UCP 接口和底层的 UCT 接口。

底层的 UCT 适配了各种通信设施:从单机的共享内存,到罕用的 TCP Socket,以及数据中心罕用的 RDMA 协定,甚至新兴的 GPU 上的通信,都有很好的反对。

下层的 UCP 则是在 UCT 不同设施的根底上,封装了更形象的通信接口,以不便利用应用。具体来说有以下几类:

  • Active Message:最底层的接口,提供相似 RPC 的语义。每条 Active Message 会触发接收端进行一些操作。
  • RMA / Atomic:是对近程间接内存拜访(RDMA)的形象。通信单方能够间接读写远端的内存,然而须要有额定的内存注册过程。
  • Tag Matching:罕用于高性能计算 MPI 程序中。每条音讯都会附带一个 64 位整数作为 tag,接管方每次能够指定接管哪种 tag 的音讯。
  • Stream:对字节流(TCP)的形象。

一般来说,和底层通信设施模型最匹配的接口具备最高的性能,其它不匹配的接口都会有一次软件转换过程。另一方面,同一种 UCP 接口发送不同大小的音讯可能也会应用不同的 UCT 办法。例如在 RDMA 网络中,因为内存注册也有不小的开销,因而对于小音讯来说,拷贝到预注册好的缓冲区再发送的性能更高。这些策略默认是由 UCX 本人决定的,用户也能够通过设置环境变量的形式手动批改。

在咱们的零碎中,应用了 UCP Tag 接口并基于此实现了轻量级的 RPC。在 RPC 场景下,Tag 能够用于辨别不同上下文的音讯:每个链接单方首先随机生成一个 tag 作为申请的标识,对于每次申请再随机生成一个 tag 作为回复的标识。此外 Tag 接口还反对 IO Vector,行将不间断的多个内存段合并成一个音讯发送。这个个性能够用来将用户提供的数据缓冲区和 RPC 申请打包在一起,肯定水平上防止数据拷贝。

UCX 编程模型简介

UCX 采纳了以异步 IO 为外围的编程模型。其中 UCP 层定义的外围对象有以下四种:

  • Context:全局资源的上下文,治理所有通信设施。个别每个过程创立一个即可。
  • Worker:工作的治理调度核心,以轮询形式执行工作。个别每个线程创立一个,会映射为网卡上的一个队列。
  • Listener:相似 TCP Listener,用来在 worker 之间创立连贯。
  • Endpoint:示意一个曾经建设的连贯。在此之上提供了各种类型的通信接口。它们之间的所属关系如下图所示:

建设连贯

UCX 中单方首先要建设连贯,拿到一个 Endpoint 之后能力进行通信。建设连贯个别要通过 Listener,过程和 TCP 比拟相似:

通信单方 A/B 首先建设各自的 Context 和 Worker,其中一方 A 在 Worker 上创立 Listener 监听连贯申请,Listener 的地址会绑定到本机的一个端口上。用户须要通过某种办法将这个地址传递给另一方 B。B 拿到地址后在 Worker 上发动 connect 操作,此时 A 会收到新连贯申请,它能够抉择承受或回绝。如果承受则须要在 Worker 上 accept 这个申请,将其转换为 Endpoint。之后 B 会收到 A 的回复,connect 操作实现,返回一个 Endpoint。尔后单方就能够通过这对 Endpoint 进行通信了。

内存注册

对于惯例的通信接口,用户能够间接在 Endpoint 上发动申请。但对于 RMA(近程内存拜访)操作,须要被拜访的一方首先在本人的 Context 上注册内存,同时指定拜访权限,取得一个 Mem handle。而后将这个本地 handle 转化为其余节点能够拜访的一串 token,称为 remote key(rkey)。最初想方法把 rkey 传给远端。远端拿着这个 rkey 进行近程内存拜访操作。

异步工作解决(重点)

为了施展最高的性能,整个 UCX 通信接口是全异步的。所谓异步指的是 IO 操作的执行不会阻塞以后线程,一次操作的发动和实现是独立的两个步骤。如此一来 CPU 就能够同时发动很多 IO 申请,并且在它们执行的过程中能够做别的事件。

不过接下来问题来了:程序如何晓得一个异步工作是否实现了?常见的有两种做法:被动轮询,被动告诉。前者还是须要占用 CPU 资源,所以个别都采纳告诉机制。在 C 这种传统过程式语言中,异步实现的告诉个别通过 回调函数(callback)实现:每次发动异步操作时,用户都须要传入一个函数指针作为参数。当工作实现时,后盾的运行时框架会调用这个函数来告诉用户。上面是 UCX 中一个异步接管接口的定义:

ucs_status_ptr_t ucp_tag_recv_nb (
  ucp_worker_h worker,
  void ∗ buffer,
  size_t count,
  ucp_datatype_t datatype,
  ucp_tag_t tag,
  ucp_tag_t tag_mask,
  ucp_tag_recv_callback_t cb  // <-- 回调函数
);

// 回调函数接口的定义
typedef void(∗ ucp_tag_recv_callback_t) (
  void ∗request, 
  ucs_status_t status,        // 执行后果,错误码
  ucp_tag_recv_info_t ∗info   // 更多信息,如收到的音讯长度等
);

这个接口的语义是:发动一个异步 Tag-Matching 接管操作,并立刻返回。当真的收到 tag 匹配的音讯时,UCX 后盾会解决这个音讯,将其放到用户提供的 buffer 中,最初调用用户传入的 callback,告诉用户工作的执行后果。

这里有一个很重要的问题是:下面提到的“后盾解决”到底是什么时候执行的?答案是 UCX 并不会本人创立后盾线程去执行它们, 所有异步工作的后续解决和回调都是在 worker.progress() 函数中,也就是用户被动向 worker 轮询的过程中实现的 。这个函数的语义是:“看看你手头要解决的事件,有哪些是能做的?尽力去推动一下,做完的告诉我。”换句话说,Worker 正在解决的所有工作组成了一个状态机,progress 函数的作用就是用新事件推动整个状态机的演进。前面咱们会看到,对应到 async Rust 世界中,所有异步 IO 工作组成了最根底的 Future,worker 对应 Runtime,而 progress 及其中的回调函数则充当了 Reactor 的角色。

回到传统的 C 语言,在这里异步 IO 的最大难点是编程复杂性:多个并发工作在同一个线程上交替执行,只能通过回调函数来形容下一步做什么, 会使得本来间断的执行逻辑被打散到多个回调函数中 。原本局部变量就能够保护的状态,到这里就须要额定的构造体来在多个回调函数之间传递。随着异步操作数量的减少,代码的保护难度将会迅速回升。上面的伪代码展现了在 UCX 中如何通过异步回调函数来实现最简略的 echo 服务:

// 因为 C 语言语法的限度,这段代码须要从下往上读

// 这里寄存所有须要逾越函数的状态变量
struct CallbackContext {
  ucp_endpoint_h ep;
  void *buf;
} ctx;

void send_cb(void ∗request, ucs_status_t status) {
  //【4】发送结束
  ucp_request_free(request);
  exit(0);
}

void recv_cb(void ∗request, ucs_status_t status, ucp_tag_recv_info_t ∗info) {
  //【3】收到音讯,发动发送申请
  ucp_tag_send_nb(ctx->ep, ctx->buf, info->length, ..., send_cb);
  ucp_request_free(request);
}

int main() {
  // 省略 UCX 初始化局部
  //【0】初始化工作状态
  ctx->ep = ep;
  ctx->buf = malloc(0x1000);
  //【1】发动异步接管申请
  ucp_tag_recv_nb(worker, ctx->buf, 0x1000, ..., recv_cb);
  //【2】一直轮询,驱动后续工作实现
    while(true) {ucp_worker_progress(worker);
  }
}

作为比照,如果 UCX 提供的是同步接口,那么同样的逻辑只须要以下几行就够了:

int main() {
  // 省略 UCX 初始化局部
  void *buf = malloc(0x1000);
  int len;
  ucp_tag_recv(worker, buf, 0x1000, &len, ...);
  ucp_tag_send(ep, buf, len, ...);
  return 0;
}

面对传统异步编程带来的“回调天堂”,支流编程语言通过了十几年的继续摸索,终于必由之路,纷纷引入了管制异步的终极解决方案—— async-await 协程。它的杀手锏就是能让开发者用同步的格调编写异步的逻辑。通过咱们的封装过后,在 Rust 中用 async 协程编写同样的逻辑是长这样的:

async fn main() {
  // 省略 UCX 初始化局部
  let mut buf = vec![0u8; 0x1000];
  let len = worker.tag_recv(&mut buf, ...).await.unwrap();
  ep.tag_send(&buf[..len], ...).await.unwrap();}

上面咱们就来介绍如何用 Rust 的协程机制包装 UCX 异步接口。

Rust 封装 UCX

生成 Rust 接口

用 Rust 包装 C 语言库的第一步是用社区提供的 bindgen 工具,从 C 头文件主动生成 Rust 绑定代码。生成的代码个别间接作为 *-sys 库公布,具体实现能够参考咱们封装的 ucx-sys。接下来咱们要在它的根底上持续封装出高层接口,也就是 async-ucx。

封装 UCX 对象

async-ucx 做的第一件事就是封装 UCX 对象。在 C 语言中对象创立进去后用户会拿到一个 handle,也就是一个指针。用户之后须要本人治理对象的生命周期,在用完后手动开释掉资源。

在 Rust 中咱们须要将 C 的 handle 包装成一个 struct,通过援用计数来主动治理对象的生命周期,在对象的 Drop 函数中开释其资源。上面的代码展现了对 Worker 对象的封装过程:

// 创立 Worker 须要依赖 Context,具体实现代码略过
pub struct Context {handle: ucp_context_h,}

pub struct Worker {
  handle: ucp_worker_h,        // 包装 C handle
  context: Arc<Context>,    // 援用下级对象
}
impl Worker {
  // 从 Context 创立 Worker
  pub fn new(context: &Arc<Context>) -> Rc<Self> {
    // 筹备参数和返回值
    let mut params = MaybeUninit::<ucp_worker_params_t>::uninit();
    unsafe {(*params.as_mut_ptr()).field_mask = 0 };
    let mut handle = MaybeUninit::uninit();
    // 调用 C 函数创建对象,取得 handle
    let status =
    unsafe {ucp_worker_create(context.handle, params.as_ptr(), handle.as_mut_ptr()) };
    assert_eq!(status, ucs_status_t::UCS_OK);
    // 包装为 Rust 对象
    Rc::new(Worker {handle: unsafe { handle.assume_init() },
      context: context.clone(),})
  }
}
// 析构时开释资源
impl Drop for Worker {fn drop(&mut self) {unsafe { ucp_worker_destroy(self.handle) }
  }
}

对象其它接口的包装也是相似的:首先将用户传入的参数转化成 C 接口的模式,而后 unsafe 调用 C 接口,最初将返回后果转化成 Rust 的状态返回给用户。

封装异步操作(重点)

接下来到了最重要的一步:用 Future 封装基于回调函数的异步接口。

首先咱们来回顾一下 Future 的工作原理:它实质是一个状态机,只提供一个 poll 函数来驱动外部状态的演进。poll 函数的实质是事件轮询,它会查看本人关怀的事件是否曾经实现,如果实现就持续推动执行,否则就挂起期待。在挂起之前 Future 须要将本人的 waker 注册到后盾的事件响应器(Reactor)中,以便在事件产生时可能被唤醒。当事件产生后,Reactor 通过 waker 唤醒 Future,Future 再次执行上述 poll 的过程,这一次它会看到事件曾经产生,于是状态机得以持续推动。

基于 Future 的整个异步运行时的工作原理如下图所示:

其中 Future 接口的具体定义如下:

pub trait Future {
  type Output;
  // 尝试推动状态机://   如果事件就绪就返回 Poll::Ready(T)
  //   否则从 cx 中拿一个 waker 注册给 reactor,而后返回 Poll::Pending
  fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

上面咱们要做的就是:利用回调函数来实现下面的 poll 函数。次要解决两个问题:查问事件状态,注册 waker。

  • 查问事件状态:这个比拟好办。因为 UCX 的异步函数都会返回一个 request 对象,并且提供了接口来查问 request 的状态。所以咱们只须要将 request 对象退出 Future 的状态中即可。
// 异步函数的返回值就是 request
ucs_status_ptr_t ucp_tag_recv_nb (...);
// 查问 request 状态的接口
ucs_status_t ucp_tag_recv_request_test (
  void ∗request,
    ucp_tag_recv_info_t ∗info
);
  • 注册 waker:这件事须要一些技巧。之前咱们提到 callback 实际上充当了 Reactor 的角色,所以这里咱们须要将 waker 从 Future 传递给 callback。察看 UCX callback 函数的定义,能够发现其中一个参数就是 request:并且 UCX 容许咱们塞一些本人的私货到 request 对象中,正好就解决了传递 waker 的问题。
typedef void(∗ ucp_tag_recv_callback_t) (
  void ∗request,  // 能够通过这里,在 Future 和 callback 之间传递信息
  ucs_status_t status,
  ucp_tag_recv_info_t ∗info
);

接下来正式开始实现!首先定义咱们要向 request 中夹带的私货:

#[derive(Default)]
struct Request {
  // 只有一个 waker,应用了 futures 库中的 AtomicWaker 来保障原子性
  waker: futures::task::AtomicWaker,
}
// NOTE:这里无需 #[repr(C)],因为操作外部变量都是在 Rust 中实现的 

而后向 UCX Context 注册这个私货:

impl Context {pub fn new() -> Arc<Self> {
    // 创立 Context 的时候注册私货信息
    let params = ucp_params_t {request_size: std::mem::size_of::<Request>() as u64,
      request_init: Some(Request::init),
      request_cleanup: Some(Request::cleanup),
            ...
    };
    // 结构 Context 略
  }
}
impl Request {
    // 初始化私货(原地结构)unsafe extern "C" fn init(request: *mut c_void) {(request as *mut Self).write(Request::default());
    // 留神:不能应用 *request = xxx; 的写法
    // 因为 request 指向的内存是未初始化状态,会导致在未定义内存上触发析构函数!}
    // 清理私货(原地析构)unsafe extern "C" fn cleanup(request: *mut c_void) {std::ptr::drop_in_place(request as *mut Self)
  }
}
接下来将整个 request 包装成一个 Future:// 因为不同的操作会有不同的返回值,这里就用泛型参数 T 示意返回值类型
struct RequestHandle<T> {
  // UCX 返回的 request 对象,它的头部就是咱们的私货
  ptr: ucs_status_ptr_t,
    // 查问 request 状态的函数
  poll_fn: unsafe fn(ucs_status_ptr_t) -> Poll<T>,
}
//////////////////////// 外围代码 /////////////////////////
impl<T> Future for RequestHandle<T> {
  type Output = T;
  fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
    // 查问状态,如果就绪间接返回
    if let ret @ Poll::Ready(_) = unsafe {(self.poll_fn)(self.ptr) } {return ret;}
    // 注册 waker(通过私货)let request = unsafe {&mut *(self.ptr as *mut Request) };
    request.waker.register(cx.waker());
    // 返回期待,挂起工作
    Poll::Pending
  }
}
//////////////////////////////////////////////////////////
impl<T> Drop for RequestHandle<T> {fn drop(&mut self) {
    // request 要求手动开释
    unsafe {ucp_request_free(self.ptr as _) };
  }
}

最初,用这个 Future 实现一个残缺的 recv 过程:

impl Endpoint {pub async fn tag_recv(&self, tag: u64, buf: &mut [u8]) -> usize {
    // 首先实现 callback
    unsafe extern "C" fn callback(request: *mut c_void, ...) {
      // 只需简略地从 request 中取出 waker 唤醒即可
      let request = &mut *(request as *mut Request);
      request.waker.wake();}
    // 发动异步操作,调用 C 函数,传入 callback
    let status = unsafe {ucp_tag_recv_nb(self.handle, ..., Some(callback))
    };
    if UCS_PTR_IS_PTR(status) {panic!("failed to recv tag: {:?}", UCS_PTR_RAW_STATUS(status));
    }
    // 将返回的 request 包装成 Future 并 await 后果!RequestHandle {
      ptr: status,
      poll_fn: poll_tag,    // 查问状态的具体实现略
    }.await
  }
}

通过这样一番包装,咱们就能够简略地应用一行命令,以同步格调实现 IO 操作了:

let len = worker.tag_recv(tag, &mut buf, ...).await;

封装 worker progress

终于完结了吗?别忘了所有的回调函数都是由 worker.progress() 驱动的。用户必须定期调用这个函数,不然所有工作都会卡住不动了。

那么解决办法也非常简单:咱们在异步运行时中首先创立一个协程,不停地调用 worker.progress(),而后 yield 让出。为了做到与具体的异步运行时库无关,咱们这里只实现了这个协程自身,用户须要手动 spawn 它:

impl Worker {pub async fn polling(self: Rc<Self>) {
    // 不停循环直到 worker 的其它援用都开释了为止
    while Rc::strong_count(&self) > 1 {
      // 不停地调用 progress,直到没有新事件产生
      while self.progress() != 0 {}
      // 临时让出 CPU,期待下次调度
      futures_lite::future::yield_now().await;}
  }
}
// 用法示例:tokio::task::spawn_local(worker.clone().polling());

当所有其它协程工作都挂起时,运行时就会调度运行 polling 协程,用新事件触发若干回调函数,从而唤醒对应的协程。

在 Rust 异步运行时中应用 UCX

到此为止咱们就实现了对 UCX 异步封装的次要工作。值得注意的是,下面过程只用到了 Rust 语言内建的 async-await 语法,以及规范库中定义的最外围接口(Future 等),齐全不依赖于任何一种特定的运行时环境(Tokio,async-std,smol 等)。

接下来咱们以 Tokio 为例,介绍在 Rust 异步运行时中应用 async-ucx 构建一个残缺可用程序的过程。这里咱们实现一个最简略的 echo 程序:客户端向服务端发送一条音讯,服务端收到后打印进去。残缺代码在 examples/tag.rs。

首先写主函数:

use async_ucx::ucp::*;
use std::io::Result;

// 应用 Tokio 默认的单线程运行时
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {env_logger::init();
  // 这里须要创立一个 LocalSet,能力 spawn !Send 的单线程 Future
  let local = tokio::task::LocalSet::new();
  // 依据命令行参数决定是服务端还是客户端
  if let Some(server_addr) = std::env::args().nth(1) {local.run_until(client(server_addr)).await?;
  } else {local.run_until(server()).await?;
  }
  Ok(())
}

接下来实现服务端:

async fn server() -> Result<()> {println!("server");
  // 创立 worker 和后盾 progress 协程
  let context = Context::new();
  let worker = context.create_worker();
  tokio::task::spawn_local(worker.clone().polling());

  // 创立 listener 并期待连贯
  let mut listener = worker.create_listener("0.0.0.0:0".parse().unwrap());
  println!("listening on {}", listener.socket_addr());
  let connection = listener.next().await;
  let _endpoint = worker.accept(connection);
  println!("accept");

  // 接管音讯并打印进去
  let mut buf = [0u8; 0x100];
  let len = worker.tag_recv(100, &mut buf).await;
  let msg = std::str::from_utf8(unsafe { transmute(&buf[..len]) }).unwrap();
  println!("recv: {:?}", msg);
  Ok(())
}

客户端更加简略:

async fn client(server_addr: String) -> Result<()> {println!("client");
  // 创立 worker 和后盾 progress 协程
  let context = Context::new();
  let worker = context.create_worker();
  tokio::task::spawn_local(worker.clone().polling());

  // 建设连贯
  let endpoint = worker.connect(server_addr.parse().unwrap());
  println!("connect to {:?}", server_addr);
  
  // 发送音讯
  let msg = b"hello";
  endpoint.tag_send(100, msg).await;
  println!("send: {:?}", msg);
  Ok(())
}

就这么简略,不到 50 行代码就能够实现一个残缺的客户端和服务端。相比之下,UCX 官网提供的 ucp_hello_world 程序足足用了好几百行 C 代码才实现了同样的性能。这阐明咱们的封装更加简略易用,同时 async Rust 可能极大的进步异步编程开发效率。

总结与瞻望

本文介绍了用 async Rust 封装 UCX 异步接口的次要过程。其中波及的外围常识技巧包含:

  • C 语言中用回调函数解决异步逻辑的办法
  • Rust 语言中 Future 的角色和性能,异步运行时的基本原理
  • 如何用 Future 包装回调函数,如何在二者之间传递信息
  • 如何实现 Reactor,用事件驱动整个异步框架的运行

目前 async-ucx 曾经根本实现了对 UCP 接口的封装,还剩下另一半 UCT 接口没有实现。

此外,目前 async-ucx 还只反对以 busy-polling 轮询模式处理事件,这会使得运行它的 CPU 核始终处于 100% 满载状态。这样做当然性能是最高的,然而不太节能。UCX 本身反对休眠 - 唤醒机制,能够在没有事件产生的时候休眠以后线程、让出 CPU、等事件产生时再唤醒。其外部实现是为每个 worker 都创立了一个 event_fd,用户能够拿它去做 epoll,从而阻塞线程期待事件产生。

然而,要用好这一机制须要与 Rust 异步执行器进行深度整合。据我理解,至多 Tokio 1.0 是没有裸露相干接口的。而且这部分的实现一旦有任何闪失,都会导致唤醒遗失、程序卡住的问题呈现。将来如何迷信地实现休眠机制也是一个不小的挑战。

总的来说,异步 IO 在反对极高性能的同时也带来了极大的编程复杂性,而 Rust 的 async 协程机制很好的压抑了这种复杂性。在我看来,Rust 是这个时代最适宜编写简单 IO 零碎的语言。欢送感兴趣的读者持续关注,一起退出到咱们颠覆 C/C++ 旧世界的行列中来!

作者 | 王润基

转自《Rust Magazine 中文精选》

退出移动版