From a45cbda37e96e4fb8e633e286d045f77997771ee Mon Sep 17 00:00:00 2001 From: Matthew Stratford Date: Sun, 18 Apr 2021 03:14:14 +0100 Subject: [PATCH] Switch API to async, remove APIHandler. --- api_handler.py | 99 ------------------------------------ build/requirements.txt | 2 + helpers/myradio_api.py | 82 ++++++++++++++---------------- player.py | 5 +- server.py | 15 +----- web_server.py | 113 +++++++++-------------------------------- 6 files changed, 69 insertions(+), 247 deletions(-) delete mode 100644 api_handler.py diff --git a/api_handler.py b/api_handler.py deleted file mode 100644 index 5764235..0000000 --- a/api_handler.py +++ /dev/null @@ -1,99 +0,0 @@ -from helpers.state_manager import StateManager -import json -from multiprocessing import Queue, current_process -from helpers.logging_manager import LoggingManager -from helpers.myradio_api import MyRadioAPI -from setproctitle import setproctitle -from os import _exit - -# The API handler is needed from the main flask thread to process API requests. -# Flask is not able to handle these during page loads, requests.get() hangs. - - -class APIHandler: - logger: LoggingManager - api: MyRadioAPI - server_to_q: Queue - server_from_q: Queue - - def __init__(self, server_from_q: Queue, server_to_q: Queue, server_config: StateManager): - - process_title = "APIHandler" - setproctitle(process_title) - current_process().name = process_title - - self.server_from_q = server_from_q - self.server_to_q = server_to_q - self.logger = LoggingManager("APIHandler") - self.api = MyRadioAPI(self.logger, server_config) - - self.handle() - - def handle(self): - try: - while self.server_from_q: - # Wait for an API request to come in. - request = self.server_from_q.get() - self.logger.log.info("Received Request: {}".format(request)) - if request == "LIST_PLANS": - self.server_to_q.put( - request + ":" + json.dumps(self.api.get_showplans()) - ) - elif request == "LIST_PLAYLIST_MUSIC": - self.server_to_q.put( - request + ":" + - json.dumps(self.api.get_playlist_music()) - ) - elif request == "LIST_PLAYLIST_AUX": - self.server_to_q.put( - request + ":" + json.dumps(self.api.get_playlist_aux()) - ) - - else: - # Commands with params - command = request[: request.index(":")] - params = request[request.index(":") + 1:] - - if command == "GET_PLAYLIST_AUX": - self.server_to_q.put( - request - + ":" - + json.dumps(self.api.get_playlist_aux_items(str(params))) - ) - elif command == "GET_PLAYLIST_MUSIC": - self.server_to_q.put( - request - + ":" - + json.dumps(self.api.get_playlist_music_items(str(params))) - ) - elif command == "SEARCH_TRACK": - try: - params = json.loads(params) - - self.server_to_q.put( - request - + ":" - + json.dumps( - self.api.get_track_search( - str(params["title"]), str( - params["artist"]) - ) - ) - ) - except Exception as e: - self.logger.log.exception( - "Failed to parse params with message {}, command {}, params {}\n{}".format( - request, command, params, e - ) - ) - - # Catch the handler being killed externally. - except KeyboardInterrupt: - self.logger.log.info("Received KeyboardInterupt") - except SystemExit: - self.logger.log.info("Received SystemExit") - except Exception as e: - self.logger.log.exception( - "Received unexpected exception: {}".format(e)) - del self.logger - _exit(0) diff --git a/build/requirements.txt b/build/requirements.txt index db85fbc..3b2f34f 100644 --- a/build/requirements.txt +++ b/build/requirements.txt @@ -2,6 +2,8 @@ autopep8 pygame==2.0.1 sanic sanic-cors +syncer +aiohttp mutagen sounddevice autopep8 diff --git a/helpers/myradio_api.py b/helpers/myradio_api.py index 4f6f90a..997072e 100644 --- a/helpers/myradio_api.py +++ b/helpers/myradio_api.py @@ -17,7 +17,7 @@ November 2020 """ from typing import Optional -import requests +import aiohttp import json from logging import INFO import os @@ -36,7 +36,17 @@ class MyRadioAPI: self.logger = logger self.config = config - def get_non_api_call(self, url): + async def get(self, url, timeout=10): + async with aiohttp.ClientSession(read_timeout=timeout) as session: + async with session.get(url) as response: + if response.status != 200: + self._logException( + "Failed to get API request. Status code: " + str(response.status) + ) + self._logException(str(response.text())) + return await response.read() + + async def get_non_api_call(self, url): url = "{}{}".format(self.config.state["myradio_base_url"], url) @@ -46,19 +56,12 @@ class MyRadioAPI: url += "?api_key={}".format(self.config.state["myradio_api_key"]) self._log("Requesting non-API URL: " + url) - request = requests.get(url, timeout=10) + request = self.get(url) self._log("Finished request.") - if request.status_code != 200: - self._logException( - "Failed to get API request. Status code: " + str(request.status_code) - ) - self._logException(str(request.content)) - return None - return request - def get_apiv2_call(self, url): + async def get_apiv2_call(self, url): url = "{}/v2{}".format(self.config.state["myradio_api_url"], url) @@ -68,29 +71,22 @@ class MyRadioAPI: url += "?api_key={}".format(self.config.state["myradio_api_key"]) self._log("Requesting API V2 URL: " + url) - request = requests.get(url, timeout=10) + request = self.get(url) self._log("Finished request.") - if request.status_code != 200: - self._logException( - "Failed to get API request. Status code: " + str(request.status_code) - ) - self._logException(str(request.content)) - return None - return request # Show plans - def get_showplans(self): + async def get_showplans(self): url = "/timeslot/currentandnextobjects?n=10" - request = self.get_apiv2_call(url) + request = await self.get_apiv2_call(url) if not request: self._logException("Failed to get list of show plans.") return None - payload = json.loads(request.content)["payload"] + payload = json.loads(await request)["payload"] if not payload["current"]: self._logException("API did not return a current show.") @@ -111,20 +107,20 @@ class MyRadioAPI: # TODO filter out jukebox return shows - def get_showplan(self, timeslotid: int): + async def get_showplan(self, timeslotid: int): url = "/timeslot/{}/showplan".format(timeslotid) - request = self.get_apiv2_call(url) + request = await self.get_apiv2_call(url) if not request: self._logException("Failed to get show plan.") return None - return json.loads(request.content)["payload"] + return json.loads(await request)["payload"] # Audio Library - def get_filename(self, item: PlanItem): + async def get_filename(self, item: PlanItem): format = "mp3" # TODO: Maybe we want this customisable? if item.trackid: itemType = "track" @@ -158,14 +154,14 @@ class MyRadioAPI: return filename # File doesn't exist, download it. - request = self.get_non_api_call(url) + request = await self.get_non_api_call(url) if not request: return None try: with open(filename, "wb") as file: - file.write(request.content) + file.write(await request) except Exception as e: self._logException("Failed to write music file: {}".format(e)) return None @@ -173,35 +169,35 @@ class MyRadioAPI: return filename # Gets the list of managed music playlists. - def get_playlist_music(self): + async def get_playlist_music(self): url = "/playlist/allitonesplaylists" - request = self.get_apiv2_call(url) + request = await self.get_apiv2_call(url) if not request: self._logException("Failed to retrieve music playlists.") return None - return json.loads(request.content)["payload"] + return json.loads(await request)["payload"] # Gets the list of managed aux playlists (sfx, beds etc.) - def get_playlist_aux(self): + async def get_playlist_aux(self): url = "/nipswebPlaylist/allmanagedplaylists" - request = self.get_apiv2_call(url) + request = await self.get_apiv2_call(url) if not request: self._logException("Failed to retrieve music playlists.") return None - return json.loads(request.content)["payload"] + return json.loads(await request)["payload"] # Loads the playlist items for a certain managed aux playlist - def get_playlist_aux_items(self, library_id: str): + async def get_playlist_aux_items(self, library_id: str): # Sometimes they have "aux-", we only need the index. if library_id.index("-") > -1: library_id = library_id[library_id.index("-") + 1:] url = "/nipswebPlaylist/{}/items".format(library_id) - request = self.get_apiv2_call(url) + request = await self.get_apiv2_call(url) if not request: self._logException( @@ -209,13 +205,13 @@ class MyRadioAPI: ) return None - return json.loads(request.content)["payload"] + return json.loads(await request)["payload"] # Loads the playlist items for a certain managed playlist - def get_playlist_music_items(self, library_id: str): + async def get_playlist_music_items(self, library_id: str): url = "/playlist/{}/tracks".format(library_id) - request = self.get_apiv2_call(url) + request = await self.get_apiv2_call(url) if not request: self._logException( @@ -223,21 +219,21 @@ class MyRadioAPI: ) return None - return json.loads(request.content)["payload"] + return json.loads(await request)["payload"] - def get_track_search( + async def get_track_search( self, title: Optional[str], artist: Optional[str], limit: int = 100 ): url = "/track/search?title={}&artist={}&digitised=1&limit={}".format( title if title else "", artist if artist else "", limit ) - request = self.get_apiv2_call(url) + request = await self.get_apiv2_call(url) if not request: self._logException("Failed to search for track.") return None - return json.loads(request.content)["payload"] + return json.loads(await request)["payload"] def _log(self, text: str, level: int = INFO): self.logger.log.log(level, "MyRadio API: " + text) diff --git a/player.py b/player.py index 0d6d47c..4a1796f 100644 --- a/player.py +++ b/player.py @@ -32,6 +32,7 @@ import time from typing import Any, Callable, Dict, List, Optional from pygame import mixer from mutagen.mp3 import MP3 +from syncer import sync from helpers.myradio_api import MyRadioAPI from helpers.state_manager import StateManager @@ -254,7 +255,7 @@ class Player: # Show Plan Related Methods def get_plan(self, message: int): - plan = self.api.get_showplan(message) + plan = sync(self.api.get_showplan(message)) self.clear_channel_plan() channel = self.state.state["channel"] self.logger.log.info(plan) @@ -338,7 +339,7 @@ class Player: reload = True if reload: - loaded_item.filename = self.api.get_filename(item=loaded_item) + loaded_item.filename = sync(self.api.get_filename(item=loaded_item)) if not loaded_item.filename: return False diff --git a/server.py b/server.py index a1d2cfd..f6afd18 100644 --- a/server.py +++ b/server.py @@ -35,7 +35,6 @@ from helpers.logging_manager import LoggingManager from websocket_server import WebsocketServer from web_server import WebServer from player_handler import PlayerHandler -from api_handler import APIHandler from controllers.mattchbox_usb import MattchBox from helpers.the_terminator import Terminator import player @@ -124,13 +123,6 @@ class BAPSicleServer: ) self.player[channel].start() - self.api_to_q = multiprocessing.Queue() - self.api_from_q = multiprocessing.Queue() - self.api_handler = multiprocessing.Process( - target=APIHandler, args=(self.api_to_q, self.api_from_q, self.state) - ) - self.api_handler.start() - self.player_handler = multiprocessing.Process( target=PlayerHandler, args=(self.player_from_q, self.websocket_to_q, self.ui_to_q, self.controller_to_q), @@ -150,7 +142,7 @@ class BAPSicleServer: self.controller_handler.start() self.webserver = multiprocessing.Process( - target=WebServer, args=(self.player_to_q, self.ui_to_q, self.api_to_q, self.api_from_q, self.state) + target=WebServer, args=(self.player_to_q, self.ui_to_q, self.state) ) self.webserver.start() @@ -213,11 +205,6 @@ class BAPSicleServer: self.player_handler.terminate() self.player_handler.join() - print("Stopping API Handler") - if self.api_handler: - self.api_handler.terminate() - self.api_handler.join() - print("Stopping Controllers") if self.controller_handler: self.controller_handler.terminate() diff --git a/web_server.py b/web_server.py index 4330825..848e23e 100644 --- a/web_server.py +++ b/web_server.py @@ -1,3 +1,4 @@ +from helpers.myradio_api import MyRadioAPI from sanic import Sanic from sanic.exceptions import NotFound, abort from sanic.response import html, text, file, redirect @@ -35,9 +36,7 @@ def render_template(file, data, status=200): logger: LoggingManager server_state: StateManager - -api_from_q: Queue -api_to_q: Queue +api: MyRadioAPI player_to_q: List[Queue] = [] player_from_q: List[Queue] = [] @@ -159,7 +158,7 @@ def player_simple(request, channel: int, command: str): abort(404) -@app.route("/player//seek/") +@app.route("/player//seek/") def player_seek(request, channel: int, pos: float): player_to_q[channel].put("UI:SEEK:" + str(pos)) @@ -240,104 +239,39 @@ def plan_clear(request): # API Proxy Endpoints @app.route("/plan/list") -def api_list_showplans(request): +async def api_list_showplans(request): - while not api_from_q.empty(): - api_from_q.get() # Just waste any previous status responses. - - api_to_q.put("LIST_PLANS") - - while True: - try: - response = api_from_q.get_nowait() - if response.startswith("LIST_PLANS:"): - response = response[response.index(":") + 1:] - return text(response) - - except Empty: - pass - - sleep(0.02) + return resp_json(await api.get_showplans()) -@app.route("/library/search/") -def api_search_library(request, type: str): +@app.route("/library/search/track") +async def api_search_library(request): - if type not in ["managed", "track"]: - abort(404) - - while not api_from_q.empty(): - api_from_q.get() # Just waste any previous status responses. - - params = json.dumps( - {"title": request.args.get("title"), "artist": request.args.get("artist")} - ) - command = "SEARCH_TRACK:{}".format(params) - api_to_q.put(command) - - while True: - try: - response = api_from_q.get_nowait() - if response.startswith("SEARCH_TRACK:"): - response = response[len(command)+1:] - return text(response) - - except Empty: - pass - - sleep(0.02) + return resp_json(await api.get_track_search(request.args.get("title"), request.args.get("artist"))) @app.route("/library/playlists/") -def api_get_playlists(request, type: str): +async def api_get_playlists(request, type: str): if type not in ["music", "aux"]: abort(401) - while not api_from_q.empty(): - api_from_q.get() # Just waste any previous status responses. - - command = "LIST_PLAYLIST_{}".format(type.upper()) - api_to_q.put(command) - - while True: - try: - response = api_from_q.get_nowait() - if response.startswith(command): - response = response.split(":", 1)[1] - return text(response) - - except Empty: - pass - - sleep(0.02) + if type == "music": + return resp_json(await api.get_playlist_music()) + else: + return resp_json(await api.get_playlist_aux()) @app.route("/library/playlist//") -def api_get_playlist(request, type: str, library_id: str): +async def api_get_playlist(request, type: str, library_id: str): if type not in ["music", "aux"]: abort(401) - while not api_from_q.empty(): - api_from_q.get() # Just waste any previous status responses. - - command = "GET_PLAYLIST_{}:{}".format(type.upper(), library_id) - api_to_q.put(command) - - while True: - try: - response = api_from_q.get_nowait() - if response.startswith(command): - response = response[len(command) + 1:] - if response == "null": - abort(401) - return text(response) - - except Empty: - pass - - sleep(0.02) + if type == "music": + return resp_json(await api.get_playlist_music_items(library_id)) + else: + return resp_json(await api.get_playlist_aux_items(library_id)) # JSON Outputs @@ -406,15 +340,16 @@ def quit(request): # Don't use reloader, it causes Nested Processes! -def WebServer(player_to: List[Queue], player_from: List[Queue], api_to: Queue, api_from: Queue, state: StateManager): +def WebServer(player_to: List[Queue], player_from: List[Queue], state: StateManager): - global player_to_q, player_from_q, api_to_q, api_from_q, server_state + global player_to_q, player_from_q, server_state, api player_to_q = player_to player_from_q = player_from - api_from_q = api_from - api_to_q = api_to server_state = state + logger = LoggingManager("WebServer") + api = MyRadioAPI(logger, state) + process_title = "Web Server" setproctitle(process_title) CORS(app, supports_credentials=True) # Allow ALL CORS!!! @@ -432,7 +367,7 @@ def WebServer(player_to: List[Queue], player_from: List[Queue], api_to: Queue, a host=server_state.state["host"], port=server_state.state["port"], debug=True, - workers=1, + # workers=10, auto_reload=False # use_reloader=False,