Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.flink.streaming.api.lineage;

import org.apache.flink.annotation.PublicEvolving;

import java.util.Map;

/** Builtin config facet for dataset. */
@PublicEvolving
public interface DatasetConfigFacet extends LineageDatasetFacet {
Map<String, String> config();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.flink.streaming.api.lineage;

import org.apache.flink.annotation.PublicEvolving;

import java.util.Map;

/** Builtin schema facet for dataset. */
@PublicEvolving
public interface DatasetSchemaFacet extends LineageDatasetFacet {
<T> Map<String, DatasetSchemaField<T>> fields();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.flink.streaming.api.lineage;

import org.apache.flink.annotation.PublicEvolving;

/** Field for schema in dataset. */
@PublicEvolving
public interface DatasetSchemaField<T> {
/** The name of the field. */
String name();

/** The type of the field. */
T type();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.lineage;

import org.apache.flink.annotation.Internal;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Default implementation for {@link LineageGraph}. */
@Internal
public class DefaultLineageGraph implements LineageGraph {
private final List<LineageEdge> lineageEdges;
private final List<SourceLineageVertex> sources;
private final List<LineageVertex> sinks;

private DefaultLineageGraph(List<LineageEdge> lineageEdges) {
this.lineageEdges = lineageEdges;

Set<SourceLineageVertex> deduplicatedSources = new HashSet<>();
Set<LineageVertex> deduplicatedSinks = new HashSet<>();
for (LineageEdge lineageEdge : lineageEdges) {
deduplicatedSources.add(lineageEdge.source());
deduplicatedSinks.add(lineageEdge.sink());
}
this.sources = new ArrayList<>(deduplicatedSources);
this.sinks = new ArrayList<>(deduplicatedSinks);
}

@Override
public List<SourceLineageVertex> sources() {
return Collections.unmodifiableList(sources);
}

@Override
public List<LineageVertex> sinks() {
return Collections.unmodifiableList(sinks);
}

@Override
public List<LineageEdge> relations() {
return Collections.unmodifiableList(lineageEdges);
}

public static LineageGraphBuilder builder() {
return new LineageGraphBuilder();
}

/** Build the default lineage graph from {@link LineageEdge}. */
@Internal
public static class LineageGraphBuilder {
private final List<LineageEdge> lineageEdges;

private LineageGraphBuilder() {
this.lineageEdges = new ArrayList<>();
}

public LineageGraphBuilder addLineageEdge(LineageEdge lineageEdge) {
this.lineageEdges.add(lineageEdge);
return this;
}

public LineageGraphBuilder addLineageEdges(LineageEdge... lineageEdges) {
this.lineageEdges.addAll(Arrays.asList(lineageEdges));
return this;
}

public LineageGraph build() {
return new DefaultLineageGraph(lineageEdges);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.flink.streaming.api.lineage;

import org.apache.flink.annotation.PublicEvolving;

import java.util.Map;

/** Lineage dataset represents the source or sink in the job. */
@PublicEvolving
public interface LineageDataset {
/* Name for this particular dataset. */
String name();

/* Unique name for this dataset's storage, for example, url for jdbc connector and location for lakehouse connector. */
String namespace();

/* Facets for the lineage vertex to describe the particular information of dataset, such as schema and config. */
Map<String, LineageDatasetFacet> facets();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.flink.streaming.api.lineage;

import org.apache.flink.annotation.PublicEvolving;

/** Facet interface for dataset. */
@PublicEvolving
public interface LineageDatasetFacet {
/** Name for the facet which will be used as key in facets of LineageDataset. */
String name();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.flink.streaming.api.lineage;

import org.apache.flink.annotation.PublicEvolving;

/** Lineage edge from source to sink. */
@PublicEvolving
public interface LineageEdge {
SourceLineageVertex source();

LineageVertex sink();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.flink.streaming.api.lineage;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.util.List;

/**
* Job lineage is built according to {@link StreamGraph}, users can get sources, sinks and
* relationships from lineage and manage the relationship between jobs and tables.
*/
@PublicEvolving
public interface LineageGraph {
/* Source lineage vertex list. */
List<SourceLineageVertex> sources();

/* Sink lineage vertex list. */
List<LineageVertex> sinks();

/* lineage edges from sources to sinks. */
List<LineageEdge> relations();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.flink.streaming.api.lineage;

import org.apache.flink.annotation.PublicEvolving;

import java.util.List;

/**
* Lineage vertex represents the connectors in lineage graph, including source {@link
* SourceLineageVertex} and sink.
*/
@PublicEvolving
public interface LineageVertex {
/* List of input (for source) or output (for sink) datasets interacted with by the connector */
List<LineageDataset> datasets();
}
Loading