Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
P
pytensor
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
图表
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
testgroup
pytensor
Commits
cf860fa6
提交
cf860fa6
authored
2月 19, 2025
作者:
ricardoV94
提交者:
Ricardo Vieira
6月 26, 2025
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Allow building jacobian via vectorization instead of Scan
Also allow arbitrary expression dimensionality
上级
64dfa93e
显示空白字符变更
内嵌
并排
正在显示
3 个修改的文件
包含
196 行增加
和
100 行删除
+196
-100
gradients.rst
doc/tutorial/gradients.rst
+34
-7
gradient.py
pytensor/gradient.py
+61
-43
test_gradient.py
tests/test_gradient.py
+101
-50
没有找到文件。
doc/tutorial/gradients.rst
浏览文件 @
cf860fa6
...
@@ -101,9 +101,12 @@ PyTensor implements the :func:`pytensor.gradient.jacobian` macro that does all
...
@@ -101,9 +101,12 @@ PyTensor implements the :func:`pytensor.gradient.jacobian` macro that does all
that is needed to compute the Jacobian. The following text explains how
that is needed to compute the Jacobian. The following text explains how
to do it manually.
to do it manually.
Using Scan
----------
In order to manually compute the Jacobian of some function ``y`` with
In order to manually compute the Jacobian of some function ``y`` with
respect to some parameter ``x`` we
need to use `scan`. What we
respect to some parameter ``x`` we
can use `scan`.
do is to
loop over the entries in ``y`` and compute the gradient of
In this case, we
loop over the entries in ``y`` and compute the gradient of
``y[i]`` with respect to ``x``.
``y[i]`` with respect to ``x``.
.. note::
.. note::
...
@@ -111,8 +114,7 @@ do is to loop over the entries in ``y`` and compute the gradient of
...
@@ -111,8 +114,7 @@ do is to loop over the entries in ``y`` and compute the gradient of
`scan` is a generic op in PyTensor that allows writing in a symbolic
`scan` is a generic op in PyTensor that allows writing in a symbolic
manner all kinds of recurrent equations. While creating
manner all kinds of recurrent equations. While creating
symbolic loops (and optimizing them for performance) is a hard task,
symbolic loops (and optimizing them for performance) is a hard task,
effort is being done for improving the performance of `scan`. We
efforts are being made to improving the performance of `scan`.
shall return to :ref:`scan<tutloop>` later in this tutorial.
>>> import pytensor
>>> import pytensor
>>> import pytensor.tensor as pt
>>> import pytensor.tensor as pt
...
@@ -124,9 +126,9 @@ do is to loop over the entries in ``y`` and compute the gradient of
...
@@ -124,9 +126,9 @@ do is to loop over the entries in ``y`` and compute the gradient of
array([[ 8., 0.],
array([[ 8., 0.],
[ 0., 8.]])
[ 0., 8.]])
What we do in this code is to generate
a sequence of integers from ``0`` to
This code generates
a sequence of integers from ``0`` to
``y.shape[0]`` using `pt.arange`. Then
we loop
through this sequence, and
``y.shape[0]`` using `pt.arange`. Then
it loops
through this sequence, and
at each step,
we compute
the gradient of element ``y[i]`` with respect to
at each step,
computes
the gradient of element ``y[i]`` with respect to
``x``. `scan` automatically concatenates all these rows, generating a
``x``. `scan` automatically concatenates all these rows, generating a
matrix which corresponds to the Jacobian.
matrix which corresponds to the Jacobian.
...
@@ -139,6 +141,31 @@ matrix which corresponds to the Jacobian.
...
@@ -139,6 +141,31 @@ matrix which corresponds to the Jacobian.
``x`` anymore, while ``y[i]`` still is.
``x`` anymore, while ``y[i]`` still is.
Using automatic vectorization
-----------------------------
An alternative way to build the Jacobian is to vectorize the graph that computes a single row or colum of the jacobian
We can use `Lop` or `Rop` (more about it below) to obtain the row or column of the jacobian and `vectorize_graph`
to vectorize it to the full jacobian matrix.
>>> import pytensor
>>> import pytensor.tensor as pt
>>> from pytensor.gradient import Lop
>>> from pytensor.graph import vectorize_graph
>>> x = pt.dvector('x')
>>> y = x ** 2
>>> row_cotangent = pt.dvector("row_cotangent") # Helper variable, it will be replaced during vectorization
>>> J_row = Lop(y, x, row_cotangent)
>>> J = vectorize_graph(J_row, replace={row_cotangent: pt.eye(x.size)})
>>> f = pytensor.function([x], J)
>>> f([4, 4])
array([[ 8., 0.],
[ 0., 8.]])
This avoids the overhead of scan, at the cost of higher memory usage if the jacobian expression has large intermediate operations.
Also, not all graphs are safely vectorizable (e.g., if different rows require intermediate operations of different sizes).
For these reasons `jacobian` uses scan by default. The behavior can be changed by setting `vectorize=True`.
Computing the Hessian
Computing the Hessian
=====================
=====================
...
...
pytensor/gradient.py
浏览文件 @
cf860fa6
...
@@ -11,7 +11,7 @@ import numpy as np
...
@@ -11,7 +11,7 @@ import numpy as np
import
pytensor
import
pytensor
from
pytensor.compile.ops
import
ViewOp
from
pytensor.compile.ops
import
ViewOp
from
pytensor.configdefaults
import
config
from
pytensor.configdefaults
import
config
from
pytensor.graph
import
utils
from
pytensor.graph
import
utils
,
vectorize_graph
from
pytensor.graph.basic
import
Apply
,
NominalVariable
,
Variable
from
pytensor.graph.basic
import
Apply
,
NominalVariable
,
Variable
from
pytensor.graph.null_type
import
NullType
,
null_type
from
pytensor.graph.null_type
import
NullType
,
null_type
from
pytensor.graph.op
import
get_test_values
from
pytensor.graph.op
import
get_test_values
...
@@ -703,15 +703,15 @@ def grad(
...
@@ -703,15 +703,15 @@ def grad(
grad_dict
[
var
]
=
g_var
grad_dict
[
var
]
=
g_var
def
handle_disconnected
(
var
):
def
handle_disconnected
(
var
):
if
disconnected_inputs
==
"ignore"
:
return
elif
disconnected_inputs
==
"warn"
:
message
=
(
message
=
(
"grad method was asked to compute the gradient "
"grad method was asked to compute the gradient "
"with respect to a variable that is not part of "
"with respect to a variable that is not part of "
"the computational graph of the cost, or is used "
"the computational graph of the cost, or is used "
f
"only by a non-differentiable operator: {var}"
f
"only by a non-differentiable operator: {var}"
)
)
if
disconnected_inputs
==
"ignore"
:
pass
elif
disconnected_inputs
==
"warn"
:
warnings
.
warn
(
message
,
stacklevel
=
2
)
warnings
.
warn
(
message
,
stacklevel
=
2
)
elif
disconnected_inputs
==
"raise"
:
elif
disconnected_inputs
==
"raise"
:
message
=
utils
.
get_variable_trace_string
(
var
)
message
=
utils
.
get_variable_trace_string
(
var
)
...
@@ -2021,13 +2021,19 @@ GradientError: numeric gradient and analytic gradient exceed tolerance:
...
@@ -2021,13 +2021,19 @@ GradientError: numeric gradient and analytic gradient exceed tolerance:
Exception args: {args_msg}"""
Exception args: {args_msg}"""
def
jacobian
(
expression
,
wrt
,
consider_constant
=
None
,
disconnected_inputs
=
"raise"
):
def
jacobian
(
expression
,
wrt
,
consider_constant
=
None
,
disconnected_inputs
=
"raise"
,
vectorize
=
False
,
):
"""
"""
Compute the full Jacobian, row by row.
Compute the full Jacobian, row by row.
Parameters
Parameters
----------
----------
expression :
Vector (1-dimensional) :
class:`~pytensor.graph.basic.Variable`
expression :class:`~pytensor.graph.basic.Variable`
Values that we are differentiating (that we want the Jacobian of)
Values that we are differentiating (that we want the Jacobian of)
wrt : :class:`~pytensor.graph.basic.Variable` or list of Variables
wrt : :class:`~pytensor.graph.basic.Variable` or list of Variables
Term[s] with respect to which we compute the Jacobian
Term[s] with respect to which we compute the Jacobian
...
@@ -2051,18 +2057,18 @@ def jacobian(expression, wrt, consider_constant=None, disconnected_inputs="raise
...
@@ -2051,18 +2057,18 @@ def jacobian(expression, wrt, consider_constant=None, disconnected_inputs="raise
output, then a zero variable is returned. The return value is
output, then a zero variable is returned. The return value is
of same type as `wrt`: a list/tuple or TensorVariable in all cases.
of same type as `wrt`: a list/tuple or TensorVariable in all cases.
"""
"""
from
pytensor.tensor.basic
import
eye
from
pytensor.tensor.extra_ops
import
broadcast_to
if
not
isinstance
(
expression
,
Variable
):
if
not
isinstance
(
expression
,
Variable
):
raise
TypeError
(
"jacobian expects a Variable as `expression`"
)
raise
TypeError
(
"jacobian expects a Variable as `expression`"
)
if
expression
.
ndim
>
1
:
raise
ValueError
(
"jacobian expects a 1 dimensional variable as `expression`."
" If not use flatten to make it a vector"
)
using_list
=
isinstance
(
wrt
,
list
)
using_list
=
isinstance
(
wrt
,
list
)
using_tuple
=
isinstance
(
wrt
,
tuple
)
using_tuple
=
isinstance
(
wrt
,
tuple
)
grad_kwargs
=
{
"consider_constant"
:
consider_constant
,
"disconnected_inputs"
:
disconnected_inputs
,
}
if
isinstance
(
wrt
,
list
|
tuple
):
if
isinstance
(
wrt
,
list
|
tuple
):
wrt
=
list
(
wrt
)
wrt
=
list
(
wrt
)
...
@@ -2070,43 +2076,55 @@ def jacobian(expression, wrt, consider_constant=None, disconnected_inputs="raise
...
@@ -2070,43 +2076,55 @@ def jacobian(expression, wrt, consider_constant=None, disconnected_inputs="raise
wrt
=
[
wrt
]
wrt
=
[
wrt
]
if
all
(
expression
.
type
.
broadcastable
):
if
all
(
expression
.
type
.
broadcastable
):
# expression is just a scalar, use grad
jacobian_matrices
=
grad
(
expression
.
squeeze
(),
wrt
,
**
grad_kwargs
)
return
as_list_or_tuple
(
using_list
,
elif
vectorize
:
using_tuple
,
expression_flat
=
expression
.
ravel
()
grad
(
row_tangent
=
_float_ones_like
(
expression_flat
)
.
type
(
"row_tangent"
)
expression
.
squeeze
(),
jacobian_single_rows
=
Lop
(
expression
.
ravel
(),
wrt
,
row_tangent
,
**
grad_kwargs
)
wrt
,
consider_constant
=
consider_constant
,
n_rows
=
expression_flat
.
size
disconnected_inputs
=
disconnected_inputs
,
jacobian_matrices
=
vectorize_graph
(
),
jacobian_single_rows
,
replace
=
{
row_tangent
:
eye
(
n_rows
,
dtype
=
row_tangent
.
dtype
)},
)
if
disconnected_inputs
!=
"raise"
:
# If the input is disconnected from the cost, `vectorize_graph` has no effect on the respective jacobian
# We have to broadcast the zeros explicitly here
for
i
,
(
jacobian_single_row
,
jacobian_matrix
)
in
enumerate
(
zip
(
jacobian_single_rows
,
jacobian_matrices
,
strict
=
True
)
):
if
jacobian_single_row
.
ndim
==
jacobian_matrix
.
ndim
:
jacobian_matrices
[
i
]
=
broadcast_to
(
jacobian_matrix
,
shape
=
(
n_rows
,
*
jacobian_matrix
.
shape
)
)
)
else
:
def
inner_function
(
*
args
):
def
inner_function
(
*
args
):
idx
=
args
[
0
]
idx
,
expr
,
*
wrt
=
args
expr
=
args
[
1
]
return
grad
(
expr
[
idx
],
wrt
,
**
grad_kwargs
)
rvals
=
[]
for
inp
in
args
[
2
:]:
rval
=
grad
(
expr
[
idx
],
inp
,
consider_constant
=
consider_constant
,
disconnected_inputs
=
disconnected_inputs
,
)
rvals
.
append
(
rval
)
return
rvals
# Computing the gradients does not affect the random seeds on any random
jacobian_matrices
,
updates
=
pytensor
.
scan
(
# generator used n expression (because during computing gradients we are
# just backtracking over old values. (rp Jan 2012 - if anyone has a
# counter example please show me)
jacobs
,
updates
=
pytensor
.
scan
(
inner_function
,
inner_function
,
sequences
=
pytensor
.
tensor
.
arange
(
expression
.
shape
[
0
]),
sequences
=
pytensor
.
tensor
.
arange
(
expression
.
size
),
non_sequences
=
[
expression
,
*
wrt
],
non_sequences
=
[
expression
.
ravel
(),
*
wrt
],
return_list
=
True
,
)
if
updates
:
raise
ValueError
(
"The scan used to build the jacobian matrices returned a list of updates"
)
)
assert
not
updates
,
"Scan has returned a list of updates; this should not happen."
return
as_list_or_tuple
(
using_list
,
using_tuple
,
jacobs
)
if
jacobian_matrices
[
0
]
.
ndim
<
(
expression
.
ndim
+
wrt
[
0
]
.
ndim
):
# There was some raveling or squeezing done prior to getting the jacobians
# Reshape into original shapes
jacobian_matrices
=
[
jac_matrix
.
reshape
((
*
expression
.
shape
,
*
w
.
shape
))
for
jac_matrix
,
w
in
zip
(
jacobian_matrices
,
wrt
,
strict
=
True
)
]
return
as_list_or_tuple
(
using_list
,
using_tuple
,
jacobian_matrices
)
def
hessian
(
cost
,
wrt
,
consider_constant
=
None
,
disconnected_inputs
=
"raise"
):
def
hessian
(
cost
,
wrt
,
consider_constant
=
None
,
disconnected_inputs
=
"raise"
):
...
...
tests/test_gradient.py
浏览文件 @
cf860fa6
...
@@ -4,6 +4,7 @@ from scipy.optimize import rosen_hess_prod
...
@@ -4,6 +4,7 @@ from scipy.optimize import rosen_hess_prod
import
pytensor
import
pytensor
import
pytensor.tensor.basic
as
ptb
import
pytensor.tensor.basic
as
ptb
from
pytensor
import
function
from
pytensor.configdefaults
import
config
from
pytensor.configdefaults
import
config
from
pytensor.gradient
import
(
from
pytensor.gradient
import
(
DisconnectedInputError
,
DisconnectedInputError
,
...
@@ -31,7 +32,7 @@ from pytensor.graph.basic import Apply, graph_inputs
...
@@ -31,7 +32,7 @@ from pytensor.graph.basic import Apply, graph_inputs
from
pytensor.graph.null_type
import
NullType
from
pytensor.graph.null_type
import
NullType
from
pytensor.graph.op
import
Op
from
pytensor.graph.op
import
Op
from
pytensor.scan.op
import
Scan
from
pytensor.scan.op
import
Scan
from
pytensor.tensor.math
import
add
,
dot
,
exp
,
sigmoid
,
sqr
,
tanh
from
pytensor.tensor.math
import
add
,
dot
,
exp
,
outer
,
sigmoid
,
sqr
,
tanh
from
pytensor.tensor.math
import
sum
as
pt_sum
from
pytensor.tensor.math
import
sum
as
pt_sum
from
pytensor.tensor.random
import
RandomStream
from
pytensor.tensor.random
import
RandomStream
from
pytensor.tensor.type
import
(
from
pytensor.tensor.type
import
(
...
@@ -940,36 +941,38 @@ def test_undefined_grad_opt():
...
@@ -940,36 +941,38 @@ def test_undefined_grad_opt():
)
)
def
test_jacobian_vector
():
@pytest.mark.parametrize
(
"vectorize"
,
[
False
,
True
],
ids
=
lambda
x
:
f
"vectorize={x}"
)
class
TestJacobian
:
def
test_jacobian_vector
(
self
,
vectorize
):
x
=
vector
()
x
=
vector
()
y
=
x
*
2
y
=
x
*
2
rng
=
np
.
random
.
default_rng
(
seed
=
utt
.
fetch_seed
())
rng
=
np
.
random
.
default_rng
(
seed
=
utt
.
fetch_seed
())
# test when the jacobian is called with a tensor as wrt
# test when the jacobian is called with a tensor as wrt
Jx
=
jacobian
(
y
,
x
)
Jx
=
jacobian
(
y
,
x
,
vectorize
=
vectorize
)
f
=
pytensor
.
function
([
x
],
Jx
)
f
=
function
([
x
],
Jx
)
vx
=
rng
.
uniform
(
size
=
(
10
,))
.
astype
(
pytensor
.
config
.
floatX
)
vx
=
rng
.
uniform
(
size
=
(
10
,))
.
astype
(
pytensor
.
config
.
floatX
)
assert
np
.
allclose
(
f
(
vx
),
np
.
eye
(
10
)
*
2
)
assert
np
.
allclose
(
f
(
vx
),
np
.
eye
(
10
)
*
2
)
# test when the jacobian is called with a tuple as wrt
# test when the jacobian is called with a tuple as wrt
Jx
=
jacobian
(
y
,
(
x
,)
)
Jx
=
jacobian
(
y
,
(
x
,),
vectorize
=
vectorize
)
assert
isinstance
(
Jx
,
tuple
)
assert
isinstance
(
Jx
,
tuple
)
f
=
pytensor
.
function
([
x
],
Jx
[
0
])
f
=
function
([
x
],
Jx
[
0
])
vx
=
rng
.
uniform
(
size
=
(
10
,))
.
astype
(
pytensor
.
config
.
floatX
)
vx
=
rng
.
uniform
(
size
=
(
10
,))
.
astype
(
pytensor
.
config
.
floatX
)
assert
np
.
allclose
(
f
(
vx
),
np
.
eye
(
10
)
*
2
)
assert
np
.
allclose
(
f
(
vx
),
np
.
eye
(
10
)
*
2
)
# test when the jacobian is called with a list as wrt
# test when the jacobian is called with a list as wrt
Jx
=
jacobian
(
y
,
[
x
]
)
Jx
=
jacobian
(
y
,
[
x
],
vectorize
=
vectorize
)
assert
isinstance
(
Jx
,
list
)
assert
isinstance
(
Jx
,
list
)
f
=
pytensor
.
function
([
x
],
Jx
[
0
])
f
=
function
([
x
],
Jx
[
0
])
vx
=
rng
.
uniform
(
size
=
(
10
,))
.
astype
(
pytensor
.
config
.
floatX
)
vx
=
rng
.
uniform
(
size
=
(
10
,))
.
astype
(
pytensor
.
config
.
floatX
)
assert
np
.
allclose
(
f
(
vx
),
np
.
eye
(
10
)
*
2
)
assert
np
.
allclose
(
f
(
vx
),
np
.
eye
(
10
)
*
2
)
# test when the jacobian is called with a list of two elements
# test when the jacobian is called with a list of two elements
z
=
vector
()
z
=
vector
()
y
=
x
*
z
y
=
x
*
z
Js
=
jacobian
(
y
,
[
x
,
z
]
)
Js
=
jacobian
(
y
,
[
x
,
z
],
vectorize
=
vectorize
)
f
=
pytensor
.
function
([
x
,
z
],
Js
)
f
=
function
([
x
,
z
],
Js
)
vx
=
rng
.
uniform
(
size
=
(
10
,))
.
astype
(
pytensor
.
config
.
floatX
)
vx
=
rng
.
uniform
(
size
=
(
10
,))
.
astype
(
pytensor
.
config
.
floatX
)
vz
=
rng
.
uniform
(
size
=
(
10
,))
.
astype
(
pytensor
.
config
.
floatX
)
vz
=
rng
.
uniform
(
size
=
(
10
,))
.
astype
(
pytensor
.
config
.
floatX
)
vJs
=
f
(
vx
,
vz
)
vJs
=
f
(
vx
,
vz
)
...
@@ -980,8 +983,7 @@ def test_jacobian_vector():
...
@@ -980,8 +983,7 @@ def test_jacobian_vector():
assert
np
.
allclose
(
vJs
[
0
],
evz
)
assert
np
.
allclose
(
vJs
[
0
],
evz
)
assert
np
.
allclose
(
vJs
[
1
],
evx
)
assert
np
.
allclose
(
vJs
[
1
],
evx
)
def
test_jacobian_matrix
(
self
,
vectorize
):
def
test_jacobian_matrix
():
x
=
matrix
()
x
=
matrix
()
y
=
2
*
x
.
sum
(
axis
=
0
)
y
=
2
*
x
.
sum
(
axis
=
0
)
rng
=
np
.
random
.
default_rng
(
seed
=
utt
.
fetch_seed
())
rng
=
np
.
random
.
default_rng
(
seed
=
utt
.
fetch_seed
())
...
@@ -990,30 +992,30 @@ def test_jacobian_matrix():
...
@@ -990,30 +992,30 @@ def test_jacobian_matrix():
ev
[
dx
,
:,
dx
]
=
2.0
ev
[
dx
,
:,
dx
]
=
2.0
# test when the jacobian is called with a tensor as wrt
# test when the jacobian is called with a tensor as wrt
Jx
=
jacobian
(
y
,
x
)
Jx
=
jacobian
(
y
,
x
,
vectorize
=
vectorize
)
f
=
pytensor
.
function
([
x
],
Jx
)
f
=
function
([
x
],
Jx
)
vx
=
rng
.
uniform
(
size
=
(
10
,
10
))
.
astype
(
pytensor
.
config
.
floatX
)
vx
=
rng
.
uniform
(
size
=
(
10
,
10
))
.
astype
(
pytensor
.
config
.
floatX
)
assert
np
.
allclose
(
f
(
vx
),
ev
)
assert
np
.
allclose
(
f
(
vx
),
ev
)
# test when the jacobian is called with a tuple as wrt
# test when the jacobian is called with a tuple as wrt
Jx
=
jacobian
(
y
,
(
x
,)
)
Jx
=
jacobian
(
y
,
(
x
,),
vectorize
=
vectorize
)
assert
isinstance
(
Jx
,
tuple
)
assert
isinstance
(
Jx
,
tuple
)
f
=
pytensor
.
function
([
x
],
Jx
[
0
])
f
=
function
([
x
],
Jx
[
0
])
vx
=
rng
.
uniform
(
size
=
(
10
,
10
))
.
astype
(
pytensor
.
config
.
floatX
)
vx
=
rng
.
uniform
(
size
=
(
10
,
10
))
.
astype
(
pytensor
.
config
.
floatX
)
assert
np
.
allclose
(
f
(
vx
),
ev
)
assert
np
.
allclose
(
f
(
vx
),
ev
)
# test when the jacobian is called with a list as wrt
# test when the jacobian is called with a list as wrt
Jx
=
jacobian
(
y
,
[
x
]
)
Jx
=
jacobian
(
y
,
[
x
],
vectorize
=
vectorize
)
assert
isinstance
(
Jx
,
list
)
assert
isinstance
(
Jx
,
list
)
f
=
pytensor
.
function
([
x
],
Jx
[
0
])
f
=
function
([
x
],
Jx
[
0
])
vx
=
rng
.
uniform
(
size
=
(
10
,
10
))
.
astype
(
pytensor
.
config
.
floatX
)
vx
=
rng
.
uniform
(
size
=
(
10
,
10
))
.
astype
(
pytensor
.
config
.
floatX
)
assert
np
.
allclose
(
f
(
vx
),
ev
)
assert
np
.
allclose
(
f
(
vx
),
ev
)
# test when the jacobian is called with a list of two elements
# test when the jacobian is called with a list of two elements
z
=
matrix
()
z
=
matrix
()
y
=
(
x
*
z
)
.
sum
(
axis
=
1
)
y
=
(
x
*
z
)
.
sum
(
axis
=
1
)
Js
=
jacobian
(
y
,
[
x
,
z
]
)
Js
=
jacobian
(
y
,
[
x
,
z
],
vectorize
=
vectorize
)
f
=
pytensor
.
function
([
x
,
z
],
Js
)
f
=
function
([
x
,
z
],
Js
)
vx
=
rng
.
uniform
(
size
=
(
10
,
10
))
.
astype
(
pytensor
.
config
.
floatX
)
vx
=
rng
.
uniform
(
size
=
(
10
,
10
))
.
astype
(
pytensor
.
config
.
floatX
)
vz
=
rng
.
uniform
(
size
=
(
10
,
10
))
.
astype
(
pytensor
.
config
.
floatX
)
vz
=
rng
.
uniform
(
size
=
(
10
,
10
))
.
astype
(
pytensor
.
config
.
floatX
)
vJs
=
f
(
vx
,
vz
)
vJs
=
f
(
vx
,
vz
)
...
@@ -1025,21 +1027,20 @@ def test_jacobian_matrix():
...
@@ -1025,21 +1027,20 @@ def test_jacobian_matrix():
assert
np
.
allclose
(
vJs
[
0
],
evz
)
assert
np
.
allclose
(
vJs
[
0
],
evz
)
assert
np
.
allclose
(
vJs
[
1
],
evx
)
assert
np
.
allclose
(
vJs
[
1
],
evx
)
def
test_jacobian_scalar
(
self
,
vectorize
):
def
test_jacobian_scalar
():
x
=
scalar
()
x
=
scalar
()
y
=
x
*
2
y
=
x
*
2
rng
=
np
.
random
.
default_rng
(
seed
=
utt
.
fetch_seed
())
rng
=
np
.
random
.
default_rng
(
seed
=
utt
.
fetch_seed
())
# test when the jacobian is called with a tensor as wrt
# test when the jacobian is called with a tensor as wrt
Jx
=
jacobian
(
y
,
x
)
Jx
=
jacobian
(
y
,
x
,
vectorize
=
vectorize
)
f
=
pytensor
.
function
([
x
],
Jx
)
f
=
function
([
x
],
Jx
)
vx
=
np
.
asarray
(
rng
.
uniform
(),
dtype
=
pytensor
.
config
.
floatX
)
vx
=
np
.
asarray
(
rng
.
uniform
(),
dtype
=
pytensor
.
config
.
floatX
)
assert
np
.
allclose
(
f
(
vx
),
2
)
assert
np
.
allclose
(
f
(
vx
),
2
)
# test when input is a shape (1,) vector -- should still be treated as a scalar
# test when input is a shape (1,) vector -- should still be treated as a scalar
Jx
=
jacobian
(
y
[
None
],
x
)
Jx
=
jacobian
(
y
[
None
],
x
)
f
=
pytensor
.
function
([
x
],
Jx
)
f
=
function
([
x
],
Jx
)
# Ensure we hit the scalar grad case (doesn't use scan)
# Ensure we hit the scalar grad case (doesn't use scan)
nodes
=
f
.
maker
.
fgraph
.
apply_nodes
nodes
=
f
.
maker
.
fgraph
.
apply_nodes
...
@@ -1049,24 +1050,24 @@ def test_jacobian_scalar():
...
@@ -1049,24 +1050,24 @@ def test_jacobian_scalar():
assert
np
.
allclose
(
f
(
vx
),
2
)
assert
np
.
allclose
(
f
(
vx
),
2
)
# test when the jacobian is called with a tuple as wrt
# test when the jacobian is called with a tuple as wrt
Jx
=
jacobian
(
y
,
(
x
,)
)
Jx
=
jacobian
(
y
,
(
x
,),
vectorize
=
vectorize
)
assert
isinstance
(
Jx
,
tuple
)
assert
isinstance
(
Jx
,
tuple
)
f
=
pytensor
.
function
([
x
],
Jx
[
0
])
f
=
function
([
x
],
Jx
[
0
])
vx
=
np
.
asarray
(
rng
.
uniform
(),
dtype
=
pytensor
.
config
.
floatX
)
vx
=
np
.
asarray
(
rng
.
uniform
(),
dtype
=
pytensor
.
config
.
floatX
)
assert
np
.
allclose
(
f
(
vx
),
2
)
assert
np
.
allclose
(
f
(
vx
),
2
)
# test when the jacobian is called with a list as wrt
# test when the jacobian is called with a list as wrt
Jx
=
jacobian
(
y
,
[
x
]
)
Jx
=
jacobian
(
y
,
[
x
],
vectorize
=
vectorize
)
assert
isinstance
(
Jx
,
list
)
assert
isinstance
(
Jx
,
list
)
f
=
pytensor
.
function
([
x
],
Jx
[
0
])
f
=
function
([
x
],
Jx
[
0
])
vx
=
np
.
asarray
(
rng
.
uniform
(),
dtype
=
pytensor
.
config
.
floatX
)
vx
=
np
.
asarray
(
rng
.
uniform
(),
dtype
=
pytensor
.
config
.
floatX
)
assert
np
.
allclose
(
f
(
vx
),
2
)
assert
np
.
allclose
(
f
(
vx
),
2
)
# test when the jacobian is called with a list of two elements
# test when the jacobian is called with a list of two elements
z
=
scalar
()
z
=
scalar
()
y
=
x
*
z
y
=
x
*
z
Jx
=
jacobian
(
y
,
[
x
,
z
]
)
Jx
=
jacobian
(
y
,
[
x
,
z
],
vectorize
=
vectorize
)
f
=
pytensor
.
function
([
x
,
z
],
Jx
)
f
=
function
([
x
,
z
],
Jx
)
vx
=
np
.
asarray
(
rng
.
uniform
(),
dtype
=
pytensor
.
config
.
floatX
)
vx
=
np
.
asarray
(
rng
.
uniform
(),
dtype
=
pytensor
.
config
.
floatX
)
vz
=
np
.
asarray
(
rng
.
uniform
(),
dtype
=
pytensor
.
config
.
floatX
)
vz
=
np
.
asarray
(
rng
.
uniform
(),
dtype
=
pytensor
.
config
.
floatX
)
vJx
=
f
(
vx
,
vz
)
vJx
=
f
(
vx
,
vz
)
...
@@ -1074,6 +1075,74 @@ def test_jacobian_scalar():
...
@@ -1074,6 +1075,74 @@ def test_jacobian_scalar():
assert
np
.
allclose
(
vJx
[
0
],
vz
)
assert
np
.
allclose
(
vJx
[
0
],
vz
)
assert
np
.
allclose
(
vJx
[
1
],
vx
)
assert
np
.
allclose
(
vJx
[
1
],
vx
)
@pytest.mark.parametrize
(
"square_jac"
,
[
False
,
True
])
def
test_jacobian_matrix_expression
(
self
,
vectorize
,
square_jac
):
x
=
vector
(
"x"
,
shape
=
(
3
,))
y
=
outer
(
x
,
x
)
if
not
square_jac
:
y
=
y
[:,
1
:]
Jy_wrt_x
=
jacobian
(
y
,
wrt
=
x
,
vectorize
=
vectorize
)
f
=
function
([
x
],
Jy_wrt_x
)
x_test
=
np
.
arange
(
3
,
dtype
=
x
.
type
.
dtype
)
res
=
f
(
x_test
)
expected_res
=
np
.
array
(
[
# Jy[0]_wrt_x (y[0] = x[0] * x)
[[
0
,
0
,
0
],
[
1
,
0
,
0
],
[
2
,
0
,
0
]],
# Jy[1]_wrt_x (y[1] = x[1] * x)
[
[
1
,
0
,
0
],
[
0
,
2
,
0
],
[
0
,
2
,
1
],
],
# Jy[2]_wrt_x (y[2] = x[2] * x)
[
[
2
,
0
,
0
],
[
0
,
2
,
1
],
[
0
,
0
,
4
],
],
]
)
if
not
square_jac
:
expected_res
=
expected_res
[:,
1
:,
:]
np
.
testing
.
assert_allclose
(
res
,
expected_res
)
def
test_jacobian_disconnected_inputs
(
self
,
vectorize
):
# Test that disconnected inputs are properly handled by jacobian.
s1
=
scalar
(
"s1"
)
s2
=
scalar
(
"s2"
)
jacobian_s
=
jacobian
(
1
+
s1
,
s2
,
disconnected_inputs
=
"ignore"
)
func_s
=
function
([
s2
],
jacobian_s
)
val
=
np
.
array
(
1.0
,
dtype
=
config
.
floatX
)
np
.
testing
.
assert_allclose
(
func_s
(
val
),
np
.
zeros
(
1
))
v1
=
vector
(
"v1"
)
v2
=
vector
(
"v2"
)
jacobian_v
=
jacobian
(
1
+
v1
,
v2
,
disconnected_inputs
=
"ignore"
,
vectorize
=
vectorize
)
func_v
=
function
([
v1
,
v2
],
jacobian_v
,
on_unused_input
=
"ignore"
)
val
=
np
.
arange
(
4.0
,
dtype
=
pytensor
.
config
.
floatX
)
np
.
testing
.
assert_allclose
(
func_v
(
val
,
val
),
np
.
zeros
((
4
,
4
)))
m1
=
matrix
(
"m1"
)
m2
=
matrix
(
"m2"
)
jacobian_m
=
jacobian
(
1
+
m1
[
1
:,
2
:],
m2
,
disconnected_inputs
=
"ignore"
,
vectorize
=
vectorize
)
func_v
=
function
([
m1
,
m2
],
jacobian_m
,
on_unused_input
=
"ignore"
)
val
=
np
.
ones
((
4
,
4
),
dtype
=
config
.
floatX
)
np
.
testing
.
assert_allclose
(
func_v
(
val
,
val
),
np
.
zeros
((
3
,
2
,
4
,
4
)))
def
test_benchmark
(
self
,
vectorize
,
benchmark
):
x
=
vector
(
"x"
,
shape
=
(
3
,))
y
=
outer
(
x
,
x
)
jac_y
=
jacobian
(
y
,
x
,
vectorize
=
vectorize
)
fn
=
function
([
x
],
jac_y
,
trust_input
=
True
)
benchmark
(
fn
,
np
.
array
([
0
,
1
,
2
],
dtype
=
x
.
type
.
dtype
))
def
test_hessian
():
def
test_hessian
():
x
=
vector
()
x
=
vector
()
...
@@ -1084,25 +1153,7 @@ def test_hessian():
...
@@ -1084,25 +1153,7 @@ def test_hessian():
assert
np
.
allclose
(
f
(
vx
),
np
.
eye
(
10
)
*
2
)
assert
np
.
allclose
(
f
(
vx
),
np
.
eye
(
10
)
*
2
)
def
test_jacobian_disconnected_inputs
():
class
TestHessianVectorProduct
:
# Test that disconnected inputs are properly handled by jacobian.
v1
=
vector
()
v2
=
vector
()
jacobian_v
=
pytensor
.
gradient
.
jacobian
(
1
+
v1
,
v2
,
disconnected_inputs
=
"ignore"
)
func_v
=
pytensor
.
function
([
v1
,
v2
],
jacobian_v
)
val
=
np
.
arange
(
4.0
)
.
astype
(
pytensor
.
config
.
floatX
)
assert
np
.
allclose
(
func_v
(
val
,
val
),
np
.
zeros
((
4
,
4
)))
s1
=
scalar
()
s2
=
scalar
()
jacobian_s
=
pytensor
.
gradient
.
jacobian
(
1
+
s1
,
s2
,
disconnected_inputs
=
"ignore"
)
func_s
=
pytensor
.
function
([
s2
],
jacobian_s
)
val
=
np
.
array
(
1.0
)
.
astype
(
pytensor
.
config
.
floatX
)
assert
np
.
allclose
(
func_s
(
val
),
np
.
zeros
(
1
))
class
TestHessianVectorProdudoct
:
def
test_rosen
(
self
):
def
test_rosen
(
self
):
x
=
vector
(
"x"
,
dtype
=
"float64"
)
x
=
vector
(
"x"
,
dtype
=
"float64"
)
rosen
=
(
100
*
(
x
[
1
:]
-
x
[:
-
1
]
**
2
)
**
2
+
(
1
-
x
[:
-
1
])
**
2
)
.
sum
()
rosen
=
(
100
*
(
x
[
1
:]
-
x
[:
-
1
]
**
2
)
**
2
+
(
1
-
x
[:
-
1
])
**
2
)
.
sum
()
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论