PySpark – Difference between two DataFrames – Identify inserts, updates and deletes

Issue

I have two DataFrames, df1 (old) and df2 (new). I am trying to compare df2 with df1 to find the newly added rows, deleted rows, and updated rows, along with the names of the columns that were updated.

Here is the code that I have written

from pyspark.sql.functions import col, array, when, array_remove, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data1 = [("James","rob","Smith","36636","M",3000),
    ("Michael","Rose","jim","40288","M",4000),
    ("Robert","dunkin","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","60563","F",-1)
  ]

data2 = [("James","rob","Smith","36636","M",3000),
    ("Robert","dunkin","Williams","42114","M",2000),
    ("Maria","Anne","Jones","72712","F",3000),
    ("Yesh","Reddy","Brown","75234","M",3000),
    ("Jen","Mary","Brown","60563","F",-1)
  ]
schema = StructType([
    StructField("firstname",StringType(),True),
    StructField("middlename",StringType(),True),
    StructField("lastname",StringType(),True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
  ])
 
df1 = spark.createDataFrame(data=data1,schema=schema)
df2 = spark.createDataFrame(data=data2,schema=schema)
conditions_ = [when(df1[c]!=df2[c], lit(c)).otherwise("") for c in df1.columns if c not in ['firstname','middlename','lastname']]

select_expr =[
                col("firstname"),col("middlename"),col("lastname"),
                *[df2[c] for c in df2.columns if c not in ['firstname','middlename','lastname']], 
                array_remove(array(*conditions_), "").alias("updated_columns")
]

df1.join(df2, ["firstname","middlename","lastname"],"inner").select(*select_expr).show()

Here is the output that I got

+---------+----------+--------+-----+------+------+---------------+
|firstname|middlename|lastname|   id|gender|salary|updated_columns|
+---------+----------+--------+-----+------+------+---------------+
|    James|       rob|   Smith|36636|     M|  3000|             []|
|   Robert|    dunkin|Williams|42114|     M|  2000|       [salary]|
|    Maria|      Anne|   Jones|72712|     F|  3000|   [id, salary]|
|      Jen|      Mary|   Brown|60563|     F|    -1|             []|
+---------+----------+--------+-----+------+------+---------------+

Here is the output that I am expecting

+---------+----------+--------+-----+------+------+---------------+-----------------+
|firstname|middlename|lastname|   id|gender|salary|updated_columns|           status|
+---------+----------+--------+-----+------+------+---------------+-----------------+
|    James|       rob|   Smith|36636|     M|  3000|             []|        unchanged|
|   Robert|    dunkin|Williams|42114|     M|  2000|       [salary]|          updated|
|  Michael|      Rose|     jim|40288|     M|  4000|             []|          deleted|
|    Maria|      Anne|   Jones|72712|     F|  3000|   [id, salary]|          updated|
|     Yesh|     Reddy|   Brown|75234|     M|  3000|             []|            added|
|      Jen|      Mary|   Brown|60563|     F|    -1|             []|        unchanged|
+---------+----------+--------+-----+------+------+---------------+-----------------+

I know that I can find the added and deleted rows using left anti joins separately. But I am looking for a way to extend my existing join to get the above output.
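For reference, the classification being asked for can be sketched in plain Python, independent of Spark. The `diff_rows` helper below is purely illustrative (its name and the key/value split are my own, not part of the question's code): rows are keyed by the name columns, and the remaining columns are compared value by value.

```python
def diff_rows(old, new):
    """Classify rows as added/deleted/updated/unchanged.

    old, new: dicts mapping a key tuple (the name columns) to a dict of
    the remaining columns. Plain-Python sketch of the desired logic.
    """
    result = {}
    for key in set(old) | set(new):
        if key not in old:
            result[key] = ("added", [])
        elif key not in new:
            result[key] = ("deleted", [])
        else:
            # Collect the names of columns whose values differ.
            changed = [c for c in old[key] if old[key][c] != new[key][c]]
            result[key] = ("updated", changed) if changed else ("unchanged", [])
    return result
```

For example, Maria's row in the question's data would come back as `("updated", ["id", "salary"])`, while Michael's would be `("deleted", [])` and Yesh's `("added", [])`.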

Solution

An outer join would help in your case. I have modified your code to do this and to include the status column.

Minimal Working Example

from pyspark.sql.functions import col, array, when, array_remove, lit, size, coalesce
from pyspark.sql.types import *

data1 = [("James","rob","Smith","36636","M",3000),
    ("Michael","Rose","jim","40288","M",4000),
    ("Robert","dunkin","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","60563","F",-1)
  ]

data2 = [("James","rob","Smith","36636","M",3000),
    ("Robert","dunkin","Williams","42114","M",2000),
    ("Maria","Anne","Jones","72712","F",3000),
    ("Yesh","Reddy","Brown","75234","M",3000),
    ("Jen","Mary","Brown","60563","F",-1)
  ]
schema = StructType([
    StructField("firstname",StringType(),True),
    StructField("middlename",StringType(),True),
    StructField("lastname",StringType(),True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
  ])
 
df1 = spark.createDataFrame(data=data1,schema=schema)
df2 = spark.createDataFrame(data=data2,schema=schema)
conditions_ = [when(df1[c]!=df2[c], lit(c)).otherwise("") for c in df1.columns if c not in ['firstname','middlename','lastname']]

Below is the logic for the status column. select_expr is also modified to coalesce values from df2 and df1, with preference given to df2, so that the output shows the up-to-date data.

status = (when(df1["id"].isNull(), lit("added"))
          .when(df2["id"].isNull(), lit("deleted"))
          .when(size(array_remove(array(*conditions_), "")) > 0, lit("updated"))
          .otherwise("unchanged"))
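The chained when() above is first-match-wins: after the outer join, df1's columns are null for rows only in df2 (added), df2's columns are null for rows only in df1 (deleted), and otherwise the size of updated_columns decides between updated and unchanged. A plain-Python mirror of that decision (the helper name is my own, for illustration):

```python
def row_status(old_id, new_id, updated_columns):
    # First matching branch wins, just like when(...).when(...).otherwise(...).
    if old_id is None:            # key absent from df1 -> row is new
        return "added"
    if new_id is None:            # key absent from df2 -> row was removed
        return "deleted"
    if len(updated_columns) > 0:  # at least one column changed
        return "updated"
    return "unchanged"
```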

select_expr =[
                col("firstname"),col("middlename"),col("lastname"),
                *[coalesce(df2[c], df1[c]).alias(c) for c in df2.columns if c not in ['firstname','middlename','lastname']],                
                array_remove(array(*conditions_), "").alias("updated_columns"),
                status.alias("status"),
]
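The coalesce(df2[c], df1[c]) in select_expr picks the first non-null value: the df2 value when the row exists in df2, and the df1 value otherwise (i.e. for deleted rows). Its semantics in plain Python (an illustrative sketch, not Spark's implementation):

```python
def coalesce(*values):
    # Return the first non-None value, mirroring Spark's coalesce().
    for v in values:
        if v is not None:
            return v
    return None
```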

Finally, applying an outer join.

df1.join(df2, ["firstname","middlename","lastname"],"outer").select(*select_expr).show()

Output

+---------+----------+--------+-----+------+------+---------------+---------+
|firstname|middlename|lastname|   id|gender|salary|updated_columns|   status|
+---------+----------+--------+-----+------+------+---------------+---------+
|    James|       rob|   Smith|36636|     M|  3000|             []|unchanged|
|      Jen|      Mary|   Brown|60563|     F|    -1|             []|unchanged|
|    Maria|      Anne|   Jones|72712|     F|  3000|   [id, salary]|  updated|
|  Michael|      Rose|     jim|40288|     M|  4000|             []|  deleted|
|   Robert|    dunkin|Williams|42114|     M|  2000|       [salary]|  updated|
|     Yesh|     Reddy|   Brown|75234|     M|  3000|             []|    added|
+---------+----------+--------+-----+------+------+---------------+---------+
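One caveat if your real data can contain nulls in the compared columns: in Spark SQL, df1[c] != df2[c] evaluates to NULL when either side is NULL, so the when(...).otherwise("") in conditions_ silently treats such cells as unchanged. Replacing the condition with ~df1[c].eqNullSafe(df2[c]) (Column.eqNullSafe is available since Spark 2.3) avoids this. Its semantics, shown as a plain-Python sketch (the helper name is my own):

```python
def neq_null_safe(a, b):
    # Mirrors ~Column.eqNullSafe: two NULLs compare equal, and a NULL
    # versus a value compares different. Ordinary != would return NULL
    # in both cases, which when()/otherwise() maps to "no change".
    if a is None and b is None:
        return False
    if a is None or b is None:
        return True
    return a != b
```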

Answered By – Nithish

This answer was collected from Stack Overflow and is licensed under CC BY-SA 2.5, CC BY-SA 3.0, and CC BY-SA 4.0.
