关于rust:译集成Axum-Hyper-Tonic-and-Tower-做webgRPC混合应用02

52次阅读

共计 9229 个字符,预计需要花费 24 分钟才能阅读完成。

这是系列的第二篇,总的四个局部如下:

  • Tower 概览
  • Hyper 原理和 Axum 初试
  • Tonic gRPC 客户端和服务端示范
  • 如何把 Axum 和 Tonic 服务集成到一个利用中

如果您还没有筹备好,我建议您先阅读文章的第一局部。

疾速回顾
  • Tower 提供了一个 Serivce trait,是一个根本的从申请到响应的异步函数。
  • Service 是参数化的申请类型,并且有一个 Response 的关联类型。
  • 并且还有 Error 和 Future 两个关联类型。
  • Serivce 容许在查看服务是否承受新的申请和解决申请都能够是异步。
  • 一个 web 利用最终会有两种异步申请和响应行为:

    • 外部:一个服务承受 HTTP 申请并返回响应。
    • 内部:一个服务承受新的网络连接并返回外部服务。

记住下面实现,让我看看 Hyper 实现形式。

Hyper 中服务

既然咱们对 Tower 有些理解,是时候让咱们投入到 Hyper 微妙世界。下面咱们看到的咱们间接用 Hyper 实现一次,然而 Hyper 有一些额定的麻烦须要解决:

  • Request 和 Response 类型的参数化是通过 request/response 主体示意的。
  • 有很多的个性 (traits) 和参数化的公共 API,很多参考文档中并没有提及而且很多表述不明确。

Hyper 是听从创建者模式初始化 Http 服务,来代替咱们以前假的服务示例中 run 函数。提供配置参数后,你就创立了一个沉闷的服务通过构建器提供的 serve 办法。不用深究太多,让我门看看文档中函数签名:

pub fn serve<S, B>(self, new_service: S) -> Server<I, S, E>
where
    I: Accept,
    I::Error: Into<Box<dyn StdError + Send + Sync>>,
    I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    S: MakeServiceRef<I::Conn, Body, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: HttpBody + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
    E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,

有很多参数限度,并且很多文档表述不清晰。心愿咱们能搞清楚这些。然而目前,让咱们从简略的开始:Hyper 主页的 ”Hello world” 示例:

use std::{convert::Infallible, net::SocketAddr};
use hyper::{Body, Request, Response, Server};
use hyper::service::{make_service_fn, service_fn};

async fn handle(_: Request<Body>) -> Result<Response<Body>, Infallible> {Ok(Response::new("Hello, World!".into()))
}

#[tokio::main]
async fn main() {let addr = SocketAddr::from(([127, 0, 0, 1], 3000));

    let make_svc = make_service_fn(|_conn| async {Ok::<_, Infallible>(service_fn(handle))
    });

    let server = Server::bind(&addr).serve(make_svc);

    if let Err(e) = server.await {eprintln!("server error: {}", e);
    }
}

以下听从咱们以上创立的雷同模式:

  • handle 是一个从 Http 申请到响应的异步函数,并且如果失败返回 Infallible 谬误值。

    • Request 和 Response 都是一 Body 作为参数,Body 是默认的 Http 申请体示意。
  • handle 被 service_fn 包装并返回 Service<Request<Body>> 类型,这就是下面提及的 app_fn。
  • 咱们调用 make_service_fn,相似上文 app_factory_fn,返回须要的 Service<&AddrStream>(咱们简要说下 &AddrStream):

    • 咱们并不关怀 &AddrStream 值,所以能够疏忽。
    • 从外部函数 make_service_fn 返回的值必须是 Future,所以咱们要用 async 包起来。
    • Future 的返回值是一个 Result 类型,所以包装返回 Ok
    • 咱们须要帮忙编译器一个小忙给 Infallible 做下类型标注,否则编译器不晓得 Ok(service_fn(handle))类型表达式。

至多有三个理由可阐明用这种档次的形象写一个 web 利用是一件苦楚的事件:

  • 手动治理这些碎片化的服务是一种煎熬。
  • 高层次辅助函数的形式是非常少的,比方,申请体 Json 化。
  • 任何品种谬误在咱们类型中可能导致十分大的非本地错误信息,以致调试艰难。

所以,咱们稍后很快乐从 Hyper 转到 Axum,然而当初,让咱们持续摸索 Hyper。

躲避service_fnmake_service_fn

我感觉最有帮忙的是当尝试 Hyper 实现简略的 app 时候,不应用 service_fn 和 make_service_fn。所以当初让咱们来实现它。咱们将创立一个简略的计数器 app(如果不如意料,那就失败了)。咱们须要两种不同的数据类型:一个是是 ”app factory”, 一个是 app 本身。让咱们从 app 本身开始:

struct DemoApp {counter: Arc<AtomicUsize>,}

impl Service<Request<Body>> for DemoApp {
    type Response = Response<Body>;
    type Error = hyper::http::Error;
    type Future = Ready<Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut std::task::Context) -> Poll<Result<(), Self::Error>> {Poll::Ready(Ok(()))
    }

    fn call(&mut self, _req: Request<Body>) -> Self::Future {let counter = self.counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        let res = Response::builder()
            .status(200)
            .header("Content-Type", "text/plain; charset=utf-8")
            .body(format!("Counter is at: {}", counter).into());
        std::future::ready(res)
    }
}

这个实现用了规范库的 std::future::Ready 构造体创立了一个了解 Ready 的 Future。换句话说,咱们的利用没有异步行为。我设置了一个 Error 关联类型 hyper::http::Error。例如:你向 header 提交了一个非无效的字符串,比方非 ASCII 字符,便会导致谬误。因为咱们浏览了很屡次,poll_ready 仅仅是期待解决下一个申请。

DemoAppFactory 的实现并没有多少不同:

struct DemoAppFactory {counter: Arc<AtomicUsize>,}

impl Service<&AddrStream> for DemoAppFactory {
    type Response = DemoApp;
    type Error = Infallible;
    type Future = Ready<Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut std::task::Context) -> Poll<Result<(), Self::Error>> {Poll::Ready(Ok(()))
    }

    fn call(&mut self, conn: &AddrStream) -> Self::Future {println!("Accepting a new connection from {:?}", conn);
        std::future::ready(Ok(DemoApp {counter: self.counter.clone()
        }))
    }
}

咱们给 Service 传了不同的参数,这次是 &AddrStream。我的确最后发现命名很迷糊。在 Tower 中,Service 以 Request 作为参数。DemoApp 中,是 Request<Body>。DemoAppFactory 中,参数是 &AddrStream,记住,一个 Service 仅仅是生成一个从输出到输入的可失败的异步函数。如参数可能是 Request<Body> 或者是 &AddrStream,也或者是全副。

类似的,除了 DemoApp 之外,”response” 这里也不是 HTTP 响应。我又发现是用术语 ” 输出 ” 和 ” 输入 ” 来防止申请和响应的名称重载更容易一些。

最初,咱们的 main 函数和以前 ”hello word” 的例子一样:

#[tokio::main]
async fn main() {let addr = SocketAddr::from(([0, 0, 0, 0], 3000));

    let factory = DemoAppFactory {counter: Arc::new(AtomicUsize::new(0)),
    };

    let server = Server::bind(&addr).serve(factory);

    if let Err(e) = server.await {eprintln!("server error: {}", e);
    }
}

如果你想更深刻了解,我举荐你在这个利用下面增加一些异步行为。你如何批改 Future?如果你用 trait 对象,如何精确定住这个对象?

当初是时候深刻我统一防止的话题了。

traits 深刻了解

然咱们从新回到下面 serve 函数签名:

pub fn serve<S, B>(self, new_service: S) -> Server<I, S, E>
where
    I: Accept,
    I::Error: Into<Box<dyn StdError + Send + Sync>>,
    I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    S: MakeServiceRef<I::Conn, Body, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: HttpBody + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
    E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,

写这篇文章之前,我从没有尝试深刻了解这些绑定。所以这对咱们来说是一个冒险!!!(或者最初以我的一些文件 PRs 完结)然咱们以类型变量开始。总的来说,咱们有四个类型变量:两个在 impl 块,两个在办法上:

  • I 示意新的链接流
  • E 代表执行器
  • S 示意咱们将运行的 service。应用咱们下面的术语就是 ”app factory”。用 Tower/Hyper 术语,便是 ”make service”
  • B 是 service 返回的 response 体(是 app 不是 “app factory”)
I:Accept

I 须要实现 Accept trait, 代表能够从一些资源中承受新的链接的能力。惟一实现这个的是装箱的 AddrIncoming, 能够从 SocketAddr 创立。实际上,这就是 Server::bind 性能。

Accept 有两个关联类型,Error 必须能够转化成谬误对象或者Into<Box<dyn StdError + Send + Sync>>. 这是每一个(简直?)咱们看到的谬误关联类型的需要,所以尔后咱们略过。咱们须要可能将任何产生谬误转换成对立的表达式。

Conn 关联类型代表的是私人链接。鉴于 AddrIncoming,Conn 的关联类型是addrStream. 为了通信必须实现 AsyncRead 和 AsyncWrite traits,为了不同线程间传递,必须实现 send trait 和‘static 以及 unpin。Unpin 的需要须要深刻到栈存储,我的确不晓得这么驱动的。

S: MakeServiceRef

很多 traits 没有呈现在公共文档中,MakeServiceRef 便是其中之一。这仿佛是成心的。上面材料:

Just a sort-of "trait alias" of MakeService, not to be implemented by anyone, only used as bounds.

你是否困惑咱们为什么失去 AddrStream 的援用?这个 trait 具备转变的能力。总的来看,绑定 S: MakeServiceRef<I::Conn, Body, ResBody = B> 意味着:

  • S 必须是 Service
  • S 必须承受输出类型 &I::Conn
  • 并且转换为新的 Service 为输入
  • 新的 Service 承受 Request<Body> 为输出,Response<ResBody> 为输入

当咱们探讨时:ResBody 必须实现HttpBody. 正如你所想的,下面提到的 Body 构造体要实现了HttpBody. 还有很多实现的。实际上,当咱们应用 Tonic 和 gRPC 时,其余响应体也须要咱们去解决。

NewSvcExec and ConnStreamExec

E 参数的默认值是 Exec,在生成的文档中并没有呈现。然而你能够在这些材料中看到。Exec 的次要思维是指定如何派生工作,默认应用的是 tokio::spawn;

我不太确定所有这些是如何实现的。然而我置信题目中两个 trait 容许对链接 service 和 申请 service 应用不同的工作解决。

Axum 初试

Axum 是一个新的 web 框架,它是撰写这残缺的博客文章初衷。咱们不再像下面那样间接应用 Hyper 解决,而是应用 Axum 从新实现咱们的计数器 web 服务。咱们用 axum = "0.2",crate 文档提供了 Axum 很好的概述,我不打算在这里复制信息。相同,这里是我重写代码,咱们将剖析以下几个次要局部:

use axum::extract::Extension;
use axum::handler::get;
use axum::{AddExtensionLayer, Router};
use hyper::{HeaderMap, Server, StatusCode};
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

#[derive(Clone, Default)]
struct AppState {counter: Arc<AtomicUsize>,}

#[tokio::main]
async fn main() {let addr = SocketAddr::from(([0, 0, 0, 0], 3000));

    let app = Router::new()
        .route("/", get(home))
        .layer(AddExtensionLayer::new(AppState::default()));

    let server = Server::bind(&addr).serve(app.into_make_service());

    if let Err(e) = server.await {eprintln!("server error: {}", e);
    }
}

async fn home(state: Extension<AppState>) -> (StatusCode, HeaderMap, String) {
    let counter = state
        .counter
        .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    let mut headers = HeaderMap::new();
    headers.insert("Content-Type", "text/plain; charset=utf-8".parse().unwrap());
    let body = format!("Counter is at: {}", counter);
    (StatusCode::OK, headers, body)
}

首先,我不探讨 AddExtensionLayer/Extension,这是在咱们利用中分享共享状态的形式。这和咱们剖析 Hyper 和 Tower 不相干,所以我提供了个链接 link to the docs demonstrating how this works。乏味的是,你会发现这个实现依赖的是 Tower 提供的中间件,所以,两者并没有平安分来到。

总之,回到咱们探讨的内容。在 main 函数办法内,咱们当初用路由的概念去构建咱们的利用。

let app = Router::new()
    .route("/", get(home))
    .layer(AddExtensionLayer::new(AppState::default()));

实质上说就是:” 当申请门路 ’/’ 时,调用 home 函数,并增加中间件解决拓展的事件 ”。home 函数应用提取器失去 AppState,并返回(StatusCode, HeaderMap, String) 元祖作为响应。在 Axum 中,任何实现了IntoResponse trait 的可作为处理函数的返回值。

无论如何,咱们 app 的值是路由。然而路由不能间接被 Hyper 运行。相同,咱们须要转换为 MakeService,侥幸的是,这比较简单:咱们能够调用 app.into_make_service(). 转换。让咱们看看办法签名:

impl<S> Router<S> {pub fn into_make_service(self) -> IntoMakeService<S>
    where
        S: Clone;
}

让我走的更远一些:

pub struct IntoMakeService<S> {/* fields omitted */}

impl<S: Clone, T> Service<T> for IntoMakeService<S> {
    type Response = S;
    type Error = Infallible;
    // other stuff omitted
}

Router<S> 是一个能够生成 S 服务的值,ntoMakeService<S> 将获取某种类型的连贯信息 T,并异步生成该服务 S。因为 Error 是 Infallible 类型,咱们晓得是不可能失败的。正如咱们所说的异步,浏览 IntoMakeService 的 service 实现,咱们看到了一种相熟的模式:

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {Poll::Ready(Ok(()))
}

fn call(&mut self, _target: T) -> Self::Future {
    future::MakeRouteServiceFuture {future: ready(Ok(self.service.clone())),
    }
}

并且,能够留神到作为链接信息 T 的值并没有任何绑定和信息。IntoMakeService 仅仅扔掉了链接信息(如果你想进一步理解,请查看 into_make_service_with_connect_info.)换句话说:

  • Router<S> 是一个能够让咱们增加路由和中间件的类型
  • 能够转换 Router<S> 成 IntoMakeService<S>
  • 然而 IntoMakeService<S> 只是对 S 的包装以合乎 Hyper 的需要
  • 因而,真正的重点是 S

所以,S 的类型来自哪里?这取决于 router 和 layer 你的调用。比方 get 办法的签名如下:

pub fn get<H, B, T>(handler: H) -> OnMethod<H, B, T, EmptyRouter>
where
    H: Handler<B, T>,

pub struct OnMethod<H, B, T, F> {/* fields omitted */}

impl<H, B, T, F> Service<Request<B>> for OnMethod<H, B, T, F>
where
    H: Handler<B, T>,
    F: Service<Request<B>, Response = Response<BoxBody>, Error = Infallible> + Clone,
    B: Send + 'static,
{
    type Response = Response<BoxBody>;
    type Error = Infallible;
    // and more stuff
}

get 办法返回 OnMethod 值,OnMethod 是一个 Service, 承受 Request作为参数并且返回Response<BoxBody>. 因为办法体表述有很多有意思的逻辑,咱们最终会深刻探讨。然而基于咱们对 Hyper 和 Tower 新的了解,这里的类型也变的不那么艰涩难懂,实际上,反而更容易了解。

对于下面例子的最初一点须要阐明的是,Axum 能够间接和 Hyper 协同单干,包含 Server 类型。Axum 能够从 Hyper 从新导出许多内容,如果须要,您能够间接从 Hyper 应用这些类型。换句话说,Axum 十分靠近底层库,只是在下面提供了一些便当。这也是为什么我对深入研究 Axum 感到十分兴奋的起因之一。

综上所述:

  • Tower 提供了从输出到输入的形象可异步的函数,可能会失败,被称之为 service.
  • HTTP 服务有两个层面的服务,低层次的服务是从 HTTP 申请到 HTTP 响应,高层次的服务是依据链接信息返回低层次的服务
  • Hyper 有很多额定的个性,有些是可见的,有些是不可见的,这容许更多的通用性,也让事件变得更简单。
  • Axum 位于 Hyper 的下层,为许多常见状况提供了一个更易于应用的接口。它通过提供 Hyper 冀望看到的雷同类型的 service 来实现这一点。围绕 HTTP 主体示意进行了一系列扭转。

下一段旅程:让咱们来看下一个构建 Hyper 服务的库,咱们将在下一篇进行介绍。

浏览原文

正文完
 0