提交 97aa44e0 authored 作者: Matthew Rocklin's avatar Matthew Rocklin

add draft of MPI ops

上级 5a214815
......@@ -77,3 +77,158 @@ def load(path, dtype, broadcastable, mmap_mode=None):
"""
return LoadFromDisk(dtype, broadcastable, mmap_mode)(path)
##########################
# MPI
##########################
try:
from mpi4py import MPI
comm = MPI.COMM_WORLD
mpi_enabled = True
except:
mpi_enabled = False
class MPIRecv(Op):
"""
An operation to asynchronously receive an array to a remote host using MPI
See Also
MPIRecv
MPIWait
@note: Non-differentiable.
"""
def __init__(self, rank, tag, dtype, shape):
self.rank = rank
self.tag = tag
self.shape = shape
self.dtype = numpy.dtype(dtype) # turn "float64" into numpy.float64
self.broadcastable = (False,)*len(shape)
self._info = (rank, tag, dtype, shape)
def __eq__(self, other):
return (type(self) == type(other) and self._info == other._info)
def __hash__(self):
return hash(self._info)
def make_node(self):
return gof.Apply(self, [], [theano.Generic(),
tensor(self.dtype,
broadcastable=self.broadcastable)])
def perform(self, node, inp, out):
data = numpy.empty(self.shape, dtype=self.dtype)
request = comm.Irecv(data, self.rank, self.tag)
out[0][0] = request
out[0][1] = data
def __str__(self):
return "MPIRecv{source: %d, tag: %d, dtype:%s, shape:%s, :%s}"%self._info
class MPIRecvWait(Op):
"""
An operation to wait on a previously received array using MPI
See Also
MPIRecv
@note: Non-differentiable.
"""
def __init__(self):
pass
def __eq__(self, other):
return type(self) == type(other)
def __hash__(self):
return hash(self.type)
def make_node(self):
return gof.Apply(self, [theano.Generic(),
tensor(self.dtype,
broadcastable=self.broadcastable)],
[tensor(self.dtype,
broadcastable=self.broadcastable)])
def perform(self, node, inp, out):
request = inp[0][0]
data = inp[0][1]
request.wait()
out[0][0] = data
def __str__(self):
return "MPIRecvWait"
class MPISend(Op):
"""
An operation to asynchronously Send an array to a remote host using MPI
See Also
MPIRecv
MPISendWait
@note: Non-differentiable.
"""
def __init__(self, rank, tag):
self.rank = rank
self.tag = tag
self._info = (rank, tag)
def __eq__(self, other):
return (type(self) == type(other) and self._info == other._info)
def __hash__(self):
return hash(self._info)
def make_node(self):
return gof.Apply(self, [tensor(self.dtype, broadcastable=self.broadcastable)],
[theano.Generic()])
def perform(self, node, inp, out):
data = inp[0][0]
request = comm.Isend(data, self.rank, self.tag)
out[0][0] = request
def __str__(self):
return "MPISend{dest: %d, tag: %d}"%self._info
class MPISendWait(Op):
"""
An operation to wait on a previously sent array using MPI
See Also:
MPISend
@note: Non-differentiable.
"""
def __init__(self):
pass
def __eq__(self, other):
return type(self) == type(other)
def __hash__(self):
return hash(self.type)
def make_node(self):
return gof.Apply(self, [theano.Generic()], [])
def perform(self, node, inp, out):
request = inp[0][0]
request.wait()
def __str__(self):
return "MPISendWait"
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论