Jan-13-2022, 03:01 PM
Hello Community,
I have been given a challenge to review the following PySpark code and comment on any areas where I see errors.
While I understand most of the code, there are a few areas I would like clarified. For example, there is a requirement to:
# Column F: Need to apply conversion factor of 2.5 i.e. Value 2, conversion factor 2.5 = 5

Also, there is the following requirement:
# Join very_large_dataframe to small_product_dimension_dataframe on column [B]
# Only join records to small_product_dimension_dataframe where O is greater than 10
# Keep only Column [P]

The first part of the requirement can be achieved with the following code:
very_large_dataframe = very_large_dataframe.join(small_product_dimension_dataframe, (very_large_dataframe.B == small_product_dimension_dataframe.B))

However, I'm not sure how to achieve the remaining two requirements (only joining products where O is greater than 10, and keeping only column [P]). The full code is below; any thoughts are greatly appreciated.
MIN_SUM_THRESHOLD = 10,000,000

def has_columns(data_frame, column_name_list):
    for column_name in column_name_list:
        if not data_frame.columns.contains(column_name):
            raise Exception('Column is missing: ' + column_name)

def column_count(data_frame):
    return data_frame.columns.size

def process():
    # Create spark session
    spark = SparkSession.builder.getOrCreate()

    # very_large_dataframe
    # 250 GB of CSV files from client which must have only 10 columns [A, B, C, D, E, F, G, H, I, J]
    # [A, B] contains string data
    # [C, D, E, F, G, H, I, J] contains decimals with precision 5, scale 2 (i.e. 125.75)
    # [A, B, C, D, E] should not be null
    # [F, G, H, I, J] may be null
    very_large_dataset_location = '/Sourced/location_1'
    very_large_dataframe = spark.read.csv(very_large_dataset_location, header=True, sep="\t")

    # validate column count
    if column_count(very_large_dataframe) != 10:
        raise Exception('Incorrect column count: ' + column_count(very_large_dataframe))

    # validate that dataframe has all required columns
    has_columns(very_large_dataframe, ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J'])

    # TODO
    # Column F: Need to apply conversion factor of 2.5 i.e. Value 2, conversion factor 2.5 = 5

    # Remove duplicates in column [A]
    # 50% of records in column [A] could potentially be duplicates
    very_large_dataframe = very_large_dataframe.dropDuplicates(['A'])

    # Get count of column F and ensure it is above MIN_SUM_THRESHOLD
    total_sum_of_column_F = very_large_dataframe.agg(sum('F')).collect()[0][0]

    if total_sum_of_column_F < MIN_SUM_THRESHOLD:
        raise Exception('total_sum_of_column_A: ' + total_sum_of_column_F + ' is below threshold: ' + MIN_SUM_THRESHOLD)

    # small_geography_dimension_dataframe
    # 25 MB of parquet, 4 columns [A, K, L, M]
    # Columns [A, K, L] contain only string data
    # Column [M] is an integer
    # Columns [A, K, L, M] contain all non nullable data. Assume this is the case
    small_geography_dimension_dataset = '/location_2'
    small_geography_dimension_dataframe = spark.read.parquet(small_geography_dimension_dataset)

    # Join very_large_dataframe to small_geography_dimension_dataframe on column [A]
    # Include only column [M] from small_geography_dimension_dataframe on new very_large_dataframe
    # No data (row count) loss should occur from very_large_dataframe
    very_large_dataframe = very_large_dataframe.join(small_geography_dimension_dataframe, (very_large_dataframe.A == small_geography_dimension_dataframe.A))

    # small_product_dimension_dataframe
    # 50 MB of parquet, 4 columns [B, N, O, P]
    # Columns [B, N] contain only string data
    # Columns [O, P] contain only integers
    # Columns [B, N, O, P] contain all non nullable data. Assume this is the case
    small_product_dimension_dataset = './location_3' # 50 MB of parquet
    small_product_dimension_dataframe = spark.read.parquet(small_product_dimension_dataset)

    # TODO
    # Join very_large_dataframe to small_product_dimension_dataframe on column [B]
    # Only join records to small_product_dimension_dataframe where O is greater than 10
    # Keep only Column [P]
    very_large_dataframe = very_large_dataframe.join(small_product_dimension_dataframe, (very_large_dataframe.B == small_product_dimension_dataframe.B))
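Several errors in this code would stop it before it reaches the joins. In PySpark, `DataFrame.columns` is a plain Python list of column names, so it has no `.contains()` method and no `.size` attribute. `MIN_SUM_THRESHOLD = 10,000,000` builds the tuple `(10, 0, 0)` rather than ten million. Both error messages concatenate a string with a number, which raises `TypeError`, and the threshold message names column A instead of F. `sum('F')` is Python's built-in `sum` (which fails on a string) unless `sum` is imported from `pyspark.sql.functions`, and `SparkSession` is never imported. A minimal corrected sketch of the helpers follows; `check_threshold` is a hypothetical helper, and raising `ValueError` instead of `Exception` is a stylistic choice.

```python
# Python's digit-group separator is "_"; "10,000,000" builds the tuple (10, 0, 0).
MIN_SUM_THRESHOLD = 10_000_000


def has_columns(data_frame, column_name_list):
    """Raise if any required column is missing from the DataFrame."""
    # DataFrame.columns is a plain list of names: use "in", not .contains().
    missing = [name for name in column_name_list if name not in data_frame.columns]
    if missing:
        raise ValueError("Columns are missing: " + ", ".join(missing))


def column_count(data_frame):
    """Return the number of columns (a list has len(), not .size)."""
    return len(data_frame.columns)


def check_threshold(total, threshold=MIN_SUM_THRESHOLD):
    """Hypothetical helper: raise if the sum of column F is below threshold.

    str() is required because "+" cannot join a str and a number; total is
    None when every value in F is null, which is treated as a failure here.
    """
    if total is None or total < threshold:
        raise ValueError(
            "total_sum_of_column_F: " + str(total)
            + " is below threshold: " + str(threshold)
        )
```

Inside `process()` the aggregate would then use Spark's `sum`, for example `check_threshold(very_large_dataframe.agg(sf.sum("F")).collect()[0][0])` after `from pyspark.sql import functions as sf`. The same `str()` fix applies to the column-count message.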