diff --git a/engine/src/main/java/com/arcadedb/database/DatabaseInternal.java b/engine/src/main/java/com/arcadedb/database/DatabaseInternal.java index 79482396db..f947f81862 100644 --- a/engine/src/main/java/com/arcadedb/database/DatabaseInternal.java +++ b/engine/src/main/java/com/arcadedb/database/DatabaseInternal.java @@ -100,4 +100,6 @@ enum CALLBACK_EVENT { ExecutionPlanCache getExecutionPlanCache(); int getEdgeListSize(int previousSize); + + RET recordFileChanges(final Callable callback); } diff --git a/engine/src/main/java/com/arcadedb/database/EmbeddedDatabase.java b/engine/src/main/java/com/arcadedb/database/EmbeddedDatabase.java index 03f2164040..94b8a3d2e7 100644 --- a/engine/src/main/java/com/arcadedb/database/EmbeddedDatabase.java +++ b/engine/src/main/java/com/arcadedb/database/EmbeddedDatabase.java @@ -1397,6 +1397,11 @@ public RET executeInWriteLock(final Callable callable) { } } + @Override + public RET recordFileChanges(final Callable callback) { + return (RET) executeInWriteLock(callback); + } + @Override public StatementCache getStatementCache() { return statementCache; diff --git a/engine/src/main/java/com/arcadedb/schema/DocumentType.java b/engine/src/main/java/com/arcadedb/schema/DocumentType.java index fb56deb217..721b3fef2a 100644 --- a/engine/src/main/java/com/arcadedb/schema/DocumentType.java +++ b/engine/src/main/java/com/arcadedb/schema/DocumentType.java @@ -26,6 +26,7 @@ import com.arcadedb.index.lsm.LSMTreeIndexAbstract; import java.util.*; +import java.util.concurrent.*; public class DocumentType { protected final EmbeddedSchema schema; @@ -65,9 +66,11 @@ public DocumentType addParentType(final DocumentType parent) { if (allProperties.contains(p)) throw new IllegalArgumentException("Property '" + p + "' is already defined in type '" + name + "' or any parent types"); - parentTypes.add(parent); - parent.subTypes.add(this); - schema.saveConfiguration(); + recordFileChanges(() -> { + parentTypes.add(parent); + parent.subTypes.add(this); + return null; + }); return this; } @@ -76,12 +79,14 @@ public void removeParentType(final String parentName) { } public void removeParentType(final DocumentType parent) { - if (!parentTypes.remove(parent)) - // ALREADY REMOVED PARENT - return; + recordFileChanges(() -> { + if (!parentTypes.remove(parent)) + // ALREADY REMOVED PARENT + return null; - parent.subTypes.remove(this); - schema.saveConfiguration(); + parent.subTypes.remove(this); + return null; + }); } public boolean instanceOf(final String type) { @@ -132,10 +137,10 @@ public Property createProperty(final String propertyName, final Type propertyTyp final Property property = new Property(this, propertyName, propertyType); - properties.put(propertyName, property); - - schema.saveConfiguration(); - + recordFileChanges(() -> { + properties.put(propertyName, property); + return null; + }); return property; } @@ -160,8 +165,10 @@ public Property getOrCreateProperty(final String propertyName, final Type proper } public void dropProperty(final String propertyName) { - properties.remove(propertyName); - schema.saveConfiguration(); + recordFileChanges(() -> { + properties.remove(propertyName); + return null; + }); } public Index createTypeIndex(final EmbeddedSchema.INDEX_TYPE indexType, final boolean unique, final String... propertyNames) { @@ -211,14 +218,18 @@ public List getBuckets(final boolean polymorphic) { } public DocumentType addBucket(final Bucket bucket) { - addBucketInternal(bucket); - schema.saveConfiguration(); + recordFileChanges(() -> { + addBucketInternal(bucket); + return null; + }); return this; } public DocumentType removeBucket(final Bucket bucket) { - addBucketInternal(bucket); - schema.saveConfiguration(); + recordFileChanges(() -> { + addBucketInternal(bucket); + return null; + }); return this; } @@ -468,13 +479,12 @@ public boolean isTheSameAs(final Object o) { } @Override - public boolean equals(Object o) { + public boolean equals(final Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - DocumentType that = (DocumentType) o; - + final DocumentType that = (DocumentType) o; return name.equals(that.name); } @@ -583,4 +593,9 @@ public boolean isSubTypeOf(final String type) { } return false; } + + protected void recordFileChanges(final Callable callback) { + schema.recordFileChanges(callback); + schema.saveConfiguration(); + } } diff --git a/engine/src/main/java/com/arcadedb/schema/EmbeddedSchema.java b/engine/src/main/java/com/arcadedb/schema/EmbeddedSchema.java index 2979ef7e37..2a69ef3f38 100644 --- a/engine/src/main/java/com/arcadedb/schema/EmbeddedSchema.java +++ b/engine/src/main/java/com/arcadedb/schema/EmbeddedSchema.java @@ -29,6 +29,7 @@ import com.arcadedb.engine.PaginatedFile; import com.arcadedb.exception.ConfigurationException; import com.arcadedb.exception.DatabaseMetadataException; +import com.arcadedb.exception.DatabaseOperationException; import com.arcadedb.exception.SchemaException; import com.arcadedb.index.Index; import com.arcadedb.index.IndexFactory; @@ -291,22 +292,19 @@ public Bucket createBucket(final String bucketName) { public Bucket createBucket(final String bucketName, final int pageSize) { database.checkPermissionsOnDatabase(SecurityDatabaseUser.DATABASE_ACCESS.UPDATE_SCHEMA); - return (Bucket) database.executeInWriteLock(new Callable() { - @Override - public Object call() { - if (bucketMap.containsKey(bucketName)) - throw new SchemaException("Cannot create bucket '" + bucketName + "' because already exists"); + if (bucketMap.containsKey(bucketName)) + throw new SchemaException("Cannot create bucket '" + bucketName + "' because already exists"); - try { - final Bucket bucket = new Bucket(database, bucketName, databasePath + "/" + bucketName, PaginatedFile.MODE.READ_WRITE, pageSize); - registerFile(bucket); - bucketMap.put(bucketName, bucket); + return recordFileChanges(() -> { + try { + final Bucket bucket = new Bucket(database, bucketName, databasePath + "/" + bucketName, PaginatedFile.MODE.READ_WRITE, pageSize); + registerFile(bucket); + bucketMap.put(bucketName, bucket); - return bucket; + return bucket; - } catch (IOException e) { - throw new SchemaException("Cannot create bucket '" + bucketName + "' (error=" + e + ")", e); - } + } catch (IOException e) { + throw new SchemaException("Cannot create bucket '" + bucketName + "' (error=" + e + ")", e); } }); } @@ -455,56 +453,52 @@ public TypeIndex createTypeIndex(final INDEX_TYPE indexType, final boolean uniqu if (propertyNames.length == 0) throw new DatabaseMetadataException("Cannot create index on type '" + typeName + "' because there are no property defined"); - return (TypeIndex) database.executeInWriteLock(new Callable() { - @Override - public Object call() { - final DocumentType type = getType(typeName); + final DocumentType type = getType(typeName); - final TypeIndex index = type.getPolymorphicIndexByProperties(propertyNames); - if (index != null) - throw new IllegalArgumentException( - "Found the existent index '" + index.getName() + "' defined on the properties '" + Arrays.asList(propertyNames) + "' for type '" + typeName - + "'"); + final TypeIndex index = type.getPolymorphicIndexByProperties(propertyNames); + if (index != null) + throw new IllegalArgumentException( + "Found the existent index '" + index.getName() + "' defined on the properties '" + Arrays.asList(propertyNames) + "' for type '" + typeName + "'"); - // CHECK ALL THE PROPERTIES EXIST - final byte[] keyTypes = new byte[propertyNames.length]; - int i = 0; + // CHECK ALL THE PROPERTIES EXIST + final byte[] keyTypes = new byte[propertyNames.length]; + int i = 0; - for (String propertyName : propertyNames) { - final Property property = type.getPolymorphicPropertyIfExists(propertyName); - if (property == null) - throw new SchemaException("Cannot create the index on type '" + typeName + "." + propertyName + "' because the property does not exist"); + for (String propertyName : propertyNames) { + final Property property = type.getPolymorphicPropertyIfExists(propertyName); + if (property == null) + throw new SchemaException("Cannot create the index on type '" + typeName + "." + propertyName + "' because the property does not exist"); - keyTypes[i++] = property.getType().getBinaryType(); - } + keyTypes[i++] = property.getType().getBinaryType(); + } - final List buckets = type.getBuckets(true); - final Index[] indexes = new Index[buckets.size()]; + final List buckets = type.getBuckets(true); + final Index[] indexes = new Index[buckets.size()]; - database.transaction(() -> { + return recordFileChanges(() -> { + database.transaction(() -> { - try { - for (int idx = 0; idx < buckets.size(); ++idx) { - final Bucket bucket = buckets.get(idx); - indexes[idx] = createBucketIndex(type, keyTypes, bucket, typeName, indexType, unique, pageSize, nullStrategy, callback, propertyNames); - } + try { + for (int idx = 0; idx < buckets.size(); ++idx) { + final Bucket bucket = buckets.get(idx); + indexes[idx] = createBucketIndex(type, keyTypes, bucket, typeName, indexType, unique, pageSize, nullStrategy, callback, propertyNames); + } - saveConfiguration(); + saveConfiguration(); - } catch (IOException e) { - throw new SchemaException("Cannot create index on type '" + typeName + "' (error=" + e + ")", e); - } + } catch (IOException e) { + throw new SchemaException("Cannot create index on type '" + typeName + "' (error=" + e + ")", e); + } - }, false, 1, null, (error) -> { - for (int j = 0; j < indexes.length; j++) { - final IndexInternal indexToRemove = (IndexInternal) indexes[j]; - if (indexToRemove != null) - indexToRemove.drop(); - } - }); + }, false, 1, null, (error) -> { + for (int j = 0; j < indexes.length; j++) { + final IndexInternal indexToRemove = (IndexInternal) indexes[j]; + if (indexToRemove != null) + indexToRemove.drop(); + } + }); - return type.getPolymorphicIndexByProperties(propertyNames); - } + return type.getPolymorphicIndexByProperties(propertyNames); }); } @@ -550,54 +544,50 @@ public Index createBucketIndex(final INDEX_TYPE indexType, final boolean unique, if (propertyNames.length == 0) throw new DatabaseMetadataException("Cannot create index on type '" + typeName + "' because there are no property defined"); - return (Index) database.executeInWriteLock(new Callable() { - @Override - public Object call() { - - final DocumentType type = getType(typeName); + final DocumentType type = getType(typeName); - // CHECK ALL THE PROPERTIES EXIST - final byte[] keyTypes = new byte[propertyNames.length]; - int i = 0; + // CHECK ALL THE PROPERTIES EXIST + final byte[] keyTypes = new byte[propertyNames.length]; + int i = 0; - for (String propertyName : propertyNames) { - final Property property = type.getPolymorphicPropertyIfExists(propertyName); - if (property == null) - throw new SchemaException("Cannot create the index on type '" + typeName + "." + propertyName + "' because the property does not exist"); + for (String propertyName : propertyNames) { + final Property property = type.getPolymorphicPropertyIfExists(propertyName); + if (property == null) + throw new SchemaException("Cannot create the index on type '" + typeName + "." + propertyName + "' because the property does not exist"); - keyTypes[i++] = property.getType().getBinaryType(); - } + keyTypes[i++] = property.getType().getBinaryType(); + } - final AtomicReference result = new AtomicReference<>(); - database.transaction(() -> { + return recordFileChanges(() -> { + final AtomicReference result = new AtomicReference<>(); + database.transaction(() -> { - Bucket bucket = null; - final List buckets = type.getBuckets(false); - for (Bucket b : buckets) { - if (bucketName.equals(b.getName())) { - bucket = b; - break; - } + Bucket bucket = null; + final List buckets = type.getBuckets(false); + for (Bucket b : buckets) { + if (bucketName.equals(b.getName())) { + bucket = b; + break; } + } - try { - final Index index = createBucketIndex(type, keyTypes, bucket, typeName, indexType, unique, pageSize, nullStrategy, callback, propertyNames); - result.set(index); + try { + final Index index = createBucketIndex(type, keyTypes, bucket, typeName, indexType, unique, pageSize, nullStrategy, callback, propertyNames); + result.set(index); - saveConfiguration(); + saveConfiguration(); - } catch (IOException e) { - throw new SchemaException("Cannot create index on type '" + typeName + "' (error=" + e + ")", e); - } - }, false, 1, null, (error) -> { - final Index indexToRemove = result.get(); - if (indexToRemove != null) { - ((IndexInternal) indexToRemove).drop(); - } - }); + } catch (IOException e) { + throw new SchemaException("Cannot create index on type '" + typeName + "' (error=" + e + ")", e); + } + }, false, 1, null, (error) -> { + final Index indexToRemove = result.get(); + if (indexToRemove != null) { + ((IndexInternal) indexToRemove).drop(); + } + }); - return result.get(); - } + return result.get(); }); } @@ -614,55 +604,54 @@ private Index createBucketIndex(final DocumentType type, final byte[] keyTypes, if (indexMap.containsKey(indexName)) throw new DatabaseMetadataException("Cannot create index '" + indexName + "' on type '" + typeName + "' because it already exists"); - final IndexInternal index = indexFactory.createIndex(indexType.name(), database, indexName, unique, databasePath + "/" + indexName, - PaginatedFile.MODE.READ_WRITE, keyTypes, pageSize, nullStrategy, callback); + return recordFileChanges(() -> { - registerFile(index.getPaginatedComponent()); + final IndexInternal index = indexFactory.createIndex(indexType.name(), database, indexName, unique, databasePath + "/" + indexName, + PaginatedFile.MODE.READ_WRITE, keyTypes, pageSize, nullStrategy, callback); - indexMap.put(indexName, index); + registerFile(index.getPaginatedComponent()); - type.addIndexInternal(index, bucket.getId(), propertyNames); - index.build(callback); + indexMap.put(indexName, index); - return index; + type.addIndexInternal(index, bucket.getId(), propertyNames); + index.build(callback); + return index; + }); } public Index createManualIndex(final INDEX_TYPE indexType, final boolean unique, final String indexName, final byte[] keyTypes, final int pageSize, final LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy) { database.checkPermissionsOnDatabase(SecurityDatabaseUser.DATABASE_ACCESS.UPDATE_SCHEMA); - return (Index) database.executeInWriteLock(new Callable() { - @Override - public Object call() { - if (indexMap.containsKey(indexName)) - throw new SchemaException("Cannot create index '" + indexName + "' because already exists"); + if (indexMap.containsKey(indexName)) + throw new SchemaException("Cannot create index '" + indexName + "' because already exists"); - final AtomicReference result = new AtomicReference<>(); - database.transaction(() -> { + return recordFileChanges(() -> { + final AtomicReference result = new AtomicReference<>(); + database.transaction(() -> { - try { - final IndexInternal index = indexFactory.createIndex(indexType.name(), database, FileUtils.encode(indexName, ENCODING), unique, - databasePath + "/" + indexName, PaginatedFile.MODE.READ_WRITE, keyTypes, pageSize, nullStrategy, null); + try { + final IndexInternal index = indexFactory.createIndex(indexType.name(), database, FileUtils.encode(indexName, ENCODING), unique, + databasePath + "/" + indexName, PaginatedFile.MODE.READ_WRITE, keyTypes, pageSize, nullStrategy, null); - result.set(index); + result.set(index); - if (index instanceof PaginatedComponent) - registerFile((PaginatedComponent) index); + if (index instanceof PaginatedComponent) + registerFile((PaginatedComponent) index); - indexMap.put(indexName, index); + indexMap.put(indexName, index); - } catch (IOException e) { - throw new SchemaException("Cannot create index '" + indexName + "' (error=" + e + ")", e); - } - }, false, 1, null, (error) -> { - final IndexInternal indexToRemove = result.get(); - if (indexToRemove != null) { - indexToRemove.drop(); - } - }); + } catch (IOException e) { + throw new SchemaException("Cannot create index '" + indexName + "' (error=" + e + ")", e); + } + }, false, 1, null, (error) -> { + final IndexInternal indexToRemove = result.get(); + if (indexToRemove != null) { + indexToRemove.drop(); + } + }); - return result.get(); - } + return result.get(); }); } @@ -727,47 +716,44 @@ public boolean existsType(final String typeName) { public void dropType(final String typeName) { database.checkPermissionsOnDatabase(SecurityDatabaseUser.DATABASE_ACCESS.UPDATE_SCHEMA); - database.executeInWriteLock(new Callable() { - @Override - public Object call() { - multipleUpdate = true; - try { - final DocumentType type = database.getSchema().getType(typeName); + recordFileChanges(() -> { + multipleUpdate = true; + try { + final DocumentType type = database.getSchema().getType(typeName); - // CHECK INHERITANCE TREE AND ATTACH SUB-TYPES DIRECTLY TO THE PARENT TYPE + // CHECK INHERITANCE TREE AND ATTACH SUB-TYPES DIRECTLY TO THE PARENT TYPE + for (DocumentType parent : type.parentTypes) + parent.subTypes.remove(type); + for (DocumentType sub : type.subTypes) { + sub.parentTypes.remove(type); for (DocumentType parent : type.parentTypes) - parent.subTypes.remove(type); - for (DocumentType sub : type.subTypes) { - sub.parentTypes.remove(type); - for (DocumentType parent : type.parentTypes) - sub.addParentType(parent); - } + sub.addParentType(parent); + } - final List buckets = type.getBuckets(false); - final Set bucketIds = new HashSet<>(buckets.size()); - for (Bucket b : buckets) - bucketIds.add(b.getId()); + final List buckets = type.getBuckets(false); + final Set bucketIds = new HashSet<>(buckets.size()); + for (Bucket b : buckets) + bucketIds.add(b.getId()); - // DELETE ALL ASSOCIATED INDEXES - for (Index m : type.getAllIndexes(true)) - dropIndex(m.getName()); + // DELETE ALL ASSOCIATED INDEXES + for (Index m : type.getAllIndexes(true)) + dropIndex(m.getName()); - // DELETE ALL ASSOCIATED BUCKETS - for (Bucket b : buckets) - dropBucket(b.getName()); + // DELETE ALL ASSOCIATED BUCKETS + for (Bucket b : buckets) + dropBucket(b.getName()); - if (type instanceof VertexType) - database.getGraphEngine().dropVertexType(database, (VertexType) type); + if (type instanceof VertexType) + database.getGraphEngine().dropVertexType(database, (VertexType) type); - if (types.remove(typeName) == null) - throw new SchemaException("Type '" + typeName + "' not found"); - } finally { - multipleUpdate = false; - saveConfiguration(); - updateSecurity(); - } - return null; + if (types.remove(typeName) == null) + throw new SchemaException("Type '" + typeName + "' not found"); + } finally { + multipleUpdate = false; + saveConfiguration(); + updateSecurity(); } + return null; }); } @@ -775,29 +761,26 @@ public Object call() { public void dropBucket(final String bucketName) { database.checkPermissionsOnDatabase(SecurityDatabaseUser.DATABASE_ACCESS.UPDATE_SCHEMA); - database.executeInWriteLock(new Callable() { - @Override - public Object call() { - final Bucket bucket = getBucketByName(bucketName); - - database.getPageManager().deleteFile(bucket.getId()); - try { - database.getFileManager().dropFile(bucket.getId()); - } catch (IOException e) { - LogManager.instance().log(this, Level.SEVERE, "Error on deleting bucket '%s'", e, bucketName); - } - removeFile(bucket.getId()); + final Bucket bucket = getBucketByName(bucketName); - bucketMap.remove(bucketName); + recordFileChanges(() -> { + database.getPageManager().deleteFile(bucket.getId()); + try { + database.getFileManager().dropFile(bucket.getId()); + } catch (IOException e) { + LogManager.instance().log(this, Level.SEVERE, "Error on deleting bucket '%s'", e, bucketName); + } + removeFile(bucket.getId()); - for (Index idx : new ArrayList<>(indexMap.values())) { - if (idx.getAssociatedBucketId() == bucket.getId()) - dropIndex(idx.getName()); - } + bucketMap.remove(bucketName); - saveConfiguration(); - return null; + for (Index idx : new ArrayList<>(indexMap.values())) { + if (idx.getAssociatedBucketId() == bucket.getId()) + dropIndex(idx.getName()); } + + saveConfiguration(); + return null; }); } @@ -821,36 +804,33 @@ public DocumentType createDocumentType(final String typeName, final int buckets, if (buckets > 32) throw new IllegalArgumentException("Cannot create " + buckets + " buckets: maximum is 32"); - return (DocumentType) database.executeInWriteLock(new Callable() { - @Override - public Object call() { - if (typeName.indexOf(",") > -1) - throw new IllegalArgumentException("Type name '" + typeName + "' contains non valid characters"); - - if (types.containsKey(typeName)) - throw new SchemaException("Type '" + typeName + "' already exists"); - - // CREATE ENTRY IN DICTIONARY IF NEEDED. THIS IS USED BY EMBEDDED DOCUMENT WHERE THE DICTIONARY ID IS SAVED - dictionary.getIdByName(typeName, true); - - final DocumentType c = new DocumentType(EmbeddedSchema.this, typeName); - types.put(typeName, c); - - for (int i = 0; i < buckets; ++i) { - final String bucketName = FileUtils.encode(typeName, ENCODING) + "_" + i; - if (existsBucket(bucketName)) { - LogManager.instance().log(this, Level.WARNING, "Reusing found bucket '%s' for type '%s'", null, bucketName, typeName); - c.addBucket(getBucketByName(bucketName)); - } else - // CREATE A NEW ONE - c.addBucket(createBucket(bucketName, pageSize)); - } + if (typeName.indexOf(",") > -1) + throw new IllegalArgumentException("Type name '" + typeName + "' contains non valid characters"); - saveConfiguration(); - updateSecurity(); + if (types.containsKey(typeName)) + throw new SchemaException("Type '" + typeName + "' already exists"); + + return recordFileChanges(() -> { + // CREATE ENTRY IN DICTIONARY IF NEEDED. THIS IS USED BY EMBEDDED DOCUMENT WHERE THE DICTIONARY ID IS SAVED + dictionary.getIdByName(typeName, true); + + final DocumentType c = new DocumentType(EmbeddedSchema.this, typeName); + types.put(typeName, c); - return c; + for (int i = 0; i < buckets; ++i) { + final String bucketName = FileUtils.encode(typeName, ENCODING) + "_" + i; + if (existsBucket(bucketName)) { + LogManager.instance().log(this, Level.WARNING, "Reusing found bucket '%s' for type '%s'", null, bucketName, typeName); + c.addBucket(getBucketByName(bucketName)); + } else + // CREATE A NEW ONE + c.addBucket(createBucket(bucketName, pageSize)); } + + saveConfiguration(); + updateSecurity(); + + return c; }); } @@ -898,35 +878,32 @@ public VertexType createVertexType(String typeName, int buckets, final int pageS if (buckets > 32) throw new IllegalArgumentException("Cannot create " + buckets + " buckets: maximum is 32"); - return (VertexType) database.executeInWriteLock(new Callable() { - @Override - public Object call() { - if (typeName.indexOf(",") > -1) - throw new IllegalArgumentException("Vertex type name '" + typeName + "' contains non valid characters"); - - if (types.containsKey(typeName)) - throw new SchemaException("Vertex type '" + typeName + "' already exists"); - - final VertexType c = new VertexType(EmbeddedSchema.this, typeName); - types.put(typeName, c); - - for (int i = 0; i < buckets; ++i) { - final String bucketName = FileUtils.encode(typeName, ENCODING) + "_" + i; - if (existsBucket(bucketName)) { - LogManager.instance().log(this, Level.WARNING, "Reusing found bucket '%s' for type '%s'", null, bucketName, typeName); - c.addBucket(getBucketByName(bucketName)); - } else - // CREATE A NEW ONE - c.addBucket(createBucket(bucketName, pageSize)); - } + if (typeName.indexOf(",") > -1) + throw new IllegalArgumentException("Vertex type name '" + typeName + "' contains non valid characters"); - database.getGraphEngine().createVertexType(database, c); + if (types.containsKey(typeName)) + throw new SchemaException("Vertex type '" + typeName + "' already exists"); - saveConfiguration(); - updateSecurity(); + return recordFileChanges(() -> { + final VertexType c = new VertexType(EmbeddedSchema.this, typeName); + types.put(typeName, c); - return c; + for (int i = 0; i < buckets; ++i) { + final String bucketName = FileUtils.encode(typeName, ENCODING) + "_" + i; + if (existsBucket(bucketName)) { + LogManager.instance().log(this, Level.WARNING, "Reusing found bucket '%s' for type '%s'", null, bucketName, typeName); + c.addBucket(getBucketByName(bucketName)); + } else + // CREATE A NEW ONE + c.addBucket(createBucket(bucketName, pageSize)); } + + database.getGraphEngine().createVertexType(database, c); + + saveConfiguration(); + updateSecurity(); + + return c; }); } @@ -974,32 +951,30 @@ public EdgeType createEdgeType(final String typeName, final int buckets, final i if (buckets > 32) throw new IllegalArgumentException("Cannot create " + buckets + " buckets: maximum is 32"); - return (EdgeType) database.executeInWriteLock(new Callable() { - @Override - public Object call() { - if (typeName.indexOf(",") > -1) - throw new IllegalArgumentException("Edge type name '" + typeName + "' contains non valid characters"); - - if (types.containsKey(typeName)) - throw new SchemaException("Edge type '" + typeName + "' already exists"); - final DocumentType c = new EdgeType(EmbeddedSchema.this, typeName); - types.put(typeName, c); - - for (int i = 0; i < buckets; ++i) { - final String bucketName = FileUtils.encode(typeName, ENCODING) + "_" + i; - if (existsBucket(bucketName)) { - LogManager.instance().log(this, Level.WARNING, "Reusing found bucket '%s' for type '%s'", null, bucketName, typeName); - c.addBucket(getBucketByName(bucketName)); - } else - // CREATE A NEW ONE - c.addBucket(createBucket(bucketName, pageSize)); - } + if (typeName.indexOf(",") > -1) + throw new IllegalArgumentException("Edge type name '" + typeName + "' contains non valid characters"); - saveConfiguration(); - updateSecurity(); + if (types.containsKey(typeName)) + throw new SchemaException("Edge type '" + typeName + "' already exists"); + + return recordFileChanges(() -> { + final DocumentType c = new EdgeType(EmbeddedSchema.this, typeName); + types.put(typeName, c); - return c; + for (int i = 0; i < buckets; ++i) { + final String bucketName = FileUtils.encode(typeName, ENCODING) + "_" + i; + if (existsBucket(bucketName)) { + LogManager.instance().log(this, Level.WARNING, "Reusing found bucket '%s' for type '%s'", null, bucketName, typeName); + c.addBucket(getBucketByName(bucketName)); + } else + // CREATE A NEW ONE + c.addBucket(createBucket(bucketName, pageSize)); } + + saveConfiguration(); + updateSecurity(); + + return c; }); } @@ -1338,4 +1313,16 @@ private void updateSecurity() { if (security != null) security.updateSchema(database); } + + protected RET recordFileChanges(final Callable callback) { + if (readingFromFile || !loadInRamCompleted) { + try { + return (RET) callback.call(); + } catch (Exception e) { + throw new DatabaseOperationException("Error on updating the schema", e); + } + } + + return database.getWrappedDatabaseInstance().recordFileChanges(callback); + } } diff --git a/server/src/main/java/com/arcadedb/server/ha/ReplicatedDatabase.java b/server/src/main/java/com/arcadedb/server/ha/ReplicatedDatabase.java index 6dea1c118f..66fa688a47 100644 --- a/server/src/main/java/com/arcadedb/server/ha/ReplicatedDatabase.java +++ b/server/src/main/java/com/arcadedb/server/ha/ReplicatedDatabase.java @@ -44,6 +44,7 @@ import com.arcadedb.engine.WALFileFactory; import com.arcadedb.exception.ConfigurationException; import com.arcadedb.exception.NeedRetryException; +import com.arcadedb.exception.SchemaException; import com.arcadedb.exception.TransactionException; import com.arcadedb.graph.Edge; import com.arcadedb.graph.GraphEngine; @@ -71,7 +72,6 @@ public class ReplicatedDatabase implements DatabaseInternal { private final ArcadeDBServer server; private final EmbeddedDatabase proxied; - private final ReplicatedSchema schema; private final HAServer.QUORUM quorum; private final long timeout; @@ -84,8 +84,6 @@ public ReplicatedDatabase(final ArcadeDBServer server, final EmbeddedDatabase pr this.quorum = HAServer.QUORUM.valueOf(proxied.getConfiguration().getValueAsString(GlobalConfiguration.HA_QUORUM).toUpperCase()); this.timeout = proxied.getConfiguration().getValueAsLong(GlobalConfiguration.HA_QUORUM_TIMEOUT); this.proxied.setWrappedDatabaseInstance(this); - - this.schema = new ReplicatedSchema(this, proxied.getSchema().getEmbedded()); } @Override @@ -475,7 +473,7 @@ public Edge newEdgeByKeys(final String sourceVertexType, final String[] sourceVe @Override public Schema getSchema() { - return schema; + return proxied.getSchema(); } @Override @@ -626,7 +624,7 @@ public String toString() { return proxied.toString(); } - protected Object recordFileChanges(final Callable callback) { + public RET recordFileChanges(final Callable callback) { final HAServer ha = server.getHA(); final AtomicReference result = new AtomicReference<>(); @@ -634,9 +632,11 @@ protected Object recordFileChanges(final Callable callback) { // ACQUIRE A DATABASE WRITE LOCK. THE LOCK IS REENTRANT, SO THE ACQUISITION DOWN THE LINE IS GOING TO PASS BECAUSE ALREADY ACQUIRED HERE final DatabaseChangeStructureRequest command = proxied.executeInWriteLock(() -> { if (!ha.isLeader()) { - // NOT THE LEADER< NOT RESPONSIBLE TO SEND CHANGES TO OTHER SERVERS - result.set(callback.call()); - return null; + // NOT THE LEADER: NOT RESPONSIBLE TO SEND CHANGES TO OTHER SERVERS + // TODO: Issue #118SchemaException + throw new SchemaException("Changes to the schema must be executed on the leader server"); +// result.set(callback.call()); +// return null; } if (!proxied.getFileManager().startRecordingChanges()) { @@ -670,10 +670,10 @@ protected Object recordFileChanges(final Callable callback) { if (command != null) { // SEND THE COMMAND OUTSIDE THE EXCLUSIVE LOCK - final int quorum = ha.getConfiguredServers() - 1; + final int quorum = ha.getConfiguredServers(); ha.sendCommandToReplicasWithQuorum(command, quorum, timeout); } - return result.get(); + return (RET) result.get(); } } diff --git a/server/src/main/java/com/arcadedb/server/ha/ReplicatedDocumentType.java b/server/src/main/java/com/arcadedb/server/ha/ReplicatedDocumentType.java new file mode 100644 index 0000000000..e1546052e7 --- /dev/null +++ b/server/src/main/java/com/arcadedb/server/ha/ReplicatedDocumentType.java @@ -0,0 +1,117 @@ +/* + * Copyright © 2021-present Arcade Data Ltd (info@arcadedata.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.arcadedb.server.ha; + +import com.arcadedb.database.DatabaseInternal; +import com.arcadedb.index.Index; +import com.arcadedb.index.lsm.LSMTreeIndexAbstract; +import com.arcadedb.schema.DocumentType; +import com.arcadedb.schema.EmbeddedSchema; +import com.arcadedb.schema.Property; +import com.arcadedb.schema.Type; + +public class ReplicatedDocumentType extends DocumentType { + public ReplicatedDocumentType(EmbeddedSchema schema, String name) { + super(schema, name); + } + + @Override + public Property createProperty(String propertyName, Type propertyType) { + return (Property) ((ReplicatedDatabase) ((DatabaseInternal) schema.getDatabase()).getWrappedDatabaseInstance()).recordFileChanges( + () -> super.createProperty(propertyName, propertyType)); + } + + @Override + public Property createProperty(String propertyName, Class propertyType) { + return (Property) ((ReplicatedDatabase) ((DatabaseInternal) schema.getDatabase()).getWrappedDatabaseInstance()).recordFileChanges( + () -> super.createProperty(propertyName, propertyType)); + } + + @Override + public Property createProperty(String propertyName, String propertyType) { + return (Property) ((ReplicatedDatabase) ((DatabaseInternal) schema.getDatabase()).getWrappedDatabaseInstance()).recordFileChanges( + () -> super.createProperty(propertyName, propertyType)); + } + + @Override + public Property getOrCreateProperty(String propertyName, Type propertyType) { + return (Property) ((ReplicatedDatabase) ((DatabaseInternal) schema.getDatabase()).getWrappedDatabaseInstance()).recordFileChanges( + () -> super.getOrCreateProperty(propertyName, propertyType)); + } + + @Override + public Property getOrCreateProperty(String propertyName, Class propertyType) { + return (Property) ((ReplicatedDatabase) ((DatabaseInternal) schema.getDatabase()).getWrappedDatabaseInstance()).recordFileChanges( + () -> super.getOrCreateProperty(propertyName, propertyType)); + } + + @Override + public Property getOrCreateProperty(String propertyName, String propertyType) { + return (Property) ((ReplicatedDatabase) ((DatabaseInternal) schema.getDatabase()).getWrappedDatabaseInstance()).recordFileChanges( + () -> super.getOrCreateProperty(propertyName, propertyType)); + } + + @Override + public Index createTypeIndex(EmbeddedSchema.INDEX_TYPE indexType, boolean unique, String... propertyNames) { + return (Index) ((ReplicatedDatabase) ((DatabaseInternal) schema.getDatabase()).getWrappedDatabaseInstance()).recordFileChanges( + () -> super.createTypeIndex(indexType, unique, propertyNames)); + } + + @Override + public Index createTypeIndex(EmbeddedSchema.INDEX_TYPE indexType, boolean unique, String[] propertyNames, int pageSize) { + return (Index) ((ReplicatedDatabase) ((DatabaseInternal) schema.getDatabase()).getWrappedDatabaseInstance()).recordFileChanges( + () -> super.createTypeIndex(indexType, unique, propertyNames, pageSize)); + } + + @Override + public Index createTypeIndex(EmbeddedSchema.INDEX_TYPE indexType, boolean unique, String[] propertyNames, int pageSize, Index.BuildIndexCallback callback) { + return (Index) ((ReplicatedDatabase) ((DatabaseInternal) schema.getDatabase()).getWrappedDatabaseInstance()).recordFileChanges( + () -> super.createTypeIndex(indexType, unique, propertyNames, pageSize, callback)); + } + + @Override + public Index createTypeIndex(EmbeddedSchema.INDEX_TYPE indexType, boolean unique, String[] propertyNames, int pageSize, + LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy, Index.BuildIndexCallback callback) { + return (Index) ((ReplicatedDatabase) ((DatabaseInternal) schema.getDatabase()).getWrappedDatabaseInstance()).recordFileChanges( + () -> super.createTypeIndex(indexType, unique, propertyNames, pageSize, nullStrategy, callback)); + } + + @Override + public Index getOrCreateTypeIndex(EmbeddedSchema.INDEX_TYPE indexType, boolean unique, String... propertyNames) { + return (Index) ((ReplicatedDatabase) ((DatabaseInternal) schema.getDatabase()).getWrappedDatabaseInstance()).recordFileChanges( + () -> super.getOrCreateTypeIndex(indexType, unique, propertyNames)); + } + + @Override + public Index getOrCreateTypeIndex(EmbeddedSchema.INDEX_TYPE indexType, boolean unique, String[] propertyNames, int pageSize) { + return (Index) ((ReplicatedDatabase) ((DatabaseInternal) schema.getDatabase()).getWrappedDatabaseInstance()).recordFileChanges( + () -> super.getOrCreateTypeIndex(indexType, unique, propertyNames, pageSize)); + } + + @Override + public Index getOrCreateTypeIndex(EmbeddedSchema.INDEX_TYPE indexType, boolean unique, String[] propertyNames, int pageSize, + Index.BuildIndexCallback callback) { + return (Index) ((ReplicatedDatabase) ((DatabaseInternal) schema.getDatabase()).getWrappedDatabaseInstance()).recordFileChanges( + () -> super.getOrCreateTypeIndex(indexType, unique, propertyNames, pageSize, callback)); + } + + @Override + public Index getOrCreateTypeIndex(EmbeddedSchema.INDEX_TYPE indexType, boolean unique, String[] propertyNames, int pageSize, + LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy, Index.BuildIndexCallback callback) { + return (Index) ((ReplicatedDatabase) ((DatabaseInternal) schema.getDatabase()).getWrappedDatabaseInstance()).recordFileChanges( + () -> super.getOrCreateTypeIndex(indexType, unique, propertyNames, pageSize, nullStrategy, callback)); + } +} diff --git a/server/src/main/java/com/arcadedb/server/ha/ReplicatedSchema.java b/server/src/main/java/com/arcadedb/server/ha/ReplicatedSchema.java deleted file mode 100644 index ff6527ec5e..0000000000 --- a/server/src/main/java/com/arcadedb/server/ha/ReplicatedSchema.java +++ /dev/null @@ -1,385 +0,0 @@ -/* - * Copyright © 2021-present Arcade Data Ltd (info@arcadedata.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.arcadedb.server.ha; - -import com.arcadedb.database.Database; -import com.arcadedb.engine.Bucket; -import com.arcadedb.engine.Dictionary; -import com.arcadedb.engine.PaginatedComponent; -import com.arcadedb.engine.PaginatedFile; -import com.arcadedb.index.Index; -import com.arcadedb.index.TypeIndex; -import com.arcadedb.index.lsm.LSMTreeIndexAbstract; -import com.arcadedb.schema.DocumentType; -import com.arcadedb.schema.EdgeType; -import com.arcadedb.schema.EmbeddedSchema; -import com.arcadedb.schema.Schema; -import com.arcadedb.schema.VertexType; - -import java.io.*; -import java.util.*; - -public class ReplicatedSchema implements Schema { - private final EmbeddedSchema proxied; - private final ReplicatedDatabase replicatedDatabase; - - public ReplicatedSchema(final ReplicatedDatabase replicatedDatabase, final EmbeddedSchema proxied) { - this.replicatedDatabase = replicatedDatabase; - this.proxied = proxied; - } - - @Override - public EmbeddedSchema getEmbedded() { - return proxied; - } - - public void create(PaginatedFile.MODE mode) { - proxied.create(mode); - } - - public void load(PaginatedFile.MODE mode) throws IOException { - proxied.load(mode); - } - - @Override - public TimeZone getTimeZone() { - return proxied.getTimeZone(); - } - - @Override - public void setTimeZone(TimeZone timeZone) { - proxied.setTimeZone(timeZone); - } - - @Override - public String getDateFormat() { - return proxied.getDateFormat(); - } - - @Override - public void setDateFormat(String dateFormat) { - proxied.setDateFormat(dateFormat); - } - - @Override - public String getDateTimeFormat() { - return proxied.getDateTimeFormat(); - } - - @Override - public void setDateTimeFormat(String dateTimeFormat) { - proxied.setDateTimeFormat(dateTimeFormat); - } - - @Override - public PaginatedComponent getFileById(int id) { - return proxied.getFileById(id); - } - - @Override - public PaginatedComponent getFileByIdIfExists(int id) { - return proxied.getFileByIdIfExists(id); - } - - public void removeFile(int fileId) { - replicatedDatabase.recordFileChanges(() -> { - proxied.removeFile(fileId); - return null; - }); - } - - @Override - public Collection getBuckets() { - return proxied.getBuckets(); - } - - @Override - public boolean existsBucket(String bucketName) { - return proxied.existsBucket(bucketName); - } - - @Override - public Bucket getBucketByName(String name) { - return proxied.getBucketByName(name); - } - - @Override - public Bucket getBucketById(int id) { - return proxied.getBucketById(id); - } - - @Override - public Bucket createBucket(String bucketName) { - return (Bucket) replicatedDatabase.recordFileChanges(() -> proxied.createBucket(bucketName)); - } - - public Bucket createBucket(String bucketName, int pageSize) { - return (Bucket) replicatedDatabase.recordFileChanges(() -> proxied.createBucket(bucketName, pageSize)); - } - - @Override - public String getEncoding() { - return proxied.getEncoding(); - } - - @Override - public DocumentType copyType(String typeName, String newTypeName, Class newTypeClass, int buckets, int pageSize, - int transactionBatchSize) { - return proxied.copyType(typeName, newTypeName, newTypeClass, buckets, pageSize, transactionBatchSize); - } - - @Override - public boolean existsIndex(String indexName) { - return proxied.existsIndex(indexName); - } - - @Override - public Index[] getIndexes() { - return proxied.getIndexes(); - } - - @Override - public void dropIndex(String indexName) { - replicatedDatabase.recordFileChanges(() -> { - proxied.dropIndex(indexName); - return null; - }); - } - - @Override - public Index getIndexByName(String indexName) { - return proxied.getIndexByName(indexName); - } - - @Override - public TypeIndex createTypeIndex(EmbeddedSchema.INDEX_TYPE indexType, boolean unique, String typeName, String... propertyNames) { - return (TypeIndex) replicatedDatabase.recordFileChanges(() -> proxied.createTypeIndex(indexType, unique, typeName, propertyNames)); - } - - @Override - public TypeIndex createTypeIndex(EmbeddedSchema.INDEX_TYPE indexType, boolean unique, String typeName, String[] propertyNames, int pageSize) { - return (TypeIndex) replicatedDatabase.recordFileChanges(() -> proxied.createTypeIndex(indexType, unique, typeName, propertyNames, pageSize)); - } - - @Override - public TypeIndex createTypeIndex(EmbeddedSchema.INDEX_TYPE indexType, boolean unique, String typeName, String[] propertyNames, int pageSize, - Index.BuildIndexCallback callback) { - return (TypeIndex) replicatedDatabase.recordFileChanges(() -> proxied.createTypeIndex(indexType, unique, typeName, propertyNames, pageSize, callback)); - } - - @Override - public TypeIndex createTypeIndex(EmbeddedSchema.INDEX_TYPE indexType, boolean unique, String typeName, String[] propertyNames, int pageSize, - LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy, Index.BuildIndexCallback callback) { - return (TypeIndex) replicatedDatabase.recordFileChanges( - () -> proxied.createTypeIndex(indexType, unique, typeName, propertyNames, pageSize, nullStrategy, callback)); - } - - @Override - public TypeIndex getOrCreateTypeIndex(EmbeddedSchema.INDEX_TYPE indexType, boolean unique, String typeName, String... propertyNames) { - return (TypeIndex) replicatedDatabase.recordFileChanges(() -> proxied.getOrCreateTypeIndex(indexType, unique, typeName, propertyNames)); - } - - @Override - public TypeIndex getOrCreateTypeIndex(EmbeddedSchema.INDEX_TYPE indexType, boolean unique, String typeName, String[] propertyNames, int pageSize) { - return (TypeIndex) replicatedDatabase.recordFileChanges(() -> proxied.getOrCreateTypeIndex(indexType, unique, typeName, propertyNames, pageSize)); - } - - @Override - public TypeIndex getOrCreateTypeIndex(EmbeddedSchema.INDEX_TYPE indexType, boolean unique, String typeName, String[] propertyNames, int pageSize, - Index.BuildIndexCallback callback) { - return (TypeIndex) replicatedDatabase.recordFileChanges(() -> proxied.getOrCreateTypeIndex(indexType, unique, typeName, propertyNames, pageSize, callback)); - } - - @Override - public TypeIndex getOrCreateTypeIndex(EmbeddedSchema.INDEX_TYPE indexType, boolean unique, String typeName, String[] propertyNames, int pageSize, - LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy, Index.BuildIndexCallback callback) { - return (TypeIndex) replicatedDatabase.recordFileChanges( - () -> proxied.getOrCreateTypeIndex(indexType, unique, typeName, propertyNames, pageSize, nullStrategy, callback)); - } - - @Override - public Index createBucketIndex(EmbeddedSchema.INDEX_TYPE indexType, boolean unique, String typeName, String bucketName, String[] propertyNames, int pageSize, - LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy, Index.BuildIndexCallback callback) { - - return (Index) replicatedDatabase.recordFileChanges( - () -> proxied.createBucketIndex(indexType, unique, typeName, bucketName, propertyNames, pageSize, nullStrategy, callback)); - } - - @Override - public Index createManualIndex(EmbeddedSchema.INDEX_TYPE indexType, boolean unique, String indexName, byte[] keyTypes, int pageSize, - LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy) { - return (Index) replicatedDatabase.recordFileChanges(() -> proxied.createManualIndex(indexType, unique, indexName, keyTypes, pageSize, nullStrategy)); - } - - public void close() { - proxied.close(); - } - - @Override - public Dictionary getDictionary() { - return proxied.getDictionary(); - } - - public Database getReplicatedDatabase() { - return proxied.getDatabase(); - } - - @Override - public Collection getTypes() { - return proxied.getTypes(); - } - - @Override - public DocumentType getType(String typeName) { - return proxied.getType(typeName); - } - - @Override - public String getTypeNameByBucketId(int bucketId) { - return proxied.getTypeNameByBucketId(bucketId); - } - - @Override - public DocumentType getTypeByBucketId(int bucketId) { - return proxied.getTypeByBucketId(bucketId); - } - - @Override - public boolean existsType(String typeName) { - return proxied.existsType(typeName); - } - - @Override - public void dropType(final String typeName) { - replicatedDatabase.recordFileChanges(() -> { - proxied.dropType(typeName); - return null; - }); - } - - @Override - public void dropBucket(final String bucketName) { - replicatedDatabase.recordFileChanges(() -> { - proxied.dropBucket(bucketName); - return null; - }); - } - - @Override - public DocumentType createDocumentType(final String typeName) { - return (DocumentType) replicatedDatabase.recordFileChanges(() -> proxied.createDocumentType(typeName)); - } - - @Override - public DocumentType createDocumentType(final String typeName, final int buckets) { - return (DocumentType) replicatedDatabase.recordFileChanges(() -> proxied.createDocumentType(typeName, buckets)); - } - - @Override - public DocumentType createDocumentType(final String typeName, final int buckets, final int pageSize) { - return (DocumentType) replicatedDatabase.recordFileChanges(() -> proxied.createDocumentType(typeName, buckets, pageSize)); - } - - @Override - public DocumentType getOrCreateDocumentType(String typeName) { - return (DocumentType) replicatedDatabase.recordFileChanges(() -> proxied.getOrCreateDocumentType(typeName)); - } - - @Override - public DocumentType getOrCreateDocumentType(String typeName, final int buckets) { - return (DocumentType) replicatedDatabase.recordFileChanges(() -> proxied.getOrCreateDocumentType(typeName, buckets)); - } - - @Override - public DocumentType getOrCreateDocumentType(String typeName, final int buckets, final int pageSize) { - return (DocumentType) replicatedDatabase.recordFileChanges(() -> proxied.getOrCreateDocumentType(typeName, buckets, pageSize)); - } - - @Override - public VertexType createVertexType(final String typeName) { - return (VertexType) replicatedDatabase.recordFileChanges(() -> proxied.createVertexType(typeName)); - } - - @Override - public VertexType createVertexType(final String typeName, final int buckets) { - return (VertexType) replicatedDatabase.recordFileChanges(() -> proxied.createVertexType(typeName, buckets)); - } - - @Override - public VertexType createVertexType(final String typeName, final int buckets, final int pageSize) { - return (VertexType) replicatedDatabase.recordFileChanges(() -> proxied.createVertexType(typeName, buckets, pageSize)); - } - - @Override - public VertexType getOrCreateVertexType(final String typeName) { - return (VertexType) replicatedDatabase.recordFileChanges(() -> proxied.getOrCreateVertexType(typeName)); - } - - @Override - public VertexType getOrCreateVertexType(final String typeName, final int buckets) { - return (VertexType) replicatedDatabase.recordFileChanges(() -> proxied.getOrCreateVertexType(typeName, buckets)); - } - - @Override - public VertexType getOrCreateVertexType(final String typeName, final int buckets, final int pageSize) { - return (VertexType) replicatedDatabase.recordFileChanges(() -> proxied.getOrCreateVertexType(typeName, buckets, pageSize)); - } - - @Override - public EdgeType createEdgeType(final String typeName) { - return (EdgeType) replicatedDatabase.recordFileChanges(() -> proxied.createEdgeType(typeName)); - } - - @Override - public EdgeType createEdgeType(final String typeName, final int buckets) { - return (EdgeType) replicatedDatabase.recordFileChanges(() -> proxied.createEdgeType(typeName, buckets)); - } - - @Override - public EdgeType createEdgeType(final String typeName, final int buckets, final int pageSize) { - return (EdgeType) replicatedDatabase.recordFileChanges(() -> proxied.createEdgeType(typeName, buckets, pageSize)); - } - - @Override - public EdgeType getOrCreateEdgeType(final String typeName) { - return (EdgeType) replicatedDatabase.recordFileChanges(() -> proxied.getOrCreateEdgeType(typeName)); - } - - @Override - public EdgeType getOrCreateEdgeType(final String typeName, int buckets) { - return (EdgeType) replicatedDatabase.recordFileChanges(() -> proxied.getOrCreateEdgeType(typeName, buckets)); - } - - @Override - public EdgeType getOrCreateEdgeType(final String typeName, int buckets, int pageSize) { - return (EdgeType) replicatedDatabase.recordFileChanges(() -> proxied.getOrCreateEdgeType(typeName, buckets, pageSize)); - } - - public void saveConfiguration() { - replicatedDatabase.recordFileChanges(() -> { - proxied.saveConfiguration(); - return null; - }); - } - - public void registerFile(final PaginatedComponent file) { - proxied.registerFile(file); - } - - public boolean isDirty() { - return proxied.isDirty(); - } -} diff --git a/server/src/main/java/com/arcadedb/server/ha/message/DatabaseChangeStructureRequest.java b/server/src/main/java/com/arcadedb/server/ha/message/DatabaseChangeStructureRequest.java index 4e29ff4aa5..895eb1dbf6 100755 --- a/server/src/main/java/com/arcadedb/server/ha/message/DatabaseChangeStructureRequest.java +++ b/server/src/main/java/com/arcadedb/server/ha/message/DatabaseChangeStructureRequest.java @@ -25,10 +25,9 @@ import com.arcadedb.server.ha.ReplicationException; import com.arcadedb.utility.FileUtils; -import java.io.File; -import java.util.HashMap; -import java.util.Map; -import java.util.logging.Level; +import java.io.*; +import java.util.*; +import java.util.logging.*; public class DatabaseChangeStructureRequest extends HAAbstractCommand { private String databaseName; diff --git a/server/src/main/java/com/arcadedb/server/ha/message/DatabaseChangeStructureResponse.java b/server/src/main/java/com/arcadedb/server/ha/message/DatabaseChangeStructureResponse.java index c5d2874423..ea813e2588 100755 --- a/server/src/main/java/com/arcadedb/server/ha/message/DatabaseChangeStructureResponse.java +++ b/server/src/main/java/com/arcadedb/server/ha/message/DatabaseChangeStructureResponse.java @@ -15,8 +15,11 @@ */ package com.arcadedb.server.ha.message; +import com.arcadedb.log.LogManager; import com.arcadedb.server.ha.HAServer; +import java.util.logging.*; + /** * Response for a transaction. This is needed to check the quorum by the leader. */ @@ -24,6 +27,7 @@ public class DatabaseChangeStructureResponse extends HAAbstractCommand { @Override public HACommand execute(final HAServer server, final String remoteServerName, final long messageNumber) { server.receivedResponse(remoteServerName, messageNumber); + LogManager.instance().log(this, Level.INFO, "Database change structure received"); return null; } diff --git a/server/src/test/java/com/arcadedb/server/ha/HTTP2ServersIT.java b/server/src/test/java/com/arcadedb/server/ha/HTTP2ServersIT.java index d36b019afb..22cb62d4e8 100644 --- a/server/src/test/java/com/arcadedb/server/ha/HTTP2ServersIT.java +++ b/server/src/test/java/com/arcadedb/server/ha/HTTP2ServersIT.java @@ -20,6 +20,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.io.*; import java.net.*; import java.util.*; import java.util.logging.*; @@ -79,4 +80,96 @@ public void propagationOfSchema() throws Exception { } + @Test + public void checkQuery() throws Exception { + testEachServer((serverIndex) -> { + HttpURLConnection connection = (HttpURLConnection) new URL( + "http://127.0.0.1:248" + +serverIndex + "/api/v1/query/graph/sql/select%20from%20V1%20limit%201").openConnection(); + + connection.setRequestMethod("GET"); + connection.setRequestProperty("Authorization", + "Basic " + Base64.getEncoder().encodeToString(("root:" + BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS).getBytes())); + connection.connect(); + + try { + final String response = readResponse(connection); + + LogManager.instance().log(this, Level.INFO, "TEST: Response: %s", null, response); + + Assertions.assertEquals(200, connection.getResponseCode()); + + Assertions.assertEquals("OK", connection.getResponseMessage()); + + Assertions.assertTrue(response.contains("V1")); + + } finally { + connection.disconnect(); + } + }); + } + + @Test + public void checkRecordLoading() throws Exception { + testEachServer((serverIndex) -> { + HttpURLConnection connection = (HttpURLConnection) new URL( + "http://127.0.0.1:248" + serverIndex + "/api/v1/document/graph/" + BaseGraphServerTest.root.getIdentity().toString().substring(1)).openConnection(); + + connection.setRequestMethod("GET"); + connection.setRequestProperty("Authorization", + "Basic " + Base64.getEncoder().encodeToString(("root:" + BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS).getBytes())); + connection.connect(); + + try { + final String response = readResponse(connection); + + LogManager.instance().log(this, Level.INFO, "TEST: Response: %s", null, response); + + Assertions.assertEquals(200, connection.getResponseCode()); + + Assertions.assertEquals("OK", connection.getResponseMessage()); + + Assertions.assertTrue(response.contains("V1")); + + } finally { + connection.disconnect(); + } + }); + } + + @Test + public void checkRecordCreate() throws Exception { + testEachServer((serverIndex) -> { + HttpURLConnection connection = (HttpURLConnection) new URL("http://127.0.0.1:248" + serverIndex + "/api/v1/document/graph").openConnection(); + + connection.setRequestMethod("POST"); + + final String payload = "{\"@type\":\"Person\",\"name\":\"Jay\",\"surname\":\"Miner\",\"age\":69}"; + + connection.setRequestMethod("POST"); + connection.setRequestProperty("Authorization", + "Basic " + Base64.getEncoder().encodeToString(("root:" + BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS).getBytes())); + connection.setDoOutput(true); + + connection.connect(); + + PrintWriter pw = new PrintWriter(new OutputStreamWriter(connection.getOutputStream())); + pw.write(payload); + pw.close(); + + try { + final String response = readResponse(connection); + + Assertions.assertEquals(200, connection.getResponseCode()); + Assertions.assertEquals("OK", connection.getResponseMessage()); + + LogManager.instance().log(this, Level.INFO, "TEST: Response: %s", null, response); + + Assertions.assertTrue(response.contains("#")); + + } finally { + connection.disconnect(); + } + }); + + } } diff --git a/server/src/test/java/com/arcadedb/server/ha/ReplicationChangeSchema.java b/server/src/test/java/com/arcadedb/server/ha/ReplicationChangeSchema.java new file mode 100644 index 0000000000..c0ef345ffb --- /dev/null +++ b/server/src/test/java/com/arcadedb/server/ha/ReplicationChangeSchema.java @@ -0,0 +1,58 @@ +/* + * Copyright © 2021-present Arcade Data Ltd (info@arcadedata.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.arcadedb.server.ha; + +import com.arcadedb.database.Database; +import com.arcadedb.exception.SchemaException; +import com.arcadedb.schema.Type; +import com.arcadedb.schema.VertexType; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ReplicationChangeSchema extends ReplicationServerIT { + @Test + public void testReplication() { + super.testReplication(); + + final Database database0 = getServerDatabase(0, getDatabaseName()); + final VertexType type0 = database0.getSchema().createVertexType("RuntimeVertex0"); + type0.createProperty("id", Type.STRING); + + final Database database1 = getServerDatabase(1, getDatabaseName()); + + Assertions.assertNotNull(database1.getSchema().getType("RuntimeVertex0")); + Assertions.assertNotNull(database1.getSchema().getType("RuntimeVertex0").getProperty("id")); + + try { + database1.getSchema().createVertexType("RuntimeVertex1"); + Assertions.fail(); + } catch (SchemaException e) { + // EXPECTED + } + + Assertions.assertFalse(database0.getSchema().existsType("RuntimeVertex1")); + Assertions.assertFalse(database1.getSchema().existsType("RuntimeVertex1")); + } + + protected int getServerCount() { + return 2; + } + + @Override + protected int getTxs() { + return 10; + } +} diff --git a/server/src/test/java/com/arcadedb/server/ha/TwoServersIT.java b/server/src/test/java/com/arcadedb/server/ha/TwoServersIT.java deleted file mode 100644 index 48e1f0501a..0000000000 --- a/server/src/test/java/com/arcadedb/server/ha/TwoServersIT.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright © 2021-present Arcade Data Ltd (info@arcadedata.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.arcadedb.server.ha; - -import com.arcadedb.log.LogManager; -import com.arcadedb.server.BaseGraphServerTest; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.io.OutputStreamWriter; -import java.io.PrintWriter; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.Base64; -import java.util.logging.Level; - -public class TwoServersIT extends BaseGraphServerTest { - @Override - protected int getServerCount() { - return 2; - } - - @Test - public void checkQuery() throws Exception { - testEachServer((serverIndex) -> { - HttpURLConnection connection = (HttpURLConnection) new URL( - "http://127.0.0.1:248" + +serverIndex + "/api/v1/query/graph/sql/select%20from%20V1%20limit%201").openConnection(); - - connection.setRequestMethod("GET"); - connection.setRequestProperty("Authorization", "Basic " + Base64.getEncoder().encodeToString(("root:" + BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS).getBytes())); - connection.connect(); - - try { - final String response = readResponse(connection); - - LogManager.instance().log(this, Level.INFO, "TEST: Response: %s", null, response); - - Assertions.assertEquals(200, connection.getResponseCode()); - - Assertions.assertEquals("OK", connection.getResponseMessage()); - - Assertions.assertTrue(response.contains("V1")); - - } finally { - connection.disconnect(); - } - }); - } - - @Test - public void checkRecordLoading() throws Exception { - testEachServer((serverIndex) -> { - HttpURLConnection connection = (HttpURLConnection) new URL( - "http://127.0.0.1:248" + serverIndex + "/api/v1/document/graph/" + BaseGraphServerTest.root.getIdentity().toString().substring(1)).openConnection(); - - connection.setRequestMethod("GET"); - connection.setRequestProperty("Authorization", "Basic " + Base64.getEncoder().encodeToString(("root:" + BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS).getBytes())); - connection.connect(); - - try { - final String response = readResponse(connection); - - LogManager.instance().log(this, Level.INFO, "TEST: Response: %s", null, response); - - Assertions.assertEquals(200, connection.getResponseCode()); - - Assertions.assertEquals("OK", connection.getResponseMessage()); - - Assertions.assertTrue(response.contains("V1")); - - } finally { - connection.disconnect(); - } - }); - } - - @Test - public void checkRecordCreate() throws Exception { - testEachServer((serverIndex) -> { - HttpURLConnection connection = (HttpURLConnection) new URL("http://127.0.0.1:248" + serverIndex + "/api/v1/document/graph").openConnection(); - - connection.setRequestMethod("POST"); - - final String payload = "{\"@type\":\"Person\",\"name\":\"Jay\",\"surname\":\"Miner\",\"age\":69}"; - - connection.setRequestMethod("POST"); - connection.setRequestProperty("Authorization", "Basic " + Base64.getEncoder().encodeToString(("root:" + BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS).getBytes())); - connection.setDoOutput(true); - - connection.connect(); - - PrintWriter pw = new PrintWriter(new OutputStreamWriter(connection.getOutputStream())); - pw.write(payload); - pw.close(); - - try { - final String response = readResponse(connection); - - Assertions.assertEquals(200, connection.getResponseCode()); - Assertions.assertEquals("OK", connection.getResponseMessage()); - - LogManager.instance().log(this, Level.INFO, "TEST: Response: %s", null, response); - - Assertions.assertTrue(response.contains("#")); - - } finally { - connection.disconnect(); - } - }); - - } -}