23
23
import org .apache .commons .lang3 .NotImplementedException ;
24
24
import org .apache .flink .api .common .io .OutputFormat ;
25
25
import org .apache .flink .api .common .serialization .SerializationSchema ;
26
- import org .junit .Assert ;
27
- import org .junit .Test ;
26
+ import org .junit .jupiter .api .Test ;
28
27
import org .mockito .Mockito ;
29
28
30
29
import java .io .IOException ;
31
30
import java .net .URI ;
32
31
import java .util .concurrent .CompletableFuture ;
33
32
import java .util .concurrent .ExecutorService ;
34
33
35
- import static org .junit . Assert . assertEquals ;
36
- import static org .junit . Assert . assertTrue ;
37
- import static org .junit . Assert .fail ;
34
+ import static org .assertj . core . api . Assertions . assertThat ;
35
+ import static org .assertj . core . api . Assertions . assertThatThrownBy ;
36
+ import static org .assertj . core . api . Assertions .fail ;
38
37
import static org .mockito .Matchers .any ;
39
38
import static org .mockito .Matchers .anyObject ;
40
39
import static org .mockito .Matchers .anyString ;
@@ -73,37 +72,41 @@ public void testBuilderForSuccess() {
73
72
.withPravegaConfig (pravegaConfig )
74
73
.forStream (stream )
75
74
.build ();
76
- assertEquals ( stream .getScope (), outputFormat .getScope ());
77
- assertEquals ( stream . getStreamName (), outputFormat . getStream ());
78
- assertEquals ( serializationSchema , outputFormat .getSerializationSchema ());
79
- assertEquals ( eventRouter , outputFormat .getEventRouter ());
75
+ assertThat ( outputFormat .getScope ()). isEqualTo ( stream .getScope ());
76
+ assertThat ( outputFormat . getStream ()). isEqualTo ( stream . getStreamName ());
77
+ assertThat ( outputFormat .getSerializationSchema ()). isEqualTo ( serializationSchema );
78
+ assertThat ( outputFormat .getEventRouter ()). isEqualTo ( eventRouter );
80
79
}
81
80
82
81
/**
83
82
* Testing the builder for right configurations.
84
83
* Should fail since we don't pass {@link SerializationSchema}
85
84
*/
86
- @ Test ( expected = NullPointerException . class )
85
+ @ Test
87
86
public void testBuilderForFailure1 () {
88
87
PravegaConfig pravegaConfig = mock (PravegaConfig .class );
89
- FlinkPravegaOutputFormat .<String >builder ()
90
- .withEventRouter (eventRouter )
91
- .withPravegaConfig (pravegaConfig )
92
- .forStream (stream )
93
- .build ();
88
+ assertThatThrownBy (
89
+ () -> FlinkPravegaOutputFormat .<String >builder ()
90
+ .withEventRouter (eventRouter )
91
+ .withPravegaConfig (pravegaConfig )
92
+ .forStream (stream )
93
+ .build ())
94
+ .isInstanceOf (NullPointerException .class );
94
95
}
95
96
96
97
/**
97
98
* Testing the builder for right configurations.
98
99
* Should fail since we don't pass {@link Stream}
99
100
*/
100
- @ Test ( expected = IllegalStateException . class )
101
+ @ Test
101
102
public void testBuilderForFailure2 () {
102
- FlinkPravegaOutputFormat .<String >builder ()
103
- .withEventRouter (eventRouter )
104
- .withSerializationSchema (serializationSchema )
105
- .withPravegaConfig (pravegaConfig )
106
- .build ();
103
+ assertThatThrownBy (
104
+ () -> FlinkPravegaOutputFormat .<String >builder ()
105
+ .withEventRouter (eventRouter )
106
+ .withSerializationSchema (serializationSchema )
107
+ .withPravegaConfig (pravegaConfig )
108
+ .build ())
109
+ .isInstanceOf (IllegalStateException .class );
107
110
}
108
111
109
112
/**
@@ -133,7 +136,7 @@ public void testLifecycleMethods() throws Exception {
133
136
134
137
// test writeRecord success
135
138
spyFlinkPravegaOutputFormat .writeRecord ("test-1" );
136
- assertEquals ( 1 , spyFlinkPravegaOutputFormat .getPendingWritesCount ().get ());
139
+ assertThat ( spyFlinkPravegaOutputFormat .getPendingWritesCount ().get ()). isEqualTo ( 1 );
137
140
writeFuture .complete (null );
138
141
139
142
// test writeRecord induce failure
@@ -144,17 +147,17 @@ public void testLifecycleMethods() throws Exception {
144
147
try {
145
148
spyFlinkPravegaOutputFormat .writeRecord ("test-3" );
146
149
} catch (Exception e ) {
147
- Assert . assertTrue (e instanceof IOException );
148
- assertTrue (spyFlinkPravegaOutputFormat .isErrorOccurred ());
149
- assertEquals ( "test simulated" , e .getCause ().getMessage ());
150
- assertEquals ( 0 , spyFlinkPravegaOutputFormat .getPendingWritesCount ().get ());
150
+ assertThat (e instanceof IOException ). isTrue ( );
151
+ assertThat (spyFlinkPravegaOutputFormat .isErrorOccurred ()). isTrue ( );
152
+ assertThat ( e .getCause ().getMessage ()). isEqualTo ( "test simulated" );
153
+ assertThat ( spyFlinkPravegaOutputFormat .getPendingWritesCount ().get ()). isEqualTo ( 0 );
151
154
}
152
155
153
156
// test close error
154
157
try {
155
158
spyFlinkPravegaOutputFormat .close ();
156
159
} catch (Exception e ) {
157
- Assert . assertTrue (e instanceof IOException );
160
+ assertThat (e instanceof IOException ). isTrue ( );
158
161
}
159
162
160
163
// test close
@@ -173,7 +176,7 @@ public void testSchemaRegistrySerialization() throws Exception {
173
176
.forStream ("stream" )
174
177
.withSerializationSchemaFromRegistry ("stream" , Integer .class )
175
178
.build ();
176
- fail ();
179
+ fail (null );
177
180
} catch (NullPointerException e ) {
178
181
// "missing default scope"
179
182
}
@@ -185,7 +188,7 @@ public void testSchemaRegistrySerialization() throws Exception {
185
188
.forStream ("stream" )
186
189
.withSerializationSchemaFromRegistry ("stream" , Integer .class )
187
190
.build ();
188
- fail ();
191
+ fail (null );
189
192
} catch (NullPointerException e ) {
190
193
// "missing Schema Registry URI"
191
194
}
0 commit comments