Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions src/operator/src/statement/copy_table_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl StatementExecutor {
path,
schema,
} => {
let projected_schema = Arc::new(
let output_schema = Arc::new(
compat_schema
.project(&projection)
.context(error::ProjectSchemaSnafu)?,
Expand All @@ -255,17 +255,23 @@ impl StatementExecutor {
)),
None,
));

let projected_file_schema = Arc::new(
schema
.project(&projection)
.context(error::ProjectSchemaSnafu)?,
);
let stream = self
.build_file_stream(
CsvOpener::new(csv_config, format.compression_type.into()),
path,
schema.clone(),
projected_file_schema,
)
.await?;

Ok(Box::pin(
RecordBatchStreamTypeAdapter::new(projected_schema, stream, Some(projection))
// The projection was already applied in the CSV reader when we created the stream,
// so we pass `None` here to avoid a double projection, which would cause schema mismatch errors.
RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
.with_filter(filters)
.context(error::PhysicalExprSnafu)?,
))
Expand All @@ -280,7 +286,7 @@ impl StatementExecutor {
.project(&projection)
.context(error::ProjectSchemaSnafu)?,
);
let projected_schema = Arc::new(
let output_schema = Arc::new(
compat_schema
.project(&projection)
.context(error::ProjectSchemaSnafu)?,
Expand All @@ -290,17 +296,19 @@ impl StatementExecutor {
.build_file_stream(
JsonOpener::new(
DEFAULT_BATCH_SIZE,
projected_file_schema,
projected_file_schema.clone(),
format.compression_type.into(),
Arc::new(store),
),
path,
schema.clone(),
projected_file_schema,
)
.await?;

Ok(Box::pin(
RecordBatchStreamTypeAdapter::new(projected_schema, stream, Some(projection))
// The projection was already applied in the JSON reader when we created the stream,
// so we pass `None` here to avoid a double projection, which would cause schema mismatch errors.
RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
.with_filter(filters)
.context(error::PhysicalExprSnafu)?,
))
Expand All @@ -325,13 +333,13 @@ impl StatementExecutor {
.build()
.context(error::BuildParquetRecordBatchStreamSnafu)?;

let projected_schema = Arc::new(
let output_schema = Arc::new(
compat_schema
.project(&projection)
.context(error::ProjectSchemaSnafu)?,
);
Ok(Box::pin(
RecordBatchStreamTypeAdapter::new(projected_schema, stream, Some(projection))
RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
.with_filter(filters)
.context(error::PhysicalExprSnafu)?,
))
Expand All @@ -352,14 +360,14 @@ impl StatementExecutor {
.await
.context(error::ReadOrcSnafu)?;

let projected_schema = Arc::new(
let output_schema = Arc::new(
compat_schema
.project(&projection)
.context(error::ProjectSchemaSnafu)?,
);

Ok(Box::pin(
RecordBatchStreamTypeAdapter::new(projected_schema, stream, Some(projection))
RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
.with_filter(filters)
.context(error::PhysicalExprSnafu)?,
))
Expand Down
44 changes: 44 additions & 0 deletions tests/cases/standalone/common/copy/copy_from_fs_csv.result
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,42 @@ select * from with_pattern order by ts;
| host3 | 99.9 | 444.4 | 2024-07-27T10:47:43 |
+-------+------+--------+---------------------+

CREATE TABLE demo_with_external_column(host string, cpu double, memory double, ts timestamp time index, external_column string default 'default_value');

Affected Rows: 0

Copy demo_with_external_column FROM '${SQLNESS_HOME}/demo/export/csv/demo.csv' WITH (format='csv');

Affected Rows: 3

select * from demo_with_external_column order by ts;

+-------+------+--------+---------------------+-----------------+
| host | cpu | memory | ts | external_column |
+-------+------+--------+---------------------+-----------------+
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | default_value |
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | default_value |
| host3 | 99.9 | 444.4 | 2024-07-27T10:47:43 | default_value |
+-------+------+--------+---------------------+-----------------+

CREATE TABLE demo_with_less_columns(host string, memory double, ts timestamp time index);

Affected Rows: 0

Copy demo_with_less_columns FROM '${SQLNESS_HOME}/demo/export/csv/demo.csv' WITH (format='csv');

Affected Rows: 3

select * from demo_with_less_columns order by ts;

+-------+--------+---------------------+
| host | memory | ts |
+-------+--------+---------------------+
| host1 | 1024.0 | 2022-06-15T07:02:37 |
| host2 | 333.3 | 2022-06-15T07:02:38 |
| host3 | 444.4 | 2024-07-27T10:47:43 |
+-------+--------+---------------------+

drop table demo;

Affected Rows: 0
Expand All @@ -82,3 +118,11 @@ drop table with_pattern;

Affected Rows: 0

drop table demo_with_external_column;

Affected Rows: 0

drop table demo_with_less_columns;

Affected Rows: 0

16 changes: 16 additions & 0 deletions tests/cases/standalone/common/copy/copy_from_fs_csv.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,26 @@ Copy with_pattern FROM '${SQLNESS_HOME}/demo/export/csv/' WITH (pattern = 'demo.

select * from with_pattern order by ts;

CREATE TABLE demo_with_external_column(host string, cpu double, memory double, ts timestamp time index, external_column string default 'default_value');

Copy demo_with_external_column FROM '${SQLNESS_HOME}/demo/export/csv/demo.csv' WITH (format='csv');

select * from demo_with_external_column order by ts;

CREATE TABLE demo_with_less_columns(host string, memory double, ts timestamp time index);

Copy demo_with_less_columns FROM '${SQLNESS_HOME}/demo/export/csv/demo.csv' WITH (format='csv');

select * from demo_with_less_columns order by ts;

drop table demo;

drop table with_filename;

drop table with_path;

drop table with_pattern;

drop table demo_with_external_column;

drop table demo_with_less_columns;
44 changes: 44 additions & 0 deletions tests/cases/standalone/common/copy/copy_from_fs_json.result
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,42 @@ select * from with_pattern order by ts;
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 |
+-------+------+--------+---------------------+

CREATE TABLE demo_with_external_column(host string, cpu double, memory double, ts timestamp time index, external_column string default 'default_value');

Affected Rows: 0

Copy demo_with_external_column FROM '${SQLNESS_HOME}/demo/export/json/demo.json' WITH (format='json');

Affected Rows: 3

select * from demo_with_external_column order by ts;

+-------+------+--------+---------------------+-----------------+
| host | cpu | memory | ts | external_column |
+-------+------+--------+---------------------+-----------------+
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | default_value |
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | default_value |
| host3 | 99.9 | 444.4 | 2024-07-27T10:47:43 | default_value |
+-------+------+--------+---------------------+-----------------+

CREATE TABLE demo_with_less_columns(host string, memory double, ts timestamp time index);

Affected Rows: 0

Copy demo_with_less_columns FROM '${SQLNESS_HOME}/demo/export/json/demo.json' WITH (format='json');

Affected Rows: 3

select * from demo_with_less_columns order by ts;

+-------+--------+---------------------+
| host | memory | ts |
+-------+--------+---------------------+
| host1 | 1024.0 | 2022-06-15T07:02:37 |
| host2 | 333.3 | 2022-06-15T07:02:38 |
| host3 | 444.4 | 2024-07-27T10:47:43 |
+-------+--------+---------------------+

drop table demo;

Affected Rows: 0
Expand All @@ -82,3 +118,11 @@ drop table with_pattern;

Affected Rows: 0

drop table demo_with_external_column;

Affected Rows: 0

drop table demo_with_less_columns;

Affected Rows: 0

16 changes: 16 additions & 0 deletions tests/cases/standalone/common/copy/copy_from_fs_json.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,26 @@ Copy with_pattern FROM '${SQLNESS_HOME}/demo/export/json/' WITH (pattern = 'demo

select * from with_pattern order by ts;

CREATE TABLE demo_with_external_column(host string, cpu double, memory double, ts timestamp time index, external_column string default 'default_value');

Copy demo_with_external_column FROM '${SQLNESS_HOME}/demo/export/json/demo.json' WITH (format='json');

select * from demo_with_external_column order by ts;

CREATE TABLE demo_with_less_columns(host string, memory double, ts timestamp time index);

Copy demo_with_less_columns FROM '${SQLNESS_HOME}/demo/export/json/demo.json' WITH (format='json');

select * from demo_with_less_columns order by ts;

drop table demo;

drop table with_filename;

drop table with_path;

drop table with_pattern;

drop table demo_with_external_column;

drop table demo_with_less_columns;
44 changes: 44 additions & 0 deletions tests/cases/standalone/common/copy/copy_from_fs_parquet.result
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,42 @@ Copy with_limit_rows_segment FROM '${SQLNESS_HOME}/demo/export/parquet_files/' L

Error: 2000(InvalidSyntax), Unexpected token while parsing SQL statement, expected: 'the number of maximum rows', found: ;: sql parser error: Expected: literal int, found: hello at Line: 1, Column: 86

CREATE TABLE demo_with_external_column(host string, cpu double, memory double, ts timestamp time index, external_column string default 'default_value');

Affected Rows: 0

Copy demo_with_external_column FROM '${SQLNESS_HOME}/demo/export/parquet_files/demo.parquet';

Affected Rows: 3

select * from demo_with_external_column order by ts;

+-------+-------+--------+---------------------+-----------------+
| host | cpu | memory | ts | external_column |
+-------+-------+--------+---------------------+-----------------+
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | default_value |
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | default_value |
| host3 | 111.1 | 444.4 | 2024-07-27T10:47:43 | default_value |
+-------+-------+--------+---------------------+-----------------+

CREATE TABLE demo_with_less_columns(host string, memory double, ts timestamp time index);

Affected Rows: 0

Copy demo_with_less_columns FROM '${SQLNESS_HOME}/demo/export/parquet_files/demo.parquet';

Affected Rows: 3

select * from demo_with_less_columns order by ts;

+-------+--------+---------------------+
| host | memory | ts |
+-------+--------+---------------------+
| host1 | 1024.0 | 2022-06-15T07:02:37 |
| host2 | 333.3 | 2022-06-15T07:02:38 |
| host3 | 444.4 | 2024-07-27T10:47:43 |
+-------+--------+---------------------+

drop table demo;

Affected Rows: 0
Expand Down Expand Up @@ -151,3 +187,11 @@ drop table with_limit_rows_segment;

Affected Rows: 0

drop table demo_with_external_column;

Affected Rows: 0

drop table demo_with_less_columns;

Affected Rows: 0

16 changes: 16 additions & 0 deletions tests/cases/standalone/common/copy/copy_from_fs_parquet.sql
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ select count(*) from with_limit_rows_segment;

Copy with_limit_rows_segment FROM '${SQLNESS_HOME}/demo/export/parquet_files/' LIMIT hello;

CREATE TABLE demo_with_external_column(host string, cpu double, memory double, ts timestamp time index, external_column string default 'default_value');

Copy demo_with_external_column FROM '${SQLNESS_HOME}/demo/export/parquet_files/demo.parquet';

select * from demo_with_external_column order by ts;

CREATE TABLE demo_with_less_columns(host string, memory double, ts timestamp time index);

Copy demo_with_less_columns FROM '${SQLNESS_HOME}/demo/export/parquet_files/demo.parquet';

select * from demo_with_less_columns order by ts;

drop table demo;

drop table demo_2;
Expand All @@ -65,3 +77,7 @@ drop table with_pattern;
drop table without_limit_rows;

drop table with_limit_rows_segment;

drop table demo_with_external_column;

drop table demo_with_less_columns;
Loading