Unverified 提交 060d85f3 authored 作者: Jesse Grabowski's avatar Jesse Grabowski 提交者: GitHub

Fix gradient of `minimize` and `root` wrt higher-dimensional arguments (#1806)

* More precise imports * Fix gradient of `minimize` and `root` wrt higher-dimensional arguments Also: * Allow several inputs to be optimized * Handle disconnected and undefined gradients --------- Co-authored-by: 's avatarricardoV94 <ricardo.vieira1994@gmail.com>
上级 f3601225
......@@ -12,8 +12,8 @@ from pytensor import config
from pytensor.gradient import grad_not_implemented
from pytensor.graph import Apply, Op
from pytensor.link.c.op import COp
from pytensor.tensor import TensorType, Variable, specify_broadcastable, tensor
from pytensor.tensor.type import complex_dtypes
from pytensor.tensor.shape import specify_broadcastable
from pytensor.tensor.type import TensorType, Variable, complex_dtypes, tensor
def structured_elemwise(tensor_op):
......
......@@ -30,8 +30,8 @@ from pytensor.sparse.math import (
)
from pytensor.sparse.type import SparseTensorType
from pytensor.sparse.utils import hash_from_sparse
from pytensor.tensor import iscalar
from pytensor.tensor.shape import shape
from pytensor.tensor.type import iscalar
from pytensor.tensor.variable import (
TensorConstant,
TensorVariable,
......
......@@ -6,24 +6,34 @@ import numpy as np
import pytensor.scalar as ps
from pytensor.compile.function import function
from pytensor.gradient import grad, grad_not_implemented, jacobian
from pytensor.gradient import DisconnectedType, grad, jacobian
from pytensor.graph.basic import Apply, Constant
from pytensor.graph.fg import FunctionGraph
from pytensor.graph.op import ComputeMapType, HasInnerGraph, Op, StorageMapType
from pytensor.graph.null_type import NullType
from pytensor.graph.op import (
ComputeMapType,
HasInnerGraph,
Op,
StorageMapType,
io_connection_pattern,
)
from pytensor.graph.replace import graph_replace
from pytensor.graph.traversal import ancestors, truncated_graph_inputs
from pytensor.graph.traversal import (
ancestors,
truncated_graph_inputs,
)
from pytensor.scalar import ScalarType, ScalarVariable
from pytensor.tensor import as_tensor_variable
from pytensor.tensor.basic import (
atleast_2d,
concatenate,
scalar_from_tensor,
tensor,
tensor_from_scalar,
zeros_like,
)
from pytensor.tensor.math import dot
from pytensor.tensor.math import tensordot
from pytensor.tensor.reshape import pack, unpack
from pytensor.tensor.slinalg import solve
from pytensor.tensor.type import DenseTensorType
from pytensor.tensor.variable import TensorVariable, Variable
......@@ -143,36 +153,6 @@ def _find_optimization_parameters(
]
def _get_parameter_grads_from_vector(
grad_wrt_args_vector: TensorVariable,
x_star: TensorVariable,
args: Sequence[TensorVariable | ScalarVariable],
output_grad: TensorVariable,
) -> list[TensorVariable | ScalarVariable]:
"""
Given a single concatenated vector of objective function gradients with respect to raveled optimization parameters,
returns the contribution of each parameter to the total loss function, with the unraveled shape of the parameter.
"""
cursor = 0
grad_wrt_args = []
for arg in args:
arg_shape = arg.shape
arg_size = arg_shape.prod()
arg_grad = grad_wrt_args_vector[:, cursor : cursor + arg_size].reshape(
(*x_star.shape, *arg_shape)
)
grad_wrt_arg = dot(output_grad, arg_grad)
if isinstance(arg.type, ScalarType):
grad_wrt_arg = scalar_from_tensor(grad_wrt_arg)
grad_wrt_args.append(grad_wrt_arg)
cursor += arg_size
return grad_wrt_args
class ScipyWrapperOp(Op, HasInnerGraph):
"""Shared logic for scipy optimization ops"""
......@@ -233,6 +213,22 @@ class ScipyWrapperOp(Op, HasInnerGraph):
return Apply(self, inputs, [self.inner_inputs[0].type(), ps.bool("success")])
def connection_pattern(self, node=None):
"""
All Ops that inherit from ScipyWrapperOp share the same connection pattern logic, because they all share the
same output structure. There are two outputs: the optimized variable, and a success flag. The success flag is
not differentiable, so it is never connected. The optimized variable is connected only to inputs that are
both connected to the objective function and of float dtype.
"""
fgraph = self.fgraph
fx = fgraph.outputs[0]
return [
# Every input is disonnected to the second output (success)
# And may or not be connected to the first output (opt_x)
[connected, False]
for [connected] in io_connection_pattern(fgraph.inputs, [fx])
]
class ScipyScalarWrapperOp(ScipyWrapperOp):
def build_fn(self):
......@@ -251,6 +247,74 @@ class ScipyScalarWrapperOp(ScipyWrapperOp):
self._fn_wrapped = LRUCache1(fn)
def compute_implicit_gradients(
self,
x_star: TensorVariable,
args: Sequence[TensorVariable | ScalarVariable],
output_grad: TensorVariable,
is_minimization: bool,
):
"""
Compute gradients of a scalar optimization problem with respect to its parameters.
For details, see the docstring of ScipyVectorWrapperOp.compute_implicit_gradients.
Parameters
----------
x_star : TensorVariable
The symbolic solution of the optimization problem.
args : Sequence of TensorVariable or ScalarVariable
The parameters of the optimization problem.
output_grad : TensorVariable
The gradient of the output of the optimization Op with respect to some scalar loss.
is_minimization : bool
Whether the optimization problem is a minimization problem. If False, it is assumed to be a root-finding
problem.
"""
fgraph = self.fgraph
inner_x, *inner_args = self.inner_inputs
inner_fx = self.inner_outputs[0]
if is_minimization:
inner_fx = grad(inner_fx, inner_x)
df_dx, *arg_grads = grad(
inner_fx,
[inner_x, *inner_args],
disconnected_inputs="ignore",
null_gradients="return",
return_disconnected="disconnected",
)
outer_arg_grad_map = dict(zip(args, arg_grads))
valid_args_and_grads = [
(arg, g)
for arg, g in outer_arg_grad_map.items()
if not isinstance(g.type, DisconnectedType | NullType)
]
if len(valid_args_and_grads) == 0:
# No differentiable arguments, return disconnected gradients
return arg_grads
outer_args_to_diff, df_dthetas = zip(*valid_args_and_grads)
replace = dict(zip(fgraph.inputs, (x_star, *args), strict=True))
df_dx_star, *df_dthetas_stars = graph_replace(
[df_dx, *df_dthetas], replace=replace
)
arg_to_grad = dict(zip(outer_args_to_diff, df_dthetas_stars))
grad_wrt_args = [
(-arg_to_grad[arg] / df_dx_star) * output_grad
if arg in arg_to_grad
else outer_arg_grad_map[arg]
for arg in args
]
return grad_wrt_args
class ScipyVectorWrapperOp(ScipyWrapperOp):
def build_fn(self):
......@@ -269,43 +333,31 @@ class ScipyVectorWrapperOp(ScipyWrapperOp):
# self.fgraph = fn.maker.fgraph
self._fn_wrapped = LRUCache1(fn)
def scalar_implict_optimization_grads(
inner_fx: TensorVariable,
inner_x: TensorVariable,
inner_args: Sequence[TensorVariable | ScalarVariable],
args: Sequence[TensorVariable | ScalarVariable],
def compute_implicit_gradients(
self,
x_star: TensorVariable,
output_grad: TensorVariable,
fgraph: FunctionGraph,
) -> list[TensorVariable | ScalarVariable]:
df_dx, *df_dthetas = grad(
inner_fx, [inner_x, *inner_args], disconnected_inputs="ignore"
)
replace = dict(zip(fgraph.inputs, (x_star, *args), strict=True))
df_dx_star, *df_dthetas_stars = graph_replace([df_dx, *df_dthetas], replace=replace)
grad_wrt_args = [
(-df_dtheta_star / df_dx_star) * output_grad
for df_dtheta_star in df_dthetas_stars
]
return grad_wrt_args
def implict_optimization_grads(
df_dx: TensorVariable,
df_dtheta_columns: Sequence[TensorVariable],
args: Sequence[TensorVariable | ScalarVariable],
x_star: TensorVariable,
output_grad: TensorVariable,
fgraph: FunctionGraph,
) -> list[TensorVariable | ScalarVariable]:
is_minimization: bool,
):
r"""
Compute gradients of an optimization problem with respect to its parameters.
The gradents are computed using the implicit function theorem. Given a fuction `f(x, theta) =`, and a function
Parameters
----------
x_star : TensorVariable
The symbolic solution of the optimization problem.
args : Sequence of TensorVariable or ScalarVariable
The parameters of the optimization problem.
output_grad : TensorVariable
The gradient of the output of the optimization Op with respect to some scalar loss.
is_minimization : bool
Whether the optimization problem is a minimization problem. If False, it is assumed to be a root-finding
problem.
Notes
-----
The gradents are computed using the implicit function theorem. Given a fuction `f(x, theta) = 0`, and a function
`x_star(theta)` that, given input parameters theta returns `x` such that `f(x_star(theta), theta) = 0`, we can
compute the gradients of `x_star` with respect to `theta` as follows:
......@@ -321,45 +373,105 @@ def implict_optimization_grads(
.. math::
\frac{d x^*(\theta)}{d \theta} = - \left(\frac{\partial f}{\partial x}\left(x^*(\theta), \theta\right)\right)^{-1} \frac{\partial f}{\partial \theta}\left(x^*(\theta), \theta\right)
\frac{d x^*(\theta)}{d \theta} = - \left(\frac{\partial f}{\partial x}\left(x^*(\theta),
\theta\right)\right)^{-1} \frac{\partial f}{\partial \theta}\left(x^*(\theta), \theta\right)
Note that this method assumes `f(x_star(theta), theta) = 0`; so it is not immediately applicable to a minimization
problem, where `f` is the objective function. In this case, we instead take `f` to be the gradient of the objective
function, which *is* indeed zero at the minimum.
Parameters
----------
df_dx : Variable
The Jacobian of the objective function with respect to the variable `x`.
df_dtheta_columns : Sequence[Variable]
The Jacobians of the objective function with respect to the optimization parameters `theta`.
Each column (or columns) corresponds to a different parameter. Should be returned by pytensor.gradient.jacobian.
args : Sequence[Variable]
The optimization parameters `theta`.
x_star : Variable
Symbolic graph representing the value of the variable `x` such that `f(x_star, theta) = 0 `.
output_grad : Variable
The gradient of the output with respect to the objective function.
fgraph : FunctionGraph
The function graph that contains the inputs and outputs of the optimization problem.
"""
df_dtheta = concatenate(
[atleast_2d(jac_col, left=False) for jac_col in df_dtheta_columns],
axis=-1,
fgraph = self.fgraph
inner_x, *inner_args = self.inner_inputs
implicit_f = self.inner_outputs[0]
df_dx, *arg_grads = grad(
implicit_f.sum(),
[inner_x, *inner_args],
disconnected_inputs="ignore",
null_gradients="return",
return_disconnected="disconnected",
)
replace = dict(zip(fgraph.inputs, (x_star, *args), strict=True))
inner_args_to_diff = [
arg
for arg, g in zip(inner_args, arg_grads)
if not isinstance(g.type, DisconnectedType | NullType)
]
if len(inner_args_to_diff) == 0:
# No differentiable arguments, return disconnected/null gradients
return arg_grads
outer_args_to_diff = [
arg
for inner_arg, arg in zip(inner_args, args)
if inner_arg in inner_args_to_diff
]
invalid_grad_map = {
arg: g for arg, g in zip(args, arg_grads) if arg not in outer_args_to_diff
}
if is_minimization:
implicit_f = grad(implicit_f, inner_x)
# Gradients are computed using the inner graph of the optimization op, not the actual inputs/outputs of the op.
packed_inner_args, packed_arg_shapes, implicit_f = pack_inputs_of_objective(
implicit_f,
inner_args_to_diff,
)
df_dx, df_dtheta = jacobian(
implicit_f,
[inner_x, packed_inner_args],
disconnected_inputs="ignore",
vectorize=self.use_vectorized_jac,
)
# Replace inner inputs (abstract dummies) with outer inputs (the actual user-provided symbols)
# at the solution point. Innner arguments aren't needed anymore, delete them to avoid accidental references.
del inner_x
del inner_args
inner_to_outer_map = dict(zip(fgraph.inputs, (x_star, *args)))
df_dx_star, df_dtheta_star = graph_replace(
[atleast_2d(df_dx), df_dtheta], replace=replace
[df_dx, df_dtheta], inner_to_outer_map
)
grad_wrt_args_vector = solve(-df_dx_star, df_dtheta_star)
grad_wrt_args = _get_parameter_grads_from_vector(
grad_wrt_args_vector, x_star, args, output_grad
if df_dtheta_star.ndim == 0 or df_dx_star.ndim == 0:
grad_wrt_args_packed = -(df_dtheta_star / df_dx_star)
else:
grad_wrt_args_packed = solve(-atleast_2d(df_dx_star), df_dtheta_star)
if packed_arg_shapes is not None:
packed_shapes_from_outer = graph_replace(
packed_arg_shapes, inner_to_outer_map, strict=False
)
grad_wrt_args = unpack(
grad_wrt_args_packed,
packed_shapes_from_outer,
keep_axes=None if all(inp.ndim == 0 for inp in (x_star, *args)) else 0,
)
else:
grad_wrt_args = [grad_wrt_args_packed]
return grad_wrt_args
arg_to_grad = dict(zip(outer_args_to_diff, grad_wrt_args))
final_grads = []
for arg in args:
arg_grad = arg_to_grad.get(arg, None)
if arg_grad is None:
final_grads.append(invalid_grad_map[arg])
continue
if arg_grad.ndim > 0 and output_grad.ndim > 0:
g = tensordot(output_grad, arg_grad, [[0], [0]])
else:
g = arg_grad * output_grad
if isinstance(arg.type, ScalarType) and isinstance(g, TensorVariable):
g = scalar_from_tensor(g)
final_grads.append(g)
return final_grads
class MinimizeScalarOp(ScipyScalarWrapperOp):
......@@ -418,36 +530,17 @@ class MinimizeScalarOp(ScipyScalarWrapperOp):
def L_op(self, inputs, outputs, output_grads):
# TODO: Handle disconnected inputs
x, *args = inputs
if non_supported_types := tuple(
inp.type
for inp in inputs
if not isinstance(inp.type, DenseTensorType | ScalarType)
):
# TODO: Support SparseTensorTypes
# TODO: Remaining types are likely just disconnected anyway
msg = f"Minimize gradient not implemented due to inputs of type {non_supported_types}"
return [
grad_not_implemented(self, i, inp, msg) for i, inp in enumerate(inputs)
]
x_star, _ = outputs
output_grad, _ = output_grads
inner_x, *inner_args = self.fgraph.inputs
inner_fx = self.fgraph.outputs[0]
implicit_f = grad(inner_fx, inner_x)
grad_wrt_args = scalar_implict_optimization_grads(
inner_fx=implicit_f,
inner_x=inner_x,
inner_args=inner_args,
args=args,
d_xstar_d_theta = self.compute_implicit_gradients(
x_star=x_star,
args=args,
output_grad=output_grad,
fgraph=self.fgraph,
is_minimization=True,
)
return [zeros_like(x), *grad_wrt_args]
return [zeros_like(x), *d_xstar_d_theta]
def minimize_scalar(
......@@ -578,54 +671,62 @@ class MinimizeOp(ScipyVectorWrapperOp):
outputs[1][0] = np.bool_(res.success)
def L_op(self, inputs, outputs, output_grads):
# TODO: Handle disconnected inputs
x, *args = inputs
if non_supported_types := tuple(
inp.type
for inp in inputs
if not isinstance(inp.type, DenseTensorType | ScalarType)
):
# TODO: Support SparseTensorTypes
# TODO: Remaining types are likely just disconnected anyway
msg = f"MinimizeOp gradient not implemented due to inputs of type {non_supported_types}"
return [
grad_not_implemented(self, i, inp, msg) for i, inp in enumerate(inputs)
]
x_star, _success = outputs
output_grad, _ = output_grads
inner_x, *inner_args = self.fgraph.inputs
inner_fx = self.fgraph.outputs[0]
implicit_f = grad(inner_fx, inner_x)
df_dx, *df_dtheta_columns = jacobian(
implicit_f,
[inner_x, *inner_args],
disconnected_inputs="ignore",
vectorize=self.use_vectorized_jac,
)
grad_wrt_args = implict_optimization_grads(
df_dx=df_dx,
df_dtheta_columns=df_dtheta_columns,
args=args,
d_xstar_d_theta = self.compute_implicit_gradients(
x_star=x_star,
args=args,
output_grad=output_grad,
fgraph=self.fgraph,
is_minimization=True,
)
return [zeros_like(x), *grad_wrt_args]
return [zeros_like(x), *d_xstar_d_theta]
def pack_inputs_of_objective(
objective: TensorVariable,
x: TensorVariable | Sequence[TensorVariable],
) -> tuple[TensorVariable, list[TensorVariable] | None, TensorVariable]:
"""
Packs inputs `x` into a single tensor if `x` is a sequence of tensors, and rewrites the `objective` graph
to use the packed input. Also returns the packed shapes for unpacking the output later.
If `x` is a single tensor, it is returned as is, and no rewriting is done.
"""
packed_shapes = None
if not isinstance(x, Sequence):
packed_input = x
elif len(x) == 1:
packed_input = x[0]
else:
packed_input, packed_shapes = pack(*x)
unpacked_output = unpack(packed_input, packed_shapes)
objective = graph_replace(
objective,
{
xi: ui.astype(xi.type.dtype)
if not (isinstance(xi.type, ScalarType))
else scalar_from_tensor(ui.astype(xi.type.dtype))
for xi, ui in zip(x, unpacked_output)
},
)
return packed_input, packed_shapes, objective
def minimize(
objective: TensorVariable,
x: TensorVariable,
x: TensorVariable | Sequence[TensorVariable],
method: str = "BFGS",
jac: bool = True,
hess: bool = False,
use_vectorized_jac: bool = False,
optimizer_kwargs: dict | None = None,
) -> tuple[TensorVariable, TensorVariable]:
) -> tuple[TensorVariable | tuple[TensorVariable, ...], TensorVariable]:
"""
Minimize a scalar objective function using scipy.optimize.minimize.
......@@ -633,8 +734,8 @@ def minimize(
----------
objective : TensorVariable
The objective function to minimize. This should be a pytensor variable representing a scalar value.
x: TensorVariable
The variable with respect to which the objective function is minimized. It must be an input to the
x: TensorVariable or list of TensorVariable
The variable or variables with respect to which the objective function is minimized. It must be an input to the
computational graph of `objective`.
method: str, optional
The optimization method to use. Default is "BFGS". See scipy.optimize.minimize for other options.
......@@ -653,18 +754,21 @@ def minimize(
Returns
-------
solution: TensorVariable
The optimized value of the vector of inputs `x` that minimizes `objective(x, *args)`. If the success flag
solution: TensorVariable or tuple of TensorVariable
The optimized value of each of inputs in `x` that minimizes `objective(x, *args)`. If the success flag
is False, this will be the final state of the minimization routine, but not necessarily a minimum.
success: TensorVariable
Symbolic boolean flag indicating whether the minimization routine reported convergence to a minimum
value, based on the requested convergence criteria.
"""
args = _find_optimization_parameters(objective, x)
objective = as_tensor_variable(objective)
packed_input, packed_shapes, objective = pack_inputs_of_objective(objective, x)
args = _find_optimization_parameters(objective, packed_input)
minimize_op = MinimizeOp(
x,
packed_input,
*args,
objective=objective,
method=method,
......@@ -674,7 +778,10 @@ def minimize(
optimizer_kwargs=optimizer_kwargs,
)
solution, success = minimize_op(x, *args)
solution, success = minimize_op(packed_input, *args)
if packed_shapes is not None:
solution = unpack(solution, packed_shapes)
return solution, success
......@@ -757,36 +864,18 @@ class RootScalarOp(ScipyScalarWrapperOp):
outputs[1][0] = np.bool_(res.converged)
def L_op(self, inputs, outputs, output_grads):
# TODO: Handle disconnected inputs
x, *args = inputs
if non_supported_types := tuple(
inp.type
for inp in inputs
if not isinstance(inp.type, DenseTensorType | ScalarType)
):
# TODO: Support SparseTensorTypes
# TODO: Remaining types are likely just disconnected anyway
msg = f"RootScalarOp gradient not implemented due to inputs of type {non_supported_types}"
return [
grad_not_implemented(self, i, inp, msg) for i, inp in enumerate(inputs)
]
x_star, _ = outputs
output_grad, _ = output_grads
inner_x, *inner_args = self.fgraph.inputs
inner_fx = self.fgraph.outputs[0]
grad_wrt_args = scalar_implict_optimization_grads(
inner_fx=inner_fx,
inner_x=inner_x,
inner_args=inner_args,
args=args,
d_xstar_d_theta = self.compute_implicit_gradients(
x_star=x_star,
args=args,
output_grad=output_grad,
fgraph=self.fgraph,
is_minimization=False,
)
return [zeros_like(x), *grad_wrt_args]
return [zeros_like(x), *d_xstar_d_theta]
def root_scalar(
......@@ -948,57 +1037,28 @@ class RootOp(ScipyVectorWrapperOp):
outputs[1][0] = np.bool_(res.success)
def L_op(self, inputs, outputs, output_grads):
# TODO: Handle disconnected inputs
x, *args = inputs
if non_supported_types := tuple(
inp.type
for inp in inputs
if not isinstance(inp.type, DenseTensorType | ScalarType)
):
# TODO: Support SparseTensorTypes
# TODO: Remaining types are likely just disconnected anyway
msg = f"RootOp gradient not implemented due to inputs of type {non_supported_types}"
return [
grad_not_implemented(self, i, inp, msg) for i, inp in enumerate(inputs)
]
x_star, _ = outputs
output_grad, _ = output_grads
inner_x, *inner_args = self.fgraph.inputs
inner_fx = self.fgraph.outputs[0]
df_dx = (
jacobian(inner_fx, inner_x, vectorize=self.use_vectorized_jac)
if not self.jac
else self.fgraph.outputs[1]
)
df_dtheta_columns = jacobian(
inner_fx,
inner_args,
disconnected_inputs="ignore",
vectorize=self.use_vectorized_jac,
)
grad_wrt_args = implict_optimization_grads(
df_dx=df_dx,
df_dtheta_columns=df_dtheta_columns,
args=args,
d_xstar_d_theta = self.compute_implicit_gradients(
x_star=x_star,
args=args,
output_grad=output_grad,
fgraph=self.fgraph,
is_minimization=False,
)
return [zeros_like(x), *grad_wrt_args]
return [zeros_like(x), *d_xstar_d_theta]
def root(
equations: TensorVariable,
variables: TensorVariable,
variables: TensorVariable | Sequence[TensorVariable],
method: str = "hybr",
jac: bool = True,
use_vectorized_jac: bool = False,
optimizer_kwargs: dict | None = None,
) -> tuple[TensorVariable, TensorVariable]:
) -> tuple[TensorVariable | Sequence[TensorVariable], TensorVariable]:
"""
Find roots of a system of equations using scipy.optimize.root.
......@@ -1032,11 +1092,13 @@ def root(
success: TensorVariable
Boolean indicating whether the root-finding was successful. If True, the solution is a root of the equation
"""
args = _find_optimization_parameters(equations, variables)
packed_variables, packed_shapes, equations = pack_inputs_of_objective(
equations, variables
)
args = _find_optimization_parameters(equations, packed_variables)
root_op = RootOp(
variables,
packed_variables,
*args,
equations=equations,
method=method,
......@@ -1045,7 +1107,9 @@ def root(
use_vectorized_jac=use_vectorized_jac,
)
solution, success = root_op(variables, *args)
solution, success = root_op(packed_variables, *args)
if packed_shapes is not None:
solution = unpack(solution, packed_shapes)
return solution, success
......
......@@ -4,7 +4,10 @@ import pytest
import pytensor
import pytensor.tensor as pt
from pytensor import Variable, config, function
from pytensor.gradient import NullTypeGradError, disconnected_type
from pytensor.gradient import (
DisconnectedInputError,
disconnected_type,
)
from pytensor.graph import Apply, Op, Type
from pytensor.tensor import alloc, scalar, scalar_from_tensor, tensor_from_scalar
from pytensor.tensor.optimize import minimize, minimize_scalar, root, root_scalar
......@@ -69,8 +72,8 @@ def test_simple_minimize():
rtol=1e-8 if config.floatX == "float64" else 1e-6,
)
def f(x, a, b):
objective = (x - a * b) ** 2
def f(x, a, c):
objective = (x - a * c) ** 2
out = minimize(objective, x)[0]
return out
......@@ -96,7 +99,13 @@ def test_minimize_vector_x(method, jac, hess):
objective = rosenbrock_shifted_scaled(x, a, b)
minimized_x, success = minimize(
objective, x, method=method, jac=jac, hess=hess, optimizer_kwargs={"tol": 1e-16}
objective,
x,
method=method,
jac=jac,
hess=hess,
optimizer_kwargs={"tol": 1e-16},
use_vectorized_jac=True,
)
fn = pytensor.function([x, a, b], [minimized_x, success])
......@@ -119,12 +128,144 @@ def test_minimize_vector_x(method, jac, hess):
def f(x, a, b):
objective = rosenbrock_shifted_scaled(x, a, b)
out = minimize(objective, x)[0]
out = minimize(objective, x, use_vectorized_jac=False)[0]
return out
utt.verify_grad(f, [x0, a_val, b_val], eps=1e-3 if floatX == "float32" else 1e-6)
def test_optimize_multiple_minimands():
"""
Test optimization with many input variables of different shapes, as occurs in a PyMC model.
"""
x0, x1, x2 = pt.dvectors("x1", "x2", "d3")
x3 = pt.dmatrix("x3")
b0, b1, b2 = pt.dscalars("b0", "b1", "b2")
b3 = pt.dvector("b3")
y = pt.dvector("y")
y_hat = x0 * b0 + x1 * b1 + x2 * b2 + x3 @ b3
objective = ((y - y_hat) ** 2).sum()
minimized_x, success = minimize(
objective,
[b0, b1, b2, b3],
jac=True,
hess=True,
method="Newton-CG",
use_vectorized_jac=True,
)
assert len(minimized_x) == 4
fn = pytensor.function([b0, b1, b2, b3, x0, x1, x2, x3, y], [*minimized_x, success])
rng = np.random.default_rng()
X = rng.normal(size=(100, 3)).astype(floatX)
X3 = rng.normal(size=(100, 5)).astype(floatX)
b_vec = rng.normal(size=(8,)).astype(floatX)
true_b = [b_vec[0], b_vec[1], b_vec[2], b_vec[3:]]
true_y = X @ b_vec[0:3] + X3 @ b_vec[3:]
init_b = np.zeros((8,)).astype(floatX)
inputs = (
init_b[0],
init_b[1],
init_b[2],
init_b[3:],
X[:, 0],
X[:, 1],
X[:, 2],
X3,
true_y,
)
*estimated_b, success = fn(*inputs)
assert success
for est, true in zip(estimated_b, true_b):
np.testing.assert_allclose(
est,
true,
atol=1e-5,
rtol=1e-5,
)
def f(b0, b1, b2, b3, x0, x1, x2, x3, y):
y_hat = x0 * b0 + x1 * b1 + x2 * b2 + x3 @ b3
objective = ((y - y_hat) ** 2).sum()
result = minimize(
objective,
[b0, b1, b2, b3],
jac=True,
hess=True,
method="trust-ncg",
use_vectorized_jac=True,
)[0]
return pt.sum([x.sum() for x in result])
utt.verify_grad(f, inputs, eps=1e-6)
def test_minimize_mvn_logp_mu_and_cov():
"""Regression test for https://github.com/pymc-devs/pytensor/issues/1550"""
d = 3
def objective(mu, cov, data):
L = pt.linalg.cholesky(cov)
_, logdet = pt.linalg.slogdet(cov)
v = mu - data
y = pt.linalg.solve_triangular(L, v, lower=True)
quad_term = (y**2).sum()
return 0.5 * (d * pt.log(2 * np.pi) + logdet + quad_term)
data = pt.vector("data", shape=(d,))
mu = pt.vector("mu", shape=(d,))
cov = pt.dmatrix("cov", shape=(d, d))
neg_logp = objective(mu, cov, data)
mu_star, success = minimize(
objective=neg_logp,
x=mu,
method="BFGS",
jac=True,
hess=False,
use_vectorized_jac=True,
)
# This replace + gradient was the original source of the error in #1550, check that no longer raises
y_star = pytensor.graph_replace(neg_logp, {mu: mu_star})
_ = pt.grad(y_star, [mu, cov, data])
rng = np.random.default_rng()
data_val = rng.normal(size=(d,)).astype(floatX)
L = rng.normal(size=(d, d)).astype(floatX)
cov_val = L @ L.T
mu0_val = rng.normal(size=(d,)).astype(floatX)
fn = pytensor.function([mu, cov, data], [mu_star, success])
_, success_flag = fn(mu0_val, cov_val, data_val)
assert success_flag
def min_fn(mu, cov, data):
mu_star, _ = minimize(
objective=objective(mu, cov, data),
x=mu,
method="BFGS",
jac=True,
hess=False,
use_vectorized_jac=True,
)
return mu_star.sum()
utt.verify_grad(
min_fn, [mu0_val, cov_val, data_val], eps=1e-3 if floatX == "float32" else 1e-6
)
@pytest.mark.parametrize(
"method, jac, hess",
[("secant", False, False), ("newton", True, False), ("halley", True, True)],
......@@ -224,6 +365,39 @@ def test_root_system_of_equations():
)
def test_root_system_multiple_inputs():
# Example problem from Notes and Problems from Applied General Equilibrium Economics, Chapter 3
variables = v1, v2 = [pt.dscalar(name) for name in ["v1", "v2"]]
v3 = pt.dscalar("v3")
equations = pt.stack([v1**2 * v3 - 1, v1 + v2 - 2])
def f_analytic(v3):
v1 = 1 / np.sqrt(v3)
v2 = 2 - v1
return np.array([v1, v2])
solution, success = root(equations=equations, variables=variables)
fn = pytensor.function([v1, v2, v3], [*solution, success])
v1_val = 1.0
v2_val = 1.0
v3_val = 1.0
*solution_vals, success_flag = fn(v1_val, v2_val, v3_val)
assert success_flag
np.testing.assert_allclose(np.array(solution_vals), f_analytic(v3_val))
def root_fn(v1, v2, v3):
equations = pt.stack([v1**2 * v3 - 1, v1 + v2 - 2])
[v1_solution, v2_solution], _ = root(equations=equations, variables=[v1, v2])
return v1_solution + v2_solution
utt.verify_grad(
root_fn, [v1_val, v2_val, v3_val], eps=1e-6 if floatX == "float64" else 1e-3
)
@pytest.mark.parametrize("optimize_op", (minimize, root))
def test_optimize_0d(optimize_op):
# Scipy vector minimizers upcast 0d x to 1d. We need to work-around this
......@@ -267,7 +441,11 @@ def test_optimize_grad_scalar_arg(optimize_op):
np.testing.assert_allclose(grad_wrt_theta.eval({x: np.pi, theta: np.e}), -1)
@pytest.mark.parametrize("optimize_op", (minimize, minimize_scalar, root, root_scalar))
@pytest.mark.parametrize(
"optimize_op",
(minimize, minimize_scalar, root, root_scalar),
ids=["minimize", "minimize_scalar", "root", "root_scalar"],
)
def test_optimize_grad_disconnected_numerical_inp(optimize_op):
x = scalar("x", dtype="float64")
theta = scalar("theta", dtype="int64")
......@@ -277,13 +455,14 @@ def test_optimize_grad_disconnected_numerical_inp(optimize_op):
# Confirm theta is a direct input to the node
assert x0.owner.inputs[1] is theta
# This should technically raise, but does not right now
grad_wrt_theta = pt.grad(x0, theta, disconnected_inputs="raise")
np.testing.assert_allclose(grad_wrt_theta.eval({x: np.pi, theta: 5}), 0)
with pytest.raises(DisconnectedInputError):
pt.grad(x0, theta, disconnected_inputs="raise")
# This should work even if the previous one raised
grad_wrt_theta = pt.grad(x0, theta, disconnected_inputs="ignore")
np.testing.assert_allclose(grad_wrt_theta.eval({x: np.pi, theta: 5}), 0)
np.testing.assert_allclose(
grad_wrt_theta.eval({x: np.pi, theta: 5}, on_unused_input="ignore"), 0
)
@pytest.mark.parametrize("optimize_op", (minimize, minimize_scalar, root, root_scalar))
......@@ -344,11 +523,73 @@ def test_optimize_grad_disconnected_non_numerical_inp(optimize_op):
np.e,
)
with pytest.raises(NullTypeGradError):
with pytest.raises(DisconnectedInputError):
pt.grad(x_star, str_theta, disconnected_inputs="raise")
# This could be supported, but it is not right now.
with pytest.raises(NullTypeGradError):
_grad_wrt_num_theta = pt.grad(x_star, num_theta, disconnected_inputs="raise")
# np.testing.assert_allclose(grad_wrt_num_theta.eval({x: np.pi, num_theta: np.e, str_theta: ":)"}), -1)
# np.testing.assert_allclose(grad_wrt_num_theta.eval({x: np.pi, num_theta: np.e, str_theta: ":("}), 1)
grad_wrt_num_theta = pt.grad(x_star, num_theta, disconnected_inputs="raise")
np.testing.assert_allclose(
grad_wrt_num_theta.eval({x: np.pi, num_theta: np.e, str_theta: ":)"}), -1
)
np.testing.assert_allclose(
grad_wrt_num_theta.eval({x: np.pi, num_theta: np.e, str_theta: ":("}), 1
)
def test_vectorize_root_gradients():
"""Regression test for https://github.com/pymc-devs/pytensor/issues/1586"""
a, x, y = pt.dscalars("a", "x", "y")
eq_1 = a * x**2 - y - 1
eq_2 = x - a * y**2 + 1
[x_star, y_star], _ = pt.optimize.root(
equations=pt.stack([eq_1, eq_2]),
variables=[x, y],
method="hybr",
optimizer_kwargs={"tol": 1e-8},
)
solution = pt.stack([x_star, y_star])
a_grad = pt.grad(solution.sum(), a)
a_grid = pt.dmatrix("a_grid")
solution_grid, a_grad_grid = pytensor.graph.vectorize_graph(
[solution, a_grad], {a: a_grid}
)
def analytical_roots_and_grad(
a_vals: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
a = a_vals
# There are 4 roots to this equation, but we're always starting the optimizer at (1, 1) to control which one
# we get. For a > 0, this is the root with both x and y positive.
solution_grid = np.array(
(
-1 + (np.sqrt(4 * a + 1) + 1) ** 2 / (4 * a),
(np.sqrt(4 * a + 1) + 1) / (2 * a),
)
)
# Derivative of the sum of the two solutions w.r.t. a
dx_da = (np.sqrt(4 * a + 1) + 1) / (a * np.sqrt(4 * a + 1)) + 1 / (
a * np.sqrt(4 * a + 1)
)
dy_da = -((np.sqrt(4 * a + 1) + 1) ** 2) / (4 * a**2) - (
np.sqrt(4 * a + 1) + 1
) / (2 * a**2)
solution_a_grad = dx_da + dy_da
return solution_grid.transpose((1, 2, 0)), solution_a_grad
fn = pytensor.function(
[a_grid, x, y],
[solution_grid, a_grad_grid],
on_unused_input="ignore",
)
a_grid = np.linspace(1, 10, 9).reshape((3, 3))
solution_grid_val, a_grad_grid_val = fn(a_grid=a_grid, x=1.0, y=1.0)
analytical_solution_grid, analytical_a_grad_grid = analytical_roots_and_grad(a_grid)
np.testing.assert_allclose(solution_grid_val, analytical_solution_grid)
np.testing.assert_allclose(a_grad_grid_val, analytical_a_grad_grid)
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论