Source code for sdss_semaphore

# Version string for this module.
__version__ = "0.2.2"

import numpy as np
import warnings
from typing import Union, Tuple, Iterable, List, Tuple


class BaseFlags:
    """
    A base class for communicating with flags.

    Flags are stored in ``self.array``, a two-dimensional integer array with
    one row per item. Flag ``f`` is stored in column ``f // n_bits`` at bit
    ``f % n_bits`` of that column (bit 0 is the least-significant bit).

    Subclasses must define ``dtype``, ``n_bits`` and ``mapping``. ``mapping``
    is used as a dictionary keyed by bit position whose values are
    dictionaries of flag attributes.
    """

    def __init__(
        self,
        array: Union[np.ndarray, Iterable[Iterable[int]], Iterable[bytearray]],
    ) -> None:
        """
        Store the given flag data as a two-dimensional array of ``self.dtype``.

        :param array:
            Either an array-like of integers (made at least two-dimensional),
            or a list/tuple of ``bytearray`` objects, one per item.
        """
        # The length check avoids an IndexError on an empty list or tuple;
        # empty input falls through to the generic array conversion below.
        is_bytearray_sequence = (
            isinstance(array, (list, tuple))
            and len(array) > 0
            and isinstance(array[0], bytearray)
        )
        if is_bytearray_sequence:
            # TODO: If the self.dtype is not uint8, then we might need to compute these initial offsets ourselves,
            #       because I think bytearray is natively uint8
            if self.dtype != np.uint8:
                warnings.warn("Converting from list of bytearrays to integer array, but `dtype` is not uint8. Hold on to your butts.")
            N, F = (len(array), max(len(item) for item in array))
            self.array = np.zeros((N, F), dtype=self.dtype)
            for i, item in enumerate(array):
                # Shorter bytearrays are zero-padded on the right.
                self.array[i, :len(item)] = np.frombuffer(item, dtype=self.dtype)
        else:
            self.array = np.atleast_2d(array).astype(self.dtype)
        return None

    @property
    def dtype(self):
        """The data type of the storage array. Must be defined in a subclass."""
        raise NotImplementedError("`dtype` must be defined in subclass")

    @property
    def n_bits(self):
        """The number of flag bits per array column. Must be defined in a subclass."""
        raise NotImplementedError("`n_bits` must be defined in subclass")

    @property
    def mapping(self):
        """The bit position -> attributes dictionary. Must be defined in a subclass."""
        raise NotImplementedError("`mapping` must be defined in subclass")

    def _all_attributes(self, key):
        """Return a tuple of the distinct values of attribute `key` across all flags."""
        return tuple(set(attrs[key] for attrs in self.mapping.values()))

    def _bit_mask(self, offset):
        """
        Return ``1 << offset`` computed in the storage array's own dtype.

        Keeping the mask in the array dtype means in-place updates never need
        to cast a wider (or signed) integer result back into the array, and
        inverting the mask stays in that dtype.

        :param offset:
            A bit offset (or array of offsets) within a single column.
        """
        dtype = self.array.dtype
        return np.left_shift(
            np.asarray(1, dtype=dtype),
            np.asarray(offset).astype(dtype),
        )

    def count(self, skip_empty: bool = False) -> dict:
        """
        Return a dictionary containing the number of items assigned with each flag.

        :param skip_empty: [optional]
            Skip flags with no items assigned to them.
        """
        return self._count(
            {bit: [bit] for bit in self.mapping.keys()},
            skip_empty=skip_empty,
        )

    def count_by_attribute(self, attribute, skip_empty: bool = False) -> dict:
        """
        Return a dictionary of the items assigned with flags of a given attribute.

        The keys are the distinct values of `attribute` across the defined
        flags. The values are the number of items that have at least one flag
        with that attribute value set.

        :param attribute:
            The flag attribute to count by.

        :param skip_empty: [optional]
            Skip attribute values with no items assigned to them.
        """
        # Need bits per attribute to avoid double-counting
        bits_per_attribute = {}
        for bit, attributes in self.mapping.items():
            bits_per_attribute.setdefault(attributes[attribute], []).append(bit)
        return self._count(bits_per_attribute, skip_empty=skip_empty)

    def _count(self, bits_per_attribute, skip_empty: bool = False) -> dict:
        """
        Count the number of items assigned to flags with given attributes.

        :param bits_per_attribute:
            A dictionary of the bit positions for each attribute.

        :param skip_empty: [optional]
            Skip flags with no items assigned to them.
        """
        counts = {}
        flags = self.as_boolean_array()  # (N, F) shape
        n_flags = flags.shape[1]
        for attribute, bits in bits_per_attribute.items():
            # A bit that does not fit in the array cannot be set for any item,
            # so only index the columns that exist.
            stored = np.asarray([bit for bit in bits if bit < n_flags], dtype=int)
            count = np.sum(np.any(flags[:, stored], axis=1))
            if count > 0 or not skip_empty:
                counts[attribute] = count
        return counts

    def as_boolean_array(self) -> np.ndarray:
        """
        Return a (N, F) shaped boolean array indicating whether each bit is set
        for each item, where the input data array has shape (N, B) and
        `F = B * n_bits` is the maximum possible number of flags.

        Column `f` of the result corresponds to flag bit position `f`.
        """
        N, B = self.array.shape
        num, offset = np.divmod(np.arange(B * self.n_bits), self.n_bits)
        return (self.array[:, num] & self._bit_mask(offset)) != 0

    def shrink(self):
        """
        Shrink the data array to the maximum required shape based on the highest bit set.

        If no bit is set for any item, a single column is kept.
        """
        # `!= 0` rather than `> 0` so that a set top bit in a signed dtype
        # (a negative value) still counts as a used column.
        used_columns = np.flatnonzero(np.any(self.array != 0, axis=0))
        index = int(used_columns[-1]) + 1 if used_columns.size > 0 else 1
        self.array = self.array[:, :index]
        return self

    def is_attribute_set(self, key, value) -> np.ndarray:
        """
        Return a N-length boolean array indicating whether the item has any flag with the given attribute.

        :param key:
            The attribute key.

        :param value:
            The attribute value.

        :returns:
            A boolean array indicating whether the item has any flag with the given attribute.

        :raises ValueError:
            If no defined flag has the given attribute value.
        """
        bits = self.get_bits_with_attribute(key, value)
        if len(bits) == 0:
            raise ValueError(f"No bits found with attribute {key}={value}")
        # Delegate so that bits beyond the current array width are treated
        # as unset instead of raising an IndexError.
        return self.are_any_bits_set(*bits)

    def get_bits_with_attribute(self, key, value) -> List[int]:
        """
        Return the bit positions of all flags with the given attribute.

        Flags that do not define `key` are skipped.

        :param key:
            The attribute key.

        :param value:
            The attribute value.

        :returns:
            A list of bit positions.
        """
        bits = []
        for bit, attrs in self.mapping.items():
            try:
                if attrs[key] == value:
                    bits.append(bit)
            except KeyError:
                continue
        return bits

    def are_any_bits_set(self, *bits) -> np.ndarray:
        """
        Return an N-length boolean array indicating whether any of the given bits are set for each item.

        :param bits:
            The zero-indexed bit positions to check.

        :returns:
            A boolean array indicating whether any of the given bits are set for each item.
        """
        return np.any(self._check_bits(*bits), axis=1)

    def are_all_bits_set(self, *bits) -> np.ndarray:
        """
        Return an N-length boolean array indicating whether all of the given bits are set for each item.

        A bit position beyond the width of the data array counts as not set.

        :param bits:
            The zero-indexed bit positions to check.

        :returns:
            A boolean array indicating whether all of the given bits are set for each item.
        """
        return np.all(self._check_bits(*bits), axis=1)

    def is_bit_set(self, bit) -> np.ndarray:
        """
        Return an N-length boolean array indicating whether the given bit is set for each item.

        :param bit:
            The zero-indexed bit position to check.

        :returns:
            A boolean array indicating whether the given bit is set for each item.
        """
        return self.are_any_bits_set(bit)

    def set_bit(self, index, bit):
        """
        Set the given bit for the given item.

        The data array is widened if the bit does not fit in it yet.

        :param index:
            The item index.

        :param bit:
            The zero-indexed bit position to set.
        """
        num, offset = self._ensure_shape_for_bit(bit)
        self.array[index, num] |= self._bit_mask(offset)
        return self

    def clear_bit(self, index, bit):
        """
        Clear the given bit for the given index.

        :param index:
            The item index.

        :param bit:
            The zero-indexed bit position to clear.
        """
        # Here we don't ensure_shape_for_bit because we don't want to create a YUGE array just
        # to clear a fictitious bit at position 2**128
        num, offset = np.divmod(np.atleast_1d(bit), self.n_bits)
        N, B = self.array.shape
        is_set_able = B > num
        num, offset = num[is_set_able], offset[is_set_able]
        # The inverted mask is built in the array dtype so the in-place AND
        # does not have to cast a negative platform integer into the array.
        self.array[index, num] &= ~self._bit_mask(offset)
        return self

    def toggle_bit(self, index, bit):
        """
        Toggle the given bit for the given index.

        The data array is widened if the bit does not fit in it yet.

        :param index:
            The item index.

        :param bit:
            The zero-indexed bit position to toggle.
        """
        num, offset = self._ensure_shape_for_bit(bit)
        self.array[index, num] ^= self._bit_mask(offset)
        return self

    def _check_bits(self, *bits):
        """
        Check whether the given bits are set or not.

        :returns:
            A boolean array with one row per item and one column per requested
            bit. Columns for bits beyond the width of the data array are False.
        """
        num, offset = np.divmod(np.asarray(bits, dtype=int).ravel(), self.n_bits)
        N, B = self.array.shape
        can_be_set = B > num
        is_set = np.zeros((N, num.size), dtype=bool)
        is_set[:, can_be_set] = (
            self.array[:, num[can_be_set]] & self._bit_mask(offset[can_be_set])
        ) != 0
        return is_set

    def _ensure_shape_for_bit(self, bit: int) -> Tuple[int, int]:
        """
        Ensure the data array has sufficient size to store information about the given bit.

        :param bit:
            The zero-indexed bit position to check.

        :returns:
            A tuple of the number of the data array column and the bit offset within that column.
        """
        num, offset = np.divmod(bit, self.n_bits)
        N, F = self.array.shape
        highest_column = np.max(num)
        if F <= highest_column:
            C = (highest_column + 1) - F
            # New zero-filled columns are appended on the right, so existing
            # bit positions keep their meaning.
            self.array = np.hstack([
                self.array,
                np.zeros((N, C), dtype=self.array.dtype),
            ])
        return (num, offset)

    def __repr__(self):
        N, B = self.array.shape
        return f"<{self.__class__.__name__} with {N:,} items and up to {B * self.n_bits:,} flags ({len(self.mapping):,} defined) at {hex(id(self))}>"