# -*- coding: utf-8 -*-
"""
lories.data.databases
~~~~~~~~~~~~~~~~~~~~~
"""
from __future__ import annotations
import logging
from typing import Any, Collection, Optional
import tzlocal
import pandas as pd
from lories.connectors import ConnectorContext, ConnectType, Database
from lories.core import Configurations, Configurator, ResourceError
from lories.data.channels import Channel, Channels
from lories.data.context import DataContext
from lories.data.replication import Replications
from lories.data.retention import Retention, Retentions
from lories.util import floor_date, parse_freq, to_bool, to_timedelta, to_timezone
[docs]
class Databases(ConnectorContext, Configurator):
    """Connector context holding the database connectors of a data context.

    The context is configured from the ``"databases"`` member of the passed
    configurations. On construction it loads its own connectors and then adds
    every enabled :class:`Database` connector already registered in the data
    context.
    """

    TYPE: str = "databases"

    # noinspection PyProtectedMember, PyUnresolvedReferences
    def __init__(self, context: DataContext, configs: Configurations) -> None:
        """Build the context from the ``"databases"`` configuration member.

        Args:
            context: Data context whose connectors are scanned for databases.
            configs: Parent configurations; the member named by ``TYPE`` is
                used, falling back to empty defaults.
        """
        member_configs = configs.get_member(Databases.TYPE, defaults={})
        super().__init__(context, configs=member_configs)

        # Load first without configuring or sorting, then configure explicitly.
        self.load(configure=False, sort=False)
        self.configure()

        def is_enabled_database(connector) -> bool:
            return connector.is_enabled() and isinstance(connector, Database)

        for database in context.connectors.filter(is_enabled_database):
            self._add(database)
        # Sort once, after all databases were collected.
        self.sort()

    # noinspection PyProtectedMember
    @classmethod
    def _assert_configs(cls, configs: Configurations) -> Configurations:
        """Reject missing configurations before delegating to the parent check.

        Raises:
            ResourceError: If ``configs`` is ``None``.
        """
        if configs is not None:
            return super()._assert_configs(configs)
        raise ResourceError(f"Invalid '{cls.__name__}' NoneType configurations")

    def load(self, **kwargs: Any) -> Collection[Database]:
        """Load the databases from the configuration file and its ``.d`` directory.

        Args:
            **kwargs: Forwarded unchanged to the underlying ``_load`` call.

        Returns:
            The collection returned by ``_load``.
        """
        configs = self.configs
        # The directory shares the file name, with ".conf" swapped for ".d".
        configs_dir = configs.dirs.conf.joinpath(configs.name.replace(".conf", ".d"))
        databases = self._load(
            self,
            configs,
            configs_file=configs.name,
            configs_dir=configs_dir,
            includes=Database.INCLUDES,
            **kwargs,
        )
        # for database in databases:
        #     database._connect_type = ConnectType.NONE
        return databases

    # noinspection PyProtectedMember, PyUnresolvedReferences
    def connect(self, channels: Optional[Channels] = None) -> None:
        """Force-connect every database that is currently connectable."""
        connectable = self.filter(self.__is_connectable)
        self.context._connect(*connectable, channels=channels, force=True)

    # noinspection PyProtectedMember, PyUnresolvedReferences
    def _connect(self, database: Database, channels: Optional[Channels] = None) -> None:
        """Force-connect a single database, if it is connectable."""
        if not self.__is_connectable(database):
            return
        self.context._connect(database, channels=channels, force=True)

    # noinspection PyProtectedMember
    @staticmethod
    def __is_connectable(database: Database) -> bool:
        """Tell whether the connect type is ``NONE`` or the database is not connected."""
        if database._connect_type == ConnectType.NONE:
            return True
        return not database._is_connected()

    # noinspection PyProtectedMember, PyUnresolvedReferences
    def disconnect(self):
        """Disconnect every database that is currently disconnectable."""
        disconnectable = self.filter(self.__is_disconnectable)
        self.context._disconnect(*disconnectable)

    # noinspection PyProtectedMember, PyUnresolvedReferences
    def _disconnect(self, database: Database) -> None:
        """Disconnect a single database, if it is disconnectable."""
        if not self.__is_disconnectable(database):
            return
        self.context._disconnect(database)

    # noinspection PyProtectedMember
    @staticmethod
    def __is_disconnectable(database: Database) -> bool:
        """Tell whether the connect type is ``NONE`` and the database is connected."""
        if database._connect_type != ConnectType.NONE:
            return False
        return database._is_connected()

    # noinspection PyProtectedMember
    def replicate(self, channels: Channels, full: bool = False, force: bool = False, **kwargs) -> None:
        """Replicate the given channels into their replication databases.

        Each channel is converted with ``from_logger()`` and gets a replication
        built for it; channels without a replication are dropped. The remaining
        channels are processed per target database, then per logger connector,
        then per replication. Connections opened here are released again in
        ``finally`` clauses.

        Args:
            channels: Channels to replicate.
            full: Passed through ``to_bool`` to every ``replicate`` call.
            force: Passed through ``to_bool`` to every ``replicate`` call.
            **kwargs: Forwarded to ``Replications.build``.
        """
        replications = Replications()

        def attach_replication(channel: Channel) -> Channel:
            channel = channel.from_logger()
            channel.replication = replications.build(self, channel, **kwargs)
            return channel

        def has_replication(channel: Channel) -> bool:
            return channel.replication is not None

        def logger_connector_of(channel: Channel):
            return channel.logger._connector

        def replication_of(channel: Channel):
            return channel.replication

        channels = channels.apply(attach_replication).filter(has_replication)
        for database in self.values():

            def replicates_to_database(channel: Channel) -> bool:
                return channel.replication.database.id == database.id

            database_channels = channels.filter(replicates_to_database)
            if len(database_channels) == 0:
                continue

            self._connect(database, channels=database_channels)
            try:
                for logger, logger_channels in database_channels.groupby(logger_connector_of):
                    self._connect(logger, channels=logger_channels)
                    try:
                        for replication, replication_channels in logger_channels.groupby(replication_of):
                            replication.replicate(replication_channels, full=to_bool(full), force=to_bool(force))
                    except ResourceError as e:
                        # A failure only ends the replications of this logger connector.
                        self._logger.warning(f"Error replicating database '{database.id}': {str(e)}")
                        if self._logger.getEffectiveLevel() <= logging.DEBUG:
                            self._logger.exception(e)
                    finally:
                        self._disconnect(logger)
            finally:
                self._disconnect(database)

    # noinspection PyProtectedMember
    def rotate(self, channels: Channels, full: bool = False) -> None:
        """Delete rotated-out values and aggregate retentions per database.

        Each channel is converted with ``from_logger()``; its ``"rotate"``
        setting is parsed with ``parse_freq`` and its retentions are built from
        the context configurations. Channels with neither are dropped.

        For every database logging at least one remaining channel:

        * channels are grouped by rotation; the deletion limit is "now" in the
          configured ``"timezone"`` (default: local zone) minus the rotation,
          floored to the configured ``"freq"`` (default ``"D"``). Values up to
          that limit are deleted per channel group, unless the first stored
          index is missing or newer than the limit;
        * the collected retentions are sorted and each enabled one is
          aggregated over the channels carrying it.

        Args:
            channels: Channels to rotate.
            full: Passed through ``to_bool`` to every ``aggregate`` call.
        """
        retentions = Retentions()

        def attach_rotation(channel: Channel) -> Channel:
            channel = channel.from_logger()
            channel.rotate = parse_freq(channel.get("rotate", default=None))
            channel.retentions = Retention.build(self.configs, channel)
            retentions.extend(channel.retentions, unique=True)
            return channel

        def needs_rotation(channel: Channel) -> bool:
            return channel.rotate is not None or len(channel.retentions) > 0

        def rotation_of(channel: Channel):
            return channel.rotate

        def group_of(channel: Channel):
            return channel.group

        channels = channels.apply(attach_rotation).filter(needs_rotation)
        for database in self.values():

            def is_logged_by_database(channel: Channel) -> bool:
                return channel.has_logger(database.id)

            database_channels = channels.filter(is_logged_by_database)
            if len(database_channels) == 0:
                continue

            self._connect(database, channels=database_channels)
            try:
                for rotation, rotation_channels in database_channels.groupby(rotation_of):
                    if rotation is None:
                        continue
                    freq = self.configs.get("freq", default="D")
                    timezone = to_timezone(self.configs.get("timezone", default=tzlocal.get_localzone_name()))
                    cutoff = floor_date(pd.Timestamp.now(tz=timezone) - to_timedelta(rotation), freq=freq)

                    for _, deletion_channels in rotation_channels.groupby(group_of):
                        first_index = database.read_first_index(deletion_channels)
                        if first_index is None or first_index > cutoff:
                            self._logger.debug(
                                f"Skip rotating values of resource{'s' if len(deletion_channels) > 1 else ''} "
                                + ", ".join([f"'{r.id}'" for r in deletion_channels])
                                + " without any values found"
                            )
                            continue
                        self._logger.info(
                            f"Deleting values of resource{'s' if len(deletion_channels) > 1 else ''} "
                            + ", ".join([f"'{r.id}'" for r in deletion_channels])
                            + f" up to {cutoff.strftime('%d.%m.%Y (%H:%M:%S)')}"
                        )
                        database.delete(deletion_channels, end=cutoff)

                retentions.sort()
                for retention in retentions:
                    if not retention.enabled:
                        self._logger.debug(f"Skipping disabled retention '{retention.keep}'")
                        continue

                    # noinspection PyProtectedMember
                    def has_retention(channel: Channel) -> bool:
                        return (
                            channel.logger.enabled
                            and isinstance(channel.logger._connector, Database)
                            and retention in channel.retentions
                        )

                    try:
                        retained_channels = database_channels.filter(has_retention)
                        retention.aggregate(retained_channels, full=to_bool(full))
                    except ResourceError as e:
                        # A failing retention is reported; the remaining ones still run.
                        self._logger.warning(
                            f"Error aggregating '{retention.method}' retaining {retention.keep}: {str(e)}"
                        )
            finally:
                self._disconnect(database)