-
Notifications
You must be signed in to change notification settings - Fork 13
/
fetch.go
176 lines (153 loc) · 5.47 KB
/
fetch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package sturdyc
import (
"context"
"errors"
"maps"
)
func (c *Client[T]) groupIDs(ids []string, keyFn KeyFn) (hits map[string]T, misses, refreshes []string) {
hits = make(map[string]T)
misses = make([]string, 0)
refreshes = make([]string, 0)
for _, id := range ids {
key := keyFn(id)
value, exists, markedAsMissing, shouldRefresh := c.getWithState(key)
// Check if the record should be refreshed in the background.
if shouldRefresh {
refreshes = append(refreshes, id)
}
if markedAsMissing {
continue
}
if !exists {
misses = append(misses, id)
continue
}
hits[id] = value
}
return hits, misses, refreshes
}
func getFetch[V, T any](ctx context.Context, c *Client[T], key string, fetchFn FetchFn[V]) (T, error) {
wrappedFetch := wrap[T](distributedFetch(c, key, fetchFn))
// Begin by checking if we have the item in our cache.
value, ok, markedAsMissing, shouldRefresh := c.getWithState(key)
if shouldRefresh {
c.safeGo(func() {
c.refresh(key, wrappedFetch)
})
}
if markedAsMissing {
return value, ErrMissingRecord
}
if ok {
return value, nil
}
return callAndCache(ctx, c, key, wrappedFetch)
}
// GetOrFetch attempts to retrieve the specified key from the cache. If the value
// is absent, it invokes the fetchFn function to obtain it and then stores the result.
// Additionally, when background refreshes are enabled, GetOrFetch determines if the record
// needs refreshing and, if necessary, schedules this task for background execution.
//
// Parameters:
//
// ctx - The context to be used for the request.
// key - The key to be fetched.
// fetchFn - Used to retrieve the data from the underlying data source if the key is not found in the cache.
//
// Returns:
//
// The value corresponding to the key and an error if one occurred.
func (c *Client[T]) GetOrFetch(ctx context.Context, key string, fetchFn FetchFn[T]) (T, error) {
return getFetch[T, T](ctx, c, key, fetchFn)
}
// GetOrFetch is a convenience function that performs type assertion on the result of client.GetOrFetch.
//
// Parameters:
//
// ctx - The context to be used for the request.
// c - The cache client.
// key - The key to be fetched.
// fetchFn - Used to retrieve the data from the underlying data source if the key is not found in the cache.
//
// Returns:
//
// The value corresponding to the key and an error if one occurred.
//
// Type Parameters:
//
// V - The type returned by the fetchFn. Must be assignable to T.
// T - The type stored in the cache.
func GetOrFetch[V, T any](ctx context.Context, c *Client[T], key string, fetchFn FetchFn[V]) (V, error) {
res, err := getFetch[V, T](ctx, c, key, fetchFn)
return unwrap[V](res, err)
}
func getFetchBatch[V, T any](ctx context.Context, c *Client[T], ids []string, keyFn KeyFn, fetchFn BatchFetchFn[V]) (map[string]T, error) {
wrappedFetch := wrapBatch[T](distributedBatchFetch[V, T](c, keyFn, fetchFn))
cachedRecords, cacheMisses, idsToRefresh := c.groupIDs(ids, keyFn)
// If any records need to be refreshed, we'll do so in the background.
if len(idsToRefresh) > 0 {
c.safeGo(func() {
if c.bufferRefreshes {
bufferBatchRefresh(c, idsToRefresh, keyFn, wrappedFetch)
return
}
c.refreshBatch(idsToRefresh, keyFn, wrappedFetch)
})
}
// If we were able to retrieve all records from the cache, we can return them straight away.
if len(cacheMisses) == 0 {
return cachedRecords, nil
}
callBatchOpts := callBatchOpts[T, T]{ids: cacheMisses, keyFn: keyFn, fn: wrappedFetch}
response, err := callAndCacheBatch(ctx, c, callBatchOpts)
if err != nil && !errors.Is(err, ErrOnlyCachedRecords) {
if len(cachedRecords) > 0 {
return cachedRecords, ErrOnlyCachedRecords
}
return cachedRecords, err
}
maps.Copy(cachedRecords, response)
return cachedRecords, err
}
// GetOrFetchBatch attempts to retrieve the specified ids from the cache. If
// any of the values are absent, it invokes the fetchFn function to obtain them
// and then stores the result. Additionally, when background refreshes are
// enabled, GetOrFetch determines if any of the records need refreshing and, if
// necessary, schedules this to be performed in the background.
//
// Parameters:
//
// ctx - The context to be used for the request.
// ids - The list of IDs to be fetched.
// keyFn - Used to generate the cache key for each ID.
// fetchFn - Used to retrieve the data from the underlying data source if any IDs are not found in the cache.
//
// Returns:
//
// A map of IDs to their corresponding values and an error if one occurred.
func (c *Client[T]) GetOrFetchBatch(ctx context.Context, ids []string, keyFn KeyFn, fetchFn BatchFetchFn[T]) (map[string]T, error) {
return getFetchBatch[T, T](ctx, c, ids, keyFn, fetchFn)
}
// GetOrFetchBatch is a convenience function that performs type assertion on the
// result of client.GetOrFetchBatch.
//
// Parameters:
//
// ctx - The context to be used for the request.
// c - The cache client.
// ids - The list of IDs to be fetched.
// keyFn - Used to prefix each ID in order to create a unique cache key.
// fetchFn - Used to retrieve the data from the underlying data source.
//
// Returns:
//
// A map of ids to their corresponding values and an error if one occurred.
//
// Type Parameters:
//
// V - The type returned by the fetchFn. Must be assignable to T.
// T - The type stored in the cache.
func GetOrFetchBatch[V, T any](ctx context.Context, c *Client[T], ids []string, keyFn KeyFn, fetchFn BatchFetchFn[V]) (map[string]V, error) {
res, err := getFetchBatch[V, T](ctx, c, ids, keyFn, fetchFn)
return unwrapBatch[V](res, err)
}