Skip to content

feat: append only tables for redshift #4596

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions warehouse/integrations/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/warehouse/client"
sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
Expand Down Expand Up @@ -153,6 +154,7 @@ type Redshift struct {
Warehouse model.Warehouse
Uploader warehouseutils.Uploader
connectTimeout time.Duration
conf *config.Config
logger logger.Logger
stats stats.Stats

Expand Down Expand Up @@ -194,6 +196,7 @@ type connectionCredentials struct {
func New(conf *config.Config, log logger.Logger, stat stats.Stats) *Redshift {
rs := &Redshift{}

rs.conf = conf
rs.logger = log.Child("integrations").Child("redshift")
rs.stats = stat

Expand Down Expand Up @@ -464,7 +467,7 @@ func (rs *Redshift) loadTable(
logfield.WorkspaceID, rs.Warehouse.WorkspaceID,
logfield.Namespace, rs.Namespace,
logfield.TableName, tableName,
logfield.ShouldMerge, rs.shouldMerge(tableName),
logfield.ShouldMerge, rs.ShouldMerge(tableName),
)
log.Infow("started loading")

Expand Down Expand Up @@ -519,7 +522,7 @@ func (rs *Redshift) loadTable(
}

var rowsDeleted int64
if rs.shouldMerge(tableName) {
if rs.ShouldMerge(tableName) {
log.Infow("deleting from load table")
rowsDeleted, err = rs.deleteFromLoadTable(
ctx, txn, tableName,
Expand Down Expand Up @@ -728,7 +731,7 @@ func (rs *Redshift) loadUserTables(ctx context.Context) map[string]error {
logfield.DestinationType, rs.Warehouse.Destination.DestinationDefinition.Name,
logfield.WorkspaceID, rs.Warehouse.WorkspaceID,
logfield.Namespace, rs.Namespace,
logfield.ShouldMerge, !rs.config.skipComputingUserLatestTraits || rs.shouldMerge(warehouseutils.UsersTable),
logfield.ShouldMerge, !rs.config.skipComputingUserLatestTraits || rs.ShouldMerge(warehouseutils.UsersTable),
logfield.TableName, warehouseutils.UsersTable,
}
rs.logger.Infow("started loading for identifies and users tables", logFields...)
Expand Down Expand Up @@ -1342,7 +1345,7 @@ func (rs *Redshift) SetConnectionTimeout(timeout time.Duration) {
rs.connectTimeout = timeout
}

func (rs *Redshift) shouldMerge(tableName string) bool {
func (rs *Redshift) ShouldMerge(tableName string) bool {
if !rs.config.allowMerge {
return false
}
Expand All @@ -1352,6 +1355,13 @@ func (rs *Redshift) shouldMerge(tableName string) bool {
// backwards compatibility.
return !slices.Contains(rs.config.skipDedupDestinationIDs, rs.Warehouse.Destination.ID)
}

configKey := "Warehouse.redshift.appendOnlyTables." + rs.Warehouse.Destination.ID
appendOnlyTables := rs.conf.GetStringSlice(configKey, nil)
if slices.Contains(appendOnlyTables, tableName) {
return false
}

// It's important to check the ability to append after skipDedup to make sure that if both
// skipDedupDestinationIDs and skipComputingUserLatestTraits are set, we still merge.
// See the hyperverge user table use case for more details.
Expand Down
108 changes: 108 additions & 0 deletions warehouse/integrations/redshift/redshift_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/stats"
kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/runner"
th "github.com/rudderlabs/rudder-server/testhelper"
Expand Down Expand Up @@ -1041,6 +1042,113 @@ func TestIntegration(t *testing.T) {
})
}

// TestRedshift_ShouldMerge is a table-driven test for Redshift.ShouldMerge.
//
// Each case configures three inputs and asserts the resulting decision:
//   - preferAppend: written into the destination config under
//     model.PreferAppendSetting.
//   - uploaderCanAppend: the value returned by the mocked Uploader.CanAppend.
//   - appendOnlyTables: stored under the per-destination config key
//     "Warehouse.redshift.appendOnlyTables.<destinationID>".
//
// expected is the value ShouldMerge must return for tableName.
func TestRedshift_ShouldMerge(t *testing.T) {
	testCases := []struct {
		name              string
		preferAppend      bool
		tableName         string
		appendOnlyTables  []string
		uploaderCanAppend bool
		expected          bool
	}{
		// Cases without an append-only table list.
		{
			name:              "uploader says we can append and user prefers append",
			preferAppend:      true,
			uploaderCanAppend: true,
			tableName:         "tracks",
			expected:          false,
		},
		{
			name:              "uploader says we cannot append and user prefers append",
			preferAppend:      true,
			uploaderCanAppend: false,
			tableName:         "tracks",
			expected:          true,
		},
		{
			name:              "uploader says we can append and user prefers not to append",
			preferAppend:      false,
			uploaderCanAppend: true,
			tableName:         "tracks",
			expected:          true,
		},
		{
			name:              "uploader says we cannot append and user prefers not to append",
			preferAppend:      false,
			uploaderCanAppend: false,
			tableName:         "tracks",
			expected:          true,
		},
		// Cases with an append-only table list configured for the destination.
		{
			name:              "uploader says we can append, in merge mode, but table is in append only",
			preferAppend:      false,
			uploaderCanAppend: true,
			tableName:         "tracks",
			appendOnlyTables:  []string{"tracks"},
			expected:          false,
		},
		{
			name:              "uploader says we can append, in append mode, but table is in append only",
			preferAppend:      true,
			uploaderCanAppend: true,
			tableName:         "tracks",
			appendOnlyTables:  []string{"tracks"},
			expected:          false,
		},
		{
			name:              "uploader says we can append, in append mode, but table is not in append only",
			preferAppend:      true,
			uploaderCanAppend: true,
			tableName:         "page_views",
			appendOnlyTables:  []string{"tracks"},
			expected:          false,
		},
		{
			name:              "uploader says we cannot append, in merge mode, but table is in append only",
			preferAppend:      false,
			uploaderCanAppend: false,
			tableName:         "tracks",
			appendOnlyTables:  []string{"tracks"},
			expected:          false,
		},
		{
			name:              "uploader says we can append, in merge mode, but table is not in append only",
			preferAppend:      false,
			uploaderCanAppend: true,
			tableName:         "page_views",
			appendOnlyTables:  []string{"tracks"},
			expected:          true,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			destID := "test_destination_id"

			// The append-only list is looked up per destination, so the key
			// must carry the same destination ID that is set on the warehouse.
			c := config.New()
			c.Set("Warehouse.redshift.appendOnlyTables."+destID, tc.appendOnlyTables)

			rs := redshift.New(c, logger.NOP, stats.NOP)

			rs.Warehouse = model.Warehouse{
				Destination: backendconfig.DestinationT{
					ID: destID,
					Config: map[string]any{
						model.PreferAppendSetting.String(): tc.preferAppend,
					},
				},
			}

			mockCtrl := gomock.NewController(t)
			uploader := mockuploader.NewMockUploader(mockCtrl)
			// AnyTimes: the test does not pin how many times CanAppend is
			// called, so zero calls are also acceptable.
			uploader.EXPECT().CanAppend().AnyTimes().Return(tc.uploaderCanAppend)

			rs.Uploader = uploader
			// require.Equal takes (t, expected, actual); keeping that order
			// makes failure output label the two values correctly.
			require.Equal(t, tc.expected, rs.ShouldMerge(tc.tableName))
		})
	}
}

func TestCheckAndIgnoreColumnAlreadyExistError(t *testing.T) {
testCases := []struct {
name string
Expand Down
1 change: 1 addition & 0 deletions warehouse/integrations/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/warehouse/client"
sqlmw "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
Expand Down
Loading