跳转至

Utils.symbolic(符号计算) 模块

ppsci.utils.symbolic

Sympy to python function conversion module

lambdify(expr, models=None, extra_parameters=None, graph_filename=None)

Convert sympy expression to callable function.

Parameters:

Name Type Description Default
expr Basic

Sympy expression to be converted.

required
models Optional[Union[Arch, Tuple[Arch, ...]]]

Model(s) for computing forward result in LayerNode.

None
extra_parameters Optional[ParameterList]

Extra learnable parameters. Defaults to None.

None
graph_filename Optional[str]

Save computational graph to graph_filename.png for given expr, if graph_filename is not None and a valid string, such as 'momentum_x'. Defaults to None.

None

Returns:

Name Type Description
ComposedNode ComposedNode

Callable object that computes expr from a dict containing the necessary input data.

Examples:

>>> import paddle
>>> import ppsci
>>> import sympy as sp
>>> a, b, c, x, y = sp.symbols("a b c x y")
>>> u = sp.Function("u")(x, y)
>>> v = sp.Function("v")(x, y)
>>> z = -a + b * (c ** 2) + u * v + 2.3
>>> model = ppsci.arch.MLP(("x", "y"), ("u", "v"), 4, 16)
>>> batch_size = 13
>>> a_tensor = paddle.randn([batch_size, 1])
>>> b_tensor = paddle.randn([batch_size, 1])
>>> c_tensor = paddle.randn([batch_size, 1])
>>> x_tensor = paddle.randn([batch_size, 1])
>>> y_tensor = paddle.randn([batch_size, 1])
>>> model_output_dict = model({"x": x_tensor, "y": y_tensor})
>>> u_tensor, v_tensor = model_output_dict["u"], model_output_dict["v"]
>>> z_tensor_manually = (
...     -a_tensor + b_tensor * (c_tensor ** 2)
...     + u_tensor * v_tensor + 2.3
... )
>>> z_tensor_sympy = ppsci.lambdify(z, model)(
...     {
...         "a": a_tensor,
...         "b": b_tensor,
...         "c": c_tensor,
...         "x": x_tensor,
...         "y": y_tensor,
...     }
... )
>>> paddle.allclose(z_tensor_manually, z_tensor_sympy).item()
True
Source code in ppsci/utils/symbolic.py
def lambdify(
    expr: sp.Basic,
    models: Optional[Union[arch.Arch, Tuple[arch.Arch, ...]]] = None,
    extra_parameters: Optional[Sequence[paddle.Tensor]] = None,
    graph_filename: Optional[str] = None,
) -> ComposedNode:
    """Convert sympy expression to callable function.

    The expression tree is flattened in post-order, input symbols that are not
    learnable parameters are dropped, duplicates are removed, and every
    remaining sympy node is wrapped in a callable node. The callable nodes are
    then composed into a single `ComposedNode`.

    Args:
        expr (sp.Basic): Sympy expression to be converted.
        models (Optional[Union[arch.Arch, Tuple[arch.Arch, ...]]]): Model(s) for
            computing forward result in `LayerNode`. An `arch.ModelList` is
            unpacked into its member models. Defaults to None.
        extra_parameters (Optional[Sequence[paddle.Tensor]]): Extra learnable
            parameters, matched to symbols in `expr` by their `name`.
            Defaults to None.
        graph_filename (Optional[str]): Save computational graph to `graph_filename.png`
            for given `expr`, if `graph_filename` is not None and a valid string,
            such as 'momentum_x'. Defaults to None.

    Returns:
        ComposedNode: Callable object for computing expr with necessary input(s) data
            in dict given.

    Raises:
        ValueError: If a function in `expr` matches the output keys of more than
            one model, or matches no model at all (including when no model is
            given).
        NotImplementedError: If `expr` contains a node type that is not supported.

    Examples:
        >>> import paddle
        >>> import ppsci
        >>> import sympy as sp

        >>> a, b, c, x, y = sp.symbols("a b c x y")
        >>> u = sp.Function("u")(x, y)
        >>> v = sp.Function("v")(x, y)
        >>> z = -a + b * (c ** 2) + u * v + 2.3

        >>> model = ppsci.arch.MLP(("x", "y"), ("u", "v"), 4, 16)

        >>> batch_size = 13
        >>> a_tensor = paddle.randn([batch_size, 1])
        >>> b_tensor = paddle.randn([batch_size, 1])
        >>> c_tensor = paddle.randn([batch_size, 1])
        >>> x_tensor = paddle.randn([batch_size, 1])
        >>> y_tensor = paddle.randn([batch_size, 1])

        >>> model_output_dict = model({"x": x_tensor, "y": y_tensor})
        >>> u_tensor, v_tensor = model_output_dict["u"], model_output_dict["v"]

        >>> z_tensor_manually = (
        ...     -a_tensor + b_tensor * (c_tensor ** 2)
        ...     + u_tensor * v_tensor + 2.3
        ... )
        >>> z_tensor_sympy = ppsci.lambdify(z, model)(
        ...     {
        ...         "a": a_tensor,
        ...         "b": b_tensor,
        ...         "c": c_tensor,
        ...         "x": x_tensor,
        ...         "y": y_tensor,
        ...     }
        ... )

        >>> paddle.allclose(z_tensor_manually, z_tensor_sympy).item()
        True
    """

    # NOTE: sympy's simplification routines (nsimplify / expand / simplify) may
    # complicate the given expr instead of reducing it, so they are deliberately
    # not applied here.

    # remove 1.0 from sympy expression tree
    expr = expr.subs(1.0, 1)

    # convert sympy expression tree to list of nodes in post-order
    sympy_nodes: List[sp.Basic] = []
    sympy_nodes = _post_traverse(expr, sympy_nodes)

    # remove unnecessary symbol nodes already in input dict(except for parameter symbol)
    if not extra_parameters:
        extra_parameters = ()
    _parameter_names = tuple(param.name for param in extra_parameters)
    sympy_nodes = [
        node
        for node in sympy_nodes
        if (not node.is_Symbol) or (_cvt_to_key(node) in _parameter_names)
    ]

    # remove duplicates with topological order kept
    sympy_nodes = list(dict.fromkeys(sympy_nodes))

    # normalize `models` into a sequence of models; `None` becomes an empty
    # tuple so that an unmatched function raises the descriptive ValueError
    # below rather than an AttributeError on `None.output_keys`
    if models is None:
        models = ()
    elif isinstance(models, arch.ModelList):
        models = tuple(models.model_list)
    elif not isinstance(models, (tuple, list)):
        models = (models,)

    # node types handled by OperatorNode; built once instead of per iteration
    operator_types = tuple(SYMPY_TO_PADDLE.keys()) + (sp.Add, sp.Mul, sp.Derivative)

    # convert sympy node to callable node
    callable_nodes = []
    for node in sympy_nodes:
        if isinstance(node, operator_types):
            callable_nodes.append(OperatorNode(node))
        elif isinstance(node, sp.Function):
            if node.name == equation.DETACH_FUNC_NAME:
                callable_nodes.append(DetachNode(node))
            else:
                # find the single model whose output keys contain this function
                match_index = None
                for j, model in enumerate(models):
                    if str(node.func.name) not in model.output_keys:
                        continue
                    # reject ambiguous matches before building another LayerNode
                    if match_index is not None:
                        raise ValueError(
                            f"Name of function({node}) should be unique along given"
                            f" models, but got same output_key({node.func.name}) "
                            f"in models[{match_index}] and models[{j}]."
                        )
                    callable_nodes.append(LayerNode(node, model))
                    match_index = j
                if match_index is None:
                    raise ValueError(
                        f"Node {node} can not match any model in given model(s)."
                    )
        elif node.is_Number or node.is_NumberSymbol:
            callable_nodes.append(ConstantNode(node))
        elif isinstance(node, sp.Symbol):
            # only symbols named after an extra parameter survive the filter
            # above, so the matching parameter(s) are forwarded positionally
            callable_nodes.append(
                ParameterNode(
                    node,
                    *[param for param in extra_parameters if param.name == node.name],
                )
            )
        else:
            raise NotImplementedError(f"The node {node} is not supported in lambdify.")

    # NOTE: Visualize computational graph using 'pygraphviz'
    if isinstance(graph_filename, str):
        _visualize_graph(sympy_nodes, graph_filename)

    # Compose callable nodes into one callable object
    return ComposedNode(callable_nodes)

最后更新: November 17, 2023
创建日期: November 6, 2023