huanliwang-db commented on code in PR #46351:
URL: https://github.com/apache/spark/pull/46351#discussion_r1589740422


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -351,7 +351,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   }
 
   override def close(): Unit = {
-    synchronized { loadedMaps.values.asScala.foreach(_.clear()) }
+    synchronized { loadedMaps.keySet().asScala.toSeq.foreach(loadedMaps.remove) }

Review Comment:
   The implementation of `TreeMap.clear` is
   ```
   public void clear() {
       modCount++;
       size = 0;
       root = null;
   }
   ```
   where
   ```
   private transient Entry<K,V> root;
   ```
   so I think it just resets the `root` reference and won't clear the values; the `Entry` objects themselves are left untouched.
   
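   To illustrate the point above (a standalone sketch, not Spark code): `TreeMap.clear()` only drops the root reference, so an `Entry` that a reader has already obtained (e.g. through an iterator, as the maintenance thread does with the store iterator) still holds its key and value afterwards.
   ```java
   import java.util.Iterator;
   import java.util.Map;
   import java.util.TreeMap;

   public class TreeMapClearDemo {
       public static void main(String[] args) {
           TreeMap<String, String> map = new TreeMap<>();
           map.put("a1", "v1");
           map.put("a2", "v2");

           // Grab a live Entry before clearing, the way a concurrent
           // reader holding an iterator would.
           Iterator<Map.Entry<String, String>> it = map.entrySet().iterator();
           Map.Entry<String, String> first = it.next();

           // clear() just does: modCount++; size = 0; root = null;
           // It does not null out the keys/values stored in the Entry
           // objects themselves.
           map.clear();

           System.out.println(map.size());      // 0
           System.out.println(first.getKey());  // a1
           System.out.println(first.getValue()); // v1
       }
   }
   ```
   (Note that calling `it.next()` again after `clear()` would throw a `ConcurrentModificationException`, since the iterator is fail-fast via `modCount`.)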



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -388,6 +388,33 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }
 
+  test("SPARK-48105: state store unload/close happens during the maintenance") {
+    tryWithProviderResource(
+      newStoreProvider(opId = Random.nextInt(), partition = 0, minDeltasForSnapshot = 1)) {
+      provider =>
+        val store = provider.getStore(0).asInstanceOf[provider.HDFSBackedStateStore]
+        val keys = (1 to 20).map(i => ("a" + i))
+        keys.foreach(put(store, _, 0, 0))
+        // commit state store with 20 keys.
+        store.commit()
+        // get the state store iterator: mimic the case where the iterator is held by the
+        // maintenance thread.
+        val storeIterator = store.iterator()
+        // If the provider is loaded in another executor, it will be unloaded and closed in
+        // the current executor.
+        provider.close()
+        // The store iterator should still be valid, as the maintenance thread may already
+        // hold it and be doing snapshotting even though the state store is unloaded.
+        val outputKeys = new mutable.ArrayBuffer[String]
+        while (storeIterator.hasNext) {

Review Comment:
   Yes, it fails consistently without the change in the `close` function.
   
   ```
    - SPARK-48105: state store unload/close happens during the maintenance *** FAILED *** (725 milliseconds)
   [info]   Vector("a1", "a10", "a11", "a12", "a13", "a14", "a15", "a16", "a17", "a18", "a19", "a2", "a20", "a3", "a4", "a5", "a6", "a7", "a8", "a9") did not equal ArrayBuffer("a1", "a11", "a12", "a13", "a15", "a18", "a19", "a4", "a6", "a8", "a9") (StateStoreSuite.scala:424)
   [info]   Analysis:
   [info]   Vector1(1: "a10" -> "a11", 2: "a11" -> "a12", 3: "a12" -> "a13", 4: "a13" -> "a15", 5: "a14" -> "a18", 6: "a15" -> "a19", 7: "a16" -> "a4", 8: "a17" -> "a6", 9: "a18" -> "a8", 10: "a19" -> "a9", 11: "a2" -> , 12: "a20" -> , 13: "a3" -> , 14: "a4" -> , 15: "a5" -> , 16: "a6" -> , 17: "a7" -> , 18: "a8" -> , 19: "a9" -> )
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

