# -*- coding: utf-8 -*-
"""
lories.connector.weather.dwd.brightsky
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
"""
from __future__ import annotations
import json
from typing import Optional, Tuple
import requests
import numpy as np
import pandas as pd
from lories import ConfigurationError
from lories.components.weather import Weather
from lories.connectors import Connector
from lories.location import Location
from lories.typing import Configurations, Resources, Timestamp
[docs]
class Brightsky(Connector):
location: Location
address: str = "https://api.brightsky.dev/"
horizon: int = 10
def __init__(self, context: Weather, location: Location, **kwargs) -> None:
super().__init__(context, context.configs.get_member("brightsky", defaults={}), **kwargs)
self.location = location
def configure(self, configs: Configurations) -> None:
super().configure(configs)
self.address = configs.get("address", default=Brightsky.address)
self.horizon = configs.get_int("horizon", default=Brightsky.horizon)
if -1 > self.horizon > 10:
raise ConfigurationError(f"Invalid forecast horizon: {self.horizon}")
def read(
self,
resources: Resources,
start: Optional[Timestamp] = None,
end: Optional[Timestamp] = None,
) -> pd.DataFrame:
response, sources = self._request(start, end)
response_sources = sources.loc[response["source_id"], ["observation_type", "first_record", "last_record"]]
response_source_columns = ["source_type", "source_first_record", "source_last_record"]
response_sources.columns = response_source_columns
response_sources.index = response.index
response[response_source_columns] = response_sources
data = []
for source, source_resources in resources.groupby("source"):
source_columns = [r.address for r in source_resources if r.address in response.columns]
source_data = response.loc[
response["source_type"].isin(s.strip() for s in source.split(",")),
np.unique(["source_id", "source_first_record", "source_last_record"] + source_columns),
]
if source_data.empty:
self._logger.warning(f"Unable to read {self._id} channels: {[r.id for r in source_resources]}")
continue
source_start = start if start is not None else min(source_data["source_first_record"].unique())
source_end = end if end is not None else max(source_data["source_last_record"].unique())
source_data = source_data.rename(columns={r.address: r.id for r in source_resources})
if source == "forecast":
source_end = source_start + pd.Timedelta(days=self.horizon)
elif any(s in ["historical", "current"] for s in source.split(",")):
if all(t is None for t in [start, end]):
source_start = source_end
data.append(
source_data.loc[
source_start:source_end, [r.id for r in source_resources if r.id in source_data.columns]
]
)
return pd.concat(data, axis="index")
# noinspection PyPackageRequirements
def _request(
self,
date: Optional[Timestamp] = None,
date_last: Optional[Timestamp] = None,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
if date is None:
date = pd.Timestamp.now(tz=self.location.timezone)
if date_last is None:
date_last = date + pd.Timedelta(days=self.horizon)
parameters = {
"date": date.strftime("%Y-%m-%d"),
"last_date": date_last.strftime("%Y-%m-%d"),
"lat": self.location.latitude,
"lon": self.location.longitude,
"tz": self.location.timezone.zone,
}
response = requests.get(self.address + "weather", params=parameters)
if response.status_code != 200:
raise requests.HTTPError(
"Response returned with error " + str(response.status_code) + ": " + response.reason
)
response_json = json.loads(response.text)
sources = pd.DataFrame(response_json["sources"])
sources = sources.set_index("id")
sources["first_record"] = pd.to_datetime(sources["first_record"], utc=True)
sources["last_record"] = pd.to_datetime(sources["last_record"], utc=True)
data = pd.DataFrame(response_json["weather"])
data["timestamp"] = pd.to_datetime(data["timestamp"], utc=True)
data = data.set_index("timestamp").tz_convert(self.location.timezone)
data.index.name = "timestamp"
hours = pd.Series(data=data.index, index=data.index).diff().bfill().dt.total_seconds() / 3600.0
# Convert global horizontal irradiance from kWh/m^2 to W/m^2
data["solar"] = data["solar"] * hours * 1000
if data[Weather.CLOUD_COVER].isna().any():
data[Weather.CLOUD_COVER] = data[Weather.CLOUD_COVER].interpolate(method="linear")
return data.dropna(how="all", axis="columns"), sources
def write(self, data: pd.DataFrame) -> None:
raise NotImplementedError("Brightsky connector does not support writing")