Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 19 additions & 19 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4082,16 +4082,15 @@ components:
type: object
required:
- transformType
- streamDescriptor
properties:
transformType:
type: string
enum:
- add_stream
- remove_stream
- update_stream
addStream:
$ref: "#/components/schemas/StreamDescriptor"
removeStream:
streamDescriptor:
$ref: "#/components/schemas/StreamDescriptor"
updateStream:
type: array
Expand All @@ -4103,46 +4102,47 @@ components:
description: "Describes the difference between two Streams."
required:
- transformType
- fieldName
properties:
transformType:
type: string
enum:
- add_field
- remove_field
- update_field_schema
fieldName:
$ref: "#/components/schemas/FieldName"
addField:
$ref: "#/components/schemas/FieldNameAndSchema"
$ref: "#/components/schemas/FieldAdd"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why separate schemas for FieldAdd and FieldRemove? is there a benefit to this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alovew I could be convinced this was the wrong thing. What I want to make clear in the API is the pattern that each transformation type gets its own, separately named object. For add / remove it is slightly redundant. But when we add something like "add is hash-able" or something that it is clear that should go in its own field with its own object.

Basically was trying to make the pattern clear and thought the redundancy of one object was worth it. wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, yes this makes sense to me. I'm good with this 👍

removeField:
$ref: "#/components/schemas/FieldNameAndSchema"
$ref: "#/components/schemas/FieldRemove"
updateFieldSchema:
$ref: "#/components/schemas/FieldSchemaUpdate"
FieldNameAndSchema:
FieldAdd:
type: object
required:
- fieldName
- fieldSchema
properties:
fieldName:
type: array
items:
type: string
fieldSchema:
schema:
$ref: "#/components/schemas/FieldSchema"
FieldRemove:
type: object
properties:
schema:
$ref: "#/components/schemas/FieldSchema"
FieldSchemaUpdate:
type: object
required:
- fieldName
- oldSchema
- newSchema
properties:
fieldName:
type: array
items:
type: string
oldSchema:
$ref: "#/components/schemas/FieldSchema"
newSchema:
$ref: "#/components/schemas/FieldSchema"
FieldName:
description: A field name is a list of strings that for the path to the field.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: form?

also question - is the array of strings needed because of sources that have nested schemas? for db tables, will fieldName always be an array of one item (the column name)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed the typo

also question - is the array of strings needed because of sources that have nested schemas? for db tables, will fieldName always be an array of one item (the column name)?

yes. exactly.

type: array
items:
type: string
FieldSchema:
description: JSONSchema representation of the field
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import io.airbyte.commons.util.MoreLists;
import io.airbyte.protocol.models.transform_models.FieldTransform;
import io.airbyte.protocol.models.transform_models.StreamTransform;
import io.airbyte.protocol.models.transform_models.UpdateFieldTransform;
import io.airbyte.protocol.models.transform_models.UpdateFieldSchemaTransform;
import io.airbyte.protocol.models.transform_models.UpdateStreamTransform;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -306,7 +306,7 @@ public static Set<StreamTransform> getCatalogDiff(final AirbyteCatalog oldCatalo
final AirbyteStream streamOld = descriptorToStreamOld.get(descriptor);
final AirbyteStream streamNew = descriptorToStreamNew.get(descriptor);
if (!streamOld.equals(streamNew)) {
streamTransforms.add(StreamTransform.createUpdateStreamTransform(getStreamDiff(descriptor, streamOld, streamNew)));
streamTransforms.add(StreamTransform.createUpdateStreamTransform(descriptor, getStreamDiff(descriptor, streamOld, streamNew)));
}
});

Expand All @@ -333,10 +333,10 @@ private static UpdateStreamTransform getStreamDiff(final StreamDescriptor descri
final JsonNode newType = fieldNameToTypeNew.get(fieldName);

if (!oldType.equals(newType)) {
fieldTransforms.add(FieldTransform.createUpdateFieldTransform(new UpdateFieldTransform(fieldName, oldType, newType)));
fieldTransforms.add(FieldTransform.createUpdateFieldTransform(fieldName, new UpdateFieldSchemaTransform(oldType, newType)));
}
});
return new UpdateStreamTransform(descriptor, fieldTransforms);
return new UpdateStreamTransform(fieldTransforms);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,8 @@
@ToString
public class AddFieldTransform {

private final List<String> fieldName;
private final JsonNode schema;

public List<String> getFieldName() {
return new ArrayList<>(fieldName);
}

public JsonNode getSchema() {
return schema;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,39 @@
public final class FieldTransform {

private final FieldTransformType transformType;
private final List<String> fieldName;
private final AddFieldTransform addFieldTransform;
private final RemoveFieldTransform removeFieldTransform;
private final UpdateFieldTransform updateFieldTransform;
private final UpdateFieldSchemaTransform updateFieldTransform;

public static FieldTransform createAddFieldTransform(final List<String> fieldName, final JsonNode schema) {
return createAddFieldTransform(new AddFieldTransform(fieldName, schema));
return createAddFieldTransform(fieldName, new AddFieldTransform(schema));
}

public static FieldTransform createAddFieldTransform(final AddFieldTransform addFieldTransform) {
return new FieldTransform(FieldTransformType.ADD_FIELD, addFieldTransform, null, null);
public static FieldTransform createAddFieldTransform(final List<String> fieldName, final AddFieldTransform addFieldTransform) {
return new FieldTransform(FieldTransformType.ADD_FIELD, fieldName, addFieldTransform, null, null);
}

public static FieldTransform createRemoveFieldTransform(final List<String> fieldName, final JsonNode schema) {
return createRemoveFieldTransform(new RemoveFieldTransform(fieldName, schema));
return createRemoveFieldTransform(fieldName, new RemoveFieldTransform(fieldName, schema));
}

public static FieldTransform createRemoveFieldTransform(final RemoveFieldTransform removeFieldTransform) {
return new FieldTransform(FieldTransformType.REMOVE_FIELD, null, removeFieldTransform, null);
public static FieldTransform createRemoveFieldTransform(final List<String> fieldName, final RemoveFieldTransform removeFieldTransform) {
return new FieldTransform(FieldTransformType.REMOVE_FIELD, fieldName, null, removeFieldTransform, null);
}

public static FieldTransform createUpdateFieldTransform(final UpdateFieldTransform updateFieldTransform) {
return new FieldTransform(FieldTransformType.UPDATE_FIELD, null, null, updateFieldTransform);
public static FieldTransform createUpdateFieldTransform(final List<String> fieldName, final UpdateFieldSchemaTransform updateFieldTransform) {
return new FieldTransform(FieldTransformType.UPDATE_FIELD, fieldName, null,null, updateFieldTransform);
}

public FieldTransformType getTransformType() {
return transformType;
}

public List<String> getFieldName() {
return fieldName;
}

public AddFieldTransform getAddFieldTransform() {
return addFieldTransform;
}
Expand All @@ -55,7 +60,7 @@ public RemoveFieldTransform getRemoveFieldTransform() {
return removeFieldTransform;
}

public UpdateFieldTransform getUpdateFieldTransform() {
public UpdateFieldSchemaTransform getUpdateFieldTransform() {
return updateFieldTransform;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,27 @@
public final class StreamTransform {

private final StreamTransformType transformType;
private final AddStreamTransform addStreamTransform;
private final RemoveStreamTransform removeStreamTransform;
private final StreamDescriptor streamDescriptor;
private final UpdateStreamTransform updateStreamTransform;

public static StreamTransform createAddStreamTransform(final StreamDescriptor streamDescriptor) {
return createAddStreamTransform(new AddStreamTransform(streamDescriptor));
}

public static StreamTransform createAddStreamTransform(final AddStreamTransform addStreamTransform) {
return new StreamTransform(StreamTransformType.ADD_STREAM, addStreamTransform, null, null);
return new StreamTransform(StreamTransformType.ADD_STREAM, streamDescriptor, null);
}

public static StreamTransform createRemoveStreamTransform(final StreamDescriptor streamDescriptor) {
return createRemoveStreamTransform(new RemoveStreamTransform(streamDescriptor));
}

public static StreamTransform createRemoveStreamTransform(final RemoveStreamTransform removeStreamTransform) {
return new StreamTransform(StreamTransformType.REMOVE_STREAM, null, removeStreamTransform, null);
return new StreamTransform(StreamTransformType.REMOVE_STREAM, streamDescriptor, null);
}

public static StreamTransform createUpdateStreamTransform(final UpdateStreamTransform updateStreamTransform) {
return new StreamTransform(StreamTransformType.UPDATE_STREAM, null, null, updateStreamTransform);
public static StreamTransform createUpdateStreamTransform(final StreamDescriptor streamDescriptor, final UpdateStreamTransform updateStreamTransform) {
return new StreamTransform(StreamTransformType.UPDATE_STREAM, streamDescriptor, updateStreamTransform);
}

public StreamTransformType getTransformType() {
return transformType;
}

public AddStreamTransform getAddStreamTransform() {
return addStreamTransform;
}

public RemoveStreamTransform getRemoveStreamTransform() {
return removeStreamTransform;
public StreamDescriptor getStreamDescriptor() {
return streamDescriptor;
}

public UpdateStreamTransform getUpdateStreamTransform() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,11 @@
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public class UpdateFieldTransform {
public class UpdateFieldSchemaTransform {

private final List<String> fieldName;
private final JsonNode oldSchema;
private final JsonNode newSchema;

public List<String> getFieldName() {
return new ArrayList<>(fieldName);
}

public JsonNode getOldSchema() {
return oldSchema;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
@ToString
public class UpdateStreamTransform {

private final StreamDescriptor streamDescriptor;
private final Set<FieldTransform> fieldTransforms;

public Set<FieldTransform> getFieldTransforms() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.protocol.models.transform_models.FieldTransform;
import io.airbyte.protocol.models.transform_models.StreamTransform;
import io.airbyte.protocol.models.transform_models.UpdateFieldTransform;
import io.airbyte.protocol.models.transform_models.UpdateFieldSchemaTransform;
import io.airbyte.protocol.models.transform_models.UpdateStreamTransform;
import java.io.IOException;
import java.util.Comparator;
Expand Down Expand Up @@ -103,12 +103,11 @@ void testGetCatalogDiff() throws IOException {
final List<StreamTransform> expectedDiff = Stream.of(
StreamTransform.createAddStreamTransform(new StreamDescriptor().withName("sales")),
StreamTransform.createRemoveStreamTransform(new StreamDescriptor().withName("accounts")),
StreamTransform.createUpdateStreamTransform(new UpdateStreamTransform(new StreamDescriptor().withName("users"), Set.of(
StreamTransform.createUpdateStreamTransform(new StreamDescriptor().withName("users"), new UpdateStreamTransform(Set.of(
FieldTransform.createAddFieldTransform(List.of("COD"), schema2.get("properties").get("COD")),
FieldTransform.createRemoveFieldTransform(List.of("something2"), schema1.get("properties").get("something2")),
FieldTransform.createRemoveFieldTransform(List.of("HKD"), schema1.get("properties").get("HKD")),
FieldTransform.createUpdateFieldTransform(new UpdateFieldTransform(
List.of("CAD"),
FieldTransform.createUpdateFieldTransform(List.of("CAD"), new UpdateFieldSchemaTransform(
schema1.get("properties").get("CAD"),
schema2.get("properties").get("CAD")))))))
.sorted(STREAM_TRANSFORM_COMPARATOR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

package io.airbyte.server.converters;

import io.airbyte.api.model.generated.FieldNameAndSchema;
import io.airbyte.api.model.generated.FieldAdd;
import io.airbyte.api.model.generated.FieldRemove;
import io.airbyte.api.model.generated.FieldSchemaUpdate;
import io.airbyte.api.model.generated.FieldTransform;
import io.airbyte.api.model.generated.StreamDescriptor;
import io.airbyte.api.model.generated.StreamTransform;
import io.airbyte.commons.enums.Enums;
import io.airbyte.protocol.models.transform_models.FieldTransformType;
Expand All @@ -23,27 +23,10 @@ public class CatalogDiffConverters {
public static StreamTransform streamTransformToApi(final io.airbyte.protocol.models.transform_models.StreamTransform transform) {
return new StreamTransform()
.transformType(Enums.convertTo(transform.getTransformType(), StreamTransform.TransformTypeEnum.class))
.addStream(addStreamToApi(transform).orElse(null))
.removeStream(removeStreamToApi(transform).orElse(null))
.streamDescriptor(ProtocolConverters.streamDescriptorToApi(transform.getStreamDescriptor()))
.updateStream(updateStreamToApi(transform).orElse(null));
}

public static Optional<StreamDescriptor> addStreamToApi(final io.airbyte.protocol.models.transform_models.StreamTransform transform) {
if (transform.getTransformType() == StreamTransformType.ADD_STREAM) {
return Optional.ofNullable(ProtocolConverters.streamDescriptorToApi(transform.getAddStreamTransform().getStreamDescriptor()));
} else {
return Optional.empty();
}
}

public static Optional<StreamDescriptor> removeStreamToApi(final io.airbyte.protocol.models.transform_models.StreamTransform transform) {
if (transform.getTransformType() == StreamTransformType.REMOVE_STREAM) {
return Optional.ofNullable(ProtocolConverters.streamDescriptorToApi(transform.getRemoveStreamTransform().getStreamDescriptor()));
} else {
return Optional.empty();
}
}

public static Optional<List<FieldTransform>> updateStreamToApi(final io.airbyte.protocol.models.transform_models.StreamTransform transform) {
if (transform.getTransformType() == StreamTransformType.UPDATE_STREAM) {
return Optional.ofNullable(transform.getUpdateStreamTransform()
Expand All @@ -59,26 +42,25 @@ public static Optional<List<FieldTransform>> updateStreamToApi(final io.airbyte.
public static FieldTransform fieldTransformToApi(final io.airbyte.protocol.models.transform_models.FieldTransform transform) {
return new FieldTransform()
.transformType(Enums.convertTo(transform.getTransformType(), FieldTransform.TransformTypeEnum.class))
.fieldName(transform.getFieldName())
.addField(addFieldToApi(transform).orElse(null))
.removeField(removeFieldToApi(transform).orElse(null))
.updateFieldSchema(updateFieldToApi(transform).orElse(null));
}

private static Optional<FieldNameAndSchema> addFieldToApi(final io.airbyte.protocol.models.transform_models.FieldTransform transform) {
private static Optional<FieldAdd> addFieldToApi(final io.airbyte.protocol.models.transform_models.FieldTransform transform) {
if (transform.getTransformType() == FieldTransformType.ADD_FIELD) {
return Optional.of(new FieldNameAndSchema()
.fieldName(transform.getAddFieldTransform().getFieldName())
.fieldSchema(transform.getAddFieldTransform().getSchema()));
return Optional.of(new FieldAdd()
.schema(transform.getAddFieldTransform().getSchema()));
} else {
return Optional.empty();
}
}

private static Optional<FieldNameAndSchema> removeFieldToApi(final io.airbyte.protocol.models.transform_models.FieldTransform transform) {
private static Optional<FieldRemove> removeFieldToApi(final io.airbyte.protocol.models.transform_models.FieldTransform transform) {
if (transform.getTransformType() == FieldTransformType.REMOVE_FIELD) {
return Optional.of(new FieldNameAndSchema()
.fieldName(transform.getRemoveFieldTransform().getFieldName())
.fieldSchema(transform.getRemoveFieldTransform().getSchema()));
return Optional.of(new FieldRemove()
.schema(transform.getRemoveFieldTransform().getSchema()));
} else {
return Optional.empty();
}
Expand All @@ -87,7 +69,6 @@ private static Optional<FieldNameAndSchema> removeFieldToApi(final io.airbyte.pr
private static Optional<FieldSchemaUpdate> updateFieldToApi(final io.airbyte.protocol.models.transform_models.FieldTransform transform) {
if (transform.getTransformType() == FieldTransformType.UPDATE_FIELD) {
return Optional.of(new FieldSchemaUpdate()
.fieldName(transform.getUpdateFieldTransform().getFieldName())
.oldSchema(transform.getUpdateFieldTransform().getOldSchema())
.newSchema(transform.getUpdateFieldTransform().getNewSchema()));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE
.isSyncing(expected.getIsSyncing())
.catalogDiff(new CatalogDiff().transforms(List.of(
new StreamTransform().transformType(TransformTypeEnum.ADD_STREAM)
.addStream(new StreamDescriptor().name("users-data1"))
.streamDescriptor(new StreamDescriptor().name("users-data1"))
.updateStream(null))))
.resourceRequirements(new ResourceRequirements()
.cpuRequest(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getCpuRequest())
Expand Down
Loading