作者:冉攀峰,StarRocks 外围研发,知乎账号 satanson
导读:欢送来到 StarRocks 技术底细系列文章,咱们将为你全方位揭晓 StarRocks 背地的技术原理和实际细节,助你疾速上手这款明星开源数据库产品。本期 StarRocks 技术底细将次要介绍 StarRocks Pipeline 执行框架的基本概念、原理及代码逻辑。
StarRocks Pipeline 执行框架(上)篇中,次要为大家解说了 Pipeline 执行引擎想解决的问题及一般性原理。对于 Pipeline 执行引擎的实现,BE 端拆分 Pipeline 的逻辑,以及 Pipeline 实例 PipelineDriver 的调度执行逻辑,将在本篇中持续与大家分享。
#01
背景介绍
—
详见 StarRocks Pipeline 执行框架(上)篇
#02
基本概念
—
详见 StarRocks Pipeline 执行框架(上)篇
#03
源码解析
—
章节二的基本概念输出完当前,咱们开始从以下几个方面解析 StarRocks 的源码:
- BE 初始化 Pipeline 执行引擎:次要介绍 BE 启动后,如何初始化 Pipeline 执行引擎的全局资源。
- BE 端 Query 生命周期治理:次要介绍在 BE 上,如何用 QueryContext 治理所属的整体 Fragment Instance,以及 Fragment Instance 的筹备、执行和销毁逻辑。
- 物理算子拆分 Pipeline 算子:算子拆分逻辑在 Pipeline 执行引擎中占比很重,波及到每个算子的重构,后续增加新算子,须要听从肯定的准则拆分算子,为 Pipeline 算子的接口定义正确的语义。
- PipelineDriver 的调度逻辑:次要波及到 PipelineDriver 的 Ready、Blocked 和 Running 三种状态的转换。
1、BE 初始化 Pipeline 执行引擎
BE 初始化全局对象的办法次要有两种:
- 将全局对象定义在 ExecEnv 对象中,参考 be/src/runtime/exec_env.h,be/src/runtime/exec_env.cpp 文件。
- 定义全局性的单例 (Singleton) 对象,例如 be/src/exec/pipeline/query_context.cpp,如果对象自身能够独立实现初始化、不依赖参数设置、不依赖于其余对象的初始化程序,则能够定义为单例。
Pipeline 执行引擎的全局性对象
PipelineDriver 执行器
定义为 ExecEnv::_driver_executor,类型为 pipeline::GlobalDriverExecutor,次要由执行线程池和轮询线程形成。其中执行线程的数量默认为机器的硬件核数,轮询线程数量为 1。
// 源码文件: be/src/runtime/exec_env.cpp
// 函数: Status ExecEnv::_init(const std::vector<StorePath>& store_paths)
std::unique_ptr<ThreadPool> driver_executor_thread_pool;
auto max_thread_num = std::thread::hardware_concurrency();
if (config::pipeline_exec_thread_pool_thread_num > 0) {max_thread_num = config::pipeline_exec_thread_pool_thread_num;}
LOG(INFO) << strings::Substitute("[PIPELINE] Exec thread pool: thread_num=$0", max_thread_num);
RETURN_IF_ERROR(ThreadPoolBuilder("pip_executor") // pipeline executor
.set_min_threads(0)
.set_max_threads(max_thread_num)
.set_max_queue_size(1000)
.set_idle_timeout(MonoDelta::FromMilliseconds(2000))
.build(&driver_executor_thread_pool));
_driver_executor = new pipeline::GlobalDriverExecutor(std::move(driver_executor_thread_pool), false);
_driver_executor->initialize(max_thread_num)
pipeline::GlobalDriverExecutor 的构造如下:
Pipeline IO 线程池
ExecEnv._pipeline_scan_io_thread_pool,次要用于执行 ScanOperator 读数据操作的异步化 IO 工作。IO 线程池队列大小和线程数目,取决于参数:
- config::pipeline_scan_thread_pool_queue_size
- config::pipeline_scan_thread_pool_thread_num
// 源码文件: be/src/runtime/exec_env.cpp
// 函数: Status ExecEnv::_init(const std::vector<StorePath>& store_paths)
int num_io_threads = config::pipeline_scan_thread_pool_thread_num <= 0
? std::thread::hardware_concurrency()
: config::pipeline_scan_thread_pool_thread_num;
_pipeline_scan_io_thread_pool =
new PriorityThreadPool("pip_scan_io", // pipeline scan io
num_io_threads, config::pipeline_scan_thread_pool_queue_size);
WorkGroup(即资源组 ResourceGroup)执行器
定义为 ExecEnv._wg_driver_executor,WorkGroup 用于 Pipeline 执行引擎的资源隔离,次要的设计动机是为了把不同业务场景的 Workload 划分到相应的 WorkGroup 中,每个 WorkGroup 有本人的 CPU、Memory 和并行数量资源 Quota。每个 WorkGroup 依照资源 Quota 的限度复用计算资源,从而实现隔离性。(WorkGroup 的具体源码剖析请参考专门解析文档,此处提及,是为了保障本文的完整性。)WorkGroup 执行器和 PipelineDriver 执行器性能相似,实现了基于 WorkGroup 的调度逻辑。
WorkGroup Scan 执行器
定义为 ExecEnv._scan_executor,也用于 WorkGroup 性能,相似 Pipeline IO 线程池,该执行器能够依据 WorkGroup 的资源 Quota 限度,执行 ScanOperator 提交的异步化 IO 工作。
QueryContextManager
QueryContext 治理一个查问在某台执行节点上的整体 Fragment Instance,QueryContextManager 顾名思义就是对 QueryContext 进行操作,次要用于其生命周期的治理。参考源码文件:be/src/exec/pipeline/query_context.cpp。
WorkGroupManager
WorkGroupManager 用于治理 WorkGroup,详见 WorkGroup 相干文档。
2、BE 端 Query 生命周期治理
QueryContext 和 FragmentContext
计算节点 BE 为查问保护下列对象:
- QueryContext:在 QueryContextManager 中注册,领有 FragmentContextManager 对象治理 Fragment Instance。
- FragmentContext:在 QueryContext.fragment_mgr 中注册,每个 Fragment Instance 对应一个 FragmentContext。
- Pipelines:FragmentContext 蕴含一组 Pipeline,来源于 Fragment Instance 的执行子树的拆解。
- Drivers:FragmentContext 蕴含一组 PipelineDriver,PipelineDriver 通过 Pipeline 创立,来自同一个 Pipeline 的 PipelineDriver 的数量,取决于 Pipeline 并行度。
- MorselQueues:ScanOperator 和 MorselQueue 的映射表,MorselQueue 蕴含一组 Morsel,Morsel 是 ScanOperator 读取数据的分片。
QueryContext 的生命周期比 FragmentContext 生命周期久,跨所有 Fragment Instance,属于 Query 层面的对象,能够由 QueryContext 治理。比方管制 Query 内存应用 MemTracker,所有 Fragment Instance 共享的 DescriptorTable。FragmentContext 只治理 Fragment Instance 范畴的资源,次要包含 Pipelines、PipelineDrivers 和 MorselQueues。
只有当 FragmentContext 中的所有 PipelineDriver 都实现计算,FragmentContext 的生命周期才完结;只有当所有 FragmentContext 的生命周期完结,QueryContext 的生命周期才完结。QueryContext 生命周期完结后,就能够析构并且开释 QueryContext 占用的资源。
Fragment Instance 执行逻辑的入口
BE 收到来自 FE 的 exec_plan_fragment 后,创立 FragmentExecutor 执行该 Fragment Instance,代码如下:
// 文件:be/src/service/internal_service.cpp
template <typename T>
void PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController* cntl_base,
const PExecPlanFragmentRequest* request,
PExecPlanFragmentResult* response, google::protobuf::Closure* done) {ClosureGuard closure_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
auto st = _exec_plan_fragment(cntl);
if (!st.ok()) {LOG(WARNING) << "exec plan fragment failed, errmsg=" << st.get_error_msg();}
st.to_protobuf(response->mutable_status());
}
PInternalServiceImpl::exec_plan_fragment 调用 _exec_plan_fragment:
// 文件:be/src/service/internal_service.cpp
template <typename T>
Status PInternalServiceImpl<T>::_exec_plan_fragment(brpc::Controller* cntl) {auto ser_request = cntl->request_attachment().to_string();
TExecPlanFragmentParams t_request;
{const uint8_t* buf = (const uint8_t*)ser_request.data();
uint32_t len = ser_request.size();
RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, TProtocolType::BINARY, &t_request));
}
bool is_pipeline = t_request.__isset.is_pipeline && t_request.is_pipeline;
LOG(INFO) << "exec plan fragment, fragment_instance_id=" << print_id(t_request.params.fragment_instance_id)
<< ", coord=" << t_request.coord << ", backend=" << t_request.backend_num
<< ", is_pipeline=" << is_pipeline << ", chunk_size=" << t_request.query_options.batch_size;
if (is_pipeline) {auto fragment_executor = std::make_unique<starrocks::pipeline::FragmentExecutor>();
auto status = fragment_executor->prepare(_exec_env, t_request);
if (status.ok()) {return fragment_executor->execute(_exec_env);
} else {return status.is_duplicate_rpc_invocation() ? Status::OK() : status;}
} else {return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
}
}
当采纳 Pipeline 执行引擎时,创立 FragmentExecutor,实现下列操作:
- 调用 FragmentExecutor::prepare 函数, 初始化 Fragment Instance 的执行环境,创立和注册 QueryContex、FragmentContext,将 Fragment Instance 拆分成 Pipelines,创立 PipelineDrivers。
- 调用 FragmentExecutor::execute 函数,向 Pipeline 执行线程提交 PipelineDrivers 运行。
FragmentExecutor::prepare 函数
参考 be/src/exec/pipeline/fragment_executor.cpp,次要的逻辑如下:
- 判断 Fragment Instance 是否为反复投递,如果是,间接返回谬误状态 Status::DuplicateRpcInvocation。
- 注册或取得已有的 QueryContext,解决 Query 的第一个 Fragment Instance 时,注册 QueryContext,后续达到的 Fragment Instance 复用已注册的 QueryContext。设置 QueryContext 须要解决的 Fragment Instance 的数量和 Query 过期工夫等参数,Query 过期工夫用于主动勾销长期得不到执行的 Query;如果 Query 有大量的 Fragment Instance,先达到的局部 Fragment Instance 实现执行而退出,在没有沉闷的 Fragment Instance 的状况下,QueryContext 仍然须要保留一段时间,等到后续 Fragment Instance 全副达到或者主动过期而勾销执行。
- 创立和初始化 FragmentContext 对象,FragmentContext 须要注册到 QueryContext.fragment_mgr 中,注册的机会为 FragmentContext::prepare 函数的开端。因为有的异步逻辑(比方 Global Runtime Filter 的投递),须要拜访 FragmentContext 的成员变量,在 FragmentContext 未实现所有的初始化之前注册,会对异步逻辑裸露 FragmentContext,导致拜访未初始化的成员变量而出错。
- 调用函数 Exec::create_tree 生成 Non-pipeline 执行树,应用 PipelineBuilder 将执行树拆解成为 Pipeline。
- 调用 convert_scan_range_to_morsel 函数将 ScanNode 须要拜访的 TScanRangeParams 转换为 ScanOperator 可拜访的 Morsel。
- 将 Non-pipeline 执行引擎的 DataSink 转换为 Pipeline 引擎的 SinkOperator,调用 _decompose_data_sink_to_operator
- 依据 DOP(degree-of-parallelism),为 Pipeline 创立 PipelineDriver,并且将 MorselQueue 和相应的 ScanOperator 关联。
- 实现其余的必要的初始化,并且注册 FragmentContext。
FragmentExecutor::execute 函数
// 文件: be/src/exec/pipeline/fragment_executor.cpp
Status FragmentExecutor::execute(ExecEnv* exec_env) {for (const auto& driver : _fragment_ctx->drivers()) {RETURN_IF_ERROR(driver->prepare(_fragment_ctx->runtime_state()));
}
if (_fragment_ctx->enable_resource_group()) {for (const auto& driver : _fragment_ctx->drivers()) {exec_env->wg_driver_executor()->submit(driver.get());
}
} else {for (const auto& driver : _fragment_ctx->drivers()) {exec_env->driver_executor()->submit(driver.get());
}
}
return Status::OK();}
FragmentExecutor::execute 函数的次要操作如下:
1. 变量 FragmentContext 中的所有 PipelineDriver,执行 PipelineDriver::prepare 函数。 该函数次要实现 PipelineDriver 范畴的 profile 注册、调用每个算子的 prepare 函数、设置 Driver 之间前置期待条件,比方 HashJoin 左侧的 PipelineDriver 须要期待右侧 PipelineDriver 实现,生产 RuntimeFilter 的 PipelineDriver 须要期待生产 RuntimeFilter 的 PipelineDriver 实现。
2. 把 PipelineDriver 提交给 Pipeline 执行线程。 PipelineDriver 提交后,FragmentExecutor 的生命周期完结,FragmentExecutor 是临时性的,禁止在 FragmentExecutor 中定义 PipelineDriver 可援用的对象。
3、PipelineBuilder 拆分 pipeline
BE 上的 PipelineBuilder 会把 PlanFragment 拆分成多个 Pipeline,拆分过程中,PlanFragment 中物理算子会转化为 Pipeline 算子。
物理算子
物理算子是 ExecNode 的子类,FE 投递给 BE 的 Fragment Instance 中,蕴含形成所属 PlanFragment 的物理算子,物理算子如下图所示:
另外,在 Fragment Instance 中,个别用 DataSink 的子类形容该 Fragment Instance 计算结果的去向,比方 DataStreamSink 会把计算结果发给上游 Fragment Instance 的 ExchangeNode。在 Pipeline 执行引擎中,DataStreamSink 和 ExchangeNode 会别离转化为 ExchangeSinkOperator 和 ExchangeSourceOperator。
Pipeline 算子
Pipeline 算子的数量比物理算子的数量多,这是因为,Pipeline 算子最多只有一路输出和一路输入,多路输出的物理算子和全量物化的物理算子,会拆解成多个 Pipeline 算子。Pipeline 算子接口定义,请参考 be/src/exec/pipeline/operator.h,局部接口定义如下:
- pull_chunk:从算子中拉取 chunk,个别计算时,从一对算子的前置算子拉取 chunk,而后推给后继算子。
- push_chunk:向算子推 chunk。
- has_output:示意状态,以后算子可输入,能够执行 pull_chunk。
- need_input:示意状态,以后算子可输出,能够执行 push_chunk。
- is_finished:以后算子曾经完结,不能执行 pull_chunk/push_chunk。
- prepare:prepare 和 open 表达式和调用其余外部数据结构的 prepare 函数。
- close:close 表达式和调用其余外部数据结构的 close 函数。
- set_finishing:敞开输出,执行 set_finishing 之后,算子的 need_input 始终返回 false,不可调用 push_chunk,但算子外部可能有缓存的计算结果,has_output 可能返回 true,能够调用 pull_chunk。
- set_finished:敞开输出和输入,调用后,is_finished、has_output、need_input 都返回 false,pull_chunk 和 push_chunk 不可调用,当 Pipeline 中 HashJoinProbeOperator 和 LimitOperator 算子产生短路并且提前结束时,须要调用前置算子 set_finished 函数。如果两个算子之间通过专门的 Context 替换数据,则 set_finished 函数中,须要正确地重置 Context 状态,一个算子须要感知到另外一个算子的 set_finished 函数调用。比方 LocalExchangeSinkOperator 和 LocalExchangeSourceOperator。
- set_canceled:相似 set_finished,但示意算子异样完结,如果算子须要辨别失常或者异样完结,则须要重载 set_canceled 函数,目前只有 ExchangeSinkOperator 用到该函数。
- pending_finish:示意状态,当算子实现了异步化,算子完结时,异步化工作尚未实现,算子须要期待异步化工作完结后,能力销毁所在的 PipelineDriver。提前销毁 PipelineDriver 可能会导致异步化工作延后执行援用算子中的已销毁对象。
一个算子会通过 prepare -> finishing -> finished -> [cancelled] -> closed 的转换,Pipeline 执行引擎依据算子的状态, 执行相应的接口。
Pipeline 执行引擎中,每个算子有一个 OperatorFactory 类,Pipeline 由 OperatorFactory 组成,PipelineDriver 是 Pipeline 的实例,PipelineDriver 由 Operator 形成,OperatorFactory 创立 Operator 对象,从 Pipeline 创立 PipelineDriver 时,遍历 Pipeline 中的 OperatorFactory,调用 OperatorFactory::create 办法。
// 文件:be/src/exec/pipeline/pipeline.h
// 函数: Pipeline::create_operators
Operators create_operators(int32_t degree_of_parallelism, int32_t i) {
Operators operators;
for (const auto& factory : _op_factories) {operators.emplace_back(factory->create(degree_of_parallelism, i));
}
return operators;
}
OperatorFactory 如下:
对于某些算子,比方 ScanOperator,来自同一个 OperatorFactory 的多个算子,会共享一份表达式,共享的表达式搁置在 OperatorFactory 中,通过 OperatorFactory::prepare 函数调用表达式的 prepare/open 函数,通过 OperatorFactory::close 函数调用表达式的 close 函数。
如果表达式不可重入,在计算时,算子在多个线程中执行,存在线程不平安问题。因而新增加的表达式,要保障:
- 表达式是线程平安的。
- 线程不平安的表达式,不在 OperatorFactory 中共享,每个算子有本人公有的正本。
Pipeline 拆分
当 BE 收到 FE 发来的 Fragment Instance 时,会创立一个 FragmentExecutor 对象初始化 Fragment Instance 的执行环境。FragmentExecutor::prepare 函数应用 PipelineBuilder 将 PlanFragment 拆分成 Pipeline。
// file: be/src/exec/pipeline/fragment_executor.cpp
// function: FragmentExecutor::prepare
ExecNode* plan = nullptr;
RETURN_IF_ERROR(ExecNode::create_tree(runtime_state, obj_pool, fragment.plan, *desc_tbl, &plan))
// ...
PipelineBuilderContext context(_fragment_ctx, degree_of_parallelism);
PipelineBuilder builder(context);
_fragment_ctx->set_pipelines(builder.build(*_fragment_ctx, plan))
- 首先通过 ExecNode::create_tree 函数取得 PlanFragment 的物理算子形成执行树。
- 初始化 PipelineBuilderContext 对象,传入 degree_of_parallelism 参数。
- 构建 PipelineBuilder 对象,调用 PipelineBuilder::build 拆分 Pipeline。
PipelineBuilder::build 次要从执行树 root 节点开始,递归调用 decompose_to_pipeline 函数。
Pipelines PipelineBuilder::build(const FragmentContext& fragment, ExecNode* exec_node) {pipeline::OpFactories operators = exec_node->decompose_to_pipeline(&_context);
_context.add_pipeline(operators);
_context.get_pipelines().back()->set_root();
return _context.get_pipelines();}
物理算子须要重载 ExecNode 的 decompose_to_pipeline 函数。
decompose_to_pipeline 函数递归地调用,实现算子的拆分。以 ProjectNode 为例,ProjectNode 调用 decompose_to_pipeline 函数对 _children[0] 先实现 Pipeline 拆解,并返回 OperatorFactory 数组,而后 ProjectNode 本身转变为 ProjectOperatorFactory,追加 OperatorFactory 数组的开端,参考上面代码:
pipeline::OpFactories ProjectNode::decompose_to_pipeline(pipeline::PipelineBuilderContext* context) {
using namespace pipeline;
OpFactories operators = _children[0]->decompose_to_pipeline(context);
// Create a shared RefCountedRuntimeFilterCollector
auto&& rc_rf_probe_collector = std::make_shared<RcRfProbeCollector>(1, std::move(this->runtime_filter_collector()));
operators.emplace_back(std::make_shared<ProjectOperatorFactory>(context->next_operator_id(), id(), std::move(_slot_ids), std::move(_expr_ctxs),
std::move(_type_is_nullable), std::move(_common_sub_slot_ids), std::move(_common_sub_expr_ctxs)));
// Initialize OperatorFactory's fields involving runtime filters.
this->init_runtime_filter_for_operator(operators.back().get(), context, rc_rf_probe_collector);
if (limit() != -1) {operators.emplace_back(std::make_shared<LimitOperatorFactory>(context->next_operator_id(), id(), limit()));
}
return operators;
}
简单算子的拆分可能会用到 LocalExchange 算子,目前 LocalExchange 算子反对 Passthrough、broadcast 和 shuffle 模式。更多简单的算子拆分能够参考针对这些算子的具体源码解析。
DataSink 的拆分和 ExecNode 不同,能够参考函数 FragmentExecutor::_decompose_data_sink_to_operator,此处不再赘述。
4、PipelineDriver 的调度逻辑
PipelineDriver 的调度次要波及上面几个函数:
- GlobalDriverExecutor::worker_thread:Pipeline 引擎执行线程的入口函数,该函数继续从就绪 Driver 队列获取 PipelineDriver,执行 PipelineDriver::process 函数。在 PipelineDriver 阻塞或者工夫片用完时,被动 yield,换其余就绪 PipelineDriver 执行。
- PipelineDriver::process 函数:调用 Operator::pull_chunk/push_chunk 函数进行计算,判断 PipelineDriver 是否阻塞或者须要 yield。
- PipelineDriverPoller::run_internal:阻塞 PipelineDriver 的轮询线程的函数,遍历阻塞 PipelineDriver,将曾经解除阻塞的 PipelineDriver 放回就绪队列。
GlobalDriverExecutor::worker_thread
该函数的次要性能是:
- 从就绪队列取 PipelineDriver。
- 执行 PipelineDriver::process 函数
- PipelineDriver 执行完一轮之后,判断 PipelineDriver 的以后状态
PipelineDriver 失常完结,异样完结或者计算出错,则调用 PipelineDriver::finalize_driver 函数实现 PipelineDriver 的清理;
PipelineDriver 依然处于 RUNNING 状态,则设置其状态为 READY,放回就绪 Driver 队列;
PipelineDriver 处于阻塞状态,则调用 PipelineDriverPoller->add_blocked_driver 函数,将 PipelineDriver 退出到阻塞 Driver 队列中。
- 就绪 Driver 队列采纳多级反馈队列 (mlfq) 实现,小查问优先调度,同时防止大查问饥饿。
- 请参考 https://github.com/StarRocks/…
PipelineDriver::process
该函数的性能次要有:
- 遍历 PipelineDriver 中的相邻算子对,只有当两个算子 is_finished() 返回 false,前置算子 has_output() 返回 true,后置算子 need_input() 返回 true 时,调用前置算子的 pull_chunk 取得 chunk,调用后置算子的 push_chunk,将 chunk 推给它,从而实现 chunk 的转移。请参考:https://github.com/StarRocks/…
- 当 PipelineDriver 中转移的 chunk 数量超过 100 个,本轮累积执行工夫超过 100ms,则被动 yield,退出以后 process,返回就绪队列,换其余 PipelineDriver 执行。
- 当 PipelineDriver 中以后无 chunk 能够挪动,则阐明 PipelineDriver 处于阻塞状态,退出以后 process,放回阻塞队列。
PipelineDriverPoller::run_internal
该函数遍历阻塞 Driver 队列,唤醒解除阻塞态的 PipelineDriver,放入就绪 Driver 队列,期待 Pipeline 执行线程调度。具体参考见:https://github.com/StarRocks/…
#04
总结
—
本文次要解说了 Pipeline 执行引擎想解决的问题及一般性原理。针对 Pipeline 执行引擎的实现,着重阐明了 BE 端拆分 Pipeline 的逻辑,以及 Pipeline 实例 PipelineDriver 的调度执行逻辑。
想要深刻学习 StarRocks 的执行引擎,还须要钻研 MPP 调度和向量化执行,后续咱们会持续撰文与大家分享。
读到这里,好学的你是不是又产生了一些新思考与启发?
扫描下方用户群二维码退出 StarRocks 社区一起自在交换!
对于 StarRocks
面世两年多来,StarRocks 始终专一打造世界顶级的新一代极速全场景 MPP 数据库,帮忙企业建设“极速对立”的数据分析新范式,助力企业全面数字化经营。
以后曾经帮忙腾讯、携程、顺丰、Airbnb、滴滴、京东、众安保险等超过 170 家大型用户构建了全新的数据分析能力,生产环境中稳固运行的 StarRocks 服务器数目达数千台。
2021 年 9 月,StarRocks 源代码凋谢,在 GitHub 上的星数已超过 3400 个。StarRocks 的寰球社区飞速成长,至今已有超百位贡献者,社群用户冲破 7000 人,吸引几十家国内外行业头部企业参加共建。