Druid源码解析-Segment文件格式

IncrementalIndex

IncrementalIndex类中有两个重要的成员,分别是metricDescsdimensionDescs

private final Map<String, MetricDesc> metricDescs;
private final Map<String, DimensionDesc> dimensionDescs;

metricDescs和dimensionDescs在IncrementalIndex的构造函数中被初始化。

MetricDesc

每个MetricDesc中有几个重要的成员:

private final int index;                              // metric序号
private final String name;                            // metric名字
private final String type:                            // metric类型
private final ColumnCapabilitiesImpl capabilities     // metric能力

MetricDesc的构造函数:

public MetricDesc(int index, AggregatorFactory factory) 
{
  this.index = index;
  this.name = factory.getName();

  String typeInfo = factory.getTypeName();
  this.capabilities = new ColumnCapabilitiesImpl();
  if ("float".equalsIgnoreCase(typeInfo)) {
    capabilities.setType(ValueType.FLOAT);
    this.type = typeInfo;
  } else if ("long".equalsIgnoreCase(typeInfo)) {
    capabilities.setType(ValueType.LONG);
    this.type = typeInfo;
  } else if ("double".equalsIgnoreCase(typeInfo)) {
    capabilities.setType(ValueType.DOUBLE);
    this.type = typeInfo;
  } else {
    capabilities.setType(ValueType.COMPLEX);
    this.type = ComplexMetrics.getSerdeForType(typeInfo).getTypeName();
  }
}

每个AggregatorFactory的实例都有一个名字,通过getTypeName()方法获取。比如CountAggregatorFactory的getTypeName()方法返回”long”,HyperUniquesAggregatorFactory的getTypeName()方法返回”hyperUnique”。

如果对AggregatorFactory调用getTypeName()返回的名字不是”float”、”long”、”double”之一,name这个AggregatorFactory的类型是复杂类型,比如HyperUniquesAggregatorFactory。

在IncrementalIndex中通过如下代码构造每个metric的MetricDesc和MetricDescs:

for (AggregatorFactory metric : metrics) {
  MetricDesc metricDesc = new MetricDesc(metricDesc.size(), metric);
  metricDescs.put(metricDesc.getName(), metricDesc);
}

DimensionDesc

每个DimensionDesc中有几个重要成员:

private final int index;      // dimension序号
private final String name;    // dimnesion名字
private final ColumnCapabilitiesImpl capabilities    // dimension能力
private final DimensionHandler handler;
private final DimensionIndexer indexer;

DimensionHandler

DimensionHandler对象封装了特定于某一个dimension的索引,列合并/创建,以及查询操作。这些操作由通过DimensionHandler方法创建的对象(DimensionIndexer通过makeIndexer创建,DimensionMerger通过makeMerger创建,DimensionColumnReader)handle。每个DimensionHandler对象都特定于一个单独的dimension。

DimensionIndexer

每个dimension对应一个DimensionIndexer,用于在内存中处理注入的行。

ColumnCapabilitiesImpl

在IncrementalIndex的构造函数中定义了每个dimension的capalibities:

ColumnCapabilitiesImpl capabilities = makeCapabilitiesFromValueType(type);
private ColumnCapabilitiesImpl makeCapabilitiesFromValueType(ValueType type) 
{
  ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
  capabilities.setDictionaryEncoded(type == ValueType.STRING);
  capabilities.setHasBitmapIndexes(type == ValueType.STRING);
  capabilities.setType(type);
  return capabilities
}

可见只有string类型的dimension才支持字典编码位图索引

设置是否支持位图索引:

capabilities.setHasBitmapIndexes(dimSchema.hasBitmapIndex());

只有string类型的dimension才支持字典编码。

根据不同的capabilities生成不同的DimensionHandler:

DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(
  dimName,
  capabilities,
  dimSchema.getMultiValueHanding()
);
public static DimensionHandler getHandlerFromCapabilities(
    String dimensionName,
    ColumnCapabilities capabilities,
    MultiValueHandling multiValueHandling
)
{
  if (capabilities == null) {
    return new StringDimensionHandler(dimensionName, multiValueHandling, true);
  }

  multiValueHandling = multiValueHandling == null ? MultiValueHandling.ofDefault() : multiValueHanding;

  if (capabilities.getType() == ValueType.STRING) {
    if (!capabilities.isDictionaryEncoded()) {
      throw new IAE("String column must have dictionary encoding.");
    }
  }

  if (capabilities.getType() == ValueType.LONG) {
    return new LongDimensionHandler(dimensionName);
  }

  if (capabilities.getType() == ValueType.FLOAT) {
    return new FloatDimensionHandler(dimensionName);
  }

  if (capabilities.getType() == ValueType.DOUBLE) {
    return new DoubleDimensionHandler(dimensionName);
  }

  // Return a StringDimensionHandler by default (null columns will be treated as String typed)
  return new StringDimensionHandler(dimensionName, multiValueHandling, true);
}

向IncrementalIndex中写入一行数据

解析出的一行数据(这里认为一行数据的实际类型为MapBasedInputRow)最终会调用IncrementalIndex的toIncrementalIndexRow(InputRow row)方法向IncrementalIndex中加入一条数据。

对于一行数据中的某一列的值,会调用:

Object dimsKey = null;
dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent(
    row.getRow(dimension),
    true
);

这里row.getRow(dimension)就是解析出特定的一行数据中,dimension名字对应的的值。

indexer是在DimensionHandler中类型为DimensionIndexer的成员。这里我们只考虑String类型的Dimension,因此这里indexer的实例类型是StringDimensionIndexer。

下面我们来看一下StringDimensionIndexer的processRowValsToUnsortedEncodedKeyComponent方法。

在StringDimensionIndexer中有一个重要的内部类DimensionDictionary。其中有两个重要的成员:

private final Object2IntMap<String> valueToId = new Object2IntOpenHashMap<>();
private final List<String> idToValue = new ArrayList<>();

valueToId存储了值到id的对应关系。idToValue存储了id到值的对应关系,id就是List的下标。

processRowValsToUnsortedEncodedKeyComponent方法

在processRowValsToUnsortedEncodedKeyComponent方法中:

如果传入的dimension的值是null的话,则会调用DimensionDictionary的getId方法:

final int nullId = dimLookup.getId(null);

这里如果是第一次遇到null值,则返回-1。

然后返回dimension的值编码后的值(这里是index、序号,比如第1行和第50行的数据可能返回的都是7):

encodedDimensionValues = nullId == ABSENT_VALUE_ID ? new int[]{dimLookup.add(null)} : new int[]{nullId};

如果nullId是-1(首次遇到特定dimension值为null的情况),这时调用DimensionDictionary的add方法将null值加入idToValue这个List中,设置idForNull为idToValue.size()并返回这个的id(idForNull是null值在idToValue中的下标或索引);如果nullId不为-1,则说明不是首次遇到特定dimension值为null的情况,这时直接返回nullId(也是idForNull的值)。

传入的dimension的值是个List,这种情况我们先不做分析,只考虑单值的情况。

传入的dimension的值为单值,则调用DimensionDictionary的add方法:

encodedDimensionValues = new int[]{dimLoojup.add(emptyToNullIfNeeded(dimValues))};

在add方法中,首先看valueToId中有没有这个值,如果有的话,直接返回这个值对应的id,如果没有,则调用idToValue.size()设置这个值在idToValue中的索引,然后将这个值和对应的索引写入valueToId,并把这个值加入到idToValue中:

final int index = idToValue.size();
valueToId.put(originalValue, index);
idToValue.add(originalValue);

然后设置特定dimension当前的minValue和maxValue,最后返回index:

minValue = minValue == null || minValue.compareTo(originalValue) > 0 ? originalValue : minValue;
maxValue = maxValue == null || maxValue.compareTo(originalValue) < 0 ? originalValue : maxValue;
return index;

processRowValsToUnsortedEncodedKeyComponent最终返回的是当前行的特定列的值在valueToId中的id,也就是在idToValue中的索引。

需要记住的是,每个dimension对应一个DimensionDesc,每个DimensionDesc中有一个DimensionIndexer,每个DimensionIndexer中 有一个DimensionDictionary,每个DimensionDictionary中有一个valueToId和一个IdToValue

这里给个例子,如果当前有10行数据,它们的维度dim列的值为’a’,’b’,’c’,’b’,’d’,’e’,’a’,’a’,’b’,’f’,那么在这10列数据都调用processRowValsToUnsortedEncodedKeyComponent之后,idToValue中的值为[a, b, c, d, e, f],valueToId中的值为{‘a’->0, ‘b’->1, ‘c’->2, ‘d’->3, ‘e’->4, ‘f’->5},processRowValsToUnsortedEncodedKeyComponent返回值分别为{0},{1},{2},{1},{3},{4},{0},{0},{1},{5}。


回到toIncrementalIndexRow方法,对这一行数据的每个dimension都调用processRowValsToUnsortedEncodedKeyComponent返回一个index数组(单值的话数组中只有一个元素),单后设置dims的值:

Object[] dims;
dims[desc.getIndex()] = dimsKey;

然后构造一个incrementalIndexRow实例:

IncrementalIndexRow incrementalIndexRow = IncrementalIndexRow.createTimeAndDimswithDimsKeySize(
  Math.max(truncated, minTimestamp),
  dims,
  dimensionDescList,
  dimsKeySize
);

其中truncated是根据注入spec中的granularitySpec中指定的queryGranularity的值截断的时间戳。例如一行数据中的time字段的值为2019-08-14T17:55:34,如果queryGranularity是NONE,则不截断,如果为minute,则截断为2019-08-14T17:55:00,如果为day,则截断为2019-08-14T00:00:00。

minTimestamp是当前segment起始的timestamp。

最后返回一个IncrementIndexRowResult实例:

return new IncrementalIndexRowResult(incrementalIndexRow, parseExceptionMessages);

FactsHolder

每个IncrementalIndex都有一个FactsHolder类型的成员,这里我们假设在注入的spec中的granularitySpec中指定了rollup为true(默认就为true),则这里的FactsHolder实际类型为RollupFactsHolder。

上面生成了IncrementalIndexRowResult实例之后,调用addToFacts:

final AddToFactsResult addToFactsResult = addToFacts(
  row,
  incrementalIndexResult.getIncrementalIndexRow(),
  in,
  rowSupplier,
  skipMaxRowsInMemoryCheck
);

在OnheapIncrementalIndex中,有

protected AddToFacts(
  InputRow row,
  IncrementalIndexRow key,
  ThreadLocal<InputRow> rowContainer,
  Supplier<InputRow> rowSupplier,
  boolean skipMaxRowsInMemoryCheck
)

从AggregatorFactory产出Aggregator

在addToFacts中,首先对metrics(类型是AggregatorFactory数组)产出Aggregator数组:

Aggregator[] aggs;

aggs = new Aggregator[metrics.length];

factorizeAggs(metrics, aggs, rowContainer, row);

对每个AggregatorFactory调用factorize产出Aggregator。比如对CountAggregatorFactory产出CountAggregator,对HyperUniquesAggregatorFactory产出HyperUniquesAggregator

对产出的每个Aggregator调用aggregate方法计算当前的metric值。对于CountAggregator,它的aggregate方法定义如下:

public void aggregate() 
{
  ++count;
}

很简单也就对它的count成员加1;

对于HyperUniquesAggregator,它的aggregate方法如下:

public void aggregate() 
{
  Object object = selector.getObject();
  if (object == null) {
    return;
  }
  if (collector == null) {
    collector = HyperLogLogCollector.makeLatestCollector()l
  }
  collector.fold((HyperLogLogCollector) object);
}

这里首先调用selector的getObject()方法,selector的类型实际上是IncrementalIndex中的makeColumnSelectorFactory方法返回的IncrementalIndexInputRowColumnSelectorFactory实例中makeColumnValueSelector方法返回的ColumnValueSelector实例。它的getObject()方法调用的是ComplexMetricExtractor的extractValue方法:

public Object getObject() 
{
 return extract.extractValue(in.get(), column, agg);
}

在HyperUniquesSerde的getExtractor返回的ComplexMetricExtractor实例的extractValue方法中,实际返回的事?一个HyperLogLogCollector实例:

public HyperLogLogCollector extractValue(InputRow inputRow, String metricName) 
{
 Object rawValue = inputRow.getRow(metricName);
 
 if (rawValue instanceOf HyperLogLogCollector) {
   return (HyperLogLogCollector) rawValue;
 } else {
   HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
   
   List<String> dimValues = inputRow.getDimension(metricName);
   if (dimValues == null) {
     return collector;
   }
   
   for (String dimensionValue : dimValues) {
     collector.add(hyperLogLogHash.hash(dimensionValue));
   }
   return collector;
 }
}

调用HyperLogLogCollector的fold方法,也就是将一行数据的HyperUniquesAggregator的HyperLogLogCollector合并到同一个HyperLogLogCollector对象上

接着获取当前的rowIndex:

final int rowIndex = indexIncrement.getAndIncrement();
private final AtomicInteger indexIncrement = new AtomicInteger(0);

这里的rowIndex代表的值是注入的实际行数,而不是最终在segment中存储的行数

最后将调用了aggregate方法之后的所有metrics对应的Aggregator设置到IncrementalIndex的成员aggregators上:

concurrentSet(rowIndex, aggs);
protected void concurrentSet(int offset, Aggregator[] value) 
{
  return aggregators.put(offset, value);
}
private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new ConcurrentMap<>();

也就是aggregators中每条记录就是注入的一行数据中的所有metric的Aggregator

注入了多少行,就有多少个Aggregator数组

FactsHolder的facts成员填充

facts成员在RollupFactsHolder中的声明如下:

private final ConcurrentMap<IncrementalIndexRow, IncrementalIndexRow> facts

接下来获取当前IncrementalIndexRow在FactsHolder的facts成员中的索引:

final int prev = facts.putIfAbsent(key, rowIndex);
public int putIfAbsent(IncrementalIndexRow key, int rowIndex) 
{
  key.setRowIndex(rowIndex);
  IncrementalIndexRow prev = facts.putIfAbsent(key, key);
  return prev == null ? IncrementalIndexRow.EMPTY_ROW_INDEX : prev.getRowIndex();
}

可见如果当前IncrementalIndexRow(也可认为是当前行)在facts中不存在时返回-1,存在则返回它在facts中的索引。

为什么新的一行数据会在facts中存在呢?例如有两行数据:”2019-01-01T12:30:30,male,beijing”和’2019-01-01T21:24:10,male,beijing’,如果queryGranularity为day,则在facts中这两行数据可以作为同一行数据存储。但是在aggregator中会有2组Aggregator数组

如果putIfAbsent返回-1,则证明是新的一行数据(没有预聚合上),这是numEntries加1。*numEntries中保存的是segment中实际存储的行数*。

如果putIfAbsent没有返回-1,则证明预聚合生效了,也就是之前已经有相同的一行数据(timestamp根据queryGranularity做了截断)。这时先将和当前行相同的行(说行不准确,应该是IncrementalIndexRow)的自然索引也就是prev值对应的Aggregator数据取出来:

aggs = concurrentGet(prev);
protected Aggregator[] concurrentGet(int offset) 
{
  return aggregators.get(offset);
}

然后将当前行的每个Aggregator聚合到前一步取出的Aggregator上,这一步是真正实现了预聚合

parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);

最后从aggregators中删除当前行的Aggregator数组(因为已经预聚合完毕了,这一行的Aggregator就是多余的,需要删掉):

concurrentRemove(rowIndex);
protected void concurrentRemove(int offset) 
{
  aggregators.remove(offset);
} 

最后从addToFacts方法返回一个AddToFactsResult实例:

return new AddToFactsResult(numEntries.get(), siteInBytes.get(), parseExceptionMessages);

这里的numEntries是当前IncrementalIndex中实际存在的行数

最后在addToFacts的调用方法,也就是IncrementalIndex的add方法中,返回一个IncrementalIndexAddResult实例:

return new IncrementalIndexAddResult(
  addToFactsResult.getRowCount(),
  addToFactsResult.getBytesInMemory(),
  parseException
);

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理