简介
几个月前,咱们介绍了 Informer 这个模型,相干论文 (Zhou, Haoyi, et al., 2021) 是一篇取得了 AAAI 2021 最佳论文奖的工夫序列论文。咱们也展现了一个应用 Informer 进行多变量概率预测的例子。在本文中,咱们探讨以下问题: Transformer 模型对工夫序列预测真的无效吗?咱们给出的答案是,它们真的无效。
首先,咱们将会提供一些试验证据,展现其真正的有效性。咱们的比照试验将表明,DLinear 这个简略线性模型并没有像说的那样比 transformer 好。当咱们在等同模型大小和雷同设定的状况下比照时,咱们发现基于 transformer 的模型在咱们关注的测试规范上体现得更好。其次,咱们将会介绍 Autoformer 模型,相干论文 (Wu, Haixu, et al., 2021) 在 Informer 模型问世后发表在 NeurIPS 2021 上。Autoformer 的模型当初曾经能够在 🤗 Transformers 中 应用。最初,咱们还会探讨 DLinear 模型,该模型是一个简略的前向网络,应用了 Autoformer 中的合成层 (decomposition layer)。DLinear 模型是在 Are Transformers Effective for Time Series Forecasting? 这篇论文中提出的,文中宣称其性能在工夫序列预测畛域超过了 transformer 系列的算法。
上面咱们开始!
评估 Transformer 系列模型 和 DLinear 模型
在 AAAI 2023 的论文 Are Transformers Effective for Time Series Forecasting? 中,作者宣称 transformer 系列模型在工夫序列预测方面并不无效。他们拿基于 transformer 的模型与一个简略的线性模型 DLinear 作比照。DLinear 应用了 Autoformer 中的 decomposition layer 构造 (下文将会介绍),作者宣称其性能超过了基于 transformer 的模型。但事实真的是这样吗?咱们接下来看看。
Dataset | Autoformer (uni.) MASE | DLinear MASE |
---|---|---|
Traffic |
0.910 | 0.965 |
Exchange-Rate |
1.087 | 1.690 |
Electricity |
0.751 | 0.831 |
上表展现了 Autoformer 和 DLinear 在三个论文中用到的数据集上的体现。后果阐明 Autoformer 在三个数据集上体现都超过了 DLinear 模型。
接下来,咱们将介绍 Autoformer 和 DLinear 模型,演示咱们如何在上表 Traffic 数据集上比照它们的性能,并为后果提供一些可解释性。
先说论断: 一个简略的线性模型可能在某些特定状况下更有劣势,但可能无奈像 transformer 之类的简单模型那样解决协方差信息。
Autoformer 具体介绍
Autoformer 基于传统的工夫序列办法: 把工夫序列合成为季节性 (seasonality) 以及趋势 – 周期 (trend-cycle) 这些因素。这通过退出合成层 (Decomposition Layer ) 来实现,以此来加强模型获取这些信息的能力。此外,Autoformer 中还独创了自相干 (auto-correlation) 机制,替换掉了传统 transformer 中的自注意力 (self-attention)。该机制使得模型能够利用注意力机制中周期性的依赖,晋升了总体性能。
上面,咱们将深入探讨 Autoformer 的这两大次要奉献: 合成层 (Decomposition Layer ) 和自相干机制 (Autocorrelation Mechanism )。相干代码也会提供进去。
合成层
合成是一个工夫序列畛域非常罕用的办法,但在 Autoformer 以前都没有被密集集成入深度学习模型中。咱们先简略介绍这一概念,随后会应用 PyTorch 代码演示这一思路是如何利用到 Autoformer 中的。
工夫序列合成
在工夫序列剖析中,合成 (decomposition) 是把一个工夫序列拆分成三个系统性因素的办法: 趋势周期 (trend-cycle)、季节性变动 (seasonal variation) 和随机稳定 (random fluctuations)。趋势因素代表了工夫序列的长期走势方向; 节令因素反映了一些重复呈现的模式,例如以一年或一季度为周期呈现的模式; 而随机 (无规律) 因素则反映了数据中无奈被上述两种因素解释的随机噪声。
有两种支流的合成办法: 加法合成和乘法合成,这在 statsmodels 这个库里都有实现。通过合成工夫序列到这三个因素,咱们能更好地了解和建模数据中潜在的模式。
但怎么把合成集成进 transformer 构造呢?咱们能够参考参考 Autoformer 的做法。
Autoformer 中的合成
Autoformer 构造 (来自论文) |
Autoformer 把合成作为一个外部计算操作集成到模型中,如上图所示。能够看到,编码器和解码器都应用了合成模块来汇合 trend-cyclical 信息,并从序列中渐进地提取 seasonal 信息。这种外部合成的概念曾经从 Autoformer 中展现了其有效性。所以很多其它的工夫序列论文也开始采纳这一办法,例如 FEDformer (Zhou, Tian, et al., ICML 2022) 和 DLinear (Zeng, Ailing, et al., AAAI 2023),这更阐明了其在工夫序列建模中的意义。
当初,咱们正式地给合成层做出定义:
对一个长度为 $L$ 的序列 $\mathcal{X} \in \mathbb{R}^{L \times d}$,合成层返回的 $\mathcal{X}_\textrm{trend} 和 \mathcal{X}_\textrm{seasonal}$ 定义如下:
$$
\mathcal{X}_\textrm{trend} = \textrm{AvgPool(Padding(} \mathcal{X} \textrm{))} \\
\mathcal{X}_\textrm{seasonal} = \mathcal{X} – \mathcal{X}_\textrm{trend}
$$
对应的 PyTorch 代码实现是:
import torch
from torch import nn
class DecompositionLayer(nn.Module):
"""Returns the trend and the seasonal parts of the time series."""
def __init__(self, kernel_size):
super().__init__()
self.kernel_size = kernel_size
self.avg = nn.AvgPool1d(kernel_size=kernel_size, stride=1, padding=0) # moving average
def forward(self, x):
"""Input shape: Batch x Time x EMBED_DIM"""
# padding on the both ends of time series
num_of_pads = (self.kernel_size - 1) // 2
front = x[:, 0:1, :].repeat(1, num_of_pads, 1)
end = x[:, -1:, :].repeat(1, num_of_pads, 1)
x_padded = torch.cat([front, x, end], dim=1)
# calculate the trend and seasonal part of the series
x_trend = self.avg(x_padded.permute(0, 2, 1)).permute(0, 2, 1)
x_seasonal = x - x_trend
return x_seasonal, x_trend
可见,代码非常简单,能够很不便地用在其它模型中,正如 DLinear 那样。上面,咱们解说第二个翻新点: 注意力 (自相干) 机制 。
注意力 (自相干) 机制
最原始的注意力机制和自相干机制 (图片来自论文) |
除了合成层之外,Autoformer 还应用了一个原创的自相干 (autocorrelation) 机制,能够完满替换自注意力 (self-attention) 机制。在 最原始的工夫序列 transformer 模型 中,注意力权重是在时域计算并逐点聚合的。而从上图中能够看出,Autoformer 不同的是它在频域计算这些 (应用 疾速傅立叶变换),而后通过时延聚合它们。
接下来局部,咱们深刻细节,并应用代码作出解说。
时域的注意力机制
借助 FFT 在频域计算注意力权重 (图片来自论文) |
实践上讲,给定一个时间延迟 $\tau$,一个离散变量的 自相关性 $y$ 能够用来掂量这个变量以后时刻 $t$ 的值和过来时刻 $t-\tau$ 的值之间的“关系”(皮尔逊相关性,pearson correlation):
$$
\textrm{Autocorrelation}(\tau) = \textrm{Corr}(y_t, y_{t-\tau})
$$
应用自相关性,Autoformer 提取了 query 和 key 之间基于频域的相互依赖,而不是像之前那样两两之间的点乘。能够把这个操作看成是自注意力中 $QK^T$ 的替换。
实际操作中,query 和 key 之间的自相干是通过 FFT 一次性针对 所有时间延迟 计算出来的。通过这种办法,自相干机制达到了 $O(L \log L)$ 的工夫复杂度 ($L$ 是输出工夫长度),这个速度和 Informer 的 ProbSparse attention 靠近。值得一提的是,应用 FFT 计算自相关性的实践根底是 Wiener–Khinchin theorem,这里咱们不细讲了。
当初,咱们来看看相应的 PyTorch 代码:
import torch
def autocorrelation(query_states, key_states):
"""
Computes autocorrelation(Q,K) using `torch.fft`.
Think about it as a replacement for the QK^T in the self-attention.
Assumption: states are resized to same shape of [batch_size, time_length, embedding_dim].
"""
query_states_fft = torch.fft.rfft(query_states, dim=1)
key_states_fft = torch.fft.rfft(key_states, dim=1)
attn_weights = query_states_fft * torch.conj(key_states_fft)
attn_weights = torch.fft.irfft(attn_weights, dim=1)
return attn_weights
代码十分简洁!😎 请留神这只是 autocorrelation(Q,K)
的局部实现,残缺实现请参考 🤗 Transformers 中的代码。
接下来,咱们将看到如何应用时延值聚合咱们的 attn_weights
,这个过程被称为时延聚合 (Time Delay Aggregation )。
时延聚合
通过时延来聚合,图片来自 Autoformer 论文 |
咱们用 $\mathcal{R_{Q,K}}$ 来示意自相干 (即 attn_weights
)。那么问题是: 咱们应该如何聚合这些 $\mathcal{R_{Q,K}}(\tau_1), \mathcal{R_{Q,K}}(\tau_2), …, \mathcal{R_{Q,K}}(\tau_k)$ 到 $\mathcal{V}$ 下面?在规范的自注意力机制中,这种聚合通过点乘实现。但在 Autoformer 中,咱们应用了一种不同的办法。首先咱们在时延 $\tau_1, \tau_2, … \tau_k$ 上对齐 $\mathcal{V}$,计算在这些时延下它对应的值,这个操作叫作 Rolling。接下来,咱们将对齐的 $\mathcal{V}$ 和自相干的值进行逐点的乘法运算。在上图中,你能够看到在右边是基于时延对 $\mathcal{V}$ 进行的 Rolling 操作; 而左边就展现了与自相干进行的逐点乘法。
整个过程能够用以下公式总结:
$$
\tau_1, \tau_2, … \tau_k = \textrm{arg Top-k}(\mathcal{R_{Q,K}}(\tau)) \
\hat{\mathcal{R}}\mathcal{_{Q,K}}(\tau _1), \hat{\mathcal{R}}\mathcal{_ {Q,K}}(\tau _2), …, \hat{\mathcal{R}}\mathcal{_ {Q,K}}(\tau _k) = \textrm{Softmax}(\mathcal{R_ {Q,K}}(\tau _1), \mathcal{R_ {Q,K}}(\tau_2), …, \mathcal{R_ {Q,K}}(\tau_k)) \
\textrm{Autocorrelation-Attention} = \sum_{i=1}^k \textrm{Roll}(\mathcal{V}, \tau_i) \cdot \hat{\mathcal{R}}\mathcal{_{Q,K}}(\tau _i)
$$
就是这样!须要留神的是,$k$ 是一个超参数,咱们称之为 autocorrelation_factor
(相似于 Informer 里的 sampling_factor
) ; 而 softmax 是在乘法操作之前使用到自相干下面的。
当初,咱们曾经能够看看最终的代码了:
import torch
import math
def time_delay_aggregation(attn_weights, value_states, autocorrelation_factor=2):
"""
Computes aggregation as value_states.roll(delay)* top_k_autocorrelations(delay).
The final result is the autocorrelation-attention output.
Think about it as a replacement of the dot-product between attn_weights and value states.
The autocorrelation_factor is used to find top k autocorrelations delays.
Assumption: value_states and attn_weights shape: [batch_size, time_length, embedding_dim]
"""
bsz, num_heads, tgt_len, channel = ...
time_length = value_states.size(1)
autocorrelations = attn_weights.view(bsz, num_heads, tgt_len, channel)
# find top k autocorrelations delays
top_k = int(autocorrelation_factor * math.log(time_length))
autocorrelations_mean = torch.mean(autocorrelations, dim=(1, -1)) # bsz x tgt_len
top_k_autocorrelations, top_k_delays = torch.topk(autocorrelations_mean, top_k, dim=1)
# apply softmax on the channel dim
top_k_autocorrelations = torch.softmax(top_k_autocorrelations, dim=-1) # bsz x top_k
# compute aggregation: value_states.roll(delay)* top_k_autocorrelations(delay)
delays_agg = torch.zeros_like(value_states).float() # bsz x time_length x channel
for i in range(top_k):
value_states_roll_delay = value_states.roll(shifts=-int(top_k_delays[i]), dims=1)
top_k_at_delay = top_k_autocorrelations[:, i]
# aggregation
top_k_resized = top_k_at_delay.view(-1, 1, 1).repeat(num_heads, tgt_len, channel)
delays_agg += value_states_roll_delay * top_k_resized
attn_output = delays_agg.contiguous()
return attn_output
实现!Autoformer 模型当初曾经能够在 🤗 Transformers 中 应用 了,名字就叫 AutoformerModel
。
针对这个模型,咱们要比照单变量 transformer 模型与 DLinear 的性能,DLinear 实质也是单变量的。前面咱们也会展现两个多变量 transformer 模型的性能 (在同一数据上训练的)。
DLinear 具体介绍
实际上,DLinear 构造非常简单,仅仅是从 Autoformer 的 DecompositionLayer
上连贯全连贯层。它应用 DecompositionLayer
来合成输出的世界序列到残差局部 (季节性) 和趋势局部。前向过程中,每个局部都被输出到各自的线性层,并被映射成 prediction_length
长度的输入。最终的输入就是两个输出的和:
def forward(self, context):
seasonal, trend = self.decomposition(context)
seasonal_output = self.linear_seasonal(seasonal)
trend_output = self.linear_trend(trend)
return seasonal_output + trend_output
在这种设定下,首先咱们把输出的序列映射成 prediction-length * hidden
维度 (通过 linear_seasonal
和 linear_trend
两个层 ) ; 失去的后果会被相加起来,并转换为 (prediction_length, hidden)
形态; 最初,维度为 hidden
的隐性表征会被映射到某种散布的参数上。
在咱们的测评中,咱们应用 GluonTS 中 DLinear 的实现。
示例: Traffic 数据集
咱们心愿用试验后果展现库中基于 transformer 模型的性能,这里咱们应用 Traffic 数据集,该数据集有 862 条工夫序列数据。咱们将在每条工夫序列上训练一个共享的模型 (单变量设定)。每个工夫序列都代表了一个传感器的占有率值,值的范畴在 0 到 1 之间。上面的这些超参数咱们将在所有模型中保持一致。
# Traffic prediction_length is 24. Reference:
# https://github.com/awslabs/gluonts/blob/6605ab1278b6bf92d5e47343efcf0d22bc50b2ec/src/gluonts/dataset/repository/_lstnet.py#L105
prediction_length = 24
context_length = prediction_length*2
batch_size = 128
num_batches_per_epoch = 100
epochs = 50
scaling = "std"
应用的 transformer 模型都很小:
encoder_layers=2
decoder_layers=2
d_model=16
这里咱们不再解说如何用 Autoformer
训练模型,读者能够参考之前两篇博客 (TimeSeriesTransformer 和 Informer) 并替换模型为 Autoformer
、替换数据集为 traffic
。咱们也训练了现成的模型放在 HuggingFace Hub 上,稍后的评测将会应用这里的模型。
载入数据集
首先装置必要的库:
!pip install -q transformers datasets evaluate accelerate "gluonts[torch]" ujson tqdm
traffic
数据集 (Lai et al. (2017)) 蕴含了旧金山的交通数据。它蕴含 862 条以小时为工夫单位的工夫序列,代表了路线占有率的数值,其数值范畴为 $[0, 1]$,记录了旧金山湾区高速公路从 2015 年到 2016 年的数据。
from gluonts.dataset.repository.datasets import get_dataset
dataset = get_dataset("traffic")
freq = dataset.metadata.freq
prediction_length = dataset.metadata.prediction_length
咱们可视化一条工夫序列看看,并画出训练和测试集的划分:
import matplotlib.pyplot as plt
train_example = next(iter(dataset.train))
test_example = next(iter(dataset.test))
num_of_samples = 4*prediction_length
figure, axes = plt.subplots()
axes.plot(train_example["target"][-num_of_samples:], color="blue")
axes.plot(test_example["target"][-num_of_samples - prediction_length :],
color="red",
alpha=0.5,
)
plt.show()
定义训练和测试集划分:
train_dataset = dataset.train
test_dataset = dataset.test
定义数据变换
接下来,咱们定义数据的变换,尤其是工夫相干特色的制作 (基于数据集自身和一些普适做法)。
咱们定义一个 Chain
,代表 GluonTS 中一系列的变换 (这相似图像里 torchvision.transforms.Compose
)。这让咱们将一系列变换集成到一个解决流水线中。
上面代码中,每个变换都增加了正文,用以阐明它们的作用。从更高层次讲,咱们将遍历每一个工夫序列,并增加或删除一些特色:
from transformers import PretrainedConfig
from gluonts.time_feature import time_features_from_frequency_str
from gluonts.dataset.field_names import FieldName
from gluonts.transform import (
AddAgeFeature,
AddObservedValuesIndicator,
AddTimeFeatures,
AsNumpyArray,
Chain,
ExpectedNumInstanceSampler,
RemoveFields,
SelectFields,
SetField,
TestSplitSampler,
Transformation,
ValidationSplitSampler,
VstackFeatures,
RenameFields,
)
def create_transformation(freq: str, config: PretrainedConfig) -> Transformation:
# create a list of fields to remove later
remove_field_names = []
if config.num_static_real_features == 0:
remove_field_names.append(FieldName.FEAT_STATIC_REAL)
if config.num_dynamic_real_features == 0:
remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL)
if config.num_static_categorical_features == 0:
remove_field_names.append(FieldName.FEAT_STATIC_CAT)
return Chain(
# step 1: remove static/dynamic fields if not specified
[RemoveFields(field_names=remove_field_names)]
# step 2: convert the data to NumPy (potentially not needed)
+ (
[
AsNumpyArray(
field=FieldName.FEAT_STATIC_CAT,
expected_ndim=1,
dtype=int,
)
]
if config.num_static_categorical_features > 0
else [])
+ (
[
AsNumpyArray(
field=FieldName.FEAT_STATIC_REAL,
expected_ndim=1,
)
]
if config.num_static_real_features > 0
else [])
+ [
AsNumpyArray(
field=FieldName.TARGET,
# we expect an extra dim for the multivariate case:
expected_ndim=1 if config.input_size == 1 else 2,
),
# step 3: handle the NaN's by filling in the target with zero
# and return the mask (which is in the observed values)
# true for observed values, false for nan's
# the decoder uses this mask (no loss is incurred for unobserved values)
# see loss_weights inside the xxxForPrediction model
AddObservedValuesIndicator(
target_field=FieldName.TARGET,
output_field=FieldName.OBSERVED_VALUES,
),
# step 4: add temporal features based on freq of the dataset
# these serve as positional encodings
AddTimeFeatures(
start_field=FieldName.START,
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_TIME,
time_features=time_features_from_frequency_str(freq),
pred_length=config.prediction_length,
),
# step 5: add another temporal feature (just a single number)
# tells the model where in the life the value of the time series is
# sort of running counter
AddAgeFeature(
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_AGE,
pred_length=config.prediction_length,
log_scale=True,
),
# step 6: vertically stack all the temporal features into the key FEAT_TIME
VstackFeatures(
output_field=FieldName.FEAT_TIME,
input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE]
+ ([FieldName.FEAT_DYNAMIC_REAL]
if config.num_dynamic_real_features > 0
else []),
),
# step 7: rename to match HuggingFace names
RenameFields(
mapping={
FieldName.FEAT_STATIC_CAT: "static_categorical_features",
FieldName.FEAT_STATIC_REAL: "static_real_features",
FieldName.FEAT_TIME: "time_features",
FieldName.TARGET: "values",
FieldName.OBSERVED_VALUES: "observed_mask",
}
),
]
)
定义 InstanceSplitter
咱们须要创立一个 InstanceSplitter
,用来给训练、验证和测试集提供采样窗口,失去一段时间的内的工夫序列 (咱们不可能把残缺的整段数据输出给模型,毕竟工夫太长,而且也有内存限度)。
这个实例宰割工具每一次将会随机选取 context_length
长度的数据,以及紧随其后的 prediction_length
长度的窗口,并为相应的窗口标注 past_
或 future_
。这样能够保障 values
能被分为 past_values
和随后的 future_values
,各自作为编码器和解码器的输出。除了 values
,对于 time_series_fields
中的其它 key 对应的数据也是一样。
from gluonts.transform import InstanceSplitter
from gluonts.transform.sampler import InstanceSampler
from typing import Optional
def create_instance_splitter(
config: PretrainedConfig,
mode: str,
train_sampler: Optional[InstanceSampler] = None,
validation_sampler: Optional[InstanceSampler] = None,
) -> Transformation:
assert mode in ["train", "validation", "test"]
instance_sampler = {
"train": train_sampler
or ExpectedNumInstanceSampler(num_instances=1.0, min_future=config.prediction_length),
"validation": validation_sampler
or ValidationSplitSampler(min_future=config.prediction_length),
"test": TestSplitSampler(),}[mode]
return InstanceSplitter(
target_field="values",
is_pad_field=FieldName.IS_PAD,
start_field=FieldName.START,
forecast_start_field=FieldName.FORECAST_START,
instance_sampler=instance_sampler,
past_length=config.context_length + max(config.lags_sequence),
future_length=config.prediction_length,
time_series_fields=["time_features", "observed_mask"],
)
创立 PyTorch 的 DataLoader
接下来就该创立 PyTorch DataLoader 了: 这让咱们能把数据整顿成 batch 的模式,即 (input, output) 对的模式,或者说是 (past_values
, future_values
) 的模式。
from typing import Iterable
import torch
from gluonts.itertools import Cyclic, Cached
from gluonts.dataset.loader import as_stacked_batches
def create_train_dataloader(
config: PretrainedConfig,
freq,
data,
batch_size: int,
num_batches_per_epoch: int,
shuffle_buffer_length: Optional[int] = None,
cache_data: bool = True,
**kwargs,
) -> Iterable:
PREDICTION_INPUT_NAMES = [
"past_time_features",
"past_values",
"past_observed_mask",
"future_time_features",
]
if config.num_static_categorical_features > 0:
PREDICTION_INPUT_NAMES.append("static_categorical_features")
if config.num_static_real_features > 0:
PREDICTION_INPUT_NAMES.append("static_real_features")
TRAINING_INPUT_NAMES = PREDICTION_INPUT_NAMES + [
"future_values",
"future_observed_mask",
]
transformation = create_transformation(freq, config)
transformed_data = transformation.apply(data, is_train=True)
if cache_data:
transformed_data = Cached(transformed_data)
# we initialize a Training instance
instance_splitter = create_instance_splitter(config, "train")
# the instance splitter will sample a window of
# context length + lags + prediction length (from the 366 possible transformed time series)
# randomly from within the target time series and return an iterator.
stream = Cyclic(transformed_data).stream()
training_instances = instance_splitter.apply(stream, is_train=True)
return as_stacked_batches(
training_instances,
batch_size=batch_size,
shuffle_buffer_length=shuffle_buffer_length,
field_names=TRAINING_INPUT_NAMES,
output_type=torch.tensor,
num_batches_per_epoch=num_batches_per_epoch,
)
def create_test_dataloader(
config: PretrainedConfig,
freq,
data,
batch_size: int,
**kwargs,
):
PREDICTION_INPUT_NAMES = [
"past_time_features",
"past_values",
"past_observed_mask",
"future_time_features",
]
if config.num_static_categorical_features > 0:
PREDICTION_INPUT_NAMES.append("static_categorical_features")
if config.num_static_real_features > 0:
PREDICTION_INPUT_NAMES.append("static_real_features")
transformation = create_transformation(freq, config)
transformed_data = transformation.apply(data, is_train=False)
# we create a Test Instance splitter which will sample the very last
# context window seen during training only for the encoder.
instance_sampler = create_instance_splitter(config, "test")
# we apply the transformations in test mode
testing_instances = instance_sampler.apply(transformed_data, is_train=False)
return as_stacked_batches(
testing_instances,
batch_size=batch_size,
output_type=torch.tensor,
field_names=PREDICTION_INPUT_NAMES,
)
在 Autoformer 上评测
咱们曾经在这个数据集上预训练了一个 Autoformer 了,所以咱们能够间接拿来模型在测试集上测一下:
from transformers import AutoformerConfig, AutoformerForPrediction
config = AutoformerConfig.from_pretrained("kashif/autoformer-traffic-hourly")
model = AutoformerForPrediction.from_pretrained("kashif/autoformer-traffic-hourly")
test_dataloader = create_test_dataloader(
config=config,
freq=freq,
data=test_dataset,
batch_size=64,
)
在推理时,咱们应用模型的 generate()
办法来预测 prediction_length
步的将来数据,基于最近应用的对应工夫序列的窗口长度。
from accelerate import Accelerator
accelerator = Accelerator()
device = accelerator.device
model.to(device)
model.eval()
forecasts_ = []
for batch in test_dataloader:
outputs = model.generate(static_categorical_features=batch["static_categorical_features"].to(device)
if config.num_static_categorical_features > 0
else None,
static_real_features=batch["static_real_features"].to(device)
if config.num_static_real_features > 0
else None,
past_time_features=batch["past_time_features"].to(device),
past_values=batch["past_values"].to(device),
future_time_features=batch["future_time_features"].to(device),
past_observed_mask=batch["past_observed_mask"].to(device),
)
forecasts_.append(outputs.sequences.cpu().numpy())
模型输入的数据形态是 (batch_size
, number of samples
, prediction length
, input_size
)。
在上面这个例子中,咱们为预测接下来 24 小时的交通数据而失去了 100 条可能的数值,而 batch size 是 64:
forecasts_[0].shape
>>> (64, 100, 24)
咱们在垂直方向把它们重叠起来 (应用 numpy.vstack
函数 ),以此获取所有测试集工夫序列的预测: 咱们有 7
个滚动的窗口,所以有 7 * 862 = 6034
个预测。
import numpy as np
forecasts = np.vstack(forecasts_)
print(forecasts.shape)
>>> (6034, 100, 24)
咱们能够把预测后果和 ground truth 做个比照。为此,咱们应用 🤗 Evaluate 这个库,它外面蕴含了 MASE 的度量办法。
咱们对每个工夫序列用这一度量规范计算相应的值,并算出其平均值:
from tqdm.autonotebook import tqdm
from evaluate import load
from gluonts.time_feature import get_seasonality
mase_metric = load("evaluate-metric/mase")
forecast_median = np.median(forecasts, 1)
mase_metrics = []
for item_id, ts in enumerate(tqdm(test_dataset)):
training_data = ts["target"][:-prediction_length]
ground_truth = ts["target"][-prediction_length:]
mase = mase_metric.compute(predictions=forecast_median[item_id],
references=np.array(ground_truth),
training=np.array(training_data),
periodicity=get_seasonality(freq))
mase_metrics.append(mase["mase"])
所以 Autoformer 模型的后果是:
print(f"Autoformer univariate MASE: {np.mean(mase_metrics):.3f}")
>>> Autoformer univariate MASE: 0.910
咱们还能够画出任意工夫序列预测针对其 ground truth 的比照,这须要以下函数:
import matplotlib.dates as mdates
import pandas as pd
test_ds = list(test_dataset)
def plot(ts_index):
fig, ax = plt.subplots()
index = pd.period_range(start=test_ds[ts_index][FieldName.START],
periods=len(test_ds[ts_index][FieldName.TARGET]),
freq=test_ds[ts_index][FieldName.START].freq,
).to_timestamp()
ax.plot(index[-5*prediction_length:],
test_ds[ts_index]["target"][-5*prediction_length:],
label="actual",
)
plt.plot(index[-prediction_length:],
np.median(forecasts[ts_index], axis=0),
label="median",
)
plt.gcf().autofmt_xdate()
plt.legend(loc="best")
plt.show()
比方,测试集中第四个工夫序列的后果比照,画进去是这样:
plot(4)
在 DLinear 上评测
gluonts
提供了一种 DLinear 的实现,咱们将应用这个实现区训练、测评该算法:
from gluonts.torch.model.d_linear.estimator import DLinearEstimator
# Define the DLinear model with the same parameters as the Autoformer model
estimator = DLinearEstimator(
prediction_length=dataset.metadata.prediction_length,
context_length=dataset.metadata.prediction_length*2,
scaling=scaling,
hidden_dimension=2,
batch_size=batch_size,
num_batches_per_epoch=num_batches_per_epoch,
trainer_kwargs=dict(max_epochs=epochs)
)
训练模型:
predictor = estimator.train(
training_data=train_dataset,
cache_data=True,
shuffle_buffer_length=1024
)
>>> INFO:pytorch_lightning.callbacks.model_summary:
| Name | Type | Params
---------------------------------------
0 | model | DLinearModel | 4.7 K
---------------------------------------
4.7 K Trainable params
0 Non-trainable params
4.7 K Total params
0.019 Total estimated model params size (MB)
Training: 0it [00:00, ?it/s]
...
INFO:pytorch_lightning.utilities.rank_zero:Epoch 49, global step 5000: 'train_loss' was not in top 1
INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=50` reached.
在测试集上评测:
from gluonts.evaluation import make_evaluation_predictions, Evaluator
forecast_it, ts_it = make_evaluation_predictions(
dataset=dataset.test,
predictor=predictor,
)
d_linear_forecasts = list(forecast_it)
d_linear_tss = list(ts_it)
evaluator = Evaluator()
agg_metrics, _ = evaluator(iter(d_linear_tss), iter(d_linear_forecasts))
所以 DLinear 对应的后果是:
dlinear_mase = agg_metrics["MASE"]
print(f"DLinear MASE: {dlinear_mase:.3f}")
>>> DLinear MASE: 0.965
同样地,咱们画出预测后果与 ground truth 的对比曲线图:
def plot_gluonts(index):
plt.plot(d_linear_tss[index][-4 * dataset.metadata.prediction_length:].to_timestamp(), label="target")
d_linear_forecasts[index].plot(show_label=True, color='g')
plt.legend()
plt.gcf().autofmt_xdate()
plt.show()
plot_gluonts(4)
实际上,traffic
数据集在素日和周末会呈现传感器中模式的散布偏移。那咱们还应该怎么做呢?因为 DLinear 没有足够的能力去解决协方差信息,或者说是任何的日期工夫的特色,咱们给出的窗口大小无奈笼罩全面,使得让模型有足够信息去晓得以后是在预测素日数据还是周末数据。因而模型只会去预测更为普适的后果,这就导致其预测散布偏差素日数据,因此导致对周末数据的预测变得更差。当然,如果咱们给一个足够大的窗口,一个线性模型也能够辨认出周末的模式,但当咱们的数据中存在以月或以季度为单位的模式散布时,那就须要更大的窗口了。
总结
所以 transformer 模型和线性模型比照的论断是什么呢?不同模型在测试集上的 MASE 指标如下所示:
Dataset | Transformer (uni.) | Transformer (mv.) | Informer (uni.) | Informer (mv.) | Autoformer (uni.) | DLinear |
---|---|---|---|---|---|---|
Traffic |
0.876 | 1.046 | 0.924 | 1.131 | 0.910 | 0.965 |
能够看到,咱们去年引入的 最原始的 Transformer 模型 取得了最好的性能指标。其次,多变量模型个别都比对应的单变量模型更差,起因在于序列间的相关性关系个别都较难预测。额定增加的稳定通常会损坏预测后果,或者模型可能会学到一些谬误的相关性信息。最近的一些论文,如 CrossFormer (ICLR 23) 和 CARD 也在尝试解决这些 transformer 模型中的问题。
多变量模型通常在训练数据足够大的时候才会体现得好。但当咱们与单变量模型在小的公开数据集上比照时,通常单变量模型会体现得更好。绝对于线性模型,通常其相应尺寸的单变量 transformer 模型或其它神经网络类模型会体现得更好。
总结来讲,transformer 模型在工夫序列预测畛域,远没有达到要被淘汰的地步。
然而大规模训练数据对它微小后劲的开掘是至关重要的,这一点不像 CV 或 NLP 畛域,工夫序列预测不足大规模公开数据集。
以后绝大多数的工夫序列预训练模型也不过是在诸如 UCR & UEA 这样的大量样本上训练的。
即便这些基准数据集为工夫序列预测的倒退提高提供了基石,其较小的规模和泛化性的缺失使得大规模预训练依然面临诸多困难。
所以对于工夫序列预测畛域来讲,倒退大规模、强泛化性的数据集 (就像 CV 畛域的 ImageNet 一样) 是以后最重要的事件。这将会极大地促成工夫序列剖析畛域与训练模型的倒退钻研,晋升与训练模型在工夫序列预测方面的能力。
申明
咱们诚挚感激 Lysandre Debut 和 Pedro Cuenca 提供的粗浅见解和对本我的项目的帮忙。❤️
英文原文: https://hf.co/blog/autoformer
作者: Eli Simhayev, Kashif Rasul, Niels Rogge
译者: Hoi2022
审校 / 排版: zhongdongy (阿东)