乐趣区

Deno源码简析三JS与Rust交互

开始

今天开始分析 JS 与 Rust 是如何交互的,毕竟 JS 的性能在某些场景下还是不能胜任,这个时候就是 Rust 闪亮登场的时候,两者互相补足,无往不利!

op

之前一直说的 op,我个人觉得就是 deno 上的一个插件机制,deno 上所有的功能基本是都是在这个插件机制基础上工作的。

send 和 recv

在一开始的架构图我们就可以看到,在 deno 里面 JS 与 Rust 的交互只能通过 send 和 recv 这两个方法,调用 send 的实际原理也很简单根据 opId 去调用对应的 rust 方法,如果是同步的方法那就可以直接返回,但是如果是异步方法就需要用到 recv 去接收返回的值。

直接从打开文件 open/openAsync 这个 op 开始分析:

export function openSync(path: string, options: OpenOptions): number {
  const mode: number | undefined = options?.mode;
  return sendSync("op_open", { path, options, mode});
}

export function open(path: string, options: OpenOptions): Promise<number> {
  const mode: number | undefined = options?.mode;
  return sendAsync("op_open", {
    path,
    options,
    mode,
  });
}

这里直接调用 sendSync/sendAsync 方法,然后再跟踪下去 sendSync 和 sendAsync:

export function sendSync(
  opName: string,
  args: object = {},
  zeroCopy?: Uint8Array
): Ok {const opId = OPS_CACHE[opName];
  util.log("sendSync", opName, opId);
  const argsUi8 = encode(args);
  const resUi8 = core.dispatch(opId, argsUi8, zeroCopy);
  util.assert(resUi8 != null);

  const res = decode(resUi8);
  util.assert(res.promiseId == null);
  return unwrapResponse(res);
}

export async function sendAsync(
  opName: string,
  args: object = {},
  zeroCopy?: Uint8Array
): Promise<Ok> {const opId = OPS_CACHE[opName];
  util.log("sendAsync", opName, opId);
  const promiseId = nextPromiseId();
  args = Object.assign(args, { promiseId});
  const promise = util.createResolvable<Ok>();

  const argsUi8 = encode(args);
  const buf = core.dispatch(opId, argsUi8, zeroCopy);
  if (buf) {
    // Sync result.
    const res = decode(buf);
    promise.resolve(res);
  } else {
    // Async result.
    promiseTable[promiseId] = promise;
  }

  const res = await promise;
  return unwrapResponse(res);
}

sendSync 相对 sendAsync 会简单一点,直接从 OPS_CACHE 拿到对应的 opId,然后再把参数转成 Uint8Array 就可以分发这次调用下去。

而 sendAsync 则需要多创建一个 promise,然后把 promiseId 附加到参数上,在分发这个次调用下去,那么这次异步调用怎么从 recv 方法接收结果回来的尼?

再去到 core.js,deno 在调用 init 的时候就设置了一个回调 handleAsyncMsgFromRust:

function init() {
    const shared = core.shared;
    assert(shared.byteLength > 0);
    assert(sharedBytes == null);
    assert(shared32 == null);
    sharedBytes = new Uint8Array(shared);
    shared32 = new Int32Array(shared);
    asyncHandlers = [];
    // Callers should not call core.recv, use setAsyncHandler.
    recv(handleAsyncMsgFromRust);
  }

而 handleAsyncMsgFromRust 所做的就是从 SharedQueue 上取出异步操作结果然后触发相应的异步处理器:

function handleAsyncMsgFromRust(opId, buf) {if (buf) {// This is the overflow_response case of deno::Isolate::poll().
      asyncHandlers[opId](buf);
    } else {while (true) {const opIdBuf = shift();
        if (opIdBuf == null) {break;}
        assert(asyncHandlers[opIdBuf[0]] != null);
        asyncHandlers[opIdBuf[0]](opIdBuf[1]);
      }
    }
  }

SharedQueue 本质上是一块 JS 与 Rust 都能共同访问的内存,而 SharedQueue 也有一套自身的内存布局:

总的来说这块内存最多能存 100 条异步操作结果或者少于 128 * 100bit(125kb)大小的内容,一旦超过这些设定,就会触发 overflow,立马从 Rust 切回到 JS 运行,让 JS 能够及时处理这些内容,所以这个 SharedQueue 是很重要的,可以影响整个应用的吞吐量。

再回到触发异步处理器,但是没到怎么触发 promise 的 resolve 方法,所以继续深入吧,再来到一开始初始化 ops 的地方:

function getAsyncHandler(opName: string): (msg: Uint8Array) => void {switch (opName) {
    case "op_write":
    case "op_read":
      return dispatchMinimal.asyncMsgFromRust;
    default:
      return dispatchJson.asyncMsgFromRust;
  }
}

// TODO(bartlomieju): temporary solution, must be fixed when moving
// dispatches to separate crates
export function initOps(): void {OPS_CACHE = core.ops();
  for (const [name, opId] of Object.entries(OPS_CACHE)) {core.setAsyncHandler(opId, getAsyncHandler(name));
  }
  core.setMacrotaskCallback(handleTimerMacrotask);
}

可以发现,除了 op_write/op_read 这两个 op 使用的是 dispatchMinimal.asyncMsgFromRust 方法,其余的都是使用 dispatchJson.asyncMsgFromRust 响应回调。而在 dispatchJson.asyncMsgFromRust 这个方法里面我们就可以看到它专门对 promise 做了处理:

export function asyncMsgFromRust(resUi8: Uint8Array): void {const res = decode(resUi8);
  util.assert(res.promiseId != null);

  const promise = promiseTable[res.promiseId!];
  util.assert(promise != null);
  delete promiseTable[res.promiseId!];
  promise.resolve(res);
}

根据我们之前传入的 promiseId,来获取 promise 然后直接 resolve。
那么还有一个小问题,dispatchMinimal.asyncMsgFromRust 和 dispatchJson.asyncMsgFromRust 有啥区别尼?实际上 dispatchMinimal.asyncMsgFromRust 是专门处理 io 的读写的,一般都是传入资源 id 和一个 buffer,等待 rust 的处理,然后返回处理后的字节数;而 dispatchJson.asyncMsgFromRust 参数都是经过 JSON.stringify 然后传给 rust 那边再解析取参。

那么还有 send 和 recv 这两个方法是在哪里定义的尼?
直接来到 core/bingding.rs 的initialize_context,在这里是 deno 初始化核心方法的地方(都是挂在 Deno.core 这个对象下),send 和 recv 也是在这里注入到 JS 的世界里面:

pub fn initialize_context<'s>(scope: &mut impl v8::ToLocal<'s>,) -> v8::Local<'s, v8::Context> {
    ...
    let mut recv_tmpl = v8::FunctionTemplate::new(scope, recv);
      let recv_val = recv_tmpl.get_function(scope, context).unwrap();
      core_val.set(
        context,
        v8::String::new(scope, "recv").unwrap().into(),
        recv_val.into(),);

      let mut send_tmpl = v8::FunctionTemplate::new(scope, send);
      let send_val = send_tmpl.get_function(scope, context).unwrap();
      core_val.set(
        context,
        v8::String::new(scope, "send").unwrap().into(),
        send_val.into(),);
      ...
}

手残画张图整理一下:

插件编写

… 待续

总结

整体下来,deno 在 js 与 rust 的交互方面是挺好理解的,感觉对 deno 的未来又多加了几分信心了。

退出移动版