Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,309 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.persistence;

import static io.airbyte.db.instance.configs.jooq.generated.Tables.STATE;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StateType;
import io.airbyte.config.StateWrapper;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.protocol.models.StreamDescriptor;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.jooq.DSLContext;
import org.jooq.JSONB;
import org.jooq.Record;
import org.jooq.RecordMapper;
import org.jooq.impl.DSL;

/**
* State Persistence
*
* Handle persisting States to the Database.
*
* Supports migration from Legacy to Global or Stream. Other type migrations need to go through a
* reset. (an exception will be thrown)
*/
public class StatePersistence {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Add a class JavaDoc. This will be one less issue to fix when we enable PMD/Checkstyle.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


private final ExceptionWrappingDatabase database;

public StatePersistence(final Database database) {
this.database = new ExceptionWrappingDatabase(database);
}

/**
* Get the current State of a Connection
*
* @param connectionId
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jdpgrailsdev will our linters scream at us if we have @param, @return, and @throws without a description in them?

* @return
* @throws IOException
*/
public Optional<StateWrapper> getCurrentState(final UUID connectionId) throws IOException {
final List<StateRecord> records = this.database.query(ctx -> getStateRecords(ctx, connectionId));

if (records.isEmpty()) {
return Optional.empty();
}

return switch (getStateType(connectionId, records)) {
case GLOBAL -> Optional.of(buildGlobalState(records));
case STREAM -> Optional.of(buildStreamState(records));
default -> Optional.of(buildLegacyState(records));
};
}

/**
* Create or update the states described in the StateWrapper. Null states will be deleted.
*
* @param connectionId
* @param state
* @throws IOException
*/
public void updateOrCreateState(final UUID connectionId, final StateWrapper state) throws IOException {
final Optional<StateWrapper> previousState = getCurrentState(connectionId);
final boolean isMigration = previousState.isPresent() && previousState.get().getStateType() == StateType.LEGACY &&
state.getStateType() != StateType.LEGACY;

// The only case where we allow a state migration is moving from LEGACY.
// We expect any other migration to go through an explicit reset.
if (!isMigration && previousState.isPresent() && previousState.get().getStateType() != state.getStateType()) {
throw new IOException("Unexpected type migration from '" + previousState.get().getStateType() + "' to '" + state.getStateType() +
"'. Migration of StateType need to go through an explicit reset.");
}

this.database.transaction(ctx -> {
if (isMigration) {
clearLegacyState(ctx, connectionId);
}
switch (state.getStateType()) {
case GLOBAL -> saveGlobalState(ctx, connectionId, state.getGlobal().getGlobal());
case STREAM -> saveStreamState(ctx, connectionId, state.getStateMessages());
case LEGACY -> saveLegacyState(ctx, connectionId, state.getLegacyState());
}
return null;
});
}

private void clearLegacyState(final DSLContext ctx, final UUID connectionId) {
writeStateToDb(ctx, connectionId, null, null, StateType.LEGACY, null);
}

private void saveGlobalState(final DSLContext ctx, final UUID connectionId, final AirbyteGlobalState globalState) {
writeStateToDb(ctx, connectionId, null, null, StateType.GLOBAL, globalState.getSharedState());
for (final AirbyteStreamState streamState : globalState.getStreamStates()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Any reason not to use a stream/lambda for this? Are we concerned about swallowing any exceptions raised by writeStateToDb?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is very procedural in the end, a for loop felt more straightforward in this case.
About the exceptions, I'd rather let them bubble up for debugging errors if any.

writeStateToDb(ctx,
connectionId,
streamState.getStreamDescriptor().getName(),
streamState.getStreamDescriptor().getNamespace(),
StateType.GLOBAL,
streamState.getStreamState());
}
}

private void saveStreamState(final DSLContext ctx, final UUID connectionId, final List<AirbyteStateMessage> stateMessages) {
for (final AirbyteStateMessage stateMessage : stateMessages) {
final AirbyteStreamState streamState = stateMessage.getStream();
writeStateToDb(ctx,
connectionId,
streamState.getStreamDescriptor().getName(),
streamState.getStreamDescriptor().getNamespace(),
StateType.STREAM,
streamState.getStreamState());
}
}

private void saveLegacyState(final DSLContext ctx, final UUID connectionId, final JsonNode state) {
writeStateToDb(ctx, connectionId, null, null, StateType.LEGACY, state);
}

/**
* Performs the actual SQL operation depending on the state
*
* If the state is null, it will delete the row, otherwise do an insert or update on conflict
*/
void writeStateToDb(final DSLContext ctx,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice helper! great code reuse.

final UUID connectionId,
final String streamName,
final String namespace,
final StateType stateType,
final JsonNode state) {
if (state != null) {
boolean hasState = ctx.selectFrom(STATE)
.where(
STATE.CONNECTION_ID.eq(connectionId),
streamName != null ? STATE.STREAM_NAME.eq(streamName) : STATE.STREAM_NAME.isNull(),
namespace != null ? STATE.NAMESPACE.eq(namespace) : STATE.NAMESPACE.isNull())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These lines are repeated a few times in this function. Would it make sense to try to encapsulte/DRY this, or do you think that would make this logic less readable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refactored the inline if, which had the benefit of coming with a tiny bit of doc about why we need the null or equal in the first place.
For the where clause itself, I think it makes it easier to read the where clause rather than abstract it.

.fetch().isNotEmpty();

final JSONB jsonbState = JSONB.valueOf(Jsons.serialize(state));
final OffsetDateTime now = OffsetDateTime.now();

if (!hasState) {
ctx.insertInto(STATE)
.columns(
STATE.ID,
STATE.CREATED_AT,
STATE.UPDATED_AT,
STATE.CONNECTION_ID,
STATE.STREAM_NAME,
STATE.NAMESPACE,
STATE.STATE_,
STATE.TYPE)
.values(
UUID.randomUUID(),
now,
now,
connectionId,
streamName,
namespace,
jsonbState,
convertStateType(stateType))
.execute();

} else {
ctx.update(STATE)
.set(STATE.UPDATED_AT, now)
.set(STATE.STATE_, jsonbState)
.where(
STATE.CONNECTION_ID.eq(connectionId),
streamName != null ? STATE.STREAM_NAME.eq(streamName) : STATE.STREAM_NAME.isNull(),
namespace != null ? STATE.NAMESPACE.eq(namespace) : STATE.NAMESPACE.isNull())
.execute();
}

} else {
// If the state is null, we remove the state instead of keeping a null row
ctx.deleteFrom(STATE)
.where(
STATE.CONNECTION_ID.eq(connectionId),
streamName != null ? STATE.STREAM_NAME.eq(streamName) : STATE.STREAM_NAME.isNull(),
namespace != null ? STATE.NAMESPACE.eq(namespace) : STATE.NAMESPACE.isNull())
.execute();
}
}

/**
* Get the StateType for a given list of StateRecords
*
* @param connectionId The connectionId of the records, used to add more debugging context if an
* error is detected
* @param records The list of StateRecords to process, must not be empty
* @return the StateType of the records
* @throws IOException If StateRecords have inconsistent types
*/
private io.airbyte.db.instance.configs.jooq.generated.enums.StateType getStateType(final UUID connectionId, final List<StateRecord> records)
throws IOException {
final List<io.airbyte.db.instance.configs.jooq.generated.enums.StateType> types = records.stream().map(r -> r.type).distinct().toList();
if (types.size() == 1) {
return types.get(0);
}

throw new IOException("Inconsistent StateTypes for connectionId " + connectionId +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: It looks like an IllegalStateException to me

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

" (" + String.join(", ", types.stream().map(io.airbyte.db.instance.configs.jooq.generated.enums.StateType::getLiteral).toList()) + ")");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit/don't have to modify: I think that map(stateType -> stateType.getLiteral()) will be easier to read :D

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was not convinced due to the full path to the class as well. your comment convinced me 😂

}

/**
* Get the state records from the DB
*
* @param ctx A valid DSL context to use for the query
* @param connectionId the ID of the connection
* @return The StateRecords for the connectionId
*/
private List<StateRecord> getStateRecords(final DSLContext ctx, final UUID connectionId) {
return ctx.select(DSL.asterisk())
.from(STATE)
.where(STATE.CONNECTION_ID.eq(connectionId))
.fetch(getStateRecordMapper())
.stream().toList();
}

/**
* Build Global state
*
* The list of records can contain one global shared state that is the state without streamName and
* without namespace The other records should be tronslated into AirbyteStreamState
*/
private StateWrapper buildGlobalState(final List<StateRecord> records) {
// Split the global shared state from the other per stream records
final Map<Boolean, List<StateRecord>> partitions = records.stream()
.collect(Collectors.partitioningBy(r -> r.streamName == null && r.namespace == null));

final AirbyteGlobalState globalState = new AirbyteGlobalState()
.withSharedState(partitions.get(Boolean.TRUE).stream().map(r -> r.state).findFirst().orElse(null))
.withStreamStates(partitions.get(Boolean.FALSE).stream().map(this::buildAirbyteStreamState).toList());

final AirbyteStateMessage msg = new AirbyteStateMessage()
.withStateType(AirbyteStateType.GLOBAL)
.withGlobal(globalState);
return new StateWrapper().withStateType(StateType.GLOBAL).withGlobal(msg);
}

/**
* Build StateWrapper for a PerStream state
*/
private StateWrapper buildStreamState(final List<StateRecord> records) {
List<AirbyteStateMessage> messages = records.stream().map(
record -> new AirbyteStateMessage()
.withStateType(AirbyteStateType.STREAM)
.withStream(buildAirbyteStreamState(record)))
.toList();
return new StateWrapper().withStateType(StateType.STREAM).withStateMessages(messages);
}

/**
* Build a StateWrapper for Legacy state
*/
private StateWrapper buildLegacyState(final List<StateRecord> records) {
return new StateWrapper()
.withStateType(StateType.LEGACY)
.withLegacyState(records.get(0).state);
}

/**
* Convert a StateRecord to an AirbyteStreamState
*/
private AirbyteStreamState buildAirbyteStreamState(final StateRecord record) {
return new AirbyteStreamState()
.withStreamDescriptor(new StreamDescriptor().withName(record.streamName).withNamespace(record.namespace))
.withStreamState(record.state);
}

private static RecordMapper<Record, StateRecord> getStateRecordMapper() {
return record -> new StateRecord(
record.get(STATE.TYPE, io.airbyte.db.instance.configs.jooq.generated.enums.StateType.class),
record.get(STATE.STREAM_NAME, String.class),
record.get(STATE.NAMESPACE, String.class),
Jsons.deserialize(record.get(STATE.STATE_).data()));
}

private static io.airbyte.db.instance.configs.jooq.generated.enums.StateType convertStateType(final StateType stateType) {
return switch (stateType) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have a helper called Enums.convertTo(...) that should allow you to avoid needing to do this conversion.

case LEGACY -> io.airbyte.db.instance.configs.jooq.generated.enums.StateType.LEGACY;
case GLOBAL -> io.airbyte.db.instance.configs.jooq.generated.enums.StateType.GLOBAL;
case STREAM -> io.airbyte.db.instance.configs.jooq.generated.enums.StateType.STREAM;
default -> throw new RuntimeException("Unsupported StateType: " + stateType);
};
}

private record StateRecord(
io.airbyte.db.instance.configs.jooq.generated.enums.StateType type,
String streamName,
String namespace,
JsonNode state) {}

}
Loading