Source code for lories.connectors.cameras.camera
# -*- coding: utf-8 -*-
"""
lories.connectors.cameras.camera
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
"""
from abc import abstractmethod
from time import sleep, time
from typing import Iterable
import pandas as pd
from lories.connectors import ConnectionError, Connector
from lories.typing import Resources
[docs]
class CameraConnector(Connector):
def read(self, resources: Resources) -> pd.DataFrame:
timestamp = pd.Timestamp.now(tz="UTC").floor(freq="s")
# TODO: Wrap read_frame() and cache latest frame to only read if frame is older than a second
data = self.read_frame()
return pd.DataFrame(data=[data] * len(resources), index=[timestamp], columns=list(resources.ids))
@abstractmethod
def read_frame(self) -> bytes: ...
def stream(self, fps: int = 30) -> Iterable[bytes]:
while True:
try:
now = time()
if self.is_connected():
yield self.read_frame()
seconds = (1 / fps) - (time() - now)
if seconds > 0:
sleep(seconds)
except KeyboardInterrupt:
pass
except ConnectionError as e:
self._logger.error(f"Unexpected error '{e}' while streaming")
self.disconnect()
def write(self, data: pd.DataFrame) -> None:
raise NotImplementedError("Camera connector does not support writing")