diff --git a/core/src/main/java/org/polypheny/db/plan/AlgOptTable.java b/core/src/main/java/org/polypheny/db/plan/AlgOptTable.java index 1fec3e212f..0cea2d2046 100644 --- a/core/src/main/java/org/polypheny/db/plan/AlgOptTable.java +++ b/core/src/main/java/org/polypheny/db/plan/AlgOptTable.java @@ -135,6 +135,8 @@ public interface AlgOptTable extends Wrapper { */ List getColumnStrategies(); + void setPreferredPlacement( String placement ); + String getPreferredPlacement(); default Table getTable() { return null; diff --git a/core/src/main/java/org/polypheny/db/prepare/AlgOptTableImpl.java b/core/src/main/java/org/polypheny/db/prepare/AlgOptTableImpl.java index 80077f8b17..e65d784399 100644 --- a/core/src/main/java/org/polypheny/db/prepare/AlgOptTableImpl.java +++ b/core/src/main/java/org/polypheny/db/prepare/AlgOptTableImpl.java @@ -100,6 +100,8 @@ public class AlgOptTableImpl extends Prepare.AbstractPreparingTable { private final transient Function expressionFunction; private final ImmutableList names; + String preferredPlacement; + /** * Estimate for the row count, or null. *

@@ -121,6 +123,7 @@ private AlgOptTableImpl( this.table = table; // may be null this.expressionFunction = expressionFunction; // may be null this.rowCount = rowCount; // may be null + this.preferredPlacement = null; } @@ -403,6 +406,18 @@ public AccessType getAllowedAccess() { } + @Override + public void setPreferredPlacement( String placement ) { + preferredPlacement = placement; + } + + + @Override + public String getPreferredPlacement() { + return preferredPlacement; + } + + /** * Helper for {@link #getColumnStrategies()}. */ diff --git a/core/src/main/java/org/polypheny/db/processing/LogicalAlgAnalyzeShuttle.java b/core/src/main/java/org/polypheny/db/processing/LogicalAlgAnalyzeShuttle.java index 1ba3b3de60..47bdab1469 100644 --- a/core/src/main/java/org/polypheny/db/processing/LogicalAlgAnalyzeShuttle.java +++ b/core/src/main/java/org/polypheny/db/processing/LogicalAlgAnalyzeShuttle.java @@ -292,7 +292,7 @@ public AlgNode visit( LogicalMatch match ) { @Override public AlgNode visit( Scan scan ) { - hashBasis.add( "Scan#" + scan.getTable().getQualifiedName() ); + hashBasis.add( "Scan#" + scan.getTable().getQualifiedName() + "@" + scan.getTable().getPreferredPlacement() ); // get available columns for every table scan this.getAvailableColumns( scan ); diff --git a/core/src/test/java/org/polypheny/db/catalog/MockCatalogReader.java b/core/src/test/java/org/polypheny/db/catalog/MockCatalogReader.java index 5d77c710be..b3fbce983e 100644 --- a/core/src/test/java/org/polypheny/db/catalog/MockCatalogReader.java +++ b/core/src/test/java/org/polypheny/db/catalog/MockCatalogReader.java @@ -390,6 +390,17 @@ public AlgDataType getRowType() { } + @Override + public void setPreferredPlacement( String placement ) { + } + + + @Override + public String getPreferredPlacement() { + return null; + } + + public static MockTable create( MockCatalogReader catalogReader, MockSchema schema, String name, boolean stream, double rowCount ) { return create( catalogReader, schema, name, stream, rowCount, null ); } diff --git a/dbms/src/main/java/org/polypheny/db/routing/routers/AbstractDqlRouter.java b/dbms/src/main/java/org/polypheny/db/routing/routers/AbstractDqlRouter.java index 93b549f3b7..8689974188 100644 --- a/dbms/src/main/java/org/polypheny/db/routing/routers/AbstractDqlRouter.java +++ b/dbms/src/main/java/org/polypheny/db/routing/routers/AbstractDqlRouter.java @@ -17,10 +17,15 @@ package org.polypheny.db.routing.routers; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; @@ -43,7 +48,10 @@ import org.polypheny.db.algebra.logical.relational.LogicalValues; import org.polypheny.db.catalog.Catalog; import org.polypheny.db.catalog.Catalog.Pattern; +import org.polypheny.db.catalog.entity.CatalogAdapter; +import org.polypheny.db.catalog.entity.CatalogColumnPlacement; import org.polypheny.db.catalog.entity.CatalogTable; +import org.polypheny.db.catalog.exceptions.UnknownAdapterIdRuntimeException; import org.polypheny.db.plan.AlgOptCluster; import org.polypheny.db.prepare.AlgOptTableImpl; import org.polypheny.db.rex.RexBuilder; @@ -82,6 +90,8 @@ public abstract class AbstractDqlRouter extends BaseRouter implements Router { */ protected boolean cancelQuery = false; + // catalogTable.id -> unique placement name + final Map preferencePerTable = new HashMap<>(); /** * Abstract methods which will determine routing strategy. Not implemented in abstract class. @@ -113,6 +123,45 @@ protected abstract List handleNonePartitioning( AlgOptCluster cluster, LogicalQueryInformation queryInformation ); + /* Copied from BaseRouter.java, with added awarness of placement preference */ + public Map> selectPlacementWithPreference( CatalogTable table ) { + // Find the adapter with the most column placements + int adapterIdWithMostPlacements = -1; + int numOfPlacements = 0; + final String preferredPlacement = preferencePerTable.getOrDefault( table.id, null ); + for ( Entry> entry : catalog.getColumnPlacementsByAdapter( table.id ).entrySet() ) { + if ( preferredPlacement != null ) { + try { + final CatalogAdapter adapter = catalog.getAdapter( entry.getKey() ); + if ( ! preferredPlacement.equals( adapter.uniqueName ) ) { + continue; + } + } catch (UnknownAdapterIdRuntimeException e) { + // getColumnplacementsByAdapter can return id + // values which are not valid + continue; + } + } + if ( entry.getValue().size() > numOfPlacements ) { + adapterIdWithMostPlacements = entry.getKey(); + numOfPlacements = entry.getValue().size(); + } + } + + // Take the adapter with most placements as base and add missing column placements + List placementList = new LinkedList<>(); + for ( long cid : table.fieldIds ) { + if ( catalog.getDataPlacement( adapterIdWithMostPlacements, table.id ).columnPlacementsOnAdapter.contains( cid ) ) { + placementList.add( Catalog.getInstance().getColumnPlacement( adapterIdWithMostPlacements, cid ) ); + } else { + placementList.add( Catalog.getInstance().getColumnPlacement( cid ).get( 0 ) ); + } + } + + return new HashMap<>() {{ + put( table.partitionProperty.partitionIds.get( 0 ), placementList ); + }}; + } /** * Abstract router only routes DQL queries. @@ -225,6 +274,7 @@ protected List buildSelect( AlgNode node, List handleVerticalPartitioningOrReplication( AlgNod @Override protected List handleNonePartitioning( AlgNode node, CatalogTable catalogTable, Statement statement, List builders, AlgOptCluster cluster, LogicalQueryInformation queryInformation ) { // Get placements and convert into placement distribution - final Map> placements = selectPlacement( catalogTable ); + final Map> placements = selectPlacementWithPreference( catalogTable ); // Only one builder available builders.get( 0 ).addPhysicalInfo( placements ); diff --git a/plugins/sql-language/src/main/codegen/templates/Parser.jj b/plugins/sql-language/src/main/codegen/templates/Parser.jj index 4a3ae39044..549eb9567c 100644 --- a/plugins/sql-language/src/main/codegen/templates/Parser.jj +++ b/plugins/sql-language/src/main/codegen/templates/Parser.jj @@ -4262,7 +4262,8 @@ SqlIdentifier CompoundIdentifier() : { List list = new ArrayList(); List posList = new ArrayList(); - String p; + String p, at = null; + SqlIdentifier s; boolean star = false; } { @@ -4286,12 +4287,18 @@ SqlIdentifier CompoundIdentifier() : posList.add(getPos()); } )? + ( + + at = Identifier() + )? { ParserPos pos = ParserPos.sum(posList); if (star) { return SqlIdentifier.star(list, pos, posList); } - return new SqlIdentifier(list, null, pos, posList); + s = new SqlIdentifier(list, null, pos, posList); + s.preferredPlacement = at; + return s; } } @@ -6057,6 +6064,7 @@ SqlPostfixOperator PostfixRowOperator() : | < ASSIGNMENT: "ASSIGNMENT" > | < ASYMMETRIC: "ASYMMETRIC" > | < AT: "AT" > +| < ATSYMBOL: "@" > | < ATOMIC: "ATOMIC" > | < ATTRIBUTE: "ATTRIBUTE" > | < ATTRIBUTES: "ATTRIBUTES" > diff --git a/plugins/sql-language/src/main/java/org/polypheny/db/sql/language/SqlIdentifier.java b/plugins/sql-language/src/main/java/org/polypheny/db/sql/language/SqlIdentifier.java index a4943b83f9..192ea03707 100644 --- a/plugins/sql-language/src/main/java/org/polypheny/db/sql/language/SqlIdentifier.java +++ b/plugins/sql-language/src/main/java/org/polypheny/db/sql/language/SqlIdentifier.java @@ -64,6 +64,8 @@ public class SqlIdentifier extends SqlNode implements Identifier { protected ImmutableList componentPositions; + public String preferredPlacement; + /** * Creates a compound identifier, for example foo.bar. * @@ -315,6 +317,11 @@ public void unparse( SqlWriter writer, int leftPrec, int rightPrec ) { i++; } + if ( preferredPlacement != null ) { + writer.print(" @ "); + writer.identifier( preferredPlacement ); + } + if ( null != collation ) { collation.unparse( writer, leftPrec, rightPrec ); } diff --git a/plugins/sql-language/src/main/java/org/polypheny/db/sql/sql2alg/SqlToAlgConverter.java b/plugins/sql-language/src/main/java/org/polypheny/db/sql/sql2alg/SqlToAlgConverter.java index 832b11f4f0..161893bfe0 100644 --- a/plugins/sql-language/src/main/java/org/polypheny/db/sql/sql2alg/SqlToAlgConverter.java +++ b/plugins/sql-language/src/main/java/org/polypheny/db/sql/sql2alg/SqlToAlgConverter.java @@ -1969,6 +1969,7 @@ private void convertIdentifier( Blackboard bb, SqlIdentifier id, SqlNodeList ext final String datasetName = datasetStack.isEmpty() ? null : datasetStack.peek(); final boolean[] usedDataset = { false }; AlgOptTable table = SqlValidatorUtil.getAlgOptTable( fromNamespace, catalogReader, datasetName, usedDataset ); + table.setPreferredPlacement( id.preferredPlacement ); if ( extendedColumns != null && extendedColumns.size() > 0 ) { assert table != null; final ValidatorTable validatorTable = table.unwrap( ValidatorTable.class );