Spring Boot (6): A First Look at Combining Spring Boot with WebFlux

ReactorHttpHandlerAdapter in spring-cloud-gateway

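Over the past few days I have been reading through the request-handling flow of spring-cloud-gateway. Since I had been using Spring Boot 1.x and Spring 4 all along, the flow confused me at first and I could not find the entry point. After stepping through the code and looking up some material online, I found that the entry point of spring-cloud-gateway is the apply method of ReactorHttpHandlerAdapter: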

public class ReactorHttpHandlerAdapter implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> {

    private static final Log logger = HttpLogging.forLogName(ReactorHttpHandlerAdapter.class);

    private final HttpHandler httpHandler;

    public ReactorHttpHandlerAdapter(HttpHandler httpHandler) {
        Assert.notNull(httpHandler, "HttpHandler must not be null");
        this.httpHandler = httpHandler;
    }

    @Override
    public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
        NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
        try {
            // Wrap the Reactor Netty request/response in Spring's own abstractions
            ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
            ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);

            // A HEAD request gets a decorated response
            if (request.getMethod() == HttpMethod.HEAD) {
                response = new HttpHeadResponseDecorator(response);
            }

            // Hand the wrapped pair over to the configured HttpHandler
            return this.httpHandler.handle(request, response)
                    .doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
                    .doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
        }
        catch (URISyntaxException ex) {
            // The request URI could not be parsed: answer with 400 Bad Request
            if (logger.isDebugEnabled()) {
                logger.debug("Failed to get request URI: " + ex.getMessage());
            }
            reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
            return Mono.empty();
        }
    }

}

The job of this method is to wrap the incoming HttpServerRequest and the outgoing HttpServerResponse in a ReactorServerHttpRequest and a ReactorServerHttpResponse, and then pass both to the HttpHandler.
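To make the wrapping step easier to see in isolation, here is a minimal JDK-only sketch of the same adapter shape. Everything in it is a hypothetical stand-in and not Spring or Reactor Netty API: RawRequest and RawResponse play the role of the server's native objects, WrappedRequest and WrappedResponse play the role of Spring's wrappers, SketchHandler plays the role of HttpHandler, and CompletableFuture<Void> is used in place of Mono<Void>.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

// Hypothetical stand-ins for the server's native request/response objects.
final class RawRequest {
    final String method;
    final String uri;
    RawRequest(String method, String uri) { this.method = method; this.uri = uri; }
}

final class RawResponse {
    int status = 200;
    final StringBuilder body = new StringBuilder();
}

// Framework-level view of the request; parsing the URI may fail.
final class WrappedRequest {
    private final String method;
    private final URI uri;
    WrappedRequest(RawRequest raw) throws URISyntaxException {
        this.method = raw.method;
        this.uri = new URI(raw.uri);
    }
    String method() { return method; }
    String path() { return uri.getPath(); }
}

// Framework-level view of the response; can drop the body (used for HEAD).
final class WrappedResponse {
    private final RawResponse raw;
    private final boolean bodyAllowed;
    WrappedResponse(RawResponse raw, boolean bodyAllowed) {
        this.raw = raw;
        this.bodyAllowed = bodyAllowed;
    }
    void write(String text) {
        if (bodyAllowed) {
            raw.body.append(text);
        }
    }
}

// Hypothetical handler contract, playing the role of HttpHandler.
interface SketchHandler {
    CompletableFuture<Void> handle(WrappedRequest request, WrappedResponse response);
}

// The adapter: wrap both raw objects, then delegate to the handler.
final class SketchHandlerAdapter implements BiFunction<RawRequest, RawResponse, CompletableFuture<Void>> {
    private final SketchHandler handler;
    SketchHandlerAdapter(SketchHandler handler) { this.handler = handler; }

    @Override
    public CompletableFuture<Void> apply(RawRequest rawRequest, RawResponse rawResponse) {
        try {
            WrappedRequest request = new WrappedRequest(rawRequest);
            boolean bodyAllowed = !"HEAD".equals(rawRequest.method);
            WrappedResponse response = new WrappedResponse(rawResponse, bodyAllowed);
            return handler.handle(request, response);
        } catch (URISyntaxException ex) {
            // Unparseable URI: answer 400 and complete without calling the handler.
            rawResponse.status = 400;
            return CompletableFuture.completedFuture(null);
        }
    }
}

final class AdapterSketchDemo {
    public static void main(String[] args) {
        SketchHandlerAdapter adapter = new SketchHandlerAdapter((request, response) -> {
            response.write("hello from " + request.path());
            return CompletableFuture.completedFuture(null);
        });

        RawResponse ok = new RawResponse();
        adapter.apply(new RawRequest("GET", "/hello"), ok).join();
        System.out.println(ok.status + " " + ok.body);

        RawResponse bad = new RawResponse();
        adapter.apply(new RawRequest("GET", "/bad path"), bad).join();
        System.out.println(bad.status);
    }
}
```

The point of the sketch is only the shape: the server calls one BiFunction per request, and the adapter is the single place where server-specific types are translated into framework types.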

spring-webflux

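Of course, spring-cloud-gateway is not the main topic of this article. Because I had been using Spring 4, I am not very familiar with the reactive programming paradigm and WebFlux in Spring 5, so I start with a small demo to get a feel for them.
Step 1: add the required dependencies to the pom; add test-related dependencies as you need them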

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>


    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

Step 2: create a HandlerFunction

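import org.springframework.web.reactive.function.server.HandlerFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

public class TestFunction implements HandlerFunction<ServerResponse> {

    // Responds with the sum of the query parameters args1 and args2
    @Override
    public Mono<ServerResponse> handle(ServerRequest serverRequest) {
        return ServerResponse.ok().body(
                Mono.just(parse(serverRequest, "args1") + parse(serverRequest, "args2")),
                Integer.class);
    }

    // A missing query parameter is treated as 0
    private int parse(final ServerRequest request, final String param) {
        return Integer.parseInt(request.queryParam(param).orElse("0"));
    }
}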

Step 3: register a RouterFunction bean

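import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

@Configuration
public class TestRouteFunction {

    // Routes GET /add to TestFunction
    @Bean
    public RouterFunction<ServerResponse> routerFunction() {
        return RouterFunctions.route(RequestPredicates.GET("/add"), new TestFunction());
    }
}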
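To show how a router and a handler fit together, here is a JDK-only sketch of the idea: a router is a function that either finds a handler for a request or finds none. All types here (MiniRequest, MiniHandler, MiniRouter) are hypothetical simplifications and not the WebFlux API. In particular, the real HandlerFunction returns a Mono<ServerResponse>, while this sketch returns a plain String synchronously to stay short.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical, simplified request: HTTP method, path and query parameters.
final class MiniRequest {
    final String method;
    final String path;
    private final Map<String, String> query;

    MiniRequest(String method, String path, Map<String, String> query) {
        this.method = method;
        this.path = path;
        this.query = query;
    }

    Optional<String> queryParam(String name) {
        return Optional.ofNullable(query.get(name));
    }
}

// Plays the role of HandlerFunction: turns a request into a response body.
interface MiniHandler {
    String handle(MiniRequest request);
}

// Plays the role of RouterFunction: returns a handler only if the request matches.
interface MiniRouter {
    Optional<MiniHandler> route(MiniRequest request);

    // Builds a router from a predicate and a handler.
    static MiniRouter of(Predicate<MiniRequest> predicate, MiniHandler handler) {
        return request -> predicate.test(request) ? Optional.of(handler) : Optional.empty();
    }
}

final class MiniRouterDemo {

    // Same parsing rule as in TestFunction: a missing parameter counts as 0.
    static int parse(MiniRequest request, String param) {
        return Integer.parseInt(request.queryParam(param).orElse("0"));
    }

    // Equivalent in spirit to routing GET /add to the adding handler.
    static MiniRouter addRouter() {
        return MiniRouter.of(
                request -> "GET".equals(request.method) && "/add".equals(request.path),
                request -> String.valueOf(parse(request, "args1") + parse(request, "args2")));
    }

    // What a server would do: ask the router, then run the handler or report 404.
    static String dispatch(MiniRouter router, MiniRequest request) {
        return router.route(request)
                .map(handler -> handler.handle(request))
                .orElse("404");
    }

    public static void main(String[] args) {
        Map<String, String> query = new HashMap<>();
        query.put("args1", "2");
        query.put("args2", "3");
        System.out.println(dispatch(addRouter(), new MiniRequest("GET", "/add", query)));
        System.out.println(dispatch(addRouter(), new MiniRequest("GET", "/unknown", query)));
    }
}
```

Separating "which handler?" from "what does the handler do?" is what lets routes be composed and tested as ordinary values.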

Step 4: WebFlux also supports the annotation-based programming model we used before, so we create a controller as well

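import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/api/test")
public class HelloController {

    @RequestMapping("/hello")
    public Mono<String> hello() {
        return Mono.just("hello world");
    }
}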

Step 5: create the application class

@SpringBootApplication
public class Spring5DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(Spring5DemoApplication.class, args);
    }
}

Step 6: start the project; both of the following endpoints can be accessed

http://localhost:8080/api/test/hello
http://localhost:8080/add?args1=2&args2=3

Integration with spring-boot

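From the example above we have seen the two basic types, HandlerFunction and RouterFunction. WebFlux also has the following characteristics:

  1. Asynchronous and non-blocking
  2. Reactive, functional programming style built on lambda expressions
  3. Runs not only in Servlet containers such as Tomcat and Jetty (Servlet 3.1+), but also on NIO-based servers such as Netty and Undertow

In real projects we usually combine WebFlux with spring-boot, so let's follow the code to see when spring-boot creates the server.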

1. SpringApplication

    public ConfigurableApplicationContext run(String... args) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        ConfigurableApplicationContext context = null;
        Collection<SpringBootExceptionReporter> exceptionReporters = new ArrayList<>();
        configureHeadlessProperty();
        SpringApplicationRunListeners listeners = getRunListeners(args);
        listeners.starting();
        try {
            ApplicationArguments applicationArguments = new DefaultApplicationArguments(args);
            ConfigurableEnvironment environment = prepareEnvironment(listeners,
                    applicationArguments);
            configureIgnoreBeanInfo(environment);
            Banner printedBanner = printBanner(environment);
            context = createApplicationContext();
            exceptionReporters = getSpringFactoriesInstances(
                    SpringBootExceptionReporter.class,
                    new Class[] { ConfigurableApplicationContext.class }, context);
            prepareContext(context, environment, listeners, applicationArguments,
                    printedBanner);
            refreshContext(context);
            afterRefresh(context, applicationArguments);
            stopWatch.stop();
            if (this.logStartupInfo) {
                new StartupInfoLogger(this.mainApplicationClass)
                        .logStarted(getApplicationLog(), stopWatch);
            }
            listeners.started(context);
            callRunners(context, applicationArguments);
        }
        catch (Throwable ex) {
            handleRunFailure(context, ex, exceptionReporters, listeners);
            throw new IllegalStateException(ex);
        }

        try {
            listeners.running(context);
        }
        catch (Throwable ex) {
            handleRunFailure(context, ex, exceptionReporters, null);
            throw new IllegalStateException(ex);
        }
        return context;
    }

We only trace the entry point here and ignore the rest of the code for now. Find the line refreshContext(context); and step into it.

2. refresh() in ReactiveWebServerApplicationContext

    @Override
    public final void refresh() throws BeansException, IllegalStateException {
        try {
            super.refresh();
        }
        catch (RuntimeException ex) {
            stopAndReleaseReactiveWebServer();
            throw ex;
        }
    }

3. onRefresh() in ReactiveWebServerApplicationContext

    @Override
    protected void onRefresh() {
        super.onRefresh();
        try {
            createWebServer();
        }
        catch (Throwable ex) {
            throw new ApplicationContextException("Unable to start reactive web server",
                    ex);
        }
    }
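The call chain above, where refresh() runs the parent's refresh and the parent calls back into the overridden onRefresh(), is a template-method arrangement. Here is a JDK-only sketch of that control flow. The classes SketchContext and SketchReactiveContext are hypothetical stand-ins, and the step names recorded in the list are illustrative labels only, not the real refresh sequence.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the parent context: refresh() fixes the order and calls a hook.
abstract class SketchContext {
    protected final List<String> steps = new ArrayList<>();

    public void refresh() {
        steps.add("prepare bean factory");
        onRefresh(); // hook that subclasses override
        steps.add("finish refresh");
    }

    // Default hook does nothing.
    protected void onRefresh() {
    }

    List<String> steps() {
        return steps;
    }
}

// Stand-in for the reactive web context: creates the server inside the hook.
class SketchReactiveContext extends SketchContext {
    private boolean serverCreated;

    @Override
    public final void refresh() {
        try {
            super.refresh();
        } catch (RuntimeException ex) {
            // Mirrors the cleanup-on-failure shape of the real method.
            steps.add("stop and release web server");
            throw ex;
        }
    }

    @Override
    protected void onRefresh() {
        super.onRefresh();
        try {
            createWebServer();
        } catch (Throwable ex) {
            throw new IllegalStateException("Unable to start reactive web server", ex);
        }
    }

    private void createWebServer() {
        serverCreated = true;
        steps.add("create web server");
    }

    boolean isServerCreated() {
        return serverCreated;
    }
}

final class RefreshSketchDemo {
    public static void main(String[] args) {
        SketchReactiveContext context = new SketchReactiveContext();
        context.refresh();
        System.out.println(context.steps());
    }
}
```

This is why stepping into refreshContext(context) ends up in createWebServer(): the subclass never calls it directly from refresh(), the parent's refresh sequence reaches it through the onRefresh() hook.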

4. At this point we have found the entry method, createWebServer(). Following it leads to NettyReactiveWebServerFactory, which creates the web server

    @Override
    public WebServer getWebServer(HttpHandler httpHandler) {
        HttpServer httpServer = createHttpServer();
        ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);
        return new NettyWebServer(httpServer, handlerAdapter, this.lifecycleTimeout);
    }

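ReactorHttpHandlerAdapter should look very familiar here: as mentioned at the beginning, it is the entry point of spring-cloud-gateway. I have not studied the details of createHttpServer yet and will dig into them when I have time.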

Conclusion

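I am still learning the new features of Spring 5. This article is an introduction to combining WebFlux with Spring Boot, and I will go deeper when time permits.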

For more articles, visit the blog at <https://www.zplxjj.com> or follow the WeChat official account.
