Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions presto-docs/src/main/sphinx/connector/elasticsearch.rst
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ Elasticsearch Presto
``long`` ``BIGINT``
``text`` ``VARCHAR``
``date`` ``TIMESTAMP``
``ip`` ``IPADDRESS``
(others) (unsupported)
============= =============

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@

import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.common.type.RowType;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.common.type.TypeSignature;
import com.facebook.presto.elasticsearch.client.ElasticsearchClient;
import com.facebook.presto.elasticsearch.client.IndexMetadata;
import com.facebook.presto.elasticsearch.client.IndexMetadata.DateTimeType;
Expand Down Expand Up @@ -65,11 +68,13 @@ public class ElasticsearchMetadata
{
private final ElasticsearchClient client;
private final String schemaName;
private final Type ipAddressType;

@Inject
public ElasticsearchMetadata(ElasticsearchClient client, ElasticsearchConfig config)
public ElasticsearchMetadata(TypeManager typeManager, ElasticsearchClient client, ElasticsearchConfig config)
{
requireNonNull(config, "config is null");
this.ipAddressType = typeManager.getType(new TypeSignature(StandardTypes.IPADDRESS));
this.client = requireNonNull(client, "client is null");
this.schemaName = config.getDefaultSchema();
}
Expand Down Expand Up @@ -244,6 +249,8 @@ else if (type instanceof PrimitiveType) {
return BOOLEAN;
case "binary":
return VARBINARY;
case "ip":
return ipAddressType;
}
}
else if (type instanceof DateTimeType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.common.block.PageBuilderStatus;
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.common.type.RowType;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.elasticsearch.client.ElasticsearchClient;
import com.facebook.presto.elasticsearch.decoders.ArrayDecoder;
Expand All @@ -29,6 +30,7 @@
import com.facebook.presto.elasticsearch.decoders.DoubleDecoder;
import com.facebook.presto.elasticsearch.decoders.IdColumnDecoder;
import com.facebook.presto.elasticsearch.decoders.IntegerDecoder;
import com.facebook.presto.elasticsearch.decoders.IpAddressDecoder;
import com.facebook.presto.elasticsearch.decoders.RealDecoder;
import com.facebook.presto.elasticsearch.decoders.RowDecoder;
import com.facebook.presto.elasticsearch.decoders.ScoreColumnDecoder;
Expand Down Expand Up @@ -309,6 +311,9 @@ else if (type.equals(INTEGER)) {
else if (type.equals(BIGINT)) {
return new BigintDecoder(path);
}
else if (type.getTypeSignature().getBase().equals(StandardTypes.IPADDRESS)) {
return new IpAddressDecoder(path, type);
}
else if (type instanceof RowType) {
RowType rowType = (RowType) type;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.elasticsearch.decoders;

import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.spi.PrestoException;
import com.google.common.net.InetAddresses;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.elasticsearch.search.SearchHit;

import java.util.function.Supplier;

import static com.facebook.presto.elasticsearch.ElasticsearchErrorCode.ELASTICSEARCH_TYPE_MISMATCH;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_CAST_ARGUMENT;
import static io.airlift.slice.Slices.wrappedBuffer;
import static java.lang.String.format;
import static java.lang.System.arraycopy;
import static java.util.Objects.requireNonNull;

public class IpAddressDecoder
implements Decoder
{
private final String path;
private final Type ipAddressType;

public IpAddressDecoder(String path, Type type)
{
this.path = requireNonNull(path, "path is null");
this.ipAddressType = requireNonNull(type, "type is null");
}

@Override
public void decode(SearchHit hit, Supplier<Object> getter, BlockBuilder output)
{
Object value = getter.get();
if (value == null) {
output.appendNull();
}
else if (value instanceof String) {
String address = (String) value;
Slice slice = castToIpAddress(Slices.utf8Slice(address));
ipAddressType.writeSlice(output, slice);
}
else {
throw new PrestoException(ELASTICSEARCH_TYPE_MISMATCH, format("Expected a string value for field '%s' of type IP: %s [%s]", path, value, value.getClass().getSimpleName()));
}
}

// This is a copy of IpAddressOperators.castFromVarcharToIpAddress method
private Slice castToIpAddress(Slice slice)
{
byte[] address;
try {
address = InetAddresses.forString(slice.toStringUtf8()).getAddress();
}
catch (IllegalArgumentException e) {
throw new PrestoException(INVALID_CAST_ARGUMENT, "Cannot cast value to IPADDRESS: " + slice.toStringUtf8());
}

byte[] bytes;
if (address.length == 4) {
bytes = new byte[16];
bytes[10] = (byte) 0xff;
bytes[11] = (byte) 0xff;
arraycopy(address, 0, bytes, 12, 4);
}
else if (address.length == 16) {
bytes = address;
}
else {
throw new PrestoException(GENERIC_INTERNAL_ERROR, "Invalid InetAddress length: " + address.length);
}

return wrappedBuffer(bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,9 @@ public void testDataTypes()
"keyword_column", "type=keyword",
"text_column", "type=text",
"binary_column", "type=binary",
"timestamp_column", "type=date")
"timestamp_column", "type=date",
"ipv4_column", "type=ip",
"ipv6_column", "type=ip")
.get();

index(indexName, ImmutableMap.<String, Object>builder()
Expand All @@ -400,6 +402,8 @@ public void testDataTypes()
.put("text_column", "some text")
.put("binary_column", new byte[] {(byte) 0xCA, (byte) 0xFE})
.put("timestamp_column", 0)
.put("ipv4_column", "192.0.2.4")
.put("ipv6_column", "2001:db8:0:1:1:1:1:1")
.build());

embeddedElasticsearchNode.getClient()
Expand All @@ -417,11 +421,14 @@ public void testDataTypes()
"keyword_column, " +
"text_column, " +
"binary_column, " +
"timestamp_column " +
"timestamp_column, " +
"ipv4_column, " +
"ipv6_column " +
"FROM types");

MaterializedResult expected = resultBuilder(getSession(), rows.getTypes())
.row(true, 1.0f, 1.0d, 1, 1L, "cool", "some text", new byte[] {(byte) 0xCA, (byte) 0xFE}, LocalDateTime.of(1970, 1, 1, 0, 0))
.row(true, 1.0f, 1.0d, 1, 1L, "cool", "some text", new byte[] {(byte) 0xCA, (byte) 0xFE},
LocalDateTime.of(1970, 1, 1, 0, 0), "192.0.2.4", "2001:db8:0:1:1:1:1:1")
.build();

assertEquals(rows.getMaterializedRows(), expected.getMaterializedRows());
Expand All @@ -447,7 +454,9 @@ public void testFilters()
"keyword_column", "type=keyword",
"text_column", "type=text",
"binary_column", "type=binary",
"timestamp_column", "type=date")
"timestamp_column", "type=date",
"ipv4_column", "type=ip",
"ipv6_column", "type=ip")
.get();

index(indexName, ImmutableMap.<String, Object>builder()
Expand All @@ -462,6 +471,8 @@ public void testFilters()
.put("text_column", "some text")
.put("binary_column", new byte[] {(byte) 0xCA, (byte) 0xFE})
.put("timestamp_column", 1569888000000L)
.put("ipv4_column", "192.0.2.4")
.put("ipv6_column", "2001:db8:0:1:1:1:1:1")
.build());

embeddedElasticsearchNode.getClient()
Expand Down Expand Up @@ -534,6 +545,10 @@ public void testFilters()
assertQuery("SELECT count(*) FROM filter_pushdown WHERE timestamp_column = TIMESTAMP '2019-10-02 00:00:00'", "VALUES 0");
assertQuery("SELECT count(*) FROM filter_pushdown WHERE timestamp_column > TIMESTAMP '2001-01-01 00:00:00'", "VALUES 1");
assertQuery("SELECT count(*) FROM filter_pushdown WHERE timestamp_column < TIMESTAMP '2030-01-01 00:00:00'", "VALUES 1");

// ipaddress
assertQuery("SELECT count(ipv4_column) FROM filter_pushdown", "VALUES 1");
assertQuery("SELECT count(ipv6_column) FROM filter_pushdown", "VALUES 1");
}

@Test
Expand All @@ -558,7 +573,9 @@ public void testDataTypesNested()
" \"keyword_column\": { \"type\": \"keyword\" },\n" +
" \"text_column\": { \"type\": \"text\" },\n" +
" \"binary_column\": { \"type\": \"binary\" },\n" +
" \"timestamp_column\": { \"type\": \"date\" }\n" +
" \"timestamp_column\": { \"type\": \"date\" },\n" +
" \"ipv4_column\": { \"type\": \"ip\" },\n" +
" \"ipv6_column\": { \"type\": \"ip\" }\n" +
" }\n" +
" }\n" +
" }" +
Expand All @@ -578,6 +595,8 @@ public void testDataTypesNested()
.put("text_column", "some text")
.put("binary_column", new byte[] {(byte) 0xCA, (byte) 0xFE})
.put("timestamp_column", 0)
.put("ipv4_column", "192.0.2.4")
.put("ipv6_column", "2001:db8:0:1:1:1:1:1")
.build()));

embeddedElasticsearchNode.getClient()
Expand All @@ -596,11 +615,14 @@ public void testDataTypesNested()
"field.keyword_column, " +
"field.text_column, " +
"field.binary_column, " +
"field.timestamp_column " +
"field.timestamp_column, " +
"field.ipv4_column, " +
"field.ipv6_column " +
"FROM types_nested");

MaterializedResult expected = resultBuilder(getSession(), rows.getTypes())
.row(true, 1.0f, 1.0d, 1, 1L, "cool", "some text", new byte[] {(byte) 0xCA, (byte) 0xFE}, LocalDateTime.of(1970, 1, 1, 0, 0))
.row(true, 1.0f, 1.0d, 1, 1L, "cool", "some text", new byte[] {(byte) 0xCA, (byte) 0xFE},
LocalDateTime.of(1970, 1, 1, 0, 0), "192.0.2.4", "2001:db8:0:1:1:1:1:1")
.build();

assertEquals(rows.getMaterializedRows(), expected.getMaterializedRows());
Expand Down Expand Up @@ -629,7 +651,9 @@ public void testNestedTypeDataTypesNested()
" \"keyword_column\": { \"type\": \"keyword\" },\n" +
" \"text_column\": { \"type\": \"text\" },\n" +
" \"binary_column\": { \"type\": \"binary\" },\n" +
" \"timestamp_column\": { \"type\": \"date\" }\n" +
" \"timestamp_column\": { \"type\": \"date\" },\n" +
" \"ipv4_column\": { \"type\": \"ip\" },\n" +
" \"ipv6_column\": { \"type\": \"ip\" }\n" +
" }\n" +
" }\n" +
" }" +
Expand All @@ -649,6 +673,8 @@ public void testNestedTypeDataTypesNested()
.put("text_column", "some text")
.put("binary_column", new byte[] {(byte) 0xCA, (byte) 0xFE})
.put("timestamp_column", 0)
.put("ipv4_column", "192.0.2.4")
.put("ipv6_column", "2001:db8:0:1:1:1:1:1")
.build()));

embeddedElasticsearchNode.getClient()
Expand All @@ -667,12 +693,14 @@ public void testNestedTypeDataTypesNested()
"nested_field.keyword_column, " +
"nested_field.text_column, " +
"nested_field.binary_column, " +
"nested_field.timestamp_column " +
"nested_field.timestamp_column, " +
"nested_field.ipv4_column, " +
"nested_field.ipv6_column " +
"FROM nested_type_nested");

MaterializedResult expected = resultBuilder(getSession(), rows.getTypes())
.row(true, 1.0f, 1.0d, 1, 1L, "cool", "some text", new byte[] {(byte) 0xCA, (byte) 0xFE},
LocalDateTime.of(1970, 1, 1, 0, 0))
LocalDateTime.of(1970, 1, 1, 0, 0), "192.0.2.4", "2001:db8:0:1:1:1:1:1")
.build();

assertEquals(rows.getMaterializedRows(), expected.getMaterializedRows());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import static com.facebook.presto.testing.MaterializedResult.DEFAULT_PRECISION;
import static com.facebook.presto.type.IntervalDayTimeType.INTERVAL_DAY_TIME;
import static com.facebook.presto.type.IntervalYearMonthType.INTERVAL_YEAR_MONTH;
import static com.facebook.presto.type.IpAddressType.IPADDRESS;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.transform;
Expand Down Expand Up @@ -228,6 +229,9 @@ else if (INTERVAL_DAY_TIME.equals(type)) {
else if (INTERVAL_YEAR_MONTH.equals(type)) {
return new SqlIntervalYearMonth(IntervalYearMonth.parseMonths(String.valueOf(value)));
}
else if (IPADDRESS.equals(type)) {
return value;
}
else if (type instanceof ArrayType) {
return ((List<Object>) value).stream()
.map(element -> convertToRowValue(((ArrayType) type).getElementType(), element))
Expand Down