提交 26b2c25d authored 作者: Maxim Kochurov's avatar Maxim Kochurov 提交者: Maxim Kochurov

remove MPI

上级 f6ab37b5
......@@ -12,11 +12,6 @@ File operation
- Load from disk with the function :func:`load <pytensor.tensor.io.load>` and its associated op :class:`LoadFromDisk <pytensor.tensor.io.LoadFromDisk>`
MPI operation
=============
- Non-blocking transfer: :func:`isend <pytensor.tensor.io.isend>` and :func:`irecv <pytensor.tensor.io.irecv>`.
- Blocking transfer: :func:`send <pytensor.tensor.io.send>` and :func:`recv <pytensor.tensor.io.recv>`.
Details
=======
......
import numpy as np
from pytensor.graph.basic import Apply, Constant, Variable
from pytensor.graph.basic import Apply, Constant
from pytensor.graph.op import Op
from pytensor.link.c.type import Generic
from pytensor.tensor.type import tensor
from pytensor.utils import key_to_cmp
class LoadFromDisk(Op):
......@@ -92,229 +91,4 @@ def load(path, dtype, shape, mmap_mode=None):
return LoadFromDisk(dtype, shape, mmap_mode)(path)
##########################
# MPI
##########################
# Optional MPI support: ``mpi_enabled`` tells callers whether mpi4py (and
# hence the MPI ops in this module) can actually be used in this process.
mpi_enabled = False
try:
    from mpi4py import MPI
except ImportError:
    pass
else:
    # Default communicator used by every MPI op below.
    comm = MPI.COMM_WORLD
    mpi_enabled = True
class MPIRecv(Op):
    """Asynchronously receive an array from a remote host using MPI.

    Produces two outputs: an opaque MPI request handle and the buffer
    that will eventually hold the received data.  Pair with
    :class:`MPIRecvWait` to block until the transfer has completed.

    See Also
    --------
    MPIRecvWait

    Notes
    -----
    Non-differentiable.
    """

    __props__ = ("source", "tag", "shape", "dtype")

    def __init__(self, source, tag, shape, dtype):
        self.source = source
        self.tag = tag
        self.shape = shape
        # Normalise e.g. the string "float64" into a numpy dtype object.
        self.dtype = np.dtype(dtype)
        # Only the rank is known statically; every dim is left unknown.
        self.static_shape = (None,) * len(shape)

    def make_node(self):
        outputs = [
            Variable(Generic(), None),  # the MPI request handle
            tensor(dtype=self.dtype, shape=self.static_shape),
        ]
        return Apply(self, [], outputs)

    def perform(self, node, inp, out):
        # Allocate the receive buffer and post a non-blocking receive into
        # it; the request is completed later by MPIRecvWait.
        buf = np.zeros(self.shape, dtype=self.dtype)
        out[0][0] = comm.Irecv(buf, self.source, self.tag)
        out[1][0] = buf

    def __str__(self):
        return (
            f"MPIRecv{{source: {int(self.source)}, "
            f"tag: {int(self.tag)}, "
            f"shape: {self.shape}, "
            f"dtype: {self.dtype}}}"
        )

    def infer_shape(self, fgraph, node, shapes):
        # The request output has no shape; the data output always has the
        # shape fixed at construction time.
        return [None, self.shape]

    def do_constant_folding(self, fgraph, node):
        # Receiving has side effects; never fold it away.
        return False
class MPIRecvWait(Op):
    """Block until a previously posted MPI receive has completed.

    Consumes the (request, data) pair produced by :class:`MPIRecv` and
    returns the data buffer once the transfer has finished.

    See Also
    --------
    MPIRecv

    Notes
    -----
    Non-differentiable.
    """

    __props__ = ("tag",)

    # The output is the very buffer passed in as the data input (filled in
    # place by the MPI library), so declare the aliasing to PyTensor.
    view_map = {0: [1]}

    def __init__(self, tag):
        self.tag = tag

    def make_node(self, request, data):
        result = tensor(dtype=data.dtype, shape=data.type.shape)
        return Apply(self, [request, data], [result])

    def perform(self, node, inp, out):
        request, data = inp
        # Completing the request guarantees `data` is fully populated.
        request.wait()
        out[0][0] = data

    def infer_shape(self, fgraph, node, shapes):
        # The output shape is exactly the data input's shape.
        return [shapes[1]]
class MPISend(Op):
    """Asynchronously send an array to a remote host using MPI.

    Produces an opaque MPI request handle plus (a view of) the data being
    sent.  Pair with :class:`MPISendWait` to block until the transfer has
    completed.

    See Also
    --------
    MPIRecv
    MPISendWait

    Notes
    -----
    Non-differentiable.
    """

    __props__ = ("dest", "tag")

    # The second output is simply the input buffer passed through.
    view_map = {1: [0]}

    def __init__(self, dest, tag):
        self.dest = dest
        self.tag = tag

    def make_node(self, data):
        outputs = [Variable(Generic(), None), data.type()]
        return Apply(self, [data], outputs)

    def perform(self, node, inp, out):
        (data,) = inp
        # Post a non-blocking send; MPISendWait completes it later.
        out[0][0] = comm.Isend(data, self.dest, self.tag)
        out[1][0] = data

    def __str__(self):
        return f"MPISend{{dest: {int(self.dest)}, tag: {int(self.tag)}}}"
class MPISendWait(Op):
    """Block until a previously posted MPI send has completed.

    See Also
    --------
    MPISend

    Notes
    -----
    Non-differentiable.
    """

    __props__ = ("tag",)

    def __init__(self, tag):
        self.tag = tag

    def make_node(self, request, data):
        # The data input only enforces scheduling order; the sole output is
        # a generic token signalling completion.
        return Apply(self, [request, data], [Variable(Generic(), None)])

    def perform(self, node, inp, out):
        request = inp[0]
        request.wait()
        # Mark completion; the value itself carries no information.
        out[0][0] = True
def isend(var, dest, tag):
    """Start a non-blocking MPI send of ``var`` to rank ``dest``.

    Returns the (request, data) outputs of an :class:`MPISend` node; feed
    them to :class:`MPISendWait` (see ``send``) to complete the transfer.
    """
    op = MPISend(dest, tag)
    return op(var)
def send(var, dest, tag):
    """Blocking MPI send: start the transfer, then wait for completion."""
    request_and_data = isend(var, dest, tag)
    return MPISendWait(tag)(*request_and_data)
def irecv(shape, dtype, source, tag):
    """Start a non-blocking MPI receive.

    Returns the (request, data) outputs of an :class:`MPIRecv` node.
    """
    op = MPIRecv(source, tag, shape, dtype)
    return op()
def recv(shape, dtype, source, tag):
    """Blocking MPI receive: start the transfer, then wait for the data."""
    request, data = irecv(shape, dtype, source, tag)
    return MPIRecvWait(tag)(request, data)
# Ordering keys for scheduling
def mpi_send_wait_key(a):
    """Wait as long as possible on Waits; start Sends/Recvs early."""
    op = a.op
    if isinstance(op, (MPIRecvWait, MPISendWait)):
        return 1
    return -1 if isinstance(op, (MPIRecv, MPISend)) else 0
def mpi_tag_key(a):
    """Break MPI ties using the op's tag — prefer lower tags first."""
    mpi_ops = (MPISend, MPIRecv, MPIRecvWait, MPISendWait)
    return a.op.tag if isinstance(a.op, mpi_ops) else 0
# Comparator forms of the scheduling keys, for sort-based schedulers.
mpi_send_wait_cmp = key_to_cmp(mpi_send_wait_key)
mpi_tag_cmp = key_to_cmp(mpi_tag_key)
# Convenience tuples applying both ordering rules, in priority order.
mpi_keys = (mpi_send_wait_key, mpi_tag_key)
mpi_cmps = (mpi_send_wait_cmp, mpi_tag_cmp)
# Only `load` is re-exported on star-import; MPI helpers must be imported
# explicitly by name.
__all__ = ["load"]
......@@ -4,15 +4,13 @@ from typing import Callable
import numpy as np
import pytensor
from pytensor.compile.mode import Mode
from pytensor.graph import fg
from pytensor.graph.basic import Apply, Constant, Variable, clone
from pytensor.graph.op import Op
from pytensor.graph.type import Type
from pytensor.link.basic import Container, Linker, PerformLinker, WrapLinker
from pytensor.link.c.basic import OpWiseCLinker
from pytensor.tensor.type import matrix, scalar
from pytensor.utils import cmp, to_return_values
from pytensor.tensor.type import scalar
from pytensor.utils import to_return_values
def make_function(linker: Linker, unpack_single: bool = True, **kwargs) -> Callable:
......@@ -219,26 +217,6 @@ class TestWrapLinker:
assert o[0].data == 1.5
def test_sort_schedule_fn():
    """Nodes with no dependency between them are scheduled in sorted order."""
    from pytensor.graph.sched import make_depends, sort_schedule_fn

    x = matrix("x")
    y = pytensor.tensor.dot(x[:5] * 2, x.T + 1).T

    def str_cmp(a, b):
        # Lexicographic comparison of the nodes' string forms.
        return cmp(str(a), str(b))

    mode = Mode(linker=OpWiseCLinker(schedule=sort_schedule_fn(str_cmp)))
    fn = pytensor.function((x,), (y,), mode=mode)
    schedule = fn.maker.linker.make_all()[-1]

    depends = make_depends()
    # Adjacent nodes not forced into order by a dependency must appear in
    # lexicographic order.
    for earlier, later in zip(schedule[:-1], schedule[1:]):
        if not depends((later, earlier)):
            assert str(earlier) < str(later)
def test_container_deepcopy():
# This is a test to a work around a NumPy bug.
......
# Run using
# mpiexec -np 2 python _test_mpi_roundtrip.py
from sys import exit, stderr, stdout
import numpy as np
from mpi4py import MPI
import pytensor
from pytensor.configdefaults import config
from pytensor.graph.sched import sort_schedule_fn
from pytensor.tensor.io import mpi_cmps, recv, send
from pytensor.tensor.type import matrix
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# This script must be launched by mpiexec with exactly two ranks.  If not,
# report it on stderr but still print "True" so the parent test treats the
# situation as a success rather than a failure.
if size != 2:
    stderr.write(
        "mpiexec failed to create a world with two nodes.\n"
        "Closing with success message."
    )
    stdout.write("True")
    exit(0)

shape = (2, 2)
dtype = "float32"

# Schedule MPI ops using the ordering comparators exported by tensor.io.
scheduler = sort_schedule_fn(*mpi_cmps)
mode = pytensor.compile.mode.Mode(
    optimizer=None, linker=pytensor.link.c.basic.OpWiseCLinker(schedule=scheduler)
)

with config.change_flags(compute_test_value="off"):
    if rank == 0:
        # Rank 0: send x + 1 to rank 1, then receive the doubled result back.
        x = matrix("x", dtype=dtype)
        y = x + 1
        send_request = send(y, 1, 11)
        z = recv(shape, dtype, 1, 12)
        f = pytensor.function([x], [send_request, z], mode=mode)

        x_val = np.random.random(shape).astype(dtype)
        expected = (x_val + 1) * 2
        _, z_val = f(x_val)
        matches = np.linalg.norm(z_val - expected) < 0.001
        # The parent test will look for "True" in the output
        stdout.write(str(matches))
    if rank == 1:
        # Rank 1: receive from rank 0, double it, and send it back.
        y = recv(shape, dtype, 0, 11)
        z = y * 2
        send_request = send(z, 0, 12)
        f = pytensor.function([], send_request, mode=mode)
        f()
import os
import subprocess
import pytest
import pytensor
from pytensor.compile.mode import Mode
from pytensor.configdefaults import config
from pytensor.graph.sched import sort_schedule_fn
from pytensor.link.c.basic import OpWiseCLinker
from pytensor.tensor.io import (
MPISend,
MPISendWait,
mpi_cmps,
mpi_enabled,
mpi_send_wait_cmp,
recv,
send,
)
from pytensor.tensor.type import matrix
# Shared compile mode whose linker schedules nodes with the MPI ordering
# comparators exported from pytensor.tensor.io.
mpi_scheduler = sort_schedule_fn(*mpi_cmps)
mpi_linker = OpWiseCLinker(schedule=mpi_scheduler)
mpi_mode = Mode(linker=mpi_linker)
@config.change_flags(compute_test_value="off")
def test_recv():
    """recv builds a data variable backed by an MPIRecv with our args."""
    result = recv((10, 10), "float64", 0, 11)
    assert result.dtype == "float64"
    assert result.broadcastable == (False, False)

    recv_node = result.owner.inputs[0].owner
    assert recv_node.op.source == 0
    assert recv_node.op.tag == 11
def test_send():
    """send builds a wait node wrapping an MPISend with our args."""
    x = matrix("x")
    wait_output = send(x, 1, 11)
    send_node = wait_output.owner.inputs[0].owner
    assert send_node.op.dest == 1
    assert send_node.op.tag == 11
@config.change_flags(compute_test_value="off")
def test_can_make_function():
    """A graph containing a recv can still be compiled to a function."""
    received = recv((5, 5), "float32", 0, 11)
    incremented = received + 1
    assert pytensor.function([], [incremented])
@pytest.mark.skipif(not mpi_enabled, reason="MPI not enabled")
def test_mpi_roundtrip():
    """Run the two-process MPI round-trip helper script under mpiexec.

    The helper prints "True" on success; the assertion fails (showing the
    subprocess's stderr) otherwise.
    """
    pytensor_root = pytensor.__file__.split("__init__")[0]
    script = os.path.join(pytensor_root, "tensor", "tests", "_test_mpi_roundtrip.py")

    env = os.environ.copy()
    flags = env.get("PYTENSOR_FLAGS", "")
    # Drop GPU-initialisation flags: the MPI subprocesses must not grab a GPU.
    keep_flags = ",".join(
        f for f in flags.split(",") if not f.startswith("init_gpu_device")
    )
    env["PYTENSOR_FLAGS"] = keep_flags

    # Use an argument list (no shell) so install paths containing spaces or
    # shell metacharacters cannot break or inject into the command, and run
    # the helper with the same interpreter as this test.
    p = subprocess.Popen(
        ["mpiexec", "-np", "2", sys.executable, script],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        close_fds=True,
        env=env,
    )
    out, err = p.communicate()
    assert "True" in out.decode(), err.decode()
def test_mpi_send_wait_cmp():
    """Sends sort before ordinary ops; waits sort after them."""
    x = matrix("x")
    sent = send(x, 1, 11)
    added = x + x

    wait_node = sent.owner
    send_node = sent.owner.inputs[0].owner
    add_node = added.owner

    assert mpi_send_wait_cmp(send_node, add_node) < 0  # send happens first
    assert mpi_send_wait_cmp(wait_node, add_node) > 0  # wait happens last
@config.change_flags(compute_test_value="off")
def test_mpi_tag_ordering():
    """Recvs (and their waits) are scheduled by ascending tag."""
    recv_12 = recv((2, 2), "float32", 1, 12)
    recv_11 = recv((2, 2), "float32", 1, 11)
    recv_13 = recv((2, 2), "float32", 1, 13)
    fn = pytensor.function([], [recv_12, recv_11, recv_13], mode=mpi_mode)
    schedule = fn.maker.linker.make_all()[-1]

    expected_tags = (11, 12, 13, 11, 12, 13)
    assert all(node.op.tag == tag for node, tag in zip(schedule, expected_tags))
def test_mpi_schedule():
    """The send is started first, the elemwise runs, and the wait comes last."""
    x = matrix("x")
    sent = send(x, 1, 11)
    doubled = x + x
    fn = pytensor.function([x], [sent, doubled], mode=mpi_mode)
    schedule = fn.maker.linker.make_all()[-1]

    expected_op_types = [MPISend, pytensor.tensor.elemwise.Elemwise, MPISendWait]
    assert all(
        isinstance(node.op, op_type)
        for node, op_type in zip(schedule, expected_op_types)
    )
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论