IncrementalIndex
IncrementalIndex类中有两个重要的成员,分别是metricDescs
和dimensionDescs
:
private final Map<String, MetricDesc> metricDescs;
private final Map<String, DimensionDesc> dimensionDescs;
metricDescs和dimensionDescs在IncrementalIndex的构造函数中被初始化。
MetricDesc
每个MetricDesc中有几个重要的成员:
private final int index; // metric序号
private final String name; // metric名字
private final String type: // metric类型
private final ColumnCapabilitiesImpl capabilities // metric能力
MetricDesc的构造函数:
public MetricDesc(int index, AggregatorFactory factory)
{
this.index = index;
this.name = factory.getName();
String typeInfo = factory.getTypeName();
this.capabilities = new ColumnCapabilitiesImpl();
if ("float".equalsIgnoreCase(typeInfo)) {
capabilities.setType(ValueType.FLOAT);
this.type = typeInfo;
} else if ("long".equalsIgnoreCase(typeInfo)) {
capabilities.setType(ValueType.LONG);
this.type = typeInfo;
} else if ("double".equalsIgnoreCase(typeInfo)) {
capabilities.setType(ValueType.DOUBLE);
this.type = typeInfo;
} else {
capabilities.setType(ValueType.COMPLEX);
this.type = ComplexMetrics.getSerdeForType(typeInfo).getTypeName();
}
}
每个AggregatorFactory的实例都有一个名字,通过getTypeName()方法获取。比如CountAggregatorFactory的getTypeName()方法返回”long”,HyperUniquesAggregatorFactory的getTypeName()方法返回”hyperUnique”。
如果对AggregatorFactory调用getTypeName()返回的名字不是”float”、”long”、”double”之一,name这个AggregatorFactory的类型是复杂类型,比如HyperUniquesAggregatorFactory。
在IncrementalIndex中通过如下代码构造每个metric的MetricDesc和MetricDescs:
for (AggregatorFactory metric : metrics) {
MetricDesc metricDesc = new MetricDesc(metricDesc.size(), metric);
metricDescs.put(metricDesc.getName(), metricDesc);
}
DimensionDesc
每个DimensionDesc中有几个重要成员:
private final int index; // dimension序号
private final String name; // dimnesion名字
private final ColumnCapabilitiesImpl capabilities // dimension能力
private final DimensionHandler handler;
private final DimensionIndexer indexer;
DimensionHandler
DimensionHandler对象封装了特定于某一个dimension的索引,列合并/创建,以及查询操作。这些操作由通过DimensionHandler方法创建的对象(DimensionIndexer通过makeIndexer创建,DimensionMerger通过makeMerger创建,DimensionColumnReader)handle。每个DimensionHandler对象都特定于一个单独的dimension。
DimensionIndexer
每个dimension对应一个DimensionIndexer,用于在内存中处理注入的行。
ColumnCapabilitiesImpl
在IncrementalIndex的构造函数中定义了每个dimension的capalibities:
ColumnCapabilitiesImpl capabilities = makeCapabilitiesFromValueType(type);
private ColumnCapabilitiesImpl makeCapabilitiesFromValueType(ValueType type)
{
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setDictionaryEncoded(type == ValueType.STRING);
capabilities.setHasBitmapIndexes(type == ValueType.STRING);
capabilities.setType(type);
return capabilities
}
可见只有string类型的dimension才支持字典编码
和位图索引
。
设置是否支持位图索引:
capabilities.setHasBitmapIndexes(dimSchema.hasBitmapIndex());
只有string类型的dimension才支持字典编码。
根据不同的capabilities生成不同的DimensionHandler:
DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(
dimName,
capabilities,
dimSchema.getMultiValueHanding()
);
public static DimensionHandler getHandlerFromCapabilities(
String dimensionName,
ColumnCapabilities capabilities,
MultiValueHandling multiValueHandling
)
{
if (capabilities == null) {
return new StringDimensionHandler(dimensionName, multiValueHandling, true);
}
multiValueHandling = multiValueHandling == null ? MultiValueHandling.ofDefault() : multiValueHanding;
if (capabilities.getType() == ValueType.STRING) {
if (!capabilities.isDictionaryEncoded()) {
throw new IAE("String column must have dictionary encoding.");
}
}
if (capabilities.getType() == ValueType.LONG) {
return new LongDimensionHandler(dimensionName);
}
if (capabilities.getType() == ValueType.FLOAT) {
return new FloatDimensionHandler(dimensionName);
}
if (capabilities.getType() == ValueType.DOUBLE) {
return new DoubleDimensionHandler(dimensionName);
}
// Return a StringDimensionHandler by default (null columns will be treated as String typed)
return new StringDimensionHandler(dimensionName, multiValueHandling, true);
}
向IncrementalIndex中写入一行数据
解析出的一行数据(这里认为一行数据的实际类型为MapBasedInputRow
)最终会调用IncrementalIndex的toIncrementalIndexRow(InputRow row)方法向IncrementalIndex中加入一条数据。
对于一行数据中的某一列的值,会调用:
Object dimsKey = null;
dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent(
row.getRow(dimension),
true
);
这里row.getRow(dimension)就是解析出特定的一行数据中,dimension名字对应的的值。
indexer是在DimensionHandler中类型为DimensionIndexer的成员。这里我们只考虑String类型的Dimension,因此这里indexer的实例类型是StringDimensionIndexer。
下面我们来看一下StringDimensionIndexer的processRowValsToUnsortedEncodedKeyComponent方法。
在StringDimensionIndexer中有一个重要的内部类DimensionDictionary
。其中有两个重要的成员:
private final Object2IntMap<String> valueToId = new Object2IntOpenHashMap<>();
private final List<String> idToValue = new ArrayList<>();
valueToId
存储了值到id的对应关系。idToValue
存储了id到值的对应关系,id就是List的下标。
processRowValsToUnsortedEncodedKeyComponent方法
在processRowValsToUnsortedEncodedKeyComponent方法中:
如果传入的dimension的值是null的话,则会调用DimensionDictionary的getId方法:
final int nullId = dimLookup.getId(null);
这里如果是第一次遇到null值,则返回-1。
然后返回dimension的值编码后的值(这里是index、序号,比如第1行和第50行的数据可能返回的都是7):
encodedDimensionValues = nullId == ABSENT_VALUE_ID ? new int[]{dimLookup.add(null)} : new int[]{nullId};
如果nullId是-1(首次遇到特定dimension值为null的情况),这时调用DimensionDictionary的add方法将null值加入idToValue
这个List中,设置idForNull为idToValue.size()并返回这个的id(idForNull是null值在idToValue中的下标或索引);如果nullId不为-1,则说明不是首次遇到特定dimension值为null的情况,这时直接返回nullId(也是idForNull的值)。
传入的dimension的值是个List,这种情况我们先不做分析,只考虑单值的情况。
传入的dimension的值为单值,则调用DimensionDictionary的add方法:
encodedDimensionValues = new int[]{dimLoojup.add(emptyToNullIfNeeded(dimValues))};
在add方法中,首先看valueToId
中有没有这个值,如果有的话,直接返回这个值对应的id,如果没有,则调用idToValue.size()设置这个值在idToValue中的索引,然后将这个值和对应的索引写入valueToId,并把这个值加入到idToValue中:
final int index = idToValue.size();
valueToId.put(originalValue, index);
idToValue.add(originalValue);
然后设置特定dimension当前的minValue和maxValue,最后返回index:
minValue = minValue == null || minValue.compareTo(originalValue) > 0 ? originalValue : minValue;
maxValue = maxValue == null || maxValue.compareTo(originalValue) < 0 ? originalValue : maxValue;
return index;
processRowValsToUnsortedEncodedKeyComponent最终返回的是当前行的特定列的值在valueToId中的id,也就是在idToValue中的索引。
需要记住的是,每个dimension对应一个DimensionDesc,每个DimensionDesc中有一个DimensionIndexer,每个DimensionIndexer中 有一个DimensionDictionary,每个DimensionDictionary中有一个valueToId和一个IdToValue。
这里给个例子,如果当前有10行数据,它们的维度dim列的值为’a’,’b’,’c’,’b’,’d’,’e’,’a’,’a’,’b’,’f’,那么在这10列数据都调用processRowValsToUnsortedEncodedKeyComponent之后,idToValue中的值为[a, b, c, d, e, f],valueToId中的值为{‘a’->0, ‘b’->1, ‘c’->2, ‘d’->3, ‘e’->4, ‘f’->5},processRowValsToUnsortedEncodedKeyComponent返回值分别为{0},{1},{2},{1},{3},{4},{0},{0},{1},{5}。
回到toIncrementalIndexRow方法,对这一行数据的每个dimension都调用processRowValsToUnsortedEncodedKeyComponent返回一个index数组(单值的话数组中只有一个元素),单后设置dims的值:
Object[] dims;
dims[desc.getIndex()] = dimsKey;
然后构造一个incrementalIndexRow实例:
IncrementalIndexRow incrementalIndexRow = IncrementalIndexRow.createTimeAndDimswithDimsKeySize(
Math.max(truncated, minTimestamp),
dims,
dimensionDescList,
dimsKeySize
);
其中truncated是根据注入spec中的granularitySpec中指定的queryGranularity的值截断的时间戳。例如一行数据中的time字段的值为2019-08-14T17:55:34,如果queryGranularity是NONE,则不截断,如果为minute,则截断为2019-08-14T17:55:00,如果为day,则截断为2019-08-14T00:00:00。
minTimestamp是当前segment起始的timestamp。
最后返回一个IncrementIndexRowResult实例:
return new IncrementalIndexRowResult(incrementalIndexRow, parseExceptionMessages);
FactsHolder
每个IncrementalIndex都有一个FactsHolder类型的成员,这里我们假设在注入的spec中的granularitySpec中指定了rollup为true(默认就为true),则这里的FactsHolder实际类型为RollupFactsHolder。
上面生成了IncrementalIndexRowResult实例之后,调用addToFacts:
final AddToFactsResult addToFactsResult = addToFacts(
row,
incrementalIndexResult.getIncrementalIndexRow(),
in,
rowSupplier,
skipMaxRowsInMemoryCheck
);
在OnheapIncrementalIndex中,有
protected AddToFacts(
InputRow row,
IncrementalIndexRow key,
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier,
boolean skipMaxRowsInMemoryCheck
)
从AggregatorFactory产出Aggregator
在addToFacts中,首先对metrics(类型是AggregatorFactory数组)产出Aggregator数组:
Aggregator[] aggs;
aggs = new Aggregator[metrics.length];
factorizeAggs(metrics, aggs, rowContainer, row);
对每个AggregatorFactory调用factorize产出Aggregator。比如对CountAggregatorFactory
产出CountAggregator
,对HyperUniquesAggregatorFactory
产出HyperUniquesAggregator
。
对产出的每个Aggregator调用aggregate方法计算当前的metric值。对于CountAggregator,它的aggregate方法定义如下:
public void aggregate()
{
++count;
}
很简单也就对它的count成员加1;
对于HyperUniquesAggregator,它的aggregate方法如下:
public void aggregate()
{
Object object = selector.getObject();
if (object == null) {
return;
}
if (collector == null) {
collector = HyperLogLogCollector.makeLatestCollector()l
}
collector.fold((HyperLogLogCollector) object);
}
这里首先调用selector的getObject()方法,selector的类型实际上是IncrementalIndex中的makeColumnSelectorFactory方法返回的IncrementalIndexInputRowColumnSelectorFactory实例中makeColumnValueSelector方法返回的ColumnValueSelector实例。它的getObject()方法调用的是ComplexMetricExtractor的extractValue方法:
public Object getObject() { return extract.extractValue(in.get(), column, agg); }
在HyperUniquesSerde的getExtractor返回的ComplexMetricExtractor实例的extractValue方法中,实际返回的事?一个HyperLogLogCollector实例:
public HyperLogLogCollector extractValue(InputRow inputRow, String metricName) { Object rawValue = inputRow.getRow(metricName); if (rawValue instanceOf HyperLogLogCollector) { return (HyperLogLogCollector) rawValue; } else { HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector(); List<String> dimValues = inputRow.getDimension(metricName); if (dimValues == null) { return collector; } for (String dimensionValue : dimValues) { collector.add(hyperLogLogHash.hash(dimensionValue)); } return collector; } }
调用HyperLogLogCollector的fold方法,也就是将一行数据的HyperUniquesAggregator的HyperLogLogCollector合并到同一个HyperLogLogCollector对象上。
接着获取当前的rowIndex:
final int rowIndex = indexIncrement.getAndIncrement();
private final AtomicInteger indexIncrement = new AtomicInteger(0);
这里的rowIndex代表的值是注入的实际行数,而不是最终在segment中存储的行数。
最后将调用了aggregate方法之后的所有metrics对应的Aggregator设置到IncrementalIndex的成员aggregators上:
concurrentSet(rowIndex, aggs);
protected void concurrentSet(int offset, Aggregator[] value)
{
return aggregators.put(offset, value);
}
private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new ConcurrentMap<>();
也就是aggregators中每条记录就是注入的一行数据中的所有metric的Aggregator。
注入了多少行,就有多少个Aggregator数组。
FactsHolder的facts成员填充
facts成员在RollupFactsHolder中的声明如下:
private final ConcurrentMap<IncrementalIndexRow, IncrementalIndexRow> facts
接下来获取当前IncrementalIndexRow在FactsHolder的facts成员中的索引:
final int prev = facts.putIfAbsent(key, rowIndex);
public int putIfAbsent(IncrementalIndexRow key, int rowIndex)
{
key.setRowIndex(rowIndex);
IncrementalIndexRow prev = facts.putIfAbsent(key, key);
return prev == null ? IncrementalIndexRow.EMPTY_ROW_INDEX : prev.getRowIndex();
}
可见如果当前IncrementalIndexRow(也可认为是当前行)在facts中不存在时返回-1,存在则返回它在facts中的索引。
为什么新的一行数据会在facts中存在呢?例如有两行数据:”2019-01-01T12:30:30,male,beijing”和’2019-01-01T21:24:10,male,beijing’,如果queryGranularity为day,则在facts中这两行数据可以作为同一行数据存储。但是在aggregator中会有2组Aggregator数组。
如果putIfAbsent返回-1,则证明是新的一行数据(没有预聚合上),这是numEntries加1。*numEntries中保存的是segment中实际存储的行数*。
如果putIfAbsent没有返回-1,则证明预聚合生效了,也就是之前已经有相同的一行数据(timestamp根据queryGranularity做了截断)。这时先将和当前行相同的行(说行不准确,应该是IncrementalIndexRow)的自然索引也就是prev值对应的Aggregator数据取出来:
aggs = concurrentGet(prev);
protected Aggregator[] concurrentGet(int offset)
{
return aggregators.get(offset);
}
然后将当前行的每个Aggregator聚合到前一步取出的Aggregator上,这一步是真正实现了预聚合:
parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
最后从aggregators中删除当前行的Aggregator数组(因为已经预聚合完毕了,这一行的Aggregator就是多余的,需要删掉):
concurrentRemove(rowIndex);
protected void concurrentRemove(int offset)
{
aggregators.remove(offset);
}
最后从addToFacts方法返回一个AddToFactsResult实例:
return new AddToFactsResult(numEntries.get(), siteInBytes.get(), parseExceptionMessages);
这里的numEntries是当前IncrementalIndex中实际存在的行数。
最后在addToFacts的调用方法,也就是IncrementalIndex的add方法中,返回一个IncrementalIndexAddResult实例:
return new IncrementalIndexAddResult(
addToFactsResult.getRowCount(),
addToFactsResult.getBytesInMemory(),
parseException
);
发表回复