Clenaup CSV file - Printable Version

+- Python Forum (https://python-forum.io)
+-- Forum: Python Coding (https://python-forum.io/forum-7.html)
+--- Forum: General Coding Help (https://python-forum.io/forum-8.html)
+--- Thread: Clenaup CSV file (/thread-40552.html)
Clenaup CSV file - IgorSh - Aug-17-2023

Hi all,

I have a CSV file with duplicate header rows, for example:

    ID    Alias  Model  FXRate   Class
    ID    Alias  Model  FXRate   Class
    UL18  test   GBP    1.3142   STK
    UL19  test   GBP    1.3142   STK
    UL20  test   SEK    0.10639  STK
    UL21  test   USD    1        STK
    UL22  test   USD    1        STK
    UL23  test   USD    1        STK
    ID    Alias  Model  FXRate   Class

The first row is the header; the second and last rows are duplicates of it. A duplicate header row may appear in the middle of the data, not only at the end.

I use pyspark to read the file with the inferSchema=true option, which casts each column to the type it recognizes. For example, FXRate is recognized as decimal, so the stray header rows end up with NULL in that column:

    ID    Alias  Model  FXRate   Class
    ID    Alias  Model  NULL     Class
    UL18  test   GBP    1.3142   STK
    UL19  test   GBP    1.3142   STK
    UL20  test   SEK    0.10639  STK
    UL21  test   USD    1        STK
    UL22  test   USD    1        STK
    UL23  test   USD    1        STK
    ID    Alias  Model  NULL     Class

I thought of using a pandas DataFrame created from the Spark DataFrame. This is my code, which is not working properly:

```python
infer_schema = "true"
read_file_location = f'abfss://{ContainerName}@{dl_storage_account}.dfs.core.windows.net/{FilePath}/{FileName}'
df = (
    spark.read.format(FileType)
    .option("inferSchema", infer_schema)
    .option("header", Header)
    .option("sep", Delimiter)
    .load(read_file_location)
)
# get max duplicate count of any row in the dataframe
dup_count = (
    df.groupBy(df.columns).count()
    .where(f.col('count') > 1)
    .select(f.max('count'))
    .collect()[0][0]
)
if Header and dup_count is not None:
    for i in range(dup_count):
        df_header = spark.read.format(FileType).load(read_file_location).limit(1)
        df = df.exceptAll(df_header)
```

Thanks for the help.


RE: Clenaup CSV file - snippsat - Aug-17-2023

Just as info: a big No💤 to for i in range(dup_count): in both Pandas and pyspark. You can do it in Pandas and convert back with spark.createDataFrame(df):

```python
import pandas as pd
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

df = pd.read_csv('fx.csv', sep=",")
df.drop_duplicates(keep='first', inplace=True)
df = df.iloc[1:]
# Convert the pandas DataFrame to a Spark DataFrame
df_spark = spark.createDataFrame(df)
df_spark.show()
spark.stop()
```

In just pyspark (I don't use pyspark, but I did help someone install it recently, so I can do some tests):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize a Spark session
spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

df_spark = spark.read.csv('fx.csv', header=True, inferSchema=True)
df_spark = df_spark.dropDuplicates()
# Drop any remaining row that repeats the header
df_spark = df_spark.filter(col("ID") != "ID")
df_spark.show()
spark.stop()
```

fx.csv (the sample data from the question):

```
ID,Alias,Model,FXRate,Class
ID,Alias,Model,FXRate,Class
UL18,test,GBP,1.3142,STK
UL19,test,GBP,1.3142,STK
UL20,test,SEK,0.10639,STK
UL21,test,USD,1,STK
UL22,test,USD,1,STK
UL23,test,USD,1,STK
ID,Alias,Model,FXRate,Class
```
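A pandas-only variation on the approaches in this thread, as a sketch: reading with dtype=str disables type inference, so the stray header rows keep their literal values instead of becoming NULL, and can then be filtered out explicitly before converting FXRate to a numeric type. The column names follow the sample data above; io.StringIO stands in for the real file path.

```python
import io
import pandas as pd

# Inline stand-in for the CSV file, with duplicate header rows
# as described in the question.
csv_text = """\
ID,Alias,Model,FXRate,Class
ID,Alias,Model,FXRate,Class
UL18,test,GBP,1.3142,STK
UL19,test,GBP,1.3142,STK
UL20,test,SEK,0.10639,STK
UL21,test,USD,1,STK
ID,Alias,Model,FXRate,Class
"""

# Read everything as strings so stray header rows survive intact
df = pd.read_csv(io.StringIO(csv_text), dtype=str)

# Drop rows that repeat the header (the ID column literally equals "ID"),
# then convert FXRate to a numeric type
df = df[df["ID"] != "ID"].copy()
df["FXRate"] = pd.to_numeric(df["FXRate"])

print(df)
```

Unlike dropDuplicates(), this only removes header repeats, so legitimate rows that happen to be full duplicates of each other are preserved.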