提交 9d29157d authored 作者: Razvan Pascanu's avatar Razvan Pascanu

PEP8 compatibility

上级 610b1d16
...@@ -24,22 +24,29 @@ from theano.gof import Op, Apply ...@@ -24,22 +24,29 @@ from theano.gof import Op, Apply
Special Op created to test what happens when you have one op that is not Special Op created to test what happens when you have one op that is not
differentiable in the computational graph differentiable in the computational graph
''' '''
class BreakRop(Op): class BreakRop(Op):
""" """
@note: Non-differentiable. @note: Non-differentiable.
""" """
def __hash__(self): def __hash__(self):
return hash(type(self)) return hash(type(self))
def __eq__(self, other): def __eq__(self, other):
return type(self) == type(other) return type(self) == type(other)
def make_node(self, x): def make_node(self, x):
return Apply(self, [x], [x.type()]) return Apply(self, [x], [x.type()])
def perform(self, node, inp, out_): def perform(self, node, inp, out_):
x, = inp x, = inp
out, = out_ out, = out_
out[0] = x out[0] = x
def grad(self, inp, grads): def grad(self, inp, grads):
return [None] return [None]
def R_op(self, inputs, eval_points): def R_op(self, inputs, eval_points):
return [None] return [None]
...@@ -55,12 +62,12 @@ class RopLop_checker(unittest.TestCase): ...@@ -55,12 +62,12 @@ class RopLop_checker(unittest.TestCase):
# computations using scan # computations using scan
self.x = tensor.vector('x') self.x = tensor.vector('x')
self.v = tensor.vector('v') self.v = tensor.vector('v')
self.rng = numpy.random.RandomState(utt.fetch_seed()) self.rng = numpy.random.RandomState(utt.fetch_seed())
self.in_shape = ( 5+self.rng.randint(30),) self.in_shape = (5 + self.rng.randint(30),)
self.mx = tensor.matrix('mx') self.mx = tensor.matrix('mx')
self.mv = tensor.matrix('mv') self.mv = tensor.matrix('mv')
self.mat_in_shape = ( 5 + self.rng.randint(30), self.mat_in_shape = (5 + self.rng.randint(30),
5+self.rng.randint(30)) 5 + self.rng.randint(30))
def check_nondiff_rop(self, y): def check_nondiff_rop(self, y):
""" If you op is not differentiable(so you can't define Rop) """ If you op is not differentiable(so you can't define Rop)
...@@ -94,35 +101,37 @@ class RopLop_checker(unittest.TestCase): ...@@ -94,35 +101,37 @@ class RopLop_checker(unittest.TestCase):
If you want to test an out with an output matrix, add a sum If you want to test an out with an output matrix, add a sum
after the Op you want to test. after the Op you want to test.
""" """
vx = numpy.asarray(self.rng.uniform(size=self.mat_in_shape), theano.config.floatX) vx = numpy.asarray(self.rng.uniform(size=self.mat_in_shape),
vv = numpy.asarray(self.rng.uniform(size=self.mat_in_shape), theano.config.floatX) theano.config.floatX)
vv = numpy.asarray(self.rng.uniform(size=self.mat_in_shape),
theano.config.floatX)
yv = tensor.Rop(y, self.mx, self.mv) yv = tensor.Rop(y, self.mx, self.mv)
rop_f = function([self.mx, self.mv], yv) rop_f = function([self.mx, self.mv], yv)
sy, _ = theano.scan( lambda i,y,x,v: (tensor.grad(y[i],x)*v).sum(), sy, _ = theano.scan(lambda i, y, x, v: \
sequences = tensor.arange(y.shape[0]), (tensor.grad(y[i], x) * v).sum(),
non_sequences = [y,self.mx,self.mv]) sequences=tensor.arange(y.shape[0]),
scan_f = function([self.mx,self.mv], sy) non_sequences=[y, self.mx, self.mv])
scan_f = function([self.mx, self.mv], sy)
v1 = rop_f(vx, vv)
v2 = scan_f(vx, vv)
v1 = rop_f(vx,vv) assert numpy.allclose(v1, v2), ('ROP mismatch: %s %s' % (v1, v2))
v2 = scan_f(vx,vv)
assert numpy.allclose(v1,v2), ('ROP mismatch: %s %s' % (v1, v2)) self.check_nondiff_rop(theano.clone(y,
replace={self.mx: break_op(self.mx)}))
self.check_nondiff_rop( theano.clone(y, vv = numpy.asarray(self.rng.uniform(size=out_shape),
replace={self.mx:break_op(self.mx)})) theano.config.floatX)
vv = numpy.asarray(self.rng.uniform(size=out_shape), theano.config.floatX)
yv = tensor.Lop(y, self.mx, self.v) yv = tensor.Lop(y, self.mx, self.v)
lop_f = function([self.mx, self.v], yv) lop_f = function([self.mx, self.v], yv)
sy = tensor.grad((self.v*y).sum(), self.mx) sy = tensor.grad((self.v * y).sum(), self.mx)
scan_f = function([self.mx, self.v], sy) scan_f = function([self.mx, self.v], sy)
v1 = lop_f(vx, vv)
v1 = lop_f(vx,vv) v2 = scan_f(vx, vv)
v2 = scan_f(vx,vv) assert numpy.allclose(v1, v2), ('LOP mismatch: %s %s' % (v1, v2))
assert numpy.allclose(v1,v2), ('LOP mismatch: %s %s' % (v1, v2))
def check_rop_lop(self, y, out_shape): def check_rop_lop(self, y, out_shape):
""" """
...@@ -131,52 +140,55 @@ class RopLop_checker(unittest.TestCase): ...@@ -131,52 +140,55 @@ class RopLop_checker(unittest.TestCase):
""" """
# TEST ROP # TEST ROP
vx = numpy.asarray(self.rng.uniform(size=self.in_shape), theano.config.floatX) vx = numpy.asarray(self.rng.uniform(size=self.in_shape),
vv = numpy.asarray(self.rng.uniform(size=self.in_shape), theano.config.floatX) theano.config.floatX)
vv = numpy.asarray(self.rng.uniform(size=self.in_shape),
yv = tensor.Rop(y,self.x,self.v) theano.config.floatX)
rop_f = function([self.x,self.v], yv)
J, _ = theano.scan( lambda i,y,x: tensor.grad(y[i],x), yv = tensor.Rop(y, self.x, self.v)
sequences = tensor.arange(y.shape[0]), rop_f = function([self.x, self.v], yv)
non_sequences = [y,self.x]) J, _ = theano.scan(lambda i, y, x: tensor.grad(y[i], x),
sequences=tensor.arange(y.shape[0]),
non_sequences=[y, self.x])
sy = tensor.dot(J, self.v) sy = tensor.dot(J, self.v)
scan_f = function([self.x,self.v], sy) scan_f = function([self.x, self.v], sy)
v1 = rop_f(vx,vv) v1 = rop_f(vx, vv)
v2 = scan_f(vx,vv) v2 = scan_f(vx, vv)
assert numpy.allclose(v1,v2), ('ROP mismatch: %s %s' % (v1, v2)) assert numpy.allclose(v1, v2), ('ROP mismatch: %s %s' % (v1, v2))
self.check_nondiff_rop( theano.clone(y, self.check_nondiff_rop(theano.clone(y,
replace={self.x:break_op(self.x)})) replace={self.x: break_op(self.x)}))
# TEST LOP # TEST LOP
vx = numpy.asarray(self.rng.uniform(size=self.in_shape), theano.config.floatX) vx = numpy.asarray(self.rng.uniform(size=self.in_shape),
vv = numpy.asarray(self.rng.uniform(size=out_shape), theano.config.floatX) theano.config.floatX)
vv = numpy.asarray(self.rng.uniform(size=out_shape),
theano.config.floatX)
yv = tensor.Lop(y,self.x,self.v) yv = tensor.Lop(y, self.x, self.v)
lop_f = function([self.x,self.v], yv) lop_f = function([self.x, self.v], yv)
J, _ = theano.scan( lambda i,y,x: tensor.grad(y[i],x), J, _ = theano.scan(lambda i, y, x: tensor.grad(y[i], x),
sequences = tensor.arange(y.shape[0]), sequences=tensor.arange(y.shape[0]),
non_sequences = [y,self.x]) non_sequences=[y, self.x])
sy = tensor.dot(self.v, J) sy = tensor.dot(self.v, J)
scan_f = function([self.x,self.v], sy) scan_f = function([self.x, self.v], sy)
v1 = lop_f(vx,vv) v1 = lop_f(vx, vv)
v2 = scan_f(vx,vv) v2 = scan_f(vx, vv)
assert numpy.allclose(v1,v2), ('LOP mismatch: %s %s' % (v1, v2)) assert numpy.allclose(v1, v2), ('LOP mismatch: %s %s' % (v1, v2))
class test_RopLop(RopLop_checker): class test_RopLop(RopLop_checker):
def test_shape(self): def test_shape(self):
self.check_nondiff_rop( self.x.shape[0]) self.check_nondiff_rop(self.x.shape[0])
def test_specifyshape(self): def test_specifyshape(self):
self.check_rop_lop(tensor.specify_shape(self.x, self.in_shape), self.check_rop_lop(tensor.specify_shape(self.x, self.in_shape),
self.in_shape) self.in_shape)
def test_max(self): def test_max(self):
## If we call max directly, we will return an CAReduce object ## If we call max directly, we will return an CAReduce object
## and he don't have R_op implemented! ## and he don't have R_op implemented!
...@@ -188,41 +200,38 @@ class test_RopLop(RopLop_checker): ...@@ -188,41 +200,38 @@ class test_RopLop(RopLop_checker):
(self.mat_in_shape[0],)) (self.mat_in_shape[0],))
def test_argmax(self): def test_argmax(self):
self.check_nondiff_rop(tensor.argmax(self.mx,axis=1)) self.check_nondiff_rop(tensor.argmax(self.mx, axis=1))
def test_subtensor(self): def test_subtensor(self):
self.check_rop_lop(self.x[:4], (4,)) self.check_rop_lop(self.x[:4], (4,))
def test_incsubtensor1(self): def test_incsubtensor1(self):
tv = numpy.asarray( self.rng.uniform(size=(3,)), tv = numpy.asarray(self.rng.uniform(size=(3,)),
theano.config.floatX) theano.config.floatX)
t = theano.shared(tv) t = theano.shared(tv)
out = tensor.inc_subtensor(self.x[:3], t) out = tensor.inc_subtensor(self.x[:3], t)
self.check_rop_lop(out, self.in_shape) self.check_rop_lop(out, self.in_shape)
def test_incsubtensor2(self): def test_incsubtensor2(self):
tv = numpy.asarray( self.rng.uniform(size=(10,)), tv = numpy.asarray(self.rng.uniform(size=(10,)),
theano.config.floatX) theano.config.floatX)
t = theano.shared(tv) t = theano.shared(tv)
out = tensor.inc_subtensor(t[:4], self.x[:4]) out = tensor.inc_subtensor(t[:4], self.x[:4])
self.check_rop_lop(out, (10,)) self.check_rop_lop(out, (10,))
def test_setsubtensor1(self): def test_setsubtensor1(self):
tv = numpy.asarray( self.rng.uniform(size=(3,)), tv = numpy.asarray(self.rng.uniform(size=(3,)),
theano.config.floatX) theano.config.floatX)
t = theano.shared(tv) t = theano.shared(tv)
out = tensor.set_subtensor(self.x[:3], t) out = tensor.set_subtensor(self.x[:3], t)
self.check_rop_lop(out, self.in_shape) self.check_rop_lop(out, self.in_shape)
def test_print(self): def test_print(self):
out = theano.printing.Print('x',attrs=('shape',))(self.x) out = theano.printing.Print('x', attrs=('shape',))(self.x)
self.check_rop_lop(out, self.in_shape) self.check_rop_lop(out, self.in_shape)
def test_setsubtensor2(self): def test_setsubtensor2(self):
tv = numpy.asarray( self.rng.uniform(size=(10,)), tv = numpy.asarray(self.rng.uniform(size=(10,)),
theano.config.floatX) theano.config.floatX)
t = theano.shared(tv) t = theano.shared(tv)
out = tensor.set_subtensor(t[:4], self.x[:4]) out = tensor.set_subtensor(t[:4], self.x[:4])
...@@ -231,60 +240,55 @@ class test_RopLop(RopLop_checker): ...@@ -231,60 +240,55 @@ class test_RopLop(RopLop_checker):
def test_dimshuffle(self): def test_dimshuffle(self):
# I need the sum, because the setup expects the output to be a # I need the sum, because the setup expects the output to be a
# vector # vector
self.check_rop_lop(self.x[:4].dimshuffle('x',0).sum(axis=0), self.check_rop_lop(self.x[:4].dimshuffle('x', 0).sum(axis=0),
(4,)) (4,))
def test_rebroadcast(self): def test_rebroadcast(self):
# I need the sum, because the setup expects the output to be a # I need the sum, because the setup expects the output to be a
# vector # vector
self.check_rop_lop(tensor.unbroadcast(self.x[:4].dimshuffle('x',0),0).sum(axis=1), self.check_rop_lop(tensor.unbroadcast(
(1,)) self.x[:4].dimshuffle('x', 0), 0).sum(axis=1),
(1,))
def test_join(self): def test_join(self):
tv = numpy.asarray( self.rng.uniform(size=(10,)), tv = numpy.asarray(self.rng.uniform(size=(10,)),
theano.config.floatX) theano.config.floatX)
t = theano.shared(tv) t = theano.shared(tv)
out = tensor.join(0, self.x, t) out = tensor.join(0, self.x, t)
self.check_rop_lop(out, (self.in_shape[0]+10,)) self.check_rop_lop(out, (self.in_shape[0] + 10,))
def test_dot(self): def test_dot(self):
insh = self.in_shape[0] insh = self.in_shape[0]
vW = numpy.asarray(self.rng.uniform(size=(insh,insh)), vW = numpy.asarray(self.rng.uniform(size=(insh, insh)),
theano.config.floatX) theano.config.floatX)
W = theano.shared(vW) W = theano.shared(vW)
self.check_rop_lop( tensor.dot(self.x, W), self.in_shape) self.check_rop_lop(tensor.dot(self.x, W), self.in_shape)
def test_elemwise0(self): def test_elemwise0(self):
self.check_rop_lop( (self.x+1)**2, self.in_shape) self.check_rop_lop((self.x + 1) ** 2, self.in_shape)
def test_elemwise1(self): def test_elemwise1(self):
self.check_rop_lop( self.x+tensor.cast(self.x, 'int32'), self.check_rop_lop(self.x + tensor.cast(self.x, 'int32'),
self.in_shape) self.in_shape)
def test_reshape(self): def test_reshape(self):
new_shape = tensor.constant( numpy.asarray([ new_shape = tensor.constant(numpy.asarray([
self.mat_in_shape[0]*self.mat_in_shape[1]], self.mat_in_shape[0] * self.mat_in_shape[1]],
dtype = 'int64')) dtype='int64'))
self.check_mat_rop_lop(self.mx.reshape(new_shape), self.check_mat_rop_lop(self.mx.reshape(new_shape),
(self.mat_in_shape[0]*self.mat_in_shape[1],)) (self.mat_in_shape[0] * self.mat_in_shape[1],))
def test_flatten(self): def test_flatten(self):
self.check_mat_rop_lop(self.mx.flatten(), self.check_mat_rop_lop(self.mx.flatten(),
(self.mat_in_shape[0]*self.mat_in_shape[1],)) (self.mat_in_shape[0] * self.mat_in_shape[1],))
def test_sum(self): def test_sum(self):
self.check_mat_rop_lop(self.mx.sum(axis=1), (self.mat_in_shape[0],)) self.check_mat_rop_lop(self.mx.sum(axis=1), (self.mat_in_shape[0],))
def test_softmax(self): def test_softmax(self):
# Softmax adds an extra dimnesion ! # Softmax adds an extra dimnesion !
self.check_rop_lop( tensor.nnet.softmax(self.x)[0], self.in_shape[0]) self.check_rop_lop(tensor.nnet.softmax(self.x)[0], self.in_shape[0])
def test_alloc(self): def test_alloc(self):
# Alloc of the sum of x into a vector # Alloc of the sum of x into a vector
...@@ -301,7 +305,7 @@ class test_RopLop(RopLop_checker): ...@@ -301,7 +305,7 @@ class test_RopLop(RopLop_checker):
success = False success = False
try: try:
tensor.Rop(0., [ tensor.matrix() ], [ tensor.vector() ] ) tensor.Rop(0., [tensor.matrix()], [tensor.vector()])
success = True success = True
except ValueError: except ValueError:
pass pass
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论