forked from influxdata/influxdb-iox-client-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
iox_client_ingester_writeinfo.go
85 lines (76 loc) · 2.67 KB
/
iox_client_ingester_writeinfo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package influxdbiox
import (
"context"
"errors"
"net/http"
"time"
ingester "github.com/influxdata/influxdb-iox-client-go/v2/internal/ingester"
)
// tokenWaitInterval is how long waitForToken pauses between successive
// GetWriteInfo polls when the predicate has not yet been satisfied.
const tokenWaitInterval = 500 * time.Millisecond
// waitForToken polls GetWriteInfo for writeToken until predicate reports true
// for a response.
//
// The first poll is issued immediately. Each time predicate rejects a
// response, the call waits tokenWaitInterval before polling again. It returns
// nil once predicate returns true, the error from GetWriteInfo if a poll
// fails, or ctx.Err() if ctx is done while waiting between polls.
func (c *Client) waitForToken(ctx context.Context, writeToken string, predicate func(*ingester.GetWriteInfoResponse) bool) error {
	request := &ingester.GetWriteInfoRequest{
		WriteToken: writeToken,
	}
	for {
		response, err := c.ingesterWriteInfoClient.GetWriteInfo(ctx, request)
		if err != nil {
			return err
		}
		if predicate(response) {
			return nil
		}

		// An explicit timer (instead of time.After) can be stopped as soon as
		// the context is cancelled, so it does not linger until it fires.
		timer := time.NewTimer(tokenWaitInterval)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
			// Interval elapsed; poll again.
		}
	}
}
// WriteTokenFromHTTPResponse returns the IOx write token carried in the
// "X-IOx-Write-Token" header of response.
//
// It returns an error if response is nil or if the header is absent or empty.
func WriteTokenFromHTTPResponse(response *http.Response) (string, error) {
	// Guard against a nil response so callers get an error rather than a
	// nil-pointer panic.
	if response == nil {
		return "", errors.New("nil HTTP response")
	}
	writeToken := response.Header.Get("X-IOx-Write-Token")
	if writeToken == "" {
		return "", errors.New("no write token found in HTTP response")
	}
	return writeToken, nil
}
// WaitForDurable blocks until the write identified by writeToken is durable,
// meaning that the data has been safely stored in a write-ahead log.
//
// A shard counts as durable when its status is DURABLE, READABLE, or
// PERSISTED; the wait ends once every shard in the response qualifies. It
// returns whatever error waitForToken reports.
func (c *Client) WaitForDurable(ctx context.Context, writeToken string) error {
	allDurable := func(info *ingester.GetWriteInfoResponse) bool {
		for _, shard := range info.ShardInfos {
			switch shard.Status {
			case ingester.ShardStatus_SHARD_STATUS_DURABLE,
				ingester.ShardStatus_SHARD_STATUS_READABLE,
				ingester.ShardStatus_SHARD_STATUS_PERSISTED:
				// This shard is at least durable; check the next one.
			default:
				return false
			}
		}
		return true
	}
	return c.waitForToken(ctx, writeToken, allDurable)
}
// WaitForReadable blocks until the write identified by writeToken is readable,
// meaning that the data can be queried.
//
// A shard counts as readable when its status is READABLE or PERSISTED; the
// wait ends once every shard in the response qualifies. It returns whatever
// error waitForToken reports.
func (c *Client) WaitForReadable(ctx context.Context, writeToken string) error {
	allReadable := func(info *ingester.GetWriteInfoResponse) bool {
		for _, shard := range info.ShardInfos {
			switch shard.Status {
			case ingester.ShardStatus_SHARD_STATUS_READABLE,
				ingester.ShardStatus_SHARD_STATUS_PERSISTED:
				// This shard is at least readable; check the next one.
			default:
				return false
			}
		}
		return true
	}
	return c.waitForToken(ctx, writeToken, allReadable)
}
// WaitForPersisted blocks until the write identified by writeToken is
// persisted, meaning that the data has been batched, sorted, compacted, and
// persisted to disk or object storage.
//
// The wait ends once every shard in the response has status PERSISTED. It
// returns whatever error waitForToken reports.
func (c *Client) WaitForPersisted(ctx context.Context, writeToken string) error {
	allPersisted := func(info *ingester.GetWriteInfoResponse) bool {
		for _, shard := range info.ShardInfos {
			if shard.Status == ingester.ShardStatus_SHARD_STATUS_PERSISTED {
				continue
			}
			return false
		}
		return true
	}
	return c.waitForToken(ctx, writeToken, allPersisted)
}