diff --git a/engine/src/main/java/com/arcadedb/database/EmbeddedDatabase.java b/engine/src/main/java/com/arcadedb/database/EmbeddedDatabase.java index bcb57d3af9..b49fe8bf6c 100644 --- a/engine/src/main/java/com/arcadedb/database/EmbeddedDatabase.java +++ b/engine/src/main/java/com/arcadedb/database/EmbeddedDatabase.java @@ -217,7 +217,7 @@ private void openInternal() { if (fileManager.getFiles().isEmpty()) schema.create(mode); else - schema.load(mode); + schema.load(mode, true); if (mode == PaginatedFile.MODE.READ_WRITE) checkForRecovery(); diff --git a/engine/src/main/java/com/arcadedb/engine/TransactionManager.java b/engine/src/main/java/com/arcadedb/engine/TransactionManager.java index d45ec93e6e..fb4f91ba5b 100644 --- a/engine/src/main/java/com/arcadedb/engine/TransactionManager.java +++ b/engine/src/main/java/com/arcadedb/engine/TransactionManager.java @@ -272,7 +272,7 @@ public boolean applyChanges(final WALFile.WALTransaction tx) { final PaginatedFile file; if (!database.getFileManager().existsFile(txPage.fileId)) { - LogManager.instance().log(this, Level.WARNING, "Error on restoring transaction. Found deleted file %d", null, txPage.fileId); + LogManager.instance().log(this, Level.WARNING, "Error on restoring transaction: received operation on deleted file %d", null, txPage.fileId); continue; } diff --git a/engine/src/main/java/com/arcadedb/exception/SchemaException.java b/engine/src/main/java/com/arcadedb/exception/SchemaException.java index 4373d03763..f9a72c370b 100644 --- a/engine/src/main/java/com/arcadedb/exception/SchemaException.java +++ b/engine/src/main/java/com/arcadedb/exception/SchemaException.java @@ -15,14 +15,12 @@ */ package com.arcadedb.exception; -import java.io.IOException; - public class SchemaException extends ArcadeDBException { public SchemaException(final String s) { super(s); } - public SchemaException(String s, IOException e) { + public SchemaException(String s, Exception e) { super(s, e); } } diff --git a/engine/src/main/java/com/arcadedb/index/lsm/LSMTreeIndexMutable.java b/engine/src/main/java/com/arcadedb/index/lsm/LSMTreeIndexMutable.java index 3ab5f6ee15..a0921db6b5 100644 --- a/engine/src/main/java/com/arcadedb/index/lsm/LSMTreeIndexMutable.java +++ b/engine/src/main/java/com/arcadedb/index/lsm/LSMTreeIndexMutable.java @@ -48,7 +48,7 @@ public class LSMTreeIndexMutable extends LSMTreeIndexAbstract { private int subIndexFileId = -1; private LSMTreeIndexCompacted subIndex = null; private final AtomicLong statsAdjacentSteps = new AtomicLong(); - private final int minPagesToScheduleACompaction; + private int minPagesToScheduleACompaction; private int currentMutablePages = 0; /** @@ -78,24 +78,7 @@ protected LSMTreeIndexMutable(final LSMTreeIndex mainIndex, final DatabaseIntern protected LSMTreeIndexMutable(final LSMTreeIndex mainIndex, final DatabaseInternal database, final String name, final boolean unique, final String filePath, final int id, final PaginatedFile.MODE mode, final int pageSize) throws IOException { super(mainIndex, database, name, unique, filePath, id, mode, pageSize); - - final BasePage currentPage = this.database.getTransaction().getPage(new PageId(file.getFileId(), 0), pageSize); - - int pos = INT_SERIALIZED_SIZE + INT_SERIALIZED_SIZE + BYTE_SERIALIZED_SIZE + INT_SERIALIZED_SIZE; - - // TODO: COUNT THE MUTABLE PAGES FROM THE TAIL BACK TO THE HEAD - currentMutablePages = 1; - - subIndexFileId = currentPage.readInt(pos); - - pos += INT_SERIALIZED_SIZE; - - final int len = currentPage.readByte(pos++); - this.keyTypes = new byte[len]; - for (int i = 0; i < len; ++i) - this.keyTypes[i] = currentPage.readByte(pos++); - - minPagesToScheduleACompaction = database.getConfiguration().getValueAsInteger(GlobalConfiguration.INDEX_COMPACTION_MIN_PAGES_SCHEDULE); + onAfterLoad(); } @Override @@ -107,19 +90,38 @@ public void close() { @Override public void onAfterLoad() { - if (subIndexFileId > -1) { - try { + // RELOAD THE PAGE. THIS CAN BE CALLED AT CREATION OF THE OBJECT (CONSTRUCTOR) OR IN A TX WHEN DATABASE STRUCTURE CHANGES + try { + final BasePage currentPage = this.database.getTransaction().getPage(new PageId(file.getFileId(), 0), pageSize); + + int pos = INT_SERIALIZED_SIZE + INT_SERIALIZED_SIZE + BYTE_SERIALIZED_SIZE + INT_SERIALIZED_SIZE; + + // TODO: COUNT THE MUTABLE PAGES FROM THE TAIL BACK TO THE HEAD + currentMutablePages = 1; + + subIndexFileId = currentPage.readInt(pos); + + pos += INT_SERIALIZED_SIZE; + + final int len = currentPage.readByte(pos++); + this.keyTypes = new byte[len]; + for (int i = 0; i < len; ++i) + this.keyTypes[i] = currentPage.readByte(pos++); + + minPagesToScheduleACompaction = database.getConfiguration().getValueAsInteger(GlobalConfiguration.INDEX_COMPACTION_MIN_PAGES_SCHEDULE); + + if (subIndexFileId > 0) { subIndex = (LSMTreeIndexCompacted) database.getSchema().getFileById(subIndexFileId); subIndex.mainIndex = mainIndex; subIndex.keyTypes = keyTypes; - } catch (Exception e) { - LogManager.instance().log(this, Level.SEVERE, - "Invalid sub-index for index '%s', ignoring it. WARNING: This could lead on using partial indexes. Please recreate the index from scratch (error=%s)", - null, name, e.getMessage()); - - database.getSchema().dropIndex(name); } + } catch (Exception e) { + LogManager.instance().log(this, Level.SEVERE, + "Invalid sub-index for index '%s', ignoring it. WARNING: This could lead on using partial indexes. Please recreate the index from scratch (error=%s)", + null, name, e.getMessage()); + + database.getSchema().dropIndex(name); } } diff --git a/engine/src/main/java/com/arcadedb/query/sql/parser/DropPropertyStatement.java b/engine/src/main/java/com/arcadedb/query/sql/parser/DropPropertyStatement.java index 544c33ed25..8d758d7c7e 100644 --- a/engine/src/main/java/com/arcadedb/query/sql/parser/DropPropertyStatement.java +++ b/engine/src/main/java/com/arcadedb/query/sql/parser/DropPropertyStatement.java @@ -20,7 +20,6 @@ import com.arcadedb.database.Database; import com.arcadedb.exception.CommandExecutionException; import com.arcadedb.index.Index; -import com.arcadedb.index.IndexInternal; import com.arcadedb.index.TypeIndex; import com.arcadedb.query.sql.executor.CommandContext; import com.arcadedb.query.sql.executor.InternalResultSet; @@ -63,8 +62,8 @@ public ResultSet executeDDL(CommandContext ctx) { if (!indexes.isEmpty()) { if (force) { for (final Index index : indexes) { - ((IndexInternal) index).drop(); - ResultInternal result = new ResultInternal(); + database.getSchema().dropIndex(index.getName()); + final ResultInternal result = new ResultInternal(); result.setProperty("operation", "cascade drop index"); result.setProperty("indexName", index.getName()); rs.add(result); diff --git a/engine/src/main/java/com/arcadedb/schema/DocumentType.java b/engine/src/main/java/com/arcadedb/schema/DocumentType.java index 15190be981..80a496e6e1 100644 --- a/engine/src/main/java/com/arcadedb/schema/DocumentType.java +++ b/engine/src/main/java/com/arcadedb/schema/DocumentType.java @@ -172,7 +172,7 @@ public void dropProperty(final String propertyName) { } public Index createTypeIndex(final EmbeddedSchema.INDEX_TYPE indexType, final boolean unique, final String... propertyNames) { - return schema.createTypeIndex(indexType, unique, name, propertyNames); + return schema.createTypeIndex(indexType, unique, name, propertyNames, LSMTreeIndexAbstract.DEF_PAGE_SIZE, LSMTreeIndexAbstract.NULL_STRATEGY.SKIP, null); } public Index createTypeIndex(final EmbeddedSchema.INDEX_TYPE indexType, final boolean unique, String[] propertyNames, final int pageSize) { @@ -227,7 +227,7 @@ public DocumentType addBucket(final Bucket bucket) { public DocumentType removeBucket(final Bucket bucket) { recordFileChanges(() -> { - addBucketInternal(bucket); + removeBucketInternal(bucket); return null; }); return this; @@ -529,8 +529,11 @@ public void removeIndexInternal(final TypeIndex index) { for (IndexInternal idx : index.getIndexesOnBuckets()) { final List list = bucketIndexesByBucket.get(idx.getAssociatedBucketId()); - if (list != null) + if (list != null) { list.remove(idx); + if (list.isEmpty()) + bucketIndexesByBucket.remove(idx.getAssociatedBucketId()); + } } for (DocumentType parent : parentTypes) @@ -550,12 +553,10 @@ protected void addBucketInternal(final Bucket bucket) { } protected void removeBucketInternal(final Bucket bucket) { - for (DocumentType cl : schema.getTypes()) { - if (!cl.hasBucket(bucket.getName())) - throw new SchemaException( - "Cannot remove the bucket '" + bucket.getName() + "' to the type '" + name + "', because the bucket is not associated to the type '" + cl.getName() - + "'"); - } + if (!buckets.contains(bucket)) + throw new SchemaException( + "Cannot remove the bucket '" + bucket.getName() + "' to the type '" + name + "', because the bucket is not associated to the type '" + getName() + + "'"); buckets.remove(bucket); } diff --git a/engine/src/main/java/com/arcadedb/schema/EmbeddedSchema.java b/engine/src/main/java/com/arcadedb/schema/EmbeddedSchema.java index 75419a72c7..fd685c0090 100644 --- a/engine/src/main/java/com/arcadedb/schema/EmbeddedSchema.java +++ b/engine/src/main/java/com/arcadedb/schema/EmbeddedSchema.java @@ -122,7 +122,13 @@ public void create(final PaginatedFile.MODE mode) { } } - public void load(final PaginatedFile.MODE mode) throws IOException { + public void load(final PaginatedFile.MODE mode, final boolean initialize) throws IOException { + files.clear(); + types.clear(); + bucketMap.clear(); + indexMap.clear(); + dictionary = null; + final Collection filesToOpen = database.getFileManager().getFiles(); // REGISTER THE DICTIONARY FIRST @@ -154,51 +160,13 @@ else if (mainComponent instanceof IndexInternal) } } - for (PaginatedComponent f : files) - if (f != null) - f.onAfterLoad(); + if (initialize) + initComponents(); readConfiguration(); updateSecurity(); } - public void loadChanges() throws IOException { - final Collection filesToOpen = database.getFileManager().getFiles(); - - final PaginatedFile.MODE mode = database.getMode(); - - final List newFilesLoaded = new ArrayList<>(); - - for (PaginatedFile file : filesToOpen) { - if (file != null && !Dictionary.DICT_EXT.equals(file.getFileExtension())) { - final PaginatedComponent pf = paginatedComponentFactory.createComponent(file, mode); - - if (pf != null) { - if (pf.getId() < files.size() && files.get(pf.getId()) != null) - // ALREADY LOADED - continue; - - final Object mainComponent = pf.getMainComponent(); - - if (mainComponent instanceof Bucket) - bucketMap.put(pf.getName(), (Bucket) mainComponent); - else if (mainComponent instanceof IndexInternal) - indexMap.put(pf.getName(), (IndexInternal) mainComponent); - - registerFile(pf); - - newFilesLoaded.add(pf); - } - } - } - - for (PaginatedComponent f : newFilesLoaded) - if (f != null) - f.onAfterLoad(); - - readConfiguration(); - } - @Override public TimeZone getTimeZone() { return timeZone; @@ -413,13 +381,19 @@ public Index[] getIndexes() { public void dropIndex(final String indexName) { database.checkPermissionsOnDatabase(SecurityDatabaseUser.DATABASE_ACCESS.UPDATE_SCHEMA); - final IndexInternal index = indexMap.remove(indexName); - if (index == null) - return; - - index.drop(); + recordFileChanges(() -> { + multipleUpdate = true; + try { + final IndexInternal index = indexMap.remove(indexName); + if (index == null) + return null; - saveConfiguration(); + index.drop(); + } catch (Exception e) { + throw new SchemaException("Cannot drop the index '" + indexName + "' (error=" + e + ")", e); + } + return null; + }); } @Override @@ -432,18 +406,18 @@ public Index getIndexByName(final String indexName) { @Override public TypeIndex createTypeIndex(final INDEX_TYPE indexType, final boolean unique, final String typeName, final String... propertyNames) { - return createTypeIndex(indexType, unique, typeName, propertyNames, LSMTreeIndexAbstract.DEF_PAGE_SIZE, null); + return createTypeIndex(indexType, unique, typeName, propertyNames, LSMTreeIndexAbstract.DEF_PAGE_SIZE, LSMTreeIndexAbstract.NULL_STRATEGY.SKIP, null); } @Override public TypeIndex createTypeIndex(final INDEX_TYPE indexType, final boolean unique, final String typeName, final String[] propertyNames, final int pageSize) { - return createTypeIndex(indexType, unique, typeName, propertyNames, pageSize, LSMTreeIndexAbstract.NULL_STRATEGY.ERROR, null); + return createTypeIndex(indexType, unique, typeName, propertyNames, pageSize, LSMTreeIndexAbstract.NULL_STRATEGY.SKIP, null); } @Override public TypeIndex createTypeIndex(final INDEX_TYPE indexType, final boolean unique, final String typeName, final String[] propertyNames, final int pageSize, final Index.BuildIndexCallback callback) { - return createTypeIndex(indexType, unique, typeName, propertyNames, pageSize, LSMTreeIndexAbstract.NULL_STRATEGY.ERROR, callback); + return createTypeIndex(indexType, unique, typeName, propertyNames, pageSize, LSMTreeIndexAbstract.NULL_STRATEGY.SKIP, callback); } @Override @@ -476,7 +450,7 @@ public TypeIndex createTypeIndex(final INDEX_TYPE indexType, final boolean uniqu final List buckets = type.getBuckets(true); final Index[] indexes = new Index[buckets.size()]; - return recordFileChanges(() -> { + recordFileChanges(() -> { database.transaction(() -> { try { @@ -490,7 +464,6 @@ public TypeIndex createTypeIndex(final INDEX_TYPE indexType, final boolean uniqu } catch (IOException e) { throw new SchemaException("Cannot create index on type '" + typeName + "' (error=" + e + ")", e); } - }, false, 1, null, (error) -> { for (int j = 0; j < indexes.length; j++) { final IndexInternal indexToRemove = (IndexInternal) indexes[j]; @@ -498,9 +471,10 @@ public TypeIndex createTypeIndex(final INDEX_TYPE indexType, final boolean uniqu indexToRemove.drop(); } }); - - return type.getPolymorphicIndexByProperties(propertyNames); + return null; }); + + return type.getPolymorphicIndexByProperties(propertyNames); } @Override @@ -606,7 +580,6 @@ private Index createBucketIndex(final DocumentType type, final byte[] keyTypes, throw new DatabaseMetadataException("Cannot create index '" + indexName + "' on type '" + typeName + "' because it already exists"); return recordFileChanges(() -> { - final IndexInternal index = indexFactory.createIndex(indexType.name(), database, indexName, unique, databasePath + "/" + indexName, PaginatedFile.MODE.READ_WRITE, keyTypes, pageSize, nullStrategy, callback); @@ -731,7 +704,7 @@ public void dropType(final String typeName) { sub.addParentType(parent); } - final List buckets = type.getBuckets(false); + final List buckets = new ArrayList<>(type.getBuckets(false)); final Set bucketIds = new HashSet<>(buckets.size()); for (Bucket b : buckets) bucketIds.add(b.getId()); @@ -741,8 +714,10 @@ public void dropType(final String typeName) { dropIndex(m.getName()); // DELETE ALL ASSOCIATED BUCKETS - for (Bucket b : buckets) + for (Bucket b : buckets) { + type.removeBucket(b); dropBucket(b.getName()); + } if (type instanceof VertexType) database.getGraphEngine().dropVertexType(database, (VertexType) type); @@ -765,6 +740,12 @@ public void dropBucket(final String bucketName) { final Bucket bucket = getBucketByName(bucketName); recordFileChanges(() -> { + for (DocumentType type : types.values()) { + if (type.buckets.contains(bucket)) + throw new SchemaException( + "Error on dropping bucket '" + bucketName + "' because it is assigned to type '" + type.getName() + "'. Remove the association first"); + } + database.getPageManager().deleteFile(bucket.getId()); try { database.getFileManager().dropFile(bucket.getId()); @@ -1306,6 +1287,12 @@ public void registerFile(final PaginatedComponent file) { files.set(fileId, file); } + public void initComponents() { + for (PaginatedComponent f : files) + if (f != null) + f.onAfterLoad(); + } + public boolean isDirty() { return dirtyConfiguration; } diff --git a/server/src/main/java/com/arcadedb/server/ha/Replica2LeaderNetworkExecutor.java b/server/src/main/java/com/arcadedb/server/ha/Replica2LeaderNetworkExecutor.java index 7340a5b0f8..86e9453faf 100755 --- a/server/src/main/java/com/arcadedb/server/ha/Replica2LeaderNetworkExecutor.java +++ b/server/src/main/java/com/arcadedb/server/ha/Replica2LeaderNetworkExecutor.java @@ -435,7 +435,7 @@ private void installDatabase(final Binary buffer, final String db, final Databas // RELOAD THE SCHEMA database.getSchema().getEmbedded().close(); DatabaseContext.INSTANCE.init(database); - database.getSchema().getEmbedded().load(PaginatedFile.MODE.READ_ONLY); + database.getSchema().getEmbedded().load(PaginatedFile.MODE.READ_WRITE, true); } private void installFile(final Binary buffer, final String db, final DatabaseInternal database, final int fileId, final String fileName) throws IOException { diff --git a/server/src/main/java/com/arcadedb/server/ha/ReplicatedDatabase.java b/server/src/main/java/com/arcadedb/server/ha/ReplicatedDatabase.java index 2b5d68d78e..7215ddecf0 100644 --- a/server/src/main/java/com/arcadedb/server/ha/ReplicatedDatabase.java +++ b/server/src/main/java/com/arcadedb/server/ha/ReplicatedDatabase.java @@ -159,7 +159,17 @@ public void replicateTx(final TransactionContext tx, final Pair 1), reqQuorum, timeout); + final TxRequest req = new TxRequest(getName(), bufferChanges, reqQuorum > 1); + + final DatabaseChangeStructureRequest changeStructureRequest = getChangeStructure(-1); + if (changeStructureRequest != null) { + // RESET STRUCTURE CHANGES FROM THIS POINT ONWARDS + proxied.getFileManager().stopRecordingChanges(); + proxied.getFileManager().startRecordingChanges(); + req.changeStructure = changeStructureRequest; + } + + server.getHA().sendCommandToReplicasWithQuorum(req, reqQuorum, timeout); // COMMIT 2ND PHASE ONLY IF THE QUORUM HAS BEEN REACHED tx.commit2ndPhase(changes); @@ -650,26 +660,7 @@ public RET recordFileChanges(final Callable callback) { try { result.set(callback.call()); - final List fileChanges = proxied.getFileManager().getRecordedChanges(); - - if (fileChanges.isEmpty() &&// - !proxied.getSchema().getEmbedded().isDirty() && // - proxied.getSchema().getEmbedded().getVersion() == schemaVersionBefore) - // NO CHANGES - return null; - - final Map addFiles = new HashMap<>(); - final Map removeFiles = new HashMap<>(); - for (FileManager.FileChange c : fileChanges) { - if (c.create) - addFiles.put(c.fileId, c.fileName); - else - removeFiles.put(c.fileId, c.fileName); - } - - final String schemaJson = proxied.getSchema().getEmbedded().serializeConfiguration().toString(); - - return new DatabaseChangeStructureRequest(proxied.getName(), schemaJson, addFiles, removeFiles); + return getChangeStructure(schemaVersionBefore); } finally { proxied.getFileManager().stopRecordingChanges(); @@ -684,4 +675,28 @@ public RET recordFileChanges(final Callable callback) { return (RET) result.get(); } + + private DatabaseChangeStructureRequest getChangeStructure(final long schemaVersionBefore) { + final List fileChanges = proxied.getFileManager().getRecordedChanges(); + + if (fileChanges == null ||// + (fileChanges.isEmpty() &&// + !proxied.getSchema().getEmbedded().isDirty() && // + schemaVersionBefore > -1 && proxied.getSchema().getEmbedded().getVersion() == schemaVersionBefore)) + // NO CHANGES + return null; + + final Map addFiles = new HashMap<>(); + final Map removeFiles = new HashMap<>(); + for (FileManager.FileChange c : fileChanges) { + if (c.create) + addFiles.put(c.fileId, c.fileName); + else + removeFiles.put(c.fileId, c.fileName); + } + + final String schemaJson = proxied.getSchema().getEmbedded().serializeConfiguration().toString(); + + return new DatabaseChangeStructureRequest(proxied.getName(), schemaJson, addFiles, removeFiles); + } } diff --git a/server/src/main/java/com/arcadedb/server/ha/message/DatabaseChangeStructureRequest.java b/server/src/main/java/com/arcadedb/server/ha/message/DatabaseChangeStructureRequest.java index 895eb1dbf6..4ef5f056ed 100755 --- a/server/src/main/java/com/arcadedb/server/ha/message/DatabaseChangeStructureRequest.java +++ b/server/src/main/java/com/arcadedb/server/ha/message/DatabaseChangeStructureRequest.java @@ -18,6 +18,7 @@ import com.arcadedb.database.Binary; import com.arcadedb.database.DatabaseFactory; import com.arcadedb.database.DatabaseInternal; +import com.arcadedb.engine.PaginatedFile; import com.arcadedb.log.LogManager; import com.arcadedb.schema.EmbeddedSchema; import com.arcadedb.server.ArcadeDBServer; @@ -113,25 +114,10 @@ public HACommand execute(final HAServer server, final String remoteServerName, f try { final DatabaseInternal db = (DatabaseInternal) server.getServer().getDatabase(databaseName); - final String databasePath = db.getDatabasePath(); - - // ADD FILES - for (Map.Entry entry : filesToAdd.entrySet()) { - db.getFileManager().getOrCreateFile(entry.getKey(), databasePath + "/" + entry.getValue()); - } - - // REMOVE FILES - for (Map.Entry entry : filesToRemove.entrySet()) { - db.getFileManager().dropFile(entry.getKey()); - } - - // REPLACE SCHEMA FILE - final File file = new File(db.getDatabasePath() + "/" + EmbeddedSchema.SCHEMA_FILE_NAME); - FileUtils.writeContentToStream(file, schemaJson.getBytes(DatabaseFactory.getDefaultCharset())); + updateFiles(db); // RELOAD SCHEMA - db.getSchema().getEmbedded().loadChanges(); - + db.getSchema().getEmbedded().load(PaginatedFile.MODE.READ_WRITE, true); return new DatabaseChangeStructureResponse(); } catch (Exception e) { @@ -140,6 +126,24 @@ public HACommand execute(final HAServer server, final String remoteServerName, f } } + public void updateFiles(final DatabaseInternal db) throws IOException { + final String databasePath = db.getDatabasePath(); + + // ADD FILES + for (Map.Entry entry : filesToAdd.entrySet()) { + db.getFileManager().getOrCreateFile(entry.getKey(), databasePath + "/" + entry.getValue()); + } + + // REMOVE FILES + for (Map.Entry entry : filesToRemove.entrySet()) { + db.getFileManager().dropFile(entry.getKey()); + } + + // REPLACE SCHEMA FILE + final File file = new File(db.getDatabasePath() + "/" + EmbeddedSchema.SCHEMA_FILE_NAME); + FileUtils.writeContentToStream(file, schemaJson.getBytes(DatabaseFactory.getDefaultCharset())); + } + @Override public String toString() { return "dbchangestructure add=" + filesToAdd + " remote=" + filesToRemove; diff --git a/server/src/main/java/com/arcadedb/server/ha/message/TxRequest.java b/server/src/main/java/com/arcadedb/server/ha/message/TxRequest.java index b60ca9f198..002aed9e22 100755 --- a/server/src/main/java/com/arcadedb/server/ha/message/TxRequest.java +++ b/server/src/main/java/com/arcadedb/server/ha/message/TxRequest.java @@ -17,20 +17,23 @@ import com.arcadedb.database.Binary; import com.arcadedb.database.DatabaseInternal; +import com.arcadedb.engine.PaginatedFile; import com.arcadedb.engine.WALException; import com.arcadedb.engine.WALFile; +import com.arcadedb.log.LogManager; import com.arcadedb.server.ArcadeDBServer; import com.arcadedb.server.ha.HAServer; import com.arcadedb.server.ha.ReplicationException; -import java.nio.channels.ClosedChannelException; -import java.util.logging.Level; +import java.nio.channels.*; +import java.util.logging.*; /** * Replicate a transaction. No response is expected. */ public class TxRequest extends TxRequestAbstract { - private boolean waitForResponse; + private boolean waitForResponse; + public DatabaseChangeStructureRequest changeStructure; public TxRequest() { } @@ -41,14 +44,25 @@ public TxRequest(final String dbName, final Binary bufferChanges, final boolean } @Override - public void toStream(Binary stream) { + public void toStream(final Binary stream) { stream.putByte((byte) (waitForResponse ? 1 : 0)); + + if (changeStructure != null) { + stream.putByte((byte) 1); + changeStructure.toStream(stream); + } else + stream.putByte((byte) 0); + super.toStream(stream); } @Override - public void fromStream(ArcadeDBServer server, Binary stream) { + public void fromStream(final ArcadeDBServer server, final Binary stream) { waitForResponse = stream.getByte() == 1; + if (stream.getByte() == 1) { + changeStructure = new DatabaseChangeStructureRequest(); + changeStructure.fromStream(server, stream); + } super.fromStream(server, stream); } @@ -58,6 +72,18 @@ public HACommand execute(final HAServer server, final String remoteServerName, f if (!db.isOpen()) throw new ReplicationException("Database '" + databaseName + "' is closed"); + if (changeStructure != null) + try { + // APPLY CHANGE OF STRUCTURE FIRST + changeStructure.updateFiles(db); + + // RELOAD THE SCHEMA BUT NOT INITIALIZE THE COMPONENTS (SOME NEW PAGES COULD BE IN THE TX ITSELF) + db.getSchema().getEmbedded().load(PaginatedFile.MODE.READ_WRITE, false); + } catch (Exception e) { + LogManager.instance().log(this, Level.SEVERE, "Error on changing database structure request from the leader node", e); + throw new ReplicationException("Error on changing database structure request from the leader node", e); + } + final WALFile.WALTransaction walTx = readTxFromBuffer(); try { @@ -74,6 +100,10 @@ public HACommand execute(final HAServer server, final String remoteServerName, f throw e; } + if (changeStructure != null) + // INITIALIZE THE COMPONENTS (SOME NEW PAGES COULD BE IN THE TX ITSELF) + db.getSchema().getEmbedded().initComponents(); + if (waitForResponse) return new TxResponse(); diff --git a/server/src/test/java/com/arcadedb/server/ha/ReplicationChangeSchema.java b/server/src/test/java/com/arcadedb/server/ha/ReplicationChangeSchema.java index 49f500bccb..f60ee3e607 100644 --- a/server/src/test/java/com/arcadedb/server/ha/ReplicationChangeSchema.java +++ b/server/src/test/java/com/arcadedb/server/ha/ReplicationChangeSchema.java @@ -16,7 +16,12 @@ package com.arcadedb.server.ha; import com.arcadedb.database.Database; +import com.arcadedb.engine.Bucket; import com.arcadedb.exception.SchemaException; +import com.arcadedb.exception.TransactionException; +import com.arcadedb.index.Index; +import com.arcadedb.schema.Property; +import com.arcadedb.schema.Schema; import com.arcadedb.schema.Type; import com.arcadedb.schema.VertexType; import org.junit.jupiter.api.Assertions; @@ -27,15 +32,30 @@ public class ReplicationChangeSchema extends ReplicationServerIT { public void testReplication() { super.testReplication(); + // CREATE NEW TYPE final Database database0 = getServerDatabase(0, getDatabaseName()); - final VertexType type0 = database0.getSchema().createVertexType("RuntimeVertex0"); - type0.createProperty("nameNotFoundInDictionary", Type.STRING); - final Database database1 = getServerDatabase(1, getDatabaseName()); + final VertexType type0 = database0.getSchema().createVertexType("RuntimeVertex0"); + Assertions.assertNotNull(database0.getSchema().getType("RuntimeVertex0")); Assertions.assertNotNull(database1.getSchema().getType("RuntimeVertex0")); + + // CREATE NEW PROPERTY + type0.createProperty("nameNotFoundInDictionary", Type.STRING); + Assertions.assertNotNull(database0.getSchema().getType("RuntimeVertex0").getProperty("nameNotFoundInDictionary")); Assertions.assertNotNull(database1.getSchema().getType("RuntimeVertex0").getProperty("nameNotFoundInDictionary")); + // CREATE NEW BUCKET + final Bucket newBucket = database0.getSchema().createBucket("newBucket"); + + Assertions.assertTrue(database0.getSchema().existsBucket("newBucket")); + Assertions.assertTrue(database1.getSchema().existsBucket("newBucket")); + + type0.addBucket(newBucket); + Assertions.assertTrue(database0.getSchema().getType("RuntimeVertex0").hasBucket("newBucket")); + Assertions.assertTrue(database0.getSchema().getType("RuntimeVertex0").hasBucket("newBucket")); + + // CHANGE SCHEMA FROM A REPLICA (ERROR EXPECTED) try { database1.getSchema().createVertexType("RuntimeVertex1"); Assertions.fail(); @@ -45,6 +65,63 @@ public void testReplication() { Assertions.assertFalse(database0.getSchema().existsType("RuntimeVertex1")); Assertions.assertFalse(database1.getSchema().existsType("RuntimeVertex1")); + + // DROP PROPERTY + type0.dropProperty("nameNotFoundInDictionary"); + Assertions.assertFalse(database0.getSchema().getType("RuntimeVertex0").existsProperty("nameNotFoundInDictionary")); + Assertions.assertFalse(database1.getSchema().getType("RuntimeVertex0").existsProperty("nameNotFoundInDictionary")); + + // DROP NEW BUCKET + try { + database0.getSchema().dropBucket("newBucket"); + } catch (SchemaException e) { + // EXPECTED + } + + database0.getSchema().getType("RuntimeVertex0").removeBucket(database0.getSchema().getBucketByName("newBucket")); + Assertions.assertFalse(database0.getSchema().getType("RuntimeVertex0").hasBucket("newBucket")); + Assertions.assertFalse(database0.getSchema().getType("RuntimeVertex0").hasBucket("newBucket")); + + database0.getSchema().dropBucket("newBucket"); + Assertions.assertFalse(database0.getSchema().existsBucket("newBucket")); + Assertions.assertFalse(database1.getSchema().existsBucket("newBucket")); + + // DROP TYPE + database0.getSchema().dropType("RuntimeVertex0"); + Assertions.assertFalse(database0.getSchema().existsType("RuntimeVertex0")); + Assertions.assertFalse(database1.getSchema().existsType("RuntimeVertex0")); + + final VertexType indexedType = database0.getSchema().createVertexType("IndexedVertex0"); + Assertions.assertNotNull(database0.getSchema().getType("IndexedVertex0")); + Assertions.assertNotNull(database1.getSchema().getType("IndexedVertex0")); + + // CREATE NEW PROPERTY + final Property indexedProperty = indexedType.createProperty("propertyIndexed", Type.INTEGER); + Assertions.assertNotNull(database0.getSchema().getType("IndexedVertex0").getProperty("propertyIndexed")); + Assertions.assertNotNull(database1.getSchema().getType("IndexedVertex0").getProperty("propertyIndexed")); + + final Index idx = indexedProperty.createIndex(Schema.INDEX_TYPE.LSM_TREE, true); + Assertions.assertEquals(1, database0.getSchema().getType("IndexedVertex0").getAllIndexes(true).size()); + Assertions.assertEquals(1, database1.getSchema().getType("IndexedVertex0").getAllIndexes(true).size()); + + for (int i = 0; i < 10; i++) + database0.newVertex("IndexedVertex0").set("propertyIndexed", i).save(); + + database0.commit(); + + for (int i = 0; i < 10; i++) + database1.newVertex("IndexedVertex0").set("propertyIndexed", i).save(); + + try { + database1.commit(); + Assertions.fail(); + } catch (TransactionException e) { + // EXPECTED + } + + database0.getSchema().dropIndex(idx.getName()); + Assertions.assertEquals(0, database0.getSchema().getType("IndexedVertex0").getAllIndexes(true).size()); + Assertions.assertEquals(0, database1.getSchema().getType("IndexedVertex0").getAllIndexes(true).size()); } protected int getServerCount() {