提交 4786b8ed authored 作者: Pascal Lamblin's avatar Pascal Lamblin

Create files in a tmp dir, clean up afterwards

上级 fc356844
import numpy
from numpy.testing import assert_allclose
from nose.plugins.skip import SkipTest
import os
from tempfile import mkdtemp
import shutil
import unittest
import theano
import theano.sandbox.cuda as cuda_ndarray
......@@ -11,45 +15,57 @@ from theano.sandbox.rng_mrg import MRG_RandomStreams
from theano.misc.pkl_utils import dump, load
def test_dump_load():
if not cuda_ndarray.cuda_enabled:
raise SkipTest('Optional package cuda disabled')
class T_dump_load(unittest.TestCase):
def setUp(self):
# Work in a temporary directory to avoid cluttering the repository
self.origdir = os.getcwd()
self.tmpdir = None
self.tmpdir = mkdtemp()
os.chdir(self.tmpdir)
x = CudaNdarraySharedVariable('x', CudaNdarrayType((1, 1), name='x'),
[[1]], False)
def tearDown(self):
# Get back to the original dir, and delete the temporary one
os.chdir(self.origdir)
if self.tmpdir is not None:
shutil.rmtree(self.tmpdir)
with open('test', 'wb') as f:
dump(x, f)
def test_dump_load(self):
if not cuda_ndarray.cuda_enabled:
raise SkipTest('Optional package cuda disabled')
with open('test', 'rb') as f:
x = load(f)
x = CudaNdarraySharedVariable('x', CudaNdarrayType((1, 1), name='x'),
[[1]], False)
assert x.name == 'x'
assert_allclose(x.get_value(), [[1]])
with open('test', 'wb') as f:
dump(x, f)
with open('test', 'rb') as f:
x = load(f)
def test_dump_load_mrg():
rng = MRG_RandomStreams(use_cuda=cuda_ndarray.cuda_enabled)
assert x.name == 'x'
assert_allclose(x.get_value(), [[1]])
with open('test', 'wb') as f:
dump(rng, f)
def test_dump_load_mrg(self):
rng = MRG_RandomStreams(use_cuda=cuda_ndarray.cuda_enabled)
with open('test', 'rb') as f:
rng = load(f)
with open('test', 'wb') as f:
dump(rng, f)
assert type(rng) == MRG_RandomStreams
with open('test', 'rb') as f:
rng = load(f)
assert type(rng) == MRG_RandomStreams
def test_dump_zip_names():
foo_1 = theano.shared(0, name='foo')
foo_2 = theano.shared(1, name='foo')
foo_3 = theano.shared(2, name='foo')
with open('model.zip', 'wb') as f:
dump((foo_1, foo_2, foo_3, numpy.array(3)), f)
keys = list(numpy.load('model.zip').keys())
assert keys == ['foo', 'foo_2', 'foo_3', 'array_0', 'pkl']
foo_3 = numpy.load('model.zip')['foo_3']
assert foo_3 == numpy.array(2)
with open('model.zip', 'rb') as f:
foo_1, foo_2, foo_3, array = load(f)
assert array == numpy.array(3)
def test_dump_zip_names(self):
foo_1 = theano.shared(0, name='foo')
foo_2 = theano.shared(1, name='foo')
foo_3 = theano.shared(2, name='foo')
with open('model.zip', 'wb') as f:
dump((foo_1, foo_2, foo_3, numpy.array(3)), f)
keys = list(numpy.load('model.zip').keys())
assert keys == ['foo', 'foo_2', 'foo_3', 'array_0', 'pkl']
foo_3 = numpy.load('model.zip')['foo_3']
assert foo_3 == numpy.array(2)
with open('model.zip', 'rb') as f:
foo_1, foo_2, foo_3, array = load(f)
assert array == numpy.array(3)
......@@ -256,7 +256,7 @@ class T_Scan(unittest.TestCase):
finally:
f_in.close()
finally:
# Get back to the orinal dir, and delete temporary one.
# Get back to the original dir, and delete the temporary one.
os.chdir(origdir)
if tmpdir is not None:
shutil.rmtree(tmpdir)
......
......@@ -878,7 +878,7 @@ class T_loading_and_saving(unittest.TestCase):
loaded_objects.append(pickle.load(f))
f.close()
finally:
# Get back to the orinal dir, and temporary one.
# Get back to the original dir, and delete the temporary one.
os.chdir(origdir)
if tmpdir is not None:
shutil.rmtree(tmpdir)
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论