Skip to content

Commit 8fd15ff

Browse files
committed
fix: return only the latest namespace entry when fetching tables from warehouse
1 parent af82a6f commit 8fd15ff

File tree

4 files changed

+46
-4
lines changed

4 files changed

+46
-4
lines changed

warehouse/api/http.go

-2
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,6 @@ func (a *Api) addMasterEndpoints(ctx context.Context, r chi.Router) {
172172

173173
r.Post("/jobs", a.logMiddleware(a.sourceManager.InsertJobHandler)) // TODO: add degraded mode
174174
r.Get("/jobs/status", a.logMiddleware(a.sourceManager.StatusJobHandler)) // TODO: add degraded mode
175-
176-
r.Get("/fetch-tables", a.logMiddleware(a.fetchTablesHandler)) // TODO: Remove this endpoint once sources change is released
177175
})
178176
})
179177
r.Route("/internal", func(r chi.Router) {

warehouse/api/http_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -862,7 +862,6 @@ func TestHTTPApi(t *testing.T) {
862862

863863
t.Run("fetch tables", func(t *testing.T) {
864864
for _, u := range []string{
865-
fmt.Sprintf("%s/v1/warehouse/fetch-tables", serverURL),
866865
fmt.Sprintf("%s/internal/v1/warehouse/fetch-tables", serverURL),
867866
} {
868867
req, err := http.NewRequest(http.MethodGet, u, bytes.NewReader([]byte(`

warehouse/internal/repo/schema.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ func (sh *WHSchema) GetTablesForConnection(ctx context.Context, connections []wa
216216
(source_id, destination_id) IN (` + strings.Join(sourceIDDestinationIDPairs, ", ") + `)
217217
AND
218218
schema::text <> '{}'::text
219-
GROUP BY id
219+
GROUP BY source_id, destination_id
220220
)`
221221
rows, err := sh.db.QueryContext(
222222
ctx,

warehouse/internal/repo/schema_test.go

+45
Original file line numberDiff line numberDiff line change
@@ -142,5 +142,50 @@ func TestWHSchemasRepo(t *testing.T) {
142142
t.Log("empty")
143143
_, err = r.GetTablesForConnection(ctx, []warehouseutils.SourceIDDestinationID{})
144144
require.EqualError(t, err, errors.New("no source id and destination id pairs provided").Error())
145+
146+
t.Log("multiple")
147+
latestNamespace := "latest_namespace"
148+
sourceID2 := "source_id_2"
149+
destinationID2 := "destination_id_2"
150+
connection2 := warehouseutils.SourceIDDestinationID{SourceID: sourceID2, DestinationID: destinationID2}
151+
schemaLatest := model.WHSchema{
152+
UploadID: 2,
153+
SourceID: sourceID,
154+
Namespace: latestNamespace,
155+
DestinationID: destinationID,
156+
DestinationType: destinationType,
157+
Schema: schemaModel,
158+
CreatedAt: now,
159+
UpdatedAt: now,
160+
}
161+
schema2 := model.WHSchema{
162+
UploadID: 3,
163+
SourceID: sourceID2,
164+
Namespace: namespace,
165+
DestinationID: destinationID2,
166+
DestinationType: destinationType,
167+
Schema: schemaModel,
168+
CreatedAt: now,
169+
UpdatedAt: now,
170+
}
171+
_, err = r.Insert(ctx, &schemaLatest)
172+
require.NoError(t, err)
173+
_, err = r.Insert(ctx, &schema2)
174+
require.NoError(t, err)
175+
expectedTableNames, err = r.GetTablesForConnection(ctx, []warehouseutils.SourceIDDestinationID{connection, connection2})
176+
require.NoError(t, err)
177+
require.Equal(t, len(expectedTableNames), 2)
178+
require.Contains(t, expectedTableNames, warehouseutils.FetchTableInfo{
179+
SourceID: sourceID,
180+
DestinationID: destinationID,
181+
Namespace: latestNamespace,
182+
Tables: []string{"table_name_1", "table_name_2"},
183+
})
184+
require.Contains(t, expectedTableNames, warehouseutils.FetchTableInfo{
185+
SourceID: sourceID2,
186+
DestinationID: destinationID2,
187+
Namespace: namespace,
188+
Tables: []string{"table_name_1", "table_name_2"},
189+
})
145190
})
146191
}

0 commit comments

Comments
 (0)