Add test to check file playing.

This commit is contained in:
Matthew Stratford 2021-04-06 22:39:08 +01:00
parent 168d44c66c
commit 02766583c0

View file

@ -2,13 +2,17 @@ from queue import Empty
import unittest
import multiprocessing
import time
import os
import json
from player import Player
from helpers.logging_manager import LoggingManager
# How long to wait (by default) in secs for the player to respond.
TIMEOUT_MSG_MAX_S = 10
TIMEOUT_QUIT_S = 10
test_dir = dir_path = os.path.dirname(os.path.realpath(__file__)) + "/"
resource_dir = test_dir + "resources/"
class TestPlayer(unittest.TestCase):
player: multiprocessing.Process
@ -16,6 +20,7 @@ class TestPlayer(unittest.TestCase):
player_to_q: multiprocessing.Queue
logger: LoggingManager
# initialization logic for the test suite declared in the test module
# code that is executed before all tests in one test run
@classmethod
@ -35,6 +40,7 @@ class TestPlayer(unittest.TestCase):
self.player_to_q = multiprocessing.Queue()
self.player = multiprocessing.Process(target=Player, args=(-1, self.player_to_q, self.player_from_q))
self.player.start()
self._send_msg_wait_OKAY("CLEAR") # Empty any previous track items.
# clean up logic
# code that is executed after each test
@ -62,8 +68,7 @@ class TestPlayer(unittest.TestCase):
got_anything = True
source = response[:response.index(":")]
if source in sources_filter:
if response.startswith("TEST:"+msg):
return response[len("TEST:"+msg):]
return response[len(source+":"+msg)+1:] # +1 to remove trailing : on source.
except Empty:
pass
finally:
@ -75,13 +80,79 @@ class TestPlayer(unittest.TestCase):
self._send_msg(msg)
return self._wait_for_msg(msg, sources_filter, timeout)
def _send_msg_wait_OKAY(self, msg:str, sources_filter=["TEST"], timeout: int = TIMEOUT_MSG_MAX_S):
response = self._send_msg_and_wait(msg, sources_filter, timeout)
self.assertTrue(response)
self.assertTrue(isinstance(response, str))
response = response.split(":", 1)
self.assertEqual(response[0], "OKAY")
if len(response) > 1:
return response[1]
return None
def test_player_running(self):
response = self._send_msg_and_wait("STATUS")
response = self._send_msg_wait_OKAY("STATUS")
# assert the status code of the response
self.assertTrue(response)
json_obj = json.loads(response)
self.assertTrue(json_obj["initialised"])
def test_player_play(self):
item = {
"timeslotitemid": "0",
"managedid": "1",
"filename": resource_dir + "2sec.mp3",
"weight": "0",
"title": "5sec",
"length": "00:00:02"
}
response = self._send_msg_wait_OKAY("ADD:"+ json.dumps(item))
# Should return nothing, just OKAY.
self.assertFalse(response)
# Check we can load the file
response = self._send_msg_wait_OKAY("LOAD:0")
self.assertFalse(response)
# Check we can play the file
response = self._send_msg_wait_OKAY("PLAY")
self.assertFalse(response)
time.sleep(1)
response = self._send_msg_wait_OKAY("STATUS")
self.assertTrue(response)
json_obj = json.loads(response)
self.assertTrue(json_obj["playing"])
# Check the file stops playing.
# TODO: Make sure replay / play on load not enabled.
time.sleep(2)
response = self._send_msg_wait_OKAY("STATUS")
self.assertTrue(response)
json_obj = json.loads(response)
self.assertFalse(json_obj["playing"])
# runs the unit tests in the module
if __name__ == '__main__':