关于数据库:greenplum-数据分布策略分析

11次阅读

共计 7880 个字符,预计需要花费 20 分钟才能阅读完成。

greenplum 数据分布策略剖析

greenplum 是一个 MPP 架构的数据库,由一个 master 和多个 segment 组成(还可选配置一个 standby master),其数据会依据设置的散布策略散布到在不同的 segment 上。

在 6 版本中,gp 提供了 3 个策略:随机散布、复制散布、hash 散布。

随机散布

在创立表的时候,应用 “DISTRIBUTED RANDOMLY” 子句。

该策略会使数据随机散布到各个 segment,即便是齐全一样的两行数据,也可能会被扩散至不同的 segment。尽管随机散布能够使数据均匀的扩散至所有的 segment(不会呈现数据歪斜),但进行表关联剖析时,依然会依照关联键进行重散布数据,所以该策略在生产环境中很少应用。

复制散布

在创立表的时候,应用 “DISTRIBUTED REPLICATED” 子句。

该策略会把数据发送至所有的 segment,即所有的 segment 都领有该表的所有数据,所以在表关联剖析时,能够缩小数据重散布,但该数据会保留到所有的 segment,所以会产生大量的反复数据。所以,该策略适宜一些小表应用。

hash 散布

在重建表的时候,应用 “DISTRIBUTED BY (column,[…])” 子句。

该策略须要用户指定哪些列作为散布键,且散布键必须是主键的子集。gp 会依据散布键的值,进行计算得出 hash key 值,再依据该 key 值计算得出该数据被调配到哪个 segment 上。用户能够联合本人的数据特点,以及当前数据分析的法则,为不同的表指定不同的散布键,以提供良好的数据存储以及数据分析性能。

hash 流程

这里间接贴出调用堆栈,重点剖析 directDispatchCalculateHash 函数:

调用堆栈

#0  cdbhashinit (h=0x2e4e738) at cdbhash.c:161
#1  0x0000000000b05017 in directDispatchCalculateHash (plan=0x2e4dce8, targetPolicy=0x2e4e178, hashfuncs=0x2e4e6b8)
    at cdbmutate.c:197
#2  0x0000000000b0a989 in sri_optimize_for_result (root=0x2e4cf18, plan=0x2e4dce8, rte=0x2e4cd88,
    targetPolicy=0x7ffe5fca0ec0, hashExprs_p=0x7ffe5fca0ed0, hashOpfamilies_p=0x7ffe5fca0ec8) at cdbmutate.c:3560
#3  0x0000000000810d6e in adjust_modifytable_flow (root=0x2e4cf18, node=0x2e4e068, is_split_updates=0x2e4d9b8)
    at createplan.c:6608
#4  0x00000000008108bd in make_modifytable (root=0x2e4cf18, operation=CMD_INSERT, canSetTag=1 '\001',
    resultRelations=0x2e4e038, subplans=0x2e4dfe8, withCheckOptionLists=0x0, returningLists=0x0,
    is_split_updates=0x2e4d9b8, rowMarks=0x0, epqParam=0) at createplan.c:6471
#5  0x0000000000817e24 in subquery_planner (glob=0x2cbcf70, parse=0x2d7cd80, parent_root=0x0, hasRecursion=0 '\000',
    tuple_fraction=0, subroot=0x7ffe5fca11b8, config=0x2e4cee8) at planner.c:907
#6  0x0000000000816d1d in standard_planner (parse=0x2d7cd80, cursorOptions=0, boundParams=0x0) at planner.c:345
#7  0x0000000000816904 in planner (parse=0x2cbd080, cursorOptions=0, boundParams=0x0) at planner.c:200
#8  0x00000000008e8f4a in pg_plan_query (querytree=0x2cbd080, cursorOptions=0, boundParams=0x0) at postgres.c:959
#9  0x00000000008e8ffd in pg_plan_queries (querytrees=0x2d7b458, cursorOptions=0, boundParams=0x0) at postgres.c:1018
#10 0x00000000008ea3e8 in exec_simple_query (query_string=0x2cbc0d8 "insert INTO hash values (1,'asdf','fdsa','qwer');") at postgres.c:1748
#11 0x00000000008ef189 in PostgresMain (argc=1, argv=0x2c9bc10, dbname=0x2c9bac0 "postgres",
    username=0x2c9baa8 "gpadmin") at postgres.c:5242
#12 0x000000000086db12 in BackendRun (port=0x2cc5830) at postmaster.c:4811
#13 0x000000000086d1da in BackendStartup (port=0x2cc5830) at postmaster.c:4468
#14 0x0000000000869424 in ServerLoop () at postmaster.c:1948
#15 0x00000000008689c3 in PostmasterMain (argc=6, argv=0x2c99c20) at postmaster.c:1518
#16 0x0000000000774e33 in main (argc=6, argv=0x2c99c20) at main.c:245

directDispatchCalculateHash

这里只贴出 directDispatchCalculateHash 函数的重点代码及正文:

static void
directDispatchCalculateHash(Plan *plan, GpPolicy *targetPolicy, Oid *hashfuncs)
{
    // ..... 以上代码省略

        // 为以后插入的数据会话创立 cdbHash 环境
        // 次要包含:// 1、以后 gp 的 segment 个数
        // 2、hash key 值到 segment 的 reduce 函数
        // 3、该表的散布键,以及该散布键类型对应计算 hash key 的函数
        h = makeCdbHash(targetPolicy->numsegments, targetPolicy->nattrs, hashfuncs);

        // 初始化 cdbHash,次要是初始化 hashkey 值
        cdbhashinit(h);

        // 遍历所有的散布键
        // nattrs 是散布键个数
        for (i = 0; i < targetPolicy->nattrs; i++)
        {
            // 进行 hash key 值计算
            cdbhash(h, i + 1, values[i], nulls[i]);
        }

        // 依据后面计算出来的 hash key, 
        // 再算出该数据数据应该映射到哪个 segment
        hashcode = cdbhashreduce(h);

    // ...... 以下代码省略
}

cdbhash

void
cdbhash(CdbHash *h, int attno, Datum datum, bool isnull)
{
    uint32      hashkey = h->hash;

    // ...... 省略一些非关键代码

        /* rotate hashkey left 1 bit at each step */
        hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);

        if (!isnull)
        {
            FunctionCallInfoData fcinfo;
            uint32      hkey;

            InitFunctionCallInfoData(fcinfo, &h->hashfuncs[attno - 1], 1,
                                     InvalidOid,
                                     NULL, NULL);

            fcinfo.arg[0] = datum;
            fcinfo.argnull[0] = false;

            hkey = DatumGetUInt32(FunctionCallInvoke(&fcinfo));

            /* Check for null result, since caller is clearly not expecting one */
            if (fcinfo.isnull)
                elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);

            hashkey ^= hkey;
        }

    // ...... 省略一些非关键代码

    h->hash = hashkey;
}

剖析:

1、InitFunctionCallInfoData 该宏开展为:

#define InitFunctionCallInfoData(Fcinfo, Flinfo, Nargs, Collation, Context, Resultinfo) \
    do { \
        (Fcinfo).flinfo = (Flinfo); \
        (Fcinfo).context = (Context); \
        (Fcinfo).resultinfo = (Resultinfo); \
        (Fcinfo).fncollation = (Collation); \
        (Fcinfo).isnull = false; \
        (Fcinfo).nargs = (Nargs); \
    } while (0)

这里次要是用来初始化 Fcinfo 构造体,fcinfo 类型为 FunctionCallInfoData,其定义为:typedef Datum (*PGFunction) (FunctionCallInfo fcinfo);。

FunctionCallInfoData 是一个通用的用于传递回调函数的入参构造体,

其中:

a、flinfo 字段是一个构造体,类型为 FmgrInfo,该构造体外面最重要的是 fn_addr 字段,它存储了前面真正调用的 hash 回调函数的地址。

b、nargs 字段示意回调函数的入参个数,这里固定为 1,阐明所有的 hash 函数的入参个数都只有 1 个。

2、FunctionCallInfoData 中的 arg 字段示意回调函数入参列表,这里只应用了 datum 赋值,从外层函数能够看进去,该值即为当前列的值。

所以从这里能够确定,散布键应用的 hash 回调函数的入参通过封装的 FunctionCallInfoData 构造体进行传输,且最终外面应用的 hash 函数的入参只有 1 个,就是散布键的值。

3、FunctionCallInvoke 开展后为 ((* (fcinfo)->flinfo->fn_addr) (fcinfo)),即这里真正调用了 hash 回调函数,并应用后面赋值好的 fcinfo 作为参数。

4、最终把 hash 回调函数的返回值强转为 uint32 类型,再与之前计算出来的 hash key 做异或操作后,作为最初的 hash key 保留到以后 cdbHash 环境中的 hash 里,即最初的赋值:h->hash = hashkey。

总结

外层,先对以后的会话创立一个 hash 环境,而后遍历每个散布键做一次 hash 计算,依据最终的 hash key 值,做一次 reduce,计算出 segment id。

内层,先初始化通用的回调函数入参,再调用回调函数,并与之前的 hash key 值做一次异或操作,得出以后的 hash key。

hash 回调函数剖析

smallint / int / bigint 类型
smallint 类型,对应的 hash 函数是 hashint2,

int 类型,对应的 hash 函数是 hashint4,

bigint 类型,对应的 hash 函数是 hashint8,

具体实现如下:

#define PG_GETARG_DATUM(n)   (fcinfo->arg[n])
#define PG_GETARG_INT16(n)   DatumGetInt16(PG_GETARG_DATUM(n))
#define PG_GETARG_INT32(n)   DatumGetInt32(PG_GETARG_DATUM(n))
#define PG_GETARG_INT64(n)   DatumGetInt64(PG_GETARG_DATUM(n))

Datum
hashint2(PG_FUNCTION_ARGS)
{return hash_uint32((int32) PG_GETARG_INT16(0));
}

Datum
hashint4(PG_FUNCTION_ARGS)
{return hash_uint32(PG_GETARG_INT32(0));
}

Datum
hashint8(PG_FUNCTION_ARGS)
{
    /*
     * The idea here is to produce a hash value compatible with the values
     * produced by hashint4 and hashint2 for logically equal inputs; this is
     * necessary to support cross-type hash joins across these input types.
     * Since all three types are signed, we can xor the high half of the int8
     * value if the sign is positive, or the complement of the high half when
     * the sign is negative.
     */
    int64       val = PG_GETARG_INT64(0);
    uint32      lohalf = (uint32) val;
    uint32      hihalf = (uint32) (val >> 32);

    lohalf ^= (val >= 0) ? hihalf : ~hihalf;

    return hash_uint32(lohalf);
}

把宏开展后,能够察看到,smallint、int 和 bigint 实际上底层调用的 hash 函数都是 hash_uint32,惟一有区别的是 hash_uint32 的入参。

当类型是 smallint 或 int 时,入参就是其自身,而当类型是 bigint 时,该类型长度为 8 字节,所以须要对其解决一下:当被 hash 的值大于等于 0 时,则应用高 4 字节与第 4 字节异或的值进行 hash;当被 hash 的值小于 0 时,则应用高 4 字节的相反数,与低 4 字节异或的值进行 hash。

char / varchar / text 类型
char 类型,对应的 hash 函数是 hashbpchar,

text / varchar 类型,对应的 hash 函数是:hashtext,

具体实现如下:

typedef struct varlena text;

#define PG_GETARG_DATUM(n)   (fcinfo->arg[n])
#define PG_DETOAST_DATUM_PACKED(datum) \
    pg_detoast_datum_packed((struct varlena *) DatumGetPointer(datum))
#define DatumGetTextPP(X)           ((text *) PG_DETOAST_DATUM_PACKED(X))
#define PG_GETARG_TEXT_PP(n)        DatumGetTextPP(PG_GETARG_DATUM(n))

Datum
hashtext(PG_FUNCTION_ARGS)
{text       *key = PG_GETARG_TEXT_PP(0);
    Datum       result;

    /*
     * Note: this is currently identical in behavior to hashvarlena, but keep
     * it as a separate function in case we someday want to do something
     * different in non-C locales.  (See also hashbpchar, if so.)
     */
    result = hash_any((unsigned char *) VARDATA_ANY(key),
                      VARSIZE_ANY_EXHDR(key));

    /* Avoid leaking memory for toasted inputs */
    PG_FREE_IF_COPY(key, 0);

    return result;
}
typedef struct varlena BpChar;

#define PG_GETARG_DATUM(n)   (fcinfo->arg[n])
#define PG_DETOAST_DATUM_PACKED(datum) \
    pg_detoast_datum_packed((struct varlena *) DatumGetPointer(datum)
#define DatumGetBpCharPP(X)         ((BpChar *) PG_DETOAST_DATUM_PACKED(X))
#define PG_GETARG_BPCHAR_PP(n)      DatumGetBpCharPP(PG_GETARG_DATUM(n))

Datum
hashbpchar(PG_FUNCTION_ARGS)
{BpChar     *key = PG_GETARG_BPCHAR_PP(0);
    char       *keydata;
    int         keylen;
    Datum       result;

    keydata = VARDATA_ANY(key);
    keylen = bcTruelen(key);

    result = hash_any((unsigned char *) keydata, keylen);

    /* Avoid leaking memory for toasted inputs */
    PG_FREE_IF_COPY(key, 0);

    return result;
}

把下面的宏开展后,比照这三种类型的 hash 函数,其实不难发现,它们的 hash 函数底层都一样,都是通过 hash_any 函数进行计算,入参都是自身字符串值,以及字符串长度。

附:所有类型对应的 hash 函数

类型 别名 函数
smallint int2 hashint2
integer “int int4″
bigint int8 hashint8
bit bithash
bit varying varbit bithash
boolean bool hashchar
bytea hashvarlena
character[(n)] char[(n)] hashbpchar
character varying[(n)] varchar[(n)] hashtext
text hashtext
cidr hashinet
date hashint4
inet hashinet
interval interval_hash
jsonb jsonb_hash
macaddr hashmacaddr
numeric decimal hash_numeric
time[without time zone] float4 time_hash

正文完
 0