PySpark Coding Challenge
Hello Community,

I have been presented with a challenge that I'm struggling with.

The challenge is as follows:
Write three Python functions, register them as PySpark UDF functions, and use them to produce an output DataFrame.
The following is a sample of the dataset, also attached:

Output:
+----------------------------------------------+-----------------------+-----------+------------------------+
|Species                                       |Category               |Period     |Annual percentage change|
+----------------------------------------------+-----------------------+-----------+------------------------+
|Greenfinch (Chloris chloris)                  |Farmland birds         |(1970-2014)|-1.13                   |
|Siskin (Carduelis spinus)                     |Woodland birds         |(1995-2014)|2.26                    |
|European shag (Phalacrocorax artistotelis)   |Seabirds               |(1986-2014)|-2.31                   |
|Mute Swan (Cygnus olor)                       |Water and wetland birds|(1975-2014)|1.65                    |
|Collared Dove (Streptopelia decaocto)         |Other                  |(1970-2014)|5.2                     |
+----------------------------------------------+-----------------------+-----------+------------------------+
The requirement is to create the following three functions:

1. get_english_name - this function should get the Species column value and return the English name.

2. get_start_year - this function should get the Period column value and return the year (an integer) when data collection began.

3. get_trend - this function should get the Annual percentage change column value and return the change trend category based on the following rules:
a. Annual percentage change less than -3.00 – return 'strong decline'
b. Annual percentage change between -3.00 and -0.50 (inclusive) – return 'weak decline'
c. Annual percentage change between -0.50 and 0.50 (exclusive) – return 'no change'
d. Annual percentage change between 0.50 and 3.00 (inclusive) – return 'weak increase'
e. Annual percentage change more than 3.00 – return 'strong increase'.

The functions then need to be registered as PySpark UDFs so that they can be used in PySpark.
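From the docs, the registration step itself looks roughly like this, if I understand correctly (a sketch assuming a SparkSession named spark and a plain Python get_start_year as in point 2):

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# For DataFrame expressions: wrap the function as a column-level UDF.
get_start_year_udf = udf(get_start_year, IntegerType())

# For Spark SQL: register it under a name usable inside SQL strings.
spark.udf.register("get_start_year", get_start_year, IntegerType())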

Any assistance greatly appreciated.
Show us what you've done so far (Python code), and where you are having difficulty.
def get_english_name(species):
pass


def get_start_year(period):
pass


def get_trend(annual_percentage_change):
pass
Come on, you can't seriously consider just writing the function signatures as actual effort, can you?
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, StructType, StructField, FloatType
from pyspark.sql.functions import col, udf

spark = SparkSession.builder.appName("exp").getOrCreate()
sc = spark.sparkContext
# The @udf decorator registers each plain Python function as a PySpark UDF.
@udf(returnType=StringType())
def get_english_name(val):
    # Everything before the first " (" is the English name.
    return val[0:val.index(" (")]

@udf(returnType=IntegerType())
def get_start_year(val):
    # Period looks like "(1970-2014)"; characters 1-4 are the start year.
    return int(val[1:5])

@udf(returnType=StringType())
def get_trend(x):
    # Map the annual percentage change onto the five trend categories.
    if x < -3.00:
        return "strong decline"
    elif x <= -0.50:    # between -3.00 and -0.50, inclusive
        return "weak decline"
    elif x < 0.50:      # between -0.50 and 0.50, exclusive
        return "no change"
    elif x <= 3.00:     # between 0.50 and 3.00, inclusive
        return "weak increase"
    else:
        return "strong increase"
    
info = [("Greenfinch (Chloris chloris)", "Farmland birds", "(1970-2014)", -1.13),
        ("Siskin (Carduelis spinus)", "Woodland birds", "(1995-2014)", 2.26),
        ("European shag (Phalacrocorax artistotelis)", "Seabirds", "(1986-2014)", -2.31),
        ("Mute Swan (Cygnus olor)", "Water and wetland birds", "(1975-2014)", 1.65),
        ("Collared Dove (Streptopelia decaocto)", "Other", "(1970-2014)", 5.2)]
schema1 = StructType(
    [StructField("Species", StringType()),
     StructField("Category", StringType()),
     StructField("Period", StringType()),
     StructField("Annual_percentage_change", FloatType())
     ])

rdd = sc.parallelize(info)
data = spark.createDataFrame(rdd, schema=schema1) 

data2 = (data
         .withColumn("English_Name", get_english_name(col("Species")))
         .withColumn("Start_Year", get_start_year(col("Period")))
         .withColumn("Trend", get_trend(col("Annual_percentage_change"))))
data2.show(truncate=False)
spark.stop()
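Side note: the challenge mandates UDFs, but the same three columns can also be derived with built-in column functions, which Spark can optimize better than Python UDFs. A rough sketch against the same data DataFrame (run before spark.stop(); the regex patterns are my own guess at the formats):

from pyspark.sql.functions import regexp_extract, when, col

data3 = (data
         # Text before the first " (" is the English name.
         .withColumn("English_Name", regexp_extract(col("Species"), r"^(.*?)\s*\(", 1))
         # First four digits after the opening parenthesis are the start year.
         .withColumn("Start_Year", regexp_extract(col("Period"), r"\((\d{4})", 1).cast("int"))
         # Same five trend buckets as the UDF above.
         .withColumn("Trend",
                     when(col("Annual_percentage_change") < -3.00, "strong decline")
                     .when(col("Annual_percentage_change") <= -0.50, "weak decline")
                     .when(col("Annual_percentage_change") < 0.50, "no change")
                     .when(col("Annual_percentage_change") <= 3.00, "weak increase")
                     .otherwise("strong increase")))
data3.show(truncate=False)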