huanliwang-db commented on code in PR #46351: URL: https://github.com/apache/spark/pull/46351#discussion_r1589740422
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala: ########## @@ -351,7 +351,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with } override def close(): Unit = { - synchronized { loadedMaps.values.asScala.foreach(_.clear()) } + synchronized { loadedMaps.keySet().asScala.toSeq.foreach(loadedMaps.remove) } Review Comment: The implementation of `TreeMap.clear` is ``` public void clear() { modCount++; size = 0; root = null; } ``` where ``` private transient Entry<K,V> root; ``` so I think it just resets `root` and won't clear the values. ########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala: ########## @@ -388,6 +388,33 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] } } + test("SPARK-48105: state store unload/close happens during the maintenance") { + tryWithProviderResource( + newStoreProvider(opId = Random.nextInt(), partition = 0, minDeltasForSnapshot = 1)) { + provider => + val store = provider.getStore(0).asInstanceOf[provider.HDFSBackedStateStore] + val keys = (1 to 20).map(i => ("a" + i)) + keys.foreach(put(store, _, 0, 0)) + // commit state store with 20 keys. + store.commit() + // get the state store iterator: mimic the case which the iterator is hold in the + // maintenance thread. + val storeIterator = store.iterator() + // If the provider is loaded in another executor, it will be unloaded and closed in + // current executor. + provider.close() + // the store iterator should still be valid as the maintenance thread may have already + // hold it and is doing snapshotting even thought the state store is unloaded. + val outputKeys = new mutable.ArrayBuffer[String] + while (storeIterator.hasNext) { Review Comment: Yes, it fails consistently without the change in the `close` function. 
``` - SPARK-48105: state store unload/close happens during the maintenance *** FAILED *** (725 milliseconds) [info] Vector("a1", "a10", "a11", "a12", "a13", "a14", "a15", "a16", "a17", "a18", "a19", "a2", "a20", "a3", "a4", "a5", "a6", "a7", "a8", "a9") did not equal ArrayBuffer("a1", "a11", "a12", "a13", "a15", "a18", "a19", "a4", "a6", "a8", "a9") (StateStoreSuite.scala:424) [info] Analysis: [info] Vector1(1: "a10" -> "a11", 2: "a11" -> "a12", 3: "a12" -> "a13", 4: "a13" -> "a15", 5: "a14" -> "a18", 6: "a15" -> "a19", 7: "a16" -> "a4", 8: "a17" -> "a6", 9: "a18" -> "a8", 10: "a19" -> "a9", 11: "a2" -> , 12: "a20" -> , 13: "a3" -> , 14: "a4" -> , 15: "a5" -> , 16: "a6" -> , 17: "a7" -> , 18: "a8" -> , 19: "a9" -> ) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org