# -*- coding: utf-8 -*-
"""
lories.data.channels.channel
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
"""
from __future__ import annotations
from abc import abstractmethod
from collections.abc import Callable
from copy import deepcopy
from enum import Enum
from typing import Any, Collection, Dict, Mapping, Optional, TypeVar, overload
import pandas as pd
from lories._core._configurations import Configurations
from lories._core._context import Context
from lories._core._entity import Entity, _Entity
from lories._core._resource import _Resource
from lories._core.typing import Timestamp
# FIXME: Remove this fallback once Python >= 3.8 is a requirement (typing.Literal was added in 3.8)
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
[docs]
class ChannelState(Enum):
    """
    Enumeration of the states a channel can report.

    The value of every member is identical to its name, and converting a
    member with :func:`str` yields that value.
    """

    DISABLED = "DISABLED"

    DISCONNECTING = "DISCONNECTING"
    DISCONNECTED = "DISCONNECTED"
    CONNECTING = "CONNECTING"
    CONNECTED = "CONNECTED"

    VALID = "VALID"
    NOT_AVAILABLE = "NOT_AVAILABLE"

    TIMEOUT = "TIMEOUT"
    READ_ERROR = "READ_ERROR"
    WRITE_ERROR = "WRITE_ERROR"
    UNKNOWN_ERROR = "UNKNOWN_ERROR"
    ARGUMENT_SYNTAX_ERROR = "ARGUMENT_SYNTAX_ERROR"

    def __str__(self) -> str:
        """Return the plain string value of the member instead of ``ChannelState.<NAME>``."""
        text = self.value
        return str(text)
[docs]
class _Channel(_Resource):
    """
    Abstract interface of a channel resource.

    The class only declares the members a concrete channel has to provide
    (properties, accessors, read/write operations) and offers a few helpers
    that normalise channel identifiers and configuration dictionaries.
    The behaviour of every abstract member is defined by the implementing class.
    """

    # Configuration keys whose values are kept by `_build_defaults` even when
    # they are mappings; any other mapping-valued entry is dropped there.
    INCLUDES: Collection[str] = (
        "logger",
        "connector",
        "converter",
        "replicator",
        "replication",
        "retention",
        "rotate",
    )

    TYPE: str = "channel"
    TIMESTAMP: str = "timestamp"

    # ------------------------------------------------------------------
    # Abstract properties
    # ------------------------------------------------------------------

    @property
    @abstractmethod
    def freq(self) -> Optional[str]: ...

    @property
    @abstractmethod
    def timedelta(self) -> Optional[pd.Timedelta]: ...

    # NOTE(review): `pd.NaT` is a value, not a type. The annotation is only
    # accepted at runtime because `from __future__ import annotations` keeps
    # annotations unevaluated; static checkers may reject it.
    @property
    @abstractmethod
    def timestamp(self) -> pd.Timestamp | pd.NaT: ...

    @property
    @abstractmethod
    def value(self) -> Optional[Any]: ...

    @value.setter
    @abstractmethod
    def value(self, value) -> None: ...

    @property
    @abstractmethod
    def state(self) -> ChannelState | str: ...

    @state.setter
    @abstractmethod
    def state(self, state) -> None: ...

    # ------------------------------------------------------------------
    # Abstract accessors and operations
    # ------------------------------------------------------------------

    @abstractmethod
    def is_valid(self) -> bool: ...

    # `typing.overload` requires at least two variants before the
    # implementation: one without and one with an explicit state.
    @overload
    def set(
        self,
        timestamp: pd.Timestamp,
        value: Any,
    ) -> None: ...

    @overload
    def set(
        self,
        timestamp: pd.Timestamp,
        value: Any,
        state: Optional[str | ChannelState],
    ) -> None: ...

    @abstractmethod
    def set(
        self,
        timestamp: pd.Timestamp,
        value: Any,
        state: Optional[str | ChannelState] = ChannelState.VALID,
    ) -> None: ...

    @abstractmethod
    def to_series(self, state: bool = False) -> pd.Series: ...

    @abstractmethod
    def from_logger(self) -> Channel: ...

    @abstractmethod
    def has_logger(self, *ids: Optional[str]) -> bool: ...

    # noinspection PyShadowingBuiltins
    @abstractmethod
    def has_connector(self, id: Optional[str] = None) -> bool: ...

    @abstractmethod
    def register(
        self,
        function: Callable[[pd.DataFrame], None],
        how: Literal["any", "all"] = "any",
        unique: bool = False,
    ) -> None: ...

    @abstractmethod
    def read(
        self,
        start: Optional[Timestamp] = None,
        end: Optional[Timestamp] = None,
    ) -> pd.DataFrame: ...

    # Overloads are matched in declaration order, so the specific pandas
    # types come first and the catch-all `Any` variant comes last.
    @overload
    def write(self, data: pd.DataFrame) -> None: ...

    @overload
    def write(self, data: pd.Series) -> None: ...

    @overload
    def write(self, data: Any) -> None: ...

    @abstractmethod
    def write(self, data: pd.DataFrame | pd.Series | Any) -> None: ...

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    # noinspection PyShadowingBuiltins
    @classmethod
    def _build_id(
        cls,
        id: Optional[str] = None,
        key: Optional[str] = None,
        context: Optional[Context | Entity] = None,
    ) -> str:
        """
        Resolve the identifier of a channel.

        :param id: Explicit identifier; returned unchanged when it is not None.
        :param key: Key the identifier is derived from when ``id`` is None.
        :param context: Optional owner; when it is an ``_Entity`` instance,
            its ``id`` is prepended to ``key``, separated by a dot.
        :returns: ``id`` if given, ``"<context.id>.<key>"`` for an entity
            context, and ``key`` otherwise.
        :raises ValueError: If both ``id`` and ``key`` are None.
        """
        if id is not None:
            return id
        if key is None:
            raise ValueError(f"Unable to build '{cls.__name__}' for missing ID")
        # isinstance() is already False for None, so no separate None check is needed.
        if isinstance(context, _Entity):
            return f"{context.id}.{key}"
        return key

    @staticmethod
    def _build_defaults(configs: Configurations) -> Dict[str, Any]:
        """
        Derive channel defaults from ``configs``.

        Deep copies every entry that is either not a mapping or whose key is
        listed in ``INCLUDES`` and passes the result through ``_build_configs``.

        :param configs: Configurations to take the defaults from; not modified.
        :returns: A new dictionary holding the normalised defaults.
        """
        defaults = {
            key: deepcopy(value)
            for key, value in configs.items()
            if not isinstance(value, Mapping) or key in _Channel.INCLUDES
        }
        return _Channel._build_configs(defaults)

    @staticmethod
    def _build_configs(configs: Dict[str, Any]) -> Dict[str, Any]:
        """
        Normalise the ``converter``, ``connector`` and ``logger`` entries of ``configs``.

        Each of these entries, if present, is replaced by a dictionary built
        with ``__build_member``. The ``logger`` entry is wrapped under the
        ``connector`` key, the other two under their own key.

        :param configs: Channel configurations; modified in place.
        :returns: The very same ``configs`` object, for convenience.
        """
        # (key used when wrapping a plain value, configuration section to normalise)
        members = (
            ("converter", "converter"),
            ("connector", "connector"),
            ("connector", "logger"),
        )
        for member_key, section in members:
            if section in configs:
                configs[section] = _Channel.__build_member(configs[section], member_key)
        return configs

    @staticmethod
    def __build_member(configs: Optional[Dict[str, Any] | str], key: str) -> Dict[str, Any]:
        """
        Normalise a single member configuration to a dictionary.

        :param configs: Either a string or None, which is wrapped as
            ``{key: configs}``, or a mapping, which is copied into a new dict.
        :param key: Key to store a plain string/None value under.
        :returns: A new dictionary; never None.
        :raises ValueError: If ``configs`` is neither a string, None nor a mapping.
        """
        if configs is None or isinstance(configs, str):
            return {key: configs}
        if not isinstance(configs, Mapping):
            raise ValueError(f"Invalid channel {key} type: " + str(configs))
        return dict(configs)
# Type variable bound to `_Channel`, i.e. it stands for `_Channel` or any of its
# subclasses; used in annotations such as the return type of `_Channel.from_logger`.
Channel = TypeVar("Channel", bound=_Channel)