Add a bit of typing to appease pylance

This commit is contained in:
Matthew Stratford 2020-12-19 14:57:37 +00:00
parent fc3e5db08c
commit 6d587b1e2b
No known key found for this signature in database
GPG key ID: 9E53C8B3F0B57395
11 changed files with 143 additions and 98 deletions

View file

@ -4,4 +4,5 @@ mutagen
sounddevice
autopep8
setproctitle
pyttsx3
pyttsx3
typing_extensions

View file

@ -1,26 +1,26 @@
from typing import Any, Dict
import sounddevice as sd
import importlib
from helpers.os_environment import isMacOS
class DeviceManager():
@classmethod
def _isOutput(self, device):
def _isOutput(cls, device:Dict[str,Any]) -> bool:
return device["max_output_channels"] > 0
@classmethod
def _getDevices(self):
def _getDevices(cls) -> sd.DeviceList:
# To update the list of devices
# Sadly this doesn't work on MacOS.
if not isMacOS():
sd._terminate()
sd._initialize()
devices = sd.query_devices()
devices: sd.DeviceList = sd.query_devices()
return devices
@classmethod
def getOutputs(self):
outputs = filter(self._isOutput, self._getDevices())
def getOutputs(cls) -> sd.DeviceList:
outputs: sd.DeviceList = filter(cls._isOutput, cls._getDevices())
return outputs

View file

@ -5,9 +5,9 @@ import os
class LoggingManager():
logger = None
logger: logging.Logger
def __init__(self, name):
def __init__(self, name: str):
self.logger = logging.getLogger(name)
filename: str = resolve_external_file_path("/logs/" + name + ".log")
@ -33,5 +33,5 @@ class LoggingManager():
logging.shutdown()
@property
def log(self):
def log(self) -> logging.Logger:
return self.logger

View file

@ -26,7 +26,7 @@ from helpers.os_environment import resolve_external_file_path
class MyRadioAPI():
@classmethod
def get_filename(self, item: PlanItem):
def get_filename(cls, item: PlanItem):
format = "mp3" # TODO: Maybe we want this customisable?
if item.trackId:
itemType = "track"

View file

@ -22,11 +22,11 @@ def isMacOS():
# This must be used to that relative file paths resolve inside the bundled versions.
def resolve_local_file_path(relative_path):
def resolve_local_file_path(relative_path: str):
""" Get absolute path to resource, works for dev and for PyInstaller """
try:
# PyInstaller creates a temp folder and stores path in _MEIPASS
base_path = sys._MEIPASS
base_path:str = sys._MEIPASS
except Exception:
base_path = os.path.abspath(".")
@ -35,7 +35,7 @@ def resolve_local_file_path(relative_path):
# Use this to resolve paths to resources not bundled within the bundled exe.
def resolve_external_file_path(relative_path):
def resolve_external_file_path(relative_path: str):
if (not relative_path.startswith("/")):
relative_path = "/" + relative_path
# Pass through abspath to correct any /'s with \'s on Windows

View file

@ -1,15 +1,18 @@
from helpers.types import PlayerState
import json
import os
import logging
from logging import CRITICAL, INFO
import time
from datetime import datetime
from copy import copy
from typing import List
from plan import PlanItem
from helpers.logging_manager import LoggingManager
from helpers.os_environment import resolve_external_file_path
from helpers.types import ServerState
from typing import Any, Dict, List, NewType, Optional, Union
class StateManager:
filepath = None
@ -32,7 +35,7 @@ class StateManager:
# Try creating the file.
open(self.filepath, "x")
except:
self._log("Failed to create state file.", logging.CRITICAL)
self._log("Failed to create state file.", CRITICAL)
return
with open(self.filepath, 'r') as file:
@ -101,7 +104,7 @@ class StateManager:
with open(self.filepath, "w") as file:
file.write(state_json)
def update(self, key, value, index = -1):
def update(self, key: str, value: Any, index: int = -1):
update_file = True
if (key in self.__rate_limit_params_until.keys()):
# The key we're trying to update is expected to be updating very often,
@ -135,10 +138,10 @@ class StateManager:
if (update_file == True):
self.write_to_file(state_to_update)
def _log(self, text, level=logging.INFO):
def _log(self, text:str, level: int = INFO):
self.logger.log.log(level, "State Manager: " + text)
def _logException(self, text):
def _logException(self, text:str):
self.logger.log.exception("State Manager: " + text)
@property

36
helpers/types.py Normal file
View file

@ -0,0 +1,36 @@
from enum import Enum
from plan import PlanItem
from typing import List, Optional
from typing_extensions import TypedDict
class ServerState(TypedDict):
server_version: str
server_name: str
host: str
port: int
num_channels: int
class RepeatMode(Enum):
NONE = 0
ONE = 1
ALL = 2
class PlayerState(TypedDict):
initialised: bool
loaded_item: Optional[PlanItem]
channel: int
playing: bool
paused: bool
loaded: bool
pos: float
pos_offset: float
pos_true: float
remaining: float
length: float
auto_advance: bool
repeat: RepeatMode
play_on_load: bool
output: Optional[str]
show_plan: List[PlanItem]
last_updated: str

14
plan.py
View file

@ -12,7 +12,7 @@
November 2020
"""
from typing import Dict
from typing import Any, Dict, Optional
import os
class PlanItem:
@ -20,8 +20,8 @@ class PlanItem:
_filename: str = ""
_title: str = ""
_artist: str = ""
_trackId: int = None
_managedId: int = None
_trackId: Optional[int] = None
_managedId: Optional[int] = None
@property
def timeslotItemId(self) -> int:
@ -40,15 +40,15 @@ class PlanItem:
return "{0} - {1}".format(self._title, self._artist) if self._artist else self._title
@property
def trackId(self) -> int:
def trackId(self) -> Optional[int]:
return self._trackId
@property
def managedId(self) -> int:
def managedId(self) -> Optional[int]:
return self._managedId
@property
def __dict__(self) -> Dict[str, any]:
def __dict__(self):
return {
"timeslotItemId": self.timeslotItemId,
"trackId": self._trackId,
@ -59,7 +59,7 @@ class PlanItem:
"filename": self.filename
}
def __init__(self, new_item: Dict[str, any]):
def __init__(self, new_item: Dict[str, Any]):
self._timeslotItemId = new_item["timeslotItemId"]
self._trackId = new_item["trackId"] if "trackId" in new_item else None
self._managedId = new_item["managedId"] if "managedId" in new_item else None

103
player.py
View file

@ -19,6 +19,7 @@
# It is key that whenever the parent server tells us to do something
# that we respond with something, FAIL or OKAY. The server doesn't like to be kept waiting.
from helpers.types import PlayerState, RepeatMode
from queue import Empty
import multiprocessing
import setproctitle
@ -27,7 +28,7 @@ import json
import time
import sys
from typing import Callable, Dict, List
from typing import Any, Callable, Dict, List, Optional
from plan import PlanItem
@ -93,7 +94,7 @@ class Player():
return False
@property
def isPaused(self):
def isPaused(self) -> bool:
return self.state.state["paused"]
@property
@ -107,7 +108,7 @@ class Player():
# We're not playing now, so we can quickly test run
# If that works, we're loaded.
try:
position = self.state.state["pos"]
position: float = self.state.state["pos"]
mixer.music.set_volume(0)
mixer.music.play(0)
except:
@ -137,7 +138,7 @@ class Player():
### Audio Playout Related Methods
def play(self, pos=0):
def play(self, pos: float = 0):
try:
mixer.music.play(0, pos)
self.state.update("pos_offset", pos)
@ -158,7 +159,7 @@ class Player():
def unpause(self):
if not self.isPlaying:
position = self.state.state["pos_true"]
position: float = self.state.state["pos_true"]
try:
self.play(position)
except:
@ -182,7 +183,7 @@ class Player():
return True
# return False
def seek(self, pos):
def seek(self, pos: float) -> bool:
if self.isPlaying:
try:
self.play(pos)
@ -224,14 +225,14 @@ class Player():
### Show Plan Related Methods
def add_to_plan(self, new_item: Dict[str, any]) -> bool:
def add_to_plan(self, new_item: Dict[str, Any]) -> bool:
self.state.update("show_plan", self.state.state["show_plan"] + [PlanItem(new_item)])
return True
def remove_from_plan(self, timeslotItemId: int) -> bool:
plan_copy = copy.copy(self.state.state["show_plan"])
for i in range(len(plan_copy)):
if plan_copy[i].timeslotItemId == timeslotItemId:
plan_copy: List[PlanItem] = copy.copy(self.state.state["show_plan"])
for i in plan_copy:
if i.timeslotItemId == timeslotItemId:
plan_copy.remove(i)
self.state.update("show_plan", plan_copy)
return True
@ -245,25 +246,25 @@ class Player():
if not self.isPlaying:
self.unload()
found: bool = False
showplan = self.state.state["show_plan"]
loaded_item: PlanItem
loaded_item: Optional[PlanItem] = None
for i in range(len(showplan)):
if showplan[i].timeslotItemId == timeslotItemId:
loaded_item = showplan[i]
found = True
break
if not found:
if loaded_item == None:
self.logger.log.error("Failed to find timeslotItemId: {}".format(timeslotItemId))
return False
if (loaded_item.filename == "" or loaded_item.filename == None):
loaded_item.filename = MyRadioAPI.get_filename(item = loaded_item)
if not loaded_item.filename:
return False
self.state.update("loaded_item", loaded_item)
for i in range(len(showplan)):
@ -281,11 +282,11 @@ class Player():
return False
try:
if ".mp3" in filename:
song = MP3(filename)
if ".mp3" in loaded_item.filename:
song = MP3(loaded_item.filename)
self.state.update("length", song.info.length)
else:
self.state.update("length", mixer.Sound(filename).get_length()/1000)
self.state.update("length", mixer.Sound(loaded_item.filename).get_length()/1000)
except:
self.logger.log.exception("Failed to update the length of item.")
return False
@ -309,7 +310,7 @@ class Player():
except:
self.logger.log.exception("Failed to quit mixer.")
def output(self, name=None):
def output(self, name: Optional[str] = None):
wasPlaying = self.state.state["playing"]
name = None if name == "none" else name
@ -332,7 +333,7 @@ class Player():
return True
def _updateState(self, pos=None):
def _updateState(self, pos: Optional[float] = None):
self.state.update("initialised", self.isInit)
if self.isInit:
@ -350,32 +351,35 @@ class Player():
self.state.update("remaining", self.state.state["length"] - self.state.state["pos_true"])
if self.state.state["remaining"] == 0 and self.state.state["loaded_item"]:
# Track has ended
print("Finished", self.state.state["loaded_item"].name)
loaded_item = self.state.state["loaded_item"]
if loaded_item == None or self.state.state["remaining"] != 0:
return
# Repeat 1
if self.state.state["repeat"] == "ONE":
self.play()
# Track has ended
print("Finished", loaded_item.name)
# Auto Advance
elif self.state.state["auto_advance"]:
for i in range(len(self.state.state["show_plan"])):
if self.state.state["show_plan"][i].timeslotItemId == self.state.state["loaded_item"].timeslotItemId:
if len(self.state.state["show_plan"]) > i+1:
self.load(self.state.state["show_plan"][i+1].timeslotItemId)
break
# Repeat 1
if self.state.state["repeat"] == "ONE":
self.play()
# Repeat All
elif self.state.state["repeat"] == "ALL":
self.load(self.state.state["show_plan"][0].timeslotItemId)
# Auto Advance
elif self.state.state["auto_advance"]:
for i in range(len(self.state.state["show_plan"])):
if self.state.state["show_plan"][i].timeslotItemId == loaded_item.timeslotItemId:
if len(self.state.state["show_plan"]) > i+1:
self.load(self.state.state["show_plan"][i+1].timeslotItemId)
break
# Play on Load
if self.state.state["play_on_load"]:
self.play()
# Repeat All
elif self.state.state["repeat"] == "ALL":
self.load(self.state.state["show_plan"][0].timeslotItemId)
# Play on Load
if self.state.state["play_on_load"]:
self.play()
def _retMsg(self, msg, okay_str=False):
def _retMsg(self, msg: Any, okay_str: Any = False):
response = self.last_msg + ":"
if msg == True:
response += "OKAY"
@ -389,7 +393,7 @@ class Player():
if self.out_q:
self.out_q.put(response)
def __init__(self, channel, in_q, out_q):
def __init__(self, channel: int, in_q: multiprocessing.Queue, out_q: multiprocessing.Queue):
process_title = "Player: Channel " + str(channel)
setproctitle.setproctitle(process_title)
@ -406,15 +410,16 @@ class Player():
loaded_state = copy.copy(self.state.state)
if loaded_state["output"]:
self.logger.log.info("Setting output to: " + loaded_state["output"])
self.logger.log.info("Setting output to: " + str(loaded_state["output"]))
self.output(loaded_state["output"])
else:
self.logger.log.info("Using default output device.")
self.output()
if loaded_state["loaded_item"]:
self.logger.log.info("Loading filename: " + str(loaded_state["loaded_item"].filename))
self.load(loaded_state["loaded_item"].timeslotItemId)
loaded_item = loaded_state["loaded_item"]
if loaded_item:
self.logger.log.info("Loading filename: " + str(loaded_item.filename))
self.load(loaded_item.timeslotItemId)
if loaded_state["pos_true"] != 0:
self.logger.log.info("Seeking to pos_true: " + str(loaded_state["pos_true"]))
@ -447,7 +452,7 @@ class Player():
elif self.isInit:
message_types: Dict[str, Callable[any, bool]] = { # TODO Check Types
message_types: Dict[str, Callable[..., Any]] = { # TODO Check Types
"STATUS": lambda: self._retMsg(self.status, True),
# Audio Playout
@ -503,7 +508,7 @@ class Player():
sys.exit(0)
def showOutput(in_q, out_q):
def showOutput(in_q: multiprocessing.Queue, out_q: multiprocessing.Queue):
print("Starting showOutput().")
while True:
time.sleep(0.01)
@ -515,8 +520,8 @@ if __name__ == "__main__":
if isMacOS():
multiprocessing.set_start_method("spawn", True)
in_q = multiprocessing.Queue()
out_q = multiprocessing.Queue()
in_q: multiprocessing.Queue[Any] = multiprocessing.Queue()
out_q: multiprocessing.Queue[Any] = multiprocessing.Queue()
outputProcess = multiprocessing.Process(
target=showOutput,

View file

@ -12,10 +12,10 @@
Date:
October, November 2020
"""
import multiprocessing
import player
from flask import Flask, render_template, send_from_directory, request, jsonify
from typing import Any, Optional
import json
import setproctitle
import logging
@ -77,7 +77,7 @@ stopping = False
# General Endpoints
@app.errorhandler(404)
def page_not_found(e):
def page_not_found(e: Any):
data = {
'ui_page': "404",
"ui_title": "404"
@ -161,7 +161,7 @@ def restart_server():
@app.route("/player/<int:channel>/play")
def play(channel):
def play(channel: int):
channel_to_q[channel].put("PLAY")
@ -169,7 +169,7 @@ def play(channel):
@app.route("/player/<int:channel>/pause")
def pause(channel):
def pause(channel: int):
channel_to_q[channel].put("PAUSE")
@ -177,7 +177,7 @@ def pause(channel):
@app.route("/player/<int:channel>/unpause")
def unPause(channel):
def unPause(channel: int):
channel_to_q[channel].put("UNPAUSE")
@ -185,15 +185,15 @@ def unPause(channel):
@app.route("/player/<int:channel>/stop")
def stop(channel):
def stop(channel: int):
channel_to_q[channel].put("STOP")
return ui_status()
@app.route("/player/<int:channel>/seek/<int:pos>")
def seek(channel, pos):
@app.route("/player/<int:channel>/seek/<float:pos>")
def seek(channel: int, pos: float):
channel_to_q[channel].put("SEEK:" + str(pos))
@ -201,8 +201,8 @@ def seek(channel, pos):
@app.route("/player/<int:channel>/output/<name>")
def output(channel, name):
channel_to_q[channel].put("OUTPUT:" + name)
def output(channel: int, name: Optional[str]):
channel_to_q[channel].put("OUTPUT:" + str(name))
return ui_status()
@ -213,7 +213,7 @@ def autoadvance(channel: int, state: int):
@app.route("/player/<int:channel>/repeat/<state>")
def repeat(channel: int, state):
def repeat(channel: int, state: str):
channel_to_q[channel].put("REPEAT:" + state.upper())
return ui_status()
@ -232,7 +232,7 @@ def load(channel:int, timeslotItemId: int):
@app.route("/player/<int:channel>/unload")
def unload(channel):
def unload(channel: int):
channel_to_q[channel].put("UNLOAD")
@ -241,7 +241,7 @@ def unload(channel):
@app.route("/player/<int:channel>/add", methods=["POST"])
def add_to_plan(channel: int):
new_item: Dict[str, any] = {
new_item: Dict[str, Any] = {
"timeslotItemId": int(request.form["timeslotItemId"]),
"filename": request.form["filename"],
"title": request.form["title"],
@ -252,8 +252,8 @@ def add_to_plan(channel: int):
return new_item
@app.route("/player/<int:channel>/move/<int:timeslotItemId>/<int:position>")
def move_plan(channel: int, timeslotItemId: int, position: int):
@app.route("/player/<int:channel>/move/<int:timeslotItemId>/<float:position>")
def move_plan(channel: int, timeslotItemId: int, position: float):
channel_to_q[channel].put("MOVE:" + json.dumps({"timeslotItemId": timeslotItemId, "position": position}))
# TODO Return
@ -261,7 +261,7 @@ def move_plan(channel: int, timeslotItemId: int, position: int):
@app.route("/player/<int:channel>/remove/<int:timeslotItemId>")
def remove_plan(channel: int, timeslotItemId: int):
channel_to_q[channel].put("REMOVE:" + timeslotItemId)
channel_to_q[channel].put("REMOVE:" + str(timeslotItemId))
# TODO Return
return True
@ -284,7 +284,7 @@ def channel_json(channel: int):
except:
return status(channel)
def status(channel):
def status(channel: int):
channel_to_q[channel].put("STATUS")
while True:
response = channel_from_q[channel].get()
@ -320,7 +320,7 @@ def clear_all_channels():
@app.route('/static/<path:path>')
def send_static(path):
def send_static(path: str):
return send_from_directory('ui-static', path)
@ -380,7 +380,7 @@ def startServer():
)
text_to_speach.runAndWait()
new_item: Dict[str, any] = {
new_item: Dict[str,Any] = {
"timeslotItemId": 0,
"filename": "dev/welcome.mp3",
"title": "Welcome to BAPSicle",

View file

@ -6,16 +6,16 @@
{% if data %}
<code>
{% for player in data.channels %}
<a href="/player/{{player.channel}}/play">Play</a>
<a href="/player/{{player.channel}}/play">Play</a>
{% if player.paused %}
<a href="/player/{{player.channel}}/unpause">UnPause</a>
{% else %}
<a href="/player/{{player.channel}}/pause">Pause</a>
{% endif %}
<a href="/player/{{player.channel}}/stop">Stop</a>
<a href="/player/{{player.channel}}/seek/50">Seek 50</a>
<a href="/player/{{player.channel}}/seek/50.0">Seek 50</a>
{{player}}<br>
{% endfor %}
</code>
{% endif %}
{% endblock %}
{% endblock %}