diff --git a/contrib/Makefile b/contrib/Makefile index da4e2316a3b0c..82cc496aa2e1f 100644 --- a/contrib/Makefile +++ b/contrib/Makefile @@ -32,6 +32,7 @@ SUBDIRS = \ pageinspect \ passwordcheck \ pg_buffercache \ + pg_copy_json \ pg_freespacemap \ pg_prewarm \ pg_stat_statements \ diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c index 249d82d3a0593..9e4e819858979 100644 --- a/contrib/file_fdw/file_fdw.c +++ b/contrib/file_fdw/file_fdw.c @@ -329,7 +329,7 @@ file_fdw_validator(PG_FUNCTION_ARGS) /* * Now apply the core COPY code's validation logic for more checks. */ - ProcessCopyOptions(NULL, NULL, true, other_options); + ProcessCopyOptions(NULL, NULL, true, NULL, other_options); /* * Either filename or program option is required for file_fdw foreign diff --git a/contrib/meson.build b/contrib/meson.build index c12dc906ca765..38933d15d122e 100644 --- a/contrib/meson.build +++ b/contrib/meson.build @@ -45,6 +45,7 @@ subdir('oid2name') subdir('pageinspect') subdir('passwordcheck') subdir('pg_buffercache') +subdir('pg_copy_json') subdir('pgcrypto') subdir('pg_freespacemap') subdir('pg_prewarm') diff --git a/contrib/pg_copy_json/.gitignore b/contrib/pg_copy_json/.gitignore new file mode 100644 index 0000000000000..5dcb3ff972350 --- /dev/null +++ b/contrib/pg_copy_json/.gitignore @@ -0,0 +1,4 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ diff --git a/contrib/pg_copy_json/Makefile b/contrib/pg_copy_json/Makefile new file mode 100644 index 0000000000000..b0a348d618523 --- /dev/null +++ b/contrib/pg_copy_json/Makefile @@ -0,0 +1,23 @@ +# contrib/pg_copy_json//Makefile + +MODULE_big = pg_copy_json +OBJS = \ + $(WIN32RES) \ + pg_copy_json.o +PGFILEDESC = "pg_copy_json - COPY TO JSON (JavaScript Object Notation) format" + +EXTENSION = pg_copy_json +DATA = pg_copy_json--1.0.sql + +REGRESS = test_copy_format + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/pg_copy_json +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/contrib/pg_copy_json/expected/pg_copy_json.out b/contrib/pg_copy_json/expected/pg_copy_json.out new file mode 100644 index 0000000000000..73633c23033aa --- /dev/null +++ b/contrib/pg_copy_json/expected/pg_copy_json.out @@ -0,0 +1,80 @@ +-- +-- COPY TO JSON +-- +CREATE EXTENSION pg_copy_json; +-- test copying in JSON format with various styles +-- of embedded line ending characters +create temp table copytest ( + style text, + test text, + filler int); +insert into copytest values('DOS',E'abc\r\ndef',1); +insert into copytest values('Unix',E'abc\ndef',2); +insert into copytest values('Mac',E'abc\rdef',3); +insert into copytest values(E'esc\\ape',E'a\\r\\\r\\\n\\nb',4); +copy copytest to stdout with (format 'json'); +{"style":"DOS","test":"abc\r\ndef","filler":1} +{"style":"Unix","test":"abc\ndef","filler":2} +{"style":"Mac","test":"abc\rdef","filler":3} +{"style":"esc\\ape","test":"a\\r\\\r\\\n\\nb","filler":4} +-- pg_copy_json do not support COPY FROM +copy copytest from stdout with (format 'json'); +ERROR: cannot use JSON mode in COPY FROM +-- test copying in JSON format with various styles +-- of embedded escaped characters +create temp table copyjsontest ( + id bigserial, + f1 text, + f2 timestamptz); +insert into copyjsontest + select g.i, + CASE WHEN g.i % 2 = 0 THEN + 'line with '' in it: ' || g.i::text + ELSE + 'line with " in it: ' || g.i::text + END, + 'Mon Feb 10 17:32:01 1997 PST' + from generate_series(1,5) as g(i); +insert into copyjsontest (f1) values +(E'aaa\"bbb'::text), +(E'aaa\\bbb'::text), +(E'aaa\/bbb'::text), +(E'aaa\bbbb'::text), +(E'aaa\fbbb'::text), +(E'aaa\nbbb'::text), +(E'aaa\rbbb'::text), +(E'aaa\tbbb'::text); +copy copyjsontest to stdout with (format 'json'); +{"id":1,"f1":"line with \" in it: 1","f2":"1997-02-10T17:32:01-08:00"} +{"id":2,"f1":"line with ' in it: 2","f2":"1997-02-10T17:32:01-08:00"} +{"id":3,"f1":"line with \" in it: 3","f2":"1997-02-10T17:32:01-08:00"} +{"id":4,"f1":"line with ' in it: 4","f2":"1997-02-10T17:32:01-08:00"} +{"id":5,"f1":"line with \" in it: 5","f2":"1997-02-10T17:32:01-08:00"} +{"id":1,"f1":"aaa\"bbb","f2":null} +{"id":2,"f1":"aaa\\bbb","f2":null} +{"id":3,"f1":"aaa/bbb","f2":null} +{"id":4,"f1":"aaa\bbbb","f2":null} +{"id":5,"f1":"aaa\fbbb","f2":null} +{"id":6,"f1":"aaa\nbbb","f2":null} +{"id":7,"f1":"aaa\rbbb","f2":null} +{"id":8,"f1":"aaa\tbbb","f2":null} +-- test force array +copy copytest to stdout (format 'json', force_array); +[ + {"style":"DOS","test":"abc\r\ndef","filler":1} +,{"style":"Unix","test":"abc\ndef","filler":2} +,{"style":"Mac","test":"abc\rdef","filler":3} +,{"style":"esc\\ape","test":"a\\r\\\r\\\n\\nb","filler":4} +] +copy copytest to stdout (format 'json', force_array true); +[ + {"style":"DOS","test":"abc\r\ndef","filler":1} +,{"style":"Unix","test":"abc\ndef","filler":2} +,{"style":"Mac","test":"abc\rdef","filler":3} +,{"style":"esc\\ape","test":"a\\r\\\r\\\n\\nb","filler":4} +] +copy copytest to stdout (format 'json', force_array false); +{"style":"DOS","test":"abc\r\ndef","filler":1} +{"style":"Unix","test":"abc\ndef","filler":2} +{"style":"Mac","test":"abc\rdef","filler":3} +{"style":"esc\\ape","test":"a\\r\\\r\\\n\\nb","filler":4} diff --git a/contrib/pg_copy_json/meson.build b/contrib/pg_copy_json/meson.build new file mode 100644 index 0000000000000..71f9338267eda --- /dev/null +++ b/contrib/pg_copy_json/meson.build @@ -0,0 +1,34 @@ +# Copyright (c) 2024, PostgreSQL Global Development Group + +pg_copy_json_sources = files( + 'pg_copy_json.c', +) + +if host_system == 'windows' + pg_copy_json_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'pg_copy_json', + '--FILEDESC', 'pg_copy_json - COPY TO JSON format',]) +endif + +pg_copy_json = shared_module('pg_copy_json', + pg_copy_json_sources, + kwargs: contrib_mod_args, +) +contrib_targets += pg_copy_json + +install_data( + 'pg_copy_json--1.0.sql', + 'pg_copy_json.control', + kwargs: contrib_data_args, +) + +tests += { + 'name': 'pg_copy_json', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'regress': { + 'sql': [ + 'pg_copy_json', + ], + }, +} diff --git a/contrib/pg_copy_json/pg_copy_json--1.0.sql b/contrib/pg_copy_json/pg_copy_json--1.0.sql new file mode 100644 index 0000000000000..d738a1e7e9f8f --- /dev/null +++ b/contrib/pg_copy_json/pg_copy_json--1.0.sql @@ -0,0 +1,9 @@ +/* contrib/pg_copy_json/copy_json--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION pg_copy_json" to load this file. \quit + +CREATE FUNCTION pg_catalog.json(internal) + RETURNS copy_handler + AS 'MODULE_PATHNAME', 'copy_json' + LANGUAGE C; diff --git a/contrib/pg_copy_json/pg_copy_json.c b/contrib/pg_copy_json/pg_copy_json.c new file mode 100644 index 0000000000000..cbfdee8e8b974 --- /dev/null +++ b/contrib/pg_copy_json/pg_copy_json.c @@ -0,0 +1,218 @@ +/*-------------------------------------------------------------------------- + * + * pg_copy_json.c + * COPY TO JSON (JavaScript Object Notation) format. + * + * Portions Copyright (c) 2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/test_copy_format.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "commands/copy.h" +#include "commands/defrem.h" +#include "funcapi.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "utils/json.h" + +PG_MODULE_MAGIC; + +typedef struct +{ + /* + * Force output of square brackets as array decorations at the beginning + * and end of output, with commas between the rows. + */ + bool force_array; + bool force_array_specified; + + /* need delimiter to start next json array element */ + bool json_row_delim_needed; +} CopyJsonData; + +static inline void +InitCopyJsonData(CopyJsonData *p) +{ + Assert(p); + p->force_array = false; + p->force_array_specified = false; + p->json_row_delim_needed = false; +} + +static void +CopyToJsonSendEndOfRow(CopyToState cstate) +{ + switch (cstate->copy_dest) + { + case COPY_DEST_FILE: + /* Default line termination depends on platform */ +#ifndef WIN32 + CopySendChar(cstate, '\n'); +#else + CopySendString(cstate, "\r\n"); +#endif + break; + case COPY_DEST_FRONTEND: + /* The FE/BE protocol uses \n as newline for all platforms */ + CopySendChar(cstate, '\n'); + break; + default: + break; + } + CopyToStateFlush(cstate); +} + +static bool +CopyToJsonProcessOption(CopyToState cstate, DefElem *defel) +{ + CopyJsonData *p; + + if (cstate->opaque == NULL) + { + MemoryContext oldcontext; + oldcontext = MemoryContextSwitchTo(cstate->copycontext); + cstate->opaque = palloc0(sizeof(CopyJsonData)); + MemoryContextSwitchTo(oldcontext); + InitCopyJsonData(cstate->opaque); + } + + p = (CopyJsonData *)cstate->opaque; + + if (strcmp(defel->defname, "force_array") == 0) + { + if (p->force_array_specified) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("CopyToJsonProcessOption: redundant options \"%s\"=\"%s\"", + defel->defname, defGetString(defel))); + p->force_array_specified = true; + p->force_array = defGetBoolean(defel); + + return true; + } + + return false; +} + +static void +CopyToJsonSendCopyBegin(CopyToState cstate) +{ + StringInfoData buf; + int16 format = 0; + + pq_beginmessage(&buf, PqMsg_CopyOutResponse); + pq_sendbyte(&buf, format); /* overall format */ + /* + * JSON mode is always one non-binary column + */ + pq_sendint16(&buf, 1); + pq_sendint16(&buf, 0); + pq_endmessage(&buf); +} + +static void +CopyToJsonStart(CopyToState cstate, TupleDesc tupDesc) +{ + CopyJsonData *p; + + if (cstate->opaque == NULL) + { + MemoryContext oldcontext; + oldcontext = MemoryContextSwitchTo(cstate->copycontext); + cstate->opaque = palloc0(sizeof(CopyJsonData)); + MemoryContextSwitchTo(oldcontext); + InitCopyJsonData(cstate->opaque); + } + + /* No need to alloc cstate->out_functions */ + + p = (CopyJsonData *)cstate->opaque; + + /* If FORCE_ARRAY has been specified send the open bracket. */ + if (p->force_array) + { + CopySendChar(cstate, '['); + CopyToJsonSendEndOfRow(cstate); + } +} + +static void +CopyToJsonOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + Datum rowdata; + StringInfo result; + CopyJsonData *p; + + Assert(cstate->opaque); + p = (CopyJsonData *)cstate->opaque; + + if(!cstate->rel) + { + for (int i = 0; i < slot->tts_tupleDescriptor->natts; i++) + { + /* Flat-copy the attribute array */ + memcpy(TupleDescAttr(slot->tts_tupleDescriptor, i), + TupleDescAttr(cstate->queryDesc->tupDesc, i), + 1 * sizeof(FormData_pg_attribute)); + } + BlessTupleDesc(slot->tts_tupleDescriptor); + } + rowdata = ExecFetchSlotHeapTupleDatum(slot); + result = makeStringInfo(); + composite_to_json(rowdata, result, false); + + if (p->json_row_delim_needed) + CopySendChar(cstate, ','); + else if (p->force_array) + { + /* first row needs no delimiter */ + CopySendChar(cstate, ' '); + p->json_row_delim_needed = true; + } + CopySendData(cstate, result->data, result->len); + CopyToJsonSendEndOfRow(cstate); +} + +static void +CopyToJsonEnd(CopyToState cstate) +{ + CopyJsonData *p; + + Assert(cstate->opaque); + p = (CopyJsonData *)cstate->opaque; + + /* If FORCE_ARRAY has been specified send the close bracket. */ + if (p->force_array) + { + CopySendChar(cstate, ']'); + CopyToJsonSendEndOfRow(cstate); + } +} + +static const CopyToRoutine CopyToRoutineJson = { + .type = T_CopyToRoutine, + .CopyToProcessOption = CopyToJsonProcessOption, + .CopyToSendCopyBegin = CopyToJsonSendCopyBegin, + .CopyToStart = CopyToJsonStart, + .CopyToOneRow = CopyToJsonOneRow, + .CopyToEnd = CopyToJsonEnd, +}; + +PG_FUNCTION_INFO_V1(copy_json); +Datum +copy_json(PG_FUNCTION_ARGS) +{ + bool is_from = PG_GETARG_BOOL(0); + + if (is_from) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use JSON mode in COPY FROM"))); + + PG_RETURN_POINTER(&CopyToRoutineJson); +} diff --git a/contrib/pg_copy_json/pg_copy_json.control b/contrib/pg_copy_json/pg_copy_json.control new file mode 100644 index 0000000000000..90b0a74603b4f --- /dev/null +++ b/contrib/pg_copy_json/pg_copy_json.control @@ -0,0 +1,5 @@ +# pg_copy_json extension +comment = 'COPY TO JSON format' +default_version = '1.0' +module_pathname = '$libdir/pg_copy_json' +relocatable = true diff --git a/contrib/pg_copy_json/sql/pg_copy_json.sql b/contrib/pg_copy_json/sql/pg_copy_json.sql new file mode 100644 index 0000000000000..73e7e514ac7c8 --- /dev/null +++ b/contrib/pg_copy_json/sql/pg_copy_json.sql @@ -0,0 +1,59 @@ +-- +-- COPY TO JSON +-- + +CREATE EXTENSION pg_copy_json; + +-- test copying in JSON format with various styles +-- of embedded line ending characters + +create temp table copytest ( + style text, + test text, + filler int); + +insert into copytest values('DOS',E'abc\r\ndef',1); +insert into copytest values('Unix',E'abc\ndef',2); +insert into copytest values('Mac',E'abc\rdef',3); +insert into copytest values(E'esc\\ape',E'a\\r\\\r\\\n\\nb',4); + +copy copytest to stdout with (format 'json'); + +-- pg_copy_json do not support COPY FROM +copy copytest from stdout with (format 'json'); + +-- test copying in JSON format with various styles +-- of embedded escaped characters + +create temp table copyjsontest ( + id bigserial, + f1 text, + f2 timestamptz); + +insert into copyjsontest + select g.i, + CASE WHEN g.i % 2 = 0 THEN + 'line with '' in it: ' || g.i::text + ELSE + 'line with " in it: ' || g.i::text + END, + 'Mon Feb 10 17:32:01 1997 PST' + from generate_series(1,5) as g(i); + +insert into copyjsontest (f1) values +(E'aaa\"bbb'::text), +(E'aaa\\bbb'::text), +(E'aaa\/bbb'::text), +(E'aaa\bbbb'::text), +(E'aaa\fbbb'::text), +(E'aaa\nbbb'::text), +(E'aaa\rbbb'::text), +(E'aaa\tbbb'::text); + +copy copyjsontest to stdout with (format 'json'); + +-- test force array + +copy copytest to stdout (format 'json', force_array); +copy copytest to stdout (format 'json', force_array true); +copy copytest to stdout (format 'json', force_array false); diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index cc0786c6f4aec..479f36868c6bd 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -32,6 +32,7 @@ #include "parser/parse_coerce.h" #include "parser/parse_collate.h" #include "parser/parse_expr.h" +#include "parser/parse_func.h" #include "parser/parse_relation.h" #include "rewrite/rewriteHandler.h" #include "utils/acl.h" @@ -430,6 +431,90 @@ defGetCopyOnErrorChoice(DefElem *def, ParseState *pstate, bool is_from) return COPY_ON_ERROR_STOP; /* keep compiler quiet */ } +/* + * Process the "format" option. + * + * This function checks whether the option value is a built-in format such as + * "text" and "csv" or not. If the option value isn't a built-in format, this + * function finds a COPY format handler that returns a CopyToRoutine. If no + * COPY format handler is found, this function reports an error. + */ +static void +ProcessCopyOptionCustomFormat(ParseState *pstate, + CopyFormatOptions *opts_out, + bool is_from, + DefElem *defel) +{ + char *format; + Oid funcargtypes[1]; + Oid handlerOid = InvalidOid; + Datum datum; + void *routine; + + format = defGetString(defel); + + /* built-in formats */ + if (strcmp(format, "text") == 0) + /* default format */ return; + else if (strcmp(format, "csv") == 0) + { + opts_out->csv_mode = true; + opts_out->from_routine = &CopyFromRoutineCSV; + opts_out->to_routine = &CopyToRoutineCSV; + return; + } + else if (strcmp(format, "binary") == 0) + { + opts_out->binary = true; + opts_out->from_routine = &CopyFromRoutineBinary; + opts_out->to_routine = &CopyToRoutineBinary; + return; + } + + /* custom format */ + funcargtypes[0] = INTERNALOID; + handlerOid = LookupFuncName(list_make1(makeString(format)), 1, + funcargtypes, true); + if (!OidIsValid(handlerOid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY format \"%s\" not recognized", format), + parser_errposition(pstate, defel->location))); + + datum = OidFunctionCall1(handlerOid, BoolGetDatum(is_from)); + routine = DatumGetPointer(datum); + if (is_from) + { + if (routine == NULL || !IsA(routine, CopyFromRoutine)) + ereport( + ERROR, + (errcode( + ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY handler function " + "%s(%u) did not return a " + "CopyFromRoutine struct", + format, handlerOid), + parser_errposition( + pstate, defel->location))); + opts_out->from_routine = routine; + } + else + { + if (routine == NULL || !IsA(routine, CopyToRoutine)) + ereport( + ERROR, + (errcode( + ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY handler function " + "%s(%u) did not return a " + "CopyToRoutine struct", + format, handlerOid), + parser_errposition( + pstate, defel->location))); + opts_out->to_routine = routine; + } +} + /* * Process the statement option list for COPY. * @@ -442,6 +527,9 @@ defGetCopyOnErrorChoice(DefElem *def, ParseState *pstate, bool is_from) * a list of options. In that usage, 'opts_out' can be passed as NULL and * the collected data is just leaked until CurrentMemoryContext is reset. * + * 'cstate' is CopyToState* for !is_from, CopyFromState* for is_from. 'cstate' + * may be NULL. For example, file_fdw uses NULL. + * * Note that additional checking, such as whether column names listed in FORCE * QUOTE actually exist, has to be applied later. This just checks for * self-consistency of the options list. @@ -450,6 +538,7 @@ void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, + void *cstate, List *options) { bool format_specified = false; @@ -464,30 +553,33 @@ ProcessCopyOptions(ParseState *pstate, opts_out->file_encoding = -1; - /* Extract options from the statement node tree */ + /* Text is the default format. */ + opts_out->from_routine = &CopyFromRoutineText; + opts_out->to_routine = &CopyToRoutineText; + + /* + * Extract only the "format" option to detect target routine as the first + * step + */ foreach(option, options) { DefElem *defel = lfirst_node(DefElem, option); if (strcmp(defel->defname, "format") == 0) { - char *fmt = defGetString(defel); - if (format_specified) errorConflictingDefElem(defel, pstate); format_specified = true; - if (strcmp(fmt, "text") == 0) - /* default format */ ; - else if (strcmp(fmt, "csv") == 0) - opts_out->csv_mode = true; - else if (strcmp(fmt, "binary") == 0) - opts_out->binary = true; - else - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("COPY format \"%s\" not recognized", fmt), - parser_errposition(pstate, defel->location))); + ProcessCopyOptionCustomFormat(pstate, opts_out, is_from, defel); } + } + /* Extract options except "format" from the statement node tree */ + foreach(option, options) + { + DefElem *defel = lfirst_node(DefElem, option); + + if (strcmp(defel->defname, "format") == 0) + continue; else if (strcmp(defel->defname, "freeze") == 0) { if (freeze_specified) @@ -616,11 +708,22 @@ ProcessCopyOptions(ParseState *pstate, opts_out->on_error = defGetCopyOnErrorChoice(defel, pstate, is_from); } else - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("option \"%s\" not recognized", - defel->defname), - parser_errposition(pstate, defel->location))); + { + bool processed = false; + + if (is_from) + processed = + opts_out->from_routine->CopyFromProcessOption( + cstate, defel); + else + processed = opts_out->to_routine->CopyToProcessOption(cstate, defel); + if (!processed) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("option \"%s\" not recognized", + defel->defname), + parser_errposition(pstate, defel->location))); + } } /* diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 1fe70b9133827..b4ac7cbd2c24f 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -108,6 +108,170 @@ static char *limit_printout_length(const char *str); static void ClosePipeFromProgram(CopyFromState cstate); + +/* + * CopyFromRoutine implementations. + */ + +/* + * CopyFromRoutine implementation for "text" and "csv". CopyFromText*() + * refer cstate->opts.csv_mode and change their behavior. We can split this + * implementation and stop referring cstate->opts.csv_mode later. + */ + +/* All "text" and "csv" options are parsed in ProcessCopyOptions(). We may + * move the code to here later. */ +static bool +CopyFromTextProcessOption(CopyFromState cstate, DefElem *defel) +{ + return false; +} + +static int16 +CopyFromTextGetFormat(CopyFromState cstate) +{ + return 0; +} + +static void +CopyFromTextStart(CopyFromState cstate, TupleDesc tupDesc) +{ + AttrNumber num_phys_attrs = tupDesc->natts; + AttrNumber attr_count; + + /* + * If encoding conversion is needed, we need another buffer to hold the + * converted input data. Otherwise, we can just point input_buf to the + * same buffer as raw_buf. + */ + if (cstate->need_transcoding) + { + cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); + cstate->input_buf_index = cstate->input_buf_len = 0; + } + else + cstate->input_buf = cstate->raw_buf; + cstate->input_reached_eof = false; + + initStringInfo(&cstate->line_buf); + + /* + * Pick up the required catalog information for each attribute in the + * relation, including the input function, the element type (to pass to + * the input function). + */ + cstate->in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); + cstate->typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); + for (int attnum = 1; attnum <= num_phys_attrs; attnum++) + { + Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1); + Oid in_func_oid; + + /* We don't need info for dropped attributes */ + if (att->attisdropped) + continue; + + /* Fetch the input function and typioparam info */ + getTypeInputInfo(att->atttypid, + &in_func_oid, &cstate->typioparams[attnum - 1]); + fmgr_info(in_func_oid, &cstate->in_functions[attnum - 1]); + } + + /* create workspace for CopyReadAttributes results */ + attr_count = list_length(cstate->attnumlist); + cstate->max_fields = attr_count; + cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); +} + +static void +CopyFromTextEnd(CopyFromState cstate) +{ +} + +/* + * CopyFromRoutine implementation for "binary". + */ + +/* All "binary" options are parsed in ProcessCopyOptions(). We may move the + * code to here later. */ +static bool +CopyFromBinaryProcessOption(CopyFromState cstate, DefElem *defel) +{ + return false; +} + +static int16 +CopyFromBinaryGetFormat(CopyFromState cstate) +{ + return 1; +} + +static void +CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc) +{ + AttrNumber num_phys_attrs = tupDesc->natts; + + /* + * Pick up the required catalog information for each attribute in the + * relation, including the input function, the element type (to pass to + * the input function). + */ + cstate->in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); + cstate->typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); + for (int attnum = 1; attnum <= num_phys_attrs; attnum++) + { + Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1); + Oid in_func_oid; + + /* We don't need info for dropped attributes */ + if (att->attisdropped) + continue; + + /* Fetch the input function and typioparam info */ + getTypeBinaryInputInfo(att->atttypid, + &in_func_oid, &cstate->typioparams[attnum - 1]); + fmgr_info(in_func_oid, &cstate->in_functions[attnum - 1]); + } + + /* Read and verify binary header */ + ReceiveCopyBinaryHeader(cstate); +} + +static void +CopyFromBinaryEnd(CopyFromState cstate) +{ +} + +CopyFromRoutine CopyFromRoutineText = { + .CopyFromProcessOption = CopyFromTextProcessOption, + .CopyFromGetFormat = CopyFromTextGetFormat, + .CopyFromStart = CopyFromTextStart, + .CopyFromOneRow = CopyFromTextOneRow, + .CopyFromEnd = CopyFromTextEnd, +}; + +/* + * We can use the same CopyFromRoutine for both of "text" and "csv" because + * CopyFromText*() refer cstate->opts.csv_mode and change their behavior. We can + * split the implementations and stop referring cstate->opts.csv_mode later. + */ +CopyFromRoutine CopyFromRoutineCSV = { + .CopyFromProcessOption = CopyFromTextProcessOption, + .CopyFromGetFormat = CopyFromTextGetFormat, + .CopyFromStart = CopyFromTextStart, + .CopyFromOneRow = CopyFromTextOneRow, + .CopyFromEnd = CopyFromTextEnd, +}; + +CopyFromRoutine CopyFromRoutineBinary = { + .CopyFromProcessOption = CopyFromBinaryProcessOption, + .CopyFromGetFormat = CopyFromBinaryGetFormat, + .CopyFromStart = CopyFromBinaryStart, + .CopyFromOneRow = CopyFromBinaryOneRow, + .CopyFromEnd = CopyFromBinaryEnd, +}; + + /* * error context callback for COPY FROM * @@ -1384,9 +1548,6 @@ BeginCopyFrom(ParseState *pstate, TupleDesc tupDesc; AttrNumber num_phys_attrs, num_defaults; - FmgrInfo *in_functions; - Oid *typioparams; - Oid in_func_oid; int *defmap; ExprState **defexprs; MemoryContext oldcontext; @@ -1416,7 +1577,7 @@ BeginCopyFrom(ParseState *pstate, oldcontext = MemoryContextSwitchTo(cstate->copycontext); /* Extract options from the statement node tree */ - ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options); + ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , cstate, options); /* Process the target relation */ cstate->rel = rel; @@ -1549,7 +1710,7 @@ BeginCopyFrom(ParseState *pstate, pg_encoding_to_char(GetDatabaseEncoding())))); } - cstate->copy_src = COPY_FILE; /* default */ + cstate->copy_src = COPY_SOURCE_FILE; /* default */ cstate->whereClause = whereClause; @@ -1571,25 +1732,6 @@ BeginCopyFrom(ParseState *pstate, cstate->raw_buf_index = cstate->raw_buf_len = 0; cstate->raw_reached_eof = false; - if (!cstate->opts.binary) - { - /* - * If encoding conversion is needed, we need another buffer to hold - * the converted input data. Otherwise, we can just point input_buf - * to the same buffer as raw_buf. - */ - if (cstate->need_transcoding) - { - cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); - cstate->input_buf_index = cstate->input_buf_len = 0; - } - else - cstate->input_buf = cstate->raw_buf; - cstate->input_reached_eof = false; - - initStringInfo(&cstate->line_buf); - } - initStringInfo(&cstate->attribute_buf); /* Assign range table and rteperminfos, we'll need them in CopyFrom. */ @@ -1608,8 +1750,6 @@ BeginCopyFrom(ParseState *pstate, * the input function), and info about defaults and constraints. (Which * input function we use depends on text/binary format choice.) */ - in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); - typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); defmap = (int *) palloc(num_phys_attrs * sizeof(int)); defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *)); @@ -1621,15 +1761,6 @@ BeginCopyFrom(ParseState *pstate, if (att->attisdropped) continue; - /* Fetch the input function and typioparam info */ - if (cstate->opts.binary) - getTypeBinaryInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - else - getTypeInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - fmgr_info(in_func_oid, &in_functions[attnum - 1]); - /* Get default info if available */ defexprs[attnum - 1] = NULL; @@ -1689,8 +1820,6 @@ BeginCopyFrom(ParseState *pstate, cstate->bytes_processed = 0; /* We keep those variables in cstate. */ - cstate->in_functions = in_functions; - cstate->typioparams = typioparams; cstate->defmap = defmap; cstate->defexprs = defexprs; cstate->volatile_defexprs = volatile_defexprs; @@ -1700,7 +1829,7 @@ BeginCopyFrom(ParseState *pstate, if (data_source_cb) { progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; - cstate->copy_src = COPY_CALLBACK; + cstate->copy_src = COPY_SOURCE_CALLBACK; cstate->data_source_cb = data_source_cb; } else if (pipe) @@ -1763,20 +1892,7 @@ BeginCopyFrom(ParseState *pstate, pgstat_progress_update_multi_param(3, progress_cols, progress_vals); - if (cstate->opts.binary) - { - /* Read and verify binary header */ - ReceiveCopyBinaryHeader(cstate); - } - - /* create workspace for CopyReadAttributes results */ - if (!cstate->opts.binary) - { - AttrNumber attr_count = list_length(cstate->attnumlist); - - cstate->max_fields = attr_count; - cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); - } + cstate->opts.from_routine->CopyFromStart(cstate, tupDesc); MemoryContextSwitchTo(oldcontext); @@ -1789,6 +1905,8 @@ BeginCopyFrom(ParseState *pstate, void EndCopyFrom(CopyFromState cstate) { + cstate->opts.from_routine->CopyFromEnd(cstate); + /* No COPY FROM related resources except memory. */ if (cstate->is_program) { diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index 7cacd0b752c98..f8a194635d65e 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -165,14 +165,13 @@ static int CopyGetData(CopyFromState cstate, void *databuf, static inline bool CopyGetInt32(CopyFromState cstate, int32 *val); static inline bool CopyGetInt16(CopyFromState cstate, int16 *val); static void CopyLoadInputBuf(CopyFromState cstate); -static int CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes); void ReceiveCopyBegin(CopyFromState cstate) { StringInfoData buf; int natts = list_length(cstate->attnumlist); - int16 format = (cstate->opts.binary ? 1 : 0); + int16 format = cstate->opts.from_routine->CopyFromGetFormat(cstate); int i; pq_beginmessage(&buf, PqMsg_CopyInResponse); @@ -181,7 +180,7 @@ ReceiveCopyBegin(CopyFromState cstate) for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); - cstate->copy_src = COPY_FRONTEND; + cstate->copy_src = COPY_SOURCE_FRONTEND; cstate->fe_msgbuf = makeStringInfo(); /* We *must* flush here to ensure FE knows it can send. */ pq_flush(); @@ -194,7 +193,7 @@ ReceiveCopyBinaryHeader(CopyFromState cstate) int32 tmp; /* Signature */ - if (CopyReadBinaryData(cstate, readSig, 11) != 11 || + if (CopyFromStateRead(cstate, readSig, 11) != 11 || memcmp(readSig, BinarySignature, 11) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), @@ -222,7 +221,7 @@ ReceiveCopyBinaryHeader(CopyFromState cstate) /* Skip extension header, if present */ while (tmp-- > 0) { - if (CopyReadBinaryData(cstate, readSig, 1) != 1) + if (CopyFromStateRead(cstate, readSig, 1) != 1) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (wrong length)"))); @@ -249,7 +248,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) switch (cstate->copy_src) { - case COPY_FILE: + case COPY_SOURCE_FILE: bytesread = fread(databuf, 1, maxread, cstate->copy_file); if (ferror(cstate->copy_file)) ereport(ERROR, @@ -258,7 +257,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) if (bytesread == 0) cstate->raw_reached_eof = true; break; - case COPY_FRONTEND: + case COPY_SOURCE_FRONTEND: while (maxread > 0 && bytesread < minread && !cstate->raw_reached_eof) { int avail; @@ -341,7 +340,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) bytesread += avail; } break; - case COPY_CALLBACK: + case COPY_SOURCE_CALLBACK: bytesread = cstate->data_source_cb(databuf, minread, maxread); break; } @@ -364,7 +363,7 @@ CopyGetInt32(CopyFromState cstate, int32 *val) { uint32 buf; - if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) + if (CopyFromStateRead(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) { *val = 0; /* suppress compiler warning */ return false; @@ -381,7 +380,7 @@ CopyGetInt16(CopyFromState cstate, int16 *val) { uint16 buf; - if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) + if (CopyFromStateRead(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) { *val = 0; /* suppress compiler warning */ return false; @@ -692,14 +691,14 @@ CopyLoadInputBuf(CopyFromState cstate) } /* - * CopyReadBinaryData + * CopyFromStateRead * * Reads up to 'nbytes' bytes from cstate->copy_file via cstate->raw_buf * and writes them to 'dest'. Returns the number of bytes read (which * would be less than 'nbytes' only if we reach EOF). */ -static int -CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes) +int +CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes) { int copied_bytes = 0; @@ -840,199 +839,219 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) return true; } -/* - * Read next tuple from file for COPY FROM. Return false if no more tuples. - * - * 'econtext' is used to evaluate default expression for each column that is - * either not read from the file or is using the DEFAULT option of COPY FROM. - * It can be NULL when no default values are used, i.e. when all columns are - * read from the file, and DEFAULT option is unset. - * - * 'values' and 'nulls' arrays must be the same length as columns of the - * relation passed to BeginCopyFrom. This function fills the arrays. - */ bool -NextCopyFrom(CopyFromState cstate, ExprContext *econtext, - Datum *values, bool *nulls) +CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls) { TupleDesc tupDesc; - AttrNumber num_phys_attrs, - attr_count, - num_defaults = cstate->num_defaults; + AttrNumber attr_count; FmgrInfo *in_functions = cstate->in_functions; Oid *typioparams = cstate->typioparams; - int i; - int *defmap = cstate->defmap; ExprState **defexprs = cstate->defexprs; + char **field_strings; + ListCell *cur; + int fldct; + int fieldno; + char *string; tupDesc = RelationGetDescr(cstate->rel); - num_phys_attrs = tupDesc->natts; attr_count = list_length(cstate->attnumlist); - /* Initialize all values for row to NULL */ - MemSet(values, 0, num_phys_attrs * sizeof(Datum)); - MemSet(nulls, true, num_phys_attrs * sizeof(bool)); - MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool)); + /* read raw fields in the next line */ + if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) + return false; - if (!cstate->opts.binary) - { - char **field_strings; - ListCell *cur; - int fldct; - int fieldno; - char *string; + /* check for overflowing fields */ + if (attr_count > 0 && fldct > attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); - /* read raw fields in the next line */ - if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) - return false; + fieldno = 0; + + /* Loop to read the user attributes on the line. */ + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); - /* check for overflowing fields */ - if (attr_count > 0 && fldct > attr_count) + if (fieldno >= fldct) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); - - fieldno = 0; + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + string = field_strings[fieldno++]; - /* Loop to read the user attributes on the line. */ - foreach(cur, cstate->attnumlist) + if (cstate->convert_select_flags && + !cstate->convert_select_flags[m]) { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); - string = field_strings[fieldno++]; - - if (cstate->convert_select_flags && - !cstate->convert_select_flags[m]) - { - /* ignore input field, leaving column as NULL */ - continue; - } + /* ignore input field, leaving column as NULL */ + continue; + } - if (cstate->opts.csv_mode) + if (cstate->opts.csv_mode) + { + if (string == NULL && + cstate->opts.force_notnull_flags[m]) { - if (string == NULL && - cstate->opts.force_notnull_flags[m]) - { - /* - * FORCE_NOT_NULL option is set and column is NULL - - * convert it to the NULL string. - */ - string = cstate->opts.null_print; - } - else if (string != NULL && cstate->opts.force_null_flags[m] - && strcmp(string, cstate->opts.null_print) == 0) - { - /* - * FORCE_NULL option is set and column matches the NULL - * string. It must have been quoted, or otherwise the - * string would already have been set to NULL. Convert it - * to NULL as specified. - */ - string = NULL; - } + /* + * FORCE_NOT_NULL option is set and column is NULL - convert + * it to the NULL string. + */ + string = cstate->opts.null_print; } - - cstate->cur_attname = NameStr(att->attname); - cstate->cur_attval = string; - - if (string != NULL) - nulls[m] = false; - - if (cstate->defaults[m]) + else if (string != NULL && cstate->opts.force_null_flags[m] + && strcmp(string, cstate->opts.null_print) == 0) { /* - * The caller must supply econtext and have switched into the - * per-tuple memory context in it. + * FORCE_NULL option is set and column matches the NULL + * string. It must have been quoted, or otherwise the string + * would already have been set to NULL. Convert it to NULL as + * specified. */ - Assert(econtext != NULL); - Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); - - values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); + string = NULL; } + } + + cstate->cur_attname = NameStr(att->attname); + cstate->cur_attval = string; + if (string != NULL) + nulls[m] = false; + + if (cstate->defaults[m]) + { /* - * If ON_ERROR is specified with IGNORE, skip rows with soft - * errors + * The caller must supply econtext and have switched into the + * per-tuple memory context in it. */ - else if (!InputFunctionCallSafe(&in_functions[m], - string, - typioparams[m], - att->atttypmod, - (Node *) cstate->escontext, - &values[m])) - { - cstate->num_errors++; - return true; - } + Assert(econtext != NULL); + Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); - cstate->cur_attname = NULL; - cstate->cur_attval = NULL; + values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); } - Assert(fieldno == attr_count); + /* + * If ON_ERROR is specified with IGNORE, skip rows with soft errors + */ + else if (!InputFunctionCallSafe(&in_functions[m], + string, + typioparams[m], + att->atttypmod, + (Node *) cstate->escontext, + &values[m])) + { + cstate->num_errors++; + return true; + } + + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; } - else - { - /* binary */ - int16 fld_count; - ListCell *cur; - cstate->cur_lineno++; + Assert(fieldno == attr_count); - if (!CopyGetInt16(cstate, &fld_count)) - { - /* EOF detected (end of file, or protocol-level EOF) */ - return false; - } + return true; +} - if (fld_count == -1) - { - /* - * Received EOF marker. Wait for the protocol-level EOF, and - * complain if it doesn't come immediately. In COPY FROM STDIN, - * this ensures that we correctly handle CopyFail, if client - * chooses to send that now. When copying from file, we could - * ignore the rest of the file like in text mode, but we choose to - * be consistent with the COPY FROM STDIN case. - */ - char dummy; +bool +CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls) +{ + TupleDesc tupDesc; + AttrNumber attr_count; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + int16 fld_count; + ListCell *cur; - if (CopyReadBinaryData(cstate, &dummy, 1) > 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("received copy data after EOF marker"))); - return false; - } + tupDesc = RelationGetDescr(cstate->rel); + attr_count = list_length(cstate->attnumlist); + + cstate->cur_lineno++; + + if (!CopyGetInt16(cstate, &fld_count)) + { + /* EOF detected (end of file, or protocol-level EOF) */ + return false; + } - if (fld_count != attr_count) + if (fld_count == -1) + { + /* + * Received EOF marker. Wait for the protocol-level EOF, and complain + * if it doesn't come immediately. In COPY FROM STDIN, this ensures + * that we correctly handle CopyFail, if client chooses to send that + * now. When copying from file, we could ignore the rest of the file + * like in text mode, but we choose to be consistent with the COPY + * FROM STDIN case. + */ + char dummy; + + if (CopyFromStateRead(cstate, &dummy, 1) > 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); + errmsg("received copy data after EOF marker"))); + return false; + } - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - cstate->cur_attname = NameStr(att->attname); - values[m] = CopyReadBinaryAttribute(cstate, - &in_functions[m], - typioparams[m], - att->atttypmod, - &nulls[m]); - cstate->cur_attname = NULL; - } + if (fld_count != attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + cstate->cur_attname = NameStr(att->attname); + values[m] = CopyReadBinaryAttribute(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; } + return true; +} + +/* + * Read next tuple from file for COPY FROM. Return false if no more tuples. + * + * 'econtext' is used to evaluate default expression for each column that is + * either not read from the file or is using the DEFAULT option of COPY FROM. + * It can be NULL when no default values are used, i.e. when all columns are + * read from the file, and DEFAULT option is unset. + * + * 'values' and 'nulls' arrays must be the same length as columns of the + * relation passed to BeginCopyFrom. This function fills the arrays. + */ +bool +NextCopyFrom(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls) +{ + TupleDesc tupDesc; + AttrNumber num_phys_attrs, + num_defaults = cstate->num_defaults; + int i; + int *defmap = cstate->defmap; + ExprState **defexprs = cstate->defexprs; + + tupDesc = RelationGetDescr(cstate->rel); + num_phys_attrs = tupDesc->natts; + + /* Initialize all values for row to NULL */ + MemSet(values, 0, num_phys_attrs * sizeof(Datum)); + MemSet(nulls, true, num_phys_attrs * sizeof(bool)); + MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool)); + + if (!cstate->opts.from_routine->CopyFromOneRow(cstate, econtext, values, + nulls)) + return false; + /* * Now compute and insert any defaults available for the columns not * provided by the input data. Anything not processed here or above will @@ -1079,7 +1098,7 @@ CopyReadLine(CopyFromState cstate) * after \. up to the protocol end of copy data. (XXX maybe better * not to treat \. as special?) */ - if (cstate->copy_src == COPY_FRONTEND) + if (cstate->copy_src == COPY_SOURCE_FRONTEND) { int inbytes; @@ -1977,8 +1996,8 @@ CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo, resetStringInfo(&cstate->attribute_buf); enlargeStringInfo(&cstate->attribute_buf, fld_size); - if (CopyReadBinaryData(cstate, cstate->attribute_buf.data, - fld_size) != fld_size) + if (CopyFromStateRead(cstate, cstate->attribute_buf.data, + fld_size) != fld_size) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unexpected EOF in COPY data"))); diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index d3dc3fc854f14..e2a4964015fdb 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -43,64 +43,6 @@ #include "utils/rel.h" #include "utils/snapmgr.h" -/* - * Represents the different dest cases we need to worry about at - * the bottom level - */ -typedef enum CopyDest -{ - COPY_FILE, /* to file (or a piped program) */ - COPY_FRONTEND, /* to frontend */ - COPY_CALLBACK, /* to callback function */ -} CopyDest; - -/* - * This struct contains all the state variables used throughout a COPY TO - * operation. - * - * Multi-byte encodings: all supported client-side encodings encode multi-byte - * characters by having the first byte's high bit set. Subsequent bytes of the - * character can have the high bit not set. When scanning data in such an - * encoding to look for a match to a single-byte (ie ASCII) character, we must - * use the full pg_encoding_mblen() machinery to skip over multibyte - * characters, else we might find a false match to a trailing byte. In - * supported server encodings, there is no possibility of a false match, and - * it's faster to make useless comparisons to trailing bytes than it is to - * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true - * when we have to do it the hard way. - */ -typedef struct CopyToStateData -{ - /* low-level state data */ - CopyDest copy_dest; /* type of copy source/destination */ - FILE *copy_file; /* used if copy_dest == COPY_FILE */ - StringInfo fe_msgbuf; /* used for all dests during COPY TO */ - - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ - - /* parameters from the COPY command */ - Relation rel; /* relation to copy to */ - QueryDesc *queryDesc; /* executable query to copy from */ - List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDOUT */ - bool is_program; /* is 'filename' a program to popen? */ - copy_data_dest_cb data_dest_cb; /* function for writing data */ - - CopyFormatOptions opts; - Node *whereClause; /* WHERE condition (or NULL) */ - - /* - * Working state - */ - MemoryContext copycontext; /* per-copy execution context */ - - FmgrInfo *out_functions; /* lookup info for output functions */ - MemoryContext rowcontext; /* per-row evaluation context */ - uint64 bytes_processed; /* number of bytes processed so far */ -} CopyToStateData; - /* DestReceiver for COPY (query) TO */ typedef struct { @@ -124,24 +66,181 @@ static void CopyAttributeOutCSV(CopyToState cstate, const char *string, /* Low-level communications functions */ static void SendCopyBegin(CopyToState cstate); static void SendCopyEnd(CopyToState cstate); -static void CopySendData(CopyToState cstate, const void *databuf, int datasize); -static void CopySendString(CopyToState cstate, const char *str); -static void CopySendChar(CopyToState cstate, char c); -static void CopySendEndOfRow(CopyToState cstate); -static void CopySendInt32(CopyToState cstate, int32 val); -static void CopySendInt16(CopyToState cstate, int16 val); +/* + * CopyToRoutine implementations. + */ /* - * Send copy start/stop messages for frontend copies. These have changed - * in past protocol redesigns. + * CopyToRoutine implementation for "text" and "csv". CopyToText*() + * refer cstate->opts.csv_mode and change their behavior. We can split this + * implementation and stop referring cstate->opts.csv_mode later. */ + +/* All "text" and "csv" options are parsed in ProcessCopyOptions(). We may + * move the code to here later. */ +static bool +CopyToTextProcessOption(CopyToState cstate, DefElem *defel) +{ + return false; +} + static void -SendCopyBegin(CopyToState cstate) +CopyToTextSendCopyBegin(CopyToState cstate) +{ + StringInfoData buf; + int natts = list_length(cstate->attnumlist); + int16 format = 0; + int i; + + pq_beginmessage(&buf, PqMsg_CopyOutResponse); + pq_sendbyte(&buf, format); /* overall format */ + pq_sendint16(&buf, natts); + for (i = 0; i < natts; i++) + pq_sendint16(&buf, format); /* per-column formats */ + pq_endmessage(&buf); +} + +static void +CopyToTextSendEndOfRow(CopyToState cstate) +{ + switch (cstate->copy_dest) + { + case COPY_DEST_FILE: + /* Default line termination depends on platform */ +#ifndef WIN32 + CopySendChar(cstate, '\n'); +#else + CopySendString(cstate, "\r\n"); +#endif + break; + case COPY_DEST_FRONTEND: + /* The FE/BE protocol uses \n as newline for all platforms */ + CopySendChar(cstate, '\n'); + break; + default: + break; + } + CopyToStateFlush(cstate); +} + +static void +CopyToTextStart(CopyToState cstate, TupleDesc tupDesc) +{ + int num_phys_attrs; + ListCell *cur; + + num_phys_attrs = tupDesc->natts; + /* Get info about the columns we need to process. */ + cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Oid out_func_oid; + bool isvarlena; + Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); + + getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena); + fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + } + + /* + * For non-binary copy, we need to convert null_print to file encoding, + * because it will be sent directly with CopySendString. + */ + if (cstate->need_transcoding) + cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, + cstate->opts.null_print_len, + cstate->file_encoding); + + /* if a header has been requested send the line */ + if (cstate->opts.header_line) + { + bool hdr_delim = false; + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + char *colname; + + if (hdr_delim) + CopySendChar(cstate, cstate->opts.delim[0]); + hdr_delim = true; + + colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); + + if (cstate->opts.csv_mode) + CopyAttributeOutCSV(cstate, colname, false, + list_length(cstate->attnumlist) == 1); + else + CopyAttributeOutText(cstate, colname); + } + + CopyToTextSendEndOfRow(cstate); + } +} + +static void +CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + bool need_delim = false; + FmgrInfo *out_functions = cstate->out_functions; + ListCell *cur; + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Datum value = slot->tts_values[attnum - 1]; + bool isnull = slot->tts_isnull[attnum - 1]; + + if (need_delim) + CopySendChar(cstate, cstate->opts.delim[0]); + need_delim = true; + + if (isnull) + { + CopySendString(cstate, cstate->opts.null_print_client); + } + else + { + char *string; + + string = OutputFunctionCall(&out_functions[attnum - 1], value); + if (cstate->opts.csv_mode) + CopyAttributeOutCSV(cstate, string, + cstate->opts.force_quote_flags[attnum - 1], + list_length(cstate->attnumlist) == 1); + else + CopyAttributeOutText(cstate, string); + } + } + + CopyToTextSendEndOfRow(cstate); +} + +static void +CopyToTextEnd(CopyToState cstate) +{ +} + +/* + * CopyToRoutine implementation for "binary". + */ + +/* All "binary" options are parsed in ProcessCopyOptions(). We may move the + * code to here later. */ +static bool +CopyToBinaryProcessOption(CopyToState cstate, DefElem *defel) +{ + return false; +} + +static void +CopyToBinarySendCopyBegin(CopyToState cstate) { StringInfoData buf; int natts = list_length(cstate->attnumlist); - int16 format = (cstate->opts.binary ? 1 : 0); + int16 format = 1; int i; pq_beginmessage(&buf, PqMsg_CopyOutResponse); @@ -150,7 +249,123 @@ SendCopyBegin(CopyToState cstate) for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); - cstate->copy_dest = COPY_FRONTEND; +} + +static void +CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc) +{ + int num_phys_attrs; + ListCell *cur; + + num_phys_attrs = tupDesc->natts; + /* Get info about the columns we need to process. */ + cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Oid out_func_oid; + bool isvarlena; + Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); + + getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena); + fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + } + + { + /* Generate header for a binary copy */ + int32 tmp; + + /* Signature */ + CopySendData(cstate, BinarySignature, 11); + /* Flags field */ + tmp = 0; + CopySendInt32(cstate, tmp); + /* No header extension */ + tmp = 0; + CopySendInt32(cstate, tmp); + } +} + +static void +CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + FmgrInfo *out_functions = cstate->out_functions; + ListCell *cur; + + /* Binary per-tuple header */ + CopySendInt16(cstate, list_length(cstate->attnumlist)); + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Datum value = slot->tts_values[attnum - 1]; + bool isnull = slot->tts_isnull[attnum - 1]; + + if (isnull) + { + CopySendInt32(cstate, -1); + } + else + { + bytea *outputbytes; + + outputbytes = SendFunctionCall(&out_functions[attnum - 1], value); + CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); + CopySendData(cstate, VARDATA(outputbytes), + VARSIZE(outputbytes) - VARHDRSZ); + } + } + + CopyToStateFlush(cstate); +} + +static void +CopyToBinaryEnd(CopyToState cstate) +{ + /* Generate trailer for a binary copy */ + CopySendInt16(cstate, -1); + /* Need to flush out the trailer */ + CopyToStateFlush(cstate); +} + +CopyToRoutine CopyToRoutineText = { + .CopyToProcessOption = CopyToTextProcessOption, + .CopyToSendCopyBegin = CopyToTextSendCopyBegin, + .CopyToStart = CopyToTextStart, + .CopyToOneRow = CopyToTextOneRow, + .CopyToEnd = CopyToTextEnd, +}; + +/* + * We can use the same CopyToRoutine for both of "text" and "csv" because + * CopyToText*() refer cstate->opts.csv_mode and change their behavior. We can + * split the implementations and stop referring cstate->opts.csv_mode later. + */ +CopyToRoutine CopyToRoutineCSV = { + .CopyToProcessOption = CopyToTextProcessOption, + .CopyToSendCopyBegin = CopyToTextSendCopyBegin, + .CopyToStart = CopyToTextStart, + .CopyToOneRow = CopyToTextOneRow, + .CopyToEnd = CopyToTextEnd, +}; + +CopyToRoutine CopyToRoutineBinary = { + .CopyToProcessOption = CopyToBinaryProcessOption, + .CopyToSendCopyBegin = CopyToBinarySendCopyBegin, + .CopyToStart = CopyToBinaryStart, + .CopyToOneRow = CopyToBinaryOneRow, + .CopyToEnd = CopyToBinaryEnd, +}; + +/* + * Send copy start/stop messages for frontend copies. These have changed + * in past protocol redesigns. + */ +static void +SendCopyBegin(CopyToState cstate) +{ + cstate->opts.to_routine->CopyToSendCopyBegin(cstate); + cstate->copy_dest = COPY_DEST_FRONTEND; } static void @@ -166,48 +381,38 @@ SendCopyEnd(CopyToState cstate) * CopySendData sends output data to the destination (file or frontend) * CopySendString does the same for null-terminated strings * CopySendChar does the same for single characters - * CopySendEndOfRow does the appropriate thing at end of each data row - * (data is not actually flushed except by CopySendEndOfRow) + * CopyToStateFlush flushes the buffered data + * (data is not actually flushed except by CopyToStateFlush) * * NB: no data conversion is applied by these functions *---------- */ -static void +void CopySendData(CopyToState cstate, const void *databuf, int datasize) { appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize); } -static void +void CopySendString(CopyToState cstate, const char *str) { appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str)); } -static void +void CopySendChar(CopyToState cstate, char c) { appendStringInfoCharMacro(cstate->fe_msgbuf, c); } -static void -CopySendEndOfRow(CopyToState cstate) +void +CopyToStateFlush(CopyToState cstate) { StringInfo fe_msgbuf = cstate->fe_msgbuf; switch (cstate->copy_dest) { - case COPY_FILE: - if (!cstate->opts.binary) - { - /* Default line termination depends on platform */ -#ifndef WIN32 - CopySendChar(cstate, '\n'); -#else - CopySendString(cstate, "\r\n"); -#endif - } - + case COPY_DEST_FILE: if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file) != 1 || ferror(cstate->copy_file)) @@ -241,15 +446,11 @@ CopySendEndOfRow(CopyToState cstate) errmsg("could not write to COPY file: %m"))); } break; - case COPY_FRONTEND: - /* The FE/BE protocol uses \n as newline for all platforms */ - if (!cstate->opts.binary) - CopySendChar(cstate, '\n'); - + case COPY_DEST_FRONTEND: /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len); break; - case COPY_CALLBACK: + case COPY_DEST_CALLBACK: cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len); break; } @@ -268,7 +469,7 @@ CopySendEndOfRow(CopyToState cstate) /* * CopySendInt32 sends an int32 in network byte order */ -static inline void +inline void CopySendInt32(CopyToState cstate, int32 val) { uint32 buf; @@ -280,7 +481,7 @@ CopySendInt32(CopyToState cstate, int32 val) /* * CopySendInt16 sends an int16 in network byte order */ -static inline void +inline void CopySendInt16(CopyToState cstate, int16 val) { uint16 buf; @@ -431,7 +632,7 @@ BeginCopyTo(ParseState *pstate, oldcontext = MemoryContextSwitchTo(cstate->copycontext); /* Extract options from the statement node tree */ - ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options); + ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , cstate, options); /* Process the source/target relation or query */ if (rel) @@ -622,12 +823,12 @@ BeginCopyTo(ParseState *pstate, /* See Multibyte encoding comment above */ cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding); - cstate->copy_dest = COPY_FILE; /* default */ + cstate->copy_dest = COPY_DEST_FILE; /* default */ if (data_dest_cb) { progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; - cstate->copy_dest = COPY_CALLBACK; + cstate->copy_dest = COPY_DEST_CALLBACK; cstate->data_dest_cb = data_dest_cb; } else if (pipe) @@ -748,8 +949,6 @@ DoCopyTo(CopyToState cstate) bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL); bool fe_copy = (pipe && whereToSendOutput == DestRemote); TupleDesc tupDesc; - int num_phys_attrs; - ListCell *cur; uint64 processed; if (fe_copy) @@ -759,32 +958,11 @@ DoCopyTo(CopyToState cstate) tupDesc = RelationGetDescr(cstate->rel); else tupDesc = cstate->queryDesc->tupDesc; - num_phys_attrs = tupDesc->natts; cstate->opts.null_print_client = cstate->opts.null_print; /* default */ /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */ cstate->fe_msgbuf = makeStringInfo(); - /* Get info about the columns we need to process. */ - cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - Oid out_func_oid; - bool isvarlena; - Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); - - if (cstate->opts.binary) - getTypeBinaryOutputInfo(attr->atttypid, - &out_func_oid, - &isvarlena); - else - getTypeOutputInfo(attr->atttypid, - &out_func_oid, - &isvarlena); - fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); - } - /* * Create a temporary memory context that we can reset once per row to * recover palloc'd memory. This avoids any problems with leaks inside @@ -795,57 +973,7 @@ DoCopyTo(CopyToState cstate) "COPY TO", ALLOCSET_DEFAULT_SIZES); - if (cstate->opts.binary) - { - /* Generate header for a binary copy */ - int32 tmp; - - /* Signature */ - CopySendData(cstate, BinarySignature, 11); - /* Flags field */ - tmp = 0; - CopySendInt32(cstate, tmp); - /* No header extension */ - tmp = 0; - CopySendInt32(cstate, tmp); - } - else - { - /* - * For non-binary copy, we need to convert null_print to file - * encoding, because it will be sent directly with CopySendString. - */ - if (cstate->need_transcoding) - cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, - cstate->opts.null_print_len, - cstate->file_encoding); - - /* if a header has been requested send the line */ - if (cstate->opts.header_line) - { - bool hdr_delim = false; - - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - char *colname; - - if (hdr_delim) - CopySendChar(cstate, cstate->opts.delim[0]); - hdr_delim = true; - - colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); - - if (cstate->opts.csv_mode) - CopyAttributeOutCSV(cstate, colname, false, - list_length(cstate->attnumlist) == 1); - else - CopyAttributeOutText(cstate, colname); - } - - CopySendEndOfRow(cstate); - } - } + cstate->opts.to_routine->CopyToStart(cstate, tupDesc); if (cstate->rel) { @@ -884,13 +1012,7 @@ DoCopyTo(CopyToState cstate) processed = ((DR_copy *) cstate->queryDesc->dest)->processed; } - if (cstate->opts.binary) - { - /* Generate trailer for a binary copy */ - CopySendInt16(cstate, -1); - /* Need to flush out the trailer */ - CopySendEndOfRow(cstate); - } + cstate->opts.to_routine->CopyToEnd(cstate); MemoryContextDelete(cstate->rowcontext); @@ -906,71 +1028,15 @@ DoCopyTo(CopyToState cstate) static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot) { - bool need_delim = false; - FmgrInfo *out_functions = cstate->out_functions; MemoryContext oldcontext; - ListCell *cur; - char *string; MemoryContextReset(cstate->rowcontext); oldcontext = MemoryContextSwitchTo(cstate->rowcontext); - if (cstate->opts.binary) - { - /* Binary per-tuple header */ - CopySendInt16(cstate, list_length(cstate->attnumlist)); - } - /* Make sure the tuple is fully deconstructed */ slot_getallattrs(slot); - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - Datum value = slot->tts_values[attnum - 1]; - bool isnull = slot->tts_isnull[attnum - 1]; - - if (!cstate->opts.binary) - { - if (need_delim) - CopySendChar(cstate, cstate->opts.delim[0]); - need_delim = true; - } - - if (isnull) - { - if (!cstate->opts.binary) - CopySendString(cstate, cstate->opts.null_print_client); - else - CopySendInt32(cstate, -1); - } - else - { - if (!cstate->opts.binary) - { - string = OutputFunctionCall(&out_functions[attnum - 1], - value); - if (cstate->opts.csv_mode) - CopyAttributeOutCSV(cstate, string, - cstate->opts.force_quote_flags[attnum - 1], - list_length(cstate->attnumlist) == 1); - else - CopyAttributeOutText(cstate, string); - } - else - { - bytea *outputbytes; - - outputbytes = SendFunctionCall(&out_functions[attnum - 1], - value); - CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); - CopySendData(cstate, VARDATA(outputbytes), - VARSIZE(outputbytes) - VARHDRSZ); - } - } - } - - CopySendEndOfRow(cstate); + cstate->opts.to_routine->CopyToOneRow(cstate, slot); MemoryContextSwitchTo(oldcontext); } diff --git a/src/backend/nodes/Makefile b/src/backend/nodes/Makefile index 66bbad8e6e02d..173ee11811c35 100644 --- a/src/backend/nodes/Makefile +++ b/src/backend/nodes/Makefile @@ -49,6 +49,7 @@ node_headers = \ access/sdir.h \ access/tableam.h \ access/tsmapi.h \ + commands/copyapi.h \ commands/event_trigger.h \ commands/trigger.h \ executor/tuptable.h \ diff --git a/src/backend/nodes/gen_node_support.pl b/src/backend/nodes/gen_node_support.pl old mode 100644 new mode 100755 index 2f0a59bc874b0..bd397f45ac1d2 --- a/src/backend/nodes/gen_node_support.pl +++ b/src/backend/nodes/gen_node_support.pl @@ -61,6 +61,7 @@ sub elem access/sdir.h access/tableam.h access/tsmapi.h + commands/copyapi.h commands/event_trigger.h commands/trigger.h executor/tuptable.h @@ -85,6 +86,7 @@ sub elem access/sdir.h access/tableam.h access/tsmapi.h + commands/copyapi.h commands/event_trigger.h commands/trigger.h executor/tuptable.h diff --git a/src/backend/utils/adt/json.c b/src/backend/utils/adt/json.c index d719a61f16bad..fabd4e611e1cf 100644 --- a/src/backend/utils/adt/json.c +++ b/src/backend/utils/adt/json.c @@ -83,8 +83,6 @@ typedef struct JsonAggState JsonUniqueBuilderState unique_check; } JsonAggState; -static void composite_to_json(Datum composite, StringInfo result, - bool use_line_feeds); static void array_dim_to_json(StringInfo result, int dim, int ndims, int *dims, Datum *vals, bool *nulls, int *valcount, JsonTypeCategory tcategory, Oid outfuncoid, @@ -507,8 +505,9 @@ array_to_json_internal(Datum array, StringInfo result, bool use_line_feeds) /* * Turn a composite / record into JSON. + * Exported so COPY TO can use it. */ -static void +void composite_to_json(Datum composite, StringInfo result, bool use_line_feeds) { HeapTupleHeader td; diff --git a/src/backend/utils/adt/pseudotypes.c b/src/backend/utils/adt/pseudotypes.c index a3a991f634d39..d308780c43876 100644 --- a/src/backend/utils/adt/pseudotypes.c +++ b/src/backend/utils/adt/pseudotypes.c @@ -373,6 +373,7 @@ PSEUDOTYPE_DUMMY_IO_FUNCS(fdw_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(table_am_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(index_am_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(tsm_handler); +PSEUDOTYPE_DUMMY_IO_FUNCS(copy_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(internal); PSEUDOTYPE_DUMMY_IO_FUNCS(anyelement); PSEUDOTYPE_DUMMY_IO_FUNCS(anynonarray); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 29af4ce65d5c7..d4e426687c904 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -7617,6 +7617,12 @@ { oid => '3312', descr => 'I/O', proname => 'tsm_handler_out', prorettype => 'cstring', proargtypes => 'tsm_handler', prosrc => 'tsm_handler_out' }, +{ oid => '8753', descr => 'I/O', + proname => 'copy_handler_in', proisstrict => 'f', prorettype => 'copy_handler', + proargtypes => 'cstring', prosrc => 'copy_handler_in' }, +{ oid => '8754', descr => 'I/O', + proname => 'copy_handler_out', prorettype => 'cstring', + proargtypes => 'copy_handler', prosrc => 'copy_handler_out' }, { oid => '267', descr => 'I/O', proname => 'table_am_handler_in', proisstrict => 'f', prorettype => 'table_am_handler', proargtypes => 'cstring', diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat index d29194da31fa8..2040d5da83fc9 100644 --- a/src/include/catalog/pg_type.dat +++ b/src/include/catalog/pg_type.dat @@ -632,6 +632,12 @@ typcategory => 'P', typinput => 'tsm_handler_in', typoutput => 'tsm_handler_out', typreceive => '-', typsend => '-', typalign => 'i' }, +{ oid => '8752', + descr => 'pseudo-type for the result of a copy to/from method functoin', + typname => 'copy_handler', typlen => '4', typbyval => 't', typtype => 'p', + typcategory => 'P', typinput => 'copy_handler_in', + typoutput => 'copy_handler_out', typreceive => '-', typsend => '-', + typalign => 'i' }, { oid => '269', typname => 'table_am_handler', descr => 'pseudo-type for the result of a table AM handler function', diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index b3da3cb0be799..cd41d320749bd 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -14,80 +14,17 @@ #ifndef COPY_H #define COPY_H +#include "commands/copyapi.h" #include "nodes/execnodes.h" #include "nodes/parsenodes.h" #include "parser/parse_node.h" #include "tcop/dest.h" -/* - * Represents whether a header line should be present, and whether it must - * match the actual names (which implies "true"). - */ -typedef enum CopyHeaderChoice -{ - COPY_HEADER_FALSE = 0, - COPY_HEADER_TRUE, - COPY_HEADER_MATCH, -} CopyHeaderChoice; - -/* - * Represents where to save input processing errors. More values to be added - * in the future. - */ -typedef enum CopyOnErrorChoice -{ - COPY_ON_ERROR_STOP = 0, /* immediately throw errors, default */ - COPY_ON_ERROR_IGNORE, /* ignore errors */ -} CopyOnErrorChoice; - -/* - * A struct to hold COPY options, in a parsed form. All of these are related - * to formatting, except for 'freeze', which doesn't really belong here, but - * it's expedient to parse it along with all the other options. - */ -typedef struct CopyFormatOptions -{ - /* parameters from the COPY command */ - int file_encoding; /* file or remote side's character encoding, - * -1 if not specified */ - bool binary; /* binary format? */ - bool freeze; /* freeze rows on loading? */ - bool csv_mode; /* Comma Separated Value format? */ - CopyHeaderChoice header_line; /* header line? */ - char *null_print; /* NULL marker string (server encoding!) */ - int null_print_len; /* length of same */ - char *null_print_client; /* same converted to file encoding */ - char *default_print; /* DEFAULT marker string */ - int default_print_len; /* length of same */ - char *delim; /* column delimiter (must be 1 byte) */ - char *quote; /* CSV quote char (must be 1 byte) */ - char *escape; /* CSV escape char (must be 1 byte) */ - List *force_quote; /* list of column names */ - bool force_quote_all; /* FORCE_QUOTE *? */ - bool *force_quote_flags; /* per-column CSV FQ flags */ - List *force_notnull; /* list of column names */ - bool force_notnull_all; /* FORCE_NOT_NULL *? */ - bool *force_notnull_flags; /* per-column CSV FNN flags */ - List *force_null; /* list of column names */ - bool force_null_all; /* FORCE_NULL *? */ - bool *force_null_flags; /* per-column CSV FN flags */ - bool convert_selectively; /* do selective binary conversion? */ - CopyOnErrorChoice on_error; /* what to do when error happened */ - List *convert_select; /* list of column names (can be NIL) */ -} CopyFormatOptions; - -/* These are private in commands/copy[from|to].c */ -typedef struct CopyFromStateData *CopyFromState; -typedef struct CopyToStateData *CopyToState; - -typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); -typedef void (*copy_data_dest_cb) (void *data, int len); - extern void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, uint64 *processed); -extern void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, List *options); +extern void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, void *cstate, List *options); extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options); diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h new file mode 100644 index 0000000000000..0a05b24c54b14 --- /dev/null +++ b/src/include/commands/copyapi.h @@ -0,0 +1,393 @@ +/*------------------------------------------------------------------------- + * + * copyapi.h + * API for COPY TO/FROM handlers + * + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/commands/copyapi.h + * + *------------------------------------------------------------------------- + */ +#ifndef COPYAPI_H +#define COPYAPI_H + +#include "commands/trigger.h" +#include "executor/execdesc.h" +#include "executor/tuptable.h" +#include "nodes/miscnodes.h" +#include "nodes/parsenodes.h" + +typedef struct CopyFromStateData *CopyFromState; + +typedef bool (*CopyFromProcessOption_function) (CopyFromState cstate, DefElem *defel); +typedef int16 (*CopyFromGetFormat_function) (CopyFromState cstate); +typedef void (*CopyFromStart_function) (CopyFromState cstate, TupleDesc tupDesc); +typedef bool (*CopyFromOneRow_function) (CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls); +typedef void (*CopyFromEnd_function) (CopyFromState cstate); + +/* Routines for a COPY FROM format implementation. */ +typedef struct CopyFromRoutine +{ + NodeTag type; + + /* + * Called for processing one COPY FROM option. This will return false when + * the given option is invalid. + */ + CopyFromProcessOption_function CopyFromProcessOption; + + /* + * Called when COPY FROM is started. This will return a format as int16 + * value. It's used for the CopyInResponse message. + */ + CopyFromGetFormat_function CopyFromGetFormat; + + /* + * Called when COPY FROM is started. This will initialize something and + * receive a header. + */ + CopyFromStart_function CopyFromStart; + + /* Copy one row. It returns false if no more tuples. */ + CopyFromOneRow_function CopyFromOneRow; + + /* Called when COPY FROM is ended. This will finalize something. */ + CopyFromEnd_function CopyFromEnd; +} CopyFromRoutine; + +/* Built-in CopyFromRoutine for "text", "csv" and "binary". */ +extern CopyFromRoutine CopyFromRoutineText; +extern CopyFromRoutine CopyFromRoutineCSV; +extern CopyFromRoutine CopyFromRoutineBinary; + + +typedef struct CopyToStateData *CopyToState; + +typedef bool (*CopyToProcessOption_function) (CopyToState cstate, DefElem *defel); +typedef void (*CopyToSendCopyBegin_function) (CopyToState cstate); +typedef void (*CopyToStart_function) (CopyToState cstate, TupleDesc tupDesc); +typedef void (*CopyToOneRow_function) (CopyToState cstate, TupleTableSlot *slot); +typedef void (*CopyToEnd_function) (CopyToState cstate); + +/* Routines for a COPY TO format implementation. */ +typedef struct CopyToRoutine +{ + NodeTag type; + + /* + * Called for processing one COPY TO option. This will return false when + * the given option is invalid. + */ + CopyToProcessOption_function CopyToProcessOption; + + /* + * Called when COPY TO is started. + */ + CopyToSendCopyBegin_function CopyToSendCopyBegin; + + /* Called when COPY TO is started. This will send a header. */ + CopyToStart_function CopyToStart; + + /* Copy one row for COPY TO. */ + CopyToOneRow_function CopyToOneRow; + + /* Called when COPY TO is ended. This will send a trailer. */ + CopyToEnd_function CopyToEnd; +} CopyToRoutine; + +/* Built-in CopyToRoutine for "text", "csv" and "binary". */ +extern CopyToRoutine CopyToRoutineText; +extern CopyToRoutine CopyToRoutineCSV; +extern CopyToRoutine CopyToRoutineBinary; + +/* + * Represents whether a header line should be present, and whether it must + * match the actual names (which implies "true"). + */ +typedef enum CopyHeaderChoice +{ + COPY_HEADER_FALSE = 0, + COPY_HEADER_TRUE, + COPY_HEADER_MATCH, +} CopyHeaderChoice; + +/* + * Represents where to save input processing errors. More values to be added + * in the future. + */ +typedef enum CopyOnErrorChoice +{ + COPY_ON_ERROR_STOP = 0, /* immediately throw errors, default */ + COPY_ON_ERROR_IGNORE, /* ignore errors */ +} CopyOnErrorChoice; + +/* + * A struct to hold COPY options, in a parsed form. All of these are related + * to formatting, except for 'freeze', which doesn't really belong here, but + * it's expedient to parse it along with all the other options. + */ +typedef struct CopyFormatOptions +{ + /* parameters from the COPY command */ + int file_encoding; /* file or remote side's character encoding, + * -1 if not specified */ + bool binary; /* binary format? */ + bool freeze; /* freeze rows on loading? */ + bool csv_mode; /* Comma Separated Value format? */ + CopyHeaderChoice header_line; /* header line? */ + char *null_print; /* NULL marker string (server encoding!) */ + int null_print_len; /* length of same */ + char *null_print_client; /* same converted to file encoding */ + char *default_print; /* DEFAULT marker string */ + int default_print_len; /* length of same */ + char *delim; /* column delimiter (must be 1 byte) */ + char *quote; /* CSV quote char (must be 1 byte) */ + char *escape; /* CSV escape char (must be 1 byte) */ + List *force_quote; /* list of column names */ + bool force_quote_all; /* FORCE_QUOTE *? */ + bool *force_quote_flags; /* per-column CSV FQ flags */ + List *force_notnull; /* list of column names */ + bool force_notnull_all; /* FORCE_NOT_NULL *? */ + bool *force_notnull_flags; /* per-column CSV FNN flags */ + List *force_null; /* list of column names */ + bool force_null_all; /* FORCE_NULL *? */ + bool *force_null_flags; /* per-column CSV FN flags */ + bool convert_selectively; /* do selective binary conversion? */ + CopyOnErrorChoice on_error; /* what to do when error happened */ + List *convert_select; /* list of column names (can be NIL) */ + CopyFromRoutine *from_routine; /* callback routines for COPY FROM */ + CopyToRoutine *to_routine; /* callback routines for COPY TO */ +} CopyFormatOptions; + + +/* + * Represents the different source cases we need to worry about at + * the bottom level + */ +typedef enum CopySource +{ + COPY_SOURCE_FILE, /* from file (or a piped program) */ + COPY_SOURCE_FRONTEND, /* from frontend */ + COPY_SOURCE_CALLBACK, /* from callback function */ +} CopySource; + +/* + * Represents the end-of-line terminator type of the input + */ +typedef enum EolType +{ + EOL_UNKNOWN, + EOL_NL, + EOL_CR, + EOL_CRNL, +} EolType; + +typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); + +/* + * This struct contains all the state variables used throughout a COPY FROM + * operation. + */ +typedef struct CopyFromStateData +{ + /* low-level state data */ + CopySource copy_src; /* type of copy source */ + FILE *copy_file; /* used if copy_src == COPY_FILE */ + StringInfo fe_msgbuf; /* used if copy_src == COPY_FRONTEND */ + + EolType eol_type; /* EOL type of input */ + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + Oid conversion_proc; /* encoding conversion function */ + + /* parameters from the COPY command */ + Relation rel; /* relation to copy from */ + List *attnumlist; /* integer list of attnums to copy */ + char *filename; /* filename, or NULL for STDIN */ + bool is_program; /* is 'filename' a program to popen? */ + copy_data_source_cb data_source_cb; /* function for reading data */ + + CopyFormatOptions opts; + bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ + Node *whereClause; /* WHERE condition (or NULL) */ + + /* these are just for error messages, see CopyFromErrorCallback */ + const char *cur_relname; /* table name for error messages */ + uint64 cur_lineno; /* line number for error messages */ + const char *cur_attname; /* current att for error messages */ + const char *cur_attval; /* current att value for error messages */ + bool relname_only; /* don't output line number, att, etc. */ + + /* + * Working state + */ + MemoryContext copycontext; /* per-copy execution context */ + + AttrNumber num_defaults; /* count of att that are missing and have + * default value */ + FmgrInfo *in_functions; /* array of input functions for each attrs */ + Oid *typioparams; /* array of element types for in_functions */ + ErrorSaveContext *escontext; /* soft error trapper during in_functions + * execution */ + uint64 num_errors; /* total number of rows which contained soft + * errors */ + int *defmap; /* array of default att numbers related to + * missing att */ + ExprState **defexprs; /* array of default att expressions for all + * att */ + bool *defaults; /* if DEFAULT marker was found for + * corresponding att */ + bool volatile_defexprs; /* is any of defexprs volatile? */ + List *range_table; /* single element list of RangeTblEntry */ + List *rteperminfos; /* single element list of RTEPermissionInfo */ + ExprState *qualexpr; + + TransitionCaptureState *transition_capture; + + /* + * These variables are used to reduce overhead in COPY FROM. + * + * attribute_buf holds the separated, de-escaped text for each field of + * the current line. The CopyReadAttributes functions return arrays of + * pointers into this buffer. We avoid palloc/pfree overhead by re-using + * the buffer on each cycle. + * + * In binary COPY FROM, attribute_buf holds the binary data for the + * current field, but the usage is otherwise similar. + */ + StringInfoData attribute_buf; + + /* field raw data pointers found by COPY FROM */ + + int max_fields; + char **raw_fields; + + /* + * Similarly, line_buf holds the whole input line being processed. The + * input cycle is first to read the whole line into line_buf, and then + * extract the individual attribute fields into attribute_buf. line_buf + * is preserved unmodified so that we can display it in error messages if + * appropriate. (In binary mode, line_buf is not used.) + */ + StringInfoData line_buf; + bool line_buf_valid; /* contains the row being processed? */ + + /* + * input_buf holds input data, already converted to database encoding. + * + * In text mode, CopyReadLine parses this data sufficiently to locate line + * boundaries, then transfers the data to line_buf. We guarantee that + * there is a \0 at input_buf[input_buf_len] at all times. (In binary + * mode, input_buf is not used.) + * + * If encoding conversion is not required, input_buf is not a separate + * buffer but points directly to raw_buf. In that case, input_buf_len + * tracks the number of bytes that have been verified as valid in the + * database encoding, and raw_buf_len is the total number of bytes stored + * in the buffer. + */ +#define INPUT_BUF_SIZE 65536 /* we palloc INPUT_BUF_SIZE+1 bytes */ + char *input_buf; + int input_buf_index; /* next byte to process */ + int input_buf_len; /* total # of bytes stored */ + bool input_reached_eof; /* true if we reached EOF */ + bool input_reached_error; /* true if a conversion error happened */ + /* Shorthand for number of unconsumed bytes available in input_buf */ +#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index) + + /* + * raw_buf holds raw input data read from the data source (file or client + * connection), not yet converted to the database encoding. Like with + * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len]. + */ +#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ + char *raw_buf; + int raw_buf_index; /* next byte to process */ + int raw_buf_len; /* total # of bytes stored */ + bool raw_reached_eof; /* true if we reached EOF */ + + /* Shorthand for number of unconsumed bytes available in raw_buf */ +#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) + + uint64 bytes_processed; /* number of bytes processed so far */ + + /* For custom format implementation */ + void *opaque; /* private space */ +} CopyFromStateData; + +extern int CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes); + +/* + * Represents the different dest cases we need to worry about at + * the bottom level + */ +typedef enum CopyDest +{ + COPY_DEST_FILE, /* to file (or a piped program) */ + COPY_DEST_FRONTEND, /* to frontend */ + COPY_DEST_CALLBACK, /* to callback function */ +} CopyDest; + +typedef void (*copy_data_dest_cb) (void *data, int len); + +/* + * This struct contains all the state variables used throughout a COPY TO + * operation. + * + * Multi-byte encodings: all supported client-side encodings encode multi-byte + * characters by having the first byte's high bit set. Subsequent bytes of the + * character can have the high bit not set. When scanning data in such an + * encoding to look for a match to a single-byte (ie ASCII) character, we must + * use the full pg_encoding_mblen() machinery to skip over multibyte + * characters, else we might find a false match to a trailing byte. In + * supported server encodings, there is no possibility of a false match, and + * it's faster to make useless comparisons to trailing bytes than it is to + * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true + * when we have to do it the hard way. + */ +typedef struct CopyToStateData +{ + /* low-level state data */ + CopyDest copy_dest; /* type of copy source/destination */ + FILE *copy_file; /* used if copy_dest == COPY_FILE */ + StringInfo fe_msgbuf; /* used for all dests during COPY TO */ + + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ + + /* parameters from the COPY command */ + Relation rel; /* relation to copy to */ + QueryDesc *queryDesc; /* executable query to copy from */ + List *attnumlist; /* integer list of attnums to copy */ + char *filename; /* filename, or NULL for STDOUT */ + bool is_program; /* is 'filename' a program to popen? */ + copy_data_dest_cb data_dest_cb; /* function for writing data */ + + CopyFormatOptions opts; + Node *whereClause; /* WHERE condition (or NULL) */ + + /* + * Working state + */ + MemoryContext copycontext; /* per-copy execution context */ + + FmgrInfo *out_functions; /* lookup info for output functions */ + MemoryContext rowcontext; /* per-row evaluation context */ + uint64 bytes_processed; /* number of bytes processed so far */ + + /* For custom format implementation */ + void *opaque; /* private space */ +} CopyToStateData; + +extern void CopySendData(CopyToState cstate, const void *databuf, int datasize); +extern void CopySendString(CopyToState cstate, const char *str); +extern void CopySendChar(CopyToState cstate, char c); +extern void CopySendInt32(CopyToState cstate, int32 val); +extern void CopySendInt16(CopyToState cstate, int16 val); +extern void CopyToStateFlush(CopyToState cstate); + +#endif /* COPYAPI_H */ diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index cad52fcc78370..f8f6120255067 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -18,28 +18,6 @@ #include "commands/trigger.h" #include "nodes/miscnodes.h" -/* - * Represents the different source cases we need to worry about at - * the bottom level - */ -typedef enum CopySource -{ - COPY_FILE, /* from file (or a piped program) */ - COPY_FRONTEND, /* from frontend */ - COPY_CALLBACK, /* from callback function */ -} CopySource; - -/* - * Represents the end-of-line terminator type of the input - */ -typedef enum EolType -{ - EOL_UNKNOWN, - EOL_NL, - EOL_CR, - EOL_CRNL, -} EolType; - /* * Represents the insert method to be used during COPY FROM. */ @@ -52,135 +30,11 @@ typedef enum CopyInsertMethod * ExecForeignBatchInsert only if valid */ } CopyInsertMethod; -/* - * This struct contains all the state variables used throughout a COPY FROM - * operation. - */ -typedef struct CopyFromStateData -{ - /* low-level state data */ - CopySource copy_src; /* type of copy source */ - FILE *copy_file; /* used if copy_src == COPY_FILE */ - StringInfo fe_msgbuf; /* used if copy_src == COPY_FRONTEND */ - - EolType eol_type; /* EOL type of input */ - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - Oid conversion_proc; /* encoding conversion function */ - - /* parameters from the COPY command */ - Relation rel; /* relation to copy from */ - List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDIN */ - bool is_program; /* is 'filename' a program to popen? */ - copy_data_source_cb data_source_cb; /* function for reading data */ - - CopyFormatOptions opts; - bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ - Node *whereClause; /* WHERE condition (or NULL) */ - - /* these are just for error messages, see CopyFromErrorCallback */ - const char *cur_relname; /* table name for error messages */ - uint64 cur_lineno; /* line number for error messages */ - const char *cur_attname; /* current att for error messages */ - const char *cur_attval; /* current att value for error messages */ - bool relname_only; /* don't output line number, att, etc. */ - - /* - * Working state - */ - MemoryContext copycontext; /* per-copy execution context */ - - AttrNumber num_defaults; /* count of att that are missing and have - * default value */ - FmgrInfo *in_functions; /* array of input functions for each attrs */ - Oid *typioparams; /* array of element types for in_functions */ - ErrorSaveContext *escontext; /* soft error trapper during in_functions - * execution */ - uint64 num_errors; /* total number of rows which contained soft - * errors */ - int *defmap; /* array of default att numbers related to - * missing att */ - ExprState **defexprs; /* array of default att expressions for all - * att */ - bool *defaults; /* if DEFAULT marker was found for - * corresponding att */ - bool volatile_defexprs; /* is any of defexprs volatile? */ - List *range_table; /* single element list of RangeTblEntry */ - List *rteperminfos; /* single element list of RTEPermissionInfo */ - ExprState *qualexpr; - - TransitionCaptureState *transition_capture; - - /* - * These variables are used to reduce overhead in COPY FROM. - * - * attribute_buf holds the separated, de-escaped text for each field of - * the current line. The CopyReadAttributes functions return arrays of - * pointers into this buffer. We avoid palloc/pfree overhead by re-using - * the buffer on each cycle. - * - * In binary COPY FROM, attribute_buf holds the binary data for the - * current field, but the usage is otherwise similar. - */ - StringInfoData attribute_buf; - - /* field raw data pointers found by COPY FROM */ - - int max_fields; - char **raw_fields; - - /* - * Similarly, line_buf holds the whole input line being processed. The - * input cycle is first to read the whole line into line_buf, and then - * extract the individual attribute fields into attribute_buf. line_buf - * is preserved unmodified so that we can display it in error messages if - * appropriate. (In binary mode, line_buf is not used.) - */ - StringInfoData line_buf; - bool line_buf_valid; /* contains the row being processed? */ - - /* - * input_buf holds input data, already converted to database encoding. - * - * In text mode, CopyReadLine parses this data sufficiently to locate line - * boundaries, then transfers the data to line_buf. We guarantee that - * there is a \0 at input_buf[input_buf_len] at all times. (In binary - * mode, input_buf is not used.) - * - * If encoding conversion is not required, input_buf is not a separate - * buffer but points directly to raw_buf. In that case, input_buf_len - * tracks the number of bytes that have been verified as valid in the - * database encoding, and raw_buf_len is the total number of bytes stored - * in the buffer. - */ -#define INPUT_BUF_SIZE 65536 /* we palloc INPUT_BUF_SIZE+1 bytes */ - char *input_buf; - int input_buf_index; /* next byte to process */ - int input_buf_len; /* total # of bytes stored */ - bool input_reached_eof; /* true if we reached EOF */ - bool input_reached_error; /* true if a conversion error happened */ - /* Shorthand for number of unconsumed bytes available in input_buf */ -#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index) - - /* - * raw_buf holds raw input data read from the data source (file or client - * connection), not yet converted to the database encoding. Like with - * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len]. - */ -#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ - char *raw_buf; - int raw_buf_index; /* next byte to process */ - int raw_buf_len; /* total # of bytes stored */ - bool raw_reached_eof; /* true if we reached EOF */ - - /* Shorthand for number of unconsumed bytes available in raw_buf */ -#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) - - uint64 bytes_processed; /* number of bytes processed so far */ -} CopyFromStateData; - extern void ReceiveCopyBegin(CopyFromState cstate); extern void ReceiveCopyBinaryHeader(CopyFromState cstate); +extern bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls); +extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls); + + #endif /* COPYFROM_INTERNAL_H */ diff --git a/src/include/nodes/meson.build b/src/include/nodes/meson.build index b665e55b6571f..103df1a787357 100644 --- a/src/include/nodes/meson.build +++ b/src/include/nodes/meson.build @@ -11,6 +11,7 @@ node_support_input_i = [ 'access/sdir.h', 'access/tableam.h', 'access/tsmapi.h', + 'commands/copyapi.h', 'commands/event_trigger.h', 'commands/trigger.h', 'executor/tuptable.h', diff --git a/src/include/utils/json.h b/src/include/utils/json.h index 6d7f1b387d1ed..d5631171add4b 100644 --- a/src/include/utils/json.h +++ b/src/include/utils/json.h @@ -17,6 +17,8 @@ #include "lib/stringinfo.h" /* functions in json.c */ +extern void composite_to_json(Datum composite, StringInfo result, + bool use_line_feeds); extern void escape_json(StringInfo buf, const char *str); extern char *JsonEncodeDateTime(char *buf, Datum value, Oid typid, const int *tzp); diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index e32c8925f67dd..9d57b868d5f36 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -15,6 +15,7 @@ SUBDIRS = \ spgist_name_ops \ test_bloomfilter \ test_copy_callbacks \ + test_copy_format \ test_custom_rmgrs \ test_ddl_deparse \ test_dsa \ diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build index 397e0906e6f72..d76f2a6003e90 100644 --- a/src/test/modules/meson.build +++ b/src/test/modules/meson.build @@ -13,6 +13,7 @@ subdir('spgist_name_ops') subdir('ssl_passphrase_callback') subdir('test_bloomfilter') subdir('test_copy_callbacks') +subdir('test_copy_format') subdir('test_custom_rmgrs') subdir('test_ddl_deparse') subdir('test_dsa') diff --git a/src/test/modules/test_copy_format/.gitignore b/src/test/modules/test_copy_format/.gitignore new file mode 100644 index 0000000000000..5dcb3ff972350 --- /dev/null +++ b/src/test/modules/test_copy_format/.gitignore @@ -0,0 +1,4 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ diff --git a/src/test/modules/test_copy_format/Makefile b/src/test/modules/test_copy_format/Makefile new file mode 100644 index 0000000000000..8497f91624d5a --- /dev/null +++ b/src/test/modules/test_copy_format/Makefile @@ -0,0 +1,23 @@ +# src/test/modules/test_copy_format/Makefile + +MODULE_big = test_copy_format +OBJS = \ + $(WIN32RES) \ + test_copy_format.o +PGFILEDESC = "test_copy_format - test custom COPY FORMAT" + +EXTENSION = test_copy_format +DATA = test_copy_format--1.0.sql + +REGRESS = test_copy_format + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_copy_format +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out b/src/test/modules/test_copy_format/expected/test_copy_format.out new file mode 100644 index 0000000000000..6af69f0eb70d3 --- /dev/null +++ b/src/test/modules/test_copy_format/expected/test_copy_format.out @@ -0,0 +1,29 @@ +CREATE EXTENSION test_copy_format; +CREATE TABLE public.test (a INT, b INT, c INT); +INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +COPY public.test FROM stdin WITH ( + option_before 'before', + format 'test_copy_format', + option_after 'after' +); +NOTICE: test_copy_format: is_from=true +NOTICE: CopyFromProcessOption: "option_before"="before" +NOTICE: CopyFromProcessOption: "option_after"="after" +NOTICE: CopyFromGetFormat +NOTICE: CopyFromStart: natts=3 +NOTICE: CopyFromOneRow +NOTICE: CopyFromEnd +COPY public.test TO stdout WITH ( + option_before 'before', + format 'test_copy_format', + option_after 'after' +); +NOTICE: test_copy_format: is_from=false +NOTICE: CopyToProcessOption: "option_before"="before" +NOTICE: CopyToProcessOption: "option_after"="after" +NOTICE: CopyToGetFormat +NOTICE: CopyToStart: natts=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToEnd diff --git a/src/test/modules/test_copy_format/meson.build b/src/test/modules/test_copy_format/meson.build new file mode 100644 index 0000000000000..4cefe7b709aea --- /dev/null +++ b/src/test/modules/test_copy_format/meson.build @@ -0,0 +1,33 @@ +# Copyright (c) 2024, PostgreSQL Global Development Group + +test_copy_format_sources = files( + 'test_copy_format.c', +) + +if host_system == 'windows' + test_copy_format_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'test_copy_format', + '--FILEDESC', 'test_copy_format - test custom COPY FORMAT',]) +endif + +test_copy_format = shared_module('test_copy_format', + test_copy_format_sources, + kwargs: pg_test_mod_args, +) +test_install_libs += test_copy_format + +test_install_data += files( + 'test_copy_format.control', + 'test_copy_format--1.0.sql', +) + +tests += { + 'name': 'test_copy_format', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'regress': { + 'sql': [ + 'test_copy_format', + ], + }, +} diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql b/src/test/modules/test_copy_format/sql/test_copy_format.sql new file mode 100644 index 0000000000000..94d3c789a01db --- /dev/null +++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql @@ -0,0 +1,14 @@ +CREATE EXTENSION test_copy_format; +CREATE TABLE public.test (a INT, b INT, c INT); +INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +COPY public.test FROM stdin WITH ( + option_before 'before', + format 'test_copy_format', + option_after 'after' +); +\. +COPY public.test TO stdout WITH ( + option_before 'before', + format 'test_copy_format', + option_after 'after' +); diff --git a/src/test/modules/test_copy_format/test_copy_format--1.0.sql b/src/test/modules/test_copy_format/test_copy_format--1.0.sql new file mode 100644 index 0000000000000..d24ea03ce99ca --- /dev/null +++ b/src/test/modules/test_copy_format/test_copy_format--1.0.sql @@ -0,0 +1,8 @@ +/* src/test/modules/test_copy_format/test_copy_format--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION test_copy_format" to load this file. \quit + +CREATE FUNCTION test_copy_format(internal) + RETURNS copy_handler + AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/src/test/modules/test_copy_format/test_copy_format.c b/src/test/modules/test_copy_format/test_copy_format.c new file mode 100644 index 0000000000000..d833f22bbf4f7 --- /dev/null +++ b/src/test/modules/test_copy_format/test_copy_format.c @@ -0,0 +1,120 @@ +/*-------------------------------------------------------------------------- + * + * test_copy_format.c + * Code for testing custom COPY format. + * + * Portions Copyright (c) 2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/test_copy_format/test_copy_format.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "commands/copy.h" +#include "commands/defrem.h" + +PG_MODULE_MAGIC; + +static bool +CopyFromProcessOption(CopyFromState cstate, DefElem *defel) +{ + ereport(NOTICE, + (errmsg("CopyFromProcessOption: \"%s\"=\"%s\"", + defel->defname, defGetString(defel)))); + return true; +} + +static int16 +CopyFromGetFormat(CopyFromState cstate) +{ + ereport(NOTICE, (errmsg("CopyFromGetFormat"))); + return 0; +} + +static void +CopyFromStart(CopyFromState cstate, TupleDesc tupDesc) +{ + ereport(NOTICE, (errmsg("CopyFromStart: natts=%d", tupDesc->natts))); +} + +static bool +CopyFromOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls) +{ + ereport(NOTICE, (errmsg("CopyFromOneRow"))); + return false; +} + +static void +CopyFromEnd(CopyFromState cstate) +{ + ereport(NOTICE, (errmsg("CopyFromEnd"))); +} + +static const CopyFromRoutine CopyFromRoutineTestCopyFormat = { + .type = T_CopyFromRoutine, + .CopyFromProcessOption = CopyFromProcessOption, + .CopyFromGetFormat = CopyFromGetFormat, + .CopyFromStart = CopyFromStart, + .CopyFromOneRow = CopyFromOneRow, + .CopyFromEnd = CopyFromEnd, +}; + +static bool +CopyToProcessOption(CopyToState cstate, DefElem *defel) +{ + ereport(NOTICE, + (errmsg("CopyToProcessOption: \"%s\"=\"%s\"", + defel->defname, defGetString(defel)))); + return true; +} + +static void +CopyToSendCopyBegin(CopyToState cstate) +{ + ereport(NOTICE, (errmsg("CopyToGetFormat"))); +} + +static void +CopyToStart(CopyToState cstate, TupleDesc tupDesc) +{ + ereport(NOTICE, (errmsg("CopyToStart: natts=%d", tupDesc->natts))); +} + +static void +CopyToOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + ereport(NOTICE, (errmsg("CopyToOneRow: tts_nvalid=%u", slot->tts_nvalid))); +} + +static void +CopyToEnd(CopyToState cstate) +{ + ereport(NOTICE, (errmsg("CopyToEnd"))); +} + +static const CopyToRoutine CopyToRoutineTestCopyFormat = { + .type = T_CopyToRoutine, + .CopyToProcessOption = CopyToProcessOption, + .CopyToSendCopyBegin = CopyToSendCopyBegin, + .CopyToStart = CopyToStart, + .CopyToOneRow = CopyToOneRow, + .CopyToEnd = CopyToEnd, +}; + +PG_FUNCTION_INFO_V1(test_copy_format); +Datum +test_copy_format(PG_FUNCTION_ARGS) +{ + bool is_from = PG_GETARG_BOOL(0); + + ereport(NOTICE, + (errmsg("test_copy_format: is_from=%s", is_from ? "true" : "false"))); + + if (is_from) + PG_RETURN_POINTER(&CopyFromRoutineTestCopyFormat); + else + PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat); +} diff --git a/src/test/modules/test_copy_format/test_copy_format.control b/src/test/modules/test_copy_format/test_copy_format.control new file mode 100644 index 0000000000000..f05a6362358d1 --- /dev/null +++ b/src/test/modules/test_copy_format/test_copy_format.control @@ -0,0 +1,4 @@ +comment = 'Test code for custom COPY format' +default_version = '1.0' +module_pathname = '$libdir/test_copy_format' +relocatable = true