pipeline.py
I have set myself the goal of converting all of my bash scripts to Python. One thing holding me back is the many command pipelines in those scripts. People frequently suggest running pipelines through a shell; even the Python documentation suggests that the shell=True option (in subprocess) can be used for pipelines.

But my goal is to eliminate the use of shells, both to avoid their risks and to make it simpler to build safe command structures. So I decided I needed a tool that does this in Python alone, bypassing shells entirely.
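For comparison, the standard library can already chain commands without a shell by handing one Popen's stdout to the next Popen's stdin. Below is a minimal sketch of that approach, assuming Python 3. The helper name run_pipeline is made up for illustration and is not part of pipeline.py.

```python
import subprocess
import sys


def run_pipeline(commands):
    """Run a list of argument lists as a pipeline, with no shell involved.

    Returns the bytes the last command wrote to its stdout.
    """
    if not commands:
        raise ValueError('need at least one command')
    procs = []
    prev_stdout = None  # first command inherits our stdin
    for argv in commands:
        proc = subprocess.Popen(argv, stdin=prev_stdout,
                                stdout=subprocess.PIPE)
        if prev_stdout is not None:
            # Drop our copy of the upstream pipe so the upstream command
            # sees a broken pipe if this command exits early.
            prev_stdout.close()
        prev_stdout = proc.stdout
        procs.append(proc)
    output = procs[-1].communicate()[0]
    for proc in procs[:-1]:
        proc.wait()
    return output


if __name__ == '__main__':
    py = sys.executable
    # Stand-ins for "ls" and "grep" so the demo runs anywhere Python does.
    producer = [py, '-c', 'print("a.py"); print("b.txt"); print("c.py")']
    keep_py = [py, '-c',
               'import sys; sys.stdout.writelines('
               'l for l in sys.stdin if l.rstrip().endswith(".py"))']
    print(run_pipeline([producer, keep_py]).decode(), end='')
```

The same call should accept a list such as [['ls','-l'], ['grep','\\.py$'], ['tail']]. Because every argument is passed as a separate list element, nothing is ever parsed by a shell.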

pipeline.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Run a pipeline of commands.

file          pipeline.py
purpose       run a pipeline of commands
email         10054452614123394844460370234029112340408691

The intent is that this command works correctly under both Python 2 and
Python 3.  Please report failures or code improvements to the author.
"""
from __future__ import absolute_import, division, print_function, unicode_literals

__license__ = """
Copyright (C) 2017, by Phil D. Howard - all other rights reserved

Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA, OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE, OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

The author may be contacted by decoding the number
10054452614123394844460370234029112340408691
(provu igi la numeron al duuma)
"""
import multiprocessing,os,sys

proc_list = None


def child( fdi, fdo, cmd ):
    """Run a given command in a child process from a list of arguments."""
    savefd_stderr = os.dup(2)
    if fdi < 0 or fdo < 0:
        return -1
    fds = []
    while True:
        fd = os.dup(fdo)
        if fd < 0:
            raise IOError('invalid fd from dup({})'.format(repr(fdo)))
        if fd > 2:
            break
        fds.append( fd )
    ndo, ndi = fd, os.dup(fdi)
    if ndi < 0:
        raise IOError('invalid fd from dup({})'.format(repr(fdi)))
    if ndi < 3:
        raise ValueError('fd in range 0..2 after they should all be assigned')
    os.close(fdo)
    os.close(fdi)
    os.dup2(ndi,0)
    os.dup2(ndo,1)
    os.dup2(ndo,2)
    os.close(ndo)
    os.close(ndi)
    os.execvp(cmd[0],cmd)
    # os.execvp() never returns; on failure it raises OSError itself,
    # so this line is only a safety net
    raise OSError('os.execvp({},{}) failed'.format(
        repr(cmd[0]),repr(cmd)))


def start( list_of_cmds, stdin=-1, stdout=-1 ):
    """Set up multi-command pipeline from list of commands.

function      start
purpose       Start a pipeline of commands.
syntax        pipeline.start( list_of_commands, stdin=, stdout= )
argument      1 only (list of lists of strings)
options       stdin= file or fd that pipeline (first command) will read from
              stdout= file or fd that pipeline (last command) will write to
returns       tuple of 2 file objects for the read end and write end of
              pipes created for the pipeline; an entry is None when no
              pipe was created for that end.

If no output is provided as option stdout= then a one-way pipe object will be
created with the last command of the pipeline being given the write side of
this object and the read side being returned in the return tuple index 0 for
the caller to read from.

If no input is provided as option stdin= then a one-way pipe object will be
created with the first command of the pipeline being given the read side of
this object and the write side being returned in the return tuple index 1 for
the caller to write to.

Only one pipeline instance can be created by this implementation.  A future
implementation will support multiple instances, allowing multiple pipelines
to be running concurrently.

There is no coded limit on the number of commands in the pipeline.

Call pipeline.close() when the pipeline is no longer needed."""
    global proc_list

    if len( list_of_cmds ) < 1:
        return (-1,-1)

    rfdw = rfdr = -1

    if stdout in (sys.stdout.fileno(),sys.stdout):
        sys.stdout.flush()
    if stdout in (sys.stderr.fileno(),sys.stderr):
        sys.stderr.flush()

    fdr = stdin if isinstance( stdin, int ) else stdin.fileno()
    fdw = stdout if isinstance( stdout, int ) else stdout.fileno()

    proc_list = []
    fdi = fdr
    for cmd in list_of_cmds[:-1]:
        if fdi < 0:
            pipe = os.pipe()
            fdi, rfdw = pipe
        pipe = os.pipe()
        fdo = pipe[1]
        proc = multiprocessing.Process( target=child, args=(fdi,fdo,cmd) )
        proc_list.append( proc )
        proc.start()
        os.close(fdi)
        os.close(fdo)
        fdi = pipe[0]

    cmd = list_of_cmds[-1]
    if fdw < 0:
        pipe = os.pipe()
        rfdr, fdo = pipe
    else:
        fdo = fdw
    proc = multiprocessing.Process( target=child, args=(fdi,fdo,cmd) )
    proc_list.append( proc )
    proc.start()
    if fdi >= 0:
        # os.close(-1) would raise; only close a real descriptor
        os.close(fdi)
    if fdw < 0:
        os.close(fdo)

    ri = None if rfdr < 0 else os.fdopen(rfdr,'r')
    ro = None if rfdw < 0 else os.fdopen(rfdw,'w')
    return ( ri, ro )


def close( timeout=360 ):
    """Wait for the pipeline (all the commands) to end.

function      close
purpose       Wait for the pipeline (all the commands) to end
syntax        close( timeout= )
arguments     -none-
options       timeout= number of seconds to wait for commands to end
              (default is 360)
"""
    global proc_list
    for proc in proc_list:
        proc.join( int( timeout ) )
    proc_list = None
    return None


def run( commands, stdin=-1, stdout=-1, timeout=360 ):
    """Start and close in one function call"""
    start( commands, stdin=stdin, stdout=stdout )
    close( timeout=timeout)


def test_pipeline( args ):
    """Test the pipeline code (not implemented)."""
    pid='['+str(os.getpid())+']'
    print( pid, args[0]+':', 'not implemented' )
    return None

    
def help_pipeline():
    """Show pipeline library help docstrings."""
    import pipeline
    return help( pipeline )


def main( args ):
    """Run command lines as a pipeline, show help, or run a test.

function      main
purpose       Run multiple command lines under separate
              instances of bash as a pipeline
              or if no commands are given display help
              or if one command is 'test' run a test
command       pipeline "command" ...
              pipeline help
              pipeline test
              pipeline
function      start( command_list, stdin=, stdout= )
arguments     command_list: a list of commands (lists)
              command: a list of arguments (strs)
              stdin= file or descriptor to read from
              stdout= file or descriptor to write to
"""
    if len(args) < 2:   # args[0] is the program name, so no commands given
        return help_pipeline()
    if len(args) > 1:
        subcmd = args[1]
        if subcmd == 'help':
            return help_pipeline()
        if subcmd == 'test':
            try:
                return test_pipeline( args )
            except KeyboardInterrupt:
                print( '\nOUCH!', file = sys.stderr )
                return 150
    run( [['bash','-c',c] for c in args[1:]], stdin=0, stdout=1 )
    return 0


if __name__ == '__main__':
    try:
        result = main( sys.argv )
        sys.stdout.flush()
    except KeyboardInterrupt:
        result = 141
        print( '' )
    except IOError:
        result = 142
    try:
        sys.exit( int( result ) )
    except ValueError:
        print( str( result ), file=sys.stderr )
        sys.exit( 1 )
    except TypeError:
        if result is None:
            sys.exit( 0 )
        sys.exit( 255 )

# EOF
02b4aeec7be82137cb9a142451981e9b
85d863fd60722faf546a9dc3d6696f63182d3028b32edc7aab7518a72b1202de

It seems to also work in Python 2.7.
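The core of what child() does (point fds 0 and 1 at the pipe ends, then exec) can be shown in stripped-down form. This is a rough sketch and not the module's code. It calls os.fork() directly instead of going through multiprocessing, handles exactly two commands, and assumes POSIX with Python 3.4 or later, where descriptors from os.pipe() are close-on-exec by default. The names spawn and pipe_two are hypothetical.

```python
import os
import sys


def spawn(argv, fd_in, fd_out):
    """Fork a child that runs argv with fd_in as stdin and fd_out as stdout."""
    pid = os.fork()
    if pid == 0:  # child process
        try:
            os.dup2(fd_in, 0)    # dup2 targets are inheritable across exec
            os.dup2(fd_out, 1)
            os.execvp(argv[0], argv)
        finally:
            os._exit(127)        # reached only if dup2 or exec failed
    return pid


def pipe_two(first, second):
    """Run the equivalent of `first | second`; return second's output bytes."""
    mid_r, mid_w = os.pipe()     # carries data from first to second
    out_r, out_w = os.pipe()     # carries data from second back to us
    nul = os.open(os.devnull, os.O_RDONLY)   # stdin for the first command
    pids = [spawn(first, nul, mid_w), spawn(second, mid_r, out_w)]
    # The parent must close its copies, or the readers never see EOF.
    for fd in (nul, mid_r, mid_w, out_w):
        os.close(fd)
    chunks = []
    while True:
        chunk = os.read(out_r, 65536)
        if not chunk:
            break
        chunks.append(chunk)
    os.close(out_r)
    for pid in pids:
        os.waitpid(pid, 0)
    return b''.join(chunks)


if __name__ == '__main__':
    py = sys.executable
    lines = pipe_two(
        [py, '-c', 'print("x.py"); print("y.txt")'],
        [py, '-c', 'import sys; sys.stdout.writelines('
                   'l for l in sys.stdin if ".py" in l)'])
    sys.stdout.write(lines.decode())
```

Under Python 2 the pipe descriptors would be inherited by the exec'd commands, so each child would need to close the extra ends explicitly before calling os.execvp().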


Here is a small example script:
lspy.py:
#!/usr/bin/env python3
import pipeline,sys
pipeline.run( [ ['ls','-l'], ['grep','\\.py$'], ['tail'] ], stdout=sys.stdout )
pipeline.py - by Skaperen - Jan-26-2017, 09:06 AM