Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
P
pytensor
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
图表
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
testgroup
pytensor
Commits
405feb22
提交
405feb22
authored
4月 09, 2009
作者:
James Bergstra
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
added more tests of pickling function
上级
648fb338
隐藏空白字符变更
内嵌
并排
正在显示
1 个修改的文件
包含
73 行增加
和
250 行删除
+73
-250
test_function.py
theano/compile/tests/test_function.py
+73
-250
没有找到文件。
theano/compile/tests/test_function.py
浏览文件 @
405feb22
...
@@ -27,126 +27,6 @@ def checkfor(testcase, fn, E):
...
@@ -27,126 +27,6 @@ def checkfor(testcase, fn, E):
testcase
.
fail
()
testcase
.
fail
()
# def graph1(): # (x+y) * (x/z)
# x, y, z = floats('xyz')
# o = mul(add(x, y), div(x, z))
# return [x,y,z], [o]
# class T_Function(unittest.TestCase):
# def test_noopt(self):
# gi, go = graph1()
# p = function(gi, go, optimizer = None, linker = 'py')
# self.failUnless(p(1.0,3.0,4.0) == 1.0)
# def test_opt(self):
# opt = PatternOptimizer((div, '1', '2'), (div, '2', '1'))
# gi, go = graph1()
# p = function(gi,go, optimizer=opt.optimize, linker = 'py')
# self.failUnless(p(1.,3.,4.) == 16.0)
# def test_multiout(self):
# def graph2():
# x, y, z = floats('xyz')
# o = mul(add(x, y), div(x, z))
# return [x,y,z], [o, o.owner.inputs[1]]
# opt = PatternOptimizer((div, '1', '2'), (div, '2', '1'))
# gi, go = graph2()
# p = function(gi,go, optimizer=opt.optimize)
# a,b = p(1.,3.,4.)
# self.failUnless(a == 16.0)
# self.failUnless(b == 4.0)
# def test_make_many_functions(self):
# x, y, z = tensor.scalars('xyz')
# e0, e1, e2 = x+y+z, x*y-z, z*z+x*x+y*y
# f1 = function([x, y, z], [e0])
# f2 = function([x, y, z], [e0])
# f3 = function([x, y, z], [e1])
# f4 = function([x, y, z], [e2])
# f5 = function([e0], [e0 * e0])
# ff = FunctionFactory([x, y, z], [e0])
# f6 = ff.create()
# f7 = ff.create()
# f8 = ff.create()
# f9 = ff.partial(1.0, 2.0)
# assert f1(1.0, 2.0, 3.0) == 6.0
# assert f2(1.0, 2.0, 3.0) == 6.0
# assert f3(1.0, 2.0, 3.0) == -1.0
# assert f4(1.0, 2.0, 3.0) == 14.0
# assert f5(7.0) == 49.0
# assert f6(1.0, 2.0, 3.0) == 6.0
# assert f7(1.0, 2.0, 3.0) == 6.0
# assert f8(1.0, 2.0, 3.0) == 6.0
# assert f9(3.0) == 6.0
# def test_no_inputs(self):
# x, y, z = tensor.value(1.0), tensor.value(2.0), tensor.value(3.0)
# e = x*x + y*y + z*z
# assert function([], [e], linker = 'py')() == 14.0
# assert function([], [e], linker = 'c')() == 14.0
# assert function([], [e], linker = 'c|py')() == 14.0
# assert function([], [e], linker = 'c&py')() == 14.0
# assert eval_outputs([e]) == 14.0
# assert fast_compute(e) == 14.0
# def test_closure(self):
# x, y, z = tensor.scalars('xyz')
# v = tensor.value(numpy.zeros(()))
# e = x + tensor.add_inplace(v, 1)
# f = function([x], [e])
# assert f(1.) == 2.
# assert f(1.) == 3.
# assert f(1.) == 4.
# def test_borrow_true(self):
# x, y, z = tensor.scalars('xyz')
# e = x + y + z
# f = function([x, y, z], [e], borrow_outputs = True)
# res1 = f(1.0, 2.0, 3.0)
# assert res1 == 6.0
# res2 = f(1.0, 3.0, 5.0)
# assert res1 is res2
# assert res1 == 9.0
# assert res2 == 9.0
# def test_borrow_false(self):
# x, y, z = tensor.scalars('xyz')
# e = x + y + z
# for linker in 'py c c|py c&py'.split():
# f = function([x, y, z], [e], borrow_outputs = False, linker = linker)
# res1 = f(1.0, 2.0, 3.0)
# self.failUnless(res1 == 6.0, (res1, linker))
# res2 = f(1.0, 3.0, 5.0)
# self.failUnless(res1 is not res2, (res1, res2, linker))
# self.failUnless(res1 == 6.0, (res1, linker))
# self.failUnless(res2 == 9.0, (res2, linker))
# def test_borrow_false_through_inplace(self):
# x, y, z = tensor.scalars('xyz')
# # if borrow_outputs is False, we must not reuse the temporary created for x+y
# e = tensor.add_inplace(x + y, z)
# for linker in 'py c c|py c&py'.split():
# f = function([x, y, z], [e], borrow_outputs = False, linker = linker)
# res1 = f(1.0, 2.0, 3.0)
# self.failUnless(res1 == 6.0, (res1, linker))
# res2 = f(1.0, 3.0, 5.0)
# self.failUnless(res1 is not res2, (res1, res2, linker))
# self.failUnless(res1 == 6.0, (res1, linker))
# self.failUnless(res2 == 9.0, (res2, linker))
# class T_fast_compute(unittest.TestCase):
# def test_straightforward(self):
# x, y, z = tensor.value(1.0), tensor.value(2.0), tensor.value(3.0)
# e = x*x + y*y + z*z
# assert fast_compute(e) == 14.0
# assert compile._fcache[(e, )]() == 14.0
class
T_function
(
unittest
.
TestCase
):
class
T_function
(
unittest
.
TestCase
):
def
test_none
(
self
):
def
test_none
(
self
):
fn
=
function
([],
None
)
#ok
fn
=
function
([],
None
)
#ok
...
@@ -477,165 +357,108 @@ class T_picklefunction(unittest.TestCase):
...
@@ -477,165 +357,108 @@ class T_picklefunction(unittest.TestCase):
assert
[
i
.
type
for
i
in
nf
.
outputs
]
==
[
i
.
type
for
i
in
ng
.
outputs
]
assert
[
i
.
type
for
i
in
nf
.
outputs
]
==
[
i
.
type
for
i
in
ng
.
outputs
]
# class T_function_examples(unittest.TestCase):
def
test_multiple_functions
(
self
):
# def test_accumulator(self):
a
=
T
.
scalar
()
# the a is for 'anonymous' (un-named).
# """Test low-level interface with state."""
x
,
s
=
T
.
scalars
(
'xs'
)
# x = T.scalar('x')
v
=
T
.
vector
(
'v'
)
# s = T.scalar('s')
# fn, states = program_states(inputs = [x], outputs = [], states = [(s, 0, s+x)])
# sum = 0
# for inc in [1, 4, 5,23, -324]:
# sum += inc
# fn.run([inc], states)
# assert sum == states[0].value
# def test_misc0(self):
# fn_inc, states_inc = function_states(\
# put in some inputs
# inputs = [x], outputs = [], states = [(s, 0, s+x)])
list_of_things
=
[
s
,
x
,
v
]
# fn_inc2, states_inc2 = function_states(\
# some derived thing, whose inputs aren't all in the list
# inputs = [x], outputs = [], states = [(s, 0, s+x)]
)
list_of_things
.
append
(
a
*
x
+
s
)
# fn_inc_copy = copy.copy(fn_inc) #USE fn copy
f1
=
function
([
x
,
In
(
a
,
value
=
1.0
,
name
=
'a'
),
In
(
s
,
value
=
0.0
,
update
=
s
+
a
*
x
,
mutable
=
True
)],
s
+
a
*
x
)
list_of_things
.
append
(
f1
)
# # run() is like __call__, but requires an explicit state argument
# now put in a function sharing container with the previous one
f2
=
function
([
x
,
In
(
a
,
value
=
1.0
,
name
=
'a'
),
In
(
s
,
value
=
f1
.
container
[
s
],
update
=
s
+
a
*
x
,
mutable
=
True
)],
s
+
a
*
x
)
list_of_things
.
append
(
f2
)
# fn_inc.run([5], states_inc) #run on own state object
assert
isinstance
(
f2
.
container
[
s
]
.
storage
,
list
)
# fn_inc2.run([3], states_inc) #run on compatible state object
assert
f2
.
container
[
s
]
.
storage
is
f1
.
container
[
s
]
.
storage
# assert states_inc[0].value == 8
# states_inc_copy = copy.copy(states_inc) #USE state copy
# now put in a function with non-scalar
# fn_inc_copy.run([2], states_inc_copy
)
f3
=
function
([
x
,
In
(
v
,
value
=
numpy
.
asarray
([
2
,
3
,
4.
]))],
x
+
v
)
# assert states_inc[0].value == 10 #compatible
list_of_things
.
append
(
f3
)
# fn_dec, states_dec = function_states(\
# try to pickle the entire things
# inputs = [x], outputs = [], states = [((s, s-x), states_inc[0])])
try
:
saved_format
=
cPickle
.
dumps
(
list_of_things
)
new_list_of_things
=
cPickle
.
loads
(
saved_format
)
except
NotImplementedError
,
e
:
if
e
[
0
]
.
startswith
(
'DebugMode is not picklable'
):
return
else
:
raise
# try:
# now test our recovered new_list_of_things
# fn_inc.run([5], states_dec) # wrong kind of state for given program
# it should be totally unrelated to the original
# self.fail("fn accepted an invalid state argument")
# it should be interdependent in the same way as the original
# except SpecificException:
# raise NotImplementedError() #TODO
# except Exception:
# self.fail("fn accepted an invalid state argument")
# def test_perceptron(self):
ol
=
list_of_things
# """Test high-level state interface."""
nl
=
new_list_of_things
# mu0 = numpy.array([1.0,0.0])
for
i
in
range
(
4
):
# mu1 = numpy.array([0.0,0.1])
assert
nl
[
i
]
!=
ol
[
i
]
# si0 = numpy.ones_like(mu0) #unit varianc
e
assert
nl
[
i
]
.
type
==
ol
[
i
]
.
typ
e
# si1 = numpy.ones_like(mu1) #unit varianc
e
assert
nl
[
i
]
.
type
is
not
ol
[
i
]
.
typ
e
# #implicit internal state
# see if the implicit input got stored
# r_state = random.random_state()
assert
ol
[
3
]
.
owner
.
inputs
[
1
]
is
s
# label = r_state.bernoulli(0.5)
assert
nl
[
3
]
.
owner
.
inputs
[
1
]
is
not
s
assert
nl
[
3
]
.
owner
.
inputs
[
1
]
.
type
==
s
.
type
# #implicit internal state for each DiagGaussian
# moving on to the functions...
# x = label * DiagGaussian(mu0, si0, state=r_state) \
for
i
in
range
(
4
,
7
):
# + (1 - label) * random.DiagGaussian(mu1, si1, state=r_state)
assert
nl
[
i
]
!=
ol
[
i
]
# w = T.tensor.dvector()
# looking at function number 1, input 's'
# b = T.tensor.dscalar()
assert
nl
[
4
][
nl
[
0
]]
is
not
ol
[
4
][
ol
[
0
]]
# lr = 0.01
assert
nl
[
4
][
nl
[
0
]]
==
ol
[
4
][
ol
[
0
]]
assert
nl
[
4
](
3
)
==
ol
[
4
](
3
)
# decision = dot(x,w) + b > 0
# looking at function number 2, input 's'
# new_w = w + neq(label, decision) * lr * x
# make sure it's shared with the first function
# new_b = b + neq(label, decision) * (label * (-lr) + (1-label)*lr)
assert
ol
[
4
]
.
container
[
ol
[
0
]]
.
storage
is
ol
[
5
]
.
container
[
ol
[
0
]]
.
storage
assert
nl
[
4
]
.
container
[
nl
[
0
]]
.
storage
is
nl
[
5
]
.
container
[
nl
[
0
]]
.
storage
assert
nl
[
5
](
3
)
==
ol
[
5
](
3
)
assert
nl
[
4
]
.
value
[
nl
[
0
]]
==
6
# init_w = numpy.array([0.0, 0.0])
assert
numpy
.
all
(
nl
[
6
][
nl
[
2
]]
==
numpy
.
asarray
([
2
,
3.
,
4
]))
# init_b = 0.0
# io_stream = T.function([], [label, x], state={'seed':(r_state, 42)})
# perceptron_learn = T.function([x, label], [decision],
def
test_pickle_class_with_functions
(
self
):
# state={
# 'w':((w, update_w), init_w),
# 'b':((b, update_b), init_b),
# 'lr':(lr, 0.01)})
# perceptron_use = T.function([x], [decision],
blah
=
SomethingToPickle
()
# state={
assert
blah
.
f2
.
container
[
blah
.
s
]
.
storage
is
blah
.
f1
.
container
[
blah
.
s
]
.
storage
# 'w':(w, perceptron_learn.shared['w']),
# 'b':(b, perceptron_learn.shared['b'])})
# errs = 0
blah2
=
copy
.
deepcopy
(
blah
)
# for i in xrange(100):
assert
blah2
.
f2
.
container
[
blah2
.
s
]
.
storage
is
blah2
.
f1
.
container
[
blah2
.
s
]
.
storage
# il, ix = io_stream()
# d0 = perceptron_use(ix)
assert
blah
.
f1
[
blah
.
s
]
==
blah2
.
f1
[
blah2
.
s
]
# d1 = perceptron_learn(ix, il)
# assert d0 == d1
blah
.
f2
(
5
)
assert
blah
.
f1
[
blah
.
s
]
!=
blah2
.
f1
[
blah2
.
s
]
# errs += (d0 != d1)
# print d0
# print 'errs =', errs
class
SomethingToPickle
(
object
):
def
__init__
(
self
):
a
=
T
.
scalar
()
# the a is for 'anonymous' (un-named).
x
,
s
=
T
.
scalars
(
'xs'
)
v
=
T
.
vector
(
'v'
)
# class T_dict_interface(unittest.TestCase):
self
.
s
=
s
self
.
x
=
x
self
.
v
=
v
# def test_keyword(self):
self
.
e
=
a
*
x
+
s
# x = T.scalar('x')
# y = T.scalar('y')
# s = T.scalar('s')
# fn = function(input_kw = {'a':x, 'b':y}, outputs = [], state = {'s':(s, 0, s+x/y)}
)
self
.
f1
=
function
([
x
,
In
(
a
,
value
=
1.0
,
name
=
'a'
),
In
(
s
,
value
=
0.0
,
update
=
s
+
a
*
x
,
mutable
=
True
)],
s
+
a
*
x
)
# try:
self
.
f2
=
function
([
x
,
In
(
a
,
value
=
1.0
,
name
=
'a'
),
In
(
s
,
value
=
self
.
f1
.
container
[
s
],
update
=
s
+
a
*
x
,
mutable
=
True
)],
s
+
a
*
x
)
# fn(1, 1)
# self.fail("non-keyword call accepted!")
# except SpecificException:
# raise NotImplementedError()
# except Exception:
# self.fail("non-keyword call accepted!")
# try:
# fn(a=1)
# self.fail("incomplete call accepted!")
# except SpecificException:
# raise NotImplementedError()
# except Exception:
# self.fail("incomplete call accepted!")
# try:
# fn(a=1, b=1, c=1)
# self.fail("overcomplete call accepted!")
# except SpecificException:
# raise NotImplementedError()
# except Exception:
# self.fail("overcomplete call accepted!")
# def test_aliased_state(self):
# """Test keyword input and copy."""
# x = T.scalar('x')
# y = T.scalar('y')
# s = T.scalar('s')
# fn = function(input_kw = {'a':x, 'b':y}, outputs = [], state = {'s':(s, 0, s+x/y)})
# fn2 = fn.copy()
# fn3 = fn.copy()
# fn(a=2, b=5)
# fn2(a=5, b=2)
# fn3(b=2, a=5)
# assert fn.state['s'] == 2.0/5
# assert fn2.state['s'] == 5.0/2
# assert fn3.state['s'] == 5.0/2
# #fn and fn3 use the same sort of state, so this is OK.
# fn3.state = fn.state
# fn.state['s'] = 0
# fn(a=1, b=1) #increment the shared state
# assert fn3.state['s'] == 1
# fn3(a=-1, b=1) #decrement the shared state
# assert fn.state['s'] == 0
if
__name__
==
'__main__'
:
if
__name__
==
'__main__'
:
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论