Probabilistic Time Series Forecasting with 🤗 Transformers (Part 2)

In part one of Probabilistic Time Series Forecasting with 🤗 Transformers, we introduced traditional time series forecasting and Transformer-based approaches, and prepared everything needed for training step by step: the dataset, the environment, the model, the transformations and the InstanceSplitter. This second part covers the data loaders, the forward pass, training, inference, and an outlook on future developments.

Create PyTorch DataLoaders

With the data in place, the next step is to create the PyTorch DataLoaders, which let us iterate over batches of (input, output) pairs, i.e. (past_values, future_values).

from typing import Iterable, Optional

from gluonts.itertools import Cyclic, IterableSlice, PseudoShuffled
from gluonts.torch.util import IterableDataset
from gluonts.transform import SelectFields
from torch.utils.data import DataLoader
from transformers import PretrainedConfig

def create_train_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    num_batches_per_epoch: int,
    shuffle_buffer_length: Optional[int] = None,
    **kwargs,
) -> Iterable:
    PREDICTION_INPUT_NAMES = [
        "static_categorical_features",
        "static_real_features",
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
        ]

    TRAINING_INPUT_NAMES = PREDICTION_INPUT_NAMES + [
        "future_values",
        "future_observed_mask",
        ]
    
    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data, is_train=True)
    
    # we initialize a Training instance
    instance_splitter = create_instance_splitter(
        config, "train"
    ) + SelectFields(TRAINING_INPUT_NAMES)


    # the instance splitter will sample a window of 
    # context length + lags + prediction length (from the 366 possible transformed time series)
    # randomly from within the target time series and return an iterator.
    training_instances = instance_splitter.apply(
        Cyclic(transformed_data)
        if shuffle_buffer_length is None
        else PseudoShuffled(
            Cyclic(transformed_data), 
            shuffle_buffer_length=shuffle_buffer_length,
        )
    )

    # from the training instances iterator we now return a Dataloader which will 
    # continue to sample random windows for as long as it is called
    # to return batch_size of the appropriate tensors ready for training!
    return IterableSlice(
        iter(
            DataLoader(
                IterableDataset(training_instances),
                batch_size=batch_size,
                **kwargs,
            )
        ),
        num_batches_per_epoch,
    )

def create_test_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    **kwargs,
):
    PREDICTION_INPUT_NAMES = [
        "static_categorical_features",
        "static_real_features",
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
        ]
    
    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data, is_train=False)
    
    # we create a Test Instance splitter which will sample the very last 
    # context window seen during training only for the encoder.
    instance_splitter = create_instance_splitter(
        config, "test"
    ) + SelectFields(PREDICTION_INPUT_NAMES)
    
    # we apply the transformations in test mode
    testing_instances = instance_splitter.apply(transformed_data, is_train=False)
    
    # This returns a Dataloader which will go over the dataset once.
    return DataLoader(IterableDataset(testing_instances), batch_size=batch_size, **kwargs)

train_dataloader = create_train_dataloader(
    config=config, 
    freq=freq, 
    data=train_dataset, 
    batch_size=256, 
    num_batches_per_epoch=100,
)

test_dataloader = create_test_dataloader(
    config=config, 
    freq=freq, 
    data=test_dataset,
    batch_size=64,
)

Let's inspect the first batch:

batch = next(iter(train_dataloader))
for k,v in batch.items():
  print(k,v.shape, v.type())

>>> static_categorical_features torch.Size([256, 1]) torch.LongTensor
    static_real_features torch.Size([256, 1]) torch.FloatTensor
    past_time_features torch.Size([256, 181, 2]) torch.FloatTensor
    past_values torch.Size([256, 181]) torch.FloatTensor
    past_observed_mask torch.Size([256, 181]) torch.FloatTensor
    future_time_features torch.Size([256, 24, 2]) torch.FloatTensor
    future_values torch.Size([256, 24]) torch.FloatTensor
    future_observed_mask torch.Size([256, 24]) torch.FloatTensor

As you can see, rather than providing input_ids and attention_mask to the encoder (as is the case when training NLP models), we provide past_values, along with past_observed_mask, past_time_features, static_categorical_features and static_real_features.

The decoder inputs consist of future_values, future_observed_mask and future_time_features. future_values can be seen as the equivalent of decoder_input_ids in NLP training.

We refer to the Time Series Transformer docs for a detailed explanation of each of them.

Forward pass

Let's perform a single forward pass with the batch we just created:

# perform forward pass
outputs = model(
    past_values=batch["past_values"],
    past_time_features=batch["past_time_features"],
    past_observed_mask=batch["past_observed_mask"],
    static_categorical_features=batch["static_categorical_features"],
    static_real_features=batch["static_real_features"],
    future_values=batch["future_values"],
    future_time_features=batch["future_time_features"],
    future_observed_mask=batch["future_observed_mask"],
    output_hidden_states=True
)
print("Loss:", outputs.loss.item())

>>> Loss: 9.141253471374512

Note that the model returns a loss. This is possible because the decoder automatically shifts the future_values one position to the right in order to obtain the labels, which allows computing an error between the predicted values and the labels.

Also note that the decoder uses a causal mask so it cannot look into the future, since the values it needs to predict are in the future_values tensor.
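For intuition, a causal mask is just a lower-triangular attention mask: each position in the prediction window may attend to itself and to earlier positions only. A minimal sketch (purely illustrative, not the model's internal implementation):

import torch

# Boolean causal mask for a prediction window of length 4:
# row t marks the positions that step t is allowed to attend to.
causal_mask = torch.tril(torch.ones(4, 4, dtype=torch.bool))
print(causal_mask)
# tensor([[ True, False, False, False],
#         [ True,  True, False, False],
#         [ True,  True,  True, False],
#         [ True,  True,  True,  True]])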

Train the model

Time to train the model! We will use a standard PyTorch training loop.

We use the 🤗 Accelerate library here, which automatically places the model, optimizer and dataloader on the appropriate device.

from accelerate import Accelerator
from torch.optim import Adam

accelerator = Accelerator()
device = accelerator.device

model.to(device)
optimizer = Adam(model.parameters(), lr=1e-3)
 
model, optimizer, train_dataloader = accelerator.prepare(
    model, optimizer, train_dataloader, 
)

for epoch in range(40):
    model.train()
    for batch in train_dataloader:
        optimizer.zero_grad()
        outputs = model(
            static_categorical_features=batch["static_categorical_features"].to(device),
            static_real_features=batch["static_real_features"].to(device),
            past_time_features=batch["past_time_features"].to(device),
            past_values=batch["past_values"].to(device),
            future_time_features=batch["future_time_features"].to(device),
            future_values=batch["future_values"].to(device),
            past_observed_mask=batch["past_observed_mask"].to(device),
            future_observed_mask=batch["future_observed_mask"].to(device),
        )
        loss = outputs.loss

        # Backpropagation
        accelerator.backward(loss)
        optimizer.step()

        print(loss.item())

Inference

At inference time, it's recommended to use the generate() method for autoregressive generation, similar to NLP models.

Forecasting works by getting data from the test instance sampler, which samples the very last context_length sized window of values from each time series in the dataset and feeds it to the model. Note that we pass future_time_features, which are known ahead of time, to the decoder.

The model will autoregressively sample a certain number of values from the predicted distribution and pass them back to the decoder to return the prediction outputs:

model.eval()

forecasts = []

for batch in test_dataloader:
    outputs = model.generate(
        static_categorical_features=batch["static_categorical_features"].to(device),
        static_real_features=batch["static_real_features"].to(device),
        past_time_features=batch["past_time_features"].to(device),
        past_values=batch["past_values"].to(device),
        future_time_features=batch["future_time_features"].to(device),
        past_observed_mask=batch["past_observed_mask"].to(device),
    )
    forecasts.append(outputs.sequences.cpu().numpy())

The model outputs a tensor of shape (batch_size, number of samples, prediction length).

In this case, for each example in the batch of size 64 we get 100 possible values for the next 24 months:

forecasts[0].shape

>>> (64, 100, 24)
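The 100 sample trajectories per series come from the model configuration: TimeSeriesTransformerConfig exposes a num_parallel_samples argument (100 by default), which generate() uses when sampling from the predicted distribution. Assuming the config from part one kept the default:

print(model.config.num_parallel_samples)
# -> 100, matching the middle dimension of the forecasts above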

We'll stack them vertically to obtain the forecasts for all the time series in the test dataset:

forecasts = np.vstack(forecasts)
print(forecasts.shape)

>>> (366, 100, 24)

We can evaluate the generated forecasts against the ground-truth values present in the test set. Here we use the MASE and sMAPE metrics, computed for each time series in the dataset:
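For reference, these two metrics are typically defined as follows (with forecast horizon H, seasonal period m, training series length T, ground truth Y and forecast Ŷ); the denominator of MASE is the in-sample error of a seasonal naive forecast:

$$\textrm{MASE} = \frac{\frac{1}{H}\sum_{t=1}^{H}|Y_t - \hat{Y}_t|}{\frac{1}{T-m}\sum_{t=m+1}^{T}|Y_t - Y_{t-m}|}$$

$$\textrm{sMAPE} = \frac{2}{H}\sum_{t=1}^{H}\frac{|Y_t - \hat{Y}_t|}{|Y_t| + |\hat{Y}_t|}$$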

from evaluate import load
from gluonts.time_feature import get_seasonality

mase_metric = load("evaluate-metric/mase")
smape_metric = load("evaluate-metric/smape")

forecast_median = np.median(forecasts, 1)

mase_metrics = []
smape_metrics = []
for item_id, ts in enumerate(test_dataset):
    training_data = ts["target"][:-prediction_length]
    ground_truth = ts["target"][-prediction_length:]
    mase = mase_metric.compute(
        predictions=forecast_median[item_id], 
        references=np.array(ground_truth), 
        training=np.array(training_data), 
        periodicity=get_seasonality(freq))
    mase_metrics.append(mase["mase"])
    
    smape = smape_metric.compute(
        predictions=forecast_median[item_id], 
        references=np.array(ground_truth), 
    )
    smape_metrics.append(smape["smape"])
print(f"MASE: {np.mean(mase_metrics)}")

>>> MASE: 1.361636922541396

print(f"sMAPE: {np.mean(smape_metrics)}")

>>> sMAPE: 0.17457818831512306

We can also plot the individual metrics of each time series in the dataset against each other, and observe that a handful of time series contribute heavily to the final test metrics:

import matplotlib.pyplot as plt

plt.scatter(mase_metrics, smape_metrics, alpha=0.3)
plt.xlabel("MASE")
plt.ylabel("sMAPE")
plt.show()

To plot the prediction for any time series against the ground-truth test data, we define the following plotting helper:

import matplotlib.dates as mdates
import pandas as pd
from gluonts.dataset.field_names import FieldName

def plot(ts_index):
    fig, ax = plt.subplots()

    index = pd.period_range(
        start=test_dataset[ts_index][FieldName.START],
        periods=len(test_dataset[ts_index][FieldName.TARGET]),
        freq=freq,
    ).to_timestamp()

    # Major ticks every half year, minor ticks every month,
    ax.xaxis.set_major_locator(mdates.MonthLocator(bymonth=(1, 7)))
    ax.xaxis.set_minor_locator(mdates.MonthLocator())

    ax.plot(
        index[-2*prediction_length:], 
        test_dataset[ts_index]["target"][-2*prediction_length:],
        label="actual",
    )

    plt.plot(
        index[-prediction_length:], 
        np.median(forecasts[ts_index], axis=0),
        label="median",
    )
    
    plt.fill_between(
        index[-prediction_length:],
        forecasts[ts_index].mean(0) - forecasts[ts_index].std(axis=0), 
        forecasts[ts_index].mean(0) + forecasts[ts_index].std(axis=0), 
        alpha=0.3, 
        interpolate=True,
        label="+/- 1-std",
    )
    plt.legend()
    plt.show()

For example:

plot(334)

How do we compare against other models? The Monash Time Series Repository has a comparison table of test-set MASE metrics, to which we can add our own result:

| Dataset | SES | Theta | TBATS | ETS | (DHR-)ARIMA | PR | CatBoost | FFNN | DeepAR | N-BEATS | WaveNet | Transformer (Our) |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| Tourism Monthly | 3.306 | 1.649 | 1.751 | 1.526 | 1.589 | 1.678 | 1.699 | 1.582 | 1.409 | 1.574 | 1.482 | 1.361 |

Note that our model beats all the other models reported (see also Table 2 in the corresponding paper), and we didn't do any hyperparameter tuning. We simply trained the Transformer for 40 epochs.

Of course, we should stay humble. Looking back at how the field has developed, claiming that neural networks are now the right way to solve time-series forecasting could end up like the papers that once concluded "all you need is XGBoost". We are simply curious to see how far neural networks can take us, and whether Transformers will be useful in this domain. This particular dataset seems to indicate that they are definitely worth exploring.

Next steps

We encourage readers to try out the Jupyter Notebook with other time series datasets from the Hugging Face Hub, replacing the appropriate frequency and prediction length parameters. You will need to convert your own datasets into GluonTS's customary format, which is explained very clearly in their documentation. We have also prepared an example notebook showing how to convert a dataset into the 🤗 Hugging Face datasets format.
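As a rough illustration of that format, each GluonTS entry is a dictionary with at least a start timestamp and a target array, plus optional static features. The values below are hypothetical, just to show the shape of the data:

from gluonts.dataset.common import ListDataset

# Hypothetical toy dataset: one monthly series with a single static category.
my_dataset = ListDataset(
    [
        {
            "start": "2020-01-01",
            "target": [10.0, 12.0, 13.0, 11.0, 15.0],
            "feat_static_cat": [0],
        }
    ],
    freq="1M",
)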

As time series researchers know, there has been a lot of interest in applying Transformer-based models to time series. The vanilla Transformer is just one of many attention-based models, so more models need to be added to the library.

There is nothing stopping us from modeling multivariate time series as well, but for that one would need to instantiate the model with a multivariate distribution head. Diagonal independent distributions are currently supported, and support for other multivariate distributions will be added. Stay tuned for future blog posts and the tutorials in them.

Another item on the roadmap is time series classification. This entails adding a time series model with a classification head to the library, for tasks such as anomaly detection.

The current model assumes that both the date-time features and the time series values are present, which may not hold for every real-world dataset, e.g. the neuroscience datasets provided by WOODS. We therefore need to generalize the current model so that certain inputs are optional throughout the pipeline.

Finally, the NLP/CV domains have benefited enormously from large pre-trained models, while to the best of our knowledge this is not the case for time series. Transformer-based models seem like the obvious candidates for pursuing this research direction, and we can't wait to see what breakthroughs researchers and practitioners will come up with!


Original English article: Probabilistic Time Series Forecasting with 🤗 Transformers

Translation and layout: zhongdongy (阿东)
