关于数据库:ClickHouse最佳实战之分布表写入流程分析

8次阅读

共计 8529 个字符,预计需要花费 22 分钟才能阅读完成。

云妹导读:

前不久,京东智联云正式上线了基于 Clickhouse 的剖析型云数据库 JCHDB,一经推出便受到宽广用户的极大关注。有趣味的小伙伴能够回顾上一篇文章《比 MySQL 快 839 倍!揭开剖析型数据库 JCHDB 的神秘面纱》

ClickHouse 像 ElasticSearch 一样具备数据分片(shard)的概念,这也是分布式存储的特点之一,即通过并行读写提高效率。ClickHouse 依附 Distributed 引擎实现了 Distributed(分布式)表机制,在所有分片(本地表)上建设视图进行分布式查问,应用很不便。

Distributed 表引擎是 一种非凡的表引擎,本身不会存储任何数据,而是通过读取或写入其余远端节点上的表进行数据处理的表引擎。该表引擎须要依赖各个节点的本地表来创立,本地表的存在是 Distributed 表创立的依赖条件,创立语句如下:

CREATE TABLE {teble} ON CLUSTER {cluster}
AS {local_table}
ENGINE= Distributed({cluster}, {database}, {local_table},{policy}) 

这里的 policy 个别能够应用随机(例如 rand())或哈希(例如 halfMD5hash(id))。

再来看下 ClickHouse 集群节点配置文件,相干参数如下:

<remote_servers>
    <logs>
        <shard>
            <weight>1</weight>
            <internal_replication>true</internal_replication>
            <replica>
                <priority>1</priority>
                <host>example01-01-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-01-2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <weight>2</weight>
            <internal_replication>true</internal_replication>
            <replica>
                <host>example01-02-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-02-2</host>
                <port>9000</port>
            </replica>
        </shard>
    </logs>
</remote_servers> 

有了下面的根底理解,就将进入主题了,本文次要是对 Distributed 表如何写入及如何散发做一下剖析,略过 SQL 的词法解析、语法解析等步骤,从写入流开始,其构造方法如下:

DistributedBlockOutputStream(const Context & context_, StorageDistributed &
storage_, const ASTPtr & query_ast_, const ClusterPtr & cluster_, bool
insert_sync_, UInt64 insert_timeout_); 

如果 insert_sync_为 true,示意是同步写入,并配合 insert_timeout_参数应用(insert_timeout_为零示意没有超时工夫);如果 insert_sync_为 false,示意写入是异步。

1,同步写入还是异步写入

同步写入是指数据直写入理论的表中,而异步写入是指数据首先被写入本地文件系统,而后发送到远端节点。

BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const Context &
context)
{
   ......

   /// Force sync insertion if it is remote() table function
   bool insert_sync = settings.insert_distributed_sync || owned_cluster;
   auto timeout = settings.insert_distributed_timeout;
   /// DistributedBlockOutputStream will not own cluster, but will own 
ConnectionPools of the cluster
   return std::make_shared(
       context, *this, createInsertToRemoteTableQuery(remote_database,
remote_table, getSampleBlockNonMaterialized()), cluster,
       nsert_sync, timeout);
} 

是否执行同步写入是由 insert_sync 决定的,最终是由是否配置 insert_distributed_sync(默认为 false)和 owned_cluster 值的或关系决定的,个别在应用 MergeTree 之类的一般表引擎时,通常是异步写入,但在应用表函数时(应用 owned_cluster 来判断是否是表函数),通常会应用同步写入。这也是在设计业务逻辑时须要留神的。

owned_cluster 是什么时候赋值的呢?

StoragePtr TableFunctionRemoteexecuteImpl(const ASTPtr & astfunction, const Context & 
context, const stdstring & tablename) const
{ 
 ......
 StoragePtr res = remotetablefunction_ptr
     ? StorageDistributed::createWithOwnCluster(
       table_name,
       structureremotetable,
       remotetablefunction_ptr,
       cluster,
       context)
     : StorageDistributed::createWithOwnCluster(
       table_name,
       structureremotetable,
       remote_database,
       remote_table,
       cluster,
       context);
 ......
}  
StoragePtr StorageDistributed::createWithOwnCluster(
  const std::string & tablename, 
  const ColumnsDescription & columns_,
  ASTPtr & remotetablefunctionptr, 
  ClusterPtr & ownedcluster, 
  const Context & context_)
{auto res = create(String{}, tablename, columns, ConstraintsDescription{}, 
remotetablefunctionptr, String{}, context, ASTPtr(), String(), false);
  res->ownedcluster = ownedcluster_;
  return res;
} 

能够发现在创立 remote 表时会依据 remote_table_function_ptr 参数对最终的 owned_cluster_赋值为 true。

2,异步写入是如何实现的

理解了什么时候应用同步写入什么时候异步写入后,再持续剖析正式的写入过程,同步写入个别场景中波及较少,这里次要对异步写入逻辑进行剖析。outStream 的 write 办法主逻辑如下:

DistributedBlockOutputStream::write()
                 ↓
            if insert_sync
             |         |
           true      false
             ↓         ↓
      writeSync()   writeAsync() 

其实这个 write 办法是重写了 virtual void IBlockOutputStream::write(const Block & block),所以节点在接管到流并调用流的 write 办法就会进入该逻辑中。并且依据 insert_sync 来决定走同步写还是异步写。

3,写入本地节点还是远端节点

次要还是对异步写入进行剖析,其实 writeAsync()最终的实现办法是 writeAsyncImpl(),大抵逻辑图如下:

 writeAsyncImpl()
               ↓
 if shard_info.hasInternalReplication()
    |                          |
   true                       false
    ↓                          ↓
writeToLocal()             writeToLocal()
    ↓                          ↓
writeToShard()        for(every shard){writeToShard()}
    ↓                          ↓ 
   end                        end 

其中 getShardsInfo()办法就是获取 config.xml 配置文件中获取集群节点信息,hasInternalReplication()就对应着配置文件中的 internal_replication 参数,如果为 true,就会进入最外层的 if 逻辑,否则就会进入 else 逻辑。

其中 writeToLocal()办法是雷同的,是指如果 shard 蕴含本地节点,优先选择本地节点进行写入;后半局部 writeToShard()就是依据 internal_replication 参数的取值来决定是写入其中一个远端节点,还是所有远端节点都写一次。

4,数据如何写入本地节点

当然个别状况 Distributed 表还是基于 ReplicatedMergeTree 系列表进行创立,而不是基于表函数的,所以大多数场景还是会先写入本地再散发到远端节点。那写入 Distributed 表的数据是如何保障原子性落盘而不会在数据正在写入的过程中就把不残缺的数据发送给远端其余节点呢?看下 writeToShard()办法大抵逻辑,如下:

 writeToShard()
          ↓
for(every dir_names){
          |
          └──if first iteration
                 |       |
               false     true
                 ↓       ↓ 
                 |       ├──storage.requireDirectoryMonitor()
                 |       ├──CompressedWriteBuffer
                 |       ├──writeStringBinary()
                 |       ├──stream.writePrefix()
                 |       ├──stream.write(block)
                 |       ├──stream.writeSuffix()
                 ↘     ↙ 
             link(tmp_file, file) 
                    └──} 

持续具体再看下源码的具体实现,如下:

void DistributedBlockOutputStream::writeToShard(const Block & block, const
std::vector<std::string> & dir_names) 
{
   /** tmp directory is used to ensure atomicity of transactions
     * and keep monitor thread out from reading incomplete data
     */
   std::string first_file_tmp_path{};

   auto first = true;

   /// write first file, hardlink the others
   for (const auto & dir_name : dir_names)
   {const auto & path = storage.getPath() + dir_name + '/';

       /// ensure shard subdirectory creation and notify storage
       if (Poco::File(path).createDirectory())
           storage.requireDirectoryMonitor(dir_name);

       const auto & file_name = toString(storage.file_names_increment.get()) +
".bin";
       const auto & block_file_path = path + file_name;

       /** on first iteration write block to a temporary directory for 
subsequent hardlinking to ensure
           * the inode is not freed until we're done */
       if (first)
       {
           first = false;

           const auto & tmp_path = path + "tmp/";
           Poco::File(tmp_path).createDirectory();
           const auto & block_file_tmp_path = tmp_path + file_name;

           first_file_tmp_path = block_file_tmp_path;

           WriteBufferFromFile out{block_file_tmp_path};
           CompressedWriteBuffer compress{out};
           NativeBlockOutputStream stream{compress, ClickHouseRevision::get(),
block.cloneEmpty()};

           writeVarUInt(UInt64(DBMS_DISTRIBUTED_SENDS_MAGIC_NUMBER), out);
           context.getSettingsRef().serialize(out);
           writeStringBinary(query_string, out);

          stream.writePrefix();
          stream.write(block);
          stream.writeSuffix();}

       if (link(first_file_tmp_path.data(), block_file_path.data()))
           throwFromErrnoWithPath("Could not link" + block_file_path + "to"
+ first_file_tmp_path, block_file_path,
                  ErrorCodes::CANNOT_LINK);
   }
       ......
} 

首先来理解下 Distributed 表在目录中的存储形式,默认地位都是 /var/lib/clickhouse/data/{database}/{table}/ 在该目录下会为每个 shard 生成不同的目录,其中寄存须要发送给该 shard 的数据文件,例如:

[root@ck test]# tree
.
├── 'default@ck2-0:9000,default@ck2-1:9000'
│   ├── 25.bin
│   └── tmp
│   └── 26.bin
└── 'default@ck3-0:9000,default@ck3-1:9000'
└── tmp 

能够发现每个 shard 对应的目录名是 {darabse}@{hostname}:{tcpPort} 的格局,如果多个正本会用,分隔。并且每个 shard 目录中还有个 tmp 目录,这个目录的设计在 writeToShard()办法中做了解释,是为了防止数据文件在没写完就被发送到远端。

数据文件在本地写入的过程中会先写入 tmp 门路中,写完后通过硬链接 link 到 shard 目录,保障只有在 shard 目录中呈现的数据文件都是残缺写入的数据文件。

数据文件的命名是通过全局递增的数字加.bin 命名,是为了在后续散发到远端节点放弃程序性。

5,数据如何散发到各个节点

仔细的你可能曾经发现在 writeToShard()办法中有个 requireDirectoryMonitor(),这个办法就是将 shard 目录注册监听,并通过专用类 StorageDistributedDirectoryMonitor 来实现数据文件的散发,依据不同配置能够实现逐个散发或批量散发。并且蕴含对坏文件的容错解决。

剖析到这,可能还有人会感觉云里雾里,感觉整个流程串不起来,其实这样写是为了先不影响 Distributed 表写入的主流程,明确了这个再附加上 sharding_key 拆分和权重拆分就很好了解了。

下面提到过 writeAsync()的最终实现办法是 writeAsyncImpl,这个说法是没问题的,然而两头还有段要害逻辑,如下:

 writeAsync()
                           ↓
if storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1
               |                       |
             true                     false
              ↓                        ↓
      writeAsyncImpl(block)      writeSplitAsync(block)
                                        ↓
                                   splitBlock(block)
                                        ↓
                        writeAsyncImpl(splitted_blocks,shard_idx) 

getShardingKeyExpr()办法就是去获取 sharding_key 生成的表达式指针,该表达式是在创立表时就生成的,如下:

sharding_key_expr = buildShardingKeyExpression(sharding_key_, global_context,
getColumns().getAllPhysical(), false); 

那 sharding_key 和 sharding_key_expr 是什么关系呢?如下:

const ExpressionActionsPtr & getShardingKeyExpr() const { return 
sharding_key_expr; } 

所以说 sharding_key_expr 最终次要就是由 sharding_key 决定的。

个别状况下 getShardingKeyExpr()办法都为 true,如果再满足 shard 数量大于 1,就会对 block 进行拆分,由 splitBlock()办法次要逻辑就是创立 selector 并应用 selector 进行切割,大抵逻辑如下:

 splitBlock()
                  ↓
           createSelector(block)
                  ↓
for(every shard){column->scatter(num_shards, selector);} 

对于如何创立 selector 以及 selector 中都做了什么事儿,来具体看下源码截取,如下:

IColumn::Selector DistributedBlockOutputStream::createSelector(const Block &
source_block)
{
    Block current_block_with_sharding_key_expr = source_block;
    storage.getShardingKeyExpr()- 
>execute(current_block_with_sharding_key_expr);

    const auto & key_column =
current_block_with_sharding_key_expr.getByName(storage.getShardingKeyColumnName
());
    const auto & slot_to_shard = cluster->getSlotToShard();
    ......
   throw Exception{"Sharding key expression does not evaluate to an integer 
type", ErrorCodes::TYPE_MISMATCH};
} 

看 splitBlock()办法,ClickHouse 是利用 createSelector()办法结构 selector 来进行后续的解决。在 createSelector()办法中最重要的就是 key_column 和 slot_to_shard。

key_column 是通过 sharding_key 间接取得的,是为了依据主键列进行切割;slot_to_shard 是 shard 插槽,这里就是为了解决权重,在后续向插槽中插入数据时就会联合 config.xml 中的 weight 进行按比例解决。

细节比较复杂这里不做太粗疏的剖析,有趣味能够自行看下(如 template IColumn::Selector createBlockSelector())。

到此,对于 Distributed 表的写入流程的关键点就大抵剖析完了。篇幅无限有些细节没有做过多阐明,有趣味的能够自行再理解下。

通过对 Distributed 表写入流程的剖析,理解了该类型表的理论工作原理,所以在理论利用中有几个点还须要关注一下:

  1. Distributed 表在写入时会在本地节点生成长期数据,会产生写放大,所以会对 CPU 及内存造成一些额定耗费,倡议尽量少应用 Distributed 表进行写操作;
  2. Distributed 表写的长期 block 会把原始 block 依据 sharding_key 和 weight 进行再次拆分,会产生更多的 block 散发到远端节点,也减少了 merge 的累赘;
  3. Distributed 表如果是基于表函数创立的,个别是同步写,须要留神。

理解原理能力更好的应用,遇到问题能力更好的优化。

点击【[浏览原文]】即可返回京东智联云控制台开明试用 JCHDB。

正文完
 0