@@ -993,6 +993,37 @@ public void testGetPinnedTimestampLockedFilesWithPinnedTimestampsDifferentPrefix
993993 assertEquals (0 , metadataFilePinnedTimestampCache .size ());
994994 }
995995
996+ /**
997+ * This test checks the case when a stale writer is uploading metadata files with higher timestamp, but lower primary
998+ * term.
999+ */
1000+ public void testGetPinnedTimestampLockedFilesForDivergentWrites () {
1001+ setupRemotePinnedTimestampFeature (true );
1002+
1003+ Map <Long , String > metadataFilePinnedTimestampCache = new HashMap <>();
1004+
1005+ // Pinned timestamp 7000
1006+ // Primary Term - Timestamp in md file
1007+ // 6 - 7002
1008+ // 3 - 6999
1009+ // 4 - 6998
1010+ // 5 - 6995
1011+ // 5 - 6990
1012+ Tuple <Map <Long , String >, Set <String >> metadataAndLocks = testGetPinnedTimestampLockedFilesWithPinnedTimestamps (
1013+ Map .of (7002L , 6L , 6999L , 3L , 6998L , 4L , 6995L , 5L , 6990L , 5L ),
1014+ Set .of (4000L , 5000L , 6000L , 7000L ),
1015+ metadataFilePinnedTimestampCache
1016+ );
1017+ Map <Long , String > metadataFiles = metadataAndLocks .v1 ();
1018+ Set <String > implicitLockedFiles = metadataAndLocks .v2 ();
1019+
1020+ assertEquals (1 , implicitLockedFiles .size ());
1021+ assertTrue (implicitLockedFiles .contains (metadataFiles .get (6995L )));
1022+ // Now we cache all the matches except the last one.
1023+ assertEquals (1 , metadataFilePinnedTimestampCache .size ());
1024+ assertEquals (metadataFiles .get (6995L ), metadataFilePinnedTimestampCache .get (7000L ));
1025+ }
1026+
9961027 public void testFilterOutMetadataFilesBasedOnAgeFeatureDisabled () {
9971028 setupRemotePinnedTimestampFeature (false );
9981029 List <String > metadataFiles = new ArrayList <>();
0 commit comments