QD(Query Dispatcher、查问调度器):Master 节点上负责解决用户查问申请的过程称为 QD(PostgreSQL 中称之为 Backend 过程)。 QD 收到用户发来的 SQL 申请后,进行解析、重写和优化,将优化后的并行打算分发给每个 segment 上执行,并将最终后果返回给用户。此外还负责整个 SQL 语句波及到的所有的QE过程间的通信管制和协调,譬如某个 QE 执行时呈现谬误时,QD 负责收集谬误详细信息,并勾销所有其余 QEs;如果 LIMIT n 语句曾经满足,则停止所有 QE 的执行等。QD 的入口是 exec_simple_query()。
QE(Query Executor、查问执行器):Segment 上负责执行 QD 散发来的查问工作的过程称为 QE。Segment 实例运行的也是一个 PostgreSQL,所以对于 QE 而言,QD 是一个 PostgreSQL 的客户端,它们之间通过 PostgreSQL 规范的libpq 协定进行通信。对于 QD 而言,QE 是负责执行其查问申请的PostgreSQL Backend过程。通常 QE 执行整个查问的一部分(称为 Slice)。QE 的入口是 exec_mpp_query()。
QD 和 QE 都是 PostgreSQL backend 过程,其执行逻辑十分类似。对于数据操作(DML)语句(数据定义语句的执行逻辑更简略),其外围执行逻辑由 ExecutorStart, ExecutorRun, ExecutorEnd 实现。
- ExecutorStart 负责执行器的初始化和启动。Greenplum 通过 CdbDispatchPlan 把残缺的查问打算发送给每个 Gang 中的每个 QE 过程。Greenplum 有两种发送打算给 QE 的形式:1)异步形式,应用 libpq 的异步 API 以非阻塞形式发送查问打算给QE;2)同步多线程形式:应用 libpq 的同步 API,应用多个线程同时发送查问打算给 QE。GUC gp_connections_per_thread 管制应用线程数量,缺省值为0,示意采纳异步形式。Greenplum 从6.0开始去掉了异步形式。
- ExecutorRun 启动执行器,执行查问树中每个算子的代码,并以火山模型(volcano)格调返回后果元组给客户端。在 QD 上,ExecutorRun 调用 ExecutePlan 解决查问树,该查问树的最下端的节点是一个 Motion 算子。其对应的函数为 ExecMotion,该函数期待来自于各个 QE 的后果。QD 取得来自于 QE 的元组后,执行某些必要操作(譬如排序)而后返回给最终用户。
- ExecutorEnd 负责执行器的清理工作,包含查看后果,敞开 interconnect 连贯等。
E上的ExecutorStart/ExecutorRun/ExecutorEnd函数和单节点的PostgreSQL代码逻辑相似。次要的区别在 QE 执行的是 Greenplum 分布式打算中的一个 slice,因此其查问树的根节点肯定是个 Motion 节点。其对应的执行函数为 ExecMotion,该算子从查问树下部取得元组,并依据 Motion 的类型发送给不同的接管方。低一级的 Gang 的QE把 Motion 节点的后果元组发送给上一级 Gang 的QE,最顶层 Gang 的 QE 的 Motion 会把后果元组发送给 QD。Motion 的 Flow 类型确定了数据传输的形式,有两种:播送和重散布。播送形式将数据发送给上一级 Gang的每一个 QE;重散布形式将数据依据重散布键计算其对应的QE解决节点,并发送给该 QE。
QD 和 QE 之间有两种类型的网络连接:
libpq:QD 通过 libpq 与各个QE间传输管制信息,包含发送查问打算、收集错误信息、解决勾销操作等。libpq 是 PostgreSQL 的标准协议,Greenplum 对该协定进行了加强,譬如新增了 ‘M’ 音讯类型 (QD 应用该音讯发送查问打算给 QE)。libpq 是基于 TCP 的。
interconnect:QD 和 QE、QE 和 QE 之间的表元组数据传输通过 interconnect 实现。Greenplum 有两种 interconnect 实现形式,一种基于 TCP,一种基于UDP。缺省形式为 UDP interconnect 连贯形式。
这里咱们学习QD上缓存的Segment闲暇后端过程IdleQE列表,其实就是代表之前应用libpq连贯的segment的QE后端。次要目标就是缩小QD应用libpq连贯QE后端的工夫,能够间接应用之前连贯过的Segment闲暇后端过程IdleQE,只须要确认连贯失常就能够重复使用了。
IdleQE
cdbcomponent_allocateIdleQE函数用于调配一个segdb,如果freelist中有闲暇的segdb,则返回,否则,初始化一个新的segdb。idle segdbs 曾经与 segment 建设了连贯,但新的 segdb 尚未建设,调用者须要本人建设连贯。函数流程如下所示:
通过cdbcomponent_getComponentInfo获取CdbComponentDatabases中的segment_db_info或entry_db_info中的CdbComponentDatabaseInfo构造体,其就是primary segment或master在QD上获取的信息结构体。
获取CdbComponentDatabaseInfo构造体的freelist中的第一个IdleQE SegmentDatabaseDescriptor,如果缓冲的IdleQE的类型WRITER/READER不是cdbcomponent_allocateIdleQE函数申请的类型,就持续断定freelist下一个SegmentDatabaseDescriptor。
如果获取到符合要求的SegmentDatabaseDescriptor,从freelist中删除该元素,递加numIdleQEs。
如果没有获取到Segment闲暇后端过程IdleQE,则调用cdbconn_createSegmentDescriptor函数创立一个新的SegmentDatabaseDescriptor
递增numActiveQEs
SegmentDatabaseDescriptor *cdbcomponent_allocateIdleQE(int contentId, SegmentType segmentType) { SegmentDatabaseDescriptor *segdbDesc = NULL; CdbComponentDatabaseInfo *cdbinfo; ListCell *curItem = NULL; ListCell *nextItem = NULL; ListCell *prevItem = NULL; MemoryContext oldContext; bool isWriter; cdbinfo = cdbcomponent_getComponentInfo(contentId); oldContext = MemoryContextSwitchTo(CdbComponentsContext); /* Always try to pop from the head. Make sure to push them back to head in cdbcomponent_recycleIdleQE(). */ curItem = list_head(cdbinfo->freelist); while (curItem != NULL) { SegmentDatabaseDescriptor *tmp = (SegmentDatabaseDescriptor *)lfirst(curItem); nextItem = lnext(curItem); if ((segmentType == SEGMENTTYPE_EXPLICT_WRITER && !tmp->isWriter) || (segmentType == SEGMENTTYPE_EXPLICT_READER && tmp->isWriter)) { prevItem = curItem; curItem = nextItem; continue; } cdbinfo->freelist = list_delete_cell(cdbinfo->freelist, curItem, prevItem); DECR_COUNT(cdbinfo, numIdleQEs); /* update numIdleQEs */ segdbDesc = tmp; break; } if (!segdbDesc) { /* 1. for entrydb, it's never be writer. * 2. for first QE, it must be a writer. 第一个QE,必须是writer */ isWriter = contentId == -1 ? false: (cdbinfo->numIdleQEs == 0 && cdbinfo->numActiveQEs == 0); segdbDesc = cdbconn_createSegmentDescriptor(cdbinfo, nextQEIdentifer(cdbinfo->cdbs), isWriter); } cdbconn_setQEIdentifier(segdbDesc, -1); INCR_COUNT(cdbinfo, numActiveQEs); MemoryContextSwitchTo(oldContext); return segdbDesc;}
cdbcomponent_recycleIdleQE函数用于回收应用完的IdleQE。首先递加numActiveQEs,如果指定须要捣毁该IdleQE,或者该IdleQE因为某些起因无奈重用,则须要进入清理该IdleQE的流程。如果闲暇freelist链表的长度大于gp_cached_gang_threshold,则该IdleQE也须要进行清理。重用流程:如果是Writer IdleQE,则须要将其退出freelist头部;如果是Reader IdleQE,在 cdbcomponent_allocateIdleQE() 中,Reader总是从头部弹出,因而要复原原始程序,咱们必须将它们推回头部,并记住读取器必须放在Writer之后;递增numIdleQEs。捣毁流程:调用cdbconn_termSegmentDescriptor清理libpq连贯,如果是Writer IdleQE,调用markCurrentGxactWriterGangLost函数设置MyTmGxactLocal->writerGangLost为true。
void cdbcomponent_recycleIdleQE(SegmentDatabaseDescriptor *segdbDesc, bool forceDestroy) { CdbComponentDatabaseInfo *cdbinfo; MemoryContext oldContext; int maxLen; bool isWriter; cdbinfo = segdbDesc->segment_database_info; isWriter = segdbDesc->isWriter; DECR_COUNT(cdbinfo, numActiveQEs); /* update num of active QEs */ oldContext = MemoryContextSwitchTo(CdbComponentsContext); if (forceDestroy || !cleanupQE(segdbDesc)) goto destroy_segdb; /* If freelist length exceed gp_cached_gang_threshold, destroy it */ maxLen = segdbDesc->segindex == -1 ? MAX_CACHED_1_GANGS : gp_cached_gang_threshold; if (!isWriter && list_length(cdbinfo->freelist) >= maxLen) goto destroy_segdb; /* Recycle the QE, put it to freelist */ if (isWriter) { /* writer is always the header of freelist */ segdbDesc->segment_database_info->freelist = lcons(segdbDesc, segdbDesc->segment_database_info->freelist); } else { ListCell *lastWriter = NULL; ListCell *cell; /* In cdbcomponent_allocateIdleQE() readers are always popped from the head, so to restore the original order we must pushed them back to the head, and keep in mind readers must be put after the writers. 在 cdbcomponent_allocateIdleQE() 中,reader总是从头部弹出,因而要复原原始程序,咱们必须将它们推回头部,并记住读取器必须放在写入器之后。 */ for (cell = list_head(segdbDesc->segment_database_info->freelist); cell && ((SegmentDatabaseDescriptor *) lfirst(cell))->isWriter; lastWriter = cell, cell = lnext(cell)) ; if (lastWriter) lappend_cell(segdbDesc->segment_database_info->freelist, lastWriter, segdbDesc); else segdbDesc->segment_database_info->freelist = lcons(segdbDesc, segdbDesc->segment_database_info->freelist); } INCR_COUNT(cdbinfo, numIdleQEs); MemoryContextSwitchTo(oldContext); return;destroy_segdb: cdbconn_termSegmentDescriptor(segdbDesc); if (isWriter) { markCurrentGxactWriterGangLost(); } MemoryContextSwitchTo(oldContext);}
QEIdentifer
CdbComponentDatabases的freeCounterList中保留了曾经开释可重用的QEIdentifier,这是一个int类型的值,用于调配给SegmentDatabaseDescriptor构造体中的identifier成员。nextQEIdentifer就是用于调配该QEIdentifier,如果freeCounterList有闲暇的QEIdentifier,就应用第一个;如果freeCounterList为闲暇的,则递增CdbComponentDatabases的qeCounter并返回。
static int nextQEIdentifer(CdbComponentDatabases *cdbs) { int result; if (!cdbs->freeCounterList) return cdbs->qeCounter++; result = linitial_int(cdbs->freeCounterList); cdbs->freeCounterList = list_delete_first(cdbs->freeCounterList); return result;}
cdbconn_termSegmentDescriptor函数用于清理libpq连贯,并将segdbDesc->identifier放回CdbComponentDatabases的freeCounterList中。
void cdbconn_termSegmentDescriptor(SegmentDatabaseDescriptor *segdbDesc) { CdbComponentDatabases *cdbs; cdbs = segdbDesc->segment_database_info->cdbs; /* put qe identifier to free list for reuse */ cdbs->freeCounterList = lappend_int(cdbs->freeCounterList, segdbDesc->identifier); cdbconn_disconnect(segdbDesc); if (segdbDesc->whoami != NULL){ pfree(segdbDesc->whoami); segdbDesc->whoami = NULL; } pfree(segdbDesc);}
https://blog.csdn.net/gp_comm...