Skip to content

Commit

Permalink
Fix sendBulkRequest exception in Elasticsearch indexing (#1333)
Browse files Browse the repository at this point in the history
  • Loading branch information
yxzhu16 authored Jul 24, 2020
1 parent a5715cf commit 88418ee
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 1 deletion.
4 changes: 4 additions & 0 deletions src/main/java/io/anserini/index/IndexArgs.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ public class IndexArgs {
usage = "Elasticsearch batch index requests size.")
public int esBatch = 1000;

@Option(name = "-es.bulk", metaVar = "[n]",
usage = "Elasticsearch max bulk requests size in bytes.")
public int esBulk = 80000000;

@Option(name = "-es.hostname", metaVar = "[host]",
usage = "Elasticsearch host.")
public String esHostname = "localhost";
Expand Down
16 changes: 15 additions & 1 deletion src/main/java/io/anserini/index/IndexCollection.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.SolrInputDocument;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
Expand Down Expand Up @@ -514,7 +515,10 @@ public void run() {

String indexName = (args.esIndex != null) ? args.esIndex : input.getFileName().toString();
bulkRequest.add(new IndexRequest(indexName).id(sourceDocument.id()).source(builder));
if (bulkRequest.numberOfActions() == args.esBatch) {

// sendBulkRequest when the batch size is reached OR the bulk size is reached
if (bulkRequest.numberOfActions() == args.esBatch ||
bulkRequest.estimatedSizeInBytes() >= args.esBulk) {
sendBulkRequest();
}

Expand Down Expand Up @@ -573,6 +577,16 @@ private void sendBulkRequest() {
bulkRequest = new BulkRequest();
} catch (Exception e) {
LOG.error("Error sending bulk requests to Elasticsearch", e);

// Log the 10 docs that have the largest sizes in this request
List<DocWriteRequest<?>> docs = bulkRequest.requests();
Collections.sort(docs, (d1, d2) -> ((IndexRequest) d2).source().length() - ((IndexRequest) d1).source().length());

LOG.info("Error sending bulkRequest. The 10 largest docs in this request are the following cord_uid: ");
for (int i = 0; i < 10; i++) {
IndexRequest doc = (IndexRequest) docs.get(i);
LOG.info(doc.id());
}
} finally {
if (esClient != null) {
try {
Expand Down

0 comments on commit 88418ee

Please sign in to comment.