Python Forum
Cleanup CSV file
#1
Hi all,

I have a CSV file with duplicate header rows, for example:

ID Alias Model FXRate Class
ID Alias Model FXRate Class
UL18 test GBP 1.3142 STK
UL19 test GBP 1.3142 STK
UL20 test SEK 0.10639 STK
UL21 test USD 1 STK
UL22 test USD 1 STK
UL23 test USD 1 STK
ID Alias Model FXRate Class

The first row is the header; the second and last rows are duplicates of it.
A duplicate header row can also appear in the middle of the data, not just at the end.
I use PySpark to read the file with the inferSchema=true option, which casts each column to the type it recognizes; for example, FXRate is recognized as decimal, so the duplicate header rows end up with NULL there and I get this:

ID Alias Model FXRate Class
ID Alias Model NULL Class
UL18 test GBP 1.3142 STK
UL19 test GBP 1.3142 STK
UL20 test SEK 0.10639 STK
UL21 test USD 1 STK
UL22 test USD 1 STK
UL23 test USD 1 STK
ID Alias Model NULL Class

I thought of using a pandas DataFrame, which I create from the Spark DataFrame.
This is my code, which is not working properly:
infer_schema = "true"
read_file_location = f'abfss://{ContainerName}@{dl_storage_account}.dfs.core.windows.net/{FilePath}/{FileName}'

df = (
  spark.read.format(FileType) \
    .option("inferSchema", infer_schema) \
    .option("header", Header) \
    .option("sep", Delimiter) \
    .load(read_file_location)
)

dup_count = df.groupBy(df.columns).count().where(f.col('count') > 1).select(f.max('count')).collect()[0][0] # get max duplicate rows in dataframe

if Header and dup_count is not None:
  for i in range(dup_count):
    df_header = spark.read.format(FileType).load(read_file_location).limit(1)
    df = df.exceptAll(df_header)
Thanks for the help
snippsat wrote Aug-17-2023, 11:13 AM:
Added code tags to your post; look at BBCode for how to use them.

#2
Just as info: a big no to for i in range(dup_count): in both Pandas and PySpark.
You can do it in Pandas and convert with spark.createDataFrame(df):
import pandas as pd
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

df = pd.read_csv('fx.csv', sep=",")
# Collapse every duplicate row (including the repeated header lines) down to its first occurrence
df.drop_duplicates(keep='first', inplace=True)
# The first remaining row is the surviving copy of the repeated header, so drop it
df = df.iloc[1:]

# Convert the pandas DataFrame to a Spark DataFrame
df_spark = spark.createDataFrame(df)
df_spark.show()
spark.stop()
Output:
     ID Alias Model   FXRate Class
1  UL18  test   GBP   1.3142   STK
2  UL19  test   GBP   1.3142   STK
3  UL20  test   SEK  0.10639   STK
6  UL21  test   USD        1   STK
7  UL22  test   USD        1   STK
8  UL23  test   USD        1   STK
23/08/17 13:38:23 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
+----+-----+-----+-------+-----+
|  ID|Alias|Model| FXRate|Class|
+----+-----+-----+-------+-----+
|UL18| test|  GBP| 1.3142|  STK|
|UL19| test|  GBP| 1.3142|  STK|
|UL20| test|  SEK|0.10639|  STK|
|UL21| test|  USD|      1|  STK|
|UL22| test|  USD|      1|  STK|
|UL23| test|  USD|      1|  STK|
+----+-----+-----+-------+-----+
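Note that drop_duplicates will also remove data rows that happen to be exact copies of each other, not only the repeated header lines. If that matters, a variation (just a sketch, assuming the first column is always named ID as in your sample) is to keep every row whose ID value is not the literal text "ID":
import pandas as pd
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

df = pd.read_csv('fx.csv', sep=",")
# Drop only the repeated header lines: rows where the ID column holds the text "ID"
df = df[df['ID'] != 'ID'].copy()
# FXRate was read as strings because of the stray header text, so turn it back into numbers
df['FXRate'] = pd.to_numeric(df['FXRate'])

# Convert the pandas DataFrame to a Spark DataFrame
df_spark = spark.createDataFrame(df)
df_spark.show()
spark.stop()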
In just PySpark (I don't use PySpark myself, but I did help someone install it recently, so I can do some tests):
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize a Spark session
spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

df_spark = spark.read.csv('fx.csv', header=True, inferSchema=True)
# Remove duplicate rows, then drop the repeated header rows (where the ID column holds the text "ID")
df_spark = df_spark.dropDuplicates()
df_spark = df_spark.filter(col("ID") != "ID")
df_spark.show()
spark.stop()
Output:
+----+-----+-----+-------+-----+
|  ID|Alias|Model| FXRate|Class|
+----+-----+-----+-------+-----+
|UL18| test|  GBP| 1.3142|  STK|
|UL20| test|  SEK|0.10639|  STK|
|UL22| test|  USD|    1.0|  STK|
|UL23| test|  USD|    1.0|  STK|
|UL21| test|  USD|    1.0|  STK|
|UL19| test|  GBP| 1.3142|  STK|
+----+-----+-----+-------+-----+
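To avoid inferSchema turning the stray header rows into NULLs in the first place (the behaviour you describe in post #1), another sketch is to read everything as strings, drop the header-lookalike rows, and cast afterwards; the cast of FXRate to double is my assumption about the wanted type:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize a Spark session
spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

# Read everything as strings so the repeated header lines survive unchanged
df_spark = spark.read.csv('fx.csv', header=True, inferSchema=False)
# Drop the header-lookalike rows, then cast the numeric column explicitly
df_spark = df_spark.filter(col("ID") != "ID")
df_spark = df_spark.withColumn("FXRate", col("FXRate").cast("double"))
df_spark.show()
spark.stop()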
fx.csv:
Output:
ID,Alias,Model,FXRate,Class
ID,Alias,Model,FXRate,Class
UL18,test,GBP,1.3142,STK
UL19,test,GBP,1.3142,STK
UL20,test,SEK,0.10639,STK
ID,Alias,Model,FXRate,Class
ID,Alias,Model,FXRate,Class
UL21,test,USD,1,STK
UL22,test,USD,1,STK
UL23,test,USD,1,STK
ID,Alias,Model,FXRate,Class
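Applied to your original abfss read from post #1 (a rough sketch reusing your FileType, Header, Delimiter and read_file_location variables; the ID column name and the cast of FXRate to double are assumptions based on your sample), the same idea removes the need for the exceptAll loop:
from pyspark.sql import functions as f

df = (
    spark.read.format(FileType)
    .option("inferSchema", "false")  # keep strings so the stray header rows are not turned into NULLs
    .option("header", Header)
    .option("sep", Delimiter)
    .load(read_file_location)
)

# Drop every row that is just the header repeated, then cast the numeric column
df = df.filter(f.col("ID") != "ID").withColumn("FXRate", f.col("FXRate").cast("double"))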