Cartesian Partitioning
The Partition class will create a volume decomposition using the number of
available MPI ranks. After initialization, the instance contains information
about the decomposition and the local rank coordinates.
from mpipartition import Partition
# partitioning a box among the available ranks
partition = Partition()
# print how the volume has been partitioned (from rank 0):
if partition.rank == 0:
    print(partition.decomposition)
# print the coordinates of each rank:
print(partition.rank, partition.coordinates)
# print the extent of this rank (as a fraction of the unit cube).
# Note: the extent of each rank will be the same
if partition.rank == 0:
    print(partition.extent)
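The relationship between a rank's grid coordinates and its subvolume can be sketched with plain NumPy (the values of `decomposition` and `coordinates` below are hypothetical stand-ins for the attributes of the same name, assuming 8 ranks and a unit cube):

```python
import numpy as np

# hypothetical decomposition of 8 ranks over a 3D unit cube
decomposition = np.array([2, 2, 2])
# grid coordinates of one particular rank
coordinates = np.array([1, 0, 1])

# every rank owns an equally sized subvolume of the unit cube
extent = 1.0 / decomposition       # length of the subvolume along each axis
origin = coordinates * extent      # lower corner of this rank's subvolume

print(extent)   # -> [0.5 0.5 0.5]
print(origin)   # -> [0.5 0.  0.5]
```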
Cartesian Distribution Algorithms
Processing large datasets on multiple MPI ranks requires distributing the data
among the processes. The mpipartition package contains the following
functions that abstract this task for different use-cases:
distribute: distribute data among MPI ranks according to data position and volume partition
overload: copy data within an overload length to the neighboring ranks
exchange: distribute data among neighboring ranks and all2all by a key
Examples
In the following example, we generate 100 randomly positioned points per rank and then distribute them according to their positions:
import numpy as np

from mpipartition import Partition
from mpipartition import distribute, overload, exchange
# assuming a cube size of 1.
box_size = 1.0
# partitioning a box with the available MPI ranks
# if no argument is specified, the dimension of the volume is 3
partition = Partition()
# number of random particles per rank
n_local = 100
# randomly distributed particles in the unit cube
data = {
    "x": np.random.uniform(0, 1, n_local),
    "y": np.random.uniform(0, 1, n_local),
    "z": np.random.uniform(0, 1, n_local),
    "id": n_local * partition.rank + np.arange(n_local),
    "rank": np.ones(n_local) * partition.rank,
}
# assign to rank by position
data_distributed = distribute(partition, box_size, data, ('x', 'y', 'z'))
# make sure we still have all particles
n_local_distributed = len(data_distributed['x'])
n_global_distributed = partition.comm.reduce(n_local_distributed)
if partition.rank == 0:
    assert n_global_distributed == n_local * partition.nranks
# validate that each particle is in local extent
bbox = np.array([
    np.array(partition.origin),
    np.array(partition.origin) + np.array(partition.extent)
]).T
is_valid = np.ones(n_local_distributed, dtype=np.bool_)
for i, x in enumerate('xyz'):
    is_valid &= data_distributed[x] >= bbox[i, 0]
    is_valid &= data_distributed[x] < bbox[i, 1]
assert np.all(is_valid)
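The assignment rule that distribute applies can be emulated in serial: a particle's owning rank follows from its position scaled by the decomposition. This is a sketch of the idea, not the package's actual implementation (the 8-rank decomposition is hypothetical):

```python
import numpy as np

box_size = 1.0
decomposition = np.array([2, 2, 2])  # hypothetical 8-rank layout

rng = np.random.default_rng(42)
pos = rng.uniform(0, box_size, (100, 3))

# integer grid cell (= rank coordinates) of each particle
cells = np.floor(pos / box_size * decomposition).astype(int)
# guard against a particle sitting at exactly pos == box_size
cells = np.clip(cells, 0, decomposition - 1)

# flatten the 3D rank coordinates into a linear rank id
owner = np.ravel_multi_index(cells.T, decomposition)
# number of particles each rank would receive
print(np.bincount(owner, minlength=decomposition.prod()))
```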
Now, we overload the partitions by 0.1:
data_overloaded = overload(partition, box_size, data_distributed, 0.1, ('x', 'y', 'z'))
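Conceptually, overloading selects every particle within overload_length of a face of the local subvolume and copies it to the rank on the other side (with periodic wrapping). A serial sketch of the selection along a single axis (the subvolume bounds are hypothetical):

```python
import numpy as np

origin, extent = 0.0, 0.5          # hypothetical local subvolume along one axis
overload_length = 0.1

rng = np.random.default_rng(0)
x = rng.uniform(origin, origin + extent, 1000)   # local particle positions

# particles near the lower/upper face get copied to the respective neighbor
to_lower = x < origin + overload_length
to_upper = x > origin + extent - overload_length
print(to_lower.sum(), to_upper.sum())
```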
Sometimes, the destination of a particle is given by a key, not by its position
(e.g. for a merger-tree, we want the progenitors to be on the same rank as the
descendant, even if they cross rank boundaries). We can then use the
exchange function as follows:
# create a list of particle ids that we want to have on the local rank
my_keys = n_local * partition.rank + np.arange(n_local)
# since, in our example, particles can end up farther away than one
# neighboring rank, we directly do an all2all exchange:
data_exchanged = exchange(partition, data_distributed, 'id', my_keys, do_all2all=True)
# we should have the same particles as we started with! Let's check:
s = np.argsort(data_exchanged['id'])
for k in data_exchanged.keys():
    data_exchanged[k] = data_exchanged[k][s]
n_local_exchanged = len(data_exchanged['x'])
assert n_local_exchanged == n_local
for k in data.keys():
    assert np.all(data[k] == data_exchanged[k])
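In serial, the routing step of exchange reduces to looking up which rank owns each key. A sketch with the same key convention as the example above (each rank r owns the keys [r*n_local, (r+1)*n_local)):

```python
import numpy as np

nranks, n_local = 4, 100
keys = np.arange(nranks * n_local)       # all keys in the simulation
rng = np.random.default_rng(1)
scattered = rng.permutation(keys)        # keys as currently scattered over ranks

# destination rank of each key, following the ownership convention
owner = scattered // n_local
# after the exchange, rank 0 holds exactly its own keys
rank0_keys = np.sort(scattered[owner == 0])
print(rank0_keys[:5])                    # -> [0 1 2 3 4]
```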
References
Partition
- class mpipartition.Partition(dimensions=3, *, comm=None, create_neighbor_topo=False, commensurate_topo=None)[source]
An MPI partition of a cubic volume
- Parameters:
dimensions (int) – Number of dimensions of the volume cube. Default: 3
create_neighbor_topo (boolean) – If True, an additional graph communicator will be initialized connecting all direct neighbors (3**dimension - 1) symmetrically
commensurate_topo (List[int]) – A proportional target topology for decomposition. When specified, a partition will be created so that commensurate_topo[i] % partition.decomposition[i] == 0 for all i. The code will raise a RuntimeError if such a decomposition is not possible.
Examples
Using Partition on 8 MPI ranks to split a periodic unit-cube
>>> partition = Partition()
>>> partition.rank
0
>>> partition.decomposition
array([2, 2, 2])
>>> partition.coordinates
array([0, 0, 0])
>>> partition.origin
array([0., 0., 0.])
>>> partition.extent
array([0.5, 0.5, 0.5])
- property comm
3D Cartesian MPI Topology / Communicator
- property comm_neighbor
Graph MPI Topology / Communicator, connecting the neighboring ranks (symmetric)
- property coordinates
3D indices of this processor
- Type:
np.ndarray
- property decomposition
the decomposition of the cubic volume: number of ranks along each dimension
- Type:
np.ndarray
- property dimensions
Dimension of the partitioned volume
- property extent
Length along each axis of this processor's subvolume (same for all ranks)
- Type:
np.ndarray
- get_neighbor(di)[source]
get the rank of the neighbor at relative position (dx, dy, dz, …)
- Parameters:
di (List[int]) – list of relative coordinates, one of [-1, 0, 1].
- Return type:
int
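For a periodic Cartesian grid, the lookup that get_neighbor performs can be sketched by wrapping the shifted coordinates and flattening them back into a rank id (plain-NumPy illustration; the class itself delegates to the MPI Cartesian topology):

```python
import numpy as np

decomposition = np.array([2, 2, 2])   # hypothetical 8-rank layout
coordinates = np.array([0, 0, 0])     # this rank's grid position

def neighbor_rank(di):
    """Rank at relative offset di, with periodic wrap-around."""
    shifted = (coordinates + np.asarray(di)) % decomposition
    return int(np.ravel_multi_index(shifted, decomposition))

print(neighbor_rank([-1, 0, 0]))      # wraps around to the far side -> 4
print(neighbor_rank([0, 0, 1]))       # direct neighbor along z -> 1
```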
- property neighbor_ranks
a flattened list of the unique neighboring ranks
- Type:
np.ndarray
- property neighbors
a 3^d dimensional array with the ranks of the neighboring processes (neighbors[1,1,1, …] is this processor)
- Type:
np.ndarray
- property nranks
the total number of processors
- Type:
int
- property origin: ndarray
Cartesian coordinates of the origin of this processor
- Type:
np.ndarray
- property rank
the MPI rank of this processor
- Type:
int
- property ranklist
A complete list of ranks, arranged by their coordinates. The array has shape partition.decomposition.
- Type:
np.ndarray
distribute
- mpipartition.distribute(partition, box_size, data, coord_keys, *, verbose=False, verify_count=True)[source]
Distribute data among MPI ranks according to data position and volume partition
The position of each TreeData element is given by the data columns specified with coord_keys.
- Parameters:
partition (Partition) – The MPI partition defining which rank should own which subvolume of the data
box_size (float) – The size of the full simulation volume
data (Mapping[str, ndarray]) – The treenode / coretree data that should be distributed
coord_keys (List[str]) – The columns in data that define the position of the object
verbose (bool | int) – If True, print summary statistics of the distribute. If > 1, print statistics of each rank (i.e. how much data each rank sends to every other rank).
verify_count (bool) – If True, make sure that total number of objects is conserved
- Returns:
data – The distributed particle data (i.e. the data that this rank owns)
- Return type:
ParticleDataT
overload
- mpipartition.overload(partition, box_size, data, overload_length, coord_keys, *, structure_key=None, verbose=False)[source]
Copy data within an overload length to the neighboring ranks
This method assumes that the volume cube is periodic and will wrap the data around the boundary interfaces.
- Parameters:
partition (Partition) – The MPI partition defining which rank should own which subvolume of the data
box_size (float) – the size of the full volume cube
data (Mapping[str, ndarray]) – The treenode / coretree data that should be distributed
overload_length (float) – The thickness of the boundary layer that will be copied to the neighboring rank. Must be smaller than half the extent of the local subvolume (along any axis)
coord_keys (List[str]) – The columns in data that define the position of the object
structure_key (str | None) – The column in data containing a structure (“group”) tag. If provided, the data will be overloaded to include entire structures; ie when one object in a structure is overloaded, all other objects in that structure are sent as well. The column data[structure_key] should be of integer type, and any objects not belonging to a structure are assumed to have tag -1.
verbose (bool | int) – If True, print summary statistics of the distribute. If > 1, print statistics of each rank (i.e. how much data each rank sends to every other rank).
- Returns:
data – The combined data of objects within the rank’s subvolume as well as the objects within the overload region of neighboring ranks
- Return type:
TreeDataT
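The structure-completion behavior of structure_key can be sketched in serial: whenever any member of a group falls inside the overload layer, every object carrying the same tag is selected as well (the tags and layer mask below are illustrative):

```python
import numpy as np

# structure tags; -1 marks objects that belong to no structure
tags = np.array([0, 0, 1, 1, -1, 2, -1])
# which objects fall inside the overload layer
in_layer = np.array([True, False, False, False, True, False, False])

# structures touched by the layer (the -1 "no structure" tag is excluded)
touched = np.unique(tags[in_layer & (tags >= 0)])
# send complete structures, plus untagged objects already in the layer
send = np.isin(tags, touched) | (in_layer & (tags == -1))
print(send)
```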
Notes
The function does not change the objects' coordinates or alter any data. Objects that have been overloaded across the periodic boundaries keep their original positions; if "local" coordinates are required, they have to be computed manually after calling this function.
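If "local" coordinates are needed, the conversion after a periodic overload can be done per axis by shifting each copy into the periodic image closest to the local subvolume (a sketch assuming a unit box and hypothetical subvolume bounds):

```python
import numpy as np

box_size = 1.0
origin, extent = 0.0, 0.5          # hypothetical local subvolume along one axis
center = origin + extent / 2

# overloaded copies; 0.95 is a copy wrapped across the x = 0 boundary
x = np.array([0.05, 0.45, 0.95])

# shift each copy into the periodic image nearest the subvolume center
x_local = x - box_size * np.round((x - center) / box_size)
print(x_local)                     # -> [ 0.05  0.45 -0.05]
```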
exchange
- mpipartition.exchange(partition, data, key, local_keys, *, verbose=False, filter_key=None, do_all2all=False, replace_notfound_key=None)[source]
Distribute data among neighboring ranks and all2all by a key
This function will assign data to the rank that owns the key. The keys that the
local rank owns are given by local_keys, which should be unique. The keys of the
data that the local rank currently holds are in data[key]. Certain values can be
ignored by setting filter_key to that value, or by setting filter_key to a
(vectorized) function that returns True for keys that should be redistributed
and False for keys that should be ignored.
- Parameters:
partition (Partition)
data (dict)
key (str)
local_keys (ndarray)
verbose (bool)
filter_key (int | Callable[[ndarray], ndarray] | None)
do_all2all (bool)
replace_notfound_key (int | None)
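As an illustration of the filter_key parameter, a vectorized filter that redistributes only non-negative keys could look like this (the convention that negative keys mean "ignore" is an assumption for this sketch):

```python
import numpy as np

def my_filter(keys):
    """Return True for keys that should be redistributed."""
    return keys >= 0

keys = np.array([3, -1, 7, -1, 0])
print(my_filter(keys))   # -> [ True False  True False  True]
# usage: exchange(partition, data, 'id', my_keys, filter_key=my_filter)
```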