Nov-06-2017, 07:45 PM
I have a PySpark script like the one below. In it, I collect the script's stdout and stderr into a file and store that file locally.
I run the function initial in a loop over the tables listed in the input file.
The script works, but with one small error. I use subprocess to move the local data to a different location.
I want this subprocess call to run after the loop has finished. Instead, it runs on the first pass through the loop, and on the second pass it throws an error, which is expected: the file is already present at the destination, so the put fails.
PySpark script:
#!/usr/bin/python
import os
import sys
import traceback
import subprocess
import pandas as pd
from datetime import datetime
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

def initial(table, hivedb, domain, port, mysqldb, mysql_user, user, dir):
    day = datetime.now().strftime('%Y-%m-%d')
    month = datetime.now().strftime('%Y-%m')
    Linux_path = '/data/logging/{}'.format(input_file)
    HDFS_path = '/user/{}/{}/logging/{}/{}/{}'.format(user, dir, mysqldb, month, day)
    subprocess.call(["hdfs", "dfs", "-mkdir", "-p", HDFS_path])
    subprocess.call(["rm", Linux_path])

    so = se = open('/data/logging/{}'.format(input_file), 'a', 0)
    # re-open stdout without buffering
    sys.stdout = os.fdopen(sys.stdout.fileno(), 'a', 0)
    # redirect stdout and stderr to the log file opened above
    os.dup2(so.fileno(), sys.stdout.fileno())
    os.dup2(se.fileno(), sys.stderr.fileno())

    ### CODE: Do something
    ### if errors then print traceback
    ### repeat the same for every table in input file

    subprocess.call(["hdfs", "dfs", "-put", Linux_path, HDFS_path])

if len(sys.argv) != 9:
    print 'Invalid number of args......'
    print 'Usage: spark-submit file.py Input_path Output_path'
    exit()

input_file = sys.argv[1]
hivedb = sys.argv[2]
domain = sys.argv[3]
port = sys.argv[4]
mysqldb = sys.argv[5]
mysql_user = sys.argv[6]
user = sys.argv[7]
dir = sys.argv[8]

input = sc.textFile('/user/{}/{}/{}/args/{}'.format(user, dir, mysqldb, input_file)).collect()

for table in input:
    initial(table, hivedb, domain, port, mysqldb, mysql_user, user, dir)

sc.stop()

print '**********************************************************************************************************************************************************************'
How can I change my script so that the subprocess call runs only after the for loop is done?
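One way to restructure it, as a rough sketch (untested, and assuming the rest of the script stays as posted): compute the paths once in the main block, do the mkdir and rm once before the loop, remove the hdfs dfs -put from initial, and call it a single time after the loop completes.

#!/usr/bin/python
# Sketch only: the argument parsing, SparkContext setup, and the body of
# the per-table work are unchanged from the script above and elided here.
import os
import sys
import subprocess
from datetime import datetime

def initial(table, hivedb, domain, port, mysqldb, mysql_user, user, dir):
    # same logging redirection as before; input_file is the global
    # set from sys.argv[1] in the original script
    so = se = open('/data/logging/{}'.format(input_file), 'a', 0)
    sys.stdout = os.fdopen(sys.stdout.fileno(), 'a', 0)
    os.dup2(so.fileno(), sys.stdout.fileno())
    os.dup2(se.fileno(), sys.stderr.fileno())
    ### CODE: do the per-table work, printing tracebacks on error
    # note: no "hdfs dfs -put" in here any more

# the paths depend only on input_file and today's date, so compute them once
day = datetime.now().strftime('%Y-%m-%d')
month = datetime.now().strftime('%Y-%m')
Linux_path = '/data/logging/{}'.format(input_file)
HDFS_path = '/user/{}/{}/logging/{}/{}/{}'.format(user, dir, mysqldb, month, day)

subprocess.call(["hdfs", "dfs", "-mkdir", "-p", HDFS_path])
subprocess.call(["rm", Linux_path])   # start the run with a fresh local log

for table in input:
    initial(table, hivedb, domain, port, mysqldb, mysql_user, user, dir)

# runs exactly once, after the last table has been processed
subprocess.call(["hdfs", "dfs", "-put", Linux_path, HDFS_path])

sc.stop()

Alternatively, if the put has to stay inside initial for some reason, hdfs dfs -put -f overwrites an existing destination file on reasonably recent Hadoop versions, so the second iteration would no longer fail; but that uploads the log once per table instead of once per run.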