Source code for haccytrees.mergertrees.fragments

import numpy as np
import numba
from typing import Tuple, Mapping, List


[docs] def split_fragment_tag(tag: int) -> Tuple[int, int]: """Extracting the original fof_tag and fragment index from fragments Parameters ---------- tag the fof_tag of the fragment, has to be negative by definition Returns ------- fof_tag the original fof_tag of the FoF halo the fragment is associated with fragment_idx the index / enumeration of the fragment (0 == main fragment) Notes ----- This function can also be used with numpy arrays directly """ tag = -tag #reverting the - operation first fragment_idx = tag >> 48 fof_tag = tag & ((1<<48) - 1) return fof_tag, fragment_idx
@numba.jit(nopython=True) def _fix_fragment_property_constrev(snapnum, fof_tag, data_in, data_out): nhalos = len(snapnum) lastsnap = snapnum[0] lastval = data_in[0] for i in range(nhalos): _index = 0 _tag = fof_tag[i] if _tag < 0: _tag = -_tag _index = _tag >> 48 _tag = _tag & ((1<<48) - 1) if _index == 0 or snapnum[i] >= lastsnap: lastval = data_in[i] data_out[i] = lastval lastsnap = snapnum[i] @numba.jit(nopython=True) def _fix_fragment_property_linear(snapnum, fof_tag, data_in, data_out, mask_neg): nhalos = len(snapnum) lastsnap = snapnum[0] start_index = 0 for i in range(nhalos): _index = 0 _tag = fof_tag[i] if _tag < 0: _tag = -_tag _index = _tag >> 48 _tag = _tag & ((1<<48) - 1) if snapnum[i] >= lastsnap: # we are in a new branch, discard current fragment (should not be # set anyway!) data_out[i] = data_in[i] start_index = i elif _index == 0: if start_index < i-1: # We just ended a fragment, need to go back and linearly interpolate! nsteps = i-start_index valstart = data_in[start_index] dval = (data_in[i] - valstart)/nsteps dval_mask = min(data_in[i], valstart) masked = mask_neg and dval_mask < 0 for j in range(1, nsteps): data_out[start_index + j] = (valstart + j*dval * (not masked)) + (dval_mask * masked) data_out[i] = data_in[i] start_index = i else: # We are in a non-main fragment, let's wait until we're a "real" object pass
[docs] def fix_fragment_properties( forest: Mapping[str, np.ndarray], keys: List[str], *, inplace: bool = True, suffix: str = '_fragfix', interpolation: str = 'constant_reverse', mask_negative: bool = False ) -> None: """Correct properties of minor fragments The SOD data stored in the merger tree forest files will be wrong for fragmented FoF groups, if the halo is not the major fragment of this group. This function attempts to fix those values by either linearly interpolate or setting the SOD properties to a constant value during the times when a halo is a minor fragment. Parameters ---------- forest the full merger tree forest data keys list of the columns to which the correction should be applied inplace if True, the column will be updated in-place, otherwise, a new array will be created with the original name and suffix appended suffix if not in-place, the new array will be named ``old_column + suffix`` interpolation the type of correction to apply. Currently supported are: - ``"constant_reverse"``: set the properties to the value the halo has when becoming a non-minor-fragment or independent ("reverse" in time) - ``"linear"``: linearly interpolate (in snapshot-space) the property values from before and after being a minor fragment mask_negative if True, will not attempt to do a linear interpolation if either the starting or ending value is negative, e.g. for an invalid concentration parameter. Instead, the property will be set to the negative value during the minor-fragment phase. Examples -------- >>> haccytrees.mergertrees.fix_fragment_properties(forest, ... ['sod_halo_mass', 'sod_halo_radius', 'sod_halo_cdelta'], ... inplace=False, ... suffix='_fragfix_const', ... interpolation='constant_reverse') >>> haccytrees.mergertrees.fix_fragment_properties(forest, ... ['sod_halo_mass', 'sod_halo_radius', 'sod_halo_cdelta'], ... inplace=False, ... suffix='_fragfix_lin', ... interpolation='linear', ... mask_negative=True) """ for k in keys: data = forest[k] if inplace: data_out = data else: data_out = np.empty_like(data) forest[f'{k}{suffix}'] = data_out if interpolation == 'constant_reverse': _fix_fragment_property_constrev(forest['snapnum'], forest['fof_halo_tag'], data, data_out) elif interpolation == 'linear': _fix_fragment_property_linear(forest['snapnum'], forest['fof_halo_tag'], data, data_out, mask_negative) else: raise NotImplementedError(f"unknown interpolation: {interpolation}")