To find the difference between the current row value and the previous row value in spark programming with PySpark is as below
Let say, we have the following DataFrame and we shall now calculate the difference of values between consecutive rows.
+---+-----+
| id|value|
+---+-----+
| 1| 65|
| 2| 66|
| 3| 65|
| 4| 68|
| 5| 71|
+---+-----+
We first, create a new column with previous row's value as below
+---+-----+----------+
| id|value|prev_value|
+---+-----+----------+
| 1| 65| null|
| 2| 66| 65|
| 3| 65| 66|
| 4| 68| 65|
| 5| 71| 68|
+---+-----+----------+
The following code snippet finds us the desired results.
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window
sc = SparkContext(appName="PrevRowDiffApp")
sqlc = SQLContext(sc)
rdd = sc.parallelize([(1, 65), (2, 66), (3, 65), (4, 68), (5, 71)])
df = sqlc.createDataFrame(rdd, ["id", "value"])
my_window = Window.partitionBy().orderBy("id")
df = df.withColumn("prev_value", F.lag(df.value).over(my_window))
df = df.withColumn("diff", F.when(F.isnull(df.value - df.prev_value), 0)
.otherwise(df.value - df.prev_value))
df.show()
The final result is in diff column.
+---+-----+----------+----+
| id|value|prev_value|diff|
+---+-----+----------+----+
| 1| 65| null| 0|
| 2| 66| 65| 1|
| 3| 65| 66| -1|
| 4| 68| 65| 3|
| 5| 71| 68| 3|
+---+-----+----------+----+
Hope this helps!