Python Forum
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
Design Strategy
#10
I would like to hear about anything that jumps out as not being Pythonic.

I've been updating my code to use '@classmethod' where I could, trying to think in a more Pythonic way. I've also been experimenting with the recommendation of putting our classes in separate files. Here's what I've settled on: two class files that are referenced by a main project file.

My idea is that I can create an instance of the Autopilot class for every spacecraft, and each of these would be able to create its own tasks.

task_list.py
"""
source file: task_list.py

"""

from clsTask import *
from clsAutopilot import *
   


# One autopilot (task-definition factory) and one task queue for the craft.
enterprise_ap = Autopilot()
uss_enterprise_tasks = Task()

# Queue the manoeuvres in the order they should run.  Only the first one is
# started explicitly (STATUS.INIT); the others keep add()'s default status
# and are activated by the queue once nothing else is running.
uss_enterprise_tasks.add(
    enterprise_ap.turn_craft_oaf(target_angle=90, current_angle=10),
    status=STATUS.INIT,
)
uss_enterprise_tasks.add(enterprise_ap.stationary_position())
uss_enterprise_tasks.add(
    enterprise_ap.turn_craft_oaf(target_angle=110, current_angle=270)
)

# Show what has been queued before any processing happens.
Task.print_queue_info(uss_enterprise_tasks)

# Keep stepping every registered queue until all of them report empty.
while not Task.empty_task_queues():
    Task.process_task_list()
clsAutopilot.py
Note that this class imports the same class used in the main project file.

"""
source file: clsAutopilot.py

"""

from clsTask import *

class Autopilot:
    """Factory for autopilot task definitions and home of their routines.

    Each public method builds a task definition with ``Task.definition()``,
    names it, links it to the matching private routine and returns it so the
    caller can add it to a ``Task`` queue.

    The data a routine needs is attached to the task definition itself
    (``obj.parameters`` / ``obj.variables``).  Previously it was stored only
    in the class-level dictionaries below, keyed by routine name, so creating
    a second task of the same kind (or using a second ``Autopilot``)
    overwrote the data of a task that had not run yet.
    """

    # Class-level registries, shared by every Autopilot instance.  They are
    # still updated with the most recently created definition's data and are
    # used as a fallback by the routines, so existing code that reads or
    # fills them keeps working.
    parameters = {}
    variables = {}

    def turn_craft_oaf(self, target_angle, current_angle):
        """Create the task definition for the 'turn_craft_oaf' routine.

        Args:
            target_angle: Stored under the "target_angle" key for the routine.
            current_angle: Stored under the "current_angle" key for the routine.

        Returns:
            The new task definition; it is NOT added to any queue here, the
            caller passes it to ``Task.add()``.
        """
        obj = Task.definition()
        obj.name = "turn_craft_oaf"
        obj.linked_routine = self._turn_craft_oaf

        par = {"target_angle": target_angle, "current_angle": current_angle}
        # Per-task copy of the inputs: this is what the routine reads.
        obj.parameters = par
        # Legacy registry entry (holds only the latest definition's data).
        self.parameters["_turn_craft_oaf"] = par
        return obj

    @staticmethod
    def _turn_craft_oaf(obj):
        """Linked routine for 'turn_craft_oaf'; called from a task queue.

        Args:
            obj: The task definition being processed.  Its status is set to
                ``STATUS.COMPLETED`` before returning.
        """
        print("   - running routine 'turn_craft_oaf'.")

        # Prefer the data carried by this task; fall back to the class-level
        # registry for definitions that were built without it.
        par = getattr(obj, "parameters", None)
        if par is None:
            par = Autopilot.parameters["_turn_craft_oaf"]

        # Placeholder: the values are looked up (so a missing key still
        # fails loudly) but the turn itself is not implemented yet.
        target_angle = par["target_angle"]
        current_angle = par["current_angle"]

        obj.status = STATUS.COMPLETED

    def stationary_position(self):
        """Create the task definition for the 'stationary_position' routine.

        Returns:
            The new task definition; it is NOT added to any queue here, the
            caller passes it to ``Task.add()``.
        """
        obj = Task.definition()
        obj.name = "stationary_position"
        obj.linked_routine = self._stationary_position

        var = {"center": 0}
        # Per-task working variables: this is what the routine reads/updates.
        obj.variables = var
        # Legacy registry entry (holds only the latest definition's data).
        self.variables["_stationary_position"] = var
        return obj

    @staticmethod
    def _stationary_position(obj):
        """Linked routine for 'stationary_position'; called from a task queue.

        On a call with status ``STATUS.INIT`` the "center" counter is reset
        to 0.  On every other call it is incremented, and once it reaches 2
        the task is marked ``STATUS.COMPLETED``.

        Args:
            obj: The task definition being processed.
        """
        print("   - running routine 'stationary_position'.")

        # Prefer the data carried by this task; fall back to the class-level
        # registry for definitions that were built without it.
        var = getattr(obj, "variables", None)
        if var is None:
            var = Autopilot.variables["_stationary_position"]

        if obj.status == STATUS.INIT:
            var["center"] = 0
            return

        var["center"] += 1
        if var["center"] >= 2:
            obj.status = STATUS.COMPLETED

# Autopilot -----------------------------------------------------------------
"""
source file: clsTask.py

"""

from enum import Enum

STATUS = Enum("STATUS", "WAITING, INIT, RUNNING, COMPLETED")


class Task:
    """A queue of task definitions, plus a class-wide registry of all queues.

    Every ``Task`` instance owns one queue of task definitions.  Creating an
    instance registers it with the class, so ``Task.process_task_list()`` can
    step every queue and ``Task.empty_task_queues()`` can report when all of
    them have drained.
    """

    _empty_task_queues = True   # False once a definition has been added.
    _task_list = []             # Every Task instance ever created.
    _running = 0                # Default for queues not processed yet.

    class _MemSpace:
        """Plain attribute container used as a task definition."""

    def __init__(self):
        """Create an empty queue and register it with the class."""
        self._queue = []        # Task definitions waiting or running.
        self._running = 0       # Active definitions seen in the last pass.
        self._add_task_list(self)

    @classmethod
    def _add_task_list(cls, task_list):
        """Register a new Task object with the class-wide list."""
        cls._task_list.append(task_list)

    @classmethod
    def _reset_task_list_status(cls):
        """Clear the 'all queues are empty' flag."""
        cls._empty_task_queues = False

    def add(self, task_definition, status=STATUS.WAITING):
        """Append a task definition to this queue.

        Args:
            task_definition: Object from ``Task.definition()``; its ``status``
                attribute is overwritten with ``status``.
            status: Initial status for the definition.
        """
        task_definition.status = status
        self._queue.append(task_definition)
        self._reset_task_list_status()

    @staticmethod
    def definition():
        """Return a new task definition with status ``STATUS.WAITING``."""
        obj = Task._MemSpace()
        obj.status = STATUS.WAITING
        return obj

    def empty(self):
        """Return True when this queue holds no task definitions."""
        return not self._queue

    @classmethod
    def empty_task_queues(cls):
        """Return the flag recording whether all queues were last seen empty.

        The flag is cleared by ``add()`` and recomputed at the end of each
        ``process_task_list()`` call.
        """
        return cls._empty_task_queues

    @staticmethod
    def print_queue_info(task):
        """Print the name and status of every definition in ``task``'s queue."""
        print("      | ------------------------------------------------- |")
        print("      |               Task Queue Information              |")
        print("      | ------------------------------------------------- |")

        for q in task._queue:
            print("      | ", q.name, " | ", q.status)

        print("      | ------------------------------------------------- |")
        print()

    def process_task(self):
        """Run the routine described in the task definition.

        NOTE(review): this reads ``self.status`` and ``self.task_definition``,
        which nothing in this class ever sets on a Task, so calling it as-is
        raises AttributeError.  Left unchanged because the intended behaviour
        is unclear - confirm whether it can be removed.
        """
        if self.status in (STATUS.INIT, STATUS.RUNNING):
            self.task_definition.linked_routine(self)

    @classmethod
    def process_task_list(cls):
        """Run one processing pass over every registered queue.

        Afterwards the class-wide 'empty' flag is True only if every queue
        is empty.
        """
        task_queues_empty = True
        for tl in cls._task_list:
            cls.process_task_queue(tl)
            if not tl.empty():
                # At least one definition is still queued somewhere.
                task_queues_empty = False

        cls._empty_task_queues = task_queues_empty

    def process_task_queue(self):
        """Run one processing pass over this queue.

        Each definition whose status is INIT or RUNNING has its linked
        routine called once.  A definition still at INIT afterwards is moved
        to RUNNING, so the INIT branch of a routine executes only once.
        COMPLETED definitions are dropped from the queue.  If nothing is
        active after the pass, the first WAITING definition is set to INIT.
        """
        print("\n\nStarting loop (task queue count:[{}])".format(len(self._queue)))

        active = (STATUS.INIT, STATUS.RUNNING)
        self._running = 0
        remaining = []
        for q in self._queue:
            if q.status in active:
                q.linked_routine(q)
                if q.status == STATUS.INIT:
                    q.status = STATUS.RUNNING
                    print("[process_task_queue] Upgraded status to 'Running'.")

                if q.status in active:
                    # The routine did not finish during this pass.
                    self._running += 1

                Task.print_queue_info(self)  # Debug information.

            if q.status != STATUS.COMPLETED:
                remaining.append(q)

        self._queue = remaining

        if self._running == 0:
            # Nothing is active: start the next waiting definition, if any.
            for t in self._queue:
                if t.status == STATUS.WAITING:
                    t.status = STATUS.INIT
                    print("Forced another task into action.")
                    break

    @classmethod
    def running(cls):
        """Return the number of active definitions across all queues.

        The per-queue counters are written on the instances by
        ``process_task_queue()``; reading the class attribute alone (as this
        method used to) always produced 0, so the counters are summed here.
        """
        return sum(tl._running for tl in cls._task_list)

# Task ----------------------------------------------------------------------
Reply


Messages In This Thread
Design Strategy - by microphone_head - Jul-23-2018, 12:57 AM
RE: Design Strategy - by Windspar - Jul-24-2018, 12:42 PM
RE: Design Strategy - by microphone_head - Jul-24-2018, 09:24 PM
RE: Design Strategy - by Windspar - Jul-24-2018, 10:15 PM
RE: Design Strategy - by microphone_head - Jul-27-2018, 09:16 PM
RE: Design Strategy - by Windspar - Jul-28-2018, 02:19 PM
RE: Design Strategy - by microphone_head - Jul-28-2018, 09:19 PM
RE: Design Strategy - by Windspar - Jul-28-2018, 10:15 PM
RE: Design Strategy - by microphone_head - Jul-28-2018, 10:33 PM
RE: Design Strategy - by microphone_head - Aug-01-2018, 08:30 PM

Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020