Source code for lories._core._database

# -*- coding: utf-8 -*-
"""
lories._core._database
~~~~~~~~~~~~~~~~~~~~~~


"""

from __future__ import annotations

from abc import abstractmethod
from typing import Any, Optional, TypeVar, overload

import pandas as pd
from lories._core._connector import _Connector
from lories._core._resources import Resources
from lories._core.typing import Timestamp

# FIXME: Remove this once Python >= 3.9 is a requirement
try:
    from typing import Literal

except ImportError:
    from typing_extensions import Literal


[docs] class _Database(_Connector): # noinspection PyShadowingBuiltins @abstractmethod def hash( self, resources: Resources, start: Optional[Timestamp] = None, end: Optional[Timestamp] = None, method: Literal["MD5", "SHA1", "SHA256", "SHA512"] = "MD5", encoding: str = "UTF-8", ) -> Optional[str]: ... def exists( self, resources: Resources, start: Optional[Timestamp] = None, end: Optional[Timestamp] = None, ) -> bool: ... @overload def read(self, resources: Resources) -> pd.DataFrame: ... @overload def read( self, resources: Resources, start: Optional[Timestamp] = None, end: Optional[Timestamp] = None, ) -> pd.DataFrame: ... @abstractmethod def read( self, resources: Resources, start: Optional[Timestamp] = None, end: Optional[Timestamp] = None, ) -> pd.DataFrame: ... @abstractmethod def read_first(self, resources: Resources) -> Optional[pd.DataFrame]: ... @abstractmethod def read_first_index(self, resources: Resources) -> Optional[Any]: ... @abstractmethod def read_last(self, resources: Resources) -> Optional[pd.DataFrame]: ... @abstractmethod def read_last_index(self, resources: Resources) -> Optional[pd.Index]: ... @overload def delete(self, resources: Resources) -> None: ... @overload def delete( self, resources: Resources, start: Optional[Timestamp] = None, end: Optional[Timestamp] = None, ) -> None: ... @abstractmethod def delete( self, resources: Resources, start: Optional[Timestamp] = None, end: Optional[Timestamp] = None, ) -> None: ...
Database = TypeVar("Database", bound=_Database)