HeartSaVioR commented on code in PR #46351:
URL: https://github.com/apache/spark/pull/46351#discussion_r1589729057


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -388,6 +388,33 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }
 
+  test("SPARK-48105: state store unload/close happens during the maintenance") {
+    tryWithProviderResource(
+      newStoreProvider(opId = Random.nextInt(), partition = 0, minDeltasForSnapshot = 1)) {
+      provider =>
+        val store = provider.getStore(0).asInstanceOf[provider.HDFSBackedStateStore]
+        val keys = (1 to 20).map(i => ("a" + i))
+        keys.foreach(put(store, _, 0, 0))
+        // commit state store with 20 keys.
+        store.commit()
+        // get the state store iterator: mimic the case which the iterator is hold in the
+        // maintenance thread.
+        val storeIterator = store.iterator()
+        // If the provider is loaded in another executor, it will be unloaded and closed in
+        // current executor.
+        provider.close()
+        // the store iterator should still be valid as the maintenance thread may have already
+        // hold it and is doing snapshotting even thought the state store is unloaded.

Review Comment:
   nit: "thought" should be "though"; remove the trailing "t".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

