Skip to content

Commit

Permalink
fix #1205, add ShardingExecuteGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Sep 1, 2018
1 parent b94a8ee commit a2d8217
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private <O> List<O> getResults(final O firstResult, final Collection<ListenableF
* @return execute result
* @throws SQLException throw if execute failure
*/
public <I, O> List<O> groupExecute(final Map<String, List<List<I>>> inputs, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
public <I, O> List<O> groupExecute(final Map<String, List<ShardingExecuteGroup<I>>> inputs, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
return groupExecute(inputs, null, callback);
}

Expand All @@ -147,42 +147,42 @@ public <I, O> List<O> groupExecute(final Map<String, List<List<I>>> inputs, fina
* @throws SQLException throw if execute failure
*/
public <I, O> List<O> groupExecute(
final Map<String, List<List<I>>> inputs, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
final Map<String, List<ShardingExecuteGroup<I>>> inputs, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
if (inputs.isEmpty()) {
return Collections.emptyList();
}
String firstKey = inputs.keySet().iterator().next();
Iterator<List<I>> firstInputGroup = inputs.get(firstKey).iterator();
Collection<I> firstInputs = firstInputGroup.next();
Iterator<ShardingExecuteGroup<I>> firstInputGroup = inputs.get(firstKey).iterator();
ShardingExecuteGroup<I> firstInputs = firstInputGroup.next();
inputs.put(firstKey, Lists.newArrayList(firstInputGroup));
Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(inputs, callback);
return getGroupResults(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
}

private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final Map<String, List<List<I>>> inputs, final ShardingGroupExecuteCallback<I, O> callback) {
private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final Map<String, List<ShardingExecuteGroup<I>>> inputs, final ShardingGroupExecuteCallback<I, O> callback) {
Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>();
for (Entry<String, List<List<I>>> entry : inputs.entrySet()) {
for (Entry<String, List<ShardingExecuteGroup<I>>> entry : inputs.entrySet()) {
result.addAll(asyncGroupExecute(entry.getValue(), callback));
}
return result;
}

private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final List<List<I>> inputs, final ShardingGroupExecuteCallback<I, O> callback) {
private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final List<ShardingExecuteGroup<I>> shardingExecuteGroups, final ShardingGroupExecuteCallback<I, O> callback) {
Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>();
for (final List<I> each : inputs) {
for (final ShardingExecuteGroup<I> each : shardingExecuteGroups) {
result.add(executorService.submit(new Callable<Collection<O>>() {

@Override
public Collection<O> call() throws SQLException {
return callback.execute(each);
return callback.execute(each.getInputs());
}
}));
}
return result;
}

private <I, O> Collection<O> syncGroupExecute(final Collection<I> inputs, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
return callback.execute(inputs);
private <I, O> Collection<O> syncGroupExecute(final ShardingExecuteGroup<I> executeGroup, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
return callback.execute(executeGroup.getInputs());
}

private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package io.shardingsphere.core.executor;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.util.List;

/**
* Sharding execute group.
*
* @author zhangliang
* @param <T> type of inputs value
*/
@RequiredArgsConstructor
@Getter
public final class ShardingExecuteGroup<T> {

private final List<T> inputs;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
package io.shardingsphere.core.executor.sql.execute;

import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.executor.ShardingExecuteEngine;
import io.shardingsphere.core.event.executor.overall.OverallExecutionEvent;
import io.shardingsphere.core.executor.ShardingExecuteEngine;
import io.shardingsphere.core.executor.ShardingExecuteGroup;
import io.shardingsphere.core.executor.sql.StatementExecuteUnit;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorExceptionHandler;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -95,27 +96,27 @@ public <T> List<T> execute(final Collection<? extends StatementExecuteUnit> exec
* @return execute result
* @throws SQLException SQL exception
*/
public <T> List<T> execute(final Map<String, List<List<? extends StatementExecuteUnit>>> executeUnits, final SQLExecuteCallback<T> executeCallback) throws SQLException {
public <T> List<T> execute(final Map<String, List<ShardingExecuteGroup<? extends StatementExecuteUnit>>> executeUnits, final SQLExecuteCallback<T> executeCallback) throws SQLException {
return execute(executeUnits, null, executeCallback);
}

/**
* Execute.
*
* @param executeUnits execute units
* @param executeUnitGroups execute unit groups
* @param firstExecuteCallback first execute callback
* @param executeCallback execute callback
* @param <T> class type of return value
* @return execute result
* @throws SQLException SQL exception
*/
@SuppressWarnings("unchecked")
public <T> List<T> execute(final Map<String, List<List<? extends StatementExecuteUnit>>> executeUnits,
public <T> List<T> execute(final Map<String, List<ShardingExecuteGroup<? extends StatementExecuteUnit>>> executeUnitGroups,
final SQLExecuteCallback<T> firstExecuteCallback, final SQLExecuteCallback<T> executeCallback) throws SQLException {
OverallExecutionEvent event = new OverallExecutionEvent(executeUnits.size() > 1);
OverallExecutionEvent event = new OverallExecutionEvent(executeUnitGroups.size() > 1);
ShardingEventBusInstance.getInstance().post(event);
try {
List<T> result = executeEngine.groupExecute((Map) executeUnits, firstExecuteCallback, executeCallback);
List<T> result = executeEngine.groupExecute((Map) executeUnitGroups, firstExecuteCallback, executeCallback);
event.setExecuteSuccess();
return result;
// CHECKSTYLE:OFF
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package io.shardingsphere.core.executor.sql.prepare;

import com.google.common.collect.Lists;
import io.shardingsphere.core.executor.ShardingExecuteGroup;
import io.shardingsphere.core.executor.sql.StatementExecuteUnit;
import io.shardingsphere.core.routing.SQLExecutionUnit;
import io.shardingsphere.core.routing.SQLUnit;
Expand Down Expand Up @@ -45,16 +46,17 @@ public final class SQLExecutePrepareTemplate {
private final int maxConnectionsSizePerQuery;

/**
* Get statement execute units.
* Get statement execute unit groups.
*
* @param sqlExecutionUnits units execution SQL units
* @param callback SQL execute prepare callback
* @return key is data source name, value is statement execute unit groups
* @throws SQLException SQL exception
*/
public Map<String, List<List<StatementExecuteUnit>>> getStatementExecuteUnits(final Collection<SQLExecutionUnit> sqlExecutionUnits, final SQLExecutePrepareCallback callback) throws SQLException {
public Map<String, List<ShardingExecuteGroup<StatementExecuteUnit>>> getStatementExecuteUnitGroups(
final Collection<SQLExecutionUnit> sqlExecutionUnits, final SQLExecutePrepareCallback callback) throws SQLException {
Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(sqlExecutionUnits);
Map<String, List<List<StatementExecuteUnit>>> result = new HashMap<>(sqlUnitGroups.size(), 1);
Map<String, List<ShardingExecuteGroup<StatementExecuteUnit>>> result = new HashMap<>(sqlUnitGroups.size(), 1);
for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
result.put(entry.getKey(), partitionSQLUnits(entry.getKey(), entry.getValue(), callback));
}
Expand All @@ -72,8 +74,9 @@ private Map<String, List<SQLUnit>> getSQLUnitGroups(final Collection<SQLExecutio
return result;
}

private List<List<StatementExecuteUnit>> partitionSQLUnits(final String dataSourceName, final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
List<List<StatementExecuteUnit>> result = new LinkedList<>();
private List<ShardingExecuteGroup<StatementExecuteUnit>> partitionSQLUnits(
final String dataSourceName, final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
List<ShardingExecuteGroup<StatementExecuteUnit>> result = new LinkedList<>();
int desiredPartitionSize = Math.max(sqlUnits.size() / maxConnectionsSizePerQuery, 1);
for (List<SQLUnit> each : Lists.partition(sqlUnits, desiredPartitionSize)) {
// TODO get connection sync to prevent dead lock
Expand All @@ -82,12 +85,12 @@ private List<List<StatementExecuteUnit>> partitionSQLUnits(final String dataSour
return result;
}

private List<StatementExecuteUnit> getStatementExecuteUnitGroup(
private ShardingExecuteGroup<StatementExecuteUnit> getStatementExecuteUnitGroup(
final Connection connection, final String dataSourceName, final List<SQLUnit> sqlUnitGroup, final SQLExecutePrepareCallback callback) throws SQLException {
List<StatementExecuteUnit> result = new LinkedList<>();
for (SQLUnit each : sqlUnitGroup) {
result.add(callback.createStatementExecuteUnit(connection, new SQLExecutionUnit(dataSourceName, each)));
}
return result;
return new ShardingExecuteGroup<>(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.collect.Lists;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.core.executor.ShardingExecuteEngine;
import io.shardingsphere.core.executor.ShardingExecuteGroup;
import io.shardingsphere.core.executor.ShardingGroupExecuteCallback;
import io.shardingsphere.core.metadata.datasource.DataSourceMetaData;
import io.shardingsphere.core.metadata.datasource.ShardingDataSourceMetaData;
Expand Down Expand Up @@ -73,7 +74,7 @@ public TableMetaData load(final String logicTableName, final ShardingRule shardi
}

private List<TableMetaData> load(final Map<String, List<DataNode>> dataNodeGroups, final ShardingDataSourceNames shardingDataSourceNames) throws SQLException {
return executeEngine.groupExecute(partitionDataNodeGroups(dataNodeGroups), new ShardingGroupExecuteCallback<DataNode, TableMetaData>() {
return executeEngine.groupExecute(getDataNodeGroups(dataNodeGroups), new ShardingGroupExecuteCallback<DataNode, TableMetaData>() {

@Override
public Collection<TableMetaData> execute(final Collection<DataNode> dataNodes) throws SQLException {
Expand All @@ -89,17 +90,25 @@ private Collection<TableMetaData> load(final String dataSourceName, final String
Collection<TableMetaData> result = new LinkedList<>();
try (Connection connection = connectionManager.getConnection(dataSourceName)) {
for (DataNode each : dataNodes) {
result.add(new TableMetaData(isTableExist(connection, catalog, each.getTableName()) ? getColumnMetaDataList(connection, catalog, each.getTableName()) : Collections.<ColumnMetaData>emptyList()));
result.add(new TableMetaData(
isTableExist(connection, catalog, each.getTableName()) ? getColumnMetaDataList(connection, catalog, each.getTableName()) : Collections.<ColumnMetaData>emptyList()));
}
}
return result;
}

private Map<String, List<List<DataNode>>> partitionDataNodeGroups(final Map<String, List<DataNode>> dataNodeGroups) {
Map<String, List<List<DataNode>>> result = new HashMap<>(dataNodeGroups.size(), 1);
private Map<String, List<ShardingExecuteGroup<DataNode>>> getDataNodeGroups(final Map<String, List<DataNode>> dataNodeGroups) {
Map<String, List<ShardingExecuteGroup<DataNode>>> result = new HashMap<>(dataNodeGroups.size(), 1);
for (Entry<String, List<DataNode>> entry : dataNodeGroups.entrySet()) {
int desiredPartitionSize = Math.max(entry.getValue().size() / maxConnectionsSizePerQuery, 1);
result.put(entry.getKey(), Lists.partition(entry.getValue(), desiredPartitionSize));
result.put(entry.getKey(), getDataNodeGroups(entry.getValue()));
}
return result;
}

private List<ShardingExecuteGroup<DataNode>> getDataNodeGroups(final List<DataNode> dataNodes) {
List<ShardingExecuteGroup<DataNode>> result = new LinkedList<>();
for (List<DataNode> each : Lists.partition(dataNodes, Math.max(dataNodes.size() / maxConnectionsSizePerQuery, 1))) {
result.add(new ShardingExecuteGroup<>(each));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package io.shardingsphere.core.executor.prepared;

import io.shardingsphere.core.constant.SQLType;
import io.shardingsphere.core.executor.ShardingExecuteGroup;
import io.shardingsphere.core.executor.sql.execute.SQLExecuteCallback;
import io.shardingsphere.core.executor.sql.execute.SQLExecuteTemplate;

Expand All @@ -34,10 +35,10 @@ public final class ConnectionStrictlyPreparedStatementExecutor extends PreparedS

private final SQLExecuteTemplate executeTemplate;

private final Map<String, List<List<PreparedStatementUnit>>> preparedStatementUnitGroups;
private final Map<String, List<ShardingExecuteGroup<PreparedStatementUnit>>> preparedStatementUnitGroups;

public ConnectionStrictlyPreparedStatementExecutor(
final SQLType sqlType, final SQLExecuteTemplate executeTemplate, final Map<String, List<List<PreparedStatementUnit>>> preparedStatementUnitGroups) {
final SQLType sqlType, final SQLExecuteTemplate executeTemplate, final Map<String, List<ShardingExecuteGroup<PreparedStatementUnit>>> preparedStatementUnitGroups) {
super(sqlType);
this.executeTemplate = executeTemplate;
this.preparedStatementUnitGroups = preparedStatementUnitGroups;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package io.shardingsphere.core.executor.statement;

import io.shardingsphere.core.constant.SQLType;
import io.shardingsphere.core.executor.ShardingExecuteGroup;
import io.shardingsphere.core.executor.sql.execute.SQLExecuteCallback;
import io.shardingsphere.core.executor.sql.execute.SQLExecuteTemplate;

Expand All @@ -34,9 +35,9 @@ public final class ConnectionStrictlyStatementExecutor extends StatementExecutor

private final SQLExecuteTemplate executeTemplate;

private final Map<String, List<List<StatementUnit>>> statementUnitGroups;
private final Map<String, List<ShardingExecuteGroup<StatementUnit>>> statementUnitGroups;

public ConnectionStrictlyStatementExecutor(final SQLType sqlType, final SQLExecuteTemplate executeTemplate, final Map<String, List<List<StatementUnit>>> statementUnitGroups) {
public ConnectionStrictlyStatementExecutor(final SQLType sqlType, final SQLExecuteTemplate executeTemplate, final Map<String, List<ShardingExecuteGroup<StatementUnit>>> statementUnitGroups) {
super(sqlType);
this.executeTemplate = executeTemplate;
this.statementUnitGroups = statementUnitGroups;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,20 @@
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.event.merger.MergeEvent;
import io.shardingsphere.core.event.routing.RoutingEvent;
import io.shardingsphere.core.executor.ShardingExecuteGroup;
import io.shardingsphere.core.executor.batch.BatchPreparedStatementUnit;
import io.shardingsphere.core.executor.batch.ConnectionStrictlyBatchPreparedStatementExecutor;
import io.shardingsphere.core.executor.batch.MemoryStrictlyBatchPreparedStatementExecutor;
import io.shardingsphere.core.executor.prepared.ConnectionStrictlyPreparedStatementExecutor;
import io.shardingsphere.core.executor.prepared.MemoryStrictlyPreparedStatementExecutor;
import io.shardingsphere.core.executor.prepared.PreparedStatementExecutor;
import io.shardingsphere.core.executor.prepared.PreparedStatementUnit;
import io.shardingsphere.core.executor.sql.execute.SQLExecuteTemplate;
import io.shardingsphere.core.executor.sql.StatementExecuteUnit;
import io.shardingsphere.core.executor.sql.prepare.SQLExecutePrepareCallback;
import io.shardingsphere.core.executor.sql.prepare.SQLExecutePrepareTemplate;
import io.shardingsphere.core.executor.sql.execute.SQLExecuteTemplate;
import io.shardingsphere.core.executor.sql.execute.result.MemoryQueryResult;
import io.shardingsphere.core.executor.sql.execute.result.StreamQueryResult;
import io.shardingsphere.core.executor.sql.prepare.SQLExecutePrepareCallback;
import io.shardingsphere.core.executor.sql.prepare.SQLExecutePrepareTemplate;
import io.shardingsphere.core.jdbc.adapter.AbstractShardingPreparedStatementAdapter;
import io.shardingsphere.core.jdbc.core.ShardingContext;
import io.shardingsphere.core.jdbc.core.connection.ShardingConnection;
Expand Down Expand Up @@ -271,9 +272,9 @@ private Collection<PreparedStatementUnit> getExecuteUnitsForMemoryStrictly() thr
}

@SuppressWarnings("unchecked")
private Map<String, List<List<PreparedStatementUnit>>> getExecuteUnitsForConnectionStrictly() throws SQLException {
private Map<String, List<ShardingExecuteGroup<PreparedStatementUnit>>> getExecuteUnitsForConnectionStrictly() throws SQLException {
SQLExecutePrepareTemplate sqlExecutePrepareTemplate = new SQLExecutePrepareTemplate(connection.getShardingDataSource().getShardingContext().getMaxConnectionsSizePerQuery());
return (Map) sqlExecutePrepareTemplate.getStatementExecuteUnits(routeResult.getExecutionUnits(), new SQLExecutePrepareCallback() {
return (Map) sqlExecutePrepareTemplate.getStatementExecuteUnitGroups(routeResult.getExecutionUnits(), new SQLExecutePrepareCallback() {

@Override
public Connection getConnection(final String dataSourceName) throws SQLException {
Expand Down
Loading

0 comments on commit a2d8217

Please sign in to comment.