Source code for plastro.overlap

"""
Overlap analysis module for PLASTRO scores.

This module provides functions for computing overlap-based plasticity scores
from character matrices and single-cell data. The PLASTRO score quantifies
the relationship between lineage and phenotypic distances.
"""

import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from tqdm import tqdm
from tqdm.contrib.concurrent import process_map
import multiprocessing
import heapq
import os
from sklearn.metrics.pairwise import pairwise_distances
from typing import Optional, Tuple
import anndata

[docs] def PLASTRO_score( character_matrix: pd.DataFrame, ad: 'anndata.AnnData', threshold: float = 0.95, maximum_radius: int = 500, interval: int = 1, latent_space_key: str = 'X_dm', flavor: str = 'gini', parallel: bool = False, save_to: Optional[str] = None, show_plots: bool = True ) -> pd.DataFrame: """ Compute the PLASTRO overlap plasticity score. The PLASTRO score quantifies cellular plasticity by measuring the overlap between lineage relationships (from character matrix) and phenotypic relationships (from latent space) at multiple spatial scales. Parameters ---------- character_matrix : pd.DataFrame Character matrix with cells as rows and CRISPR mutation sites as columns. Values represent mutation states (0=unmutated, >0=mutated states). ad : anndata.AnnData Annotated data object containing latent representation of phenotype. threshold : float, optional Threshold for variance in overlap (proportion of max peak variance), by default 0.95. Only used when flavor='variable_radii'. Radii with variance >= threshold * max_variance are used for computing the final score. maximum_radius : int, optional Maximum radius for computing overlap, by default 500. interval : int, optional Interval between radii for overlap computation, by default 1. latent_space_key : str, optional Key in `ad.obsm` where latent space coordinates are stored, by default 'X_dm'. flavor : str, optional Method for computing PLASTRO score, by default 'gini'. - 'gini': Computes Gini inequality index for each cell's overlap distribution across radii. Measures how concentrated the overlap is at specific spatial scales. * High Gini (→1): Overlap concentrated at few radii (unequal distribution) * Low Gini (→0): Overlap evenly distributed across radii (equal distribution) * Uses ALL radii from 1 to maximum_radius - 'variable_radii': Computes area under overlap curve using variance-filtered radii. Measures strength of lineage-phenotype concordance at most informative scales. * High score: Strong lineage-phenotype concordance (low plasticity) * Low score: Weak lineage-phenotype concordance (high plasticity) * Uses SELECTED radii based on variance threshold parallel : bool, optional Whether to use parallel processing, by default False. save_to : str, optional Directory path to save results, by default None. show_plots : bool, optional Whether to display variance analysis plots (only for flavor='variable_radii'), by default True. Returns ------- pd.DataFrame PLASTRO plasticity scores for each cell. Column name depends on flavor: - flavor='gini': Column 'Gini_Index' with values [0,1] - flavor='variable_radii': Column 'PLASTRO_score' with positive values Examples -------- >>> import plastro >>> >>> # Compute Gini-based plasticity scores >>> gini_scores = plastro.PLASTRO_score(char_matrix, ad, flavor='gini') >>> print(f"Mean Gini index: {gini_scores['Gini_Index'].mean():.3f}") >>> >>> # Compute variance-based plasticity scores >>> var_scores = plastro.PLASTRO_score(char_matrix, ad, flavor='variable_radii', threshold=0.95) >>> print(f"Mean PLASTRO score: {var_scores['PLASTRO_score'].mean():.3f}") Notes ----- The PLASTRO analysis workflow: 1. **Overlap Computation**: For each cell and radius r, compute overlap between: - Lineage neighbors: r cells most similar by CRISPR mutations - Phenotype neighbors: r cells most similar in latent space 2. **Score Computation**: Two complementary approaches: **Gini Approach** (flavor='gini'): - Computes Gini inequality coefficient for each cell's overlap profile - Measures how overlap varies with spatial scale for individual cells - Interpretation: Distribution pattern of lineage-phenotype concordance **Variable Radii Approach** (flavor='variable_radii'): - Identifies radii with high variance in overlap across cells - Computes area under overlap curve for informative radii only - Interpretation: Overall strength of lineage-phenotype concordance **When to use each flavor:** - Use **'gini'** to understand overlap distribution patterns per cell - Use **'variable_radii'** to measure overall plasticity strength - Both provide complementary views of the same underlying relationships """ overlaps = PLASTRO_overlaps( character_matrix, ad, maximum_radius=maximum_radius, interval=interval, latent_space_key=latent_space_key, parallel=parallel, save_to=save_to ) if flavor not in ['gini', 'variable_radii']: raise ValueError("Flavor must be 'gini' or 'variable_radii'") if flavor == 'gini': overlap_score = compute_gini_plasticity_score(overlaps) return overlap_score else: print('Selecting radius based on variance threshold...') print('WARNING: This approach is VERY sensitive to threshold choice!') overlap_score = compute_variable_radii_plasticity_score( overlaps, threshold=threshold, plot_variance=show_plots, save_to=save_to ) print('Computed overlap score.') return overlap_score
[docs] def PLASTRO_overlaps( character_matrix: pd.DataFrame, ad: 'anndata.AnnData', maximum_radius: int = 500, interval: int = 1, latent_space_key: str = 'X_dm', parallel: bool = False, save_to: Optional[str] = None ) -> pd.DataFrame: """ Compute overlaps between lineage and phenotypic neighborhoods. For each cell and radius, computes the overlap between cells that are lineage neighbors (similar character states) and phenotypic neighbors (close in latent space). Parameters ---------- character_matrix : pd.DataFrame Character matrix with mutation data. ad : anndata.AnnData Annotated data object with phenotypic information. maximum_radius : int, optional Maximum neighborhood radius, by default 500. interval : int, optional Radius increment, by default 1. latent_space_key : str, optional Key for latent space coordinates, by default 'X_dm'. parallel : bool, optional Use parallel processing, by default False. save_to : str, optional Save directory, by default None. Returns ------- pd.DataFrame Overlap values for each cell (rows) at each radius (columns). """ print(f'Computing overlaps with maximum radius {maximum_radius}') # Ensure consistent cell ordering common_cells = character_matrix.index.intersection(ad.obs_names) character_matrix = character_matrix.loc[common_cells] ad_subset = ad[common_cells].copy() # Compute distance matrices lineage_distances = compute_lineage_distances(character_matrix) phenotype_distances = compute_phenotype_distances(ad_subset, latent_space_key) radii = np.arange(1, maximum_radius + 1, interval) # Use optimized computation method overlaps = _compute_overlaps_precomputed_optimized( lineage_distances, phenotype_distances, radii ) # Convert to DataFrame with proper indexing overlaps_df = pd.DataFrame(overlaps, index=common_cells, columns=radii) if save_to is not None: overlaps_df.to_csv(os.path.join(save_to, 'overlaps.csv')) print(f'Saved overlaps to {save_to}') return overlaps_df
[docs] def compute_variable_radii_plasticity_score( overlaps: pd.DataFrame, threshold: float = 0.95, plot_variance: bool = True, save_to: Optional[str] = None ) -> Tuple[pd.DataFrame, pd.Series]: """ Compute PLASTRO scores using variable radii approach. This method identifies optimal radius ranges based on variance in overlap across cells, then computes area under the overlap curve for selected radii. It focuses on the most informative spatial scales for plasticity analysis. Parameters ---------- overlaps : pd.DataFrame Overlap matrix with cells as rows and radii as columns. Each value represents overlap between lineage and phenotype neighborhoods. threshold : float, optional Variance threshold for radius selection (0-1), by default 0.95. Radii with variance >= threshold * max_variance are considered informative. plot_variance : bool, optional Whether to plot variance analysis for radius selection, by default True. save_to : str, optional Directory to save variance analysis plots, by default None. Returns ------- Tuple[pd.DataFrame, pd.Series] - PLASTRO scores for each cell (column: 'PLASTRO_score') - Variance values across all radii Notes ----- **Algorithm:** 1. Compute variance in overlap **across cells** for each radius 2. Identify radii with variance >= threshold * max_variance 3. Compute mean overlap across selected radii for each cell **Interpretation:** - **High scores**: Strong concordance between lineage and phenotype (low plasticity) - **Low scores**: Weak concordance between lineage and phenotype (high plasticity) - **Selected radii**: Spatial scales that best differentiate cells by plasticity **Comparison to Gini approach:** - Variable radii: Measures overall strength using informative scales - Gini: Measures distribution pattern across all scales """ # Compute variance across cells for each radius variance_across_cells = overlaps.var(axis=0) max_variance = variance_across_cells.max() threshold_variance = threshold * max_variance # Identify optimal radius range valid_radii = variance_across_cells[variance_across_cells >= threshold_variance] if len(valid_radii) == 0: print(f"Warning: No radii meet variance threshold {threshold}") valid_radii = variance_across_cells.nlargest(10) # Use top 10 radii print(f"Using {len(valid_radii)} radii for score computation") print(f"Radius range: {valid_radii.index.min()} - {valid_radii.index.max()}") # Compute area under curve for valid radii overlap_scores = overlaps[valid_radii.index].sum(axis=1) / len(valid_radii) overlap_scores = pd.DataFrame({'PLASTRO_score': overlap_scores}) if plot_variance: _plot_variance_analysis(variance_across_cells, threshold_variance, valid_radii, save_to) return overlap_scores
def _plot_variance_analysis( variance_across_cells: pd.Series, threshold_variance: float, valid_radii: pd.Series, save_to: Optional[str] ) -> None: """Plot variance analysis for radius selection.""" plt.figure(figsize=(10, 6)) plt.plot(variance_across_cells.index, variance_across_cells.values, 'b-', alpha=0.7) plt.axhline(y=threshold_variance, color='r', linestyle='--', label=f'Threshold ({threshold_variance:.3f})') plt.scatter(valid_radii.index, valid_radii.values, color='red', s=20, label='Selected radii', zorder=5) plt.xlabel('Radius') plt.ylabel('Variance in Overlap') plt.title('Variance Analysis for Radius Selection') plt.legend() plt.grid(True, alpha=0.3) if save_to is not None: plt.savefig(os.path.join(save_to, 'variance_analysis.png'), dpi=300, bbox_inches='tight') plt.show()
[docs] def gini_index(arr: np.ndarray) -> float: """ Compute the Gini inequality index for a 1D array. The Gini index measures inequality in the distribution of values, commonly used in economics but applicable to any distribution analysis. For PLASTRO, it measures how concentrated overlap values are across radii. Parameters ---------- arr : np.ndarray Array of overlap values between 0 and 1. Each value represents overlap at a different radius. Returns ------- float Gini index value between 0 and 1. - 0: Perfect equality (all values identical) - 1: Perfect inequality (one value has everything, others have nothing) Notes ----- **Mathematical definition:** Gini = 1 - Σ(p_i + p_{i-1}) * (x_i - x_{i-1}) Where p_i is cumulative proportion and x_i are sorted values. **For PLASTRO interpretation:** - High Gini: Overlap concentrated at specific radii (scale-specific plasticity) - Low Gini: Overlap distributed across radii (scale-invariant plasticity) """ if not (np.all(arr >= 0) and np.all(arr <= 1)): # Print the values that are out of bounds out_of_bounds = arr[(arr < 0) | (arr > 1)] print(f"Out of bounds values: {out_of_bounds}") raise ValueError("Array values must be between 0 and 1") arr = np.sort(arr) cumsum = np.cumsum(arr) cumprop = cumsum / cumsum[-1] gini = 1 - np.sum((cumprop[1:] + cumprop[:-1]) * (arr[1:] - arr[:-1])) return gini
[docs] def compute_gini_plasticity_score(overlaps: pd.DataFrame) -> pd.DataFrame: """ Compute PLASTRO scores using Gini inequality index approach. This method computes a Gini coefficient for each cell's overlap distribution across all radii. The Gini index measures how concentrated or dispersed the lineage-phenotype concordance is across different spatial scales. Parameters ---------- overlaps : pd.DataFrame Overlap matrix with cells as rows and radii as columns. This is the output from the PLASTRO_overlaps() function. Each value represents overlap between lineage and phenotype neighborhoods at a given radius. Returns ------- pd.DataFrame Gini plasticity scores for each cell with column 'Gini_Index'. Values range from 0 to 1. Notes ----- **Algorithm:** 1. For each cell, compute Gini coefficient of overlap values across ALL radii 2. Gini measures inequality in the overlap distribution **Interpretation:** - **High Gini (≈1)**: Overlap concentrated at few specific radii * Suggests scale-specific plasticity patterns * Lineage-phenotype concordance varies dramatically with spatial scale - **Low Gini (≈0)**: Overlap evenly distributed across radii * Suggests scale-invariant plasticity patterns * Consistent lineage-phenotype relationship across scales **Comparison to Variable Radii approach:** - Gini: Characterizes the **pattern** of how concordance varies with scale - Variable radii: Measures the **strength** of concordance at optimal scales **Use cases:** - Understanding how plasticity manifests across spatial scales - Identifying cells with scale-specific vs scale-invariant plasticity - Complementary analysis to variable radii approach """ cell_names = overlaps.index overlaps_values = overlaps.values if len(overlaps_values.shape) != 2: raise ValueError("Overlaps must be a 2D array") # Compute Gini index for each row gini_values = np.array([gini_index(overlaps_values[i]) for i in range(overlaps_values.shape[0])]) return pd.DataFrame(gini_values, index=cell_names, columns=['Gini_Index'])
# Optimized overlap computation functions def _compute_overlaps_precomputed_optimized( lineage_distances: pd.DataFrame, phenotype_distances: pd.DataFrame, radii: np.ndarray ) -> np.ndarray: """Optimized overlap computation using pre-computed rankings.""" # Pre-compute partial rankings - only up to max_radius, not entire distance matrix lineage_rankings = _get_all_neighbors_exact_optimized(lineage_distances.values, radii.max()) phenotype_rankings = _get_all_neighbors_exact_optimized(phenotype_distances.values, radii.max()) n_cells = len(lineage_distances) n_radii = len(radii) overlaps = np.zeros((n_cells, n_radii)) # Compute overlaps with pre-computed rankings for i, radius in enumerate(tqdm(radii, desc="Computing overlaps")): overlaps[:, i] = _compute_overlap_from_rankings_optimized(lineage_rankings, phenotype_rankings, radius) return overlaps def _get_all_neighbors_exact_optimized(distance_matrix: np.ndarray, max_k: int) -> np.ndarray: """Get all k-nearest neighbors up to max_k for all cells using partial sorting.""" n_cells = distance_matrix.shape[0] # Only partition up to max_k+1 (including self) - O(n) instead of O(n log n) neighbors = np.argpartition(distance_matrix, max_k, axis=1)[:, :max_k+1] # Sort only within the k-smallest subset for consistent ordering - O(k log k) for i in range(n_cells): k_smallest_indices = neighbors[i, :max_k+1] k_smallest_dists = distance_matrix[i, k_smallest_indices] sorted_order = np.argsort(k_smallest_dists) neighbors[i, :max_k+1] = k_smallest_indices[sorted_order] # Ensure self is at position 0 (should have distance 0) if neighbors[i, 0] != i: # Find self in the neighbors and move to position 0 self_pos = np.where(neighbors[i, :max_k+1] == i)[0] if len(self_pos) > 0: # Swap self to position 0 neighbors[i, [0, self_pos[0]]] = neighbors[i, [self_pos[0], 0]] return neighbors def _compute_overlap_from_rankings_optimized( lineage_rankings: np.ndarray, phenotype_rankings: np.ndarray, radius: int ) -> np.ndarray: """Compute overlaps from pre-computed neighbor rankings.""" n_cells = lineage_rankings.shape[0] overlaps = np.zeros(n_cells) for cell in range(n_cells): # Only use first 'radius' neighbors (excluding self at index 0) lineage_neighbors = lineage_rankings[cell, 1:radius+1] # Skip self phenotype_neighbors = phenotype_rankings[cell, 1:radius+1] # Skip self # Compute intersection size for the radius-specific neighbors overlap_size = len(np.intersect1d(lineage_neighbors, phenotype_neighbors)) overlaps[cell] = overlap_size / radius return overlaps
[docs] def compute_lineage_distances(character_matrix: pd.DataFrame) -> pd.DataFrame: """ Compute pairwise lineage distances from character matrix. Uses modified Hamming distance that accounts for different mutation states. Parameters ---------- character_matrix : pd.DataFrame Character matrix with mutation states. Returns ------- pd.DataFrame Pairwise lineage distance matrix. """ def modified_hamming_distance(x, y): """Modified Hamming distance for character states.""" dists = 2 * (x != y).astype(float) - np.logical_xor(x == 0, y == 0).astype(float) nan_mask = np.logical_or(np.isnan(x), np.isnan(y)) dists[nan_mask] = np.nan return np.nanmean(dists) distances = pairwise_distances( character_matrix.values, metric=modified_hamming_distance ) return pd.DataFrame( distances, index=character_matrix.index, columns=character_matrix.index )
[docs] def compute_phenotype_distances( ad: 'anndata.AnnData', latent_space_key: str ) -> pd.DataFrame: """ Compute pairwise phenotypic distances from latent space. Parameters ---------- ad : anndata.AnnData Annotated data object. latent_space_key : str Key for latent space coordinates in ad.obsm. Returns ------- pd.DataFrame Pairwise phenotypic distance matrix. """ if latent_space_key not in ad.obsm: raise KeyError(f"Latent space key '{latent_space_key}' not found in ad.obsm") distances = pairwise_distances(ad.obsm[latent_space_key], metric='euclidean') return pd.DataFrame( distances, index=ad.obs_names, columns=ad.obs_names )
[docs] def compute_radius_overlaps( lineage_distances: pd.DataFrame, phenotype_distances: pd.DataFrame, radius: int ) -> pd.Series: """ Compute overlaps for a specific radius. Parameters ---------- lineage_distances : pd.DataFrame Pairwise lineage distances. phenotype_distances : pd.DataFrame Pairwise phenotypic distances. radius : int Neighborhood radius. Returns ------- pd.Series Overlap values for each cell at the specified radius. """ overlaps = [] for cell in lineage_distances.index: # Get k-nearest neighbors by lineage lineage_neighbors = get_k_nearest_neighbors(lineage_distances, cell, radius) # Get k-nearest neighbors by phenotype phenotype_neighbors = get_k_nearest_neighbors(phenotype_distances, cell, radius) # Compute overlap overlap = len(lineage_neighbors.intersection(phenotype_neighbors)) / radius overlaps.append(overlap) return pd.Series(overlaps, index=lineage_distances.index)
[docs] def get_k_nearest_neighbors( distance_matrix: pd.DataFrame, cell: str, k: int ) -> set: """ Get k-nearest neighbors for a cell. Parameters ---------- distance_matrix : pd.DataFrame Pairwise distance matrix. cell : str Query cell name. k : int Number of neighbors to return. Returns ------- set Set of k-nearest neighbor cell names. """ distances = distance_matrix.loc[cell].drop(cell) # Exclude self return set(distances.nsmallest(k).index)