# -*- coding: utf-8 -*-
"""
lories.data.manager
~~~~~~~~~~~~~~~~~~~
"""
from __future__ import annotations
import logging
import os
import signal
import time
from collections.abc import Callable
from concurrent import futures
from concurrent.futures import Future, ThreadPoolExecutor, TimeoutError
from functools import partial
from threading import Event, Thread
from typing import Any, Dict, Mapping, Optional, Type
import pandas as pd
import pytz as tz
from lories._core import _Context, _DataManager # noqa
from lories.components import Component, ComponentContext
from lories.connectors import Connector, ConnectorContext, ConnectorError
from lories.connectors.tasks import CheckTask, ConnectTask, LogTask, ReadTask, WriteTask
from lories.core.activator import Activator
from lories.core.configs import ConfigurationError, Configurations
from lories.core.register import Registrator, RegistratorContext
from lories.core.typing import ChannelsArgument, Timestamp
from lories.data.channels import Channel, ChannelConnector, ChannelConverter, Channels, ChannelState
from lories.data.context import DataContext
from lories.data.converters import ConverterContext
from lories.data.databases import Database, Databases
from lories.data.listeners import ListenerContext
from lories.data.replication import Replication
from lories.data.retention import Retention
from lories.util import floor_date, parse_type, to_bool, to_timedelta, validate_key
# FIXME: Remove this once Python >= 3.9 is a requirement
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
# noinspection PyProtectedMember
[docs]
class DataManager(_DataManager, DataContext, Activator):
_converters: ConverterContext
_connectors: ConnectorContext
_components: ComponentContext
_listeners: ListenerContext
_executor: ThreadPoolExecutor
__runner: Thread
__interrupt: Event
_interval: int
def __init__(self, configs: Configurations, name: str, **kwargs) -> None:
super().__init__(configs=configs, key=validate_key(name), name=name, **kwargs)
self.__interrupt = Event()
self.__interrupt.set()
self._converters = ConverterContext(self)
self._connectors = ConnectorContext(self)
self._components = ComponentContext(self)
self._listeners = ListenerContext(self)
self._executor = ThreadPoolExecutor(
thread_name_prefix=self.name,
max_workers=max(int((os.cpu_count() or 1) / 2), 1),
)
self.__runner = Thread(name=self.name, target=self.run)
signal.signal(signal.SIGINT, self.interrupt)
signal.signal(signal.SIGTERM, self.deactivate)
# noinspection PyArgumentList
def __contains__(self, item: str | Channel | Connector | Component) -> bool:
channels = _Context.__getattribute__(self, f"{_Context.__name__}__map")
if isinstance(item, str):
return item in channels.keys()
if isinstance(item, Channel):
return item in channels.values()
if isinstance(item, Connector):
return item in self._connectors.values()
if isinstance(item, Component):
return item in self._components.values()
return False
# noinspection PyShadowingBuiltins
def _create(self, id: str, key: str, type: Type, **configs: Any) -> Channel:
# noinspection PyShadowingBuiltins
def build_args(
registrator_context: RegistratorContext,
registrator_type: str,
name: Optional[str] = None,
) -> Dict[str, Any]:
if name is None:
name = registrator_type
registrator_configs = configs.pop(name, None)
if registrator_configs is None:
return {registrator_type: None}
if isinstance(registrator_configs, str):
registrator_configs = {registrator_type: registrator_configs}
elif not isinstance(registrator_configs, Mapping):
raise ConfigurationError(f"Invalid channel {name} type: " + str(registrator_configs))
elif registrator_type not in registrator_configs:
return {registrator_type: None}
registrator_id = registrator_configs.pop(registrator_type)
if registrator_id is not None and "." not in registrator_id:
registrator_path = id.split(".")
for i in reversed(range(1, len(registrator_path))):
_registrator_id = ".".join([*registrator_path[:i], registrator_id])
if _registrator_id in registrator_context.keys():
registrator_id = _registrator_id
break
registrator = registrator_context.get(registrator_id, None) if registrator_id else None
return {registrator_type: registrator, **registrator_configs}
if "converter" not in configs:
converter = ChannelConverter(self._converters.get_by_dtype(parse_type(type)))
else:
converter = ChannelConverter(**build_args(self._converters, "converter"))
connector = ChannelConnector(**build_args(self._connectors, "connector"))
logger = ChannelConnector(**build_args(self._connectors, "connector", "logger"))
return Channel(
id=id, key=key, type=type, context=self, converter=converter, connector=connector, logger=logger, **configs
)
def configure(self, configs: Configurations) -> None:
super().configure(configs)
self._interval = configs.get_int("interval", default=1)
def _at_configure(self, configs: Configurations) -> None:
super()._at_configure(configs)
self._load(self, configs, sort=False)
self._converters.load(configure=False, sort=False)
self._converters.configure()
self._connectors.load(configure=False, sort=False)
self._connectors.configure()
self._components.load(configure=False, sort=False)
self._components.configure()
def _on_configure(self, configs: Configurations) -> None:
super()._on_configure(configs)
self._converters.sort()
self._connectors.sort()
self._components.sort()
self.sort()
# noinspection PyShadowingBuiltins
def activate(self, filter: Optional[Callable[[Registrator], bool]] = None) -> None:
super().activate()
self._connect(*self._connectors.filter(_filter(filter)))
self._activate(*self._components.filter(_filter(filter)))
def _activate(self, *component: Component) -> None:
for component in component:
if not component.is_enabled():
self._logger.debug(
f"Skipping to activate disabled {type(component).__name__} '{component.name}': {component.id}"
)
continue
self.__activate(component)
def __activate(self, component: Component) -> None:
self._logger.debug(f"Activating {type(component).__name__} '{component.name}': {component.id}")
component.activate()
self._logger.info(f"Activated {type(component).__name__} '{component.name}': {component.id}")
# noinspection PyShadowingBuiltins
def connect(
self,
filter: Optional[Callable[[Registrator], bool]] = None,
channels: Optional[Channels] = None,
timeout: Optional[float] = None,
) -> None:
self._connect(*self._connectors.filter(_filter(filter)), channels=channels, timeout=timeout)
def _connect(
self,
*connectors: Connector,
channels: Optional[Channels] = None,
timeout: Optional[float] = None,
force: bool = False,
) -> None:
connect_futures = {}
for connector in connectors:
if not connector.is_enabled():
self._logger.debug(
f"Skipping to connect disabled {type(connector).__name__} '{connector.name}': {connector.id}"
)
continue
if connector._is_connected():
self._logger.debug(
f"Skipping already connected {type(connector).__name__} '{connector.name}': {connector.id}"
)
continue
if not connector._is_connectable() and not force:
self._logger.debug(
f"Skipping not connectable {type(connector).__name__} '{connector.name}': {connector.id}"
)
continue
connect_task = self.__connect(connector, channels)
connect_future = self._executor.submit(connect_task)
connect_futures[connect_future] = connect_task
self.__connect_futures(connect_futures, timeout)
def __connect(self, connector: Connector, channels: Optional[Channels] = None) -> ConnectTask:
self._logger.debug(f"Connecting {type(connector).__name__} '{connector.name}': {connector.id}")
if channels is None:
channels = self.channels.filter(lambda c: c.has_connector(connector.id))
channels.update(self.channels.filter(lambda c: c.has_logger(connector.id)).apply(lambda c: c.from_logger()))
return ConnectTask(connector, channels)
def __connect_futures(
self,
tasks: Dict[Future, ConnectTask],
timeout: Optional[float] = None,
) -> None:
try:
for future in futures.as_completed(tasks, timeout=timeout):
tasks.pop(future)
self.__connect_callback(future)
except TimeoutError:
for future, task in tasks.items():
self._logger.warning(f"Timed out opening connector '{task.connector.id}' after {timeout} seconds")
future.cancel()
def __connect_callback(self, future: Future) -> None:
try:
connector = future.result()
self._logger.info(f"Connected {type(connector).__name__} '{connector.name}': {connector.id}")
except ConnectorError as e:
self._logger.warning(f"Failed opening connector '{e.connector.id}': {str(e)}")
if self._logger.getEffectiveLevel() <= logging.DEBUG:
self._logger.exception(e)
# noinspection PyShadowingBuiltins
def reconnect(
self,
filter: Optional[Callable[[Registrator], bool]] = None,
) -> None:
self._reconnect(*self._connectors.filter(_filter(filter)))
def _reconnect(self, *connectors: Connector) -> None:
for connector in connectors:
if not connector.is_enabled():
self._logger.debug(
f"Skipping reconnecting disabled {type(connector).__name__} '{connector.name}': {connector.id}"
)
continue
if not connector._is_connected() and connector._connected:
# Connection aborted and not yet disconnected properly
self._disconnect(connector)
continue
connect_task = self.__connect(connector)
connect_future = self._executor.submit(connect_task)
connect_future.add_done_callback(self.__connect_callback)
# noinspection PyShadowingBuiltins
def disconnect(
self,
filter: Optional[Callable[[Registrator], bool]] = None,
) -> None:
self._disconnect(*self._connectors.filter(_filter(filter)))
def _disconnect(self, *connectors: Connector) -> None:
for connector in reversed(connectors):
if not connector._is_connected():
self._logger.debug(
f"Skipping to disconnect unconnected {type(connector).__name__} '{connector.name}': {connector.id}"
)
continue
self.__disconnect(connector)
def __disconnect(self, connector: Connector) -> None:
try:
self._logger.debug(f"Disconnecting {type(connector).__name__} '{connector.name}': {connector.id}")
connector.set_channels(ChannelState.DISCONNECTING)
connector.disconnect()
self._logger.info(f"Disconnected {type(connector).__name__} '{connector.name}': {connector.id}")
except Exception as e:
self._logger.warning(f"Failed closing connector '{connector.id}': {str(e)}")
if self._logger.getEffectiveLevel() <= logging.DEBUG:
self._logger.exception(e)
finally:
connector.set_channels(ChannelState.DISCONNECTED)
# noinspection PyShadowingBuiltins
def deactivate(self, *_, filter: Optional[Callable[[Registrator], bool]] = None) -> None:
self.interrupt()
super().deactivate()
self._deactivate(*self._components.filter(_filter(filter)))
self._disconnect(*self._connectors.filter(_filter(filter)))
def _deactivate(self, *components: Component) -> None:
for component in reversed(list(components)):
if not component.is_active():
self._logger.debug(
f"Skipping to deactivate already deactivated {type(component).__name__} '{component.name}': "
f"{component.id}"
)
return
self.__deactivate(component)
def __deactivate(self, component: Component) -> None:
if not component.is_active():
self._logger.debug(
f"Skipping to deactivate already deactivated {type(component).__name__} '{component.name}': "
f"{component.id}"
)
return
try:
self._logger.debug(f"Deactivating {type(component).__name__} '{component.name}': {component.id}")
component.deactivate()
self._logger.info(f"Deactivated {type(component).__name__} '{component.name}': {component.id}")
except Exception as e:
self._logger.warning(f"Failed deactivating component '{component.id}': {str(e)}")
if self._logger.getEffectiveLevel() <= logging.DEBUG:
self._logger.exception(e)
def interrupt(self, *_) -> None:
self.__interrupt.set()
# FIXME: Add cancel_futures argument again, once Python >= 3.9 is a requirement
self._executor.shutdown(wait=True) # , cancel_futures=True)
if self.__runner.is_alive():
self.__runner.join()
def register(
self,
function: Callable[[pd.DataFrame], None],
channels: Optional[ChannelsArgument] = None,
how: Literal["any", "all"] = "any",
unique: bool = False,
) -> None:
self._listeners.register(function, self._filter_by_args(channels), how=how, unique=unique)
@property
def converters(self) -> ConverterContext:
return self._converters
@property
def connectors(self) -> ConnectorContext:
return self._connectors
@property
def components(self) -> ComponentContext:
return self._components
@property
def listeners(self) -> ListenerContext:
return self._listeners
def notify(
self,
channels: Optional[ChannelsArgument] = None,
timeout: Optional[float] = None,
) -> None:
channels = self._filter_by_args(channels)
now = pd.Timestamp.now(tz.UTC)
def _submit_listeners(_timeout: float) -> bool:
_futures = []
with self.listeners:
for _listener in self.listeners.notify(*channels):
_future = self._executor.submit(_listener, now)
_future.add_done_callback(self._notify_callback)
_futures.append(_future)
if len(_futures) > 0:
futures.wait(_futures, timeout=_timeout)
return True
return False
while _submit_listeners(timeout):
if timeout is not None:
timeout -= (pd.Timestamp.now(tz.UTC) - now).total_seconds()
if timeout <= 0:
break
# noinspection PyUnresolvedReferences
def _notify_callback(self, future: Future) -> None:
exception = future.exception()
if exception is not None:
listener = exception.listener
self._logger.warning(f"Failed notifying listener '{listener.id}': {str(exception)}")
if self._logger.getEffectiveLevel() <= logging.DEBUG:
self._logger.exception(exception)
def start(self, wait: bool = True) -> None:
self._logger.info(f"Starting {type(self).__name__}: {self.name}")
self.__interrupt.clear()
self.__runner.start()
if wait:
self.__runner.join()
# noinspection PyShadowingBuiltins, PyProtectedMember
def run(self, **kwargs) -> None:
now = pd.Timestamp.now(tz.UTC)
channels = self.channels.filter(lambda c: self.__is_reading(c, now))
if len(channels) > 0:
self.read(channels, inplace=True, **kwargs)
interval = f"{self._interval}s"
_sleep(interval)
while not self.__interrupt.is_set():
try:
now = pd.Timestamp.now(tz.UTC)
self.__read(now, timeout=self._interval / 4)
self.reconnect(lambda c: c._is_reconnectable())
self.notify(timeout=self._interval / 4)
self.log()
_sleep(interval, self.__interrupt.wait)
except KeyboardInterrupt:
self.interrupt()
break
self.notify()
self.log()
# noinspection PyShadowingBuiltins
def has_logged(
self,
channels: Optional[ChannelsArgument] = None,
start: Optional[Timestamp] = None,
end: Optional[Timestamp] = None,
timeout: Optional[float] = None,
) -> bool:
channels = self._filter_by_args(channels)
check_futures = {}
for id, connector in self.connectors.items():
if not connector._is_connected():
continue
def has_database(channel: Channel) -> bool:
return channel.has_logger(id) and channel.logger.is_database()
check_channels = channels.filter(has_database).apply(lambda c: c.from_logger())
if len(check_channels) == 0:
continue
check_task = CheckTask(connector, check_channels)
check_future = self._executor.submit(check_task, start=start, end=end)
check_futures[check_future] = check_task
check_results = []
try:
for check_future in futures.as_completed(check_futures, timeout=timeout):
check_task = check_futures.pop(check_future)
try:
check_exists = check_future.result()
check_results.append(check_exists)
except ConnectorError as e:
self._logger.warning(f"Failed checking connector '{check_task.connector.id}': {str(e)}")
if self._logger.getEffectiveLevel() <= logging.DEBUG:
self._logger.exception(e)
check_results.append(False)
except TimeoutError:
for check_future, check_task in check_futures.items():
self._logger.warning(
f"Timed out checking connector '{check_task.connector.id}' after {timeout} seconds"
)
check_future.cancel()
check_results.append(False)
if len(check_results) == 0:
return False
return all(check_results)
# noinspection PyShadowingBuiltins
def read_logged(
self,
channels: Optional[ChannelsArgument] = None,
start: Optional[Timestamp] = None,
end: Optional[Timestamp] = None,
timeout: Optional[float] = None,
) -> pd.DataFrame:
channels = self._filter_by_args(channels)
read_futures = {}
for id, connector in self.connectors.items():
if not connector._is_connected():
continue
def has_database(channel: Channel) -> bool:
return channel.has_logger(id) and channel.logger.is_database()
read_channels = channels.filter(has_database).apply(lambda c: c.from_logger())
if len(read_channels) == 0:
continue
read_task = ReadTask(connector, read_channels)
read_future = self._executor.submit(read_task, start=start, end=end)
read_futures[read_future] = read_task
return self._read_futures(read_futures, timeout)
# noinspection PyShadowingBuiltins, PyTypeChecker
def read(
self,
channels: Optional[ChannelsArgument] = None,
timeout: Optional[float] = None,
inplace: bool = False,
**kwargs,
) -> pd.DataFrame:
channels = self._filter_by_args(channels)
read_futures = {}
for id, connector in self.connectors.items():
if not connector._is_connected():
continue
read_channels = channels.filter(lambda c: c.has_connector(id))
if len(read_channels) == 0:
continue
read_task = ReadTask(connector, read_channels)
read_future = self._executor.submit(read_task, inplace=inplace, **kwargs)
read_futures[read_future] = read_task
return self._read_futures(read_futures, timeout, inplace)
def _read_futures(
self,
tasks: Dict[Future, ReadTask],
timeout: Optional[float] = None,
inplace: bool = False,
) -> pd.DataFrame:
results = []
try:
for future in futures.as_completed(tasks, timeout=timeout):
task = tasks.pop(future)
data = self._read_callback(task, future, inplace)
if data is not None:
results.append(data)
except TimeoutError:
for future, task in tasks.items():
self._logger.warning(f"Timed out reading connector '{task.connector.id}' after {timeout} seconds")
future.cancel()
if inplace:
channels = task.channels
channels.set_state(ChannelState.TIMEOUT)
if len(results) == 0:
return pd.DataFrame()
results = sorted(results, key=lambda d: min(d.index))
return pd.concat(results, axis="columns")
def _read_callback(
self,
task: ReadTask,
future: Future,
inplace: bool = False,
) -> Optional[pd.DataFrame]:
channels = task.channels
try:
return future.result()
except ConnectorError as e:
self._logger.warning(f"Failed reading connector '{task.connector.id}': {str(e)}")
if self._logger.getEffectiveLevel() <= logging.DEBUG:
self._logger.exception(e)
if inplace:
channels.set_state(ChannelState.READ_ERROR)
return None
# noinspection PyShadowingBuiltins, PyTypeChecker
def __read(
self,
timestamp: Timestamp,
timeout: Optional[float] = None,
**kwargs,
) -> None:
channels = self.channels.filter(lambda c: self.__is_reading(c, timestamp))
if len(channels) < 1:
return
self._logger.debug(f"Reading {len(channels)} channels of application: {self.name}")
read_futures = []
for id, connector in self.connectors.items():
if not connector._is_connected():
continue
read_channels = channels.filter(lambda c: c.has_connector(id))
if len(read_channels) == 0:
continue
read_task = ReadTask(connector, read_channels)
read_future = self._executor.submit(read_task, inplace=True, **kwargs)
read_future.add_done_callback(partial(self._read_callback, read_task, inplace=True))
read_futures.append(read_future)
def update_timestamp(read_channel: Channel) -> None:
read_channel.connector.timestamp = timestamp
read_channels.apply(update_timestamp, inplace=True)
futures.wait(read_futures, timeout=timeout)
def __is_reading(self, channel: Channel, timestamp: pd.Timestamp) -> bool:
freq = channel.freq
if (
freq is None
or not channel.has_connector()
or not self.connectors.get(channel.connector.id, False)
or not self.connectors.get(channel.connector.id).is_connected()
):
return False
if pd.isna(channel.connector.timestamp):
return True
next_reading = _next(freq, channel.connector.timestamp)
return timestamp >= next_reading
# noinspection PyShadowingBuiltins, PyShadowingNames, PyTypeChecker
def write(
self,
data: pd.DataFrame,
channels: Optional[ChannelsArgument] = None,
timeout: Optional[float] = None,
inplace: bool = False,
) -> None:
channels = self._filter_by_args(channels)
write_futures = {}
for id, connector in self.connectors.items():
if not connector._is_connected():
continue
write_channels = channels.filter(lambda c: (c.has_connector(id) and c.id in data.columns))
if len(write_channels) == 0:
continue
write_channels.set_frame(data)
write_task = WriteTask(connector, write_channels)
write_future = self._executor.submit(write_task)
write_futures[write_future] = write_task
self._write_futures(write_futures, timeout)
def _write_futures(
self,
tasks: Dict[Future, WriteTask | LogTask],
timeout: Optional[float] = None,
inplace: bool = False,
) -> None:
try:
for future in futures.as_completed(tasks, timeout=timeout):
task = tasks.pop(future)
self._write_callback(task, future, inplace)
except TimeoutError:
for future, task in tasks.items():
self._logger.warning(f"Timed out writing connector '{task.connector.id}' after {timeout} seconds")
future.cancel()
if inplace:
channels = task.channels
channels.set_state(ChannelState.TIMEOUT)
def _write_callback(
self,
task: WriteTask,
future: Future,
inplace: bool = False,
) -> None:
channels = task.channels
try:
future.result()
except ConnectorError as e:
self._logger.warning(f"Failed writing connector '{task.connector.id}': {str(e)}")
if self._logger.getEffectiveLevel() <= logging.DEBUG:
self._logger.exception(e)
if inplace:
channels.set_state(ChannelState.WRITE_ERROR)
# noinspection PyShadowingBuiltins, PyTypeChecker
def log(
self,
channels: Optional[ChannelsArgument] = None,
timeout: Optional[float] = None,
blocking: bool = False,
force: bool = False,
) -> None:
if channels is None:
channels = self.channels
log_futures = {}
for id, connector in self.connectors.items():
if not connector._is_connected():
continue
def has_update(channel: Channel) -> bool:
if force:
return True
if channel.freq is None:
return pd.isna(channel.logger.timestamp) or channel.timestamp > channel.logger.timestamp
if pd.isna(channel.logger.timestamp):
logger_timestamp = floor_date(channel.timestamp, freq=channel.freq)
if logger_timestamp == channel.timestamp:
logger_timestamp -= channel.timedelta
channel.logger.timestamp = logger_timestamp
return channel.timestamp >= channel.logger.timestamp + channel.timedelta
log_channels = channels.filter(lambda c: (c.has_logger(id) and c.is_valid() and has_update(c)))
if len(log_channels) == 0:
continue
log_task = LogTask(connector, log_channels)
log_future = self._executor.submit(log_task)
log_futures[log_future] = log_task
if not blocking:
log_future.add_done_callback(partial(self._write_callback, log_task, inplace=False))
def update_timestamp(channel: Channel) -> None:
channel.logger.timestamp = channel.timestamp
log_channels.apply(update_timestamp, inplace=True)
if blocking:
self._write_futures(log_futures, timeout, inplace=False)
def rotate(
self,
channels: Optional[Channels] = None,
full: bool = False,
**kwargs,
) -> None:
if channels is None:
channels = self.channels
defaults = self.configs.get_member(Retention.TYPE, defaults={})
configs = Configurations(f"{Retention.TYPE}.conf", self.configs.dirs, defaults=defaults)
configs._load(require=False)
kwargs["full"] = configs.pop("full", default=full)
databases = Databases(self, configs)
databases.rotate(channels, **kwargs)
def replicate(
self,
channels: Optional[Channels] = None,
full: bool = False,
force: bool = False,
**kwargs,
) -> None:
if channels is None:
channels = self.channels.filter(lambda c: self.__is_replicating(c))
defaults = self.configs.get_member(Replication.TYPE, defaults={})
configs = Configurations(f"{Replication.TYPE}.conf", self.configs.dirs, defaults=defaults)
configs._load(require=False)
if not configs.enabled:
self._logger.error(f"Unable to replicate for disabled configuration type '{Replication.TYPE}'")
return
kwargs["full"] = configs.pop("full", default=full)
kwargs["force"] = configs.pop("force", default=force)
kwargs.update({k: v for k, v in configs.items() if k not in configs.members})
databases = Databases(self, configs)
databases.replicate(channels, **kwargs)
# noinspection PyMethodMayBeStatic
def __is_replicating(self, channel: Channel, timestamp: Optional[Timestamp] = None) -> bool:
replication = channel.get(Replication.TYPE, default=None)
if not (
replication is not None
and "database" in replication
and to_bool(replication.get("enabled", True))
and channel.logger.enabled
and isinstance(channel.logger._connector, Database)
):
return False
if timestamp is None:
return True
return timestamp <= floor_date(timestamp, freq=replication.get("freq", Replication.freq))
# noinspection PyShadowingBuiltins
def _sleep(freq: str, sleep: Callable = time.sleep) -> None:
now = pd.Timestamp.now(tz.UTC)
next = _next(freq, now)
seconds = (next - now).total_seconds()
sleep(seconds)
# noinspection PyShadowingBuiltins, PyShadowingNames
def _next(freq: str, now: Optional[pd.Timestamp] = None) -> pd.Timestamp:
if now is None:
now = pd.Timestamp.now(tz.UTC)
next = floor_date(now, freq=freq)
while next <= now:
next += to_timedelta(freq)
return next
def _filter(*filters: Optional[Callable[[Connector | Component], bool]]) -> Callable[[...], bool]:
def _all_filters(registrator: Connector | Component) -> bool:
return all(f(registrator) for f in filters if f is not None)
return _all_filters