Skip to content

Commit 106b42f

Browse files
DeleteWithPrefix support for SQLite (#3265)
Signed-off-by: Ryan Lettieri <[email protected]> Signed-off-by: Ryan Lettieri <[email protected]> Signed-off-by: Alessandro (Ale) Segala <[email protected]> Signed-off-by: ItalyPaleAle <[email protected]> Co-authored-by: Alessandro (Ale) Segala <[email protected]>
1 parent 2fcfc1a commit 106b42f

File tree

12 files changed

+226
-2
lines changed

12 files changed

+226
-2
lines changed

state/feature.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ const (
2626
FeatureQueryAPI Feature = "QUERY_API"
2727
// FeatureTTL is the feature that supports TTLs.
2828
FeatureTTL Feature = "TTL"
29+
// FeatureDeleteWithPrefix is the feature that supports deleting with prefix.
30+
FeatureDeleteWithPrefix Feature = "DELETE_WITH_PREFIX"
2931
)
3032

3133
// Feature names a feature that can be implemented by state store components.

state/requests.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ limitations under the License.
1414
package state
1515

1616
import (
17+
"fmt"
18+
"strings"
19+
1720
"github.com/dapr/components-contrib/state/query"
1821
)
1922

@@ -67,6 +70,21 @@ func (r DeleteRequest) Operation() OperationType {
6770
return OperationDelete
6871
}
6972

73+
// DeleteWithPrefixRequest is the object describing a delete with prefix state request used for deleting actors.
74+
type DeleteWithPrefixRequest struct {
75+
Prefix string `json:"prefix"`
76+
}
77+
78+
func (r *DeleteWithPrefixRequest) Validate() error {
79+
if r.Prefix == "" {
80+
return fmt.Errorf("a prefix is required for deleteWithPrefix request")
81+
}
82+
if !strings.HasSuffix(r.Prefix, "||") {
83+
r.Prefix += "||"
84+
}
85+
return nil
86+
}
87+
7088
// DeleteStateOption controls how a state store reacts to a delete request.
7189
type DeleteStateOption struct {
7290
Concurrency string `json:"concurrency,omitempty"` // "concurrency"

state/responses.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,8 @@ type QueryItem struct {
5252
Error string `json:"error,omitempty"`
5353
ContentType *string `json:"contentType,omitempty"`
5454
}
55+
56+
// DeleteWithPrefixResponse is the object representing a delete with prefix state response containing the number of items removed.
57+
type DeleteWithPrefixResponse struct {
58+
Count int64 `json:"count"` // count of items removed
59+
}

state/sqlite/sqlite.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func newSQLiteStateStore(logger logger.Logger, dba DBAccess) *SQLiteStore {
4949
state.FeatureETag,
5050
state.FeatureTransactional,
5151
state.FeatureTTL,
52+
state.FeatureDeleteWithPrefix,
5253
},
5354
dbaccess: dba,
5455
}
@@ -100,6 +101,11 @@ func (s *SQLiteStore) Multi(ctx context.Context, request *state.TransactionalSta
100101
return s.dbaccess.ExecuteMulti(ctx, request.Operations)
101102
}
102103

104+
// DeleteWithPrefix deletes an actor's state
105+
func (s *SQLiteStore) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) {
106+
return s.dbaccess.DeleteWithPrefix(ctx, req)
107+
}
108+
103109
// Close implements io.Closer.
104110
func (s *SQLiteStore) Close() error {
105111
if s.dbaccess != nil {

state/sqlite/sqlite_dbaccess.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type DBAccess interface {
4545
Delete(ctx context.Context, req *state.DeleteRequest) error
4646
BulkGet(ctx context.Context, req []state.GetRequest) ([]state.BulkGetResponse, error)
4747
ExecuteMulti(ctx context.Context, reqs []state.TransactionalStateOperation) error
48+
DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error)
4849
Close() error
4950
}
5051

@@ -78,6 +79,7 @@ func (a *sqliteDBAccess) Init(ctx context.Context, md state.Metadata) error {
7879
return err
7980
}
8081

82+
registerFuntions()
8183
connString, err := a.metadata.GetConnectionString(a.logger, sqlite.GetConnectionStringOpts{})
8284
if err != nil {
8385
// Already logged
@@ -418,6 +420,34 @@ func (a *sqliteDBAccess) Delete(ctx context.Context, req *state.DeleteRequest) e
418420
return a.doDelete(ctx, a.db, req)
419421
}
420422

423+
func (a *sqliteDBAccess) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) {
424+
if req.Prefix == "" {
425+
return state.DeleteWithPrefixResponse{}, fmt.Errorf("missing prefix in delete with prefix operation")
426+
}
427+
ctx, cancel := context.WithTimeout(ctx, a.metadata.Timeout)
428+
defer cancel()
429+
430+
err := req.Validate()
431+
if err != nil {
432+
return state.DeleteWithPrefixResponse{}, err
433+
}
434+
435+
// Concatenation is required for table name because sql.DB does not substitute parameters for table names.
436+
//nolint:gosec
437+
result, err := a.db.ExecContext(ctx, "DELETE FROM "+a.metadata.TableName+" WHERE prefix = ?",
438+
req.Prefix)
439+
if err != nil {
440+
return state.DeleteWithPrefixResponse{}, err
441+
}
442+
443+
rows, err := result.RowsAffected()
444+
if err != nil {
445+
return state.DeleteWithPrefixResponse{}, err
446+
}
447+
448+
return state.DeleteWithPrefixResponse{Count: rows}, nil
449+
}
450+
421451
func (a *sqliteDBAccess) ExecuteMulti(parentCtx context.Context, reqs []state.TransactionalStateOperation) error {
422452
tx, err := a.db.BeginTx(parentCtx, nil)
423453
if err != nil {

state/sqlite/sqlite_integration_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,10 @@ func TestSqliteIntegration(t *testing.T) {
140140
multiWithSetOnly(t, s)
141141
})
142142

143+
t.Run("Delete with Prefix (actor state)", func(t *testing.T) {
144+
testDeleteWithPrefix(t, s)
145+
})
146+
143147
t.Run("ttlExpireTime", func(t *testing.T) {
144148
getExpireTime(t, s)
145149
getBulkExpireTime(t, s)
@@ -610,6 +614,55 @@ func setItemWithNoKey(t *testing.T, s state.Store) {
610614
require.Error(t, err)
611615
}
612616

617+
func testDeleteWithPrefix(t *testing.T, s state.Store) {
618+
setReq1 := &state.SetRequest{
619+
Key: "mock-app-id||mock-actor-type||mock-actor-id||key0",
620+
}
621+
622+
setReq2 := &state.SetRequest{
623+
Key: "mock-app-id||mock-actor-type||mock-actor-id||key1",
624+
}
625+
626+
setReq3 := &state.SetRequest{
627+
Key: "mock-app-id||mock-actor-type||mock-actor-id||key2",
628+
}
629+
630+
setReq4 := &state.SetRequest{
631+
Key: "different-app-id||different-actor-type||different-actor-id||key0",
632+
}
633+
634+
delReq := state.DeleteWithPrefixRequest{
635+
Prefix: "mock-app-id||mock-actor-type||mock-actor-id",
636+
}
637+
638+
err := s.Set(context.Background(), setReq1)
639+
require.NoError(t, err)
640+
641+
err = s.Set(context.Background(), setReq2)
642+
require.NoError(t, err)
643+
644+
err = s.Set(context.Background(), setReq3)
645+
require.NoError(t, err)
646+
647+
err = s.Set(context.Background(), setReq4)
648+
require.NoError(t, err)
649+
650+
res, err := s.(state.DeleteWithPrefix).DeleteWithPrefix(context.Background(), delReq)
651+
require.NoError(t, err)
652+
assert.Equal(t, int64(3), res.Count)
653+
654+
delReq = state.DeleteWithPrefixRequest{
655+
Prefix: "different-app-id||different-actor-type||different-actor-id||",
656+
}
657+
res, err = s.(state.DeleteWithPrefix).DeleteWithPrefix(context.Background(), delReq)
658+
require.NoError(t, err)
659+
assert.Equal(t, int64(1), res.Count)
660+
661+
res, err = s.(state.DeleteWithPrefix).DeleteWithPrefix(context.Background(), delReq)
662+
require.NoError(t, err)
663+
assert.Equal(t, int64(0), res.Count)
664+
}
665+
613666
func testSetItemWithInvalidTTL(t *testing.T, s state.Store) {
614667
setReq := &state.SetRequest{
615668
Key: randomKey(),

state/sqlite/sqlite_migrations.go

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,47 @@ package sqlite
1616
import (
1717
"context"
1818
"database/sql"
19+
"database/sql/driver"
1920
"fmt"
21+
"strings"
2022

2123
commonsql "github.com/dapr/components-contrib/common/component/sql"
2224
sqlitemigrations "github.com/dapr/components-contrib/common/component/sql/migrations/sqlite"
2325
"github.com/dapr/kit/logger"
26+
27+
sqlite3 "modernc.org/sqlite"
2428
)
2529

2630
type migrationOptions struct {
2731
StateTableName string
2832
MetadataTableName string
2933
}
3034

35+
func registerFuntions() {
36+
sqlite3.RegisterDeterministicScalarFunction(
37+
"parse_key_prefix",
38+
1,
39+
func(ctx *sqlite3.FunctionContext, args []driver.Value) (driver.Value, error) {
40+
var s1 string
41+
switch arg0 := args[0].(type) {
42+
case string:
43+
s1 = arg0
44+
default:
45+
return "", fmt.Errorf("expected argv[0] to be text")
46+
}
47+
if len(s1) == 0 {
48+
return "", fmt.Errorf("cannot create prefix for empty string")
49+
}
50+
51+
lastIndex := strings.LastIndex(s1, "||")
52+
if lastIndex != -1 {
53+
return s1[:lastIndex+2], nil
54+
}
55+
return "", nil
56+
},
57+
)
58+
}
59+
3160
// Perform the required migrations
3261
func performMigrations(ctx context.Context, db *sql.DB, logger logger.Logger, opts migrationOptions) error {
3362
m := sqlitemigrations.Migrations{
@@ -61,5 +90,23 @@ func performMigrations(ctx context.Context, db *sql.DB, logger logger.Logger, op
6190
}
6291
return nil
6392
},
64-
})
93+
// Migration 1: add the "prefix" column
94+
func(ctx context.Context) error {
95+
// Add the "prefix" column that can be used by DeleteWithPrefix
96+
logger.Infof("Adding 'prefix' column to table '%s'", opts.StateTableName)
97+
_, err := m.GetConn().ExecContext(
98+
ctx,
99+
fmt.Sprintf(
100+
`ALTER TABLE %[1]s ADD COLUMN prefix TEXT GENERATED ALWAYS AS (parse_key_prefix(key)) VIRTUAL;
101+
CREATE INDEX %[1]s_prefix_index ON %[1]s(prefix) WHERE prefix != ""`,
102+
opts.StateTableName,
103+
),
104+
)
105+
if err != nil {
106+
return fmt.Errorf("failed to create virtual column: %w", err)
107+
}
108+
return nil
109+
},
110+
},
111+
)
65112
}

state/sqlite/sqlite_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,15 @@ func TestValidMultiDeleteRequest(t *testing.T) {
294294
require.NoError(t, err)
295295
}
296296

297+
func TestValidEmptyDeleteWithPrefixRequest(t *testing.T) {
298+
t.Parallel()
299+
300+
ods := createSqlite(t)
301+
res, err := ods.DeleteWithPrefix(context.Background(), createDeleteWithPrefixRequest())
302+
require.NoError(t, err)
303+
assert.Equal(t, int64(0), res.Count)
304+
}
305+
297306
// Proves that the Ping method runs the ping method.
298307
func TestPingRunsDBAccessPing(t *testing.T) {
299308
t.Parallel()
@@ -338,6 +347,10 @@ func (m *fakeDBaccess) BulkGet(parentCtx context.Context, req []state.GetRequest
338347
return nil, nil
339348
}
340349

350+
func (m *fakeDBaccess) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) {
351+
return state.DeleteWithPrefixResponse{}, nil
352+
}
353+
341354
func (m *fakeDBaccess) Delete(ctx context.Context, req *state.DeleteRequest) error {
342355
return nil
343356
}
@@ -389,6 +402,12 @@ func createDeleteRequest() state.DeleteRequest {
389402
}
390403
}
391404

405+
func createDeleteWithPrefixRequest() state.DeleteWithPrefixRequest {
406+
return state.DeleteWithPrefixRequest{
407+
Prefix: randomKey(),
408+
}
409+
}
410+
392411
func createSqliteWithFake(t *testing.T) (*SQLiteStore, *fakeDBaccess) {
393412
ods := createSqlite(t)
394413
fake := ods.dbaccess.(*fakeDBaccess)

state/store.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,8 @@ func Ping(ctx context.Context, store Store) error {
6161
return errors.New("ping is not implemented by this state store")
6262
}
6363
}
64+
65+
// DeleteWithPrefix is an interface to delete objects with a prefix.
66+
type DeleteWithPrefix interface {
67+
DeleteWithPrefix(ctx context.Context, req DeleteWithPrefixRequest) (DeleteWithPrefixResponse, error)
68+
}
4 KB
Binary file not shown.

0 commit comments

Comments
 (0)