import glob
import os
from typing import Mapping, Optional, Sequence

import numpy as np
import pygio
from mpipartition import Partition

from .timer import Timer
class GenericIOStore:
    """A temporary storage that uses GenericIO to save data.

    The data has to be a Structure-of-Arrays (SoA), i.e. a python dictionary
    `str` -> `np.ndarray`, with each array having the same length.

    This class behaves like a python dictionary of SoAs. If `temporary_path`
    is set, the arrays associated with a key are stored in GenericIO files,
    otherwise they are kept in memory.

    Parameters
    ----------
    partition : :class:`mpipartition.Partition`
        A Partition instance defining the MPI layout
    box_size : float
        The physical size of the volume
    temporary_path : Optional[str]
        The base filesystem path where temporary data is stored. If `None`,
        the data will be kept in memory.

    Examples
    --------
    >>> # Creating a partition
    >>> partition = mpipartition.Partition()
    >>> # Creating a store
    >>> store = GenericIOStore(partition, 1.0, './tmp')
    >>> data = {x: np.random.uniform(size=10) for x in 'xyz'}
    >>> store['pos_0'] = data
    >>> del data
    >>> # Do some memory-expensive stuff until you need the data again...
    >>> data_0 = store['pos_0']
    >>> # Cleanup
    >>> store.remove('pos_0')
    """

    def __init__(
        self,
        partition: "Partition",
        box_size: float,
        temporary_path: Optional[str] = None,
    ):
        self._temporary_path = temporary_path
        self._data = {}
        self._partition = partition
        self.box_size = box_size

    def _filename(self, key: str) -> str:
        """Return the GenericIO file path used for `key`."""
        return f"{self._temporary_path}_{key}.tmp.gio"

    def __setitem__(self, key: str, data: Mapping[str, np.ndarray]) -> None:
        """Add an SoA to the storage.

        Parameters
        ----------
        key
            The storage key. It is appended to `temporary_path` and therefore
            has to be valid as part of a filesystem path when the store is
            file-backed.
        data
            The SoA to be added to the store. A dictionary with types
            `{str: np.ndarray}`, where the numpy arrays have to have the same
            shape and `dim=1`.
        """
        if self._temporary_path is None:
            # In-memory mode keeps a reference to the caller's mapping (no copy).
            self._data[key] = data
            return
        with Timer("temp storage: writing (GIO)", None):
            pygio.write_genericio(
                self._filename(key),
                data,
                phys_origin=[0.0, 0.0, 0.0],
                phys_scale=[self.box_size] * 3,
            )

    def __getitem__(self, key: str) -> Mapping[str, np.ndarray]:
        """Retrieve a Structure-of-Arrays from the store.

        Parameters
        ----------
        key
            The storage key of the SoA

        Returns
        -------
        Mapping[str, np.ndarray]
            The SoA associated with the key. A python dictionary of type
            `{str: np.ndarray}`

        Raises
        ------
        KeyError
            In in-memory mode, if `key` is not in the store.
        """
        if self._temporary_path is None:
            return self._data[key]
        with Timer("temp storage: reading (GIO)", None):
            return pygio.read_genericio(
                self._filename(key),
                redistribute=pygio.PyGenericIO.MismatchBehavior.MismatchDisallowed,
            )

    def get_field(
        self, key: str, field: Sequence[str]
    ) -> Mapping[str, np.ndarray]:
        """Retrieve specific arrays of a Structure-of-Arrays from the store.

        Parameters
        ----------
        key
            The storage key of the SoA
        field
            The keys of the specific arrays that are to be returned. Can be a
            single `str` or any sequence (`list`, `tuple`, ...) of `str`.

        Returns
        -------
        Mapping[str, np.ndarray]
            The SoA associated with the key, with only the fields specified.
            A python dictionary of type `{str: np.ndarray}`.

        Raises
        ------
        KeyError
            In in-memory mode, if `key` or one of the fields is missing.
        """
        # Only a bare string is a single field name; every other sequence
        # (tuple, list, ...) is a collection of names and must not be wrapped.
        fields = [field] if isinstance(field, str) else list(field)

        if self._temporary_path is None:
            soa = self._data[key]
            return {name: soa[name] for name in fields}
        with Timer("temp storage: reading (GIO)", None):
            return pygio.read_genericio(
                self._filename(key),
                fields,
                redistribute=pygio.PyGenericIO.MismatchBehavior.MismatchDisallowed,
            )

    def remove(self, key: str) -> None:
        """Delete a stored SoA (from memory or disk).

        In file-backed mode this synchronizes on the partition's communicator
        before and after the deletion, so every rank has to call it.

        Parameters
        ----------
        key
            The storage key of the SoA

        Raises
        ------
        KeyError
            In in-memory mode, if `key` is not in the store.
        """
        if self._temporary_path is None:
            del self._data[key]
            return

        comm = self._partition.comm
        # Wait for all ranks before any file disappears.
        comm.Barrier()
        with Timer("temp storage: cleanup (GIO)", None):
            # Only rank 0 deletes; the glob also matches any files sharing
            # the base name with an extra suffix.
            if self._partition.rank == 0:
                for path in glob.glob(f"{self._filename(key)}*"):
                    os.remove(path)
        comm.Barrier()

    def pop(self, key: str) -> Mapping[str, np.ndarray]:
        """Retrieve a Structure-of-Arrays from the store and remove it.

        Parameters
        ----------
        key
            The storage key of the SoA

        Returns
        -------
        Mapping[str, np.ndarray]
            The SoA associated with the key. A python dictionary of type
            `{str: np.ndarray}`
        """
        if self._temporary_path is None:
            return self._data.pop(key)
        soa = self[key]
        self.remove(key)
        return soa
class NumpyStore:
    """A temporary storage that uses numpy.savez to save data.

    The data has to be a dictionary of arrays (i.e. `str` -> `np.ndarray`);
    arrays can have variable lengths and dimensions. If `temporary_path`
    is set, the arrays associated with a key are stored in `.npz` files (one
    per MPI rank), otherwise they are kept in memory.

    Parameters
    ----------
    partition : :class:`mpipartition.Partition`
        A Partition instance defining the MPI layout
    temporary_path : Optional[str]
        The base filesystem path where temporary data is stored. If `None`,
        the data will be kept in memory.

    Examples
    --------
    >>> # Creating a partition
    >>> partition = mpipartition.Partition()
    >>> # Creating a store
    >>> store = NumpyStore(partition, './tmp')
    >>> data = {x: np.random.uniform(size=10) for x in 'xyz'}
    >>> store['pos_0'] = data
    >>> # Do some memory-expensive stuff until you need the data again...
    >>> data_0 = store['pos_0']
    >>> # Cleanup
    >>> store.remove('pos_0')
    """

    def __init__(
        self, partition: "Partition", temporary_path: Optional[str] = None
    ):
        self._temporary_path = temporary_path
        self._data = {}
        self._partition = partition

    def _filename_fct(self, key: str) -> str:
        """Return the `.npz` path used for `key` on this rank."""
        return (
            f"{self._temporary_path}_{key}.tmp.rank-{self._partition.rank}.npz"
        )

    def __setitem__(self, key: str, data: Mapping[str, np.ndarray]) -> None:
        """Add a dictionary of arrays to the storage.

        Parameters
        ----------
        key : str
            The storage key. It is appended to `temporary_path` and therefore
            has to be valid as part of a filesystem path when the store is
            file-backed.
        data : Mapping[str, np.ndarray]
            The data to be added to the store. A dictionary with types
            `{str: np.ndarray}`, where the numpy arrays can have variable
            shape and dimensions.
        """
        if self._temporary_path is None:
            # In-memory mode keeps a reference to the caller's mapping (no copy).
            self._data[key] = data
            return
        with Timer("temp storage: writing (NPY)", None):
            np.savez(self._filename_fct(key), **data)

    def __getitem__(self, key: str) -> Mapping[str, np.ndarray]:
        """Retrieve a dictionary of arrays from the store.

        Parameters
        ----------
        key
            The storage key

        Returns
        -------
        Mapping[str, np.ndarray]
            The data associated with the key. A python dictionary of type
            `{str: np.ndarray}`

        Raises
        ------
        KeyError
            In in-memory mode, if `key` is not in the store.
        """
        if self._temporary_path is None:
            return self._data[key]
        with Timer("temp storage: reading (NPY)", None):
            # np.load returns a lazy archive that keeps the file open. Read
            # every array now and close the archive, so no file handle leaks
            # and the file can be deleted safely afterwards (see `pop`).
            with np.load(self._filename_fct(key)) as archive:
                return {name: archive[name] for name in archive.files}

    def remove(self, key: str) -> None:
        """Delete stored data associated with key (from memory or disk).

        Parameters
        ----------
        key
            The storage key of the data

        Raises
        ------
        KeyError
            In in-memory mode, if `key` is not in the store.
        """
        if self._temporary_path is None:
            del self._data[key]
            return
        with Timer("temp storage: cleanup (NPY)", None):
            os.remove(self._filename_fct(key))

    def pop(self, key: str) -> Mapping[str, np.ndarray]:
        """Retrieve stored data and remove it from storage (memory or disk).

        Parameters
        ----------
        key
            The storage key of the data

        Returns
        -------
        Mapping[str, np.ndarray]
            The data associated with the key. A python dictionary of type
            `{str: np.ndarray}`
        """
        if self._temporary_path is None:
            return self._data.pop(key)
        arrays = self[key]
        self.remove(key)
        return arrays