提交 1fc6af56 authored 作者: Pascal Lamblin's avatar Pascal Lamblin

Merge pull request #2855 from carriepl/scan_memory_reuse

Scan memory reuse
...@@ -208,6 +208,17 @@ import theano and print the config variable, as in: ...@@ -208,6 +208,17 @@ import theano and print the config variable, as in:
significant speed up on functions with many ops that are fast to significant speed up on functions with many ops that are fast to
execute, but this increases Theano's memory usage. execute, but this increases Theano's memory usage.
.. attribute:: scan.allow_output_prealloc
Bool value, either ``True`` or ``False``
Default: ``True``
When ``True``, this enables an optimization in which Scan tries to
pre-allocate memory for its outputs. Enabling the optimization can
give a significant speed up with Scan at the cost of slightly increased
memory usage.
.. attribute:: openmp .. attribute:: openmp
Bool value: either True or False Bool value: either True or False
......
@@ -5597,7 +5597,7 @@ @@ -5808,7 +5808,7 @@
* cdef list stack * cdef list stack
* cdef int offset * cdef int offset
*/ */
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
__Pyx_INCREF(__pyx_t_4); __Pyx_INCREF(__pyx_t_4);
__pyx_v_descr = ((PyArray_Descr *)__pyx_t_4); __pyx_v_descr = ((PyArray_Descr *)__pyx_t_4);
__pyx_t_4 = 0; __pyx_t_4 = 0;
@@ -7126,7 +7126,7 @@ @@ -7337,7 +7337,7 @@
* arr.base = baseptr * arr.base = baseptr
* *
*/ */
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
/* "numpy.pxd":973 /* "numpy.pxd":973
* baseptr = <PyObject*>base * baseptr = <PyObject*>base
@@ -7135,7 +7135,11 @@ @@ -7346,7 +7346,11 @@
* *
* cdef inline object get_array_base(ndarray arr): * cdef inline object get_array_base(ndarray arr):
*/ */
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
__Pyx_RefNannyFinishContext(); __Pyx_RefNannyFinishContext();
} }
@@ -7161,7 +7165,7 @@ @@ -7376,7 +7376,7 @@
* return None * return None
* else: * else:
*/ */
...@@ -38,7 +38,7 @@ ...@@ -38,7 +38,7 @@
if (__pyx_t_1) { if (__pyx_t_1) {
/* "numpy.pxd":977 /* "numpy.pxd":977
@@ -7185,8 +7189,8 @@ @@ -7400,8 +7404,8 @@
* return <object>arr.base # <<<<<<<<<<<<<< * return <object>arr.base # <<<<<<<<<<<<<<
*/ */
__Pyx_XDECREF(__pyx_r); __Pyx_XDECREF(__pyx_r);
......
...@@ -2,6 +2,48 @@ ...@@ -2,6 +2,48 @@
This module provides the Scan Op This module provides the Scan Op
See scan.py for details on scan See scan.py for details on scan
Memory reuse in scan
--------------------
To reduce the number of memory allocations and copies associated with calling
the inner function and recovering the outputs at every iteration, Scan uses a
memory pre-allocation mechanism for some of its outputs. Instead of repeatedly
calling the inner function and copying the outputs to designated locations,
it tries to make the inner function write the outputs directly to the
designated locations.
This is achieved by initializing, at every iteration, the output storage
of the inner function with references to previously allocated memory. Other
than the code in the Python and Cython backends to do this and to ensure that
the pre-allocated memory has been used, the memory pre-allocation mechanism
relies on the following elements to work properly :
- In make_thunk(), when compiling the inner function, the borrow flag must
be set to False for the inputs. This will prevent aliasing between the
inputs and the outputs of the inner function which could lead to invalid
results.
- In make_thunk(), again, the borrow flag must be set to True for the outputs.
This will make Theano consider the output storages as persistent and make
Theano provide them as pre-allocated storage to the ops that compute the
outputs of the inner function instead of letting these ops allocate their
own output storage.
- The ops that produce the outputs of the inner function must be prevented
from working inplace because if they do, they're not using the pre-allocated
storage. This is achieved by including the optimization
'add_no_output_from_inplace' to the compilation mode used by scan. It
prevents other optimizations from altering the graph such that outputs are
produced by inplace operations.
- The ScanSaveMem optimization, whose goal is to limit the amount of memory
used by scan, needs to allocate buffers large enough to be able, at every
iteration, to simultaneously read the needed previous states and store
the new states. Before the memory reuse feature, the buffers could be
smaller because, often, Scan only needed buffers large enough to read the
needed previous states. This is because all the outputs of the inner
function were computed before any of them was stored in the buffers. Now,
the outputs are stored as they are computed which means that, if the buffer
is too small, computing an output can overwrite an input that is still
needed to compute another output.
""" """
from __future__ import print_function from __future__ import print_function
...@@ -45,6 +87,11 @@ AddConfigVar('scan.allow_gc', ...@@ -45,6 +87,11 @@ AddConfigVar('scan.allow_gc',
"Allow/disallow gc inside of Scan (default: False)", "Allow/disallow gc inside of Scan (default: False)",
BoolParam(False)) BoolParam(False))
AddConfigVar('scan.allow_output_prealloc',
"Allow/disallow memory preallocation for outputs inside of scan "
"(default: True)",
BoolParam(True))
class Scan(PureOp): class Scan(PureOp):
def __init__(self, def __init__(self,
...@@ -144,11 +191,13 @@ class Scan(PureOp): ...@@ -144,11 +191,13 @@ class Scan(PureOp):
optimizer=mode_instance.provided_optimizer, optimizer=mode_instance.provided_optimizer,
linker=mode_instance.linker.clone(allow_gc=self.allow_gc)) linker=mode_instance.linker.clone(allow_gc=self.allow_gc))
# Now that scan has its mode instance, we activate optimization # Now that scan has its mode instance, if memory pre-allocation is
# activated for the outputs, we activate the optimization
# add_no_output_from_inplace in this mode instance. This will prevent # add_no_output_from_inplace in this mode instance. This will prevent
# Scan from producing outputs by means of inplace operations and # Scan from producing outputs by means of inplace operations and
# therefore allow it to pre-allocate memory storage for the outputs, # therefore allow it to pre-allocate memory storage for the outputs,
# avoiding needless copies. # avoiding needless copies.
if theano.config.scan.allow_output_prealloc:
self.mode_instance = self.mode_instance.including( self.mode_instance = self.mode_instance.including(
"add_no_output_from_inplace") "add_no_output_from_inplace")
...@@ -675,7 +724,14 @@ class Scan(PureOp): ...@@ -675,7 +724,14 @@ class Scan(PureOp):
self.n_mit_sot + self.n_mit_sot +
self.n_sit_sot + self.n_sit_sot +
self.n_nit_sot) self.n_nit_sot)
wrapped_inputs = [Param(x, borrow=True) for x in self.inputs] if theano.config.scan.allow_output_prealloc:
wrapped_inputs = [Param(x, borrow=False) for x in
self.inputs]
wrapped_outputs = [Out(x, borrow=True) for x in
self.outputs[:slices]]
else:
wrapped_inputs = [Param(x, borrow=True) for x in
self.inputs]
wrapped_outputs = [Out(x, borrow=False) for x in wrapped_outputs = [Out(x, borrow=False) for x in
self.outputs[:slices]] self.outputs[:slices]]
wrapped_outputs += self.outputs[slices:] wrapped_outputs += self.outputs[slices:]
...@@ -1022,6 +1078,9 @@ class Scan(PureOp): ...@@ -1022,6 +1078,9 @@ class Scan(PureOp):
other_args = args[offset:] other_args = args[offset:]
input_storage = self.fn.input_storage input_storage = self.fn.input_storage
output_storage = self.fn.output_storage output_storage = self.fn.output_storage
old_output_storage = [None] * len(output_storage)
old_output_data = [None] * len(output_storage)
output_reused = [None] * len(output_storage)
fn = self.fn.fn fn = self.fn.fn
offset = (self.n_seqs + sum(map(len, self.tap_array[:self.n_outs])) + offset = (self.n_seqs + sum(map(len, self.tap_array[:self.n_outs])) +
self.n_shared_outs) self.n_shared_outs)
...@@ -1100,10 +1159,23 @@ class Scan(PureOp): ...@@ -1100,10 +1159,23 @@ class Scan(PureOp):
pdx = offset + self.n_shared_outs pdx = offset + self.n_shared_outs
output_storage[pdx].storage[0] = None output_storage[pdx].storage[0] = None
# 4.5. Keep a reference to the variables currently in the # 4.5. Keep a reference to the variables (ndarrays, CudaNdarrays,
# output_storage to be able to compare them with the actual # etc) currently in the output_storage to be able to compare them
# outputs of the inner function after its execution # with the actual outputs of the inner function after its
old_output_storage = [o.storage[0] for o in output_storage] # execution. Also keep pointers to their data to be able to detect
# cases where outputs reused the allocated object but alter the
# memory region they refer to.
for idx in xrange(len(output_storage)):
var = output_storage[idx].storage[0]
old_output_storage[idx] = var
if hasattr(var, 'gpudata'):
old_output_data[idx] = var.gpudata
elif hasattr(var, 'data'):
old_output_data[idx] = var.data
else:
old_output_data[idx] = None
# 5. compute outputs # 5. compute outputs
t0_fn = time.time() t0_fn = time.time()
...@@ -1136,9 +1208,26 @@ class Scan(PureOp): ...@@ -1136,9 +1208,26 @@ class Scan(PureOp):
# Check which of the pre-allocated outputs (if applicable) have # Check which of the pre-allocated outputs (if applicable) have
# been reused by the inner function # been reused by the inner function
output_reused = [old_output_storage[o] is for idx in xrange(len(output_storage)):
output_storage[o].storage[0] # If the storage map does not contain the same object, then
for o in range(len(output_storage))] # the pre-allocated output has not been reused
new_var = output_storage[idx].storage[0]
if old_output_storage[idx] is new_var:
# The pre-allocated output is only considered as having
# been reused if it still points to the same data as it
# did before the execution of the inner function
if old_output_data[idx] is None:
output_reused[idx] = False
else:
if hasattr(new_var, 'gpudata'):
output_reused[idx] = (new_var.gpudata ==
old_output_data[idx])
elif hasattr(new_var, 'data'):
output_reused[idx] = (new_var.data ==
old_output_data[idx])
else:
output_reused[idx] = False
t_fn += dt_fn t_fn += dt_fn
offset_out = 0 offset_out = 0
......
...@@ -1227,9 +1227,33 @@ class ScanSaveMem(gof.Optimizer): ...@@ -1227,9 +1227,33 @@ class ScanSaveMem(gof.Optimizer):
start = tensor.basic.extract_constant(cf_slice[0]) start = tensor.basic.extract_constant(cf_slice[0])
if start == 0 or store_steps[i] == 0: if start == 0 or store_steps[i] == 0:
store_steps[i] = 0 store_steps[i] = 0
else:
# The "+ 1" is because of the memory pre-allocation
# mechanism used in the Scan op to reduce overhead.
# To prevent aliasing between the inputs and outputs
# of recurrent states, it requires that the buffer be
# large enough so that the new state and the oldest
# tap needed don't occupy the same place in the
# circular buffer. For now, this only needs to be done
# for mitsots and sitsots (because mitmots are not
# currently supported by the mechanism) and only if
# the pre-allocation mechanism is activated.
prealloc_outs = theano.config.scan.allow_output_prealloc
first_mitsot_idx = node.op.n_mit_mot
last_sitsot_idx = (node.op.n_mit_mot +
node.op.n_mit_sot +
node.op.n_sit_sot - 1)
preallocable_output = (i >= first_mitsot_idx and
i <= last_sitsot_idx)
if (prealloc_outs and preallocable_output):
pval = select_max(nw_steps - start + init_l[i],
init_l[i] + 1)
else: else:
pval = select_max(nw_steps - start + init_l[i], pval = select_max(nw_steps - start + init_l[i],
init_l[i]) init_l[i])
if store_steps[i] != -1: if store_steps[i] != -1:
pval = select_max(pval, store_steps[i]) pval = select_max(pval, store_steps[i])
......
This source diff could not be displayed because it is too large. You can view the blob instead.
...@@ -62,7 +62,7 @@ import copy ...@@ -62,7 +62,7 @@ import copy
def get_version(): def get_version():
return 0.285 return 0.286
@cython.boundscheck(False) @cython.boundscheck(False)
def perform( def perform(
...@@ -255,6 +255,8 @@ def perform( ...@@ -255,6 +255,8 @@ def perform(
other_args = args[offset:] other_args = args[offset:]
input_storage = fnct.input_storage input_storage = fnct.input_storage
output_storage = fnct.output_storage output_storage = fnct.output_storage
old_output_storage = [None] * len_output_storage
old_output_data = [None] * len_output_storage
offset = n_seqs offset = n_seqs
for idx in range(n_outs): for idx in range(n_outs):
offset += tap_array_len[idx] offset += tap_array_len[idx]
...@@ -338,10 +340,23 @@ def perform( ...@@ -338,10 +340,23 @@ def perform(
pdx = offset + n_shared_outs pdx = offset + n_shared_outs
output_storage[<unsigned int>pdx].storage[0] = None output_storage[<unsigned int>pdx].storage[0] = None
# 4.5. Keep a reference to the variables currently in the # 4.5. Keep a reference to the variables (ndarrays, CudaNdarrays,
# output_storage to be able to compare them with the actual # etc) currently in the output_storage to be able to compare them
# outputs of the inner function after its execution # with the actual outputs of the inner function after its
old_output_storage = [o.storage[0] for o in output_storage] # execution. Also keep pointers to their data to be able to detect
# cases where outputs reused the allocated object but alter the
# memory region they refer to.
for idx in range(len_output_storage):
var = output_storage[idx].storage[0]
old_output_storage[idx] = var
if hasattr(var, 'gpudata'):
old_output_data[idx] = var.gpudata
elif hasattr(var, 'data'):
old_output_data[idx] = var.data
else:
old_output_data[idx] = None
# 5. compute outputs # 5. compute outputs
t0_fn = time.time() t0_fn = time.time()
...@@ -366,9 +381,26 @@ def perform( ...@@ -366,9 +381,26 @@ def perform(
# Check which of the pre-allocated outputs (if applicable) have # Check which of the pre-allocated outputs (if applicable) have
# been reused by the inner function # been reused by the inner function
for j in range(len_output_storage): for idx in range(len_output_storage):
output_reused[j] = (old_output_storage[j] is # If the storage map does not contain the same object, then
output_storage[j].storage[0]) # the pre-allocated output has not been reused
new_var = output_storage[idx].storage[0]
if old_output_storage[idx] is new_var:
# The pre-allocated output is only considered as having
# been reused if it still points to the same data as it
# did before the execution of the inner function
if old_output_data[idx] is None:
output_reused[idx] = False
else:
if hasattr(new_var, 'gpudata'):
output_reused[idx] = (new_var.gpudata ==
old_output_data[idx])
elif hasattr(new_var, 'data'):
output_reused[idx] = (new_var.data ==
old_output_data[idx])
else:
output_reused[idx] = False
offset_out = 0 offset_out = 0
# 5.1 Copy over the values for mit_mot outputs # 5.1 Copy over the values for mit_mot outputs
......
...@@ -16,7 +16,7 @@ from theano.gof import cmodule ...@@ -16,7 +16,7 @@ from theano.gof import cmodule
_logger = logging.getLogger('theano.scan_module.scan_perform') _logger = logging.getLogger('theano.scan_module.scan_perform')
version = 0.285 # must match constant returned in function get_version() version = 0.286 # must match constant returned in function get_version()
need_reload = False need_reload = False
......
...@@ -2976,7 +2976,17 @@ class T_Scan(unittest.TestCase): ...@@ -2976,7 +2976,17 @@ class T_Scan(unittest.TestCase):
assert scan_nodes is not None assert scan_nodes is not None
scan_node = scan_nodes[0] scan_node = scan_nodes[0]
f1 = theano.function(inputs, scan_node.inputs[2]) f1 = theano.function(inputs, scan_node.inputs[2])
# Originally, the shape would have been 1 due to the SaveMem
# optimization reducing the size to the number of taps (in this case
# 1) provided to the inner function. Now, because of the memory-reuse
# feature in Scan it can be 2 because SaveMem needs to keep a
# larger buffer to avoid aliasing between the inputs and the outputs.
if theano.config.scan.allow_output_prealloc:
assert f1().shape[0] == 2
else:
assert f1().shape[0] == 1 assert f1().shape[0] == 1
gx = theano.tensor.grad(o, x) gx = theano.tensor.grad(o, x)
f2 = theano.function([], gx) f2 = theano.function([], gx)
utt.assert_allclose(f2(), numpy.ones((10,))) utt.assert_allclose(f2(), numpy.ones((10,)))
...@@ -2999,7 +3009,17 @@ class T_Scan(unittest.TestCase): ...@@ -2999,7 +3009,17 @@ class T_Scan(unittest.TestCase):
assert scan_nodes is not None assert scan_nodes is not None
scan_node = scan_nodes[0] scan_node = scan_nodes[0]
f1 = theano.function(inputs, scan_node.inputs[2]) f1 = theano.function(inputs, scan_node.inputs[2])
# Originally, the shape would have been 1 due to the SaveMem
# optimization reducing the size to the number of taps (in this case
# 1) provided to the inner function. Now, because of the memory-reuse
# feature in Scan it can be 2 because SaveMem needs to keep a
# larger buffer to avoid aliasing between the inputs and the outputs.
if theano.config.scan.allow_output_prealloc:
assert f1().shape[0] == 2
else:
assert f1().shape[0] == 1 assert f1().shape[0] == 1
gx = theano.tensor.grad(o, x) gx = theano.tensor.grad(o, x)
f2 = theano.function([], gx) f2 = theano.function([], gx)
utt.assert_allclose(f2(), numpy.ones((10,))) utt.assert_allclose(f2(), numpy.ones((10,)))
...@@ -3023,7 +3043,17 @@ class T_Scan(unittest.TestCase): ...@@ -3023,7 +3043,17 @@ class T_Scan(unittest.TestCase):
assert scan_nodes is not None assert scan_nodes is not None
scan_node = scan_nodes[0] scan_node = scan_nodes[0]
f1 = theano.function(inputs, scan_node.inputs[2]) f1 = theano.function(inputs, scan_node.inputs[2])
# Originally, the shape would have been 1 due to the SaveMem
# optimization reducing the size to the number of taps (in this case
# 1) provided to the inner function. Now, because of the memory-reuse
# feature in Scan it can be 2 because SaveMem needs to keep a
# larger buffer to avoid aliasing between the inputs and the outputs.
if theano.config.scan.allow_output_prealloc:
assert f1().shape[0] == 2
else:
assert f1().shape[0] == 1 assert f1().shape[0] == 1
gx = theano.tensor.grad(o, x) gx = theano.tensor.grad(o, x)
f2 = theano.function([], gx) f2 = theano.function([], gx)
utt.assert_allclose(f2(), numpy.ones((10,))) utt.assert_allclose(f2(), numpy.ones((10,)))
...@@ -3918,6 +3948,87 @@ class T_Scan(unittest.TestCase): ...@@ -3918,6 +3948,87 @@ class T_Scan(unittest.TestCase):
f = theano.function([seq], results[1], updates=updates) f = theano.function([seq], results[1], updates=updates)
assert numpy.all(exp_out == f(inp)) assert numpy.all(exp_out == f(inp))
def test_memory_reuse_gpudimshuffle(self):
# Test the memory pre-allocation feature in scan when one output is
# the result of a GpuDimshuffle (because an optimization in
# GpuDimshuffle can cause issues with the memory pre-allocation
# where it falsely thinks that a pre-allocated memory region has
# been used when it hasn't).
from theano.sandbox import cuda
if not cuda.cuda_available:
raise SkipTest('Optional package cuda disabled')
def inner_fn(seq1, recurrent_out):
temp = seq1 + recurrent_out.sum()
output1 = temp.dimshuffle(1, 0)
output2 = temp.sum() + recurrent_out
return output1, output2
input1 = theano.tensor.ftensor3()
init = theano.tensor.ftensor3()
outputs_info = [None, init]
out, _ = theano.scan(inner_fn, sequences=[input1],
outputs_info=outputs_info,
mode=mode_with_gpu)
out1 = out[0].flatten()
out2 = out[1].flatten()
fct = theano.function([input1, init], [out1, out2],
mode=mode_with_gpu)
output = fct(numpy.ones((2, 1, 1), dtype="float32"),
numpy.ones((1, 1, 1), dtype="float32"))
expected_output = (numpy.array([2, 4], dtype="float32"),
numpy.array([3, 7], dtype="float32"))
utt.assert_allclose(output, expected_output)
def test_memory_reuse_with_outputs_as_inputs(self):
# Test the memory pre-allocation feature in scan for the following
# cases :
# - An output of the inner graph is also an input of the inner graph
# - An output of the inner graph is not an input in the unoptimized
# graph but it could becomes the case in the optimized graph due to
# the optimizations.
# - An output of the inner graph is obtained through a view op on an
# input of the inner graph and the view op is removed by the
# optimization process
# - An output of the inner graph is obtained through a view op on an
# input of the inner graph and the view op is NOT removed by the
# optimization process
# - An output of the inner graph is not obtained through any of the
# previously mentionned cases (standard case)
def inner_fn(tap_m3, tap_m2, tap_m1):
return (tap_m2, (tap_m1 * 1),
theano.gradient.disconnected_grad(tap_m2),
theano.tensor.opt.assert_(tap_m2, 1),
tap_m3 + tap_m2 + tap_m1)
init = theano.tensor.matrix()
outputs_info = [None, None, None, None,
dict(initial=init, taps=[-3, -2, -1])]
out, _ = theano.scan(inner_fn, outputs_info=outputs_info, n_steps=3)
fct = theano.function([init], out)
# Compare obtained outputs with expected outputs
floatX = theano.config.floatX
outputs = fct(numpy.arange(9, dtype=floatX).reshape(3,3))
states = numpy.array([[0, 1, 2],
[3, 4, 5],
[6, 7, 8],
[9, 12, 15],
[18, 23, 28],
[33, 42, 51]],dtype=floatX)
expected_outputs = [states[1:4], states[2:5], states[1:4],
states[1:4], states[3:6]]
utt.assert_allclose(outputs, expected_outputs)
def test_grad_connectivity_matrix(self): def test_grad_connectivity_matrix(self):
def inner_fn(x_tm1, y_tm1, z_tm1): def inner_fn(x_tm1, y_tm1, z_tm1):
x_tm1.name = 'x' x_tm1.name = 'x'
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论