Aug-28-2020, 09:36 PM
Hello All!
My English can be badly, sorry in advance
OS : Windows && ArchLinux
Python version 3.7.2
Task: Need do control by program use HttpServer and don't interfere with program work
Short about code: Has run_mind():
- it get options from DB (example: run task in time, etc)
- next running HttpServer for control this program using GET
- next do infinity loop (using while), where doit any tasks using asyncio
Trable: On the running HttpServer, next executing is stopping on "await asyncio.sleep(1)" in loop "while". I think, what HttpServer - using listening port Freezing, and don't give to running any tasks
Lost sleep for 3 days. Say me please, how can task solve, how do it: Listen HttpServer (or other alternative variants, example: WebSocket, Socket, JsonServer, any..) AND doing Task in while?
import _functions import config import asyncio from concurrent.futures import ProcessPoolExecutor from multiprocessing import Process, Queue import multiprocessing, logging import nest_asyncio nest_asyncio.apply() class GlobVar: options = dict() from urllib.parse import urlparse import urllib.parse #import myurl from http.server import BaseHTTPRequestHandler, HTTPServer # python3 class HandleRequests(BaseHTTPRequestHandler): def _set_headers(self): ## self.send_response(200) ## self.send_header('Content-type', 'application/json') ## self.end_headers() pass def do_GET__old(self): self._set_headers() self.wfile.write("received get request") def do_GET(self): ## self._set_headers() ## print(self.headers) ## content_len = int(self.headers.getheader('content-length', 0)) ## post_body = ## print(post_body) print(self.path) #query = urlparse(self.path).query #parsed_query = parse_qs(query) #print(query) #print(parsed_query) self.send_response(200) self.send_header('content-type','application/json') self.end_headers() if self.path == '/': self.wfile.write(b"{ 0:false, 1:'Enter command', 'type':'error'}") else: if urllib.parse.urlparse(self.path).path == '/': # Обработка здесь уже, что там пришло q = urlparse(self.path) print(q) query = urllib.parse.urlparse(self.path).query __arr = (dict(urllib.parse.parse_qsl(query))) print(__arr) GlobVar.options[4]['__running'] = 5 # print(myurl.parse(self.path)) self.wfile.write(b"{'hello world!'}") self.wfile.write(b"{'hello world 2!'}") self.wfile.write(bytes(self.path, 'utf-8') ) else: self.wfile.write(b"{'hello...!'}") def do_POST(self): '''Reads post request body''' self._set_headers() content_len = int(self.headers.getheader('content-length', 0)) post_body = self.wfile.write("received post request:<br>{}".format(post_body)) async def new_proc_wss(arr): v = 7 if v == 1: pass elif v == 7: print('start http...') # Параллельно не работает, тормозит и пиздец host = '' port = 8002 await HTTPServer((host, port), HandleRequests).serve_forever() def sub_loop(arr): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(new_proc_wss(arr)) # v 12 (down) import asyncore import socket class EchoHandler(asyncore.dispatcher_with_send): def handle_read(self): data = self.recv(1024) if data == "close":self.close() self.send(data) class EchoServer(asyncore.dispatcher): def __init__(self, host, port): asyncore.dispatcher.__init__(self) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() self.bind((host, port)) self.listen(11) def handle_accept(self): pair = self.accept() if pair is not None: sock, addr = pair print ('conn', addr) handler = EchoHandler(sock) # async def run(): async def run(executor): print(' run mind()'); GlobVar.options = config.getOptions() print('options:', GlobVar.options) # Запуск автозапуска # a_dict.keys() for key in GlobVar.options: arr = GlobVar.options[key] if arr['mo_mode'] == 1: if arr['mo_str'] == 'websocket': #_websocket2.create_websocket() #print('..1') number = 5 #__name = '_websocket2' __name = '_websocket2_async' #if arr['mo_str_2'] != False: # __name = arr['mo_str_2'] # default MODE run() v = 4 v = 3 if v == 0: pass elif v == 10: # ERROR: Cannot run the event loop while another loop is running ## medicine - nest_asyncio.apply() # NEW FUCK: не доходит до next # await asyncio.get_event_loop().run_in_executor(executor, sub_loop(arr)) print('next...') elif v == 11: # MODE run() v = 5 # Стопарит дальнейшее выполнение и не доходит до finished task_4 = new_proc_wss(arr) # tasks_list = [task_1, task_2, task_3, task_4] tasks_list = [task_4] finished, unfinished = await asyncio.wait(tasks_list, loop=executor, return_when=asyncio.ALL_COMPLETED) print(finished, unfinished) for unfinished_task in unfinished: unfinished_task.cancel() print('finished:: ', finished) elif v == 12: # НЕ доходит до NEXT host = '' port = 2222 server = EchoServer(host, port) await asyncore.loop() print('next...') elif v == 13: pass elif v == 1: proc = multiprocessing.Process(target=_websocket2_async.create_websocket, name=__name, args=(number,)) #proc = Process(target=child, args=()) #print('..2') procs.append(proc) proc.daemon=True print('..3') #try: proc.start() #except OSError as err: # print(err) # m = multiprocessing.Manager() print('..4') #proc.join() print('..5, proc = ', proc, ', = ',, ', is_alive = ', proc.is_alive) elif v == 3: # Работает, но стопарит дальнейшее выполнение, на await asyncio.sleep(1) print('load wss 8000') # async future = asyncio.ensure_future(new_proc_wss(arr)) def callback_wss(fut): result = fut.result() print('finish callback_wss') #asyncio.ensure_future(new_proc(arr)).add_done_callback(callback) print(111) future.add_done_callback(callback_wss) print(222) elif v == 4: # Запускается и закрывается процесс, но это новый процесс, общаться через межпроц взаимодействие - на самый крайний случай print('v4') proc = multiprocessing.Process(target=new_proc_wss, name=__name, args=(arr,)) #proc = Process(target=child, args=()) #print('..2') #procs.append(proc) #### proc.daemon=True print('..3') #try: proc.start() print('starting') elif v == 5: # Запускает хттп и тормозит на этом, до next не доходит... await new_proc_wss(arr) print('next...') elif v == 6: # Блокирует дальнейшее выполнеие asyncio, до next не доходит... task = asyncio.create_task(new_proc_wss(arr)) print('next...') elif v == 7: # Блокирует дальнейшее выполнеие, до next не доходит... results = await asyncio.gather(new_proc_wss(arr)) print(results) print('next...') elif v == 8: # ERROR - Cannot run the event loop while another loop is running loop = asyncio.new_event_loop() # Each client connection will create a new protocol instance coro = loop.create_server(EchoServerClientProtocol, '', 8888) server = loop.run_until_complete(coro) # Serve requests until CTRL+c is pressed print('Serving on {}'.format(server.sockets[0].getsockname())) try: loop.run_forever() except KeyboardInterrupt: pass elif v == 9: # До NEXT не доходит # async def client_connected(reader, writer): # Communicate with the client with # reader/writer streams. For example: await reader.readline() srv = await asyncio.start_server( client_connected, '', '8003') await srv.serve_forever() print('next...') # Запуск цикла boom=1 while boom >0: print('iteration1...') await asyncio.sleep(1) # time.sleep(1) print('iteration2...') # Запуск всякой лабуды в указанное время def run_mind(): v = 4 if v == 1: pass elif v == 4: loop = asyncio.ProactorEventLoop() asyncio.set_event_loop(loop) loop.run_until_complete(run(loop)) loop.run_forever() elif v == 5: # executor = ProcessPoolExecutor() # executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) asyncio.get_event_loop().run_until_complete(run(executor)) if __name__ == '__main__': run_mind()