diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml index 04d75304468..0a8e77c0098 100644 --- a/elasticsearch/pom.xml +++ b/elasticsearch/pom.xml @@ -33,9 +33,13 @@ Zeppelin: Elasticsearch interpreter - 2.3.3 + **/Elasticsearch5Connector.java + **/Elasticsearch5InterpreterTest.java + 2.4.1 18.0 0.1.6 + 1.7 + 1.7 @@ -46,12 +50,6 @@ provided - - org.elasticsearch - elasticsearch - ${elasticsearch.version} - - com.google.guava guava @@ -63,7 +61,7 @@ json-flattener ${json-flattener.version} - + org.slf4j slf4j-api @@ -89,6 +87,34 @@ + + org.apache.maven.plugins + maven-compiler-plugin + 3.6.0 + + ${maven.compiler.source} + ${maven.compiler.target} + + ${exclude.src.elastic} + + + + + + default-testCompile + test-compile + + + ${exclude.test.elastic} + + + + testCompile + + + + + maven-dependency-plugin 2.8 @@ -131,7 +157,63 @@ + + + + elasticsearch-2 + + true + + + + org.elasticsearch + elasticsearch + ${elasticsearch.version} + + + + + + elasticsearch-5 + + **/Elasticsearch2Connector.java + **/Elasticsearch2InterpreterTest.java + 5.0.0 + 2.6.2 + 1.8 + 1.8 + + + + + org.elasticsearch.client + transport + ${elasticsearch.version} + + + + org.elasticsearch + elasticsearch + ${elasticsearch.version} + + + + org.apache.logging.log4j + log4j-api + ${log4j2.version} + + + + org.apache.logging.log4j + log4j-core + ${log4j2.version} + + + + + + diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/Elasticsearch2Connector.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/Elasticsearch2Connector.java new file mode 100644 index 00000000000..b2f96eb4119 --- /dev/null +++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/Elasticsearch2Connector.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.elasticsearch; + +import com.google.gson.JsonParseException; +import org.apache.commons.lang.StringUtils; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.index.query.QueryBuilders; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.*; + + +/** + * Connector for Elasticsearch 2.x used in ElasticsearchInterpreter. + */ +public class Elasticsearch2Connector extends ElasticsearchConnector { + + private static Logger logger = LoggerFactory.getLogger(ElasticsearchConnector.class); + + protected Elasticsearch2Connector(String host, int port, String clusterName, + int resultSize) { + super(host, port, clusterName, resultSize); + } + + @Override + public void connect(Properties props) { + logger.info("prop={}", props); + + try { + final Settings settings = Settings.builder() + .put("cluster.name", clusterName) + .put(props) + .build(); + client = TransportClient.builder().settings(settings).build() + .addTransportAddress( + new InetSocketTransportAddress(InetAddress.getByName(host), port)); + } catch (IOException e) { + logger.error("Open connection with Elasticsearch", e); + } + } + + @Override + public void release() { + if (client != null) { + client.close(); + } + } + + /** + * Execute a "delete" request. + * + * @param urlItems Items of the URL + * @return Result of the delete request, it contains the id of the deleted document + */ + public String executeDeleteQuery(String[] urlItems) { + if (urlItems.length != 3 + || StringUtils.isEmpty(urlItems[0]) + || StringUtils.isEmpty(urlItems[1]) + || StringUtils.isEmpty(urlItems[2])) { + throw new RuntimeException("Bad URL (it should be /index/type/id) : " + urlItems); + } + + final DeleteResponse response = client + .prepareDelete(urlItems[0], urlItems[1], urlItems[2]) + .get(); + + if (!response.isFound()) { + throw new RuntimeException("Document not found"); + } + + return response.getId(); + } + + @Override + protected SearchResponse searchData(String[] urlItems, String query, int size) { + + final SearchRequestBuilder reqBuilder = + new SearchRequestBuilder(client, SearchAction.INSTANCE); + reqBuilder.setIndices(); + + if (urlItems.length >= 1) { + reqBuilder.setIndices(StringUtils.split(urlItems[0], ",")); + } + if (urlItems.length > 1) { + reqBuilder.setTypes(StringUtils.split(urlItems[1], ",")); + } + + if (!StringUtils.isEmpty(query)) { + // The query can be either JSON-formatted, nor a Lucene query + // So, try to parse as a JSON => if there is an error, consider the query a Lucene one + try { + final Map source = gson.fromJson(query, Map.class); + reqBuilder.setExtraSource(source); + } catch (JsonParseException e) { + // This is not a JSON (or maybe not well formatted...) + reqBuilder.setQuery(QueryBuilders.queryStringQuery(query).analyzeWildcard(true)); + } + } + + reqBuilder.setSize(size); + final SearchResponse response = reqBuilder.get(); + return response; + } +} diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/Elasticsearch5Connector.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/Elasticsearch5Connector.java new file mode 100644 index 00000000000..76cf5c6cb7d --- /dev/null +++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/Elasticsearch5Connector.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.elasticsearch; + +import com.google.gson.JsonParseException; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.transport.client.PreBuiltTransportClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.Map; +import java.util.Properties; + + +/** + * Connector for Elasticsearch 5.x used in ElasticsearchInterpreter. + */ +public class Elasticsearch5Connector extends ElasticsearchConnector { + + private static Logger logger = LoggerFactory.getLogger(ElasticsearchConnector.class); + + protected Elasticsearch5Connector(String host, int port, String clusterName, int resultSize) { + super(host, port, clusterName, resultSize); + } + + @Override + public void connect(Properties props) { + logger.info("prop={}", props); + + try { + // Since 5.0, client doesn't allow invalid options ignored in the previous version. + // So, copy props and remove default so that user insert arbitrary valid options. + Properties filtered = new Properties(); + filtered.putAll(props); + filtered.remove(ELASTICSEARCH_CLUSTER_NAME); + filtered.remove(ELASTICSEARCH_HOST); + filtered.remove(ELASTICSEARCH_PORT); + filtered.remove(ELASTICSEARCH_RESULT_SIZE); + + final Settings settings = Settings.builder() + .put("cluster.name", clusterName) + .put(filtered) + .build(); + client = new PreBuiltTransportClient(settings) + .addTransportAddress( + new InetSocketTransportAddress(InetAddress.getByName(host), port)); + } catch (IOException e) { + logger.error("Open connection with Elasticsearch", e); + } + } + + /** + * Execute a "count" request. + * + * @param urlItems Items of the URL + * @param query May contains the JSON of the request + * @return Result of the count request, it contains the total hits + */ + @Override + public String executeCountQuery(String[] urlItems, String query) { + if (urlItems.length > 2) { + throw new RuntimeException( + "Bad URL (it should be /index1,index2,.../type1,type2,...) " + urlItems); + } + SearchRequestBuilder reqBuilder = null; + + if (urlItems.length >= 1) { + reqBuilder = client.prepareSearch(StringUtils.split(urlItems[0], ",")); + } else { + reqBuilder = client.prepareSearch(); + } + + if (urlItems.length > 1) { + reqBuilder.setTypes(StringUtils.split(urlItems[1], ",")); + } + + reqBuilder.setSize(0); + + if (!StringUtils.isEmpty(query)) { + reqBuilder.setQuery(QueryBuilders.wrapperQuery(query)); + } + + final SearchResponse response = reqBuilder.get(); + return "" + response.getHits().getTotalHits(); + } + + @Override + public void release() { + IOUtils.closeQuietly(client); + } + + /** + * Execute a "delete" request. + * + * @param urlItems Items of the URL + * @return Result of the delete request, it contains the id of the deleted document + */ + public String executeDeleteQuery(String[] urlItems) { + if (urlItems.length != 3 + || StringUtils.isEmpty(urlItems[0]) + || StringUtils.isEmpty(urlItems[1]) + || StringUtils.isEmpty(urlItems[2])) { + throw new RuntimeException("Bad URL (it should be /index/type/id) : " + urlItems); + } + + final DeleteResponse response = client + .prepareDelete(urlItems[0], urlItems[1], urlItems[2]) + .get(); + + if (RestStatus.NOT_FOUND == response.status()) { + throw new RuntimeException("Document not found"); + } + + return response.getId(); + } + + @Override + protected SearchResponse searchData(String[] urlItems, String query, int size) { + final SearchRequestBuilder reqBuilder = + new SearchRequestBuilder(client, SearchAction.INSTANCE); + reqBuilder.setIndices(); + + if (urlItems.length >= 1) { + reqBuilder.setIndices(StringUtils.split(urlItems[0], ",")); + } + if (urlItems.length > 1) { + reqBuilder.setTypes(StringUtils.split(urlItems[1], ",")); + } + + if (!StringUtils.isEmpty(query)) { + // The query can be either JSON-formatted, nor a Lucene query + // So, try to parse as a JSON => if there is an error, consider the query a Lucene one + try { + final Map source = gson.fromJson(query, Map.class); + // + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource() + .query(QueryBuilders.wrapperQuery(query)); + reqBuilder.setSource(sourceBuilder); + } catch (JsonParseException e) { + // This is not a JSON (or maybe not well formatted...) + reqBuilder.setQuery(QueryBuilders.queryStringQuery(query).analyzeWildcard(true)); + } + } + + reqBuilder.setSize(size); + final SearchResponse response = reqBuilder.get(); + return response; + } +} diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchConnector.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchConnector.java new file mode 100644 index 00000000000..8a08ae0a7b7 --- /dev/null +++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchConnector.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.elasticsearch; + + +import com.github.wnameless.json.flattener.JsonFlattener; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.apache.commons.lang3.StringUtils; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHitField; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Interface for ElasticsearchConnector implementations. + */ +public abstract class ElasticsearchConnector { + + private static Logger logger = LoggerFactory.getLogger(ElasticsearchConnector.class); + private static final Pattern FIELD_NAME_PATTERN = Pattern.compile("\\[\\\\\"(.+)\\\\\"\\](.*)"); + private static final String HELP = "Elasticsearch interpreter:\n" + + "General format: ///