diff --git a/presto-docs/src/main/sphinx/connector/elasticsearch.rst b/presto-docs/src/main/sphinx/connector/elasticsearch.rst
index 52441fbf1ab91..f1230ddf91a85 100644
--- a/presto-docs/src/main/sphinx/connector/elasticsearch.rst
+++ b/presto-docs/src/main/sphinx/connector/elasticsearch.rst
@@ -203,6 +203,7 @@ Elasticsearch Presto
 ``long``      ``BIGINT``
 ``text``      ``VARCHAR``
 ``date``      ``TIMESTAMP``
+``ip``        ``IPADDRESS``
 (others)      (unsupported)
 ============= =============
 
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchMetadata.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchMetadata.java
index 0e9f6e2c0a823..f148fa4cbbc17 100644
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchMetadata.java
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchMetadata.java
@@ -15,7 +15,10 @@
 
 import com.facebook.presto.common.type.ArrayType;
 import com.facebook.presto.common.type.RowType;
+import com.facebook.presto.common.type.StandardTypes;
 import com.facebook.presto.common.type.Type;
+import com.facebook.presto.common.type.TypeManager;
+import com.facebook.presto.common.type.TypeSignature;
 import com.facebook.presto.elasticsearch.client.ElasticsearchClient;
 import com.facebook.presto.elasticsearch.client.IndexMetadata;
 import com.facebook.presto.elasticsearch.client.IndexMetadata.DateTimeType;
@@ -65,11 +68,15 @@ public class ElasticsearchMetadata
 {
     private final ElasticsearchClient client;
     private final String schemaName;
+    private final Type ipAddressType;
 
     @Inject
-    public ElasticsearchMetadata(ElasticsearchClient client, ElasticsearchConfig config)
+    public ElasticsearchMetadata(TypeManager typeManager, ElasticsearchClient client, ElasticsearchConfig config)
     {
         requireNonNull(config, "config is null");
+        // IPADDRESS is resolved by name through the TypeManager, so fail fast with a clear message if it was not injected
+        requireNonNull(typeManager, "typeManager is null");
+        this.ipAddressType = typeManager.getType(new TypeSignature(StandardTypes.IPADDRESS));
         this.client = requireNonNull(client, "client is null");
         this.schemaName = config.getDefaultSchema();
     }
@@ -244,6 +251,8 @@ else if (type instanceof PrimitiveType) {
                     return BOOLEAN;
                 case "binary":
                     return VARBINARY;
+                case "ip":
+                    return ipAddressType;
             }
         }
         else if (type instanceof DateTimeType) {
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ScanQueryPageSource.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ScanQueryPageSource.java
index 63b42433bfcd9..c1672adc6f2d8 100644
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ScanQueryPageSource.java
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ScanQueryPageSource.java
@@ -20,6 +20,7 @@
 import com.facebook.presto.common.block.PageBuilderStatus;
 import com.facebook.presto.common.type.ArrayType;
 import com.facebook.presto.common.type.RowType;
+import com.facebook.presto.common.type.StandardTypes;
 import com.facebook.presto.common.type.Type;
 import com.facebook.presto.elasticsearch.client.ElasticsearchClient;
 import com.facebook.presto.elasticsearch.decoders.ArrayDecoder;
@@ -29,6 +30,7 @@
 import com.facebook.presto.elasticsearch.decoders.DoubleDecoder;
 import com.facebook.presto.elasticsearch.decoders.IdColumnDecoder;
 import com.facebook.presto.elasticsearch.decoders.IntegerDecoder;
+import com.facebook.presto.elasticsearch.decoders.IpAddressDecoder;
 import com.facebook.presto.elasticsearch.decoders.RealDecoder;
 import com.facebook.presto.elasticsearch.decoders.RowDecoder;
 import com.facebook.presto.elasticsearch.decoders.ScoreColumnDecoder;
@@ -309,6 +311,9 @@ else if (type.equals(INTEGER)) {
         else if (type.equals(BIGINT)) {
             return new BigintDecoder(path);
         }
+        else if (type.getTypeSignature().getBase().equals(StandardTypes.IPADDRESS)) {
+            return new IpAddressDecoder(path, type);
+        }
         else if (type instanceof RowType) {
             RowType rowType = (RowType) type;
 
diff --git
a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/decoders/IpAddressDecoder.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/decoders/IpAddressDecoder.java
new file mode 100644
index 0000000000000..19731e63576ba
--- /dev/null
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/decoders/IpAddressDecoder.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.elasticsearch.decoders;
+
+import com.facebook.presto.common.block.BlockBuilder;
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.spi.PrestoException;
+import com.google.common.net.InetAddresses;
+import io.airlift.slice.Slice;
+import org.elasticsearch.search.SearchHit;
+
+import java.util.function.Supplier;
+
+import static com.facebook.presto.elasticsearch.ElasticsearchErrorCode.ELASTICSEARCH_TYPE_MISMATCH;
+import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
+import static com.facebook.presto.spi.StandardErrorCode.INVALID_CAST_ARGUMENT;
+import static io.airlift.slice.Slices.wrappedBuffer;
+import static java.lang.String.format;
+import static java.lang.System.arraycopy;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Decodes the string form of an Elasticsearch {@code ip} field into the 16-byte
+ * value written through the supplied IPADDRESS type.
+ */
+public class IpAddressDecoder
+        implements Decoder
+{
+    private final String path;
+    private final Type ipAddressType;
+
+    public IpAddressDecoder(String path, Type type)
+    {
+        this.path = requireNonNull(path, "path is null");
+        this.ipAddressType = requireNonNull(type, "type is null");
+    }
+
+    /**
+     * Appends the current field value to {@code output}: null stays null, a string is
+     * parsed as an IP literal, and any other value type is reported as a type mismatch.
+     */
+    @Override
+    public void decode(SearchHit hit, Supplier getter, BlockBuilder output)
+    {
+        Object value = getter.get();
+        if (value == null) {
+            output.appendNull();
+        }
+        else if (value instanceof String) {
+            ipAddressType.writeSlice(output, castToIpAddress((String) value));
+        }
+        else {
+            throw new PrestoException(ELASTICSEARCH_TYPE_MISMATCH, format("Expected a string value for field '%s' of type IP: %s [%s]", path, value, value.getClass().getSimpleName()));
+        }
+    }
+
+    // Mirrors IpAddressOperators.castFromVarcharToIpAddress, but takes the String directly
+    // so the value is not round-tripped through a UTF-8 Slice before being parsed.
+    private static Slice castToIpAddress(String value)
+    {
+        byte[] address;
+        try {
+            address = InetAddresses.forString(value).getAddress();
+        }
+        catch (IllegalArgumentException e) {
+            // Keep the parser's exception as the cause so the reason for rejection is not lost
+            throw new PrestoException(INVALID_CAST_ARGUMENT, "Cannot cast value to IPADDRESS: " + value, e);
+        }
+
+        byte[] bytes;
+        if (address.length == 4) {
+            // IPv4 is stored as an IPv4-mapped IPv6 address (::ffff:a.b.c.d)
+            bytes = new byte[16];
+            bytes[10] = (byte) 0xff;
+            bytes[11] = (byte) 0xff;
+            arraycopy(address, 0, bytes, 12, 4);
+        }
+        else if (address.length == 16) {
+            bytes = address;
+        }
+        else {
+            throw new PrestoException(GENERIC_INTERNAL_ERROR, "Invalid InetAddress length: " + address.length);
+        }
+
+        return wrappedBuffer(bytes);
+    }
+}
diff --git a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestElasticsearchIntegrationSmokeTest.java b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestElasticsearchIntegrationSmokeTest.java
index 3ba850da7a186..8eee2b88f96a0 100644
--- a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestElasticsearchIntegrationSmokeTest.java
+++ b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestElasticsearchIntegrationSmokeTest.java
@@ -387,7 +387,9 @@ public void testDataTypes()
"keyword_column", "type=keyword", "text_column", "type=text", "binary_column", "type=binary", - "timestamp_column", "type=date") + "timestamp_column", "type=date", + "ipv4_column", "type=ip", + "ipv6_column", "type=ip") .get(); index(indexName, ImmutableMap.builder() @@ -400,6 +402,8 @@ public void testDataTypes() .put("text_column", "some text") .put("binary_column", new byte[] {(byte) 0xCA, (byte) 0xFE}) .put("timestamp_column", 0) + .put("ipv4_column", "192.0.2.4") + .put("ipv6_column", "2001:db8:0:1:1:1:1:1") .build()); embeddedElasticsearchNode.getClient() @@ -417,11 +421,14 @@ public void testDataTypes() "keyword_column, " + "text_column, " + "binary_column, " + - "timestamp_column " + + "timestamp_column, " + + "ipv4_column, " + + "ipv6_column " + "FROM types"); MaterializedResult expected = resultBuilder(getSession(), rows.getTypes()) - .row(true, 1.0f, 1.0d, 1, 1L, "cool", "some text", new byte[] {(byte) 0xCA, (byte) 0xFE}, LocalDateTime.of(1970, 1, 1, 0, 0)) + .row(true, 1.0f, 1.0d, 1, 1L, "cool", "some text", new byte[] {(byte) 0xCA, (byte) 0xFE}, + LocalDateTime.of(1970, 1, 1, 0, 0), "192.0.2.4", "2001:db8:0:1:1:1:1:1") .build(); assertEquals(rows.getMaterializedRows(), expected.getMaterializedRows()); @@ -447,7 +454,9 @@ public void testFilters() "keyword_column", "type=keyword", "text_column", "type=text", "binary_column", "type=binary", - "timestamp_column", "type=date") + "timestamp_column", "type=date", + "ipv4_column", "type=ip", + "ipv6_column", "type=ip") .get(); index(indexName, ImmutableMap.builder() @@ -462,6 +471,8 @@ public void testFilters() .put("text_column", "some text") .put("binary_column", new byte[] {(byte) 0xCA, (byte) 0xFE}) .put("timestamp_column", 1569888000000L) + .put("ipv4_column", "192.0.2.4") + .put("ipv6_column", "2001:db8:0:1:1:1:1:1") .build()); embeddedElasticsearchNode.getClient() @@ -534,6 +545,10 @@ public void testFilters() assertQuery("SELECT count(*) FROM filter_pushdown WHERE timestamp_column = TIMESTAMP 
'2019-10-02 00:00:00'", "VALUES 0"); assertQuery("SELECT count(*) FROM filter_pushdown WHERE timestamp_column > TIMESTAMP '2001-01-01 00:00:00'", "VALUES 1"); assertQuery("SELECT count(*) FROM filter_pushdown WHERE timestamp_column < TIMESTAMP '2030-01-01 00:00:00'", "VALUES 1"); + + // ipaddress + assertQuery("SELECT count(ipv4_column) FROM filter_pushdown", "VALUES 1"); + assertQuery("SELECT count(ipv6_column) FROM filter_pushdown", "VALUES 1"); } @Test @@ -558,7 +573,9 @@ public void testDataTypesNested() " \"keyword_column\": { \"type\": \"keyword\" },\n" + " \"text_column\": { \"type\": \"text\" },\n" + " \"binary_column\": { \"type\": \"binary\" },\n" + - " \"timestamp_column\": { \"type\": \"date\" }\n" + + " \"timestamp_column\": { \"type\": \"date\" },\n" + + " \"ipv4_column\": { \"type\": \"ip\" },\n" + + " \"ipv6_column\": { \"type\": \"ip\" }\n" + " }\n" + " }\n" + " }" + @@ -578,6 +595,8 @@ public void testDataTypesNested() .put("text_column", "some text") .put("binary_column", new byte[] {(byte) 0xCA, (byte) 0xFE}) .put("timestamp_column", 0) + .put("ipv4_column", "192.0.2.4") + .put("ipv6_column", "2001:db8:0:1:1:1:1:1") .build())); embeddedElasticsearchNode.getClient() @@ -596,11 +615,14 @@ public void testDataTypesNested() "field.keyword_column, " + "field.text_column, " + "field.binary_column, " + - "field.timestamp_column " + + "field.timestamp_column, " + + "field.ipv4_column, " + + "field.ipv6_column " + "FROM types_nested"); MaterializedResult expected = resultBuilder(getSession(), rows.getTypes()) - .row(true, 1.0f, 1.0d, 1, 1L, "cool", "some text", new byte[] {(byte) 0xCA, (byte) 0xFE}, LocalDateTime.of(1970, 1, 1, 0, 0)) + .row(true, 1.0f, 1.0d, 1, 1L, "cool", "some text", new byte[] {(byte) 0xCA, (byte) 0xFE}, + LocalDateTime.of(1970, 1, 1, 0, 0), "192.0.2.4", "2001:db8:0:1:1:1:1:1") .build(); assertEquals(rows.getMaterializedRows(), expected.getMaterializedRows()); @@ -629,7 +651,9 @@ public void testNestedTypeDataTypesNested() " 
\"keyword_column\": { \"type\": \"keyword\" },\n" + " \"text_column\": { \"type\": \"text\" },\n" + " \"binary_column\": { \"type\": \"binary\" },\n" + - " \"timestamp_column\": { \"type\": \"date\" }\n" + + " \"timestamp_column\": { \"type\": \"date\" },\n" + + " \"ipv4_column\": { \"type\": \"ip\" },\n" + + " \"ipv6_column\": { \"type\": \"ip\" }\n" + " }\n" + " }\n" + " }" + @@ -649,6 +673,8 @@ public void testNestedTypeDataTypesNested() .put("text_column", "some text") .put("binary_column", new byte[] {(byte) 0xCA, (byte) 0xFE}) .put("timestamp_column", 0) + .put("ipv4_column", "192.0.2.4") + .put("ipv6_column", "2001:db8:0:1:1:1:1:1") .build())); embeddedElasticsearchNode.getClient() @@ -667,12 +693,14 @@ public void testNestedTypeDataTypesNested() "nested_field.keyword_column, " + "nested_field.text_column, " + "nested_field.binary_column, " + - "nested_field.timestamp_column " + + "nested_field.timestamp_column, " + + "nested_field.ipv4_column, " + + "nested_field.ipv6_column " + "FROM nested_type_nested"); MaterializedResult expected = resultBuilder(getSession(), rows.getTypes()) .row(true, 1.0f, 1.0d, 1, 1L, "cool", "some text", new byte[] {(byte) 0xCA, (byte) 0xFE}, - LocalDateTime.of(1970, 1, 1, 0, 0)) + LocalDateTime.of(1970, 1, 1, 0, 0), "192.0.2.4", "2001:db8:0:1:1:1:1:1") .build(); assertEquals(rows.getMaterializedRows(), expected.getMaterializedRows()); diff --git a/presto-tests/src/main/java/com/facebook/presto/tests/TestingPrestoClient.java b/presto-tests/src/main/java/com/facebook/presto/tests/TestingPrestoClient.java index d2831666cfeaf..7b4029471be6c 100644 --- a/presto-tests/src/main/java/com/facebook/presto/tests/TestingPrestoClient.java +++ b/presto-tests/src/main/java/com/facebook/presto/tests/TestingPrestoClient.java @@ -70,6 +70,7 @@ import static com.facebook.presto.testing.MaterializedResult.DEFAULT_PRECISION; import static com.facebook.presto.type.IntervalDayTimeType.INTERVAL_DAY_TIME; import static 
com.facebook.presto.type.IntervalYearMonthType.INTERVAL_YEAR_MONTH; +import static com.facebook.presto.type.IpAddressType.IPADDRESS; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Iterables.transform; @@ -228,6 +229,9 @@ else if (INTERVAL_DAY_TIME.equals(type)) { else if (INTERVAL_YEAR_MONTH.equals(type)) { return new SqlIntervalYearMonth(IntervalYearMonth.parseMonths(String.valueOf(value))); } + else if (IPADDRESS.equals(type)) { + return value; + } else if (type instanceof ArrayType) { return ((List) value).stream() .map(element -> convertToRowValue(((ArrayType) type).getElementType(), element))