larvaworld.lib.util.xy
======================

.. py:module:: larvaworld.lib.util.xy

.. autoapi-nested-parse::

   Methods for managing spatial metrics (2D x-y arrays)


Exceptions
----------

.. autoapisummary::

   larvaworld.lib.util.xy.Collision


Functions
---------

.. autoapisummary::

   larvaworld.lib.util.xy.fft_max
   larvaworld.lib.util.xy.detect_strides
   larvaworld.lib.util.xy.stride_interp
   larvaworld.lib.util.xy.mean_stride_curve
   larvaworld.lib.util.xy.comp_PI
   larvaworld.lib.util.xy.rolling_window
   larvaworld.lib.util.xy.straightness_index
   larvaworld.lib.util.xy.sense_food
   larvaworld.lib.util.xy.generate_seg_shapes
   larvaworld.lib.util.xy.rearrange_contour
   larvaworld.lib.util.xy.comp_bearing
   larvaworld.lib.util.xy.comp_bearing_solo
   larvaworld.lib.util.xy.compute_dispersal_solo
   larvaworld.lib.util.xy.compute_dispersal_multi
   larvaworld.lib.util.xy.compute_component_velocity
   larvaworld.lib.util.xy.compute_velocity_threshold
   larvaworld.lib.util.xy.get_display_dims
   larvaworld.lib.util.xy.get_window_dims
   larvaworld.lib.util.xy.get_arena_bounds
   larvaworld.lib.util.xy.circle_to_polygon
   larvaworld.lib.util.xy.boolean_indexing
   larvaworld.lib.util.xy.concat_datasets
   larvaworld.lib.util.xy.moving_average
   larvaworld.lib.util.xy.body_contour
   larvaworld.lib.util.xy.apply_per_level
   larvaworld.lib.util.xy.unwrap_deg
   larvaworld.lib.util.xy.unwrap_rad
   larvaworld.lib.util.xy.rate
   larvaworld.lib.util.xy.eudist
   larvaworld.lib.util.xy.eudi5x
   larvaworld.lib.util.xy.eudiNxN
   larvaworld.lib.util.xy.compute_dst
   larvaworld.lib.util.xy.comp_extrema
   larvaworld.lib.util.xy.fixate_larva
   larvaworld.lib.util.xy.epoch_overlap
   larvaworld.lib.util.xy.epoch_slices


Module Contents
---------------

.. py:function:: fft_max(a: numpy.ndarray, dt: float, fr_range: tuple[float, float] = (0.0, +np.inf), return_amps: bool = False) -> float | tuple[float, numpy.ndarray]

   Compute power spectrum and dominant frequency of a signal.

   Args:
       a: 1D signal timeseries array
       dt: Timestep of the timeseries
       fr_range: Frequency range allowed (min, max)
       return_amps: If True, return both frequency and power spectrum array

   Returns:
       Dominant frequency within range, or tuple of (frequency, power spectrum array) if return_amps=True

   Example:
       >>> signal = np.sin(2 * np.pi * 1.5 * np.arange(0, 10, 0.1))
       >>> freq = fft_max(signal, dt=0.1, fr_range=(1.0, 2.0))


.. py:function:: detect_strides(a: numpy.ndarray, dt: float, vel_thr: float = 0.3, stretch: tuple[float, float] = (0.75, 2.0), fr: Optional[float] = None) -> numpy.ndarray

   Detect stride events in velocity timeseries.

   Args:
       a: 1D forward velocity timeseries array
       dt: Timestep of the timeseries
       vel_thr: Maximum velocity threshold for pause detection
       stretch: Min-max stretch of stride duration relative to frequency-based default
       fr: Dominant crawling frequency (auto-detected if None)

   Returns:
       Array of stride intervals, shape (N, 2) with [start_idx, end_idx] pairs

   Example:
       >>> velocity = np.array([0.1, 0.5, 0.8, 0.5, 0.1, 0.5, 0.8])
       >>> strides = detect_strides(velocity, dt=0.1)


.. py:function:: stride_interp(a: numpy.ndarray, strides: numpy.ndarray, Nbins: int = 64) -> numpy.ndarray

   Interpolate stride segments to uniform length.

   Args:
       a: 1D signal array
       strides: Array of stride intervals, shape (N, 2) with [start, end] indices
       Nbins: Number of bins for interpolation

   Returns:
       Array of interpolated strides, shape (N_strides, Nbins)

   Example:
       >>> signal = np.array([0, 1, 2, 1, 0, 1, 2, 1, 0])
       >>> strides = np.array([[0, 4], [4, 8]])
       >>> interp = stride_interp(signal, strides, Nbins=32)


.. py:function:: mean_stride_curve(a: numpy.ndarray, strides: numpy.ndarray, da: numpy.ndarray, Nbins: int = 64) -> larvaworld.lib.util.AttrDict

   Compute median stride curves separated by direction.

   Args:
       a: 1D signal array
       strides: Array of stride intervals
       da: Direction array (positive/negative values)
       Nbins: Number of bins for interpolation

   Returns:
       AttrDict with keys 'abs', 'plus', 'minus', 'norm' containing median stride curves

   Example:
       >>> signal = np.array([0, 1, 2, 1, 0, 1, 2, 1, 0])
       >>> strides = np.array([[0, 4], [4, 8]])
       >>> da = np.array([1, -1])
       >>> curves = mean_stride_curve(signal, strides, da)


.. py:function:: comp_PI(arena_xdim: float, xs: numpy.ndarray, return_num: bool = False) -> float | tuple[float, int]

   Compute preference index for spatial distribution.

   Calculates left-right preference index based on x-coordinates distribution in arena.
   Values range from -1 (all right) to +1 (all left).

   Args:
       arena_xdim: Arena x-dimension
       xs: Array of x-coordinates
       return_num: If True, also return sample count

   Returns:
       Preference index, or tuple of (index, count) if return_num=True

   Example:
       >>> xs = np.array([-0.3, -0.2, 0.1, 0.3])
       >>> pi = comp_PI(arena_xdim=1.0, xs=xs)


.. py:function:: rolling_window(a: numpy.ndarray, w: int) -> numpy.ndarray

   Create rolling windows of size w from 1D array.

   Args:
       a: 1D input array
       w: Window size

   Returns:
       2D array of rolling windows, shape (N-w+1, w)

   Raises:
       ValueError: If input array is not 1-dimensional

   Example:
       >>> a = np.array([1, 2, 3, 4, 5])
       >>> windows = rolling_window(a, w=3)


.. py:function:: straightness_index(ss: pandas.DataFrame, rolling_ticks: numpy.ndarray) -> numpy.ndarray

   Compute straightness index over rolling windows.

   Straightness index is defined as 1 - (straight_line_distance / path_distance),
   ranging from 0 (perfectly straight) to 1 (highly tortuous).

   Args:
       ss: DataFrame with columns 'x', 'y', 'dst'
       rolling_ticks: Rolling window indices array

   Returns:
       Array of straightness index values

   Example:
       >>> ss = pd.DataFrame({'x': [0, 1, 2], 'y': [0, 0, 0], 'dst': [0, 1, 1]})
       >>> rolling_ticks = np.array([[0, 1], [1, 2]])
       >>> si = straightness_index(ss, rolling_ticks)


.. py:function:: sense_food(pos: tuple[float, float], sources: Optional[Any] = None, grid: Optional[Any] = None, radius: Optional[float] = None) -> Any

   Detect food sources near a position.

   Args:
       pos: (x, y) position coordinates
       sources: Optional agent list with food sources
       grid: Optional grid object with food distribution
       radius: Detection radius for source-based sensing

   Returns:
       Grid cell coordinates, food source object, or None if no food detected

   Example:
       >>> pos = (0.5, 0.5)
       >>> cell = sense_food(pos, grid=food_grid)


.. py:function:: generate_seg_shapes(Nsegs: int, points: numpy.ndarray, seg_ratio: Optional[numpy.ndarray] = None, centered: bool = True, closed: bool = False) -> numpy.ndarray

   Segment body contour into equal or custom-length segments via vertical lines.

   Args:
       Nsegs: Number of segments to divide the body into
       points: Array of shape (M, 2) representing body contour
       seg_ratio: Optional array of segment length ratios (default: equal segments)
       centered: If True, center segments around origin
       closed: If True, connect last point to first point in each segment

   Returns:
       Array of shape (Nsegs, L, 2) where L is vertices per segment, front segment first

   Example:
       >>> contour = np.array([[1, 0.1], [0.5, 0.1], [0, 0]])
       >>> segments = generate_seg_shapes(Nsegs=2, points=contour)


.. py:exception:: Collision(object1: Any, object2: Any)

   Bases: :py:obj:`Exception`

   Exception raised when two objects collide.

   Attributes:
       object1: First colliding object
       object2: Second colliding object

   Example:
       >>> raise Collision(agent1, agent2)


   .. py:attribute:: object1


   .. py:attribute:: object2


.. py:function:: rearrange_contour(ps0: list[tuple[float, float]]) -> list[tuple[float, float]]

   Rearrange contour points by separating positive and negative y-values.

   Args:
       ps0: List of (x, y) contour points

   Returns:
       Rearranged list with positive y points (descending x) followed by negative y points (ascending x)

   Example:
       >>> points = [(1.0, 0.5), (0.5, -0.3), (0.8, 0.2)]
       >>> rearranged = rearrange_contour(points)


.. py:function:: comp_bearing(xs: numpy.ndarray, ys: numpy.ndarray, ors: float | numpy.ndarray, loc: tuple[float, float] = (0.0, 0.0), in_deg: bool = True) -> numpy.ndarray

   Compute bearing (azimuth) of oriented points relative to reference location.

   Args:
       xs: Array of x-coordinates
       ys: Array of y-coordinates
       ors: Orientation angles (in degrees)
       loc: Reference location (x, y)
       in_deg: If True, return bearings in degrees; if False, in radians

   Returns:
       Array of bearing angles, range (-180, 180] degrees or (-π, π] radians

   Example:
       >>> xs = np.array([1.0, 2.0, 3.0])
       >>> ys = np.array([1.0, 2.0, 0.0])
       >>> bearings = comp_bearing(xs, ys, ors=90.0)
       >>> # Returns [-135., -135., -90.]


.. py:function:: comp_bearing_solo(x: float, y: float, o: float, loc: tuple[float, float] = (0.0, 0.0)) -> float

   Compute bearing angle for single oriented point relative to location.

   Args:
       x: Point x-coordinate
       y: Point y-coordinate
       o: Orientation angle (radians)
       loc: Reference location (x, y)

   Returns:
       Bearing angle in radians, range (-π, π]

   Example:
       >>> bearing = comp_bearing_solo(x=1.0, y=1.0, o=np.pi/4, loc=(0.0, 0.0))


.. py:function:: compute_dispersal_solo(xy: numpy.ndarray | pandas.DataFrame, min_valid_proportion: float = 0.2, max_start_proportion: float = 0.1, min_end_proportion: float = 0.9) -> numpy.ndarray

   Compute dispersal (distance from start) for single trajectory.

   Validates trajectory completeness before computing distances from initial position.

   Args:
       xy: Trajectory data, shape (N, 2) with [x, y] coordinates
       min_valid_proportion: Minimum proportion of non-NaN data points required (default: 0.2)
       max_start_proportion: Maximum proportion of NaN data allowed at start (default: 0.1)
       min_end_proportion: Minimum data proportion before last valid point (default: 0.9)

   Returns:
       Array of dispersal values, or NaN array if trajectory invalid

   Example:
       >>> xy = np.array([[0, 0], [1, 0], [2, 1]])
       >>> dispersal = compute_dispersal_solo(xy)


.. py:function:: compute_dispersal_multi(xy0: pandas.DataFrame, t0: float, t1: float, dt: float, **kwargs: Any) -> tuple[numpy.ndarray, int]

   Compute dispersal values for multiple agents over time range.

   Args:
       xy0: MultiIndex DataFrame with agent positions (levels: Step, AgentID)
       t0: Start time in seconds
       t1: End time in seconds
       dt: Timestep of timeseries
       **kwargs: Additional arguments passed to compute_dispersal_solo

   Returns:
       Tuple of (dispersal_array, n_timesteps) where dispersal_array is flattened

   Example:
       >>> xy_data = pd.DataFrame({...})  # MultiIndex DataFrame
       >>> dispersal, n_steps = compute_dispersal_multi(xy_data, t0=0, t1=10, dt=0.1)


.. py:function:: compute_component_velocity(xy: numpy.ndarray, angles: numpy.ndarray, dt: float, return_dst: bool = False) -> numpy.ndarray | tuple[numpy.ndarray, numpy.ndarray]

   Compute velocity component along orientation angles.

   Args:
       xy: Array of shape (N, 2) with [x, y] coordinates
       angles: Array of shape (N,) with orientation angles in radians
       dt: Time interval for velocity calculation
       return_dst: If True, return both velocities and displacements

   Returns:
       Velocity array, or tuple of (velocity, displacement) if return_dst=True

   Example:
       >>> xy = np.array([[0, 0], [1, 0], [2, 1]])
       >>> angles = np.array([0, 0, np.pi/4])
       >>> v = compute_component_velocity(xy, angles, dt=0.1)


.. py:function:: compute_velocity_threshold(v: numpy.ndarray, Nbins: int = 500, max_v: Optional[float] = None, kernel_width: float = 0.02) -> float

   Compute velocity threshold using density-based approach.

   Identifies minimum between local maxima and minima in smoothed density curve.

   Args:
       v: Input velocity data array
       Nbins: Number of histogram bins
       max_v: Maximum velocity value (auto-detected if None)
       kernel_width: Gaussian kernel width for density smoothing

   Returns:
       Computed velocity threshold

   Example:
       >>> velocities = np.random.exponential(0.5, 1000)
       >>> threshold = compute_velocity_threshold(velocities)


.. py:function:: get_display_dims() -> tuple[int, int]

   Get display dimensions scaled to 2/3 of screen size.

   Returns:
       Tuple of (width, height) in pixels, rounded to multiples of 16

   Example:
       >>> width, height = get_display_dims()


.. py:function:: get_window_dims(arena_dims: tuple[float, float]) -> tuple[int, int]

   Compute optimal window dimensions for arena visualization.

   Maintains aspect ratio while fitting within display bounds.

   Args:
       arena_dims: Arena dimensions (width, height)

   Returns:
       Tuple of (window_width, window_height) in pixels

   Example:
       >>> dims = get_window_dims(arena_dims=(0.2, 0.2))


.. py:function:: get_arena_bounds(arena_dims: tuple[float, float], s: float = 1) -> numpy.ndarray

   Compute arena bounds centered at origin.

   Args:
       arena_dims: Arena dimensions (width, height)
       s: Scaling factor

   Returns:
       Array [x_min, x_max, y_min, y_max]

   Example:
       >>> bounds = get_arena_bounds(arena_dims=(1.0, 0.8))
       >>> # Returns [-0.5, 0.5, -0.4, 0.4]


.. py:function:: circle_to_polygon(N: int, r: float) -> list[tuple[float, float]]

   Generate polygon vertices approximating a circle.

   Args:
       N: Number of vertices
       r: Radius of circle

   Returns:
       List of (x, y) vertex coordinates

   Example:
       >>> vertices = circle_to_polygon(N=8, r=1.0)


.. py:function:: boolean_indexing(v: list[numpy.ndarray], fillval: float = np.nan) -> numpy.ndarray

   Convert list of variable-length arrays to 2D array with padding.

   Args:
       v: List of 1D numpy arrays with different lengths
       fillval: Value to use for padding shorter arrays

   Returns:
       2D array with shape (N, max_length), padded with fillval

   Example:
       >>> arrays = [np.array([1, 2]), np.array([3, 4, 5])]
       >>> result = boolean_indexing(arrays, fillval=0)


.. py:function:: concat_datasets(ddic: dict[str, Any], key: str = 'end', unit: str = 'sec') -> pandas.DataFrame

   Concatenate multiple datasets into single DataFrame.

   Args:
       ddic: Dictionary mapping dataset IDs to dataset objects
       key: Data type to extract ('end' for endpoint_data, 'step' for step_data)
       unit: Time unit for step data ('sec', 'min', 'hour', 'day')

   Returns:
       Concatenated DataFrame with added DatasetID and GroupID columns

   Example:
       >>> datasets = {'exp1': dataset1, 'exp2': dataset2}
       >>> df = concat_datasets(datasets, key='step', unit='min')


.. py:function:: moving_average(a: numpy.ndarray, n: int = 3) -> numpy.ndarray

   Compute moving average with window size n.

   Args:
       a: 1D input array
       n: Window size

   Returns:
       Array of moving averages (same length as input)

   Example:
       >>> data = np.array([1, 2, 3, 4, 5])
       >>> smoothed = moving_average(data, n=3)


.. py:function:: body_contour(points: list[tuple[float, float]] = [(0.9, 0.1), (0.05, 0.1)], start: tuple[float, float] = (1, 0), stop: tuple[float, float] = (0, 0)) -> numpy.ndarray

   Generate symmetric body contour from half-side points.

   Args:
       points: List of (x, y) points for upper half of body
       start: Starting point coordinates
       stop: Ending point coordinates

   Returns:
       Array of shape (2*N+2, 2) with full symmetric contour

   Example:
       >>> contour = body_contour(points=[(0.9, 0.1), (0.5, 0.15)])


.. py:function:: apply_per_level(s: pandas.DataFrame, func: Any, level: str = 'AgentID', **kwargs: Any) -> numpy.ndarray

   Apply function to each group in MultiIndex DataFrame.

   Args:
       s: MultiIndex DataFrame with levels ['Step', 'AgentID']
       func: Function to apply to each group
       level: Grouping level ('AgentID' or 'Step')
       **kwargs: Additional arguments passed to func

   Returns:
       Array of shape (N_steps, N_agents) with function results

   Example:
       >>> data = pd.DataFrame(...).set_index(['Step', 'AgentID'])
       >>> result = apply_per_level(data, np.mean, level='AgentID')


.. py:function:: unwrap_deg(a: numpy.ndarray | pandas.Series) -> numpy.ndarray

   Unwrap angles in degrees to remove discontinuities.

   Args:
       a: Array or Series of angles in degrees

   Returns:
       Unwrapped angles in degrees

   Example:
       >>> angles = np.array([170, 180, -170, -160])
       >>> unwrapped = unwrap_deg(angles)


.. py:function:: unwrap_rad(a: numpy.ndarray | pandas.Series) -> numpy.ndarray

   Unwrap angles in radians to remove discontinuities.

   Args:
       a: Array or Series of angles in radians

   Returns:
       Unwrapped angles in radians

   Example:
       >>> angles = np.array([3.0, 3.14, -3.1, -3.0])
       >>> unwrapped = unwrap_rad(angles)


.. py:function:: rate(a: numpy.ndarray | pandas.Series, dt: float) -> numpy.ndarray

   Compute rate of change (derivative) of signal.

   Args:
       a: Input signal array or Series
       dt: Time step

   Returns:
       Array of rates, first element is NaN

   Example:
       >>> signal = np.array([0, 1, 3, 6])
       >>> velocity = rate(signal, dt=0.1)


.. py:function:: eudist(xy: numpy.ndarray | pandas.DataFrame) -> numpy.ndarray

   Compute Euclidean distances between consecutive points in trajectory.

   Args:
       xy: Trajectory array or DataFrame, shape (N, 2) with [x, y] coordinates

   Returns:
       Array of distances between consecutive points (per step, not cumulative), first element is 0

   Example:
       >>> xy = np.array([[0, 0], [1, 0], [1, 1]])
       >>> distances = eudist(xy)
       >>> # Returns [0, 1.0, 1.0]


.. py:function:: eudi5x(a: numpy.ndarray, b: numpy.ndarray) -> numpy.ndarray

   Calculate Euclidean distances between points in arrays a and b.

   Args:
       a: Array of shape (N, D) with N points in D dimensions
       b: Single point or array of shape (D,) to measure distance from

   Returns:
       Array of N Euclidean distances

   Example:
       >>> a = np.array([[0, 0], [1, 0], [0, 1]])
       >>> b = np.array([0.5, 0.5])
       >>> distances = eudi5x(a, b)


.. py:function:: eudiNxN(a: numpy.ndarray, b: numpy.ndarray) -> numpy.ndarray

   Compute pairwise Euclidean distances between two sets of points.

   Args:
       a: Array of shape (N, M, 2) representing N sets of M points
       b: Array of shape (K, 2) representing K reference points

   Returns:
       Array of shape (N, M, K) with pairwise distances

   Example:
       >>> a = np.random.rand(5, 10, 2)
       >>> b = np.random.rand(3, 2)
       >>> distances = eudiNxN(a, b)


.. py:function:: compute_dst(s: pandas.DataFrame, point: str = '') -> None

   Compute and add distance column to DataFrame (in-place).

   Args:
       s: MultiIndex DataFrame with trajectory data
       point: Point identifier (empty for default midpoint)

   Example:
       >>> compute_dst(step_data, point="head")


.. py:function:: comp_extrema(a: pandas.Series, order: int = 3, threshold: Optional[tuple[float, float]] = None, return_2D: bool = True) -> numpy.ndarray

   Compute local extrema in time series using scipy.signal.argrelextrema.

   Args:
       a: Input time series as pandas Series
       order: Order of extrema detection (minimum separation)
       threshold: Optional (min_threshold, max_threshold) to filter extrema by value
       return_2D: If True, return 2D array [minima_flags, maxima_flags]; if False, return 1D (-1/1/NaN)

   Returns:
       Array with extrema flags (shape (N, 2) if return_2D=True, else (N,))

   Example:
       >>> data = pd.Series([1, 3, 2, 4, 1, 5])
       >>> extrema = comp_extrema(data, order=1, return_2D=False)


.. py:function:: fixate_larva(s: pandas.DataFrame, c: Any, arena_dims: tuple[float, float], P1: str, P2: Optional[str] = None) -> tuple[pandas.DataFrame, numpy.ndarray]

   Transform coordinates to fixate primary point to arena center.

   Optionally aligns secondary point to vertical axis via rotation.

   Args:
       s: Step data DataFrame with trajectory coordinates
       c: Dataset configuration object
       arena_dims: Arena dimensions (width, height)
       P1: Primary point identifier to fix to center
       P2: Optional secondary point to align to vertical axis

   Returns:
       Tuple of (transformed_dataframe, background_transformations) where background is [bg_x, bg_y, bg_angle]

   Raises:
       ValueError: If requested point not found in dataset

   Example:
       >>> s_fixed, bg = fixate_larva(step_data, config, (0.2, 0.2), P1='centroid', P2='head')


.. py:function:: epoch_overlap(epochs1: numpy.ndarray, epochs2: numpy.ndarray) -> numpy.ndarray

   Find epochs from epochs1 that overlap with any epoch in epochs2.

   Args:
       epochs1: Array of shape (N, 2) with [start, end] time pairs
       epochs2: Array of shape (M, 2) with [start, end] time pairs

   Returns:
       Array of overlapping epochs from epochs1

   Example:
       >>> epochs1 = np.array([[0, 5], [10, 15]])
       >>> epochs2 = np.array([[3, 12]])
       >>> overlapping = epoch_overlap(epochs1, epochs2)


.. py:function:: epoch_slices(epochs: numpy.ndarray) -> list[numpy.ndarray]

   Generate index arrays for each epoch interval.

   Args:
       epochs: Array of shape (N, 2) with [start_idx, end_idx] pairs

   Returns:
       List of N index arrays, each covering one epoch interval

   Example:
       >>> epochs = np.array([[0, 3], [5, 8]])
       >>> slices = epoch_slices(epochs)
       >>> # Returns [array([0, 1, 2]), array([5, 6, 7])]
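
The half-open indexing implied by the ``epoch_slices`` example can be sketched in plain NumPy. ``epoch_slices_sketch`` is a hypothetical stand-in written for illustration, not larvaworld's implementation; it assumes each row is a ``[start_idx, end_idx)`` pair with the end index excluded, as the documented output suggests. The ``signal`` array is made up for the usage step.

```python
import numpy as np


def epoch_slices_sketch(epochs: np.ndarray) -> list[np.ndarray]:
    """Hypothetical stand-in: one index array per [start, end) row."""
    # Assumption: the end index is excluded, matching the documented output.
    return [np.arange(start, end) for start, end in epochs]


# Epochs taken from the documented example.
epochs = np.array([[0, 3], [5, 8]])
slices = epoch_slices_sketch(epochs)

# Use the index arrays to cut per-epoch segments out of a 1D signal.
signal = np.arange(10) * 0.5
segments = [signal[idx] for idx in slices]
```

Under this assumption an epoch ``[start, end]`` yields ``end - start`` indices, so adjacent epochs that share a boundary index do not produce overlapping segments.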