Skip to content

hPINNs(PINN with hard constraints)

AI Studio Quick Experience

# linux
wget -c https://paddle-org.bj.bcebos.com/paddlescience/datasets/hPINNs/hpinns_holo_train.mat -P ./datasets/
wget -c https://paddle-org.bj.bcebos.com/paddlescience/datasets/hPINNs/hpinns_holo_valid.mat -P ./datasets/
# windows
# curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/hPINNs/hpinns_holo_train.mat --create-dirs -o ./datasets/hpinns_holo_train.mat
# curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/hPINNs/hpinns_holo_valid.mat --create-dirs -o ./datasets/hpinns_holo_valid.mat
python holography.py
# linux
wget -c https://paddle-org.bj.bcebos.com/paddlescience/datasets/hPINNs/hpinns_holo_train.mat -P ./datasets/
wget -c https://paddle-org.bj.bcebos.com/paddlescience/datasets/hPINNs/hpinns_holo_valid.mat -P ./datasets/
# windows
# curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/hPINNs/hpinns_holo_train.mat --create-dirs -o ./datasets/hpinns_holo_train.mat
# curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/hPINNs/hpinns_holo_valid.mat --create-dirs -o ./datasets/hpinns_holo_valid.mat
python holography.py mode=eval EVAL.pretrained_model_path=https://paddle-org.bj.bcebos.com/paddlescience/models/hPINNs/hpinns_pretrained.pdparams
python holography.py mode=export
# linux
wget -c https://paddle-org.bj.bcebos.com/paddlescience/datasets/hPINNs/hpinns_holo_train.mat -P ./datasets/
wget -c https://paddle-org.bj.bcebos.com/paddlescience/datasets/hPINNs/hpinns_holo_valid.mat -P ./datasets/
# windows
# curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/hPINNs/hpinns_holo_train.mat --create-dirs -o ./datasets/hpinns_holo_train.mat
# curl https://paddle-org.bj.bcebos.com/paddlescience/datasets/hPINNs/hpinns_holo_valid.mat --create-dirs -o ./datasets/hpinns_holo_valid.mat
python holography.py mode=infer
Pretrained Model Metrics
hpinns_pretrained.pdparams loss(opt_sup): 0.05352
MSE.eval_metric(opt_sup): 0.00002
loss(val_sup): 0.02205
MSE.eval_metric(val_sup): 0.00001

1. Background Introduction

Solving partial differential equations (PDEs) is a fundamental physical problem. In the past few decades, various numerical solutions for systems of partial differential equations represented by finite difference method (FDM), finite volume method (FVM), and finite element method (FEM) have matured. With the rapid development of artificial intelligence technology, using deep learning to solve partial differential equations has become a new research trend. PINNs (Physics-informed neural networks) are deep learning networks that incorporate physical constraints. Therefore, compared with pure data-driven neural network learning, PINNs can learn models with stronger generalization ability using fewer data samples. Its application scope includes but is not limited to fluid mechanics, heat conduction, electromagnetic fields, quantum mechanics and other fields.

The constraints in traditional PINNs networks are soft constraints, that is, PDE (partial differential equation) participates in network training as a loss term. In this case, hPINNs strictly incorporate constraints into the network structure by modifying the network output, forming a more effective hard constraint.

At the same time, hPINNs designed different combinations of constraints and conducted experiments under 3 conditions: soft constraints, hard constraints with regularization, and hard constraints applying augmented Lagrangian method. This document mainly explains the hard constraints applying augmented Lagrangian method, but the three training modes can be switched through the train_mode parameter in the complete code.

For this problem, please refer to AI Studio Project.

2. Problem Definition

This problem uses hPINNs to solve the holography problem based on Fourier optics, aiming to design the permittivity map of the scattering plate. This method makes the propagation intensity of the scattered light of the permittivity map have the shape of the target function.

objective function:

\[ \begin{aligned} \mathcal{J}(E) &= \dfrac{1}{Area(\Omega_3)} \left\| |E(x,y)|^2-f(x,y)\right\|^2_{2,\Omega_3} \\ &= \dfrac{1}{Area(\Omega_3)} \int_{\Omega_3} (|E(x,y)|^2-f(x,y))^2 {\rm d}x {\rm d}y \end{aligned} \]

Where E is the electric field intensity: \(\vert E\vert^2 = (\mathfrak{R} [E])^2+(\mathfrak{I} [E])^2\)

target function:

\[ f(x,y) = \begin{cases} \begin{aligned} & 1, \ (x,y) \in [-0.5,0.5] \cap [1,2]\\ & 0, \ otherwise \end{aligned} \end{cases} \]

PDE formula:

\[ \nabla^2 E + \varepsilon \omega^2 E = -i \omega \mathcal{J} \]

3. Problem Solving

Next, we will explain how to convert the problem into PaddleScience code step by step and solve the problem using deep learning methods. In order to quickly understand PaddleScience, only key steps such as model construction and constraint construction are described below, while other details please refer to API Documentation.

3.1 Dataset Introduction

The dataset is a processed holography dataset, containing \(x, y\) of training and test data, and the value \(bound\) representing the boundary between optimizer area data and full area data, stored in the form of a dictionary in a .mat file.

Before running the code for this problem, please download the training dataset and validation dataset according to the following commands:

wget -c -P ./datasets/ https://paddle-org.bj.bcebos.com/paddlescience/datasets/hPINNs/hpinns_holo_train.mat
wget -c -P ./datasets/ https://paddle-org.bj.bcebos.com/paddlescience/datasets/hPINNs/hpinns_holo_valid.mat

3.2 Model Construction

The model structure diagram of the holography problem is:

holography-arch

hPINNs network model for holography problem

In the holography problem, after applying the PMLs (perfectly matched layers) method, the PDE formula becomes:

\[ \dfrac{1}{1+i \dfrac{\sigma_x\left(x\right)}{\omega}} \dfrac{\partial}{\partial x} \left(\dfrac{1}{1+i \dfrac{\sigma_x\left(x\right)}{\omega}} \dfrac{\partial E}{\partial x}\right)+\dfrac{1}{1+i \dfrac{\sigma_y\left(y\right)}{\omega}} \dfrac{\partial}{\partial y} \left(\dfrac{1}{1+i \dfrac{\sigma_y\left(y\right)}{\omega}} \dfrac{\partial E}{\partial y}\right) + \varepsilon \omega^2 E = -i \omega \mathcal{J} \]

For PMLs method, please refer to Related Paper.

In this problem, the frequency \(\omega\) is a constant \(\dfrac{2\pi}{\mathcal{P}}\) (\(\mathcal{P}\) is Period), the unknown quantity \(E\) to be solved is related to the position parameter \((x, y)\). In this example, the permittivity \(\varepsilon\) is also an unknown quantity, \(\sigma_x(x)\) and \(\sigma_y(y)\) are variables related to \(x, y\) respectively obtained by PMLs. Here we use a relatively simple MLP (Multilayer Perceptron) to represent the mapping function \(f: \mathbb{R}^2 \to \mathbb{R}^2\) from \((x, y)\) to \((E, \varepsilon)\). However, as shown in the network structure above, in this problem \(E\) is divided into two parts \((\mathfrak{R} [E],\mathfrak{I} [E])\) according to the real part and imaginary part, and 3 parallel MLP networks are used to map \((\mathfrak{R} [E], \mathfrak{I} [E], \varepsilon)\) respectively. The mapping function is \(f_i: \mathbb{R}^2 \to \mathbb{R}^1\), that is:

\[ \mathfrak{R} [E] = f_1(x,y), \ \mathfrak{R} [E] = f_2(x,y), \ \varepsilon = f_3(x,y) \]

In the above formula, \(f_1, f_2, f_3\) are each an MLP model, and the three together form a Model List, expressed in PaddleScience code as follows

model_re = ppsci.arch.MLP(**cfg.MODEL.re_net)
model_im = ppsci.arch.MLP(**cfg.MODEL.im_net)
model_eps = ppsci.arch.MLP(**cfg.MODEL.eps_net)

In order to access the values of specific variables accurately and quickly during calculation, we specify here that the input variable names of the network model are ("x_cos_1","x_sin_1",...,"x_cos_6","x_sin_6","y","y_cos_1","y_sin_1"), and the output variable names are ("e_re",), ("e_im",), ("eps",) respectively. Note that the input variables here are much more than the two variables \((x, y)\), because as shown in the figure above, the input of the model is actually the terms of the Fourier expansion of \((x, y)\) rather than themselves. The training data provided in the dataset are \((x, y)\) values, which means we need to transform the input. At the same time, as shown in the figure above, due to the existence of hard constraints, the output variable name of the model is not the final output, so the output also needs to be transformed.

3.3 transform Construction

The transform of the input is the transformation of variables \((x, y)\) to \((\cos(\omega x),\sin(\omega x),...,\cos(6 \omega x),\sin(6 \omega x),y,\cos(\omega y),\sin(\omega y))\), and the output transform is the hard constraint on \((\mathfrak{R} [E], \mathfrak{I} [E], \varepsilon)\) respectively. The code is as follows

# transform
def transform_in(input):
    # Periodic BC in x
    P = BOX[1][0] - BOX[0][0] + 2 * DPML
    w = 2 * np.pi / P
    x, y = input["x"], input["y"]
    input_transformed = {}
    for t in range(1, 7):
        input_transformed[f"x_cos_{t}"] = paddle.cos(t * w * x)
        input_transformed[f"x_sin_{t}"] = paddle.sin(t * w * x)
    input_transformed["y"] = y
    input_transformed["y_cos_1"] = paddle.cos(OMEGA * y)
    input_transformed["y_sin_1"] = paddle.sin(OMEGA * y)

    return input_transformed


def transform_out_all(input, var):
    y = input["y"]
    # Zero Dirichlet BC
    a, b = BOX[0][1] - DPML, BOX[1][1] + DPML
    t = (1 - paddle.exp(a - y)) * (1 - paddle.exp(y - b))
    return t * var


def transform_out_real_part(input, out):
    re = out["e_re"]
    trans_out = transform_out_all(input, re)
    return {"e_real": trans_out}


def transform_out_imaginary_part(input, out):
    im = out["e_im"]
    trans_out = transform_out_all(input, im)
    return {"e_imaginary": trans_out}


def transform_out_epsilon(input, out):
    eps = out["eps"]
    # 1 <= eps <= 12
    eps = F.sigmoid(eps) * 11 + 1
    return {"epsilon": eps}

The corresponding transform needs to be registered for each MLP model separately, and then the 3 MLP models are formed into a Model List.

# register transform
model_re.register_input_transform(func_module.transform_in)
model_im.register_input_transform(func_module.transform_in)
model_eps.register_input_transform(func_module.transform_in)

model_re.register_output_transform(func_module.transform_out_real_part)
model_im.register_output_transform(func_module.transform_out_imaginary_part)
model_eps.register_output_transform(func_module.transform_out_epsilon)

model_list = ppsci.arch.ModelList((model_re, model_im, model_eps))

In this way, we instantiated a neural network model model list containing 3 MLP models, each MLP containing 4 layers of hidden neurons, each layer having 48 neurons, using "tanh" as the activation function, and containing input and output transforms.

3.4 Parameter and Hyperparameter Setting

We need to specify problem-related parameters, such as specifying training with hard constraints applying augmented Lagrangian method through the train_mode parameter.

# open FLAG for higher order differential operator
paddle.framework.core.set_prim_eager_enabled(True)

ppsci.utils.misc.set_random_seed(cfg.seed)
# initialize logger
logger.init_logger("ppsci", osp.join(cfg.output_dir, f"{cfg.mode}.log"), "info")
# initialize params
func_module.train_mode = cfg.TRAIN_MODE
loss_log_obj = []
# define constants
BOX = np.array([[-2, -2], [2, 3]])
DPML = 1
OMEGA = 2 * np.pi
SIGMA0 = -np.log(1e-20) / (4 * DPML**3 / 3)
l_BOX = BOX + np.array([[-DPML, -DPML], [DPML, DPML]])
beta = 2.0
mu = 2

# define variables which will be updated during training
lambda_re: np.ndarray = None
lambda_im: np.ndarray = None
loss_weight: List[float] = None
train_mode: str = None

# define log variables for plotting
loss_log = []  # record all losses, [pde, lag, obj]
loss_obj = 0.0  # record last objective loss of each k
lambda_log = []  # record all lambdas

Since the augmented Lagrangian method is applied, parameters \(\mu\) and \(\lambda\) are not constants, but change with training round \(k\). At this time, \(\beta\) is the coefficient of change, that is, every training round

\(\mu_k = \beta \mu_{k-1}\), \(\lambda_k = \beta \lambda_{k-1}\)

At the same time, hyperparameters such as training epochs and learning rate need to be specified.

    hidden_size: 48
    activation: "tanh"

# training settings
TRAIN:
  epochs: 20000
  iters_per_epoch: 1
  eval_during_train: false
  learning_rate: 0.001

3.5 Optimizer Construction

The training is divided into two stages. First, use the Adam optimizer for rough training, and then use the LBFGS optimizer to approximate the optimal point. Therefore, two optimizers are required, which also corresponds to the two EPOCHS values in the hyperparameters in the previous section.

optimizer_adam = ppsci.optimizer.Adam(cfg.TRAIN.learning_rate)(
    (model_re, model_im, model_eps)
)
optimizer_lbfgs = ppsci.optimizer.LBFGS(max_iter=cfg.TRAIN.max_iter)(
    (model_re, model_im, model_eps)
)

3.6 Constraint Construction

This problem adopts unsupervised learning, and the constraint is that the result needs to satisfy the PDE formula.

Although we are not training in a supervised learning manner, we can still use the supervised constraint SupervisedConstraint. Before defining constraints, we need to specify data reading configurations such as file paths for supervised constraints. Because there is no label data in the dataset, we need to use training data as label data when reading data, and pay attention not to use this part of "fake" label data later.

"alias_dict": {
    "e_real": "x",
    "e_imaginary": "x",
    "epsilon": "x",
    **{k: "x" for k in label_keys_derivative},
},

As above, all output labels will read the value of input x.

The following are specific contents such as constraints. Pay attention to the "fake" label data given as mentioned above:

# manually build constraint(s)
label_keys = ("x", "y", "bound", "e_real", "e_imaginary", "epsilon")
label_keys_derivative = (
    "de_re_x",
    "de_re_y",
    "de_re_xx",
    "de_re_yy",
    "de_im_x",
    "de_im_y",
    "de_im_xx",
    "de_im_yy",
)
output_expr = {
    "x": lambda out: out["x"],
    "y": lambda out: out["y"],
    "bound": lambda out: out["bound"],
    "e_real": lambda out: out["e_real"],
    "e_imaginary": lambda out: out["e_imaginary"],
    "epsilon": lambda out: out["epsilon"],
    "de_re_x": lambda out: jacobian(out["e_real"], out["x"]),
    "de_re_y": lambda out: jacobian(out["e_real"], out["y"]),
    "de_re_xx": lambda out: hessian(out["e_real"], out["x"]),
    "de_re_yy": lambda out: hessian(out["e_real"], out["y"]),
    "de_im_x": lambda out: jacobian(out["e_imaginary"], out["x"]),
    "de_im_y": lambda out: jacobian(out["e_imaginary"], out["y"]),
    "de_im_xx": lambda out: hessian(out["e_imaginary"], out["x"]),
    "de_im_yy": lambda out: hessian(out["e_imaginary"], out["y"]),
}

sup_constraint_pde = ppsci.constraint.SupervisedConstraint(
    {
        "dataset": {
            "name": "IterableMatDataset",
            "file_path": cfg.DATASET_PATH,
            "input_keys": ("x", "y", "bound"),
            "label_keys": label_keys + label_keys_derivative,
            "alias_dict": {
                "e_real": "x",
                "e_imaginary": "x",
                "epsilon": "x",
                **{k: "x" for k in label_keys_derivative},
            },
        },
    },
    ppsci.loss.FunctionalLoss(func_module.pde_loss_fun),
    output_expr,
    name="sup_constraint_pde",
)
sup_constraint_obj = ppsci.constraint.SupervisedConstraint(
    {
        "dataset": {
            "name": "IterableMatDataset",
            "file_path": cfg.DATASET_PATH,
            "input_keys": ("x", "y", "bound"),
            "label_keys": label_keys,
            "alias_dict": {"e_real": "x", "e_imaginary": "x", "epsilon": "x"},
        },
    },
    ppsci.loss.FunctionalLoss(func_module.obj_loss_fun),
    {key: lambda out, k=key: out[k] for key in label_keys},
    name="sup_constraint_obj",
)

The first parameter of SupervisedConstraint is the reading configuration of supervised constraints, where the "dataset" field represents the training dataset information used, and each field represents:

  1. name: Dataset type, here "IterableMatDataset" represents .mat type dataset read sequentially without batching;
  2. file_path: Dataset file path;
  3. input_keys: Input variable name;
  4. label_keys: Label variable name;
  5. alias_dict: Variable alias.

The second parameter is the loss function. Here FunctionalLoss is a custom loss function class reserved by PaddleScience. This class supports defining the calculation method of loss when writing code, rather than using existing methods such as MSE. In this problem, since there are multiple loss terms, multiple loss calculation functions need to be defined, which is also the reason why multiple constraints need to be constructed. For custom loss function code, please refer to Custom loss and metric.

The third parameter is the equation expression, used to describe how to calculate the constraint target. Here fill in output_expr. The calculated value will be stored in the output list according to the specified name, so as to ensure that these values can be used when calculating loss.

The fourth parameter is the name of the constraint condition. We need to name each constraint condition for subsequent indexing.

After the constraint is constructed, encapsulate it into a dictionary with the name we just named as the key for subsequent access.

constraint = {
    sup_constraint_pde.name: sup_constraint_pde,
    sup_constraint_obj.name: sup_constraint_obj,
}

3.7 Validator Construction

Similar to constraints, although this problem uses unsupervised learning, ppsci.validate.SupervisedValidator can still be used to construct a validator. There are two sampling point regions in this problem, one is a larger complete definition region, and the other is an objective region in the domain. The validator evaluates these two regions separately, so two validators need to be constructed. opt corresponds to the objective region, and val corresponds to the entire domain.

# manually build validator
sup_validator_opt = ppsci.validate.SupervisedValidator(
    {
        "dataset": {
            "name": "IterableMatDataset",
            "file_path": cfg.DATASET_PATH_VALID,
            "input_keys": ("x", "y", "bound"),
            "label_keys": label_keys + label_keys_derivative,
            "alias_dict": {
                "x": "x_opt",
                "y": "y_opt",
                "e_real": "x_opt",
                "e_imaginary": "x_opt",
                "epsilon": "x_opt",
                **{k: "x_opt" for k in label_keys_derivative},
            },
        },
    },
    ppsci.loss.FunctionalLoss(func_module.eval_loss_fun),
    output_expr,
    {"mse": ppsci.metric.FunctionalMetric(func_module.eval_metric_fun)},
    name="opt_sup",
)
sup_validator_val = ppsci.validate.SupervisedValidator(
    {
        "dataset": {
            "name": "IterableMatDataset",
            "file_path": cfg.DATASET_PATH_VALID,
            "input_keys": ("x", "y", "bound"),
            "label_keys": label_keys + label_keys_derivative,
            "alias_dict": {
                "x": "x_val",
                "y": "y_val",
                "e_real": "x_val",
                "e_imaginary": "x_val",
                "epsilon": "x_val",
                **{k: "x_val" for k in label_keys_derivative},
            },
        },
    },
    ppsci.loss.FunctionalLoss(func_module.eval_loss_fun),
    output_expr,
    {"mse": ppsci.metric.FunctionalMetric(func_module.eval_metric_fun)},
    name="val_sup",
)
validator = {
    sup_validator_opt.name: sup_validator_opt,
    sup_validator_val.name: sup_validator_val,
}

The evaluation metric metric is FunctionalMetric, which is a custom metric function class reserved by PaddleScience. This class supports defining the calculation method of metric when writing code, rather than using existing methods such as MSE, L2, etc. For custom metric function code, please refer to the next section Custom loss and metric.

Other configurations are similar to the settings of Constraint Construction.

3.8 Custom loss and metric

Since this problem adopts unsupervised learning and there is no label data in the data, loss and metric are calculated based on PDE, so custom loss and metric are required. The method is to first define relevant functions, and then pass the function name as a parameter to FunctionalLoss and FunctionalMetric.

Note that the input and output parameters of custom loss and metric functions need to be consistent with other functions such as MSE in PaddleScience, that is, input is model output output_dict and other dictionary variables, loss function output is loss value paddle.Tensor, metric function output is dictionary Dict[str, paddle.Tensor].

def pde_loss_fun(output_dict: Dict[str, paddle.Tensor], *args) -> paddle.Tensor:
    """Compute pde loss and lagrangian loss.

    Args:
        output_dict (Dict[str, paddle.Tensor]): Dict of outputs contains tensor.

    Returns:
        paddle.Tensor: PDE loss (and lagrangian loss if using Augmented Lagrangian method).
    """
    global loss_log
    bound = int(output_dict["bound"])
    loss_re, loss_im = compute_real_and_imaginary_loss(output_dict)
    loss_re = loss_re[bound:]
    loss_im = loss_im[bound:]

    loss_eqs1 = paddle.mean(loss_re**2)
    loss_eqs2 = paddle.mean(loss_im**2)
    # augmented_Lagrangian
    if lambda_im is None:
        init_lambda(output_dict, bound)
    loss_lag1 = paddle.mean(loss_re * lambda_re)
    loss_lag2 = paddle.mean(loss_im * lambda_im)

    losses = (
        loss_weight[0] * loss_eqs1
        + loss_weight[1] * loss_eqs2
        + loss_weight[2] * loss_lag1
        + loss_weight[3] * loss_lag2
    )
    loss_log.append(float(loss_eqs1 + loss_eqs2))  # for plotting
    loss_log.append(float(loss_lag1 + loss_lag2))  # for plotting
    return {"pde": losses}


def obj_loss_fun(output_dict: Dict[str, paddle.Tensor], *args) -> paddle.Tensor:
    """Compute objective loss.

    Args:
        output_dict (Dict[str, paddle.Tensor]): Dict of outputs contains tensor.

    Returns:
        paddle.Tensor: Objective loss.
    """
    global loss_log, loss_obj
    x, y = output_dict["x"], output_dict["y"]
    bound = int(output_dict["bound"])
    e_re = output_dict["e_real"]
    e_im = output_dict["e_imaginary"]

    f1 = paddle.heaviside((x + 0.5) * (0.5 - x), paddle.full([], 0.5))
    f2 = paddle.heaviside((y - 1) * (2 - y), paddle.full([], 0.5))
    j = e_re[:bound] ** 2 + e_im[:bound] ** 2 - f1[:bound] * f2[:bound]
    loss_opt_area = paddle.mean(j**2)

    if lambda_im is None:
        init_lambda(output_dict, bound)
    losses = loss_weight[4] * loss_opt_area
    loss_log.append(float(loss_opt_area))  # for plotting
    loss_obj = float(loss_opt_area)  # for plotting
    return {"obj": losses}


def eval_loss_fun(output_dict: Dict[str, paddle.Tensor], *args) -> paddle.Tensor:
    """Compute objective loss for evaluation.

    Args:
        output_dict (Dict[str, paddle.Tensor]): Dict of outputs contains tensor.

    Returns:
        paddle.Tensor: Objective loss.
    """
    x, y = output_dict["x"], output_dict["y"]
    e_re = output_dict["e_real"]
    e_im = output_dict["e_imaginary"]

    f1 = paddle.heaviside((x + 0.5) * (0.5 - x), paddle.full([], 0.5))
    f2 = paddle.heaviside((y - 1) * (2 - y), paddle.full([], 0.5))
    j = e_re**2 + e_im**2 - f1 * f2
    losses = paddle.mean(j**2)

    return {"eval": losses}
def eval_metric_fun(
    output_dict: Dict[str, paddle.Tensor], *args
) -> Dict[str, paddle.Tensor]:
    """Compute metric for evaluation.

    Args:
        output_dict (Dict[str, paddle.Tensor]): Dict of outputs contains tensor.

    Returns:
        Dict[str, paddle.Tensor]: MSE metric.
    """
    loss_re, loss_im = compute_real_and_imaginary_loss(output_dict)
    eps_opt = paddle.concat([loss_re, loss_im], axis=-1)
    metric = paddle.mean(eps_opt**2)

    metric_dict = {"eval_metric": metric}
    return metric_dict

3.9 Model Training and Evaluation

After completing the above settings, you only need to pass the instantiated objects to ppsci.solver.Solver, and then start training and evaluation.

# initialize solver
solver = ppsci.solver.Solver(
    model_list,
    constraint,
    cfg.output_dir,
    optimizer_adam,
    None,
    cfg.TRAIN.epochs,
    cfg.TRAIN.iters_per_epoch,
    eval_during_train=cfg.TRAIN.eval_during_train,
    validator=validator,
    checkpoint_path=cfg.TRAIN.checkpoint_path,
)

# train model
solver.train()
# evaluate after finished training
solver.eval()

Since there are multiple training modes in this problem, \([2,1+k]\) complete training and evaluations will be performed according to different modes. For specific code, please refer to the holography.py file in Complete Code.

3.10 Visualization

PaddleScience provides a visualizer, but due to the large number of pictures and complexity of this problem, a visualization function is customized in the code. Visualization can be achieved by calling the custom function.

################# plotting ###################
# log of loss
loss_log = np.array(func_module.loss_log).reshape(-1, 3)

plot_module.set_params(
    cfg.TRAIN_MODE, cfg.output_dir, cfg.DATASET_PATH, cfg.DATASET_PATH_VALID
)
plot_module.plot_6a(loss_log)
if cfg.TRAIN_MODE != "soft":
    plot_module.prepare_data(solver, expr_dict)
    plot_module.plot_6b(loss_log_obj)
    plot_module.plot_6c7c(func_module.lambda_log)
    plot_module.plot_6d(func_module.lambda_log)
    plot_module.plot_6ef(func_module.lambda_log)

For custom code, please refer to the plotting.py file in Complete Code.

4. Complete Code

The complete code includes PaddleScience specific implementation process code holography.py, all custom function code functions.py and custom visualization code plotting.py.

holography.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module is heavily adapted from https://github.com/lululxvi/hpinn
"""

from os import path as osp

import functions as func_module
import hydra
import numpy as np
import paddle
import plotting as plot_module
from omegaconf import DictConfig

import ppsci
from ppsci.autodiff import hessian
from ppsci.autodiff import jacobian
from ppsci.utils import logger


def train(cfg: DictConfig):
    # open FLAG for higher order differential operator
    paddle.framework.core.set_prim_eager_enabled(True)

    ppsci.utils.misc.set_random_seed(cfg.seed)
    # initialize logger
    logger.init_logger("ppsci", osp.join(cfg.output_dir, f"{cfg.mode}.log"), "info")

    model_re = ppsci.arch.MLP(**cfg.MODEL.re_net)
    model_im = ppsci.arch.MLP(**cfg.MODEL.im_net)
    model_eps = ppsci.arch.MLP(**cfg.MODEL.eps_net)

    # initialize params
    func_module.train_mode = cfg.TRAIN_MODE
    loss_log_obj = []

    # register transform
    model_re.register_input_transform(func_module.transform_in)
    model_im.register_input_transform(func_module.transform_in)
    model_eps.register_input_transform(func_module.transform_in)

    model_re.register_output_transform(func_module.transform_out_real_part)
    model_im.register_output_transform(func_module.transform_out_imaginary_part)
    model_eps.register_output_transform(func_module.transform_out_epsilon)

    model_list = ppsci.arch.ModelList((model_re, model_im, model_eps))

    # initialize Adam optimizer
    optimizer_adam = ppsci.optimizer.Adam(cfg.TRAIN.learning_rate)(
        (model_re, model_im, model_eps)
    )

    # manually build constraint(s)
    label_keys = ("x", "y", "bound", "e_real", "e_imaginary", "epsilon")
    label_keys_derivative = (
        "de_re_x",
        "de_re_y",
        "de_re_xx",
        "de_re_yy",
        "de_im_x",
        "de_im_y",
        "de_im_xx",
        "de_im_yy",
    )
    output_expr = {
        "x": lambda out: out["x"],
        "y": lambda out: out["y"],
        "bound": lambda out: out["bound"],
        "e_real": lambda out: out["e_real"],
        "e_imaginary": lambda out: out["e_imaginary"],
        "epsilon": lambda out: out["epsilon"],
        "de_re_x": lambda out: jacobian(out["e_real"], out["x"]),
        "de_re_y": lambda out: jacobian(out["e_real"], out["y"]),
        "de_re_xx": lambda out: hessian(out["e_real"], out["x"]),
        "de_re_yy": lambda out: hessian(out["e_real"], out["y"]),
        "de_im_x": lambda out: jacobian(out["e_imaginary"], out["x"]),
        "de_im_y": lambda out: jacobian(out["e_imaginary"], out["y"]),
        "de_im_xx": lambda out: hessian(out["e_imaginary"], out["x"]),
        "de_im_yy": lambda out: hessian(out["e_imaginary"], out["y"]),
    }

    sup_constraint_pde = ppsci.constraint.SupervisedConstraint(
        {
            "dataset": {
                "name": "IterableMatDataset",
                "file_path": cfg.DATASET_PATH,
                "input_keys": ("x", "y", "bound"),
                "label_keys": label_keys + label_keys_derivative,
                "alias_dict": {
                    "e_real": "x",
                    "e_imaginary": "x",
                    "epsilon": "x",
                    **{k: "x" for k in label_keys_derivative},
                },
            },
        },
        ppsci.loss.FunctionalLoss(func_module.pde_loss_fun),
        output_expr,
        name="sup_constraint_pde",
    )
    sup_constraint_obj = ppsci.constraint.SupervisedConstraint(
        {
            "dataset": {
                "name": "IterableMatDataset",
                "file_path": cfg.DATASET_PATH,
                "input_keys": ("x", "y", "bound"),
                "label_keys": label_keys,
                "alias_dict": {"e_real": "x", "e_imaginary": "x", "epsilon": "x"},
            },
        },
        ppsci.loss.FunctionalLoss(func_module.obj_loss_fun),
        {key: lambda out, k=key: out[k] for key in label_keys},
        name="sup_constraint_obj",
    )
    constraint = {
        sup_constraint_pde.name: sup_constraint_pde,
        sup_constraint_obj.name: sup_constraint_obj,
    }

    # manually build validator
    sup_validator_opt = ppsci.validate.SupervisedValidator(
        {
            "dataset": {
                "name": "IterableMatDataset",
                "file_path": cfg.DATASET_PATH_VALID,
                "input_keys": ("x", "y", "bound"),
                "label_keys": label_keys + label_keys_derivative,
                "alias_dict": {
                    "x": "x_opt",
                    "y": "y_opt",
                    "e_real": "x_opt",
                    "e_imaginary": "x_opt",
                    "epsilon": "x_opt",
                    **{k: "x_opt" for k in label_keys_derivative},
                },
            },
        },
        ppsci.loss.FunctionalLoss(func_module.eval_loss_fun),
        output_expr,
        {"mse": ppsci.metric.FunctionalMetric(func_module.eval_metric_fun)},
        name="opt_sup",
    )
    sup_validator_val = ppsci.validate.SupervisedValidator(
        {
            "dataset": {
                "name": "IterableMatDataset",
                "file_path": cfg.DATASET_PATH_VALID,
                "input_keys": ("x", "y", "bound"),
                "label_keys": label_keys + label_keys_derivative,
                "alias_dict": {
                    "x": "x_val",
                    "y": "y_val",
                    "e_real": "x_val",
                    "e_imaginary": "x_val",
                    "epsilon": "x_val",
                    **{k: "x_val" for k in label_keys_derivative},
                },
            },
        },
        ppsci.loss.FunctionalLoss(func_module.eval_loss_fun),
        output_expr,
        {"mse": ppsci.metric.FunctionalMetric(func_module.eval_metric_fun)},
        name="val_sup",
    )
    validator = {
        sup_validator_opt.name: sup_validator_opt,
        sup_validator_val.name: sup_validator_val,
    }

    # initialize solver
    solver = ppsci.solver.Solver(
        model_list,
        constraint,
        cfg.output_dir,
        optimizer_adam,
        None,
        cfg.TRAIN.epochs,
        cfg.TRAIN.iters_per_epoch,
        eval_during_train=cfg.TRAIN.eval_during_train,
        validator=validator,
        checkpoint_path=cfg.TRAIN.checkpoint_path,
    )

    # train model
    solver.train()
    # evaluate after finished training
    solver.eval()

    # initialize LBFGS optimizer
    optimizer_lbfgs = ppsci.optimizer.LBFGS(max_iter=cfg.TRAIN.max_iter)(
        (model_re, model_im, model_eps)
    )

    # train: soft constraint, epoch=1 for lbfgs
    if cfg.TRAIN_MODE == "soft":
        solver = ppsci.solver.Solver(
            model_list,
            constraint,
            cfg.output_dir,
            optimizer_lbfgs,
            None,
            cfg.TRAIN.epochs_lbfgs,
            cfg.TRAIN.iters_per_epoch,
            eval_during_train=cfg.TRAIN.eval_during_train,
            validator=validator,
            checkpoint_path=cfg.TRAIN.checkpoint_path,
        )

        # train model
        solver.train()
        # evaluate after finished training
        solver.eval()

    # append objective loss for plot
    loss_log_obj.append(func_module.loss_obj)

    # penalty and augmented Lagrangian, difference between the two is updating of lambda
    if cfg.TRAIN_MODE != "soft":
        train_dict = ppsci.utils.reader.load_mat_file(
            cfg.DATASET_PATH, ("x", "y", "bound")
        )
        in_dict = {"x": train_dict["x"], "y": train_dict["y"]}
        expr_dict = output_expr.copy()
        expr_dict.pop("bound")

        func_module.init_lambda(in_dict, int(train_dict["bound"]))
        func_module.lambda_log.append(
            [
                func_module.lambda_re.copy().squeeze(),
                func_module.lambda_im.copy().squeeze(),
            ]
        )

        for i in range(1, cfg.TRAIN_K + 1):
            pred_dict = solver.predict(
                in_dict,
                expr_dict,
                batch_size=np.shape(train_dict["x"])[0],
                no_grad=False,
            )
            func_module.update_lambda(pred_dict, int(train_dict["bound"]))

            func_module.update_mu()
            logger.message(f"Iteration {i}: mu = {func_module.mu}\n")

            solver = ppsci.solver.Solver(
                model_list,
                constraint,
                cfg.output_dir,
                optimizer_lbfgs,
                None,
                cfg.TRAIN.epochs_lbfgs,
                cfg.TRAIN.iters_per_epoch,
                eval_during_train=cfg.TRAIN.eval_during_train,
                validator=validator,
                checkpoint_path=cfg.TRAIN.checkpoint_path,
            )

            # train model
            solver.train()
            # evaluate
            solver.eval()
            # append objective loss for plot
            loss_log_obj.append(func_module.loss_obj)

    ################# plotting ###################
    # log of loss
    loss_log = np.array(func_module.loss_log).reshape(-1, 3)

    plot_module.set_params(
        cfg.TRAIN_MODE, cfg.output_dir, cfg.DATASET_PATH, cfg.DATASET_PATH_VALID
    )
    plot_module.plot_6a(loss_log)
    if cfg.TRAIN_MODE != "soft":
        plot_module.prepare_data(solver, expr_dict)
        plot_module.plot_6b(loss_log_obj)
        plot_module.plot_6c7c(func_module.lambda_log)
        plot_module.plot_6d(func_module.lambda_log)
        plot_module.plot_6ef(func_module.lambda_log)


def evaluate(cfg: DictConfig):
    # open FLAG for higher order differential operator
    paddle.framework.core.set_prim_eager_enabled(True)

    ppsci.utils.misc.set_random_seed(cfg.seed)
    # initialize logger
    logger.init_logger("ppsci", osp.join(cfg.output_dir, f"{cfg.mode}.log"), "info")

    model_re = ppsci.arch.MLP(**cfg.MODEL.re_net)
    model_im = ppsci.arch.MLP(**cfg.MODEL.im_net)
    model_eps = ppsci.arch.MLP(**cfg.MODEL.eps_net)

    # initialize params
    func_module.train_mode = cfg.TRAIN_MODE

    # register transform
    model_re.register_input_transform(func_module.transform_in)
    model_im.register_input_transform(func_module.transform_in)
    model_eps.register_input_transform(func_module.transform_in)

    model_re.register_output_transform(func_module.transform_out_real_part)
    model_im.register_output_transform(func_module.transform_out_imaginary_part)
    model_eps.register_output_transform(func_module.transform_out_epsilon)

    model_list = ppsci.arch.ModelList((model_re, model_im, model_eps))

    # manually build constraint(s)
    label_keys = ("x", "y", "bound", "e_real", "e_imaginary", "epsilon")
    label_keys_derivative = (
        "de_re_x",
        "de_re_y",
        "de_re_xx",
        "de_re_yy",
        "de_im_x",
        "de_im_y",
        "de_im_xx",
        "de_im_yy",
    )
    output_expr = {
        "x": lambda out: out["x"],
        "y": lambda out: out["y"],
        "bound": lambda out: out["bound"],
        "e_real": lambda out: out["e_real"],
        "e_imaginary": lambda out: out["e_imaginary"],
        "epsilon": lambda out: out["epsilon"],
        "de_re_x": lambda out: jacobian(out["e_real"], out["x"]),
        "de_re_y": lambda out: jacobian(out["e_real"], out["y"]),
        "de_re_xx": lambda out: hessian(out["e_real"], out["x"]),
        "de_re_yy": lambda out: hessian(out["e_real"], out["y"]),
        "de_im_x": lambda out: jacobian(out["e_imaginary"], out["x"]),
        "de_im_y": lambda out: jacobian(out["e_imaginary"], out["y"]),
        "de_im_xx": lambda out: hessian(out["e_imaginary"], out["x"]),
        "de_im_yy": lambda out: hessian(out["e_imaginary"], out["y"]),
    }

    # manually build validator
    sup_validator_opt = ppsci.validate.SupervisedValidator(
        {
            "dataset": {
                "name": "IterableMatDataset",
                "file_path": cfg.DATASET_PATH_VALID,
                "input_keys": ("x", "y", "bound"),
                "label_keys": label_keys + label_keys_derivative,
                "alias_dict": {
                    "x": "x_opt",
                    "y": "y_opt",
                    "e_real": "x_opt",
                    "e_imaginary": "x_opt",
                    "epsilon": "x_opt",
                    **{k: "x_opt" for k in label_keys_derivative},
                },
            },
        },
        ppsci.loss.FunctionalLoss(func_module.eval_loss_fun),
        output_expr,
        {"mse": ppsci.metric.FunctionalMetric(func_module.eval_metric_fun)},
        name="opt_sup",
    )
    sup_validator_val = ppsci.validate.SupervisedValidator(
        {
            "dataset": {
                "name": "IterableMatDataset",
                "file_path": cfg.DATASET_PATH_VALID,
                "input_keys": ("x", "y", "bound"),
                "label_keys": label_keys + label_keys_derivative,
                "alias_dict": {
                    "x": "x_val",
                    "y": "y_val",
                    "e_real": "x_val",
                    "e_imaginary": "x_val",
                    "epsilon": "x_val",
                    **{k: "x_val" for k in label_keys_derivative},
                },
            },
        },
        ppsci.loss.FunctionalLoss(func_module.eval_loss_fun),
        output_expr,
        {"mse": ppsci.metric.FunctionalMetric(func_module.eval_metric_fun)},
        name="val_sup",
    )
    validator = {
        sup_validator_opt.name: sup_validator_opt,
        sup_validator_val.name: sup_validator_val,
    }

    solver = ppsci.solver.Solver(
        model_list,
        output_dir=cfg.output_dir,
        seed=cfg.seed,
        validator=validator,
        pretrained_model_path=cfg.EVAL.pretrained_model_path,
    )

    # evaluate
    solver.eval()


def export(cfg: DictConfig):
    # set model
    model_re = ppsci.arch.MLP(**cfg.MODEL.re_net)
    model_im = ppsci.arch.MLP(**cfg.MODEL.im_net)
    model_eps = ppsci.arch.MLP(**cfg.MODEL.eps_net)

    # register transform
    model_re.register_input_transform(func_module.transform_in)
    model_im.register_input_transform(func_module.transform_in)
    model_eps.register_input_transform(func_module.transform_in)

    model_re.register_output_transform(func_module.transform_out_real_part)
    model_im.register_output_transform(func_module.transform_out_imaginary_part)
    model_eps.register_output_transform(func_module.transform_out_epsilon)

    # wrap to a model_list
    model_list = ppsci.arch.ModelList((model_re, model_im, model_eps))

    # initialize solver
    solver = ppsci.solver.Solver(
        model_list,
        pretrained_model_path=cfg.INFER.pretrained_model_path,
    )

    # export model
    from paddle.static import InputSpec

    input_spec = [
        {key: InputSpec([None, 1], "float32", name=key) for key in ["x", "y"]},
    ]
    solver.export(input_spec, cfg.INFER.export_path)


def inference(cfg: DictConfig):
    from deploy.python_infer import pinn_predictor

    predictor = pinn_predictor.PINNPredictor(cfg)

    valid_dict = ppsci.utils.reader.load_mat_file(
        cfg.DATASET_PATH_VALID, ("x_val", "y_val", "bound")
    )
    input_dict = {"x": valid_dict["x_val"], "y": valid_dict["y_val"]}

    output_dict = predictor.predict(input_dict, cfg.INFER.batch_size)

    # mapping data to cfg.INFER.output_keys
    output_dict = {
        store_key: output_dict[infer_key]
        for store_key, infer_key in zip(cfg.INFER.output_keys, output_dict.keys())
    }

    # plotting E and eps
    N = ((func_module.l_BOX[1] - func_module.l_BOX[0]) / 0.05).astype(int)
    input_eval = np.stack((input_dict["x"], input_dict["y"]), axis=-1).reshape(
        N[0], N[1], 2
    )
    e_re = output_dict["e_re"].reshape(N[0], N[1])
    e_im = output_dict["e_im"].reshape(N[0], N[1])
    eps = output_dict["eps"].reshape(N[0], N[1])
    v_visual = e_re**2 + e_im**2
    field_visual = np.stack((v_visual, eps), axis=-1)
    plot_module.field_name = ["Fig7_E", "Fig7_eps"]
    plot_module.FIGNAME = "hpinns_pred"
    plot_module.plot_field_holo(input_eval, field_visual)


@hydra.main(version_base=None, config_path="./conf", config_name="hpinns.yaml")
def main(cfg: DictConfig):
    if cfg.mode == "train":
        train(cfg)
    elif cfg.mode == "eval":
        evaluate(cfg)
    elif cfg.mode == "export":
        export(cfg)
    elif cfg.mode == "infer":
        inference(cfg)
    else:
        raise ValueError(
            f"cfg.mode should in ['train', 'eval', 'export', 'infer'], but got '{cfg.mode}'"
        )


if __name__ == "__main__":
    main()
functions.py
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module is heavily adapted from https://github.com/lululxvi/hpinn
"""

from typing import Dict
from typing import List

import numpy as np
import paddle
import paddle.nn.functional as F

"""All functions used in hpinns example, including functions of transform and loss."""

# define constants
BOX = np.array([[-2, -2], [2, 3]])
DPML = 1
OMEGA = 2 * np.pi
SIGMA0 = -np.log(1e-20) / (4 * DPML**3 / 3)
l_BOX = BOX + np.array([[-DPML, -DPML], [DPML, DPML]])
beta = 2.0
mu = 2

# define variables which will be updated during training
lambda_re: np.ndarray = None
lambda_im: np.ndarray = None
loss_weight: List[float] = None
train_mode: str = None

# define log variables for plotting
loss_log = []  # record all losses, [pde, lag, obj]
loss_obj = 0.0  # record last objective loss of each k
lambda_log = []  # record all lambdas


# transform
def transform_in(input):
    # Periodic BC in x
    P = BOX[1][0] - BOX[0][0] + 2 * DPML
    w = 2 * np.pi / P
    x, y = input["x"], input["y"]
    input_transformed = {}
    for t in range(1, 7):
        input_transformed[f"x_cos_{t}"] = paddle.cos(t * w * x)
        input_transformed[f"x_sin_{t}"] = paddle.sin(t * w * x)
    input_transformed["y"] = y
    input_transformed["y_cos_1"] = paddle.cos(OMEGA * y)
    input_transformed["y_sin_1"] = paddle.sin(OMEGA * y)

    return input_transformed


def transform_out_all(input, var):
    y = input["y"]
    # Zero Dirichlet BC
    a, b = BOX[0][1] - DPML, BOX[1][1] + DPML
    t = (1 - paddle.exp(a - y)) * (1 - paddle.exp(y - b))
    return t * var


def transform_out_real_part(input, out):
    re = out["e_re"]
    trans_out = transform_out_all(input, re)
    return {"e_real": trans_out}


def transform_out_imaginary_part(input, out):
    im = out["e_im"]
    trans_out = transform_out_all(input, im)
    return {"e_imaginary": trans_out}


def transform_out_epsilon(input, out):
    eps = out["eps"]
    # 1 <= eps <= 12
    eps = F.sigmoid(eps) * 11 + 1
    return {"epsilon": eps}


# loss
def init_lambda(output_dict: Dict[str, paddle.Tensor], bound: int):
    """Init lambdas of Lagrangian and weights of losses.

    Args:
        output_dict (Dict[str, paddle.Tensor]): Dict of outputs contains tensor.
        bound (int): The bound of the data range that should be used.
    """
    global lambda_re, lambda_im, loss_weight
    x, y = output_dict["x"], output_dict["y"]
    lambda_re = np.zeros((len(x[bound:]), 1), paddle.get_default_dtype())
    lambda_im = np.zeros((len(y[bound:]), 1), paddle.get_default_dtype())
    # loss_weight: [PDE loss 1, PDE loss 2, Lagrangian loss 1, Lagrangian loss 2, objective loss]
    if train_mode == "aug_lag":
        loss_weight = [0.5 * mu] * 2 + [1.0, 1.0] + [1.0]
    else:
        loss_weight = [0.5 * mu] * 2 + [0.0, 0.0] + [1.0]


def update_lambda(output_dict: Dict[str, paddle.Tensor], bound: int):
    """Update lambdas of Lagrangian.

    Args:
        output_dict (Dict[str, paddle.Tensor]): Dict of outputs contains tensor.
        bound (int): The bound of the data range that should be used.
    """
    global lambda_re, lambda_im, lambda_log
    loss_re, loss_im = compute_real_and_imaginary_loss(output_dict)
    loss_re = loss_re[bound:]
    loss_im = loss_im[bound:]
    lambda_re += mu * loss_re.numpy()
    lambda_im += mu * loss_im.numpy()
    lambda_log.append([lambda_re.copy().squeeze(), lambda_im.copy().squeeze()])


def update_mu():
    """Update mu."""
    global mu, loss_weight
    mu *= beta
    loss_weight[:2] = [0.5 * mu] * 2


def _sigma_1(d):
    return SIGMA0 * d**2 * np.heaviside(d, 0)


def _sigma_2(d):
    return 2 * SIGMA0 * d * np.heaviside(d, 0)


def sigma(x, a, b):
    """sigma(x) = 0 if a < x < b, else grows cubically from zero."""
    return _sigma_1(a - x) + _sigma_1(x - b)


def dsigma(x, a, b):
    return -_sigma_2(a - x) + _sigma_2(x - b)


def perfectly_matched_layers(x: paddle.Tensor, y: paddle.Tensor):
    """Apply the technique of perfectly matched layers(PMLs) proposed by paper arXiv:2108.05348.

    Args:
        x (paddle.Tensor): one of input contains tensor.
        y (paddle.Tensor): one of input contains tensor.

    Returns:
        np.ndarray: Parameters of pde formula.
    """
    x = x.numpy()
    y = y.numpy()

    sigma_x = sigma(x, BOX[0][0], BOX[1][0])
    AB1 = 1 / (1 + 1j / OMEGA * sigma_x) ** 2
    A1, B1 = AB1.real, AB1.imag

    dsigma_x = dsigma(x, BOX[0][0], BOX[1][0])
    AB2 = -1j / OMEGA * dsigma_x * AB1 / (1 + 1j / OMEGA * sigma_x)
    A2, B2 = AB2.real, AB2.imag

    sigma_y = sigma(y, BOX[0][1], BOX[1][1])
    AB3 = 1 / (1 + 1j / OMEGA * sigma_y) ** 2
    A3, B3 = AB3.real, AB3.imag

    dsigma_y = dsigma(y, BOX[0][1], BOX[1][1])
    AB4 = -1j / OMEGA * dsigma_y * AB3 / (1 + 1j / OMEGA * sigma_y)
    A4, B4 = AB4.real, AB4.imag
    return A1, B1, A2, B2, A3, B3, A4, B4


def obj_func_J(y):
    # Approximate the objective function
    y = y.numpy() + 1.5
    h = 0.2
    return 1 / (h * np.pi**0.5) * np.exp(-((y / h) ** 2)) * (np.abs(y) < 0.5)


def compute_real_and_imaginary_loss(
    output_dict: Dict[str, paddle.Tensor]
) -> paddle.Tensor:
    """Compute real and imaginary_loss.

    Args:
        output_dict (Dict[str, paddle.Tensor]): Dict of outputs contains tensor.

    Returns:
        paddle.Tensor: Real and imaginary_loss.
    """
    x, y = output_dict["x"], output_dict["y"]
    e_re = output_dict["e_real"]
    e_im = output_dict["e_imaginary"]
    eps = output_dict["epsilon"]

    condition = np.logical_and(y.numpy() < 0, y.numpy() > -1).astype(
        paddle.get_default_dtype()
    )

    eps = eps * condition + 1 - condition

    de_re_x = output_dict["de_re_x"]
    de_re_y = output_dict["de_re_y"]
    de_re_xx = output_dict["de_re_xx"]
    de_re_yy = output_dict["de_re_yy"]
    de_im_x = output_dict["de_im_x"]
    de_im_y = output_dict["de_im_y"]
    de_im_xx = output_dict["de_im_xx"]
    de_im_yy = output_dict["de_im_yy"]

    a1, b1, a2, b2, a3, b3, a4, b4 = perfectly_matched_layers(x, y)

    loss_re = (
        (a1 * de_re_xx + a2 * de_re_x + a3 * de_re_yy + a4 * de_re_y) / OMEGA
        - (b1 * de_im_xx + b2 * de_im_x + b3 * de_im_yy + b4 * de_im_y) / OMEGA
        + eps * OMEGA * e_re
    )
    loss_im = (
        (a1 * de_im_xx + a2 * de_im_x + a3 * de_im_yy + a4 * de_im_y) / OMEGA
        + (b1 * de_re_xx + b2 * de_re_x + b3 * de_re_yy + b4 * de_re_y) / OMEGA
        + eps * OMEGA * e_im
        + obj_func_J(y)
    )
    return loss_re, loss_im


def pde_loss_fun(output_dict: Dict[str, paddle.Tensor], *args) -> paddle.Tensor:
    """Compute pde loss and lagrangian loss.

    Args:
        output_dict (Dict[str, paddle.Tensor]): Dict of outputs contains tensor.

    Returns:
        paddle.Tensor: PDE loss (and lagrangian loss if using Augmented Lagrangian method).
    """
    global loss_log
    bound = int(output_dict["bound"])
    loss_re, loss_im = compute_real_and_imaginary_loss(output_dict)
    loss_re = loss_re[bound:]
    loss_im = loss_im[bound:]

    loss_eqs1 = paddle.mean(loss_re**2)
    loss_eqs2 = paddle.mean(loss_im**2)
    # augmented_Lagrangian
    if lambda_im is None:
        init_lambda(output_dict, bound)
    loss_lag1 = paddle.mean(loss_re * lambda_re)
    loss_lag2 = paddle.mean(loss_im * lambda_im)

    losses = (
        loss_weight[0] * loss_eqs1
        + loss_weight[1] * loss_eqs2
        + loss_weight[2] * loss_lag1
        + loss_weight[3] * loss_lag2
    )
    loss_log.append(float(loss_eqs1 + loss_eqs2))  # for plotting
    loss_log.append(float(loss_lag1 + loss_lag2))  # for plotting
    return {"pde": losses}


def obj_loss_fun(output_dict: Dict[str, paddle.Tensor], *args) -> paddle.Tensor:
    """Compute objective loss.

    Args:
        output_dict (Dict[str, paddle.Tensor]): Dict of outputs contains tensor.

    Returns:
        paddle.Tensor: Objective loss.
    """
    global loss_log, loss_obj
    x, y = output_dict["x"], output_dict["y"]
    bound = int(output_dict["bound"])
    e_re = output_dict["e_real"]
    e_im = output_dict["e_imaginary"]

    f1 = paddle.heaviside((x + 0.5) * (0.5 - x), paddle.full([], 0.5))
    f2 = paddle.heaviside((y - 1) * (2 - y), paddle.full([], 0.5))
    j = e_re[:bound] ** 2 + e_im[:bound] ** 2 - f1[:bound] * f2[:bound]
    loss_opt_area = paddle.mean(j**2)

    if lambda_im is None:
        init_lambda(output_dict, bound)
    losses = loss_weight[4] * loss_opt_area
    loss_log.append(float(loss_opt_area))  # for plotting
    loss_obj = float(loss_opt_area)  # for plotting
    return {"obj": losses}


def eval_loss_fun(output_dict: Dict[str, paddle.Tensor], *args) -> paddle.Tensor:
    """Compute objective loss for evaluation.

    Args:
        output_dict (Dict[str, paddle.Tensor]): Dict of outputs contains tensor.

    Returns:
        paddle.Tensor: Objective loss.
    """
    x, y = output_dict["x"], output_dict["y"]
    e_re = output_dict["e_real"]
    e_im = output_dict["e_imaginary"]

    f1 = paddle.heaviside((x + 0.5) * (0.5 - x), paddle.full([], 0.5))
    f2 = paddle.heaviside((y - 1) * (2 - y), paddle.full([], 0.5))
    j = e_re**2 + e_im**2 - f1 * f2
    losses = paddle.mean(j**2)

    return {"eval": losses}


def eval_metric_fun(
    output_dict: Dict[str, paddle.Tensor], *args
) -> Dict[str, paddle.Tensor]:
    """Compute metric for evaluation.

    Args:
        output_dict (Dict[str, paddle.Tensor]): Dict of outputs contains tensor.

    Returns:
        Dict[str, paddle.Tensor]: MSE metric.
    """
    loss_re, loss_im = compute_real_and_imaginary_loss(output_dict)
    eps_opt = paddle.concat([loss_re, loss_im], axis=-1)
    metric = paddle.mean(eps_opt**2)

    metric_dict = {"eval_metric": metric}
    return metric_dict
plotting.py
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module is heavily adapted from https://github.com/lululxvi/hpinn
"""

import os
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional

import functions as func_module
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from matplotlib import ticker

import ppsci

"""All plotting functions."""

# define constants
font = {"weight": "normal", "size": 10}
input_name = ("x", "y")
field_name = [
    "Fig7_E",
    "Fig7_eps",
    "Fig_6C_lambda_re_1",
    "Fig_6C_lambda_im_1",
    "Fig_6C_lambda_re_4",
    "Fig_6C_lambda_im_4",
    "Fig_6C_lambda_re_9",
    "Fig_6C_lambda_im_9",
]

# define constants which will be assigned later
FIGNAME: str = ""
OUTPUT_DIR: str = ""
DATASET_PATH: str = ""
DATASET_PATH_VALID: str = ""
input_valid: np.ndarray = None
output_valid: np.ndarray = None
input_train: np.ndarray = None


def set_params(figname, output_dir, dataset_path, dataset_path_valid):
    global FIGNAME, OUTPUT_DIR, DATASET_PATH, DATASET_PATH_VALID
    FIGNAME = figname
    OUTPUT_DIR = output_dir + "figure/"
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    DATASET_PATH = dataset_path
    DATASET_PATH_VALID = dataset_path_valid


def prepare_data(solver: ppsci.solver.Solver, expr_dict: Dict[str, Callable]):
    """Prepare data of input of training and validation and generate
        output of validation by predicting.

    Args:
        solver (ppsci.solver.Solver): Object of ppsci.solver.Solver().
        expr_dict (Dict[str, Callable]): Expression dict, which guide to
            compute equation variable with callable function.
    """
    global input_valid, output_valid, input_train
    # train data
    train_dict = ppsci.utils.reader.load_mat_file(DATASET_PATH, ("x", "y", "bound"))

    bound = int(train_dict["bound"])
    x_train = train_dict["x"][bound:]
    y_train = train_dict["y"][bound:]
    input_train = np.stack((x_train, y_train), axis=-1).reshape(-1, 2)

    # valid data
    N = ((func_module.l_BOX[1] - func_module.l_BOX[0]) / 0.05).astype(int)

    valid_dict = ppsci.utils.reader.load_mat_file(
        DATASET_PATH_VALID, ("x_val", "y_val", "bound")
    )
    in_dict_val = {"x": valid_dict["x_val"], "y": valid_dict["y_val"]}
    func_module.init_lambda(in_dict_val, int(valid_dict["bound"]))

    pred_dict_val = solver.predict(
        in_dict_val,
        expr_dict,
        batch_size=np.shape(valid_dict["x_val"])[0],
        no_grad=False,
        return_numpy=True,
    )

    input_valid = np.stack((valid_dict["x_val"], valid_dict["y_val"]), axis=-1).reshape(
        N[0], N[1], 2
    )
    output_valid = np.array(
        [
            pred_dict_val["e_real"],
            pred_dict_val["e_imaginary"],
            pred_dict_val["epsilon"],
        ]
    ).T.reshape(N[0], N[1], 3)


def plot_field_holo(
    coord_visual: np.ndarray,
    field_visual: np.ndarray,
    coord_lambda: Optional[np.ndarray] = None,
    field_lambda: Optional[np.ndarray] = None,
):
    """Plot fields of of holography example.

    Args:
        coord_visual (np.ndarray): The coord of epsilon and |E|**2.
        field_visual (np.ndarray): The filed of epsilon and |E|**2.
        coord_lambda (Optional[np.ndarray], optional): The coord of lambda. Defaults to None.
        field_lambda (Optional[np.ndarray], optional): The filed of lambda. Defaults to None.
    """
    fmin, fmax = np.array([0, 1.0]), np.array([0.6, 12])
    cmin, cmax = coord_visual.min(axis=(0, 1)), coord_visual.max(axis=(0, 1))
    emin, emax = np.array([-3, -1]), np.array([3, 0])
    x_pos = coord_visual[:, :, 0]
    y_pos = coord_visual[:, :, 1]

    for fi in range(len(field_name)):
        if fi == 0:
            # Fig7_E
            plt.figure(101, figsize=(8, 6))
            plt.clf()
            plt.rcParams["font.size"] = 20
            f_true = field_visual[..., fi]
            plt.pcolormesh(
                x_pos,
                y_pos,
                f_true,
                cmap="rainbow",
                shading="gouraud",
                antialiased=True,
                snap=True,
            )
            cb = plt.colorbar()
            plt.axis((cmin[0], cmax[0], cmin[1], cmax[1]))
            plt.clim(vmin=fmin[fi], vmax=fmax[fi])
        elif fi == 1:
            # Fig7_eps
            plt.figure(201, figsize=(8, 1.5))
            plt.clf()
            plt.rcParams["font.size"] = 20
            f_true = field_visual[..., fi]
            plt.pcolormesh(
                x_pos,
                y_pos,
                f_true,
                cmap="rainbow",
                shading="gouraud",
                antialiased=True,
                snap=True,
            )
            cb = plt.colorbar()
            plt.axis((emin[0], emax[0], emin[1], emax[1]))
            plt.clim(vmin=fmin[fi], vmax=fmax[fi])
        elif coord_lambda is not None and field_lambda is not None:
            # Fig_6C_lambda_
            plt.figure(fi * 100 + 101, figsize=(8, 6))
            plt.clf()
            plt.rcParams["font.size"] = 20
            f_true = field_lambda[..., fi - 2]
            plt.scatter(
                coord_lambda[..., 0],
                coord_lambda[..., 1],
                c=f_true,
                cmap="rainbow",
                alpha=0.6,
            )
            cb = plt.colorbar()
            plt.axis((cmin[0], cmax[0], cmin[1], cmax[1]))

        # colorbar settings
        cb.ax.tick_params(labelsize=20)
        tick_locator = ticker.MaxNLocator(
            nbins=5
        )  # the number of scale values ​​on the colorbar
        cb.locator = tick_locator
        cb.update_ticks()

        plt.xlabel(f"${str(input_name[0])}$", fontdict=font)
        plt.ylabel(f"${str(input_name[1])}$", fontdict=font)
        plt.yticks(size=10)
        plt.xticks(size=10)
        plt.savefig(
            os.path.join(
                OUTPUT_DIR,
                f"{FIGNAME}_{str(field_name[fi])}.jpg",
            )
        )


def plot_6a(log_loss: np.ndarray):
    """Plot Fig.6 A of paper.

    Args:
        log_loss (np.ndarray): Losses of all training's iterations.
    """
    plt.figure(300, figsize=(8, 6))
    smooth_step = 100  # how many steps of loss are squeezed to one point, num_points is epoch/smooth_step
    if log_loss.shape[0] % smooth_step != 0:
        vis_loss_ = log_loss[: -(log_loss.shape[0] % smooth_step), :].reshape(
            -1, smooth_step, log_loss.shape[1]
        )
    else:
        vis_loss_ = log_loss.reshape(-1, smooth_step, log_loss.shape[1])

    vis_loss = vis_loss_.mean(axis=1).reshape(-1, 3)
    vis_loss_total = vis_loss[:, :].sum(axis=1)
    vis_loss[:, 1] = vis_loss[:, 2]
    vis_loss[:, 2] = vis_loss_total
    for i in range(vis_loss.shape[1]):
        plt.semilogy(np.arange(vis_loss.shape[0]) * smooth_step, vis_loss[:, i])
    plt.legend(
        ["PDE loss", "Objective loss", "Total loss"],
        loc="lower left",
        prop=font,
    )
    plt.xlabel("Iteration ", fontdict=font)
    plt.ylabel("Loss ", fontdict=font)
    plt.grid()
    plt.yticks(size=10)
    plt.xticks(size=10)
    plt.savefig(os.path.join(OUTPUT_DIR, f"{FIGNAME}_Fig6_A.jpg"))


def plot_6b(log_loss_obj: List[float]):
    """Plot Fig.6 B of paper.

    Args:
        log_loss_obj (List[float]): Objective losses of last iteration of each k.
    """
    plt.figure(400, figsize=(10, 6))
    plt.clf()
    plt.plot(np.arange(len(log_loss_obj)), log_loss_obj, "bo-")
    plt.xlabel("k", fontdict=font)
    plt.ylabel("Objective", fontdict=font)
    plt.grid()
    plt.yticks(size=10)
    plt.xticks(size=10)
    plt.savefig(os.path.join(OUTPUT_DIR, f"{FIGNAME}_Fig6_B.jpg"))


def plot_6c7c(log_lambda: List[np.ndarray]):
    """Plot Fig.6 Cs and Fig.7.Cs of paper.

    Args:
        log_lambda (List[np.ndarray]): Lambdas of each k.
    """
    # plot Fig.6 Cs and Fig.7.Cs of paper
    global input_valid, output_valid, input_train

    field_lambda = np.concatenate(
        [log_lambda[1], log_lambda[4], log_lambda[9]], axis=0
    ).T
    v_visual = output_valid[..., 0] ** 2 + output_valid[..., 1] ** 2
    field_visual = np.stack((v_visual, output_valid[..., -1]), axis=-1)
    plot_field_holo(input_valid, field_visual, input_train, field_lambda)


def plot_6d(log_lambda: List[np.ndarray]):
    """Plot Fig.6 D of paper.

    Args:
        log_lambda (List[np.ndarray]): Lambdas of each k.
    """
    # lambda/mu
    mu_ = 2 ** np.arange(1, 11)
    log_lambda = np.array(log_lambda) / mu_[:, None, None]
    # randomly pick 3 lambda points to represent all points of each k
    ind = np.random.randint(low=0, high=np.shape(log_lambda)[-1], size=3)
    la_mu_ind = log_lambda[:, :, ind]
    marker = ["ro-", "bo:", "r*-", "b*:", "rp-", "bp:"]
    plt.figure(500, figsize=(7, 5))
    plt.clf()
    for i in range(6):
        plt.plot(
            np.arange(0, 10),
            la_mu_ind[:, int(i % 2), int(i / 2)],
            marker[i],
            linewidth=2,
        )
    plt.legend(
        ["Re, 1", "Im, 1", "Re, 2", "Im, 2", "Re, 3", "Im, 3"],
        loc="upper right",
        prop=font,
    )
    plt.grid()
    plt.xlabel("k", fontdict=font)
    plt.ylabel(r"$ \lambda^k / \mu^k_F$", fontdict=font)
    plt.yticks(size=12)
    plt.xticks(size=12)
    plt.savefig(os.path.join(OUTPUT_DIR, f"{FIGNAME}_Fig6_D_lambda.jpg"))


def plot_6ef(log_lambda: List[np.ndarray]):
    """Plot Fig.6 E and Fig.6.F of paper.

    Args:
        log_lambda (List[np.ndarray]): Lambdas of each k.
    """
    # lambda/mu
    mu_ = 2 ** np.arange(1, 11)
    log_lambda = np.array(log_lambda) / mu_[:, None, None]
    # pick k=1,4,6,9
    iter_ind = [1, 4, 6, 9]
    plt.figure(600, figsize=(5, 5))
    plt.clf()
    for i in iter_ind:
        sns.kdeplot(log_lambda[i, 0, :], label="k = " + str(i), cut=0, linewidth=2)
    plt.legend(prop=font)
    plt.grid()
    plt.xlim([-0.1, 0.1])
    plt.xlabel(r"$ \lambda^k_{Re} / \mu^k_F$", fontdict=font)
    plt.ylabel("Frequency", fontdict=font)
    plt.yticks(size=12)
    plt.xticks(size=12)
    plt.savefig(os.path.join(OUTPUT_DIR, f"{FIGNAME}_Fig6_E.jpg"))

    plt.figure(700, figsize=(5, 5))
    plt.clf()
    for i in iter_ind:
        sns.kdeplot(log_lambda[i, 1, :], label="k = " + str(i), cut=0, linewidth=2)
    plt.legend(prop=font)
    plt.grid()
    plt.xlim([-0.1, 0.1])
    plt.xlabel(r"$ \lambda^k_{Im} / \mu^k_F$", fontdict=font)
    plt.ylabel("Frequency", fontdict=font)
    plt.yticks(size=12)
    plt.xticks(size=12)
    plt.savefig(os.path.join(OUTPUT_DIR, f"{FIGNAME}_Fig6_F.jpg"))

5. Result Display

Refer to Problem Definition, the following figure shows the changes in loss during training, changes in parameter lambda and parameter mu with training round k in the augmented Lagrangian method, and the final predicted values of electric field E and permittivity epsilon.

The figure below shows the prediction of electromagnetic wave propagation within a defined square domain. The prediction results are basically consistent with the results of the finite difference frequency domain (FDFD) method.

Loss value changes during training:

holograpy_result_6A

Loss value change with iteration during training

Objective loss value changes with training round k:

holograpy_result_6B

k value corresponds to objective loss value

Values of the real and imaginary parts of parameter lambda when k=1, 4, 9:

holograpy_result_6C

lambda value when k=1, 4, 9

Ratio of parameter lambda to parameter mu changes with training round k:

holograpy_result_6D

k value corresponds to lambda/mu value

Frequency of occurrence of the ratio of the real parts of parameter lambda and parameter mu with training rounds k=1, 4, 6, 9. The "sharper" the curve, the more uniform the values tend to be, and the better the convergence:

holograpy_result_6E

Frequency of real part lambda/mu value when k=1, 4, 6, 9

Frequency of occurrence of the ratio of the imaginary parts of parameter lambda and parameter mu with training rounds k=1, 4, 6, 9. The "sharper" the curve, the more uniform the values tend to be, and the better the convergence:

holograpy_result_6F

Frequency of imaginary part lambda/mu value when k=1, 4, 6, 9

Electric field E value:

holograpy_result_7C

E value

Permittivity epsilon value:

holograpy_result_7eps

epsilon value

6. References