From 410c22bb1ebe6363c84f9259bc16836c805dd7a4 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 12 Jan 2022 10:52:13 +0100 Subject: [PATCH 1/6] Remove unused method --- .../kinesis/KinesisShardCheckpointer.java | 17 ----------------- .../plugin/kinesis/util/MockKinesisClient.java | 12 ------------ 2 files changed, 29 deletions(-) diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisShardCheckpointer.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisShardCheckpointer.java index 76ab6ff55f9e..de6aec05610a 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisShardCheckpointer.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisShardCheckpointer.java @@ -30,8 +30,6 @@ public class KinesisShardCheckpointer private String logicalProcessName; private int currentIterationNumber; private KinesisClientLease kinesisClientLease; - private long checkpointIntervalMillis; - private long nextCheckpointTimeMillis; public KinesisShardCheckpointer( AmazonDynamoDB dynamoDBClient, @@ -65,7 +63,6 @@ public KinesisShardCheckpointer( this.kinesisSplit = kinesisSplit; this.logicalProcessName = logicalProcessName; this.currentIterationNumber = currentIterationNumber; - this.checkpointIntervalMillis = checkpointIntervalMS; try { this.leaseManager.createLeaseTableIfNotExists(dynamoReadCapacity, dynamoWriteCapacity); @@ -82,12 +79,6 @@ public KinesisShardCheckpointer( catch (ProvisionedThroughputException | InvalidStateException | DependencyException e) { throw new RuntimeException(e); } - resetNextCheckpointTime(); - } - - private void resetNextCheckpointTime() - { - nextCheckpointTimeMillis = System.nanoTime() + checkpointIntervalMillis * 1_000_000; } private String createCheckpointKey(int iterationNo) @@ -117,7 +108,6 @@ public void checkpoint(String lastReadSequenceNumber) catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) { throw new RuntimeException(e); } - resetNextCheckpointTime(); } //return checkpoint of previous iteration if found @@ -145,11 +135,4 @@ public String getLastReadSeqNumber() } return lastReadSeqNumber; } - - public void checkpointIfTimeUp(String lastReadSeqNo) - { - if (System.nanoTime() >= nextCheckpointTimeMillis) { - checkpoint(lastReadSeqNo); - } - } } diff --git a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/util/MockKinesisClient.java b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/util/MockKinesisClient.java index b1944b5888de..5433453dfab0 100644 --- a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/util/MockKinesisClient.java +++ b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/util/MockKinesisClient.java @@ -174,11 +174,6 @@ public List getShardsFrom(String afterShardId) } } - public void activate() - { - this.streamStatus = "ACTIVE"; - } - public PutRecordResult putRecord(ByteBuffer data, String partitionKey) { // Create record and insert into the shards. Initially just do it @@ -203,13 +198,6 @@ record = record.withData(data).withPartitionKey(partitionKey).withSequenceNumber return result; } - - public void clearRecords() - { - for (InternalShard shard : this.shards) { - shard.clearRecords(); - } - } } public static class ShardIterator From 4ecbfbff27a1e43cc51f21cfb7b3580815b790d6 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 12 Jan 2022 17:25:56 +0100 Subject: [PATCH 2/6] Remove unimplemented Kinesis config --- .../io/trino/plugin/kinesis/KinesisConfig.java | 16 ++-------------- .../trino/plugin/kinesis/KinesisRecordSet.java | 2 -- .../plugin/kinesis/KinesisShardCheckpointer.java | 3 --- .../trino/plugin/kinesis/TestKinesisConfig.java | 3 --- 4 files changed, 2 insertions(+), 22 deletions(-) diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisConfig.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisConfig.java index 1b30b55d8b60..36aed9e4def4 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisConfig.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisConfig.java @@ -15,6 +15,7 @@ import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; +import io.airlift.configuration.DefunctConfig; import io.airlift.units.Duration; import io.airlift.units.MinDuration; @@ -24,6 +25,7 @@ import java.util.concurrent.TimeUnit; +@DefunctConfig("kinesis.checkpoint-interval") public class KinesisConfig { private String defaultSchema = "default"; @@ -43,7 +45,6 @@ public class KinesisConfig private boolean checkpointEnabled; private long dynamoReadCapacity = 50L; private long dynamoWriteCapacity = 10L; - private Duration checkpointInterval = new Duration(60000, TimeUnit.MILLISECONDS); private String logicalProcessName = "process1"; private int iteratorNumber; @@ -277,19 +278,6 @@ public KinesisConfig setDynamoWriteCapacity(long dynamoWriteCapacity) return this; } - public Duration getCheckpointInterval() - { - return checkpointInterval; - } - - @Config("kinesis.checkpoint-interval") - @ConfigDescription("Intervals at which to checkpoint shard iterator details") - public KinesisConfig setCheckpointInterval(Duration checkpointInterval) - { - this.checkpointInterval = checkpointInterval; - return this; - } - public String getLogicalProcessName() { return logicalProcessName; diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisRecordSet.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisRecordSet.java index 55bc87ddd83a..9aa5060ae913 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisRecordSet.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisRecordSet.java @@ -115,7 +115,6 @@ public class KinesisRecordSet requireNonNull(kinesisConfig, "kinesisConfig is null"); long dynamoReadCapacity = kinesisConfig.getDynamoReadCapacity(); long dynamoWriteCapacity = kinesisConfig.getDynamoWriteCapacity(); - long checkPointIntervalMillis = kinesisConfig.getCheckpointInterval().toMillis(); this.isLogBatches = kinesisConfig.isLogBatches(); this.clientManager = requireNonNull(clientManager, "clientManager is null"); @@ -159,7 +158,6 @@ public class KinesisRecordSet split, logicalProcessName, curIterationNumber, - checkPointIntervalMillis, dynamoReadCapacity, dynamoWriteCapacity); diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisShardCheckpointer.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisShardCheckpointer.java index de6aec05610a..faf68358e079 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisShardCheckpointer.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisShardCheckpointer.java @@ -37,7 +37,6 @@ public KinesisShardCheckpointer( KinesisSplit kinesisSplit, String logicalProcessName, int currentIterationNumber, - long checkpointIntervalMS, long dynamoReadCapacity, long dynamoWriteCapacity) { @@ -45,7 +44,6 @@ public KinesisShardCheckpointer( kinesisSplit, logicalProcessName, currentIterationNumber, - checkpointIntervalMS, dynamoReadCapacity, dynamoWriteCapacity); } @@ -55,7 +53,6 @@ public KinesisShardCheckpointer( KinesisSplit kinesisSplit, String logicalProcessName, int currentIterationNumber, - long checkpointIntervalMS, long dynamoReadCapacity, long dynamoWriteCapacity) { diff --git a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/TestKinesisConfig.java b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/TestKinesisConfig.java index ae9030437dd6..7315bbf97325 100644 --- a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/TestKinesisConfig.java +++ b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/TestKinesisConfig.java @@ -44,7 +44,6 @@ public void testDefaults() .setCheckpointEnabled(false) .setDynamoReadCapacity(50) .setDynamoWriteCapacity(10) - .setCheckpointInterval(new Duration(60000, TimeUnit.MILLISECONDS)) .setLogicalProcessName("process1") .setIteratorNumber(0)); } @@ -70,7 +69,6 @@ public void testExplicitPropertyMappings() .put("kinesis.checkpoint-enabled", "true") .put("kinesis.dynamo-read-capacity", "100") .put("kinesis.dynamo-write-capacity", "20") - .put("kinesis.checkpoint-interval", "50000ms") .put("kinesis.checkpoint-logical-name", "process") .put("kinesis.iterator-number", "1") .build(); @@ -93,7 +91,6 @@ public void testExplicitPropertyMappings() .setCheckpointEnabled(true) .setDynamoReadCapacity(100) .setDynamoWriteCapacity(20) - .setCheckpointInterval(new Duration(50000, TimeUnit.MILLISECONDS)) .setLogicalProcessName("process") .setIteratorNumber(1); From 43bd7b97b1484bd91d797470ff9b415521b5e0f7 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 12 Jan 2022 10:50:50 +0100 Subject: [PATCH 3/6] Remove dead variable writes --- .../java/io/trino/jdbc/AbstractTrinoResultSet.java | 3 +-- .../java/io/trino/operator/TopNRankingOperator.java | 2 +- .../src/main/java/io/trino/spi/type/Int128.java | 2 +- .../elasticsearch/decoders/TimestampDecoder.java | 2 +- .../hive/metastore/cache/CachingHiveMetastore.java | 1 - .../thrift/DefaultThriftMetastoreClientFactory.java | 2 +- .../java/io/trino/plugin/hive/orc/OrcPageSource.java | 2 +- .../trino/plugin/kinesis/KinesisShardCheckpointer.java | 2 +- .../trino/plugin/kinesis/util/MockKinesisClient.java | 10 +++++----- .../tests/product/hive/TestHiveTransactionalTable.java | 4 +--- 10 files changed, 13 insertions(+), 17 deletions(-) diff --git a/client/trino-jdbc/src/main/java/io/trino/jdbc/AbstractTrinoResultSet.java b/client/trino-jdbc/src/main/java/io/trino/jdbc/AbstractTrinoResultSet.java index d11a5f0d1d22..ec587074a46f 100644 --- a/client/trino-jdbc/src/main/java/io/trino/jdbc/AbstractTrinoResultSet.java +++ b/client/trino-jdbc/src/main/java/io/trino/jdbc/AbstractTrinoResultSet.java @@ -1980,9 +1980,8 @@ private static ParsedTimestamp parseTimestamp(String value) Optional timezone = Optional.ofNullable(matcher.group("timezone")); long picosOfSecond = 0; - int precision = 0; if (fraction != null) { - precision = fraction.length(); + int precision = fraction.length(); verify(precision <= 12, "Unsupported timestamp precision %s: %s", precision, value); long fractionValue = Long.parseLong(fraction); picosOfSecond = rescale(fractionValue, precision, 12); diff --git a/core/trino-main/src/main/java/io/trino/operator/TopNRankingOperator.java b/core/trino-main/src/main/java/io/trino/operator/TopNRankingOperator.java index 33ac3706e54b..31c2b4d37f2a 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TopNRankingOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/TopNRankingOperator.java @@ -367,7 +367,7 @@ public Page getOutput() outputIterator = groupedTopNBuilder.buildResult(); } - Page output = null; + Page output; if (outputIterator != null && outputIterator.hasNext()) { // rewrite to expected column ordering output = outputIterator.next().getColumns(outputChannels); diff --git a/core/trino-spi/src/main/java/io/trino/spi/type/Int128.java b/core/trino-spi/src/main/java/io/trino/spi/type/Int128.java index b5c6e2de5d01..509c40de635c 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/type/Int128.java +++ b/core/trino-spi/src/main/java/io/trino/spi/type/Int128.java @@ -120,7 +120,7 @@ public static Int128 valueOf(String value) public static Int128 valueOf(BigInteger value) { long low = value.longValue(); - long high = 0; + long high; try { high = value.shiftRight(64).longValueExact(); } diff --git a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/decoders/TimestampDecoder.java b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/decoders/TimestampDecoder.java index 3e730df56925..9248291826e0 100644 --- a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/decoders/TimestampDecoder.java +++ b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/decoders/TimestampDecoder.java @@ -49,7 +49,7 @@ public TimestampDecoder(String path) public void decode(SearchHit hit, Supplier getter, BlockBuilder output) { DocumentField documentField = hit.getFields().get(path); - Object value = null; + Object value; if (documentField != null) { if (documentField.getValues().size() > 1) { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java index 6774eae6690c..08bc9c59209f 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java @@ -1029,7 +1029,6 @@ public void alterTransactionalTable(HiveIdentity identity, Table table, long tra delegate.alterTransactionalTable(identity, table, transactionId, writeId, principalPrivileges); } finally { - identity = updateIdentity(identity); invalidateTable(table.getDatabaseName(), table.getTableName()); } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/DefaultThriftMetastoreClientFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/DefaultThriftMetastoreClientFactory.java index b2816f0d4d13..8e8cd0ab44fc 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/DefaultThriftMetastoreClientFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/DefaultThriftMetastoreClientFactory.java @@ -124,9 +124,9 @@ private static Optional buildSslContext( try { // load KeyStore if configured and get KeyManagers KeyManager[] keyManagers = null; - KeyStore keyStore = null; char[] keyManagerPassword = new char[0]; if (keyStorePath.isPresent()) { + KeyStore keyStore; try { keyStore = PemReader.loadKeyStore(keyStorePath.get(), keyStorePath.get(), keyStorePassword); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSource.java index 8b32a9ad32eb..94d2c90a1e44 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSource.java @@ -77,7 +77,7 @@ public class OrcPageSource private final FileFormatDataSourceStats stats; // Row ID relative to all the original files of the same bucket ID before this file in lexicographic order - private Optional originalFileRowId = Optional.empty(); + private final Optional originalFileRowId; private long completedPositions; diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisShardCheckpointer.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisShardCheckpointer.java index faf68358e079..f07718b049b9 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisShardCheckpointer.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisShardCheckpointer.java @@ -111,8 +111,8 @@ public void checkpoint(String lastReadSequenceNumber) public String getLastReadSeqNumber() { String lastReadSeqNumber = null; - KinesisClientLease oldLease = null; if (currentIterationNumber > 0) { + KinesisClientLease oldLease; try { oldLease = leaseManager.getLease(createCheckpointKey(currentIterationNumber - 1)); } diff --git a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/util/MockKinesisClient.java b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/util/MockKinesisClient.java index 5433453dfab0..e53ef407d41c 100644 --- a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/util/MockKinesisClient.java +++ b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/util/MockKinesisClient.java @@ -111,8 +111,8 @@ public void clearRecords() public static class InternalStream { - private String streamName = ""; - private String streamAmazonResourceName = ""; + private final String streamName; + private final String streamAmazonResourceName; private String streamStatus = "CREATING"; private List shards = new ArrayList<>(); private int sequenceNo = 100; @@ -202,8 +202,8 @@ record = record.withData(data).withPartitionKey(partitionKey).withSequenceNumber public static class ShardIterator { - public String streamId = ""; - public int shardIndex; + public final String streamId; + public final int shardIndex; public int recordIndex; public ShardIterator(String streamId, int shardIndex, int recordIndex) @@ -398,7 +398,6 @@ public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) } // TODO: incorporate maximum batch size (getRecordsRequest.getLimit) - GetRecordsResult result = null; InternalStream stream = this.getStream(iterator.streamId); if (stream == null) { throw new AmazonClientException("Unknown stream or bad shard iterator."); @@ -406,6 +405,7 @@ public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) InternalShard shard = stream.getShards().get(iterator.shardIndex); + GetRecordsResult result; if (iterator.recordIndex == 100) { result = new GetRecordsResult(); List recs = shard.getRecords(); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveTransactionalTable.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveTransactionalTable.java index c55c9adb514c..1c42f1b42c45 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveTransactionalTable.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveTransactionalTable.java @@ -1746,10 +1746,8 @@ void withTemporaryTable(String rootName, boolean transactional, boolean isPartit if (transactional) { ensureTransactionalHive(); } - String tableName = null; try (TemporaryHiveTable table = TemporaryHiveTable.temporaryHiveTable(tableName(rootName, isPartitioned, bucketingType))) { - tableName = table.getName(); - testRunner.accept(tableName); + testRunner.accept(table.getName()); } } From eae5dc0099268165f48606fbeaefb761818e3e72 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 12 Jan 2022 10:56:19 +0100 Subject: [PATCH 4/6] Fix typo --- .../java/io/trino/spi/connector/ConnectorTableVersion.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorTableVersion.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorTableVersion.java index 0b7df426f456..e295ce33b6d6 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorTableVersion.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorTableVersion.java @@ -26,7 +26,7 @@ public class ConnectorTableVersion public ConnectorTableVersion(PointerType pointerType, Type versionType, Object version) { - requireNonNull(pointerType, "travelType is null"); + requireNonNull(pointerType, "pointerType is null"); requireNonNull(versionType, "versionType is null"); requireNonNull(version, "version is null"); this.pointerType = pointerType; @@ -53,7 +53,7 @@ public Object getVersion() public String toString() { return new StringBuilder("ConnectorTableVersion{") - .append("travelType=").append(pointerType) + .append("pointerType=").append(pointerType) .append(", versionType=").append(versionType) .append(", version=").append(version) .append('}') From b8a5f8108db1959c22540bee0031a71225b296c9 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 12 Jan 2022 10:57:53 +0100 Subject: [PATCH 5/6] Remove default branch from enum switch This lets error-prone ensure the code is updated when enum changes. --- .../java/io/trino/sql/analyzer/StatementAnalyzer.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java index 5589b63240dc..6d6438a1eb73 100644 --- a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java +++ b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java @@ -4341,18 +4341,13 @@ private boolean isValidTemporalType(Type type) private PointerType toPointerType(QueryPeriod.RangeType type) { - PointerType pointerType = null; switch (type) { case TIMESTAMP: - pointerType = PointerType.TEMPORAL; - break; + return PointerType.TEMPORAL; case VERSION: - pointerType = PointerType.TARGET_ID; - break; - default: - throw new TrinoException(NOT_SUPPORTED, format("No TravelType maps from RangeType %s.", type.name())); + return PointerType.TARGET_ID; } - return pointerType; + throw new UnsupportedOperationException("Unsupported range type: " + type); } } From 6842281780e3938d072bd9aae852b3950c6421dd Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 12 Jan 2022 22:31:00 +0100 Subject: [PATCH 6/6] empty