Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental Maintenance & PII Lineage Changes #546

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/**
* Copyright 2023-2025 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
{#
  last_n_hours(hours=8)

  Renders a double-quoted, hour-granularity timestamp string in the form
  "YYYY-MM-DD-HH" for the moment that lies `hours` hours before the value
  returned by in_dbt_utils.logical_date(timezone="US/Pacific").

  Args:
    hours: number of hours to subtract; defaults to 8.

  NOTE(review): in_dbt_utils.logical_date is defined outside this file; it is
  presumably a datetime, since a timedelta is subtracted from it and strftime
  is called on the result. Confirm against that package.
  NOTE(review): the local names end in _pst, but the zone requested is
  "US/Pacific"; whether that yields standard or daylight time depends on the
  helper. Confirm.
#}
{% macro last_n_hours(hours=8) %}
{%- set current_datetime_pst = in_dbt_utils.logical_date(timezone="US/Pacific") -%}
{%- set target_datetime_pst = current_datetime_pst - modules.datetime.timedelta(hours=hours) -%}
"{{ target_datetime_pst.strftime('%Y-%m-%d-%H') }}"
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/**
* Copyright 2022-2025 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.incremental;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;


/**
 * Mutable bookkeeping shared by a PII lineage traversal.
 *
 * <p>It holds three things:
 * <ul>
 *   <li>the input PII fields, lower-cased once at construction time;</li>
 *   <li>the output fields that have been marked as PII so far;</li>
 *   <li>a map from a (possibly aliased) field name to its fully qualified name.</li>
 * </ul>
 *
 * <p>Thread-safety: individual map operations are atomic because the map is a
 * {@link ConcurrentHashMap}, but the output field list is a plain {@link ArrayList},
 * so the class as a whole is not safe for concurrent mutation.
 */
public class PIIContext {
  /** Upper bound on the numeric suffixes tried when disambiguating a colliding field name. */
  private static final int MAX_ALIAS_ATTEMPTS = 10;

  private final List<String> inputPIIFields;
  private final List<String> outputPIIFields;
  private final ConcurrentHashMap<String, String> fieldToFullyQualifiedMap;

  /**
   * Creates a context for the given input PII fields.
   *
   * @param inputPIIFields names of the PII fields; each entry is stored lower-cased
   * @throws NullPointerException if the list or any of its entries is {@code null}
   */
  public PIIContext(List<String> inputPIIFields) {
    this.inputPIIFields = inputPIIFields.stream().map(String::toLowerCase).collect(Collectors.toList());
    this.outputPIIFields = new ArrayList<>();
    this.fieldToFullyQualifiedMap = new ConcurrentHashMap<>();
  }

  /**
   * @return the lower-cased input PII fields (the internal list, not a copy)
   */
  public List<String> getInputPIIFields() {
    return inputPIIFields;
  }

  /**
   * @return the output fields marked as PII so far (the internal list, not a copy)
   */
  public List<String> getOutputPIIFields() {
    return outputPIIFields;
  }

  /**
   * Appends a field to the output PII list. No de-duplication is performed here.
   *
   * @param field output field name to record
   */
  public void addOutputPIIField(String field) {
    outputPIIFields.add(field);
  }

  /**
   * Registers {@code field -> fullyQualifiedField}.
   *
   * <p>If {@code field} is already bound to a different fully qualified name, an alias is
   * generated by appending a numeric suffix (0, 1, ...) and the first alias that is free,
   * or already bound to the same fully qualified name, is used. Reusing an alias that is
   * already bound to the same value keeps repeated registrations idempotent: calling this
   * method again with the same arguments never creates another alias.
   *
   * @param field field name, possibly a dotted path
   * @param fullyQualifiedField fully qualified name the field resolves to
   * @throws IllegalStateException if no usable alias is found within
   *         {@value #MAX_ALIAS_ATTEMPTS} attempts
   * @throws NullPointerException if either argument is {@code null}
   */
  public void addFieldToFullyQualifiedMap(String field, String fullyQualifiedField) {
    // putIfAbsent makes "check, then insert" a single atomic map operation.
    String existing = fieldToFullyQualifiedMap.putIfAbsent(field, fullyQualifiedField);
    if (existing == null || existing.equals(fullyQualifiedField)) {
      return;
    }

    // The name is taken by a different source: look for an alias.
    String[] fieldParts = field.split("\\.");
    for (int suffix = 0; suffix < MAX_ALIAS_ATTEMPTS; suffix++) {
      String alias = buildAlias(fieldParts, suffix);
      String previous = fieldToFullyQualifiedMap.putIfAbsent(alias, fullyQualifiedField);
      if (previous == null || previous.equals(fullyQualifiedField)) {
        return;
      }
    }
    throw new IllegalStateException("Could not resolve the conflict for field: " + field);
  }

  /**
   * Builds the alias for a dotted field path and a numeric suffix.
   *
   * <p>For a single-part name the suffix is appended to that part ({@code id -> id0}).
   * For a multi-part path the suffix is appended to every part except the last
   * ({@code s.x -> s0.x}).
   */
  private static String buildAlias(String[] fieldParts, int suffix) {
    int lastIndex = fieldParts.length - 1;
    return IntStream.range(0, fieldParts.length)
        .mapToObj(j -> j == lastIndex && fieldParts.length > 1 ? fieldParts[j] : fieldParts[j] + suffix)
        .collect(Collectors.joining("."));
  }

  /**
   * @return the field-to-fully-qualified-name map (the internal map, not a copy)
   */
  public Map<String, String> getFieldToFullyQualifiedMap() {
    return fieldToFullyQualifiedMap;
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
/**
* Copyright 2022-2025 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.incremental;

import java.util.ArrayList;
import java.util.List;

import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttle;
import org.apache.calcite.rel.core.TableFunctionScan;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalCorrelate;
import org.apache.calcite.rel.logical.LogicalExchange;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalIntersect;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalMatch;
import org.apache.calcite.rel.logical.LogicalMinus;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.logical.LogicalUnion;
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexFieldAccess;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;


public class PIIFieldPropagator implements RelShuttle {
private final PIIContext context;

PIIFieldPropagator(PIIContext context) {
this.context = context;
}

@Override
public RelNode visit(LogicalProject project) {
List<RelNode> inputs = project.getInputs();
inputs.forEach(input -> input.accept(this));
propagatePIILineage(project);
return project;
}

private void propagatePIILineage(RelNode relNode) {
if (relNode instanceof LogicalProject) {
LogicalProject project = (LogicalProject) relNode;
List<RexNode> projections = project.getProjects();
List<String> outputFields = project.getRowType().getFieldNames();
List<String> inputFields = project.getInput().getRowType().getFieldNames();
for (int i = 0; i < projections.size(); i++) {
String outputField = outputFields.get(i);
RexNode rexNode = projections.get(i);
if (rexNode instanceof RexInputRef) {
String inputField = inputFields.get(((RexInputRef) rexNode).getIndex());
String fullyQualifiedField = context.getFieldToFullyQualifiedMap().get(inputField);
if (context.getInputPIIFields().contains(fullyQualifiedField.toLowerCase())
&& !context.getOutputPIIFields().contains(outputField)) {
context.addOutputPIIField(outputField);
}
} else if (rexNode instanceof RexFieldAccess) {
RexFieldAccess fieldAccess = (RexFieldAccess) rexNode;
String flattenedFieldPath = resolveNestedField(fieldAccess, inputFields);
String fullyQualifiedField = context.getFieldToFullyQualifiedMap().get(flattenedFieldPath);
if (context.getInputPIIFields().contains(fullyQualifiedField.toLowerCase())
&& !context.getOutputPIIFields().contains(outputField)) {
context.addOutputPIIField(outputField);
}
} else if (rexNode instanceof RexCall) {
RexCall call = (RexCall) rexNode;
if (call.getOperands().get(0) instanceof RexInputRef) {
String inputField = inputFields.get(((RexInputRef) call.getOperands().get(0)).getIndex());
String fullyQualifiedField = context.getFieldToFullyQualifiedMap().get(inputField);
if (context.getInputPIIFields().contains(fullyQualifiedField.toLowerCase())
&& !context.getOutputPIIFields().contains(outputField)) {
context.addOutputPIIField(outputField);
}
}
} else {
System.out.println("Unhandled RexNode type: " + rexNode.getClass().getName());
}
}
} else {
relNode.accept(this);
}
}

// Recursive function to resolve nested fields
private String resolveNestedField(RexFieldAccess fieldAccess, List<String> inputFields) {
// Step 1: Get the field name of the current access (e.g., 'memberId')
String fieldName = fieldAccess.getField().getName();
// Step 2: Get the reference (the parent field, e.g., '$0')
RexNode referenceNode = fieldAccess.getReferenceExpr();
// Step 3: If the reference node is a RexInputRef, resolve the parent path recursively
if (referenceNode instanceof RexInputRef) {
String parentField = inputFields.get(((RexInputRef) referenceNode).getIndex());
// If we hit the root, return the full path
return parentField + "." + fieldName;
} else if (referenceNode instanceof RexFieldAccess) {
// If we encounter another RexFieldAccess, recurse to resolve it
return resolveNestedField((RexFieldAccess) referenceNode, inputFields) + "." + fieldName;
} else {
throw new IllegalArgumentException("Unhandled reference node type: " + referenceNode.getClass().getName());
}
}

private String getFullyQualifiedTableName(RelNode relNode) {
if (relNode instanceof LogicalTableScan) {
LogicalTableScan tableScan = (LogicalTableScan) relNode;
String[] tableParts = tableScan.getTable().getQualifiedName().toArray(new String[0]);
if (tableParts.length == 1) {
return tableParts[0];
}
return String.join(".", tableParts[tableParts.length - 2], tableParts[tableParts.length - 1]);
} else if (relNode instanceof RelSubset) {
return getFullyQualifiedTableName(((RelSubset) relNode).getOriginal());
} else {
}
throw new IllegalArgumentException("Unable to extract fully qualified table name");
}

@Override
public RelNode visit(TableScan tableScan) {
String tableAlias = getFullyQualifiedTableName(tableScan);
List<String> flattenedFields = flattenFields(tableScan.getRowType(), "");
for (String field : flattenedFields) {
context.addFieldToFullyQualifiedMap(field, tableAlias + "." + field);
}
return tableScan;
}

public void visit(LogicalTableScan logicalTableScan) {
String tableAlias = getFullyQualifiedTableName(logicalTableScan);
List<String> flattenedFields = flattenFields(logicalTableScan.getRowType(), "");
for (String field : flattenedFields) {
context.addFieldToFullyQualifiedMap(field, tableAlias + "." + field);
}
}

@Override
public RelNode visit(TableFunctionScan scan) {
return scan;
}

@Override
public RelNode visit(LogicalValues values) {
return values;
}

@Override
public RelNode visit(LogicalFilter filter) {
return filter;
}

@Override
public RelNode visit(LogicalJoin join) {
// Check the left input
RelNode left = join.getLeft();
if (left instanceof LogicalTableScan) {
visit((LogicalTableScan) left);
} else {
// Optionally handle deeper traversal
left.accept(this);
}

// Check the right input
RelNode right = join.getRight();
if (right instanceof LogicalTableScan) {
visit((LogicalTableScan) right);
} else {
// Optionally handle deeper traversal
right.accept(this);
}
return join;
}

@Override
public RelNode visit(LogicalCorrelate correlate) {
return correlate;
}

@Override
public RelNode visit(LogicalUnion union) {
return union;
}

@Override
public RelNode visit(LogicalIntersect intersect) {
return intersect;
}

@Override
public RelNode visit(LogicalMinus minus) {
return minus;
}

@Override
public RelNode visit(LogicalAggregate aggregate) {
return aggregate;
}

@Override
public RelNode visit(LogicalMatch match) {
return match;
}

@Override
public RelNode visit(LogicalSort sort) {
return sort;
}

@Override
public RelNode visit(LogicalExchange exchange) {
return exchange;
}

@Override
public RelNode visit(RelNode other) {
return other;
}

private List<String> flattenFields(RelDataType relDataType, String prefix) {
List<String> flatFields = new ArrayList<>();
for (RelDataTypeField field : relDataType.getFieldList()) {
String fieldName = field.getName();
RelDataType fieldType = field.getType();
// Build the fully qualified field path
String fullPath = prefix.isEmpty() ? fieldName : prefix + "." + fieldName;
if (fieldType.isStruct()) {
// Recursively flatten nested STRUCT fields
flatFields.addAll(flattenFields(fieldType, fullPath));
} else {
// Add the fully qualified field path
flatFields.add(fullPath);
}
}
return flatFields;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/**
* Copyright 2022-2025 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.incremental;

import org.apache.calcite.rel.RelShuttle;


/**
 * Factory that hands out the PII lineage shuttle behind the {@link RelShuttle} interface.
 */
public class PIIFieldPropagatorFactory {
  /**
   * Builds a new PII field propagator bound to the supplied context.
   *
   * @param context context the propagator reads from and writes to
   * @return a fresh propagator, exposed as a {@link RelShuttle}
   */
  public static RelShuttle createPropagator(PIIContext context) {
    final RelShuttle propagator = new PIIFieldPropagator(context);
    return propagator;
  }
}
Loading
Loading