arundhaj

regression towards the datascience

Calculate difference with previous row in PySpark

 

This post shows how to compute the difference between the current row's value and the previous row's value in PySpark.

Let's say we have the following DataFrame, and we want to calculate the difference in value between consecutive rows.

+---+-----+
| id|value|
+---+-----+
|  1|   65|
|  2|   66|
|  3|   65|
|  4|   68|
|  5|   71|
+---+-----+

First, we create a new column, prev_value, that holds the previous row's value. The first row has no predecessor, so its prev_value is null.

+---+-----+----------+
| id|value|prev_value|
+---+-----+----------+
|  1|   65|      null|
|  2|   66|        65|
|  3|   65|        66|
|  4|   68|        65|
|  5|   71|        68|
+---+-----+----------+

The following code snippet produces the desired result.

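from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# SparkSession is the entry point since Spark 2.0 (replaces SparkContext + SQLContext)
spark = SparkSession.builder.appName("PrevRowDiffApp").getOrCreate()

df = spark.createDataFrame([(1, 65), (2, 66), (3, 65), (4, 68), (5, 71)], ["id", "value"])

# An empty partitionBy() puts every row in one window (and one partition), ordered by id
my_window = Window.partitionBy().orderBy("id")

# lag() returns the value from the previous row in the window, or null for the first row
df = df.withColumn("prev_value", F.lag(df.value).over(my_window))

# The first row's difference is null because prev_value is null; report 0 instead
df = df.withColumn("diff", F.when(F.isnull(df.value - df.prev_value), 0)
                              .otherwise(df.value - df.prev_value))

df.show()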

The final result is in the diff column. The first row has no previous value, so its diff is set to 0.

+---+-----+----------+----+
| id|value|prev_value|diff|
+---+-----+----------+----+
|  1|   65|      null|   0|
|  2|   66|        65|   1|
|  3|   65|        66|  -1|
|  4|   68|        65|   3|
|  5|   71|        68|   3|
+---+-----+----------+----+
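To sanity-check the numbers without starting Spark, the same lag-and-subtract logic can be mirrored in plain Python. This is only a rough sketch for illustration: the helper name diff_with_previous is made up here, it is not part of PySpark, and it assumes the rows fit comfortably in memory.

```python
# Plain-Python mirror of the lag-and-subtract logic (no Spark required).
# diff_with_previous is a hypothetical helper, not a PySpark function.
rows = [(1, 65), (2, 66), (3, 65), (4, 68), (5, 71)]


def diff_with_previous(rows):
    """Return (id, value, prev_value, diff) tuples ordered by id."""
    ordered = sorted(rows, key=lambda r: r[0])  # same role as orderBy("id")
    result = []
    prev_value = None  # the first row has no predecessor, like lag() giving null
    for row_id, value in ordered:
        # Treat a missing previous value as a difference of 0, as the when/otherwise does
        diff = 0 if prev_value is None else value - prev_value
        result.append((row_id, value, prev_value, diff))
        prev_value = value
    return result


for row in diff_with_previous(rows):
    print(row)
```

The diff values it produces should line up with the diff column in the table above, with None standing in for null.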

Hope this helps!
