Feature/master thesis schema evolution in polystores #429

Open

wants to merge 30 commits into base: master

Changes from 1 commit

30 commits
a86b361
merge-columns SqlAlterTableMergeColumns could be called
Sep 16, 2022
66cb080
merge-columns first tries in DataMigrator
Sep 19, 2022
3ea3f46
merge-columns update working, method should be reorganized
Sep 20, 2022
c4d0a53
merge-columns merge of columns working
Sep 22, 2022
d579a90
merge-columns removing of the old columns added
Sep 22, 2022
d7e5223
merge-columns variables renamed, docs added
Sep 23, 2022
ad7732e
merge-columns default and nullable are working properly
Sep 23, 2022
10421b3
joinString added for MergeColumnsRequest
Nov 1, 2022
99adc41
respect the order of columns to merge
Nov 1, 2022
8b60ac7
Merge remote-tracking branch 'origin/master' into feature/merge-columns
Nov 9, 2022
d8fef69
Merge from the Master
Nov 9, 2022
4268e18
bugfix: exclude primary key(s) from merge
Nov 14, 2022
34ca336
bugfix: drop after mergecolumns fixed. dropcolumn of mongostore fixed.
Nov 15, 2022
9c881f4
relocate relational is working. migrate to docustore started.
Nov 24, 2022
1d11bb4
relational to document is working.
Nov 27, 2022
ac827e0
fixes of transfer table
Dec 3, 2022
d4ff64a
relational -> document: handling null values
Dec 7, 2022
5f70470
document -> document: transferTable implemented.
Dec 7, 2022
e3208d7
document -> relational. new table with varchar columns
Dec 10, 2022
9ee53c7
document -> relational. a simplified migration is working
Dec 11, 2022
5de662c
document -> relational -> document is working
Dec 13, 2022
38547d4
unused method removed
Dec 13, 2022
52505c0
little refactor
Dec 14, 2022
dc07444
Merge pull request #1 from sulea/feature/merge-columns
sulea Dec 14, 2022
baa2856
A bigger batch of refactors
Dec 14, 2022
506e889
Merge remote-tracking branch 'origin/feature/master-thesis-schema-evo…
Dec 14, 2022
fd15c82
Merge pull request #2 from sulea/feature/transfer-table-old-concept
sulea Dec 14, 2022
61a3359
document -> relational is working also with embedded documents
Dec 20, 2022
9dcc5f5
some little refactor of transferTable
Dec 21, 2022
475f138
fixed relational -> document-based
Dec 28, 2022
A bigger batch of refactors
Attila Süle committed Dec 14, 2022
commit baa2856386a877782c23cffc4c58a33ce154ab0d
40 changes: 27 additions & 13 deletions catalog/src/main/java/org/polypheny/db/catalog/CatalogImpl.java
@@ -1893,34 +1893,44 @@ public long addTable( String name, long namespaceId, int ownerId, EntityType ent
return id;
}

/**
* {@inheritDoc}
*/
@Override
public long relocateTable( CatalogTable sourceTable, long targetNamespaceId ) {
// Clone the source table by changing the ID of the parent namespace
CatalogTable targetTable = transferCatalogTable( sourceTable, targetNamespaceId );
synchronized ( this ) {
// Build the new immutable list for the source namespace by removing the table to transfer
ImmutableList<Long> reducedSourceSchemaChildren = ImmutableList
.copyOf( Collections2.filter( schemaChildren.get( sourceTable.namespaceId ),
Predicates.not( Predicates.equalTo( sourceTable.id ) ) ) );
// Build the new immutable list for the target namespace by adding the table to transfer
ImmutableList<Long> extendedTargetSchemaChildren = new ImmutableList.Builder<Long>()
.addAll( schemaChildren.get( targetNamespaceId ) )
.add( targetTable.id )
.build();

// Replace the immutable list for both the source and target namespaces
schemaChildren.replace( sourceTable.namespaceId, reducedSourceSchemaChildren );
schemaChildren.replace( targetNamespaceId, extendedTargetSchemaChildren );

// Replace the tables' trees with the cloned table
tables.replace( sourceTable.id, targetTable );
tableNames.remove( new Object[]{ sourceTable.databaseId, sourceTable.namespaceId, sourceTable.name } );
tableNames.put( new Object[]{ targetTable.databaseId, targetNamespaceId, targetTable.name }, targetTable );

// Replace the trees of the tables' columns with cloned columns
for ( Long fieldId : sourceTable.fieldIds ) {
CatalogColumn targetCatalogColumn = transferCatalogColumn( targetNamespaceId, columns.get( fieldId ) );
columns.replace( fieldId, targetCatalogColumn );
columnNames.remove( new Object[]{ sourceTable.databaseId, sourceTable.namespaceId, sourceTable.id, targetCatalogColumn.name } );
columnNames.put( new Object[]{ sourceTable.databaseId, targetNamespaceId, sourceTable.id, targetCatalogColumn.name }, targetCatalogColumn );
}

// When transferring between document-based namespaces, also replace the collection trees.
if ( getSchema( sourceTable.namespaceId ).namespaceType == NamespaceType.DOCUMENT ) {
CatalogCollection targetCollection = transferCatalogCollection( collections.get( sourceTable.id ), targetNamespaceId );
collections.replace( sourceTable.id, targetCollection );
collectionNames.remove( new Object[]{ sourceTable.databaseId, sourceTable.namespaceId, sourceTable.name } );
@@ -1932,6 +1942,7 @@ public long relocateTable(CatalogTable sourceTable, long targetNamespaceId ) {
return sourceTable.id;
}


/**
* {@inheritDoc}
*/
@@ -5511,7 +5522,8 @@ private CatalogKey getKey( long keyId ) {
}
}


private static CatalogColumn transferCatalogColumn( long targetNamespaceId, CatalogColumn sourceCatalogColumn ) {
CatalogColumn targetCatalogColumn = new CatalogColumn(
sourceCatalogColumn.id,
sourceCatalogColumn.name,
@@ -5527,11 +5539,12 @@ private static CatalogColumn transferCatalogColumn(long targetNamespaceId, Catal
sourceCatalogColumn.cardinality,
sourceCatalogColumn.nullable,
sourceCatalogColumn.collation,
sourceCatalogColumn.defaultValue );
return targetCatalogColumn;
}


private CatalogTable transferCatalogTable( CatalogTable sourceTable, long targetNamespaceId ) {
return new CatalogTable(
sourceTable.id,
sourceTable.name,
@@ -5544,18 +5557,19 @@ private CatalogTable transferCatalogTable(CatalogTable sourceTable, long targetN
sourceTable.dataPlacements,
sourceTable.modifiable,
sourceTable.partitionProperty,
sourceTable.connectedViews );
}


private CatalogCollection transferCatalogCollection( CatalogCollection sourceCollection, long targetNamespaceId ) {
return new CatalogCollection(
sourceCollection.databaseId,
targetNamespaceId,
sourceCollection.id,
sourceCollection.name,
sourceCollection.placements,
sourceCollection.entityType,
sourceCollection.physicalName );
}


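The relocation above never mutates the catalog's immutable child lists; it rebuilds them and swaps the references. A minimal standalone sketch of that Guava pattern (the class name and ids are made up for illustration):

import com.google.common.base.Predicates;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;

public final class RelocationSketch {

    public static void main( String[] args ) {
        long tableId = 42L;
        ImmutableList<Long> sourceChildren = ImmutableList.of( 41L, 42L, 43L );
        ImmutableList<Long> targetChildren = ImmutableList.of( 7L );

        // Rebuild the source list without the moved table (same Guava calls as relocateTable above)
        ImmutableList<Long> reducedSource = ImmutableList.copyOf(
                Collections2.filter( sourceChildren, Predicates.not( Predicates.equalTo( tableId ) ) ) );

        // Rebuild the target list with the moved table appended
        ImmutableList<Long> extendedTarget = new ImmutableList.Builder<Long>()
                .addAll( targetChildren )
                .add( tableId )
                .build();

        System.out.println( reducedSource );  // [41, 43]
        System.out.println( extendedTarget ); // [7, 42]
    }
}

Rebuild-and-replace keeps each child list immutable, so readers only ever see a complete list; the synchronized block in relocateTable() makes the two replacements atomic with respect to other writers.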
9 changes: 8 additions & 1 deletion core/src/main/java/org/polypheny/db/catalog/Catalog.java
@@ -489,7 +489,14 @@ protected final boolean isValidIdentifier( final String str ) {
*/
public abstract long addTable( String name, long namespaceId, int ownerId, EntityType entityType, boolean modifiable );

/**
 * Relocates a table from one namespace to another, provided both namespaces follow the same model.
 *
 * @param table The table to relocate
 * @param targetNamespaceId The id of the target namespace
 * @return The id of the table (the target table keeps the same id as the source table)
 */
public abstract long relocateTable( CatalogTable table, long targetNamespaceId );


/**
9 changes: 9 additions & 0 deletions core/src/main/java/org/polypheny/db/ddl/DdlManager.java
@@ -451,6 +451,15 @@ public static DdlManager getInstance() {
*/
public abstract void createTable( long schemaId, String tableName, List<FieldInformation> columns, List<ConstraintInformation> constraints, boolean ifNotExists, List<DataStore> stores, PlacementType placementType, Statement statement ) throws EntityAlreadyExistsException, ColumnNotExistsException, UnknownPartitionTypeException, UnknownColumnException, PartitionGroupNamesNotUniqueException;

/**
 * Transfers a table between two namespaces.
 * Currently, transfers are supported between namespaces of the same model as well as between relational and document-based namespaces in both directions.
 *
 * @param table the table to be transferred
 * @param targetSchemaId the id of the target namespace
 * @param statement the used statement
 * @param primaryKeyColumnNames the columns selected by the user as the primary key when transferring into a relational namespace
 */
public abstract void transferTable( CatalogTable table, long targetSchemaId, Statement statement, List<String> primaryKeyColumnNames ) throws EntityAlreadyExistsException, DdlOnSourceException, UnknownTableException, UnknownColumnException;

/**
16 changes: 15 additions & 1 deletion core/src/main/java/org/polypheny/db/processing/DataMigrator.java
@@ -26,6 +26,7 @@
import org.polypheny.db.catalog.entity.CatalogGraphDatabase;
import org.polypheny.db.catalog.entity.CatalogTable;
import org.polypheny.db.catalog.exceptions.UnknownColumnException;
import org.polypheny.db.ddl.DdlManager;
import org.polypheny.db.transaction.Statement;
import org.polypheny.db.transaction.Transaction;

@@ -86,11 +87,24 @@ void copyPartitionData(

AlgRoot getSourceIterator( Statement statement, Map<Long, List<CatalogColumnPlacement>> placementDistribution );


void copyGraphData( CatalogGraphDatabase graph, Transaction transaction, Integer existingAdapterId, CatalogAdapter adapter );

/**
 * Migrates the data when transferring a table from a relational to a document-based namespace.
 *
 * @param transaction Transactional scope
 * @param sourceTable Source table from which the data is read
 * @param targetSchemaId ID of the target namespace
 */
void copyRelationalDataToDocumentData( Transaction transaction, CatalogTable sourceTable, long targetSchemaId );

/**
 * Migrates the data when transferring a collection from a document-based to a relational namespace.
 *
 * @param transaction Transactional scope
 * @param jsonObjects List of the JSON objects of the source collection
 * @param table Target table created in the {@link DdlManager}
 */
void copyDocumentDataToRelationalData( Transaction transaction, List<JsonObject> jsonObjects, CatalogTable table ) throws UnknownColumnException;

}
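For orientation, the two new interface methods are reached from the DDL layer roughly like this (a sketch mirroring the transferTable() implementation shown later in this diff; the wrapper method itself is hypothetical):

import org.polypheny.db.catalog.entity.CatalogTable;
import org.polypheny.db.processing.DataMigrator;
import org.polypheny.db.transaction.Statement;
import org.polypheny.db.transaction.Transaction;

class MigrationCallSketch {

    // relational -> document: the migrator reads the source table itself
    void relationalToDocument( Statement statement, CatalogTable sourceTable, long targetSchemaId ) {
        Transaction transaction = statement.getTransaction();
        DataMigrator dataMigrator = transaction.getDataMigrator();
        dataMigrator.copyRelationalDataToDocumentData( transaction, sourceTable, targetSchemaId );
    }

    // document -> relational: the caller first parses the source documents and creates the
    // target table, then hands both to copyDocumentDataToRelationalData( ... ).
}

Note the asymmetry: the relational source can be read by the migrator directly, while the document source is materialized as JsonObjects by the caller, because the target table's columns have to be derived from the documents first.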
3 changes: 2 additions & 1 deletion core/src/test/java/org/polypheny/db/catalog/MockCatalog.java
@@ -351,8 +351,9 @@ public long addTable( String name, long namespaceId, int ownerId, EntityType ent
throw new NotImplementedException();
}


@Override
public long relocateTable( CatalogTable sourceTable, long targetNamespaceId ) {
throw new NotImplementedException();
}

75 changes: 43 additions & 32 deletions dbms/src/main/java/org/polypheny/db/ddl/DdlManagerImpl.java
@@ -2261,87 +2261,99 @@ public void createTable( long schemaId, String name, List<FieldInformation> fiel
}
}


/**
* {@inheritDoc}
*/
@Override
public void transferTable( CatalogTable sourceTable, long targetSchemaId, Statement statement, List<String> primaryKeyColumnNames ) throws EntityAlreadyExistsException, DdlOnSourceException, UnknownTableException, UnknownColumnException {
// Check if there is already an entity with this name
if ( assertEntityExists( targetSchemaId, sourceTable.name, true ) ) {
return;
}

// Retrieve the catalog schema objects for later use
CatalogSchema sourceNamespace = catalog.getSchema( sourceTable.namespaceId );
CatalogSchema targetNamespace = catalog.getSchema( targetSchemaId );

if ( sourceNamespace.getNamespaceType() == targetNamespace.getNamespaceType() ) {
// If the source and target namespaces follow the same model, it is sufficient to relocate the table in the catalog
catalog.relocateTable( sourceTable, targetSchemaId );
} else if ( sourceNamespace.getNamespaceType() == NamespaceType.RELATIONAL && targetNamespace.getNamespaceType() == NamespaceType.DOCUMENT ) {
// If the source namespace is relational and the target is document-based, the data migration has to be performed
// Create the new collection on the same data store(s)
List<DataStore> stores = sourceTable.dataPlacements
.stream()
.map( id -> (DataStore) AdapterManager.getInstance().getAdapter( id ) )
.collect( Collectors.toList() );
PlacementType placementType = catalog.getDataPlacement( sourceTable.dataPlacements.get( 0 ), sourceTable.id ).placementType;
createCollection( targetSchemaId, sourceTable.name, false, stores, placementType, statement );

// Call the migrator
DataMigrator dataMigrator = statement.getTransaction().getDataMigrator();
dataMigrator.copyRelationalDataToDocumentData( statement.getTransaction(), sourceTable, targetSchemaId );

// Drop the source table
dropTable( sourceTable, statement );

statement.getQueryProcessor().resetCaches();
} else if ( sourceNamespace.getNamespaceType() == NamespaceType.DOCUMENT && targetNamespace.getNamespaceType() == NamespaceType.RELATIONAL ) {
// If the source namespace is document-based and the target is relational, the data migration has to be performed
// Retrieve the source collection and the data placements of the source table
CatalogCollection sourceCollection = catalog.getCollection( sourceTable.id );
List<DataStore> stores = sourceTable.dataPlacements
.stream()
.map( id -> (DataStore) AdapterManager.getInstance().getAdapter( id ) )
.collect( Collectors.toList() );
PlacementType placementType = catalog.getDataPlacement( sourceTable.dataPlacements.get( 0 ), sourceTable.id ).placementType;

// Get all documents of the source collection; they are needed to derive the columns of the target table
String query = String.format( "db.%s.find({})", sourceTable.name );
QueryParameters parameters = new MqlQueryParameters( query, sourceNamespace.name, NamespaceType.DOCUMENT );
AutomaticDdlProcessor mqlProcessor = (AutomaticDdlProcessor) statement.getTransaction().getProcessor( QueryLanguage.MONGO_QL );
MqlNode parsed = (MqlNode) mqlProcessor.parse( query ).get( 0 );
AlgRoot logicalRoot = mqlProcessor.translate( statement, parsed, parameters );
PolyImplementation polyImplementation = statement.getQueryProcessor().prepareQuery( logicalRoot, true );

Result result = getResult( QueryLanguage.MONGO_QL, statement, query, polyImplementation, statement.getTransaction(), false );

// Create a list of the JsonObjects, skipping the _id field, which is only needed for the documents but not for the table
List<String> fieldNames = new ArrayList<>();
List<JsonObject> jsonObjects = new ArrayList<>();
for ( String[] documents : result.getData() ) {
for ( String document : documents ) {
JsonObject jsonObject = JsonParser.parseString( document ).getAsJsonObject();
List<String> fieldsInDocument = new ArrayList<>( jsonObject.keySet() );
fieldsInDocument.removeAll( fieldNames );
fieldsInDocument.remove( "_id");
fieldsInDocument.remove( "_id" );
fieldNames.addAll( fieldsInDocument );
jsonObjects.add( jsonObject );
}
}

// Create the target table
// Only VARCHAR(32) columns are added in the current version
ColumnTypeInformation typeInformation = new ColumnTypeInformation( PolyType.VARCHAR, PolyType.VARCHAR, 32, null, null, null, false );
List<FieldInformation> fieldInformations = fieldNames
.stream()
.map( fieldName -> new FieldInformation( fieldName, typeInformation, Collation.getDefaultCollation(), null, fieldNames.indexOf( fieldName ) + 1 ) )
.collect( Collectors.toList() );

// Set the PKs selected by the user
List<ConstraintInformation> constraintInformations = Collections.singletonList( new ConstraintInformation( "primary", ConstraintType.PRIMARY, primaryKeyColumnNames ) );
createTable( targetSchemaId, sourceTable.name, fieldInformations, constraintInformations, false, stores, placementType, statement );

// Call the DataMigrator
DataMigrator dataMigrator = statement.getTransaction().getDataMigrator();
dataMigrator.copyDocumentDataToRelationalData( statement.getTransaction(), jsonObjects, catalog.getTable( targetSchemaId, sourceTable.name ) );

// Remove the source collection
dropCollection( sourceCollection, statement );
statement.getQueryProcessor().resetCaches();
}
}


@NotNull
public static Result getResult( QueryLanguage language, Statement statement, String query, PolyImplementation result, Transaction transaction, final boolean noLimit ) {
Catalog catalog = Catalog.getInstance();

List<List<Object>> rows = result.getRows( statement, noLimit ? -1 : language == QueryLanguage.CYPHER ? RuntimeConfig.UI_NODE_AMOUNT.getInteger() : RuntimeConfig.UI_PAGE_SIZE.getInteger() );
@@ -2350,12 +2362,10 @@ public static Result getResult(QueryLanguage language, Statement statement, Stri

CatalogTable catalogTable = null;


ArrayList<DbColumn> header = new ArrayList<>();
for ( AlgDataTypeField metaData : result.rowType.getFieldList() ) {
String columnName = metaData.getName();


DbColumn dbCol = new DbColumn(
metaData.getName(),
metaData.getType().getFullTypeString(),
@@ -3285,6 +3295,7 @@ private void prepareMonitoring( Statement statement, Kind kind, CatalogTable cat
}
}


@Override
public void dropFunction() {
throw new RuntimeException( "Not supported yet" );
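The column-derivation step in transferTable() above (the union of all field names across the documents, minus the document-only _id) can be checked in isolation. A standalone sketch of the same loop, assuming Gson 2.8.6+ and made-up sample documents:

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.util.ArrayList;
import java.util.List;

public final class FieldCollectionSketch {

    public static void main( String[] args ) {
        String[] documents = {
                "{\"_id\":\"1\",\"name\":\"alice\",\"age\":\"30\"}",
                "{\"_id\":\"2\",\"name\":\"bob\",\"city\":\"basel\"}"
        };

        // Union of all field names across the documents, in first-seen order, without _id
        List<String> fieldNames = new ArrayList<>();
        List<JsonObject> jsonObjects = new ArrayList<>();
        for ( String document : documents ) {
            JsonObject jsonObject = JsonParser.parseString( document ).getAsJsonObject();
            List<String> fieldsInDocument = new ArrayList<>( jsonObject.keySet() );
            fieldsInDocument.removeAll( fieldNames );
            fieldsInDocument.remove( "_id" );
            fieldNames.addAll( fieldsInDocument );
            jsonObjects.add( jsonObject );
        }

        System.out.println( fieldNames ); // [name, age, city]
    }
}

Documents that lack a field simply contribute nothing for it; the migrator later inserts null for such gaps (see copyDocumentDataToRelationalData below).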
@@ -150,25 +150,30 @@ public void copyGraphData( CatalogGraphDatabase target, Transaction transaction,
}


/**
* {@inheritDoc}
*/
@Override
public void copyRelationalDataToDocumentData( Transaction transaction, CatalogTable sourceTable, long targetSchemaId ) {
try {
Catalog catalog = Catalog.getInstance();

// Collect the columns of the source table
List<CatalogColumn> sourceColumns = new ArrayList<>();
for ( String columnName : sourceTable.getColumnNames() ) {
sourceColumns.add( catalog.getColumn( sourceTable.id, columnName ) );
}

// Retrieve the placements of the source table
Map<Long, List<CatalogColumnPlacement>> sourceColumnPlacements = new HashMap<>();
sourceColumnPlacements.put(
sourceTable.partitionProperty.partitionIds.get( 0 ),
selectSourcePlacements( sourceTable, sourceColumns, -1 ) );

Map<Long, List<CatalogColumnPlacement>> subDistribution = new HashMap<>( sourceColumnPlacements );
subDistribution.keySet().retainAll( Arrays.asList( sourceTable.partitionProperty.partitionIds.get( 0 ) ) );

// Initialize the source statement to read all values from the source table
Statement sourceStatement = transaction.createStatement();
AlgRoot sourceAlg = getSourceIterator( sourceStatement, subDistribution );
PolyImplementation result = sourceStatement.getQueryProcessor().prepareQuery(
sourceAlg,
@@ -177,6 +182,7 @@ public void copyRelationalDataToDocumentData( Transaction transaction, CatalogTa
false,
false );

// Build the data structure to map the columns to the physical placements
Map<String, Integer> sourceColMapping = new LinkedHashMap<>();
for ( CatalogColumn catalogColumn : sourceColumns ) {
int i = 0;
@@ -192,6 +198,7 @@ public void copyRelationalDataToDocumentData( Transaction transaction, CatalogTa
final Enumerable<Object> enumerable = result.enumerable( sourceStatement.getDataContext() );
Iterator<Object> sourceIterator = enumerable.iterator();
while ( sourceIterator.hasNext() ) {
// Build a data structure for all values of the source table for the insert query
List<List<Object>> rows = MetaImpl.collect( result.getCursorFactory(), LimitIterator.of( sourceIterator, batchSize ), new ArrayList<>() );
List<LinkedHashMap<String, Object>> values = new ArrayList<>();
for ( List<Object> list : rows ) {
@@ -200,6 +207,7 @@ public void copyRelationalDataToDocumentData( Transaction transaction, CatalogTa
values.add( currentRowValues );
}

// Create the insert query for all documents in the collection
boolean firstRow = true;
StringBuffer bf = new StringBuffer();
bf.append( "db." + sourceTable.name + ".insertMany([" );
@@ -212,7 +220,7 @@ public void copyRelationalDataToDocumentData( Transaction transaction, CatalogTa
}
boolean firstColumn = true;
for ( Map.Entry<String, Object> entry : row.entrySet() ) {
if ( entry.getValue() != null ) {
if ( firstColumn == true ) {
firstColumn = false;
} else {
@@ -225,14 +233,16 @@ public void copyRelationalDataToDocumentData( Transaction transaction, CatalogTa
}
bf.append( "])" );

// Insert all documents into the newly created collection
Statement targetStatement = transaction.createStatement();
String query = bf.toString();
AutomaticDdlProcessor mqlProcessor = (AutomaticDdlProcessor) transaction.getProcessor( Catalog.QueryLanguage.MONGO_QL );
QueryParameters parameters = new MqlQueryParameters( query, catalog.getSchema( targetSchemaId ).name, Catalog.NamespaceType.DOCUMENT );
MqlNode parsed = (MqlNode) mqlProcessor.parse( query ).get( 0 );
AlgRoot logicalRoot = mqlProcessor.translate( targetStatement, parsed, parameters );
PolyImplementation polyImplementation = targetStatement.getQueryProcessor().prepareQuery( logicalRoot, true );

// TODO: something is wrong with the transactions. Try to get rid of this.
Result updateResult = getResult( Catalog.QueryLanguage.MONGO_QL, targetStatement, query, polyImplementation, transaction, false );
}
} catch ( Throwable t ) {
@@ -241,39 +251,48 @@ public void copyRelationalDataToDocumentData( Transaction transaction, CatalogTa
}


/**
* {@inheritDoc}
*/
@Override
public void copyDocumentDataToRelationalData( Transaction transaction, List<JsonObject> jsonObjects, CatalogTable targetTable ) throws UnknownColumnException {
Catalog catalog = Catalog.getInstance();

// Get the values in all documents of the collection
// TODO: A data structure is needed to also represent 1:N relations across multiple tables
Map<CatalogColumn, List<Object>> columnValues = new HashMap<>();
for ( JsonObject jsonObject : jsonObjects ) {
for ( String columnName : targetTable.getColumnNames() ) {
CatalogColumn column = catalog.getColumn( targetTable.id, columnName );
if ( !columnValues.containsKey( column ) ) {
columnValues.put( column, new LinkedList<>() );
}
JsonElement jsonElement = jsonObject.get( columnName );
if ( jsonElement != null ) {
columnValues.get( column ).add( jsonElement.getAsString() );
} else {
columnValues.get( column ).add( null );
}
}
}

Statement targetStatement = transaction.createStatement();
final AlgDataTypeFactory typeFactory = new PolyTypeFactoryImpl( AlgDataTypeSystem.DEFAULT );
List<CatalogColumnPlacement> targetColumnPlacements = new LinkedList<>();
for ( Entry<CatalogColumn, List<Object>> entry : columnValues.entrySet() ) {
// Add the values of the column to the statement
CatalogColumn targetColumn = catalog.getColumn( targetTable.id, entry.getKey().name );
targetStatement.getDataContext().addParameterValues( targetColumn.id, targetColumn.getAlgDataType( typeFactory ), entry.getValue() );

// Add all placements of the column to the targetColumnPlacements list
for ( DataStore store : RoutingManager.getInstance().getCreatePlacementStrategy().getDataStoresForNewColumn( targetColumn ) ) {
CatalogColumnPlacement columnPlacement = Catalog.getInstance().getColumnPlacement( store.getAdapterId(), targetColumn.id );
targetColumnPlacements.add( columnPlacement );
}
}

// Prepare the insert query
AlgRoot targetAlg = buildInsertStatement( targetStatement, targetColumnPlacements, targetTable.partitionProperty.partitionIds.get( 0 ) );
Iterator<?> iterator = targetStatement.getQueryProcessor()
.prepareQuery( targetAlg, targetAlg.validatedRowType, true, false, false )
.enumerable( targetStatement.getDataContext() )
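The insertMany construction in copyRelationalDataToDocumentData() above is partially elided by the diff view. A hedged standalone reconstruction of the batching loop (the per-value quoting in the middle is a guess, since those lines are not shown; StringBuilder replaces the StringBuffer used above because no synchronization is needed):

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class InsertManySketch {

    // Turn one batch of rows into a single db.<collection>.insertMany([...]) string
    static String buildInsertMany( String collectionName, List<LinkedHashMap<String, Object>> rows ) {
        StringBuilder bf = new StringBuilder();
        bf.append( "db." ).append( collectionName ).append( ".insertMany([" );
        boolean firstRow = true;
        for ( Map<String, Object> row : rows ) {
            if ( firstRow ) {
                firstRow = false;
            } else {
                bf.append( "," );
            }
            bf.append( "{" );
            boolean firstColumn = true;
            for ( Map.Entry<String, Object> entry : row.entrySet() ) {
                // null columns are skipped, matching the null handling in the migrator
                if ( entry.getValue() != null ) {
                    if ( firstColumn ) {
                        firstColumn = false;
                    } else {
                        bf.append( "," );
                    }
                    bf.append( "\"" ).append( entry.getKey() ).append( "\":\"" ).append( entry.getValue() ).append( "\"" );
                }
            }
            bf.append( "}" );
        }
        bf.append( "])" );
        return bf.toString();
    }
}

Building the statement as one string per batch keeps the number of round trips through the MQL processor at one per batchSize rows.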
@@ -48,7 +48,7 @@


/**
* Parse tree for {@code ALTER SCHEMA name TRANSFER table TO namespace} statement.
*/
public class SqlAlterSchemaTransferTable extends SqlAlterSchema {

@@ -60,23 +60,23 @@ public class SqlAlterSchemaTransferTable extends SqlAlterSchema {
/**
* Creates a SqlAlterSchemaTransferTable.
*/
public SqlAlterSchemaTransferTable( ParserPos pos, SqlIdentifier table, SqlIdentifier targetSchema, SqlNodeList primaryKeyColumns ) {
super( pos );
this.table = Objects.requireNonNull( table );
this.targetSchema = Objects.requireNonNull( targetSchema );
this.primaryKeyColumns = primaryKeyColumns;
}


@Override
public List<Node> getOperandList() {
return ImmutableNullableList.of( table, targetSchema );
}


@Override
public List<SqlNode> getSqlOperandList() {
return ImmutableNullableList.of( table, targetSchema );
}


@@ -95,9 +95,9 @@ public void unparse( SqlWriter writer, int leftPrec, int rightPrec ) {
public void execute( Context context, Statement statement, QueryParameters parameters ) {
try {
Catalog catalog = Catalog.getInstance();
CatalogTable catalogTable = getCatalogTable( context, table );

long targetSchemaId = catalog.getSchema( context.getDatabaseId(), targetSchema.getNames().get( 0 ) ).id;

List<String> primaryKeyColumnNames = (primaryKeyColumns != null)
? primaryKeyColumns.getList().stream().map( Node::toString ).collect( Collectors.toList() )
@@ -107,14 +107,14 @@ public void execute( Context context, Statement statement, QueryParameters param

} catch ( UnknownSchemaException e ) {
throw CoreUtil.newContextException( table.getPos(), RESOURCE.schemaNotFound( table.getSimple() ) );
} catch ( EntityAlreadyExistsException e ) {
throw CoreUtil.newContextException( table.getPos(), RESOURCE.tableExists( table.names.get( 1 ) ) );
} catch ( DdlOnSourceException e ) {
throw CoreUtil.newContextException( table.getPos(), RESOURCE.ddlOnSourceTable() );
} catch ( UnknownTableException e ) {
throw CoreUtil.newContextException( table.getPos(), RESOURCE.tableNotFound( e.getTableName() ) );
} catch ( UnknownColumnException e ) {
throw CoreUtil.newContextException( table.getPos(), RESOURCE.columnNotFound( e.getColumnName() ) );
}
}
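As a usage note: per the updated class comment, the statement shape is ALTER SCHEMA name TRANSFER table TO namespace, with an optional list of primary-key columns. That list is forwarded as primaryKeyColumnNames and, for document-to-relational transfers, determines the primary key of the created table (see transferTable() in DdlManagerImpl above); the exact identifier syntax beyond the class comment is not shown in this diff.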