Mysterious Spark Checkpoints Behaviour


It all started with a change to the checkpoint path of our Spark applications. We use Spark Structured Streaming and AWS S3 buckets to maintain checkpoints. Let’s say we were using s3://bucket/spark/topic/ as the checkpoint path, and we changed it to s3://bucket/spark/topic/v1.

Given that we weren’t touching the contents of this directory, and we didn’t want to double-ingest the data or go through the chore of reprocessing everything and cleaning up, we decided to go with a straightforward migration script that moves the existing checkpoint files to the new path. On paper, everything was clear and expected to work, right?
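Conceptually, the migration is nothing more than copying objects from one S3 prefix to another. A minimal sketch of what such a one-off script could look like, using boto3 (the bucket name and prefixes below are placeholders, not our real layout):

import boto3

s3 = boto3.resource("s3")
bucket = s3.Bucket("bucket")      # placeholder bucket name

old_prefix = "spark/topic/"       # old checkpoint location
new_prefix = "spark/topic/v1/"    # new checkpoint location

for obj in bucket.objects.filter(Prefix=old_prefix):
    if obj.key.startswith(new_prefix):
        continue  # skip objects that already live under the new prefix
    new_key = new_prefix + obj.key[len(old_prefix):]
    # Copy each checkpoint object to the new prefix, leaving the original in place.
    bucket.copy({"Bucket": bucket.name, "Key": obj.key}, new_key)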

Nope! The error:

java.lang.AssertionError: assertion failed: There are [1] sources in the checkpoint offsets and now there are [3] sources requested by the query. Cannot continue.

The initial search ends in this Stack Overflow thread and circles back to the fault-tolerance semantics docs, but it doesn’t make sense! Right? We haven’t changed anything about the sources or the application’s business logic, and there are no hard-coded values telling Spark that the checkpoint path is changing.

That code was introduced to Spark many versions ago, and all it does is compare the number of sources used by the current query with the number recorded in the offsets file fetched from the S3 bucket, to make sure they match. Don’t they?

No! We simply deleted the existing offsets to get past this error and realized the new checkpoints are rather strange:

v1
{"batchWatermarkMs":0,"batchTimestampMs":1703081374762,"conf":
{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}} 
{"topic_1":{"0":326}} 
{"topic_1":{"0":326}}
{"topic_1":{"0":326}}

Lines 3 to 5 are supposed to list the sources used by the query, and for some reason the only source topic we have is repeated three times in the updated application.

Digging further into the code, we found that the application’s logical plan drives the whole checkpoint mechanism. One df.explain("formatted") later, we realized there were three occurrences of StreamingRelation with Kafka as the source and the same set of output columns.

We read one topic and pass it to the business logic, which has not changed. Is that causing the issue? Checking the business logic, we realize we do, indeed, have three select statements on the input data frame, and we union their results together.
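For illustration, the shape of that logic looked roughly like this (a minimal PySpark sketch with made-up column names, not our actual business logic):

from pyspark.sql import DataFrame

def transformation(df: DataFrame) -> DataFrame:
    # Three independent projections over the same input, unioned back together.
    a = df.selectExpr("key", "value", "'a' AS variant")
    b = df.selectExpr("key", "value", "'b' AS variant")
    c = df.selectExpr("key", "value", "'c' AS variant")
    return a.union(b).union(c)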

But why wasn’t this issue happening before, and why is it happening now? Comparing our branches, there is another change in the code: the way we invoke the business logic on the input data frame is slightly different. Back then, we were doing the following:
df.writeStream.foreachBatch(transformation)

And we changed it to the following to make it clearer:

df.transform(transformation).writeStream

Each of these approaches has its benefits; however, the main difference is how Spark plans the job. With the first one, the streaming query has a single source, and the selects run over the micro-batch as one data frame inside the foreachBatch callback; with the latter, the transformation becomes part of the streaming plan, which expands into N reads + selects, where N is the number of select statements.
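In simplified form, the new wiring looks something like this (the sink format, output path, and checkpoint options are placeholders; df is the streaming read from Kafka):

query = (
    df.transform(transformation)  # the three selects + union are now part of the streaming plan
      .writeStream
      .format("parquet")          # placeholder sink
      .option("path", "s3://bucket/output/")
      .option("checkpointLocation", "s3://bucket/spark/topic/v1")
      .start()
)

With this wiring, each select branch of the union carries its own StreamingRelation in the plan, which is why the offsets file lists the same topic three times.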

So, in theory, if we add a new select statement to the business logic, the updated application will contradict the fault-tolerance rules and fail unless we delete the checkpoints! Of course, if that’s not the desired behavior, we can contain the transformation in a foreachBatch callback, and even cache the initial data frame, and get consistency at the cost of some parallelism.
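A rough sketch of that containment, again with placeholder sink and paths (the streaming query itself keeps a single source, and the cache keeps the three selects from each re-reading the batch):

def process_batch(batch_df, batch_id):
    # Cache the micro-batch so the three selects reuse the same data.
    batch_df.persist()
    transformation(batch_df).write.mode("append").parquet("s3://bucket/output/")  # placeholder sink
    batch_df.unpersist()

query = (
    df.writeStream
      .foreachBatch(process_batch)
      .option("checkpointLocation", "s3://bucket/spark/topic/v1")
      .start()
)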

That was it: a seemingly trivial storage change, followed with some curiosity, turned into a learning point for us. What is your experience with this subject?