1313 */
1414package io .trino .plugin .kafka .schema .confluent ;
1515
16- import com .fasterxml .jackson .annotation .JsonCreator ;
17- import com .fasterxml .jackson .annotation .JsonProperty ;
1816import com .google .common .collect .ImmutableList ;
1917import com .google .common .collect .ImmutableMap ;
2018import dev .failsafe .Failsafe ;
@@ -93,7 +91,7 @@ public void testBasicTopic()
9391 {
9492 String topic = "topic-basic-MixedCase-" + randomNameSuffix ();
9593 assertTopic (
96- testingKafka , topic ,
94+ topic ,
9795 format ("SELECT col_1, col_2 FROM %s" , toDoubleQuoted (topic )),
9896 format ("SELECT col_1, col_2, col_3 FROM %s" , toDoubleQuoted (topic )),
9997 false ,
@@ -108,7 +106,7 @@ public void testTopicWithKeySubject()
108106 {
109107 String topic = "topic-Key-Subject-" + randomNameSuffix ();
110108 assertTopic (
111- testingKafka , topic ,
109+ topic ,
112110 format ("SELECT \" %s-key\" , col_1, col_2 FROM %s" , topic , toDoubleQuoted (topic )),
113111 format ("SELECT \" %s-key\" , col_1, col_2, col_3 FROM %s" , topic , toDoubleQuoted (topic )),
114112 true ,
@@ -182,7 +180,7 @@ public void testTopicWithRecordNameStrategy()
182180 {
183181 String topic = "topic-Record-Name-Strategy-" + randomNameSuffix ();
184182 assertTopic (
185- testingKafka , topic ,
183+ topic ,
186184 format ("SELECT \" %1$s-key\" , col_1, col_2 FROM \" %1$s&value-subject=%2$s\" " , topic , RECORD_NAME ),
187185 format ("SELECT \" %1$s-key\" , col_1, col_2, col_3 FROM \" %1$s&value-subject=%2$s\" " , topic , RECORD_NAME ),
188186 true ,
@@ -198,7 +196,7 @@ public void testTopicWithTopicRecordNameStrategy()
198196 {
199197 String topic = "topic-Topic-Record-Name-Strategy-" + randomNameSuffix ();
200198 assertTopic (
201- testingKafka , topic ,
199+ topic ,
202200 format ("SELECT \" %1$s-key\" , col_1, col_2 FROM \" %1$s&value-subject=%1$s-%2$s\" " , topic , RECORD_NAME ),
203201 format ("SELECT \" %1$s-key\" , col_1, col_2, col_3 FROM \" %1$s&value-subject=%1$s-%2$s\" " , topic , RECORD_NAME ),
204202 true ,
@@ -263,7 +261,6 @@ private static ImmutableMap.Builder<String, String> schemaRegistryAwareProducer(
263261 }
264262
265263 private void assertTopic (
266- TestingKafka testingKafka ,
267264 String topicName ,
268265 String initialQuery ,
269266 String evolvedQuery ,
@@ -278,10 +275,7 @@ private void assertTopic(
278275 waitUntilTableExists (topicName );
279276 assertCount (topicName , MESSAGE_COUNT );
280277
281- QueryAssertions queryAssertions = new QueryAssertions (getQueryRunner ());
282- queryAssertions .query (initialQuery )
283- .assertThat ()
284- .containsAll (getExpectedValues (messages , INITIAL_SCHEMA , isKeyIncluded ));
278+ assertThat (query (initialQuery )).matches (getExpectedValues (messages , INITIAL_SCHEMA , isKeyIncluded ));
285279
286280 List <ProducerRecord <Long , GenericRecord >> newMessages = createMessages (topicName , MESSAGE_COUNT , false );
287281 testingKafka .sendMessages (newMessages .stream (), producerConfig );
@@ -291,9 +285,8 @@ private void assertTopic(
291285 .addAll (newMessages )
292286 .build ();
293287 assertCount (topicName , allMessages .size ());
294- queryAssertions .query (evolvedQuery )
295- .assertThat ()
296- .containsAll (getExpectedValues (messages , EVOLVED_SCHEMA , isKeyIncluded ));
288+
289+ assertThat (query (evolvedQuery )).containsAll (getExpectedValues (messages , EVOLVED_SCHEMA , isKeyIncluded ));
297290 }
298291
299292 private static String getExpectedValues (List <ProducerRecord <Long , GenericRecord >> messages , Schema schema , boolean isKeyIncluded )
@@ -365,25 +358,25 @@ private void assertNotExists(String tableName)
365358 private void waitUntilTableExists (String tableName )
366359 {
367360 Failsafe .with (
368- RetryPolicy .builder ()
369- .withMaxAttempts (10 )
370- .withDelay (Duration .ofMillis (100 ))
371- .build ())
361+ RetryPolicy .builder ()
362+ .withMaxAttempts (10 )
363+ .withDelay (Duration .ofMillis (100 ))
364+ .build ())
372365 .run (() -> assertThat (schemaExists ()).isTrue ());
373366 Failsafe .with (
374- RetryPolicy .builder ()
375- .withMaxAttempts (10 )
376- .withDelay (Duration .ofMillis (100 ))
377- .build ())
367+ RetryPolicy .builder ()
368+ .withMaxAttempts (10 )
369+ .withDelay (Duration .ofMillis (100 ))
370+ .build ())
378371 .run (() -> assertThat (tableExists (tableName )).isTrue ());
379372 }
380373
381374 private boolean schemaExists ()
382375 {
383376 return getQueryRunner ().execute (format (
384- "SHOW SCHEMAS FROM %s LIKE '%s'" ,
385- getSession ().getCatalog ().orElseThrow (),
386- getSession ().getSchema ().orElseThrow ()))
377+ "SHOW SCHEMAS FROM %s LIKE '%s'" ,
378+ getSession ().getCatalog ().orElseThrow (),
379+ getSession ().getSchema ().orElseThrow ()))
387380 .getRowCount () == 1 ;
388381 }
389382
@@ -441,30 +434,11 @@ private static GenericRecord createRecordWithEvolvedSchema(long key)
441434 .build ();
442435 }
443436
444- private static class JsonValue
437+ private record JsonValue ( int id , String value )
445438 {
446- private final int id ;
447- private final String value ;
448-
449- @ JsonCreator
450- public JsonValue (
451- @ JsonProperty ("id" ) int id ,
452- @ JsonProperty ("value" ) String value )
453- {
454- this .id = id ;
455- this .value = requireNonNull (value , "value is null" );
456- }
457-
458- @ JsonProperty ("id" )
459- public int getId ()
460- {
461- return id ;
462- }
463-
464- @ JsonProperty ("value" )
465- public String getValue ()
439+ private JsonValue
466440 {
467- return value ;
441+ requireNonNull ( value , " value is null" ) ;
468442 }
469443 }
470444}
0 commit comments