Cartesian Partitioning

The Partition class creates a volume decomposition across the available MPI ranks. After initialization, the instance contains information about the decomposition and the coordinates of the local rank.

from mpipartition import Partition

# partitioning a box among the available ranks
partition = Partition()

# print how the volume has been partitioned (from rank 0):
if partition.rank == 0:
    print(partition.decomposition)

# print coordinate of all ranks:
print(partition.rank, partition.coordinates)

# print the extent of this rank's subvolume (as a fraction of the unit cube).
# Note: the extent is the same on every rank, so printing from rank 0 suffices
if partition.rank == 0:
    print(partition.extent)

Cartesian Distribution Algorithms

Processing large datasets on multiple MPI ranks requires distributing the data among the processes. The mpipartition package contains the following functions, which abstract this task for different use cases:

distribute

Distribute data among MPI ranks according to data position and volume partition

overload

Copy data within an overload length to the neighboring ranks

exchange

Distribute data among neighboring ranks, or all-to-all, by a key

Examples

In the following example, we generate 100 randomly positioned points per rank and then distribute them according to their positions:

import numpy as np

from mpipartition import Partition
from mpipartition import distribute, overload, exchange

# assuming a cube size of 1.
box_size = 1.0

# partitioning a box with the available MPI ranks
# if no argument is specified, the dimension of the volume is 3
partition = Partition()

# number of random particles per rank
n_local = 100

# randomly distributed particles in the unit cube
data = {
    "x": np.random.uniform(0, 1, n_local),
    "y": np.random.uniform(0, 1, n_local),
    "z": np.random.uniform(0, 1, n_local),
    "id": n_local * partition.rank + np.arange(n_local),
    "rank": np.ones(n_local) * partition.rank
}

# assign to rank by position
data_distributed = distribute(partition, box_size, data, ('x', 'y', 'z'))

# make sure we still have all particles
n_local_distributed = len(data_distributed['x'])
n_global_distributed = partition.comm.reduce(n_local_distributed)
if partition.rank == 0:
    assert n_global_distributed == n_local * partition.nranks

# validate that each particle is in local extent
bbox = np.array([
   np.array(partition.origin),
   np.array(partition.origin) + np.array(partition.extent)
]).T
is_valid = np.ones(n_local_distributed, dtype=np.bool_)
for i, x in enumerate('xyz'):
    is_valid &= data_distributed[x] >= bbox[i, 0]
    is_valid &= data_distributed[x] < bbox[i, 1]
assert np.all(is_valid)

Now, we overload the partitions with an overload length of 0.1:

data_overloaded = overload(partition, box_size, data_distributed, 0.1, ('x', 'y', 'z'))
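
As a quick sanity check (a sketch using only variables defined above), note that overloading only adds copies from neighboring ranks, so each rank holds at least as many particles as before:

# overloading never removes local particles, it only adds copies
n_local_overloaded = len(data_overloaded["x"])
assert n_local_overloaded >= n_local_distributed

# globally, the count typically exceeds the original number of particles,
# since boundary particles are duplicated on neighboring ranks
n_global_overloaded = partition.comm.reduce(n_local_overloaded)
if partition.rank == 0:
    print("particles including overload copies:", n_global_overloaded)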

Sometimes, the destination of a particle is given by a key rather than by its position (e.g. for a merger tree, we want the progenitors to be on the same rank as their descendant, even if they cross rank boundaries). We can then use the exchange function as follows:

# create a list of particle ids that we want to have on the local rank
my_keys = n_local * partition.rank + np.arange(n_local)

# since, in our example, particles can end up farther away than one
# neighboring rank, we directly do an all2all exchange:
data_exchanged = exchange(partition, data_distributed, 'id', my_keys, do_all2all=True)

# we should have the same particles as we started with! Let's check
s = np.argsort(data_exchanged['id'])
for k in data_exchanged.keys():
    data_exchanged[k] = data_exchanged[k][s]

n_local_exchanged = len(data_exchanged['x'])
assert n_local_exchanged == n_local
for k in data.keys():
    assert np.all(data[k] == data_exchanged[k])

References

Partition

class mpipartition.Partition(dimensions=3, *, comm=None, create_neighbor_topo=False, commensurate_topo=None)[source]

An MPI partition of a cubic volume

Parameters:
  • dimensions (int) – Number of dimensions of the volume cube. Default: 3

  • create_neighbor_topo (boolean) – If True, an additional graph communicator will be initialized, connecting all direct neighbors (3**dimensions - 1) symmetrically

  • commensurate_topo (List[int]) – A proportional target topology for decomposition. When specified, a partition will be created so that commensurate_topo[i] % partition.decomposition[i] == 0 for all i. The code will raise a RuntimeError if such a decomposition is not possible.

Examples

Using Partition on 8 MPI ranks to split a periodic unit-cube

>>> partition = Partition()
>>> partition.rank
0
>>> partition.decomposition
array([2, 2, 2])
>>> partition.coordinates
array([0, 0, 0])
>>> partition.origin
array([0., 0., 0.])
>>> partition.extent
array([0.5, 0.5, 0.5])
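
The commensurate_topo argument constrains the decomposition so that a given (grid) topology divides evenly into it. A minimal sketch (still assuming 8 ranks; the 256^3 grid size is an arbitrary illustration):

>>> partition = Partition(3, commensurate_topo=[256, 256, 256])
>>> partition.decomposition
array([2, 2, 2])
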
property comm

3D Cartesian MPI Topology / Communicator

property comm_neighbor

Graph MPI Topology / Communicator, connecting the neighboring ranks (symmetric)

property coordinates

3D indices of this processor

Type:

np.ndarray

property decomposition

the decomposition of the cubic volume: number of ranks along each dimension

Type:

np.ndarray

property dimensions

Dimension of the partitioned volume

property extent

Length along each axis of this processor's subvolume (same for all ranks)

Type:

np.ndarray

get_neighbor(di)[source]

get the rank of the neighbor at relative position (dx, dy, dz, …)

Parameters:

di (List[int]) – list of relative coordinates, one of [-1, 0, 1].

Return type:

int
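
A minimal usage sketch (assuming the 8-rank, 2x2x2 decomposition from the example above; the exact rank returned depends on the Cartesian ordering of the communicator):

>>> # the relative offset [0, 0, 0] refers to this rank itself
>>> partition.get_neighbor([0, 0, 0])
0
>>> # rank of the neighbor one step along the first axis
>>> partition.get_neighbor([1, 0, 0])
4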

property neighbor_ranks

a flattened list of the unique neighboring ranks

Type:

np.ndarray

property neighbors

an array of shape (3, …, 3), one axis per dimension, with the ranks of the neighboring processes (neighbors[1, 1, 1, …] is this processor)

Type:

np.ndarray

property nranks

the total number of processors

Type:

int

property origin: ndarray

Cartesian coordinates of the origin of this processor

Type:

np.ndarray

property rank

the MPI rank of this processor

Type:

int

property ranklist

A complete list of ranks, arranged by their coordinates. The array has shape partition.decomposition

Type:

np.ndarray

distribute

mpipartition.distribute(partition, box_size, data, coord_keys, *, verbose=False, verify_count=True)[source]

Distribute data among MPI ranks according to data position and volume partition

The position of each TreeData element is given by the data columns specified with coord_keys.

Parameters:
  • partition (Partition) – The MPI partition defining which rank should own which subvolume of the data

  • box_size (float) – The size of the full simulation volume

  • data (Mapping[str, ndarray]) – The treenode / coretree data that should be distributed

  • coord_keys (List[str]) – The columns in data that define the position of the object

  • verbose (bool | int) – If True, print summary statistics of the distribute. If > 1, print statistics of each rank (i.e. how much data each rank sends to every other rank).

  • verify_count (bool) – If True, make sure that total number of objects is conserved

Returns:

data – The distributed particle data (i.e. the data that this rank owns)

Return type:

ParticleDataT

overload

mpipartition.overload(partition, box_size, data, overload_length, coord_keys, *, structure_key=None, verbose=False)[source]

Copy data within an overload length to the neighboring ranks

This method assumes that the volume cube is periodic and will wrap the data around the boundary interfaces.

Parameters:
  • partition (Partition) – The MPI partition defining which rank should own which subvolume of the data

  • box_size (float) – the size of the full volume cube

  • data (Mapping[str, ndarray]) – The treenode / coretree data that should be distributed

  • overload_length (float) – The thickness of the boundary layer that will be copied to the neighboring rank. Must be smaller than half the extent of the local subvolume (along any axis)

  • coord_keys (List[str]) – The columns in data that define the position of the object

  • structure_key (str | None) – The column in data containing a structure (“group”) tag. If provided, the data will be overloaded to include entire structures; ie when one object in a structure is overloaded, all other objects in that structure are sent as well. The column data[structure_key] should be of integer type, and any objects not belonging to a structure are assumed to have tag -1.

  • verbose (bool | int) – If True, print summary statistics of the overload. If > 1, print statistics of each rank (i.e. how much data each rank sends to every other rank).

Returns:

data – The combined data of objects within the rank’s subvolume as well as the objects within the overload region of neighboring ranks

Return type:

TreeDataT

Notes

The function does not change the objects’ coordinates or alter any data. Objects that have been overloaded across the periodic boundaries will still have their original positions. If “local” (wrapped) coordinates are required, they will need to be computed manually after calling this function.
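
As a hedged illustration of the structure_key option, the sketch below assumes the data contains a hypothetical integer "halo_id" column, with -1 for objects that do not belong to any structure; overloading then copies entire structures whenever one of their members falls into the overload region:

# sketch: keep whole structures together while overloading
# ("halo_id" is an assumed column name for this illustration)
data_overloaded = overload(
    partition,
    box_size,
    data_distributed,
    0.1,                  # overload length
    ("x", "y", "z"),      # position columns
    structure_key="halo_id",
)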

exchange

mpipartition.exchange(partition, data, key, local_keys, *, verbose=False, filter_key=None, do_all2all=False, replace_notfound_key=None)[source]

Distribute data among neighboring ranks, or all-to-all, by a key

This function will assign data to the rank that owns the key. The keys that the local rank owns are given by local_keys, which should be unique. The keys of the data that the local rank currently holds are in data[key]. Certain values can be ignored by setting filter_key to that value, or by setting filter_key to a (vectorized) function that returns True for keys that should be redistributed and False for keys that should be ignored.

Parameters:
  • partition (Partition) – The MPI partition defining which rank should own which subvolume of the data

  • data (dict) – The data that should be redistributed, one array per column

  • key (str) – The column in data holding the key of each object

  • local_keys (ndarray) – The (unique) keys owned by the local rank

  • verbose (bool) – If True, print summary statistics of the exchange

  • filter_key (int | Callable[[ndarray], ndarray] | None) – A key value to ignore, or a (vectorized) function returning True for keys that should be redistributed and False for keys that should be ignored

  • do_all2all (bool) – If True, perform a direct all-to-all exchange instead of exchanging only with neighboring ranks

  • replace_notfound_key (int | None)
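
A hedged sketch of the filter_key option described above, reusing variables from the earlier example (treating -1 as a "no owner" sentinel is an assumption for this illustration):

# sketch: only redistribute entries with a non-negative id
data_exchanged = exchange(
    partition,
    data_distributed,
    "id",
    my_keys,
    filter_key=lambda k: k >= 0,  # vectorized predicate over data["id"]
    do_all2all=True,
)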