![]() |
pipeline.py - Printable Version +- Python Forum (https://python-forum.io) +-- Forum: General (https://python-forum.io/forum-1.html) +--- Forum: Code sharing (https://python-forum.io/forum-5.html) +--- Thread: pipeline.py (/thread-1797.html) Pages:
1
2
|
pipeline.py - Skaperen - Jan-26-2017 i have expressed the goal of converting all of my bash scripts to python ![]() ![]() ![]() ![]() but my goal was to eliminate the use of shells to avoid their risks and simplify building safe command structures ![]() ![]() pipeline.py: #!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import absolute_import, division, print_function, unicode_literals """Run a pipeline of commands. file pipeline.py purpose run a pipeline of commands email 10054452614123394844460370234029112340408691 The intent is that this command works correctly under both Python 2 and Python 3. Please report failures or code improvement to the author. """ __license__ = """ Copyright (C) 2017, by Phil D. Howard - all other rights reserved Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above copyright notice and this permission notice appear in all copies. THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA, OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE, OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
The author may be contacted by decoding the number 10054452614123394844460370234029112340408691 (provu igi la numeron al duuma) """ import multiprocessing,os,sys proc_list = None def child( fdi, fdo, cmd ): """Run a given command in a child process from a list of arguments.""" savefd_stderr = os.dup(2) if fdi < 0 or fdo < 0: return -1 fds = [] while True: fd = os.dup(fdo) if fd < 0: raise IOError('invalid fd from dup({})'.format(repr(fdo))) if fd > 2: break fds.append( fd ) ndo, ndi = fd, os.dup(fdi) if ndi < 0: raise IOError('invalid fd from dup({})'.format(repr(fdo))) if ndi < 3: raise ValueError('fd in range 0..2 after they should all be assigned') os.close(fdo) os.close(fdi) os.dup2(ndi,0) os.dup2(ndo,1) os.dup2(ndo,2) os.close(ndo) os.close(ndi) os.execvp(cmd[0],cmd) raise OSError('os.execvp({},{}) failed').format( repr(cmd[0]),repr(cmd),file=sys.stderr) def start( list_of_cmds, stdin=-1, stdout=-1 ): """Set up multi-command pipeline from list of commands. function start purpose Start a pipeline of commands. syntax pipeline.start( list_of_commands, stdin=, stdout= ) argument 1 only (list of lists of strings) options fdr= fd that pipeline (first command) will read from fdw= fd that pipeline (last command) will write to returns tuple of 2 fds for read end and write end of pipes created for pipeline if no fds given as options. If no output is provided as option stdout= then a one-way pipe object will be created with the last command of the pipeline being given the write side of this object and the read side being returned in the return tuple index 0 for the caller to read from. If no input is provided as option stdin= then a one-way pipe object will be created with the first command of the pipeline being given the read side of this object and the write side being returned in the return tuple index 1 for the caller to write to. Only one pipeline instance can be created by this implementation. 
A future implementation will support multiple instances allow muliple pipelines to be running concurrently. There is no coded limit on the number of commands in the pipeline. Call pipeline.close() when the pipeline is no longer needed.""" global proc_list if len( list_of_cmds ) < 1: return (-1,-1) rfdw = rfdr = -1 if stdout in (sys.stdout.fileno(),sys.stdout): sys.stdout.flush() if stdout in (sys.stderr.fileno(),sys.stderr): sys.stderr.flush() fdr = stdin if isinstance( stdin, int ) else stdin.fileno() fdw = stdout if isinstance( stdout, int ) else stdout.fileno() proc_list = [] fdi = fdr for cmd in list_of_cmds[:-1]: if fdi < 0: pipe = os.pipe() fdi, rfdw = pipe pipe = os.pipe() fdo = pipe[1] proc = multiprocessing.Process( target=child, args=(fdi,fdo,cmd) ) proc_list.append( proc ) proc.start() os.close(fdi) os.close(fdo) fdi = pipe[0] cmd = list_of_cmds[-1] if fdw < 0: pipe = os.pipe() rfdr, fdo = pipe else: fdo = fdw proc = multiprocessing.Process( target=child, args=(fdi,fdo,cmd) ) proc_list.append( proc ) proc.start() if fdi < 0: os.close(fdi) if fdw < 0: os.close(fdo) ri = None if rfdr < 0 else os.fdopen(rfdr,'r') ro = None if rfdw < 0 else os.fdopen(rfdw,'w') return ( ri, ro ) def close( timeout=360 ): """Wait for the pipeline (all the commands) to end. 
function close purpose Wait for the pipeline (all the commands) to end syntax close( timeout= ) arguments -none- options timeout= number of seconds to wait for commands to end (default is 360) """ global proc_list for proc in proc_list: proc.join( int( timeout ) ) proc_list = None return None def run( commands, stdin=-1, stdout=-1, timeout=360 ): """Start and close in one function call""" start( commands, stdin=stdin, stdout=stdout ) close( timeout=timeout) def test_pipeline( args ): """Test the pipeline code (not implemented).""" pid='['+str(os.getpid())+']' print( pid, args[0]+':', 'not implemented' ) return None def help_pipeline(): """Show pipeline library help docstrings.""" import pipeline return help( pipeline ) def main( args ): """Run multiple command lines under separate instances of bash as a pipiline or if no commands are given disply help or if one command is 'test' run a test function main purpose Run multiple command lines under separate instances of bash as a pipiline or if no commands are given disply help or if one command is 'test' run a test command pipeline "command" ... 
pipeline help pipeline test pipeline function start( command_list, fdr=, fdw= ) arguments command_list: a list of commands (lists) command: a list of arguments (strs) stdin= file or descriptor to read from stdout= file or descriptor to write to """ if len(args) < 1: return help_pipeline() if len(args) > 1: subcmd = args[1] if subcmd == 'help': return help_pipeline() if subcmd == 'test': try: return test_pipeline( args ) except KeyboardInterrupt: print( '\nOUCH!', file = sys.stderr ) return 150 run( [['bash','-c',c] for c in args[1:]], fdr=0, fdw=1 ) return 0 if __name__ == '__main__': try: result = main( sys.argv ) sys.stdout.flush() except KeyboardInterrupt: result = 141 print( '' ) except IOError: result = 142 try: exit( int( result ) ) except ValueError: print( str( result ), file=sys.stderr ) exit( 1 ) except TypeError: if result == None: exit( 0 ) exit( 255 ) # EOF02b4aeec7be82137cb9a142451981e9b 85d863fd60722faf546a9dc3d6696f63182d3028b32edc7aab7518a72b1202de it seems to also work in python2.7 ![]() here is a small example script: lspy.py: #!/usr/bin/env python3 import pipeline,sys pipeline.run( [ ['ls','-l'], ['grep','\\.py$'], ['tail'] ], stdout=sys.stdout ) RE: pipeline.py - wavic - Jan-26-2017 Hm! As I see, the command in the example lists the files in a directory ( why -l option when you need only the names of the file ), checks for an extension and prints the last few results. Why so much code ( I have to look at it :) but no time now) when you ca do all this in pure Python with few lines of code? RE: pipeline.py - Skaperen - Jan-27-2017 (Jan-26-2017, 11:50 AM)wavic Wrote: Hm! if my example of calling pipeline.run() is a bad one, i can replace it with another one. if this can be done with less code, is that better? if so, do you want to do it? 
RE: pipeline.py - nilamo - Feb-03-2017 Ok, now write it without globals :p RE: pipeline.py - Skaperen - Feb-04-2017 globals make a reliable way to leave very recently (updated in the fork context) updated data for processes. RE: pipeline.py - wavic - Feb-04-2017 Well, I was meaning to replace the user bash scripts/pipes completely with python code. What you have done looks interesting, though. I have to see it closely. RE: pipeline.py - Skaperen - Feb-05-2017 i ran into issues with the "pipes" created by the multiprocessing module and switched to module os pipes. that increased the code with more details, but solved bugs i was running into such as EOF needing an extra push (send more data) to make it through. since i have much experience in C with these pipes, and the data was all byte streams (the original shell pipelines used them) i was comfortable using os pipes. i expect to use them in the future for other projects. my code is larger, in part, because i can anticipate more odd conditions to check for. python is great at catching things higher up, and this helps ... i just need to see this more and more and learn how to deal with it well with good debugging details. imagine me trying to do this in C (if you know enough C to do it, too). i had been planning to do this for a while, but also realizing that C was not a suitable language for most of the things I did in bash. python is (suitable). another part of why my code is larger is the common wrapper i start open source projects with, including the license text and the main() code (will usually be a command to use the module like in getmyip.py or test it like in pipeline.py) RE: pipeline.py - wavic - Feb-05-2017 I am not a programmer. I tried C and C++ years ago but for a short time. Don't remember almost anything. I have a contact with a guy with real skills and he played with OpenCV for a while. Face recognition, object tracking... He's used Python and shared that if he had to do it in C, he'd grow old.
RE: pipeline.py - Skaperen - Feb-24-2017 i re-wrote the pipeline module as a class so it can be instantiated and thus you can have two or more pipelines. i am still working on a deadlock bug in it. i know how to fix it by doing os.fork() for everything and not use the multiprocessing module at all. but i am thinking out a more pythonic way to fix it and trying to come up with something. ideas are welcome. enjoy.#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import division from __future__ import print_function from __future__ import unicode_literals """ Class to make a pipeline of commands in separate processes. file pipeline.py class pipeline purpose pipeline of commands in separate processes. email 10054452614123394844460370234029112340408691 methods cmd - given a list of argument, append this command. - given a list of commands, replace all commands. finish - finish a pipeline, wait for all commands to be done. io - given one or more i/o redirection options, set them. start - start the commands that are setup. stderr - set stderr for all commands in the pipeline. stdin - set stdin for the first command in the pipeline. stdout - set stdout for the last command in the pipeline. --for each of the 3 stdin/stdout/stderr you can set: file = that python file wiile used. int = that file descriptot will be used. str = that file name will be opened and used. None = the pipeline inherits the callaer's stardard i/o. False = the i/o redirect is made to "/dev/null". True = the i/o redirect is made to a pipe whch is returned to the caller of start(). The intent is that this command works correctly under both Python 2 and Python 3. Please report failures or code improvement to the author. """ __license__ = """ Copyright (C) 2017, by Phil D. 
Howard - all other rights reserved Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above copyright notice and this permission notice appear in all copies. THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA, OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE, OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. The author may be contacted by decoding the number 10054452614123394844460370234029112340408691 (provu igi la numeron al duuma) """ import copy import multiprocessing import os import six import sys def dpr(*args,**opts): """Print for debugging. function dpr purpose print for debugging. note if environment variable 'nodebug' is set then no printing will happen. """ if 'nodebug' in os.environ: return opts['file'] = sys.stderr return print(*args,**opts) def isstr(s): """Check if argument is basically a string. function isstr purpose check if argument is basically a string. argument 1 (str) string to be checked """ return True if isinstance(s,six.string_types) else False class pipeline: """Run pipelines of executable commands""" def __init__(self,*args,**opts): """Initialize the class. method __init__ purpose initialize the class. 
arguments (passed to method cmd()) options (passed to method opts()) """ # initialize defaults for instantiation self.proc_list = [] self.cmd_list = [] self.devnull_in = os.devnull self.devnull_out = os.devnull self.devnull_err = os.devnull self.opt_stdin = None self.opt_stdout = None self.opt_stderr = None self.opt_timeout = 60 # initialize further from arguments and options at instantiation if len(args) > 0: self.cmd(*args) if len(opts) > 0: self.io(**opts) return def __proc__(self,fds,cmd): """Run as a child process to execvp() a command. method __proc__ purpose run as a child process to execvp() a command. """ stdin, stdout, stderr = fds if fds != (0,1,2): # if we need to re-arrange the file descriptors if stderr < 2: new_stderr = 0 while new_stderr < 2: # get new fd above 2 new_stderr = os.dup( stderr ) stderr = new_stderr if stdout < 2: new_stdout = 0 while new_stdout < 2: # get new fd above 2 new_stdout = os.dup( stdout ) stdout = new_stdout if stdin < 2: new_stdin = 0 while new_stdin < 2: # get new fd above 2 new_stdin = os.dup( stdin ) stdin = new_stdin os.dup2( stdin, 0 ) os.dup2( stdout, 1 ) os.dup2( stderr, 2 ) os.close( stdin ) os.close( stdout ) os.close( stderr ) os.execvp( cmd[0], cmd ) print('os.execvp({},{}) failed').format( repr(cmd[0]),repr(cmd),file=sys.stderr) print('os.execvp({},{}) failed').format( repr(cmd[0]),repr(cmd),file=sys.stdout) raise OSError('os.execvp({},{}) failed').format( repr(cmd[0]),repr(cmd)) return False def finish(self,timeout=360): """Finish a pipelines instance by joining all commands. method finish purpose finish a pipelines instance by joining all commands. return None """ for proc in self.proc_list: proc.join( int( timeout ) ) self.proc_list = [] return def cmd(self,*args): """add or replace commands for a pipeline. 
method cmd arguments 1 (list of list of str) list of commands to replace 1 (list of str) one command to add to list of commands return (list of list of str) previous list of commands """ prev_cmd_list = self.cmd_list if not isinstance(args,tuple): # something went wrong in arguments, raise an error raise TypeError('invalid arguments '+repr(args)) elif len(args) < 1: # no arguments given, nothing to do pass elif not isinstance(args[0],(tuple,list)): # not a list, raise an error raise TypeError('invalid commands '+repr(args[0])) elif len(args[0]) < 1: # an empty list is given, nothing to do pass elif isinstance(args[0][0],(tuple,list)): # a list of commands is given, replace the existing list self.cmd_list = copy.deepcopy(args[0]) elif isstr(args[0][0][0]): # a list of strings is one command, append it self.cmd_list += [ copy.copy(args[0]) ] else: # a list of other junk is given, raise an error raise TypeError('not list of commands or command '+repr(args[0])) return prev_cmd_list def io(self,*args,**opts): """Set i/o options for this pipeline instance. method opts purpose set one or more i/o options for this pipeline instance. arguments (not used) options stdin= stdout= stderr= timeout= """ prev_opts = {} if len(args) > 0: raise TypeError('unexpectede arguments '+repr(args)) for o in ('stdin','stdout','stderr','timeout',): if 'opt_'+o in dir(self): prev_opts[o] = getattr(self,'opt_'+o,None) if o in opts: setattr(self,'opt_'+o,opts[o]) return prev_opts def start(self): """Build the pipeline with commands and start it all. method start purpose build the pipeline with commands and start it all. 
arguments -none- options -none- returns (3-tuple) pipe ends or None for stdin,stdout,stderr note each command has stderr to the same target """ num_cmds = len( self.cmd_list ) if num_cmds < 1: return None stderr = self.opt_stderr re_stderr = None if stderr is None: stderr = sys.stderr.fileno() elif isinstance( stderr, bool ): if stderr is False: stderr = os.open( devnull_err, os.O_WRONLY ) elif stderr is True: stderr_pipe = os.pipe() re_stderr, stderr = stderr_pipe elif isinstance( stderr, str ): stderr = os.open( stderr, os.O_WRONLY ) elif 'fileno' in dir(stderr): stderr = stderr.fileno() elif not isinstance( stderr, int ): raise TypeError( 'Invalid type for stderr=' + repr(stderr) ) sys.stdout.flush() sys.stderr.flush() self.proc_list = [] last_cmd = num_cmds - 1 first_cmd = 0 for cmd_num in range( num_cmds ): cmd = self.cmd_list[cmd_num] if cmd_num == first_cmd: stdin = self.opt_stdin re_stdin = None if stdin is None: stdin = sys.stdin.fileno() elif isinstance( stdin, bool ): if stdin is False: stdin = os.open( self.devnull_in, os.O_WRONLY ) elif stdin is True: stdin_pipe = os.pipe() stdin, re_stdin = stdin_pipe elif isinstance( stdin, str ): stdin = os.open( stdin, os.O_WRONLY ) re_stdout = None elif 'fileno' in dir(stdin): stdin = stdin.fileno() elif not isinstance( stdin, int ): raise TypeError( 'Invalid type for stdin=' + repr(stdin) ) if cmd_num > first_cmd: stdin = pipe[0] if cmd_num < last_cmd: pipe = os.pipe() stdout = pipe[1] if cmd_num == last_cmd: stdout = self.opt_stdout re_stdout = None if stdout is None: stdout = sys.stdout.fileno() elif isinstance( stdout, bool ): if stdout is False: stdout = os.open( devnull_out, os.O_WRONLY ) elif stdout is True: stdout_pipe = os.pipe() re_stdout, stdout = stdout_pipe elif isinstance( stdout, str ): stdout = os.open( stdout, os.O_WRONLY ) elif 'fileno' in dir(stdout): stdout = stdout.fileno() elif not isinstance( stdout, int ): raise TypeError( 'Invalid type for stdout=' +repr(stdout) ) for x in ( (stdin,'in'), 
(stdout,'out'), (stderr,'err') ): s = x[0] n = x[1] for t in ( True, False ): if isinstance(s,bool) and s == t: raise TypeError( 'bad stdio, std'+n+' is '+repr(t) ) if not isinstance(s,int): raise TypeError( 'bad stdio, std'+n+' is not an int' ) fds = ( stdin, stdout, stderr ) proc = multiprocessing.Process(target=self.__proc__,args=(fds,cmd)) self.proc_list.append( proc ) proc.start() if cmd_num > first_cmd: os.close( stdin ) if cmd_num < last_cmd: os.close( stdout ) # if returning pipe ends, return them as python files if isinstance(re_stdin,int): re_stdin = os.fdopen(re_stdin,'w') if isinstance(re_stdout,int): re_stdout = os.fdopen(re_stdout,'r') if isinstance(re_stderr,int): re_stderr = os.fdopen(re_stderr,'r') return (re_stdin,re_stdout,re_stderr) def stdin(self,*a): """Set and/or get stdin file or file descriptor value method stdin argument 1 (int) file descriptor to use 1 (str) file name to open for input 1 (file) open file ... use .fileno() attribute 1 (None) no change 1 (True) create a pipe, use read end, return write end from start 1 (False) use '/dev/null' returns previous stdin value """ p = getattr(self,'opt_stdin',None) if len(a) > 0 and a[0] != None: self.opt_stdin = a[0] return p def stdout(self,*a): """Set and/or get stdout file or file descriptor value method stdin argument 1 (int) file descriptor to use 1 (str) file name to open for input 1 (file) open file ... use .fileno() attribute 1 (None) no change 1 (True) create a pipe, use write end, return read end from start 1 (False) use '/dev/null' returns previous stdout value """ p = getattr(self,'opt_stdout',None) if len(a) > 0 and a[0] != None: self.opt_stdout = a[0] return p def stderr(self,*a): """Set and/or get stderr file or file descriptor value method stderr argument 1 (int) file descriptor to use 1 (str) file name to open for input 1 (file) open file ... 
use .fileno() attribute 1 (None) no change 1 (True) create a pipe, use write end, return read end from start 1 (False) use '/dev/null' returns previous stderr value """ p = getattr(self,'opt_stderr',None) if len(a) > 0 and a[0] != None: self.opt_stderr = a[0] return p def run(*args,**opts): """Run a pipeline of commands - not a method of class pipeline function run purpose run a pipeline of commands - not a method of class pipeline """ p = pipeline(*args,**opts) p.start() return p.finish() def test_pipeline(args): """Run a simple test. function test_pipeline purpose run a simple test. """ p = pipeline() p.cmd( [ ['bash','-c',a] for a in args[1:] ] ) p.io( stdin=None, stdout=None, stderr=None ) r = p.start() print( 'p.start() returns:', repr( r ), file=sys.stderr ) p.finish() return 0 def main( args ): """Run here when run as a command. function main purpose run here when run as a command. """ return test_pipeline(args) if __name__ == '__main__': try: result = main( sys.argv ) sys.stdout.flush() except KeyboardInterrupt: print( '' ) exit( 141 ) except IOError: exit( 142 ) if result in (None,True,): exit( 0 ) if result in (False,): exit( 1 ) if isinstance( result, str ): print( result, file=sys.stderr ) exit( 1 ) if isinstance( result, (list,tuple) ): for r in result: if isinstance( r, six.string_types ): print( r, file=sys.stderr ) exit( 1 ) try: exit( int( result ) ) except ( TypeError, ValueError ): exit( 255 ) # EOF RE: pipeline.py - wavic - Feb-24-2017 I just read what deadlock means. Asyncio module? |