Source code for mpipartition.overload

import itertools
from typing import List, Mapping, Union

import numpy as np

from .partition import Partition

ParticleDataT = Mapping[str, np.ndarray]


[docs] def overload( partition: Partition, box_size: float, data: ParticleDataT, overload_length: float, coord_keys: List[str], *, structure_key: str = None, verbose: Union[bool, int] = False, ): """Copy data within an overload length to the neighboring ranks This method assumes that the volume cube is periodic and will wrap the data around the boundary interfaces. Parameters ---------- partition: The MPI partition defining which rank should own which subvolume of the data box_size: the size of the full volume cube data: The treenode / coretree data that should be distributed overload_length: The thickness of the boundary layer that will be copied to the neighboring rank. Must be smaller than half the extent of the local subvolume (along any axis) coord_keys: The columns in `data` that define the position of the object structure_key: The column in `data` containing a structure ("group") tag. If provided, the data will be overloaded to include entire structures; ie when one object in a structure is overloaded, all other objects in that structure are sent as well. The column `data[structure_key]` should be of integer type, and any objects not belonging to a structure are assumed to have tag -1. verbose: If True, print summary statistics of the distribute. If > 1, print statistics of each rank (i.e. how much data each rank sends to every other rank). Returns ------- data: TreeDataT The combined data of objects within the rank's subvolume as well as the objects within the overload region of neighboring ranks Notes ----- The function does not change the objects' coordinates or alter any data. Objects that have been overloaded accross the periodic boundaries will still have the original positions. In case "local" coordinates are required, this will need to be done manually after calling this function. """ assert len(coord_keys) == partition.dimensions for i in range(partition.dimensions): assert partition.decomposition[i] > 1 # currently can't overload if only 1 rank # we only overload particles in one layer of the domain decomposition # so we cannot overload to more than the extent of each partition assert overload_length < partition.extent[i] * box_size nranks = partition.nranks if nranks == 1: return data rank = partition.rank comm = partition.comm dimensions = partition.dimensions origin = box_size * np.array(partition.origin) extent = box_size * np.array(partition.extent) neighbors = partition.neighbors # Find all overload regions each particle should be in overload_left = {} overload_right = {} for i, x in enumerate(coord_keys): _i = np.zeros_like(data[x], dtype=np.int8) _i[data[x] < origin[i] + overload_length] = -1 if structure_key is not None: # find all structures present in objects to be overloaded left all_structs = np.unique(data[structure_key][_i == -1]) all_structs = np.setdiff1d(all_structs, -1) # add objects with these structure flags to the mask all_structs_mask = np.isin(data[structure_key], all_structs) _i[all_structs_mask] = -1 overload_left[i] = _i _i = np.zeros_like(data[x], dtype=np.int8) _i[data[x] > origin[i] + extent[i] - overload_length] = 1 if structure_key is not None: # find all structures present in objects to be overloaded right all_structs = np.unique(data[structure_key][_i == 1]) all_structs = np.unique(all_structs, -1) # add objects with these structure flags to the mask all_structs_mask = np.isin(data[structure_key], all_structs) _i[all_structs_mask] = 1 overload_right[i] = _i # Get particle indices of each of the 27 neighbors overload exchange_indices = [np.empty(0, dtype=np.int64) for i in range(nranks)] def add_exchange_indices(mask, idx): assert len(idx) == dimensions n = neighbors[tuple(_i + 1 for _i in idx)] if n != rank: exchange_indices[n] = np.union1d(exchange_indices[n], np.nonzero(mask)[0]) corners = itertools.product([0, -1, 1], repeat=partition.dimensions) # skip first: will be [0,0,0] next(corners) for corner in corners: mask = np.ones_like(overload_left[0], dtype=np.bool_) for d in range(partition.dimensions): if corner[d] == 0: continue mask &= (overload_left[d] == corner[d]) | (overload_right[d] == corner[d]) add_exchange_indices(mask, corner) # Check how many elements will be sent send_counts = np.array([len(i) for i in exchange_indices], dtype=np.int32) send_idx = np.concatenate(exchange_indices) send_displacements = np.insert(np.cumsum(send_counts)[:-1], 0, 0) total_to_send = np.sum(send_counts) # Check how many elements will be received recv_counts = np.empty_like(send_counts) comm.Alltoall(send_counts, recv_counts) recv_displacements = np.insert(np.cumsum(recv_counts)[:-1], 0, 0) total_to_receive = np.sum(recv_counts) # debug message if verbose > 1: for i in range(nranks): if rank == i: print(f"Overload Debug Rank {i}") print(f" - rank sends {total_to_send:10d} particles") print(f" - rank receives {total_to_receive:10d} particles") print(f" - send_counts: {send_counts}") print(f" - send_displacements: {send_displacements}") print(f" - recv_counts: {recv_counts}") print(f" - recv_displacements: {recv_displacements}") for i, x in enumerate(coord_keys): print(f" - overload_left_{x}: {overload_left[i]}") print(f" - overload_right_{x}: {overload_right[i]}") print(f" - send_idx: {send_idx}") print("", flush=True) comm.Barrier() # send data all-to-all, each array individually data_new = {} for k in data.keys(): # prepare send-array ds = data[k][send_idx] # prepare recv-array dr = np.empty(total_to_receive, dtype=ds.dtype) # exchange data s_msg = [ds, (send_counts, send_displacements), ds.dtype.char] r_msg = [dr, (recv_counts, recv_displacements), ds.dtype.char] comm.Alltoallv(s_msg, r_msg) # add received data to original data data_new[k] = np.concatenate((data[k], dr)) return data_new