DeepTime,是一个联合应用元学习的深度工夫指数模型。通过应用元学习公式来预测将来,以应答工夫序列中的常见问题(协变量偏移和条件散布偏移——非安稳)。该模型是工夫序列预测的元学习公式协同作用的一个很好的例子。
DeepTime 架构
DeepTime 组件
DeepTime 中有三种类型的层:
- 岭回归
- 多层感知机(MLP)
- 随机傅里叶特色
让咱们看看这些层在做什么:
岭回归
多层感知机(MLP)
这些是在神经网络 (nn) 中应用的线性回归公式。而后应用了一个 ReLU 函数激活。这些层非常适合将工夫指数映射到该工夫指数的工夫序列值。公式如下:
随机的傅里叶层
随机傅里叶容许 mlp 学习高频模式。只管随机傅里叶层须要为每个工作和数据集找到不同的超参数(只是为了不适度拟合或有余拟合),但作者通过将各种傅里叶基函数与各种尺度参数相结合来限度这种计算。
DeepTIME 架构
在每个工作中,抉择一个工夫序列,而后将其分为骨干窗口 (绿色) 和预测窗口 (蓝色) 两局部。而后,而后他们通过两个彼此共享信息并与元参数关联的元模型。在上图形容的架构上训练模型后,计算损失函数并尝试将其最小化。
其余工夫序列预测模型的区别
DeepTIME 是一个工夫指数模型,就像 Prophet,高斯过程等,而最近比较突出的模型如 N -HiTS, Autoformer, DeepAR, Informer 等都是历史价值模型。
当咱们说工夫序列的工夫指数模型时,确切的意思是预测相对随工夫变动(它思考了以后的工夫指数特色)。另一方面,历史价值模型应用以前的事件来预测将来。这个公式能让你更分明。:)
它蕴含了元学习公式,这意味着这个模型能够学会如何学习。因为它是一个工夫指数模型,能够在元学习中体现出更好的样本效率。
它采纳间接多步预计 (DMS) 的办法(DMS 模型一次间接预测几个数据点)。另外通过多步迭代(IMS),它只预测下一个值,而后应用它来预测下一个数据点,这与 ARIMA、DeepAR 等雷同。
元学习给工夫序列预测带来了什么?
- 更好的工作泛化
- 合乎左近工夫步长遵循部分安稳散布的假如。
- 还蕴含了类似的工夫点将具备类似的特色的假如。
模型如何预测
在每一次训练时,将数据分为两个窗口(通过应用第一个窗口预测第二个窗口)。这里为了简略起见应用 PyTorch Lightning 简化训练过程。
importnumpyasnp
importgin
importpytorch_lightningaspl
frommodelsimportget_model
importrandom
importtorch
importtorch.nn.functionalasF
fromtorchimportoptim
importmath
fromutilsimportCheckpoint, default_device, to_tensor
@gin.configurable
classDeepTimeTrainer(pl.LightningModule):
def__init__(self,
lr,
lambda_lr,
weight_decay,
warmup_epochs,
random_seed,
T_max,
eta_min,
dim_size,
datetime_feats,
):
gin.parse_config_file('/home/reza/Projects/PL_DeepTime/DeepTime/config/config.gin')
super(DeepTimeTrainer, self).__init__()
self.lr=lr
self.lambda_lr=lambda_lr
self.weight_decay=weight_decay
self.warmup_epochs=warmup_epochs
self.random_seed=random_seed
self.lr=lr
self.lambda_lr=lambda_lr
self.weight_decay=weight_decay
self.T_max=T_max
self.warmup_epochs=warmup_epochs
self.eta_min=eta_min
self.model=get_model(
model_type='deeptime',
dim_size=dim_size,
datetime_feats=datetime_feats
)
defon_fit_start(self):
torch.manual_seed(self.random_seed)
np.random.seed(self.random_seed)
random.seed(self.random_seed)
deftraining_step(self, batch, batch_idx):
x, y, x_time, y_time=map(to_tensor, batch)
forecast=self.model(x, x_time, y_time)
ifisinstance(forecast, tuple):
# for models which require reconstruction + forecast loss
loss=F.mse_loss(forecast[0], x) + \
F.mse_loss(forecast[1], y)
else:
loss=F.mse_loss(forecast, y)
self.log('train_loss', loss, prog_bar=True, on_epoch=True)
return {'loss': loss, 'train_loss': loss,}
deftraining_epoch_end(self, outputs):
avg_train_loss=torch.stack([x["train_loss"] forxinoutputs]).mean()
self.log('avg_train_loss', avg_train_loss, on_epoch=True, sync_dist=True)
defvalidation_step(self, batch, batch_idx):
x, y, x_time, y_time=map(to_tensor, batch)
forecast=self.model(x, x_time, y_time)
ifisinstance(forecast, tuple):
# for models which require reconstruction + forecast loss
loss=F.mse_loss(forecast[0], x) + \
F.mse_loss(forecast[1], y)
else:
loss=F.mse_loss(forecast, y)
self.log('val_loss', loss, prog_bar=True, on_epoch=True)
return {'val_loss': loss}
defvalidation_epoch_end(self, outputs):
returnoutputs
deftest_step(self, batch, batch_idx):
x, y, x_time, y_time=map(to_tensor, batch)
forecast=self.model(x, x_time, y_time)
ifisinstance(forecast, tuple):
# for models which require reconstruction + forecast loss
loss=F.mse_loss(forecast[0], x) + \
F.mse_loss(forecast[1], y)
else:
loss=F.mse_loss(forecast, y)
self.log('test_loss', loss, prog_bar=True, on_epoch=True)
return {'test_loss': loss}
deftest_epoch_end(self, outputs):
returnoutputs
@gin.configurable
defconfigure_optimizers(self):
group1= [] # lambda
group2= [] # no decay
group3= [] # decay
no_decay_list= ('bias', 'norm',)
forparam_name, paraminself.model.named_parameters():
if'_lambda'inparam_name:
group1.append(param)
elifany([modinparam_nameformodinno_decay_list]):
group2.append(param)
else:
group3.append(param)
optimizer=optim.Adam([{'params': group1, 'weight_decay': 0, 'lr': self.lambda_lr, 'scheduler': 'cosine_annealing'},
{'params': group2, 'weight_decay': 0, 'scheduler': 'cosine_annealing_with_linear_warmup'},
{'params': group3, 'scheduler': 'cosine_annealing_with_linear_warmup'}
], lr=self.lr, weight_decay=self.weight_decay)
scheduler_fns= []
forparam_groupinoptimizer.param_groups:
scheduler=param_group['scheduler']
ifscheduler=='none':
fn=lambdaT_cur: 1
elifscheduler=='cosine_annealing':
lr=eta_max=param_group['lr']
fn=lambdaT_cur: (self.eta_min+0.5* (eta_max-self.eta_min) * (
1.0+math.cos((T_cur-self.warmup_epochs) / (self.T_max-self.warmup_epochs) *math.pi))) /lr
elifscheduler=='cosine_annealing_with_linear_warmup':
lr=eta_max=param_group['lr']
fn=lambdaT_cur: T_cur/self.warmup_epochsifT_cur<self.warmup_epochselse (self.eta_min+0.5* (eta_max-self.eta_min) * (1.0+math.cos((T_cur-self.warmup_epochs) / (self.T_max-self.warmup_epochs) *math.pi))) /lr
else:
raiseValueError(f'No such scheduler, {scheduler}')
scheduler_fns.append(fn)
scheduler=optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=scheduler_fns)
return {'optimizer': optimizer, 'lr_scheduler': scheduler}
defforward(self, batch, z_0=None):
z_0=None
Y=batch['Y'].to(default_device)
sample_mask=batch['sample_mask'].to(default_device)
available_mask=batch['available_mask'].to(default_device)
# Forecasting
forecasting_mask=available_mask.clone()
ifself.n_time_out>0:
forecasting_mask[:, 0, -self.n_time_out:] =0
Y, Y_hat, z=self.model(Y=Y, mask=forecasting_mask, idxs=None, z_0=z_0)
ifself.n_time_out>0:
Y=Y[:, :, -self.n_time_out:]
Y_hat=Y_hat[:, :, -self.n_time_out:]
sample_mask=sample_mask[:, :, -self.n_time_out:]
returnY, Y_hat, sample_mask, z
作者在合成数据集和真实世界数据集上进行了宽泛的试验,表明 DeepTime 具备极具竞争力的性能,在基于 MSE 的多元预测基准的 24 个试验中,有 20 个取得了最先进的后果。
有趣味的能够看看源代码:https://avoid.overfit.cn/post/5736b84982b847f991def19a5af4c9a0
作者:Reza Yazdanfar