Python Forum
asyncio: executing tasks + httpserver
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
asyncio: executing tasks + httpserver
#1
Hello All!

My English can be badly, sorry in advance

OS : Windows && ArchLinux
Python version 3.7.2

Task: Need do control by program use HttpServer and don't interfere with program work
Short about code: Has run_mind():
- it get options from DB (example: run task in time, etc)
- next running HttpServer for control this program using GET
- next do infinity loop (using while), where doit any tasks using asyncio


Trable: On the running HttpServer, next executing is stopping on "await asyncio.sleep(1)" in loop "while". I think, what HttpServer - using listening port Freezing, and don't give to running any tasks

Lost sleep for 3 days. Say me please, how can task solve, how do it: Listen HttpServer (or other alternative variants, example: WebSocket, Socket, JsonServer, any..) AND doing Task in while?

Thx!!!




Source:
import _functions
import config
import asyncio
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Process, Queue
import multiprocessing, logging

import nest_asyncio
nest_asyncio.apply()

class GlobVar:
    options = dict()

from urllib.parse import urlparse
import urllib.parse
#import myurl
from http.server import BaseHTTPRequestHandler, HTTPServer # python3
class HandleRequests(BaseHTTPRequestHandler):
    def _set_headers(self):
##        self.send_response(200)
##        self.send_header('Content-type', 'application/json')
##        self.end_headers()
        pass

    def do_GET__old(self):
        self._set_headers()
        self.wfile.write("received get request")
       
    def do_GET(self):
##         self._set_headers()
##         print(self.headers)
##        content_len = int(self.headers.getheader('content-length', 0))
##        post_body = self.rfile.read(content_len)
##        print(post_body)

         print(self.path)

         #query = urlparse(self.path).query
         #parsed_query = parse_qs(query)
           
         #print(query)
         #print(parsed_query)

         self.send_response(200)
         self.send_header('content-type','application/json')
         self.end_headers()

         if self.path == '/':
            self.wfile.write(b"{ 0:false, 1:'Enter command', 'type':'error'}")

         else:

            if urllib.parse.urlparse(self.path).path == '/':

               # Обработка здесь уже, что там пришло



               q = urlparse(self.path)

               print(q)

             

               query = urllib.parse.urlparse(self.path).query

               __arr = (dict(urllib.parse.parse_qsl(query)))



               print(__arr)



               GlobVar.options[4]['__running'] = 5



               # print(myurl.parse(self.path))



               self.wfile.write(b"{'hello world!'}")

               self.wfile.write(b"{'hello world 2!'}")

               self.wfile.write(bytes(self.path, 'utf-8') )

            else:

               self.wfile.write(b"{'hello...!'}")

           

    def do_POST(self):

        '''Reads post request body'''

        self._set_headers()

        content_len = int(self.headers.getheader('content-length', 0))

        post_body = self.rfile.read(content_len)

        self.wfile.write("received post request:<br>{}".format(post_body))





async def new_proc_wss(arr):

    v = 7

    if v == 1:

       pass

    elif v == 7:

      print('start http...')

      # Параллельно не работает, тормозит и пиздец

      host = ''

      port = 8002

      await HTTPServer((host, port), HandleRequests).serve_forever()



def sub_loop(arr):

    loop = asyncio.new_event_loop()

    asyncio.set_event_loop(loop)

    loop.run_until_complete(new_proc_wss(arr))



# v 12 (down)

import asyncore

import socket

class EchoHandler(asyncore.dispatcher_with_send):

    def handle_read(self):

        data = self.recv(1024)

        if data == "close":self.close()

        self.send(data)



class EchoServer(asyncore.dispatcher):

     def __init__(self, host, port):

        asyncore.dispatcher.__init__(self)

        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)

        self.set_reuse_addr()

        self.bind((host, port))

        self.listen(11)

   

    def handle_accept(self):

        pair = self.accept()

        if pair is not None:

            sock, addr = pair

            print ('conn', addr)

            handler = EchoHandler(sock)


# async def run():

async def run(executor):
    print(' run mind()');
    GlobVar.options = config.getOptions()
    print('options:', GlobVar.options)
    # Запуск автозапуска
    # a_dict.keys()

    for key in GlobVar.options:
        arr = GlobVar.options[key]
        if arr['mo_mode'] == 1:

            if arr['mo_str'] == 'websocket':

                #_websocket2.create_websocket()

                #print('..1')

                number = 5

                #__name = '_websocket2'

                __name = '_websocket2_async'

                #if arr['mo_str_2'] != False:

                #   __name = arr['mo_str_2']

                # default MODE run() v = 4

                  

                v = 3

                if v == 0:

                   pass

                elif v == 10:

                   # ERROR: Cannot run the event loop while another loop is running

                   ## medicine - nest_asyncio.apply()

                   # NEW FUCK: не доходит до next

                   #

                   await asyncio.get_event_loop().run_in_executor(executor, sub_loop(arr))

                   print('next...')

                elif v == 11:

                   # MODE run() v = 5

                   # Стопарит дальнейшее выполнение и не доходит до finished

                   task_4 = new_proc_wss(arr)

                   # tasks_list = [task_1, task_2, task_3, task_4]

                   tasks_list = [task_4]

                   finished, unfinished = await asyncio.wait(tasks_list, loop=executor, return_when=asyncio.ALL_COMPLETED)

                   print(finished, unfinished)

                   for unfinished_task in unfinished:

                       unfinished_task.cancel()

                   print('finished:: ', finished)

                elif v == 12:

                   # НЕ доходит до NEXT

                   host = ''

                   port = 2222



                   server = EchoServer(host, port)

                   await asyncore.loop()

                   print('next...')



                elif v == 13:

                   pass



                elif v == 1:

                   proc = multiprocessing.Process(target=_websocket2_async.create_websocket, name=__name, args=(number,))

                   #proc = Process(target=child, args=())

                   #print('..2')

                   procs.append(proc)

                   proc.daemon=True

                   print('..3')

                   #try:

                   proc.start()

                   #except OSError as err:

                   #    print(err)

                   # m = multiprocessing.Manager()

                  

                   print('..4')

                   #proc.join()

                   print('..5, proc = ', proc, ', p.pid = ', proc.pid, ', is_alive = ', proc.is_alive)

                  

                elif v == 3:

                   # Работает, но стопарит дальнейшее выполнение, на await asyncio.sleep(1)

                  

                   print('load wss 8000')

                   # async

                   future = asyncio.ensure_future(new_proc_wss(arr))



                   def callback_wss(fut):

                       result = fut.result()

                       print('finish callback_wss')

                      

                   #asyncio.ensure_future(new_proc(arr)).add_done_callback(callback)

                   print(111)

                   future.add_done_callback(callback_wss)

                   print(222)



                elif v == 4:

                   # Запускается и закрывается процесс, но это новый процесс, общаться через межпроц взаимодействие - на самый крайний случай

                   print('v4')

                   proc = multiprocessing.Process(target=new_proc_wss, name=__name, args=(arr,))

                   #proc = Process(target=child, args=())

                   #print('..2')

                   #procs.append(proc)

                   #### proc.daemon=True

                   print('..3')

                   #try:

                   proc.start()

                   print('starting')



                elif v == 5:

                   # Запускает хттп и тормозит на этом, до next не доходит...

                   await new_proc_wss(arr)

                   print('next...')



                elif v == 6:

                   # Блокирует дальнейшее выполнеие asyncio, до next не доходит...

                   task = asyncio.create_task(new_proc_wss(arr))

                   print('next...')



                elif v == 7:

                   # Блокирует дальнейшее выполнеие, до next не доходит...

                   results = await asyncio.gather(new_proc_wss(arr))

                   print(results)

                   print('next...')



                elif v == 8:

                  # ERROR - Cannot run the event loop while another loop is running

                  loop = asyncio.new_event_loop()

                  # Each client connection will create a new protocol instance

                  coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)

                  server = loop.run_until_complete(coro)

                  # Serve requests until CTRL+c is pressed

                  print('Serving on {}'.format(server.sockets[0].getsockname()))

                  try:

                      loop.run_forever()

                  except KeyboardInterrupt:

                      pass

                elif v == 9:

                   # До NEXT не доходит

                   # https://docs.python.org/3.7/library/asyncio-eventloop.html#asyncio.Server.serve_forever

                   async def client_connected(reader, writer):

                      # Communicate with the client with

                      # reader/writer streams.  For example:

                      await reader.readline()

    

                   srv = await asyncio.start_server(

                       client_connected, '127.0.0.1', '8003')

                   await srv.serve_forever()

                   print('next...')



    # Запуск цикла

    boom=1

    while boom >0:

        print('iteration1...')

        

        await asyncio.sleep(1)

        # time.sleep(1)

        

        print('iteration2...')



        # Запуск всякой лабуды в указанное время



def run_mind():
   v = 4
   if v == 1:
       pass
   elif v == 4:
      loop = asyncio.ProactorEventLoop()
      asyncio.set_event_loop(loop)
      loop.run_until_complete(run(loop))
      loop.run_forever()
   elif v == 5:
      # https://stackoverflow.com/questions/38193596/asyncio-multiprocessing-unix
      executor = ProcessPoolExecutor()
      # executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
      asyncio.get_event_loop().run_until_complete(run(executor))

if __name__ == '__main__':
   run_mind()
Reply
#2
Is there a reason you're using more than one event loop at once?
Reply
#3
(Aug-29-2020, 09:12 PM)nilamo Wrote: Is there a reason you're using more than one event loop at once?

Hello! Yep, has reason. But task is resolved with help Thread:

Maybe this help for anyone:

def _create_httpserver(p):
    host = ''
    port = p
    HTTPServer((host, port), HandleRequests).serve_forever()

...........

t = threading.Thread(target=_create_httpserver, args=[8080)
t.start()
Reply


Possibly Related Threads…
Thread Author Replies Views Last Post
  drawing a table with the status of tasks in each thread pyfoo 3 356 Mar-01-2024, 09:29 AM
Last Post: nerdyaks
  How to script repetitive tasks in dynaform using python BenneGrus 0 1,306 Dec-22-2021, 08:36 AM
Last Post: BenneGrus
  How to add asynchronous tasks as they are needed? AlekseyPython 2 3,938 Jan-11-2019, 02:58 AM
Last Post: AlekseyPython
  How I can limit quantity of parallel executable tasks in asyncio? AlekseyPython 1 2,396 Oct-24-2018, 10:22 AM
Last Post: AlekseyPython
  BaseHTTPServer.HTTPServer pick-a-port? degenaro 1 2,472 Jul-05-2018, 08:36 PM
Last Post: gontajones
  run two tasks concurrently tony1812 1 2,582 Jul-24-2017, 05:43 PM
Last Post: Larz60+
  Tasks for Python Lamon112 2 33,562 Jan-13-2017, 03:32 AM
Last Post: metulburr

Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020