共计 6051 个字符,预计需要花费 16 分钟才能阅读完成。
背景
vertx 是一个我去年看完 netty 就一直想看看的工具,但因为拖延加懒,最近才看了看文档写了写 demo, 算是对它有了一点点了解,之所以想写一点,其实是想自己给自己总结下 vertx 的一些核心的概念。
vertx core
vertx core 提供了一些 vertx 的基本操作,如经常用到的
- 编写 TCP 客户端和服务器
- 编写 HTTP 客户端和服务器
- EventBus
- file 操作
- HA
- 集群
先上一段代码看下 vertx 创建一个 httpServer:
// 创建一个 vertx 实例
VertxOptions vo = new VertxOptions();
vo.setEventLoopPoolSize(1);
Vertx vertx = Vertx.vertx(vo);
vertx.deployVerticle(MyFirstVerticle.class.getName());
DeploymentOptions().setInstances(2)
//MyFirstVerticle.java
public class MyFirstVerticle extends AbstractVerticle {public void start() {vertx.createHttpServer().requestHandler(req -> {System.out.println(Thread.currentThread());
try {Thread.sleep(1000L);
} catch (InterruptedException e) {e.printStackTrace();
}
req.response()
.putHeader("content-type", "text/plain")
.end("Hello World!");
this.deploymentID();
Context c=vertx.getOrCreateContext();}).listen(8080);
}
}
vertx 实例 是最核心的一个对象 是宁做几乎一切事情的基础,包括创建客户端和服务器、获取事件总线的引用、设置定时器等等。
是不是很想说一句,嗨,就这,不就 nodejs 吗
EventLoop
EventLoop 算是 vertx 的基本模型了,简单的讲就是所有的操作都是以 触发 的方式来进行,将 IO 的操作完全交给 vertx, 开发者真正要关心的是 IO 各个阶段的 事件 ,讲的朴素一点就像是 js 的回调函数一样。
个人觉得 vertx 的 EventLoop 基本等同于 Netty 的模型,如果真要探索起来,怕是要从 Linux 的多路复用,select 函数,java 的 NIO,和 netty 一路将过来了,所以尝试用画图的方式来更形象的描绘:
其实 EventLoop 就是一条线程,上面挂的每一个 channel 就是一个 IO 连接,底层利用的就是 IO 多路复用加 select 或 poll,epoll, 来保证每一个线程可以保证控制很多个 IO 连接,而每一个连接都会挂一个 handler,来处理这个连接的 每一个事件 例如:init,connected,read.writer,close。
这个模型的左半部分同样适用于 netty。右半部分有一个 workPool 是一个线程池,是 vertx 新增的东西,是用来专门处理耗时操作的,如 file, 阻塞的 DB 操作,为什么要加这个概念哪,因为 不要阻塞 EventLoop是 NIO 的基本守则。
阻塞操作操作代码如下:
executor.executeBlocking(future -> {System.out.println("Action Thread"+Thread.currentThread());
// 调用一些需要耗费显著执行时间返回结果的阻塞式 API
String result = blockingMethod("hello");
future.complete(result);
}, res -> {System.out.println("Handler Thread"+Thread.currentThread());
System.out.println("The result is:" + res.result());
executor.close();});
verticle
verticle 其实一直是让我困惑的一个概念,因为 vertx 的主要运行,基本都是围绕 vertx 实例来进行的,后面我为 verticle 找到了一个合理的角色,叫他为 vertx 实例的一个 发布单元,什么是一个 发布单元哪,举个例子,一个 HttpServer 的发布是一个发布单元。
verticle 具有以下几个特点:
- 每个 verticle 占用一个 EventLoop 线程,且只对应一个 EventLoop
- 每个 verticle 中创建的 HttpServer,EventBus 等等,都会在这个 verticle 回收时同步回收
- 在多个 verticle 中创建同样端口的 httpServer,不会有错误,会变成两个 EventLoop 线程处理同一个 HttpServer 的连接,所以多核机器,肯定是需要设置多个 verticle 的实例来加强性能的。
可以这么想,vertx 实例就是一台服务器,而 verticle 就是上面跑的进程。
EventBus
EventBus 是沟通 verticle 的桥梁,且可沟通集群中不同 vertx 实例的 verticle,操作很简单。这个似乎概念很简单,就是队列呗,上段代码看看:
// 创建一个 EventBus
EventBus eb = vertx.eventBus();
req.bodyHandler(totalBuffer -> {eb.send("news.uk.sport", totalBuffer.toString("UTF-8"));
});
// 消费
EventBus eb = vertx.eventBus();
eb.consumer("news.uk.sport", message -> {System.out.println("前台传入的内容:" + message.body()+""+this);
});
模型如图:
vertx web
vertx core 已经提供了基本的 HttpServer 的操作,但实际上功能远远不够正常的开发,所以 vertx web 作为一个拓展包,是 web 开发必须的。他提供了一个基本概念router,来进行各种匹配,使得请求能进入正确的 handler,其实就是有点 springMvc 的各种路径匹配。使用起来代码:
HttpServer server = vertx.createHttpServer();
Router router = Router.router(vertx);
router.route("/some/path/").handler(routingContext -> {HttpServerResponse response = routingContext.response();
// 如果不能一次输出所有的 response, 就需要设置为 true
response.setChunked(true);
response.write("route1\n");
});
server.requestHandler(router).listen(8080);
web 包括的功能很多,有各种匹配,content-type 匹配,路径规则匹配,CROS, 错误处理,文件传输 等等
vertx fileSystem
vretx 中的文件操作主要采用了 javaNio 包中的文件操作,通过看源码我们可以发现文件操作也是运行在 workPool 中的,看下代码:
fs.copy("C:\\Users\\Administrator\\Desktop\\Untitled-4.txt", "C:\\Users\\Administrator\\Desktop\\ss.txt", res -> {System.out.println("file copy handle" + System.currentTimeMillis());
System.out.println("file copy handle" + Thread.currentThread());
if (res.succeeded()) {System.out.println("success");
} else {System.out.println("error");
}
})
源码:
//FileSystemImpl.java
/**
* Run the blocking action using a thread from the worker pool.
*/
public void run() {context.executeBlockingInternal(this, handler);
}
vertx DB
vertx 其实提供了 两种数据库操作。一种是正常的 jdbc 操作,一种是异步非阻塞的数据库操作,但是只限于 PG 和 mysql。
jdbc 操作
看一段代码:
JDBCClient client = JDBCClient.createShared(vertx, new JsonObject()
.put("url", "jdbc:postgresql://localhost:5432/postgres")
.put("driver_class", "org.postgresql.Driver")
.put("max_pool_size", 30).put("user","postgres").put("password","postgres"));
client.getConnection(res -> {if (res.succeeded()) {SQLConnection connection = res.result();
connection.query("SELECT * FROM userb", res2 -> {if (res2.succeeded()) {ResultSet rs = res2.result();
System.out.println(rs.toJson());
}
});
} else {System.out.println("连接失败"); }
});
点进去发现:其实做的还是阻塞操作,
//AbstractJDBCAction.java
public void execute(Connection conn, TaskQueue statementsQueue, Handler<AsyncResult<T>> resultHandler) {this.ctx.executeBlocking((future) -> {this.handle(conn, future);
}, statementsQueue, resultHandler);
}
异步数据库操作
这我一直很好奇的,因为 jdbc 在天然上就是阻塞的,所以要想做到数据库的异步,就得放弃厂商提供的 driver,自己做一套编解码,看下代码:
PgConnectOptions connectOptions = new PgConnectOptions()
.setPort(5432)
.setHost("localhost")
.setDatabase("postgres")
.setUser("postgres")
.setPassword("postgres");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the client pool
PgPool client = PgPool.pool(vertx, connectOptions, poolOptions);
client.query("SELECT * FROM userb", ar -> {if (ar.succeeded()) {RowSet<Row> result = ar.result();
result.forEach((r) -> response.write((String)r.getValue("name")));
response.end();} else {System.out.println("Failure:" + ar.cause().getMessage();}
});
这样就能做到数据库的异步操作,且可以包含事务的控制。简直丝滑
关于数据库异步的思考
数据库异步看起来十分黑科技,但是我们应该明白的是,异步非阻塞的 NIO 最大的优点就在于利用很少的线程来控制更多的 IO 连接,这使得在超多连接,IO 密集的操作中有更好的表现,但是数据库连接本来一个 java 系统就不会占用几个,记得以前看过一个文章讲连接数设置最好 =CPUcore2磁盘数,那么数据库的 NIO 在一般的业务系统中,似乎并不必要。
插一个知乎上大佬的回答:
https://www.zhihu.com/questio…
vertx 模型图
简单明了
vertx 的缺点
说到缺点,明显就是回调地狱了,主要还是看过 python 的协程,所以难免拿来做个比较,
摘抄自廖雪峰的 python 教程
import asyncio
from aiohttp import web
async def index(request):
await asyncio.sleep(0.5)
return web.Response(body=b'<h1>Index</h1>')
async def hello(request):
await asyncio.sleep(0.5)
text = '<h1>hello, %s!</h1>' % request.match_info['name']
return web.Response(body=text.encode('utf-8'))
async def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
app.router.add_route('GET', '/hello/{name}', hello)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()
可以看到协程 可以将本该做回调的 response handler 变成了更符合编程习惯的方式,还是比较舒服的。
讲点废话
本来还想把 demo 放到 github 上来给像我一样懒的人直接 clone 下来跑的,但是确实都是官网的例子,放上去太羞耻了。
这文章写的很纠结,写详细了东西太多,写总结性的东西吧,太抽象,没看过的觉得一脸懵,懂得觉得写的没啥意义。
就这吧,如果有大佬想和我讨论或是哪里写的不对,请积极发炎。