关于postgresql:GreenPlum数据库网络层Segment空闲后端进程IdleQE

6次阅读

共计 6975 个字符,预计需要花费 18 分钟才能阅读完成。


QD(Query Dispatcher、查问调度器):Master 节点上负责解决用户查问申请的过程称为 QD(PostgreSQL 中称之为 Backend 过程)。QD 收到用户发来的 SQL 申请后,进行解析、重写和优化,将优化后的并行打算分发给每个 segment 上执行,并将最终后果返回给用户。此外还负责整个 SQL 语句波及到的所有的 QE 过程间的通信管制和协调,譬如某个 QE 执行时呈现谬误时,QD 负责收集谬误详细信息,并勾销所有其余 QEs;如果 LIMIT n 语句曾经满足,则停止所有 QE 的执行等。QD 的入口是 exec_simple_query()。

QE(Query Executor、查问执行器):Segment 上负责执行 QD 散发来的查问工作的过程称为 QE。Segment 实例运行的也是一个 PostgreSQL,所以对于 QE 而言,QD 是一个 PostgreSQL 的客户端,它们之间通过 PostgreSQL 规范的 libpq 协定进行通信。对于 QD 而言,QE 是负责执行其查问申请的 PostgreSQL Backend 过程。通常 QE 执行整个查问的一部分(称为 Slice)。QE 的入口是 exec_mpp_query()。

QD 和 QE 都是 PostgreSQL backend 过程,其执行逻辑十分类似。对于数据操作(DML)语句(数据定义语句的执行逻辑更简略),其外围执行逻辑由 ExecutorStart, ExecutorRun, ExecutorEnd 实现。

  • ExecutorStart 负责执行器的初始化和启动。Greenplum 通过 CdbDispatchPlan 把残缺的查问打算发送给每个 Gang 中的每个 QE 过程。Greenplum 有两种发送打算给 QE 的形式:1)异步形式,应用 libpq 的异步 API 以非阻塞形式发送查问打算给 QE;2)同步多线程形式:应用 libpq 的同步 API,应用多个线程同时发送查问打算给 QE。GUC gp_connections_per_thread 管制应用线程数量,缺省值为 0,示意采纳异步形式。Greenplum 从 6.0 开始去掉了异步形式。
  • ExecutorRun 启动执行器,执行查问树中每个算子的代码,并以火山模型(volcano)格调返回后果元组给客户端。在 QD 上,ExecutorRun 调用 ExecutePlan 解决查问树,该查问树的最下端的节点是一个 Motion 算子。其对应的函数为 ExecMotion,该函数期待来自于各个 QE 的后果。QD 取得来自于 QE 的元组后,执行某些必要操作(譬如排序)而后返回给最终用户。
  • ExecutorEnd 负责执行器的清理工作,包含查看后果,敞开 interconnect 连贯等。

E 上的 ExecutorStart/ExecutorRun/ExecutorEnd 函数和单节点的 PostgreSQL 代码逻辑相似。次要的区别在 QE 执行的是 Greenplum 分布式打算中的一个 slice,因此其查问树的根节点肯定是个 Motion 节点。其对应的执行函数为 ExecMotion,该算子从查问树下部取得元组,并依据 Motion 的类型发送给不同的接管方。低一级的 Gang 的 QE 把 Motion 节点的后果元组发送给上一级 Gang 的 QE,最顶层 Gang 的 QE 的 Motion 会把后果元组发送给 QD。Motion 的 Flow 类型确定了数据传输的形式,有两种:播送和重散布。播送形式将数据发送给上一级 Gang 的每一个 QE;重散布形式将数据依据重散布键计算其对应的 QE 解决节点,并发送给该 QE。

QD 和 QE 之间有两种类型的网络连接:
libpq:QD 通过 libpq 与各个 QE 间传输管制信息,包含发送查问打算、收集错误信息、解决勾销操作等。libpq 是 PostgreSQL 的标准协议,Greenplum 对该协定进行了加强,譬如新增了‘M’音讯类型(QD 应用该音讯发送查问打算给 QE)。libpq 是基于 TCP 的。
interconnect:QD 和 QE、QE 和 QE 之间的表元组数据传输通过 interconnect 实现。Greenplum 有两种 interconnect 实现形式,一种基于 TCP,一种基于 UDP。缺省形式为 UDP interconnect 连贯形式。

这里咱们学习 QD 上缓存的 Segment 闲暇后端过程 IdleQE 列表,其实就是代表之前应用 libpq 连贯的 segment 的 QE 后端。次要目标就是缩小 QD 应用 libpq 连贯 QE 后端的工夫,能够间接应用之前连贯过的 Segment 闲暇后端过程 IdleQE,只须要确认连贯失常就能够重复使用了。

IdleQE

cdbcomponent_allocateIdleQE 函数用于调配一个 segdb,如果 freelist 中有闲暇的 segdb,则返回,否则,初始化一个新的 segdb。idle segdbs 曾经与 segment 建设了连贯,但新的 segdb 尚未建设,调用者须要本人建设连贯。函数流程如下所示:
通过 cdbcomponent_getComponentInfo 获取 CdbComponentDatabases 中的 segment_db_info 或 entry_db_info 中的 CdbComponentDatabaseInfo 构造体,其就是 primary segment 或 master 在 QD 上获取的信息结构体。
获取 CdbComponentDatabaseInfo 构造体的 freelist 中的第一个 IdleQE SegmentDatabaseDescriptor,如果缓冲的 IdleQE 的类型 WRITER/READER 不是 cdbcomponent_allocateIdleQE 函数申请的类型,就持续断定 freelist 下一个 SegmentDatabaseDescriptor。
如果获取到符合要求的 SegmentDatabaseDescriptor,从 freelist 中删除该元素,递加 numIdleQEs。
如果没有获取到 Segment 闲暇后端过程 IdleQE,则调用 cdbconn_createSegmentDescriptor 函数创立一个新的 SegmentDatabaseDescriptor
递增 numActiveQEs

SegmentDatabaseDescriptor *cdbcomponent_allocateIdleQE(int contentId, SegmentType segmentType) {
    SegmentDatabaseDescriptor    *segdbDesc = NULL;
    CdbComponentDatabaseInfo    *cdbinfo;
    ListCell                     *curItem = NULL;
    ListCell                     *nextItem = NULL;
    ListCell                    *prevItem = NULL;
    MemoryContext                 oldContext;
    bool                        isWriter;
    cdbinfo = cdbcomponent_getComponentInfo(contentId);    
    oldContext = MemoryContextSwitchTo(CdbComponentsContext);

    /* Always try to pop from the head.  Make sure to push them back to head in cdbcomponent_recycleIdleQE(). */
    curItem = list_head(cdbinfo->freelist);
    while (curItem != NULL) {SegmentDatabaseDescriptor *tmp = (SegmentDatabaseDescriptor *)lfirst(curItem);
        nextItem = lnext(curItem);
        if ((segmentType == SEGMENTTYPE_EXPLICT_WRITER && !tmp->isWriter) || (segmentType == SEGMENTTYPE_EXPLICT_READER && tmp->isWriter)) {
            prevItem = curItem;
            curItem = nextItem;
            continue;
        }
        cdbinfo->freelist = list_delete_cell(cdbinfo->freelist, curItem, prevItem);         
        DECR_COUNT(cdbinfo, numIdleQEs); /* update numIdleQEs */
        segdbDesc = tmp;
        break;
    }

    if (!segdbDesc) {
        /* 1. for entrydb, it's never be writer.
         * 2. for first QE, it must be a writer. 第一个 QE,必须是 writer 
         */
        isWriter = contentId == -1 ? false: (cdbinfo->numIdleQEs == 0 && cdbinfo->numActiveQEs == 0);
        segdbDesc = cdbconn_createSegmentDescriptor(cdbinfo, nextQEIdentifer(cdbinfo->cdbs), isWriter);
    }
    cdbconn_setQEIdentifier(segdbDesc, -1);
    INCR_COUNT(cdbinfo, numActiveQEs);
    MemoryContextSwitchTo(oldContext);
    return segdbDesc;
}

cdbcomponent_recycleIdleQE 函数用于回收应用完的 IdleQE。首先递加 numActiveQEs,如果指定须要捣毁该 IdleQE,或者该 IdleQE 因为某些起因无奈重用,则须要进入清理该 IdleQE 的流程。如果闲暇 freelist 链表的长度大于 gp_cached_gang_threshold,则该 IdleQE 也须要进行清理。重用流程:如果是 Writer IdleQE,则须要将其退出 freelist 头部;如果是 Reader IdleQE,在 cdbcomponent_allocateIdleQE() 中,Reader 总是从头部弹出,因而要复原原始程序,咱们必须将它们推回头部,并记住读取器必须放在 Writer 之后;递增 numIdleQEs。捣毁流程:调用 cdbconn_termSegmentDescriptor 清理 libpq 连贯,如果是 Writer IdleQE,调用 markCurrentGxactWriterGangLost 函数设置 MyTmGxactLocal->writerGangLost 为 true。

void cdbcomponent_recycleIdleQE(SegmentDatabaseDescriptor *segdbDesc, bool forceDestroy) {
    CdbComponentDatabaseInfo    *cdbinfo;
    MemoryContext                oldContext;    
    int                            maxLen;
    bool                        isWriter;

    cdbinfo = segdbDesc->segment_database_info;
    isWriter = segdbDesc->isWriter;
    
    DECR_COUNT(cdbinfo, numActiveQEs); /* update num of active QEs */
    oldContext = MemoryContextSwitchTo(CdbComponentsContext);

    if (forceDestroy || !cleanupQE(segdbDesc)) goto destroy_segdb;
    /* If freelist length exceed gp_cached_gang_threshold, destroy it */
    maxLen = segdbDesc->segindex == -1 ? MAX_CACHED_1_GANGS : gp_cached_gang_threshold;
    if (!isWriter && list_length(cdbinfo->freelist) >= maxLen) goto destroy_segdb;

    /* Recycle the QE, put it to freelist */
    if (isWriter) { /* writer is always the header of freelist */        
        segdbDesc->segment_database_info->freelist = lcons(segdbDesc, segdbDesc->segment_database_info->freelist);
    } else {
        ListCell   *lastWriter = NULL;
        ListCell   *cell;

        /* In cdbcomponent_allocateIdleQE() readers are always popped from the head, so to restore the original order we must pushed them back to the head, and keep in mind readers must be put after the writers. 在 cdbcomponent_allocateIdleQE() 中,reader 总是从头部弹出,因而要复原原始程序,咱们必须将它们推回头部,并记住读取器必须放在写入器之后。*/
        for (cell = list_head(segdbDesc->segment_database_info->freelist); cell && ((SegmentDatabaseDescriptor *) lfirst(cell))->isWriter; lastWriter = cell, cell = lnext(cell)) ;
        if (lastWriter) lappend_cell(segdbDesc->segment_database_info->freelist, lastWriter, segdbDesc);
        else segdbDesc->segment_database_info->freelist = lcons(segdbDesc, segdbDesc->segment_database_info->freelist);
    }
    INCR_COUNT(cdbinfo, numIdleQEs);
    MemoryContextSwitchTo(oldContext);
    return;

destroy_segdb:
    cdbconn_termSegmentDescriptor(segdbDesc);
    if (isWriter) {markCurrentGxactWriterGangLost();
    }

    MemoryContextSwitchTo(oldContext);
}

QEIdentifer

CdbComponentDatabases 的 freeCounterList 中保留了曾经开释可重用的 QEIdentifier,这是一个 int 类型的值,用于调配给 SegmentDatabaseDescriptor 构造体中的 identifier 成员。nextQEIdentifer 就是用于调配该 QEIdentifier,如果 freeCounterList 有闲暇的 QEIdentifier,就应用第一个;如果 freeCounterList 为闲暇的,则递增 CdbComponentDatabases 的 qeCounter 并返回。

static int nextQEIdentifer(CdbComponentDatabases *cdbs) {
    int result;
    if (!cdbs->freeCounterList) return cdbs->qeCounter++;
    result = linitial_int(cdbs->freeCounterList);
    cdbs->freeCounterList = list_delete_first(cdbs->freeCounterList);
    return result;
}

cdbconn_termSegmentDescriptor 函数用于清理 libpq 连贯,并将 segdbDesc->identifier 放回 CdbComponentDatabases 的 freeCounterList 中。

void cdbconn_termSegmentDescriptor(SegmentDatabaseDescriptor *segdbDesc) {
    CdbComponentDatabases *cdbs;
    cdbs = segdbDesc->segment_database_info->cdbs;
    /* put qe identifier to free list for reuse */
    cdbs->freeCounterList = lappend_int(cdbs->freeCounterList, segdbDesc->identifier);
    cdbconn_disconnect(segdbDesc);
    if (segdbDesc->whoami != NULL){pfree(segdbDesc->whoami);
        segdbDesc->whoami = NULL;
    }
    pfree(segdbDesc);
}

https://blog.csdn.net/gp_comm…

正文完
 0