提交 7430083d authored 作者: James Bergstra's avatar James Bergstra

more tests for compile

上级 69c9a512
...@@ -345,11 +345,99 @@ class T_function(unittest.TestCase): ...@@ -345,11 +345,99 @@ class T_function(unittest.TestCase):
checkfor(self, lambda:function([In(a,name=[])],[]), UnhashableName) checkfor(self, lambda:function([In(a,name=[])],[]), UnhashableName)
f = function([In(a,name=set(['adsf',())], value=1.0), f = function([In(a,name=set(['adsf',()]), value=1.0),
In(x,name=(), value=2.0), In(x,name=(), value=2.0),
In(s,name=T.scalar(), value=3.0)], a+x+s) In(s,name=T.scalar(), value=3.0)], a+x+s)
def test_copy(self):
a = T.scalar() # the a is for 'anonymous' (un-named).
x,s = T.scalars('xs')
f = function([x, In(a, value=1.0,name='a'), In(s, value=0.0, update=s+a*x, mutable=True)], s+a*x)
g = copy.copy(f)
g = f.copy()
#if they both return, assume that they return equivalent things.
self.failUnless(len(g.container) == 3)
self.failUnless(len(g.state) == 3)
self.failIf(g.container[x] is f.container[x])
self.failIf(g.container[a] is f.container[a])
self.failIf(g.container[s] is f.container[s])
self.failIf(g.state[x] is not f.state[x]) # should not have been copied
self.failIf(g.state[a] is not f.state[a]) # should not have been copied
self.failIf(g.state[s] is f.state[s]) # should have been copied because it is mutable.
self.failUnless(f(2, 1) == g(1)) #they should be in sync, default value should be copied.
self.failUnless(f(2, 1) == g(1)) #they should be in sync, default value should be copied.
f(1,2) # put them out of sync
self.failIf(f(1, 2) == g(1, 2)) #they should be equal anymore.
def test_shared_state0(self):
a = T.scalar() # the a is for 'anonymous' (un-named).
x,s = T.scalars('xs')
f = function([x, In(a, value=1.0,name='a'), In(s, value=0.0, update=s+a*x, mutable=True)], s+a*x)
g = function([x, In(a, value=1.0,name='a'), In(s, value=f.container[s], update=s-a*x, mutable=True)], s+a*x)
f(1, 2)
self.failUnless(f.s == 2)
self.failUnless(g.s == 2)
g(1, 2)
self.failUnless(f.s == 0)
self.failUnless(g.s == 0)
def test_shared_state1(self):
a = T.scalar() # the a is for 'anonymous' (un-named).
x,s = T.scalars('xs')
f = function([x, In(a, value=1.0,name='a'), In(s, value=0.0, update=s+a*x, mutable=True)], s+a*x)
g = function([x, In(a, value=1.0,name='a'), In(s, value=99.0, update=s-a*x, mutable=True)], s+a*x)
g.container[s] = f.container[s]
f(1, 2)
self.failUnless(f.s == 2)
self.failUnless(g.s == 2)
g(1, 2)
self.failUnless(f.s == 0)
self.failUnless(g.s == 0)
def test_autoname(self):
raise NotImplementedError()
def test_modes(self):
raise NotImplementedError()
def test_modes_duallinker(self):
a = T.scalar() # the a is for 'anonymous' (un-named).
x,s = T.scalars('xs')
f = function([x, In(a, value=1.0,name='a'), In(s, value=0.0, update=s+a*x, mutable=True)], s+a*x)
f.append_mode('c&py')
f.append_mode(custom_mode)
f(1,2)
raise NotImplementedError()
def test_modes_dual_with_error(self):
custom_broken_mode = None
a = T.scalar() # the a is for 'anonymous' (un-named).
x,s = T.scalars('xs')
f = function([x, In(a, value=1.0,name='a'), In(s, value=0.0, update=s+a*x, mutable=True)], s+a*x)
f.append_mode(custom_broken_mode)
checkfor(self, lambda: f(1,2), SomeException)
raise NotImplementedError()
class T_function_examples(unittest.TestCase):
def test_accumulator(self): def test_accumulator(self):
"""Test low-level interface with state.""" """Test low-level interface with state."""
x = T.scalar('x') x = T.scalar('x')
...@@ -385,7 +473,7 @@ class T_function(unittest.TestCase): ...@@ -385,7 +473,7 @@ class T_function(unittest.TestCase):
assert states_inc[0].value == 10 #compatible assert states_inc[0].value == 10 #compatible
fn_dec, states_dec = function_states(\ fn_dec, states_dec = function_states(\
inputs = [x], outputs = [], states = [(s, states_inc[0], s-x)]) inputs = [x], outputs = [], states = [((s, s-x), states_inc[0])])
try: try:
fn_inc.run([5], states_dec) # wrong kind of state for given program fn_inc.run([5], states_dec) # wrong kind of state for given program
...@@ -404,11 +492,12 @@ class T_function(unittest.TestCase): ...@@ -404,11 +492,12 @@ class T_function(unittest.TestCase):
si1 = numpy.ones_like(mu1) #unit variance si1 = numpy.ones_like(mu1) #unit variance
#implicit internal state #implicit internal state
label = random.bernoulli(0.5) r_state = random.random_state()
label = r_state.bernoulli(0.5)
#implicit internal state for each DiagGaussian #implicit internal state for each DiagGaussian
x = label * random.DiagGaussian(mu0, si0) \ x = label * DiagGaussian(mu0, si0, state=r_state) \
+ (1 - label) * random.DiagGaussian(mu1,si1) + (1 - label) * random.DiagGaussian(mu1, si1, state=r_state)
w = T.tensor.dvector() w = T.tensor.dvector()
b = T.tensor.dscalar() b = T.tensor.dscalar()
...@@ -421,12 +510,12 @@ class T_function(unittest.TestCase): ...@@ -421,12 +510,12 @@ class T_function(unittest.TestCase):
init_w = numpy.array([0.0, 0.0]) init_w = numpy.array([0.0, 0.0])
init_b = 0.0 init_b = 0.0
io_stream = T.function([], [label, x]) io_stream = T.function([], [label, x], state={'seed':(r_state, 42)})
perceptron_learn = T.function([x, label], [decision], perceptron_learn = T.function([x, label], [decision],
state={ state={
'w':(w, init_w, update_w), 'w':((w, update_w), init_w),
'b':(b, init_b, update_b), 'b':((b, update_b), init_b),
'lr':(lr, 0.01)}) 'lr':(lr, 0.01)})
perceptron_use = T.function([x], [decision], perceptron_use = T.function([x], [decision],
...@@ -448,34 +537,6 @@ class T_function(unittest.TestCase): ...@@ -448,34 +537,6 @@ class T_function(unittest.TestCase):
print d0 print d0
print 'errs =', errs print 'errs =', errs
def test_shared(self):
"""Test shared r/w state."""
x = T.scalar('x')
s = T.scalar('s')
fn_inc, states_inc = function_states(\
inputs = [x], outputs = [], states = [(s, 0, s+x)])
fn_dec, states_dec = function_states(\
inputs = [x], outputs = [], states = [(s, states_inc[0], s-x)])
sum = 0
for inc in [1, 4, 5,23, -324]:
sum += inc
fn_inc.run([inc], states_inc)
assert sum == states_inc[0].value
a = sum
for inc in [1, 4, 5,23, -324]:
sum -= inc
fn_dec(inc)
assert sum == 0
assert states_inc[0].value == sum
for inc in [1, 4, 5,23, -324]:
sum -= inc
fn_dec(inc)
assert sum == -a
assert states_inc[0].value == sum
class T_dict_interface(unittest.TestCase): class T_dict_interface(unittest.TestCase):
...@@ -539,10 +600,11 @@ class T_dict_interface(unittest.TestCase): ...@@ -539,10 +600,11 @@ class T_dict_interface(unittest.TestCase):
if __name__ == '__main__': if __name__ == '__main__':
if 1: if 0:
unittest.main() unittest.main()
else: else:
testcases = [T_dict_interface, T_state] testcases = []
testcases.append(T_function)
#<testsuite boilerplate> #<testsuite boilerplate>
testloader = unittest.TestLoader() testloader = unittest.TestLoader()
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论