Merged
20 changes: 20 additions & 0 deletions pom.xml
@@ -119,6 +119,8 @@
<module>presto-tests</module>
<module>presto-product-tests</module>
<module>presto-jdbc</module>
<module>presto-pinot</module>
<module>presto-pinot-toolkit</module>
<module>presto-cli</module>
<module>presto-benchmark-driver</module>
<module>presto-server</module>
@@ -293,6 +295,18 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-pinot</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-pinot-toolkit</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-mysql</artifactId>
@@ -1235,6 +1249,12 @@
<version>3.1.4-1</version>
</dependency>

<dependency>
<groupId>com.facebook.presto.pinot</groupId>
<artifactId>pinot-driver</artifactId>
<version>0.1.1</version>
</dependency>

<!-- force newer version to be used for dependencies -->
<dependency>
<groupId>org.javassist</groupId>
@@ -53,7 +53,13 @@ public JdbcPlanOptimizerProvider(
}

@Override
public Set<ConnectorPlanOptimizer> getConnectorPlanOptimizers()
public Set<ConnectorPlanOptimizer> getLogicalPlanOptimizers()
{
return ImmutableSet.of();
Reviewer comment (Contributor): Another way is to add a getStage() method in ConnectorPlanOptimizer, but the current design is fine given we won't (and probably shouldn't) have more than two stages.
}

@Override
public Set<ConnectorPlanOptimizer> getPhysicalPlanOptimizers()
{
return ImmutableSet.of(new JdbcComputePushdown(
functionManager,
1 change: 1 addition & 0 deletions presto-docs/src/main/sphinx/connector.rst
@@ -22,6 +22,7 @@ from different data sources.
connector/memory
connector/mongodb
connector/mysql
connector/pinot
connector/postgresql
connector/redis
connector/redshift
100 changes: 100 additions & 0 deletions presto-docs/src/main/sphinx/connector/pinot.rst
@@ -0,0 +1,100 @@
===============
Pinot Connector
===============

The Pinot connector allows querying data in an external Pinot
cluster. It can be used to query Pinot data directly or to join Pinot
data with data from other catalogs.

Configuration
-------------

To configure the Pinot connector, create a catalog properties file
in ``etc/catalog`` named, for example, ``pinot.properties``, to
mount the Pinot connector as the ``pinot`` catalog.
Create the file with the following contents, replacing the
connection properties as appropriate for your setup:

.. code-block:: none

    connector.name=pinot
    pinot.controller-urls=controller_host1:9000,controller_host2:9000

The ``pinot.controller-urls`` property specifies a comma-separated
list of Pinot controller ``host:port`` pairs.

Multiple Pinot Servers
^^^^^^^^^^^^^^^^^^^^^^

You can have as many catalogs as you need, so if you have additional
Pinot clusters, simply add another properties file to ``etc/catalog``
with a different name (making sure it ends in ``.properties``). For
example, if you name the property file ``sales.properties``, Presto
will create a catalog named ``sales`` using the configured connector.
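
For example, a second catalog file ``etc/catalog/sales.properties``
might contain the following (the controller hostnames are placeholders
for your deployment):

.. code-block:: none

    connector.name=pinot
    pinot.controller-urls=sales_controller1:9000,sales_controller2:9000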

Querying Pinot
--------------

The Pinot catalog exposes all Pinot tables inside a single flat
schema. The schema name is immaterial when querying; running ``SHOW
SCHEMAS`` shows a single schema entry named ``default``.

The name of the Pinot catalog is the name of the catalog properties
file you created above, without the ``.properties`` extension.

For example, if you created a file called
``mypinotcluster.properties``, you can see all the tables in it using
the command::

    SHOW TABLES FROM mypinotcluster.default

or::

    SHOW TABLES FROM mypinotcluster.foo

Both commands list all the tables in your Pinot cluster, because Pinot
does not have a notion of schemas.

Suppose you have a table called ``clicks`` in the ``mypinotcluster``
catalog. You can list the columns of the ``clicks`` table using either
of the following::

    DESCRIBE mypinotcluster.dontcare.clicks;
    SHOW COLUMNS FROM mypinotcluster.dontcare.clicks;

Finally, you can access the ``clicks`` table::

    SELECT count(*) FROM mypinotcluster.default.clicks;
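
For illustration, you can also join Pinot data with a table from
another catalog (the ``hive.web.pages`` table and its columns here are
hypothetical)::

    SELECT c.page_id, count(*)
    FROM mypinotcluster.default.clicks c
    JOIN hive.web.pages p ON c.page_id = p.page_id
    GROUP BY c.page_id;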


How the Pinot connector works
-----------------------------

The connector pushes the maximal subquery inferred from the Presto
query down into Pinot. It can push down everything Pinot supports,
including aggregations, group-bys, and UDFs, and it generates correct
Pinot PQL while accounting for Pinot's quirks.

By default, the connector sends aggregation and limit queries to the
Pinot broker and performs a parallel scan for non-aggregation,
non-limit queries. A broker query creates a single split and lets the
Pinot broker do the scatter-gather. In parallel scan mode, one split
is created per one or more Pinot segments, and the Presto servers
contact the Pinot servers directly (i.e., the Pinot broker is not
involved).

There are a few configurations that control this behavior:

* ``pinot.prefer-broker-queries``: This config is true by default.
Setting it to false will also create parallel plans for
aggregation and limit queries.
* ``pinot.forbid-segment-queries``: This config is false by default.
Setting it to true will forbid parallel querying and force all
querying to happen via the broker.
* ``pinot.non-aggregate-limit-for-broker-queries``: To prevent
  overwhelming the broker, the connector only sends ``short`` queries
  to the Pinot broker. We define a ``short`` query to be either an
  aggregation (or group-by) query, or a query with a limit smaller
  than the value configured for this property. The default value for
  this limit is 25,000 rows.
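
For instance, a catalog configuration that forces all querying through
the Pinot broker could look like the following sketch (the controller
host is a placeholder):

.. code-block:: none

    connector.name=pinot
    pinot.controller-urls=controller_host1:9000
    pinot.prefer-broker-queries=true
    pinot.forbid-segment-queries=true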
@@ -23,7 +23,13 @@ public class HivePlanOptimizerProvider
implements ConnectorPlanOptimizerProvider
{
@Override
public Set<ConnectorPlanOptimizer> getConnectorPlanOptimizers()
public Set<ConnectorPlanOptimizer> getLogicalPlanOptimizers()
Reviewer comment (Contributor): Ideally, we should split the ConnectorPlanOptimizer interface changes into a separate commit.
{
return ImmutableSet.of();
}

@Override
public Set<ConnectorPlanOptimizer> getPhysicalPlanOptimizers()
{
return ImmutableSet.of();
}
@@ -15,6 +15,7 @@

import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorPlanOptimizer;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider;
import com.google.common.collect.ImmutableMap;

@@ -24,6 +25,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Maps.transformValues;
import static java.util.Objects.requireNonNull;
@@ -43,8 +45,20 @@ public void addPlanOptimizerProvider(ConnectorId connectorId, ConnectorPlanOptim
"ConnectorPlanOptimizerProvider for connector '%s' is already registered", connectorId);
}

public Map<ConnectorId, Set<ConnectorPlanOptimizer>> getOptimizers()
public Map<ConnectorId, Set<ConnectorPlanOptimizer>> getOptimizers(PlanPhase phase)
{
return ImmutableMap.copyOf(transformValues(planOptimizerProviders, ConnectorPlanOptimizerProvider::getConnectorPlanOptimizers));
switch (phase) {
Reviewer comment (Contributor): Let's put this change in a separate commit.
case LOGICAL:
return ImmutableMap.copyOf(transformValues(planOptimizerProviders, ConnectorPlanOptimizerProvider::getLogicalPlanOptimizers));
case PHYSICAL:
return ImmutableMap.copyOf(transformValues(planOptimizerProviders, ConnectorPlanOptimizerProvider::getPhysicalPlanOptimizers));
default:
throw new PrestoException(GENERIC_INTERNAL_ERROR, "Unknown plan phase " + phase);
}
}

public enum PlanPhase
{
LOGICAL, PHYSICAL
}
}
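
The two-phase provider contract introduced above can be sketched as a
self-contained example (the type names below are simplified stand-ins,
not the actual Presto SPI classes):

```java
import java.util.Collections;
import java.util.Set;

public class PlanOptimizerProviderSketch
{
    // Stand-in for the SPI's ConnectorPlanOptimizer marker type
    interface ConnectorPlanOptimizer {}

    // A provider now exposes one optimizer set per planning phase
    // instead of a single combined set
    interface ConnectorPlanOptimizerProvider
    {
        Set<ConnectorPlanOptimizer> getLogicalPlanOptimizers();

        Set<ConnectorPlanOptimizer> getPhysicalPlanOptimizers();
    }

    enum PlanPhase
    {
        LOGICAL, PHYSICAL
    }

    // Mirrors the dispatch in ConnectorPlanOptimizerManager.getOptimizers(phase)
    static Set<ConnectorPlanOptimizer> optimizersFor(ConnectorPlanOptimizerProvider provider, PlanPhase phase)
    {
        switch (phase) {
            case LOGICAL:
                return provider.getLogicalPlanOptimizers();
            case PHYSICAL:
                return provider.getPhysicalPlanOptimizers();
            default:
                throw new IllegalStateException("Unknown plan phase " + phase);
        }
    }

    public static void main(String[] args)
    {
        // A provider that, like HivePlanOptimizerProvider here, registers
        // no logical optimizers and one physical optimizer
        ConnectorPlanOptimizerProvider provider = new ConnectorPlanOptimizerProvider()
        {
            @Override
            public Set<ConnectorPlanOptimizer> getLogicalPlanOptimizers()
            {
                return Collections.emptySet();
            }

            @Override
            public Set<ConnectorPlanOptimizer> getPhysicalPlanOptimizers()
            {
                return Collections.singleton(new ConnectorPlanOptimizer() {});
            }
        };
        System.out.println(optimizersFor(provider, PlanPhase.LOGICAL).size());  // 0
        System.out.println(optimizersFor(provider, PlanPhase.PHYSICAL).size()); // 1
    }
}
```

The planner then invokes the LOGICAL set early (before exchanges are
added) and the PHYSICAL set late, as the `ApplyConnectorOptimization`
wiring in `PlanOptimizers` below shows.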
@@ -136,6 +136,9 @@
import java.util.List;
import java.util.Set;

import static com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager.PlanPhase.LOGICAL;
import static com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager.PlanPhase.PHYSICAL;

public class PlanOptimizers
{
private final List<PlanOptimizer> optimizers;
@@ -464,6 +467,10 @@ public PlanOptimizers(
new TranslateExpressions(metadata, sqlParser).rules()));
// After this point, all planNodes should not contain OriginalExpression

// TODO: move PushdownSubfields below this rule
// Pass a supplier so that we pickup connector optimizers that are installed later
builder.add(new ApplyConnectorOptimization(() -> planOptimizerManager.getOptimizers(LOGICAL)));

if (!forceSingleNode) {
builder.add(new ReplicateSemiJoinInDelete()); // Must run before AddExchanges
builder.add((new IterativeOptimizer(
@@ -532,11 +539,8 @@ public PlanOptimizers(
new AddIntermediateAggregations(),
new RemoveRedundantIdentityProjections())));

// TODO: Do not move other PlanNode to SPI until ApplyConnectorOptimization is moved to the end of logical planning (i.e., where AddExchanges lives)
// TODO: Run PruneUnreferencedOutputs and UnaliasSymbolReferences once we have cleaned it up
// Pass a supplier so that we pickup connector optimizers that are installed later
builder.add(
new ApplyConnectorOptimization(planOptimizerManager::getOptimizers),
new ApplyConnectorOptimization(() -> planOptimizerManager.getOptimizers(PHYSICAL)),
new IterativeOptimizer(
ruleStats,
statsCalculator,
@@ -889,4 +889,9 @@ public TypeProvider getTypes()
{
return TypeProvider.viewOf(variables);
}

public PlanNodeIdAllocator getIdAllocator()
{
return idAllocator;
}
}