diff --git a/airbyte-config/config-models/src/main/resources/types/ResetSourceConfiguration.yaml b/airbyte-config/config-models/src/main/resources/types/ResetSourceConfiguration.yaml index 3860c716e141..a9d131971394 100644 --- a/airbyte-config/config-models/src/main/resources/types/ResetSourceConfiguration.yaml +++ b/airbyte-config/config-models/src/main/resources/types/ResetSourceConfiguration.yaml @@ -4,7 +4,7 @@ title: ResetSourceConfiguration description: configuration of the reset source type: object -additionalProperties: true +additionalProperties: false required: - streamsToReset properties: diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/internal/EmptyAirbyteSource.java b/airbyte-workers/src/main/java/io/airbyte/workers/internal/EmptyAirbyteSource.java index 156ad1c31cc5..84d0715cbd9c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/internal/EmptyAirbyteSource.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/internal/EmptyAirbyteSource.java @@ -5,29 +5,95 @@ package io.airbyte.workers.internal; import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ResetSourceConfiguration; +import io.airbyte.config.StateType; +import io.airbyte.config.StateWrapper; +import io.airbyte.config.StreamDescriptor; import io.airbyte.config.WorkerSourceConfig; +import io.airbyte.config.helpers.StateMessageHelper; +import io.airbyte.protocol.models.AirbyteGlobalState; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; +import io.airbyte.protocol.models.AirbyteStreamState; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; import java.util.Optional; +import java.util.Queue; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import 
lombok.extern.slf4j.Slf4j; /** * This source will never emit any messages. It can be used in cases where that is helpful (hint: * reset connection jobs). */ +@Slf4j public class EmptyAirbyteSource implements AirbyteSource { private final AtomicBoolean hasEmittedState; + private final Queue<StreamDescriptor> streamsToReset = new LinkedList<>(); + // TODO: Once we are sure that the legacy way of transmitting the state is not used anymore, we need + // to remove this variable and the associated + // checks + private boolean isResetBasedForConfig; + private boolean isStarted = false; + private Optional<StateWrapper> stateWrapper; public EmptyAirbyteSource() { hasEmittedState = new AtomicBoolean(); } @Override - public void start(final WorkerSourceConfig sourceConfig, final Path jobRoot) throws Exception { - // no op. + public void start(final WorkerSourceConfig workerSourceConfig, final Path jobRoot) throws Exception { + + if (workerSourceConfig == null || workerSourceConfig.getSourceConnectionConfiguration() == null) { + // TODO: When the jobConfig is fully updated and tested, we can remove this extra check that makes + // us compatible with running a reset with + // a null config + /* + * This is a protection against reverting a commit that set the resetSourceConfiguration, it ensures + * that such a revert has no side effect. The legacy behavior is to have the config as an + * empty jsonObject, this is an extra protection if the workerConfiguration is null. In the previous + * implementation it was unused so passing it as null should not result in a NPE or a parsing + * failure. 
+ */ + isResetBasedForConfig = false; + } else { + final ResetSourceConfiguration resetSourceConfiguration; + resetSourceConfiguration = parseResetSourceConfigurationAndLogError(workerSourceConfig); + streamsToReset.addAll(resetSourceConfiguration.getStreamsToReset()); + + if (streamsToReset.isEmpty()) { + // TODO: This is done to be able to handle the transition period where we can have no stream being + // passed to the configuration because the + // logic of populating this list is not implemented + /* + * This is a protection against reverting a commit that set the resetSourceConfiguration, it ensures + * that such a revert has no side effect. The legacy behavior is to have the config as an + * empty object, it has been changed here: + * https://github.com/airbytehq/airbyte/pull/13696/files#diff- + * f51ff997b60a346c704608bb1cd7d22457eda2559b42987d5fa1281d568fc222L40 + */ + isResetBasedForConfig = false; + } else { + stateWrapper = workerSourceConfig.getState() == null ? Optional.empty() : StateMessageHelper.getTypedState(workerSourceConfig.getState().getState()); + + if (stateWrapper.isPresent() && + stateWrapper.get().getStateType() == StateType.LEGACY && + !isResetAllStreamsInCatalog(workerSourceConfig)) { + log.error("The state is a legacy one but we are trying to do a partial reset, this is not supported."); + throw new IllegalStateException("Try to perform a partial reset on a legacy state"); + } + + isResetBasedForConfig = true; + } + } + isStarted = true; } // always finished. it has no data to send. 
@@ -43,11 +109,20 @@ public int getExitValue() { @Override public Optional<AirbyteMessage> attemptRead() { - if (!hasEmittedState.get()) { - hasEmittedState.compareAndSet(false, true); - return Optional.of(new AirbyteMessage().withType(Type.STATE).withState(new AirbyteStateMessage().withData(Jsons.emptyObject()))); + if (!isStarted) { + throw new IllegalStateException("The empty source has not been started."); + } + + if (isResetBasedForConfig && stateWrapper.isPresent()) { + if (stateWrapper.get().getStateType() == StateType.STREAM) { + return emitStreamState(); + } else if (stateWrapper.get().getStateType() == StateType.GLOBAL) { + return emitGlobalState(); + } else { + return emitLegacyState(); + } } else { - return Optional.empty(); + return emitLegacyState(); } } @@ -61,4 +136,114 @@ public void cancel() throws Exception { // no op. } + private Optional<AirbyteMessage> emitStreamState() { + // Per stream, it will emit one message per stream being reset + if (!streamsToReset.isEmpty()) { + final StreamDescriptor streamDescriptor = streamsToReset.poll(); + return Optional.of(getNullStreamStateMessage(streamDescriptor)); + } else { + return Optional.empty(); + } + } + + private Optional<AirbyteMessage> emitGlobalState() { + if (hasEmittedState.get()) { + return Optional.empty(); + } else { + hasEmittedState.compareAndSet(false, true); + return Optional.of(getNullGlobalMessage(streamsToReset, stateWrapper.get().getGlobal())); + } + } + + private Optional<AirbyteMessage> emitLegacyState() { + if (hasEmittedState.get()) { + return Optional.empty(); + } else { + hasEmittedState.compareAndSet(false, true); + return Optional.of(new AirbyteMessage().withType(Type.STATE) + .withState(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY).withData(Jsons.emptyObject()))); + } + } + + private boolean isResetAllStreamsInCatalog(final WorkerSourceConfig sourceConfig) { + final Set<StreamDescriptor> catalogStreamDescriptors = sourceConfig.getCatalog().getStreams().stream().map( + configuredAirbyteStream -> new StreamDescriptor() + 
.withName(configuredAirbyteStream.getStream().getName()) + .withNamespace(configuredAirbyteStream.getStream().getNamespace())) + .collect(Collectors.toSet()); + final Set<StreamDescriptor> configStreamDescriptors = new HashSet<>(streamsToReset); + + return catalogStreamDescriptors.equals(configStreamDescriptors); + } + + private AirbyteMessage getNullStreamStateMessage(final StreamDescriptor streamsToReset) { + return new AirbyteMessage() + .withType(Type.STATE) + .withState( + new AirbyteStateMessage() + .withType(AirbyteStateType.STREAM) + .withStream( + new AirbyteStreamState() + .withStreamDescriptor(new io.airbyte.protocol.models.StreamDescriptor() + .withName(streamsToReset.getName()) + .withNamespace(streamsToReset.getNamespace())) + .withStreamState(null))); + } + + private AirbyteMessage getNullGlobalMessage(final Queue<StreamDescriptor> streamsToReset, final AirbyteStateMessage currentState) { + final AirbyteGlobalState globalState = new AirbyteGlobalState(); + globalState.setStreamStates(new ArrayList<>()); + + currentState.getGlobal().getStreamStates().forEach(existingState -> globalState.getStreamStates() + .add( + new AirbyteStreamState() + .withStreamDescriptor(existingState.getStreamDescriptor()) + .withStreamState( + streamsToReset.contains(new StreamDescriptor() + .withName(existingState.getStreamDescriptor().getName()) + .withNamespace(existingState.getStreamDescriptor().getNamespace())) ? 
null : existingState.getStreamState()))); + + // If all the streams in the current state have been reset, we consider this to be a full reset, so + // reset the shared state as well + if (currentState.getGlobal().getStreamStates().size() == globalState.getStreamStates().stream() + .filter(streamState -> streamState.getStreamState() == null).count()) { + log.info("All the streams of a global state have been reset, the shared state will be erased as well"); + globalState.setSharedState(null); + } else { + log.info("This is a partial reset, the shared state will be preserved"); + globalState.setSharedState(currentState.getGlobal().getSharedState()); + } + + // Add state being reset that are not in the current state. This is made to follow the contract of + // the global state always containing the entire + // state + streamsToReset.forEach(configStreamDescriptor -> { + final io.airbyte.protocol.models.StreamDescriptor streamDescriptor = new io.airbyte.protocol.models.StreamDescriptor() + .withName(configStreamDescriptor.getName()) + .withNamespace(configStreamDescriptor.getNamespace()); + if (!currentState.getGlobal().getStreamStates().stream().map(streamState -> streamState.getStreamDescriptor()).toList() + .contains(streamDescriptor)) { + globalState.getStreamStates().add(new AirbyteStreamState() + .withStreamDescriptor(streamDescriptor) + .withStreamState(null)); + } + }); + + return new AirbyteMessage() + .withType(Type.STATE) + .withState( + new AirbyteStateMessage() + .withType(AirbyteStateType.GLOBAL) + .withGlobal(globalState)); + } + + private ResetSourceConfiguration parseResetSourceConfigurationAndLogError(final WorkerSourceConfig workerSourceConfig) { + try { + return Jsons.object(workerSourceConfig.getSourceConnectionConfiguration(), ResetSourceConfiguration.class); + } catch (final IllegalArgumentException e) { + log.error("The configuration provided to the reset has an invalid format"); + throw e; + } + } + } diff --git 
a/airbyte-workers/src/test/java/io/airbyte/workers/internal/EmptyAirbyteSourceTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/internal/EmptyAirbyteSourceTest.java new file mode 100644 index 000000000000..8d14c6352712 --- /dev/null +++ b/airbyte-workers/src/test/java/io/airbyte/workers/internal/EmptyAirbyteSourceTest.java @@ -0,0 +1,415 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.internal; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ResetSourceConfiguration; +import io.airbyte.config.State; +import io.airbyte.config.WorkerSourceConfig; +import io.airbyte.protocol.models.AirbyteGlobalState; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.AirbyteStreamState; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.StreamDescriptor; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class EmptyAirbyteSourceTest { + + private EmptyAirbyteSource emptyAirbyteSource; + private final AirbyteMessage EMPTY_MESSAGE = + new AirbyteMessage().withType(Type.STATE) + .withState(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY).withData(Jsons.emptyObject())); + + private final ConfiguredAirbyteCatalog airbyteCatalog = new ConfiguredAirbyteCatalog() + .withStreams(Lists.newArrayList( + new ConfiguredAirbyteStream().withStream(new 
AirbyteStream().withName("a")), + new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("b")), + new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("c")))); + + @BeforeEach + public void init() { + emptyAirbyteSource = new EmptyAirbyteSource(); + } + + @Test + public void testLegacy() throws Exception { + emptyAirbyteSource.start(new WorkerSourceConfig(), null); + + legacyStateResult(); + } + + @Test + public void testLegacyWithEmptyConfig() throws Exception { + emptyAirbyteSource.start(new WorkerSourceConfig().withSourceConnectionConfiguration(Jsons.emptyObject()), null); + + legacyStateResult(); + } + + @Test + public void testLegacyWithWrongConfigFormat() throws Exception { + emptyAirbyteSource.start(new WorkerSourceConfig().withSourceConnectionConfiguration(Jsons.jsonNode( + Map.of("not", "expected"))), null); + + legacyStateResult(); + } + + @Test + public void testEmptyListOfStreams() throws Exception { + final ResetSourceConfiguration resetSourceConfiguration = new ResetSourceConfiguration() + .withStreamsToReset(new ArrayList<>()); + final WorkerSourceConfig workerSourceConfig = new WorkerSourceConfig() + .withSourceConnectionConfiguration(Jsons.jsonNode(resetSourceConfiguration)) + .withCatalog(airbyteCatalog); + + emptyAirbyteSource.start(workerSourceConfig, null); + + legacyStateResult(); + } + + @Test + public void nonStartedSource() { + final Throwable thrown = Assertions.catchThrowable(() -> emptyAirbyteSource.attemptRead()); + Assertions.assertThat(thrown) + .isInstanceOf(IllegalStateException.class); + } + + @Test + public void testGlobal() throws Exception { + final List<StreamDescriptor> streamDescriptors = getProtocolStreamDescriptorFromName(Lists.newArrayList("a", "b", "c")); + + final List<io.airbyte.config.StreamDescriptor> streamToReset = getConfigStreamDescriptorFromName(Lists.newArrayList("a", "b", "c")); + + final ResetSourceConfiguration resetSourceConfiguration = new ResetSourceConfiguration() + .withStreamsToReset(streamToReset); + final 
WorkerSourceConfig workerSourceConfig = new WorkerSourceConfig() + .withSourceConnectionConfiguration(Jsons.jsonNode(resetSourceConfiguration)) + .withState(new State() + .withState(Jsons.jsonNode(createGlobalState(streamDescriptors, Jsons.emptyObject())))) + .withCatalog(airbyteCatalog); + + emptyAirbyteSource.start(workerSourceConfig, null); + + final Optional<AirbyteMessage> maybeMessage = emptyAirbyteSource.attemptRead(); + Assertions.assertThat(maybeMessage) + .isNotEmpty(); + + final AirbyteMessage message = maybeMessage.get(); + Assertions.assertThat(message.getType()).isEqualTo(Type.STATE); + + /* + * The comparison could be what it is below but it makes it hard to see what is the diff. It has + * been broken down into multiple assertions. (same comment in the other tests) + * + * AirbyteStateMessage expectedState = new AirbyteStateMessage() + * .withType(AirbyteStateType.GLOBAL) .withGlobal( new AirbyteGlobalState() + * .withSharedState(Jsons.emptyObject()) .withStreamStates( Lists.newArrayList( new + * AirbyteStreamState().withStreamState(null).withStreamDescriptor(new + * StreamDescriptor().withName("a")), new + * AirbyteStreamState().withStreamState(null).withStreamDescriptor(new + * StreamDescriptor().withName("b")), new + * AirbyteStreamState().withStreamState(null).withStreamDescriptor(new + * StreamDescriptor().withName("c")) ) ) ); + * + * Assertions.assertThat(stateMessage).isEqualTo(expectedState); + */ + final AirbyteStateMessage stateMessage = message.getState(); + Assertions.assertThat(stateMessage.getType()).isEqualTo(AirbyteStateType.GLOBAL); + Assertions.assertThat(stateMessage.getGlobal().getSharedState()).isNull(); + Assertions.assertThat(stateMessage.getGlobal().getStreamStates()) + .map(streamState -> streamState.getStreamDescriptor()) + .containsExactlyElementsOf(streamDescriptors); + Assertions.assertThat(stateMessage.getGlobal().getStreamStates()) + .map(streamState -> streamState.getStreamState()) + .containsOnlyNulls(); + + 
Assertions.assertThat(emptyAirbyteSource.attemptRead()) + .isEmpty(); + } + + @Test + public void testGlobalPartial() throws Exception { + final String NOT_RESET_STREAM_NAME = "c"; + + final List<StreamDescriptor> streamDescriptors = getProtocolStreamDescriptorFromName(Lists.newArrayList("a", "b", NOT_RESET_STREAM_NAME)); + + final List<io.airbyte.config.StreamDescriptor> streamToReset = getConfigStreamDescriptorFromName(Lists.newArrayList("a", "b")); + + final ResetSourceConfiguration resetSourceConfiguration = new ResetSourceConfiguration() + .withStreamsToReset(streamToReset); + final WorkerSourceConfig workerSourceConfig = new WorkerSourceConfig() + .withSourceConnectionConfiguration(Jsons.jsonNode(resetSourceConfiguration)) + .withState(new State() + .withState(Jsons.jsonNode(createGlobalState(streamDescriptors, Jsons.emptyObject())))) + .withCatalog(airbyteCatalog); + + emptyAirbyteSource.start(workerSourceConfig, null); + + final Optional<AirbyteMessage> maybeMessage = emptyAirbyteSource.attemptRead(); + Assertions.assertThat(maybeMessage) + .isNotEmpty(); + + final AirbyteMessage message = maybeMessage.get(); + Assertions.assertThat(message.getType()).isEqualTo(Type.STATE); + + final AirbyteStateMessage stateMessage = message.getState(); + + Assertions.assertThat(stateMessage.getType()).isEqualTo(AirbyteStateType.GLOBAL); + Assertions.assertThat(stateMessage.getGlobal().getSharedState()).isEqualTo(Jsons.emptyObject()); + Assertions.assertThat(stateMessage.getGlobal().getStreamStates()) + .filteredOn(streamState -> !NOT_RESET_STREAM_NAME.equals(streamState.getStreamDescriptor().getName())) + .map(AirbyteStreamState::getStreamState) + .containsOnlyNulls(); + Assertions.assertThat(stateMessage.getGlobal().getStreamStates()) + .filteredOn(streamState -> NOT_RESET_STREAM_NAME.equals(streamState.getStreamDescriptor().getName())) + .map(AirbyteStreamState::getStreamState) + .contains(Jsons.emptyObject()); + + Assertions.assertThat(emptyAirbyteSource.attemptRead()) + .isEmpty(); + } + + @Test + public void testGlobalNewStream() throws Exception 
{ + final String NEW_STREAM = "c"; + + final List<StreamDescriptor> streamDescriptors = getProtocolStreamDescriptorFromName(Lists.newArrayList("a", "b")); + + final List<io.airbyte.config.StreamDescriptor> streamToReset = getConfigStreamDescriptorFromName(Lists.newArrayList("a", "b", NEW_STREAM)); + + final ResetSourceConfiguration resetSourceConfiguration = new ResetSourceConfiguration() + .withStreamsToReset(streamToReset); + final WorkerSourceConfig workerSourceConfig = new WorkerSourceConfig() + .withSourceConnectionConfiguration(Jsons.jsonNode(resetSourceConfiguration)) + .withState(new State() + .withState(Jsons.jsonNode(createGlobalState(streamDescriptors, Jsons.emptyObject())))) + .withCatalog(airbyteCatalog); + + emptyAirbyteSource.start(workerSourceConfig, null); + + final Optional<AirbyteMessage> maybeMessage = emptyAirbyteSource.attemptRead(); + Assertions.assertThat(maybeMessage) + .isNotEmpty(); + + final AirbyteMessage message = maybeMessage.get(); + Assertions.assertThat(message.getType()).isEqualTo(Type.STATE); + + final AirbyteStateMessage stateMessage = message.getState(); + + Assertions.assertThat(stateMessage.getType()).isEqualTo(AirbyteStateType.GLOBAL); + Assertions.assertThat(stateMessage.getGlobal().getSharedState()).isNull(); + Assertions.assertThat(stateMessage.getGlobal().getStreamStates()) + .map(AirbyteStreamState::getStreamState) + .containsOnlyNulls(); + Assertions.assertThat(stateMessage.getGlobal().getStreamStates()) + .filteredOn(streamState -> NEW_STREAM.equals(streamState.getStreamDescriptor().getName())) + .hasSize(1); + + Assertions.assertThat(emptyAirbyteSource.attemptRead()) + .isEmpty(); + } + + @Test + public void testPerStream() throws Exception { + final List<StreamDescriptor> streamDescriptors = getProtocolStreamDescriptorFromName(Lists.newArrayList("a", "b", "c")); + + final List<io.airbyte.config.StreamDescriptor> streamToReset = getConfigStreamDescriptorFromName(Lists.newArrayList("a", "b", "c")); + + final ResetSourceConfiguration resetSourceConfiguration = new ResetSourceConfiguration() + .withStreamsToReset(streamToReset); + final 
WorkerSourceConfig workerSourceConfig = new WorkerSourceConfig() + .withSourceConnectionConfiguration(Jsons.jsonNode(resetSourceConfiguration)) + .withState(new State() + .withState(Jsons.jsonNode(createPerStreamState(streamDescriptors)))) + .withCatalog(airbyteCatalog); + + emptyAirbyteSource.start(workerSourceConfig, null); + + streamToReset.forEach(this::testReceiveNullStreamState); + + Assertions.assertThat(emptyAirbyteSource.attemptRead()) + .isEmpty(); + } + + @Test + public void testPerStreamWithExtraState() throws Exception { + // This should never happen but nothing keeps us from processing the reset and not fail + final List<StreamDescriptor> streamDescriptors = getProtocolStreamDescriptorFromName(Lists.newArrayList("a", "b", "c", "d")); + + final List<io.airbyte.config.StreamDescriptor> streamToReset = getConfigStreamDescriptorFromName(Lists.newArrayList("a", "b", "c")); + + final ResetSourceConfiguration resetSourceConfiguration = new ResetSourceConfiguration() + .withStreamsToReset(streamToReset); + final WorkerSourceConfig workerSourceConfig = new WorkerSourceConfig() + .withSourceConnectionConfiguration(Jsons.jsonNode(resetSourceConfiguration)) + .withState(new State() + .withState(Jsons.jsonNode(createPerStreamState(streamDescriptors)))) + .withCatalog(airbyteCatalog); + + emptyAirbyteSource.start(workerSourceConfig, null); + + streamToReset.forEach(this::testReceiveNullStreamState); + + Assertions.assertThat(emptyAirbyteSource.attemptRead()) + .isEmpty(); + } + + @Test + public void testPerStreamWithMissingState() throws Exception { + final String NEW_STREAM = "c"; + + final List<StreamDescriptor> streamDescriptors = getProtocolStreamDescriptorFromName(Lists.newArrayList("a", "b")); + + final List<io.airbyte.config.StreamDescriptor> streamToReset = getConfigStreamDescriptorFromName(Lists.newArrayList("a", "b", NEW_STREAM)); + + final ResetSourceConfiguration resetSourceConfiguration = new ResetSourceConfiguration() + .withStreamsToReset(streamToReset); + final WorkerSourceConfig workerSourceConfig = new WorkerSourceConfig() + 
.withSourceConnectionConfiguration(Jsons.jsonNode(resetSourceConfiguration)) + .withState(new State() + .withState(Jsons.jsonNode(createPerStreamState(streamDescriptors)))) + .withCatalog(airbyteCatalog); + + emptyAirbyteSource.start(workerSourceConfig, null); + + streamToReset.forEach(this::testReceiveNullStreamState); + + Assertions.assertThat(emptyAirbyteSource.attemptRead()) + .isEmpty(); + } + + @Test + public void testLegacyWithNewConfigMissingStream() { + + final List<io.airbyte.config.StreamDescriptor> streamToReset = getConfigStreamDescriptorFromName(Lists.newArrayList("a", "b", "c")); + + final ResetSourceConfiguration resetSourceConfiguration = new ResetSourceConfiguration() + .withStreamsToReset(streamToReset); + final ConfiguredAirbyteCatalog airbyteCatalogWithExtraStream = new ConfiguredAirbyteCatalog() + .withStreams(Lists.newArrayList( + new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("a")), + new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("b")), + new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("c")), + new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("d")))); + + final WorkerSourceConfig workerSourceConfig = new WorkerSourceConfig() + .withSourceConnectionConfiguration(Jsons.jsonNode(resetSourceConfiguration)) + .withState(new State() + .withState(Jsons.emptyObject())) + .withCatalog(airbyteCatalogWithExtraStream); + + Assertions.assertThatThrownBy(() -> emptyAirbyteSource.start(workerSourceConfig, null)) + .isInstanceOf(IllegalStateException.class); + + } + + @Test + public void testLegacyWithNewConfig() throws Exception { + final List<io.airbyte.config.StreamDescriptor> streamToReset = getConfigStreamDescriptorFromName(Lists.newArrayList("a", "b", "c")); + + final ResetSourceConfiguration resetSourceConfiguration = new ResetSourceConfiguration() + .withStreamsToReset(streamToReset); + final ConfiguredAirbyteCatalog airbyteCatalogWithExtraStream = new ConfiguredAirbyteCatalog() + .withStreams(Lists.newArrayList( + new 
ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("a")), + new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("b")), + new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("c")))); + + final WorkerSourceConfig workerSourceConfig = new WorkerSourceConfig() + .withSourceConnectionConfiguration(Jsons.jsonNode(resetSourceConfiguration)) + .withState(new State() + .withState(Jsons.emptyObject())) + .withCatalog(airbyteCatalogWithExtraStream); + + emptyAirbyteSource.start(workerSourceConfig, null); + + final Optional<AirbyteMessage> maybeMessage = emptyAirbyteSource.attemptRead(); + Assertions.assertThat(maybeMessage) + .isNotEmpty(); + + final AirbyteMessage message = maybeMessage.get(); + Assertions.assertThat(message.getType()).isEqualTo(Type.STATE); + + final AirbyteStateMessage stateMessage = message.getState(); + Assertions.assertThat(stateMessage.getType()).isEqualTo(AirbyteStateType.LEGACY); + Assertions.assertThat(stateMessage.getData()).isEqualTo(Jsons.emptyObject()); + + Assertions.assertThat(emptyAirbyteSource.attemptRead()) + .isEmpty(); + } + + private void testReceiveNullStreamState(final io.airbyte.config.StreamDescriptor streamDescriptor) { + final Optional<AirbyteMessage> maybeMessage = emptyAirbyteSource.attemptRead(); + Assertions.assertThat(maybeMessage) + .isNotEmpty(); + + final AirbyteMessage message = maybeMessage.get(); + Assertions.assertThat(message.getType()).isEqualTo(Type.STATE); + + final AirbyteStateMessage stateMessage = message.getState(); + Assertions.assertThat(stateMessage.getType()).isEqualTo(AirbyteStateType.STREAM); + Assertions.assertThat(stateMessage.getStream().getStreamDescriptor()).isEqualTo(new StreamDescriptor() + .withName(streamDescriptor.getName()) + .withNamespace(streamDescriptor.getNamespace())); + Assertions.assertThat(stateMessage.getStream().getStreamState()).isNull(); + } + + private List<StreamDescriptor> getProtocolStreamDescriptorFromName(final List<String> names) { + return names.stream().map( + name -> new 
StreamDescriptor().withName(name)).toList(); + } + + private List<io.airbyte.config.StreamDescriptor> getConfigStreamDescriptorFromName(final List<String> names) { + return names.stream().map( + name -> new io.airbyte.config.StreamDescriptor().withName(name)).toList(); + } + + private void legacyStateResult() { + Assertions.assertThat(emptyAirbyteSource.attemptRead()) + .isNotEmpty() + .contains(EMPTY_MESSAGE); + + Assertions.assertThat(emptyAirbyteSource.attemptRead()) + .isEmpty(); + } + + private List<AirbyteStateMessage> createPerStreamState(final List<StreamDescriptor> streamDescriptors) { + return streamDescriptors.stream().map(streamDescriptor -> new AirbyteStateMessage() + .withType(AirbyteStateType.STREAM) + .withStream( + new AirbyteStreamState() + .withStreamDescriptor(streamDescriptor) + .withStreamState(Jsons.emptyObject()))) + .toList(); + } + + private List<AirbyteStateMessage> createGlobalState(final List<StreamDescriptor> streamDescriptors, final JsonNode sharedState) { + final AirbyteGlobalState globalState = new AirbyteGlobalState() + .withSharedState(sharedState) + .withStreamStates( + streamDescriptors.stream().map(streamDescriptor -> new AirbyteStreamState() + .withStreamDescriptor(streamDescriptor) + .withStreamState(Jsons.emptyObject())) + .toList()); + + return Lists.newArrayList( + new AirbyteStateMessage() + .withType(AirbyteStateType.GLOBAL) + .withGlobal(globalState)); + } + +}