deno原理篇-通信实现

1次阅读

共计 8832 个字符,预计需要花费 23 分钟才能阅读完成。

理解 deno- 基础篇 deno- 原理篇一启动加载
通信方式
deno 执行代码和 node 相似,包含同步和异步的方式,异步方式通过 Promise.then 实现。
Typescript/Javascript 调用 rust
在上一节中讲到 deno 的启动时会初始化 v8 isolate 实例,在初始化的过程中,会将 c ++ 的函数绑定到 v8 isolate 的实例上,在 v8 执行 Javascript 代码时,可以像调用 Javascript 函数一样调用这些绑定的函数。具体的绑定实现如下:
void InitializeContext(v8::Isolate* isolate, v8::Local<v8::Context> context) {
v8::HandleScope handle_scope(isolate);
v8::Context::Scope context_scope(context);

auto global = context->Global();

auto deno_val = v8::Object::New(isolate);
CHECK(global->Set(context, deno::v8_str(“libdeno”), deno_val).FromJust());

auto print_tmpl = v8::FunctionTemplate::New(isolate, Print);
auto print_val = print_tmpl->GetFunction(context).ToLocalChecked();
CHECK(deno_val->Set(context, deno::v8_str(“print”), print_val).FromJust());

auto recv_tmpl = v8::FunctionTemplate::New(isolate, Recv);
auto recv_val = recv_tmpl->GetFunction(context).ToLocalChecked();
CHECK(deno_val->Set(context, deno::v8_str(“recv”), recv_val).FromJust());

auto send_tmpl = v8::FunctionTemplate::New(isolate, Send);
auto send_val = send_tmpl->GetFunction(context).ToLocalChecked();
CHECK(deno_val->Set(context, deno::v8_str(“send”), send_val).FromJust());

auto eval_context_tmpl = v8::FunctionTemplate::New(isolate, EvalContext);
auto eval_context_val =
eval_context_tmpl->GetFunction(context).ToLocalChecked();
CHECK(deno_val->Set(context, deno::v8_str(“evalContext”), eval_context_val)
.FromJust());

auto error_to_json_tmpl = v8::FunctionTemplate::New(isolate, ErrorToJSON);
auto error_to_json_val =
error_to_json_tmpl->GetFunction(context).ToLocalChecked();
CHECK(deno_val->Set(context, deno::v8_str(“errorToJSON”), error_to_json_val)
.FromJust());

CHECK(deno_val->SetAccessor(context, deno::v8_str(“shared”), Shared)
.FromJust());
}
在完成绑定之后,在 Typescript 中可以通过如下代码实现 c ++ 方法和 Typescript 方法的映射
libdeno.ts
interface Libdeno {
recv(cb: MessageCallback): void;

send(control: ArrayBufferView, data?: ArrayBufferView): null | Uint8Array;

print(x: string, isErr?: boolean): void;

shared: ArrayBuffer;

/** Evaluate provided code in the current context.
* It differs from eval(…) in that it does not create a new context.
* Returns an array: [output, errInfo].
* If an error occurs, `output` becomes null and `errInfo` is non-null.
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
evalContext(code: string): [any, EvalErrorInfo | null];

errorToJSON: (e: Error) => string;
}

export const libdeno = window.libdeno as Libdeno;

在执行 Typescript 代码时,只需要引入 libdeno,就直接调用 c ++ 方法,例如:
import {libdeno} from “./libdeno”;
function sendInternal(
builder: flatbuffers.Builder,
innerType: msg.Any,
inner: flatbuffers.Offset,
data: undefined | ArrayBufferView,
sync = true
): [number, null | Uint8Array] {
const cmdId = nextCmdId++;
msg.Base.startBase(builder);
msg.Base.addInner(builder, inner);
msg.Base.addInnerType(builder, innerType);
msg.Base.addSync(builder, sync);
msg.Base.addCmdId(builder, cmdId);
builder.finish(msg.Base.endBase(builder));
const res = libdeno.send(builder.asUint8Array(), data);
builder.inUse = false;
return [cmdId, res];
}

调用 libdeno.send 方法可以将数据传给 c ++,然后通过 c ++ 去调用 rust 代码实现具体的工程操作。
Typescript 层同步异步实现
同步
在 Typescript 中只需要设置 sendInternal 方法的 sync 参数为 true 即可,在 rust 中会根据 sync 参数去判断是执行同步或者异步操作,如果 sync 为 true,libdeono.send 方法会返回执行的结果,rust 和 typescript 之间传递数据需要将数据序列化,这里序列化操作使用的是 flatbuffer 库。
const [cmdId, resBuf] = sendInternal(builder, innerType, inner, data, true);

异步实现
同理,实现异步方式,只需要设置 sync 参数为 false 即可,但是异步操作和同步相比,多了回掉方法,在执行异步通信时,libdeno.send 方法会返回一个唯一的 cmdId 标志这次调用操作。同时在异步通信完成后,会创建一个 promise 对象,将 cmdId 作为 key,promise 作为 value,加入 map 中。代码如下:
const [cmdId, resBuf] = sendInternal(builder, innerType, inner, data, false);
util.assert(resBuf == null);
const promise = util.createResolvable<msg.Base>();
promiseTable.set(cmdId, promise);
return promise;
rust 实现同步和异步
当在 Typescript 中调用 libdeno.send 方法时,调用了 C ++ 文件 binding.cc 中的 Send 方法,该方法是在 deno 初始化时绑定到 v8 isolate 上去的。在 Send 方法中去调用了 ops.rs 文件中的 dispatch 方法,该方法实现了消息到函数的映射。每个类型的消息对应了一种函数,例如读文件消息对应了读文件的函数。
pub fn dispatch(
isolate: &Isolate,
control: libdeno::deno_buf,
data: libdeno::deno_buf,
) -> (bool, Box<Op>) {
let base = msg::get_root_as_base(&control);
let is_sync = base.sync();
let inner_type = base.inner_type();
let cmd_id = base.cmd_id();

let op: Box<Op> = if inner_type == msg::Any::SetTimeout {
// SetTimeout is an exceptional op: the global timeout field is part of the
// Isolate state (not the IsolateState state) and it must be updated on the
// main thread.
assert_eq!(is_sync, true);
op_set_timeout(isolate, &base, data)
} else {
// Handle regular ops.
let op_creator: OpCreator = match inner_type {
msg::Any::Accept => op_accept,
msg::Any::Chdir => op_chdir,
msg::Any::Chmod => op_chmod,
msg::Any::Close => op_close,
msg::Any::FetchModuleMetaData => op_fetch_module_meta_data,
msg::Any::CopyFile => op_copy_file,
msg::Any::Cwd => op_cwd,
msg::Any::Dial => op_dial,
msg::Any::Environ => op_env,
msg::Any::Exit => op_exit,
msg::Any::Fetch => op_fetch,
msg::Any::FormatError => op_format_error,
msg::Any::Listen => op_listen,
msg::Any::MakeTempDir => op_make_temp_dir,
msg::Any::Metrics => op_metrics,
msg::Any::Mkdir => op_mkdir,
msg::Any::Open => op_open,
msg::Any::ReadDir => op_read_dir,
msg::Any::ReadFile => op_read_file,
msg::Any::Readlink => op_read_link,
msg::Any::Read => op_read,
msg::Any::Remove => op_remove,
msg::Any::Rename => op_rename,
msg::Any::ReplReadline => op_repl_readline,
msg::Any::ReplStart => op_repl_start,
msg::Any::Resources => op_resources,
msg::Any::Run => op_run,
msg::Any::RunStatus => op_run_status,
msg::Any::SetEnv => op_set_env,
msg::Any::Shutdown => op_shutdown,
msg::Any::Start => op_start,
msg::Any::Stat => op_stat,
msg::Any::Symlink => op_symlink,
msg::Any::Truncate => op_truncate,
msg::Any::WorkerGetMessage => op_worker_get_message,
msg::Any::WorkerPostMessage => op_worker_post_message,
msg::Any::Write => op_write,
msg::Any::WriteFile => op_write_file,
msg::Any::Now => op_now,
msg::Any::IsTTY => op_is_tty,
msg::Any::Seek => op_seek,
msg::Any::Permissions => op_permissions,
msg::Any::PermissionRevoke => op_revoke_permission,
_ => panic!(format!(
“Unhandled message {}”,
msg::enum_name_any(inner_type)
)),
};
op_creator(&isolate, &base, data)
};

// … 省略多余的代码
}
在每个类型的函数中会根据在 Typescript 中调用 libdeo.send 方法时传入的 sync 参数值去判断同步执行还是异步执行。
let (is_sync, op) = dispatch(isolate, control_buf, zero_copy_buf);

同步执行
在执行 dispatch 方法后,会返回 is_sync 的变量,如果 is_sync 为 true,表示该方法是同步执行的,op 表示返回的结果。rust 代码会调用 c ++ 文件 api.cc 中的 deno_respond 方法,将执行结果同步回去,deno_respond 方法中根据 current_args_的值去判断是否为同步消息,如果 current_args_存在值,则直接返回结果。
异步执行
在 deno 中,执行异步操作是通过 rust 的 Tokio 模块来实现的,在调用 dispatch 方法后,如果是异步操作,is_sync 的值为 false,op 不再是执行结果,而是一个执行函数。通过 tokio 模块派生一个线程程异步去执行该函数。
let task = op
.and_then(move |buf| {
let sender = tx; // tx is moved to new thread
sender.send((zero_copy_id, buf)).expect(“tx.send error”);
Ok(())
}).map_err(|_| ());
tokio::spawn(task);
在 deno 初始化时,会创建一个管道,代码如下:
let (tx, rx) = mpsc::channel::<(usize, Buf)>();

管道可以实现不同线程之间的通信,由于异步操作是创建了一个新的线程去执行的,所以子线程无法直接和主线程之间通信,需要通过管道的机制去实现。在异步代码执行完成后,调用 tx.send 方法将执行结果加入管道里面,event loop 会每次从管道里面去读取结果返回回去。
Event Loop
由于异步操作依赖事件循环,所以先解释一下 deno 中的事件循环,其实事件循环很简单,就是一段循环执行的代码,当达到条件后,事件循环会结束执行,deno 中主要的事件循环代码实现如下:
pub fn event_loop(&self) -> Result<(), JSError> {
// Main thread event loop.
while !self.is_idle() {
match recv_deadline(&self.rx, self.get_timeout_due()) {
Ok((zero_copy_id, buf)) => self.complete_op(zero_copy_id, buf),
Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(),
Err(e) => panic!(“recv_deadline() failed: {:?}”, e),
}
self.check_promise_errors();
if let Some(err) = self.last_exception() {
return Err(err);
}
}
// Check on done
self.check_promise_errors();
if let Some(err) = self.last_exception() {
return Err(err);
}
Ok(())
}
self.is_idle 方法用来判断是否所有的异步操作都执行完毕,当所有的异步操作都执行完毕后,停止事件循环,is_idle 方法代码如下:
fn is_idle(&self) -> bool {
self.ntasks.get() == 0 && self.get_timeout_due().is_none()
}
当产生一次异步方法调用时,会调用下面的方法,使 ntasks 内部的值加 1,
fn ntasks_increment(&self) {
assert!(self.ntasks.get() >= 0);
self.ntasks.set(self.ntasks.get() + 1);
}
在 event loop 循环中,每次从管道中去取值,这里 event loop 充消费者,执行异步方法的子线程充当生产者。如果在一次事件循环中,获取到了一次执行结果,那么会调用 ntasks_decrement 方法,使 ntasks 内部的值减 1,当 ntasks 的值为 0 的时候,事件循环会退出执行。在每次循环中,将管道中取得的值作为参数,调用 complete_op 方法,将结果返回回去。
rust 中将异步操作结果返回回去
在初始化 v8 实例时,绑定的 c ++ 方法中有一个 Recv 方法,该方法的作用时暴露一个 Typescript 的函数给 rust,在 deno 的 io.ts 文件的 start 方法中执行 libdeno.recv(handleAsyncMsgFromRust),将 handleAsyncMsgFromRust 函数通过 c ++ 方法暴露给 rust。具体实现如下:
export function start(source?: string): msg.StartRes {
libdeno.recv(handleAsyncMsgFromRust);

// First we send an empty `Start` message to let the privileged side know we
// are ready. The response should be a `StartRes` message containing the CLI
// args and other info.
const startResMsg = sendStart();

util.setLogDebug(startResMsg.debugFlag(), source);

setGlobals(startResMsg.pid(), startResMsg.noColor(), startResMsg.execPath()!);

return startResMsg;
}

当异步操作执行完成后,可以在 rust 中直接调用 handleAsyncMsgFromRust 方法,将结果返回给 Typescript。先看一下 handleAsyncMsgFromRust 方法的实现细节:
export function handleAsyncMsgFromRust(ui8: Uint8Array): void {
// If a the buffer is empty, recv() on the native side timed out and we
// did not receive a message.
if (ui8 && ui8.length) {
const bb = new flatbuffers.ByteBuffer(ui8);
const base = msg.Base.getRootAsBase(bb);
const cmdId = base.cmdId();
const promise = promiseTable.get(cmdId);
util.assert(promise != null, `Expecting promise in table. ${cmdId}`);
promiseTable.delete(cmdId);
const err = errors.maybeError(base);
if (err != null) {
promise!.reject(err);
} else {
promise!.resolve(base);
}
}
// Fire timers that have become runnable.
fireTimers();
}
从代码 handleAsyncMsgFromRust 方法的实现中可以知道,首先通过 flatbuffer 反序列化返回的结果,然后获取返回结果的 cmdId,根据 cmdId 获取之前创建的 promise 对象,然后调用 promise.resolve 方法触发 promise.then 中的代码执行。
结尾
~ 下节讲一下 deno 中 import 的实现~

正文完
 0