Python Code Review and Challenge - Python Forum (https://python-forum.io), Python Coding > Homework (/thread-36055.html)
Python Code Review and Challenge - cpatte7372 - Jan-13-2022

Hello Community,

I have been given a challenge to review the following PySpark code and comment on any areas where I see errors. While I understand most of the code, there are a few areas I would like clarified. For example, there is this requirement:

# Column F: Need to apply conversion factor of 2.5 i.e. Value 2, conversion factor 2.5 = 5

There is also the following requirement:

# Join very_large_dataframe to small_product_dimension_dataframe on column [B]
# Only join records to small_product_dimension_dataframe where O is greater than 10
# Keep only Column [P]

The first part of the requirement is achievable using the following code:

very_large_dataframe = very_large_dataframe.join(small_product_dimension_dataframe, (very_large_dataframe.B == small_product_dimension_dataframe.B))

However, I'm not sure how to achieve the other two requirements (filtering on O and keeping only column P). The full code is below. Any thoughts are greatly appreciated.

MIN_SUM_THRESHOLD = 10, 000, 000

def has_columns(data_frame, column_name_list):
    for column_name in column_name_list:
        if not data_frame.columns.contains(column_name):
            raise Exception('Column is missing: ' + column_name)

def column_count(data_frame):
    return data_frame.columns.size

def process():
    # Create spark session
    spark = SparkSession.builder.getOrCreate()

    # very_large_dataframe
    # 250 GB of CSV files from client which must have only 10 columns [A, B, C, D, E, F, G, H, I, J]
    # [A, B] contains string data
    # [C, D, E, F, G, H, I, J] contains decimals with precision 5, scale 2 (i.e. 125.75)
    # [A, B, C, D, E] should not be null
    # [F, G, H, I, J] may be null
    very_large_dataset_location = '/Sourced/location_1'
    very_large_dataframe = spark.read.csv(very_large_dataset_location, header=True, sep="\t")

    # validate column count
    if column_count(very_large_dataframe) != 10:
        raise Exception('Incorrect column count: ' + column_count(very_large_dataframe))

    # validate that dataframe has all required columns
    has_columns(very_large_dataframe, ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J'])

    # TODO
    # Column F: Need to apply conversion factor of 2.5 i.e. Value 2, conversion factor 2.5 = 5

    # Remove duplicates in column [A]
    # 50% of records in column [A] could potentially be duplicates
    very_large_dataframe = very_large_dataframe.dropDuplicates(['A'])

    # Get count of column F and ensure it is above MIN_SUM_THRESHOLD
    total_sum_of_column_F = very_large_dataframe.agg(sum('F')).collect()[0][0]

    if total_sum_of_column_F < MIN_SUM_THRESHOLD:
        raise Exception('total_sum_of_column_A: ' + total_sum_of_column_F + ' is below threshold: ' + MIN_SUM_THRESHOLD)

    # small_geography_dimension_dataframe
    # 25 MB of parquet, 4 columns [A, K, L, M]
    # Columns [A, K, L] contain only string data
    # Column [M] is an integer
    # Columns [A, K, L, M] contain all non nullable data. Assume this is the case
    small_geography_dimension_dataset = '/location_2'
    small_geography_dimension_dataframe = spark.read.parquet(small_geography_dimension_dataset)

    # Join very_large_dataframe to small_geography_dimension_dataframe on column [A]
    # Include only column [M] from small_geography_dimension_dataframe on new very_large_dataframe
    # No data (row count) loss should occur from very_large_dataframe
    very_large_dataframe = very_large_dataframe.join(small_geography_dimension_dataframe, (very_large_dataframe.A == small_geography_dimension_dataframe.A))

    # small_product_dimension_dataframe
    # 50 MB of parquet, 4 columns [B, N, O, P]
    # Columns [B, N] contain only string data
    # Columns [O, P] contain only integers
    # Columns [B, N, O, P] contain all non nullable data. Assume this is the case
    small_product_dimension_dataset = './location_3' # 50 MB of parquet
    small_product_dimension_dataframe = spark.read.parquet(small_product_dimension_dataset)

    # TODO
    # Join very_large_dataframe to small_product_dimension_dataframe on column [B]
    # Only join records to small_product_dimension_dataframe where O is greater than 10
    # Keep only Column [P]
    very_large_dataframe = very_large_dataframe.join(small_product_dimension_dataframe, (very_large_dataframe.B == small_product_dimension_dataframe.B))

RE: Python Code Review and Challenge - buran - Jan-14-2022

Do you review code by someone else, or are you required to write that code? I moved the thread to the Homework section of the forum.

RE: Python Code Review and Challenge - cpatte7372 - Jan-14-2022

(Jan-14-2022, 09:43 AM) buran wrote: Do you review code by someone else, or are you required to write that code?

Thanks for reaching out. I need to review the code; however, I'm also required to write the code.
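Several of the review points sit in the helper functions and the threshold check rather than in the TODOs. In PySpark, `DataFrame.columns` is a plain Python list, so it has neither a `.contains()` method nor a `.size` attribute; `'text' + int` raises a `TypeError`; and `10, 000, 000` builds the tuple `(10, 0, 0)` rather than ten million. A minimal sketch of corrected helpers, assuming `ValueError` is an acceptable replacement for the bare `Exception`; `check_column_count` and `check_threshold` are hypothetical helper names, not part of the original code:

```python
# Hypothetical corrected helpers for the challenge code. They only touch
# data_frame.columns, which in PySpark is a plain Python list of column names.

# "10, 000, 000" is the tuple (10, 0, 0); underscores keep the integer readable.
MIN_SUM_THRESHOLD = 10_000_000


def column_count(data_frame):
    # A list has no .size attribute; len() gives the number of columns.
    return len(data_frame.columns)


def has_columns(data_frame, column_name_list):
    # A list has no .contains() method; the `in` operator does the membership test.
    missing = [name for name in column_name_list if name not in data_frame.columns]
    if missing:
        raise ValueError(f"Columns are missing: {missing}")


def check_column_count(data_frame, expected=10):
    count = column_count(data_frame)
    if count != expected:
        # An f-string avoids the TypeError raised by 'text' + int.
        raise ValueError(f"Incorrect column count: {count} (expected {expected})")


def check_threshold(total, threshold=MIN_SUM_THRESHOLD):
    # agg(sum(...)).collect()[0][0] is None when every value in the column is null.
    if total is None or total < threshold:
        raise ValueError(f"total_sum_of_column_F: {total} is below threshold: {threshold}")
```

Reporting every missing column at once, rather than stopping at the first, is a design choice that makes a malformed client file easier to diagnose.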