diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DatasetConfigFacet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DatasetConfigFacet.java new file mode 100644 index 0000000000000..660dcd315aba8 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DatasetConfigFacet.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Map; + +/** Built-in facet that exposes the configuration of a dataset via {@link #config()}. 
*/ +@PublicEvolving +public interface DatasetConfigFacet extends LineageDatasetFacet { + Map config(); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DatasetSchemaFacet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DatasetSchemaFacet.java new file mode 100644 index 0000000000000..c25b2a07f5d96 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DatasetSchemaFacet.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Map; + +/** Built-in facet that exposes the schema fields of a dataset via {@link #fields()}. 
*/ +@PublicEvolving +public interface DatasetSchemaFacet extends LineageDatasetFacet { + Map> fields(); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DatasetSchemaField.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DatasetSchemaField.java new file mode 100644 index 0000000000000..db1def237c4de --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DatasetSchemaField.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +/** A single field of a dataset schema, described by its name and its type. */ +@PublicEvolving +public interface DatasetSchemaField { + /** The name of the field. */ + String name(); + + /** The type of the field. 
*/ + T type(); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageGraph.java new file mode 100644 index 0000000000000..15e70bfec6735 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageGraph.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.Internal; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** Default {@link LineageGraph}; sources and sinks are the deduplicated endpoints of its edges. 
*/ +@Internal +public class DefaultLineageGraph implements LineageGraph { + private final List lineageEdges; + private final List sources; + private final List sinks; + + private DefaultLineageGraph(List lineageEdges) { + this.lineageEdges = new ArrayList<>(lineageEdges); // own copy: the builder stays mutable + + Set deduplicatedSources = new HashSet<>(); + Set deduplicatedSinks = new HashSet<>(); + for (LineageEdge lineageEdge : lineageEdges) { + deduplicatedSources.add(lineageEdge.source()); + deduplicatedSinks.add(lineageEdge.sink()); + } + this.sources = new ArrayList<>(deduplicatedSources); + this.sinks = new ArrayList<>(deduplicatedSinks); + } + + @Override + public List sources() { + return Collections.unmodifiableList(sources); + } + + @Override + public List sinks() { + return Collections.unmodifiableList(sinks); + } + + @Override + public List relations() { + return Collections.unmodifiableList(lineageEdges); + } + + public static LineageGraphBuilder builder() { + return new LineageGraphBuilder(); + } + + /** Builder that collects {@link LineageEdge}s and creates a {@link DefaultLineageGraph}. */ + @Internal + public static class LineageGraphBuilder { + private final List lineageEdges; + + private LineageGraphBuilder() { + this.lineageEdges = new ArrayList<>(); + } + + public LineageGraphBuilder addLineageEdge(LineageEdge lineageEdge) { + this.lineageEdges.add(lineageEdge); + return this; + } + + public LineageGraphBuilder addLineageEdges(LineageEdge... 
lineageEdges) { + this.lineageEdges.addAll(Arrays.asList(lineageEdges)); + return this; + } + + public LineageGraph build() { + return new DefaultLineageGraph(lineageEdges); + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageDataset.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageDataset.java new file mode 100644 index 0000000000000..82d853d7e5264 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageDataset.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Map; + +/** Lineage dataset represents the source or sink in the job. */ +@PublicEvolving +public interface LineageDataset { + /** Name for this particular dataset. */ + String name(); + + /** Unique name for this dataset's storage, for example, url for jdbc connector and location for lakehouse connector. */ + String namespace(); + + /** Facets for the lineage vertex to describe the particular information of dataset, such as schema and config. 
*/ + Map facets(); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageDatasetFacet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageDatasetFacet.java new file mode 100644 index 0000000000000..58cdb2b171755 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageDatasetFacet.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +/** Facet of a {@link LineageDataset}, identified by its {@link #name()}. */ +@PublicEvolving +public interface LineageDatasetFacet { + /** Name of the facet, used as its key in {@link LineageDataset#facets()}. 
*/ + String name(); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageEdge.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageEdge.java new file mode 100644 index 0000000000000..dcc8f92d3d772 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageEdge.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +/** Lineage edge from a source vertex ({@link #source()}) to a sink vertex ({@link #sink()}). 
*/ +@PublicEvolving +public interface LineageEdge { + SourceLineageVertex source(); + + LineageVertex sink(); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java new file mode 100644 index 0000000000000..7add1ce88edb0 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.api.graph.StreamGraph; + +import java.util.List; + +/** + * Job lineage is built according to {@link StreamGraph}; users can get sources, sinks and + * relationships from lineage and manage the relationship between jobs and tables. + */ +@PublicEvolving +public interface LineageGraph { + /** Source lineage vertex list. */ + List sources(); + + /** Sink lineage vertex list. */ + List sinks(); + + /** Lineage edges from sources to sinks. */ + List relations(); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertex.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertex.java new file mode 100644 index 0000000000000..16e4549b57ec6 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertex.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.List; + +/** + * Lineage vertex represents the connectors in lineage graph, including source {@link + * SourceLineageVertex} and sink. + */ +@PublicEvolving +public interface LineageVertex { + /** Input (for source) or output (for sink) datasets the connector interacts with. */ + List datasets(); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/SourceLineageVertex.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/SourceLineageVertex.java new file mode 100644 index 0000000000000..9657a2170baf1 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/SourceLineageVertex.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Boundedness; + +/** + * Lineage vertex for source which has {@link Boundedness} to indicate whether the data for the + * source is bounded. + */ +@PublicEvolving +public interface SourceLineageVertex extends LineageVertex { + /** + * The boundedness for the source connector; users can get boundedness for each source in the + * lineage and determine the job execution mode with RuntimeExecutionMode. + */ + Boundedness boundedness(); +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/DefaultLineageGraphTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/DefaultLineageGraphTest.java new file mode 100644 index 0000000000000..e2302ac5b8466 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/DefaultLineageGraphTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.api.connector.source.Boundedness; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link DefaultLineageGraph}. */ +class DefaultLineageGraphTest { + @Test + void testLineageGraph() { + SourceLineageVertex source1 = new TestingSourceLineageVertex("source1"); + SourceLineageVertex source2 = new TestingSourceLineageVertex("source2"); + SourceLineageVertex source3 = new TestingSourceLineageVertex("source3"); + LineageVertex sink1 = new TestingLineageVertex("sink1"); + LineageVertex sink2 = new TestingLineageVertex("sink2"); + LineageGraph lineageGraph = + DefaultLineageGraph.builder() + .addLineageEdge(new TestingLineageEdge(source1, sink1)) + .addLineageEdges( + new TestingLineageEdge(source2, sink2), + new TestingLineageEdge(source3, sink1), + new TestingLineageEdge(source1, sink2)) + .build(); + assertThat(lineageGraph.sources()).containsExactlyInAnyOrder(source1, source2, source3); + assertThat(lineageGraph.sinks()).containsExactlyInAnyOrder(sink1, sink2); + assertThat(lineageGraph.relations()).hasSize(4); + } + + /** Minimal {@link LineageVertex} stub that carries an id and reports no datasets. 
*/ + static class TestingLineageVertex implements LineageVertex { + private final String id; + + private TestingLineageVertex(String id) { + this.id = id; + } + + private String id() { + return id; + } + + @Override + public List datasets() { + return new ArrayList<>(); + } + } + + /** {@link SourceLineageVertex} stub that always reports {@link Boundedness#BOUNDED}. 
*/ + static class TestingSourceLineageVertex extends TestingLineageVertex + implements SourceLineageVertex { + + private TestingSourceLineageVertex(String id) { + super(id); + } + + @Override + public Boundedness boundedness() { + return Boundedness.BOUNDED; + } + } + + /** {@link LineageEdge} stub that returns the source and sink it was constructed with. */ + static class TestingLineageEdge implements LineageEdge { + private final SourceLineageVertex source; + private final LineageVertex sink; + + private TestingLineageEdge(SourceLineageVertex source, LineageVertex sink) { + this.source = source; + this.sink = sink; + } + + @Override + public SourceLineageVertex source() { + return source; + } + + @Override + public LineageVertex sink() { + return sink; + } + } +}