提交 d7d28014 authored 作者: Arnaud Bergeron's avatar Arnaud Bergeron

Remove deprecated tensor.randomstreams and replace tensor.random by shared_randomstreams.

上级 95d54e10
......@@ -195,11 +195,11 @@ def sparse_grad(var):
ret = var.owner.op.__class__(sparse_grad=True)(*var.owner.inputs)
return ret
# This cannot be done in tensor/__init__.py due to a circular dependency -- randomstreams
# depends on raw_random which depends on tensor. As a work-around, we import RandomStreams
# here and inject an instance in tensor.
from theano.tensor.randomstreams import RandomStreams
# This cannot be done in tensor/__init__.py due to a circular
# dependency -- shared_randomstreams depends on raw_random which
# depends on tensor. As a work-around, we import RandomStreams here
# and inject an instance in tensor.
from theano.tensor.shared_randomstreams import RandomStreams
# Imitate the numpy.random symbol with a tensor.random one
tensor.random = RandomStreams(seed=0xBAD5EED, no_warn=True)
tensor.random = RandomStreams(seed=0xBAD5EED)
del RandomStreams
__import__('theano.tensor.shared_randomstreams')
......@@ -20,13 +20,7 @@ from theano.tensor import xlogx
# These imports cannot be performed here because the modules depend on tensor. This is done at the
# end of theano.__init__.py instead.
#from theano.tensor import raw_random
#from theano.tensor import randomstreams
#from theano.tensor import shared_randomstreams
#from theano.tensor.randomstreams import \
# RandomStreams
#random = RandomStreams(seed=0xBAD5EED, no_warn = True)
#"""Imitate the numpy.random symbol with a tensor.random one"""
from theano.tensor.elemwise import DimShuffle, Elemwise, CAReduce
......
"""Define RandomStreams, providing random number variables for Theano
graphs.
"""
__docformat__ = "restructuredtext en"
import numpy
from theano.compile import In, Component
from theano.gof import Container
from theano.tensor import raw_random
import warnings
def deprecation_warning():
# Make sure the warning is displayed only once.
if deprecation_warning.already_displayed:
return
warnings.warn((
"RandomStreams is deprecated and will be removed in release 0.7. "
"Use shared_randomstreams.RandomStreams or "
"MRG_RandomStreams instead."),
stacklevel=3)
deprecation_warning.already_displayed = True
deprecation_warning.already_displayed = False
class RandomStreamsInstance(object):
"""RandomStreamsInstance"""
def __init__(self, random_streams, memo, default_seed):
self.random_streams = random_streams
self.memo = memo
self.default_seed = default_seed
def initialize(self, seed=None):
"""Initialize each random stream
:param seed: each random stream will be assigned a unique
state that depends deterministically on this value.
:type seed: None or integer in range 0 to 2**30
:rtype: None
"""
self.seed(seed)
def seed(self, seed=None):
"""Re-initialize each random stream
:param seed: each random stream will be assigned a unique
state that depends deterministically on this value.
:type seed: None or integer in range 0 to 2**30
:rtype: None
"""
if seed is None:
seed = self.default_seed
#backport
#seed = self.default_seed if seed is None else seed
seedgen = numpy.random.RandomState(seed)
for old_r, new_r in self.random_streams.random_state_variables:
old_r_seed = seedgen.randint(2 ** 30)
old_r_container = self.memo[old_r].value
if old_r_container.value is None:
#the cast to int here makes it work on 32bit machines,
#not sure why
old_r_container.value = numpy.random.RandomState(
int(old_r_seed))
else:
#the cast to int here makes it work on 32bit machines,
#not sure why
old_r_container.value.seed(int(old_r_seed))
def __getitem__(self, item):
"""Retrieve the numpy RandomState instance associated with a
particular stream
:param item: a variable of type RandomStateType, associated
with this RandomStream
:rtype: numpy RandomState (or None, before initialize)
"""
for old_r, new_r in self.random_streams.random_state_variables:
if item is old_r:
container = self.memo[item].value
return container.value
raise KeyError(item)
def __setitem__(self, item, val):
"""Set the numpy RandomState instance associated with a
particular stream
:param item: a variable of type RandomStateType, associated
with this RandomStream
:param val: the new value
:type val: numpy RandomState
:rtype: None
"""
if type(val) is not numpy.random.RandomState:
raise TypeError('only values of type RandomState are permitted',
val)
for old_r, new_r in self.random_streams.random_state_variables:
if item is old_r:
container = self.memo[item].value
container.value = val
return
raise KeyError(item)
class RandomStreams(Component, raw_random.RandomStreamsBase):
"""Module component with similar interface to numpy.random
(numpy.random.RandomState)
"""
def __init__(self, seed=None, no_warn=False):
""":type seed: None or int
:param seed: a default seed to initialize the RandomState
instances after build. See `RandomStreamsInstance.__init__`
for more details.
"""
if not no_warn:
deprecation_warning()
super(RandomStreams, self).__init__(no_warn=True)
# A list of pairs of the form (input_r, output_r). This will be
# over-ridden by the module instance to contain stream generators.
self.random_state_variables = []
# Instance variable should take None or integer value. Used to seed the
# random number generator that provides seeds for member streams
self.default_instance_seed = seed
def allocate(self, memo):
"""override `Component.allocate` """
for old_r, new_r in self.random_state_variables:
if old_r in memo:
assert memo[old_r].update is new_r
else:
memo[old_r] = In(old_r,
value=Container(old_r, storage=[None]),
update=new_r,
mutable=True)
def build(self, mode, memo):
"""override `Component.build` """
if self not in memo:
memo[self] = RandomStreamsInstance(self, memo,
self.default_instance_seed)
return memo[self]
def gen(self, op, *args, **kwargs):
"""Create a new random stream in this container.
:param op: a RandomFunction instance to
:param args: interpreted by `op`
:param kwargs: interpreted by `op`
:returns: The symbolic random draw part of op()'s return
value. This function stores the updated RandomStateType
Variable for use at `build` time.
:rtype: TensorVariable
"""
random_state_variable = raw_random.random_state_type()
new_r, out = op(random_state_variable, *args, **kwargs)
out.rng = random_state_variable
self.random_state_variables.append((random_state_variable, new_r))
return out
__docformat__ = "restructuredtext en"
import sys
import unittest
import numpy
from theano.tensor.randomstreams import RandomStreams, raw_random
from theano.compile import Module, Method, Member
from theano.tests import unittest_tools as utt
from theano import tensor
from theano import compile, config, gof
class T_RandomStreams(unittest.TestCase):
def setUp(self):
utt.seed_rng()
def test_basics(self):
m = Module()
m.random = RandomStreams(utt.fetch_seed())
m.fn = Method([], m.random.uniform((2,2)))
m.gn = Method([], m.random.normal((2,2)))
made = m.make()
made.random.initialize()
fn_val0 = made.fn()
fn_val1 = made.fn()
gn_val0 = made.gn()
rng_seed = numpy.random.RandomState(utt.fetch_seed()).randint(2**30)
rng = numpy.random.RandomState(int(rng_seed)) #int() is for 32bit
#print fn_val0
numpy_val0 = rng.uniform(size=(2,2))
numpy_val1 = rng.uniform(size=(2,2))
#print numpy_val0
assert numpy.allclose(fn_val0, numpy_val0)
assert numpy.allclose(fn_val1, numpy_val1)
def test_seed_in_initialize(self):
m = Module()
m.random = RandomStreams(234)
m.fn = Method([], m.random.uniform((2,2)))
made = m.make()
made.random.initialize(seed=utt.fetch_seed())
fn_val0 = made.fn()
fn_val1 = made.fn()
rng_seed = numpy.random.RandomState(utt.fetch_seed()).randint(2**30)
rng = numpy.random.RandomState(int(rng_seed)) #int() is for 32bit
#print fn_val0
numpy_val0 = rng.uniform(size=(2,2))
numpy_val1 = rng.uniform(size=(2,2))
#print numpy_val0
assert numpy.allclose(fn_val0, numpy_val0)
assert numpy.allclose(fn_val1, numpy_val1)
def test_seed_fn(self):
m = Module()
m.random = RandomStreams(234)
m.fn = Method([], m.random.uniform((2,2)))
made = m.make()
made.random.initialize(seed=789)
made.random.seed(utt.fetch_seed())
fn_val0 = made.fn()
fn_val1 = made.fn()
rng_seed = numpy.random.RandomState(utt.fetch_seed()).randint(2**30)
rng = numpy.random.RandomState(int(rng_seed)) #int() is for 32bit
#print fn_val0
numpy_val0 = rng.uniform(size=(2,2))
numpy_val1 = rng.uniform(size=(2,2))
#print numpy_val0
assert numpy.allclose(fn_val0, numpy_val0)
assert numpy.allclose(fn_val1, numpy_val1)
def test_getitem(self):
m = Module()
m.random = RandomStreams(234)
out = m.random.uniform((2,2))
m.fn = Method([], out)
made = m.make()
made.random.initialize(seed=789)
made.random.seed(utt.fetch_seed())
rng = numpy.random.RandomState()
rng.set_state(made.random[out.rng].get_state())
fn_val0 = made.fn()
fn_val1 = made.fn()
numpy_val0 = rng.uniform(size=(2,2))
numpy_val1 = rng.uniform(size=(2,2))
assert numpy.allclose(fn_val0, numpy_val0)
assert numpy.allclose(fn_val1, numpy_val1)
def test_setitem(self):
m = Module()
m.random = RandomStreams(234)
out = m.random.uniform((2,2))
m.fn = Method([], out)
made = m.make()
#as a distraction, install various seeds
made.random.initialize(seed=789)
made.random.seed(888)
# then replace the rng of the stream we care about via setitem
realseed = utt.fetch_seed()
rng = numpy.random.RandomState(realseed)
made.random[out.rng] = numpy.random.RandomState(realseed)
print made.fn()
print rng.uniform(size=(2,2))
fn_val0 = made.fn()
fn_val1 = made.fn()
numpy_val0 = rng.uniform(size=(2,2))
numpy_val1 = rng.uniform(size=(2,2))
assert numpy.allclose(fn_val0, numpy_val0)
assert numpy.allclose(fn_val1, numpy_val1)
def test_multiple(self):
M = Module()
M.random = RandomStreams(utt.fetch_seed())
out = M.random.uniform((2,2))
M.m2 = Module()
M.m2.random = M.random
out2 = M.m2.random.uniform((2,2))
M.fn = Method([], out)
M.m2.fn2 = Method([], out2)
m = M.make()
m.random.initialize()
m.m2.initialize()
assert m.random is m.m2.random
def test_ndim(self):
"""Test that the behaviour of 'ndim' optional parameter"""
# 'ndim' is an optional integer parameter, specifying the length
# of the 'shape', passed as a keyword argument.
# ndim not specified, OK
m1 = Module()
m1.random = RandomStreams(utt.fetch_seed())
m1.fn = Method([], m1.random.uniform((2,2)))
made1 = m1.make()
made1.random.initialize()
# ndim specified, consistent with shape, OK
m2 = Module()
m2.random = RandomStreams(utt.fetch_seed())
m2.fn = Method([], m2.random.uniform((2,2), ndim=2))
made2 = m2.make()
made2.random.initialize()
val1 = made1.fn()
val2 = made2.fn()
assert numpy.all(val1 == val2)
# ndim specified, inconsistent with shape, should raise ValueError
m3 = Module()
m3.random = RandomStreams(utt.fetch_seed())
self.assertRaises(ValueError, m3.random.uniform, (2,2), ndim=1)
def test_uniform(self):
"""Test that RandomStreams.uniform generates the same results as numpy"""
# Check over two calls to see if the random state is correctly updated.
m = Module()
m.random = RandomStreams(utt.fetch_seed())
m.fn = Method([], m.random.uniform((2,2), -1, 1))
made = m.make()
made.random.initialize()
fn_val0 = made.fn()
fn_val1 = made.fn()
print fn_val0
print fn_val1
rng_seed = numpy.random.RandomState(utt.fetch_seed()).randint(2**30)
rng = numpy.random.RandomState(int(rng_seed)) #int() is for 32bit
numpy_val0 = rng.uniform(-1, 1, size=(2,2))
numpy_val1 = rng.uniform(-1, 1, size=(2,2))
print numpy_val0
print numpy_val1
assert numpy.allclose(fn_val0, numpy_val0)
assert numpy.allclose(fn_val1, numpy_val1)
def test_normal(self):
"""Test that RandomStreams.normal generates the same results as numpy"""
# Check over two calls to see if the random state is correctly updated.
m = Module()
m.random = RandomStreams(utt.fetch_seed())
m.fn = Method([], m.random.normal((2,2), -1, 2))
made = m.make()
made.random.initialize()
fn_val0 = made.fn()
fn_val1 = made.fn()
rng_seed = numpy.random.RandomState(utt.fetch_seed()).randint(2**30)
rng = numpy.random.RandomState(int(rng_seed)) #int() is for 32bit
numpy_val0 = rng.normal(-1, 2, size=(2,2))
numpy_val1 = rng.normal(-1, 2, size=(2,2))
assert numpy.allclose(fn_val0, numpy_val0)
assert numpy.allclose(fn_val1, numpy_val1)
def test_random_integers(self):
"""Test that RandomStreams.random_integers generates the same results as numpy"""
# Check over two calls to see if the random state is correctly updated.
m = Module()
m.random = RandomStreams(utt.fetch_seed())
m.fn = Method([], m.random.random_integers((20,20), -5, 5))
made = m.make()
made.random.initialize()
fn_val0 = made.fn()
fn_val1 = made.fn()
rng_seed = numpy.random.RandomState(utt.fetch_seed()).randint(2**30)
rng = numpy.random.RandomState(int(rng_seed)) #int() is for 32bit
numpy_val0 = rng.random_integers(-5, 5, size=(20,20))
numpy_val1 = rng.random_integers(-5, 5, size=(20,20))
assert numpy.all(fn_val0 == numpy_val0)
assert numpy.all(fn_val1 == numpy_val1)
def test_permutation(self):
"""Test that RandomStreams.permutation generates the same results as numpy"""
# Check over two calls to see if the random state is correctly updated.
m = Module()
m.random = RandomStreams(utt.fetch_seed())
m.fn = Method([], m.random.permutation((20,), 10))
made = m.make()
made.random.initialize()
fn_val0 = made.fn()
fn_val1 = made.fn()
rng_seed = numpy.random.RandomState(utt.fetch_seed()).randint(2**30)
rng = numpy.random.RandomState(int(rng_seed)) #int() is for 32bit
# rng.permutation outputs one vector at a time, so we iterate.
numpy_val0 = numpy.asarray([rng.permutation(10) for i in range(20)])
numpy_val1 = numpy.asarray([rng.permutation(10) for i in range(20)])
assert numpy.all(fn_val0 == numpy_val0)
assert numpy.all(fn_val1 == numpy_val1)
def test_multinomial(self):
"""Test that RandomStreams.multinomial generates the same results as numpy"""
# Check over two calls to see if the random state is correctly updated.
m = Module()
m.random = RandomStreams(utt.fetch_seed())
m.fn = Method([], m.random.multinomial((20,20), 1, [0.1]*10))
made = m.make()
made.random.initialize()
fn_val0 = made.fn()
fn_val1 = made.fn()
rng_seed = numpy.random.RandomState(utt.fetch_seed()).randint(2**30)
rng = numpy.random.RandomState(int(rng_seed)) #int() is for 32bit
numpy_val0 = rng.multinomial(1, [0.1]*10, size=(20,20))
numpy_val1 = rng.multinomial(1, [0.1]*10, size=(20,20))
assert numpy.all(fn_val0 == numpy_val0)
assert numpy.all(fn_val1 == numpy_val1)
def test_shuffle_row_elements(self):
"""Test that RandomStreams.shuffle_row_elements generates the right results"""
# Check over two calls to see if the random state is correctly updated.
# On matrices, for each row, the elements of that row should be
# shuffled.
# Note that this differs from numpy.random.shuffle, where all the
# elements of the matrix are shuffled.
mm = Module()
mm.random = RandomStreams(utt.fetch_seed())
m_input = tensor.dmatrix()
mm.f = Method([m_input], mm.random.shuffle_row_elements(m_input))
mmade = mm.make()
mmade.random.initialize()
# Generate the elements to be shuffled
val_rng = numpy.random.RandomState(utt.fetch_seed()+42)
in_mval = val_rng.uniform(-2, 2, size=(20,5))
fn_mval0 = mmade.f(in_mval)
fn_mval1 = mmade.f(in_mval)
print in_mval[0]
print fn_mval0[0]
print fn_mval1[0]
assert not numpy.all(in_mval == fn_mval0)
assert not numpy.all(in_mval == fn_mval1)
assert not numpy.all(fn_mval0 == fn_mval1)
rng_seed = numpy.random.RandomState(utt.fetch_seed()).randint(2**30)
rng = numpy.random.RandomState(int(rng_seed))
numpy_mval0 = in_mval.copy()
numpy_mval1 = in_mval.copy()
for row in numpy_mval0:
rng.shuffle(row)
for row in numpy_mval1:
rng.shuffle(row)
assert numpy.all(numpy_mval0 == fn_mval0)
assert numpy.all(numpy_mval1 == fn_mval1)
# On vectors, the behaviour is the same as numpy.random.shuffle,
# except that it does not work in place, but returns a shuffled vector.
vm = Module()
vm.random = RandomStreams(utt.fetch_seed())
v_input = tensor.dvector()
vm.f = Method([v_input], vm.random.shuffle_row_elements(v_input))
vmade = vm.make()
vmade.random.initialize()
in_vval = val_rng.uniform(-3, 3, size=(12,))
fn_vval = vmade.f(in_vval)
numpy_vval = in_vval.copy()
vrng = numpy.random.RandomState(int(rng_seed))
vrng.shuffle(numpy_vval)
print in_vval
print fn_vval
print numpy_vval
assert numpy.all(numpy_vval == fn_vval)
# Trying to shuffle a vector with function that should shuffle
# matrices, or vice versa, raises a TypeError
self.assertRaises(TypeError, vmade.f, in_mval)
self.assertRaises(TypeError, mmade.f, in_vval)
def test_symbolic_shape(self):
m = Module()
m.random = RandomStreams(utt.fetch_seed())
shape = tensor.lvector()
out = m.random.uniform(size=shape, ndim=2)
m.f = Method([shape], out)
made = m.make()
made.random.initialize()
assert made.f([2,3]).shape == (2,3)
assert made.f([4,8]).shape == (4,8)
self.assertRaises(ValueError, made.f, [4])
self.assertRaises(ValueError, made.f, [4,3,4,5])
def test_default_shape(self):
m = Module()
m.random = RandomStreams(utt.fetch_seed())
m.f = Method([], m.random.uniform())
m.g = Method([], m.random.multinomial())
made = m.make()
made.random.initialize()
#seed_rng is generator for generating *seeds* for RandomStates
seed_rng = numpy.random.RandomState(utt.fetch_seed())
uniform_rng = numpy.random.RandomState(int(seed_rng.randint(2**30)))
multinomial_rng = numpy.random.RandomState(int(seed_rng.randint(2**30)))
val0 = made.f()
val1 = made.f()
numpy_val0 = uniform_rng.uniform()
numpy_val1 = uniform_rng.uniform()
assert numpy.allclose(val0, numpy_val0)
assert numpy.allclose(val1, numpy_val1)
for i in range(10): # every test has 50% chance of passing even with non-matching random states
val2 = made.g()
numpy_val2 = multinomial_rng.multinomial(n=1, pvals=[.5, .5])
assert numpy.all(val2 == numpy_val2)
def test_vector_arguments(self):
m = Module()
m.random = RandomStreams(utt.fetch_seed())
low = tensor.vector()
out = m.random.uniform(low=low, high=1)
assert out.ndim == 1
m.f = Method([low], out)
high = tensor.vector()
outb = m.random.uniform(low=low, high=high)
assert outb.ndim == 1
m.fb = Method([low, high], outb)
size = tensor.lvector()
outc = m.random.uniform(low=low, high=high, size=size, ndim=1)
m.fc = Method([low, high, size], outc)
made = m.make()
made.random.initialize()
seed_gen = numpy.random.RandomState(utt.fetch_seed())
numpy_rng = numpy.random.RandomState(int(seed_gen.randint(2**30)))
low_val0 = numpy.asarray([-5, .5, 0, 1], dtype=config.floatX)
low_val1 = numpy.asarray([.9], dtype=config.floatX)
val0 = made.f(low_val0)
val1 = made.f(low_val1)
numpy_val0 = numpy_rng.uniform(low=low_val0, high=1)
numpy_val1 = numpy_rng.uniform(low=low_val1, high=1)
assert numpy.allclose(val0, numpy_val0)
assert numpy.allclose(val1, numpy_val1)
numpy_rng = numpy.random.RandomState(int(seed_gen.randint(2**30)))
val0b = made.fb([-4., -2], [-1, 0])
val1b = made.fb([-4.], [-1])
numpy_val0b = numpy_rng.uniform(low=[-4., -2], high=[-1, 0])
numpy_val1b = numpy_rng.uniform(low=[-4.], high=[-1])
assert numpy.allclose(val0b, numpy_val0b)
assert numpy.allclose(val1b, numpy_val1b)
self.assertRaises(ValueError, made.fb, [-4., -2], [-1, 0, 1])
#TODO: do we want that?
#self.assertRaises(ValueError, made.fb, [-4., -2], [-1])
numpy_rng = numpy.random.RandomState(int(seed_gen.randint(2**30)))
val0c = made.fc([-4., -2], [-1, 0], [2])
val1c = made.fc([-4.], [-1], [1])
numpy_val0c = numpy_rng.uniform(low=[-4., -2], high=[-1, 0])
numpy_val1c = numpy_rng.uniform(low=[-4.], high=[-1])
assert numpy.allclose(val0c, numpy_val0c)
assert numpy.allclose(val1c, numpy_val1c)
self.assertRaises(ValueError, made.fc, [-4., -2], [-1, 0], [1])
self.assertRaises(ValueError, made.fc, [-4., -2], [-1, 0], [1,2])
self.assertRaises(ValueError, made.fc, [-4., -2], [-1, 0], [2,1])
self.assertRaises(ValueError, made.fc, [-4., -2], [-1], [1])
#TODO: do we want that?
#self.assertRaises(ValueError, made.fc, [-4., -2], [-1], [2])
def test_broadcast_arguments(self):
m = Module()
m.random = RandomStreams(utt.fetch_seed())
low = tensor.vector()
high = tensor.col()
out = m.random.uniform(low=low, high=high)
assert out.ndim == 2
m.f = Method([low, high], out)
made = m.make()
made.random.initialize()
rng_seed = numpy.random.RandomState(utt.fetch_seed()).randint(2**30)
numpy_rng = numpy.random.RandomState(int(rng_seed))
low_vals = [
numpy.asarray([-5, .5, 0, 1], dtype=config.floatX),
numpy.asarray([.9], dtype=config.floatX),
numpy.asarray([-5, .5, 0, 1], dtype=config.floatX) ]
high_vals = [
numpy.asarray([[1.]], dtype=config.floatX),
numpy.asarray([[1.], [1.1], [1.5]], dtype=config.floatX),
numpy.asarray([[1.], [1.1], [1.5]], dtype=config.floatX) ]
val0 = made.f(low_vals[0], high_vals[0])
val1 = made.f(low_vals[1], high_vals[1])
val2 = made.f(low_vals[2], high_vals[2])
numpy_val0 = numpy_rng.uniform(low=low_vals[0], high=high_vals[0])
numpy_val1 = numpy_rng.uniform(low=low_vals[1], high=high_vals[1])
numpy_val2 = numpy_rng.uniform(low=low_vals[2], high=high_vals[2])
assert numpy.allclose(val0, numpy_val0)
assert numpy.allclose(val1, numpy_val1)
assert numpy.allclose(val2, numpy_val2)
def test_uniform_vector(self):
m = Module()
m.random = RandomStreams(utt.fetch_seed())
low = tensor.vector()
high = tensor.vector()
out = m.random.uniform(low=low, high=high)
assert out.ndim == 1
m.f = Method([low, high], out)
# Specifying the size explicitly
m.g = Method([low, high],
m.random.uniform(low=low, high=high, size=(3,)))
made = m.make()
made.random.initialize()
low_val = numpy.asarray([.1, .2, .3], dtype=config.floatX)
high_val = numpy.asarray([1.1, 2.2, 3.3], dtype=config.floatX)
seed_gen = numpy.random.RandomState(utt.fetch_seed())
numpy_rng = numpy.random.RandomState(int(seed_gen.randint(2**30)))
# Arguments of size (3,)
val0 = made.f(low_val, high_val)
numpy_val0 = numpy_rng.uniform(low=low_val, high=high_val)
assert numpy.allclose(val0, numpy_val0)
# arguments of size (2,)
val1 = made.f(low_val[:-1], high_val[:-1])
numpy_val1 = numpy_rng.uniform(low=low_val[:-1], high=high_val[:-1])
assert numpy.allclose(val1, numpy_val1)
# Specifying the size explicitly
numpy_rng = numpy.random.RandomState(int(seed_gen.randint(2**30)))
val2 = made.g(low_val, high_val)
numpy_val2 = numpy_rng.uniform(low=low_val, high=high_val, size=(3,))
assert numpy.allclose(val2, numpy_val2)
self.assertRaises(ValueError, made.g, low_val[:-1], high_val[:-1])
def test_binomial_vector(self):
m = Module()
m.random = RandomStreams(utt.fetch_seed())
n = tensor.lvector()
prob = tensor.vector()
out = m.random.binomial(n=n, p=prob)
assert out.ndim == 1
m.f = Method([n, prob], out)
# Specifying the size explicitly
m.g = Method([n, prob],
m.random.binomial(n=n, p=prob, size=(3,)))
made = m.make()
made.random.initialize()
n_val = [1, 2, 3]
prob_val = numpy.asarray([.1, .2, .3], dtype=config.floatX)
seed_gen = numpy.random.RandomState(utt.fetch_seed())
numpy_rng = numpy.random.RandomState(int(seed_gen.randint(2**30)))
# Arguments of size (3,)
val0 = made.f(n_val, prob_val)
numpy_val0 = numpy_rng.binomial(n=n_val, p=prob_val)
assert numpy.all(val0 == numpy_val0)
# arguments of size (2,)
val1 = made.f(n_val[:-1], prob_val[:-1])
numpy_val1 = numpy_rng.binomial(n=n_val[:-1], p=prob_val[:-1])
assert numpy.all(val1 == numpy_val1)
# Specifying the size explicitly
numpy_rng = numpy.random.RandomState(int(seed_gen.randint(2**30)))
val2 = made.g(n_val, prob_val)
numpy_val2 = numpy_rng.binomial(n=n_val, p=prob_val, size=(3,))
assert numpy.all(val2 == numpy_val2)
self.assertRaises(ValueError, made.g, n_val[:-1], prob_val[:-1])
def test_normal_vector(self):
m = Module()
m.random = RandomStreams(utt.fetch_seed())
avg = tensor.vector()
std = tensor.vector()
out = m.random.normal(avg=avg, std=std)
assert out.ndim == 1
m.f = Method([avg, std], out)
# Specifying the size explicitly
m.g = Method([avg, std],
m.random.normal(avg=avg, std=std, size=(3,)))
made = m.make()
made.random.initialize()
avg_val = [1, 2, 3]
std_val = numpy.asarray([.1, .2, .3], dtype=config.floatX)
seed_gen = numpy.random.RandomState(utt.fetch_seed())
numpy_rng = numpy.random.RandomState(int(seed_gen.randint(2**30)))
# Arguments of size (3,)
val0 = made.f(avg_val, std_val)
numpy_val0 = numpy_rng.normal(loc=avg_val, scale=std_val)
assert numpy.allclose(val0, numpy_val0)
# arguments of size (2,)
val1 = made.f(avg_val[:-1], std_val[:-1])
numpy_val1 = numpy_rng.normal(loc=avg_val[:-1], scale=std_val[:-1])
assert numpy.allclose(val1, numpy_val1)
# Specifying the size explicitly
numpy_rng = numpy.random.RandomState(int(seed_gen.randint(2**30)))
val2 = made.g(avg_val, std_val)
numpy_val2 = numpy_rng.normal(loc=avg_val, scale=std_val, size=(3,))
assert numpy.allclose(val2, numpy_val2)
self.assertRaises(ValueError, made.g, avg_val[:-1], std_val[:-1])
def test_random_integers_vector(self):
m = Module()
m.random = RandomStreams(utt.fetch_seed())
low = tensor.lvector()
high = tensor.lvector()
out = m.random.random_integers(low=low, high=high)
assert out.ndim == 1
m.f = Method([low, high], out)
# Specifying the size explicitly
m.g = Method([low, high],
m.random.random_integers(low=low, high=high, size=(3,)))
made = m.make()
made.random.initialize()
low_val = [100, 200, 300]
high_val = [110, 220, 330]
seed_gen = numpy.random.RandomState(utt.fetch_seed())
numpy_rng = numpy.random.RandomState(int(seed_gen.randint(2**30)))
# Arguments of size (3,)
val0 = made.f(low_val, high_val)
numpy_val0 = numpy.asarray([numpy_rng.random_integers(low=lv, high=hv)
for lv, hv in zip(low_val, high_val)])
assert numpy.all(val0 == numpy_val0)
# arguments of size (2,)
val1 = made.f(low_val[:-1], high_val[:-1])
numpy_val1 = numpy.asarray([numpy_rng.random_integers(low=lv, high=hv)
for lv, hv in zip(low_val[:-1], high_val[:-1])])
assert numpy.all(val1 == numpy_val1)
# Specifying the size explicitly
numpy_rng = numpy.random.RandomState(int(seed_gen.randint(2**30)))
val2 = made.g(low_val, high_val)
numpy_val2 = numpy.asarray([numpy_rng.random_integers(low=lv, high=hv)
for lv, hv in zip(low_val, high_val)])
assert numpy.all(val2 == numpy_val2)
self.assertRaises(ValueError, made.g, low_val[:-1], high_val[:-1])
# Vectorized permutation don't make sense: the only parameter, n,
# controls one dimension of the returned tensor.
def test_multinomial_vector(self):
m = Module()
m.random = RandomStreams(utt.fetch_seed())
n = tensor.lvector()
pvals = tensor.matrix()
out = m.random.multinomial(n=n, pvals=pvals)
assert out.ndim == 2
m.f = Method([n, pvals], out)
# Specifying the size explicitly
m.g = Method([n, pvals],
m.random.multinomial(n=n, pvals=pvals, size=(3,)))
made = m.make()
made.random.initialize()
n_val = [1, 2, 3]
pvals_val = numpy.asarray([[.1, .9], [.2, .8], [.3, .7]],
dtype=config.floatX)
seed_gen = numpy.random.RandomState(utt.fetch_seed())
numpy_rng = numpy.random.RandomState(int(seed_gen.randint(2**30)))
# Arguments of size (3,)
val0 = made.f(n_val, pvals_val)
numpy_val0 = numpy.asarray([numpy_rng.multinomial(n=nv, pvals=pv)
for nv, pv in zip(n_val, pvals_val)])
assert numpy.all(val0 == numpy_val0)
# arguments of size (2,)
val1 = made.f(n_val[:-1], pvals_val[:-1])
numpy_val1 = numpy.asarray([numpy_rng.multinomial(n=nv, pvals=pv)
for nv, pv in zip(n_val[:-1], pvals_val[:-1])])
assert numpy.all(val1 == numpy_val1)
# Specifying the size explicitly
numpy_rng = numpy.random.RandomState(int(seed_gen.randint(2**30)))
val2 = made.g(n_val, pvals_val)
numpy_val2 = numpy.asarray([numpy_rng.multinomial(n=nv, pvals=pv)
for nv, pv in zip(n_val, pvals_val)])
assert numpy.all(val2 == numpy_val2)
self.assertRaises(ValueError, made.g, n_val[:-1], pvals_val[:-1])
def test_dtype(self):
m = Module()
m.random = RandomStreams(utt.fetch_seed())
low = tensor.lscalar()
high = tensor.lscalar()
out = m.random.random_integers(low=low, high=high, size=(20,), dtype='int8')
assert out.dtype == 'int8'
m.f = Method([low, high], out)
made = m.make()
made.random.initialize()
val0 = made.f(0, 9)
assert val0.dtype == 'int8'
val1 = made.f(255, 257)
assert val1.dtype == 'int8'
assert numpy.all(abs(val1) <= 1)
def test_multiple_rng(self):
"""
Test that when we have multiple random number generators, we do not alias
the state_updates member. `state_updates` can be useful when attempting to
copy the (random) state between two similar theano graphs. The test is
meant to detect a previous bug where state_updates was initialized as a
class-attribute, instead of the __init__ function.
"""
rng1 = RandomStreams(1234)
rng2 = RandomStreams(2392)
assert rng1.random_state_variables is not rng2.random_state_variables
if __name__ == '__main__':
from theano.tests import main
main("test_randomstreams")
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论