Merged
Changes from 19 commits
54 commits
3d496d5
Emit the state to remove in the airbyte empty source
benmoriceau Jun 13, 2022
0e275d0
Handle legacy use case
benmoriceau Jun 13, 2022
78df38b
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 13, 2022
f003ad3
Update names
benmoriceau Jun 13, 2022
f24472c
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 14, 2022
d7a63b1
Add test for legacy
benmoriceau Jun 14, 2022
cbcdf92
tmp
benmoriceau Jun 14, 2022
6afbdbe
Common code to deserialize a state message in the new format
benmoriceau Jun 14, 2022
4331d10
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 14, 2022
09661f4
Merge branch 'bmoric/extract-message-serialization' of github.com:air…
benmoriceau Jun 14, 2022
f1269bf
tmp
benmoriceau Jun 14, 2022
ee7fd54
PR comments and type changed to typed
benmoriceau Jun 14, 2022
156e3c4
Merge branch 'bmoric/extract-message-serialization' of github.com:air…
benmoriceau Jun 14, 2022
23f943c
add new message support
benmoriceau Jun 15, 2022
780fe54
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 15, 2022
c255acd
Finish the tests
benmoriceau Jun 15, 2022
545e6ad
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/ext…
benmoriceau Jun 15, 2022
b2e3250
Format
benmoriceau Jun 15, 2022
8785ad7
Merge branch 'bmoric/extract-message-serialization' of github.com:air…
benmoriceau Jun 15, 2022
93c138c
tmp
benmoriceau Jun 16, 2022
7707a97
Add StateType and StateWrapper objects to the model
gosusnp Jun 16, 2022
e3c881b
Merge branch 'gosusnp/add_state_wrapper' of github.com:airbytehq/airb…
benmoriceau Jun 16, 2022
27adc8b
Use state wrapper instead of Either
benmoriceau Jun 16, 2022
c3d9110
Merge branch 'bmoric/extract-message-serialization' of github.com:air…
benmoriceau Jun 16, 2022
4ee75aa
Switch to optional
benmoriceau Jun 16, 2022
47d0ac4
Merge branch 'bmoric/extract-message-serialization' of github.com:air…
benmoriceau Jun 16, 2022
c213f71
Merge branch 'master' into bmoric/extract-message-serialization
benmoriceau Jun 16, 2022
7387b4a
Some PR comments
benmoriceau Jun 16, 2022
ad3371d
PR comments
benmoriceau Jun 16, 2022
11167e6
Add a todo
benmoriceau Jun 16, 2022
0f4be01
Add test for legacy with config
benmoriceau Jun 17, 2022
280b114
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/ext…
benmoriceau Jun 17, 2022
9cb57f6
PR comments
benmoriceau Jun 17, 2022
fdbc10e
PR comments
benmoriceau Jun 17, 2022
0ea7eba
Support array legacy state
benmoriceau Jun 17, 2022
0ae05a2
format
benmoriceau Jun 17, 2022
30e3475
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 17, 2022
03b0eeb
Remove additional properties check
benmoriceau Jun 17, 2022
59eba26
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/ext…
benmoriceau Jun 17, 2022
2749453
Merge branch 'bmoric/extract-message-serialization' of github.com:air…
benmoriceau Jun 17, 2022
859d81e
Small rename
benmoriceau Jun 17, 2022
60917b9
Rename variables
benmoriceau Jun 21, 2022
c14a9b7
PR comments
benmoriceau Jun 21, 2022
8c02f18
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 21, 2022
64d5389
extract ResetSourceConfig creation
benmoriceau Jun 21, 2022
7006dfa
Extract reset config creation
benmoriceau Jun 21, 2022
ce3fd6e
Fix test
benmoriceau Jun 21, 2022
d06e70e
rm error commit
benmoriceau Jun 21, 2022
c125b93
Add comments
benmoriceau Jun 22, 2022
fb242b8
format
benmoriceau Jun 23, 2022
5847625
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 23, 2022
44c306c
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 23, 2022
9eb52f8
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 23, 2022
5cae07c
Use new state type
benmoriceau Jun 23, 2022
1 change: 1 addition & 0 deletions airbyte-protocol/protocol-models/build.gradle
@@ -7,6 +7,7 @@ plugins {
dependencies {
implementation 'javax.validation:validation-api:1.1.0.Final'
implementation 'org.apache.commons:commons-lang3:3.11'
implementation 'io.vavr:vavr:0.10.4'
Contributor Author

It shouldn't be in the review because it is already in the base branch

}

jsonSchema2Pojo {
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.protocol.models;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.vavr.control.Either;
import java.util.ArrayList;
import java.util.List;

public class StateMessageHelper {

public static class AirbyteStateMessageListTypeReference extends TypeReference<List<AirbyteStateMessage>> {}
Contributor Author

It shouldn't be in the review because it is already in the base branch


/**
* Takes a JSON blob state and tries to return either a legacy state, in the format of a JSON
* object, or a state message in the new format, which is a list of Airbyte state messages.
*
* @param state the raw state as a JSON node; may be null
* @return Either a JSON blob (on the left) or a structured state message list (on the right).
*/
public static Either<JsonNode, List<AirbyteStateMessage>> getTypedState(JsonNode state) {
if (state == null) {
return Either.right(new ArrayList<>());
}
try {
return Either.right(Jsons.object(state, new AirbyteStateMessageListTypeReference()));
} catch (final IllegalArgumentException e) {
return Either.left(state);
}
}

}
1 change: 1 addition & 0 deletions airbyte-workers/build.gradle
@@ -13,6 +13,7 @@ configurations {
dependencies {
implementation 'io.fabric8:kubernetes-client:5.12.2'
implementation 'io.temporal:temporal-sdk:1.8.1'
implementation 'io.vavr:vavr:0.10.4'
implementation 'org.apache.ant:ant:1.10.10'
implementation 'org.apache.commons:commons-lang3:3.11'
implementation 'org.apache.commons:commons-text:1.9'
@@ -4,30 +4,90 @@

package io.airbyte.workers.internal;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ResetSourceConfiguration;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.protocol.models.StateMessageHelper;
import io.vavr.control.Either;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.EmptyStackException;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Stack;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;

/**
* This source will never emit any messages. It can be used in cases where that is helpful (hint:
* reset connection jobs).
*/
@Slf4j
public class EmptyAirbyteSource implements AirbyteSource {

private final AtomicBoolean hasEmittedState;
private final Queue<StreamDescriptor> streamDescriptors = new LinkedList<>();
Contributor

nit: rename this variable to streamsToReset to be slightly more descriptive

Contributor

👍

Contributor Author

Done

private boolean isPartialReset;
private boolean isStarted = false;
private Either<List<AirbyteStateMessage>, AirbyteStateMessage> perStreamOrGlobalState;
Contributor

In the tech spec you mentioned a StateWrapper (link), which would allow us to avoid this either/or construction. Is there a reason we aren't using it here?

Contributor

Agreed, I think changing StateMessageHelper to return a StateWrapper would help simplify this code a lot

Contributor Author

Done
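
A minimal sketch of the wrapper-based shape suggested in this thread, under stated assumptions: `StateWrapperSketch`, its nested `StateWrapper` record, and the `getTypedState` signature below are illustrative stand-ins, not the actual Airbyte model classes, and state payloads are reduced to plain strings. The idea it shows is that a single object carrying a type tag lets callers switch on the tag instead of unpacking nested `Either` values.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins for the model classes discussed above; names and fields are assumptions.
final class StateWrapperSketch {

  enum StateType { LEGACY, GLOBAL, STREAM }

  // Simplified state message: a type tag plus an opaque payload.
  record StateMessage(StateType type, String payload) {}

  // One wrapper for all three shapes; only the field matching `type` is populated.
  record StateWrapper(StateType type, String legacyState, StateMessage global, List<StateMessage> streams) {}

  // Classifies a list of state messages; an empty Optional means "no state at all".
  static Optional<StateWrapper> getTypedState(final List<StateMessage> messages) {
    if (messages == null || messages.isEmpty()) {
      return Optional.empty();
    }
    if (messages.size() == 1 && messages.get(0).type() == StateType.GLOBAL) {
      return Optional.of(new StateWrapper(StateType.GLOBAL, null, messages.get(0), null));
    }
    if (messages.size() == 1 && messages.get(0).type() == StateType.LEGACY) {
      return Optional.of(new StateWrapper(StateType.LEGACY, messages.get(0).payload(), null, null));
    }
    if (messages.stream().allMatch(m -> m.type() == StateType.STREAM)) {
      return Optional.of(new StateWrapper(StateType.STREAM, null, null, List.copyOf(messages)));
    }
    // A mix of GLOBAL and STREAM messages, or several GLOBAL messages, is rejected.
    throw new IllegalArgumentException("Mixed or unexpected state message types");
  }
}
```

With a shape like this, the source would hold one `Optional<StateWrapper>` field and branch on `type()`, which is roughly what the `Either<List<AirbyteStateMessage>, AirbyteStateMessage>` field encodes positionally.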


public EmptyAirbyteSource() {
hasEmittedState = new AtomicBoolean();
}

@Override
public void start(final WorkerSourceConfig sourceConfig, final Path jobRoot) throws Exception {
// no op.

try {
if (sourceConfig == null || sourceConfig.getSourceConnectionConfiguration() == null) {
isPartialReset = false;
} else {
ResetSourceConfiguration sourceConfiguration = Jsons.object(sourceConfig.getSourceConnectionConfiguration(), ResetSourceConfiguration.class);
streamDescriptors.addAll(sourceConfiguration.getStreamsToReset());
if (streamDescriptors.isEmpty()) {
Contributor

you can get rid of this conditional if you move this check up into the conditional on line 53.

Contributor Author

done

isPartialReset = false;
Contributor

I believe this is to allow this reset source to work with our existing logic of full connection resets, which is persisting a ResetSourceConfiguration with empty streamsToReset since we currently don't have the logic to add streams to the stream_reset table.

However, once we have fully migrated to persisting all streams to the stream_reset table in the full connection reset case, we will probably want this to throw an exception in this case instead, since we wouldn't expect this to be called with an empty set of stream descriptors at that point.

Could you add a TODO comment here indicating that we want to change this, and link a brief issue explaining this change?

Contributor Author

Done

} else {
Either<JsonNode, List<AirbyteStateMessage>> eitherState = StateMessageHelper.getTypedState(sourceConfig.getState().getState());
Contributor

I just checked the code and we currently aren't passing any state object around when performing a reset. In DefaultJobCreator, we only set the state on a JobSyncConfig; we don't set any state on the JobResetConnectionConfig. In the JobSyncConfig case, this state field is passed into the StandardSyncInput state field, which is passed into the WorkerSourceConfig that this function is taking in.

I started looking into adding this to the JobResetConnectionConfig, and something I noticed is that the JobSyncConfig's state field is currently just a generic JsonNode (this is the json schema for that field).

Are we planning to keep the state field on JobSyncConfig and StandardSyncInput as just a generic JsonNode? It sounds like that is the plan based on this ticket, but I wanted to confirm. If so, we will need to make sure that we serialize whatever the new state persistence method returns back into json when saving the state to the JobSyncConfig, and we will need to add that state field and do something similar when persisting the JobResetConnectionConfig.

Contributor Author

We discussed this while working together: we will keep it as a JSON blob.

if (eitherState.isLeft()) {
log.error("The state is not compatible with the partial reset that has been requested");
throw new IllegalStateException("Legacy state for a partial reset");
}
Contributor

I'm confused by this. I was under the impression that we will always be persisting streamsToReset in the ResetSourceConfiguration, even in the case where a connection still uses legacy state (it would just be persisting all streams in the connection in that case).

But looking at the logic in this method, it seems that you expect the sourceConnectionConfiguration to always be null if the connection uses legacy state. And that would mean we need to conditionally change how we set the sourceConnectionConfiguration in the temporal workflow based on the connection's current state type, which I wasn't aware of.

Contributor Author

I have updated it with a check that compares the streams in the catalog with the streams to reset. If we have a legacy state, they have to be the same.


if (eitherState.isRight()) {
List<AirbyteStateMessage> stateMessages = eitherState.get();
Contributor

final

Contributor Author

Done

List<AirbyteStateMessage> stateMessagesPerStreamOnly =
Contributor

final

Contributor Author

Done

stateMessages.stream().filter(stateMessage -> stateMessage.getStateType() == AirbyteStateType.STREAM).toList();
if (stateMessages.isEmpty()) {
log.error("No state has been provide, no reset will be perform, this will wun nothing");
Contributor

Suggested change
log.error("No state has been provide, no reset will be perform, this will wun nothing");
log.error("No state has been provide, no reset will be perform, this will run nothing");

Contributor Author

benmoriceau Jun 16, 2022

Not needed anymore

streamDescriptors.clear();
} else if (stateMessages.size() == 1 && stateMessages.get(0).getStateType() == AirbyteStateType.GLOBAL) {
// Global
perStreamOrGlobalState = Either.right(stateMessages.get(0));
} else if (stateMessagesPerStreamOnly.size() == stateMessages.size()) {
// Per Stream
perStreamOrGlobalState = Either.left(stateMessages);
} else {
throw new IllegalStateException("The state that the empty source received is in an unexpected format and cannot be used");
}
}
isPartialReset = true;
}
}
} catch (IllegalArgumentException e) {
// No op, the new format is not supported
isPartialReset = false;
}
isStarted = true;
}

// always finished. it has no data to send.
@@ -43,11 +103,44 @@ public int getExitValue() {

@Override
public Optional<AirbyteMessage> attemptRead() {
Contributor

thank you! this is so much easier to read!

if (!hasEmittedState.get()) {
hasEmittedState.compareAndSet(false, true);
return Optional.of(new AirbyteMessage().withType(Type.STATE).withState(new AirbyteStateMessage().withData(Jsons.emptyObject())));
if (!isStarted) {
throw new IllegalStateException("The empty source has not been started.");
}

if (isPartialReset) {
Contributor

these conditionals are hard to follow. can you break them into smaller methods please?

Contributor Author

Done

Contributor Author

Done
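
One way the branching could be split into smaller methods, sketched under assumptions: `PerStreamResetSketch`, `emitPerStreamReset`, and `emitLegacyReset` are hypothetical names, stream descriptors and messages are reduced to plain strings, and the global-state branch and the `hasState` filtering are left out. The sketch relies on the documented behaviour of `java.util.Queue.poll()`, which returns `null` on an empty queue rather than throwing, so the end of the queue is detected with a null check instead of a `catch (EmptyStackException e)`.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

// Hypothetical sketch; stream descriptors and messages are simplified to plain strings.
final class PerStreamResetSketch {

  private final Queue<String> streamsToReset = new ArrayDeque<>();
  private final boolean isPartialReset;
  private boolean legacyStateEmitted = false;

  PerStreamResetSketch(final List<String> streams) {
    streamsToReset.addAll(streams);
    // Assumption: an empty list of streams means a full (legacy) reset.
    isPartialReset = !streams.isEmpty();
  }

  // Top-level dispatch stays small; each branch lives in its own method.
  Optional<String> attemptRead() {
    return isPartialReset ? emitPerStreamReset() : emitLegacyReset();
  }

  // Emits one "null state" marker per stream; poll() returns null once the queue is drained.
  private Optional<String> emitPerStreamReset() {
    final String next = streamsToReset.poll();
    return next == null ? Optional.empty() : Optional.of("STREAM:" + next + "=null");
  }

  // Full reset: emit a single empty legacy state, then nothing.
  private Optional<String> emitLegacyReset() {
    if (legacyStateEmitted) {
      return Optional.empty();
    }
    legacyStateEmitted = true;
    return Optional.of("LEGACY:{}");
  }
}
```

Calling `attemptRead()` repeatedly on an instance built with two streams yields one message per stream and then an empty `Optional`; an instance built with no streams yields a single legacy message.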

if (perStreamOrGlobalState.isLeft()) {
// Per stream, it will emit one message per stream being reset
if (!streamDescriptors.isEmpty()) {
StreamDescriptor streamDescriptor = streamDescriptors.poll();
List<AirbyteStateMessage> stateMessages = perStreamOrGlobalState.getLeft();
try {
while (hasState(stateMessages, streamDescriptor)) {
streamDescriptor = streamDescriptors.poll();
}
return Optional.of(getNullPerStreamMessage(streamDescriptor));
} catch (EmptyStackException e) {
return Optional.empty();
}
} else {
return Optional.empty();
}
} else {
// global state, it will emit one global message
if (hasEmittedState.get()) {
return Optional.empty();
} else {
hasEmittedState.compareAndSet(false, true);
return Optional.of(getNullGlobalMessage(streamDescriptors, perStreamOrGlobalState.get()));
}
}
} else {
return Optional.empty();
if (!hasEmittedState.get()) {
hasEmittedState.compareAndSet(false, true);
return Optional.of(new AirbyteMessage().withType(Type.STATE)
.withState(new AirbyteStateMessage().withStateType(AirbyteStateType.LEGACY).withData(Jsons.emptyObject())));
} else {
return Optional.empty();
}
}
}

@@ -61,4 +154,56 @@ public void cancel() throws Exception {
// no op.
}

private boolean hasState(List<AirbyteStateMessage> stateMessages, StreamDescriptor streamDescriptor) {
return stateMessages.stream().filter(stateMessage -> stateMessage.getStream().getStreamDescriptor().equals(streamDescriptor)).count() != 0;
}

private AirbyteMessage getNullPerStreamMessage(StreamDescriptor configStreamDescriptor) {
return new AirbyteMessage()
.withType(Type.STATE)
.withState(
new AirbyteStateMessage()
.withStateType(AirbyteStateType.STREAM)
.withStream(
new AirbyteStreamState()
.withStreamDescriptor(new io.airbyte.protocol.models.StreamDescriptor()
.withName(configStreamDescriptor.getName())
.withNamespace(configStreamDescriptor.getNamespace()))
.withStreamState(null)));
}

private AirbyteMessage getNullGlobalMessage(Queue<StreamDescriptor> configStreamDescriptors, AirbyteStateMessage currentState) {
AirbyteGlobalState globalState = new AirbyteGlobalState();
globalState.setStreamStates(new ArrayList<>());

currentState.getGlobal().getStreamStates().forEach(exitingState -> globalState.getStreamStates()
Contributor

nit: typo: rename exitingState -> existingState

Contributor Author

Done

.add(
new AirbyteStreamState()
.withStreamDescriptor(exitingState.getStreamDescriptor())
.withStreamState(
configStreamDescriptors.contains(new StreamDescriptor()
.withName(exitingState.getStreamDescriptor().getName())
.withNamespace(exitingState.getStreamDescriptor().getNamespace())) ?
null : exitingState.getStreamState()
)
)
);

if (currentState.getGlobal().getStreamStates().size() ==
globalState.getStreamStates().stream().filter(streamState -> streamState.getStreamState() == null).count()) {
log.info("All the streams of a global state have been reset, the shared state will be erased as well");
globalState.setSharedState(null);
} else {
log.info("This is a partial reset, the shared state will be preserve");
Contributor

Suggested change
log.info("This is a partial reset, the shared state will be preserve");
log.info("This is a partial reset, the shared state will be preserved");

Contributor Author

Done

globalState.setSharedState(currentState.getGlobal().getSharedState());
}

return new AirbyteMessage()
.withType(Type.STATE)
.withState(
new AirbyteStateMessage()
.withStateType(AirbyteStateType.GLOBAL)
.withGlobal(globalState));
}

}
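
The shared-state rule in `getNullGlobalMessage` can be illustrated in isolation. This is a simplified sketch, not the PR's code: `GlobalResetSketch`, `GlobalState`, and `resetGlobal` are hypothetical names, streams are identified by plain strings, and each stream state is a string where `null` stands for "reset". It shows the same decision as the diff above: streams listed for reset get a null state, and the shared state is erased only when every stream ends up with a null state.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the global-state reset rule; types are simplified stand-ins.
final class GlobalResetSketch {

  // Simplified global state: one shared blob plus a per-stream blob (null means "reset").
  record GlobalState(String sharedState, Map<String, String> streamStates) {}

  static GlobalState resetGlobal(final GlobalState current, final Set<String> streamsToReset) {
    // LinkedHashMap keeps stream order and, unlike Map.of, accepts null values.
    final Map<String, String> updated = new LinkedHashMap<>();
    current.streamStates().forEach((stream, state) ->
        updated.put(stream, streamsToReset.contains(stream) ? null : state));

    // The shared state survives only if at least one stream keeps its state.
    final boolean allReset = updated.values().stream().allMatch(state -> state == null);
    return new GlobalState(allReset ? null : current.sharedState(), updated);
  }
}
```

Resetting a subset of streams keeps the shared state and the untouched stream states; resetting every stream returns a state whose shared state is `null`.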