提交 9f98757c authored 作者: ricardoV94's avatar ricardoV94 提交者: Ricardo Vieira

Implement xtensor.signal.convolve1d

上级 e854b4d2
(libdoc_xtensor_linalg)=
# `xtensor.signal` -- Signal processing operations
```{eval-rst}
.. automodule:: pytensor.xtensor.signal
:members:
```
......@@ -195,7 +195,8 @@ class Convolve1d(AbstractConvolveNd, COp): # type: ignore[misc]
return code
blockwise_convolve_1d = Blockwise(Convolve1d())
_convolve_1d = Convolve1d()
_blockwise_convolve_1d = Blockwise(_convolve_1d)
def convolve1d(
......@@ -235,14 +236,14 @@ def convolve1d(
zeros_right = (in2.shape[-1] - 1) // 2
in1 = join(
-1,
zeros((*in1_batch_shape, zeros_left), dtype=in2.dtype),
zeros((*in1_batch_shape, zeros_left), dtype=in1.dtype),
in1,
zeros((*in1_batch_shape, zeros_right), dtype=in2.dtype),
zeros((*in1_batch_shape, zeros_right), dtype=in1.dtype),
)
mode = "valid"
full_mode = as_scalar(np.bool_(mode == "full"))
return type_cast(TensorVariable, blockwise_convolve_1d(in1, in2, full_mode))
return type_cast(TensorVariable, _blockwise_convolve_1d(in1, in2, full_mode))
class Convolve2d(AbstractConvolveNd, Op): # type: ignore[misc]
......
import warnings
import pytensor.xtensor.rewriting
from pytensor.xtensor import linalg, math, random
from pytensor.xtensor import linalg, math, random, signal
from pytensor.xtensor.math import dot
from pytensor.xtensor.shape import broadcast, concat, full_like, ones_like, zeros_like
from pytensor.xtensor.type import (
......
from typing import Literal
import numpy as np
from pytensor.scalar import as_scalar
from pytensor.tensor import zeros
from pytensor.tensor.signal.conv import _convolve_1d
from pytensor.xtensor.shape import concat
from pytensor.xtensor.type import as_xtensor
from pytensor.xtensor.vectorization import XBlockwise
def convolve1d(
in1,
in2,
mode: Literal["full", "valid", "same"] = "full",
*,
dims: tuple[str, str],
):
"""Convolve two arrays along a single dimension.
Convolve in1 and in2, with the output size determined by the mode argument.
Parameters
----------
in1 : XTensorVariable
First input.
in2 : XTensorVariable
Second input.
mode : {'full', 'valid', 'same'}, optional
A string indicating the size of the output:
- 'full': The output is the full discrete linear convolution of the inputs, with shape (..., N+M-1,).
- 'valid': The output consists only of elements that do not rely on zero-padding, with shape (..., max(N, M) - min(N, M) + 1,).
- 'same': The output is the same size as in1, centered with respect to the 'full' output.
dims: tuple[str, str]
The dimension along which to convolve each of the inputs. Must be unique to each input.
The left dimension will be present in the output.
Returns
-------
out: XTensorVariable
The discrete linear convolution of in1 with in2.
"""
if len(dims) != 2:
raise ValueError(f"Two dims required, got {dims}")
in1_dim, in2_dim = dims
if in1_dim == in2_dim:
raise ValueError(f"The two dims must be unique, got {dims}")
if mode == "same":
# We implement "same" as "valid" with padded `in1`.
in2_core_size = in2.sizes[in2_dim]
zeros_left = as_xtensor(
zeros(in2_core_size // 2, dtype=in1.dtype), dims=(in1_dim,)
)
zeros_right = as_xtensor(
zeros((in2_core_size - 1) // 2, dtype=in1.dtype), dims=(in1_dim,)
)
in1 = concat([zeros_left, in1, zeros_right], dim=in1_dim)
mode = "valid"
elif mode not in {"full", "valid"}:
raise ValueError(f"mode must be one of 'full', 'valid', or 'same', got {mode}")
full_mode = as_scalar(np.bool_(mode == "full"))
xop = XBlockwise(
_convolve_1d,
core_dims=(((in1_dim,), (in2_dim,), ()), ((in1_dim,),)),
signature=_convolve_1d.gufunc_signature,
)
return xop(in1, in2, full_mode)
......@@ -8,6 +8,7 @@ from pytensor.compile import (
register_deep_copy_op_c_code,
register_view_op_c_code,
)
from pytensor.scalar import ScalarType
from pytensor.tensor import (
TensorType,
_as_tensor_variable,
......@@ -35,6 +36,7 @@ from pytensor.graph import Apply, Constant
from pytensor.graph.basic import OptionalApplyType, Variable
from pytensor.graph.type import HasDataType, HasShape, Type
from pytensor.tensor.basic import constant as tensor_constant
from pytensor.tensor.basic import tensor_from_scalar
from pytensor.tensor.variable import TensorConstantSignature, TensorVariable
......@@ -1014,9 +1016,15 @@ def as_xtensor(x, dims: Sequence[str] | None = None, *, name: str | None = None)
"non-scalar TensorVariable cannot be converted to XTensorVariable without dims."
)
return px.basic.xtensor_from_tensor(x, dims=dims, name=name)
elif isinstance(x.type, ScalarType):
if dims is None:
dims = ()
return px.basic.xtensor_from_tensor(
tensor_from_scalar(x), dims=dims, name=name
)
else:
raise TypeError(
"Variable with type {x.type} cannot be converted to XTensorVariable."
f"Variable with type {x.type} cannot be converted to XTensorVariable."
)
try:
return xtensor_constant(x, dims=dims, name=name)
......
......@@ -91,10 +91,21 @@ class XBlockwise(XOp):
f"Wrong number of inputs, expected {len(self.core_dims[0])}, got {len(inputs)}"
)
dims_and_shape = combine_dims_and_shape(inputs)
core_inputs_dims, core_outputs_dims = self.core_dims
core_input_dims_set = set(chain.from_iterable(core_inputs_dims))
# Check no input has a core_dim it shouldn't have
for i, (inp, core_inp_dims) in enumerate(
zip(inputs, core_inputs_dims, strict=True)
):
if invalid_dims := (
set(inp.dims) & (core_input_dims_set - set(core_inp_dims))
):
raise ValueError(
f"Input {i} has invalid core dims {sorted(invalid_dims)}. Allowed: {core_inp_dims}"
)
dims_and_shape = combine_dims_and_shape(inputs)
batch_dims, batch_shape = unzip(
((k, v) for k, v in dims_and_shape.items() if k not in core_input_dims_set),
n=2,
......
import re
from functools import partial
import pytest
import scipy.signal
pytest.importorskip("xarray")
pytestmark = pytest.mark.filterwarnings("error")
from xarray import apply_ufunc
from pytensor.xtensor.signal import convolve1d
from pytensor.xtensor.type import xtensor
from tests.xtensor.util import xr_arange_like, xr_assert_allclose, xr_function
@pytest.mark.parametrize("mode", ("full", "valid", "same"))
def test_convolve_1d(mode):
in1 = xtensor("in1", dims=("batch_a", "time", "batch_b"), shape=(2, 11, 3))
in2 = xtensor("in2", dims=("batch_c", "kernel", "batch_b"), shape=(5, 17, 3))
out = convolve1d(in1, in2, mode=mode, dims=("time", "kernel"))
assert out.type.dims == ("batch_a", "batch_b", "batch_c", "time")
assert out.type.shape == (2, 3, 5, None)
fn = xr_function([in1, in2], out)
in1_test = xr_arange_like(in1)
in2_test = xr_arange_like(in2)
eval_out = fn(in1_test, in2_test)
expected_out = apply_ufunc(
partial(scipy.signal.convolve, mode=mode),
in1_test,
in2_test,
input_core_dims=[("time",), ("kernel",)],
output_core_dims=[("time",)],
exclude_dims={"time"}, # Output time isn't aligned with input
vectorize=True,
)
xr_assert_allclose(eval_out, expected_out)
def test_convolve_1d_invalid():
in1 = xtensor("x", dims=("time", "batch"))
in2 = xtensor("x", dims=("batch", "kernel"))
# Check valid case doesn't raise
convolve1d(in1, in2, dims=("time", "kernel"))
with pytest.raises(ValueError, match=r"mode must be one of .*, got parisian"):
convolve1d(in1, in2, mode="parisian", dims=("time", "kernel"))
with pytest.raises(ValueError, match="Two dims required"):
convolve1d(in1, in2, dims=("time",))
with pytest.raises(ValueError, match="The two dims must be unique"):
convolve1d(in1, in2, dims=("batch", "batch"))
with pytest.raises(
ValueError,
match=re.escape("Input 0 has invalid core dims ['kernel']. Allowed: ('time',)"),
):
convolve1d(in1.rename({"batch": "kernel"}), in2, dims=("time", "kernel"))
with pytest.raises(
ValueError,
match=re.escape("Input 1 has invalid core dims ['time']. Allowed: ('kernel',)"),
):
convolve1d(in1, in2.rename({"batch": "time"}), dims=("time", "kernel"))
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论