BAPSicle/helpers/state_manager.py

161 lines
5.8 KiB
Python

from helpers.types import PlayerState
import json
import os
from logging import CRITICAL, INFO
import time
from datetime import datetime
from copy import copy
from plan import PlanItem
from helpers.logging_manager import LoggingManager
from helpers.os_environment import resolve_external_file_path
from helpers.types import ServerState
from typing import Any, Dict, List, NewType, Optional, Union
class StateManager:
filepath = None
logger = None
callbacks: List[Any] = []
__state = {}
__state_in_file = {}
# Dict of times that params can be updated after, if the time is before current time, it can be written immediately.
__rate_limit_params_until = {}
__rate_limit_period_s = 0
def __init__(self, name, logger: LoggingManager, default_state=None, rate_limit_params=[], rate_limit_period_s=5):
self.logger = logger
self.filepath = resolve_external_file_path("/state/" + name + ".json")
self._log("State file path set to: " + self.filepath)
if not os.path.isfile(self.filepath):
self._log("No existing state file found.")
try:
# Try creating the file.
open(self.filepath, "x")
except:
self._log("Failed to create state file.", CRITICAL)
return
with open(self.filepath, 'r') as file:
file_state = file.read()
if file_state == "":
self._log("State file is empty. Setting default state.")
self.state = default_state
self.__state_in_file = copy(self.state)
else:
try:
file_state = json.loads(file_state)
# Turn from JSON -> PlanItem
if "channel" in file_state:
file_state["loaded_item"] = PlanItem(
file_state["loaded_item"]) if file_state["loaded_item"] else None
file_state["show_plan"] = [PlanItem(obj) for obj in file_state["show_plan"]]
# Now feed the loaded state into the initialised state manager.
self.state = file_state
except:
self._logException("Failed to parse state JSON. Resetting to default state.")
self.state = default_state
self.__state_in_file = copy(self.state)
# Now setup the rate limiting
# Essentially rate limit all values to "now" to start with, allowing the first update
# of all vars to succeed.
for param in rate_limit_params:
self.__rate_limit_params_until[param] = self._currentTimeS
self.__rate_limit_period_s = rate_limit_period_s
@property
def state(self):
return copy(self.__state)
@state.setter
def state(self, state):
self.__state = copy(state)
def write_to_file(self, state):
if self.__state_in_file == state:
# No change to be updated.
return
self.__state_in_file = state
# Make sure we're not manipulating state
state_to_json = copy(state)
now = datetime.now()
current_time = now.strftime("%H:%M:%S")
state_to_json["last_updated"] = current_time
# Not the biggest fan of this, but maybe I'll get a better solution for this later
if "channel" in state_to_json: # If its a channel object
state_to_json["loaded_item"] = state_to_json["loaded_item"].__dict__ if state_to_json["loaded_item"] else None
state_to_json["show_plan"] = [repr.__dict__ for repr in state_to_json["show_plan"]]
try:
state_json = json.dumps(state_to_json, indent=2, sort_keys=True)
except:
self._logException("Failed to dump JSON state.")
else:
with open(self.filepath, "w") as file:
file.write(state_json)
def update(self, key: str, value: Any, index: int = -1):
update_file = True
if (key in self.__rate_limit_params_until.keys()):
# The key we're trying to update is expected to be updating very often,
# We're therefore going to check before saving it.
if self.__rate_limit_params_until[key] > self._currentTimeS:
update_file = False
else:
self.__rate_limit_params_until[key] = self._currentTimeS + self.__rate_limit_period_s
state_to_update = self.state
if key in state_to_update and index == -1 and state_to_update[key] == value:
# We're trying to update the state with the same value.
# In this case, ignore the update
return
if index > -1 and key in state_to_update:
if not isinstance(state_to_update[key], list):
return
list_items = state_to_update[key]
if index >= len(list_items):
return
list_items[index] = value
state_to_update[key] = list_items
else:
state_to_update[key] = value
self.state = state_to_update
if (update_file == True):
# Either a routine write, or state has changed.
# Update the file
self.write_to_file(state_to_update)
# Now tell any callback functions.
for callback in self.callbacks:
try:
callback()
except Exception as e:
self.logger.log.critical("Failed to execute status callback: {}".format(e))
def add_callback(self, function):
self.callbacks.append(function)
def _log(self, text:str, level: int = INFO):
self.logger.log.log(level, "State Manager: " + text)
def _logException(self, text:str):
self.logger.log.exception("State Manager: " + text)
@property
def _currentTimeS(self):
return time.time()