Commit 906dba3b authored by James Bergstra

Merge pull request #935 from mrocklin/mpi-schedule

MPI Ops
...@@ -170,3 +170,8 @@ def sort_schedule_fn(*cmps): ...@@ -170,3 +170,8 @@ def sort_schedule_fn(*cmps):
""" Order nodes in a FunctionGraph """ """ Order nodes in a FunctionGraph """
return sort_apply_nodes(fgraph.inputs, fgraph.outputs, cmps) return sort_apply_nodes(fgraph.inputs, fgraph.outputs, cmps)
return schedule return schedule
def key_to_cmp(key):
    """Convert a key function into an old-style comparator.

    Returns a function ``key_cmp(a, b)`` that is negative, zero, or
    positive depending on whether ``key(a)`` is less than, equal to, or
    greater than ``key(b)``.
    """
    def key_cmp(a, b):
        # Evaluate the key once per argument.  The subtraction idiom is
        # equivalent to the builtin cmp() but also works on Python 3,
        # where cmp() was removed.
        ka, kb = key(a), key(b)
        return (ka > kb) - (ka < kb)
    return key_cmp
import numpy import numpy
from theano import gof from theano import gof
from theano.gof import Constant, Generic, Op from theano.gof import Constant, Generic, Op
from theano.gof.sched import key_to_cmp
from basic import tensor from basic import tensor
import theano
########################## ##########################
# Disk Access # Disk Access
########################## ##########################
...@@ -29,7 +31,7 @@ class LoadFromDisk(Op): ...@@ -29,7 +31,7 @@ class LoadFromDisk(Op):
return (type(self) == type(other) and self._info == other._info) return (type(self) == type(other) and self._info == other._info)
def __hash__(self): def __hash__(self):
return hash(self._info) return hash((type(self),) + self._info)
def make_node(self, path): def make_node(self, path):
if isinstance(path, str): if isinstance(path, str):
...@@ -48,8 +50,7 @@ class LoadFromDisk(Op): ...@@ -48,8 +50,7 @@ class LoadFromDisk(Op):
out[0][0] = result out[0][0] = result
def __str__(self): def __str__(self):
return "Load{dtype:%s, broadcastable:%s, mmep:%s}" % self._info return "Load{dtype: %s, broadcastable: %s, mmep: %s}" % self._info
def load(path, dtype, broadcastable, mmap_mode=None): def load(path, dtype, broadcastable, mmap_mode=None):
""" """
...@@ -77,3 +78,205 @@ def load(path, dtype, broadcastable, mmap_mode=None): ...@@ -77,3 +78,205 @@ def load(path, dtype, broadcastable, mmap_mode=None):
""" """
return LoadFromDisk(dtype, broadcastable, mmap_mode)(path) return LoadFromDisk(dtype, broadcastable, mmap_mode)(path)
##########################
# MPI
##########################
# MPI support is optional: record availability rather than failing at
# import time when mpi4py is absent.
mpi_enabled = False
try:
    from mpi4py import MPI
except ImportError:
    pass
else:
    comm = MPI.COMM_WORLD
    mpi_enabled = True
class MPIRecv(Op):
    """Start a non-blocking MPI receive of an array from a remote host.

    Outputs a generic variable holding the mpi4py request object and the
    tensor the incoming data will be written into; follow with an
    MPIRecvWait node to complete the transfer.

    See Also
        MPIRecvWait

    @note: Non-differentiable.
    """

    def __init__(self, source, tag, shape, dtype):
        self.source = source
        self.tag = tag
        self.shape = shape
        # Normalize strings such as "float64" into a numpy dtype object.
        self.dtype = numpy.dtype(dtype)
        self.broadcastable = (False,) * len(shape)
        # Identity tuple used for equality, hashing and printing.
        self._info = (source, tag, shape, dtype)

    def __eq__(self, other):
        if type(self) != type(other):
            return False
        return self._info == other._info

    def __hash__(self):
        return hash((type(self),) + self._info)

    def make_node(self):
        request_var = theano.Variable(Generic())
        data_var = tensor(self.dtype, broadcastable=self.broadcastable)
        return gof.Apply(self, [], [request_var, data_var])

    def perform(self, node, inp, out):
        # Allocate the landing buffer and post the asynchronous receive.
        buf = numpy.zeros(self.shape, dtype=self.dtype)
        req = comm.Irecv(buf, self.source, self.tag)
        out[0][0] = req
        out[1][0] = buf

    def __str__(self):
        return "MPIRecv{source: %d, tag: %d, shape: %s, dtype: %s}" % self._info

    def infer_shape(self, node, shapes):
        # The request object has no shape; the data shape is fixed at
        # construction time.
        return [None, self.shape]

    def do_constant_folding(self, node):
        # Receiving has side effects; never fold it away at compile time.
        return False
class MPIRecvWait(Op):
    """Complete a previously posted MPI receive.

    Takes the request/data pair produced by MPIRecv, blocks until the
    transfer finishes, and returns the filled data tensor.

    See Also
        MPIRecv

    @note: Non-differentiable.
    """

    # The output aliases the data input: the buffer is filled in place.
    view_map = {0: [1]}

    def __init__(self, tag):
        self.tag = tag

    def __eq__(self, other):
        return type(self) == type(other) and self.tag == other.tag

    def __hash__(self):
        return hash((type(self), self.tag))

    def make_node(self, request, data):
        out_var = tensor(data.dtype, broadcastable=data.broadcastable)
        return gof.Apply(self, [request, data], [out_var])

    def perform(self, node, inp, out):
        req, buf = inp
        req.wait()  # block until the MPI transfer is done
        out[0][0] = buf

    def __str__(self):
        return "MPIRecvWait"

    def infer_shape(self, node, shapes):
        # Output shape equals the data input's shape.
        return [shapes[1]]
class MPISend(Op):
    """Start a non-blocking MPI send of an array to a remote host.

    Outputs a generic variable holding the mpi4py request object and a
    view of the data being sent; follow with an MPISendWait node to
    complete the transfer.

    See Also
        MPIRecv
        MPISendWait

    @note: Non-differentiable.
    """

    # The second output is a view of the data input.
    view_map = {1: [0]}

    def __init__(self, dest, tag):
        self.dest = dest
        self.tag = tag
        # Identity tuple used for equality, hashing and printing.
        self._info = (dest, tag)

    def __eq__(self, other):
        if type(self) != type(other):
            return False
        return self._info == other._info

    def __hash__(self):
        return hash((type(self),) + self._info)

    def make_node(self, data):
        request_var = theano.Variable(Generic())
        return gof.Apply(self, [data], [request_var, data.type()])

    def perform(self, node, inp, out):
        buf = inp[0]
        # Post the asynchronous send; the data flows through unchanged.
        req = comm.Isend(buf, self.dest, self.tag)
        out[0][0] = req
        out[1][0] = buf

    def __str__(self):
        return "MPISend{dest: %d, tag: %d}" % self._info
class MPISendWait(Op):
    """Complete a previously posted MPI send.

    Blocks until the transfer started by MPISend has finished; its
    single output is a generic placeholder marking completion.

    See Also:
        MPISend

    @note: Non-differentiable.
    """

    def __init__(self, tag):
        self.tag = tag

    def __eq__(self, other):
        return type(self) == type(other) and self.tag == other.tag

    def __hash__(self):
        return hash((type(self), self.tag))

    def make_node(self, request, data):
        # `data` keeps the graph dependency alive but is otherwise unused.
        return gof.Apply(self, [request, data],
                         [theano.Variable(Generic())])

    def perform(self, node, inp, out):
        req = inp[0]
        req.wait()  # block until the MPI transfer is done
        out[0][0] = True

    def __str__(self):
        return "MPISendWait"
def isend(var, dest, tag):
    """Post a non-blocking MPI send of `var` to rank `dest`.

    Returns the (request, data) variable pair produced by MPISend.
    """
    send_op = MPISend(dest, tag)
    return send_op(var)
def send(var, dest, tag):
    """MPI send that completes: start the transfer, then wait on it."""
    request, data = isend(var, dest, tag)
    return MPISendWait(tag)(request, data)
def irecv(shape, dtype, source, tag):
    """Post a non-blocking MPI receive from rank `source`.

    Returns the (request, data) variable pair produced by MPIRecv.
    """
    recv_op = MPIRecv(source, tag, shape, dtype)
    return recv_op()
def recv(shape, dtype, source, tag):
    """MPI receive that completes: post the receive, then wait for data."""
    request, data = irecv(shape, dtype, source, tag)
    return MPIRecvWait(tag)(request, data)
# Ordering keys for scheduling
def mpi_send_wait_key(a):
""" Wait as long as possible on Waits, Start Send/Recvs early """
if isinstance(a.op, (MPIRecvWait, MPISendWait)):
return 1
if isinstance(a.op, (MPIRecv, MPISend)):
return -1
return 0
def mpi_tag_key(a):
    """ Break MPI ties by using the variable tag - prefer lower tags first """
    op = a.op
    if isinstance(op, (MPISend, MPIRecv, MPIRecvWait, MPISendWait)):
        return op.tag
    # Non-MPI nodes are all equivalent for tag ordering.
    return 0
# Old-style comparators derived from the ordering keys above, for
# schedulers that take cmp-style functions (see gof.sched.sort_schedule_fn).
mpi_send_wait_cmp = key_to_cmp(mpi_send_wait_key)
mpi_tag_cmp = key_to_cmp(mpi_tag_key)
# Convenience bundles, applied in order: send/wait priority first,
# then tag as a tie-breaker.
mpi_keys = (mpi_send_wait_key, mpi_tag_key)
mpi_cmps = (mpi_send_wait_cmp, mpi_tag_cmp)
# Run using
#   mpiexec -np 2 python _test_mpi_roundtrip.py
#
# Two-process round-trip check: rank 0 sends x+1 to rank 1, rank 1
# doubles it and sends it back, and rank 0 verifies the result.  The
# script prints "True"/"False" on stdout for the parent test
# (test_mpi_roundtrip) to read.
from mpi4py import MPI
comm = MPI.COMM_WORLD
import theano
from theano.tensor.io import send, recv, mpi_cmps
from theano.gof.sched import sort_schedule_fn
import numpy as np
from sys import stdout, stderr, exit
rank = comm.Get_rank()
size = comm.Get_size()
# The protocol below is written for exactly two ranks; bail out (but
# report success so the parent test does not fail spuriously) otherwise.
if size != 2:
    stderr.write("mpiexec failed to create a world with two nodes.\n"
                 "Closing with success message.")
    stdout.write("True")
    exit(0)
shape = (2, 2)
dtype = 'float32'
# Order the schedule with the MPI-aware comparators so transfers start
# early and waits run late.
scheduler = sort_schedule_fn(*mpi_cmps)
mode = theano.Mode(optimizer=None,
                   linker=theano.OpWiseCLinker(schedule=scheduler))
if rank == 0:
    # Rank 0: send x+1 to rank 1 (tag 11), receive the doubled value
    # back (tag 12), and compare against the locally computed answer.
    x = theano.tensor.matrix('x', dtype=dtype)
    y = x + 1
    send_request = send(y, 1, 11)
    z = recv(shape, dtype, 1, 12)
    f = theano.function([x], [send_request, z], mode=mode)
    xx = np.random.rand(*shape).astype(dtype)
    expected = (xx + 1) * 2
    _, zz = f(xx)
    same = np.linalg.norm(zz - expected) < .001
    stdout.write(str(same))
if rank == 1:
    # Rank 1: receive from rank 0 (tag 11), double, send back (tag 12).
    y = recv(shape, dtype, 0, 11)
    z = y * 2
    send_request = send(z, 0, 12)
    f = theano.function([], send_request, mode=mode)
    f()
from theano.tensor.io import (send, recv, mpi_cmps, MPISend, MPISendWait,
                              mpi_send_wait_cmp, mpi_tag_cmp, mpi_enabled)
import theano
import subprocess
import os
from theano.gof.sched import sort_schedule_fn
# A Mode whose linker orders nodes with the MPI-aware comparators, so
# sends/receives are started early and waits run as late as possible.
mpi_scheduler = sort_schedule_fn(*mpi_cmps)
mpi_linker = theano.OpWiseCLinker(schedule=mpi_scheduler)
mpi_mode = theano.Mode(linker=mpi_linker)
def test_recv():
    """recv() yields a tensor of the requested dtype wired to an MPIRecv."""
    out = recv((10, 10), 'float64', 0, 11)
    assert out.dtype == 'float64'
    assert out.broadcastable == (False, False)
    # The wait node's first input comes from the MPIRecv apply node.
    recv_apply = out.owner.inputs[0].owner
    assert recv_apply.op.source == 0
    assert recv_apply.op.tag == 11
def test_send():
    """send() routes the variable through an MPISend with dest/tag set."""
    x = theano.tensor.matrix('x')
    wait_var = send(x, 1, 11)
    # The wait node's first input comes from the MPISend apply node.
    send_apply = wait_var.owner.inputs[0].owner
    assert send_apply.op.dest == 1
    assert send_apply.op.tag == 11
def test_can_make_function():
    """Graphs containing MPI receives still compile into a function."""
    inp = recv((5, 5), 'float32', 0, 11)
    outp = inp + 1
    assert theano.function([], [outp])
def test_mpi_roundtrip():
    """End-to-end check: launch two MPI ranks that exchange arrays.

    Runs _test_mpi_roundtrip.py under mpiexec and checks that it printed
    "True".  Silently returns (skips) when mpi4py is not installed.
    """
    if not mpi_enabled:
        return
    theano_root = theano.__file__.split('__init__')[0]
    script = theano_root + "tensor/tests/_test_mpi_roundtrip.py"
    # subprocess.Popen instead of the deprecated os.popen3 (removed in
    # Python 3); we only need the child's stdout.
    proc = subprocess.Popen(["mpiexec", "-np", "2", "python", script],
                            stdout=subprocess.PIPE)
    result = proc.communicate()[0]
    assert "True" in result
def test_mpi_send_wait_cmp():
    """Sends are ordered before ordinary work; waits after it."""
    x = theano.tensor.matrix('x')
    y = send(x, 1, 11)
    z = x + x
    waitnode = y.owner
    sendnode = waitnode.inputs[0].owner
    addnode = z.owner
    # send happens first, wait happens last
    assert mpi_send_wait_cmp(sendnode, addnode) < 0
    assert mpi_send_wait_cmp(waitnode, addnode) > 0
def test_mpi_tag_ordering():
    """Ties between MPI nodes are broken by ascending tag."""
    a = recv((2, 2), 'float32', 1, 12)
    b = recv((2, 2), 'float32', 1, 11)
    c = recv((2, 2), 'float32', 1, 13)
    f = theano.function([], [a, b, c], mode=mpi_mode)
    nodes = f.maker.linker.make_all()[-1]
    # Three recvs then three waits, each group sorted by tag.
    expected_tags = (11, 12, 13, 11, 12, 13)
    assert all(node.op.tag == tag
               for node, tag in zip(nodes, expected_tags))
def test_mpi_schedule():
    """The MPI-aware scheduler starts the send, runs the add, then waits.

    Removes the dead `waitnode`/`sendnode`/`addnode` locals that were
    copied from test_mpi_send_wait_cmp but never used here.
    """
    x = theano.tensor.matrix('x')
    y = send(x, 1, 11)
    z = x + x
    f = theano.function([x], [y, z], mode=mpi_mode)
    nodes = f.maker.linker.make_all()[-1]
    # Expected schedule: start the send, do unrelated compute, then wait.
    optypes = [MPISend, theano.tensor.Elemwise, MPISendWait]
    assert all(isinstance(node.op, optype)
               for node, optype in zip(nodes, optypes))
Markdown format
0%
You attached 0 items to this discussion. Please proceed with caution.
Please finish editing this comment first!
Register or sign in to post a comment