Skip to content

Commit fede7fc

Browse files
committed
[FLINK-24493] [flink-connector-base] Improved Demultiplexing Sink Test Suite and Coverage
1 parent 3d7f00d commit fede7fc

File tree

5 files changed

+374
-19
lines changed

5 files changed

+374
-19
lines changed

flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/DemultiplexingSinkIT.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818
package org.apache.flink.connector.base.sink;
1919

20+
import org.apache.flink.api.connector.sink2.SinkWriter;
2021
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
2122
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2223
import org.apache.flink.test.junit5.MiniClusterExtension;
2324

2425
import org.junit.jupiter.api.Test;
2526
import org.junit.jupiter.api.extension.RegisterExtension;
2627

28+
import java.io.Serializable;
2729
import java.util.ArrayList;
2830
import java.util.List;
2931
import java.util.concurrent.ConcurrentHashMap;
@@ -54,7 +56,7 @@ void testBasicElementRouting() throws Exception {
5456
final DemultiplexingSink<String, String> demuxSink = new DemultiplexingSink<>(router);
5557

5658
// Create a data stream with elements that will route to different sinks
57-
env.fromElements("apple", "banana", "apricot", "blueberry", "avocado", "cherry")
59+
env.fromData("apple", "banana", "apricot", "blueberry", "avocado", "cherry")
5860
.sinkTo(demuxSink);
5961

6062
// Execute the job
@@ -117,8 +119,7 @@ public org.apache.flink.api.connector.sink2.SinkWriter<String> createWriter(
117119
}
118120

119121
/** A simple collecting sink writer for testing. */
120-
private static class CollectingSinkWriter
121-
implements org.apache.flink.api.connector.sink2.SinkWriter<String> {
122+
private static class CollectingSinkWriter implements SinkWriter<String>, Serializable {
122123
private static final long serialVersionUID = 1L;
123124

124125
private final String route;

flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/DemultiplexingSinkStateManagementTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.junit.jupiter.api.Test;
3030

3131
import java.util.ArrayList;
32-
import java.util.Arrays;
3332
import java.util.Collection;
3433
import java.util.List;
3534
import java.util.concurrent.atomic.AtomicInteger;
@@ -133,8 +132,7 @@ void testRestoreWithNonStatefulSinks() throws Exception {
133132

134133
// Create writer with restored state
135134
DemultiplexingSinkWriter<String, String> writer =
136-
new DemultiplexingSinkWriter<>(
137-
nonStatefulRouter, context, Arrays.asList(dummyState));
135+
new DemultiplexingSinkWriter<>(nonStatefulRouter, context, List.of(dummyState));
138136

139137
// Write to trigger sink creation
140138
writer.write("apple", createContext());

flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/DemultiplexingSinkStateSerializerTest.java

Lines changed: 153 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ void testSerializeDeserializeWithComplexRouteKeys() throws IOException {
9494
void testDeserializeWithWrongVersion() {
9595
final DemultiplexingSinkStateSerializer<String> serializer =
9696
new DemultiplexingSinkStateSerializer<>();
97-
final byte[] serialized = new byte[] {1, 2, 3, 4}; // Some dummy data
97+
final byte[] serialized = new byte[] {1, 2, 3, 4};
9898

9999
assertThatThrownBy(() -> serializer.deserialize(999, serialized))
100100
.isInstanceOf(IOException.class)
@@ -109,6 +109,158 @@ void testGetVersion() {
109109
assertThat(serializer.getVersion()).isEqualTo(1);
110110
}
111111

112+
@Test
113+
void testSetRouteStateWithNullState() {
114+
DemultiplexingSinkState<String> state = new DemultiplexingSinkState<>();
115+
116+
// Add a route first
117+
state.setRouteState("route1", new byte[] {1, 2, 3});
118+
assertThat(state.getRoutes()).containsExactly("route1");
119+
120+
// Setting null state should remove the route
121+
state.setRouteState("route1", null);
122+
assertThat(state.getRoutes()).isEmpty();
123+
assertThat(state.getRouteState("route1")).isNull();
124+
}
125+
126+
@Test
127+
void testSetRouteStateWithNullStateOnNonExistentRoute() {
128+
DemultiplexingSinkState<String> state = new DemultiplexingSinkState<>();
129+
130+
// Setting null state on non-existent route should be no-op
131+
state.setRouteState("nonExistent", null);
132+
assertThat(state.getRoutes()).isEmpty();
133+
}
134+
135+
@Test
136+
void testEqualsWithSameInstance() {
137+
DemultiplexingSinkState<String> state = new DemultiplexingSinkState<>();
138+
state.setRouteState("route1", new byte[] {1, 2, 3});
139+
140+
assertThat(state.equals(state)).isTrue();
141+
}
142+
143+
@Test
144+
void testEqualsWithNull() {
145+
DemultiplexingSinkState<String> state = new DemultiplexingSinkState<>();
146+
147+
assertThat(state.equals(null)).isFalse();
148+
}
149+
150+
@Test
151+
void testEqualsWithDifferentClass() {
152+
DemultiplexingSinkState<String> state = new DemultiplexingSinkState<>();
153+
154+
assertThat(state.equals("not a state")).isFalse();
155+
}
156+
157+
@Test
158+
void testEqualsWithDifferentRouteSizes() {
159+
DemultiplexingSinkState<String> state1 = new DemultiplexingSinkState<>();
160+
state1.setRouteState("route1", new byte[] {1, 2, 3});
161+
162+
DemultiplexingSinkState<String> state2 = new DemultiplexingSinkState<>();
163+
state2.setRouteState("route1", new byte[] {1, 2, 3});
164+
state2.setRouteState("route2", new byte[] {4, 5, 6});
165+
166+
assertThat(state1.equals(state2)).isFalse();
167+
assertThat(state2.equals(state1)).isFalse();
168+
}
169+
170+
@Test
171+
void testEqualsWithSameRoutesButDifferentStates() {
172+
DemultiplexingSinkState<String> state1 = new DemultiplexingSinkState<>();
173+
state1.setRouteState("route1", new byte[] {1, 2, 3});
174+
175+
DemultiplexingSinkState<String> state2 = new DemultiplexingSinkState<>();
176+
state2.setRouteState("route1", new byte[] {4, 5, 6});
177+
178+
assertThat(state1.equals(state2)).isFalse();
179+
}
180+
181+
@Test
182+
void testEqualsWithSameRoutesAndStates() {
183+
DemultiplexingSinkState<String> state1 = new DemultiplexingSinkState<>();
184+
state1.setRouteState("route1", new byte[] {1, 2, 3});
185+
state1.setRouteState("route2", new byte[] {4, 5, 6});
186+
187+
DemultiplexingSinkState<String> state2 = new DemultiplexingSinkState<>();
188+
state2.setRouteState("route1", new byte[] {1, 2, 3});
189+
state2.setRouteState("route2", new byte[] {4, 5, 6});
190+
191+
assertThat(state1.equals(state2)).isTrue();
192+
assertThat(state2.equals(state1)).isTrue();
193+
}
194+
195+
@Test
196+
void testHashCodeConsistency() {
197+
DemultiplexingSinkState<String> state1 = new DemultiplexingSinkState<>();
198+
state1.setRouteState("route1", new byte[] {1, 2, 3});
199+
state1.setRouteState("route2", new byte[] {4, 5, 6});
200+
201+
DemultiplexingSinkState<String> state2 = new DemultiplexingSinkState<>();
202+
state2.setRouteState("route1", new byte[] {1, 2, 3});
203+
state2.setRouteState("route2", new byte[] {4, 5, 6});
204+
205+
// Equal objects must have equal hash codes
206+
assertThat(state1.equals(state2)).isTrue();
207+
assertThat(state1.hashCode()).isEqualTo(state2.hashCode());
208+
}
209+
210+
@Test
211+
void testHashCodeWithEmptyState() {
212+
DemultiplexingSinkState<String> state1 = new DemultiplexingSinkState<>();
213+
DemultiplexingSinkState<String> state2 = new DemultiplexingSinkState<>();
214+
215+
assertThat(state1.hashCode()).isEqualTo(state2.hashCode());
216+
}
217+
218+
@Test
219+
void testHashCodeWithDifferentStates() {
220+
DemultiplexingSinkState<String> state1 = new DemultiplexingSinkState<>();
221+
state1.setRouteState("route1", new byte[] {1, 2, 3});
222+
223+
DemultiplexingSinkState<String> state2 = new DemultiplexingSinkState<>();
224+
state2.setRouteState("route1", new byte[] {4, 5, 6});
225+
226+
// Different objects should typically have different hash codes
227+
assertThat(state1.hashCode()).isNotEqualTo(state2.hashCode());
228+
}
229+
230+
@Test
231+
void testToStringWithEmptyState() {
232+
DemultiplexingSinkState<String> state = new DemultiplexingSinkState<>();
233+
234+
String toString = state.toString();
235+
assertThat(toString).contains("DemultiplexingSinkState");
236+
assertThat(toString).contains("routeCount=0");
237+
assertThat(toString).contains("routes=[]");
238+
}
239+
240+
@Test
241+
void testToStringWithSingleRoute() {
242+
DemultiplexingSinkState<String> state = new DemultiplexingSinkState<>();
243+
state.setRouteState("route1", new byte[] {1, 2, 3});
244+
245+
String toString = state.toString();
246+
assertThat(toString).contains("DemultiplexingSinkState");
247+
assertThat(toString).contains("routeCount=1");
248+
assertThat(toString).contains("route1");
249+
}
250+
251+
@Test
252+
void testToStringWithMultipleRoutes() {
253+
DemultiplexingSinkState<String> state = new DemultiplexingSinkState<>();
254+
state.setRouteState("route1", new byte[] {1, 2, 3});
255+
state.setRouteState("route2", new byte[] {4, 5, 6});
256+
257+
String toString = state.toString();
258+
assertThat(toString).contains("DemultiplexingSinkState");
259+
assertThat(toString).contains("routeCount=2");
260+
assertThat(toString).contains("route1");
261+
assertThat(toString).contains("route2");
262+
}
263+
112264
/** A complex route key for testing serialization. */
113265
private static class ComplexRouteKey implements java.io.Serializable {
114266
private static final long serialVersionUID = 1L;

flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/DemultiplexingSinkTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,51 @@ void testWriterStateSerializer() {
7070
.isInstanceOf(DemultiplexingSinkStateSerializer.class);
7171
}
7272

73+
@Test
74+
void testRestoreWriter() {
75+
final TestSinkRouter router = new TestSinkRouter();
76+
final DemultiplexingSink<String, String> sink = new DemultiplexingSink<>(router);
77+
final TestSinkInitContext context = new TestSinkInitContext();
78+
79+
// Create some state to restore from
80+
final DemultiplexingSinkState<String> state1 = new DemultiplexingSinkState<>();
81+
state1.setRouteState("a", new byte[] {1, 2, 3});
82+
state1.setRouteState("b", new byte[] {4, 5, 6});
83+
84+
final DemultiplexingSinkState<String> state2 = new DemultiplexingSinkState<>();
85+
state2.setRouteState("c", new byte[] {7, 8, 9});
86+
87+
final java.util.List<DemultiplexingSinkState<String>> recoveredStates =
88+
java.util.Arrays.asList(state1, state2);
89+
90+
// Restore the writer with the states
91+
final var restoredWriter = sink.restoreWriter(context, recoveredStates);
92+
93+
// Verify that the restored writer is the correct type
94+
assertThat(restoredWriter).isInstanceOf(DemultiplexingSinkWriter.class);
95+
96+
// Verify that the writer was created successfully
97+
assertThat(restoredWriter).isNotNull();
98+
}
99+
100+
@Test
101+
void testRestoreWriterWithEmptyState() {
102+
final TestSinkRouter router = new TestSinkRouter();
103+
final DemultiplexingSink<String, String> sink = new DemultiplexingSink<>(router);
104+
final TestSinkInitContext context = new TestSinkInitContext();
105+
106+
// Create an empty state
107+
final DemultiplexingSinkState<String> emptyState = new DemultiplexingSinkState<>();
108+
final java.util.List<DemultiplexingSinkState<String>> recoveredStates = List.of(emptyState);
109+
110+
// Restore the writer with empty state
111+
final var restoredWriter = sink.restoreWriter(context, recoveredStates);
112+
113+
// Verify that the restored writer is created successfully even with empty state
114+
assertThat(restoredWriter).isNotNull();
115+
assertThat(restoredWriter).isInstanceOf(DemultiplexingSinkWriter.class);
116+
}
117+
73118
/** Test implementation of {@link SinkRouter}. */
74119
private static class TestSinkRouter implements SinkRouter<String, String> {
75120
private final AtomicInteger sinkCreationCount = new AtomicInteger(0);

0 commit comments

Comments
 (0)