DeepTime,是一个联合应用元学习的深度工夫指数模型。通过应用元学习公式来预测将来,以应答工夫序列中的常见问题(协变量偏移和条件散布偏移——非安稳)。该模型是工夫序列预测的元学习公式协同作用的一个很好的例子。
DeepTime架构
DeepTime组件
DeepTime中有三种类型的层:
- 岭回归
- 多层感知机(MLP)
- 随机傅里叶特色
让咱们看看这些层在做什么:
岭回归
多层感知机(MLP)
这些是在神经网络(nn)中应用的线性回归公式。而后应用了一个ReLU函数激活。这些层非常适合将工夫指数映射到该工夫指数的工夫序列值。公式如下:
随机的傅里叶层
随机傅里叶容许mlp学习高频模式。只管随机傅里叶层须要为每个工作和数据集找到不同的超参数(只是为了不适度拟合或有余拟合),但作者通过将各种傅里叶基函数与各种尺度参数相结合来限度这种计算。
DeepTIME架构
在每个工作中,抉择一个工夫序列,而后将其分为骨干窗口(绿色)和预测窗口(蓝色)两局部。而后,而后他们通过两个彼此共享信息并与元参数关联的元模型。 在上图形容的架构上训练模型后,计算损失函数并尝试将其最小化。
其余工夫序列预测模型的区别
DeepTIME是一个工夫指数模型,就像Prophet,高斯过程等,而最近比较突出的模型如N-HiTS, Autoformer, DeepAR, Informer等都是历史价值模型。
当咱们说工夫序列的工夫指数模型时,确切的意思是预测相对随工夫变动(它思考了以后的工夫指数特色)。另一方面,历史价值模型应用以前的事件来预测将来。这个公式能让你更分明。:)
它蕴含了元学习公式,这意味着这个模型能够学会如何学习。因为它是一个工夫指数模型,能够在元学习中体现出更好的样本效率。
它采纳间接多步预计(DMS)的办法(DMS模型一次间接预测几个数据点)。另外通过多步迭代(IMS),它只预测下一个值,而后应用它来预测下一个数据点,这与ARIMA、DeepAR等雷同。
元学习给工夫序列预测带来了什么?
- 更好的工作泛化
- 合乎左近工夫步长遵循部分安稳散布的假如。
- 还蕴含了类似的工夫点将具备类似的特色的假如。
模型如何预测
在每一次训练时,将数据分为两个窗口(通过应用第一个窗口预测第二个窗口)。这里为了简略起见应用PyTorch Lightning简化训练过程。
importnumpyasnp importgin importpytorch_lightningaspl frommodelsimportget_model importrandom importtorch importtorch.nn.functionalasF fromtorchimportoptim importmath fromutilsimportCheckpoint, default_device, to_tensor @gin.configurable classDeepTimeTrainer(pl.LightningModule): def__init__(self, lr, lambda_lr, weight_decay, warmup_epochs, random_seed, T_max, eta_min, dim_size, datetime_feats, ): gin.parse_config_file('/home/reza/Projects/PL_DeepTime/DeepTime/config/config.gin') super(DeepTimeTrainer, self).__init__() self.lr=lr self.lambda_lr=lambda_lr self.weight_decay=weight_decay self.warmup_epochs=warmup_epochs self.random_seed=random_seed self.lr=lr self.lambda_lr=lambda_lr self.weight_decay=weight_decay self.T_max=T_max self.warmup_epochs=warmup_epochs self.eta_min=eta_min self.model=get_model( model_type='deeptime', dim_size=dim_size, datetime_feats=datetime_feats ) defon_fit_start(self): torch.manual_seed(self.random_seed) np.random.seed(self.random_seed) random.seed(self.random_seed) deftraining_step(self, batch, batch_idx): x, y, x_time, y_time=map(to_tensor, batch) forecast=self.model(x, x_time, y_time) ifisinstance(forecast, tuple): # for models which require reconstruction + forecast loss loss=F.mse_loss(forecast[0], x) + \ F.mse_loss(forecast[1], y) else: loss=F.mse_loss(forecast, y) self.log('train_loss', loss, prog_bar=True, on_epoch=True) return {'loss': loss, 'train_loss': loss, } deftraining_epoch_end(self, outputs): avg_train_loss=torch.stack([x["train_loss"] forxinoutputs]).mean() self.log('avg_train_loss', avg_train_loss, on_epoch=True, sync_dist=True) defvalidation_step(self, batch, batch_idx): x, y, x_time, y_time=map(to_tensor, batch) forecast=self.model(x, x_time, y_time) ifisinstance(forecast, tuple): # for models which require reconstruction + forecast loss loss=F.mse_loss(forecast[0], x) + \ F.mse_loss(forecast[1], y) else: loss=F.mse_loss(forecast, y) self.log('val_loss', loss, prog_bar=True, on_epoch=True) return {'val_loss': loss} defvalidation_epoch_end(self, outputs): returnoutputs deftest_step(self, batch, batch_idx): x, y, x_time, y_time=map(to_tensor, batch) forecast=self.model(x, x_time, y_time) ifisinstance(forecast, tuple): # for models which require reconstruction + forecast loss loss=F.mse_loss(forecast[0], x) + \ F.mse_loss(forecast[1], y) else: loss=F.mse_loss(forecast, y) self.log('test_loss', loss, prog_bar=True, on_epoch=True) return {'test_loss': loss} deftest_epoch_end(self, outputs): returnoutputs @gin.configurable defconfigure_optimizers(self): group1= [] # lambda group2= [] # no decay group3= [] # decay no_decay_list= ('bias', 'norm',) forparam_name, paraminself.model.named_parameters(): if'_lambda'inparam_name: group1.append(param) elifany([modinparam_nameformodinno_decay_list]): group2.append(param) else: group3.append(param) optimizer=optim.Adam([ {'params': group1, 'weight_decay': 0, 'lr': self.lambda_lr, 'scheduler': 'cosine_annealing'}, {'params': group2, 'weight_decay': 0, 'scheduler': 'cosine_annealing_with_linear_warmup'}, {'params': group3, 'scheduler': 'cosine_annealing_with_linear_warmup'} ], lr=self.lr, weight_decay=self.weight_decay) scheduler_fns= [] forparam_groupinoptimizer.param_groups: scheduler=param_group['scheduler'] ifscheduler=='none': fn=lambdaT_cur: 1 elifscheduler=='cosine_annealing': lr=eta_max=param_group['lr'] fn=lambdaT_cur: (self.eta_min+0.5* (eta_max-self.eta_min) * ( 1.0+math.cos( (T_cur-self.warmup_epochs) / (self.T_max-self.warmup_epochs) *math.pi))) /lr elifscheduler=='cosine_annealing_with_linear_warmup': lr=eta_max=param_group['lr'] fn=lambdaT_cur: T_cur/self.warmup_epochsifT_cur<self.warmup_epochselse (self.eta_min+0.5* ( eta_max-self.eta_min) * (1.0+math.cos( (T_cur-self.warmup_epochs) / (self.T_max-self.warmup_epochs) *math.pi))) /lr else: raiseValueError(f'No such scheduler, {scheduler}') scheduler_fns.append(fn) scheduler=optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=scheduler_fns) return {'optimizer': optimizer, 'lr_scheduler': scheduler} defforward(self, batch, z_0=None): z_0=None Y=batch['Y'].to(default_device) sample_mask=batch['sample_mask'].to(default_device) available_mask=batch['available_mask'].to(default_device) # Forecasting forecasting_mask=available_mask.clone() ifself.n_time_out>0: forecasting_mask[:, 0, -self.n_time_out:] =0 Y, Y_hat, z=self.model(Y=Y, mask=forecasting_mask, idxs=None, z_0=z_0) ifself.n_time_out>0: Y=Y[:, :, -self.n_time_out:] Y_hat=Y_hat[:, :, -self.n_time_out:] sample_mask=sample_mask[:, :, -self.n_time_out:] returnY, Y_hat, sample_mask, z
作者在合成数据集和真实世界数据集上进行了宽泛的试验,表明DeepTime具备极具竞争力的性能,在基于MSE的多元预测基准的24个试验中,有20个取得了最先进的后果。
有趣味的能够看看源代码:https://avoid.overfit.cn/post/5736b84982b847f991def19a5af4c9a0
作者:Reza Yazdanfar