From 6e68ba6380dc825a242e7f0d0a53442bba3a4a61 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Mon, 4 Dec 2023 12:32:54 +0900 Subject: [PATCH 01/10] Extract COPY TO format implementations This is a part of making COPY format extendable. See also these past discussions: * New Copy Formats - avro/orc/parquet: https://www.postgresql.org/message-id/flat/20180210151304.fonjztsynewldfba%40gmail.com * Make COPY extendable in order to support Parquet and other formats: https://www.postgresql.org/message-id/flat/CAJ7c6TM6Bz1c3F04Cy6%2BSzuWfKmr0kU8c_3Stnvh_8BR0D6k8Q%40mail.gmail.com This doesn't change the current behavior. This just introduces CopyToRoutine, which just has function pointers of format implementation like TupleTableSlotOps, and use it for existing "text", "csv" and "binary" format implementations. Note that CopyToRoutine can't be used from extensions yet because CopySend*() aren't exported yet. Extensions can't send formatted data to a destination without CopySend*(). They will be exported by subsequent patches. Here is a benchmark result with/without this change because there was a discussion that we should care about performance regression: https://www.postgresql.org/message-id/3741749.1655952719%40sss.pgh.pa.us > I think that step 1 ought to be to convert the existing formats into > plug-ins, and demonstrate that there's no significant loss of > performance. You can see that there is no significant loss of performance: Data: Random 32 bit integers: CREATE TABLE data (int32 integer); INSERT INTO data SELECT random() * 10000 FROM generate_series(1, ${n_records}); The number of records: 100K, 1M and 10M 100K without this change: format,elapsed time (ms) text,11.002 csv,11.696 binary,11.352 100K with this change: format,elapsed time (ms) text,100000,11.562 csv,100000,11.889 binary,100000,10.825 1M without this change: format,elapsed time (ms) text,108.359 csv,114.233 binary,111.251 1M with this change: format,elapsed time (ms) text,111.269 csv,116.277 binary,104.765 10M without this change: format,elapsed time (ms) text,1090.763 csv,1136.103 binary,1137.141 10M with this change: format,elapsed time (ms) text,1082.654 csv,1196.991 binary,1069.697 --- contrib/file_fdw/file_fdw.c | 2 +- src/backend/commands/copy.c | 43 +++- src/backend/commands/copyfrom.c | 2 +- src/backend/commands/copyto.c | 428 ++++++++++++++++++++------------ src/include/commands/copy.h | 7 +- src/include/commands/copyapi.h | 59 +++++ 6 files changed, 376 insertions(+), 165 deletions(-) create mode 100644 src/include/commands/copyapi.h diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c index 249d82d3a0593..9e4e819858979 100644 --- a/contrib/file_fdw/file_fdw.c +++ b/contrib/file_fdw/file_fdw.c @@ -329,7 +329,7 @@ file_fdw_validator(PG_FUNCTION_ARGS) /* * Now apply the core COPY code's validation logic for more checks. */ - ProcessCopyOptions(NULL, NULL, true, other_options); + ProcessCopyOptions(NULL, NULL, true, NULL, other_options); /* * Either filename or program option is required for file_fdw foreign diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index cc0786c6f4aec..5f3697a5f966c 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -442,6 +442,9 @@ defGetCopyOnErrorChoice(DefElem *def, ParseState *pstate, bool is_from) * a list of options. In that usage, 'opts_out' can be passed as NULL and * the collected data is just leaked until CurrentMemoryContext is reset. * + * 'cstate' is CopyToState* for !is_from, CopyFromState* for is_from. 'cstate' + * may be NULL. For example, file_fdw uses NULL. + * * Note that additional checking, such as whether column names listed in FORCE * QUOTE actually exist, has to be applied later. This just checks for * self-consistency of the options list. @@ -450,6 +453,7 @@ void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, + void *cstate, List *options) { bool format_specified = false; @@ -464,7 +468,13 @@ ProcessCopyOptions(ParseState *pstate, opts_out->file_encoding = -1; - /* Extract options from the statement node tree */ + /* Text is the default format. */ + opts_out->to_routine = &CopyToRoutineText; + + /* + * Extract only the "format" option to detect target routine as the first + * step + */ foreach(option, options) { DefElem *defel = lfirst_node(DefElem, option); @@ -479,15 +489,29 @@ ProcessCopyOptions(ParseState *pstate, if (strcmp(fmt, "text") == 0) /* default format */ ; else if (strcmp(fmt, "csv") == 0) + { opts_out->csv_mode = true; + opts_out->to_routine = &CopyToRoutineCSV; + } else if (strcmp(fmt, "binary") == 0) + { opts_out->binary = true; + opts_out->to_routine = &CopyToRoutineBinary; + } else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("COPY format \"%s\" not recognized", fmt), parser_errposition(pstate, defel->location))); } + } + /* Extract options except "format" from the statement node tree */ + foreach(option, options) + { + DefElem *defel = lfirst_node(DefElem, option); + + if (strcmp(defel->defname, "format") == 0) + continue; else if (strcmp(defel->defname, "freeze") == 0) { if (freeze_specified) @@ -616,11 +640,18 @@ ProcessCopyOptions(ParseState *pstate, opts_out->on_error = defGetCopyOnErrorChoice(defel, pstate, is_from); } else - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("option \"%s\" not recognized", - defel->defname), - parser_errposition(pstate, defel->location))); + { + bool processed = false; + + if (!is_from) + processed = opts_out->to_routine->CopyToProcessOption(cstate, defel); + if (!processed) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("option \"%s\" not recognized", + defel->defname), + parser_errposition(pstate, defel->location))); + } } /* diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 1fe70b9133827..fb3d4d929602e 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -1416,7 +1416,7 @@ BeginCopyFrom(ParseState *pstate, oldcontext = MemoryContextSwitchTo(cstate->copycontext); /* Extract options from the statement node tree */ - ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options); + ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , cstate, options); /* Process the target relation */ cstate->rel = rel; diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index d3dc3fc854f14..6547b7c654ad4 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -131,6 +131,275 @@ static void CopySendEndOfRow(CopyToState cstate); static void CopySendInt32(CopyToState cstate, int32 val); static void CopySendInt16(CopyToState cstate, int16 val); +/* + * CopyToRoutine implementations. + */ + +/* + * CopyToRoutine implementation for "text" and "csv". CopyToText*() + * refer cstate->opts.csv_mode and change their behavior. We can split this + * implementation and stop referring cstate->opts.csv_mode later. + */ + +/* All "text" and "csv" options are parsed in ProcessCopyOptions(). We may + * move the code to here later. */ +static bool +CopyToTextProcessOption(CopyToState cstate, DefElem *defel) +{ + return false; +} + +static int16 +CopyToTextGetFormat(CopyToState cstate) +{ + return 0; +} + +static void +CopyToTextSendEndOfRow(CopyToState cstate) +{ + switch (cstate->copy_dest) + { + case COPY_FILE: + /* Default line termination depends on platform */ +#ifndef WIN32 + CopySendChar(cstate, '\n'); +#else + CopySendString(cstate, "\r\n"); +#endif + break; + case COPY_FRONTEND: + /* The FE/BE protocol uses \n as newline for all platforms */ + CopySendChar(cstate, '\n'); + break; + default: + break; + } + CopySendEndOfRow(cstate); +} + +static void +CopyToTextStart(CopyToState cstate, TupleDesc tupDesc) +{ + int num_phys_attrs; + ListCell *cur; + + num_phys_attrs = tupDesc->natts; + /* Get info about the columns we need to process. */ + cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Oid out_func_oid; + bool isvarlena; + Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); + + getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena); + fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + } + + /* + * For non-binary copy, we need to convert null_print to file encoding, + * because it will be sent directly with CopySendString. + */ + if (cstate->need_transcoding) + cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, + cstate->opts.null_print_len, + cstate->file_encoding); + + /* if a header has been requested send the line */ + if (cstate->opts.header_line) + { + bool hdr_delim = false; + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + char *colname; + + if (hdr_delim) + CopySendChar(cstate, cstate->opts.delim[0]); + hdr_delim = true; + + colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); + + if (cstate->opts.csv_mode) + CopyAttributeOutCSV(cstate, colname, false, + list_length(cstate->attnumlist) == 1); + else + CopyAttributeOutText(cstate, colname); + } + + CopyToTextSendEndOfRow(cstate); + } +} + +static void +CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + bool need_delim = false; + FmgrInfo *out_functions = cstate->out_functions; + ListCell *cur; + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Datum value = slot->tts_values[attnum - 1]; + bool isnull = slot->tts_isnull[attnum - 1]; + + if (need_delim) + CopySendChar(cstate, cstate->opts.delim[0]); + need_delim = true; + + if (isnull) + { + CopySendString(cstate, cstate->opts.null_print_client); + } + else + { + char *string; + + string = OutputFunctionCall(&out_functions[attnum - 1], value); + if (cstate->opts.csv_mode) + CopyAttributeOutCSV(cstate, string, + cstate->opts.force_quote_flags[attnum - 1], + list_length(cstate->attnumlist) == 1); + else + CopyAttributeOutText(cstate, string); + } + } + + CopyToTextSendEndOfRow(cstate); +} + +static void +CopyToTextEnd(CopyToState cstate) +{ +} + +/* + * CopyToRoutine implementation for "binary". + */ + +/* All "binary" options are parsed in ProcessCopyOptions(). We may move the + * code to here later. */ +static bool +CopyToBinaryProcessOption(CopyToState cstate, DefElem *defel) +{ + return false; +} + +static int16 +CopyToBinaryGetFormat(CopyToState cstate) +{ + return 1; +} + +static void +CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc) +{ + int num_phys_attrs; + ListCell *cur; + + num_phys_attrs = tupDesc->natts; + /* Get info about the columns we need to process. */ + cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Oid out_func_oid; + bool isvarlena; + Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); + + getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena); + fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + } + + { + /* Generate header for a binary copy */ + int32 tmp; + + /* Signature */ + CopySendData(cstate, BinarySignature, 11); + /* Flags field */ + tmp = 0; + CopySendInt32(cstate, tmp); + /* No header extension */ + tmp = 0; + CopySendInt32(cstate, tmp); + } +} + +static void +CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + FmgrInfo *out_functions = cstate->out_functions; + ListCell *cur; + + /* Binary per-tuple header */ + CopySendInt16(cstate, list_length(cstate->attnumlist)); + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Datum value = slot->tts_values[attnum - 1]; + bool isnull = slot->tts_isnull[attnum - 1]; + + if (isnull) + { + CopySendInt32(cstate, -1); + } + else + { + bytea *outputbytes; + + outputbytes = SendFunctionCall(&out_functions[attnum - 1], value); + CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); + CopySendData(cstate, VARDATA(outputbytes), + VARSIZE(outputbytes) - VARHDRSZ); + } + } + + CopySendEndOfRow(cstate); +} + +static void +CopyToBinaryEnd(CopyToState cstate) +{ + /* Generate trailer for a binary copy */ + CopySendInt16(cstate, -1); + /* Need to flush out the trailer */ + CopySendEndOfRow(cstate); +} + +CopyToRoutine CopyToRoutineText = { + .CopyToProcessOption = CopyToTextProcessOption, + .CopyToGetFormat = CopyToTextGetFormat, + .CopyToStart = CopyToTextStart, + .CopyToOneRow = CopyToTextOneRow, + .CopyToEnd = CopyToTextEnd, +}; + +/* + * We can use the same CopyToRoutine for both of "text" and "csv" because + * CopyToText*() refer cstate->opts.csv_mode and change their behavior. We can + * split the implementations and stop referring cstate->opts.csv_mode later. + */ +CopyToRoutine CopyToRoutineCSV = { + .CopyToProcessOption = CopyToTextProcessOption, + .CopyToGetFormat = CopyToTextGetFormat, + .CopyToStart = CopyToTextStart, + .CopyToOneRow = CopyToTextOneRow, + .CopyToEnd = CopyToTextEnd, +}; + +CopyToRoutine CopyToRoutineBinary = { + .CopyToProcessOption = CopyToBinaryProcessOption, + .CopyToGetFormat = CopyToBinaryGetFormat, + .CopyToStart = CopyToBinaryStart, + .CopyToOneRow = CopyToBinaryOneRow, + .CopyToEnd = CopyToBinaryEnd, +}; /* * Send copy start/stop messages for frontend copies. These have changed @@ -141,7 +410,7 @@ SendCopyBegin(CopyToState cstate) { StringInfoData buf; int natts = list_length(cstate->attnumlist); - int16 format = (cstate->opts.binary ? 1 : 0); + int16 format = cstate->opts.to_routine->CopyToGetFormat(cstate); int i; pq_beginmessage(&buf, PqMsg_CopyOutResponse); @@ -198,16 +467,6 @@ CopySendEndOfRow(CopyToState cstate) switch (cstate->copy_dest) { case COPY_FILE: - if (!cstate->opts.binary) - { - /* Default line termination depends on platform */ -#ifndef WIN32 - CopySendChar(cstate, '\n'); -#else - CopySendString(cstate, "\r\n"); -#endif - } - if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file) != 1 || ferror(cstate->copy_file)) @@ -242,10 +501,6 @@ CopySendEndOfRow(CopyToState cstate) } break; case COPY_FRONTEND: - /* The FE/BE protocol uses \n as newline for all platforms */ - if (!cstate->opts.binary) - CopySendChar(cstate, '\n'); - /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len); break; @@ -431,7 +686,7 @@ BeginCopyTo(ParseState *pstate, oldcontext = MemoryContextSwitchTo(cstate->copycontext); /* Extract options from the statement node tree */ - ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options); + ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , cstate, options); /* Process the source/target relation or query */ if (rel) @@ -748,8 +1003,6 @@ DoCopyTo(CopyToState cstate) bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL); bool fe_copy = (pipe && whereToSendOutput == DestRemote); TupleDesc tupDesc; - int num_phys_attrs; - ListCell *cur; uint64 processed; if (fe_copy) @@ -759,32 +1012,11 @@ DoCopyTo(CopyToState cstate) tupDesc = RelationGetDescr(cstate->rel); else tupDesc = cstate->queryDesc->tupDesc; - num_phys_attrs = tupDesc->natts; cstate->opts.null_print_client = cstate->opts.null_print; /* default */ /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */ cstate->fe_msgbuf = makeStringInfo(); - /* Get info about the columns we need to process. */ - cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - Oid out_func_oid; - bool isvarlena; - Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); - - if (cstate->opts.binary) - getTypeBinaryOutputInfo(attr->atttypid, - &out_func_oid, - &isvarlena); - else - getTypeOutputInfo(attr->atttypid, - &out_func_oid, - &isvarlena); - fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); - } - /* * Create a temporary memory context that we can reset once per row to * recover palloc'd memory. This avoids any problems with leaks inside @@ -795,57 +1027,7 @@ DoCopyTo(CopyToState cstate) "COPY TO", ALLOCSET_DEFAULT_SIZES); - if (cstate->opts.binary) - { - /* Generate header for a binary copy */ - int32 tmp; - - /* Signature */ - CopySendData(cstate, BinarySignature, 11); - /* Flags field */ - tmp = 0; - CopySendInt32(cstate, tmp); - /* No header extension */ - tmp = 0; - CopySendInt32(cstate, tmp); - } - else - { - /* - * For non-binary copy, we need to convert null_print to file - * encoding, because it will be sent directly with CopySendString. - */ - if (cstate->need_transcoding) - cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, - cstate->opts.null_print_len, - cstate->file_encoding); - - /* if a header has been requested send the line */ - if (cstate->opts.header_line) - { - bool hdr_delim = false; - - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - char *colname; - - if (hdr_delim) - CopySendChar(cstate, cstate->opts.delim[0]); - hdr_delim = true; - - colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); - - if (cstate->opts.csv_mode) - CopyAttributeOutCSV(cstate, colname, false, - list_length(cstate->attnumlist) == 1); - else - CopyAttributeOutText(cstate, colname); - } - - CopySendEndOfRow(cstate); - } - } + cstate->opts.to_routine->CopyToStart(cstate, tupDesc); if (cstate->rel) { @@ -884,13 +1066,7 @@ DoCopyTo(CopyToState cstate) processed = ((DR_copy *) cstate->queryDesc->dest)->processed; } - if (cstate->opts.binary) - { - /* Generate trailer for a binary copy */ - CopySendInt16(cstate, -1); - /* Need to flush out the trailer */ - CopySendEndOfRow(cstate); - } + cstate->opts.to_routine->CopyToEnd(cstate); MemoryContextDelete(cstate->rowcontext); @@ -906,71 +1082,15 @@ DoCopyTo(CopyToState cstate) static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot) { - bool need_delim = false; - FmgrInfo *out_functions = cstate->out_functions; MemoryContext oldcontext; - ListCell *cur; - char *string; MemoryContextReset(cstate->rowcontext); oldcontext = MemoryContextSwitchTo(cstate->rowcontext); - if (cstate->opts.binary) - { - /* Binary per-tuple header */ - CopySendInt16(cstate, list_length(cstate->attnumlist)); - } - /* Make sure the tuple is fully deconstructed */ slot_getallattrs(slot); - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - Datum value = slot->tts_values[attnum - 1]; - bool isnull = slot->tts_isnull[attnum - 1]; - - if (!cstate->opts.binary) - { - if (need_delim) - CopySendChar(cstate, cstate->opts.delim[0]); - need_delim = true; - } - - if (isnull) - { - if (!cstate->opts.binary) - CopySendString(cstate, cstate->opts.null_print_client); - else - CopySendInt32(cstate, -1); - } - else - { - if (!cstate->opts.binary) - { - string = OutputFunctionCall(&out_functions[attnum - 1], - value); - if (cstate->opts.csv_mode) - CopyAttributeOutCSV(cstate, string, - cstate->opts.force_quote_flags[attnum - 1], - list_length(cstate->attnumlist) == 1); - else - CopyAttributeOutText(cstate, string); - } - else - { - bytea *outputbytes; - - outputbytes = SendFunctionCall(&out_functions[attnum - 1], - value); - CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); - CopySendData(cstate, VARDATA(outputbytes), - VARSIZE(outputbytes) - VARHDRSZ); - } - } - } - - CopySendEndOfRow(cstate); + cstate->opts.to_routine->CopyToOneRow(cstate, slot); MemoryContextSwitchTo(oldcontext); } diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index b3da3cb0be799..34bea880caa42 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -14,6 +14,7 @@ #ifndef COPY_H #define COPY_H +#include "commands/copyapi.h" #include "nodes/execnodes.h" #include "nodes/parsenodes.h" #include "parser/parse_node.h" @@ -74,11 +75,11 @@ typedef struct CopyFormatOptions bool convert_selectively; /* do selective binary conversion? */ CopyOnErrorChoice on_error; /* what to do when error happened */ List *convert_select; /* list of column names (can be NIL) */ + CopyToRoutine *to_routine; /* callback routines for COPY TO */ } CopyFormatOptions; -/* These are private in commands/copy[from|to].c */ +/* This is private in commands/copyfrom.c */ typedef struct CopyFromStateData *CopyFromState; -typedef struct CopyToStateData *CopyToState; typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); typedef void (*copy_data_dest_cb) (void *data, int len); @@ -87,7 +88,7 @@ extern void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, uint64 *processed); -extern void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, List *options); +extern void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, void *cstate, List *options); extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options); diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h new file mode 100644 index 0000000000000..eb68f2fb7b329 --- /dev/null +++ b/src/include/commands/copyapi.h @@ -0,0 +1,59 @@ +/*------------------------------------------------------------------------- + * + * copyapi.h + * API for COPY TO/FROM handlers + * + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/commands/copyapi.h + * + *------------------------------------------------------------------------- + */ +#ifndef COPYAPI_H +#define COPYAPI_H + +#include "executor/tuptable.h" +#include "nodes/parsenodes.h" + +/* This is private in commands/copyto.c */ +typedef struct CopyToStateData *CopyToState; + +typedef bool (*CopyToProcessOption_function) (CopyToState cstate, DefElem *defel); +typedef int16 (*CopyToGetFormat_function) (CopyToState cstate); +typedef void (*CopyToStart_function) (CopyToState cstate, TupleDesc tupDesc); +typedef void (*CopyToOneRow_function) (CopyToState cstate, TupleTableSlot *slot); +typedef void (*CopyToEnd_function) (CopyToState cstate); + +/* Routines for a COPY TO format implementation. */ +typedef struct CopyToRoutine +{ + /* + * Called for processing one COPY TO option. This will return false when + * the given option is invalid. + */ + CopyToProcessOption_function CopyToProcessOption; + + /* + * Called when COPY TO is started. This will return a format as int16 + * value. It's used for the CopyOutResponse message. + */ + CopyToGetFormat_function CopyToGetFormat; + + /* Called when COPY TO is started. This will send a header. */ + CopyToStart_function CopyToStart; + + /* Copy one row for COPY TO. */ + CopyToOneRow_function CopyToOneRow; + + /* Called when COPY TO is ended. This will send a trailer. */ + CopyToEnd_function CopyToEnd; +} CopyToRoutine; + +/* Built-in CopyToRoutine for "text", "csv" and "binary". */ +extern CopyToRoutine CopyToRoutineText; +extern CopyToRoutine CopyToRoutineCSV; +extern CopyToRoutine CopyToRoutineBinary; + +#endif /* COPYAPI_H */ From a597f8a2beec12971d77419f08b5722f531774f3 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Tue, 23 Jan 2024 13:58:38 +0900 Subject: [PATCH 02/10] Add support for adding custom COPY TO format This uses the handler approach like tablesample. The approach creates an internal function that returns an internal struct. In this case, a COPY TO handler returns a CopyToRoutine. We will add support for custom COPY FROM format later. We'll use the same handler for COPY TO and COPY FROM. PostgreSQL calls a COPY TO/FROM handler with "is_from" argument. It's true for COPY FROM and false for COPY TO: copy_handler(true) returns CopyToRoutine copy_handler(false) returns CopyFromRoutine (not exist yet) We discussed that we introduce a wrapper struct for it: typedef struct CopyRoutine { NodeTag type; /* either CopyToRoutine or CopyFromRoutine */ Node *routine; } copy_handler(true) returns CopyRoutine with CopyToRoutine copy_handler(false) returns CopyRoutine with CopyFromRoutine See also: https://www.postgresql.org/message-id/flat/CAD21AoCunywHird3GaPzWe6s9JG1wzxj3Cr6vGN36DDheGjOjA%40mail.gmail.com But I noticed that we don't need the wrapper struct. We can just CopyToRoutine or CopyFromRoutine. Because we can distinct the returned struct by checking its NodeTag. So I don't use the wrapper struct approach. --- src/backend/commands/copy.c | 84 ++++++++++++++----- src/backend/nodes/Makefile | 1 + src/backend/nodes/gen_node_support.pl | 2 + src/backend/utils/adt/pseudotypes.c | 1 + src/include/catalog/pg_proc.dat | 6 ++ src/include/catalog/pg_type.dat | 6 ++ src/include/commands/copyapi.h | 2 + src/include/nodes/meson.build | 1 + src/test/modules/Makefile | 1 + src/test/modules/meson.build | 1 + src/test/modules/test_copy_format/.gitignore | 4 + src/test/modules/test_copy_format/Makefile | 23 +++++ .../expected/test_copy_format.out | 17 ++++ src/test/modules/test_copy_format/meson.build | 33 ++++++++ .../test_copy_format/sql/test_copy_format.sql | 8 ++ .../test_copy_format--1.0.sql | 8 ++ .../test_copy_format/test_copy_format.c | 77 +++++++++++++++++ .../test_copy_format/test_copy_format.control | 4 + 18 files changed, 260 insertions(+), 19 deletions(-) mode change 100644 => 100755 src/backend/nodes/gen_node_support.pl create mode 100644 src/test/modules/test_copy_format/.gitignore create mode 100644 src/test/modules/test_copy_format/Makefile create mode 100644 src/test/modules/test_copy_format/expected/test_copy_format.out create mode 100644 src/test/modules/test_copy_format/meson.build create mode 100644 src/test/modules/test_copy_format/sql/test_copy_format.sql create mode 100644 src/test/modules/test_copy_format/test_copy_format--1.0.sql create mode 100644 src/test/modules/test_copy_format/test_copy_format.c create mode 100644 src/test/modules/test_copy_format/test_copy_format.control diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 5f3697a5f966c..6f0db0ae7caf8 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -32,6 +32,7 @@ #include "parser/parse_coerce.h" #include "parser/parse_collate.h" #include "parser/parse_expr.h" +#include "parser/parse_func.h" #include "parser/parse_relation.h" #include "rewrite/rewriteHandler.h" #include "utils/acl.h" @@ -430,6 +431,69 @@ defGetCopyOnErrorChoice(DefElem *def, ParseState *pstate, bool is_from) return COPY_ON_ERROR_STOP; /* keep compiler quiet */ } +/* + * Process the "format" option. + * + * This function checks whether the option value is a built-in format such as + * "text" and "csv" or not. If the option value isn't a built-in format, this + * function finds a COPY format handler that returns a CopyToRoutine. If no + * COPY format handler is found, this function reports an error. + */ +static void +ProcessCopyOptionCustomFormat(ParseState *pstate, + CopyFormatOptions *opts_out, + bool is_from, + DefElem *defel) +{ + char *format; + Oid funcargtypes[1]; + Oid handlerOid = InvalidOid; + Datum datum; + void *routine; + + format = defGetString(defel); + + /* built-in formats */ + if (strcmp(format, "text") == 0) + /* default format */ return; + else if (strcmp(format, "csv") == 0) + { + opts_out->csv_mode = true; + opts_out->to_routine = &CopyToRoutineCSV; + return; + } + else if (strcmp(format, "binary") == 0) + { + opts_out->binary = true; + opts_out->to_routine = &CopyToRoutineBinary; + return; + } + + /* custom format */ + if (!is_from) + { + funcargtypes[0] = INTERNALOID; + handlerOid = LookupFuncName(list_make1(makeString(format)), 1, + funcargtypes, true); + } + if (!OidIsValid(handlerOid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY format \"%s\" not recognized", format), + parser_errposition(pstate, defel->location))); + + datum = OidFunctionCall1(handlerOid, BoolGetDatum(is_from)); + routine = DatumGetPointer(datum); + if (routine == NULL || !IsA(routine, CopyToRoutine)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY handler function %s(%u) did not return a CopyToRoutine struct", + format, handlerOid), + parser_errposition(pstate, defel->location))); + + opts_out->to_routine = routine; +} + /* * Process the statement option list for COPY. * @@ -481,28 +545,10 @@ ProcessCopyOptions(ParseState *pstate, if (strcmp(defel->defname, "format") == 0) { - char *fmt = defGetString(defel); - if (format_specified) errorConflictingDefElem(defel, pstate); format_specified = true; - if (strcmp(fmt, "text") == 0) - /* default format */ ; - else if (strcmp(fmt, "csv") == 0) - { - opts_out->csv_mode = true; - opts_out->to_routine = &CopyToRoutineCSV; - } - else if (strcmp(fmt, "binary") == 0) - { - opts_out->binary = true; - opts_out->to_routine = &CopyToRoutineBinary; - } - else - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("COPY format \"%s\" not recognized", fmt), - parser_errposition(pstate, defel->location))); + ProcessCopyOptionCustomFormat(pstate, opts_out, is_from, defel); } } /* Extract options except "format" from the statement node tree */ diff --git a/src/backend/nodes/Makefile b/src/backend/nodes/Makefile index 66bbad8e6e02d..173ee11811c35 100644 --- a/src/backend/nodes/Makefile +++ b/src/backend/nodes/Makefile @@ -49,6 +49,7 @@ node_headers = \ access/sdir.h \ access/tableam.h \ access/tsmapi.h \ + commands/copyapi.h \ commands/event_trigger.h \ commands/trigger.h \ executor/tuptable.h \ diff --git a/src/backend/nodes/gen_node_support.pl b/src/backend/nodes/gen_node_support.pl old mode 100644 new mode 100755 index 2f0a59bc874b0..bd397f45ac1d2 --- a/src/backend/nodes/gen_node_support.pl +++ b/src/backend/nodes/gen_node_support.pl @@ -61,6 +61,7 @@ sub elem access/sdir.h access/tableam.h access/tsmapi.h + commands/copyapi.h commands/event_trigger.h commands/trigger.h executor/tuptable.h @@ -85,6 +86,7 @@ sub elem access/sdir.h access/tableam.h access/tsmapi.h + commands/copyapi.h commands/event_trigger.h commands/trigger.h executor/tuptable.h diff --git a/src/backend/utils/adt/pseudotypes.c b/src/backend/utils/adt/pseudotypes.c index a3a991f634d39..d308780c43876 100644 --- a/src/backend/utils/adt/pseudotypes.c +++ b/src/backend/utils/adt/pseudotypes.c @@ -373,6 +373,7 @@ PSEUDOTYPE_DUMMY_IO_FUNCS(fdw_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(table_am_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(index_am_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(tsm_handler); +PSEUDOTYPE_DUMMY_IO_FUNCS(copy_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(internal); PSEUDOTYPE_DUMMY_IO_FUNCS(anyelement); PSEUDOTYPE_DUMMY_IO_FUNCS(anynonarray); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 29af4ce65d5c7..d4e426687c904 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -7617,6 +7617,12 @@ { oid => '3312', descr => 'I/O', proname => 'tsm_handler_out', prorettype => 'cstring', proargtypes => 'tsm_handler', prosrc => 'tsm_handler_out' }, +{ oid => '8753', descr => 'I/O', + proname => 'copy_handler_in', proisstrict => 'f', prorettype => 'copy_handler', + proargtypes => 'cstring', prosrc => 'copy_handler_in' }, +{ oid => '8754', descr => 'I/O', + proname => 'copy_handler_out', prorettype => 'cstring', + proargtypes => 'copy_handler', prosrc => 'copy_handler_out' }, { oid => '267', descr => 'I/O', proname => 'table_am_handler_in', proisstrict => 'f', prorettype => 'table_am_handler', proargtypes => 'cstring', diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat index d29194da31fa8..2040d5da83fc9 100644 --- a/src/include/catalog/pg_type.dat +++ b/src/include/catalog/pg_type.dat @@ -632,6 +632,12 @@ typcategory => 'P', typinput => 'tsm_handler_in', typoutput => 'tsm_handler_out', typreceive => '-', typsend => '-', typalign => 'i' }, +{ oid => '8752', + descr => 'pseudo-type for the result of a copy to/from method functoin', + typname => 'copy_handler', typlen => '4', typbyval => 't', typtype => 'p', + typcategory => 'P', typinput => 'copy_handler_in', + typoutput => 'copy_handler_out', typreceive => '-', typsend => '-', + typalign => 'i' }, { oid => '269', typname => 'table_am_handler', descr => 'pseudo-type for the result of a table AM handler function', diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index eb68f2fb7b329..9c25e1c41509b 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -29,6 +29,8 @@ typedef void (*CopyToEnd_function) (CopyToState cstate); /* Routines for a COPY TO format implementation. */ typedef struct CopyToRoutine { + NodeTag type; + /* * Called for processing one COPY TO option. This will return false when * the given option is invalid. diff --git a/src/include/nodes/meson.build b/src/include/nodes/meson.build index b665e55b6571f..103df1a787357 100644 --- a/src/include/nodes/meson.build +++ b/src/include/nodes/meson.build @@ -11,6 +11,7 @@ node_support_input_i = [ 'access/sdir.h', 'access/tableam.h', 'access/tsmapi.h', + 'commands/copyapi.h', 'commands/event_trigger.h', 'commands/trigger.h', 'executor/tuptable.h', diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index e32c8925f67dd..9d57b868d5f36 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -15,6 +15,7 @@ SUBDIRS = \ spgist_name_ops \ test_bloomfilter \ test_copy_callbacks \ + test_copy_format \ test_custom_rmgrs \ test_ddl_deparse \ test_dsa \ diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build index 397e0906e6f72..d76f2a6003e90 100644 --- a/src/test/modules/meson.build +++ b/src/test/modules/meson.build @@ -13,6 +13,7 @@ subdir('spgist_name_ops') subdir('ssl_passphrase_callback') subdir('test_bloomfilter') subdir('test_copy_callbacks') +subdir('test_copy_format') subdir('test_custom_rmgrs') subdir('test_ddl_deparse') subdir('test_dsa') diff --git a/src/test/modules/test_copy_format/.gitignore b/src/test/modules/test_copy_format/.gitignore new file mode 100644 index 0000000000000..5dcb3ff972350 --- /dev/null +++ b/src/test/modules/test_copy_format/.gitignore @@ -0,0 +1,4 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ diff --git a/src/test/modules/test_copy_format/Makefile b/src/test/modules/test_copy_format/Makefile new file mode 100644 index 0000000000000..8497f91624d5a --- /dev/null +++ b/src/test/modules/test_copy_format/Makefile @@ -0,0 +1,23 @@ +# src/test/modules/test_copy_format/Makefile + +MODULE_big = test_copy_format +OBJS = \ + $(WIN32RES) \ + test_copy_format.o +PGFILEDESC = "test_copy_format - test custom COPY FORMAT" + +EXTENSION = test_copy_format +DATA = test_copy_format--1.0.sql + +REGRESS = test_copy_format + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_copy_format +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out b/src/test/modules/test_copy_format/expected/test_copy_format.out new file mode 100644 index 0000000000000..3a24ae7b97827 --- /dev/null +++ b/src/test/modules/test_copy_format/expected/test_copy_format.out @@ -0,0 +1,17 @@ +CREATE EXTENSION test_copy_format; +CREATE TABLE public.test (a INT, b INT, c INT); +INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +COPY public.test TO stdout WITH ( + option_before 'before', + format 'test_copy_format', + option_after 'after' +); +NOTICE: test_copy_format: is_from=false +NOTICE: CopyToProcessOption: "option_before"="before" +NOTICE: CopyToProcessOption: "option_after"="after" +NOTICE: CopyToGetFormat +NOTICE: CopyToStart: natts=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToEnd diff --git a/src/test/modules/test_copy_format/meson.build b/src/test/modules/test_copy_format/meson.build new file mode 100644 index 0000000000000..4cefe7b709aea --- /dev/null +++ b/src/test/modules/test_copy_format/meson.build @@ -0,0 +1,33 @@ +# Copyright (c) 2024, PostgreSQL Global Development Group + +test_copy_format_sources = files( + 'test_copy_format.c', +) + +if host_system == 'windows' + test_copy_format_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'test_copy_format', + '--FILEDESC', 'test_copy_format - test custom COPY FORMAT',]) +endif + +test_copy_format = shared_module('test_copy_format', + test_copy_format_sources, + kwargs: pg_test_mod_args, +) +test_install_libs += test_copy_format + +test_install_data += files( + 'test_copy_format.control', + 'test_copy_format--1.0.sql', +) + +tests += { + 'name': 'test_copy_format', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'regress': { + 'sql': [ + 'test_copy_format', + ], + }, +} diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql b/src/test/modules/test_copy_format/sql/test_copy_format.sql new file mode 100644 index 0000000000000..0eb7ed2e11605 --- /dev/null +++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql @@ -0,0 +1,8 @@ +CREATE EXTENSION test_copy_format; +CREATE TABLE public.test (a INT, b INT, c INT); +INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +COPY public.test TO stdout WITH ( + option_before 'before', + format 'test_copy_format', + option_after 'after' +); diff --git a/src/test/modules/test_copy_format/test_copy_format--1.0.sql b/src/test/modules/test_copy_format/test_copy_format--1.0.sql new file mode 100644 index 0000000000000..d24ea03ce99ca --- /dev/null +++ b/src/test/modules/test_copy_format/test_copy_format--1.0.sql @@ -0,0 +1,8 @@ +/* src/test/modules/test_copy_format/test_copy_format--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION test_copy_format" to load this file. \quit + +CREATE FUNCTION test_copy_format(internal) + RETURNS copy_handler + AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/src/test/modules/test_copy_format/test_copy_format.c b/src/test/modules/test_copy_format/test_copy_format.c new file mode 100644 index 0000000000000..a2219afcde6fc --- /dev/null +++ b/src/test/modules/test_copy_format/test_copy_format.c @@ -0,0 +1,77 @@ +/*-------------------------------------------------------------------------- + * + * test_copy_format.c + * Code for testing custom COPY format. + * + * Portions Copyright (c) 2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/test_copy_format/test_copy_format.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "commands/copy.h" +#include "commands/defrem.h" + +PG_MODULE_MAGIC; + +static bool +CopyToProcessOption(CopyToState cstate, DefElem *defel) +{ + ereport(NOTICE, + (errmsg("CopyToProcessOption: \"%s\"=\"%s\"", + defel->defname, defGetString(defel)))); + return true; +} + +static int16 +CopyToGetFormat(CopyToState cstate) +{ + ereport(NOTICE, (errmsg("CopyToGetFormat"))); + return 0; +} + +static void +CopyToStart(CopyToState cstate, TupleDesc tupDesc) +{ + ereport(NOTICE, (errmsg("CopyToStart: natts=%d", tupDesc->natts))); +} + +static void +CopyToOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + ereport(NOTICE, (errmsg("CopyToOneRow: tts_nvalid=%u", slot->tts_nvalid))); +} + +static void +CopyToEnd(CopyToState cstate) +{ + ereport(NOTICE, (errmsg("CopyToEnd"))); +} + +static const CopyToRoutine CopyToRoutineTestCopyFormat = { + .type = T_CopyToRoutine, + .CopyToProcessOption = CopyToProcessOption, + .CopyToGetFormat = CopyToGetFormat, + .CopyToStart = CopyToStart, + .CopyToOneRow = CopyToOneRow, + .CopyToEnd = CopyToEnd, +}; + +PG_FUNCTION_INFO_V1(test_copy_format); +Datum +test_copy_format(PG_FUNCTION_ARGS) +{ + bool is_from = PG_GETARG_BOOL(0); + + ereport(NOTICE, + (errmsg("test_copy_format: is_from=%s", is_from ? "true" : "false"))); + + if (is_from) + elog(ERROR, "COPY FROM isn't supported yet"); + + PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat); +} diff --git a/src/test/modules/test_copy_format/test_copy_format.control b/src/test/modules/test_copy_format/test_copy_format.control new file mode 100644 index 0000000000000..f05a6362358d1 --- /dev/null +++ b/src/test/modules/test_copy_format/test_copy_format.control @@ -0,0 +1,4 @@ +comment = 'Test code for custom COPY format' +default_version = '1.0' +module_pathname = '$libdir/test_copy_format' +relocatable = true From c3a59753b1157dc8e47e719263f58677acc33178 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Tue, 23 Jan 2024 14:54:10 +0900 Subject: [PATCH 03/10] Export CopyToStateData It's for custom COPY TO format handlers implemented as extension. This just moves codes. This doesn't change codes except CopyDest enum values. CopyDest enum values such as COPY_FILE are conflicted CopySource enum values defined in copyfrom_internal.h. So COPY_DEST_ prefix instead of COPY_ prefix is used. For example, COPY_FILE is renamed to COPY_DEST_FILE. Note that this change isn't enough to implement a custom COPY TO format handler as extension. We'll do the followings in a subsequent commit: 1. Add an opaque space for custom COPY TO format handler 2. Export CopySendEndOfRow() to flush buffer --- src/backend/commands/copyto.c | 74 +++----------------- src/include/commands/copy.h | 59 ---------------- src/include/commands/copyapi.h | 120 ++++++++++++++++++++++++++++++++- 3 files changed, 127 insertions(+), 126 deletions(-) diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 6547b7c654ad4..cfc74ee7b1d28 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -43,64 +43,6 @@ #include "utils/rel.h" #include "utils/snapmgr.h" -/* - * Represents the different dest cases we need to worry about at - * the bottom level - */ -typedef enum CopyDest -{ - COPY_FILE, /* to file (or a piped program) */ - COPY_FRONTEND, /* to frontend */ - COPY_CALLBACK, /* to callback function */ -} CopyDest; - -/* - * This struct contains all the state variables used throughout a COPY TO - * operation. - * - * Multi-byte encodings: all supported client-side encodings encode multi-byte - * characters by having the first byte's high bit set. Subsequent bytes of the - * character can have the high bit not set. When scanning data in such an - * encoding to look for a match to a single-byte (ie ASCII) character, we must - * use the full pg_encoding_mblen() machinery to skip over multibyte - * characters, else we might find a false match to a trailing byte. In - * supported server encodings, there is no possibility of a false match, and - * it's faster to make useless comparisons to trailing bytes than it is to - * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true - * when we have to do it the hard way. - */ -typedef struct CopyToStateData -{ - /* low-level state data */ - CopyDest copy_dest; /* type of copy source/destination */ - FILE *copy_file; /* used if copy_dest == COPY_FILE */ - StringInfo fe_msgbuf; /* used for all dests during COPY TO */ - - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ - - /* parameters from the COPY command */ - Relation rel; /* relation to copy to */ - QueryDesc *queryDesc; /* executable query to copy from */ - List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDOUT */ - bool is_program; /* is 'filename' a program to popen? */ - copy_data_dest_cb data_dest_cb; /* function for writing data */ - - CopyFormatOptions opts; - Node *whereClause; /* WHERE condition (or NULL) */ - - /* - * Working state - */ - MemoryContext copycontext; /* per-copy execution context */ - - FmgrInfo *out_functions; /* lookup info for output functions */ - MemoryContext rowcontext; /* per-row evaluation context */ - uint64 bytes_processed; /* number of bytes processed so far */ -} CopyToStateData; - /* DestReceiver for COPY (query) TO */ typedef struct { @@ -160,7 +102,7 @@ CopyToTextSendEndOfRow(CopyToState cstate) { switch (cstate->copy_dest) { - case COPY_FILE: + case COPY_DEST_FILE: /* Default line termination depends on platform */ #ifndef WIN32 CopySendChar(cstate, '\n'); @@ -168,7 +110,7 @@ CopyToTextSendEndOfRow(CopyToState cstate) CopySendString(cstate, "\r\n"); #endif break; - case COPY_FRONTEND: + case COPY_DEST_FRONTEND: /* The FE/BE protocol uses \n as newline for all platforms */ CopySendChar(cstate, '\n'); break; @@ -419,7 +361,7 @@ SendCopyBegin(CopyToState cstate) for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); - cstate->copy_dest = COPY_FRONTEND; + cstate->copy_dest = COPY_DEST_FRONTEND; } static void @@ -466,7 +408,7 @@ CopySendEndOfRow(CopyToState cstate) switch (cstate->copy_dest) { - case COPY_FILE: + case COPY_DEST_FILE: if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file) != 1 || ferror(cstate->copy_file)) @@ -500,11 +442,11 @@ CopySendEndOfRow(CopyToState cstate) errmsg("could not write to COPY file: %m"))); } break; - case COPY_FRONTEND: + case COPY_DEST_FRONTEND: /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len); break; - case COPY_CALLBACK: + case COPY_DEST_CALLBACK: cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len); break; } @@ -877,12 +819,12 @@ BeginCopyTo(ParseState *pstate, /* See Multibyte encoding comment above */ cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding); - cstate->copy_dest = COPY_FILE; /* default */ + cstate->copy_dest = COPY_DEST_FILE; /* default */ if (data_dest_cb) { progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; - cstate->copy_dest = COPY_CALLBACK; + cstate->copy_dest = COPY_DEST_CALLBACK; cstate->data_dest_cb = data_dest_cb; } else if (pipe) diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 34bea880caa42..b3f4682f95e06 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -20,69 +20,10 @@ #include "parser/parse_node.h" #include "tcop/dest.h" -/* - * Represents whether a header line should be present, and whether it must - * match the actual names (which implies "true"). - */ -typedef enum CopyHeaderChoice -{ - COPY_HEADER_FALSE = 0, - COPY_HEADER_TRUE, - COPY_HEADER_MATCH, -} CopyHeaderChoice; - -/* - * Represents where to save input processing errors. More values to be added - * in the future. - */ -typedef enum CopyOnErrorChoice -{ - COPY_ON_ERROR_STOP = 0, /* immediately throw errors, default */ - COPY_ON_ERROR_IGNORE, /* ignore errors */ -} CopyOnErrorChoice; - -/* - * A struct to hold COPY options, in a parsed form. All of these are related - * to formatting, except for 'freeze', which doesn't really belong here, but - * it's expedient to parse it along with all the other options. - */ -typedef struct CopyFormatOptions -{ - /* parameters from the COPY command */ - int file_encoding; /* file or remote side's character encoding, - * -1 if not specified */ - bool binary; /* binary format? */ - bool freeze; /* freeze rows on loading? */ - bool csv_mode; /* Comma Separated Value format? */ - CopyHeaderChoice header_line; /* header line? */ - char *null_print; /* NULL marker string (server encoding!) */ - int null_print_len; /* length of same */ - char *null_print_client; /* same converted to file encoding */ - char *default_print; /* DEFAULT marker string */ - int default_print_len; /* length of same */ - char *delim; /* column delimiter (must be 1 byte) */ - char *quote; /* CSV quote char (must be 1 byte) */ - char *escape; /* CSV escape char (must be 1 byte) */ - List *force_quote; /* list of column names */ - bool force_quote_all; /* FORCE_QUOTE *? */ - bool *force_quote_flags; /* per-column CSV FQ flags */ - List *force_notnull; /* list of column names */ - bool force_notnull_all; /* FORCE_NOT_NULL *? */ - bool *force_notnull_flags; /* per-column CSV FNN flags */ - List *force_null; /* list of column names */ - bool force_null_all; /* FORCE_NULL *? */ - bool *force_null_flags; /* per-column CSV FN flags */ - bool convert_selectively; /* do selective binary conversion? */ - CopyOnErrorChoice on_error; /* what to do when error happened */ - List *convert_select; /* list of column names (can be NIL) */ - CopyToRoutine *to_routine; /* callback routines for COPY TO */ -} CopyFormatOptions; - /* This is private in commands/copyfrom.c */ typedef struct CopyFromStateData *CopyFromState; typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); -typedef void (*copy_data_dest_cb) (void *data, int len); extern void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 9c25e1c41509b..a869d78d72237 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -14,10 +14,10 @@ #ifndef COPYAPI_H #define COPYAPI_H +#include "executor/execdesc.h" #include "executor/tuptable.h" #include "nodes/parsenodes.h" -/* This is private in commands/copyto.c */ typedef struct CopyToStateData *CopyToState; typedef bool (*CopyToProcessOption_function) (CopyToState cstate, DefElem *defel); @@ -58,4 +58,122 @@ extern CopyToRoutine CopyToRoutineText; extern CopyToRoutine CopyToRoutineCSV; extern CopyToRoutine CopyToRoutineBinary; +/* + * Represents whether a header line should be present, and whether it must + * match the actual names (which implies "true"). + */ +typedef enum CopyHeaderChoice +{ + COPY_HEADER_FALSE = 0, + COPY_HEADER_TRUE, + COPY_HEADER_MATCH, +} CopyHeaderChoice; + +/* + * Represents where to save input processing errors. More values to be added + * in the future. + */ +typedef enum CopyOnErrorChoice +{ + COPY_ON_ERROR_STOP = 0, /* immediately throw errors, default */ + COPY_ON_ERROR_IGNORE, /* ignore errors */ +} CopyOnErrorChoice; + +/* + * A struct to hold COPY options, in a parsed form. All of these are related + * to formatting, except for 'freeze', which doesn't really belong here, but + * it's expedient to parse it along with all the other options. + */ +typedef struct CopyFormatOptions +{ + /* parameters from the COPY command */ + int file_encoding; /* file or remote side's character encoding, + * -1 if not specified */ + bool binary; /* binary format? */ + bool freeze; /* freeze rows on loading? */ + bool csv_mode; /* Comma Separated Value format? */ + CopyHeaderChoice header_line; /* header line? */ + char *null_print; /* NULL marker string (server encoding!) */ + int null_print_len; /* length of same */ + char *null_print_client; /* same converted to file encoding */ + char *default_print; /* DEFAULT marker string */ + int default_print_len; /* length of same */ + char *delim; /* column delimiter (must be 1 byte) */ + char *quote; /* CSV quote char (must be 1 byte) */ + char *escape; /* CSV escape char (must be 1 byte) */ + List *force_quote; /* list of column names */ + bool force_quote_all; /* FORCE_QUOTE *? */ + bool *force_quote_flags; /* per-column CSV FQ flags */ + List *force_notnull; /* list of column names */ + bool force_notnull_all; /* FORCE_NOT_NULL *? */ + bool *force_notnull_flags; /* per-column CSV FNN flags */ + List *force_null; /* list of column names */ + bool force_null_all; /* FORCE_NULL *? */ + bool *force_null_flags; /* per-column CSV FN flags */ + bool convert_selectively; /* do selective binary conversion? */ + CopyOnErrorChoice on_error; /* what to do when error happened */ + List *convert_select; /* list of column names (can be NIL) */ + CopyToRoutine *to_routine; /* callback routines for COPY TO */ +} CopyFormatOptions; + +/* + * Represents the different dest cases we need to worry about at + * the bottom level + */ +typedef enum CopyDest +{ + COPY_DEST_FILE, /* to file (or a piped program) */ + COPY_DEST_FRONTEND, /* to frontend */ + COPY_DEST_CALLBACK, /* to callback function */ +} CopyDest; + +typedef void (*copy_data_dest_cb) (void *data, int len); + +/* + * This struct contains all the state variables used throughout a COPY TO + * operation. + * + * Multi-byte encodings: all supported client-side encodings encode multi-byte + * characters by having the first byte's high bit set. Subsequent bytes of the + * character can have the high bit not set. When scanning data in such an + * encoding to look for a match to a single-byte (ie ASCII) character, we must + * use the full pg_encoding_mblen() machinery to skip over multibyte + * characters, else we might find a false match to a trailing byte. In + * supported server encodings, there is no possibility of a false match, and + * it's faster to make useless comparisons to trailing bytes than it is to + * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true + * when we have to do it the hard way. + */ +typedef struct CopyToStateData +{ + /* low-level state data */ + CopyDest copy_dest; /* type of copy source/destination */ + FILE *copy_file; /* used if copy_dest == COPY_FILE */ + StringInfo fe_msgbuf; /* used for all dests during COPY TO */ + + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ + + /* parameters from the COPY command */ + Relation rel; /* relation to copy to */ + QueryDesc *queryDesc; /* executable query to copy from */ + List *attnumlist; /* integer list of attnums to copy */ + char *filename; /* filename, or NULL for STDOUT */ + bool is_program; /* is 'filename' a program to popen? */ + copy_data_dest_cb data_dest_cb; /* function for writing data */ + + CopyFormatOptions opts; + Node *whereClause; /* WHERE condition (or NULL) */ + + /* + * Working state + */ + MemoryContext copycontext; /* per-copy execution context */ + + FmgrInfo *out_functions; /* lookup info for output functions */ + MemoryContext rowcontext; /* per-row evaluation context */ + uint64 bytes_processed; /* number of bytes processed so far */ +} CopyToStateData; + #endif /* COPYAPI_H */ From 4b177469f3fb8f14f0cd6bff3c7878dcafd9b760 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Tue, 23 Jan 2024 15:12:43 +0900 Subject: [PATCH 04/10] Add support for implementing custom COPY TO format as extension * Add CopyToStateData::opaque that can be used to keep data for custom COPY TO format implementation * Export CopySendEndOfRow() to flush data in CopyToStateData::fe_msgbuf * Rename CopySendEndOfRow() to CopyToStateFlush() because it's a method for CopyToState and it's used for flushing. End-of-row related codes were moved to CopyToTextSendEndOfRow(). --- src/backend/commands/copyto.c | 15 +++++++-------- src/include/commands/copyapi.h | 5 +++++ 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index cfc74ee7b1d28..b5d8678394ffe 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -69,7 +69,6 @@ static void SendCopyEnd(CopyToState cstate); static void CopySendData(CopyToState cstate, const void *databuf, int datasize); static void CopySendString(CopyToState cstate, const char *str); static void CopySendChar(CopyToState cstate, char c); -static void CopySendEndOfRow(CopyToState cstate); static void CopySendInt32(CopyToState cstate, int32 val); static void CopySendInt16(CopyToState cstate, int16 val); @@ -117,7 +116,7 @@ CopyToTextSendEndOfRow(CopyToState cstate) default: break; } - CopySendEndOfRow(cstate); + CopyToStateFlush(cstate); } static void @@ -302,7 +301,7 @@ CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot) } } - CopySendEndOfRow(cstate); + CopyToStateFlush(cstate); } static void @@ -311,7 +310,7 @@ CopyToBinaryEnd(CopyToState cstate) /* Generate trailer for a binary copy */ CopySendInt16(cstate, -1); /* Need to flush out the trailer */ - CopySendEndOfRow(cstate); + CopyToStateFlush(cstate); } CopyToRoutine CopyToRoutineText = { @@ -377,8 +376,8 @@ SendCopyEnd(CopyToState cstate) * CopySendData sends output data to the destination (file or frontend) * CopySendString does the same for null-terminated strings * CopySendChar does the same for single characters - * CopySendEndOfRow does the appropriate thing at end of each data row - * (data is not actually flushed except by CopySendEndOfRow) + * CopyToStateFlush flushes the buffered data + * (data is not actually flushed except by CopyToStateFlush) * * NB: no data conversion is applied by these functions *---------- @@ -401,8 +400,8 @@ CopySendChar(CopyToState cstate, char c) appendStringInfoCharMacro(cstate->fe_msgbuf, c); } -static void -CopySendEndOfRow(CopyToState cstate) +void +CopyToStateFlush(CopyToState cstate) { StringInfo fe_msgbuf = cstate->fe_msgbuf; diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index a869d78d72237..ffad433a21119 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -174,6 +174,11 @@ typedef struct CopyToStateData FmgrInfo *out_functions; /* lookup info for output functions */ MemoryContext rowcontext; /* per-row evaluation context */ uint64 bytes_processed; /* number of bytes processed so far */ + + /* For custom format implementation */ + void *opaque; /* private space */ } CopyToStateData; +extern void CopyToStateFlush(CopyToState cstate); + #endif /* COPYAPI_H */ From 781955f19ad27cdd66748be539bf45cf1b925856 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Tue, 23 Jan 2024 17:21:23 +0900 Subject: [PATCH 05/10] Extract COPY FROM format implementations This doesn't change the current behavior. This just introduces CopyFromRoutine, which just has function pointers of format implementation like TupleTableSlotOps, and use it for existing "text", "csv" and "binary" format implementations. Note that CopyFromRoutine can't be used from extensions yet because CopyRead*() aren't exported yet. Extensions can't read data from a source without CopyRead*(). They will be exported by subsequent patches. --- src/backend/commands/copy.c | 3 + src/backend/commands/copyfrom.c | 216 +++++++++++---- src/backend/commands/copyfromparse.c | 326 ++++++++++++----------- src/include/commands/copy.h | 3 - src/include/commands/copyapi.h | 44 +++ src/include/commands/copyfrom_internal.h | 4 + 6 files changed, 391 insertions(+), 205 deletions(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 6f0db0ae7caf8..ec6dfff8ab4d2 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -459,12 +459,14 @@ ProcessCopyOptionCustomFormat(ParseState *pstate, else if (strcmp(format, "csv") == 0) { opts_out->csv_mode = true; + opts_out->from_routine = &CopyFromRoutineCSV; opts_out->to_routine = &CopyToRoutineCSV; return; } else if (strcmp(format, "binary") == 0) { opts_out->binary = true; + opts_out->from_routine = &CopyFromRoutineBinary; opts_out->to_routine = &CopyToRoutineBinary; return; } @@ -533,6 +535,7 @@ ProcessCopyOptions(ParseState *pstate, opts_out->file_encoding = -1; /* Text is the default format. */ + opts_out->from_routine = &CopyFromRoutineText; opts_out->to_routine = &CopyToRoutineText; /* diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index fb3d4d929602e..d556ebb5d67ab 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -108,6 +108,170 @@ static char *limit_printout_length(const char *str); static void ClosePipeFromProgram(CopyFromState cstate); + +/* + * CopyFromRoutine implementations. + */ + +/* + * CopyFromRoutine implementation for "text" and "csv". CopyFromText*() + * refer cstate->opts.csv_mode and change their behavior. We can split this + * implementation and stop referring cstate->opts.csv_mode later. + */ + +/* All "text" and "csv" options are parsed in ProcessCopyOptions(). We may + * move the code to here later. */ +static bool +CopyFromTextProcessOption(CopyFromState cstate, DefElem *defel) +{ + return false; +} + +static int16 +CopyFromTextGetFormat(CopyFromState cstate) +{ + return 0; +} + +static void +CopyFromTextStart(CopyFromState cstate, TupleDesc tupDesc) +{ + AttrNumber num_phys_attrs = tupDesc->natts; + AttrNumber attr_count; + + /* + * If encoding conversion is needed, we need another buffer to hold the + * converted input data. Otherwise, we can just point input_buf to the + * same buffer as raw_buf. + */ + if (cstate->need_transcoding) + { + cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); + cstate->input_buf_index = cstate->input_buf_len = 0; + } + else + cstate->input_buf = cstate->raw_buf; + cstate->input_reached_eof = false; + + initStringInfo(&cstate->line_buf); + + /* + * Pick up the required catalog information for each attribute in the + * relation, including the input function, the element type (to pass to + * the input function). + */ + cstate->in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); + cstate->typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); + for (int attnum = 1; attnum <= num_phys_attrs; attnum++) + { + Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1); + Oid in_func_oid; + + /* We don't need info for dropped attributes */ + if (att->attisdropped) + continue; + + /* Fetch the input function and typioparam info */ + getTypeInputInfo(att->atttypid, + &in_func_oid, &cstate->typioparams[attnum - 1]); + fmgr_info(in_func_oid, &cstate->in_functions[attnum - 1]); + } + + /* create workspace for CopyReadAttributes results */ + attr_count = list_length(cstate->attnumlist); + cstate->max_fields = attr_count; + cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); +} + +static void +CopyFromTextEnd(CopyFromState cstate) +{ +} + +/* + * CopyFromRoutine implementation for "binary". + */ + +/* All "binary" options are parsed in ProcessCopyOptions(). We may move the + * code to here later. */ +static bool +CopyFromBinaryProcessOption(CopyFromState cstate, DefElem *defel) +{ + return false; +} + +static int16 +CopyFromBinaryGetFormat(CopyFromState cstate) +{ + return 1; +} + +static void +CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc) +{ + AttrNumber num_phys_attrs = tupDesc->natts; + + /* + * Pick up the required catalog information for each attribute in the + * relation, including the input function, the element type (to pass to + * the input function). + */ + cstate->in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); + cstate->typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); + for (int attnum = 1; attnum <= num_phys_attrs; attnum++) + { + Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1); + Oid in_func_oid; + + /* We don't need info for dropped attributes */ + if (att->attisdropped) + continue; + + /* Fetch the input function and typioparam info */ + getTypeBinaryInputInfo(att->atttypid, + &in_func_oid, &cstate->typioparams[attnum - 1]); + fmgr_info(in_func_oid, &cstate->in_functions[attnum - 1]); + } + + /* Read and verify binary header */ + ReceiveCopyBinaryHeader(cstate); +} + +static void +CopyFromBinaryEnd(CopyFromState cstate) +{ +} + +CopyFromRoutine CopyFromRoutineText = { + .CopyFromProcessOption = CopyFromTextProcessOption, + .CopyFromGetFormat = CopyFromTextGetFormat, + .CopyFromStart = CopyFromTextStart, + .CopyFromOneRow = CopyFromTextOneRow, + .CopyFromEnd = CopyFromTextEnd, +}; + +/* + * We can use the same CopyFromRoutine for both of "text" and "csv" because + * CopyFromText*() refer cstate->opts.csv_mode and change their behavior. We can + * split the implementations and stop referring cstate->opts.csv_mode later. + */ +CopyFromRoutine CopyFromRoutineCSV = { + .CopyFromProcessOption = CopyFromTextProcessOption, + .CopyFromGetFormat = CopyFromTextGetFormat, + .CopyFromStart = CopyFromTextStart, + .CopyFromOneRow = CopyFromTextOneRow, + .CopyFromEnd = CopyFromTextEnd, +}; + +CopyFromRoutine CopyFromRoutineBinary = { + .CopyFromProcessOption = CopyFromBinaryProcessOption, + .CopyFromGetFormat = CopyFromBinaryGetFormat, + .CopyFromStart = CopyFromBinaryStart, + .CopyFromOneRow = CopyFromBinaryOneRow, + .CopyFromEnd = CopyFromBinaryEnd, +}; + + /* * error context callback for COPY FROM * @@ -1384,9 +1548,6 @@ BeginCopyFrom(ParseState *pstate, TupleDesc tupDesc; AttrNumber num_phys_attrs, num_defaults; - FmgrInfo *in_functions; - Oid *typioparams; - Oid in_func_oid; int *defmap; ExprState **defexprs; MemoryContext oldcontext; @@ -1571,25 +1732,6 @@ BeginCopyFrom(ParseState *pstate, cstate->raw_buf_index = cstate->raw_buf_len = 0; cstate->raw_reached_eof = false; - if (!cstate->opts.binary) - { - /* - * If encoding conversion is needed, we need another buffer to hold - * the converted input data. Otherwise, we can just point input_buf - * to the same buffer as raw_buf. - */ - if (cstate->need_transcoding) - { - cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); - cstate->input_buf_index = cstate->input_buf_len = 0; - } - else - cstate->input_buf = cstate->raw_buf; - cstate->input_reached_eof = false; - - initStringInfo(&cstate->line_buf); - } - initStringInfo(&cstate->attribute_buf); /* Assign range table and rteperminfos, we'll need them in CopyFrom. */ @@ -1608,8 +1750,6 @@ BeginCopyFrom(ParseState *pstate, * the input function), and info about defaults and constraints. (Which * input function we use depends on text/binary format choice.) */ - in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); - typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); defmap = (int *) palloc(num_phys_attrs * sizeof(int)); defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *)); @@ -1621,15 +1761,6 @@ BeginCopyFrom(ParseState *pstate, if (att->attisdropped) continue; - /* Fetch the input function and typioparam info */ - if (cstate->opts.binary) - getTypeBinaryInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - else - getTypeInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - fmgr_info(in_func_oid, &in_functions[attnum - 1]); - /* Get default info if available */ defexprs[attnum - 1] = NULL; @@ -1689,8 +1820,6 @@ BeginCopyFrom(ParseState *pstate, cstate->bytes_processed = 0; /* We keep those variables in cstate. */ - cstate->in_functions = in_functions; - cstate->typioparams = typioparams; cstate->defmap = defmap; cstate->defexprs = defexprs; cstate->volatile_defexprs = volatile_defexprs; @@ -1763,20 +1892,7 @@ BeginCopyFrom(ParseState *pstate, pgstat_progress_update_multi_param(3, progress_cols, progress_vals); - if (cstate->opts.binary) - { - /* Read and verify binary header */ - ReceiveCopyBinaryHeader(cstate); - } - - /* create workspace for CopyReadAttributes results */ - if (!cstate->opts.binary) - { - AttrNumber attr_count = list_length(cstate->attnumlist); - - cstate->max_fields = attr_count; - cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); - } + cstate->opts.from_routine->CopyFromStart(cstate, tupDesc); MemoryContextSwitchTo(oldcontext); @@ -1789,6 +1905,8 @@ BeginCopyFrom(ParseState *pstate, void EndCopyFrom(CopyFromState cstate) { + cstate->opts.from_routine->CopyFromEnd(cstate); + /* No COPY FROM related resources except memory. */ if (cstate->is_program) { diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index 7cacd0b752c98..49632f75e4dd6 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -172,7 +172,7 @@ ReceiveCopyBegin(CopyFromState cstate) { StringInfoData buf; int natts = list_length(cstate->attnumlist); - int16 format = (cstate->opts.binary ? 1 : 0); + int16 format = cstate->opts.from_routine->CopyFromGetFormat(cstate); int i; pq_beginmessage(&buf, PqMsg_CopyInResponse); @@ -840,199 +840,219 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) return true; } -/* - * Read next tuple from file for COPY FROM. Return false if no more tuples. - * - * 'econtext' is used to evaluate default expression for each column that is - * either not read from the file or is using the DEFAULT option of COPY FROM. - * It can be NULL when no default values are used, i.e. when all columns are - * read from the file, and DEFAULT option is unset. - * - * 'values' and 'nulls' arrays must be the same length as columns of the - * relation passed to BeginCopyFrom. This function fills the arrays. - */ bool -NextCopyFrom(CopyFromState cstate, ExprContext *econtext, - Datum *values, bool *nulls) +CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls) { TupleDesc tupDesc; - AttrNumber num_phys_attrs, - attr_count, - num_defaults = cstate->num_defaults; + AttrNumber attr_count; FmgrInfo *in_functions = cstate->in_functions; Oid *typioparams = cstate->typioparams; - int i; - int *defmap = cstate->defmap; ExprState **defexprs = cstate->defexprs; + char **field_strings; + ListCell *cur; + int fldct; + int fieldno; + char *string; tupDesc = RelationGetDescr(cstate->rel); - num_phys_attrs = tupDesc->natts; attr_count = list_length(cstate->attnumlist); - /* Initialize all values for row to NULL */ - MemSet(values, 0, num_phys_attrs * sizeof(Datum)); - MemSet(nulls, true, num_phys_attrs * sizeof(bool)); - MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool)); + /* read raw fields in the next line */ + if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) + return false; - if (!cstate->opts.binary) - { - char **field_strings; - ListCell *cur; - int fldct; - int fieldno; - char *string; + /* check for overflowing fields */ + if (attr_count > 0 && fldct > attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); - /* read raw fields in the next line */ - if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) - return false; + fieldno = 0; - /* check for overflowing fields */ - if (attr_count > 0 && fldct > attr_count) + /* Loop to read the user attributes on the line. */ + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + if (fieldno >= fldct) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + string = field_strings[fieldno++]; - fieldno = 0; - - /* Loop to read the user attributes on the line. */ - foreach(cur, cstate->attnumlist) + if (cstate->convert_select_flags && + !cstate->convert_select_flags[m]) { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); - string = field_strings[fieldno++]; - - if (cstate->convert_select_flags && - !cstate->convert_select_flags[m]) - { - /* ignore input field, leaving column as NULL */ - continue; - } + /* ignore input field, leaving column as NULL */ + continue; + } - if (cstate->opts.csv_mode) + if (cstate->opts.csv_mode) + { + if (string == NULL && + cstate->opts.force_notnull_flags[m]) { - if (string == NULL && - cstate->opts.force_notnull_flags[m]) - { - /* - * FORCE_NOT_NULL option is set and column is NULL - - * convert it to the NULL string. - */ - string = cstate->opts.null_print; - } - else if (string != NULL && cstate->opts.force_null_flags[m] - && strcmp(string, cstate->opts.null_print) == 0) - { - /* - * FORCE_NULL option is set and column matches the NULL - * string. It must have been quoted, or otherwise the - * string would already have been set to NULL. Convert it - * to NULL as specified. - */ - string = NULL; - } + /* + * FORCE_NOT_NULL option is set and column is NULL - convert + * it to the NULL string. + */ + string = cstate->opts.null_print; } - - cstate->cur_attname = NameStr(att->attname); - cstate->cur_attval = string; - - if (string != NULL) - nulls[m] = false; - - if (cstate->defaults[m]) + else if (string != NULL && cstate->opts.force_null_flags[m] + && strcmp(string, cstate->opts.null_print) == 0) { /* - * The caller must supply econtext and have switched into the - * per-tuple memory context in it. + * FORCE_NULL option is set and column matches the NULL + * string. It must have been quoted, or otherwise the string + * would already have been set to NULL. Convert it to NULL as + * specified. */ - Assert(econtext != NULL); - Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); - - values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); + string = NULL; } + } + + cstate->cur_attname = NameStr(att->attname); + cstate->cur_attval = string; + + if (string != NULL) + nulls[m] = false; + if (cstate->defaults[m]) + { /* - * If ON_ERROR is specified with IGNORE, skip rows with soft - * errors + * The caller must supply econtext and have switched into the + * per-tuple memory context in it. */ - else if (!InputFunctionCallSafe(&in_functions[m], - string, - typioparams[m], - att->atttypmod, - (Node *) cstate->escontext, - &values[m])) - { - cstate->num_errors++; - return true; - } + Assert(econtext != NULL); + Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); - cstate->cur_attname = NULL; - cstate->cur_attval = NULL; + values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); } - Assert(fieldno == attr_count); + /* + * If ON_ERROR is specified with IGNORE, skip rows with soft errors + */ + else if (!InputFunctionCallSafe(&in_functions[m], + string, + typioparams[m], + att->atttypmod, + (Node *) cstate->escontext, + &values[m])) + { + cstate->num_errors++; + return true; + } + + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; } - else - { - /* binary */ - int16 fld_count; - ListCell *cur; - cstate->cur_lineno++; + Assert(fieldno == attr_count); - if (!CopyGetInt16(cstate, &fld_count)) - { - /* EOF detected (end of file, or protocol-level EOF) */ - return false; - } + return true; +} - if (fld_count == -1) - { - /* - * Received EOF marker. Wait for the protocol-level EOF, and - * complain if it doesn't come immediately. In COPY FROM STDIN, - * this ensures that we correctly handle CopyFail, if client - * chooses to send that now. When copying from file, we could - * ignore the rest of the file like in text mode, but we choose to - * be consistent with the COPY FROM STDIN case. - */ - char dummy; +bool +CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls) +{ + TupleDesc tupDesc; + AttrNumber attr_count; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + int16 fld_count; + ListCell *cur; - if (CopyReadBinaryData(cstate, &dummy, 1) > 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("received copy data after EOF marker"))); - return false; - } + tupDesc = RelationGetDescr(cstate->rel); + attr_count = list_length(cstate->attnumlist); - if (fld_count != attr_count) + cstate->cur_lineno++; + + if (!CopyGetInt16(cstate, &fld_count)) + { + /* EOF detected (end of file, or protocol-level EOF) */ + return false; + } + + if (fld_count == -1) + { + /* + * Received EOF marker. Wait for the protocol-level EOF, and complain + * if it doesn't come immediately. In COPY FROM STDIN, this ensures + * that we correctly handle CopyFail, if client chooses to send that + * now. When copying from file, we could ignore the rest of the file + * like in text mode, but we choose to be consistent with the COPY + * FROM STDIN case. + */ + char dummy; + + if (CopyReadBinaryData(cstate, &dummy, 1) > 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); + errmsg("received copy data after EOF marker"))); + return false; + } - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - cstate->cur_attname = NameStr(att->attname); - values[m] = CopyReadBinaryAttribute(cstate, - &in_functions[m], - typioparams[m], - att->atttypmod, - &nulls[m]); - cstate->cur_attname = NULL; - } + if (fld_count != attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + cstate->cur_attname = NameStr(att->attname); + values[m] = CopyReadBinaryAttribute(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; } + return true; +} + +/* + * Read next tuple from file for COPY FROM. Return false if no more tuples. + * + * 'econtext' is used to evaluate default expression for each column that is + * either not read from the file or is using the DEFAULT option of COPY FROM. + * It can be NULL when no default values are used, i.e. when all columns are + * read from the file, and DEFAULT option is unset. + * + * 'values' and 'nulls' arrays must be the same length as columns of the + * relation passed to BeginCopyFrom. This function fills the arrays. + */ +bool +NextCopyFrom(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls) +{ + TupleDesc tupDesc; + AttrNumber num_phys_attrs, + num_defaults = cstate->num_defaults; + int i; + int *defmap = cstate->defmap; + ExprState **defexprs = cstate->defexprs; + + tupDesc = RelationGetDescr(cstate->rel); + num_phys_attrs = tupDesc->natts; + + /* Initialize all values for row to NULL */ + MemSet(values, 0, num_phys_attrs * sizeof(Datum)); + MemSet(nulls, true, num_phys_attrs * sizeof(bool)); + MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool)); + + if (!cstate->opts.from_routine->CopyFromOneRow(cstate, econtext, values, + nulls)) + return false; + /* * Now compute and insert any defaults available for the columns not * provided by the input data. Anything not processed here or above will diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index b3f4682f95e06..df29d42555f25 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -20,9 +20,6 @@ #include "parser/parse_node.h" #include "tcop/dest.h" -/* This is private in commands/copyfrom.c */ -typedef struct CopyFromStateData *CopyFromState; - typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); extern void DoCopy(ParseState *pstate, const CopyStmt *stmt, diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index ffad433a21119..323e4705d2391 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -18,6 +18,49 @@ #include "executor/tuptable.h" #include "nodes/parsenodes.h" +/* This is private in commands/copyfrom.c */ +typedef struct CopyFromStateData *CopyFromState; + +typedef bool (*CopyFromProcessOption_function) (CopyFromState cstate, DefElem *defel); +typedef int16 (*CopyFromGetFormat_function) (CopyFromState cstate); +typedef void (*CopyFromStart_function) (CopyFromState cstate, TupleDesc tupDesc); +typedef bool (*CopyFromOneRow_function) (CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls); +typedef void (*CopyFromEnd_function) (CopyFromState cstate); + +/* Routines for a COPY FROM format implementation. */ +typedef struct CopyFromRoutine +{ + /* + * Called for processing one COPY FROM option. This will return false when + * the given option is invalid. + */ + CopyFromProcessOption_function CopyFromProcessOption; + + /* + * Called when COPY FROM is started. This will return a format as int16 + * value. It's used for the CopyInResponse message. + */ + CopyFromGetFormat_function CopyFromGetFormat; + + /* + * Called when COPY FROM is started. This will initialize something and + * receive a header. + */ + CopyFromStart_function CopyFromStart; + + /* Copy one row. It returns false if no more tuples. */ + CopyFromOneRow_function CopyFromOneRow; + + /* Called when COPY FROM is ended. This will finalize something. */ + CopyFromEnd_function CopyFromEnd; +} CopyFromRoutine; + +/* Built-in CopyFromRoutine for "text", "csv" and "binary". */ +extern CopyFromRoutine CopyFromRoutineText; +extern CopyFromRoutine CopyFromRoutineCSV; +extern CopyFromRoutine CopyFromRoutineBinary; + + typedef struct CopyToStateData *CopyToState; typedef bool (*CopyToProcessOption_function) (CopyToState cstate, DefElem *defel); @@ -113,6 +156,7 @@ typedef struct CopyFormatOptions bool convert_selectively; /* do selective binary conversion? */ CopyOnErrorChoice on_error; /* what to do when error happened */ List *convert_select; /* list of column names (can be NIL) */ + CopyFromRoutine *from_routine; /* callback routines for COPY FROM */ CopyToRoutine *to_routine; /* callback routines for COPY TO */ } CopyFormatOptions; diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index cad52fcc78370..921c1513f7296 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -183,4 +183,8 @@ typedef struct CopyFromStateData extern void ReceiveCopyBegin(CopyFromState cstate); extern void ReceiveCopyBinaryHeader(CopyFromState cstate); +extern bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls); +extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls); + + #endif /* COPYFROM_INTERNAL_H */ From f48e7b629a8d15fc70cd4cc4737dd2ad61910cc9 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Wed, 24 Jan 2024 11:07:14 +0900 Subject: [PATCH 06/10] Add support for adding custom COPY FROM format We use the same approach as we used for custom COPY TO format. Now, custom COPY format handler can return COPY TO format routines or COPY FROM format routines based on the "is_from" argument: copy_handler(true) returns CopyToRoutine copy_handler(false) returns CopyFromRoutine --- src/backend/commands/copy.c | 53 +++++++++++++------ src/include/commands/copyapi.h | 2 + .../expected/test_copy_format.out | 12 +++++ .../test_copy_format/sql/test_copy_format.sql | 6 +++ .../test_copy_format/test_copy_format.c | 50 +++++++++++++++-- 5 files changed, 105 insertions(+), 18 deletions(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index ec6dfff8ab4d2..479f36868c6bd 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -472,12 +472,9 @@ ProcessCopyOptionCustomFormat(ParseState *pstate, } /* custom format */ - if (!is_from) - { - funcargtypes[0] = INTERNALOID; - handlerOid = LookupFuncName(list_make1(makeString(format)), 1, - funcargtypes, true); - } + funcargtypes[0] = INTERNALOID; + handlerOid = LookupFuncName(list_make1(makeString(format)), 1, + funcargtypes, true); if (!OidIsValid(handlerOid)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -486,14 +483,36 @@ ProcessCopyOptionCustomFormat(ParseState *pstate, datum = OidFunctionCall1(handlerOid, BoolGetDatum(is_from)); routine = DatumGetPointer(datum); - if (routine == NULL || !IsA(routine, CopyToRoutine)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("COPY handler function %s(%u) did not return a CopyToRoutine struct", - format, handlerOid), - parser_errposition(pstate, defel->location))); - - opts_out->to_routine = routine; + if (is_from) + { + if (routine == NULL || !IsA(routine, CopyFromRoutine)) + ereport( + ERROR, + (errcode( + ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY handler function " + "%s(%u) did not return a " + "CopyFromRoutine struct", + format, handlerOid), + parser_errposition( + pstate, defel->location))); + opts_out->from_routine = routine; + } + else + { + if (routine == NULL || !IsA(routine, CopyToRoutine)) + ereport( + ERROR, + (errcode( + ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY handler function " + "%s(%u) did not return a " + "CopyToRoutine struct", + format, handlerOid), + parser_errposition( + pstate, defel->location))); + opts_out->to_routine = routine; + } } /* @@ -692,7 +711,11 @@ ProcessCopyOptions(ParseState *pstate, { bool processed = false; - if (!is_from) + if (is_from) + processed = + opts_out->from_routine->CopyFromProcessOption( + cstate, defel); + else processed = opts_out->to_routine->CopyToProcessOption(cstate, defel); if (!processed) ereport(ERROR, diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 323e4705d2391..ef1bb201c2729 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -30,6 +30,8 @@ typedef void (*CopyFromEnd_function) (CopyFromState cstate); /* Routines for a COPY FROM format implementation. */ typedef struct CopyFromRoutine { + NodeTag type; + /* * Called for processing one COPY FROM option. This will return false when * the given option is invalid. diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out b/src/test/modules/test_copy_format/expected/test_copy_format.out index 3a24ae7b97827..6af69f0eb70d3 100644 --- a/src/test/modules/test_copy_format/expected/test_copy_format.out +++ b/src/test/modules/test_copy_format/expected/test_copy_format.out @@ -1,6 +1,18 @@ CREATE EXTENSION test_copy_format; CREATE TABLE public.test (a INT, b INT, c INT); INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +COPY public.test FROM stdin WITH ( + option_before 'before', + format 'test_copy_format', + option_after 'after' +); +NOTICE: test_copy_format: is_from=true +NOTICE: CopyFromProcessOption: "option_before"="before" +NOTICE: CopyFromProcessOption: "option_after"="after" +NOTICE: CopyFromGetFormat +NOTICE: CopyFromStart: natts=3 +NOTICE: CopyFromOneRow +NOTICE: CopyFromEnd COPY public.test TO stdout WITH ( option_before 'before', format 'test_copy_format', diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql b/src/test/modules/test_copy_format/sql/test_copy_format.sql index 0eb7ed2e11605..94d3c789a01db 100644 --- a/src/test/modules/test_copy_format/sql/test_copy_format.sql +++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql @@ -1,6 +1,12 @@ CREATE EXTENSION test_copy_format; CREATE TABLE public.test (a INT, b INT, c INT); INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +COPY public.test FROM stdin WITH ( + option_before 'before', + format 'test_copy_format', + option_after 'after' +); +\. COPY public.test TO stdout WITH ( option_before 'before', format 'test_copy_format', diff --git a/src/test/modules/test_copy_format/test_copy_format.c b/src/test/modules/test_copy_format/test_copy_format.c index a2219afcde6fc..5e1b40e881d2f 100644 --- a/src/test/modules/test_copy_format/test_copy_format.c +++ b/src/test/modules/test_copy_format/test_copy_format.c @@ -18,6 +18,50 @@ PG_MODULE_MAGIC; +static bool +CopyFromProcessOption(CopyFromState cstate, DefElem *defel) +{ + ereport(NOTICE, + (errmsg("CopyFromProcessOption: \"%s\"=\"%s\"", + defel->defname, defGetString(defel)))); + return true; +} + +static int16 +CopyFromGetFormat(CopyFromState cstate) +{ + ereport(NOTICE, (errmsg("CopyFromGetFormat"))); + return 0; +} + +static void +CopyFromStart(CopyFromState cstate, TupleDesc tupDesc) +{ + ereport(NOTICE, (errmsg("CopyFromStart: natts=%d", tupDesc->natts))); +} + +static bool +CopyFromOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls) +{ + ereport(NOTICE, (errmsg("CopyFromOneRow"))); + return false; +} + +static void +CopyFromEnd(CopyFromState cstate) +{ + ereport(NOTICE, (errmsg("CopyFromEnd"))); +} + +static const CopyFromRoutine CopyFromRoutineTestCopyFormat = { + .type = T_CopyFromRoutine, + .CopyFromProcessOption = CopyFromProcessOption, + .CopyFromGetFormat = CopyFromGetFormat, + .CopyFromStart = CopyFromStart, + .CopyFromOneRow = CopyFromOneRow, + .CopyFromEnd = CopyFromEnd, +}; + static bool CopyToProcessOption(CopyToState cstate, DefElem *defel) { @@ -71,7 +115,7 @@ test_copy_format(PG_FUNCTION_ARGS) (errmsg("test_copy_format: is_from=%s", is_from ? "true" : "false"))); if (is_from) - elog(ERROR, "COPY FROM isn't supported yet"); - - PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat); + PG_RETURN_POINTER(&CopyFromRoutineTestCopyFormat); + else + PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat); } From 1ed575fda7f196ea411e9e53dd9c0739f160fb78 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Wed, 24 Jan 2024 14:16:29 +0900 Subject: [PATCH 07/10] Export CopyFromStateData It's for custom COPY FROM format handlers implemented as extension. This just moves codes. This doesn't change codes except CopySource enum values. CopySource enum values changes aren't required but I did like I did for CopyDest enum values. I changed COPY_ prefix to COPY_SOURCE_ prefix. For example, COPY_FILE to COPY_SOURCE_FILE. Note that this change isn't enough to implement a custom COPY FROM format handler as extension. We'll do the followings in a subsequent commit: 1. Add an opaque space for custom COPY FROM format handler 2. Export CopyReadBinaryData() to read the next data --- src/backend/commands/copyfrom.c | 4 +- src/backend/commands/copyfromparse.c | 10 +- src/include/commands/copy.h | 2 - src/include/commands/copyapi.h | 156 ++++++++++++++++++++++- src/include/commands/copyfrom_internal.h | 150 ---------------------- 5 files changed, 162 insertions(+), 160 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index d556ebb5d67ab..b4ac7cbd2c24f 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -1710,7 +1710,7 @@ BeginCopyFrom(ParseState *pstate, pg_encoding_to_char(GetDatabaseEncoding())))); } - cstate->copy_src = COPY_FILE; /* default */ + cstate->copy_src = COPY_SOURCE_FILE; /* default */ cstate->whereClause = whereClause; @@ -1829,7 +1829,7 @@ BeginCopyFrom(ParseState *pstate, if (data_source_cb) { progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; - cstate->copy_src = COPY_CALLBACK; + cstate->copy_src = COPY_SOURCE_CALLBACK; cstate->data_source_cb = data_source_cb; } else if (pipe) diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index 49632f75e4dd6..a78a790060460 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -181,7 +181,7 @@ ReceiveCopyBegin(CopyFromState cstate) for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); - cstate->copy_src = COPY_FRONTEND; + cstate->copy_src = COPY_SOURCE_FRONTEND; cstate->fe_msgbuf = makeStringInfo(); /* We *must* flush here to ensure FE knows it can send. */ pq_flush(); @@ -249,7 +249,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) switch (cstate->copy_src) { - case COPY_FILE: + case COPY_SOURCE_FILE: bytesread = fread(databuf, 1, maxread, cstate->copy_file); if (ferror(cstate->copy_file)) ereport(ERROR, @@ -258,7 +258,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) if (bytesread == 0) cstate->raw_reached_eof = true; break; - case COPY_FRONTEND: + case COPY_SOURCE_FRONTEND: while (maxread > 0 && bytesread < minread && !cstate->raw_reached_eof) { int avail; @@ -341,7 +341,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) bytesread += avail; } break; - case COPY_CALLBACK: + case COPY_SOURCE_CALLBACK: bytesread = cstate->data_source_cb(databuf, minread, maxread); break; } @@ -1099,7 +1099,7 @@ CopyReadLine(CopyFromState cstate) * after \. up to the protocol end of copy data. (XXX maybe better * not to treat \. as special?) */ - if (cstate->copy_src == COPY_FRONTEND) + if (cstate->copy_src == COPY_SOURCE_FRONTEND) { int inbytes; diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index df29d42555f25..cd41d320749bd 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -20,8 +20,6 @@ #include "parser/parse_node.h" #include "tcop/dest.h" -typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); - extern void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, uint64 *processed); diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index ef1bb201c2729..b7e8f627bf615 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -14,11 +14,12 @@ #ifndef COPYAPI_H #define COPYAPI_H +#include "commands/trigger.h" #include "executor/execdesc.h" #include "executor/tuptable.h" +#include "nodes/miscnodes.h" #include "nodes/parsenodes.h" -/* This is private in commands/copyfrom.c */ typedef struct CopyFromStateData *CopyFromState; typedef bool (*CopyFromProcessOption_function) (CopyFromState cstate, DefElem *defel); @@ -162,6 +163,159 @@ typedef struct CopyFormatOptions CopyToRoutine *to_routine; /* callback routines for COPY TO */ } CopyFormatOptions; + +/* + * Represents the different source cases we need to worry about at + * the bottom level + */ +typedef enum CopySource +{ + COPY_SOURCE_FILE, /* from file (or a piped program) */ + COPY_SOURCE_FRONTEND, /* from frontend */ + COPY_SOURCE_CALLBACK, /* from callback function */ +} CopySource; + +/* + * Represents the end-of-line terminator type of the input + */ +typedef enum EolType +{ + EOL_UNKNOWN, + EOL_NL, + EOL_CR, + EOL_CRNL, +} EolType; + +typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); + +/* + * This struct contains all the state variables used throughout a COPY FROM + * operation. + */ +typedef struct CopyFromStateData +{ + /* low-level state data */ + CopySource copy_src; /* type of copy source */ + FILE *copy_file; /* used if copy_src == COPY_FILE */ + StringInfo fe_msgbuf; /* used if copy_src == COPY_FRONTEND */ + + EolType eol_type; /* EOL type of input */ + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + Oid conversion_proc; /* encoding conversion function */ + + /* parameters from the COPY command */ + Relation rel; /* relation to copy from */ + List *attnumlist; /* integer list of attnums to copy */ + char *filename; /* filename, or NULL for STDIN */ + bool is_program; /* is 'filename' a program to popen? */ + copy_data_source_cb data_source_cb; /* function for reading data */ + + CopyFormatOptions opts; + bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ + Node *whereClause; /* WHERE condition (or NULL) */ + + /* these are just for error messages, see CopyFromErrorCallback */ + const char *cur_relname; /* table name for error messages */ + uint64 cur_lineno; /* line number for error messages */ + const char *cur_attname; /* current att for error messages */ + const char *cur_attval; /* current att value for error messages */ + bool relname_only; /* don't output line number, att, etc. */ + + /* + * Working state + */ + MemoryContext copycontext; /* per-copy execution context */ + + AttrNumber num_defaults; /* count of att that are missing and have + * default value */ + FmgrInfo *in_functions; /* array of input functions for each attrs */ + Oid *typioparams; /* array of element types for in_functions */ + ErrorSaveContext *escontext; /* soft error trapper during in_functions + * execution */ + uint64 num_errors; /* total number of rows which contained soft + * errors */ + int *defmap; /* array of default att numbers related to + * missing att */ + ExprState **defexprs; /* array of default att expressions for all + * att */ + bool *defaults; /* if DEFAULT marker was found for + * corresponding att */ + bool volatile_defexprs; /* is any of defexprs volatile? */ + List *range_table; /* single element list of RangeTblEntry */ + List *rteperminfos; /* single element list of RTEPermissionInfo */ + ExprState *qualexpr; + + TransitionCaptureState *transition_capture; + + /* + * These variables are used to reduce overhead in COPY FROM. + * + * attribute_buf holds the separated, de-escaped text for each field of + * the current line. The CopyReadAttributes functions return arrays of + * pointers into this buffer. We avoid palloc/pfree overhead by re-using + * the buffer on each cycle. + * + * In binary COPY FROM, attribute_buf holds the binary data for the + * current field, but the usage is otherwise similar. + */ + StringInfoData attribute_buf; + + /* field raw data pointers found by COPY FROM */ + + int max_fields; + char **raw_fields; + + /* + * Similarly, line_buf holds the whole input line being processed. The + * input cycle is first to read the whole line into line_buf, and then + * extract the individual attribute fields into attribute_buf. line_buf + * is preserved unmodified so that we can display it in error messages if + * appropriate. (In binary mode, line_buf is not used.) + */ + StringInfoData line_buf; + bool line_buf_valid; /* contains the row being processed? */ + + /* + * input_buf holds input data, already converted to database encoding. + * + * In text mode, CopyReadLine parses this data sufficiently to locate line + * boundaries, then transfers the data to line_buf. We guarantee that + * there is a \0 at input_buf[input_buf_len] at all times. (In binary + * mode, input_buf is not used.) + * + * If encoding conversion is not required, input_buf is not a separate + * buffer but points directly to raw_buf. In that case, input_buf_len + * tracks the number of bytes that have been verified as valid in the + * database encoding, and raw_buf_len is the total number of bytes stored + * in the buffer. + */ +#define INPUT_BUF_SIZE 65536 /* we palloc INPUT_BUF_SIZE+1 bytes */ + char *input_buf; + int input_buf_index; /* next byte to process */ + int input_buf_len; /* total # of bytes stored */ + bool input_reached_eof; /* true if we reached EOF */ + bool input_reached_error; /* true if a conversion error happened */ + /* Shorthand for number of unconsumed bytes available in input_buf */ +#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index) + + /* + * raw_buf holds raw input data read from the data source (file or client + * connection), not yet converted to the database encoding. Like with + * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len]. + */ +#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ + char *raw_buf; + int raw_buf_index; /* next byte to process */ + int raw_buf_len; /* total # of bytes stored */ + bool raw_reached_eof; /* true if we reached EOF */ + + /* Shorthand for number of unconsumed bytes available in raw_buf */ +#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) + + uint64 bytes_processed; /* number of bytes processed so far */ +} CopyFromStateData; + /* * Represents the different dest cases we need to worry about at * the bottom level diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index 921c1513f7296..f8f6120255067 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -18,28 +18,6 @@ #include "commands/trigger.h" #include "nodes/miscnodes.h" -/* - * Represents the different source cases we need to worry about at - * the bottom level - */ -typedef enum CopySource -{ - COPY_FILE, /* from file (or a piped program) */ - COPY_FRONTEND, /* from frontend */ - COPY_CALLBACK, /* from callback function */ -} CopySource; - -/* - * Represents the end-of-line terminator type of the input - */ -typedef enum EolType -{ - EOL_UNKNOWN, - EOL_NL, - EOL_CR, - EOL_CRNL, -} EolType; - /* * Represents the insert method to be used during COPY FROM. */ @@ -52,134 +30,6 @@ typedef enum CopyInsertMethod * ExecForeignBatchInsert only if valid */ } CopyInsertMethod; -/* - * This struct contains all the state variables used throughout a COPY FROM - * operation. - */ -typedef struct CopyFromStateData -{ - /* low-level state data */ - CopySource copy_src; /* type of copy source */ - FILE *copy_file; /* used if copy_src == COPY_FILE */ - StringInfo fe_msgbuf; /* used if copy_src == COPY_FRONTEND */ - - EolType eol_type; /* EOL type of input */ - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - Oid conversion_proc; /* encoding conversion function */ - - /* parameters from the COPY command */ - Relation rel; /* relation to copy from */ - List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDIN */ - bool is_program; /* is 'filename' a program to popen? */ - copy_data_source_cb data_source_cb; /* function for reading data */ - - CopyFormatOptions opts; - bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ - Node *whereClause; /* WHERE condition (or NULL) */ - - /* these are just for error messages, see CopyFromErrorCallback */ - const char *cur_relname; /* table name for error messages */ - uint64 cur_lineno; /* line number for error messages */ - const char *cur_attname; /* current att for error messages */ - const char *cur_attval; /* current att value for error messages */ - bool relname_only; /* don't output line number, att, etc. */ - - /* - * Working state - */ - MemoryContext copycontext; /* per-copy execution context */ - - AttrNumber num_defaults; /* count of att that are missing and have - * default value */ - FmgrInfo *in_functions; /* array of input functions for each attrs */ - Oid *typioparams; /* array of element types for in_functions */ - ErrorSaveContext *escontext; /* soft error trapper during in_functions - * execution */ - uint64 num_errors; /* total number of rows which contained soft - * errors */ - int *defmap; /* array of default att numbers related to - * missing att */ - ExprState **defexprs; /* array of default att expressions for all - * att */ - bool *defaults; /* if DEFAULT marker was found for - * corresponding att */ - bool volatile_defexprs; /* is any of defexprs volatile? */ - List *range_table; /* single element list of RangeTblEntry */ - List *rteperminfos; /* single element list of RTEPermissionInfo */ - ExprState *qualexpr; - - TransitionCaptureState *transition_capture; - - /* - * These variables are used to reduce overhead in COPY FROM. - * - * attribute_buf holds the separated, de-escaped text for each field of - * the current line. The CopyReadAttributes functions return arrays of - * pointers into this buffer. We avoid palloc/pfree overhead by re-using - * the buffer on each cycle. - * - * In binary COPY FROM, attribute_buf holds the binary data for the - * current field, but the usage is otherwise similar. - */ - StringInfoData attribute_buf; - - /* field raw data pointers found by COPY FROM */ - - int max_fields; - char **raw_fields; - - /* - * Similarly, line_buf holds the whole input line being processed. The - * input cycle is first to read the whole line into line_buf, and then - * extract the individual attribute fields into attribute_buf. line_buf - * is preserved unmodified so that we can display it in error messages if - * appropriate. (In binary mode, line_buf is not used.) - */ - StringInfoData line_buf; - bool line_buf_valid; /* contains the row being processed? */ - - /* - * input_buf holds input data, already converted to database encoding. - * - * In text mode, CopyReadLine parses this data sufficiently to locate line - * boundaries, then transfers the data to line_buf. We guarantee that - * there is a \0 at input_buf[input_buf_len] at all times. (In binary - * mode, input_buf is not used.) - * - * If encoding conversion is not required, input_buf is not a separate - * buffer but points directly to raw_buf. In that case, input_buf_len - * tracks the number of bytes that have been verified as valid in the - * database encoding, and raw_buf_len is the total number of bytes stored - * in the buffer. - */ -#define INPUT_BUF_SIZE 65536 /* we palloc INPUT_BUF_SIZE+1 bytes */ - char *input_buf; - int input_buf_index; /* next byte to process */ - int input_buf_len; /* total # of bytes stored */ - bool input_reached_eof; /* true if we reached EOF */ - bool input_reached_error; /* true if a conversion error happened */ - /* Shorthand for number of unconsumed bytes available in input_buf */ -#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index) - - /* - * raw_buf holds raw input data read from the data source (file or client - * connection), not yet converted to the database encoding. Like with - * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len]. - */ -#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ - char *raw_buf; - int raw_buf_index; /* next byte to process */ - int raw_buf_len; /* total # of bytes stored */ - bool raw_reached_eof; /* true if we reached EOF */ - - /* Shorthand for number of unconsumed bytes available in raw_buf */ -#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) - - uint64 bytes_processed; /* number of bytes processed so far */ -} CopyFromStateData; - extern void ReceiveCopyBegin(CopyFromState cstate); extern void ReceiveCopyBinaryHeader(CopyFromState cstate); From 3e847de1acb2fd6966ef01192204448711ca3d5e Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Wed, 24 Jan 2024 14:19:08 +0900 Subject: [PATCH 08/10] Add support for implementing custom COPY FROM format as extension * Add CopyFromStateData::opaque that can be used to keep data for custom COPY From format implementation * Export CopyReadBinaryData() to read the next data * Rename CopyReadBinaryData() to CopyFromStateRead() because it's a method for CopyFromState and "BinaryData" is redundant. --- src/backend/commands/copyfromparse.c | 21 ++++++++++----------- src/include/commands/copyapi.h | 5 +++++ 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index a78a790060460..f8a194635d65e 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -165,7 +165,6 @@ static int CopyGetData(CopyFromState cstate, void *databuf, static inline bool CopyGetInt32(CopyFromState cstate, int32 *val); static inline bool CopyGetInt16(CopyFromState cstate, int16 *val); static void CopyLoadInputBuf(CopyFromState cstate); -static int CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes); void ReceiveCopyBegin(CopyFromState cstate) @@ -194,7 +193,7 @@ ReceiveCopyBinaryHeader(CopyFromState cstate) int32 tmp; /* Signature */ - if (CopyReadBinaryData(cstate, readSig, 11) != 11 || + if (CopyFromStateRead(cstate, readSig, 11) != 11 || memcmp(readSig, BinarySignature, 11) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), @@ -222,7 +221,7 @@ ReceiveCopyBinaryHeader(CopyFromState cstate) /* Skip extension header, if present */ while (tmp-- > 0) { - if (CopyReadBinaryData(cstate, readSig, 1) != 1) + if (CopyFromStateRead(cstate, readSig, 1) != 1) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (wrong length)"))); @@ -364,7 +363,7 @@ CopyGetInt32(CopyFromState cstate, int32 *val) { uint32 buf; - if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) + if (CopyFromStateRead(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) { *val = 0; /* suppress compiler warning */ return false; @@ -381,7 +380,7 @@ CopyGetInt16(CopyFromState cstate, int16 *val) { uint16 buf; - if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) + if (CopyFromStateRead(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) { *val = 0; /* suppress compiler warning */ return false; @@ -692,14 +691,14 @@ CopyLoadInputBuf(CopyFromState cstate) } /* - * CopyReadBinaryData + * CopyFromStateRead * * Reads up to 'nbytes' bytes from cstate->copy_file via cstate->raw_buf * and writes them to 'dest'. Returns the number of bytes read (which * would be less than 'nbytes' only if we reach EOF). */ -static int -CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes) +int +CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes) { int copied_bytes = 0; @@ -988,7 +987,7 @@ CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, */ char dummy; - if (CopyReadBinaryData(cstate, &dummy, 1) > 0) + if (CopyFromStateRead(cstate, &dummy, 1) > 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("received copy data after EOF marker"))); @@ -1997,8 +1996,8 @@ CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo, resetStringInfo(&cstate->attribute_buf); enlargeStringInfo(&cstate->attribute_buf, fld_size); - if (CopyReadBinaryData(cstate, cstate->attribute_buf.data, - fld_size) != fld_size) + if (CopyFromStateRead(cstate, cstate->attribute_buf.data, + fld_size) != fld_size) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unexpected EOF in COPY data"))); diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index b7e8f627bf615..22accc83ab86d 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -314,8 +314,13 @@ typedef struct CopyFromStateData #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) uint64 bytes_processed; /* number of bytes processed so far */ + + /* For custom format implementation */ + void *opaque; /* private space */ } CopyFromStateData; +extern int CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes); + /* * Represents the different dest cases we need to worry about at * the bottom level From f0a8151feff44823881c3c4e1e7aca4f9bd690d5 Mon Sep 17 00:00:00 2001 From: Zhao Junwang Date: Sat, 27 Jan 2024 09:53:31 +0800 Subject: [PATCH 09/10] change CopyToGetFormat to CopyToSendCopyBegin and export more api Signed-off-by: Zhao Junwang --- src/backend/commands/copyto.c | 65 ++++++++++--------- src/include/commands/copyapi.h | 12 ++-- .../test_copy_format/test_copy_format.c | 7 +- 3 files changed, 46 insertions(+), 38 deletions(-) diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index b5d8678394ffe..e2a4964015fdb 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -66,11 +66,6 @@ static void CopyAttributeOutCSV(CopyToState cstate, const char *string, /* Low-level communications functions */ static void SendCopyBegin(CopyToState cstate); static void SendCopyEnd(CopyToState cstate); -static void CopySendData(CopyToState cstate, const void *databuf, int datasize); -static void CopySendString(CopyToState cstate, const char *str); -static void CopySendChar(CopyToState cstate, char c); -static void CopySendInt32(CopyToState cstate, int32 val); -static void CopySendInt16(CopyToState cstate, int16 val); /* * CopyToRoutine implementations. @@ -90,10 +85,20 @@ CopyToTextProcessOption(CopyToState cstate, DefElem *defel) return false; } -static int16 -CopyToTextGetFormat(CopyToState cstate) +static void +CopyToTextSendCopyBegin(CopyToState cstate) { - return 0; + StringInfoData buf; + int natts = list_length(cstate->attnumlist); + int16 format = 0; + int i; + + pq_beginmessage(&buf, PqMsg_CopyOutResponse); + pq_sendbyte(&buf, format); /* overall format */ + pq_sendint16(&buf, natts); + for (i = 0; i < natts; i++) + pq_sendint16(&buf, format); /* per-column formats */ + pq_endmessage(&buf); } static void @@ -230,10 +235,20 @@ CopyToBinaryProcessOption(CopyToState cstate, DefElem *defel) return false; } -static int16 -CopyToBinaryGetFormat(CopyToState cstate) +static void +CopyToBinarySendCopyBegin(CopyToState cstate) { - return 1; + StringInfoData buf; + int natts = list_length(cstate->attnumlist); + int16 format = 1; + int i; + + pq_beginmessage(&buf, PqMsg_CopyOutResponse); + pq_sendbyte(&buf, format); /* overall format */ + pq_sendint16(&buf, natts); + for (i = 0; i < natts; i++) + pq_sendint16(&buf, format); /* per-column formats */ + pq_endmessage(&buf); } static void @@ -315,7 +330,7 @@ CopyToBinaryEnd(CopyToState cstate) CopyToRoutine CopyToRoutineText = { .CopyToProcessOption = CopyToTextProcessOption, - .CopyToGetFormat = CopyToTextGetFormat, + .CopyToSendCopyBegin = CopyToTextSendCopyBegin, .CopyToStart = CopyToTextStart, .CopyToOneRow = CopyToTextOneRow, .CopyToEnd = CopyToTextEnd, @@ -328,7 +343,7 @@ CopyToRoutine CopyToRoutineText = { */ CopyToRoutine CopyToRoutineCSV = { .CopyToProcessOption = CopyToTextProcessOption, - .CopyToGetFormat = CopyToTextGetFormat, + .CopyToSendCopyBegin = CopyToTextSendCopyBegin, .CopyToStart = CopyToTextStart, .CopyToOneRow = CopyToTextOneRow, .CopyToEnd = CopyToTextEnd, @@ -336,7 +351,7 @@ CopyToRoutine CopyToRoutineCSV = { CopyToRoutine CopyToRoutineBinary = { .CopyToProcessOption = CopyToBinaryProcessOption, - .CopyToGetFormat = CopyToBinaryGetFormat, + .CopyToSendCopyBegin = CopyToBinarySendCopyBegin, .CopyToStart = CopyToBinaryStart, .CopyToOneRow = CopyToBinaryOneRow, .CopyToEnd = CopyToBinaryEnd, @@ -349,17 +364,7 @@ CopyToRoutine CopyToRoutineBinary = { static void SendCopyBegin(CopyToState cstate) { - StringInfoData buf; - int natts = list_length(cstate->attnumlist); - int16 format = cstate->opts.to_routine->CopyToGetFormat(cstate); - int i; - - pq_beginmessage(&buf, PqMsg_CopyOutResponse); - pq_sendbyte(&buf, format); /* overall format */ - pq_sendint16(&buf, natts); - for (i = 0; i < natts; i++) - pq_sendint16(&buf, format); /* per-column formats */ - pq_endmessage(&buf); + cstate->opts.to_routine->CopyToSendCopyBegin(cstate); cstate->copy_dest = COPY_DEST_FRONTEND; } @@ -382,19 +387,19 @@ SendCopyEnd(CopyToState cstate) * NB: no data conversion is applied by these functions *---------- */ -static void +void CopySendData(CopyToState cstate, const void *databuf, int datasize) { appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize); } -static void +void CopySendString(CopyToState cstate, const char *str) { appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str)); } -static void +void CopySendChar(CopyToState cstate, char c) { appendStringInfoCharMacro(cstate->fe_msgbuf, c); @@ -464,7 +469,7 @@ CopyToStateFlush(CopyToState cstate) /* * CopySendInt32 sends an int32 in network byte order */ -static inline void +inline void CopySendInt32(CopyToState cstate, int32 val) { uint32 buf; @@ -476,7 +481,7 @@ CopySendInt32(CopyToState cstate, int32 val) /* * CopySendInt16 sends an int16 in network byte order */ -static inline void +inline void CopySendInt16(CopyToState cstate, int16 val) { uint16 buf; diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 22accc83ab86d..0a05b24c54b14 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -67,7 +67,7 @@ extern CopyFromRoutine CopyFromRoutineBinary; typedef struct CopyToStateData *CopyToState; typedef bool (*CopyToProcessOption_function) (CopyToState cstate, DefElem *defel); -typedef int16 (*CopyToGetFormat_function) (CopyToState cstate); +typedef void (*CopyToSendCopyBegin_function) (CopyToState cstate); typedef void (*CopyToStart_function) (CopyToState cstate, TupleDesc tupDesc); typedef void (*CopyToOneRow_function) (CopyToState cstate, TupleTableSlot *slot); typedef void (*CopyToEnd_function) (CopyToState cstate); @@ -84,10 +84,9 @@ typedef struct CopyToRoutine CopyToProcessOption_function CopyToProcessOption; /* - * Called when COPY TO is started. This will return a format as int16 - * value. It's used for the CopyOutResponse message. + * Called when COPY TO is started. */ - CopyToGetFormat_function CopyToGetFormat; + CopyToSendCopyBegin_function CopyToSendCopyBegin; /* Called when COPY TO is started. This will send a header. */ CopyToStart_function CopyToStart; @@ -384,6 +383,11 @@ typedef struct CopyToStateData void *opaque; /* private space */ } CopyToStateData; +extern void CopySendData(CopyToState cstate, const void *databuf, int datasize); +extern void CopySendString(CopyToState cstate, const char *str); +extern void CopySendChar(CopyToState cstate, char c); +extern void CopySendInt32(CopyToState cstate, int32 val); +extern void CopySendInt16(CopyToState cstate, int16 val); extern void CopyToStateFlush(CopyToState cstate); #endif /* COPYAPI_H */ diff --git a/src/test/modules/test_copy_format/test_copy_format.c b/src/test/modules/test_copy_format/test_copy_format.c index 5e1b40e881d2f..d833f22bbf4f7 100644 --- a/src/test/modules/test_copy_format/test_copy_format.c +++ b/src/test/modules/test_copy_format/test_copy_format.c @@ -71,11 +71,10 @@ CopyToProcessOption(CopyToState cstate, DefElem *defel) return true; } -static int16 -CopyToGetFormat(CopyToState cstate) +static void +CopyToSendCopyBegin(CopyToState cstate) { ereport(NOTICE, (errmsg("CopyToGetFormat"))); - return 0; } static void @@ -99,7 +98,7 @@ CopyToEnd(CopyToState cstate) static const CopyToRoutine CopyToRoutineTestCopyFormat = { .type = T_CopyToRoutine, .CopyToProcessOption = CopyToProcessOption, - .CopyToGetFormat = CopyToGetFormat, + .CopyToSendCopyBegin = CopyToSendCopyBegin, .CopyToStart = CopyToStart, .CopyToOneRow = CopyToOneRow, .CopyToEnd = CopyToEnd, From 7dc6c1c798178f31728d048d4d528181626b3695 Mon Sep 17 00:00:00 2001 From: Zhao Junwang Date: Sat, 27 Jan 2024 13:34:38 +0800 Subject: [PATCH 10/10] introduce contrib/pg_copy_json Signed-off-by: Zhao Junwang --- contrib/Makefile | 1 + contrib/meson.build | 1 + contrib/pg_copy_json/.gitignore | 4 + contrib/pg_copy_json/Makefile | 23 ++ .../pg_copy_json/expected/pg_copy_json.out | 80 +++++++ contrib/pg_copy_json/meson.build | 34 +++ contrib/pg_copy_json/pg_copy_json--1.0.sql | 9 + contrib/pg_copy_json/pg_copy_json.c | 218 ++++++++++++++++++ contrib/pg_copy_json/pg_copy_json.control | 5 + contrib/pg_copy_json/sql/pg_copy_json.sql | 59 +++++ src/backend/utils/adt/json.c | 5 +- src/include/utils/json.h | 2 + 12 files changed, 438 insertions(+), 3 deletions(-) create mode 100644 contrib/pg_copy_json/.gitignore create mode 100644 contrib/pg_copy_json/Makefile create mode 100644 contrib/pg_copy_json/expected/pg_copy_json.out create mode 100644 contrib/pg_copy_json/meson.build create mode 100644 contrib/pg_copy_json/pg_copy_json--1.0.sql create mode 100644 contrib/pg_copy_json/pg_copy_json.c create mode 100644 contrib/pg_copy_json/pg_copy_json.control create mode 100644 contrib/pg_copy_json/sql/pg_copy_json.sql diff --git a/contrib/Makefile b/contrib/Makefile index da4e2316a3b0c..82cc496aa2e1f 100644 --- a/contrib/Makefile +++ b/contrib/Makefile @@ -32,6 +32,7 @@ SUBDIRS = \ pageinspect \ passwordcheck \ pg_buffercache \ + pg_copy_json \ pg_freespacemap \ pg_prewarm \ pg_stat_statements \ diff --git a/contrib/meson.build b/contrib/meson.build index c12dc906ca765..38933d15d122e 100644 --- a/contrib/meson.build +++ b/contrib/meson.build @@ -45,6 +45,7 @@ subdir('oid2name') subdir('pageinspect') subdir('passwordcheck') subdir('pg_buffercache') +subdir('pg_copy_json') subdir('pgcrypto') subdir('pg_freespacemap') subdir('pg_prewarm') diff --git a/contrib/pg_copy_json/.gitignore b/contrib/pg_copy_json/.gitignore new file mode 100644 index 0000000000000..5dcb3ff972350 --- /dev/null +++ b/contrib/pg_copy_json/.gitignore @@ -0,0 +1,4 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ diff --git a/contrib/pg_copy_json/Makefile b/contrib/pg_copy_json/Makefile new file mode 100644 index 0000000000000..b0a348d618523 --- /dev/null +++ b/contrib/pg_copy_json/Makefile @@ -0,0 +1,23 @@ +# contrib/pg_copy_json//Makefile + +MODULE_big = pg_copy_json +OBJS = \ + $(WIN32RES) \ + pg_copy_json.o +PGFILEDESC = "pg_copy_json - COPY TO JSON (JavaScript Object Notation) format" + +EXTENSION = pg_copy_json +DATA = pg_copy_json--1.0.sql + +REGRESS = test_copy_format + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/pg_copy_json +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/contrib/pg_copy_json/expected/pg_copy_json.out b/contrib/pg_copy_json/expected/pg_copy_json.out new file mode 100644 index 0000000000000..73633c23033aa --- /dev/null +++ b/contrib/pg_copy_json/expected/pg_copy_json.out @@ -0,0 +1,80 @@ +-- +-- COPY TO JSON +-- +CREATE EXTENSION pg_copy_json; +-- test copying in JSON format with various styles +-- of embedded line ending characters +create temp table copytest ( + style text, + test text, + filler int); +insert into copytest values('DOS',E'abc\r\ndef',1); +insert into copytest values('Unix',E'abc\ndef',2); +insert into copytest values('Mac',E'abc\rdef',3); +insert into copytest values(E'esc\\ape',E'a\\r\\\r\\\n\\nb',4); +copy copytest to stdout with (format 'json'); +{"style":"DOS","test":"abc\r\ndef","filler":1} +{"style":"Unix","test":"abc\ndef","filler":2} +{"style":"Mac","test":"abc\rdef","filler":3} +{"style":"esc\\ape","test":"a\\r\\\r\\\n\\nb","filler":4} +-- pg_copy_json do not support COPY FROM +copy copytest from stdout with (format 'json'); +ERROR: cannot use JSON mode in COPY FROM +-- test copying in JSON format with various styles +-- of embedded escaped characters +create temp table copyjsontest ( + id bigserial, + f1 text, + f2 timestamptz); +insert into copyjsontest + select g.i, + CASE WHEN g.i % 2 = 0 THEN + 'line with '' in it: ' || g.i::text + ELSE + 'line with " in it: ' || g.i::text + END, + 'Mon Feb 10 17:32:01 1997 PST' + from generate_series(1,5) as g(i); +insert into copyjsontest (f1) values +(E'aaa\"bbb'::text), +(E'aaa\\bbb'::text), +(E'aaa\/bbb'::text), +(E'aaa\bbbb'::text), +(E'aaa\fbbb'::text), +(E'aaa\nbbb'::text), +(E'aaa\rbbb'::text), +(E'aaa\tbbb'::text); +copy copyjsontest to stdout with (format 'json'); +{"id":1,"f1":"line with \" in it: 1","f2":"1997-02-10T17:32:01-08:00"} +{"id":2,"f1":"line with ' in it: 2","f2":"1997-02-10T17:32:01-08:00"} +{"id":3,"f1":"line with \" in it: 3","f2":"1997-02-10T17:32:01-08:00"} +{"id":4,"f1":"line with ' in it: 4","f2":"1997-02-10T17:32:01-08:00"} +{"id":5,"f1":"line with \" in it: 5","f2":"1997-02-10T17:32:01-08:00"} +{"id":1,"f1":"aaa\"bbb","f2":null} +{"id":2,"f1":"aaa\\bbb","f2":null} +{"id":3,"f1":"aaa/bbb","f2":null} +{"id":4,"f1":"aaa\bbbb","f2":null} +{"id":5,"f1":"aaa\fbbb","f2":null} +{"id":6,"f1":"aaa\nbbb","f2":null} +{"id":7,"f1":"aaa\rbbb","f2":null} +{"id":8,"f1":"aaa\tbbb","f2":null} +-- test force array +copy copytest to stdout (format 'json', force_array); +[ + {"style":"DOS","test":"abc\r\ndef","filler":1} +,{"style":"Unix","test":"abc\ndef","filler":2} +,{"style":"Mac","test":"abc\rdef","filler":3} +,{"style":"esc\\ape","test":"a\\r\\\r\\\n\\nb","filler":4} +] +copy copytest to stdout (format 'json', force_array true); +[ + {"style":"DOS","test":"abc\r\ndef","filler":1} +,{"style":"Unix","test":"abc\ndef","filler":2} +,{"style":"Mac","test":"abc\rdef","filler":3} +,{"style":"esc\\ape","test":"a\\r\\\r\\\n\\nb","filler":4} +] +copy copytest to stdout (format 'json', force_array false); +{"style":"DOS","test":"abc\r\ndef","filler":1} +{"style":"Unix","test":"abc\ndef","filler":2} +{"style":"Mac","test":"abc\rdef","filler":3} +{"style":"esc\\ape","test":"a\\r\\\r\\\n\\nb","filler":4} diff --git a/contrib/pg_copy_json/meson.build b/contrib/pg_copy_json/meson.build new file mode 100644 index 0000000000000..71f9338267eda --- /dev/null +++ b/contrib/pg_copy_json/meson.build @@ -0,0 +1,34 @@ +# Copyright (c) 2024, PostgreSQL Global Development Group + +pg_copy_json_sources = files( + 'pg_copy_json.c', +) + +if host_system == 'windows' + pg_copy_json_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'pg_copy_json', + '--FILEDESC', 'pg_copy_json - COPY TO JSON format',]) +endif + +pg_copy_json = shared_module('pg_copy_json', + pg_copy_json_sources, + kwargs: contrib_mod_args, +) +contrib_targets += pg_copy_json + +install_data( + 'pg_copy_json--1.0.sql', + 'pg_copy_json.control', + kwargs: contrib_data_args, +) + +tests += { + 'name': 'pg_copy_json', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'regress': { + 'sql': [ + 'pg_copy_json', + ], + }, +} diff --git a/contrib/pg_copy_json/pg_copy_json--1.0.sql b/contrib/pg_copy_json/pg_copy_json--1.0.sql new file mode 100644 index 0000000000000..d738a1e7e9f8f --- /dev/null +++ b/contrib/pg_copy_json/pg_copy_json--1.0.sql @@ -0,0 +1,9 @@ +/* contrib/pg_copy_json/copy_json--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION pg_copy_json" to load this file. \quit + +CREATE FUNCTION pg_catalog.json(internal) + RETURNS copy_handler + AS 'MODULE_PATHNAME', 'copy_json' + LANGUAGE C; diff --git a/contrib/pg_copy_json/pg_copy_json.c b/contrib/pg_copy_json/pg_copy_json.c new file mode 100644 index 0000000000000..cbfdee8e8b974 --- /dev/null +++ b/contrib/pg_copy_json/pg_copy_json.c @@ -0,0 +1,218 @@ +/*-------------------------------------------------------------------------- + * + * pg_copy_json.c + * COPY TO JSON (JavaScript Object Notation) format. + * + * Portions Copyright (c) 2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/test_copy_format.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "commands/copy.h" +#include "commands/defrem.h" +#include "funcapi.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "utils/json.h" + +PG_MODULE_MAGIC; + +typedef struct +{ + /* + * Force output of square brackets as array decorations at the beginning + * and end of output, with commas between the rows. + */ + bool force_array; + bool force_array_specified; + + /* need delimiter to start next json array element */ + bool json_row_delim_needed; +} CopyJsonData; + +static inline void +InitCopyJsonData(CopyJsonData *p) +{ + Assert(p); + p->force_array = false; + p->force_array_specified = false; + p->json_row_delim_needed = false; +} + +static void +CopyToJsonSendEndOfRow(CopyToState cstate) +{ + switch (cstate->copy_dest) + { + case COPY_DEST_FILE: + /* Default line termination depends on platform */ +#ifndef WIN32 + CopySendChar(cstate, '\n'); +#else + CopySendString(cstate, "\r\n"); +#endif + break; + case COPY_DEST_FRONTEND: + /* The FE/BE protocol uses \n as newline for all platforms */ + CopySendChar(cstate, '\n'); + break; + default: + break; + } + CopyToStateFlush(cstate); +} + +static bool +CopyToJsonProcessOption(CopyToState cstate, DefElem *defel) +{ + CopyJsonData *p; + + if (cstate->opaque == NULL) + { + MemoryContext oldcontext; + oldcontext = MemoryContextSwitchTo(cstate->copycontext); + cstate->opaque = palloc0(sizeof(CopyJsonData)); + MemoryContextSwitchTo(oldcontext); + InitCopyJsonData(cstate->opaque); + } + + p = (CopyJsonData *)cstate->opaque; + + if (strcmp(defel->defname, "force_array") == 0) + { + if (p->force_array_specified) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("CopyToJsonProcessOption: redundant options \"%s\"=\"%s\"", + defel->defname, defGetString(defel))); + p->force_array_specified = true; + p->force_array = defGetBoolean(defel); + + return true; + } + + return false; +} + +static void +CopyToJsonSendCopyBegin(CopyToState cstate) +{ + StringInfoData buf; + int16 format = 0; + + pq_beginmessage(&buf, PqMsg_CopyOutResponse); + pq_sendbyte(&buf, format); /* overall format */ + /* + * JSON mode is always one non-binary column + */ + pq_sendint16(&buf, 1); + pq_sendint16(&buf, 0); + pq_endmessage(&buf); +} + +static void +CopyToJsonStart(CopyToState cstate, TupleDesc tupDesc) +{ + CopyJsonData *p; + + if (cstate->opaque == NULL) + { + MemoryContext oldcontext; + oldcontext = MemoryContextSwitchTo(cstate->copycontext); + cstate->opaque = palloc0(sizeof(CopyJsonData)); + MemoryContextSwitchTo(oldcontext); + InitCopyJsonData(cstate->opaque); + } + + /* No need to alloc cstate->out_functions */ + + p = (CopyJsonData *)cstate->opaque; + + /* If FORCE_ARRAY has been specified send the open bracket. */ + if (p->force_array) + { + CopySendChar(cstate, '['); + CopyToJsonSendEndOfRow(cstate); + } +} + +static void +CopyToJsonOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + Datum rowdata; + StringInfo result; + CopyJsonData *p; + + Assert(cstate->opaque); + p = (CopyJsonData *)cstate->opaque; + + if(!cstate->rel) + { + for (int i = 0; i < slot->tts_tupleDescriptor->natts; i++) + { + /* Flat-copy the attribute array */ + memcpy(TupleDescAttr(slot->tts_tupleDescriptor, i), + TupleDescAttr(cstate->queryDesc->tupDesc, i), + 1 * sizeof(FormData_pg_attribute)); + } + BlessTupleDesc(slot->tts_tupleDescriptor); + } + rowdata = ExecFetchSlotHeapTupleDatum(slot); + result = makeStringInfo(); + composite_to_json(rowdata, result, false); + + if (p->json_row_delim_needed) + CopySendChar(cstate, ','); + else if (p->force_array) + { + /* first row needs no delimiter */ + CopySendChar(cstate, ' '); + p->json_row_delim_needed = true; + } + CopySendData(cstate, result->data, result->len); + CopyToJsonSendEndOfRow(cstate); +} + +static void +CopyToJsonEnd(CopyToState cstate) +{ + CopyJsonData *p; + + Assert(cstate->opaque); + p = (CopyJsonData *)cstate->opaque; + + /* If FORCE_ARRAY has been specified send the close bracket. */ + if (p->force_array) + { + CopySendChar(cstate, ']'); + CopyToJsonSendEndOfRow(cstate); + } +} + +static const CopyToRoutine CopyToRoutineJson = { + .type = T_CopyToRoutine, + .CopyToProcessOption = CopyToJsonProcessOption, + .CopyToSendCopyBegin = CopyToJsonSendCopyBegin, + .CopyToStart = CopyToJsonStart, + .CopyToOneRow = CopyToJsonOneRow, + .CopyToEnd = CopyToJsonEnd, +}; + +PG_FUNCTION_INFO_V1(copy_json); +Datum +copy_json(PG_FUNCTION_ARGS) +{ + bool is_from = PG_GETARG_BOOL(0); + + if (is_from) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use JSON mode in COPY FROM"))); + + PG_RETURN_POINTER(&CopyToRoutineJson); +} diff --git a/contrib/pg_copy_json/pg_copy_json.control b/contrib/pg_copy_json/pg_copy_json.control new file mode 100644 index 0000000000000..90b0a74603b4f --- /dev/null +++ b/contrib/pg_copy_json/pg_copy_json.control @@ -0,0 +1,5 @@ +# pg_copy_json extension +comment = 'COPY TO JSON format' +default_version = '1.0' +module_pathname = '$libdir/pg_copy_json' +relocatable = true diff --git a/contrib/pg_copy_json/sql/pg_copy_json.sql b/contrib/pg_copy_json/sql/pg_copy_json.sql new file mode 100644 index 0000000000000..73e7e514ac7c8 --- /dev/null +++ b/contrib/pg_copy_json/sql/pg_copy_json.sql @@ -0,0 +1,59 @@ +-- +-- COPY TO JSON +-- + +CREATE EXTENSION pg_copy_json; + +-- test copying in JSON format with various styles +-- of embedded line ending characters + +create temp table copytest ( + style text, + test text, + filler int); + +insert into copytest values('DOS',E'abc\r\ndef',1); +insert into copytest values('Unix',E'abc\ndef',2); +insert into copytest values('Mac',E'abc\rdef',3); +insert into copytest values(E'esc\\ape',E'a\\r\\\r\\\n\\nb',4); + +copy copytest to stdout with (format 'json'); + +-- pg_copy_json do not support COPY FROM +copy copytest from stdout with (format 'json'); + +-- test copying in JSON format with various styles +-- of embedded escaped characters + +create temp table copyjsontest ( + id bigserial, + f1 text, + f2 timestamptz); + +insert into copyjsontest + select g.i, + CASE WHEN g.i % 2 = 0 THEN + 'line with '' in it: ' || g.i::text + ELSE + 'line with " in it: ' || g.i::text + END, + 'Mon Feb 10 17:32:01 1997 PST' + from generate_series(1,5) as g(i); + +insert into copyjsontest (f1) values +(E'aaa\"bbb'::text), +(E'aaa\\bbb'::text), +(E'aaa\/bbb'::text), +(E'aaa\bbbb'::text), +(E'aaa\fbbb'::text), +(E'aaa\nbbb'::text), +(E'aaa\rbbb'::text), +(E'aaa\tbbb'::text); + +copy copyjsontest to stdout with (format 'json'); + +-- test force array + +copy copytest to stdout (format 'json', force_array); +copy copytest to stdout (format 'json', force_array true); +copy copytest to stdout (format 'json', force_array false); diff --git a/src/backend/utils/adt/json.c b/src/backend/utils/adt/json.c index d719a61f16bad..fabd4e611e1cf 100644 --- a/src/backend/utils/adt/json.c +++ b/src/backend/utils/adt/json.c @@ -83,8 +83,6 @@ typedef struct JsonAggState JsonUniqueBuilderState unique_check; } JsonAggState; -static void composite_to_json(Datum composite, StringInfo result, - bool use_line_feeds); static void array_dim_to_json(StringInfo result, int dim, int ndims, int *dims, Datum *vals, bool *nulls, int *valcount, JsonTypeCategory tcategory, Oid outfuncoid, @@ -507,8 +505,9 @@ array_to_json_internal(Datum array, StringInfo result, bool use_line_feeds) /* * Turn a composite / record into JSON. + * Exported so COPY TO can use it. */ -static void +void composite_to_json(Datum composite, StringInfo result, bool use_line_feeds) { HeapTupleHeader td; diff --git a/src/include/utils/json.h b/src/include/utils/json.h index 6d7f1b387d1ed..d5631171add4b 100644 --- a/src/include/utils/json.h +++ b/src/include/utils/json.h @@ -17,6 +17,8 @@ #include "lib/stringinfo.h" /* functions in json.c */ +extern void composite_to_json(Datum composite, StringInfo result, + bool use_line_feeds); extern void escape_json(StringInfo buf, const char *str); extern char *JsonEncodeDateTime(char *buf, Datum value, Oid typid, const int *tzp);