-
Notifications
You must be signed in to change notification settings - Fork 141
Add codec support for column addition in schema changes #486
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
4ced6b2
d1e908d
c1d02a4
34906f1
9b69e24
bd8da87
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,181 @@ | ||||||
import os | ||||||
|
||||||
import pytest | ||||||
from dbt.tests.util import run_dbt, run_dbt_and_capture | ||||||
|
||||||
schema_change_with_codec_sql = """ | ||||||
{{ | ||||||
config( | ||||||
materialized='%s', | ||||||
unique_key='col_1', | ||||||
on_schema_change='%s' | ||||||
) | ||||||
}} | ||||||
|
||||||
{%% if not is_incremental() %%} | ||||||
select | ||||||
number as col_1, | ||||||
number + 1 as col_2 | ||||||
from numbers(3) | ||||||
{%% else %%} | ||||||
select | ||||||
number as col_1, | ||||||
number + 1 as col_2, | ||||||
number + 2 as col_3 | ||||||
from numbers(2, 3) | ||||||
{%% endif %%} | ||||||
""" | ||||||
|
||||||
|
||||||
schema_change_with_codec_yml = """ | ||||||
version: 2 | ||||||
models: | ||||||
- name: schema_change_codec_append | ||||||
columns: | ||||||
- name: col_1 | ||||||
data_type: UInt64 | ||||||
- name: col_2 | ||||||
data_type: UInt64 | ||||||
- name: col_3 | ||||||
data_type: UInt64 | ||||||
codec: ZSTD | ||||||
- name: schema_change_codec_distributed_append | ||||||
columns: | ||||||
- name: col_1 | ||||||
data_type: UInt64 | ||||||
- name: col_2 | ||||||
data_type: UInt64 | ||||||
- name: col_3 | ||||||
data_type: UInt64 | ||||||
codec: LZ4 | ||||||
""" | ||||||
|
||||||
|
||||||
class TestSchemaChangeWithCodec: | ||||||
@pytest.fixture(scope="class") | ||||||
def models(self): | ||||||
return { | ||||||
"schema_change_codec_append.sql": schema_change_with_codec_sql | ||||||
% ("incremental", "append_new_columns"), | ||||||
"schema_change_codec_distributed_append.sql": schema_change_with_codec_sql | ||||||
% ("distributed_incremental", "append_new_columns"), | ||||||
"schema.yml": schema_change_with_codec_yml, | ||||||
} | ||||||
|
||||||
@pytest.mark.parametrize( | ||||||
"model", ("schema_change_codec_append", "schema_change_codec_distributed_append") | ||||||
) | ||||||
def test_append_with_codec(self, project, model): | ||||||
if ( | ||||||
model == "schema_change_codec_distributed_append" | ||||||
and os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '' | ||||||
): | ||||||
pytest.skip("Not on a cluster") | ||||||
|
||||||
run_dbt(["run", "--select", model]) | ||||||
result = project.run_sql(f"select * from {model} order by col_1", fetch="all") | ||||||
assert len(result) == 3 | ||||||
assert result[0][1] == 1 | ||||||
|
||||||
run_dbt(["--debug", "run", "--select", model]) | ||||||
result = project.run_sql(f"select * from {model} order by col_1", fetch="all") | ||||||
|
||||||
assert all(len(row) == 3 for row in result) | ||||||
assert result[0][2] == 0 | ||||||
assert result[3][2] == 5 | ||||||
|
||||||
table_name = f"{project.test_schema}.{model}" | ||||||
create_table_sql = project.run_sql(f"SHOW CREATE TABLE {table_name}", fetch="one")[0] | ||||||
|
||||||
assert "CODEC" in create_table_sql | ||||||
if "distributed" in model: | ||||||
assert "LZ4" in create_table_sql | ||||||
else: | ||||||
assert "ZSTD" in create_table_sql | ||||||
|
||||||
|
||||||
sync_all_columns_with_codec_sql = """ | ||||||
{{ | ||||||
config( | ||||||
materialized='%s', | ||||||
unique_key='col_1', | ||||||
on_schema_change='sync_all_columns' | ||||||
) | ||||||
}} | ||||||
|
||||||
{%% if not is_incremental() %%} | ||||||
select | ||||||
toUInt8(number) as col_1, | ||||||
number + 1 as col_2 | ||||||
from numbers(3) | ||||||
{%% else %%} | ||||||
select | ||||||
toFloat32(number) as col_1, | ||||||
number + 2 as col_3 | ||||||
from numbers(2, 3) | ||||||
{%% endif %%} | ||||||
""" | ||||||
|
||||||
sync_all_columns_with_codec_yml = """ | ||||||
version: 2 | ||||||
models: | ||||||
- name: sync_codec_test | ||||||
columns: | ||||||
- name: col_1 | ||||||
data_type: Float32 | ||||||
- name: col_3 | ||||||
data_type: UInt64 | ||||||
codec: ZSTD | ||||||
- name: sync_codec_distributed_test | ||||||
columns: | ||||||
- name: col_1 | ||||||
data_type: Float32 | ||||||
- name: col_3 | ||||||
data_type: UInt64 | ||||||
codec: LZ4 | ||||||
""" | ||||||
|
||||||
|
||||||
class TestSyncAllColumnsWithCodec: | ||||||
@pytest.fixture(scope="class") | ||||||
def models(self): | ||||||
return { | ||||||
"sync_codec_test.sql": sync_all_columns_with_codec_sql % "incremental", | ||||||
"sync_codec_distributed_test.sql": sync_all_columns_with_codec_sql | ||||||
% "distributed_incremental", | ||||||
"schema.yml": sync_all_columns_with_codec_yml, | ||||||
} | ||||||
|
||||||
@pytest.mark.parametrize("model", ("sync_codec_test", "sync_codec_distributed_test")) | ||||||
def test_sync_all_columns_with_codec(self, project, model): | ||||||
if ( | ||||||
model == "sync_codec_distributed_test" | ||||||
and os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '' | ||||||
): | ||||||
pytest.skip("Not on a cluster") | ||||||
|
||||||
run_dbt(["run", "--select", model]) | ||||||
result = project.run_sql(f"select * from {model} order by col_1", fetch="all") | ||||||
assert len(result) == 3 | ||||||
assert result[0][1] == 1 | ||||||
|
||||||
run_dbt(["run", "--select", model]) | ||||||
result = project.run_sql(f"select * from {model} order by col_1", fetch="all") | ||||||
|
||||||
assert all(len(row) == 2 for row in result) | ||||||
assert result[0][1] == 0 | ||||||
assert result[3][1] == 5 | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Index out of bounds error. The test expects 4 rows (index 3), but line 161 asserts only 3 rows exist. Since sync_all_columns replaces the schema and the incremental run uses numbers(2, 3), there should only be 3 rows total, so this assertion will fail.
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||
|
||||||
table_name = f"{project.test_schema}.{model}" | ||||||
create_table_sql = project.run_sql(f"SHOW CREATE TABLE {table_name}", fetch="one")[0] | ||||||
|
||||||
assert "CODEC" in create_table_sql | ||||||
if "distributed" in model: | ||||||
assert "LZ4" in create_table_sql | ||||||
else: | ||||||
assert "ZSTD" in create_table_sql | ||||||
|
||||||
result_types = project.run_sql( | ||||||
f"select toColumnTypeName(col_1) from {model} limit 1", fetch="one" | ||||||
) | ||||||
assert "Float32" in result_types[0] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Index out of bounds error. The test expects 4 rows (index 3), but line 74 asserts only 3 rows exist after the first run. After the second run with incremental data from numbers(2, 3), there should be 5 total rows, so the assertion should use index 4 instead of 3.
Copilot uses AI. Check for mistakes.