Query startup entry point
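The startup entry point of the databend-query server is in databend/src/binaries/query/main.rs. After initializing the configuration, it creates a GlobalServices instance and a shutdown_handle, which handles the shutdown logic when the server is stopped.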
    GlobalServices::init(conf.clone()).await?;
    let mut shutdown_handle = ShutdownHandle::create()?;
GlobalServices
GlobalServices starts all of the global services of databend-query. Each of these services follows the single-responsibility principle.
    pub struct GlobalServices {
        global_runtime: UnsafeCell<Option<Arc<Runtime>>>,
        // Handles the query log
        query_logger: UnsafeCell<Option<Arc<QueryLogger>>>,
        // Handles databend-query cluster discovery
        cluster_discovery: UnsafeCell<Option<Arc<ClusterDiscovery>>>,
        // Interacts with the storage layer to read and write data
        storage_operator: UnsafeCell<Option<Operator>>,
        async_insert_manager: UnsafeCell<Option<Arc<AsyncInsertManager>>>,
        cache_manager: UnsafeCell<Option<Arc<CacheManager>>>,
        catalog_manager: UnsafeCell<Option<Arc<CatalogManager>>>,
        http_query_manager: UnsafeCell<Option<Arc<HttpQueryManager>>>,
        data_exchange_manager: UnsafeCell<Option<Arc<DataExchangeManager>>>,
        session_manager: UnsafeCell<Option<Arc<SessionManager>>>,
        users_manager: UnsafeCell<Option<Arc<UserApiProvider>>>,
        users_role_manager: UnsafeCell<Option<Arc<RoleCacheManager>>>,
    }
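The global services in GlobalServices all implement the singleton trait. These global managers will be covered in later source-code analysis articles; this article focuses on the logic related to session handling.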
    pub trait SingletonImpl<T>: Send + Sync {
        fn get(&self) -> T;
        fn init(&self, value: T) -> Result<()>;
    }

    pub type Singleton<T> = Arc<dyn SingletonImpl<T>>;
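To make the singleton contract concrete, here is a minimal, std-only sketch of one possible implementor. It is not Databend's implementation: `OnceHolder` is a hypothetical name, the error type is a plain `String` instead of Databend's `Result`, and the value is kept in a `std::sync::OnceLock` rather than in the `UnsafeCell<Option<..>>` fields shown above.

```rust
use std::sync::{Arc, OnceLock};

/// Simplified copy of the trait above; the String error type is an assumption.
pub trait SingletonImpl<T>: Send + Sync {
    fn get(&self) -> T;
    fn init(&self, value: T) -> Result<(), String>;
}

pub type Singleton<T> = Arc<dyn SingletonImpl<T>>;

/// Hypothetical holder that can be initialized exactly once.
pub struct OnceHolder<T> {
    cell: OnceLock<T>,
}

impl<T> OnceHolder<T> {
    pub fn new() -> Self {
        Self { cell: OnceLock::new() }
    }
}

impl<T: Clone + Send + Sync> SingletonImpl<T> for OnceHolder<T> {
    /// Returns a clone of the stored value; panics if init was never called.
    fn get(&self) -> T {
        self.cell
            .get()
            .expect("singleton is not initialized")
            .clone()
    }

    /// Stores the value; a second call is rejected instead of overwriting.
    fn init(&self, value: T) -> Result<(), String> {
        self.cell
            .set(value)
            .map_err(|_| "singleton is already initialized".to_string())
    }
}
```

With `T` chosen as an `Arc<...>`, `get` only clones the pointer, which matches the way the managers above are all stored behind `Arc`.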
ShutdownHandle
Next, handlers are initialized according to the network protocol and registered in the services of shutdown_handle. Any type that implements the Server trait can be added to services.
    #[async_trait::async_trait]
    pub trait Server: Send {
        async fn shutdown(&mut self, graceful: bool);
        async fn start(&mut self, listening: SocketAddr) -> Result<SocketAddr>;
    }
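The registration idea can be sketched without an async runtime. The version below is a simplified assumption, not the real ShutdownHandle: the trait is synchronous, the error type is a `String`, and `FlagServer` and `shutdown_services` are hypothetical names used only for illustration.

```rust
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Synchronous stand-in for the async Server trait (an assumption).
pub trait Server: Send {
    fn start(&mut self, listening: SocketAddr) -> Result<SocketAddr, String>;
    fn shutdown(&mut self, graceful: bool);
}

/// Hypothetical, reduced shutdown handle: it only owns the services.
#[derive(Default)]
pub struct ShutdownHandle {
    services: Vec<Box<dyn Server>>,
}

impl ShutdownHandle {
    /// Any type implementing Server can be registered.
    pub fn add_service<S: Server + 'static>(&mut self, service: S) {
        self.services.push(Box::new(service));
    }

    /// Shuts down every registered service and returns how many were stopped.
    pub fn shutdown_services(&mut self, graceful: bool) -> usize {
        for service in self.services.iter_mut() {
            service.shutdown(graceful);
        }
        self.services.len()
    }
}

/// Hypothetical service that records whether it is running in a shared flag.
pub struct FlagServer {
    pub running: Arc<AtomicBool>,
}

impl Server for FlagServer {
    fn start(&mut self, listening: SocketAddr) -> Result<SocketAddr, String> {
        self.running.store(true, Ordering::SeqCst);
        Ok(listening)
    }

    fn shutdown(&mut self, _graceful: bool) {
        self.running.store(false, Ordering::SeqCst);
    }
}
```

Storing the services as boxed trait objects is what lets handlers of different concrete types share one shutdown path.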
Databend currently supports three protocols for submitting query requests (MySQL, ClickHouse HTTP, raw HTTP).
    // MySQL handler.
    {
        let hostname = conf.query.mysql_handler_host.clone();
        let listening = format!("{}:{}", hostname, conf.query.mysql_handler_port);
        let mut handler = MySQLHandler::create(session_manager.clone());
        let listening = handler.start(listening.parse()?).await?;
        // Register the service with shutdown_handle so that it is closed
        // when the server shuts down; the same applies below.
        shutdown_handle.add_service(handler);
    }

    // ClickHouse HTTP handler.
    {
        let hostname = conf.query.clickhouse_http_handler_host.clone();
        let listening = format!("{}:{}", hostname, conf.query.clickhouse_http_handler_port);
        let mut srv = HttpHandler::create(session_manager.clone(), HttpHandlerKind::Clickhouse);
        let listening = srv.start(listening.parse()?).await?;
        shutdown_handle.add_service(srv);
    }

    // Databend HTTP handler.
    {
        let hostname = conf.query.http_handler_host.clone();
        let listening = format!("{}:{}", hostname, conf.query.http_handler_port);
        let mut srv = HttpHandler::create(session_manager.clone(), HttpHandlerKind::Query);
        let listening = srv.start(listening.parse()?).await?;
        shutdown_handle.add_service(srv);
    }
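Each handler block above builds its address with format! and then parses it into a SocketAddr. The standalone sketch below isolates that step; `listening_addr` is a hypothetical helper, not a Databend function. It relies on the fact that parsing a string into `std::net::SocketAddr` accepts only IP literals and performs no DNS resolution, so a host name such as "localhost" is rejected at this point.

```rust
use std::net::{AddrParseError, SocketAddr};

/// Hypothetical helper mirroring the `format!` + `parse` step of the handlers.
pub fn listening_addr(host: &str, port: u16) -> Result<SocketAddr, AddrParseError> {
    format!("{}:{}", host, port).parse()
}
```

For example, `listening_addr("127.0.0.1", 3307)` succeeds, while `listening_addr("localhost", 3307)` returns an error.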
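After that, several other services are created:
- Metric service: exposes metrics
- Admin service: handles administrative information
- RPC service: the RPC service of the query node, responsible for communication between query nodes over the Arrow Flight protocol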
    // Metric API service.
    {
        let address = conf.query.metric_api_address.clone();
        let mut srv = MetricService::create(session_manager.clone());
        let listening = srv.start(address.parse()?).await?;
        shutdown_handle.add_service(srv);
        info!("Listening for Metric API: {}/metrics", listening);
    }

    // Admin HTTP API service.
    {
        let address = conf.query.admin_api_address.clone();
        let mut srv = HttpService::create(session_manager.clone());
        let listening = srv.start(address.parse()?).await?;
        shutdown_handle.add_service(srv);
        info!("Listening for Admin HTTP API: {}", listening);
    }

    // RPC API service.
    {
        let address = conf.query.flight_api_address.clone();
        let mut srv = RpcService::create(session_manager.clone());
        let listening = srv.start(address.parse()?).await?;
        shutdown_handle.add_service(srv);
        info!("Listening for RPC API (interserver): {}", listening);
    }
Finally, this query node is registered with the meta server.
    // Cluster register.
    {
        let cluster_discovery = session_manager.get_cluster_discovery();
        let register_to_metastore = cluster_discovery.register_to_metastore(&conf);
        register_to_metastore.await?;
    }
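Session
Session handling consists of four main parts:
- session_manager: globally unique, responsible for managing client sessions
- session: whenever a new client connects to the server, a new session is created and registered with session_manager
- query_ctx: every query statement has a query_ctx, which stores context information for the current query
- query_ctx_shared: context information shared by the subqueries within a query statement
Let's go through them one by one.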
SessionManager (query/src/sessions/session_mgr.rs)
    pub struct SessionManager {
        pub(in crate::sessions) conf: Config,
        pub(in crate::sessions) max_sessions: usize,
        pub(in crate::sessions) active_sessions: Arc<RwLock<HashMap<String, Arc<Session>>>>,
        pub status: Arc<RwLock<SessionManagerStatus>>,
        // When session type is MySQL, insert into this map, key is id, val is MySQL connection id.
        pub(crate) mysql_conn_map: Arc<RwLock<HashMap<Option<u32>, String>>>,
        pub(in crate::sessions) mysql_basic_conn_id: AtomicU32,
    }
SessionManager is mainly used to create and destroy sessions, through the following methods:
    // Create a session according to the client protocol type
    pub async fn create_session(self: &Arc<Self>, typ: SessionType) -> Result<SessionRef>

    // Destroy a session by its session id
    pub fn destroy_session(self: &Arc<Self>, session_id: &String)
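The create/destroy pair can be illustrated with a reduced, synchronous sketch. Everything in it is an assumption made for illustration: the real create_session is async and takes a SessionType, session ids are generated differently, and `active_count` is a hypothetical helper. Only the idea of a size-limited map of active sessions behind an RwLock is taken from the struct above.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

/// Hypothetical, reduced session: it only carries an id.
pub struct Session {
    pub id: String,
}

pub struct SessionManager {
    max_sessions: usize,
    next_id: AtomicU64,
    active_sessions: RwLock<HashMap<String, Arc<Session>>>,
}

impl SessionManager {
    pub fn create(max_sessions: usize) -> Arc<Self> {
        Arc::new(Self {
            max_sessions,
            next_id: AtomicU64::new(0),
            active_sessions: RwLock::new(HashMap::new()),
        })
    }

    /// Registers a new session, or fails once max_sessions is reached.
    pub fn create_session(self: &Arc<Self>) -> Result<Arc<Session>, String> {
        let mut sessions = self.active_sessions.write().unwrap();
        if sessions.len() >= self.max_sessions {
            return Err(format!("too many sessions (max {})", self.max_sessions));
        }
        let id = format!("session-{}", self.next_id.fetch_add(1, Ordering::Relaxed));
        let session = Arc::new(Session { id: id.clone() });
        sessions.insert(id, session.clone());
        Ok(session)
    }

    /// Removes the session from the active map; unknown ids are ignored.
    pub fn destroy_session(self: &Arc<Self>, session_id: &str) {
        self.active_sessions.write().unwrap().remove(session_id);
    }

    pub fn active_count(&self) -> usize {
        self.active_sessions.read().unwrap().len()
    }
}
```

Checking the limit and inserting under the same write lock keeps two concurrent callers from both slipping past max_sessions.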
Session (query/src/sessions/session.rs)
A session mainly stores the client-server context. The names in the code are already clear, so they are not described further here.
    pub struct Session {
        pub(in crate::sessions) id: String,
        pub(in crate::sessions) typ: RwLock<SessionType>,
        pub(in crate::sessions) session_ctx: Arc<SessionContext>,
        status: Arc<RwLock<SessionStatus>>,
        pub(in crate::sessions) mysql_connection_id: Option<u32>,
    }

    pub struct SessionContext {
        conf: Config,
        abort: AtomicBool,
        current_catalog: RwLock<String>,
        current_database: RwLock<String>,
        current_tenant: RwLock<String>,
        current_user: RwLock<Option<UserInfo>>,
        auth_role: RwLock<Option<String>>,
        client_host: RwLock<Option<SocketAddr>>,
        io_shutdown_tx: RwLock<Option<Sender<Sender<()>>>>,
        query_context_shared: RwLock<Option<Arc<QueryContextShared>>>,
    }

    pub struct SessionStatus {
        pub session_started_at: Instant,
        pub last_query_finished_at: Option<Instant>,
    }
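Another major responsibility of Session is creating and retrieving the QueryContext. Each time a new query request is received, a QueryContext is created and bound to the corresponding query statement.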
QueryContext (query/src/sessions/query_ctx.rs)
QueryContext mainly maintains the context information of a query. It is created via QueryContext::create_from_shared(query_ctx_shared).
    #[derive(Clone)]
    pub struct QueryContext {
        version: String,
        statistics: Arc<RwLock<Statistics>>,
        partition_queue: Arc<RwLock<VecDeque<PartInfoPtr>>>,
        shared: Arc<QueryContextShared>,
        precommit_blocks: Arc<RwLock<Vec<DataBlock>>>,
        fragment_id: Arc<AtomicUsize>,
    }
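Here, partition_queue mainly stores the PartInfo entries for the query, including the part's location, version information, the number of rows involved, the compression algorithm used by the part, and the meta information of the columns involved. The partitions are set while the pipeline is being built; the pipeline will be covered in a dedicated later article. precommit_blocks temporarily holds the metadata that an insert operation has already written to storage but has not yet committed. A DataBlock mainly contains references to the column meta information and the arrow schema information.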
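The way a shared partition queue is filled once and then drained by readers can be sketched as follows. This is an illustration under assumptions, not Databend code: `PartInfo` is reduced to two hypothetical fields, and `PartitionQueue`, `set_partitions` and `take_partitions` are names invented for this sketch.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, RwLock};

/// Hypothetical, reduced PartInfo: only a location and a row count.
pub struct PartInfo {
    pub location: String,
    pub rows: usize,
}

pub type PartInfoPtr = Arc<PartInfo>;

/// Cloning the queue clones the Arc, so all clones see the same parts.
#[derive(Clone, Default)]
pub struct PartitionQueue {
    inner: Arc<RwLock<VecDeque<PartInfoPtr>>>,
}

impl PartitionQueue {
    /// Replaces the queue content, e.g. while the pipeline is being built.
    pub fn set_partitions(&self, parts: Vec<PartInfoPtr>) {
        let mut queue = self.inner.write().unwrap();
        queue.clear();
        queue.extend(parts);
    }

    /// Pops up to `num` parts from the front; an empty Vec means the queue is drained.
    pub fn take_partitions(&self, num: usize) -> Vec<PartInfoPtr> {
        let mut queue = self.inner.write().unwrap();
        let n = num.min(queue.len());
        queue.drain(..n).collect()
    }
}
```

Because every reader pops under the write lock, each part is handed out exactly once even when several readers share the queue.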
QueryContextShared (query/src/sessions/query_ctx_shared.rs)
For queries that contain subqueries, a lot of context information needs to be shared. This is why QueryContextShared exists.
    /// Data that needs to be shared within a query context. This is very important, for example:
    ///     USE database_1;
    ///     SELECT
    ///         (SELECT scalar FROM table_name_1) AS scalar_1,
    ///         (SELECT scalar FROM table_name_2) AS scalar_2,
    ///         (SELECT scalar FROM table_name_3) AS scalar_3
    ///     FROM table_name_4;
    /// For the subqueries above, the runtime, session, progress and init_query_id are shared.
    pub struct QueryContextShared {
        /// scan_progress for scan metrics of datablocks (uncompressed)
        pub(in crate::sessions) scan_progress: Arc<Progress>,
        /// write_progress for write/commit metrics of datablocks (uncompressed)
        pub(in crate::sessions) write_progress: Arc<Progress>,
        /// result_progress for metrics of result datablocks (uncompressed)
        pub(in crate::sessions) result_progress: Arc<Progress>,
        pub(in crate::sessions) error: Arc<Mutex<Option<ErrorCode>>>,
        pub(in crate::sessions) session: Arc<Session>,
        pub(in crate::sessions) runtime: Arc<RwLock<Option<Arc<Runtime>>>>,
        pub(in crate::sessions) init_query_id: Arc<RwLock<String>>,
        ...
    }
It provides all the basic information that a query context needs.
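The sharing relationship can be sketched with two contexts built from one shared object. The types below are heavily reduced assumptions: the real QueryContextShared holds Progress objects, the session and the runtime, whereas this sketch keeps only a query id and a single hypothetical `scan_rows` counter. The constructor name create_from_shared is taken from the text above; its exact signature is assumed.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical, reduced shared state: a query id and one progress counter.
pub struct QueryContextShared {
    pub init_query_id: String,
    scan_rows: AtomicUsize,
}

impl QueryContextShared {
    pub fn create(init_query_id: &str) -> Arc<Self> {
        Arc::new(Self {
            init_query_id: init_query_id.to_string(),
            scan_rows: AtomicUsize::new(0),
        })
    }
}

#[derive(Clone)]
pub struct QueryContext {
    shared: Arc<QueryContextShared>,
}

impl QueryContext {
    /// Every context created from the same `shared` sees the same state.
    pub fn create_from_shared(shared: Arc<QueryContextShared>) -> Arc<Self> {
        Arc::new(Self { shared })
    }

    pub fn incr_scan_rows(&self, rows: usize) {
        self.shared.scan_rows.fetch_add(rows, Ordering::SeqCst);
    }

    pub fn scan_rows(&self) -> usize {
        self.shared.scan_rows.load(Ordering::SeqCst)
    }

    pub fn query_id(&self) -> &str {
        &self.shared.init_query_id
    }
}
```

If an outer query and a subquery each hold a context created from the same shared object, progress reported by either one is visible through both.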
Handler
As mentioned earlier, Databend supports multiple handlers. Taking MySQL as an example, let's look at how a handler processes requests and how it interacts with the session.
First, MySQLHandler holds a reference to the SessionManager:
    pub struct MySQLHandler {
        abort_handle: AbortHandle,
        abort_registration: Option<AbortRegistration>,
        join_handle: Option<JoinHandle<()>>,
    }
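After it starts, MySQLHandler spawns a tokio task that keeps listening for TCP streams. For each incoming stream it creates a session and starts another task to execute the query requests that follow.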
    fn accept_socket(session_mgr: Arc<SessionManager>, executor: Arc<Runtime>, socket: TcpStream) {
        executor.spawn(async move {
            // Create a session
            match session_mgr.create_session(SessionType::MySQL).await {
                Err(error) => Self::reject_session(socket, error).await,
                Ok(session) => {
                    info!("MySQL connection coming: {:?}", socket.peer_addr());
                    // Execute queries
                    if let Err(error) = MySQLConnection::run_on_stream(session, socket) {
                        error!("Unexpected error occurred during query: {:?}", error);
                    };
                }
            }
        });
    }
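In MySQLConnection::run_on_stream, the session first attaches to the corresponding client host and registers a shutdown closure that performs the cleanup required when the connection is closed. The key code is as follows: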
    // mysql_session.rs
    pub fn run_on_stream(session: SessionRef, stream: TcpStream) -> Result<()> {
        let blocking_stream = Self::convert_stream(stream)?;
        MySQLConnection::attach_session(&session, &blocking_stream)?;
        ...
    }

    fn attach_session(session: &SessionRef, blocking_stream: &std::net::TcpStream) -> Result<()> {
        let host = blocking_stream.peer_addr().ok();
        let blocking_stream_ref = blocking_stream.try_clone()?;
        session.attach(host, move || {
            // Register the shutdown logic
            if let Err(error) = blocking_stream_ref.shutdown(Shutdown::Both) {
                error!("Cannot shutdown MySQL session io {}", error);
            }
        });
        Ok(())
    }

    // session.rs
    pub fn attach<F>(self: &Arc<Self>, host: Option<SocketAddr>, io_shutdown: F)
    where F: FnOnce() + Send + 'static {
        let (tx, rx) = oneshot::channel();
        self.session_ctx.set_client_host(host);
        self.session_ctx.set_io_shutdown_tx(Some(tx));
        common_base::base::tokio::spawn(async move {
            // Trigger the cleanup when the session quits
            if let Ok(tx) = rx.await {
                (io_shutdown)();
                tx.send(()).ok();
            }
        });
    }
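The `Sender<Sender<()>>` handshake used by attach can be reproduced with the standard library alone. The sketch below swaps tokio's oneshot channel and task for `std::sync::mpsc` and a thread, which is an assumption made so it runs without an async runtime; `force_shutdown` is a hypothetical name for the side that triggers the closure.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Registers `io_shutdown` and returns the sender that triggers it.
pub fn attach<F>(io_shutdown: F) -> Sender<Sender<()>>
where
    F: FnOnce() + Send + 'static,
{
    let (tx, rx) = channel::<Sender<()>>();
    thread::spawn(move || {
        // Wait for a shutdown request. If the sender is dropped without a
        // request, recv() fails and the closure never runs.
        if let Ok(ack) = rx.recv() {
            io_shutdown();
            // Tell the requester that the cleanup has finished.
            ack.send(()).ok();
        }
    });
    tx
}

/// Requests the shutdown and blocks until the closure has run.
/// Returns false if the waiting side is gone.
pub fn force_shutdown(tx: Sender<Sender<()>>) -> bool {
    let (ack_tx, ack_rx) = channel();
    if tx.send(ack_tx).is_err() {
        return false;
    }
    ack_rx.recv().is_ok()
}
```

Sending a sender through the channel gives the requester a way to wait for the cleanup to complete, instead of only firing a signal.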
After that, a MySQL InteractiveWorker is started to handle the subsequent queries.
    let join_handle = query_executor.spawn(async move {
        let client_addr = non_blocking_stream.peer_addr().unwrap().to_string();
        let interactive_worker = InteractiveWorker::create(session, client_addr);
        let opts = IntermediaryOptions {
            process_use_statement_on_query: true,
        };
        let (r, w) = non_blocking_stream.into_split();
        let w = BufWriter::with_capacity(DEFAULT_RESULT_SET_WRITE_BUFFER_SIZE, w);
        AsyncMysqlIntermediary::run_with_options(interactive_worker, r, w, &opts).await
    });
    let _ = futures::executor::block_on(join_handle);
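The InteractiveWorker implements the methods of the AsyncMysqlShim trait, such as on_execute and on_query. When a query arrives, these methods are called back to execute it. Taking on_query as an example, the key code is as follows: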
    async fn on_query<'a>(
        &'a mut self,
        query: &'a str,
        writer: QueryResultWriter<'a, W>,
    ) -> Result<()> {
        ...
        // Response writer
        let mut writer = DFQueryResultWriter::create(writer);
        let instant = Instant::now();
        // Execute the query
        let blocks = self.base.do_query(query).await;
        // Write the result back
        let format = self.base.session.get_format_settings()?;
        let mut write_result = writer.write(blocks, &format);
        ...
        // Metrics
        histogram!(
            super::mysql_metrics::METRIC_MYSQL_PROCESSOR_REQUEST_DURATION,
            instant.elapsed()
        );
        write_result
    }
In do_query, a QueryContext is created and SQL parsing begins, which drives the rest of the SQL query. The key code is as follows:
    // Create the QueryContext
    let context = self.session.create_query_context().await?;
    // Attach the query statement to it
    context.attach_query_str(query);
    let settings = context.get_settings();
    // Parse the SQL
    let stmts_hints = DfParser::parse_sql(query, context.get_current_session().get_type());
    ...
    // Create the planner and generate the query plan
    let mut planner = Planner::new(context.clone());
    let interpreter = planner.plan_sql(query).await.and_then(|v| {
        has_result_set = has_result_set_by_plan(&v.0);
        InterpreterFactoryV2::get(context.clone(), &v.0)
    })?;
    // Execute the query and return the result
    Self::exec_query(interpreter.clone(), &context).await?;
    let schema = interpreter.schema();
    Ok(QueryResult::create(
        blocks,
        extra_info,
        has_result_set,
        schema,
    ))
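Closing remarks
The above is the flow from Databend starting its services to accepting a SQL request and beginning to process it. Recently, for several reasons (the ClickHouse TCP protocol is tied to ClickHouse internals, has no public documentation, carries a lot of historical baggage, and troubleshooting it cost a great deal of effort), we removed the ClickHouse native TCP client. For details, see: https://github.com/datafusela...
If you have suggestions after reading the code, you are welcome to discuss them with us. If you find related problems, you can submit an issue to help us improve Databend's stability. The Databend community welcomes all well-intentioned comments and suggestions :)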
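About Databend
Databend is an open-source, elastic, low-cost modern data warehouse that can do real-time analytics even on top of object storage. We look forward to your interest as we explore cloud-native data warehouse solutions together and build a new generation of open-source Data Cloud.
- Databend documentation: https://databend.rs/
- Twitter: https://twitter.com/Datafuse_...
- Slack: https://datafusecloud.slack.com/
- WeChat: Databend
- GitHub: https://github.com/datafusela...
This article was first published on the WeChat official account: Databend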