diff --git a/src/main/java/org/elasticsearch/river/mongodb/Indexer.java b/src/main/java/org/elasticsearch/river/mongodb/Indexer.java
index 070a2532..e4011f51 100644
--- a/src/main/java/org/elasticsearch/river/mongodb/Indexer.java
+++ b/src/main/java/org/elasticsearch/river/mongodb/Indexer.java
@@ -33,6 +33,8 @@
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.SearchHit;
 
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
 import com.mongodb.gridfs.GridFSDBFile;
 
 class Indexer implements Runnable {
@@ -149,16 +151,29 @@ private BSONTimestamp processBlockingQueue(
             return lastTimestamp;
         }
 
-        if (scriptExecutable != null
-                && definition.isAdvancedTransformation()) {
-            return applyAdvancedTransformation(bulk, entry);
-        }
-
         String objectId = "";
         if (entry.getData().get(MongoDBRiver.MONGODB_ID_FIELD) != null) {
             objectId = entry.getData().get(MongoDBRiver.MONGODB_ID_FIELD).toString();
         }
 
+        // TODO: Should the river support script filter,
+        // advanced_transformation, include_collection for GridFS?
+        if (entry.isAttachment()) {
+            try {
+                updateBulkRequest(bulk, entry.getData(), objectId,
+                        operation, definition.getIndexName(),
+                        definition.getTypeName(), null, null);
+            } catch (IOException ioEx) {
+                logger.error("Update bulk failed.", ioEx);
+            }
+            return lastTimestamp;
+        }
+
+        if (scriptExecutable != null
+                && definition.isAdvancedTransformation()) {
+            return applyAdvancedTransformation(bulk, entry);
+        }
+
         if (logger.isDebugEnabled()) {
             logger.debug("updateBulkRequest for id: [{}], operation: [{}]",
                     objectId, operation);
@@ -180,7 +195,7 @@ private BSONTimestamp processBlockingQueue(
         } catch (IOException e) {
             logger.warn("failed to parse {}", e);
         }
-        Map data = entry.getData();
+        Map data = entry.getData().toMap();
         if (scriptExecutable != null) {
             if (ctx != null) {
                 ctx.put("document", entry.getData());
@@ -234,7 +249,7 @@ private BSONTimestamp processBlockingQueue(
                 String parent = extractParent(ctx);
                 String routing = extractRouting(ctx);
                 objectId = extractObjectId(ctx, objectId);
-                updateBulkRequest(bulk, data, objectId, operation, index, type,
+                updateBulkRequest(bulk, new BasicDBObject(data), objectId, operation, index, type,
                         routing, parent);
             } catch (IOException e) {
                 logger.warn("failed to parse {}", e, entry.getData());
@@ -243,7 +258,7 @@ private BSONTimestamp processBlockingQueue(
     }
 
     private void updateBulkRequest(final BulkRequestBuilder bulk,
-            Map data, String objectId, String operation,
+            DBObject data, String objectId, String operation,
             String index, String type, String routing, String parent)
             throws IOException {
         if (logger.isDebugEnabled()) {
@@ -251,12 +266,17 @@ private void updateBulkRequest(final BulkRequestBuilder bulk,
                     "Operation: {} - index: {} - type: {} - routing: {} - parent: {}",
                     operation, index, type, routing, parent);
         }
+        boolean isAttachment = false;
+
+        if (logger.isDebugEnabled()) {
+            isAttachment = (data instanceof GridFSDBFile);
+        }
         if (MongoDBRiver.OPLOG_INSERT_OPERATION.equals(operation)) {
             if (logger.isDebugEnabled()) {
                 logger.debug(
                         "Insert operation - id: {} - contains attachment: {}",
                         operation, objectId,
-                        data.containsKey(MongoDBRiver.IS_MONGODB_ATTACHMENT));
+                        isAttachment);
             }
             bulk.add(indexRequest(index).type(type).id(objectId)
                     .source(build(data, objectId)).routing(routing)
@@ -267,7 +287,7 @@ private void updateBulkRequest(final BulkRequestBuilder bulk,
             if (logger.isDebugEnabled()) {
                 logger.debug(
                         "Update operation - id: {} - contains attachment: {}",
-                        objectId, data.containsKey(MongoDBRiver.IS_MONGODB_ATTACHMENT));
+                        objectId, isAttachment);
             }
             deleteBulkRequest(bulk, objectId, index, type, routing, parent);
             bulk.add(indexRequest(index).type(type).id(objectId)
@@ -283,7 +303,7 @@ private void updateBulkRequest(final BulkRequestBuilder bulk,
         }
         if (MongoDBRiver.OPLOG_COMMAND_OPERATION.equals(operation)) {
             if (definition.isDropCollection()) {
-                if (data.containsKey(MongoDBRiver.OPLOG_DROP_COMMAND_OPERATION)
+                if (data.get(MongoDBRiver.OPLOG_DROP_COMMAND_OPERATION) != null
                         && data.get(MongoDBRiver.OPLOG_DROP_COMMAND_OPERATION).equals(
                                 definition.getMongoCollection())) {
                     logger.info("Drop collection request [{}], [{}]",
@@ -400,7 +420,7 @@ private BSONTimestamp applyAdvancedTransformation(
 
         if (scriptExecutable != null) {
             if (ctx != null && documents != null) {
-                document.put("data", entry.getData());
+                document.put("data", entry.getData().toMap());
                 if (!objectId.isEmpty()) {
                     document.put("id", objectId);
                 }
@@ -449,9 +469,9 @@ private BSONTimestamp applyAdvancedTransformation(
                         String routing = extractRouting(item);
                         String action = extractOperation(item);
                         boolean ignore = isDocumentIgnored(item);
-                        Map _data = (Map) item
+                        Map data = (Map) item
                                 .get("data");
-                        objectId = extractObjectId(_data, objectId);
+                        objectId = extractObjectId(data, objectId);
                         if (logger.isDebugEnabled()) {
                             logger.debug(
                                     "Id: {} - operation: {} - ignore: {} - index: {} - type: {} - routing: {} - parent: {}",
@@ -462,7 +482,7 @@ private BSONTimestamp applyAdvancedTransformation(
                             continue;
                         }
                         try {
-                            updateBulkRequest(bulk, _data, objectId,
+                            updateBulkRequest(bulk, new BasicDBObject(data), objectId,
                                     operation, index, type,
                                     routing, parent);
                         } catch (IOException ioEx) {
@@ -477,16 +497,16 @@ private BSONTimestamp applyAdvancedTransformation(
         return lastTimestamp;
     }
 
-    private XContentBuilder build(final Map data,
-            final String objectId) throws IOException {
-        if (data.containsKey(MongoDBRiver.IS_MONGODB_ATTACHMENT)) {
+    @SuppressWarnings("unchecked")
+    private XContentBuilder build(final DBObject data, final String objectId)
+            throws IOException {
+        if (data instanceof GridFSDBFile) {
             logger.info("Add Attachment: {} to index {} / type {}",
                     objectId, definition.getIndexName(),
                     definition.getTypeName());
-            return MongoDBHelper.serialize((GridFSDBFile) data
-                    .get(MongoDBRiver.MONGODB_ATTACHMENT));
+            return MongoDBHelper.serialize((GridFSDBFile) data);
         } else {
-            return XContentFactory.jsonBuilder().map(data);
+            return XContentFactory.jsonBuilder().map(data.toMap());
         }
     }
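
For readers skimming the Indexer changes above: the queue no longer wraps GridFS files in a marker Map; the runtime type of the queued DBObject alone decides the indexing path. A minimal standalone sketch of that dispatch rule (the class and method names here are illustrative, not part of the patch):

import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.gridfs.GridFSDBFile;

public class AttachmentDispatchSketch {
    // The patch replaces the old IS_MONGODB_ATTACHMENT / MONGODB_ATTACHMENT
    // marker entries with a plain runtime-type check on the queued object.
    static boolean isAttachment(DBObject data) {
        return data instanceof GridFSDBFile;
    }

    public static void main(String[] args) {
        // A plain document is not an attachment and is indexed via toMap().
        DBObject doc = new BasicDBObject("_id", "42");
        System.out.println(isAttachment(doc)); // false
        System.out.println(doc.toMap());       // {_id=42}
    }
}

GridFSDBFile implements DBObject in the 2.x mongo-java-driver, which is what lets the file object travel through the same queue as ordinary documents.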
diff --git a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
index afb43776..4663e235 100644
--- a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
+++ b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
@@ -60,6 +60,7 @@
 import com.mongodb.MongoClient;
 import com.mongodb.MongoException;
 import com.mongodb.ServerAddress;
+import com.mongodb.gridfs.GridFSDBFile;
 import com.mongodb.util.JSON;
 
 /**
@@ -67,11 +68,10 @@
  * @author flaper87 (Flavio Percoco Premoli)
  * @author aparo (Alberto Paro)
  * @author kryptt (Rodolfo Hansen)
+ * @author benmccann (Ben McCann)
  */
 public class MongoDBRiver extends AbstractRiverComponent implements River {
 
-    public final static String IS_MONGODB_ATTACHMENT = "is_mongodb_attachment";
-    public final static String MONGODB_ATTACHMENT = "mongodb_attachment";
     public final static String TYPE = "mongodb";
     public final static String NAME = "mongodb-river";
     public final static String STATUS = "_mongodbstatus";
@@ -450,30 +450,30 @@ static void updateLastTimestamp(final MongoDBRiverDefinition definition,
 
     protected static class QueueEntry {
 
-        private Map data;
-        private String operation;
-        private BSONTimestamp oplogTimestamp;
+        private final DBObject data;
+        private final String operation;
+        private final BSONTimestamp oplogTimestamp;
 
-        public QueueEntry(
-                Map data) {
-            this.data = data;
-            this.operation = OPLOG_INSERT_OPERATION;
+        public QueueEntry(DBObject data) {
+            this(null, OPLOG_INSERT_OPERATION, data);
         }
 
-        public QueueEntry(
-                BSONTimestamp oplogTimestamp,
-                String oplogOperation,
-                Map data) {
+        public QueueEntry(BSONTimestamp oplogTimestamp, String oplogOperation,
+                DBObject data) {
             this.data = data;
             this.operation = oplogOperation;
             this.oplogTimestamp = oplogTimestamp;
         }
 
         public boolean isOplogEntry() {
-            return oplogTimestamp != null;
+            return oplogTimestamp != null;
+        }
+
+        public boolean isAttachment() {
+            return (data instanceof GridFSDBFile);
         }
 
-        public Map getData() {
+        public DBObject getData() {
             return data;
         }
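
The reworked QueueEntry contract in one place: both constructors now funnel through the three-argument form, and attachment detection is an instanceof test. A usage sketch, assuming it lives inside the org.elasticsearch.river.mongodb package (QueueEntry is protected) and using the literal oplog code "u" for an update; the values are illustrative:

package org.elasticsearch.river.mongodb;

import org.bson.types.BSONTimestamp;

import com.mongodb.BasicDBObject;
import com.mongodb.gridfs.GridFSDBFile;

class QueueEntrySketch {
    static void demo(GridFSDBFile file) {
        // Initial-import entry: no oplog timestamp, so isOplogEntry() is false.
        MongoDBRiver.QueueEntry initial = new MongoDBRiver.QueueEntry(
                new BasicDBObject("_id", "42"));

        // Oplog entry: carries the originating operation ("u" = update) and
        // the timestamp used to resume the river after a restart.
        MongoDBRiver.QueueEntry oplog = new MongoDBRiver.QueueEntry(
                new BSONTimestamp(1378000000, 1), "u",
                new BasicDBObject("_id", "42"));

        // A GridFS file travels through the same queue; its type alone
        // marks it as an attachment.
        MongoDBRiver.QueueEntry attachment = new MongoDBRiver.QueueEntry(file);

        System.out.println(initial.isOplogEntry());    // false
        System.out.println(oplog.isOplogEntry());      // true
        System.out.println(attachment.isAttachment()); // true
    }
}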
diff --git a/src/main/java/org/elasticsearch/river/mongodb/Slurper.java b/src/main/java/org/elasticsearch/river/mongodb/Slurper.java
index 4c7a9413..f4a15ac0 100644
--- a/src/main/java/org/elasticsearch/river/mongodb/Slurper.java
+++ b/src/main/java/org/elasticsearch/river/mongodb/Slurper.java
@@ -1,9 +1,7 @@
 package org.elasticsearch.river.mongodb;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 
@@ -30,6 +28,7 @@
 import com.mongodb.ServerAddress;
 import com.mongodb.gridfs.GridFS;
 import com.mongodb.gridfs.GridFSDBFile;
+import com.mongodb.gridfs.GridFSFile;
 import com.mongodb.util.JSON;
 
 class Slurper implements Runnable {
@@ -137,11 +136,25 @@ protected BSONTimestamp doInitialImport() throws InterruptedException {
         BSONTimestamp startTimestamp = getCurrentOplogTimestamp();
         DBCursor cursor = null;
         try {
-            cursor = slurpedCollection.find();
-            while (cursor.hasNext()) {
-                DBObject object = cursor.next();
-                Map map = applyFieldFilter(object).toMap();
-                addToStream(MongoDBRiver.OPLOG_INSERT_OPERATION, null, map);
+            if (!definition.isMongoGridFS()) {
+                cursor = slurpedCollection.find();
+                while (cursor.hasNext()) {
+                    DBObject object = cursor.next();
+                    addToStream(MongoDBRiver.OPLOG_INSERT_OPERATION, null, applyFieldFilter(object));
+                }
+            } else {
+                // TODO: To be optimized. https://github.com/mongodb/mongo-java-driver/pull/48#issuecomment-25241988
+                // possible option: Get the object id list from .fs collection then call GridFS.findOne
+                GridFS grid = new GridFS(mongo.getDB(definition.getMongoDb()),
+                        definition.getMongoCollection());
+                cursor = grid.getFileList();
+                while (cursor.hasNext()) {
+                    DBObject object = cursor.next();
+                    if (object instanceof GridFSDBFile) {
+                        GridFSDBFile file = grid.findOne(new ObjectId(object.get(MongoDBRiver.MONGODB_ID_FIELD).toString()));
+                        addToStream(MongoDBRiver.OPLOG_INSERT_OPERATION, null, file);
+                    }
+                }
             }
         } finally {
             if (cursor != null) {
@@ -240,7 +253,6 @@ private DBCursor processFullOplog() throws InterruptedException {
         return oplogCursor(currentTimestamp);
     }
 
-    @SuppressWarnings("unchecked")
     private void processOplogEntry(final DBObject entry)
             throws InterruptedException {
         String operation = entry.get(MongoDBRiver.OPLOG_OPERATION).toString();
@@ -300,29 +312,31 @@ private void processOplogEntry(final DBObject entry)
                 throw new NullPointerException(MongoDBRiver.MONGODB_ID_FIELD);
             }
             logger.info("Add attachment: {}", objectId);
-            object = applyFieldFilter(object);
-            HashMap data = new HashMap();
-            data.put(MongoDBRiver.IS_MONGODB_ATTACHMENT, true);
-            data.put(MongoDBRiver.MONGODB_ATTACHMENT, object);
-            data.put(MongoDBRiver.MONGODB_ID_FIELD, objectId);
-            addToStream(operation, oplogTimestamp, data);
+            addToStream(operation, oplogTimestamp, applyFieldFilter(object));
         } else {
             if (MongoDBRiver.OPLOG_UPDATE_OPERATION.equals(operation)) {
                 DBObject update = (DBObject) entry.get(MongoDBRiver.OPLOG_UPDATE);
                 logger.debug("Updated item: {}", update);
                 addQueryToStream(operation, oplogTimestamp, update);
             } else {
-                Map map = applyFieldFilter(object).toMap();
-                addToStream(operation, oplogTimestamp, map);
+                addToStream(operation, oplogTimestamp, applyFieldFilter(object));
             }
         }
     }
 
     private DBObject applyFieldFilter(DBObject object) {
-        object = MongoDBHelper.applyExcludeFields(object,
-                definition.getExcludeFields());
-        object = MongoDBHelper.applyIncludeFields(object,
-                definition.getIncludeFields());
+        if (object instanceof GridFSFile) {
+            GridFSFile file = (GridFSFile) object;
+            DBObject metadata = file.getMetaData();
+            if (metadata != null) {
+                file.setMetaData(applyFieldFilter(metadata));
+            }
+        } else {
+            object = MongoDBHelper.applyExcludeFields(object,
+                    definition.getExcludeFields());
+            object = MongoDBHelper.applyIncludeFields(object,
+                    definition.getIncludeFields());
+        }
         return object;
     }
 
@@ -422,7 +436,6 @@ private DBCursor oplogCursor(final BSONTimestamp timestampOverride) {
                 .setOptions(options);
     }
 
-    @SuppressWarnings("unchecked")
     private void addQueryToStream(final String operation,
             final BSONTimestamp currentTimestamp, final DBObject update)
             throws InterruptedException {
@@ -433,13 +446,12 @@ private void addQueryToStream(final String operation,
         }
 
         for (DBObject item : slurpedCollection.find(update, findKeys)) {
-            addToStream(operation, currentTimestamp, item.toMap());
+            addToStream(operation, currentTimestamp, item);
         }
     }
 
     private void addToStream(final String operation,
-            final BSONTimestamp currentTimestamp,
-            final Map data) throws InterruptedException {
+            final BSONTimestamp currentTimestamp, final DBObject data) throws InterruptedException {
         if (logger.isDebugEnabled()) {
             logger.debug(
                     "addToStream - operation [{}], currentTimestamp [{}], data [{}]",
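
The GridFS branch of doInitialImport relies on a driver quirk worth spelling out: in the 2.x mongo-java-driver, entries yielded by getFileList() are not wired back to a GridFS instance, so each file must be re-fetched with findOne before its content can be streamed (the TODO link above tracks this upstream). A self-contained sketch of that round-trip; the host, port, database, and bucket names are illustrative:

import org.bson.types.ObjectId;

import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSDBFile;

public class GridFSListingSketch {
    public static void main(String[] args) throws Exception {
        MongoClient mongo = new MongoClient("localhost", 27017); // illustrative
        try {
            GridFS grid = new GridFS(mongo.getDB("testdb"), "fs");
            DBCursor cursor = grid.getFileList();
            try {
                while (cursor.hasNext()) {
                    // The listing entry only carries metadata; re-fetch by id
                    // to obtain a GridFSDBFile whose content can be read.
                    DBObject object = cursor.next();
                    GridFSDBFile file = grid.findOne(
                            new ObjectId(object.get("_id").toString()));
                    System.out.println(file.getFilename());
                }
            } finally {
                cursor.close();
            }
        } finally {
            mongo.close();
        }
    }
}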
diff --git a/src/test/java/org/elasticsearch/river/mongodb/RiverMongoDBTestAbstract.java b/src/test/java/org/elasticsearch/river/mongodb/RiverMongoDBTestAbstract.java
index cb4d81e4..50af0309 100644
--- a/src/test/java/org/elasticsearch/river/mongodb/RiverMongoDBTestAbstract.java
+++ b/src/test/java/org/elasticsearch/river/mongodb/RiverMongoDBTestAbstract.java
@@ -80,7 +80,7 @@ public abstract class RiverMongoDBTestAbstract {
     public static final String TEST_SIMPLE_MONGODB_DOCUMENT_JSON = "/org/elasticsearch/river/mongodb/script/test-simple-mongodb-document.json";
 
     protected final ESLogger logger = Loggers.getLogger(getClass());
-    protected final static long wait = 6000;
+    protected final static long wait = 2000;
 
     public static final String ADMIN_DATABASE_NAME = "admin";
     public static final String LOCAL_DATABASE_NAME = "local";
diff --git a/src/test/java/org/elasticsearch/river/mongodb/gridfs/RiverMongoWithGridFSInitialImportTest.java b/src/test/java/org/elasticsearch/river/mongodb/gridfs/RiverMongoWithGridFSInitialImportTest.java
new file mode 100644
index 00000000..9ef8f780
--- /dev/null
+++ b/src/test/java/org/elasticsearch/river/mongodb/gridfs/RiverMongoWithGridFSInitialImportTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.river.mongodb.gridfs;
+
+import static org.elasticsearch.client.Requests.countRequest;
+import static org.elasticsearch.common.io.Streams.copyToBytesFromClasspath;
+import static org.elasticsearch.index.query.QueryBuilders.fieldQuery;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+import org.bson.types.ObjectId;
+import org.elasticsearch.action.count.CountResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.river.mongodb.RiverMongoDBTestAbstract;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.WriteConcern;
+import com.mongodb.gridfs.GridFS;
+import com.mongodb.gridfs.GridFSDBFile;
+import com.mongodb.gridfs.GridFSInputFile;
+
+public class RiverMongoWithGridFSInitialImportTest extends
+        RiverMongoDBTestAbstract {
+
+    private DB mongoDB;
+    private DBCollection mongoCollection;
+
+    protected RiverMongoWithGridFSInitialImportTest() {
+        super("testgridfs-initialimport-" + System.currentTimeMillis(),
+                "testgridfs-initialimport-" + System.currentTimeMillis(), "fs",
+                "testattachmentindex-initialimport-"
+                        + System.currentTimeMillis());
+    }
+
+    // @BeforeClass
+    public void createDatabase() {
+        logger.debug("createDatabase {}", getDatabase());
+        try {
+            mongoDB = getMongo().getDB(getDatabase());
+            mongoDB.setWriteConcern(WriteConcern.REPLICAS_SAFE);
+            logger.info("Start createCollection");
+            mongoCollection = mongoDB.createCollection(getCollection(), null);
+            Assert.assertNotNull(mongoCollection);
+            Thread.sleep(wait);
+        } catch (Throwable t) {
+            logger.error("createDatabase failed.", t);
+        }
+    }
+
+    private void createRiver() throws Exception {
+        super.createRiver(TEST_MONGODB_RIVER_GRIDFS_JSON);
+        Thread.sleep(wait);
+    }
+
+    // @AfterClass
+    public void cleanUp() {
+        super.deleteRiver();
+        logger.info("Drop database " + mongoDB.getName());
+        mongoDB.dropDatabase();
+    }
+
+    @Test
+    public void testImportAttachmentInitialImport() throws Exception {
+        logger.debug("*** testImportAttachmentInitialImport ***");
+        try {
+            createDatabase();
+            byte[] content = copyToBytesFromClasspath(RiverMongoWithGridFSTest.TEST_ATTACHMENT_HTML);
+            logger.debug("Content in bytes: {}", content.length);
+            GridFS gridFS = new GridFS(mongoDB);
+            GridFSInputFile in = gridFS.createFile(content);
+            in.setFilename("test-attachment.html");
+            in.setContentType("text/html");
+            in.save();
+            in.validate();
+
+            String id = in.getId().toString();
+            logger.debug("GridFS in: {}", in);
+            logger.debug("Document created with id: {}", id);
+
+            GridFSDBFile out = gridFS.findOne(in.getFilename());
+            logger.debug("GridFS from findOne: {}", out);
+            out = gridFS.findOne(new ObjectId(id));
+            logger.debug("GridFS from findOne: {}", out);
+            Assert.assertEquals(out.getId(), in.getId());
+
+            createRiver();
+            Thread.sleep(wait);
+            refreshIndex();
+
+            CountResponse countResponse = getNode().client()
+                    .count(countRequest(getIndex())).actionGet();
+            logger.debug("Index total count: {}", countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(1l));
+
+            countResponse = getNode()
+                    .client()
+                    .count(countRequest(getIndex())
+                            .query(fieldQuery("_id", id))).actionGet();
+            logger.debug("Index count for id {}: {}", id,
+                    countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(1l));
+
+            SearchResponse response = getNode().client()
+                    .prepareSearch(getIndex())
+                    .setQuery(QueryBuilders.queryString("Aliquam")).execute()
+                    .actionGet();
+            logger.debug("SearchResponse {}", response.toString());
+            long totalHits = response.getHits().getTotalHits();
+            logger.debug("TotalHits: {}", totalHits);
+            assertThat(totalHits, equalTo(1l));
+
+            in = gridFS.createFile(content);
+            in.setFilename("test-attachment-2.html");
+            in.setContentType("text/html");
+            in.save();
+            in.validate();
+
+            id = in.getId().toString();
+
+            out = gridFS.findOne(in.getFilename());
+            logger.debug("GridFS from findOne: {}", out);
+            out = gridFS.findOne(new ObjectId(id));
+            logger.debug("GridFS from findOne: {}", out);
+            Assert.assertEquals(out.getId(), in.getId());
+
+            Thread.sleep(wait);
+            refreshIndex();
+
+            countResponse = getNode().client().count(countRequest(getIndex()))
+                    .actionGet();
+            logger.debug("Index total count: {}", countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(2l));
+
+            countResponse = getNode()
+                    .client()
+                    .count(countRequest(getIndex())
+                            .query(fieldQuery("_id", id))).actionGet();
+            logger.debug("Index count for id {}: {}", id,
+                    countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(1l));
+
+            response = getNode().client().prepareSearch(getIndex())
+                    .setQuery(QueryBuilders.queryString("Aliquam")).execute()
+                    .actionGet();
+            logger.debug("SearchResponse {}", response.toString());
+            totalHits = response.getHits().getTotalHits();
+            logger.debug("TotalHits: {}", totalHits);
+            assertThat(totalHits, equalTo(2l));
+
+            DBCursor cursor = gridFS.getFileList();
+            try {
+                while (cursor.hasNext()) {
+                    DBObject object = cursor.next();
+                    gridFS.remove(new ObjectId(object.get("_id").toString()));
+                }
+            } finally {
+                cursor.close();
+            }
+
+            Thread.sleep(wait);
+            refreshIndex();
+
+            countResponse = getNode()
+                    .client()
+                    .count(countRequest(getIndex())
+                            .query(fieldQuery("_id", id))).actionGet();
+            logger.debug("Count after delete request: {}",
+                    countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(0L));
+        } catch (Throwable t) {
+            logger.error("testImportAttachmentInitialImport failed.", t);
+            Assert.fail("testImportAttachmentInitialImport failed", t);
+        } finally {
+            cleanUp();
+        }
+    }
+
+}
diff --git a/src/test/java/org/elasticsearch/river/mongodb/gridfs/RiverMongoWithGridFSTest.java b/src/test/java/org/elasticsearch/river/mongodb/gridfs/RiverMongoWithGridFSTest.java
index e0d9f140..a0ee752b 100644
--- a/src/test/java/org/elasticsearch/river/mongodb/gridfs/RiverMongoWithGridFSTest.java
+++ b/src/test/java/org/elasticsearch/river/mongodb/gridfs/RiverMongoWithGridFSTest.java
@@ -42,19 +42,17 @@
 import com.mongodb.gridfs.GridFSDBFile;
 import com.mongodb.gridfs.GridFSInputFile;
 
-@Test
 public class RiverMongoWithGridFSTest extends RiverMongoDBTestAbstract {
 
-    private static final String TEST_ATTACHMENT_PDF = "/org/elasticsearch/river/mongodb/gridfs/lorem.pdf";
-    private static final String TEST_ATTACHMENT_HTML = "/org/elasticsearch/river/mongodb/gridfs/test-attachment.html";
+    public static final String TEST_ATTACHMENT_PDF = "/org/elasticsearch/river/mongodb/gridfs/lorem.pdf";
+    public static final String TEST_ATTACHMENT_HTML = "/org/elasticsearch/river/mongodb/gridfs/test-attachment.html";
 
     private DB mongoDB;
     private DBCollection mongoCollection;
 
     protected RiverMongoWithGridFSTest() {
-        super("testgridfs-" + System.currentTimeMillis(),
-                "testgridfs-" + System.currentTimeMillis(),
-                "fs",
-                "testattachmentindex-" + System.currentTimeMillis());
+        super("testgridfs-" + System.currentTimeMillis(), "testgridfs-"
+                + System.currentTimeMillis(), "fs", "testattachmentindex-"
+                + System.currentTimeMillis());
     }
 
     @BeforeClass
@@ -64,6 +62,7 @@ public void createDatabase() {
         mongoDB = getMongo().getDB(getDatabase());
         mongoDB.setWriteConcern(WriteConcern.REPLICAS_SAFE);
         super.createRiver(TEST_MONGODB_RIVER_GRIDFS_JSON);
+        Thread.sleep(wait);
         logger.info("Start createCollection");
         mongoCollection = mongoDB.createCollection(getCollection(), null);
         Assert.assertNotNull(mongoCollection);
@@ -82,113 +81,139 @@ public void cleanUp() {
     @Test
     public void testImportAttachment() throws Exception {
         logger.debug("*** testImportAttachment ***");
-        byte[] content = copyToBytesFromClasspath(TEST_ATTACHMENT_HTML);
-        logger.debug("Content in bytes: {}", content.length);
-        GridFS gridFS = new GridFS(mongoDB);
-        GridFSInputFile in = gridFS.createFile(content);
-        in.setFilename("test-attachment.html");
-        in.setContentType("text/html");
-        in.save();
-        in.validate();
-
-        String id = in.getId().toString();
-        logger.debug("GridFS in: {}", in);
-        logger.debug("Document created with id: {}", id);
-
-        GridFSDBFile out = gridFS.findOne(in.getFilename());
-        logger.debug("GridFS from findOne: {}", out);
-        out = gridFS.findOne(new ObjectId(id));
-        logger.debug("GridFS from findOne: {}", out);
-        Assert.assertEquals(out.getId(), in.getId());
-
-        Thread.sleep(wait);
-        refreshIndex();
-
-        CountResponse countResponse = getNode().client()
-                .count(countRequest(getIndex())).actionGet();
-        logger.debug("Index total count: {}", countResponse.getCount());
-        assertThat(countResponse.getCount(), equalTo(1l));
-
-        countResponse = getNode().client()
-                .count(countRequest(getIndex()).query(fieldQuery("_id", id)))
-                .actionGet();
-        logger.debug("Index count for id {}: {}", id, countResponse.getCount());
-        assertThat(countResponse.getCount(), equalTo(1l));
-
-        SearchResponse response = getNode().client().prepareSearch(getIndex())
-                .setQuery(QueryBuilders.queryString("Aliquam")).execute()
-                .actionGet();
-        logger.debug("SearchResponse {}", response.toString());
-        long totalHits = response.getHits().getTotalHits();
-        logger.debug("TotalHits: {}", totalHits);
-        assertThat(totalHits, equalTo(1l));
-
-        gridFS.remove(new ObjectId(id));
-
-        Thread.sleep(wait);
-        refreshIndex();
-
-        countResponse = getNode().client()
-                .count(countRequest(getIndex()).query(fieldQuery("_id", id)))
-                .actionGet();
-        logger.debug("Count after delete request: {}", countResponse.getCount());
-        assertThat(countResponse.getCount(), equalTo(0L));
+        try {
+//            createDatabase();
+            byte[] content = copyToBytesFromClasspath(TEST_ATTACHMENT_HTML);
+            logger.debug("Content in bytes: {}", content.length);
+            GridFS gridFS = new GridFS(mongoDB);
+            GridFSInputFile in = gridFS.createFile(content);
+            in.setFilename("test-attachment.html");
+            in.setContentType("text/html");
+            in.save();
+            in.validate();
+
+            String id = in.getId().toString();
+            logger.debug("GridFS in: {}", in);
+            logger.debug("Document created with id: {}", id);
+
+            GridFSDBFile out = gridFS.findOne(in.getFilename());
+            logger.debug("GridFS from findOne: {}", out);
+            out = gridFS.findOne(new ObjectId(id));
+            logger.debug("GridFS from findOne: {}", out);
+            Assert.assertEquals(out.getId(), in.getId());
+
+            Thread.sleep(wait);
+            refreshIndex();
+
+            CountResponse countResponse = getNode().client()
+                    .count(countRequest(getIndex())).actionGet();
+            logger.debug("Index total count: {}", countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(1l));
+
+            countResponse = getNode()
+                    .client()
+                    .count(countRequest(getIndex())
+                            .query(fieldQuery("_id", id))).actionGet();
+            logger.debug("Index count for id {}: {}", id,
+                    countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(1l));
+
+            SearchResponse response = getNode().client()
+                    .prepareSearch(getIndex())
+                    .setQuery(QueryBuilders.queryString("Aliquam")).execute()
+                    .actionGet();
+            logger.debug("SearchResponse {}", response.toString());
+            long totalHits = response.getHits().getTotalHits();
+            logger.debug("TotalHits: {}", totalHits);
+            assertThat(totalHits, equalTo(1l));
+
+            gridFS.remove(new ObjectId(id));
+
+            Thread.sleep(wait);
+            refreshIndex();
+
+            countResponse = getNode()
+                    .client()
+                    .count(countRequest(getIndex())
+                            .query(fieldQuery("_id", id))).actionGet();
+            logger.debug("Count after delete request: {}",
+                    countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(0L));
+        } catch (Throwable t) {
+            logger.error("testImportAttachment failed.", t);
+            Assert.fail("testImportAttachment failed", t);
+        } finally {
+//            cleanUp();
+        }
     }
 
     @Test
     public void testImportPDFAttachment() throws Exception {
         logger.debug("*** testImportPDFAttachment ***");
-        byte[] content = copyToBytesFromClasspath(TEST_ATTACHMENT_PDF);
-        logger.debug("Content in bytes: {}", content.length);
-        GridFS gridFS = new GridFS(mongoDB);
-        GridFSInputFile in = gridFS.createFile(content);
-        in.setFilename("lorem.pdf");
-        in.setContentType("application/pdf");
-        in.save();
-        in.validate();
-
-        String id = in.getId().toString();
-        logger.debug("GridFS in: {}", in);
-        logger.debug("Document created with id: {}", id);
-
-        GridFSDBFile out = gridFS.findOne(in.getFilename());
-        logger.debug("GridFS from findOne: {}", out);
-        out = gridFS.findOne(new ObjectId(id));
-        logger.debug("GridFS from findOne: {}", out);
-        Assert.assertEquals(out.getId(), in.getId());
-
-        Thread.sleep(wait);
-        refreshIndex();
-
-        CountResponse countResponse = getNode().client()
-                .count(countRequest(getIndex())).actionGet();
-        logger.debug("Index total count: {}", countResponse.getCount());
-        assertThat(countResponse.getCount(), equalTo(1l));
-
-        countResponse = getNode().client()
-                .count(countRequest(getIndex()).query(fieldQuery("_id", id)))
-                .actionGet();
-        logger.debug("Index count for id {}: {}", id, countResponse.getCount());
-        assertThat(countResponse.getCount(), equalTo(1l));
-
-        SearchResponse response = getNode().client().prepareSearch(getIndex())
-                .setQuery(QueryBuilders.queryString("Lorem ipsum dolor"))
-                .execute().actionGet();
-        logger.debug("SearchResponse {}", response.toString());
-        long totalHits = response.getHits().getTotalHits();
-        logger.debug("TotalHits: {}", totalHits);
-        assertThat(totalHits, equalTo(1l));
-
-        gridFS.remove(new ObjectId(id));
-
-        Thread.sleep(wait);
-        refreshIndex();
-
-        countResponse = getNode().client()
-                .count(countRequest(getIndex()).query(fieldQuery("_id", id)))
-                .actionGet();
-        logger.debug("Count after delete request: {}", countResponse.getCount());
-        assertThat(countResponse.getCount(), equalTo(0L));
+        try {
+//            createDatabase();
+            byte[] content = copyToBytesFromClasspath(TEST_ATTACHMENT_PDF);
+            logger.debug("Content in bytes: {}", content.length);
+            GridFS gridFS = new GridFS(mongoDB);
+            GridFSInputFile in = gridFS.createFile(content);
+            in.setFilename("lorem.pdf");
+            in.setContentType("application/pdf");
+            in.save();
+            in.validate();
+
+            String id = in.getId().toString();
+            logger.debug("GridFS in: {}", in);
+            logger.debug("Document created with id: {}", id);
+
+            GridFSDBFile out = gridFS.findOne(in.getFilename());
+            logger.debug("GridFS from findOne: {}", out);
+            out = gridFS.findOne(new ObjectId(id));
+            logger.debug("GridFS from findOne: {}", out);
+            Assert.assertEquals(out.getId(), in.getId());
+
+            Thread.sleep(wait);
+            refreshIndex();
+
+            CountResponse countResponse = getNode().client()
+                    .count(countRequest(getIndex())).actionGet();
+            logger.debug("Index total count: {}", countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(1l));
+
+            countResponse = getNode()
+                    .client()
+                    .count(countRequest(getIndex())
+                            .query(fieldQuery("_id", id))).actionGet();
+            logger.debug("Index count for id {}: {}", id,
+                    countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(1l));
+
+            SearchResponse response = getNode().client()
+                    .prepareSearch(getIndex())
+                    .setQuery(QueryBuilders.queryString("Lorem ipsum dolor"))
+                    .execute().actionGet();
+            logger.debug("SearchResponse {}", response.toString());
+            long totalHits = response.getHits().getTotalHits();
+            logger.debug("TotalHits: {}", totalHits);
+            assertThat(totalHits, equalTo(1l));
+
+            gridFS.remove(new ObjectId(id));
+
+            Thread.sleep(wait);
+            refreshIndex();
+
+            countResponse = getNode()
+                    .client()
+                    .count(countRequest(getIndex())
+                            .query(fieldQuery("_id", id))).actionGet();
+            logger.debug("Count after delete request: {}",
+                    countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(0L));
+        } catch (Throwable t) {
+            logger.error("testImportPDFAttachment failed.", t);
+            Assert.fail("testImportPDFAttachment failed", t);
+        } finally {
+//            cleanUp();
+        }
     }
 
     /*
@@ -196,62 +221,78 @@ public void testImportPDFAttachment() throws Exception {
      */
    @Test
    public void testImportAttachmentWithCustomMetadata() throws Exception {
-        logger.debug("*** testImportAttachment ***");
-        byte[] content = copyToBytesFromClasspath(TEST_ATTACHMENT_HTML);
-        logger.debug("Content in bytes: {}", content.length);
-        GridFS gridFS = new GridFS(mongoDB);
-        GridFSInputFile in = gridFS.createFile(content);
-        in.setFilename("test-attachment.html");
-        in.setContentType("text/html");
-        BasicDBObject metadata = new BasicDBObject();
-        metadata.put("attribut1", "value1");
-        metadata.put("attribut2", "value2");
-        in.put("metadata", metadata);
-        in.save();
-        in.validate();
-
-        String id = in.getId().toString();
-        logger.debug("GridFS in: {}", in);
-        logger.debug("Document created with id: {}", id);
-
-        GridFSDBFile out = gridFS.findOne(in.getFilename());
-        logger.debug("GridFS from findOne: {}", out);
-        out = gridFS.findOne(new ObjectId(id));
-        logger.debug("GridFS from findOne: {}", out);
-        Assert.assertEquals(out.getId(), in.getId());
-
-        Thread.sleep(wait);
-        refreshIndex();
-
-        CountResponse countResponse = getNode().client()
-                .count(countRequest(getIndex())).actionGet();
-        logger.debug("Index total count: {}", countResponse.getCount());
-        assertThat(countResponse.getCount(), equalTo(1l));
-
-        countResponse = getNode().client()
-                .count(countRequest(getIndex()).query(fieldQuery("_id", id)))
-                .actionGet();
-        logger.debug("Index count for id {}: {}", id, countResponse.getCount());
-        assertThat(countResponse.getCount(), equalTo(1l));
-
-        SearchResponse response = getNode().client().prepareSearch(getIndex())
-                .setQuery(QueryBuilders.queryString("metadata.attribut1:value1")).execute()
-                .actionGet();
-        logger.debug("SearchResponse {}", response.toString());
-        long totalHits = response.getHits().getTotalHits();
-        logger.debug("TotalHits: {}", totalHits);
-        assertThat(totalHits, equalTo(1l));
-
-        gridFS.remove(new ObjectId(id));
-
-        Thread.sleep(wait);
-        refreshIndex();
-
-        countResponse = getNode().client()
-                .count(countRequest(getIndex()).query(fieldQuery("_id", id)))
-                .actionGet();
-        logger.debug("Count after delete request: {}", countResponse.getCount());
-        assertThat(countResponse.getCount(), equalTo(0L));
+        logger.debug("*** testImportAttachmentWithCustomMetadata ***");
+        try {
+//            createDatabase();
+            byte[] content = copyToBytesFromClasspath(TEST_ATTACHMENT_HTML);
+            logger.debug("Content in bytes: {}", content.length);
+            GridFS gridFS = new GridFS(mongoDB);
+            GridFSInputFile in = gridFS.createFile(content);
+            in.setFilename("test-attachment.html");
+            in.setContentType("text/html");
+            BasicDBObject metadata = new BasicDBObject();
+            metadata.put("attribut1", "value1");
+            metadata.put("attribut2", "value2");
+            in.put("metadata", metadata);
+            in.save();
+            in.validate();
+
+            String id = in.getId().toString();
+            logger.debug("GridFS in: {}", in);
+            logger.debug("Document created with id: {}", id);
+
+            GridFSDBFile out = gridFS.findOne(in.getFilename());
+            logger.debug("GridFS from findOne: {}", out);
+            out = gridFS.findOne(new ObjectId(id));
+            logger.debug("GridFS from findOne: {}", out);
+            Assert.assertEquals(out.getId(), in.getId());
+
+            Thread.sleep(wait);
+            refreshIndex();
+
+            CountResponse countResponse = getNode().client()
+                    .count(countRequest(getIndex())).actionGet();
+            logger.debug("Index total count: {}", countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(1l));
+
+            countResponse = getNode()
+                    .client()
+                    .count(countRequest(getIndex())
+                            .query(fieldQuery("_id", id))).actionGet();
+            logger.debug("Index count for id {}: {}", id,
+                    countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(1l));
+
+            SearchResponse response = getNode()
+                    .client()
+                    .prepareSearch(getIndex())
+                    .setQuery(
+                            QueryBuilders
+                                    .queryString("metadata.attribut1:value1"))
+                    .execute().actionGet();
+            logger.debug("SearchResponse {}", response.toString());
+            long totalHits = response.getHits().getTotalHits();
+            logger.debug("TotalHits: {}", totalHits);
+            assertThat(totalHits, equalTo(1l));
+
+            gridFS.remove(new ObjectId(id));
+
+            Thread.sleep(wait);
+            refreshIndex();
+
+            countResponse = getNode()
+                    .client()
+                    .count(countRequest(getIndex())
+                            .query(fieldQuery("_id", id))).actionGet();
+            logger.debug("Count after delete request: {}",
+                    countResponse.getCount());
+            assertThat(countResponse.getCount(), equalTo(0L));
+        } catch (Throwable t) {
+            logger.error("testImportAttachmentWithCustomMetadata failed.", t);
+            Assert.fail("testImportAttachmentWithCustomMetadata failed", t);
+        } finally {
+//            cleanUp();
+        }
     }
 }