Source code for lories.data.channels.channels

# -*- coding: utf-8 -*-
"""
lories.data.channels.channels
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


"""

from __future__ import annotations

from collections import OrderedDict
from collections.abc import Callable
from typing import Any, Iterator, Tuple

import numpy as np
import pandas as pd
from lories._core._channel import Channel, ChannelState, _Channel  # noqa
from lories._core._channels import Channels as ChannelsType  # noqa
from lories._core._channels import _Channels  # noqa
from lories.core import Resources
from lories.data.validation import validate_index

# FIXME: Remove this once Python >= 3.9 is a requirement
try:
    from typing import Literal

except ImportError:
    from typing_extensions import Literal


[docs] class Channels(_Channels, Resources[Channel]): def __str__(self) -> str: return str(self.to_frame(unique=True, states=True)) def register( self, function: Callable[[pd.DataFrame], None], how: Literal["any", "all"] = "any", unique: bool = False, ) -> None: for channel in self: channel.register(function, how=how, unique=unique) # noinspection PyTypeChecker def apply(self, apply: Callable[[Channel], Channel], inplace: bool = False) -> ChannelsType: return super().apply(apply, inplace=inplace) def filter(self, *filters: Callable[[Channel], bool]) -> ChannelsType: return super().filter(*filters) # noinspection PyShadowingBuiltins, SpellCheckingInspection def groupby(self, by: Callable[[Channel], Any] | str) -> Iterator[Tuple[Any, ChannelsType]]: return super().groupby(by) def from_logger(self) -> ChannelsType: return type(self)([c.from_logger() for c in self if c.has_logger()]) def to_frame(self, unique: bool = False, states: bool = False) -> pd.DataFrame: data = OrderedDict() columns = list(self.keys if not unique else self.ids) for channel in self: if pd.isna(channel.timestamp): continue channel_uid = channel.key if not unique else channel.id channel_data = channel.to_series(state=states) channel_data.name = channel_uid if channel_data.empty: continue for timestamp, channel_values in channel_data.to_frame().to_dict(orient="index").items(): if timestamp not in data: timestamp_data = data[timestamp] = {c: np.nan for c in columns} else: timestamp_data = data[timestamp] if any(not pd.isna(timestamp_data[k]) for k in channel_values.keys()): self._logger.warning( f"Overriding value for duplicate index while merging channel '{channel.id}' into " f"DataFrame for index: {channel_data.index}" ) timestamp_data.update(channel_values) if len(data) == 0: return pd.DataFrame(columns=columns) data = pd.DataFrame.from_records( data=list(data.values()), index=list(data.keys()), ) data.dropna(axis="index", how="all", inplace=True) data = validate_index(data) data.index.name = _Channel.TIMESTAMP return data # noinspection PyProtectedMember def set_frame(self, data: pd.DataFrame) -> None: for converter, channels in self.groupby(lambda c: c.converter._converter): converted_data = converter.from_frame(data, channels) for channel in channels: channel_data = converted_data.loc[:, channel.id].dropna() if channel_data.empty or channel_data.isna().all(): channel.state = ChannelState.NOT_AVAILABLE self._logger.debug(f"Missing value for channel: {channel.id}") continue timestamp = channel_data.index[0] if len(channel_data.index) == 1: channel_data = channel_data.values[0] channel.set(timestamp, channel_data) def set_state(self, state: ChannelState) -> None: def _set_state(channel: Channel) -> Channel: channel.state = state return channel self.apply(_set_state, inplace=True)