Aug-01-2018, 08:30 PM
I would like to here about anything that jumps out as not being Pythonic.
I've been updating my code to use '@classmethod' where I could, trying to think more Phythonic. I've also been experimenting with the recommendation of putting our classes in seperate files. Here's what i've settled on, two class files that are referenced by a main project file.
My idea is that I can create an instance of the Autopilot class for every space craft, and these would be able to created their own taskt.
task_list.py
Note that this class imports the same class used in the main project file.
I've been updating my code to use '@classmethod' where I could, trying to think more Phythonic. I've also been experimenting with the recommendation of putting our classes in seperate files. Here's what i've settled on, two class files that are referenced by a main project file.
My idea is that I can create an instance of the Autopilot class for every space craft, and these would be able to created their own taskt.
task_list.py
""" source file: task_list.py """ from clsTask import * from clsAutopilot import * enterprise_ap =Autopilot() uss_enterprise_tasks =Task() # Add tasks to task queue. uss_enterprise_tasks.add(enterprise_ap.turn_craft_oaf(90, 10), STATUS.INIT) uss_enterprise_tasks.add(enterprise_ap.stationary_position()) uss_enterprise_tasks.add(enterprise_ap.turn_craft_oaf(110, 270)) # _. Task.print_queue_info(uss_enterprise_tasks) while not Task.empty_task_queues(): Task.process_task_list()clsAutopilot.py
Note that this class imports the same class used in the main project file.
""" source file: clsAutopilot.py """ from clsTask import * class Autopilot: parameters ={} variables ={} def turn_craft_oaf(self, target_angle, current_angle): 'Function - generates a task (definition) and registers it in the queue.' # Prepare parameter(s) for associated # class routine. obj =Task.definition() obj.name ="turn_craft_oaf" obj.linked_routine = self._turn_craft_oaf # Store parameter information in this # class. par ={} par["target_angle"] =target_angle par["current_angle"] =current_angle self.parameters["_turn_craft_oaf"] =par # _. return obj # turn_craft_oaf() ------------------------------------------------------ @staticmethod def _turn_craft_oaf(obj): 'Function - designed to be called from a taks queue.' print(" - running routine 'turn_craft_oaf'.") # Get a dictionary of all the parameters # for this routine. par =Autopilot.parameters["_turn_craft_oaf"] # _. # Turn the craft in the opposite angle of flight. target_angle =par["target_angle"] current_angle =par["current_angle"] obj.status =STATUS.COMPLETED # _turn_craft_oaf() ------------------------------------------------------ def stationary_position(self): 'Function - generates a task (definition) and registers it in the queue.' obj =Task.definition() obj.name ="stationary_position" obj.linked_routine =self._stationary_position # Prepare variables for the tasked linked routine. var ={} var["center"] =0 self.variables["_stationary_position"] =var return obj # stationary_position() ------------------------------------------------- @staticmethod def _stationary_position(obj): 'Function - designed to be called from a taks queue.' print(" - running routine 'stationary_position'.") # Get a dictionary of all the variables # for this routine. var =Autopilot.variables["_stationary_position"] # _. if obj.status == STATUS.INIT: # Setup some variables. var["center"] = 0 else: var["center"] += 1 if var["center"] >=2: obj.status = STATUS.COMPLETED # stationary_position() ------------------------------------------------- # Autopilot -----------------------------------------------------------------
""" source file: clsTask.py """ from enum import Enum STATUS = Enum("STATUS", "WAITING, INIT, RUNNING, COMPLETED") class Task: 'Common base class for managing task definitions in a queue.' # optional class documentation string. #_queue =[] _empty_task_queues =True _task_list =[] # All task objects are stored here. _running =0 # Number of tasks running in the queue. class _MemSpace: # Container for the linked routine and vars. pass # _MemSpace ------------------------------------------------------------- def __init__(self): self._queue =[] # All task definitions are stored here. self._add_task_list(self) # Adds the new task object to the list. # __init__() ------------------------------------------------------------ @classmethod def _add_task_list(cls, task_list): 'Class method - registers a new Task object with the class.' cls._task_list.append(task_list) # _add_task_list() ------------------------------------------------------- @classmethod def _reset_task_list_status(cls): 'Class method - resets the flag set when all queues are empty.' cls._empty_task_queues =False # _reset_task_list_status() --------------------------------------------- def add(self, task_definition, status=STATUS.WAITING): 'Function - adds a task definition to the queue.' task_definition.status =status self._queue.append(task_definition) self._reset_task_list_status() # add() ----------------------------------------------------------------- @staticmethod def definition(): 'Function - returns a new task definition object.' obj =Task._MemSpace() obj.status =STATUS.WAITING return obj # definition() ---------------------------------------------------------- def empty(self): 'Function to check if the queue is empty.' return len(self._queue) == 0 # empty() --------------------------------------------------------------- @classmethod def empty_task_queues(cls): 'Function - checks if all the queues are empty.' return cls._empty_task_queues # empty_task_queues() ---------------------------------------------------- @staticmethod def print_queue_info(task): ' Function - prints the name and status of items in a queue.' print(" | ------------------------------------------------- |") print(" | Task Queue Information |") print(" | ------------------------------------------------- |") for q in task._queue: print(" | ", q.name, " | ", q.status) print(" | ------------------------------------------------- |") print() # print_queue_info() -------------------------------------------------------- def process_task(self): 'Function - runs the routine described in the task definition.' if self.status in (STATUS.INIT, STATUS.RUNNING): self.task_definition.linked_routine(self) # process_task() --------------------------------------------------------- @classmethod def process_task_list(cls): 'Class method - processes the queues in all the tasks.' # process all the task processes in the list. task_queues_empty =True for tl in cls._task_list: cls.process_task_queue(tl) if not tl.empty(): # There's at least one task # remaining in a queue. task_queues_empty =False cls._empty_task_queues =task_queues_empty # process_task_list() --------------------------------------------------- def process_task_queue(self): 'Function - run task definitions listed in a queue.' print("\n\nStarting loop (task queue count:[{}])".format(len(self._queue))) self._running =0 new_queue = [] for q in self._queue: if q.status in (STATUS.INIT, STATUS.RUNNING): q.linked_routine(q) if q.status ==STATUS.INIT: # Only permit an active task to run # once as with the status of [STATUS.INIT]. q.status =STATUS.RUNNING print("[process_task_queue] Upgraded status to 'Running'.") if q.status in (STATUS.INIT, STATUS.RUNNING): # The task is still running. self._running += 1 #print_queue_info(self) # Debug information. Task.print_queue_info(self) # Debug information. if q.status != STATUS.COMPLETED: new_queue.append(q) self._queue = new_queue if self._running == 0: if len(self._queue) > 0: # Activate the next available task. for t in self._queue: if t.status ==STATUS.WAITING: t.status =STATUS.INIT print("Forced another task into action.") break # process_task_queue() ----------------------------------------------------- @classmethod def running(cls): 'Class method - returns the number of active tasks in a queue.' return cls._running # running() ------------------------------------------------------------- # Task ----------------------------------------------------------------------