提交 110e345f authored 作者: Brandon T. Willard's avatar Brandon T. Willard 提交者: Brandon T. Willard

Move utility functions used only in aesara.scan.basic to aesara.scan.basic

上级 cec873e3
......@@ -9,9 +9,9 @@ from aesara.configdefaults import config
from aesara.graph.basic import Constant, Variable, clone_replace, graph_inputs
from aesara.graph.op import get_test_value
from aesara.graph.utils import MissingInputError, TestValueError
from aesara.scan import utils
from aesara.scan.op import Scan, ScanInfo
from aesara.scan.utils import safe_new
from aesara.scan.utils import expand_empty, safe_new, until
from aesara.tensor.basic import get_scalar_constant_value
from aesara.tensor.exceptions import NotScalarConstantError
from aesara.tensor.math import minimum
from aesara.tensor.shape import shape_padleft
......@@ -19,6 +19,146 @@ from aesara.tensor.type import TensorType, integer_dtypes
from aesara.updates import OrderedUpdates
def get_updates_and_outputs(ls):
"""Recognize and order the updates, outputs, and stopping condition for a `Scan`.
WRITEME: what is the type of ls? how is it formatted? if it's not in the
predefined order already, how does this function know how to put it in that
order?
"""
def is_outputs(elem):
if isinstance(elem, (list, tuple)) and all(
[isinstance(x, Variable) for x in elem]
):
return True
if isinstance(elem, Variable):
return True
return False
def is_updates(elem):
if isinstance(elem, dict):
# Make sure the updates will be applied in a deterministic order
return True
# Dictionaries can be given as lists of tuples
if isinstance(elem, (list, tuple)) and all(
[isinstance(x, (list, tuple)) and len(x) == 2 for x in elem]
):
return True
return False
def is_condition(elem):
return isinstance(elem, until)
def _list(x):
if isinstance(x, (list, tuple)):
return list(x)
else:
return [x]
def _filter(x):
"""
Ensure `x` is made only of allowed data types.
Return True iff `x` is made only of lists, tuples, dictionaries, Aesara
variables or `aesara.scan.utils.until` objects.
"""
# Is `x` a container we can iterate on?
iter_on = None
if isinstance(x, list) or isinstance(x, tuple):
iter_on = x
elif isinstance(x, dict):
iter_on = x.items()
if iter_on is not None:
return all(_filter(y) for y in iter_on)
else:
return isinstance(x, Variable) or isinstance(x, until)
if not _filter(ls):
raise ValueError(
"The return value of your scan lambda expression may only be "
"made of lists, tuples, or dictionaries containing Aesara "
"variables (or `aesara.scan.utils.until` objects for "
"conditions). In particular if you need to use constant "
"values, you can use `tensor.constant` to turn them into "
"Aesara variables."
)
if is_outputs(ls):
return None, _list(ls), dict()
if is_updates(ls):
return None, [], dict(ls)
error_msg = (
f"Scan cannot parse the return value of your lambda expression, which is: {ls}"
)
if not isinstance(ls, (list, tuple)):
raise ValueError(error_msg)
ls = list(ls)
deprecation_msg = (
"The return value of the lambda function"
" has been restricted. you have to always return first the"
" outputs (if any), afterwards the updates (if any) and"
" at the end the conclusion"
)
if len(ls) == 2:
if is_outputs(ls[0]):
if is_updates(ls[1]):
return (None, _list(ls[0]), dict(ls[1]))
elif is_condition(ls[1]):
return (ls[1].condition, _list(ls[0]), dict())
else:
raise ValueError(error_msg)
elif is_updates(ls[0]):
if is_outputs(ls[1]):
raise ValueError(deprecation_msg)
elif is_condition(ls[1]):
return (ls[1].condition, [], dict(ls[0]))
else:
raise ValueError(error_msg)
else:
raise ValueError(error_msg)
elif len(ls) == 3:
if is_outputs(ls[0]):
if is_updates(ls[1]):
if is_condition(ls[2]):
return (ls[2].condition, _list(ls[0]), dict(ls[1]))
else:
raise ValueError(error_msg)
else:
raise ValueError(error_msg)
else:
raise ValueError(error_msg)
else:
raise ValueError(error_msg)
def isNaN_or_Inf_or_None(x):
isNone = x is None
try:
isNaN = np.isnan(x)
isInf = np.isinf(x)
isStr = isinstance(x, str)
except Exception:
isNaN = False
isInf = False
isStr = False
if not isNaN and not isInf:
try:
val = get_scalar_constant_value(x)
isInf = np.isinf(val)
isNaN = np.isnan(val)
except Exception:
isNaN = False
isInf = False
if isinstance(x, Constant) and isinstance(x.data, str):
isStr = True
else:
isStr = False
return isNone or isNaN or isInf or isStr
def scan(
fn,
sequences=None,
......@@ -516,7 +656,7 @@ def scan(
for seq in scan_seqs:
lengths_vec.append(seq.shape[0])
if not utils.isNaN_or_Inf_or_None(n_steps):
if not isNaN_or_Inf_or_None(n_steps):
# ^ N_steps should also be considered
lengths_vec.append(at.as_tensor(n_steps))
......@@ -531,7 +671,7 @@ def scan(
# If the user has provided the number of steps, do that regardless ( and
# raise an error if the sequences are not long enough )
if utils.isNaN_or_Inf_or_None(n_steps):
if isNaN_or_Inf_or_None(n_steps):
actual_n_steps = lengths_vec[0]
for contestant in lengths_vec[1:]:
actual_n_steps = minimum(actual_n_steps, contestant)
......@@ -611,7 +751,7 @@ def scan(
# the initial state over. We do this using the expand function
# defined in scan utils
sit_sot_scan_inputs.append(
utils.expand_empty(
expand_empty(
at.unbroadcast(shape_padleft(actual_arg), 0),
actual_n_steps,
)
......@@ -635,7 +775,7 @@ def scan(
mit_sot_tap_array.append(init_out["taps"])
# Sequence
mit_sot_scan_inputs.append(
utils.expand_empty(init_out["initial"][:mintap], actual_n_steps)
expand_empty(init_out["initial"][:mintap], actual_n_steps)
)
if i in return_steps:
......@@ -721,7 +861,7 @@ def scan(
# when we apply the lambda expression we get a mixture of update rules
# and outputs that needs to be separated
condition, outputs, updates = utils.get_updates_and_outputs(fn(*args))
condition, outputs, updates = get_updates_and_outputs(fn(*args))
if condition is not None:
as_while = True
else:
......@@ -844,7 +984,7 @@ def scan(
if isinstance(new_var.type, TensorType):
sit_sot_inner_inputs.append(new_var)
sit_sot_scan_inputs.append(
utils.expand_empty(
expand_empty(
at.unbroadcast(shape_padleft(input.variable), 0),
actual_n_steps,
)
......
......@@ -3,7 +3,6 @@
import copy
import dataclasses
import logging
import warnings
from collections import OrderedDict, namedtuple
from typing import TYPE_CHECKING, Callable, List, Optional, Sequence, Set, Tuple, Union
from typing import cast as type_cast
......@@ -24,7 +23,7 @@ from aesara.graph.basic import (
from aesara.graph.op import get_test_value
from aesara.graph.type import HasDataType
from aesara.graph.utils import TestValueError
from aesara.tensor.basic import AllocEmpty, cast, get_scalar_constant_value
from aesara.tensor.basic import AllocEmpty, cast
from aesara.tensor.subtensor import set_subtensor
from aesara.tensor.var import TensorConstant
......@@ -226,156 +225,6 @@ def traverse(out, x, x_copy, d, visited=None):
return d
def get_updates_and_outputs(ls):
"""
This function tries to recognize the updates OrderedDict, the
list of outputs and the stopping condition returned by the
lambda expression and arrange them in a predefined order.
WRITEME: what is the type of ls? how is it formatted? if it's not in the
predefined order already, how does this function know how to put it in that
order?
"""
def is_outputs(elem):
if isinstance(elem, (list, tuple)) and all(
[isinstance(x, Variable) for x in elem]
):
return True
if isinstance(elem, Variable):
return True
return False
def is_updates(elem):
if isinstance(elem, dict):
# Make sure the updates will be applied in a deterministic order
if not isinstance(elem, OrderedDict) and len(elem) > 1:
warnings.warn(
"Expected OrderedDict or OrderedUpdates, got "
+ str(type(elem))
+ ". This can make your script non-"
"deterministic."
)
return True
# Dictionaries can be given as lists of tuples
if isinstance(elem, (list, tuple)) and all(
[isinstance(x, (list, tuple)) and len(x) == 2 for x in elem]
):
return True
return False
def is_condition(elem):
return isinstance(elem, until)
def _list(x):
if isinstance(x, (list, tuple)):
return list(x)
else:
return [x]
def _filter(x):
"""
Ensure `x` is made only of allowed data types.
Return True iff `x` is made only of lists, tuples, dictionaries, Aesara
variables or `aesara.scan.utils.until` objects.
"""
# Is `x` a container we can iterate on?
iter_on = None
if isinstance(x, list) or isinstance(x, tuple):
iter_on = x
elif isinstance(x, dict):
iter_on = x.items()
if iter_on is not None:
return all(_filter(y) for y in iter_on)
else:
return isinstance(x, Variable) or isinstance(x, until)
if not _filter(ls):
raise ValueError(
"The return value of your scan lambda expression may only be "
"made of lists, tuples, or dictionaries containing Aesara "
"variables (or `aesara.scan.utils.until` objects for "
"conditions). In particular if you need to use constant "
"values, you can use `tensor.constant` to turn them into "
"Aesara variables."
)
if is_outputs(ls):
return None, _list(ls), OrderedDict()
if is_updates(ls):
return None, [], OrderedDict(ls)
error_msg = (
f"Scan cannot parse the return value of your lambda expression, which is: {ls}"
)
if not isinstance(ls, (list, tuple)):
raise ValueError(error_msg)
ls = list(ls)
deprecation_msg = (
"The return value of the lambda function"
" has been restricted. you have to always return first the"
" outputs (if any), afterwards the updates (if any) and"
" at the end the conclusion"
)
if len(ls) == 2:
if is_outputs(ls[0]):
if is_updates(ls[1]):
return (None, _list(ls[0]), OrderedDict(ls[1]))
elif is_condition(ls[1]):
return (ls[1].condition, _list(ls[0]), OrderedDict())
else:
raise ValueError(error_msg)
elif is_updates(ls[0]):
if is_outputs(ls[1]):
raise ValueError(deprecation_msg)
elif is_condition(ls[1]):
return (ls[1].condition, [], OrderedDict(ls[0]))
else:
raise ValueError(error_msg)
else:
raise ValueError(error_msg)
elif len(ls) == 3:
if is_outputs(ls[0]):
if is_updates(ls[1]):
if is_condition(ls[2]):
return (ls[2].condition, _list(ls[0]), OrderedDict(ls[1]))
else:
raise ValueError(error_msg)
else:
raise ValueError(error_msg)
else:
raise ValueError(error_msg)
else:
raise ValueError(error_msg)
def isNaN_or_Inf_or_None(x):
isNone = x is None
try:
isNaN = np.isnan(x)
isInf = np.isinf(x)
isStr = isinstance(x, str)
except Exception:
isNaN = False
isInf = False
isStr = False
if not isNaN and not isInf:
try:
val = get_scalar_constant_value(x)
isInf = np.isinf(val)
isNaN = np.isnan(val)
except Exception:
isNaN = False
isInf = False
if isinstance(x, Constant) and isinstance(x.data, str):
isStr = True
else:
isStr = False
return isNone or isNaN or isInf or isStr
def expand_empty(tensor_var, size):
"""
Transforms the shape of a tensor from (d1, d2 ... ) to ( d1+size, d2, ..)
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论