FengWu

# Download sample input data
wget -c https://paddle-org.bj.bcebos.com/paddlescience/models/Fengwu/input1.npy -P ./data
wget -c https://paddle-org.bj.bcebos.com/paddlescience/models/Fengwu/input2.npy -P ./data

# Download pretrained model weights
wget -c https://paddle-org.bj.bcebos.com/paddlescience/models/Fengwu/fengwu_v2.onnx -P ./inference

# Run inference
python predict.py

1. Background Introduction

As global climate change intensifies and extreme weather becomes more frequent, demand for timely and accurate weather forecasts keeps growing across all sectors, and improving both has long been a key topic in the field. The large AI model "FengWu" is built on multi-modal, multi-task deep learning methods. It produces skillful high-resolution forecasts of core atmospheric variables beyond 10 days and surpasses GraphCast, the model released by DeepMind, on 80% of the evaluation metrics. FengWu can also generate a high-precision global forecast for the next 10 days in just 30 seconds, which is far more efficient than traditional models.

2. Model Principle

This section only briefly introduces the principle of the FengWu large weather model. For the detailed theoretical derivation, please read the paper "FengWu: Pushing the Skillful Global Medium-range Weather Forecast beyond 10 Days Lead".

The overall structure of the model is shown in the figure:

Model Structure

The model takes climate variables as multi-modal inputs. The features of multiple modalities are encoded in the Modal-Customized Encoder, and the encoded features are fused using the Transformer-based Cross-modal Fuser to obtain a joint representation. Finally, climate variables are predicted separately from the joint representation in the Modal-Customized Decoder.

Inference uses pretrained weights. The following sections describe the inference process.

3. Model Construction

In this example, FengWuPredictor is implemented to run inference with the ONNX model:

examples/fengwu/predict.py
    # load mean and std data
    self.data_mean = np.load(cfg.INFER.mean_path)[:, np.newaxis, np.newaxis]
    self.data_std = np.load(cfg.INFER.std_path)[:, np.newaxis, np.newaxis]

def _preprocess_data(
    self, input_data_prev: np.ndarray, input_data_next: np.ndarray
) -> np.ndarray:
    input_data_prev_after_norm = (
        input_data_prev.astype("float32") - self.data_mean
    ) / self.data_std
    input_data_next_after_norm = (
        input_data_next.astype("float32") - self.data_mean
    ) / self.data_std
    input_data = np.concatenate(
        (input_data_prev_after_norm, input_data_next_after_norm), axis=0
    )[np.newaxis, :, :, :]
    input_data = input_data.astype(np.float32)

    return input_data

def predict(
    self,
    input_data_prev: np.ndarray,
    input_data_next: np.ndarray,
    batch_size: int = 1,
) -> List[np.ndarray]:
    """Predicts the output of the FengWu model for the given input.

    Args:
        input_data_prev (np.ndarray): Atmospheric data at the first time step.
        input_data_next (np.ndarray): Atmospheric data six hours later.
        batch_size (int, optional): Batch size; only 1 is supported. Defaults to 1.

    Returns:
        List[np.ndarray]: Predictions for the next 56 time steps (14 days at 6-hour intervals).
    """
    if batch_size != 1:
        raise ValueError(
            f"FengWuPredictor only supports batch_size=1, but got {batch_size}"
        )

    # process data
    input_data = self._preprocess_data(input_data_prev, input_data_next)

    output_data_list = []
    # prepare input dict
    for _ in range(self.PREDICT_TIMESTAMP):
        input_dict = {
            self.input_names[0]: input_data,
        }

        # run predictor
        output_data = self.predictor.run(None, input_dict)[0]
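        input_data = np.concatenate(
            (
                input_data[:, self.NUM_ATMOSPHERIC_FEATURES :],
                output_data[:, : self.NUM_ATMOSPHERIC_FEATURES],
            ),
            axis=1,
        )
        output_data = (
            output_data[0, : self.NUM_ATMOSPHERIC_FEATURES] * self.data_std
        ) + self.data_mean

        output_data_list.append(output_data)

    return output_data_list
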
examples/fengwu/conf/fengwu.yaml
# inference settings
INFER:
  pretrained_model_path: null
  export_path: inference/fengwu_v2
  onnx_path: ${INFER.export_path}.onnx
  device: gpu
  engine: onnx
  precision: fp32
  ir_optim: false
  min_subgraph_size: 30
  gpu_mem: 100
  gpu_id: 0
  max_batch_size: 1
  num_cpu_threads: 10
  batch_size: 1
  mean_path: ./data_mean.npy
  std_path: ./data_std.npy
  input_file: './data/input1.npy'
  input_next_file: './data/input2.npy'

Here, input_file and input_next_file are the meteorological data at the start time and 6 hours later, respectively. Together they form the input to the network.
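The preprocessing step in predict.py can be illustrated on synthetic data. The following is a minimal sketch, assuming each input frame has shape (69, H, W) and that the mean and std files hold one value per channel; the small grid size and the random arrays are hypothetical stand-ins for the real npy files.

```python
import numpy as np

NUM_FEATURES = 69  # channel count taken from predict.py


def preprocess(prev, nxt, mean, std):
    """Normalize two (C, H, W) frames per channel and stack them into (1, 2C, H, W)."""
    # Reshape (C,) statistics to (C, 1, 1) so they broadcast over the grid.
    mean = mean[:, np.newaxis, np.newaxis]
    std = std[:, np.newaxis, np.newaxis]
    prev_norm = (prev.astype(np.float32) - mean) / std
    next_norm = (nxt.astype(np.float32) - mean) / std
    # Concatenate along the channel axis, then add a leading batch axis.
    stacked = np.concatenate((prev_norm, next_norm), axis=0)
    return stacked[np.newaxis].astype(np.float32)


# Synthetic stand-ins (hypothetical): a tiny grid instead of the real data.
rng = np.random.default_rng(0)
height, width = 8, 16
mean = rng.normal(size=NUM_FEATURES).astype(np.float32)
std = rng.uniform(0.5, 2.0, size=NUM_FEATURES).astype(np.float32)
prev = rng.normal(size=(NUM_FEATURES, height, width))
nxt = rng.normal(size=(NUM_FEATURES, height, width))

batch = preprocess(prev, nxt, mean, std)
print(batch.shape, batch.dtype)
```

The first 69 channels of the result hold the earlier frame and the last 69 hold the later one, which is the layout the prediction loop relies on when it slides the window forward.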

4. Result Visualization

Inference produces 56 npy files, one for every 6 hours over the 14 days following the prediction start time. To visualize the results, first convert the data from npy to NetCDF format, then view it with ncvue.
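The 56 outputs come from an autoregressive loop: each prediction is appended to the input window and the oldest frame is dropped. The sketch below mirrors that loop with a hypothetical stand-in model (`fake_model`, which simply averages the two input frames) in place of the ONNX session, and omits de-normalization. The lead-time list assumes that lead time is measured from the second input frame.

```python
import numpy as np

NUM_FEATURES = 69                   # channels per frame, from predict.py
STEP_HOURS = 6                      # interval between frames
NUM_STEPS = 14 * 24 // STEP_HOURS   # 56 steps over 14 days


def fake_model(x):
    """Hypothetical stand-in for the ONNX session: average of the two frames."""
    older, newer = x[:, :NUM_FEATURES], x[:, NUM_FEATURES:]
    return (older + newer) / 2.0


def rollout(x, model, num_steps):
    """Run the model autoregressively and collect one frame per step."""
    outputs = []
    for _ in range(num_steps):
        y = model(x)
        # Slide the window: keep the newer frame, append the new prediction.
        x = np.concatenate((x[:, NUM_FEATURES:], y[:, :NUM_FEATURES]), axis=1)
        outputs.append(y[0, :NUM_FEATURES])
    return outputs


# Tiny synthetic input: first frame all zeros, second frame all ones.
x0 = np.zeros((1, 2 * NUM_FEATURES, 4, 8), dtype=np.float32)
x0[:, NUM_FEATURES:] = 1.0

outputs = rollout(x0, fake_model, NUM_STEPS)
# Assumed convention: output i is (i + 1) * 6 hours after the second input.
lead_hours = [(i + 1) * STEP_HOURS for i in range(NUM_STEPS)]
print(len(outputs), lead_hours[0], lead_hours[-1])
```

Because every step feeds on the previous prediction, errors can accumulate with lead time, which is why the loop keeps everything in normalized space and only de-normalizes the copy that is saved.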

  1. Install dependencies

    pip install cdsapi netCDF4 ncvue
    

  2. Use script for data conversion

    python convert_data.py
    

  3. Use ncvue to open the converted NetCDF file. For detailed instructions, see the official ncvue documentation.
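When inspecting an output file before or after conversion, it helps to know which channel holds which variable. The sketch below builds the 69 channel names from the ordering documented in predict.py and splits one frame into named fields. The helper names `channel_names` and `split_channels` are hypothetical and are not part of convert_data.py.

```python
import numpy as np

# Ordering taken from the comments in predict.py.
SURFACE_VARS = ["u10", "v10", "t2m", "msl"]
UPPER_VARS = ["z", "q", "u", "v", "t"]
LEVELS = [50, 100, 150, 200, 250, 300, 400, 500, 600, 700, 850, 925, 1000]


def channel_names():
    """Return the 69 channel names in file order, e.g. 'msl', 'z500', 't850'."""
    names = list(SURFACE_VARS)
    for var in UPPER_VARS:
        names.extend(f"{var}{level}" for level in LEVELS)
    return names


def split_channels(frame):
    """Map each channel of a (69, H, W) frame to its variable name."""
    names = channel_names()
    if frame.shape[0] != len(names):
        raise ValueError(f"expected {len(names)} channels, got {frame.shape[0]}")
    return {name: frame[i] for i, name in enumerate(names)}


# Synthetic frame in which every channel is filled with its own index.
frame = np.arange(69, dtype=np.float32)[:, None, None] * np.ones((69, 2, 3), dtype=np.float32)
fields = split_channels(frame)
names = channel_names()
print(names.index("msl"), names.index("z500"), names.index("t1000"))
```

With a real result, `split_channels(np.load(path))["msl"]` would give the mean sea level pressure grid for that time step, where `path` points to one of the saved output files.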

5. Complete Code

examples/fengwu/predict.py
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from os import path as osp
from typing import List

import hydra
import numpy as np
import paddle
from omegaconf import DictConfig
from packaging import version

from deploy.python_infer import base
from ppsci.utils import logger


class FengWuPredictor(base.Predictor):
    """General predictor for FengWu model.

    Args:
        cfg (DictConfig): Running configuration.
    """

    # 14 days at a six-hour time interval
    PREDICT_TIMESTAMP = int(14 * 24 / 6)
    # There are 69 atmospheric features. The first four are surface variables in the order ['u10', 'v10', 't2m', 'msl'],
    # followed by non-surface variables in the order ['z', 'q', 'u', 'v', 't']. Each non-surface variable has 13 levels, ordered as
    # [50, 100, 150, 200, 250, 300, 400, 500, 600, 700, 850, 925, 1000].
    # Therefore, the order of the 69 variables is [u10, v10, t2m, msl, z50, z100, ..., z1000, q50, q100, ..., q1000,
    # u50, u100, ..., u1000, v50, v100, ..., v1000, t50, t100, ..., t1000].
    NUM_ATMOSPHERIC_FEATURES = 69

    def __init__(
        self,
        cfg: DictConfig,
    ):
        assert cfg.INFER.engine == "onnx", "FengWu engine only supports 'onnx'."

        super().__init__(
            pdmodel_path=None,
            pdiparams_path=None,
            device=cfg.INFER.device,
            engine=cfg.INFER.engine,
            precision=cfg.INFER.precision,
            onnx_path=cfg.INFER.onnx_path,
            ir_optim=cfg.INFER.ir_optim,
            min_subgraph_size=cfg.INFER.min_subgraph_size,
            gpu_mem=cfg.INFER.gpu_mem,
            gpu_id=cfg.INFER.gpu_id,
            max_batch_size=cfg.INFER.max_batch_size,
            num_cpu_threads=cfg.INFER.num_cpu_threads,
        )
        self.log_freq = cfg.log_freq

        # get input names
        self.input_names = [
            input_node.name for input_node in self.predictor.get_inputs()
        ]

        # get output names
        self.output_names = [
            output_node.name for output_node in self.predictor.get_outputs()
        ]

        # load mean and std data
        self.data_mean = np.load(cfg.INFER.mean_path)[:, np.newaxis, np.newaxis]
        self.data_std = np.load(cfg.INFER.std_path)[:, np.newaxis, np.newaxis]

    def _preprocess_data(
        self, input_data_prev: np.ndarray, input_data_next: np.ndarray
    ) -> np.ndarray:
        input_data_prev_after_norm = (
            input_data_prev.astype("float32") - self.data_mean
        ) / self.data_std
        input_data_next_after_norm = (
            input_data_next.astype("float32") - self.data_mean
        ) / self.data_std
        input_data = np.concatenate(
            (input_data_prev_after_norm, input_data_next_after_norm), axis=0
        )[np.newaxis, :, :, :]
        input_data = input_data.astype(np.float32)

        return input_data

    def predict(
        self,
        input_data_prev: np.ndarray,
        input_data_next: np.ndarray,
        batch_size: int = 1,
    ) -> List[np.ndarray]:
        """Predicts the output of the FengWu model for the given input.

        Args:
            input_data_prev (np.ndarray): Atmospheric data at the first time step.
            input_data_next (np.ndarray): Atmospheric data six hours later.
            batch_size (int, optional): Batch size; only 1 is supported. Defaults to 1.

        Returns:
            List[np.ndarray]: Predictions for the next 56 time steps (14 days at 6-hour intervals).
        """
        if batch_size != 1:
            raise ValueError(
                f"FengWuPredictor only supports batch_size=1, but got {batch_size}"
            )

        # process data
        input_data = self._preprocess_data(input_data_prev, input_data_next)

        output_data_list = []
        # prepare input dict
        for _ in range(self.PREDICT_TIMESTAMP):
            input_dict = {
                self.input_names[0]: input_data,
            }

            # run predictor
            output_data = self.predictor.run(None, input_dict)[0]
            input_data = np.concatenate(
                (
                    input_data[:, self.NUM_ATMOSPHERIC_FEATURES :],
                    output_data[:, : self.NUM_ATMOSPHERIC_FEATURES],
                ),
                axis=1,
            )
            output_data = (
                output_data[0, : self.NUM_ATMOSPHERIC_FEATURES] * self.data_std
            ) + self.data_mean

            output_data_list.append(output_data)

        return output_data_list


def inference(cfg: DictConfig):
    # log paddlepaddle's version
    if version.Version(paddle.__version__) != version.Version("0.0.0"):
        paddle_version = paddle.__version__
        if version.Version(paddle.__version__) < version.Version("2.6.0"):
            logger.warning(
                f"Detected paddlepaddle version is '{paddle_version}', "
                "currently it is recommended to use release 2.6 or develop version."
            )
    else:
        paddle_version = f"develop({paddle.version.commit[:7]})"

    logger.info(f"Using paddlepaddle {paddle_version}")

    # create predictor
    predictor = FengWuPredictor(cfg)

    # load data
    input_data_prev = np.load(cfg.INFER.input_file).astype(np.float32)
    input_data_next = np.load(cfg.INFER.input_next_file).astype(np.float32)

    # run predictor
    output_data_list = predictor.predict(input_data_prev, input_data_next)

    # save predict data
    for i in range(FengWuPredictor.PREDICT_TIMESTAMP):
        output_save_path = osp.join(cfg.output_dir, f"output_{i}.npy")
        np.save(output_save_path, output_data_list[i])
        logger.info(f"Save output with timestamp:{i} to {output_save_path}.")


@hydra.main(version_base=None, config_path="./conf", config_name="fengwu.yaml")
def main(cfg: DictConfig):
    if cfg.mode == "infer":
        inference(cfg)
    else:
        raise ValueError(f"cfg.mode should be in ['infer'], but got '{cfg.mode}'")


if __name__ == "__main__":
    main()

6. Result Display

The figure below shows the model's predicted mean sea level pressure 6 hours ahead. Other variables can be viewed with ncvue.

Mean sea level pressure predicted 6 hours ahead

7. References