Source code for smpy.utils

"""Utility functions for SMPy mass mapping operations.

This module provides core utility functions for data loading, peak detection,
coordinate transformations, and statistical operations used throughout the
SMPy package.
"""

import numpy as np
import pandas as pd
import random
import secrets

from astropy.table import Table
from astropy.io import fits

[docs] def load_shear_data(shear_cat_path, coord1_col, coord2_col, g1_col, g2_col, weight_col=None, hdu=0): """Load shear catalog from FITS file. Read shear catalog data from a FITS file and return as a pandas DataFrame with standardized column names for use in mass mapping operations. Parameters ---------- shear_cat_path : `str` Path to FITS catalog file. coord1_col : `str` Name of first coordinate column (e.g., 'ra' or 'x'). coord2_col : `str` Name of second coordinate column (e.g., 'dec' or 'y'). g1_col : `str` Name of g1 shear component column. g2_col : `str` Name of g2 shear component column. weight_col : `str`, optional Name of weight column. If None, unit weights are assigned. hdu : `int`, optional HDU number to read from FITS file. Returns ------- shear_data : `pandas.DataFrame` DataFrame with standardized column names: 'coord1', 'coord2', 'g1', 'g2', 'weight'. Raises ------ IndexError If specified HDU is not found in the FITS file. KeyError If required columns are not found in the catalog. """ # Read data from FITS file try: shear_catalog = Table.read(shear_cat_path, hdu=hdu) except IndexError: raise IndexError(f"HDU {hdu} not found in {shear_cat_path}") # Convert to pandas DataFrame with generic column names try: shear_df = pd.DataFrame({ 'coord1': shear_catalog[coord1_col], 'coord2': shear_catalog[coord2_col], 'g1': shear_catalog[g1_col], 'g2': shear_catalog[g2_col], }) except KeyError as e: raise KeyError(f"Column {e} not found in HDU {hdu}. Available columns: {shear_catalog.colnames}") # Add weights (unit weights if not specified) if weight_col is None: shear_df['weight'] = np.ones(len(shear_df)) else: shear_df['weight'] = shear_catalog[weight_col] return shear_df
[docs] def find_peaks2d(image, threshold=None, verbose=False, true_boundaries=None, scaled_boundaries=None): """Identify peaks in a 2D array above a specified threshold. Find local maxima in a 2D image where each peak pixel has a value greater than all 8 neighboring pixels. Refactored from cosmostat/lenspack. Parameters ---------- image : `numpy.ndarray` 2D input map for peak detection. threshold : `float`, optional Detection threshold. If None, uses minimum image value. verbose : `bool`, optional Whether to print peak information. true_boundaries : `dict`, optional True coordinate boundaries for position conversion. scaled_boundaries : `dict`, optional Scaled coordinate boundaries for position conversion. Returns ------- X : `numpy.ndarray` Peak pixel indices in x-direction. Y : `numpy.ndarray` Peak pixel indices in y-direction. heights : `numpy.ndarray` Peak values sorted in descending order. coords : `list` or None Peak coordinates in true coordinate system if boundaries provided, otherwise None. """ image = np.atleast_2d(image) threshold = threshold if threshold is not None else image.min() # Pad the image for border peak checks padded_image = np.pad(image, pad_width=1, mode='constant', constant_values=image.min()) # Check for peaks by comparing to neighbors is_peak = ( (padded_image[1:-1, 1:-1] > padded_image[:-2, :-2]) & (padded_image[1:-1, 1:-1] > padded_image[:-2, 1:-1]) & (padded_image[1:-1, 1:-1] > padded_image[:-2, 2:]) & (padded_image[1:-1, 1:-1] > padded_image[1:-1, :-2]) & (padded_image[1:-1, 1:-1] > padded_image[1:-1, 2:]) & (padded_image[1:-1, 1:-1] > padded_image[2:, :-2]) & (padded_image[1:-1, 1:-1] > padded_image[2:, 1:-1]) & (padded_image[1:-1, 1:-1] > padded_image[2:, 2:]) ) # Apply threshold peaks_mask = is_peak & (image >= threshold) # Get peak coordinates and heights Y, X = np.nonzero(peaks_mask) heights = image[Y, X] # Sort peaks by height in descending order sort_indices = np.argsort(-heights) X = X[sort_indices] Y = Y[sort_indices] heights = heights[sort_indices] # Convert pixel coordinates to true coordinates if boundaries are provided coords = None if verbose and true_boundaries and scaled_boundaries: coords = [] coord_system = 'pixel' if 'X' in true_boundaries['coord1_name'] else 'radec' for x, y in zip(X, Y): # Convert pixel indices to scaled coordinates scaled_coord1 = scaled_boundaries['coord1_min'] + (x + 0.5) * ( scaled_boundaries['coord1_max'] - scaled_boundaries['coord1_min'] ) / image.shape[1] scaled_coord2 = scaled_boundaries['coord2_min'] + (y + 0.5) * ( scaled_boundaries['coord2_max'] - scaled_boundaries['coord2_min'] ) / image.shape[0] # Convert scaled coordinates to true coordinates true_coord1 = np.interp( scaled_coord1, [scaled_boundaries['coord1_min'], scaled_boundaries['coord1_max']], [true_boundaries['coord1_min'], true_boundaries['coord1_max']] ) true_coord2 = np.interp( scaled_coord2, [scaled_boundaries['coord2_min'], scaled_boundaries['coord2_max']], [true_boundaries['coord2_min'], true_boundaries['coord2_max']] ) coords.append((true_coord1, true_coord2)) # Print peak information with appropriate coordinate labels if coord_system == 'radec': coord1_label, coord2_label = 'RA', 'Dec' else: coord1_label, coord2_label = 'X', 'Y' print("\nDetected Peaks:") print("-" * 60) print(f"{'Peak #':<8}{'Value':<12}{coord1_label:<12}{coord2_label:<12}") print("-" * 60) for i, ((c1, c2), height) in enumerate(zip(coords, heights), 1): print(f"{i:<8}{height:.<12.5f}{c1:.<12.5f}{c2:.<12.5f}") print("-" * 60) return X, Y, heights, coords
[docs] def g1g2_to_gt_gc(g1, g2, coord1, coord2, center_coord1, center_coord2, pix_coord1=100): """Convert shear components to tangential/cross components. Transform Cartesian shear components (g1, g2) to tangential and cross shear components relative to a specified center position. Parameters ---------- g1 : `numpy.ndarray` First shear component. g2 : `numpy.ndarray` Second shear component. coord1 : `numpy.ndarray` First coordinates (RA or X). coord2 : `numpy.ndarray` Second coordinates (Dec or Y). center_coord1 : `float` Center position in first coordinate. center_coord2 : `float` Center position in second coordinate. pix_coord1 : `int`, optional Grid size in first dimension. Returns ------- gt : `numpy.ndarray` Tangential shear component. gc : `numpy.ndarray` Cross shear component. phi : `numpy.ndarray` Polar angle relative to center. """ coord1_max = np.max(coord1) coord1_min = np.min(coord1) coord2_max = np.max(coord2) coord2_min = np.min(coord2) aspect_ratio = (coord1_max - coord1_min) / (coord2_max - coord2_min) pix_coord2 = int(pix_coord1 / aspect_ratio) # Create coordinate grid coord1_grid, coord2_grid = np.meshgrid( np.linspace(coord1_min, coord1_max, pix_coord1), np.linspace(coord2_min, coord2_max, pix_coord2) ) # Calculate polar angle phi = np.arctan2(coord2_grid - center_coord2, coord1_grid - center_coord1) # Calculate tangential and cross components gt = -g1 * np.cos(2 * phi) - g2 * np.sin(2 * phi) gc = -g1 * np.sin(2 * phi) + g2 * np.cos(2 * phi) return gt, gc, phi
def _shuffle_coordinates(shear_df, seed=None): """Shuffle the scaled coordinates of the input DataFrame together. Randomly permute coordinate pairs while maintaining the pairing between coord1_scaled and coord2_scaled columns. Parameters ---------- shear_df : `pandas.DataFrame` Input DataFrame with scaled coordinates. seed : `int`, optional Random seed for reproducibility. Returns ------- shuffled_df : `pandas.DataFrame` DataFrame with shuffled coordinate pairs. """ if seed is not None: random.seed(seed) shuffled_df = shear_df.copy() # Combine scaled coordinates into pairs coord_pairs = list(zip(shuffled_df['coord1_scaled'], shuffled_df['coord2_scaled'])) # Shuffle the pairs random.shuffle(coord_pairs) # Unzip the shuffled pairs shuffled_coord1, shuffled_coord2 = zip(*coord_pairs) # Update the scaled coordinates shuffled_df['coord1_scaled'] = shuffled_coord1 shuffled_df['coord2_scaled'] = shuffled_coord2 return shuffled_df def _shuffle_galaxy_rotation(shear_df, rng=None): """Shuffle galaxy orientations in the input shear DataFrame. Randomly rotate galaxy orientations by adding random angles to the shear components while preserving shear magnitudes. Parameters ---------- shear_df : `pandas.DataFrame` Input DataFrame with shear components. rng : `numpy.random.Generator`, optional Random number generator for orientation shuffling. Returns ------- shuffled_df : `pandas.DataFrame` DataFrame with randomly rotated shear components. """ shuffled_df = shear_df.copy() # Add a random angle to the galaxy rotation g1, g2 = shuffled_df['g1'], shuffled_df['g2'] if rng is None: angle = np.random.uniform(0, 2 * np.pi, len(g1)) else: angle = rng.uniform(0, 2 * np.pi, len(g1)) g1g2_len = np.sqrt(np.array(g1)**2 + np.array(g2)**2) g1g2_angle = np.arctan2(g2, g1) + angle shuffled_df['g1'] = g1g2_len * np.cos(g1g2_angle) shuffled_df['g2'] = g1g2_len * np.sin(g1g2_angle) return shuffled_df
[docs] def generate_multiple_shear_dfs(og_shear_df, num_shuffles=100, shuffle_type='spatial', seed=0): """Generate shuffled versions of shear catalog. Create multiple randomized versions of the input shear catalog for null hypothesis testing and error estimation. Parameters ---------- og_shear_df : `pandas.DataFrame` Original shear catalog. num_shuffles : `int`, optional Number of shuffled versions to generate. shuffle_type : `str`, optional Type of shuffling: 'spatial' (randomize positions) or 'orientation' (randomize galaxy orientations). seed : `int` or `str`, optional Random seed for reproducibility. If 'random', uses cryptographically secure random number from secrets module. Returns ------- shuffled_catalogs : `list` of `pandas.DataFrame` List of shuffled DataFrame copies. Raises ------ ValueError If invalid shuffle_type is specified. """ shuffled_dfs = [] # Handle 'random' seed option if seed == 'random': seed = secrets.randbits(128) rng = np.random.default_rng(seed) # For orientation shuffling, we'll use NumPy's RNG directly # For spatial shuffling, we still use Python's random module with the seed if shuffle_type == 'spatial': random.seed(seed) for i in range(num_shuffles): if shuffle_type == 'spatial': if seed != 'random': # Only set seed for each iteration if not using the 'random' option shuffled_df = _shuffle_coordinates(og_shear_df, seed=seed+i) else: # For 'random', we already set the seed once outside the loop shuffled_df = _shuffle_coordinates(og_shear_df, seed=None) elif shuffle_type == 'orientation': if seed == 'random': # Use NumPy's RNG directly for orientation shuffling with 'random' option shuffled_df = _shuffle_galaxy_rotation(og_shear_df, rng=rng) else: # Standard orientation shuffling shuffled_df = _shuffle_galaxy_rotation(og_shear_df) else: raise ValueError(f"Invalid shuffle type: {shuffle_type}") shuffled_dfs.append(shuffled_df) return shuffled_dfs
[docs] def save_fits(data, true_boundaries, filename): """Save a 2D array as a FITS file with proper WCS information. Write mass map data to a FITS file with World Coordinate System (WCS) headers for proper astronomical coordinate handling. Parameters ---------- data : `numpy.ndarray` 2D array containing the map data. true_boundaries : `dict` Dictionary with coordinate boundaries containing 'ra_min', 'ra_max', 'dec_min', 'dec_max' keys. filename : `str` Output FITS filename. Notes ----- The WCS information assumes a tangent plane projection (TAN) and sets appropriate pixel scales based on the coordinate boundaries. """ hdu = fits.PrimaryHDU(data) header = hdu.header ny, nx = data.shape ra_min, ra_max = true_boundaries['ra_min'], true_boundaries['ra_max'] dec_min, dec_max = true_boundaries['dec_min'], true_boundaries['dec_max'] pixel_scale_ra = (ra_max - ra_min) / nx pixel_scale_dec = (dec_max - dec_min) / ny header["CTYPE1"] = "RA---TAN" header["CUNIT1"] = "deg" header["CRVAL1"] = (ra_max + ra_min) / 2 header["CRPIX1"] = nx / 2 header["CD1_1"] = -pixel_scale_ra header["CD1_2"] = 0.0 header["CTYPE2"] = "DEC--TAN" header["CUNIT2"] = "deg" header["CRVAL2"] = (dec_max + dec_min) / 2 header["CRPIX2"] = ny / 2 header["CD2_1"] = 0.0 header["CD2_2"] = pixel_scale_dec hdu.writeto(filename, overwrite=True) print(f"Saved FITS file: {filename}")