Implement rate limiting to stop spamming state file writes
This commit is contained in:
parent
e191595768
commit
bddab0208d
2 changed files with 74 additions and 11 deletions
|
@ -2,6 +2,9 @@ from helpers.logging_manager import LoggingManager
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
|
import time
|
||||||
|
from datetime import datetime
|
||||||
|
from copy import copy
|
||||||
from helpers.os_environment import resolve_external_file_path
|
from helpers.os_environment import resolve_external_file_path
|
||||||
|
|
||||||
|
|
||||||
|
@ -9,8 +12,13 @@ class StateManager:
|
||||||
filepath = None
|
filepath = None
|
||||||
logger = None
|
logger = None
|
||||||
__state = {}
|
__state = {}
|
||||||
|
__state_in_file = {}
|
||||||
|
# Dict of times that params can be updated after, if the time is before current time, it can be written immediately.
|
||||||
|
__rate_limit_params_until = {}
|
||||||
|
__rate_limit_period_s = 0
|
||||||
|
|
||||||
def __init__(self, name, logger: LoggingManager, default_state=None):
|
|
||||||
|
def __init__(self, name, logger: LoggingManager, default_state=None, rate_limit_params=[], rate_limit_period_s = 5):
|
||||||
self.logger = logger
|
self.logger = logger
|
||||||
|
|
||||||
self.filepath = resolve_external_file_path("/state/" + name + ".json")
|
self.filepath = resolve_external_file_path("/state/" + name + ".json")
|
||||||
|
@ -30,7 +38,8 @@ class StateManager:
|
||||||
|
|
||||||
if file_state == "":
|
if file_state == "":
|
||||||
self._log("State file is empty. Setting default state.")
|
self._log("State file is empty. Setting default state.")
|
||||||
self.state = default_state
|
self.state = copy(default_state)
|
||||||
|
self.__state_in_file = copy(self.state)
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
self.__state = json.loads(file_state)
|
self.__state = json.loads(file_state)
|
||||||
|
@ -38,14 +47,35 @@ class StateManager:
|
||||||
self._logException("Failed to parse state JSON. Resetting to default state.")
|
self._logException("Failed to parse state JSON. Resetting to default state.")
|
||||||
self.state = default_state
|
self.state = default_state
|
||||||
|
|
||||||
|
# Now setup the rate limiting
|
||||||
|
# Essentially rate limit all values to "now" to start with, allowing the first update
|
||||||
|
# of all vars to succeed.
|
||||||
|
for param in rate_limit_params:
|
||||||
|
self.__rate_limit_params_until[param] = self._currentTimeS
|
||||||
|
self.__rate_limit_period_s = rate_limit_period_s
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def state(self):
|
def state(self):
|
||||||
return self.__state
|
return copy(self.__state)
|
||||||
|
|
||||||
@state.setter
|
@state.setter
|
||||||
def state(self, state):
|
def state(self, state):
|
||||||
self.__state = state
|
self.__state = copy(state)
|
||||||
|
|
||||||
|
def write_to_file(self,state):
|
||||||
|
if self.__state_in_file == state:
|
||||||
|
# No change to be updated.
|
||||||
|
return
|
||||||
|
|
||||||
|
self.__state_in_file = state
|
||||||
|
|
||||||
|
# Make sure we're not manipulating state
|
||||||
|
state = copy(state)
|
||||||
|
|
||||||
|
now = datetime.now()
|
||||||
|
|
||||||
|
current_time = now.strftime("%H:%M:%S")
|
||||||
|
state["last_updated"] = current_time
|
||||||
try:
|
try:
|
||||||
state_json = json.dumps(state, indent=2, sort_keys=True)
|
state_json = json.dumps(state, indent=2, sort_keys=True)
|
||||||
except:
|
except:
|
||||||
|
@ -56,12 +86,36 @@ class StateManager:
|
||||||
file.write(state_json)
|
file.write(state_json)
|
||||||
|
|
||||||
def update(self, key, value):
|
def update(self, key, value):
|
||||||
state = self.state
|
update_file = True
|
||||||
state[key] = value
|
if (key in self.__rate_limit_params_until.keys()):
|
||||||
self.state = state
|
# The key we're trying to update is expected to be updating very often,
|
||||||
|
# We're therefore going to check before saving it.
|
||||||
|
if self.__rate_limit_params_until[key] > self._currentTimeS:
|
||||||
|
update_file = False
|
||||||
|
else:
|
||||||
|
self.__rate_limit_params_until[key] = self._currentTimeS + self.__rate_limit_period_s
|
||||||
|
|
||||||
|
|
||||||
|
state_to_update = self.state
|
||||||
|
|
||||||
|
if state_to_update[key] == value:
|
||||||
|
# We're trying to update the state with the same value.
|
||||||
|
# In this case, ignore the update
|
||||||
|
return
|
||||||
|
|
||||||
|
state_to_update[key] = value
|
||||||
|
|
||||||
|
self.state = state_to_update
|
||||||
|
|
||||||
|
if (update_file == True):
|
||||||
|
self.write_to_file(state_to_update)
|
||||||
|
|
||||||
def _log(self, text, level=logging.INFO):
|
def _log(self, text, level=logging.INFO):
|
||||||
self.logger.log.log(level, "State Manager: " + text)
|
self.logger.log.log(level, "State Manager: " + text)
|
||||||
|
|
||||||
def _logException(self, text):
|
def _logException(self, text):
|
||||||
self.logger.log.exception("State Manager: " + text)
|
self.logger.log.exception("State Manager: " + text)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def _currentTimeS(self):
|
||||||
|
return time.time()
|
||||||
|
|
17
player.py
17
player.py
|
@ -43,6 +43,13 @@ class Player():
|
||||||
"output": None
|
"output": None
|
||||||
}
|
}
|
||||||
|
|
||||||
|
__rate_limited_params = [
|
||||||
|
"pos",
|
||||||
|
"pos_offset",
|
||||||
|
"pos_true",
|
||||||
|
"remaining"
|
||||||
|
]
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def isInit(self):
|
def isInit(self):
|
||||||
try:
|
try:
|
||||||
|
@ -222,16 +229,18 @@ class Player():
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _updateState(self, pos=None):
|
def _updateState(self, pos=None):
|
||||||
|
|
||||||
self.state.update("initialised", self.isInit)
|
self.state.update("initialised", self.isInit)
|
||||||
if self.isInit:
|
if self.isInit:
|
||||||
if self.isPlaying:
|
if (pos):
|
||||||
|
self.state.update("pos", max(0, pos))
|
||||||
|
elif self.isPlaying:
|
||||||
# Get one last update in, incase we're about to pause/stop it.
|
# Get one last update in, incase we're about to pause/stop it.
|
||||||
self.state.update("pos", max(0, mixer.music.get_pos()/1000))
|
self.state.update("pos", max(0, mixer.music.get_pos()/1000))
|
||||||
self.state.update("playing", self.isPlaying)
|
self.state.update("playing", self.isPlaying)
|
||||||
self.state.update("loaded", self.isLoaded)
|
self.state.update("loaded", self.isLoaded)
|
||||||
|
|
||||||
if (pos):
|
|
||||||
self.state.update("pos", max(0, pos))
|
|
||||||
|
|
||||||
self.state.update("pos_true", self.state.state["pos"] + self.state.state["pos_offset"])
|
self.state.update("pos_true", self.state.state["pos"] + self.state.state["pos_offset"])
|
||||||
|
|
||||||
|
@ -262,7 +271,7 @@ class Player():
|
||||||
|
|
||||||
self.logger = LoggingManager("channel" + str(channel))
|
self.logger = LoggingManager("channel" + str(channel))
|
||||||
|
|
||||||
self.state = StateManager("channel" + str(channel), self.logger, self.__default_state)
|
self.state = StateManager("channel" + str(channel), self.logger, self.__default_state, self.__rate_limited_params)
|
||||||
self.state.update("channel", channel)
|
self.state.update("channel", channel)
|
||||||
|
|
||||||
loaded_state = copy.copy(self.state.state)
|
loaded_state = copy.copy(self.state.state)
|
||||||
|
|
Loading…
Reference in a new issue