Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,94 @@ public class FirstValueFunction
private final int argumentChannel;
private final boolean ignoreNulls;

// Record a frame start with corresponding first non-null position
// or last seen null position.
// The non-null position is valid for all frames starting from the recorded frame start or later,
// and ending at the recorded non-null position or later.
private int recordedFrameStart = -1;
private int recordedValuePosition = -1;
private int recordedNullPosition = -1;

public FirstValueFunction(List<Integer> argumentChannels, boolean ignoreNulls)
{
this.argumentChannel = getOnlyElement(argumentChannels);
this.ignoreNulls = ignoreNulls;
}

@Override
public void reset()
{
recordedFrameStart = -1;
recordedValuePosition = -1;
recordedNullPosition = -1;
}

@Override
public void processRow(BlockBuilder output, int frameStart, int frameEnd, int currentPosition)
{
// empty frame
if (frameStart < 0) {
output.appendNull();
return;
}

int valuePosition = frameStart;
if (!ignoreNulls) {
windowIndex.appendTo(argumentChannel, frameStart, output);
return;
}

if (ignoreNulls) {
while (valuePosition >= 0 && valuePosition <= frameEnd) {
if (!windowIndex.isNull(argumentChannel, valuePosition)) {
break;
if (recordedFrameStart >= 0 && frameStart >= recordedFrameStart) {
// try to use the recorded non-null position
if (recordedValuePosition >= 0 && frameStart <= recordedValuePosition) {
if (frameEnd < recordedValuePosition) {
// only nulls in the frame
output.appendNull();
return;
}
// use the recorded non-null position
windowIndex.appendTo(argumentChannel, recordedValuePosition, output);
return;
}
// try to use the recorded null position
if (recordedNullPosition >= 0 && frameStart <= recordedNullPosition) {
if (frameEnd <= recordedNullPosition) {
// only nulls in the frame
output.appendNull();
return;
}

valuePosition++;
}
}

if (valuePosition > frameEnd) {
output.appendNull();
return;
// there is no recorded position or the current frame is before / after the recorded frame
// find the first non-null or last null position for current the frame and record it for future reference
// try to optimize by using the recorded null position
int valuePosition;
if (recordedFrameStart >= 0 && recordedNullPosition >= 0 && frameStart >= recordedFrameStart && frameStart <= recordedNullPosition) {
valuePosition = recordedNullPosition;
}
else {
valuePosition = frameStart;
}

recordedFrameStart = frameStart;
recordedValuePosition = -1;
recordedNullPosition = -1;

while (valuePosition >= 0 && valuePosition <= frameEnd) {
if (!windowIndex.isNull(argumentChannel, valuePosition)) {
break;
}

valuePosition++;
}

if (valuePosition > frameEnd) {
recordedNullPosition = frameEnd;
output.appendNull();
return;
}

recordedValuePosition = valuePosition;
windowIndex.appendTo(argumentChannel, valuePosition, output);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,94 @@ public class LastValueFunction
private final int argumentChannel;
private final boolean ignoreNulls;

// Record a frame end with corresponding first non-null position
// or last seen null position (going backwards).
// The non-null position is valid for all frames starting from the recorded non-null position or before
// and ending at the recorded frame end or before.
private int recordedFrameEnd = -1;
private int recordedValuePosition = -1;
private int recordedNullPosition = -1;

public LastValueFunction(List<Integer> argumentChannels, boolean ignoreNulls)
{
this.argumentChannel = getOnlyElement(argumentChannels);
this.ignoreNulls = ignoreNulls;
}

@Override
public void reset()
{
recordedFrameEnd = -1;
recordedValuePosition = -1;
recordedNullPosition = -1;
}

@Override
public void processRow(BlockBuilder output, int frameStart, int frameEnd, int currentPosition)
{
// empty frame
if (frameStart < 0) {
output.appendNull();
return;
}

int valuePosition = frameEnd;
if (!ignoreNulls) {
windowIndex.appendTo(argumentChannel, frameEnd, output);
return;
}

if (ignoreNulls) {
while (valuePosition >= frameStart) {
if (!windowIndex.isNull(argumentChannel, valuePosition)) {
break;
if (recordedFrameEnd >= 0 && frameEnd <= recordedFrameEnd) {
// try to use the recorded non-null position
if (recordedValuePosition >= 0 && frameEnd >= recordedValuePosition) {
if (frameStart > recordedValuePosition) {
// only nulls in the frame
output.appendNull();
return;
}
// use the recorded non-null position
windowIndex.appendTo(argumentChannel, recordedValuePosition, output);
return;
}
// try to use the recorded null position
if (recordedNullPosition >= 0 && frameEnd >= recordedNullPosition) {
if (frameStart >= recordedNullPosition) {
// only nulls in the frame
output.appendNull();
return;
}

valuePosition--;
}
}

if (valuePosition < frameStart) {
output.appendNull();
return;
// there is no recorded position or the current frame is before / after the recorded frame
// find the first non-null or last null position for current the frame and record it for future reference
// try to optimize by using the recorded null position
int valuePosition;
if (recordedFrameEnd >= 0 && recordedNullPosition >= 0 && frameEnd <= recordedFrameEnd && frameEnd >= recordedNullPosition) {
valuePosition = recordedNullPosition;
}
else {
valuePosition = frameEnd;
}

recordedFrameEnd = frameEnd;
recordedValuePosition = -1;
recordedNullPosition = -1;

while (valuePosition >= frameStart) {
if (!windowIndex.isNull(argumentChannel, valuePosition)) {
break;
}

valuePosition--;
}

if (valuePosition < frameStart) {
recordedNullPosition = frameStart;
output.appendNull();
return;
}

recordedValuePosition = valuePosition;
windowIndex.appendTo(argumentChannel, valuePosition, output);
}
}