Skip to content

Commit 9a89780

Browse files
authored
feat: append only tables for redshift (#4596)
1 parent f1a8b8c commit 9a89780

File tree

3 files changed

+123
-4
lines changed

3 files changed

+123
-4
lines changed

warehouse/integrations/redshift/redshift.go

+14-4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/rudderlabs/rudder-go-kit/filemanager"
2929
"github.com/rudderlabs/rudder-go-kit/logger"
3030
"github.com/rudderlabs/rudder-go-kit/stats"
31+
3132
"github.com/rudderlabs/rudder-server/utils/misc"
3233
"github.com/rudderlabs/rudder-server/warehouse/client"
3334
sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
@@ -153,6 +154,7 @@ type Redshift struct {
153154
Warehouse model.Warehouse
154155
Uploader warehouseutils.Uploader
155156
connectTimeout time.Duration
157+
conf *config.Config
156158
logger logger.Logger
157159
stats stats.Stats
158160

@@ -194,6 +196,7 @@ type connectionCredentials struct {
194196
func New(conf *config.Config, log logger.Logger, stat stats.Stats) *Redshift {
195197
rs := &Redshift{}
196198

199+
rs.conf = conf
197200
rs.logger = log.Child("integrations").Child("redshift")
198201
rs.stats = stat
199202

@@ -464,7 +467,7 @@ func (rs *Redshift) loadTable(
464467
logfield.WorkspaceID, rs.Warehouse.WorkspaceID,
465468
logfield.Namespace, rs.Namespace,
466469
logfield.TableName, tableName,
467-
logfield.ShouldMerge, rs.shouldMerge(tableName),
470+
logfield.ShouldMerge, rs.ShouldMerge(tableName),
468471
)
469472
log.Infow("started loading")
470473

@@ -519,7 +522,7 @@ func (rs *Redshift) loadTable(
519522
}
520523

521524
var rowsDeleted int64
522-
if rs.shouldMerge(tableName) {
525+
if rs.ShouldMerge(tableName) {
523526
log.Infow("deleting from load table")
524527
rowsDeleted, err = rs.deleteFromLoadTable(
525528
ctx, txn, tableName,
@@ -728,7 +731,7 @@ func (rs *Redshift) loadUserTables(ctx context.Context) map[string]error {
728731
logfield.DestinationType, rs.Warehouse.Destination.DestinationDefinition.Name,
729732
logfield.WorkspaceID, rs.Warehouse.WorkspaceID,
730733
logfield.Namespace, rs.Namespace,
731-
logfield.ShouldMerge, !rs.config.skipComputingUserLatestTraits || rs.shouldMerge(warehouseutils.UsersTable),
734+
logfield.ShouldMerge, !rs.config.skipComputingUserLatestTraits || rs.ShouldMerge(warehouseutils.UsersTable),
732735
logfield.TableName, warehouseutils.UsersTable,
733736
}
734737
rs.logger.Infow("started loading for identifies and users tables", logFields...)
@@ -1342,7 +1345,7 @@ func (rs *Redshift) SetConnectionTimeout(timeout time.Duration) {
13421345
rs.connectTimeout = timeout
13431346
}
13441347

1345-
func (rs *Redshift) shouldMerge(tableName string) bool {
1348+
func (rs *Redshift) ShouldMerge(tableName string) bool {
13461349
if !rs.config.allowMerge {
13471350
return false
13481351
}
@@ -1352,6 +1355,13 @@ func (rs *Redshift) shouldMerge(tableName string) bool {
13521355
// backwards compatibility.
13531356
return !slices.Contains(rs.config.skipDedupDestinationIDs, rs.Warehouse.Destination.ID)
13541357
}
1358+
1359+
configKey := "Warehouse.redshift.appendOnlyTables." + rs.Warehouse.Destination.ID
1360+
appendOnlyTables := rs.conf.GetStringSlice(configKey, nil)
1361+
if slices.Contains(appendOnlyTables, tableName) {
1362+
return false
1363+
}
1364+
13551365
// It's important to check the ability to append after skipDedup to make sure that if both
13561366
// skipDedupDestinationIDs and skipComputingUserLatestTraits are set, we still merge.
13571367
// see hyperverge user table use case for more details.

warehouse/integrations/redshift/redshift_test.go

+108
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/rudderlabs/rudder-go-kit/stats"
2727
kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
2828
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
29+
2930
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
3031
"github.com/rudderlabs/rudder-server/runner"
3132
th "github.com/rudderlabs/rudder-server/testhelper"
@@ -1041,6 +1042,113 @@ func TestIntegration(t *testing.T) {
10411042
})
10421043
}
10431044

1045+
func TestRedshift_ShouldMerge(t *testing.T) {
1046+
testCases := []struct {
1047+
name string
1048+
preferAppend bool
1049+
tableName string
1050+
appendOnlyTables []string
1051+
uploaderCanAppend bool
1052+
expected bool
1053+
}{
1054+
{
1055+
name: "uploader says we can append and user prefers append",
1056+
preferAppend: true,
1057+
uploaderCanAppend: true,
1058+
tableName: "tracks",
1059+
expected: false,
1060+
},
1061+
{
1062+
name: "uploader says we cannot append and user prefers append",
1063+
preferAppend: true,
1064+
uploaderCanAppend: false,
1065+
tableName: "tracks",
1066+
expected: true,
1067+
},
1068+
{
1069+
name: "uploader says we can append and user prefers not to append",
1070+
preferAppend: false,
1071+
uploaderCanAppend: true,
1072+
tableName: "tracks",
1073+
expected: true,
1074+
},
1075+
{
1076+
name: "uploader says we cannot append and user prefers not to append",
1077+
preferAppend: false,
1078+
uploaderCanAppend: false,
1079+
tableName: "tracks",
1080+
expected: true,
1081+
},
1082+
{
1083+
name: "uploader says we can append, in merge mode, but table is in append only",
1084+
preferAppend: false,
1085+
uploaderCanAppend: true,
1086+
tableName: "tracks",
1087+
appendOnlyTables: []string{"tracks"},
1088+
expected: false,
1089+
},
1090+
{
1091+
name: "uploader says we can append, in append mode, but table is in append only",
1092+
preferAppend: true,
1093+
uploaderCanAppend: true,
1094+
tableName: "tracks",
1095+
appendOnlyTables: []string{"tracks"},
1096+
expected: false,
1097+
},
1098+
{
1099+
name: "uploader says we can append, in append mode, but table is not in append only",
1100+
preferAppend: true,
1101+
uploaderCanAppend: true,
1102+
tableName: "page_views",
1103+
appendOnlyTables: []string{"tracks"},
1104+
expected: false,
1105+
},
1106+
{
1107+
name: "uploader says we cannot append, in merge mode, but table is in append only",
1108+
preferAppend: false,
1109+
uploaderCanAppend: false,
1110+
tableName: "tracks",
1111+
appendOnlyTables: []string{"tracks"},
1112+
expected: false,
1113+
},
1114+
{
1115+
name: "uploader says we can append, in merge mode, but table is not in append only",
1116+
preferAppend: false,
1117+
uploaderCanAppend: true,
1118+
tableName: "page_views",
1119+
appendOnlyTables: []string{"tracks"},
1120+
expected: true,
1121+
},
1122+
}
1123+
1124+
for _, tc := range testCases {
1125+
t.Run(tc.name, func(t *testing.T) {
1126+
destID := "test_destination_id"
1127+
1128+
c := config.New()
1129+
c.Set("Warehouse.redshift.appendOnlyTables."+destID, tc.appendOnlyTables)
1130+
1131+
rs := redshift.New(c, logger.NOP, stats.NOP)
1132+
1133+
rs.Warehouse = model.Warehouse{
1134+
Destination: backendconfig.DestinationT{
1135+
ID: destID,
1136+
Config: map[string]any{
1137+
model.PreferAppendSetting.String(): tc.preferAppend,
1138+
},
1139+
},
1140+
}
1141+
1142+
mockCtrl := gomock.NewController(t)
1143+
uploader := mockuploader.NewMockUploader(mockCtrl)
1144+
uploader.EXPECT().CanAppend().AnyTimes().Return(tc.uploaderCanAppend)
1145+
1146+
rs.Uploader = uploader
1147+
require.Equal(t, rs.ShouldMerge(tc.tableName), tc.expected)
1148+
})
1149+
}
1150+
}
1151+
10441152
func TestCheckAndIgnoreColumnAlreadyExistError(t *testing.T) {
10451153
testCases := []struct {
10461154
name string

warehouse/integrations/snowflake/snowflake.go

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/rudderlabs/rudder-go-kit/config"
2121
"github.com/rudderlabs/rudder-go-kit/logger"
2222
"github.com/rudderlabs/rudder-go-kit/stats"
23+
2324
"github.com/rudderlabs/rudder-server/utils/misc"
2425
"github.com/rudderlabs/rudder-server/warehouse/client"
2526
sqlmw "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"

0 commit comments

Comments
 (0)