I have a PySpark script that is working fine. The script fetches data from MySQL and creates Hive tables in HDFS.
The PySpark script is below.
#!/usr/bin/env python
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

# Condition to specify exact number of arguments in the spark-submit command line
if len(sys.argv) != 8:
    print "Invalid number of args......"
    print "Usage: spark-submit import.py Arguments"
    exit()

table = sys.argv[1]
hivedb = sys.argv[2]
domain = sys.argv[3]
port = sys.argv[4]
mysqldb = sys.argv[5]
username = sys.argv[6]
password = sys.argv[7]

df = sqlContext.read.format("jdbc") \
    .option("url", "{}:{}/{}".format(domain, port, mysqldb)) \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("dbtable", "{}".format(table)) \
    .option("user", "{}".format(username)) \
    .option("password", "{}".format(password)) \
    .load()

# Register dataframe as table
df.registerTempTable("mytempTable")

# Create hive table from temp table
sqlContext.sql("create table {}.{} as select * from mytempTable".format(hivedb, table))

sc.stop()
Now this PySpark script is invoked through a shell script, and I pass the table names to that shell script from a file.
The shell script is below.
#!/bin/bash
source /home/$USER/spark/source.sh
[ $# -ne 1 ] && { echo "Usage : $0 <args_file>"; exit 1; }
args_file=$1
TIMESTAMP=$(date "+%Y-%m-%d")
touch /home/$USER/logs/${TIMESTAMP}.success_log
touch /home/$USER/logs/${TIMESTAMP}.fail_log
success_logs=/home/$USER/logs/${TIMESTAMP}.success_log
failed_logs=/home/$USER/logs/${TIMESTAMP}.fail_log
#Function to get the status of the job creation
function log_status
{
status=$1
message=$2
if [ "$status" -ne 0 ]; then
echo "date +\"%Y-%m-%d %H:%M:%S\" [ERROR] $message [Status] $status : failed" | tee -a "${failed_logs}"
#echo "Please find the attached log file for more details"
exit 1
else
echo "date +\"%Y-%m-%d %H:%M:%S\" [INFO] $message [Status] $status : success" | tee -a "${success_logs}"
fi
}
while read -r table ;do
spark-submit --name "${table}" --master "yarn-client" --num-executors 2 --executor-memory 6g --executor-cores 1 --conf "spark.yarn.executor.memoryOverhead=609" /home/$USER/spark/sql_spark.py ${table} ${hivedb} ${domain} ${port} ${mysqldb} ${username} ${password} > /tmp/logging/${table}.log 2>&1
g_STATUS=$?
log_status $g_STATUS "Spark job ${table} Execution"
done < "${args_file}"
echo "************************************************************************************************************************************************************************"
I am able to collect logs for each individual table in the args_file using the above shell script.
Now I have more than 200 tables in MySQL, so I have modified the PySpark script as shown below: I created a function that iterates over the args_file and runs the import for each table.
New spark script
#!/usr/bin/env python
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

# Condition to specify exact number of arguments in the spark-submit command line
if len(sys.argv) != 8:
    print "Invalid number of args......"
    print "Usage: spark-submit import.py Arguments"
    exit()

args_file = sys.argv[1]
hivedb = sys.argv[2]
domain = sys.argv[3]
port = sys.argv[4]
mysqldb = sys.argv[5]
username = sys.argv[6]
password = sys.argv[7]

def testing(table, hivedb, domain, port, mysqldb, username, password):
    print "*********************************************************table = {} ***************************".format(table)
    df = sqlContext.read.format("jdbc") \
        .option("url", "{}:{}/{}".format(domain, port, mysqldb)) \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("dbtable", "{}".format(table)) \
        .option("user", "{}".format(username)) \
        .option("password", "{}".format(password)) \
        .load()

    # Register dataframe as table
    df.registerTempTable("mytempTable")

    # Create hive table from temp table
    sqlContext.sql("create table {}.{} stored as parquet as select * from mytempTable".format(hivedb, table))

input = sc.textFile('/user/XXXXXXX/spark_args/%s' % args_file).collect()

for table in input:
    testing(table, hivedb, domain, port, mysqldb, username, password)

sc.stop()
Now I want to collect the logs for each individual table in the args_file, but I am getting only one log file that contains the logs for all the tables. How can I achieve my requirement? Or is the method I am using completely wrong?
> New shell script:
spark-submit --name "${args_file}" --master "yarn-client" --num-executors 2 --executor-memory 6g --executor-cores 1 --conf "spark.yarn.executor.memoryOverhead=609" /home/$USER/spark/sql_spark.py ${table} ${hivedb} ${domain} ${port} ${mysqldb} ${username} ${password} > /tmp/logging/${args_file}.log 2>&