# -*- coding: utf-8 -*-
"""
lories.connectors.connector
~~~~~~~~~~~~~~~~~~~~~~~~~~~
"""
from __future__ import annotations
import datetime as dt
from collections import OrderedDict
from functools import wraps
from threading import Lock
from typing import Any, Dict, Optional
import pandas as pd
import pytz as tz
from lories._core._configurations import Configurations # noqa
from lories._core._connector import ConnectType, _Connector # noqa
from lories._core._context import _Context # noqa
from lories._core._registrator import RegistratorContext # noqa
from lories.connectors.errors import ConnectorError
from lories.core import Resource, ResourceError, Resources
from lories.core.configs.configurator import Configurator, ConfiguratorMeta
from lories.core.configs.errors import ConfigurationError
from lories.core.register.registrator import Registrator
from lories.data.channels import Channel, Channels, ChannelState
from lories.data.validation import validate_index
# noinspection PyAbstractClass
[docs]
class Connector(_Connector, Registrator, metaclass=ConnectorMeta):
_connected: bool = False
_connect_type: ConnectType = ConnectType.AUTO
_timestamp_connect: pd.Timestamp = pd.NaT
_timestamp_disconnect: pd.Timestamp = pd.NaT
_interval_reconnect: pd.Timedelta = pd.Timedelta(minutes=1)
__resources: Resources
_lock: Lock
def __init__(
self,
context: RegistratorContext,
configs: Optional[Configurations] = None,
**kwargs,
) -> None:
super().__init__(context=context, configs=configs, **kwargs)
self.__resources = Resources()
self._lock = Lock()
def __enter__(self) -> Connector:
self.connect(self.__resources)
return self
# noinspection PyShadowingBuiltins
def __exit__(self, type, value, traceback):
self.disconnect()
@classmethod
def _assert_context(cls, context: RegistratorContext) -> RegistratorContext:
if context is None:
raise TypeError(f"Invalid '{cls.__name__}' context: {type(context)}")
return super()._assert_context(context)
# noinspection PyShadowingBuiltins, PyProtectedMember
def _get_vars(self) -> Dict[str, Any]:
vars = super()._get_vars()
# Channels are a subset of resources, hence omit them from printing
vars.pop("channels", None)
return vars
# noinspection PyShadowingBuiltins
def _convert_vars(self, convert: callable = str) -> Dict[str, str]:
vars = self._get_vars()
values = OrderedDict()
try:
id = vars.pop("id", self.id)
key = vars.pop("key", self.key)
if id != key:
values["id"] = id
values["key"] = key
except (ResourceError, AttributeError):
# Abstract properties are not yet instanced
pass
if "name" in vars:
values["name"] = vars.pop("name")
values.update(
{
k: str(v) if not isinstance(v, (_Context, Configurator, Resource, Resources)) else convert(v)
for k, v in vars.items()
}
)
values["context"] = convert(self.context)
values["configurations"] = convert(self.configs)
values["configured"] = str(self.is_configured())
values["connected"] = str(self._is_connected())
values["enabled"] = str(self.is_enabled())
return values
@property
def resources(self) -> Resources:
return self.__resources
@property
def channels(self) -> Channels:
return Channels([resource for resource in self.__resources if isinstance(resource, Channel)])
def set_channels(self, state: ChannelState) -> None:
# Set only channel states for channels, that actively are getting read or written by this connector.
# Local channels may be logging channels as well, which need to be skipped.
for channel in self.channels.filter(lambda c: c.has_connector(self.id)):
channel.state = state
def configure(self, configs: Configurations) -> None:
super().configure(configs)
self._connect_type = ConnectType.get(configs.get("connect", default=True))
def _is_disconnected(self) -> bool:
return not self._is_connected()
# noinspection SpellCheckingInspection
def _is_reconnectable(self) -> bool:
if not self.is_enabled() or not self.is_configured() or not self._is_connectable():
return False
if not pd.isna(self._timestamp_connect):
return self._timestamp_connect + self._interval_reconnect <= pd.Timestamp.now(tz.UTC)
if not pd.isna(self._timestamp_disconnect):
return self._timestamp_disconnect + self._interval_reconnect <= pd.Timestamp.now(tz.UTC)
return False
def _is_connectable(self) -> bool:
return self._is_disconnected() and self._connect_type == ConnectType.AUTO
def _is_connected(self) -> bool:
return self.is_connected() and self._connected
def is_connected(self) -> bool:
return True
# noinspection PyUnresolvedReferences, PyTypeChecker
@wraps(_Connector.connect, updated=())
def _do_connect(self, resources: Resources, *args, **kwargs) -> None:
with self._lock:
if not self.is_enabled():
raise ConfigurationError(f"Trying to connect disabled {type(self).__name__}: {self.id}")
if not self.is_configured():
raise ConfigurationError(f"Trying to connect unconfigured {type(self).__name__}: {self.id}")
self._timestamp_connect = pd.Timestamp.now(tz.UTC)
self._timestamp_disconnect = pd.NaT
if not self._is_connected():
self._at_connect(resources)
self._run_connect(resources, *args, **kwargs)
self._on_connect(resources)
self.__resources = resources
else:
self._logger.warning(f"{type(self).__name__} '{self.id}' already connected")
self._connected = True
def _at_connect(self, resources: Resources) -> None:
pass
def _on_connect(self, resources: Resources) -> None:
pass
# noinspection PyUnresolvedReferences, PyTypeChecker
@wraps(_Connector.disconnect, updated=())
def _do_disconnect(self) -> None:
with self._lock:
self._timestamp_connect = pd.NaT
self._timestamp_disconnect = pd.Timestamp.now(tz.UTC)
if not self._is_disconnected():
self._at_disconnect()
self._run_disconnect()
self._on_disconnect()
self._connected = False
def _at_disconnect(self) -> None:
pass
def _on_disconnect(self) -> None:
pass
# noinspection PyUnresolvedReferences, PyTypeChecker
@wraps(_Connector.read, updated=())
def _do_read(self, resources: Resources, *args, **kwargs) -> pd.DataFrame:
with self._lock:
if not self._is_connected():
raise ConnectorError(self, f"Trying to read from unconnected {type(self).__name__}: {self.id}")
data = self._run_read(resources, *args, **kwargs)
data = self._validate(resources, data)
return data
# noinspection PyMethodMayBeStatic
def _validate(self, resources: Resources, data: pd.DataFrame) -> pd.DataFrame:
if not data.empty:
data = validate_index(data)
for resource in resources:
if resource.id not in data:
continue
if resource.type in [pd.Timestamp, dt.datetime]:
resource_data = data[resource.id]
if pd.api.types.is_string_dtype(resource_data.values):
data[resource.id] = pd.to_datetime(resource_data)
return data
# noinspection PyUnresolvedReferences, PyTypeChecker
@wraps(_Connector.write, updated=())
def _do_write(self, data: pd.DataFrame, *args, **kwargs) -> None:
with self._lock:
if not self._is_connected():
raise ConnectorError(self, f"Trying to write to unconnected {type(self).__name__}: {self.id}")
unknown = [c for c in data.columns if c not in self.resources]
if len(unknown) > 0:
raise ConnectorError(
self,
f"Trying to write unknown resource{'s' if len(unknown) > 0 else ''} '{', '.join(unknown)}' for "
f"{type(self).__name__}: {self.id}",
)
self._run_write(data, *args, **kwargs)