Traffic prediction aims to predict future traffic time series conditions (such as traffic flow or traffic speed) by analyzing historical observation data (such as sensor records on traffic networks). As an important component of the Intelligent Transportation System (ITS), the traffic prediction task is the core foundation for realizing smart cities, including active dynamic traffic control and intelligent route guidance, which helps reduce road safety hazards and improve the operational efficiency of urban transportation systems.
TGCN, a Temporal Graph Convolutional Network for traffic flow prediction. Specifically, by modeling the traffic network as graph structure data, the Graph Convolutional Network (GCN) module is used to extract spatial features; by modeling the traffic signal as time series information, the Temporal Convolutional Network (TCN) module is used to capture temporal features. TGCN finally completes the traffic flow prediction task by iteratively executing two modules.
classgraph_conv(nn.Layer):def__init__(self,in_dim,out_dim,dropout,num_layer=2):super(graph_conv,self).__init__()self.mlp=nn.Conv2D((num_layer+1)*in_dim,out_dim,kernel_size=(1,1),weight_attr=KaimingNormal(),)self.dropout=dropoutself.num_layer=num_layerdefforward(self,x,adj):# B C N Tout=[x]for_inrange(self.num_layer):new_x=pp.matmul(adj,x)out.append(new_x)x=new_xh=pp.concat(out,axis=1)h=self.mlp(h)h=F.dropout(h,self.dropout,training=self.training)returnh
classtempol_conv(nn.Layer):def__init__(self,in_dim,out_dim,hidden,num_layer=3,k_s=3,alpha=0.1):super(tempol_conv,self).__init__()self.leakyrelu=nn.LeakyReLU(alpha)self.tc_convs=nn.LayerList()self.num_layer=num_layerforiinrange(num_layer):in_channels=in_dimifi==0elsehiddenself.tc_convs.append(nn.Conv2D(in_channels=in_channels,out_channels=hidden,kernel_size=(1,k_s),padding=(0,i+1),dilation=i+1,weight_attr=KaimingNormal(),))self.mlp=nn.Conv2D(in_channels=in_dim+hidden*num_layer,out_channels=out_dim,kernel_size=(1,1),weight_attr=KaimingNormal(),)defforward(self,x):# B C N Tx_cat=[x]foriinrange(self.num_layer):x=self.leakyrelu(self.tc_convs[i](x))x_cat.append(x)tc_out=self.mlp(pp.concat(x_cat,axis=1))returntc_out
# TC1tc1_out=self.tc1_conv(emb_x)# B hidden N T# SC1sc1_out=self.sc1_conv(tc1_out,self.adj)# B hidden N Tsc1_out=sc1_out+tc1_outsc1_out=self.bn1(sc1_out)# TC2tc2_out=self.tc2_conv(sc1_out)# B hidden N T# SC2sc2_out=self.sc2_conv(tc2_out,self.adj)# B hidden N Tsc2_out=sc2_out+tc2_outsc2_out=self.bn2(sc2_out)
Finally, the model concatenates the initial node features with the inputs of the two GCN modules, and uses a two-layer MLP to obtain the target output (i.e., traffic flow prediction of traffic nodes in the future period):
# readout blockx_out=F.relu(pp.concat((emb_x,sc1_out,sc2_out),axis=1))x_out=F.relu(self.end_conv_1(x_out))# transformx_out=self.end_conv_2(x_out)# B T N 1
The case uses preprocessed PEMSD4 and PEMSD8 datasets. PEMSD4 is traffic data from the San Francisco Bay Area, selecting traffic data recorded by 307 sensors on 29 roads from January to February 2018. PEMSD8 is traffic data collected by 170 detectors on 8 roads in San Bernardino from July to August 2016.
Both datasets are saved as N x T x 1 matrices, recording traffic data of corresponding traffic nodes and times, where N is the number of traffic nodes and T is the length of the time series. The two datasets are divided into training set, validation set, and test set according to 7:2:1 respectively. The mean and standard deviation of traffic data are pre-calculated in the case for subsequent normalization operations.
# set modelmodel=TGCN(input_keys=cfg.MODEL.input_keys,output_keys=cfg.MODEL.label_keys,adj=adj,in_dim=cfg.input_dim,emb_dim=cfg.emb_dim,hidden=cfg.hidden,gc_layer=cfg.gc_layer,tc_layer=cfg.tc_layer,k_s=cfg.tc_kernel_size,dropout=cfg.dropout,alpha=cfg.leakyrelu_alpha,input_len=cfg.input_len,label_len=cfg.label_len,)
This case solves the problem based on data-driven methods, so it is necessary to use SupervisedConstraint built in PaddleScience to construct supervised constraints. Before defining constraints, you need to first specify various parameters used for data loading in constraints.
# set train dataloader configtrain_dataloader_cfg={"dataset":{"name":"PEMSDataset","file_path":cfg.data_path,"split":"train","input_keys":cfg.MODEL.input_keys,"label_keys":cfg.MODEL.label_keys,"norm_input":cfg.norm_input,"norm_label":cfg.norm_label,"input_len":cfg.input_len,"label_len":cfg.label_len,},"sampler":{"name":"BatchSampler","drop_last":True,"shuffle":True,},"batch_size":cfg.TRAIN.batch_size,}
The code for defining supervised constraints is as follows:
# set constraintsup_constraint=ppsci.constraint.SupervisedConstraint(train_dataloader_cfg,ppsci.loss.L1Loss(),name="train")constraint={sup_constraint.name:sup_constraint}
The first parameter of SupervisedConstraint is the data loading method, here train_dataloader_cfg defined above is used;
The second parameter is the definition of loss function, here the custom loss function L1_loss is used;
The third parameter is the name of the constraint condition, which is convenient for subsequent indexing. Here it is named train.
During the training process of this case, the training status of the current model will be evaluated using the validation set at certain training round intervals, and SupervisedValidator is needed to construct the validator.
# set eval dataloader configeval_dataloader_cfg={"dataset":{"name":"PEMSDataset","file_path":cfg.data_path,"split":"val","input_keys":cfg.MODEL.input_keys,"label_keys":cfg.MODEL.label_keys,"norm_input":cfg.norm_input,"norm_label":cfg.norm_label,"input_len":cfg.input_len,"label_len":cfg.label_len,},"sampler":{"name":"BatchSampler",},"batch_size":cfg.EVAL.batch_size,}
The code for defining supervised validator is as follows:
# set validatorsup_validator=ppsci.validate.SupervisedValidator(eval_dataloader_cfg,ppsci.loss.L1Loss(),metric={"MAE":ppsci.metric.MAE(),"RMSE":ppsci.metric.RMSE()},name="val",)validator={sup_validator.name:sup_validator}
The SupervisedValidator validator is similar to SupervisedConstraint constraint, the difference is that the validator needs to set evaluation metric metric, here the evaluation metrics used are MAE and RMSE.
By setting the eval_during_train parameter in ppsci.solver.Solver, the model parameters with the best effect on the validation set can be automatically saved.
fromtypingimportTupleimportpaddleasppimportpaddle.nn.functionalasFfromnumpyimportndarrayfrompaddleimportnnfrompaddle.nn.initializerimportKaimingNormalfromppsci.arch.baseimportArchclassgraph_conv(nn.Layer):def__init__(self,in_dim,out_dim,dropout,num_layer=2):super(graph_conv,self).__init__()self.mlp=nn.Conv2D((num_layer+1)*in_dim,out_dim,kernel_size=(1,1),weight_attr=KaimingNormal(),)self.dropout=dropoutself.num_layer=num_layerdefforward(self,x,adj):# B C N Tout=[x]for_inrange(self.num_layer):new_x=pp.matmul(adj,x)out.append(new_x)x=new_xh=pp.concat(out,axis=1)h=self.mlp(h)h=F.dropout(h,self.dropout,training=self.training)returnhclasstempol_conv(nn.Layer):def__init__(self,in_dim,out_dim,hidden,num_layer=3,k_s=3,alpha=0.1):super(tempol_conv,self).__init__()self.leakyrelu=nn.LeakyReLU(alpha)self.tc_convs=nn.LayerList()self.num_layer=num_layerforiinrange(num_layer):in_channels=in_dimifi==0elsehiddenself.tc_convs.append(nn.Conv2D(in_channels=in_channels,out_channels=hidden,kernel_size=(1,k_s),padding=(0,i+1),dilation=i+1,weight_attr=KaimingNormal(),))self.mlp=nn.Conv2D(in_channels=in_dim+hidden*num_layer,out_channels=out_dim,kernel_size=(1,1),weight_attr=KaimingNormal(),)defforward(self,x):# B C N Tx_cat=[x]foriinrange(self.num_layer):x=self.leakyrelu(self.tc_convs[i](x))x_cat.append(x)tc_out=self.mlp(pp.concat(x_cat,axis=1))returntc_outclassTGCN(Arch):""" TGCN is a class that represents an Temporal Graph Convolutional Network model. Args: input_keys (Tuple[str, ...]): A tuple of input keys. output_keys (Tuple[str, ...]): A tuple of output keys. adj (ndarray): The adjacency matrix of the graph. in_dim (int): The dimension of the input data. emb_dim (int): The dimension of the embedded space. hidden (int): The dimension of the latent space. gc_layer (int): The number of the graph convolutional layer. tc_layer (int): The number of the temporal convolutional layer. k_s (int): The kernel size of the temporal convolutional layer. dropout (float): The dropout rate. alpha (float): The negative slope of LeakyReLU. input_len (int): The input timesteps. label_len (int): The output timesteps. Examples: >>> import paddle >>> import ppsci >>> model = ppsci.arch.TGCN( ... input_keys=("input",), ... output_keys=("label",), ... adj=numpy.ones((307, 307), dtype=numpy.float32), ... in_dim=1, ... emb_dim=32 ... hidden=64, ... gc_layer=2, ... tc_layer=2 ... k_s=3, ... dropout=0.25, ... alpha=0.1, ... input_len=12, ... label_len=12, ... ) >>> input_dict = {"input": paddle.rand([64, 12, 307, 1]),} >>> label_dict = model(input_dict) >>> print(label_dict["label"].shape) [64, 12, 307, 1] """def__init__(self,input_keys:Tuple[str,...],output_keys:Tuple[str,...],adj:ndarray,in_dim:int,emb_dim:int,hidden:int,gc_layer:int,tc_layer:int,k_s:int,dropout:float,alpha:float,input_len:int,label_len:int,):super(TGCN,self).__init__()self.input_keys=input_keysself.output_keys=output_keysself.register_buffer("adj",pp.to_tensor(data=adj))self.emb_conv=nn.Conv2D(in_channels=in_dim,out_channels=emb_dim,kernel_size=(1,1),weight_attr=KaimingNormal(),)self.tc1_conv=tempol_conv(emb_dim,hidden,hidden,num_layer=tc_layer,k_s=k_s,alpha=alpha)self.sc1_conv=graph_conv(hidden,hidden,dropout,num_layer=gc_layer)self.bn1=nn.BatchNorm2D(hidden)self.tc2_conv=tempol_conv(hidden,hidden,hidden,num_layer=tc_layer,k_s=k_s,alpha=alpha)self.sc2_conv=graph_conv(hidden,hidden,dropout,num_layer=gc_layer)self.bn2=nn.BatchNorm2D(hidden)self.end_conv_1=nn.Conv2D(in_channels=emb_dim+hidden+hidden,out_channels=2*hidden,kernel_size=(1,1),weight_attr=KaimingNormal(),)self.end_conv_2=nn.Conv2D(in_channels=2*hidden,out_channels=label_len,kernel_size=(1,input_len),weight_attr=KaimingNormal(),)defforward(self,raw):# emb blockx=raw[self.input_keys[0]]x=x.transpose(perm=[0,3,2,1])# B in_dim N Temb_x=self.emb_conv(x)# B emd_dim N T# TC1tc1_out=self.tc1_conv(emb_x)# B hidden N T# SC1sc1_out=self.sc1_conv(tc1_out,self.adj)# B hidden N Tsc1_out=sc1_out+tc1_outsc1_out=self.bn1(sc1_out)# TC2tc2_out=self.tc2_conv(sc1_out)# B hidden N T# SC2sc2_out=self.sc2_conv(tc2_out,self.adj)# B hidden N Tsc2_out=sc2_out+tc2_outsc2_out=self.bn2(sc2_out)# readout blockx_out=F.relu(pp.concat((emb_x,sc1_out,sc2_out),axis=1))x_out=F.relu(self.end_conv_1(x_out))# transformx_out=self.end_conv_2(x_out)# B T N 1return{self.output_keys[0]:x_out}
importhydrafromomegaconfimportDictConfigimportppscifromppsci.arch.tgcnimportTGCNfromppsci.data.dataset.pems_datasetimportget_edge_indexdeftrain(cfg:DictConfig):# set train dataloader configtrain_dataloader_cfg={"dataset":{"name":"PEMSDataset","file_path":cfg.data_path,"split":"train","input_keys":cfg.MODEL.input_keys,"label_keys":cfg.MODEL.label_keys,"norm_input":cfg.norm_input,"norm_label":cfg.norm_label,"input_len":cfg.input_len,"label_len":cfg.label_len,},"sampler":{"name":"BatchSampler","drop_last":True,"shuffle":True,},"batch_size":cfg.TRAIN.batch_size,}# set constraintsup_constraint=ppsci.constraint.SupervisedConstraint(train_dataloader_cfg,ppsci.loss.L1Loss(),name="train")constraint={sup_constraint.name:sup_constraint}# set eval dataloader configeval_dataloader_cfg={"dataset":{"name":"PEMSDataset","file_path":cfg.data_path,"split":"val","input_keys":cfg.MODEL.input_keys,"label_keys":cfg.MODEL.label_keys,"norm_input":cfg.norm_input,"norm_label":cfg.norm_label,"input_len":cfg.input_len,"label_len":cfg.label_len,},"sampler":{"name":"BatchSampler",},"batch_size":cfg.EVAL.batch_size,}# set validatorsup_validator=ppsci.validate.SupervisedValidator(eval_dataloader_cfg,ppsci.loss.L1Loss(),metric={"MAE":ppsci.metric.MAE(),"RMSE":ppsci.metric.RMSE()},name="val",)validator={sup_validator.name:sup_validator}# get adj_,_,adj=get_edge_index(cfg.data_path,reduce=cfg.reduce)# set modelmodel=TGCN(input_keys=cfg.MODEL.input_keys,output_keys=cfg.MODEL.label_keys,adj=adj,in_dim=cfg.input_dim,emb_dim=cfg.emb_dim,hidden=cfg.hidden,gc_layer=cfg.gc_layer,tc_layer=cfg.tc_layer,k_s=cfg.tc_kernel_size,dropout=cfg.dropout,alpha=cfg.leakyrelu_alpha,input_len=cfg.input_len,label_len=cfg.label_len,)# init optimizeroptimizer=ppsci.optimizer.Adam(learning_rate=cfg.TRAIN.learning_rate)(model)# set iters_per_epoch by dataloader lengthiters_per_epoch=len(sup_constraint.data_loader)# initialize solversolver=ppsci.solver.Solver(model=model,constraint=constraint,output_dir=cfg.output_dir,optimizer=optimizer,epochs=cfg.TRAIN.epochs,iters_per_epoch=iters_per_epoch,log_freq=cfg.log_freq,eval_during_train=True,validator=validator,pretrained_model_path=cfg.TRAIN.pretrained_model_path,eval_with_no_grad=True,)# train modelsolver.train()defeval(cfg:DictConfig):# set eval dataloader configtest_dataloader_cfg={"dataset":{"name":"PEMSDataset","file_path":cfg.data_path,"split":"test","input_keys":cfg.MODEL.input_keys,"label_keys":cfg.MODEL.label_keys,"norm_input":cfg.norm_input,"norm_label":cfg.norm_label,"input_len":cfg.input_len,"label_len":cfg.label_len,},"sampler":{"name":"BatchSampler",},"batch_size":cfg.EVAL.batch_size,}# set validatorsup_validator=ppsci.validate.SupervisedValidator(test_dataloader_cfg,ppsci.loss.L1Loss(),metric={"MAE":ppsci.metric.MAE(),"RMSE":ppsci.metric.RMSE()},name="test",)validator={sup_validator.name:sup_validator}# get adj_,_,adj=get_edge_index(cfg.data_path,reduce=cfg.reduce)# set modelmodel=TGCN(input_keys=cfg.MODEL.input_keys,output_keys=cfg.MODEL.label_keys,adj=adj,in_dim=cfg.input_dim,emb_dim=cfg.emb_dim,hidden=cfg.hidden,gc_layer=cfg.gc_layer,tc_layer=cfg.tc_layer,k_s=cfg.tc_kernel_size,dropout=cfg.dropout,alpha=cfg.leakyrelu_alpha,input_len=cfg.input_len,label_len=cfg.label_len,)# initialize solversolver=ppsci.solver.Solver(model=model,output_dir=cfg.output_dir,log_freq=cfg.log_freq,validator=validator,pretrained_model_path=cfg.EVAL.pretrained_model_path,eval_with_no_grad=True,)# evaluatesolver.eval()@hydra.main(version_base=None,config_path="./conf",config_name="run.yaml")defmain(cfg:DictConfig):ifcfg.mode=="train":train(cfg)elifcfg.mode=="eval":eval(cfg)else:raiseValueError("cfg.mode should in [train, eval], but got {}".format(cfg.mode))if__name__=="__main__":main()
defaults:-ppsci_default-TRAIN:train_default-TRAIN/ema:ema_default-TRAIN/swa:swa_default-EVAL:eval_default-INFER:infer_default-hydra/job/config/override_dirname/exclude_keys:exclude_keys_default-_self_hydra:run:# dynamic output directory according to running time and override namedir:outputs_tgcn/${now:%Y-%m-%d}/${now:%H-%M-%S}job:name:${mode}# name of logfilechdir:false# keep current working directory unchangedcallbacks:init_callback:_target_:ppsci.utils.callbacks.InitCallbacksweep:# output directory for multirundir:${hydra.run.dir}subdir:./# general settingsdevice:gpumode:trainoutput_dir:${hydra:run.dir}log_freq:100# task settingsdata_name:PEMSD8data_path:./Data/${data_name}input_len:12label_len:12norm_input:Truenorm_label:Falsereduce:mean# model settingsMODEL:input_keys:["input"]label_keys:["label"]seed:3407batch_size:64input_dim:1output_dim:1emb_dim:32hidden:64gc_layer:2tc_layer:2tc_kernel_size:3dropout:0.25leakyrelu_alpha:0.1# training settingsTRAIN:epochs:200learning_rate:0.01pretrained_model_path:nullbatch_size:${batch_size}# evaluation settingsEVAL:pretrained_model_path:nullbatch_size:${batch_size}