So I think I have integrated Aurora’s delayed power usage data into Home Assistant, though it’s a little convoluted and dodgy. Aurora is pretty much the only power provider in Tasmania.
The issue is that they post your power consumption to their non-official API the day after. This is incompatible with Home Assistant’s real-time monitoring. But I did it anyway.
Firstly, we need a way to get the information from that API. Recently, they introduced 2FA which sends a text to verify. Thankfully, there exists AuroraPlus, which is a fork of a library which worked before the 2FA wrecked everything. It contains a lot of goodness, but for my purposes, getting the token using the script is sufficient.
def get_token():
    """Interactively obtain an Aurora OAuth token and print it.

    Prints the authorisation URL produced by ``auroraplus``, asks the user to
    paste back the URL they were redirected to, exchanges that URL for a token
    and prints both the whole token and its ``access_token`` entry.  Nothing is
    returned; the token is only written to stdout.
    """
    client = auroraplus.api()
    authorize_url = client.oauth_authorize()

    print("Please visit the following URL in a browser, "
          "and follow the login prompts ...\n")
    print(authorize_url)
    print("\nThis will redirect to an error page (Cradle Mountain).\n")

    # The user copies the address of the page they land on after logging in;
    # auroraplus turns that redirect URL into the token.
    pasted_url = input("Please enter the full URL of the error page: ")
    new_token = client.oauth_redirect(pasted_url)

    print("\nThe new token is\n")
    print(new_token)
    print("\n The access token is\n")
    print(new_token['access_token'])
Now that we can grab yesterday's data, we need to get it into Home Assistant somehow. This was the most challenging part, but in the spirit of doing as little work as possible, I stumbled on this repository: https://github.com/klausj1/homeassistant-statistics
It basically takes a specially formatted CSV in the root directory and imports it into Home Assistant's statistics database, which is used by the energy dashboard to display usage.
From here, we just need to do the T in ETL (extract, transform, load). So I wrote a pyscript script to be run as a service.
import json
import logging
from datetime import datetime, time, timedelta, UTC
from functools import partial
from zoneinfo import ZoneInfo

import pandas as pd
import pytz
import requests
# For testing, this SQL can be helpful:
# select max(sum) from statistics s inner join statistics_meta sm on sm.id=s.metadata_id where statistic_id = 'sensor:aurora_tariff_41_v2_cost'
# CSV that extract_aurora() writes and get_last_sum() reads back on the next run.
FILENAME = "aurora_yesterday.csv"

# Bearer token sent to the Aurora API (placeholder; replace with a real token).
AUTH_KEY = "THE_KEY"

logger = logging.getLogger(__name__)

# One entry per tariff code.  The code is combined as f"T{code}" to look up the
# usage figure in each API record; "price" is multiplied by the kWh usage to
# get the cost; "sensor" / "sensor_cost" are the statistic ids written to the CSV.
TARIFFS = {
    "31": {
        "price": 0.29947,
        "sensor": "sensor:aurora_tariff_31_v2",
        "sensor_cost": "sensor:aurora_tariff_31_v2_cost",
    },
    "41": {
        "price": 0.19447,
        "sensor": "sensor:aurora_tariff_41_v2",
        "sensor_cost": "sensor:aurora_tariff_41_v2_cost",
    },
}
def date_convert(d: str) -> str:
return datetime.strptime(d, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=pytz.utc).astimezone(pytz.timezone("Australia/Hobart")).strftime("%d.%m.%Y %H:00")
def _get_last_sum(df: pd.DataFrame, column_value: str) -> float:
    """Return the largest ``sum`` recorded for one statistic in *df*.

    The highest value is taken rather than the last row, so the result does
    not depend on the rows being in ascending order.

    Args:
        df: Frame with at least the columns ``statistic_id`` and ``sum``.
        column_value: The ``statistic_id`` to look up.

    Returns:
        The maximum ``sum`` among rows whose ``statistic_id`` equals
        *column_value*.

    Raises:
        LookupError: If no row has that ``statistic_id``.
    """
    # Series.eq() is the method form of ``==``.  The original author reported
    # that the operator form raised "The truth value of a Series is ambiguous"
    # in their environment; the method call presumably avoids that - verify
    # under pyscript.
    matching_sums = df.loc[df["statistic_id"].eq(column_value), "sum"]
    if matching_sums.empty:
        raise LookupError("There is no latest sum for " + column_value)
    return float(matching_sums.max())
async def get_last_sum(sensor: str) -> float:
    """Look up the latest cumulative sum for *sensor* in the previous CSV.

    Reads ``FILENAME`` on the executor (``pd.read_csv`` is blocking) and
    delegates to ``_get_last_sum``.  When the file does not exist, 0 is
    returned so the running sum starts from zero.
    """
    try:
        previous = await hass.async_add_executor_job(pd.read_csv, FILENAME)
    except FileNotFoundError:
        # No earlier export to continue from.
        return 0
    return _get_last_sum(previous, sensor)
@service
async def extract_aurora():
    """Fetch one day of Aurora usage and write it to ``FILENAME`` as a CSV.

    For every tariff in ``TARIFFS`` two rows are emitted per usage record: one
    for energy (kWh) and one for cost (kWh * tariff price).  Each row carries
    ``state`` (running total over the fetched records) and ``sum`` (running
    total continued from the last ``sum`` found in the previous CSV via
    ``get_last_sum``).

    Because the previous sums are read from the very file this service
    overwrites, running it twice against the same API data adds that usage on
    top of sums that already include it.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    headers = {
        "Authorization": f"Bearer {AUTH_KEY}",
        "Accept": "application/json",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:125.0) Gecko/20100101 Firefox/125.0",
    }
    url = (
        "https://api.auroraenergy.com.au/api/usage/day"
        "?serviceAgreementID=YOUR_SERVICE_AGREEMENT_ID"
        "&customerId=YOUR_CUSTOMER_ID"
        "&index=-1"
    )

    # requests is blocking, so it runs on the executor.  The timeout keeps a
    # hung connection from occupying an executor thread indefinitely.
    fetch = partial(requests.get, url, headers=headers, timeout=30)
    response = await hass.async_add_executor_job(fetch)
    # Surface an expired token / HTTP failure here instead of as a confusing
    # KeyError while reading the payload below.
    response.raise_for_status()

    # Records without any usage payload are skipped.
    records = [
        record
        for record in response.json()["MeteredUsageRecords"]
        if record["KilowattHourUsage"]
    ]

    output = []
    for tariff_key, tariff in TARIFFS.items():
        usage_key = f"T{tariff_key}"
        kwh_state_running_total = 0
        # get_last_sum is a coroutine function and must be awaited.
        kwh_sum_running_total = await get_last_sum(tariff["sensor"])
        cost_state_running_total = 0
        cost_sum_running_total = await get_last_sum(tariff["sensor_cost"])

        for record in records:
            # A record may lack this tariff (or carry null); count it as zero.
            kwh = record["KilowattHourUsage"].get(usage_key) or 0
            cost = kwh * tariff["price"]

            kwh_state_running_total += kwh
            kwh_sum_running_total += kwh
            cost_state_running_total += cost
            cost_sum_running_total += cost

            start = date_convert(record["StartTime"])
            output.append({
                "statistic_id": tariff["sensor"],
                "unit": "kWh",
                "start": start,
                "state": round(kwh_state_running_total, 2),
                "sum": round(kwh_sum_running_total, 2),
            })
            # NOTE(review): these rows hold an amount (kWh * price), so
            # "AUD/kWh" looks like the wrong unit label.  Left unchanged
            # because already-imported statistics may depend on it - confirm.
            output.append({
                "statistic_id": tariff["sensor_cost"],
                "unit": "AUD/kWh",
                "start": start,
                "state": round(cost_state_running_total, 2),
                "sum": round(cost_sum_running_total, 2),
            })

    if not output:
        # Writing an empty frame would wipe the running sums stored in the
        # CSV and make the next pd.read_csv fail on an empty file.
        logger.warning("No usage records returned; leaving %s untouched", FILENAME)
        return

    write_csv = partial(pd.DataFrame(output).to_csv, FILENAME, index=False)
    # Await the write so the service only finishes once the file is on disk
    # and any write error is raised instead of being dropped.
    await hass.async_add_executor_job(write_csv)
    logger.warning("Complete?")
Now a simple automation combining both services and we have it going. Nice one.