提交 bdb05333 authored 作者: Brandon T. Willard's avatar Brandon T. Willard 提交者: Brandon T. Willard

Use timeit in tests.scan.test_basic.test_speed* tests

上级 eeaa94d6
......@@ -4565,54 +4565,49 @@ class TestScan:
not config.cxx, reason="G++ not available, so we need to skip this test."
)
def test_speed():
"""
This function prints out the speed of very simple recurrent
calculations implemented in various ways. In DebugMode this will
test the correctness of the optimizations applied, but generally
correctness-testing is not the goal of this test.
from timeit import timeit
To be honest, it isn't really a unit test so much as a tool for testing
approaches to scan.
n_timeit = 50
The computation being tested here is a recurrent addition.
"""
# We need the CVM for this speed test
r = np.arange(10000).astype(config.floatX).reshape(1000, 10)
t0 = time.time()
for i in range(1, 1000):
r[i] += r[i - 1]
t1 = time.time()
print("python", t1 - t0)
def f_py():
for i in range(1, 1000):
r[i] += r[i - 1]
r = np.arange(10000).astype(config.floatX).reshape(1000, 10)
t0 = time.time()
r_i = iter(r[1:])
r_ii = iter(r[:-1])
while True:
try:
tmp = next(r_i)
tmp += next(r_ii)
except StopIteration:
break
t1 = time.time()
print("python with builtin iterator", t1 - t0)
python_duration = timeit(lambda: f_py(), number=n_timeit)
r = np.arange(10000).astype(config.floatX).reshape(1000, 10)
s_r = tensor.matrix()
s_y, updates = scan(
fn=lambda ri, rii: ri + rii,
sequences=[s_r[1:]],
outputs_info=tensor.constant(r[0]),
mode=theano.Mode(linker="cvm"),
)
assert not updates
f = theano.function([s_r], s_y)
t2 = time.time()
f(r)
t3 = time.time()
print("theano (scan, cvm)", t3 - t2)
def f_py_iter():
r_i = iter(r[1:])
r_ii = iter(r[:-1])
while True:
try:
tmp = next(r_i)
tmp += next(r_ii)
except StopIteration:
break
python_iter_duration = timeit(lambda: f_py_iter(), number=n_timeit)
# r = np.arange(10000).astype(config.floatX).reshape(1000, 10)
# s_r = tensor.matrix()
# s_y, updates = scan(
# fn=lambda ri, rii: ri + rii,
# sequences=[s_r[1:]],
# outputs_info=tensor.constant(r[0]),
# mode=theano.Mode(linker="cvm"),
# )
# assert not updates
#
# f_cvm = theano.function([s_r], s_y)
#
# cvm_duration = timeit(lambda: f_cvm(r), number=n_timeit)
# XXX: Why does this take so much longer than Python?!
# assert cvm_duration - python_duration < python_duration * 0.15
r = np.arange(10000).astype(config.floatX).reshape(-1, 10)
shared_r = theano.shared(r)
......@@ -4620,39 +4615,28 @@ def test_speed():
s_rinc = tensor.inc_subtensor(
shared_r[s_i], shared_r[s_i - 1], tolerate_inplace_aliasing=True
)
# theano.printing.debugprint(s_rinc)
f = theano.function(
f_cvm_shared = theano.function(
[],
[],
updates=OrderedDict([(s_i, s_i + 1), (shared_r, s_rinc)]),
mode=theano.Mode(linker="cvm"),
)
f._check_for_aliased_inputs = False
t2 = time.time()
f_fn = f.fn
for i in range(998):
f_fn()
f() # 999 to update the profiling timers
t3 = time.time()
print("theano (updates, cvm)", t3 - t2)
f_cvm_shared._check_for_aliased_inputs = False
cvm_shared_duration = timeit(lambda: f_cvm_shared(), number=n_timeit)
assert cvm_shared_duration < python_duration
assert cvm_shared_duration < python_iter_duration
@pytest.mark.skipif(
not config.cxx, reason="G++ not available, so we need to skip this test."
)
def test_speed_rnn():
"""
This function prints out the speed of recurrent neural network
calculations implemented in various ways. In DebugMode this will
test the correctness of the optimizations applied, but generally
correctness-testing is not the goal of this test.
To be honest, it isn't really a unit test so much as a tool for testing
approaches to scan.
from timeit import timeit
The computation being tested here is a repeated tanh of a matrix-vector
multiplication - the heart of an ESN or RNN.
"""
n_timeit = 50
L = 10000
N = 50
......@@ -4660,29 +4644,28 @@ def test_speed_rnn():
r = np.arange(L * N).astype(config.floatX).reshape(L, N)
w = np.random.randn(N, N).astype(config.floatX)
t0 = time.time()
for i in range(1, L):
r[i] = np.tanh(np.dot(r[i - 1], w))
t1 = time.time()
python_duration = t1 - t0
print("python", python_duration)
r = np.arange(L * N).astype(config.floatX).reshape(L, N)
s_r = tensor.matrix()
s_y, updates = scan(
fn=lambda ri, rii: tensor.tanh(tensor.dot(rii, w)),
sequences=[s_r[1:]],
outputs_info=tensor.constant(r[0]),
mode=theano.Mode(linker="cvm"),
)
assert not updates
f = theano.function([s_r], s_y, mode=theano.Mode(linker="cvm"))
t2 = time.time()
f(r)
t3 = time.time()
cvm_duration = t3 - t2
print("theano (cvm)", cvm_duration)
def f_py():
for i in range(1, L):
r[i] = np.tanh(np.dot(r[i - 1], w))
python_duration = timeit(lambda: f_py(), number=n_timeit)
# r = np.arange(L * N).astype(config.floatX).reshape(L, N)
# s_r = tensor.matrix()
# s_y, updates = scan(
# fn=lambda ri, rii: tensor.tanh(tensor.dot(rii, w)),
# sequences=[s_r[1:]],
# outputs_info=tensor.constant(r[0]),
# mode=theano.Mode(linker="cvm"),
# )
# assert not updates
#
# f_cvm = theano.function([s_r], s_y, mode=theano.Mode(linker="cvm"))
#
# cvm_duration = timeit(lambda: f_cvm(r), number=n_timeit)
# XXX: Why does this take so much longer than Python?!
# assert cvm_duration - python_duration < python_duration * 0.15
r = np.arange(L * N).astype(config.floatX).reshape(L, N)
shared_r = theano.shared(r)
......@@ -4692,20 +4675,14 @@ def test_speed_rnn():
theano.tensor.tanh(theano.tensor.dot(shared_r[s_i - 1], w)),
tolerate_inplace_aliasing=True,
)
f = theano.function(
f_cvm_shared = theano.function(
[],
[],
updates=OrderedDict([(s_i, s_i + 1), (shared_r, s_rinc)]),
mode=theano.Mode(linker="cvm"),
)
f_fn = f.fn
t4 = time.time()
f_fn(n_calls=L - 2)
f() # 999 to update the profiling timers
t5 = time.time()
cvm_shared_duration = t5 - t4
print("theano (updates, cvm)", cvm_shared_duration)
cvm_shared_duration = timeit(lambda: f_cvm_shared(), number=n_timeit)
assert cvm_shared_duration < python_duration
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论