Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1980,9 +1980,8 @@ private static ParsedTimestamp parseTimestamp(String value)
Optional<String> timezone = Optional.ofNullable(matcher.group("timezone"));

long picosOfSecond = 0;
int precision = 0;
if (fraction != null) {
precision = fraction.length();
int precision = fraction.length();
verify(precision <= 12, "Unsupported timestamp precision %s: %s", precision, value);
long fractionValue = Long.parseLong(fraction);
picosOfSecond = rescale(fractionValue, precision, 12);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ public Page getOutput()
outputIterator = groupedTopNBuilder.buildResult();
}

Page output = null;
Page output;
if (outputIterator != null && outputIterator.hasNext()) {
// rewrite to expected column ordering
output = outputIterator.next().getColumns(outputChannels);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4341,18 +4341,13 @@ private boolean isValidTemporalType(Type type)

private PointerType toPointerType(QueryPeriod.RangeType type)
{
PointerType pointerType = null;
switch (type) {
case TIMESTAMP:
pointerType = PointerType.TEMPORAL;
break;
return PointerType.TEMPORAL;
case VERSION:
pointerType = PointerType.TARGET_ID;
break;
default:
Comment thread
findepi marked this conversation as resolved.
throw new TrinoException(NOT_SUPPORTED, format("No TravelType maps from RangeType %s.", type.name()));
return PointerType.TARGET_ID;
}
return pointerType;
throw new UnsupportedOperationException("Unsupported range type: " + type);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class ConnectorTableVersion

public ConnectorTableVersion(PointerType pointerType, Type versionType, Object version)
{
requireNonNull(pointerType, "travelType is null");
requireNonNull(pointerType, "pointerType is null");
requireNonNull(versionType, "versionType is null");
requireNonNull(version, "version is null");
this.pointerType = pointerType;
Expand All @@ -53,7 +53,7 @@ public Object getVersion()
public String toString()
{
return new StringBuilder("ConnectorTableVersion{")
.append("travelType=").append(pointerType)
.append("pointerType=").append(pointerType)
.append(", versionType=").append(versionType)
.append(", version=").append(version)
.append('}')
Expand Down
2 changes: 1 addition & 1 deletion core/trino-spi/src/main/java/io/trino/spi/type/Int128.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public static Int128 valueOf(String value)
public static Int128 valueOf(BigInteger value)
{
long low = value.longValue();
long high = 0;
long high;
try {
high = value.shiftRight(64).longValueExact();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public TimestampDecoder(String path)
public void decode(SearchHit hit, Supplier<Object> getter, BlockBuilder output)
{
DocumentField documentField = hit.getFields().get(path);
Object value = null;
Object value;

if (documentField != null) {
if (documentField.getValues().size() > 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,6 @@ public void alterTransactionalTable(HiveIdentity identity, Table table, long tra
delegate.alterTransactionalTable(identity, table, transactionId, writeId, principalPrivileges);
}
finally {
identity = updateIdentity(identity);
invalidateTable(table.getDatabaseName(), table.getTableName());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ private static Optional<SSLContext> buildSslContext(
try {
// load KeyStore if configured and get KeyManagers
KeyManager[] keyManagers = null;
KeyStore keyStore = null;
char[] keyManagerPassword = new char[0];
if (keyStorePath.isPresent()) {
KeyStore keyStore;
try {
keyStore = PemReader.loadKeyStore(keyStorePath.get(), keyStorePath.get(), keyStorePassword);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class OrcPageSource
private final FileFormatDataSourceStats stats;

// Row ID relative to all the original files of the same bucket ID before this file in lexicographic order
private Optional<Long> originalFileRowId = Optional.empty();
private final Optional<Long> originalFileRowId;

private long completedPositions;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.DefunctConfig;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;

Expand All @@ -24,6 +25,7 @@

import java.util.concurrent.TimeUnit;

@DefunctConfig("kinesis.checkpoint-interval")
public class KinesisConfig
{
private String defaultSchema = "default";
Expand All @@ -43,7 +45,6 @@ public class KinesisConfig
private boolean checkpointEnabled;
private long dynamoReadCapacity = 50L;
private long dynamoWriteCapacity = 10L;
private Duration checkpointInterval = new Duration(60000, TimeUnit.MILLISECONDS);
private String logicalProcessName = "process1";
private int iteratorNumber;

Expand Down Expand Up @@ -277,19 +278,6 @@ public KinesisConfig setDynamoWriteCapacity(long dynamoWriteCapacity)
return this;
}

public Duration getCheckpointInterval()
{
return checkpointInterval;
}

@Config("kinesis.checkpoint-interval")
@ConfigDescription("Intervals at which to checkpoint shard iterator details")
public KinesisConfig setCheckpointInterval(Duration checkpointInterval)
{
this.checkpointInterval = checkpointInterval;
return this;
}

public String getLogicalProcessName()
{
return logicalProcessName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ public class KinesisRecordSet
requireNonNull(kinesisConfig, "kinesisConfig is null");
long dynamoReadCapacity = kinesisConfig.getDynamoReadCapacity();
long dynamoWriteCapacity = kinesisConfig.getDynamoWriteCapacity();
long checkPointIntervalMillis = kinesisConfig.getCheckpointInterval().toMillis();
this.isLogBatches = kinesisConfig.isLogBatches();

this.clientManager = requireNonNull(clientManager, "clientManager is null");
Expand Down Expand Up @@ -159,7 +158,6 @@ public class KinesisRecordSet
split,
logicalProcessName,
curIterationNumber,
checkPointIntervalMillis,
dynamoReadCapacity,
dynamoWriteCapacity);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,20 @@ public class KinesisShardCheckpointer
private String logicalProcessName;
private int currentIterationNumber;
private KinesisClientLease kinesisClientLease;
private long checkpointIntervalMillis;
private long nextCheckpointTimeMillis;

public KinesisShardCheckpointer(
AmazonDynamoDB dynamoDBClient,
String dynamoDBTable,
KinesisSplit kinesisSplit,
String logicalProcessName,
int currentIterationNumber,
long checkpointIntervalMS,
long dynamoReadCapacity,
long dynamoWriteCapacity)
{
this(new KinesisClientLeaseManager(dynamoDBTable, dynamoDBClient),
kinesisSplit,
logicalProcessName,
currentIterationNumber,
checkpointIntervalMS,
dynamoReadCapacity,
dynamoWriteCapacity);
}
Expand All @@ -57,15 +53,13 @@ public KinesisShardCheckpointer(
KinesisSplit kinesisSplit,
String logicalProcessName,
int currentIterationNumber,
long checkpointIntervalMS,
long dynamoReadCapacity,
long dynamoWriteCapacity)
{
this.leaseManager = leaseManager;
this.kinesisSplit = kinesisSplit;
this.logicalProcessName = logicalProcessName;
this.currentIterationNumber = currentIterationNumber;
this.checkpointIntervalMillis = checkpointIntervalMS;

try {
this.leaseManager.createLeaseTableIfNotExists(dynamoReadCapacity, dynamoWriteCapacity);
Expand All @@ -82,12 +76,6 @@ public KinesisShardCheckpointer(
catch (ProvisionedThroughputException | InvalidStateException | DependencyException e) {
throw new RuntimeException(e);
}
resetNextCheckpointTime();
}

private void resetNextCheckpointTime()
{
nextCheckpointTimeMillis = System.nanoTime() + checkpointIntervalMillis * 1_000_000;
}

private String createCheckpointKey(int iterationNo)
Expand Down Expand Up @@ -117,15 +105,14 @@ public void checkpoint(String lastReadSequenceNumber)
catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
throw new RuntimeException(e);
}
resetNextCheckpointTime();
}

//return checkpoint of previous iteration if found
public String getLastReadSeqNumber()
{
String lastReadSeqNumber = null;
KinesisClientLease oldLease = null;
if (currentIterationNumber > 0) {
KinesisClientLease oldLease;
try {
oldLease = leaseManager.getLease(createCheckpointKey(currentIterationNumber - 1));
}
Expand All @@ -145,11 +132,4 @@ public String getLastReadSeqNumber()
}
return lastReadSeqNumber;
}

public void checkpointIfTimeUp(String lastReadSeqNo)
{
if (System.nanoTime() >= nextCheckpointTimeMillis) {
checkpoint(lastReadSeqNo);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public void testDefaults()
.setCheckpointEnabled(false)
.setDynamoReadCapacity(50)
.setDynamoWriteCapacity(10)
.setCheckpointInterval(new Duration(60000, TimeUnit.MILLISECONDS))
.setLogicalProcessName("process1")
.setIteratorNumber(0));
}
Expand All @@ -70,7 +69,6 @@ public void testExplicitPropertyMappings()
.put("kinesis.checkpoint-enabled", "true")
.put("kinesis.dynamo-read-capacity", "100")
.put("kinesis.dynamo-write-capacity", "20")
.put("kinesis.checkpoint-interval", "50000ms")
.put("kinesis.checkpoint-logical-name", "process")
.put("kinesis.iterator-number", "1")
.build();
Expand All @@ -93,7 +91,6 @@ public void testExplicitPropertyMappings()
.setCheckpointEnabled(true)
.setDynamoReadCapacity(100)
.setDynamoWriteCapacity(20)
.setCheckpointInterval(new Duration(50000, TimeUnit.MILLISECONDS))
.setLogicalProcessName("process")
.setIteratorNumber(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ public void clearRecords()

public static class InternalStream
{
private String streamName = "";
private String streamAmazonResourceName = "";
private final String streamName;
private final String streamAmazonResourceName;
private String streamStatus = "CREATING";
private List<InternalShard> shards = new ArrayList<>();
private int sequenceNo = 100;
Expand Down Expand Up @@ -174,11 +174,6 @@ public List<InternalShard> getShardsFrom(String afterShardId)
}
}

public void activate()
{
this.streamStatus = "ACTIVE";
}

public PutRecordResult putRecord(ByteBuffer data, String partitionKey)
{
// Create record and insert into the shards. Initially just do it
Expand All @@ -203,19 +198,12 @@ record = record.withData(data).withPartitionKey(partitionKey).withSequenceNumber

return result;
}

public void clearRecords()
{
for (InternalShard shard : this.shards) {
shard.clearRecords();
}
}
}

public static class ShardIterator
{
public String streamId = "";
public int shardIndex;
public final String streamId;
public final int shardIndex;
public int recordIndex;

public ShardIterator(String streamId, int shardIndex, int recordIndex)
Expand Down Expand Up @@ -410,14 +398,14 @@ public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest)
}

// TODO: incorporate maximum batch size (getRecordsRequest.getLimit)
GetRecordsResult result = null;
InternalStream stream = this.getStream(iterator.streamId);
if (stream == null) {
throw new AmazonClientException("Unknown stream or bad shard iterator.");
}

InternalShard shard = stream.getShards().get(iterator.shardIndex);

GetRecordsResult result;
if (iterator.recordIndex == 100) {
result = new GetRecordsResult();
List<Record> recs = shard.getRecords();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1746,10 +1746,8 @@ void withTemporaryTable(String rootName, boolean transactional, boolean isPartit
if (transactional) {
ensureTransactionalHive();
}
String tableName = null;
try (TemporaryHiveTable table = TemporaryHiveTable.temporaryHiveTable(tableName(rootName, isPartitioned, bucketingType))) {
tableName = table.getName();
testRunner.accept(tableName);
testRunner.accept(table.getName());
}
}

Expand Down