Switch API to async, remove APIHandler.

This commit is contained in:
Matthew Stratford 2021-04-18 03:14:14 +01:00
parent fcf9d8dceb
commit a45cbda37e
6 changed files with 69 additions and 247 deletions

View file

@ -1,99 +0,0 @@
from helpers.state_manager import StateManager
import json
from multiprocessing import Queue, current_process
from helpers.logging_manager import LoggingManager
from helpers.myradio_api import MyRadioAPI
from setproctitle import setproctitle
from os import _exit
# The API handler is needed from the main flask thread to process API requests.
# Flask is not able to handle these during page loads, requests.get() hangs.
class APIHandler:
logger: LoggingManager
api: MyRadioAPI
server_to_q: Queue
server_from_q: Queue
def __init__(self, server_from_q: Queue, server_to_q: Queue, server_config: StateManager):
process_title = "APIHandler"
setproctitle(process_title)
current_process().name = process_title
self.server_from_q = server_from_q
self.server_to_q = server_to_q
self.logger = LoggingManager("APIHandler")
self.api = MyRadioAPI(self.logger, server_config)
self.handle()
def handle(self):
try:
while self.server_from_q:
# Wait for an API request to come in.
request = self.server_from_q.get()
self.logger.log.info("Received Request: {}".format(request))
if request == "LIST_PLANS":
self.server_to_q.put(
request + ":" + json.dumps(self.api.get_showplans())
)
elif request == "LIST_PLAYLIST_MUSIC":
self.server_to_q.put(
request + ":" +
json.dumps(self.api.get_playlist_music())
)
elif request == "LIST_PLAYLIST_AUX":
self.server_to_q.put(
request + ":" + json.dumps(self.api.get_playlist_aux())
)
else:
# Commands with params
command = request[: request.index(":")]
params = request[request.index(":") + 1:]
if command == "GET_PLAYLIST_AUX":
self.server_to_q.put(
request
+ ":"
+ json.dumps(self.api.get_playlist_aux_items(str(params)))
)
elif command == "GET_PLAYLIST_MUSIC":
self.server_to_q.put(
request
+ ":"
+ json.dumps(self.api.get_playlist_music_items(str(params)))
)
elif command == "SEARCH_TRACK":
try:
params = json.loads(params)
self.server_to_q.put(
request
+ ":"
+ json.dumps(
self.api.get_track_search(
str(params["title"]), str(
params["artist"])
)
)
)
except Exception as e:
self.logger.log.exception(
"Failed to parse params with message {}, command {}, params {}\n{}".format(
request, command, params, e
)
)
# Catch the handler being killed externally.
except KeyboardInterrupt:
self.logger.log.info("Received KeyboardInterupt")
except SystemExit:
self.logger.log.info("Received SystemExit")
except Exception as e:
self.logger.log.exception(
"Received unexpected exception: {}".format(e))
del self.logger
_exit(0)

View file

@ -2,6 +2,8 @@ autopep8
pygame==2.0.1
sanic
sanic-cors
syncer
aiohttp
mutagen
sounddevice
autopep8

View file

@ -17,7 +17,7 @@
November 2020
"""
from typing import Optional
import requests
import aiohttp
import json
from logging import INFO
import os
@ -36,7 +36,17 @@ class MyRadioAPI:
self.logger = logger
self.config = config
def get_non_api_call(self, url):
async def get(self, url, timeout=10):
async with aiohttp.ClientSession(read_timeout=timeout) as session:
async with session.get(url) as response:
if response.status != 200:
self._logException(
"Failed to get API request. Status code: " + str(response.status)
)
self._logException(str(response.text()))
return await response.read()
async def get_non_api_call(self, url):
url = "{}{}".format(self.config.state["myradio_base_url"], url)
@ -46,19 +56,12 @@ class MyRadioAPI:
url += "?api_key={}".format(self.config.state["myradio_api_key"])
self._log("Requesting non-API URL: " + url)
request = requests.get(url, timeout=10)
request = self.get(url)
self._log("Finished request.")
if request.status_code != 200:
self._logException(
"Failed to get API request. Status code: " + str(request.status_code)
)
self._logException(str(request.content))
return None
return request
def get_apiv2_call(self, url):
async def get_apiv2_call(self, url):
url = "{}/v2{}".format(self.config.state["myradio_api_url"], url)
@ -68,29 +71,22 @@ class MyRadioAPI:
url += "?api_key={}".format(self.config.state["myradio_api_key"])
self._log("Requesting API V2 URL: " + url)
request = requests.get(url, timeout=10)
request = self.get(url)
self._log("Finished request.")
if request.status_code != 200:
self._logException(
"Failed to get API request. Status code: " + str(request.status_code)
)
self._logException(str(request.content))
return None
return request
# Show plans
def get_showplans(self):
async def get_showplans(self):
url = "/timeslot/currentandnextobjects?n=10"
request = self.get_apiv2_call(url)
request = await self.get_apiv2_call(url)
if not request:
self._logException("Failed to get list of show plans.")
return None
payload = json.loads(request.content)["payload"]
payload = json.loads(await request)["payload"]
if not payload["current"]:
self._logException("API did not return a current show.")
@ -111,20 +107,20 @@ class MyRadioAPI:
# TODO filter out jukebox
return shows
def get_showplan(self, timeslotid: int):
async def get_showplan(self, timeslotid: int):
url = "/timeslot/{}/showplan".format(timeslotid)
request = self.get_apiv2_call(url)
request = await self.get_apiv2_call(url)
if not request:
self._logException("Failed to get show plan.")
return None
return json.loads(request.content)["payload"]
return json.loads(await request)["payload"]
# Audio Library
def get_filename(self, item: PlanItem):
async def get_filename(self, item: PlanItem):
format = "mp3" # TODO: Maybe we want this customisable?
if item.trackid:
itemType = "track"
@ -158,14 +154,14 @@ class MyRadioAPI:
return filename
# File doesn't exist, download it.
request = self.get_non_api_call(url)
request = await self.get_non_api_call(url)
if not request:
return None
try:
with open(filename, "wb") as file:
file.write(request.content)
file.write(await request)
except Exception as e:
self._logException("Failed to write music file: {}".format(e))
return None
@ -173,35 +169,35 @@ class MyRadioAPI:
return filename
# Gets the list of managed music playlists.
def get_playlist_music(self):
async def get_playlist_music(self):
url = "/playlist/allitonesplaylists"
request = self.get_apiv2_call(url)
request = await self.get_apiv2_call(url)
if not request:
self._logException("Failed to retrieve music playlists.")
return None
return json.loads(request.content)["payload"]
return json.loads(await request)["payload"]
# Gets the list of managed aux playlists (sfx, beds etc.)
def get_playlist_aux(self):
async def get_playlist_aux(self):
url = "/nipswebPlaylist/allmanagedplaylists"
request = self.get_apiv2_call(url)
request = await self.get_apiv2_call(url)
if not request:
self._logException("Failed to retrieve music playlists.")
return None
return json.loads(request.content)["payload"]
return json.loads(await request)["payload"]
# Loads the playlist items for a certain managed aux playlist
def get_playlist_aux_items(self, library_id: str):
async def get_playlist_aux_items(self, library_id: str):
# Sometimes they have "aux-<ID>", we only need the index.
if library_id.index("-") > -1:
library_id = library_id[library_id.index("-") + 1:]
url = "/nipswebPlaylist/{}/items".format(library_id)
request = self.get_apiv2_call(url)
request = await self.get_apiv2_call(url)
if not request:
self._logException(
@ -209,13 +205,13 @@ class MyRadioAPI:
)
return None
return json.loads(request.content)["payload"]
return json.loads(await request)["payload"]
# Loads the playlist items for a certain managed playlist
def get_playlist_music_items(self, library_id: str):
async def get_playlist_music_items(self, library_id: str):
url = "/playlist/{}/tracks".format(library_id)
request = self.get_apiv2_call(url)
request = await self.get_apiv2_call(url)
if not request:
self._logException(
@ -223,21 +219,21 @@ class MyRadioAPI:
)
return None
return json.loads(request.content)["payload"]
return json.loads(await request)["payload"]
def get_track_search(
async def get_track_search(
self, title: Optional[str], artist: Optional[str], limit: int = 100
):
url = "/track/search?title={}&artist={}&digitised=1&limit={}".format(
title if title else "", artist if artist else "", limit
)
request = self.get_apiv2_call(url)
request = await self.get_apiv2_call(url)
if not request:
self._logException("Failed to search for track.")
return None
return json.loads(request.content)["payload"]
return json.loads(await request)["payload"]
def _log(self, text: str, level: int = INFO):
self.logger.log.log(level, "MyRadio API: " + text)

View file

@ -32,6 +32,7 @@ import time
from typing import Any, Callable, Dict, List, Optional
from pygame import mixer
from mutagen.mp3 import MP3
from syncer import sync
from helpers.myradio_api import MyRadioAPI
from helpers.state_manager import StateManager
@ -254,7 +255,7 @@ class Player:
# Show Plan Related Methods
def get_plan(self, message: int):
plan = self.api.get_showplan(message)
plan = sync(self.api.get_showplan(message))
self.clear_channel_plan()
channel = self.state.state["channel"]
self.logger.log.info(plan)
@ -338,7 +339,7 @@ class Player:
reload = True
if reload:
loaded_item.filename = self.api.get_filename(item=loaded_item)
loaded_item.filename = sync(self.api.get_filename(item=loaded_item))
if not loaded_item.filename:
return False

View file

@ -35,7 +35,6 @@ from helpers.logging_manager import LoggingManager
from websocket_server import WebsocketServer
from web_server import WebServer
from player_handler import PlayerHandler
from api_handler import APIHandler
from controllers.mattchbox_usb import MattchBox
from helpers.the_terminator import Terminator
import player
@ -124,13 +123,6 @@ class BAPSicleServer:
)
self.player[channel].start()
self.api_to_q = multiprocessing.Queue()
self.api_from_q = multiprocessing.Queue()
self.api_handler = multiprocessing.Process(
target=APIHandler, args=(self.api_to_q, self.api_from_q, self.state)
)
self.api_handler.start()
self.player_handler = multiprocessing.Process(
target=PlayerHandler,
args=(self.player_from_q, self.websocket_to_q, self.ui_to_q, self.controller_to_q),
@ -150,7 +142,7 @@ class BAPSicleServer:
self.controller_handler.start()
self.webserver = multiprocessing.Process(
target=WebServer, args=(self.player_to_q, self.ui_to_q, self.api_to_q, self.api_from_q, self.state)
target=WebServer, args=(self.player_to_q, self.ui_to_q, self.state)
)
self.webserver.start()
@ -213,11 +205,6 @@ class BAPSicleServer:
self.player_handler.terminate()
self.player_handler.join()
print("Stopping API Handler")
if self.api_handler:
self.api_handler.terminate()
self.api_handler.join()
print("Stopping Controllers")
if self.controller_handler:
self.controller_handler.terminate()

View file

@ -1,3 +1,4 @@
from helpers.myradio_api import MyRadioAPI
from sanic import Sanic
from sanic.exceptions import NotFound, abort
from sanic.response import html, text, file, redirect
@ -35,9 +36,7 @@ def render_template(file, data, status=200):
logger: LoggingManager
server_state: StateManager
api_from_q: Queue
api_to_q: Queue
api: MyRadioAPI
player_to_q: List[Queue] = []
player_from_q: List[Queue] = []
@ -159,7 +158,7 @@ def player_simple(request, channel: int, command: str):
abort(404)
@app.route("/player/<channel:int>/seek/<pos:float>")
@app.route("/player/<channel:int>/seek/<pos:number>")
def player_seek(request, channel: int, pos: float):
player_to_q[channel].put("UI:SEEK:" + str(pos))
@ -240,104 +239,39 @@ def plan_clear(request):
# API Proxy Endpoints
@app.route("/plan/list")
def api_list_showplans(request):
async def api_list_showplans(request):
while not api_from_q.empty():
api_from_q.get() # Just waste any previous status responses.
api_to_q.put("LIST_PLANS")
while True:
try:
response = api_from_q.get_nowait()
if response.startswith("LIST_PLANS:"):
response = response[response.index(":") + 1:]
return text(response)
except Empty:
pass
sleep(0.02)
return resp_json(await api.get_showplans())
@app.route("/library/search/<type>")
def api_search_library(request, type: str):
@app.route("/library/search/track")
async def api_search_library(request):
if type not in ["managed", "track"]:
abort(404)
while not api_from_q.empty():
api_from_q.get() # Just waste any previous status responses.
params = json.dumps(
{"title": request.args.get("title"), "artist": request.args.get("artist")}
)
command = "SEARCH_TRACK:{}".format(params)
api_to_q.put(command)
while True:
try:
response = api_from_q.get_nowait()
if response.startswith("SEARCH_TRACK:"):
response = response[len(command)+1:]
return text(response)
except Empty:
pass
sleep(0.02)
return resp_json(await api.get_track_search(request.args.get("title"), request.args.get("artist")))
@app.route("/library/playlists/<type:string>")
def api_get_playlists(request, type: str):
async def api_get_playlists(request, type: str):
if type not in ["music", "aux"]:
abort(401)
while not api_from_q.empty():
api_from_q.get() # Just waste any previous status responses.
command = "LIST_PLAYLIST_{}".format(type.upper())
api_to_q.put(command)
while True:
try:
response = api_from_q.get_nowait()
if response.startswith(command):
response = response.split(":", 1)[1]
return text(response)
except Empty:
pass
sleep(0.02)
if type == "music":
return resp_json(await api.get_playlist_music())
else:
return resp_json(await api.get_playlist_aux())
@app.route("/library/playlist/<type:string>/<library_id:string>")
def api_get_playlist(request, type: str, library_id: str):
async def api_get_playlist(request, type: str, library_id: str):
if type not in ["music", "aux"]:
abort(401)
while not api_from_q.empty():
api_from_q.get() # Just waste any previous status responses.
command = "GET_PLAYLIST_{}:{}".format(type.upper(), library_id)
api_to_q.put(command)
while True:
try:
response = api_from_q.get_nowait()
if response.startswith(command):
response = response[len(command) + 1:]
if response == "null":
abort(401)
return text(response)
except Empty:
pass
sleep(0.02)
if type == "music":
return resp_json(await api.get_playlist_music_items(library_id))
else:
return resp_json(await api.get_playlist_aux_items(library_id))
# JSON Outputs
@ -406,15 +340,16 @@ def quit(request):
# Don't use reloader, it causes Nested Processes!
def WebServer(player_to: List[Queue], player_from: List[Queue], api_to: Queue, api_from: Queue, state: StateManager):
def WebServer(player_to: List[Queue], player_from: List[Queue], state: StateManager):
global player_to_q, player_from_q, api_to_q, api_from_q, server_state
global player_to_q, player_from_q, server_state, api
player_to_q = player_to
player_from_q = player_from
api_from_q = api_from
api_to_q = api_to
server_state = state
logger = LoggingManager("WebServer")
api = MyRadioAPI(logger, state)
process_title = "Web Server"
setproctitle(process_title)
CORS(app, supports_credentials=True) # Allow ALL CORS!!!
@ -432,7 +367,7 @@ def WebServer(player_to: List[Queue], player_from: List[Queue], api_to: Queue, a
host=server_state.state["host"],
port=server_state.state["port"],
debug=True,
workers=1,
# workers=10,
auto_reload=False
# use_reloader=False,