Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a way to specify a preferred store which should be used for queries #439

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/main/java/org/polypheny/db/plan/AlgOptTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ public interface AlgOptTable extends Wrapper {
*/
List<ColumnStrategy> getColumnStrategies();

void setPreferredPlacement( String placement );
String getPreferredPlacement();

default Table getTable() {
return null;
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/java/org/polypheny/db/prepare/AlgOptTableImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ public class AlgOptTableImpl extends Prepare.AbstractPreparingTable {
private final transient Function<Class, Expression> expressionFunction;
private final ImmutableList<String> names;

String preferredPlacement;

/**
* Estimate for the row count, or null.
* <p>
Expand All @@ -121,6 +123,7 @@ private AlgOptTableImpl(
this.table = table; // may be null
this.expressionFunction = expressionFunction; // may be null
this.rowCount = rowCount; // may be null
this.preferredPlacement = null;
}


Expand Down Expand Up @@ -403,6 +406,18 @@ public AccessType getAllowedAccess() {
}


@Override
public void setPreferredPlacement( String placement ) {
preferredPlacement = placement;
}


@Override
public String getPreferredPlacement() {
return preferredPlacement;
}


/**
* Helper for {@link #getColumnStrategies()}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public AlgNode visit( LogicalMatch match ) {

@Override
public AlgNode visit( Scan scan ) {
hashBasis.add( "Scan#" + scan.getTable().getQualifiedName() );
hashBasis.add( "Scan#" + scan.getTable().getQualifiedName() + "@" + scan.getTable().getPreferredPlacement() );
// get available columns for every table scan
this.getAvailableColumns( scan );

Expand Down
11 changes: 11 additions & 0 deletions core/src/test/java/org/polypheny/db/catalog/MockCatalogReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,17 @@ public AlgDataType getRowType() {
}


@Override
public void setPreferredPlacement( String placement ) {
}


@Override
public String getPreferredPlacement() {
return null;
}


public static MockTable create( MockCatalogReader catalogReader, MockSchema schema, String name, boolean stream, double rowCount ) {
return create( catalogReader, schema, name, stream, rowCount, null );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@
package org.polypheny.db.routing.routers;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -43,7 +48,10 @@
import org.polypheny.db.algebra.logical.relational.LogicalValues;
import org.polypheny.db.catalog.Catalog;
import org.polypheny.db.catalog.Catalog.Pattern;
import org.polypheny.db.catalog.entity.CatalogAdapter;
import org.polypheny.db.catalog.entity.CatalogColumnPlacement;
import org.polypheny.db.catalog.entity.CatalogTable;
import org.polypheny.db.catalog.exceptions.UnknownAdapterIdRuntimeException;
import org.polypheny.db.plan.AlgOptCluster;
import org.polypheny.db.prepare.AlgOptTableImpl;
import org.polypheny.db.rex.RexBuilder;
Expand Down Expand Up @@ -82,6 +90,8 @@ public abstract class AbstractDqlRouter extends BaseRouter implements Router {
*/
protected boolean cancelQuery = false;

// catalogTable.id -> unique placement name
final Map<Long, String> preferencePerTable = new HashMap<>();

/**
* Abstract methods which will determine routing strategy. Not implemented in abstract class.
Expand Down Expand Up @@ -113,6 +123,45 @@ protected abstract List<RoutedAlgBuilder> handleNonePartitioning(
AlgOptCluster cluster,
LogicalQueryInformation queryInformation );

/* Copied from BaseRouter.java, with added awarness of placement preference */
public Map<Long, List<CatalogColumnPlacement>> selectPlacementWithPreference( CatalogTable table ) {
// Find the adapter with the most column placements
int adapterIdWithMostPlacements = -1;
int numOfPlacements = 0;
final String preferredPlacement = preferencePerTable.getOrDefault( table.id, null );
for ( Entry<Integer, ImmutableList<Long>> entry : catalog.getColumnPlacementsByAdapter( table.id ).entrySet() ) {
if ( preferredPlacement != null ) {
try {
final CatalogAdapter adapter = catalog.getAdapter( entry.getKey() );
if ( ! preferredPlacement.equals( adapter.uniqueName ) ) {
continue;
}
} catch (UnknownAdapterIdRuntimeException e) {
// getColumnplacementsByAdapter can return id
// values which are not valid
continue;
}
}
if ( entry.getValue().size() > numOfPlacements ) {
adapterIdWithMostPlacements = entry.getKey();
numOfPlacements = entry.getValue().size();
}
}

// Take the adapter with most placements as base and add missing column placements
List<CatalogColumnPlacement> placementList = new LinkedList<>();
for ( long cid : table.fieldIds ) {
if ( catalog.getDataPlacement( adapterIdWithMostPlacements, table.id ).columnPlacementsOnAdapter.contains( cid ) ) {
placementList.add( Catalog.getInstance().getColumnPlacement( adapterIdWithMostPlacements, cid ) );
} else {
placementList.add( Catalog.getInstance().getColumnPlacement( cid ).get( 0 ) );
}
}

return new HashMap<>() {{
put( table.partitionProperty.partitionIds.get( 0 ), placementList );
}};
}

/**
* Abstract router only routes DQL queries.
Expand Down Expand Up @@ -225,6 +274,7 @@ protected List<RoutedAlgBuilder> buildSelect( AlgNode node, List<RoutedAlgBuilde
}

CatalogTable catalogTable = catalog.getTable( logicalTable.getTableId() );
preferencePerTable.put( catalogTable.id, table.getPreferredPlacement() );

// Check if table is even horizontal partitioned
if ( catalogTable.partitionProperty.isPartitioned ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ protected List<RoutedAlgBuilder> handleVerticalPartitioningOrReplication( AlgNod
@Override
protected List<RoutedAlgBuilder> handleNonePartitioning( AlgNode node, CatalogTable catalogTable, Statement statement, List<RoutedAlgBuilder> builders, AlgOptCluster cluster, LogicalQueryInformation queryInformation ) {
// Get placements and convert into placement distribution
final Map<Long, List<CatalogColumnPlacement>> placements = selectPlacement( catalogTable );
final Map<Long, List<CatalogColumnPlacement>> placements = selectPlacementWithPreference( catalogTable );

// Only one builder available
builders.get( 0 ).addPhysicalInfo( placements );
Expand Down
12 changes: 10 additions & 2 deletions plugins/sql-language/src/main/codegen/templates/Parser.jj
Original file line number Diff line number Diff line change
Expand Up @@ -4262,7 +4262,8 @@ SqlIdentifier CompoundIdentifier() :
{
List<String> list = new ArrayList<String>();
List<ParserPos> posList = new ArrayList<ParserPos>();
String p;
String p, at = null;
SqlIdentifier s;
boolean star = false;
}
{
Expand All @@ -4286,12 +4287,18 @@ SqlIdentifier CompoundIdentifier() :
posList.add(getPos());
}
)?
(
<ATSYMBOL>
at = Identifier()
)?
{
ParserPos pos = ParserPos.sum(posList);
if (star) {
return SqlIdentifier.star(list, pos, posList);
}
return new SqlIdentifier(list, null, pos, posList);
s = new SqlIdentifier(list, null, pos, posList);
s.preferredPlacement = at;
return s;
}
}

Expand Down Expand Up @@ -6057,6 +6064,7 @@ SqlPostfixOperator PostfixRowOperator() :
| < ASSIGNMENT: "ASSIGNMENT" >
| < ASYMMETRIC: "ASYMMETRIC" >
| < AT: "AT" >
| < ATSYMBOL: "@" >
| < ATOMIC: "ATOMIC" >
| < ATTRIBUTE: "ATTRIBUTE" >
| < ATTRIBUTES: "ATTRIBUTES" >
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class SqlIdentifier extends SqlNode implements Identifier {
protected ImmutableList<ParserPos> componentPositions;


public String preferredPlacement;

/**
* Creates a compound identifier, for example <code>foo.bar</code>.
*
Expand Down Expand Up @@ -315,6 +317,11 @@ public void unparse( SqlWriter writer, int leftPrec, int rightPrec ) {
i++;
}

if ( preferredPlacement != null ) {
writer.print(" @ ");
writer.identifier( preferredPlacement );
}

if ( null != collation ) {
collation.unparse( writer, leftPrec, rightPrec );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1969,6 +1969,7 @@ private void convertIdentifier( Blackboard bb, SqlIdentifier id, SqlNodeList ext
final String datasetName = datasetStack.isEmpty() ? null : datasetStack.peek();
final boolean[] usedDataset = { false };
AlgOptTable table = SqlValidatorUtil.getAlgOptTable( fromNamespace, catalogReader, datasetName, usedDataset );
table.setPreferredPlacement( id.preferredPlacement );
if ( extendedColumns != null && extendedColumns.size() > 0 ) {
assert table != null;
final ValidatorTable validatorTable = table.unwrap( ValidatorTable.class );
Expand Down