I have a PySpark script that is working fine. The script fetches data from MySQL and creates Hive tables in HDFS.
The pyspark script is below.
#!/usr/bin/env python
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

# Require the exact number of arguments on the spark-submit command line
if len(sys.argv) != 8:
    print "Invalid number of args......"
    print "Usage: spark-submit import.py Arguments"
    exit()

table = sys.argv[1]
hivedb = sys.argv[2]
domain = sys.argv[3]
port = sys.argv[4]
mysqldb = sys.argv[5]
username = sys.argv[6]
password = sys.argv[7]

df = sqlContext.read.format("jdbc") \
    .option("url", "{}:{}/{}".format(domain, port, mysqldb)) \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("dbtable", "{}".format(table)) \
    .option("user", "{}".format(username)) \
    .option("password", "{}".format(password)) \
    .load()

# Register the dataframe as a temp table
df.registerTempTable("mytempTable")

# Create the Hive table from the temp table
sqlContext.sql("create table {}.{} as select * from mytempTable".format(hivedb, table))

sc.stop()

This pyspark script is invoked by a shell script, to which I pass the table names as arguments from a file.
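For reference, the args_file is a plain text file with one MySQL table name per line, for example (hypothetical table names):

customers
orders
payments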
The shell script is below.
#!/bin/bash
source /home/$USER/spark/source.sh
[ $# -ne 1 ] && { echo "Usage : $0 table ";exit 1; }
args_file=$1
TIMESTAMP=$(date "+%Y-%m-%d")
touch /home/$USER/logs/${TIMESTAMP}.success_log
touch /home/$USER/logs/${TIMESTAMP}.fail_log
success_logs=/home/$USER/logs/${TIMESTAMP}.success_log
failed_logs=/home/$USER/logs/${TIMESTAMP}.fail_log
# Function to log the success or failure status of each job
function log_status
{
status=$1
message=$2
if [ "$status" -ne 0 ]; then
echo "date +\"%Y-%m-%d %H:%M:%S\" [ERROR] $message [Status] $status : failed" | tee -a "${failed_logs}"
#echo "Please find the attached log file for more details"
exit 1
else
echo "date +\"%Y-%m-%d %H:%M:%S\" [INFO] $message [Status] $status : success" | tee -a "${success_logs}"
fi
}
while read -r table ;do
spark-submit --name "${table}" --master "yarn-client" --num-executors 2 --executor-memory 6g --executor-cores 1 --conf "spark.yarn.executor.memoryOverhead=609" /home/$USER/spark/sql_spark.py ${table} ${hivedb} ${domain} ${port} ${mysqldb} ${username} ${password} > /tmp/logging/${table}.log 2>&1
g_STATUS=$?
log_status $g_STATUS "Spark job ${table} Execution"
done < "${args_file}"
echo "************************************************************************************************************************************************************************"
Using the above shell script, I am able to collect a separate log for each individual table in the args_file.
Now I have more than 200 tables in MySQL, so I have modified the pyspark script as below: I created a function that iterates over the args_file and executes the code for each table.
New spark script:
#!/usr/bin/env python
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

# Require the exact number of arguments on the spark-submit command line
if len(sys.argv) != 8:
    print "Invalid number of args......"
    print "Usage: spark-submit import.py Arguments"
    exit()

args_file = sys.argv[1]
hivedb = sys.argv[2]
domain = sys.argv[3]
port = sys.argv[4]
mysqldb = sys.argv[5]
username = sys.argv[6]
password = sys.argv[7]

def testing(table, hivedb, domain, port, mysqldb, username, password):
    print "********************************************************* table = {} *********************************************************".format(table)
    df = sqlContext.read.format("jdbc") \
        .option("url", "{}:{}/{}".format(domain, port, mysqldb)) \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("dbtable", "{}".format(table)) \
        .option("user", "{}".format(username)) \
        .option("password", "{}".format(password)) \
        .load()
    # Register the dataframe as a temp table
    df.registerTempTable("mytempTable")
    # Create the Hive table from the temp table
    sqlContext.sql("create table {}.{} stored as parquet as select * from mytempTable".format(hivedb, table))

input = sc.textFile('/user/XXXXXXX/spark_args/%s' % args_file).collect()

for table in input:
    testing(table, hivedb, domain, port, mysqldb, username, password)

sc.stop()

Now I want to collect the logs for each individual table in the args_file, but I am getting only one log file that contains the log for all the tables.
How can I achieve my requirement? Or is the approach I am taking completely wrong?
New shell script:
spark-submit --name "${args_file}" --master "yarn-client" --num-executors 2 --executor-memory 6g --executor-cores 1 --conf "spark.yarn.executor.memoryOverhead=609" /home/$USER/spark/sql_spark.py ${args_file} ${hivedb} ${domain} ${port} ${mysqldb} ${username} ${password} > /tmp/logging/${args_file}.log 2>&1
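One direction I am considering (a minimal sketch, not a confirmed approach): inside the driver loop, attach a fresh logging FileHandler for each table so that each table's driver-side messages land in their own file under /tmp/logging. The helper name run_with_table_log is hypothetical; testing() is the function from the new spark script above.

#!/usr/bin/env python
# Sketch only: give each table its own driver-side log file by swapping
# a logging FileHandler per iteration. The /tmp/logging path and the
# testing() function come from the scripts above; run_with_table_log
# is a hypothetical helper name.
import logging

logger = logging.getLogger("table_import")
logger.setLevel(logging.INFO)

def run_with_table_log(table, work_fn):
    handler = logging.FileHandler("/tmp/logging/{}.log".format(table))
    handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
    logger.addHandler(handler)
    try:
        logger.info("starting import for table %s", table)
        work_fn(table)
        logger.info("finished import for table %s", table)
    except Exception:
        logger.exception("import failed for table %s", table)
    finally:
        # Detach and close so the next table writes to a fresh file
        logger.removeHandler(handler)
        handler.close()

# Usage inside the existing loop:
# for table in input:
#     run_with_table_log(table, lambda t: testing(t, hivedb, domain, port,
#                                                 mysqldb, username, password))

Note that this would only capture what the driver itself logs; anything printed by the executors still goes to the YARN container logs, which can be pulled per application with yarn logs -applicationId <application id>.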