Skip to content

Commit

Permalink
[#11628] YSQL: Implementing Async Flush for COPY command.
Browse files Browse the repository at this point in the history
Summary:
Currently, as part of any statement, YSQL does some processing and buffers
writes. The write buffer is flushed once either of the below conditions is hit -

(1) the write buffer is full (i.e., hits ysql_session_max_batch_size limit)
(2) a read op is required

On a flush, YSQL directs the writes to required tablet servers in different rpcs
(all issued in parallel). Only once responses to all RPCs are received, the YSQL
backend makes further progress. This waiting behaviour affects performance of
bulk loading using COPY FROM because YSQL spends a lot of time waiting for
responses. It would be ideal to use that wait time for reading further tuples from
the input source and perform necessary processing.

In this diff, we are adding some asynchrony to the flush to allow the YSQL's
COPY FROM to read more tuples after sending a set of rpcs to tablet servers
(without waiting for the responses).

This is done by storing the flush future instead of waiting for its result
immediately. Only when YSQL has refilled its write buffer does it wait for the
earlier flush's result, just before issuing the next flush call. Note that the
right choice of ysql_session_max_batch_size is required to help us mask almost
all of the wait time. The optimal batch size is one in which both of the
following tasks (which will run simultaneously after this diff) take almost the
same time -

(1) YSQL fetching and buffering ysql_session_max_batch_size rows
(2) Sending rpcs for the previous ysql_session_max_batch_size rows and arrival
of responses from the tserver

Note also that there might not be any value of ysql_session_max_batch_size for
which both tasks complete at roughly the same time. This could be due to the
inherently different speeds of disk reading and tablet servers' performance.

Test Plan: Tested manually, both locally and on portal clusters. Experiments show a 20-25% speedup for COPY when using async flush compared to regular (synchronous) flushing.

Reviewers: kannan, smishra, pjain

Reviewed By: pjain

Subscribers: mtakahara, zyu, lnguyen, yql

Differential Revision: https://phabricator.dev.yugabyte.com/D15757
  • Loading branch information
nathanhjli committed Mar 10, 2022
1 parent 398a2b1 commit 1a3a344
Show file tree
Hide file tree
Showing 25 changed files with 297 additions and 97 deletions.
57 changes: 57 additions & 0 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/TestAsyncFlush.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.yb.pgsql;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.sql.Statement;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yb.client.TestUtils;
import org.yb.util.YBTestRunnerNonTsanOnly;

@RunWith(value = YBTestRunnerNonTsanOnly.class)
public class TestAsyncFlush extends BasePgSQLTest {
  private static final Logger LOG = LoggerFactory.getLogger(TestAsyncFlush.class);

  /**
   * Creates a CSV data file containing a "a,b,c" header row followed by
   * {@code totalLines} data rows of the form {@code i,i,i}.
   *
   * @param absFilePath absolute path of the file to create
   * @param totalLines  number of data rows to write (excluding the header)
   * @throws IOException if the file cannot be created or written
   */
  private void createDataFile(String absFilePath, int totalLines) throws IOException {
    File file = new File(absFilePath);
    file.createNewFile();

    // try-with-resources guarantees the writer is closed (and its buffer
    // flushed) even if a write throws part-way through; the original
    // explicit close() would leak the handle on failure.
    try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
      writer.write("a,b,c\n");

      for (int i = 0; i < totalLines; i++) {
        writer.write(i + "," + i + "," + i + "\n");
      }
    }
  }

  /**
   * End-to-end check that COPY FROM (which uses async flush by default) loads
   * every row: verifies the total row count plus the first, a middle, and the
   * last row.
   */
  @Test
  public void testCopyWithAsyncFlush() throws Exception {
    String absFilePath = TestUtils.getBaseTmpDir() + "/copy-async-flush.txt";
    String tableName = "copyAsyncFlush";
    int totalLines = 100000;

    createDataFile(absFilePath, totalLines);

    try (Statement statement = connection.createStatement()) {
      statement.execute(String.format("CREATE TABLE %s (a int PRIMARY KEY, b int, c int)",
          tableName));
      statement.execute(String.format(
          "COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER)",
          tableName, absFilePath));

      // Verify row count.
      assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, totalLines);

      // Verify specific rows are present.
      assertOneRow(statement, "SELECT * FROM " + tableName + " WHERE a=0", 0, 0, 0);
      assertOneRow(statement, "SELECT * FROM " + tableName + " WHERE a=50000", 50000, 50000, 50000);
      assertOneRow(statement, "SELECT * FROM " + tableName + " WHERE a=99999", 99999, 99999, 99999);
    }
  }
}
2 changes: 1 addition & 1 deletion requirements_frozen.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ shutilwhich==1.1.0
six==1.16.0
sys-detection==1.1.1
toml==0.10.2
typed-ast==1.5.1
typed-ast==1.4.3
typing-extensions==3.10.0.2
typing-utils==0.1.0
urllib3==1.26.7
Expand Down
2 changes: 1 addition & 1 deletion src/postgres/src/backend/bootstrap/bootstrap.c
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ InsertOneTuple(Oid objectid)
HeapTupleSetOid(tuple, objectid);

if (IsYugaByteEnabled())
YBCExecuteInsert(boot_reldesc, tupDesc, tuple);
YBCExecuteInsert(boot_reldesc, tupDesc, tuple, false /* use_async_flush */);
else
simple_heap_insert(boot_reldesc, tuple);

Expand Down
18 changes: 14 additions & 4 deletions src/postgres/src/backend/catalog/indexing.c
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,19 @@ YBCatalogTupleInsert(Relation heapRel, HeapTuple tup, bool yb_shared_insert)
*/
if (dboid == YBCGetDatabaseOid(heapRel))
continue; /* Will be done after the loop. */
YBCExecuteInsertForDb(dboid, heapRel, RelationGetDescr(heapRel), tup);
YBCExecuteInsertForDb(dboid,
heapRel,
RelationGetDescr(heapRel),
tup,
false /* use_async_flush */);
}
YB_FOR_EACH_DB_END;
}
oid = YBCExecuteInsertForDb(YBCGetDatabaseOid(heapRel),
heapRel,
RelationGetDescr(heapRel),
tup);
tup,
false /* use_async_flush */);
/* Update the local cache automatically */
YBSetSysCacheTuple(heapRel, tup);
}
Expand Down Expand Up @@ -359,14 +364,19 @@ CatalogTupleInsertWithInfo(Relation heapRel, HeapTuple tup,
*/
if (dboid == YBCGetDatabaseOid(heapRel))
continue; /* Will be done after the loop. */
YBCExecuteInsertForDb(dboid, heapRel, RelationGetDescr(heapRel), tup);
YBCExecuteInsertForDb(dboid,
heapRel,
RelationGetDescr(heapRel),
tup,
false /* use_async_flush */);
}
YB_FOR_EACH_DB_END;
}
oid = YBCExecuteInsertForDb(YBCGetDatabaseOid(heapRel),
heapRel,
RelationGetDescr(heapRel),
tup);
tup,
false /* use_async_flush */);
/* Update the local cache automatically */
YBSetSysCacheTuple(heapRel, tup);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ bool YbIncrementMasterCatalogVersionTableEntry(bool is_breaking_change)
ereport(LOG,
(errmsg("%s: incrementing master catalog version (%sbreaking)",
__func__, is_breaking_change ? "" : "non")));
HandleYBStatus(YBCPgDmlExecWriteOp(update_stmt, &rows_affected_count));
HandleYBStatus(YBCPgDmlExecWriteOp(update_stmt,
&rows_affected_count,
false /* use_async_flush */));
Assert(rows_affected_count == 1);

/* Cleanup. */
Expand Down
7 changes: 5 additions & 2 deletions src/postgres/src/backend/commands/copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
#include "pg_yb_utils.h"
#include "executor/ybcModifyTable.h"


bool yb_use_async_flush = true;

#define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
#define OCTVALUE(c) ((c) - '0')
Expand Down Expand Up @@ -3005,7 +3005,10 @@ CopyFrom(CopyState cstate)
}
else
{
YBCExecuteInsert(resultRelInfo->ri_RelationDesc, tupDesc, tuple);
YBCExecuteInsert(resultRelInfo->ri_RelationDesc,
tupDesc,
tuple,
yb_use_async_flush);
}
}
else if (resultRelInfo->ri_FdwRoutine != NULL)
Expand Down
5 changes: 4 additions & 1 deletion src/postgres/src/backend/commands/createas.c
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,10 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)

if (IsYBRelation(myState->rel))
{
YBCExecuteInsert(myState->rel, RelationGetDescr(myState->rel), tuple);
YBCExecuteInsert(myState->rel,
RelationGetDescr(myState->rel),
tuple,
false /* use_async_flush */);
}
else
{
Expand Down
5 changes: 4 additions & 1 deletion src/postgres/src/backend/commands/matview.c
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,10 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
tuple = ExecMaterializeSlot(slot);
if (IsYBRelation(myState->transientrel))
{
YBCExecuteInsert(myState->transientrel, RelationGetDescr(myState->transientrel), tuple);
YBCExecuteInsert(myState->transientrel,
RelationGetDescr(myState->transientrel),
tuple,
false /* use_async_flush */);
}
else
{
Expand Down
5 changes: 4 additions & 1 deletion src/postgres/src/backend/commands/tablecmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -7446,7 +7446,10 @@ YBCopyTableRowsUnchecked(Relation oldrel, Relation newrel, AttrNumber* attmap)
ExecStoreHeapTuple(tuple, newslot, false);

/* Write the tuple out to the new relation */
YBCExecuteInsert(newrel, newslot->tts_tupleDescriptor, tuple);
YBCExecuteInsert(newrel,
newslot->tts_tupleDescriptor,
tuple,
false /* use_async_flush */);

MemoryContextReset(per_tup_cxt);

Expand Down
20 changes: 14 additions & 6 deletions src/postgres/src/backend/commands/ybccmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,9 @@ YBCDropTable(Oid relationId)
{
HandleYBStatusIgnoreNotFound(YBCPgDmlBindTable(handle), &not_found);
int rows_affected_count = 0;
HandleYBStatusIgnoreNotFound(YBCPgDmlExecWriteOp(handle, &rows_affected_count),
HandleYBStatusIgnoreNotFound(YBCPgDmlExecWriteOp(handle,
&rows_affected_count,
false /* use_async_flush */),
&not_found);
}
}
Expand Down Expand Up @@ -705,11 +707,13 @@ YBCTruncateTable(Relation rel) {
/* Create table-level tombstone for colocated tables / tables in tablegroups */
HandleYBStatus(YBCPgNewTruncateColocated(databaseId,
relationId,
false,
false /* is_single_row_txn */,
&handle));
HandleYBStatus(YBCPgDmlBindTable(handle));
int rows_affected_count = 0;
HandleYBStatus(YBCPgDmlExecWriteOp(handle, &rows_affected_count));
HandleYBStatus(YBCPgDmlExecWriteOp(handle,
&rows_affected_count,
false /* use_async_flush */));
}
else
{
Expand Down Expand Up @@ -748,11 +752,13 @@ YBCTruncateTable(Relation rel) {
/* Create index-level tombstone for colocated indexes / indexes in tablegroups */
HandleYBStatus(YBCPgNewTruncateColocated(databaseId,
indexId,
false,
false /* is_single_row_txn */,
&handle));
HandleYBStatus(YBCPgDmlBindTable(handle));
int rows_affected_count = 0;
HandleYBStatus(YBCPgDmlExecWriteOp(handle, &rows_affected_count));
HandleYBStatus(YBCPgDmlExecWriteOp(handle,
&rows_affected_count,
false /* use_async_flush */));
}
else
{
Expand Down Expand Up @@ -1240,7 +1246,9 @@ YBCDropIndex(Oid relationId)
if (valid_handle) {
HandleYBStatusIgnoreNotFound(YBCPgDmlBindTable(handle), &not_found);
int rows_affected_count = 0;
HandleYBStatusIgnoreNotFound(YBCPgDmlExecWriteOp(handle, &rows_affected_count),
HandleYBStatusIgnoreNotFound(YBCPgDmlExecWriteOp(handle,
&rows_affected_count,
false /* use_async_flush */),
&not_found);
}
}
Expand Down
Loading

0 comments on commit 1a3a344

Please sign in to comment.