partly completed work on scan (have to commit to merge)

上级 0dbd3931
"""Provide Scan and related functions""" """Provide Scan an related funations"""
__docformat__ = 'restructedtext en' __docformat__ = 'restructedtext en'
import traceback import traceback
import numpy import numpy
import theano import theano
from theano.tensor import opt
def scan1_lambda(lmbda, x, u, *other_inputs): from theano import gof
"""Scan function `lmbda` over `x`. from theano.compile import optdb
:param lmbda: symbolic computation of the recursive function 'f' in the scan operation. This '''
will be called with symbolic inputs, and a symbolic output is expected. The type of the TODO : test_gradinet
output should match that of y_{i-1}. test_time_taps
add_class_description -- postponed: re-write/extend
:type lmbda: lambda x_i, y_{i-1}, *other_inputs : y_i '''
:param x: iterable over which to scan class Scan(theano.Op):
"""Scan a function 'fn' over several inputs producing several outputs
:param u: initial value for y_{i-1}
The Scan operation is a multipurpose operation to be used to generate
:param other_inputs: other variables that are inputs to our lambda expression recurrent neural networks. One can understand it as going over the
length of the inputs applying the function:
:returns: lmbda scanned over x, starting at u. (See `Scan1Env`)
(y_1(t),y_2(t),..) = fn(x_1(t),x_2(t),..,y_1(t-1),y_1(t-2),..,y_1(t-k),
y_2(t-1),y_2(t-2),..,w_1,w_2,..)
For example:
All the 'y' are called outputs in this case, while 'x' are called inputs.
.. code-block:: python As one can see, the operation supports multiple inputs and multiple
outputs.For each output several time delays can be used (taps), as well
u = dscalar('u') as some of the outputs can be computed 'inplace' over some of the
c = dscalar('c') inputs. As long as the function 'fn' does not update any of the other
x = dvector('x') parameters (w_1,..) a gradient of this operation is supported.
y = scan_lambda( To use the op first you need to create it specifying the number of
lambda x_i, y_prev, c: (x_i + y_prev) * c, inputs, outputs, inplace outputs, and inputs to be ignored, a
x, u, c) dictionary describing the time taps used, the function that will
be applied recursively and if available the gradient function (or
f = theano.function([x,u, c], y) a symbolic definition of the function and the op will compute the
gradient on its own). Secondly you just call the op with a list of
xval = numpy.asarray([1., 1, 1. , 1, 1]) parameters.
uval = numpy.asarray(2.)
The order of parameters given to the op is very important. The
yval = f(xval, uval, 2.0) following order applies :
assert numpy.all(yval == [2., 6., 14., 30., 62., 126.]) 1) List of inputs that are replaced by outputs which should not be
given by the op to the function fn
""" 2) List of inputs that are replaced by outputs which should be given
by the op to the function fn
# construct the env used in the scan 3) List of output states corresponding to the outputs that are
x_this = x[0].type() computed inplace
y_this = u.type() 4) The other outputs
y_next = lmbda(x_this, y_this, *other_inputs) 5) Other arguments
if y_next.type != u.type:
raise TypeError('type of lambda recursion must match type of y_prev')
env = theano.Env([x_this, y_this] + list(other_inputs), [y_next])
#create a generic constant to hold our env
env_var = theano.Constant(data=env, type=theano.generic)
rval = scan1_env(*([env_var, x,u] + list(other_inputs)))
return rval
class Scan1Env(theano.Op):
"""A Theano loop over one variable
Scan1Env is less general than `Scan` because it permits looping only over one tensor.
Scan1Env is defined to behave like this:
.. code-block:: python
#inputs
x #a tensor with ndim >= 1
u #a tensor that is like a row of y
f #the function to scan over x
y[0] = u
for i in xrange(len(x)):
y[i+1] = f(x[i], y[i])
#outputs
y # a tensor with one more leading-dimensional-slices than x
# each leading-dimensional-slice of which is like u (in terms of shape and dtype)
The Scan1Env Op works by representing `f` symbolically with an `Env`.
:note:
The Op has two outputs: one for the output y, and one for the function compiled from the
Env representation of 'f'.
The second is intended to be a secret output, it is not returned by the
``__call__`` method of this Op.
:todo:
Optimize for the case where y_this is not required to compute y_next.
This makes all the updates possible in parallel, it also makes the `u` argument to
make_node un-necessary.
""" """
@classmethod
def symbolic(cls,(in_args,out_args), n_ins, n_outs,\
n_inplace=0, n_inplace_ignore=0, grad_inplace=0,taps={}):
destroy_map = {} # if in_args is not a list assume it is just a variable and
view_map = {} # convert it to a list (if this is neither the case the code will
mode=None # raise an error somewhere else !)
default_output = 0 if not( type(in_args) in (list,tuple)):
in_args = [in_args]
def make_node(self, env_var, x, u, *other_inputs): # if out_args is not a list assume it is just a variable and
# convert it to a list
inputs = [x,u] + list(other_inputs) if not (type(out_args) in (list,tuple)):
out_args = [out_args]
if hasattr(env_var, 'data'):
env = env_var.data
if len(env.inputs) != len(inputs):
raise ValueError('Scan: Env has wrong number of inputs for scan')
if len(env.outputs) != 1: # Create fn
raise ValueError('Scan: Env has wrong number of outputs for scan') my_fn = theano.function(in_args, out_args)
if env.inputs[0].type != x[0].type: # Create gradient function
raise TypeError('Scan: Env input[0] type must match x[0].type') gy_next = [out_args[0].type()]
g_inputs = theano.tensor.grad(out_args[0],in_args,g_cost=gy_next[-1])
for y_next in out_args[1:] :
gy_next +=[y_next.type()]
g_ls = theano.tensor.grad(y_next,in_args,g_cost=gy_next[-1])
for i in xrange(len(in_args)):
g_inputs[i] += g_ls[i]
if env.inputs[1].type != u.type: g_fn=theano.function(inputs=gy_next+in_args,outputs=g_inputs)
raise TypeError('Scan: Env input[1] type must match u.type')
# create the output type by adding a non-broadcastable dimension to u's type
out_type = theano.tensor.Tensor(dtype=u.dtype,
broadcastable=[False] + list(u.broadcastable))
return theano.Apply(self, [env_var]+inputs, [out_type(), theano.generic()]) return cls(my_fn, g_fn, n_ins, n_outs,\
n_inplace,n_inplace_ignore, grad_inplace,taps)
def grad(self, inputs, (g_y, g_fn)): @classmethod
assert g_fn is None def compiled(cls,fn,n_ins, n_outs,\
n_inplace=0, n_inplace_ignore=0, taps={}):
return cls(fn, None, n_ins, n_outs, \
n_inplace, n_inplace_ignore, taps= taps)
y = self(*inputs)
grads = scan1_grad(g_y, y, *inputs)
# trim off the output used to cache the compiled function
grads_we_care_about = grads[:-1]
return [None] + grads_we_care_about def __init__(self,fn,grad_fn,n_ins,n_outs,
n_inplace=0, n_inplace_ignore=0,
grad_inplace=0,
taps={}, inplace=False):
"""Create an instance of the scan class
def perform(self, node, args, (y_out, fn_out)): :param fn: compiled function that takes you from time step t-1 to t
env, x, u = args[:3] :param grad_fn: gradient of the function applied recursevly
other_args = args[3:]
#compile the env to a function if necessary :param n_ins: number of inputs; in the list of arguments
if fn_out[0] is None: they start from 0 to 'n_ins'
assert len(env.outputs) == 1
fn_out[0] = theano.function(
inputs=env.inputs,
outputs=env.outputs[0],
mode=self.mode)
fn = fn_out[0]
# allocate the output ndarray y :param n_outs: number of outputs; in the list of arguments you
y_shape = (x.shape[0]+1,) + u.shape need to give the initial state of each outputs, this will be from
y = numpy.empty(y_shape, dtype=u.dtype) 'n_ins' to 'n_outs'; each initial state should be a matrix where
the first dimension is time and should be sufficiently large to
cover the time taps.
# do the scan :param n_inplace: indicates the number of outputs that should be
y[0] = u computed inplace; in the list of arguments there will be the first
for i, x_i in enumerate(x): 'n_inplace' outputs in place of the first 'n_inplace' inputs
something = fn(x_i, y[i], *other_args)
y[i+1] = something
# write to storage :param n_inplace_ignore: indicates the number of inputs that are
y_out[0] = y given just to be replaced by the inplace computation and which
should not be given as arguments to the function applied
scan1_env = Scan1Env() recursevly
:param grad_inplace: the number of gradients to be computed in
place of their corresponding inputs
class Scan1EnvGrad(theano.Op): :param taps: a dictionary which for each output index gives
"""Gradient Op for Scan1Env""" a list of what taps it uses; a tap is given as an int,
where x stands for output(t - x); note that a past trace of 1 makes
no sense, since you get that by default
def __init__(self, inplace=False): :param inplace: is used by the optimizer that allows the inplace
self.inplace = inplace computation
"""
if n_ins < 1:
raise ValueError('Scan should iterate over at least on one input')
if n_outs <1:
raise ValueError('Scan should have at least one output')
if (n_inplace > n_ins) or \
(n_inplace > n_outs):
raise ValueError('Number of inline outs should be smaller then'\
'the number of inputs or outputs')
if (grad_inplace <0) or \
(grad_inplace >n_ins+n_outs - n_inplace_ignore):
raise ValueError('Wrong number of gradients to be computed'\
'inplace')
if (n_inplace < 0):
raise ValueError('Number of inplace outputs should be larger '\
'or equal to 0')
if (n_inplace_ignore > n_inplace):
raise ValueError('Number of inputs to ignore should not be '\
'larger than number of inplace outputs')
self.destroy_map = {}
if inplace: if inplace:
self.destroy_map = {1: [3]} for i in xrange(n_inplace):
self.destroy_map.update( {i:[i]} )
def make_node(self, g_y, y, scan_env, x, u, *other_inputs):
return theano.Apply(self, for (k,v) in taps.iteritems():
[g_y, y, scan_env, x, u] + list(other_inputs), if k < 0 or k > n_outs:
[x.type(), u.type()] + [oi.type() for oi in other_inputs] + [theano.generic()]) raise ValueError('Taps dictionary contains wrong key!')
for vi in v:
def get_fn(self, scan_env, grad_storage): if vi < 2:
"""Return the function to compute gradients during a backward scan raise ValueError('Taps dictionary contains wrong values!')
:postcondition: grad_storage[-1][0] == fn self.taps = taps
self.n_ins = n_ins
self.n_outs = n_outs
self.n_inplace = n_inplace
self.inplace = inplace
self.n_inplace_ignore = n_inplace_ignore
self.fn = fn
self.grad_fn = grad_fn
self.grad_inplace = grad_inplace
def make_node(self,*inputs):
"""Create an node for the Scan operation
:param inputs: list of inputs for the operations; they should be
at least 'self.n_ins'+'self.n_outs' arguments; first 'self.n_inplace'
are inputs that are replaced inplace, followed by oter inputs up
to 'self.n_ins'; next 'self.n_outs' are ouputs followed by other
arguments that will be given to the function applied recursevly
""" """
# identify the output storage for our compiled function
fn_storage = grad_storage[-1]
assert isinstance(scan_env, theano.gof.Env)
# skip compilation if it's there
if fn_storage[0] is None:
# compile the grad function by doing symbolic gradient
# on the scan Op's env
y_next = scan_env.outputs[0]
gy_next = y_next.type()
inputs = scan_env.inputs # x_this, y_this, *rest
g_inputs = theano.tensor.grad(y_next, inputs, g_cost=gy_next)
fn_storage[0] = theano.function(
inputs=[gy_next] + inputs,
outputs=g_inputs)
return fn_storage[0] n_args = len(inputs)
min_n_args = self.n_ins+self.n_outs
def perform(self, node, args, grad_storage): if n_args < min_n_args:
err = 'There should be at least '+str(min_n_args)+ 'arguments'
#retrieve (or compute) the gradient function raise ValueError(err)
fn = self.get_fn(args[2], grad_storage)
# Create list of output datatypes
out_types = []
for i in xrange(self.n_ins,self.n_ins+self.n_outs):
out_types += [theano.tensor.Tensor(dtype=inputs[i].dtype,\
broadcastable=list(inputs[i].broadcastable))()]
return theano.Apply(self,inputs, out_types)
def __eq__(self,other):
rval = type(self) == type(other)
if rval:
rval = (self.fn is other.fn) and \
(self.grad_fn is other.grad_fn) and \
(self.n_ins == other.n_ins) and \
(self.n_outs == other.n_outs) and \
(self.n_inplace == other.n_inplace) and \
(self.n_inplace_ignore == other.n_inplace_ignore) and\
(self.inplace == other.inplace) and\
(self.taps == other.taps) and\
(self.grad_inplace == other.grad_inplace)
return rval
#unpack the args def __hash__(self):
(g_y, y) = args[0:2] # hash the taps dictionary
(x, u) = args[3:5] taps_hash = 0
other_args = args[5:] for k,v in self.taps.iteritems():
taps_hash ^= k
for vi in v :
taps_hash ^= vi
return hash(type(self)) ^ \
hash(self.fn) ^ \
hash(self.grad_fn) ^ \
hash(self.n_ins) ^ \
hash(self.n_outs) ^ \
hash(self.n_inplace) ^ \
hash(self.n_inplace_ignore) ^\
hash(self.inplace) ^\
taps_hash ^\
hash(self.grad_inplace)
def grad(self, inputs, g_outs):
if self.grad_fn == None:
print 'Warning! no gradient for the recursive function was given'
return [None for i in inputs]
else:
y = self(*inputs).owner.outputs
# if not( type(y) in (list,tuple)):
# y = [y]
for o,go in zip(y,g_outs):
print o.type
print go.type
assert o.type == go.type
# Construct my gradient class:
gradScan = ScanGrad(self.grad_fn,
self.n_ins- self.n_inplace_ignore, self.n_outs,
self.grad_inplace, self.taps)
args = g_outs[self.n_inplace_ignore:] + y + \
inputs[self.n_inplace_ignore:]
grads = gradScan(*args)
return [None for i in inputs[:self.n_inplace_ignore]]+grads
def perform(self,node,args, outs):
# find number of timesteps, note that a precondition is to have
# atleast one input to iterate over
n_steps = len(args[0])
# check if we deal with a inplace operation
n_inplace = self.n_inplace
n_inplace_ignore = self.n_inplace_ignore
if not self.inplace: #if it was not optimized to work inplace
n_inplace = 0
# check lengths of inputs
for i in xrange(self.n_ins):
if args[i].shape[0] != n_steps:
raise ValueError('All inputs should have n_steps length!')
# check lengths of initial states
for i in xrange(self.n_ins, self.n_ins+self.n_outs):
req_size = 1
if self.taps.has_key(i- self.n_ins):
req_size = max(self.taps[i-self.n_ins])
if len(args[i].shape) == 0:
raise ValueError('Wrong initial state! ')
if args[i].shape[0] < req_size:
raise ValueError('Wrong initial state! ')
# allocate space for the outputs
y = []
# inplace outputs
for i in xrange(n_inplace):
y += [args[i]]
# add outputs
for i in xrange(self.n_ins+n_inplace,self.n_ins+self.n_outs):
y_shape = (n_steps,)+args[i].shape[1:]
y += [numpy.empty(y_shape, dtype = args[i].dtype)]
# iterate
for i in xrange(n_steps):
fn_args = []
# get a time slice of inputs
for j in xrange(n_inplace_ignore, self.n_ins):
fn_args += [args[j][i]]
# get past values of outputs (t-1 + taps)
for j in xrange(self.n_outs):
# get list of taps
ls_taps = [1]
if self.taps.has_key(j):
ls_taps += self.taps[j]
maxVal = max(ls_taps)
for tap_value in ls_taps:
if i - tap_value < 0:
fn_args += [args[j+self.n_ins][maxVal-tap_value+i]]
else:
fn_args += [y[j][i-tap_value]]
# get the none iterable parameters
fn_args += list(args[(self.n_ins+self.n_outs):])
# compute output
something = self.fn(*fn_args)
# update y and inplace outputs
for j in xrange(self.n_outs):
y[j][i] = something[j]
#unpack grad_storage (outputs) # write to storage
gx_out, gu_out = grad_storage[0:2] for i in xrange(self.n_outs):
g_other_storage = grad_storage[2:-1] outs[i][0]=y[i]
assert len(other_args) == len(g_other_storage)
# the algorithm below has to work in-place on g_y,
# so here we just make a copy of it if we can't work
# in-place on the original.
if not self.inplace:
g_y = g_y.copy()
# allocate space to hold the gradient on gx @gof.local_optimizer([None])
gx = numpy.zeros_like(x) def scan_make_inplace(node):
op = node.op
if isinstance(op, Scan) and (not op.inplace) and (op.n_inplace>0):
return Scan(op.fn, op.grad_fn, op.n_ins,\
op.n_outs, op.n_inplace, op.n_inplace_ignore,\
op.grad_inplace,op.taps,inplace=True\
).make_node(*node.inputs).outputs
return False
# allocate space to hold the gradient on the other inputs optdb.register('scan_make_inplace', opt.in2out(scan_make_inplace,\
g_other = [numpy.zeros_like(other) for other in other_args] ignore_newtrees=True), 75, 'fast_run', 'inplace')
# loop backward over the elements of x,
# computing the gradient on several terms:
# - x[i]
# - y[i]
# - other_inputs wrt y[i+1]
for i in xrange(len(x)-1, -1, -1):
#print 'x y gy_next', x[i], y[i], g_y[i+1]
grads = fn(g_y[i+1], x[i], y[i], *other_args)
#gx[i] can be set directly from the computed gradient
gx[i], gy_i = grads[0:2]
# gy_i has to be added to the existing g_y[i]
g_y[i] += gy_i
#now increment the other-input gradient buffers class ScanGrad(theano.Op):
assert len(g_other) == (len(grads)-2) """Gradient Op for Scan"""
for g_arg_buffer, g_arg in zip(g_other, grads[2:]):
g_arg_buffer += g_arg
#write results into storage locations def __init__(self, grad_fn, n_ins, n_outs, grad_inplace=0,
gx_out[0] = gx taps = {},inplace=False):
gu_out[0] = g_y[0] self.grad_fn = grad_fn
assert len(g_other_storage) == len(g_other) self.n_ins = n_ins # number of inputs of Scan op not of Grad Scan !!
for grad_storage, grad in zip(g_other_storage, g_other): self.n_outs = n_outs # number of outs of Scan op not of Grad Scan !!
grad_storage[0] = grad self.grad_inplace = grad_inplace
self.inplace = inplace
self.taps = taps
self.destroy_map = {}
if self.inplace:
for i in xrange(self.grad_inplace):
self.destroy_map.update( {i:[i+n_ins+n_outs]} )
def __eq__(self,other):
rval = type(self) == type(other)
if rval:
rval = (self.grad_fn is other.grad_fn) and \
(self.n_ins == other.n_ins) and \
(self.n_outs == other.n_outs) and \
(self.grad_inplace == other.grad_inplace) and \
(self.inplace == other.inplace) and \
(self.taps == taps)
return rval
scan1_grad = Scan1EnvGrad(inplace=False) def __hash__(self):
scan1_grad_inplace = Scan1EnvGrad(inplace=True) taps_hash = 0
for k,v in self.taps.iteritems():
taps_hash ^= k
for vi in v :
taps_hash ^= vi
return hash(type(self)) ^ \
hash(self.grad_fn) ^ \
hash(self.n_ins) ^ \
hash(self.n_outs) ^ \
hash(self.grad_inplace) ^ \
hash(self.inplace) ^ taps_hash
def make_node(self, *args):
# input of the gradient op :
# |g_outs | y | ins | outs | other_args |
# | n_ins | n_outs | n_ins | n_outs | unknown |
# return
# | grad of ins | grad of outs | grad of other_args|
# | n_ins | n_outs | unknown |
return theano.Apply(self, list(args),
[i.type() for i in args[self.n_ins+self.n_outs:] ])
def perform(self, node, args, storage):
# get scan inputs
inputs = args[self.n_ins+self.n_outs:]
ins = inputs[:self.n_ins]
initSt = inputs[self.n_ins:self.n_ins+self.n_outs]
otherArgs = inputs[self.n_outs+self.n_ins:]
# generate space for gradient
# not do if inplace !?
if not self.inplace:
g_ins = [numpy.zeros_like(k) for k in ins]
g_initSt = [numpy.zeros_like(k) for k in initSt]
else:
if self.grad_inplace > self.n_ins:
g_ins = ins
g_initSt = initSt[:self.grad_inplace-self.n_ins]
g_initSt += [numpy.zeros_like(k) for k in \
initSt[self.grad_inplace-self.n_ins:]]
else:
g_ins = ins[:self.grad_inplace]
g_ins += [numpy.zeros_like(k) for k in \
ins[self.grad_inplace:]]
g_initSt = [numpy.zeros_like(k) for k in initSt]
g_otherArgs = [numpy.zeros_like(k) for k in otherArgs]
# get gradient from above
g_outs = args[:self.n_ins]
# we modify g_outs inplace ..
if not self.inplace:
g_outs = [gout.copy() for gout in g_outs]
# get the output of the scan operation
outs = args[self.n_ins:self.n_ins+self.n_outs]
# diagnostic:
print 'g_outs:' ,g_outs
print 'outs:', outs
print 'ins:', ins
print 'initSt:', initSt
print 'otherArgs:', otherArgs
# go back through time to 0 (use a time window !?)
for i in xrange(len(ins[0])-1,-1,-1):
# time slice of inputs
_ins = [arg[i] for arg in ins]
# time slice of outputs + taps
_outs = []
for j in xrange(self.n_outs):
ls_taps = [1]
if self.taps.has_key(j):
ls_taps += self.taps[j]
maxVal = max(ls_taps)
for tap_value in ls_taps:
if i - tap_value < 0:
_outs += [initSt[j][maxVal-tap_value+i]]
else:
_outs += [outs[j][i- tap_value]]
g_out = [arg[i] for arg in g_outs]
grads=self.grad_fn(g_out,_ins,_outs,otherArgs)
# get gradient for inputs
for j in xrange(self.n_ins):
g_ins[j][i] = grads[j]
# get gradient for outputs
pos = self.n_ins
for j in xrange(self.n_outs):
ls_taps = [1]
if self.taps.has_key(j):
ls_taps += self.taps[j]
maxVal = max(ls_taps)
for tap_value in ls_taps:
if i - tap_value < 0:
g_initSt[maxVal-tap_value+i] = grads[pos]
pos +=1
else:
g_outs[i-tap_value]+= grads[pos]
pos += 1
for j in xrange(len(g_otherArgs)):
g_otherArgs[j] += grads[j+pos]
# return the gradient
for i in xrange(len(g_ins)):
storage[i][0] = g_ins[i]
for i in xrange(len(g_initSt)):
storage[i+self.n_ins][0] = g_initSt[i]
for i in xrange(len(g_otherArgs)):
storage[i+self.n_ins+self.n_outs][0] = g_otherArgs[i]
'''
@gof.local_optimizer([None])
def grad_scan_make_inplace(node):
op = node.op
if isinstance(op, ScanGrad) and (not op.inplace):
return ScanGrad(op.grad_fn, op.n_ins, op.n_outs, op.grad_inplace,
inplace=True).make_node(*node.inputs).outputs
return False
optdb.register('grad_scan_make_inplace', opt.in2out(grad_scan_make_inplace,\
ignore_newtrees=True), 75, 'fast_run', 'inplace')
'''
#TODO: a specialize-phase optimization to swap in scan1_grad_inplace
import numpy from scan import Scan
import unittest
import theano import theano
from theano.tensor import dscalar, dvector, dmatrix
from scan import scan1_lambda
RUN_TESTS = False import random
def run(TF): import numpy.random
def deco(f): from theano.tests import unittest_tools as utt
if TF and RUN_TESTS:
print 'running test', f.__name__ class T_Scan(unittest.TestCase):
f() def setUp(self):
return f if RUN_TESTS else None utt.seed_rng()
return deco x_1 = theano.tensor.dscalar('x_1')
self.my_f = theano.function([x_1],[x_1]) #dummy function
# Naming convention :
# u_1,u_2,.. -> inputs, arrays to iterate over
# x_1,x_2,.. -> outputs at t-1 that are required in the recurrent
# computation
# iu_1,iu_2,.. -> inplace inputs, inputs that are being replaced by
# outputs during computation
# du_1,du_2,.. -> dummy inputs used to do inplace computation, they
# are not passed to my_f
# ix_1,ix_2,.. -> inplace outputs at t-1
# x_1_next,.. -> outputs at t
# ix_1_next,.. -> inplace outputs at time t
# w_1,w_2,.. -> weights, paramters over which scan does not iterate
# my_f -> compiled function that will be applied recurrently
# my_op -> operator class
# final_f -> compiled function that applies the Scan operation
# out_1,.. -> outputs of the Scan operation
###################################################################
def test_numberOfIterableInputs(self):
def t1():
my_op = Scan.compiled(self.my_f,-1,1)
def t2():
my_op = Scan.compiled(self.my_f,0,1)
self.failUnlessRaises(ValueError,t1)
self.failUnlessRaises(ValueError,t2)
###################################################################
def test_numberOfOutputs(self):
def t1():
my_op = Scan.compiled(self.my_f,1,-1)
def t2():
my_op = Scan.compiled(self.my_f,1,0)
self.failUnlessRaises(ValueError,t1)
self.failUnlessRaises(ValueError,t2)
#####################################################################
def test_numberOfInplaceOutputs(self):
def t1():
my_op =Scan.compiled(self.my_f,1,1,n_inplace = -1)
def t2():
my_op =Scan.compiled(self.my_f,1,1,n_inplace = 2)
def t3():
my_op =Scan.compiled(self.my_f,2,1,n_inplace=2)
def t4():
my_op =Scan.compiled(self.my_f,1,2,n_inplace=2)
def t5():
my_op =Scan.compiled(self.my_f,1,1,n_inplace=1,n_inplace_ignore=2)
self.failUnlessRaises(ValueError,t1)
self.failUnlessRaises(ValueError,t2)
self.failUnlessRaises(ValueError,t3)
self.failUnlessRaises(ValueError,t4)
self.failUnlessRaises(ValueError,t5)
#####################################################################
def test_taps(self):
def t1():
my_op = Scan.compiled(self.my_f,1,1, taps={2:[3]})
def t2():
my_op = Scan.compiled(self.my_f,1,2, taps={0:[0]})
def t3():
my_op = Scan.compiled(self.my_f,1,2, taps={0:[1]})
self.failUnlessRaises(ValueError,t1)
self.failUnlessRaises(ValueError,t2)
self.failUnlessRaises(ValueError,t3)
#####################################################################
def test_makeNode(self):
def t1():
######### Test inputs of different lengths
# define the function that is applied recurrently
u_1 = theano.tensor.dscalar('u_1')
u_2 = theano.tensor.dscalar('u_2')
x_1 = theano.tensor.dscalar('x_1')
x_1_next = u_1+u_2*x_1
my_f = theano.function([u_1,u_2,x_1],[x_1_next])
# define the function that applies the scan operation
my_op = Scan.compiled(my_f,2,1)
u_1 = theano.tensor.dvector('u_1')
u_2 = theano.tensor.dvector('u_2')
x_1 = theano.tensor.dvector('x_1')
x_1_next = my_op(u_1,u_2,x_1)
final_f = theano.function([u_1,u_2,x_1],[x_1_next])
# test the function final_f
u_1 = numpy.random.rand(3)
u_2 = numpy.random.rand(2)
x_1 = [numpy.random.rand()]
out = final_f(u_1,u_2,x_1)
def t2():
######### Test function does not return correct number of outputs
# define the function that is applied recurrently
u_1 = theano.tensor.dscalar('u_1')
x_1 = theano.tensor.dscalar('x_1')
x_1_next = u_1 * x_1
my_f = theano.function([u_1,x_1],[x_1_next])
# define the function that applies the scan operation
my_op = Scan.compiled(my_f,1,2)
u_1 = theano.tensor.dvector('u_1')
x_1 = theano.tensor.dvector('x_1')
x_2 = theano.tensor.dvector('x_2')
x_1_next,x_2_next = my_op(u_1,x_1,x_2)
final_f = theano.function([u_1,x_1,x_2],[x_1_next,x_2_next])
#generate data
u_1 = numpy.random.rand(3)
x_1 = [numpy.random.rand()]
x_2 = [numpy.random.rand()]
out_1,out_2 = final_f(u_1,x_1,x_2)
self.failUnlessRaises(ValueError,t1)
self.failUnlessRaises(TypeError,t2)
#####################################################################
def test_generator(self):
# compile my_f
u_1 = theano.tensor.dscalar('u_1') # dummy input,
# required if no inplace is used!
x_1 = theano.tensor.dscalar('x_1')
w_1 = theano.tensor.dscalar('w_1')
x_1_next = x_1*w_1
my_f = theano.function([u_1,x_1,w_1],[x_1_next])
# create operation
my_op = Scan.compiled(my_f,1,1)
u_1 = theano.tensor.dvector('u_1') # dummy input, there is no
#inplace, so output will not be put in place of this u_1!
x_1 = theano.tensor.dvector('x_1')
w_1 = theano.tensor.dscalar('w_1')
x_1_next = my_op(u_1,x_1,w_1)
final_f = theano.function([u_1,x_1,w_1],[x_1_next])
#generate data
x_1 = numpy.ndarray(3) # dummy input, just tells for how many time
# steps to run recursively
out_1 = final_f(x_1,[2],2)
self.failUnless(numpy.all(out_1 == numpy.asarray([4,8,16])))
#####################################################################
def test_generator_inplace_no_ignore(self):
# compile my_f
u_1 = theano.tensor.dscalar('u_1')
x_1 = theano.tensor.dscalar('x_1')
w_1 = theano.tensor.dscalar('w_1')
x_1_next = x_1*w_1
my_f = theano.function([u_1,x_1,w_1],[x_1_next])
# create operation
my_op = Scan.compiled(my_f,1,1,n_inplace=1)
iu_1 = theano.tensor.dvector('iu_1')
ix_1 = theano.tensor.dvector('ix_1')
w_1 = theano.tensor.dscalar('w_1')
ix_1_next= my_op(iu_1,ix_1,w_1)
final_f = theano.function([theano.In(iu_1, mutable=True),ix_1,w_1],
[ix_1_next], mode='FAST_RUN')
#generate data
iu_1 = numpy.ndarray(3)
out_1 = final_f(iu_1,[2],2)
# not concretely implemented yet ..
self.failUnless(numpy.all(out_1 == numpy.asarray([4,8,16])))
self.failUnless(numpy.all(out_1 == iu_1))
#####################################################################
def test_generator_inplace_no_ignore_2states(self):
# compile my_f
u_1 = theano.tensor.dscalar('u_1')
u_2 = theano.tensor.dscalar('u_2')
x_1 = theano.tensor.dscalar('x_1')
x_2 = theano.tensor.dscalar('x_2')
w_1 = theano.tensor.dscalar('w_1')
x_1_next = x_1*w_1
x_2_next = x_2*w_1
my_f = theano.function([u_1,u_2,x_1,x_2,w_1],[x_1_next,x_2_next])
# create operation
my_op = Scan.compiled(my_f,2,2,n_inplace=2)
iu_1 = theano.tensor.dvector('iu_1')
iu_2 = theano.tensor.dvector('iu_2')
ix_1 = theano.tensor.dvector('ix_1')
ix_2 = theano.tensor.dvector('ix_2')
w_1 = theano.tensor.dscalar('w_1')
ix_1_next,ix_2_next= my_op(iu_1,iu_2,ix_1,ix_2,w_1)
final_f = theano.function([theano.In(iu_1, mutable=True),
theano.In(iu_2, mutable=True),ix_1,ix_2,
w_1],[ix_1_next,ix_2_next], mode='FAST_RUN')
#generate data
iu_1 = numpy.ndarray(3)
iu_2 = numpy.ndarray(3)
out_1,out_2 = final_f(iu_1,iu_2,[2],[1],2)
# not concretely implemented yet ..
self.failUnless(numpy.all(out_1 == numpy.asarray([4,8,16])))
self.failUnless(numpy.all(out_1 == iu_1))
self.failUnless(numpy.all(out_2 == numpy.asarray([2,4,8])))
self.failUnless(numpy.all(out_2 == iu_2))
@run(True) #######################################################################
def test_extra_inputs(): def test_generator_inplace(self):
u = dscalar('u') #compile my_f
c = dscalar('c') u_1 = theano.tensor.dscalar('u_1')
x = dvector('x') x_1 = theano.tensor.dscalar('x_1')
x_2 = theano.tensor.dscalar('x_2')
x_1_next = u_1 + x_1
x_2_next = x_1 * x_2
my_f = theano.function([u_1,x_1,x_2],[x_1_next,x_2_next])
# create operation
my_op = Scan.compiled(my_f,2,2,n_inplace=2,n_inplace_ignore=1)
du_1 = theano.tensor.dvector('du_1')
iu_1 = theano.tensor.dvector('iu_1')
ix_1 = theano.tensor.dvector('ix_1')
ix_2 = theano.tensor.dvector('ix_2')
ix_1_next,ix_2_next = my_op(du_1,iu_1,ix_1,ix_2)
final_f=theano.function([theano.In(du_1, mutable = True),
theano.In(iu_1, mutable = True),
ix_1,ix_2],[ix_1_next,ix_2_next],mode='FAST_RUN')
# generate data
du_1 = numpy.asarray([0.,0.,0.])
iu_1 = numpy.asarray([1.,1.,1.])
ix_1 = [1]
ix_2 = [1]
out_1,out_2 = final_f(du_1,iu_1,ix_1,ix_2)
self.failUnless(numpy.all(out_1 == numpy.asarray([2,3,4])))
self.failUnless(numpy.all(out_2 == numpy.asarray([1,2,6])))
self.failUnless(numpy.all(out_1 == du_1))
self.failUnless(numpy.all(out_2 == iu_1))
y = scan1_lambda( #####################################################################
lambda x_i, y_prev, c: (x_i + y_prev) * c, def tets_iterateOnlyOverX(self):
x, u, c) u_1 = theano.tensor.dscalar('u_1')
x_1 = theano.tensor.dscalar('x_1')
x_1_next = u_1*x_1
my_f = theano.function([u_1,x_1],[x_1_next])
my_op = Scan.compiled(my_f,1,1)
u_1 = theano.tensor.dvector('u_1')
x_1 = theano.tensor.dvector('x_1')
x_1_next = my_op(u_1,x_1)
final_f = theano.function([x_1,u_1],[x_1_next])
u_1 = numpy.asarray([2,2,2])
out_1 = final_f(inp,2)
self.failUnless(numpy.all(out_1==numpy.asarray([4,8,16])))
sum_y = theano.tensor.sum(y) #####################################################################
def test_iterateOverSeveralInputs(self):
f = theano.function([x,u, c], y) u_1 = theano.tensor.dscalar('u_1') # input 1
u_2 = theano.tensor.dscalar('u_2') # input 2
x_1 = theano.tensor.dscalar('x_1') # output
x_1_next = (u_1+u_2)*x_1
my_f = theano.function([u_1,u_2,x_1],[x_1_next])
my_op = Scan.compiled(my_f,2,1)
u_1 = theano.tensor.dvector('u_1')
u_2 = theano.tensor.dvector('u_2')
x_1 = theano.tensor.dvector('x_1')
x_1_next = my_op(u_1,u_2,x_1)
final_f = theano.function([u_1,u_2,x_1],[x_1_next])
u_1 = numpy.asarray([1,1,1])
u_2 = numpy.asarray([1,1,1])
x_1 = [2]
out_1 = final_f(u_1,u_2,x_1)
self.failUnless(numpy.all(out_1==numpy.asarray([4,8,16])))
xval = numpy.asarray([1., 1, 1. , 1, 1]) #####################################################################
uval = numpy.asarray(2.) def test_iterateOverSeveralInputsSeveralInplace(self):
iu_1 = theano.tensor.dscalar('iu_1')
u_1 = theano.tensor.dscalar('u_1')
u_2 = theano.tensor.dscalar('u_2')
u_3 = theano.tensor.dscalar('u_3')
u_4 = theano.tensor.dscalar('u_4')
ix_1 = theano.tensor.dscalar('ix_1')
ix_2 = theano.tensor.dscalar('ix_2')
x_1 = theano.tensor.dscalar('x_1')
w_1 = theano.tensor.dscalar('w_1')
ix_1_next = u_3 + u_4
ix_2_next = ix_1 + ix_2
x_1_next = x_1 + u_3 + u_4 + ix_1 + ix_2
my_f = theano.function([iu_1,u_1,u_2,u_3,u_4,ix_1,ix_2,x_1,w_1],\
[ix_1_next,ix_2_next, x_1_next])
my_op = Scan.compiled(my_f,6,3, n_inplace=2,\
n_inplace_ignore=1)
du_1 = theano.tensor.dvector('du_1')
iu_1 = theano.tensor.dvector('iu_1')
u_1 = theano.tensor.dvector('u_1')
u_2 = theano.tensor.dvector('u_2')
u_3 = theano.tensor.dvector('u_3')
u_4 = theano.tensor.dvector('u_4')
x_1 = theano.tensor.dvector('x_1')
ix_1 = theano.tensor.dvector('ix_1')
ix_2 = theano.tensor.dvector('ix_2')
w_1 = theano.tensor.dscalar('w_1')
[ix_1_next,ix_2_next,x_1_next]= \
my_op(du_1,iu_1,u_1,u_2,u_3,u_4,x_1,ix_1,ix_2,w_1)
final_f=theano.function([theano.In(du_1, mutable = True),
theano.In(iu_1, mutable = True),
u_1,u_2,u_3,u_4,ix_1,ix_2,x_1,w_1],
[ix_1_next,ix_2_next,
x_1_next],mode='FAST_RUN')
#generate data
du_1 = numpy.asarray([0.,0.,0.])
iu_1 = numpy.asarray([0.,1.,2.])
u_1 = numpy.asarray([1.,2.,3.])
u_2 = numpy.asarray([1.,1.,1.])
u_3 = numpy.asarray([2.,2.,2.])
u_4 = numpy.asarray([3.,2.,1.])
x_1 = [1.]
ix_1 = [1.]
ix_2 = [1.]
w_1 = 2.
out_1,out_2,out_3 = final_f(du_1,iu_1,u_1,u_2,u_3,u_4,\
ix_1,ix_2,x_1,w_1)
self.failUnless(numpy.all(out_3 == numpy.asarray([8.,19.,33.])))
self.failUnless(numpy.all(out_1 == numpy.asarray([5.,4.,3.])))
self.failUnless(numpy.all(out_2 == numpy.asarray([2.,7.,11.])))
self.failUnless(numpy.all(out_1 == du_1))
self.failUnless(numpy.all(out_2 == iu_1))
yval = f(xval, uval, 2.0)
assert numpy.all(yval == [2., 6., 14., 30., 62., 126.])
#####################################################################
def test_computeInPlaceArguments(self):
u_1 = theano.tensor.dscalar('u_1')
x_1 = theano.tensor.dscalar('x_1')
w_1 = theano.tensor.dscalar('w_1')
x_1_next = u_1*w_1+x_1
my_f = theano.function([u_1,x_1,theano.In(w_1,update=w_1*2)],
[x_1_next])
my_op = Scan.compiled(my_f,1,1)
u_1 = theano.tensor.dvector('u_1')
x_1 = theano.tensor.dvector('x_1')
w_1 = theano.tensor.dscalar('w_1')
x_1_next = my_op(u_1,x_1,w_1)
final_f = theano.function([u_1,x_1,w_1], [x_1_next])
u_1 = [1.,1.,1.]
x_1 = [1.]
w_1 = 1.
out_1 = final_f(u_1,x_1,w_1)
self.failUnless(numpy.all(out_1 == numpy.asarray([2,4,8])))
g_x = theano.tensor.grad(sum_y, x) #####################################################################
g_u = theano.tensor.grad(sum_y, u) def test_timeTaps(self):
u_1 = theano.tensor.dscalar('u_1')
x_1 = theano.tensor.dscalar('x_1')
x_1_t2 = theano.tensor.dscalar('x_1_t2')
x_1_t4 = theano.tensor.dscalar('x_1_t4')
x_1_next = u_1+x_1+x_1_t2+x_1_t4
my_f = theano.function([u_1,x_1,x_1_t2,x_1_t4],[x_1_next])
my_op = Scan.compiled(my_f,1,1,taps={0:[2,4]})
u_1 = theano.tensor.dvector('u_1')
x_1 = theano.tensor.dvector('x_1')
x_1_next = my_op(u_1,x_1)
final_f = theano.function([u_1,x_1],[x_1_next])
u_1 = [1.,1.,1.,1.,1.]
x_1 = [1.,2.,3.,4.]
out_1 = final_f(u_1,x_1)
self.failUnless(numpy.all(out_1==numpy.asarray([9.,16.,29.,50.,89.])))
gf = theano.function([x, u, c], [g_x, g_u])
gxval, guval = gf(xval, uval, 2.0) #####################################################################
def test_constructFunction(self):
u_1 = theano.tensor.dscalar('u_1')
x_1 = theano.tensor.dscalar('x_1')
x_1_next = u_1 + x_1
my_op = Scan.symbolic(([u_1,x_1],x_1_next),1,1)
u_1 = theano.tensor.dvector('u_1')
x_1 = theano.tensor.dvector('x_1')
x_1_next = my_op(u_1,x_1)
final_f = theano.function([u_1,x_1],[x_1_next])
u_1 = [1.,1.,1.]
x_1 = [1.]
out_1 = final_f(u_1,x_1)
self.failUnless(numpy.all(out_1==numpy.asarray([2.,3.,4.])))
#print gxval #####################################################################
#print guval def test_gradSimple(self):
assert numpy.all(gxval == [ 62., 30., 14., 6., 2.]) u_1 = theano.tensor.dscalar('u_1')
assert numpy.all(guval == 63) x_1 = theano.tensor.dscalar('x_1')
x_1_next = u_1*x_1
my_op = Scan.symbolic( ([u_1,x_1],x_1_next), 1,1)
u_1 = theano.tensor.dvector('u_1')
x_1 = theano.tensor.dvector('x_1')
x_1_next = my_op(u_1,x_1)
#final_f = theano.function([u_1,x_1],[x_1_next])
u_1 = [1.,2.,3.]
x_1 = [1.]
@run(True) utt.verify_grad( my_op , [u_1,x_1] )
def test_verify_scan_grad():
def scanxx(x, u, c):
# u = dvector('u')
# c = dvector('c')
# x = dmatrix('x')
y = scan1_lambda(
lambda x_i, y_prev, c: (x_i + y_prev) * c,
x, u, c)
return y
rng = numpy.random.RandomState(456) def test_gradManyInputsManyOutputs(self):
pass
xval = rng.rand(4, 3) def test_gradTimeTaps(self):
uval = rng.rand(3) pass
cval = rng.rand(3)
theano.tensor.verify_grad(scanxx, (xval, uval, cval), rng=rng) def test_gradManyInputsManyOutputsTimeTaps(self):
pass
# Run the Scan test-suite when this module is executed as a script.
if __name__ == '__main__':
    unittest.main()