from queue import Empty import unittest import multiprocessing import time import os import json from typing import List from player import Player from helpers.logging_manager import LoggingManager # How long to wait (by default) in secs for the player to respond. TIMEOUT_MSG_MAX_S = 10 TIMEOUT_QUIT_S = 10 test_dir = dir_path = os.path.dirname(os.path.realpath(__file__)) + "/" resource_dir = test_dir + "resources/" # All because constant dicts are still mutable in python :/ def getPlanItem(length: int, weight: int): if length not in [1,2,5]: raise ValueError("Invalid length dummy planitem.") # TODO: This assumes we're handling one channel where timeslotitemid is unique item = { "timeslotitemid": weight, "managedid": str(length), "filename": resource_dir + str(length)+"sec.mp3", "weight": weight, "title": str(length)+"sec", "length": "00:00:0{}".format(length) } return item def getPlanItemJSON(length: int, weight: int): return str(json.dumps(getPlanItem(length, weight))) class TestPlayer(unittest.TestCase): player: multiprocessing.Process player_from_q: multiprocessing.Queue player_to_q: multiprocessing.Queue logger: LoggingManager # initialization logic for the test suite declared in the test module # code that is executed before all tests in one test run @classmethod def setUpClass(cls): cls.logger = LoggingManager("Test_Player") # clean up logic for the test suite declared in the test module # code that is executed after all tests in one test run @classmethod def tearDownClass(cls): pass # initialization logic # code that is executed before each test def setUp(self): self.player_from_q = multiprocessing.Queue() self.player_to_q = multiprocessing.Queue() self.player = multiprocessing.Process(target=Player, args=(-1, self.player_to_q, self.player_from_q)) self.player.start() self._send_msg_wait_OKAY("CLEAR") # Empty any previous track items. self._send_msg_wait_OKAY("STOP") self._send_msg_wait_OKAY("UNLOAD") self._send_msg_wait_OKAY("PLAYONLOAD:False") self._send_msg_wait_OKAY("REPEAT:none") self._send_msg_wait_OKAY("AUTOADVANCE:True") # clean up logic # code that is executed after each test def tearDown(self): # Try to kill it, waits the timeout. if self._send_msg_and_wait("QUIT"): self.player.join(timeout=TIMEOUT_QUIT_S) self.logger.log.info("Player quit successfully.") else: self.logger.log.error("No response on teardown, terminating player.") # It's brain dead :/ self.player.terminate() def _send_msg(self, msg: str): self.player_to_q.put("TEST:{}".format(msg)) def _wait_for_msg(self, msg: str, sources_filter=["TEST"], timeout:int = TIMEOUT_MSG_MAX_S): elapsed = 0 got_anything = False while elapsed < timeout: try: response: str = self.player_from_q.get_nowait() if response: self.logger.log.info("Received response: {}\nWas looking for {}:{}".format(response, sources_filter, msg)) got_anything = True source = response[:response.index(":")] if source in sources_filter: return response[len(source+":"+msg)+1:] # +1 to remove trailing : on source. except Empty: pass finally: time.sleep(0.01) elapsed += 0.01 return False if got_anything else None def _send_msg_and_wait(self, msg:str, sources_filter=["TEST"], timeout: int = TIMEOUT_MSG_MAX_S): self._send_msg(msg) return self._wait_for_msg(msg, sources_filter, timeout) def _send_msg_wait_OKAY(self, msg:str, sources_filter=["TEST"], timeout: int = TIMEOUT_MSG_MAX_S): response = self._send_msg_and_wait(msg, sources_filter, timeout) self.assertTrue(response) self.assertTrue(isinstance(response, str)) response = response.split(":", 1) self.assertEqual(response[0], "OKAY") if len(response) > 1: return response[1] return None def test_player_running(self): response = self._send_msg_wait_OKAY("STATUS") self.assertTrue(response) json_obj = json.loads(response) self.assertTrue(json_obj["initialised"]) def test_player_play(self): response = self._send_msg_wait_OKAY("ADD:" + getPlanItemJSON(2,0)) # Should return nothing, just OKAY. self.assertFalse(response) # Check we can load the file response = self._send_msg_wait_OKAY("LOAD:0") self.assertFalse(response) # Check we can play the file response = self._send_msg_wait_OKAY("PLAY") self.assertFalse(response) time.sleep(1) response = self._send_msg_wait_OKAY("STATUS") self.assertTrue(response) json_obj = json.loads(response) self.assertTrue(json_obj["playing"]) # Check the file stops playing. # TODO: Make sure replay / play on load not enabled. time.sleep(2) response = self._send_msg_wait_OKAY("STATUS") self.assertTrue(response) json_obj = json.loads(response) self.assertFalse(json_obj["playing"]) # This test checks if the player progresses to the next item and plays on load. def test_play_on_load(self): self._send_msg_wait_OKAY("ADD:"+ getPlanItemJSON(2,0)) self._send_msg_wait_OKAY("ADD:"+ getPlanItemJSON(2,1)) self._send_msg_wait_OKAY("PLAYONLOAD:True") self._send_msg_wait_OKAY("LOAD:0") # We should be playing the first item. response = self._send_msg_wait_OKAY("STATUS") self.assertTrue(response) json_obj = json.loads(response) self.assertTrue(json_obj["playing"]) self.assertEqual(json_obj["loaded_item"]["weight"], 0) time.sleep(2) # Now we should be onto playing the second item. response = self._send_msg_wait_OKAY("STATUS") self.assertTrue(response) json_obj = json.loads(response) self.assertTrue(json_obj["playing"]) self.assertEqual(json_obj["loaded_item"]["weight"], 1) # Okay, now stop. Test if play on load causes havok with auto advance. self._send_msg_wait_OKAY("STOP") self._send_msg_wait_OKAY("AUTOADVANCE:False") self._send_msg_wait_OKAY("LOAD:0") time.sleep(3) # Now, we've not auto-advanced, but we've not loaded a new item. # Therefore, we shouldn't have played a second time. Leave repeat-one for that. response = self._send_msg_wait_OKAY("STATUS") self.assertTrue(response) json_obj = json.loads(response) self.assertFalse(json_obj["playing"]) self.assertEqual(json_obj["loaded_item"]["weight"], 0) # This test checks that the player repeats the first item without moving onto the second. def test_repeat_one(self): self._send_msg_wait_OKAY("ADD:"+ getPlanItemJSON(2,0)) # Add a second item to make sure we don't load this one when repeat one. self._send_msg_wait_OKAY("ADD:"+ getPlanItemJSON(2,1)) # TODO Test without play on load? What's the behaviour here? self._send_msg_wait_OKAY("PLAYONLOAD:True") self._send_msg_wait_OKAY("REPEAT:one") self._send_msg_wait_OKAY("LOAD:0") # Try 3 repeats to make sure. for repeat in range(3): # We should be playing the first item. response = self._send_msg_wait_OKAY("STATUS") self.assertTrue(response) json_obj = json.loads(response) self.assertTrue(json_obj["playing"]) # Check we're not playing the second item. self.assertEqual(json_obj["loaded_item"]["weight"], 0) time.sleep(2) # This test checks that the player repeats all plan items before playing the first again. def test_repeat_all(self): # Add two items to repeat all between self._send_msg_wait_OKAY("ADD:"+ getPlanItemJSON(2,0)) self._send_msg_wait_OKAY("ADD:"+ getPlanItemJSON(2,1)) # TODO Test without play on load? What's the behaviour here? self._send_msg_wait_OKAY("PLAYONLOAD:True") self._send_msg_wait_OKAY("REPEAT:all") self._send_msg_wait_OKAY("LOAD:0") time.sleep(0.2) # Try 3 repeats to make sure. for repeat in range(3): # We should be playing the first item. response = self._send_msg_wait_OKAY("STATUS") self.assertTrue(response) json_obj = json.loads(response) self.assertTrue(json_obj["playing"]) self.assertEqual(json_obj["loaded_item"]["weight"], 0) time.sleep(2.0) # We should be playing the second item. response = self._send_msg_wait_OKAY("STATUS") self.assertTrue(response) json_obj = json.loads(response) self.assertTrue(json_obj["playing"]) self.assertEqual(json_obj["loaded_item"]["weight"], 1) time.sleep(2.0) # runs the unit tests in the module if __name__ == '__main__': try: unittest.main() except SystemExit as e: if e == True: print("Tests failed :/") else: print("Tests passed!")