关于node.js:理解Nodejs中的进程间通信

39次阅读

共计 8862 个字符,预计需要花费 23 分钟才能阅读完成。

前置常识

文件描述符

在 Linux 零碎中,所有都看成文件,当过程关上现有文件时,会返回一个文件描述符。
文件描述符是操作系统为了治理曾经被过程关上的文件所创立的索引,用来指向被关上的文件。
当咱们的过程启动之后,操作系统会给每一个过程调配一个 PCB 管制块,PCB 中会有一个文件描述符表,寄存以后过程所有的文件描述符,即以后过程关上的所有文件。

🤔 过程中的文件描述符是如何和系统文件对应起来的?
在内核中,零碎会保护另外两种表

  • 关上文件表(Open file table)
  • i-node 表(i-node table)

文件描述符就是数组的下标,从 0 开始往上递增,0/1/2 默认是咱们的输出 / 输入 / 谬误流的文件描述符
在 PCB 中保护的文件形容表中,能够依据文件描述符找到对应了文件指针,找到对应的关上文件表
关上文件表中保护了:文件偏移量 (读写文件的时候会更新);对于文件的状态标识;指向 i-node 表的指针
想要真正的操作文件,还得靠 i-node 表,可能获取到实在文件的相干信息

他们之间的关系

图解

  • 在过程 A 中,文件描述符 1 /20 均指向了同一关上文件表项 23,这可能是对同一文件屡次调用了 open 函数造成的
  • 过程 A/B 的文件描述符 2 都指向同一文件,这可能是调用了 fork 创立子过程,A/B 是父子关系过程
  • 过程 A 的文件描述符 0 和过程 B 的文件描述符指向了不同的关上文件表项,但这些表项指向了同一个文件,这可能是 A/B 过程别离对同一文件发动了 open 调用

总结

  • 同一过程的不同文件描述符能够指向同一个文件
  • 不同过程能够领有雷同的文件描述符
  • 不同过程的同一文件描述符能够指向不同的文件
  • 不同过程的不同文件描述符能够指向同一个文件

文件描述符的重定向

每次读写过程的时候,都是从文件描述符下手,找到对应的关上文件表项,再找到对应的 i-node 表

🤔如何实现文件描述符重定向?
因为在文件描述符表中,可能找到对应的文件指针,如果咱们扭转了文件指针,是不是后续的两个表内容就产生了扭转
例如:文件描述符 1 指向的显示器,那么将文件描述符 1 指向 log.txt 文件,那么文件描述符 1 也就和 log.txt 对应起来了

shell 对文件描述符的重定向

是输入重定向符号,< 是输出重定向符号,它们是文件描述符操作符
和 < 通过批改文件描述符扭转了文件指针的指向,来可能实现重定向的性能

咱们应用 cat hello.txt 时,默认会将后果输入到显示器上,应用 > 来重定向。cat hello.txt 1 > log.txt 以输入的形式关上文件 log.txt,并绑定到文件描述符 1 上

c 函数对文件描述符的重定向

dup

dup 函数是用来关上一个新的文件描述符,指向和 oldfd 同一个文件,共享文件偏移量和文件状态,

int main(int argc, char const *argv[])
{int fd = open("log.txt");
    int copyFd = dup(fd);
    // 将 fd 浏览文件置于文件开端,计算偏移量。cout << "fd =" << fd << "偏移量:" << lseek(fd, 0, SEEK_END) << endl;
    // 当初咱们计算 copyFd 的偏移量
    cout << "copyFd =" << copyFd << "偏移量:" << lseek(copyFd, 0, SEEK_CUR) << endl;
    return 0;
}

调用 dup(3) 的时候,会关上新的最小描述符,也就是 4,这个 4 指向了 3 所指向的文件,操作任意一个 fd 都是批改的一个文件

dup2

dup2 函数,把指定的 newfd 也指向 oldfd 指向的文件。执行完 dup2 之后,newfd 和 oldfd 同时指向同一个文件,共享文件偏移量和文件状态

int main(int argc, char const *argv[])
{int oldfd = open("log.txt");
    int newfd = open("log1.txt");
    dup2(oldfd, newfd);
    // 将 fd 浏览文件置于文件开端,计算偏移量。cout << "fd =" << fd << "偏移量:" << lseek(fd, 0, SEEK_END) << endl;
    // 当初咱们计算 copyFd 的偏移量
    cout << "copyFd =" << copyFd << "偏移量:" << lseek(copyFd, 0, SEEK_CUR) << endl;
    return 0;
}

Node 中通信原理

Node 中的 IPC 通道具体实现是由 libuv 提供的。依据零碎的不同实现形式不同,window 下采纳命名管道实现,*nix 下采纳 Domain Socket 实现。在应用层只体现为 message 事件和 send 办法。参考 nodejs 进阶视频解说:进入学习

父过程在理论创立子过程之前,会创立 IPC 通道并监听它,等到创立出实在的子过程后,通过环境变量 (NODE_CHANNEL_FD) 通知子过程该 IPC 通道的文件描述符。

子过程在启动的过程中,会依据该文件描述符去连贯 IPC 通道,从而实现父子过程的连贯。

建设连贯之后能够自在的通信了,IPC 通道是应用命名管道或者 Domain Socket 创立的,属于双向通信。并且它是在零碎内核中实现的过程通信

⚠️ 只有在启动的子过程是 Node 过程时,子过程才会依据环境变量去连贯对应的 IPC 通道,对于其余类型的子过程则无奈实现过程间通信,除非其余过程也按着该约定去连贯这个 IPC 通道。

unix domain socket

是什么

咱们晓得经典的通信形式是有 Socket,咱们平时熟知的 Socket 是基于网络协议的,用于两个不同主机上的两个过程通信,通信须要指定 IP/Host 等。
但如果咱们同一台主机上的两个过程想要通信,如果应用 Socket 须要指定 IP/Host,通过网络协议等,会显得过于繁琐。所以 Unix Domain Socket 诞生了。

UDS 的劣势:

  • 绑定 socket 文件而不是绑定 IP/Host;不须要通过网络协议,而是数据的拷贝
  • 也反对 SOCK_STREAM(流套接字)和 SOCK_DGRAM(数据包套接字),但因为是在本机通过内核通信,不会丢包也不会呈现发送包的秩序和接管包的秩序不统一的问题

如何实现

流程图

Server 端
int main(int argc, char *argv[])
{
    int server_fd ,ret, client_fd;
    struct sockaddr_un serv, client;
    socklen_t len = sizeof(client);
    char buf[1024] = {0};
    int recvlen;

    // 创立 socket
    server_fd = socket(AF_LOCAL, SOCK_STREAM, 0);

    // 初始化 server 信息
    serv.sun_family = AF_LOCAL;
    strcpy(serv.sun_path, "server.sock");

    // 绑定
    ret = bind(server_fd, (struct sockaddr *)&serv, sizeof(serv));

    // 设置监听,设置可能同时和服务端连贯的客户端数量
    ret = listen(server_fd, 36);

    // 期待客户端连贯
    client_fd = accept(server_fd, (struct sockaddr *)&client, &len);
    printf("=====client bind file:%s\n", client.sun_path);

    while (1) {recvlen = recv(client_fd, buf, sizeof(buf), 0);
        if (recvlen == -1) {perror("recv error");
            return -1;
        } else if (recvlen == 0) {printf("client disconnet...\n");
            close(client_fd);
            break;
        } else {printf("recv buf %s\n", buf);
            send(client_fd, buf, recvlen, 0);
        }
    }

    close(client_fd);
    close(server_fd);
    return 0;
}
Client 端
int main(int argc, char *argv[])
{
    int client_fd ,ret;
    struct sockaddr_un serv, client;
    socklen_t len = sizeof(client);
    char buf[1024] = {0};
    int recvlen;

    // 创立 socket
    client_fd = socket(AF_LOCAL, SOCK_STREAM, 0);

    // 给客户端绑定一个套接字文件
    client.sun_family = AF_LOCAL;
    strcpy(client.sun_path, "client.sock");
    ret = bind(client_fd, (struct sockaddr *)&client, sizeof(client));

    // 初始化 server 信息
    serv.sun_family = AF_LOCAL;
    strcpy(serv.sun_path, "server.sock");
    // 连贯
    connect(client_fd, (struct sockaddr *)&serv, sizeof(serv));

    while (1) {fgets(buf, sizeof(buf), stdin);
        send(client_fd, buf, strlen(buf)+1, 0);

        recv(client_fd, buf, sizeof(buf), 0);
        printf("recv buf %s\n", buf);
    }

    close(client_fd);
    return 0;
}

命名管道(Named Pipe)

是什么

命名管道是能够在同一台计算机的不同过程之间,或者逾越一个网络的不同计算机的不同过程之间的牢靠的单向或者双向的数据通信。
创立命名管道的过程被称为管道服务端(Pipe Server),连贯到这个管道的过程称为管道客户端(Pipe Client)。

命名管道的命名标准:\server\pipe[\path]\name

  • 其中 server 指定一个服务器的名字,本机实用 . 示意,\192.10.10.1 示意网络上的服务器
  • \pipe 是一个不可变动的字串,用于指定该文件属于 NPFS(Named Pipe File System)
  • [\path]\name 是惟一命名管道名称的标识

怎么实现

流程图

Pipe Server
void ServerTest()
{
    HANDLE  serverNamePipe;
    char    pipeName[MAX_PATH] = {0};
    char    szReadBuf[MAX_BUFFER] = {0};
    char    szWriteBuf[MAX_BUFFER] = {0};
    DWORD   dwNumRead = 0;
    DWORD   dwNumWrite = 0;

    strcpy(pipeName, "\\\\.\\pipe\\shuangxuPipeTest");
    // 创立管道实例
    serverNamePipe = CreateNamedPipeA(pipeName,
        PIPE_ACCESS_DUPLEX|FILE_FLAG_WRITE_THROUGH,
        PIPE_TYPE_BYTE|PIPE_READMODE_BYTE|PIPE_WAIT,
        PIPE_UNLIMITED_INSTANCES, 0, 0, 0, NULL);
    WriteLog("创立管道胜利...");
    // 期待客户端连贯
    BOOL bRt= ConnectNamedPipe(serverNamePipe, NULL);
    WriteLog("收到客户端的连贯胜利...");
    // 接收数据
    memset(szReadBuf, 0, MAX_BUFFER);
    bRt = ReadFile(serverNamePipe, szReadBuf, MAX_BUFFER-1, &dwNumRead, NULL);
    // 业务逻辑解决(只为测试用返回原来的数据)WriteLog("收到客户数据:[%s]", szReadBuf);
    // 发送数据
    if(!WriteFile(serverNamePipe, szWriteBuf, dwNumRead, &dwNumWrite, NULL) )
    {WriteLog("向客户写入数据失败:[%#x]", GetLastError());
        return ;
    }
    WriteLog("写入数据胜利...");
}
Pipe Client
void ClientTest()
{char    pipeName[MAX_PATH] = {0};
    HANDLE  clientNamePipe;
    DWORD   dwRet;
    char    szReadBuf[MAX_BUFFER] = {0};
    char    szWriteBuf[MAX_BUFFER] = {0};
    DWORD   dwNumRead = 0;
    DWORD   dwNumWrite = 0;

    strcpy(pipeName, "\\\\.\\pipe\\shuangxuPipeTest");
    // 检测管道是否可用
    if(!WaitNamedPipeA(pipeName, 10000)){WriteLog("管道 [%s] 无奈关上", pipeName);
        return ;
    }
    // 连贯管道
    clientNamePipe = CreateFileA(pipeName,
        GENERIC_READ|GENERIC_WRITE,
        0,
        NULL,
        OPEN_EXISTING,
        FILE_ATTRIBUTE_NORMAL,
        NULL);
    WriteLog("管道连贯胜利...");
    scanf("%s", szWritebuf);
    // 发送数据
    if(!WriteFile(clientNamePipe, szWriteBuf, strlen(szWriteBuf), &dwNumWrite, NULL)){WriteLog("发送数据失败,GetLastError=[%#x]", GetLastError());
        return ;
    }
    printf("发送数据胜利:%s\n", szWritebuf);
    // 接收数据
    if(!ReadFile(clientNamePipe, szReadBuf, MAX_BUFFER-1, &dwNumRead, NULL)){WriteLog("接收数据失败,GetLastError=[%#x]", GetLastError());
        return ;
    }
    WriteLog("接管到服务器返回:%s", szReadBuf);
    // 敞开管道
    CloseHandle(clientNamePipe);
}

Node 创立子过程的流程

Unix

对于创立子过程、创立管道、重定向管道均是在 c++ 层实现的

创立子过程

int main(int argc,char *argv[]){pid_t pid = fork();
    if (pid < 0) {// 谬误} else if(pid == 0) {// 子过程} else {// 父过程}
}

创立管道

应用 socketpair 创立管道,其创立进去的管道是全双工的,返回的文件描述符中的任何一个都可读和可写

int main ()
{int fd[2];
    int r = socketpair(AF_UNIX, SOCK_STREAM, 0, fd);

    if (fork()){ /* 父过程 */
        int val = 0;
        close(fd[1]);
        while (1){sleep(1);
            ++val;
            printf("发送数据: %d\n", val);
            write(fd[0], &val, sizeof(val));
            read(fd[0], &val, sizeof(val));
            printf("接收数据: %d\n", val);
        }
    } else {  /* 子过程 */
        int val;
        close(fd[0]);
        while(1){read(fd[1], &val, sizeof(val));
            ++val;
            write(fd[1], &val, sizeof(val));
        }
    }
}

当咱们应用 socketpair 创立了管道之后,父过程敞开了 fd[1],子过程敞开了 fd[0]。子过程能够通过 fd[1] 读写数据;同理主过程通过 fd[0]读写数据实现通信。

child_process.fork 的具体调用

fork 函数开启一个子过程的流程

  • 初始化参数中的 options.stdio,并且调用 spawn 函数

    function spawn(file, args, options) {const child = new ChildProcess();
    
      child.spawn(options);
    }
  • 创立 ChildProcess 实例,创立子过程也是调用 C++ 层 this._handle.spawn 办法

    function ChildProcess() {
        // C++ 层定义
        this._handle = new Process();}
  • 通过 child.spawn 调用到 ChildProcess.prototype.spawn 办法中。其中 getValidStdio 办法会依据 options.stdio 创立和 C++ 交互的 Pipe 对象,并取得对应的文件描述符,将文件描述符写入到环境变量 NODE_CHANNEL_FD 中,调用 C++ 层创立子过程,在调用 setupChannel 办法

    ChildProcess.prototype.spawn = function(options) {
      // 预处理过程间通信的数据结构
        stdio = getValidStdio(stdio, false);
        const ipc = stdio.ipc;
        const ipcFd = stdio.ipcFd;
        // 将文件描述符写入环境变量中
        if (ipc !== undefined) {ArrayPrototypePush(options.envPairs, `NODE_CHANNEL_FD=${ipcFd}`);
      }
        // 创立过程
        const err = this._handle.spawn(options);
        // 增加 send 办法和监听 IPC 中数据
        if (ipc !== undefined) setupChannel(this, ipc, serialization);
    }
  • 子过程启动时,会依据环境变量中是否存在 NODE_CHANNEL_FD 判断是否调用 _forkChild 办法,创立一个 Pipe 对象, 同时调用 open 办法关上对应的文件描述符,在调用 setupChannel

    function _forkChild(fd, serializationMode) {const p = new Pipe(PipeConstants.IPC);
      p.open(fd);
      p.unref();
      const control = setupChannel(process, p, serializationMode);
    }

句柄传递

setupChannel
次要是实现了解决接管的音讯、发送音讯、解决文件描述符传递等

function setipChannel(){channel.onread = function(arrayBuffer){//...}
    target.on('internalMessage', function(message, handle){//...})
    target.send = function(message, handle, options, callback){//...}
    target._send = function(message, handle, options, callback){//...}
    function handleMessage(message, handle, internal){//...}
}
  • target.send: process.send 办法,这里 target 就是过程对象自身.
  • target._send: 执行具体 send 逻辑的函数, 当参数 handle 不存在时, 示意一般的消息传递;若存在,包装为外部对象,表明是一个 internalMessage 事件触发。调用应用 JSON.stringify 序列化对象, 应用 channel.writeUtf8String 写入文件描述符中
  • channel.onread: 获取到数据时触发, 跟 channel.writeUtf8String 绝对应。通过 JSON.parse 反序列化 message 之后, 调用 handleMessage 进而触发对应事件
  • handleMessage: 用来判断是触发 message 事件还是 internalMessage 事件
  • target.on(‘internalMessage’): 针对外部对象做非凡解决,在调用 message 事件

过程间消息传递

  • 父过程通过 child.send 发送音讯 和 server/socket 句柄对象
  • 一般音讯间接 JSON.stringify 序列化;对于句柄对象来说,须要先包装成为外部对象

    message = {
        cmd: 'NODE_HANDLE',
        type: null,
        msg: message
    };

    通过 handleConversion.[message.type].send 的办法取出句柄对象对应的 C++ 层面的 TCP 对象,在采纳 JSON.stringify 序列化

    const handleConversion = {
        'net.Server': {
        simultaneousAccepts: true,
    
        send(message, server, options) {return server._handle;},
    
        got(message, handle, emit) {const server = new net.Server();
          server.listen(handle, () => {emit(server);
          });
        }
      }
    //....
    }
  • 最初将序列化后的外部对象和 TCP 对象写入到 IPC 通道中
  • 子过程在接管到音讯之后,应用 JSON.parse 反序列化音讯,如果为外部对象触发 internalMessage 事件
  • 查看是否带有 TCP 对象,通过 handleConversion.[message.type].got 失去和父过程一样的句柄对象
  • 最初发触发 message 事件传递解决好的音讯和句柄对象,子过程通过 process.on 接管

正文完
 0