Hello Community,
I have been given a challenge to review the following PySpark code and comment on any areas where I see there are errors. While I understand most of the code, there are a few areas I would like clarified. For example, there is a requirement to:

# Column F: Need to apply conversion factor of 2.5 i.e. Value 2, conversion factor 2.5 = 5
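My best guess is that this could be done with withColumn, multiplying F by the factor, along the lines of the sketch below. This assumes F is numeric; as far as I can tell the CSV is currently read without a schema, so the columns may all be strings.

from pyspark.sql.functions import col

# Tentative: apply the 2.5 conversion factor to column F (e.g. 2 * 2.5 = 5)
very_large_dataframe = very_large_dataframe.withColumn('F', col('F') * 2.5)

Is that the idiomatic way to do it, or should F be cast to decimal(5,2) first given the stated precision and scale?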
Also there is the following requirement:

# Join very_large_dataframe to small_product_dimension_dataframe on column [B]
# Only join records to small_product_dimension_dataframe where O is greater than 10
# Keep only column [P]
The first part of the requirement, the join on column [B], is achievable using the following code:

very_large_dataframe = very_large_dataframe.join(
    small_product_dimension_dataframe,
    very_large_dataframe.B == small_product_dimension_dataframe.B)

However, I'm not sure how to achieve the remaining two requirements (the O greater than 10 filter and keeping only column [P]); the full code is below. Any thoughts greatly appreciated.
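My tentative idea, and it is only a guess, is to filter and trim the product dimension before joining, something like the sketch below. The filtered_products name is just mine, and I am not sure whether an inner or a left join is intended here.

from pyspark.sql.functions import col

# Keep only rows where O > 10, and only the join key [B] plus column [P]
filtered_products = (small_product_dimension_dataframe
                     .filter(col('O') > 10)
                     .select('B', 'P'))

# Joining on the column name keeps a single copy of B, so only [P] is added
very_large_dataframe = very_large_dataframe.join(filtered_products, on='B')

Does that look like the right approach, or should the O > 10 condition go into the join condition itself?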
MIN_SUM_THRESHOLD = 10,000,000


def has_columns(data_frame, column_name_list):
    for column_name in column_name_list:
        if not data_frame.columns.contains(column_name):
            raise Exception('Column is missing: ' + column_name)


def column_count(data_frame):
    return data_frame.columns.size


def process():
    # Create spark session
    spark = SparkSession.builder.getOrCreate()

    # very_large_dataframe
    # 250 GB of CSV files from client which must have only 10 columns [A, B, C, D, E, F, G, H, I, J]
    # [A, B] contains string data
    # [C, D, E, F, G, H, I, J] contains decimals with precision 5, scale 2 (i.e. 125.75)
    # [A, B, C, D, E] should not be null
    # [F, G, H, I, J] may be null
    very_large_dataset_location = '/Sourced/location_1'
    very_large_dataframe = spark.read.csv(very_large_dataset_location, header=True, sep="\t")

    # validate column count
    if column_count(very_large_dataframe) != 10:
        raise Exception('Incorrect column count: ' + column_count(very_large_dataframe))

    # validate that dataframe has all required columns
    has_columns(very_large_dataframe, ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J'])

    # TODO
    # Column F: Need to apply conversion factor of 2.5 i.e. Value 2, conversion factor 2.5 = 5

    # Remove duplicates in column [A]
    # 50% of records in column [A] could potentially be duplicates
    very_large_dataframe = very_large_dataframe.dropDuplicates(['A'])

    # Get count of column F and ensure it is above MIN_SUM_THRESHOLD
    total_sum_of_column_F = very_large_dataframe.agg(sum('F')).collect()[0][0]

    if total_sum_of_column_F < MIN_SUM_THRESHOLD:
        raise Exception('total_sum_of_column_A: ' + total_sum_of_column_F +
                        ' is below threshold: ' + MIN_SUM_THRESHOLD)

    # small_geography_dimension_dataframe
    # 25 MB of parquet, 4 columns [A, K, L, M]
    # Columns [A, K, L] contain only string data
    # Column [M] is an integer
    # Columns [A, K, L, M] contain all non nullable data. Assume this is the case
    small_geography_dimension_dataset = '/location_2'
    small_geography_dimension_dataframe = spark.read.parquet(small_geography_dimension_dataset)

    # Join very_large_dataframe to small_geography_dimension_dataframe on column [A]
    # Include only column [M] from small_geography_dimension_dataframe on new very_large_dataframe
    # No data (row count) loss should occur from very_large_dataframe
    very_large_dataframe = very_large_dataframe.join(small_geography_dimension_dataframe,
                                                     (very_large_dataframe.A == small_geography_dimension_dataframe.A))

    # small_product_dimension_dataframe
    # 50 MB of parquet, 4 columns [B, N, O, P]
    # Columns [B, N] contain only string data
    # Columns [O, P] contain only integers
    # Columns [B, N, O, P] contain all non nullable data. Assume this is the case
    small_product_dimension_dataset = './location_3'  # 50 MB of parquet
    small_product_dimension_dataframe = spark.read.parquet(small_product_dimension_dataset)

    # TODO
    # Join very_large_dataframe to small_product_dimension_dataframe on column [B]
    # Only join records to small_product_dimension_dataframe where O is greater than 10
    # Keep only column [P]
    very_large_dataframe = very_large_dataframe.join(small_product_dimension_dataframe,
                                                     (very_large_dataframe.B == small_product_dimension_dataframe.B))
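One other area I would appreciate a sanity check on is the geography join. The comments say that only column [M] should be brought across and that no rows should be lost from very_large_dataframe, so my reading is that the dimension should be trimmed to [A, M] and joined with a left join rather than the default inner join. A rough sketch of what I have in mind (again, just my guess, not part of the challenge code; trimmed_geography is my own name):

# Keep only the join key [A] and column [M] from the geography dimension
trimmed_geography = small_geography_dimension_dataframe.select('A', 'M')

# Left join so every row of very_large_dataframe is preserved
very_large_dataframe = very_large_dataframe.join(trimmed_geography, on='A', how='left')

Have I understood that requirement correctly?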