Skip to content

Latent Neural Operator LatentNO(or LNO)

# Darcy
# linux
wget -c -P ./datas/ https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Darcy_{train,val}.npy
# windows
# foreach ($f in "train","val") {curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Darcy_$f.npy --create-dirs -o ./datas/Darcy_$f.npy}
python LatentNO-steady.py --config-name=LatentNO-Darcy.yaml

# Elasticity
# linux
wget -c -P ./datas/ https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Elasticity_{train,val}.npy
# windows
# foreach ($f in "train","val") {curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Elasticity_$f.npy --create-dirs -o ./datas/Elasticity_$f.npy}
python LatentNO-steady.py --config-name=LatentNO-Elasticity.yaml

# Pipe
# linux
wget -c -P ./datas/ https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Pipe_{train,val}.npy
# windows
# foreach ($f in "train","val") {curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Pipe_$f.npy --create-dirs -o ./datas/Pipe_$f.npy}
python LatentNO-steady.py --config-name=LatentNO-Pipe.yaml

# NS2d
# linux
wget -c -P ./datas/ https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/NS2d_{train,val}.npy
# windows
# foreach ($f in "train","val") {curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/NS2d_$f.npy --create-dirs -o ./datas/Pipe_$f.npy}
python LatentNO-time.py --config-name=LatentNO-NS2d.yaml
# Darcy
# linux
wget -c -P ./datas/ https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Darcy_{train,val}.npy
# windows
# foreach ($f in "train","val") {curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Darcy_$f.npy --create-dirs -o ./datas/Darcy_$f.npy}
python LatentNO-steady.py --config-name=LatentNO-Darcy.yaml mode=eval EVAL.pretrained_model_path=https://paddle-org.bj.bcebos.com/paddlescience/models/LatentNO/LatentNO_Darcy_pretrained.pdparams

# Elasticity
# linux
wget -c -P ./datas/ https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Elasticity_{train,val}.npy
# windows
# foreach ($f in "train","val") {curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Elasticity_$f.npy --create-dirs -o ./datas/Elasticity_$f.npy}
python LatentNO-steady.py --config-name=LatentNO-Elasticity.yaml mode=eval EVAL.pretrained_model_path=https://paddle-org.bj.bcebos.com/paddlescience/models/LatentNO/LatentNO_Elasticity_pretrained.pdparams

# Pipe
# linux
wget -c -P ./datas/ https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Pipe_{train,val}.npy
# windows
# foreach ($f in "train","val") {curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/Pipe_$f.npy --create-dirs -o ./datas/Pipe_$f.npy}
python LatentNO-steady.py --config-name=LatentNO-Pipe.yaml mode=eval EVAL.pretrained_model_path=https://paddle-org.bj.bcebos.com/paddlescience/models/LatentNO/LatentNO_Pipe_pretrained.pdparams

# NS2d
# linux
wget -c -P ./datas/ https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/NS2d_{train,val}.npy
# windows
# foreach ($f in "train","val") {curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/LatentNO/NS2d_$f.npy --create-dirs -o ./datas/Pipe_$f.npy}
python LatentNO-time.py --config-name=LatentNO-NS2d.yaml mode=eval EVAL.pretrained_model_path=https://paddle-org.bj.bcebos.com/paddlescience/models/LatentNO/LatentNO_NS2d_pretrained.pdparams

1. Background Introduction

The forward problem of solving partial differential equations (PDEs) refers to finding the solution function given the specific form of the equation and the initial and boundary conditions. It can be unified into an operator learning task, thereby generalized into a sequence-to-sequence transformation framework. Neural operator models can learn the mapping from input functions to output functions in a data-driven manner based on paired training data, where both input and output functions are represented by sequences of sampling points. In recent years, the Transformer architecture has dominated the construction of neural operators. The attention mechanism models the long-range non-linear interaction relationships between all objects in the sequence, naturally fitting the sequence-to-sequence representation in the PDE solving process, and can provide more accurate modeling results compared to traditional fully connected structures. However, the time complexity of the attention mechanism is quadratic with respect to the sequence length, so the computational cost of using the attention mechanism to build neural operators increases dramatically. To reduce computational costs, some existing works attempt to replace the original attention mechanism with variants of linear time complexity attention mechanisms, but due to their limited modeling capabilities, they often sacrifice the solving accuracy of PDEs. Another part of existing works attempts to solve PDEs using a small number of physical features in the latent space, thereby getting rid of the intricate interaction relationships between a large number of sampling points in the original geometric space, and capturing the correlation between physical features in a compact latent space. However, these methods either rely on manually specified basis function features or fail to construct a persistent latent space. Therefore, this case proposes a physical cross-attention module, which decouples the positions of the input observation samples and the output samples to be predicted, and autonomously learns a persistent latent space from the data. Based on the physical cross-attention module, a latent neural operator model is further designed.

pipe

Structure diagram of Latent Neural Operator

2. Implementation of Latent Neural Operator

This section will explain how to implement the construction, training, testing and evaluation of the latent neural operator model based on PaddleScience code. The directory structure of the case is as follows.

LatentNO/
├── config
│     ├── LatentNO-Darcy.yaml
│     └── ...
├── datas
│   ├── Darcy_train.npy
│   ├── Darcy_val.npy
│   └── ...
├── LatentNO-steady.py
├── LatentNO-time.py
└── utils.py

2.1 Dataset Construction and Loading

For different tasks involved in this project, the dataset of this example can be divided into two categories: one is static data (Darcy, Pipe, Elasticity); the other is time-dependent data (NS2d). To be compatible with the automatic training process under the PaddleScience framework, this case designed and implemented dedicated dataset classes, corresponding to static scenarios and dynamic scenarios respectively, named LatentNODataset and LatentNODataset_time. Next, the construction of LatentNODataset will be explained in detail first.

For static data tasks, data is first stored in the ./datas directory in the form of .npy files. Each file is named according to the data name and mode (training set or validation set), such as Darcy_train.npy or Darcy_val.npy. These files internally store dictionaries containing three key variables x, y1 and y2. Both x and y1 will be used as inputs to the model, while y2 is the final prediction target. During the loading phase, data will be converted to Paddle tensor format, and the shape will be adjusted according to requirements to meet the model's input requirements, and x and y1 will be concatenated when necessary.

ppsci/data/dataset/latent_no_dataset.py
data_file = osp.join("datas", f"{data_name}_{data_mode}.npy")
if not os.path.exists(data_file):
    raise FileNotFoundError(f"Data file not found: {data_file}")

dataset = np.load(data_file, allow_pickle=True).tolist()

x = np.array(dataset["x"], dtype=np.float32)
y1 = np.array(dataset["y1"], dtype=np.float32)
y2 = np.array(dataset["y2"], dtype=np.float32)

x = np.reshape(x, (x.shape[0], -1, x.shape[-1]))
y1 = np.reshape(y1, (y1.shape[0], -1, y1.shape[-1]))
y2 = np.reshape(y2, (y2.shape[0], -1, y2.shape[-1]))

if data_concat:
    y1 = np.concatenate((x, y1), axis=-1)

x_tensor = paddle.to_tensor(x)
y1_tensor = paddle.to_tensor(y1)
y2_tensor = paddle.to_tensor(y2)

To enhance the training stability and generalization ability of the model, a normalization module is also built into the dataset class. This module calculates the mean and standard deviation of each variable during the initialization phase, and automatically performs normalization when loading data. At the same time, an interface for denormalization is provided to facilitate restoration to the true physical scale during inference or visualization.

ppsci/data/dataset/latent_no_dataset.py
self.normalizer = Normalizer(x_tensor, y1_tensor, y2_tensor)

if data_normalize:
    x = self.normalizer.apply_x(x_tensor, "cpu").numpy()
    y1 = self.normalizer.apply_y1(y1_tensor, "cpu").numpy()
    y2 = self.normalizer.apply_y2(y2_tensor, "cpu").numpy()
ppsci/data/dataset/latent_no_dataset.py
class Normalizer:
    def __init__(self, x, y1, y2):
        self.x_flag = False
        self.y1_flag = False
        self.y2_flag = False
        old_x_shape = x.shape
        old_y1_shape = y1.shape
        old_y2_shape = y2.shape
        x = paddle.reshape(x, (-1, x.shape[-1]))
        y1 = paddle.reshape(y1, (-1, y1.shape[-1]))
        y2 = paddle.reshape(y2, (-1, y2.shape[-1]))
        self.x_mean = paddle.mean(x, axis=0)
        self.x_std = paddle.std(x, axis=0) + 1e-8
        self.y1_mean = paddle.mean(y1, axis=0)
        self.y1_std = paddle.std(y1, axis=0) + 1e-8
        self.y2_mean = paddle.mean(y2, axis=0)
        self.y2_std = paddle.std(y2, axis=0) + 1e-8
        x = paddle.reshape(x, old_x_shape)
        y1 = paddle.reshape(y1, old_y1_shape)
        y2 = paddle.reshape(y2, old_y2_shape)

    def is_apply_x(self):
        return self.x_flag

    def is_apply_y1(self):
        return self.y1_flag

    def is_apply_y2(self):
        return self.y2_flag

    def apply_x(self, x, device, inverse=False):
        self.x_mean = self.x_mean.to(device)
        self.x_std = self.x_std.to(device)

        old_x_shape = x.shape
        x = paddle.reshape(x, (-1, x.shape[-1]))
        if not inverse:
            x = (x - self.x_mean) / self.x_std
            self.x_flag = True
        else:
            x = x * self.x_std + self.x_mean
        x = paddle.reshape(x, old_x_shape)
        return x

    def apply_y1(self, y1, device, inverse=False):
        self.y1_mean = self.y1_mean.to(device)
        self.y1_std = self.y1_std.to(device)

        old_y1_shape = y1.shape
        y1 = paddle.reshape(y1, (-1, y1.shape[-1]))
        if not inverse:
            y1 = (y1 - self.y1_mean) / self.y1_std
            self.y1_flag = True
        else:
            y1 = y1 * self.y1_std + self.y1_mean
        y1 = paddle.reshape(y1, old_y1_shape)
        return y1

    def apply_y2(self, y2, device, inverse=False):
        self.y2_mean = self.y2_mean.to(device)
        self.y2_std = self.y2_std.to(device)

        old_y2_shape = y2.shape
        y2 = paddle.reshape(y2, (-1, y2.shape[-1]))
        if not inverse:
            y2 = (y2 - self.y2_mean) / self.y2_std
            self.y2_flag = True
        else:
            y2 = y2 * self.y2_std + self.y2_mean
        y2 = paddle.reshape(y2, old_y2_shape)
        return y2

During the training process, by calling the __getitem__ method, the input, label and corresponding weight of a piece of data can be returned by index, thereby seamlessly connecting to the training pipeline.

ppsci/data/dataset/latent_no_dataset.py
def __getitem__(self, index: int):
    input_item = {
        key: paddle.to_tensor(value[index], dtype="float32")
        for key, value in self.input_data.items()
    }

    label_item = {
        key: paddle.to_tensor(value[index], dtype="float32")
        for key, value in self.label_data.items()
    }

    weight_item = {}
    if self.weight_dict:
        for key in self.label_keys:
            if key in self.weight_dict:
                weight_item[key] = self.weight_dict[key]

    if self.transform_fn:
        input_item, label_item = self.transform_fn(input_item, label_item)

    return input_item, label_item, weight_item

The entire data is stored in a dictionary in the format agreed by PaddleScience. input is used to provide input tensors, label is used to provide supervision signals, and weight_dict allows users to assign weights to different loss components.

For time-dependent data tasks, the construction of the dataset is similar. The main difference is that LatentNODataset_time retains x, y1 and y2 in the input dictionary at the same time, allowing the model to directly obtain time-related context information to assist training, while the label part is still y2 to supervise the final prediction result. This design ensures the capture of time-dependent characteristics during the training process, and also provides a good data interface for subsequent long-term evolution prediction.

2.2 Model Construction

The latent neural operator includes three processes: encoding, latent space operator fitting, and decoding. In processing static data tasks, the forward propagation process of the model is expressed in PaddleScience as follows:

ppsci/arch/latent_no.py
def forward(self, inputs: dict[str, paddle.Tensor]) -> dict[str, paddle.Tensor]:
    """
    Forward pass of LatentNO.

    Args:
        inputs (dict[str, paddle.Tensor]):
            Dictionary with keys:
                - "x": Trunk input tensor of shape (B, N, trunk_dim).
                - "y1": Branch input tensor of shape (B, N, branch_dim).

    Returns:
        dict[str, paddle.Tensor]: Dictionary containing:
            - "y2": Output tensor of shape (B, N, out_dim).
    """
    x = inputs[self.input_keys[0]]  # trunk input
    y = inputs[self.input_keys[1]]  # branch input

    x = self.trunk_mlp(x)
    y = self.branch_mlp(y)

    score = self.mode_mlp(x)
    score_encode = paddle.nn.functional.softmax(score, axis=1)
    score_decode = paddle.nn.functional.softmax(score, axis=-1)

    z = paddle.matmul(paddle.transpose(score_encode, perm=[0, 2, 1]), y)
    for block in self.attn_blocks:
        z = block(z)

    r = paddle.matmul(score_decode, z)
    r = self.out_mlp(r)

    return {self.output_keys[0]: r}

pipe

Physical cross-attention module used in encoding and decoding stages

The encoding process includes two parts: input projection and input function encoding. The input projection operation promotes the tuple composed of the sampling position of the observation function input in sequence form in the geometric space and the corresponding physical quantity value to a higher vector dimension. Geometric space is the original space of PDE input or output, which contains several sample points, each sample consisting of multi-dimensional spatial position coordinates and multi-dimensional physical quantity values. Through the input projection operation, the observation function can be projected into a space where it is easier to capture non-local features. The input function encoding operation maps the projected input data from the geometric space to the latent space. The latent neural operator model uses the representation Token of the imaginary sampling position in the latent space to re-represent the input function, where the number of imaginary sampling positions is much smaller than the number of sampling points of the input function in the geometric space, achieving the purpose of sequence compression. The latent neural operator model uses physical cross-attention to complete the encoding operation of the input function from geometric space to latent space. The relevant code for the encoding operation is expressed in PaddleScience as follows:

ppsci/arch/latent_no.py
x = inputs[self.input_keys[0]]  # trunk input
y = inputs[self.input_keys[1]]  # branch input

x = self.trunk_mlp(x)
y = self.branch_mlp(y)

score = self.mode_mlp(x)
score_encode = paddle.nn.functional.softmax(score, axis=1)
score_decode = paddle.nn.functional.softmax(score, axis=-1)

z = paddle.matmul(paddle.transpose(score_encode, perm=[0, 2, 1]), y)
ppsci/arch/latent_no.py
self.trunk_mlp = LatentMLP(trunk_dim, n_dim, n_dim, n_layer)
self.branch_mlp = LatentMLP(branch_dim, n_dim, n_dim, n_layer)
self.mode_mlp = LatentMLP(n_dim, n_dim, n_mode, n_layer)
ppsci/arch/latent_no.py
class LatentMLP(paddle.nn.Layer):
    """
    Multi-layer perceptron with residual connections used for trunk/branch/mode/out projections.

    Args:
        input_dim (int): Input feature dimension.
        hidden_dim (int): Hidden feature dimension.
        output_dim (int): Output feature dimension.
        n_layer (int): Number of hidden layers (residual blocks).

    Input:
        x (paddle.Tensor): shape (B, N, input_dim) or (..., input_dim)

    Returns:
        paddle.Tensor: shape (B, N, output_dim)
    """

    def __init__(
        self, input_dim: int, hidden_dim: int, output_dim: int, n_layer: int
    ) -> None:
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.n_layer = n_layer
        self.act = paddle.nn.GELU()
        self.input = paddle.nn.Linear(self.input_dim, self.hidden_dim)
        self.hidden = paddle.nn.LayerList(
            [
                paddle.nn.Linear(self.hidden_dim, self.hidden_dim)
                for _ in range(self.n_layer)
            ]
        )
        self.output = paddle.nn.Linear(self.hidden_dim, self.output_dim)

    def forward(self, x: paddle.Tensor) -> paddle.Tensor:
        """
        Args:
            x (paddle.Tensor): Input tensor of shape (B, N, input_dim).

        Returns:
            paddle.Tensor: Output tensor of shape (B, N, output_dim).
        """
        r = self.act(self.input(x))
        for i in range(0, self.n_layer):
            r = r + self.act(self.hidden[i](r))
        r = self.output(r)
        return r

After completing the encoding of the input function, the length of the sequence to be processed is significantly reduced to the number of imaginary sampling point positions in the latent space, so extracting and converting the features of the input function in the latent space is more efficient than in the original geometric space. The latent neural operator model fits the solution operator of the PDE problem in the latent space, uses stacked Transformer layers, and uses the self-attention mechanism as the kernel integral operator. Each layer performs information aggregation on the representation Tokens at the imaginary sampling positions in the latent space, thereby converting the features of the input function into the features of the output function. Fitting the solution operator based on shorter feature sequences in the latent space gives the latent neural operator model higher solving efficiency on PDE problems, and is also compatible with kernel integral operators with stronger modeling capabilities, thereby ensuring excellent solving accuracy on PDE problems. The stacked structure in the latent space is expressed in PaddleScience as follows:

ppsci/arch/latent_no.py
for block in self.attn_blocks:
    z = block(z)
ppsci/arch/latent_no.py
self.attn_blocks = paddle.nn.Sequential(
    *[AttentionBlock(n_mode, n_dim, n_head) for _ in range(n_block)]
)
ppsci/arch/latent_no.py
class AttentionBlock(paddle.nn.Layer):
    """
    Transformer-style block: LayerNorm -> Self-Attention (residual) -> LayerNorm -> MLP (residual).

    Args:
        n_mode (int): Sequence length / number of modes (documentation).
        n_dim (int): Feature dimension D.
        n_head (int): Number of attention heads.

    Input:
        y (paddle.Tensor): shape (B, N, D)

    Returns:
        paddle.Tensor: shape (B, N, D)
    """

    def __init__(self, n_mode: int, n_dim: int, n_head: int) -> None:
        super().__init__()
        self.n_mode = n_mode
        self.n_dim = n_dim
        self.n_head = n_head

        self.self_attn = SelfAttention(
            self.n_mode, self.n_dim, self.n_head, Attention_Vanilla
        )

        self.ln1 = paddle.nn.LayerNorm(self.n_dim)
        self.ln2 = paddle.nn.LayerNorm(self.n_dim)

        self.mlp = paddle.nn.Sequential(
            paddle.nn.Linear(self.n_dim, self.n_dim * 2),
            paddle.nn.GELU(),
            paddle.nn.Linear(self.n_dim * 2, self.n_dim),
        )

    def forward(self, y: paddle.Tensor) -> paddle.Tensor:
        """
        Forward pass of the Transformer-style attention block.

        Args:
            y (paddle.Tensor): Input tensor of shape (B, N, D).

        Returns:
            paddle.Tensor: Output tensor of shape (B, N, D).
        """
        y1 = self.ln1(y)
        y = y + self.self_attn(y1)
        y2 = self.ln2(y)
        y = y + self.mlp(y2)
        return y

The decoding process includes two parts: output function decoding and output projection. The output function decoding operation maps the representation Token at the imaginary sampling position converted by the stacked Transformer layer back to the geometric space. The latent neural operator model uses physical cross-attention again to decode the representation vector of the output function representation sequence in the latent space at the corresponding position to be predicted according to the query position of the output function. The output projection operation projects the decoded representation vector at the position to be predicted into the predicted low-dimensional physical quantity value. The relevant code for the decoding process is expressed in PaddleScience as follows:

ppsci/arch/latent_no.py
r = paddle.matmul(score_decode, z)
r = self.out_mlp(r)
ppsci/arch/latent_no.py
self.out_mlp = LatentMLP(n_dim, n_dim, out_dim, n_layer)

When dealing with time-dependent data, the overall structure of the model remains unchanged, but in order to meet the automatic training requirements of PaddleScience, the LatentNO_time class rewrites the forward propagation function to implement a time-unroll / autoregressive process. Inside the time iteration, LatentNO_time introduces two different next-step input sources: during training, externally provided y2 (label information) is additionally used, and the aligned segment y2[..., t:t+step] is cut out from y2 as part of the next input; during inference, the model's pred_step is used as the next input and stop_gradient=True is executed on it to block cross-step gradient propagation. Regardless of which source is used, the next current_y is updated through a sliding window method of "keeping the trunk part + discarding the earliest time slots + splicing new fragments at the end". Expressed in PaddleScience as follows:

ppsci/arch/latent_no.py
def forward(self, inputs: dict[str, paddle.Tensor]) -> dict[str, paddle.Tensor]:
    """
    Forward pass of LatentNO_time.

    Args:
        inputs (dict[str, paddle.Tensor]):
            Dictionary with keys:
                - "x": Trunk input tensor of shape (B, N, trunk_dim).
                - "y1": Branch input tensor of shape (B, N, branch_dim).
                - "y2" (optional): Ground-truth sequence for teacher forcing (B, N, T).

    Returns:
        dict[str, paddle.Tensor]:
            - If time_unroll == False:
                {"y2": (B, N, out_dim)}
            - If time_unroll == True:
                {"y2": (B, N, T_total), "y2_steps": (B, N, step, num_steps)}
    """
    x = inputs[self.input_keys[0]]
    y1 = inputs[self.input_keys[1]]
    y2_gt = inputs.get(self.input_keys[2], None)  # optional ground truth

    # simple single-step (original behaviour)
    if not getattr(self, "time_unroll", False):
        r = self._single_step_predict(x, y1)
        return {"y2": r}

    # time-unroll (autoregressive) mode
    if self.T is None or self.step is None:
        raise ValueError("time_unroll enabled but model.T or model.step is None.")
    if not hasattr(self, "trunk_split") or self.trunk_split is None:
        raise ValueError("time_unroll enabled but model.trunk_split is not set.")

    current_y = y1
    pred_steps = []

    # iterate time: mimic original `for t in range(0, T, step)`
    for t in range(0, self.T, self.step):
        # predict one step
        pred_step = self._single_step_predict(x, current_y)

        # append for final concatenation
        pred_steps.append(pred_step)

        # - training + use_teacher_forcing -> use GT slice from inputs["y2"] (teacher forcing)
        # - otherwise -> use pred_step (autoregressive)
        if (
            self.training
            and getattr(self, "use_teacher_forcing", False)
            and (y2_gt is not None)
        ):
            # use GT slice (must exist and have time alignment)
            next_input_part = y2_gt[..., t : t + self.step]
        else:
            # use prediction as next input; make sure to block gradient so predictions don't backprop through time
            pred_step.stop_gradient = True
            next_input_part = pred_step

        # update current_y: keep trunk part, drop earliest step slot(s), append the next part
        left = current_y[..., : self.trunk_split]
        right = current_y[..., self.trunk_split + self.step :]
        current_y = paddle.concat((left, right, next_input_part), axis=-1)

    # final outputs: concat along time dimension (last dim of out is per-step time dim)
    pred_full = paddle.concat(pred_steps, axis=-1)
    pred_steps_stack = paddle.stack(pred_steps, axis=-1)

    return {
        self.output_keys[0]: pred_full,
        f"{self.output_keys[0]}_steps": pred_steps_stack,
    }

In the training or validation function, the model is instantiated by the following code.

examples/LatentNO/LatentNO-steady.py
model = ppsci.arch.LatentNO(**cfg.MODEL)
examples/LatentNO/LatentNO-time.py
model = ppsci.arch.LatentNO_time(**cfg.MODEL)

2.3 Constraint Construction

This case adopts supervised learning. According to the API structure description of PaddleScience, the built-in SupervisedConstraint is used to construct supervised constraints. Expressed in PaddleScience code as follows (testing constraints are similar, the difference is that in some tasks, denormalization operations need to be performed when calculating test loss, that is, obtain the normalizer of the training set through sup_constraint.data_loader.dataset.normalizer and pass it into RelLpLoss as a parameter)

examples/LatentNO/LatentNO-steady.py
lr_scheduler = ppsci.optimizer.lr_scheduler.OneCycleLR(
    epochs=cfg.TRAIN.epochs,
    iters_per_epoch=cfg.TRAIN.iters_per_epoch,
    max_learning_rate=cfg.TRAIN.lr,
    divide_factor=cfg.TRAIN.div_factor,
    end_learning_rate=cfg.TRAIN.lr
    / cfg.TRAIN.div_factor
    / cfg.TRAIN.final_div_factor,
    phase_pct=cfg.TRAIN.pct_start,
)()

The loss function is relative Lp loss. For static tasks, the loss function RelLpLoss is expressed as follows.

examples/LatentNO/utils.py
class RelLpLoss(base.Metric):
    def __init__(
        self,
        p: int,
        key: str = "y2",
        normalizer: Optional[object] = None,
        eps: float = 1e-12,
        keep_batch: bool = False,
    ):
        if keep_batch:
            raise ValueError(f"keep_batch should be False, but got {keep_batch}.")
        super(RelLpLoss, self).__init__(keep_batch)
        self.p = p
        self.key = key
        self.normalizer = normalizer
        self.eps = eps

    def forward(
        self,
        output_dict: Dict[str, paddle.Tensor],
        label_dict: Dict[str, paddle.Tensor],
        weight_dicts: Optional[Dict] = None,
    ) -> Dict[str, "paddle.Tensor"]:
        losses: Dict[str, paddle.Tensor] = {}
        for label_key in label_dict:
            pred_key = self.key if self.key in output_dict else label_key
            pred = output_dict[pred_key]
            target = label_dict[label_key]

            if self.normalizer is not None:
                pred = self.normalizer.apply_y2(pred, device="cpu", inverse=True)
                target = self.normalizer.apply_y2(target, device="cpu", inverse=True)

            error = paddle.sum(
                paddle.abs(pred - target) ** self.p,
                axis=tuple(range(1, len(pred.shape))),
            ) ** (1.0 / self.p)
            target_norm = paddle.sum(
                paddle.abs(target) ** self.p, axis=tuple(range(1, len(target.shape)))
            ) ** (1.0 / self.p)

            denom = target_norm.clip(min=self.eps)
            rloss = paddle.mean(error / denom)
            losses[label_key] = rloss

        return losses

Similarly, adjustments have been made for time-dependent tasks to adapt to the automatic training framework. RelLpLoss_time implements gradient backpropagation update using cumulative error per time step through the use_full_sequence parameter, and uses full sequence one-time error as the evaluation metric.

examples/LatentNO/utils.py
class RelLpLoss_time(base.Metric):
    def __init__(
        self,
        p: int,
        key: str = "y2",
        normalizer: Optional[object] = None,
        eps: float = 1e-12,
        keep_batch: bool = False,
        use_full_sequence: bool = True,
    ):
        if keep_batch:
            raise ValueError(f"keep_batch should be False, but got {keep_batch}.")
        super(RelLpLoss_time, self).__init__(keep_batch)
        self.p = p
        self.key = key
        self.normalizer = normalizer
        self.eps = eps
        self.use_full_sequence = use_full_sequence  # True: use full sequence loss; False: accumulate step-wise losses

    def forward(
        self,
        output_dict: Dict[str, paddle.Tensor],
        label_dict: Dict[str, paddle.Tensor],
        weight_dicts: Optional[Dict] = None,
    ) -> Dict[str, "paddle.Tensor"]:
        losses: Dict[str, paddle.Tensor] = {}
        for label_key in label_dict:
            if f"{self.key}_steps" in output_dict and not self.use_full_sequence:
                # Method 1: Accumulate losses at each timestep (matches backpropagation loss)
                pred_stack = output_dict[f"{self.key}_steps"]
                target_full = label_dict[label_key]
                step = pred_stack.shape[2]
                num_steps = pred_stack.shape[3]

                total_loss = paddle.to_tensor(0.0)
                for s in range(num_steps):
                    pred_s = pred_stack[..., s]
                    t_start = s * step
                    t_end = t_start + step
                    tgt_s = target_full[..., t_start:t_end]

                    if self.normalizer is not None:
                        pred_s = self.normalizer.apply_y2(pred_s, inverse=True)
                        tgt_s = self.normalizer.apply_y2(tgt_s, inverse=True)

                    # Compute Lp error for current timestep
                    error = paddle.sum(
                        paddle.abs(pred_s - tgt_s) ** self.p,
                        tuple(range(1, len(pred_s.shape))),
                    ) ** (1 / self.p)
                    target_norm = paddle.sum(
                        paddle.abs(tgt_s) ** self.p, tuple(range(1, len(tgt_s.shape)))
                    ) ** (1 / self.p)
                    step_loss = paddle.mean(error / target_norm)
                    total_loss = total_loss + step_loss

                losses[label_key] = total_loss

            else:
                # Method 2: Use full sequence loss
                pred_full = (
                    output_dict[self.key]
                    if self.key in output_dict
                    else output_dict[label_key]
                )
                target_full = label_dict[label_key]

                if self.normalizer is not None:
                    pred_full = self.normalizer.apply_y2(pred_full, inverse=True)
                    target_full = self.normalizer.apply_y2(target_full, inverse=True)

                error = paddle.sum(
                    paddle.abs(pred_full - target_full) ** self.p,
                    tuple(range(1, len(pred_full.shape))),
                ) ** (1 / self.p)
                target_norm = paddle.sum(
                    paddle.abs(target_full) ** self.p,
                    tuple(range(1, len(target_full.shape))),
                ) ** (1 / self.p)
                losses[label_key] = paddle.mean(error / target_norm)

        return losses

2.4 Optimizer Construction

The trainer uses the AdamW optimizer, the learning rate setting is given by the configuration file, and OneCycleLR is used to control the learning rate change. Expressed in PaddleScience code as follows:

examples/LatentNO/LatentNO-steady.py
lr_scheduler = ppsci.optimizer.lr_scheduler.OneCycleLR(
    epochs=cfg.TRAIN.epochs,
    iters_per_epoch=cfg.TRAIN.iters_per_epoch,
    max_learning_rate=cfg.TRAIN.lr,
    divide_factor=cfg.TRAIN.div_factor,
    end_learning_rate=cfg.TRAIN.lr
    / cfg.TRAIN.div_factor
    / cfg.TRAIN.final_div_factor,
    phase_pct=cfg.TRAIN.pct_start,
)()

optimizer = ppsci.optimizer.AdamW(
    lr_scheduler,
    weight_decay=cfg.TRAIN.weight_decay,
    grad_clip=paddle.nn.ClipGradByNorm(clip_norm=cfg.TRAIN.clip_norm),
    beta1=cfg.TRAIN.beta0,
    beta2=cfg.TRAIN.beta1,
)(model)

metric_dict = {"L2Rel": RelLpLoss(p=2, key="y2", normalizer=normalizer)}

2.5 Model Training

After completing the above settings, you only need to pass the instantiated objects to ppsci.solver.Solver in order, and then start training. Expressed in PaddleScience code as follows:

examples/LatentNO/LatentNO-steady.py
solver = ppsci.solver.Solver(
    model=model,
    optimizer=optimizer,
    constraint=constraint,
    validator=validator,
    cfg=cfg,
)

solver.train()

3. Complete Code

examples/LatentNO/LatentNO-steady.py
import hydra
import paddle
from omegaconf import DictConfig
from utils import RelLpLoss

import ppsci


def train(cfg: DictConfig):
    model = ppsci.arch.LatentNO(**cfg.MODEL)

    train_dataloader_cfg = {
        "dataset": {
            "name": "LatentNODataset",
            "data_name": cfg.data_name,
            "data_mode": "train",
            "data_normalize": cfg.data_normalize,
            "data_concat": cfg.data_concat,
            "input_keys": ("x", "y1"),
            "label_keys": ("y2",),
        },
        "sampler": {"name": "BatchSampler", "drop_last": True, "shuffle": True},
        "batch_size": cfg.batch_size,
        "num_workers": cfg.get("num_workers", 0),
    }

    eval_dataloader_cfg = {
        "dataset": {
            "name": "LatentNODataset",
            "data_name": cfg.data_name,
            "data_mode": "val",
            "data_normalize": cfg.data_normalize,
            "data_concat": cfg.data_concat,
            "input_keys": ("x", "y1"),
            "label_keys": ("y2",),
        },
        "sampler": {"name": "BatchSampler", "drop_last": True, "shuffle": False},
        "batch_size": cfg.batch_size,
        "num_workers": cfg.get("num_workers", 0),
    }

    train_loss_fn = RelLpLoss(p=2, key="y2", normalizer=None)

    sup_constraint = ppsci.constraint.SupervisedConstraint(
        train_dataloader_cfg,
        train_loss_fn,
        output_expr={"y2": lambda out: out["y2"]},
        name="SupTrain",
    )
    if cfg.data_normalize:
        normalizer = sup_constraint.data_loader.dataset.normalizer
    else:
        normalizer = None
    constraint = {sup_constraint.name: sup_constraint}

    cfg.TRAIN.iters_per_epoch = len(sup_constraint.data_loader)
    lr_scheduler = ppsci.optimizer.lr_scheduler.OneCycleLR(
        epochs=cfg.TRAIN.epochs,
        iters_per_epoch=cfg.TRAIN.iters_per_epoch,
        max_learning_rate=cfg.TRAIN.lr,
        divide_factor=cfg.TRAIN.div_factor,
        end_learning_rate=cfg.TRAIN.lr
        / cfg.TRAIN.div_factor
        / cfg.TRAIN.final_div_factor,
        phase_pct=cfg.TRAIN.pct_start,
    )()

    optimizer = ppsci.optimizer.AdamW(
        lr_scheduler,
        weight_decay=cfg.TRAIN.weight_decay,
        grad_clip=paddle.nn.ClipGradByNorm(clip_norm=cfg.TRAIN.clip_norm),
        beta1=cfg.TRAIN.beta0,
        beta2=cfg.TRAIN.beta1,
    )(model)

    metric_dict = {"L2Rel": RelLpLoss(p=2, key="y2", normalizer=normalizer)}

    val_loss_fn = RelLpLoss(p=2, key="y2", normalizer=normalizer)

    sup_validator = ppsci.validate.SupervisedValidator(
        eval_dataloader_cfg,
        val_loss_fn,
        output_expr={"y2": lambda out: out["y2"]},
        metric=metric_dict,
        name="SupVal",
    )
    validator = {sup_validator.name: sup_validator}

    solver = ppsci.solver.Solver(
        model=model,
        optimizer=optimizer,
        constraint=constraint,
        validator=validator,
        cfg=cfg,
    )

    solver.train()
    solver.eval()


def evaluate(cfg: DictConfig):
    train_ds = ppsci.data.dataset.LatentNODataset(
        cfg.data_name,
        "train",
        cfg.data_normalize,
        cfg.data_concat,
        input_keys=("x", "y1"),
        label_keys=("y2",),
    )
    if cfg.data_normalize:
        normalizer = train_ds.normalizer
    else:
        normalizer = None

    eval_loss_fn = RelLpLoss(p=2, key="y2", normalizer=normalizer)

    model = ppsci.arch.LatentNO(**cfg.MODEL)

    eval_dataloader_cfg = {
        "dataset": {
            "name": "LatentNODataset",
            "data_name": cfg.data_name,
            "data_mode": "val",
            "data_normalize": cfg.data_normalize,
            "data_concat": cfg.data_concat,
            "input_keys": ("x", "y1"),
            "label_keys": ("y2",),
        },
        "sampler": {"name": "BatchSampler", "drop_last": True, "shuffle": False},
        "batch_size": cfg.batch_size,
        "num_workers": cfg.get("num_workers", 0),
    }

    metric_dict = {"L2Rel": RelLpLoss(p=2, key="y2", normalizer=normalizer)}

    validator = ppsci.validate.SupervisedValidator(
        eval_dataloader_cfg,
        eval_loss_fn,
        output_expr={"y2": lambda out: out["y2"]},
        metric=metric_dict,
        name="Evaluation",
    )

    solver = ppsci.solver.Solver(
        model=model,
        validator={"eval": validator},
        pretrained_model_path=cfg.EVAL.pretrained_model_path,
    )

    solver.eval()


@hydra.main(
    version_base=None, config_path="./config", config_name="LatentNO-Darcy.yaml"
)
def main(cfg: DictConfig):

    if cfg.mode == "train":
        train(cfg)
    elif cfg.mode == "eval":
        evaluate(cfg)
    else:
        raise ValueError(f"cfg.mode should in ['train', 'eval'], but got '{cfg.mode}'")


if __name__ == "__main__":
    main()

4. Result Display

The following shows the performance of the latent neural operator in several PDE forward problems.

pipe

Performance of latent neural operator in several PDE forward problems

5. References

[1] Wang T, Wang C. Latent neural operator for solving forward and inverse pde problems[J]. Advances in Neural Information Processing Systems, 2024, 37: 33085-33107.