How to call subprocess after for loop
#1
I have a PySpark script like the one below. It collects the script's stdout and stderr in a file and stores that file locally.

The script runs the function initial in a loop, once for every table listed in the input file.

It works, but with one small problem: inside initial I use a subprocess call to move the local log file to a different location (HDFS).

I want that subprocess call to run only after the loop has finished. Instead, it runs on the first iteration of the loop, and on the second iteration it throws an error, which is expected: the file already exists at the destination, so the put fails.
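
In outline, the issue is just where the call sits relative to the loop (process and upload below are placeholders, not my real code):

	for table in tables:
		process(table)
		upload()        # runs on every iteration -- fails from the second one

	# what I want instead:
	for table in tables:
		process(table)
	upload()            # runs exactly once, after the loop finishes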

PySpark script:


	#!/usr/bin/python

	import os
	import sys
	import traceback
	import subprocess
	import pandas as pd
	from datetime import datetime
	from pyspark import SparkContext, SparkConf
	from pyspark.sql import HiveContext
	conf = SparkConf()
	sc = SparkContext(conf=conf)
	sqlContext = HiveContext(sc)


	def initial(table, hivedb, domain, port, mysqldb, mysql_user, user, dir):


		day = datetime.now().strftime('%Y-%m-%d')
		month = datetime.now().strftime('%Y-%m')

		Linux_path = '/data/logging/{}'.format(input_file)  # input_file is a global, set below
		HDFS_path = '/user/{}/{}/logging/{}/{}/{}'.format(user,dir,mysqldb,month,day)

		subprocess.call(["hdfs", "dfs", "-mkdir", "-p", HDFS_path])
		subprocess.call(["rm", Linux_path])

		# open the log file unbuffered (Python 2: buffering=0)
		so = se = open(Linux_path, 'a', 0)

		# re-open stdout without buffering
		sys.stdout = os.fdopen(sys.stdout.fileno(), 'a', 0)

		# redirect stdout and stderr to the log file opened above
		os.dup2(so.fileno(), sys.stdout.fileno())
		os.dup2(se.fileno(), sys.stderr.fileno())

		### CODE:

		# ... do the per-table work here ...

		# ... if it errors, print the traceback ...

		# ... repeat the same for every table in the input file ...


		subprocess.call(["hdfs", "dfs", "-put", Linux_path, HDFS_path])

		
	if len(sys.argv) != 9:
		print 'Invalid number of args......'
		print 'Usage: spark-submit file.py input_file hivedb domain port mysqldb mysql_user user dir'
		sys.exit(1)

	input_file = sys.argv[1]
	hivedb = sys.argv[2]
	domain = sys.argv[3]
	port = sys.argv[4]
	mysqldb = sys.argv[5]
	mysql_user = sys.argv[6]
	user = sys.argv[7]
	dir = sys.argv[8]

	input = sc.textFile('/user/{}/{}/{}/args/{}'.format(
		user, dir, mysqldb, input_file)).collect()

	for table in input:
		initial(table, hivedb, domain, port, mysqldb, mysql_user, user, dir)

	sc.stop()
	print '**********************************************************************************************************************************************************************'

How can I change my script so that the subprocess call runs only after the for loop is done? What changes do I need to make?
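
In other words, I think the restructuring should look roughly like this (a rough sketch, not tested; the idea is to hoist the path setup and the one-time HDFS calls out of initial and keep only the per-table work inside it):

	# sketch: initial() keeps the per-table work and logging,
	# but drops the mkdir/rm/put calls

	day = datetime.now().strftime('%Y-%m-%d')
	month = datetime.now().strftime('%Y-%m')
	Linux_path = '/data/logging/{}'.format(input_file)
	HDFS_path = '/user/{}/{}/logging/{}/{}/{}'.format(user, dir, mysqldb, month, day)

	# one-time setup, before the loop
	subprocess.call(["hdfs", "dfs", "-mkdir", "-p", HDFS_path])
	subprocess.call(["rm", Linux_path])

	for table in input:
		initial(table, hivedb, domain, port, mysqldb, mysql_user, user, dir)

	# one-time upload, after every table has been processed
	subprocess.call(["hdfs", "dfs", "-put", Linux_path, HDFS_path])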