anishshri-db commented on code in PR #46351: URL: https://github.com/apache/spark/pull/46351#discussion_r1588759416
########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala: ########## @@ -388,6 +388,33 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] } } + test("SPARK-48105: state store unload/close happens during the maintenance") { + tryWithProviderResource( + newStoreProvider(opId = Random.nextInt(), partition = 0, minDeltasForSnapshot = 1)) { + provider => + val store = provider.getStore(0).asInstanceOf[provider.HDFSBackedStateStore] + val keys = (1 to 20).map(i => ("a" + i)) + keys.foreach(put(store, _, 0, 0)) + // commit state store with 20 keys. + store.commit() + // get the state store iterator: mimic the case which the iterator is hold in the + // maintenance thread. + val storeIterator = store.iterator() + // If the provider is loaded in another executor, it will be unloaded and closed in + // current executor. + provider.close() + // the store iterator should still be valid as the maintenance thread may have already + // hold it and is doing snapshotting even thought the state store is unloaded. + val outputKeys = new mutable.ArrayBuffer[String] + while (storeIterator.hasNext) { Review Comment: Does this test fail before the change? If so, does it fail predictably? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org