1
1
import {
2
2
SaveStateRequest ,
3
3
StateItem as StateItemPB ,
4
- Etag as ETagTB ,
4
+ Etag ,
5
5
StateOptions as StateOptionsPB ,
6
6
GetStateRequest ,
7
7
GetBulkStateRequest ,
8
+ DeleteStateRequest ,
9
+ DeleteBulkStateRequest ,
10
+ ExecuteStateTransactionRequest ,
11
+ TransactionalStateOperation ,
8
12
} from '../proto/runtime_pb' ;
9
- import API from './API' ;
10
-
11
- export type StateConcurrency = StateOptionsPB . StateConcurrency ;
12
- export type StateConsistency = StateOptionsPB . StateConsistency ;
13
-
14
- type StateOptions = {
15
- concurrency : StateConcurrency ;
16
- consistency : StateConsistency ;
17
- } ;
18
-
19
- type Metadata = {
20
- [ key : string ] : string ;
21
- } ;
22
-
23
- type StateItem = {
24
- key : string ;
25
- value : Uint8Array | string ;
26
- etag ?: string ;
27
- metadata ?: Metadata ;
28
- options ?: StateOptions ;
29
- }
30
-
31
- type BulkResponseStateItem = {
32
- key : string ;
33
- value : Uint8Array ;
34
- etag : string ;
35
- }
13
+ import { API , RequestMetadata } from './API' ;
14
+ import {
15
+ DeleteStateItem ,
16
+ ResponseStateItem ,
17
+ StateItem ,
18
+ StateOperation ,
19
+ StateOptions ,
20
+ } from './types/State' ;
36
21
37
22
export default class State extends API {
38
23
// Saves an array of state objects
39
- async save ( storeName : string , states : StateItem [ ] | StateItem ) : Promise < void > {
40
- const stateList : StateItemPB [ ] = [ ] ;
24
+ async save ( storeName : string , states : StateItem [ ] | StateItem , meta ?: RequestMetadata ) : Promise < void > {
41
25
if ( ! Array . isArray ( states ) ) {
42
26
states = [ states ] ;
43
27
}
44
- for ( const item of states ) {
45
- const stateItem = new StateItemPB ( ) ;
46
- stateItem . setKey ( item . key ) ;
47
- if ( typeof item . value === 'string' ) {
48
- stateItem . setValue ( Buffer . from ( item . value , 'utf8' ) ) ;
49
- } else {
50
- stateItem . setValue ( item . value ) ;
51
- }
52
- if ( item . etag !== undefined ) {
53
- const etag = new ETagTB ( ) ;
54
- etag . setValue ( item . etag ) ;
55
- stateItem . setEtag ( etag ) ;
56
- }
57
- if ( item . options !== undefined ) {
58
- const options = new StateOptionsPB ( ) ;
59
- options . setConcurrency ( item . options . concurrency ) ;
60
- }
61
- stateList . push ( stateItem ) ;
62
- }
63
-
28
+ const stateList = this . createStateItemPBList ( states ) ;
64
29
const req = new SaveStateRequest ( ) ;
65
30
req . setStoreName ( storeName ) ;
66
31
req . setStatesList ( stateList ) ;
67
32
68
33
return new Promise ( ( resolve , reject ) => {
69
- this . runtime . saveState ( req , ( err ) => {
34
+ this . runtime . saveState ( req , this . createMetadata ( meta ) , ( err ) => {
70
35
if ( err ) return reject ( err ) ;
71
36
resolve ( ) ;
72
37
} ) ;
73
38
} ) ;
74
39
}
75
40
76
41
// Gets the state for a specific key
77
- async get ( storeName : string , key : string ) : Promise < Uint8Array > {
42
+ async get ( storeName : string , key : string , meta ?: RequestMetadata ) : Promise < ResponseStateItem | null > {
78
43
const req = new GetStateRequest ( ) ;
79
44
req . setStoreName ( storeName ) ;
80
45
req . setKey ( key ) ;
81
46
82
47
return new Promise ( ( resolve , reject ) => {
83
- this . runtime . getState ( req , ( err , res ) => {
48
+ this . runtime . getState ( req , this . createMetadata ( meta ) , ( err , res ) => {
84
49
if ( err ) return reject ( err ) ;
85
- resolve ( res . getData_asU8 ( ) ) ;
50
+ if ( this . isEmpty ( res ) ) {
51
+ return resolve ( null ) ;
52
+ }
53
+ resolve ( {
54
+ key,
55
+ value : res . getData_asU8 ( ) ,
56
+ etag : res . getEtag ( ) ,
57
+ } ) ;
86
58
} ) ;
87
59
} ) ;
88
60
}
89
61
90
62
// Gets a bulk of state items for a list of keys
91
- async getBulk ( storeName : string , keys : string [ ] , parallelism = 10 ) : Promise < BulkResponseStateItem [ ] > {
63
+ async getBulk ( storeName : string , keys : string [ ] , parallelism = 10 , meta ?: RequestMetadata ) : Promise < ResponseStateItem [ ] > {
92
64
const req = new GetBulkStateRequest ( ) ;
93
65
req . setStoreName ( storeName ) ;
94
66
req . setKeysList ( keys ) ;
95
67
req . setParallelism ( parallelism ) ;
96
68
97
69
return new Promise ( ( resolve , reject ) => {
98
- this . runtime . getBulkState ( req , ( err , res ) => {
70
+ this . runtime . getBulkState ( req , this . createMetadata ( meta ) , ( err , res ) => {
99
71
if ( err ) return reject ( err ) ;
100
- const states : BulkResponseStateItem [ ] = [ ] ;
72
+ const states : ResponseStateItem [ ] = [ ] ;
101
73
const itemsList = res . getItemsList ( ) ;
102
74
for ( const item of itemsList ) {
75
+ if ( this . isEmpty ( item ) ) {
76
+ continue ;
77
+ }
103
78
states . push ( {
104
79
key : item . getKey ( ) ,
105
80
value : item . getData_asU8 ( ) ,
106
81
etag : item . getEtag ( ) ,
82
+ // metadata: item.getMetadataMap(),
107
83
} ) ;
108
84
}
109
85
resolve ( states ) ;
@@ -112,12 +88,100 @@ export default class State extends API {
112
88
}
113
89
114
90
// Deletes the state for a specific key
115
- async delete ( ) {
91
+ async delete ( storeName : string , key : string , etag = '' , options ?: StateOptions , meta ?: RequestMetadata ) : Promise < void > {
92
+ const req = new DeleteStateRequest ( ) ;
93
+ req . setStoreName ( storeName ) ;
94
+ req . setKey ( key ) ;
95
+ if ( etag ) {
96
+ const etagInstance = new Etag ( ) ;
97
+ etagInstance . setValue ( etag ) ;
98
+ req . setEtag ( etagInstance ) ;
99
+ }
100
+ if ( options ) {
101
+ const optionsInstance = new StateOptionsPB ( ) ;
102
+ optionsInstance . setConcurrency ( options . concurrency ) ;
103
+ optionsInstance . setConsistency ( options . consistency ) ;
104
+ req . setOptions ( optionsInstance ) ;
105
+ }
116
106
107
+ return new Promise ( ( resolve , reject ) => {
108
+ this . runtime . deleteState ( req , this . createMetadata ( meta ) , ( err ) => {
109
+ if ( err ) return reject ( err ) ;
110
+ resolve ( ) ;
111
+ } ) ;
112
+ } ) ;
117
113
}
118
114
119
115
// Deletes a bulk of state items for a list of keys
120
- async deleteBulk ( ) { }
116
+ async deleteBulk ( storeName : string , states : DeleteStateItem [ ] , meta ?: RequestMetadata ) : Promise < void > {
117
+ const req = new DeleteBulkStateRequest ( ) ;
118
+ req . setStoreName ( storeName ) ;
119
+ const stateList = this . createStateItemPBList ( states ) ;
120
+ req . setStatesList ( stateList ) ;
121
+
122
+ return new Promise ( ( resolve , reject ) => {
123
+ this . runtime . deleteBulkState ( req , this . createMetadata ( meta ) , ( err ) => {
124
+ if ( err ) return reject ( err ) ;
125
+ resolve ( ) ;
126
+ } ) ;
127
+ } ) ;
128
+ }
129
+
130
+ // Executes transactions for a specified store
131
+ async executeTransaction ( storeName : string , operations : StateOperation [ ] , meta ?: RequestMetadata ) : Promise < void > {
132
+ const req = new ExecuteStateTransactionRequest ( ) ;
133
+ req . setStorename ( storeName ) ;
134
+ const operationsList : TransactionalStateOperation [ ] = [ ] ;
135
+ for ( const operation of operations ) {
136
+ const ops = new TransactionalStateOperation ( ) ;
137
+ ops . setOperationtype ( operation . operationType ) ;
138
+ const stateItem = this . createStateItemPB ( operation . request ) ;
139
+ ops . setRequest ( stateItem ) ;
140
+ operationsList . push ( ops ) ;
141
+ }
142
+ req . setOperationsList ( operationsList ) ;
143
+
144
+ return new Promise ( ( resolve , reject ) => {
145
+ this . runtime . executeStateTransaction ( req , this . createMetadata ( meta ) , ( err , _res ) => {
146
+ if ( err ) return reject ( err ) ;
147
+ resolve ( ) ;
148
+ } ) ;
149
+ } ) ;
150
+ }
121
151
122
- // async transaction() {}
152
+ private createStateItemPB ( item : StateItem | DeleteStateItem ) : StateItemPB {
153
+ const stateItem = new StateItemPB ( ) ;
154
+ stateItem . setKey ( item . key ) ;
155
+ if ( 'value' in item ) {
156
+ if ( typeof item . value === 'string' ) {
157
+ stateItem . setValue ( Buffer . from ( item . value , 'utf8' ) ) ;
158
+ } else {
159
+ stateItem . setValue ( item . value ) ;
160
+ }
161
+ }
162
+ if ( item . etag !== undefined ) {
163
+ const etag = new Etag ( ) ;
164
+ etag . setValue ( item . etag ) ;
165
+ stateItem . setEtag ( etag ) ;
166
+ }
167
+ if ( item . options !== undefined ) {
168
+ const options = new StateOptionsPB ( ) ;
169
+ options . setConcurrency ( item . options . concurrency ) ;
170
+ options . setConsistency ( item . options . consistency ) ;
171
+ stateItem . setOptions ( options ) ;
172
+ }
173
+ return stateItem ;
174
+ }
175
+
176
+ private createStateItemPBList ( items : StateItem [ ] | DeleteStateItem [ ] ) : StateItemPB [ ] {
177
+ const list : StateItemPB [ ] = [ ] ;
178
+ for ( const item of items ) {
179
+ list . push ( this . createStateItemPB ( item ) ) ;
180
+ }
181
+ return list ;
182
+ }
183
+
184
+ private isEmpty ( obj : { getEtag ( ) : string } ) {
185
+ return obj . getEtag ( ) === '' ;
186
+ }
123
187
}
0 commit comments