diff --git a/helpers/myradio_api.py b/helpers/myradio_api.py index 866f17f..8fb331f 100644 --- a/helpers/myradio_api.py +++ b/helpers/myradio_api.py @@ -19,8 +19,9 @@ from typing import Optional import aiohttp import json -from logging import INFO +from logging import INFO, ERROR, WARNING import os +import requests from baps_types.plan import PlanItem from helpers.os_environment import resolve_external_file_path @@ -36,42 +37,108 @@ class MyRadioAPI: self.logger = logger self.config = config - async def get(self, url, timeout=10): + async def async_call(self, url, method="GET", data=None, timeout=10): + async with aiohttp.ClientSession(read_timeout=timeout) as session: - async with session.get(url) as response: - if response.status != 200: + func = session.get(url) + status_code = -1 + if method == "GET": + #func = session.get(url) + status_code = 200 + elif method == "POST": + func = session.post(url, data=data) + status_code = 201 + elif method == "PUT": + func = session.put(url) + status_code = 201 + + async with func as response: + if response.status != status_code: self._logException( "Failed to get API request. Status code: " + str(response.status) ) self._logException(str(response.text())) return await response.read() - async def get_non_api_call(self, url): + def call(self, url, method="GET", data=None, timeout=10, json_payload=True): + r = None + status_code = -1 + if method == "GET": + r = requests.get(url, timeout=timeout) + status_code = 200 + elif method == "POST": + r = requests.post(url, data, timeout=timeout) + status_code = 201 + elif method == "PUT": + r = requests.put(url, data, timeout=timeout) + status_code = 200 - url = "{}{}".format(self.config.get()["myradio_base_url"], url) + if r.status_code != status_code: + self._logException( + "Failed to get API request. Status code: " + str(r.status_code) + ) + self._logException(str(r.text)) + return json.loads(r.text) if json_payload else r.text + + async def async_api_call(self, url, api_version="v2", method="GET", data=None, timeout=10): + if api_version == "v2": + url = "{}/v2{}".format(self.config.get()["myradio_api_url"], url) + elif api_version == "non": + url = "{}{}".format(self.config.get()["myradio_base_url"], url) + else: + self._logException("Invalid API version. Request not sent.") + return None if "?" in url: url += "&api_key={}".format(self.config.get()["myradio_api_key"]) else: url += "?api_key={}".format(self.config.get()["myradio_api_key"]) - self._log("Requesting non-API URL: " + url) - request = self.get(url) + self._log("Requesting API V2 URL with method {}: {}".format(method, url)) + + request = None + if method == "GET": + request = self.async_call(url, method="GET", timeout=timeout) + elif method == "POST": + self._log("POST data: {}".format(data)) + request = self.async_call(url, data=data, method="POST", timeout=timeout) + elif method == "PUT": + request = self.async_call(url, method="PUT", timeout=timeout) + else: + self._logException("Invalid API method. Request not sent.") + return None self._log("Finished request.") return request - async def get_apiv2_call(self, url): + def api_call(self, url, api_version="v2", method="GET", data=None, timeout=10): - url = "{}/v2{}".format(self.config.get()["myradio_api_url"], url) + if api_version == "v2": + url = "{}/v2{}".format(self.config.get()["myradio_api_url"], url) + elif api_version == "non": + url = "{}{}".format(self.config.get()["myradio_base_url"], url) + else: + self._logException("Invalid API version. Request not sent.") + return None if "?" in url: url += "&api_key={}".format(self.config.get()["myradio_api_key"]) else: url += "?api_key={}".format(self.config.get()["myradio_api_key"]) - self._log("Requesting API V2 URL: " + url) - request = self.get(url) + self._log("Requesting API V2 URL with method {}: {}".format(method, url)) + + request = None + if method == "GET": + request = self.call(url, method="GET", timeout=timeout) + elif method == "POST": + self._log("POST data: {}".format(data)) + request = self.call(url, data=data, method="POST", timeout=timeout) + elif method == "PUT": + request = self.call(url, method="PUT", timeout=timeout) + else: + self._logException("Invalid API method. Request not sent.") + return None self._log("Finished request.") return request @@ -80,7 +147,7 @@ class MyRadioAPI: async def get_showplans(self): url = "/timeslot/currentandnextobjects?n=10" - request = await self.get_apiv2_call(url) + request = await self.async_api_call(url) if not request: self._logException("Failed to get list of show plans.") @@ -110,7 +177,7 @@ class MyRadioAPI: async def get_showplan(self, timeslotid: int): url = "/timeslot/{}/showplan".format(timeslotid) - request = await self.get_apiv2_call(url) + request = await self.async_api_call(url) if not request: self._logException("Failed to get show plan.") @@ -154,7 +221,7 @@ class MyRadioAPI: return filename # File doesn't exist, download it. - request = await self.get_non_api_call(url) + request = await self.async_api_call(url, api_version="non") if not request: return None @@ -171,7 +238,7 @@ class MyRadioAPI: # Gets the list of managed music playlists. async def get_playlist_music(self): url = "/playlist/allitonesplaylists" - request = await self.get_apiv2_call(url) + request = await self.async_api_call(url) if not request: self._logException("Failed to retrieve music playlists.") @@ -182,7 +249,7 @@ class MyRadioAPI: # Gets the list of managed aux playlists (sfx, beds etc.) async def get_playlist_aux(self): url = "/nipswebPlaylist/allmanagedplaylists" - request = await self.get_apiv2_call(url) + request = await self.async_api_call(url) if not request: self._logException("Failed to retrieve music playlists.") @@ -197,7 +264,7 @@ class MyRadioAPI: library_id = library_id[library_id.index("-") + 1:] url = "/nipswebPlaylist/{}/items".format(library_id) - request = await self.get_apiv2_call(url) + request = await self.async_api_call(url) if not request: self._logException( @@ -211,7 +278,7 @@ class MyRadioAPI: async def get_playlist_music_items(self, library_id: str): url = "/playlist/{}/tracks".format(library_id) - request = await self.get_apiv2_call(url) + request = await self.async_api_call(url) if not request: self._logException( @@ -227,7 +294,7 @@ class MyRadioAPI: url = "/track/search?title={}&artist={}&digitised=1&limit={}".format( title if title else "", artist if artist else "", limit ) - request = await self.get_apiv2_call(url) + request = await self.async_api_call(url) if not request: self._logException("Failed to search for track.") @@ -235,6 +302,43 @@ class MyRadioAPI: return json.loads(await request)["payload"] + def post_tracklist_start(self, item: PlanItem): + if item.type != "central": + self._log("Not tracklisting, {} is not a track.".format(item.name)) + return False + + self._log("Tracklisting item: {}".format(item.name)) + + source: str = self.config.get()["myradio_api_tracklist_source"] + data = { + "trackid": item.trackid, + "sourceid": int(source) if source.isnumeric() else source + } + # Starttime and timeslotid are default in the API to current time/show. + tracklist_id = None + try: + tracklist_id = self.api_call("/tracklistItem/", method="POST", data=data)["payload"]["audiologid"] + except Exception as e: + self._logException("Failed to get tracklistid. {}".format(e)) + + if not tracklist_id or not isinstance(tracklist_id, int): + self._log("Failed to tracklist! API rejected tracklist.", ERROR) + return + return tracklist_id + + def post_tracklist_end(self, tracklistitemid: int): + if not tracklistitemid: + self._log("Tracklistitemid is None, can't end tracklist.", WARNING) + return False + if not isinstance(tracklistitemid, int): + self._logException("Tracklistitemid '{}' is not an integer!".format(tracklistitemid)) + return False + + self._log("Ending tracklistitemid {}".format(tracklistitemid)) + + result = self.api_call("/tracklistItem/{}/endtime".format(tracklistitemid), method="PUT") + print(result) + def _log(self, text: str, level: int = INFO): self.logger.log.log(level, "MyRadio API: " + text) diff --git a/helpers/state_manager.py b/helpers/state_manager.py index 146b4d1..06524ae 100644 --- a/helpers/state_manager.py +++ b/helpers/state_manager.py @@ -4,7 +4,7 @@ from logging import CRITICAL, INFO import time from datetime import datetime from copy import copy -from typing import Any, List +from typing import Any, Dict, List from baps_types.plan import PlanItem from helpers.logging_manager import LoggingManager @@ -24,7 +24,7 @@ class StateManager: self, name, logger: LoggingManager, - default_state=None, + default_state: Dict[str, Any] = None, rate_limit_params=[], rate_limit_period_s=5, ): @@ -57,10 +57,9 @@ class StateManager: if file_state == "": self._log("State file is empty. Setting default state.") self.state = default_state - self.__state_in_file = copy(self.state) else: try: - file_state = json.loads(file_state) + file_state: Dict[str, Any] = json.loads(file_state) # Turn from JSON -> PlanItem if "channel" in file_state: @@ -75,12 +74,18 @@ class StateManager: # Now feed the loaded state into the initialised state manager. self.state = file_state + + # If there are any new config options in the default state, save them. + # Uses update() to save them to file too. + for key in default_state.keys(): + if not key in file_state.keys(): + self.update(key, default_state[key]) + except Exception: self._logException( "Failed to parse state JSON. Resetting to default state." ) self.state = default_state - self.__state_in_file = copy(self.state) # Now setup the rate limiting # Essentially rate limit all values to "now" to start with, allowing the first update @@ -103,8 +108,6 @@ class StateManager: def write_to_file(self, state): - self.__state_in_file = state - # Make sure we're not manipulating state state_to_json = copy(state) diff --git a/player.py b/player.py index bb633bf..0575807 100644 --- a/player.py +++ b/player.py @@ -20,6 +20,7 @@ # that we respond with something, FAIL or OKAY. The server doesn't like to be kept waiting. # Stop the Pygame Hello message. +from baps_types.enums import TracklistMode import os os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "hide" @@ -33,6 +34,7 @@ from typing import Any, Callable, Dict, List, Optional from pygame import mixer from mutagen.mp3 import MP3 from syncer import sync +from threading import Timer from helpers.myradio_api import MyRadioAPI from helpers.state_manager import StateManager @@ -42,6 +44,7 @@ from baps_types.marker import Marker # TODO ENUM VALID_MESSAGE_SOURCES = ["WEBSOCKET", "UI", "CONTROLLER", "TEST", "ALL"] +TRACKLISTING_DELAYED_S = 20 class Player: @@ -58,6 +61,9 @@ class Player: stopped_manually: bool = False + tracklist_start_timer: Optional[Timer] = None + tracklist_end_timer: Optional[Timer] = None + __default_state = { "initialised": False, "loaded_item": None, @@ -75,6 +81,8 @@ class Player: "play_on_load": False, "output": None, "show_plan": [], + "tracklist_mode": "off", + "tracklist_id": None, } __rate_limited_params = ["pos", "pos_offset", "pos_true", "remaining"] @@ -116,7 +124,6 @@ class Player: # We're not playing now, so we can quickly test run # If that works, we're loaded. try: - position: float = self.state.get()["pos"] mixer.music.set_volume(0) mixer.music.play(0) except Exception: @@ -128,10 +135,9 @@ class Player: ) pass return False - if position > 0: - self.pause() - else: - self.stop() + finally: + mixer.music.stop() + mixer.music.set_volume(1) return True @@ -168,7 +174,7 @@ class Player: self.logger.log.exception("Failed to play at pos: " + str(pos)) return False self.state.update("paused", False) - + self._potentially_tracklist() self.stopped_manually = False return True @@ -206,6 +212,9 @@ class Player: return False self.state.update("paused", False) + if user_initiated: + self._potentially_end_tracklist() + self.stopped_manually = True if not self.state.get()["loaded_item"]: @@ -394,6 +403,12 @@ class Player: except Exception: self.logger.log.exception("Failed to unload channel.") return False + + self._potentially_end_tracklist() + # If we unloaded successfully, reset the tracklist_id, ready for the next item. + if not self.isLoaded: + self.state.update("tracklist_id", None) + return not self.isLoaded def quit(self): @@ -473,7 +488,78 @@ class Player: # Helper functions + # This essentially allows the tracklist end API call to happen in a separate thread, to avoid hanging playout/loading. + def _potentially_tracklist(self): + mode: TracklistMode = self.state.get()["tracklist_mode"] + + time: int = -1 + if mode == "on": + time = 1 # Let's do it pretty quickly. + elif mode == "delayed": + # Let's do it in a bit, once we're sure it's been playing. (Useful if we've got no idea if it's live or cueing.) + time = TRACKLISTING_DELAYED_S + + if time >= 0 and not self.tracklist_start_timer: + self.logger.log.info("Setting timer for tracklisting in {} secs due to Mode: {}".format(time, mode)) + self.tracklist_start_timer = Timer(time, self._tracklist_start) + self.tracklist_start_timer.start() + elif self.tracklist_start_timer: + self.logger.log.error("Failed to potentially tracklist, timer already busy.") + + # This essentially allows the tracklist end API call to happen in a separate thread, to avoid hanging playout/loading. + def _potentially_end_tracklist(self): + loaded_item = self.state.get()["loaded_item"] + if not loaded_item: + self.logger.log.warning("Tried to call _tracklist_end() with no loaded item!") + + # Make a copy of the tracklist_id, it will get reset as we load the next item. + tracklist_id = self.state.get()["tracklist_id"] + self.logger.log.info("Setting timer for ending tracklist_id {}".format(tracklist_id)) + if tracklist_id: + self.logger.log.info("Attempting to end tracklist_id {}".format(tracklist_id)) + if self.tracklist_end_timer: + self.logger.log.error("Failed to potentially end tracklist, timer already busy.") + return + # This threads it, so it won't hang track loading if it fails. + self.tracklist_end_timer = Timer(1, self._tracklist_end, [tracklist_id]) + self.tracklist_end_timer.start() + else: + self.logger.log.warning("Failed to potentially end tracklist, no tracklist started.") + + def _tracklist_start(self): + loaded_item = self.state.get()["loaded_item"] + if not loaded_item: + self.logger.log.error("Tried to call _tracklist_start() with no loaded item!") + return + + tracklist_id = self.state.get()["tracklist_id"] + if (not tracklist_id): + self.logger.log.info("Tracklisting item: {}".format(loaded_item.name)) + tracklist_id = self.api.post_tracklist_start(loaded_item) + if not tracklist_id: + self.logger.log.error("Failed to tracklist {}".format(loaded_item.name)) + else: + self.logger.log.info("Tracklist id: {}".format(tracklist_id)) + self.state.update("tracklist_id", tracklist_id) + else: + self.logger.log.info("Not tracklisting item {}, already got tracklistid: {}".format( + loaded_item.name, tracklist_id)) + + self.tracklist_start_timer = None + + def _tracklist_end(self, tracklist_id): + + if tracklist_id: + self.logger.log.info("Attempting to end tracklist_id {}".format(tracklist_id)) + self.api.post_tracklist_end(tracklist_id) + else: + self.logger.log.error("Tracklist_id to _tracklist_end() missing. Failed to end tracklist.") + + self.tracklist_end_timer = None + def _ended(self): + self._potentially_end_tracklist() + loaded_item = self.state.get()["loaded_item"] if not loaded_item: @@ -529,6 +615,7 @@ class Player: self.state.update("pos", 0) # Reset back to 0 if stopped. self.state.update("pos_offset", 0) + # If the state is changing from playing to not playing, and the user didn't stop it, the item must have ended. if ( self.state.get()["playing"] and not self.isPlaying @@ -594,7 +681,7 @@ class Player: custom_prefix="ALL:STATUS:") def __init__( - self, channel: int, in_q: multiprocessing.Queue, out_q: multiprocessing.Queue, server_config: StateManager + self, channel: int, in_q: multiprocessing.Queue, out_q: multiprocessing.Queue, server_state: StateManager ): process_title = "Player: Channel " + str(channel) @@ -606,7 +693,7 @@ class Player: self.logger = LoggingManager("Player" + str(channel)) - self.api = MyRadioAPI(self.logger, server_config) + self.api = MyRadioAPI(self.logger, server_state) self.state = StateManager( "Player" + str(channel), @@ -618,6 +705,7 @@ class Player: self.state.add_callback(self._send_status) self.state.update("channel", channel) + self.state.update("tracklist_mode", server_state.get()["tracklist_mode"]) loaded_state = copy.copy(self.state.state) diff --git a/server.py b/server.py index 34f0b98..517340e 100644 --- a/server.py +++ b/server.py @@ -39,6 +39,8 @@ from controllers.mattchbox_usb import MattchBox from helpers.the_terminator import Terminator import player +PROCESS_KILL_TIMEOUT_S = 5 + setproctitle("server.py") """ Proxy Manager to proxy Class Objects into multiprocessing processes, instead of making a copy. """ @@ -51,8 +53,8 @@ class ProxyManager(m.BaseManager): class BAPSicleServer: default_state = { - "server_version": "", - "server_build": "", + "server_version": "unknown", + "server_build": "unknown", "server_name": "URY BAPSicle", "host": "localhost", "port": 13500, @@ -63,7 +65,9 @@ class BAPSicleServer: "myradio_api_key": None, "myradio_base_url": "https://ury.org.uk/myradio", "myradio_api_url": "https://ury.org.uk/api", - "running_state": "running" + "myradio_api_tracklist_source": "", + "running_state": "running", + "tracklist_mode": "off", } player_to_q: List[Queue] = [] @@ -219,7 +223,7 @@ class BAPSicleServer: print("Stopping Websocket Server") self.websocket_to_q[0].put("WEBSOCKET:QUIT") if self.websockets_server: - self.websockets_server.join() + self.websockets_server.join(timeout=PROCESS_KILL_TIMEOUT_S) del self.websockets_server print("Stopping Players") @@ -227,26 +231,26 @@ class BAPSicleServer: q.put("ALL:QUIT") for player in self.player: - player.join() + player.join(timeout=PROCESS_KILL_TIMEOUT_S) del self.player print("Stopping Web Server") if self.webserver: self.webserver.terminate() - self.webserver.join() + self.webserver.join(timeout=PROCESS_KILL_TIMEOUT_S) del self.webserver print("Stopping Player Handler") if self.player_handler: self.player_handler.terminate() - self.player_handler.join() + self.player_handler.join(timeout=PROCESS_KILL_TIMEOUT_S) del self.player_handler print("Stopping Controllers") if self.controller_handler: self.controller_handler.terminate() - self.controller_handler.join() + self.controller_handler.join(timeout=PROCESS_KILL_TIMEOUT_S) del self.controller_handler diff --git a/web_server.py b/web_server.py index 0a3ba56..8168588 100644 --- a/web_server.py +++ b/web_server.py @@ -9,8 +9,6 @@ import asyncio from jinja2 import Environment, FileSystemLoader from urllib.parse import unquote -# , render_template, send_from_directory, request, jsonify, abort -#from flask_cors import CORS from setproctitle import setproctitle import logging from typing import Any, Optional, List