
Fix segments fragments loss #23781

Merged
10 commits merged on Oct 25, 2023
3 changes: 3 additions & 0 deletions changelog/23781.txt
@@ -0,0 +1,3 @@
```release-note:bug
core/activity: Fixes segments fragment loss due to exceeding entry record size limit
```
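
The ~4/3 inflation this release note refers to is easiest to see with a small standalone sketch (not Vault code; the 300,000-byte payload is an arbitrary stand-in for a serialized segment). Go's encoding/json base64-encodes []byte values, so n raw bytes become roughly 4*ceil(n/3) characters plus quotes, which is why a segment sized against the raw 512KB storage limit could exceed the entry size cap and lose fragments.

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Arbitrary stand-in for a serialized activity segment; not Vault code.
	payload := make([]byte, 300_000)

	// encoding/json base64-encodes []byte values, so n raw bytes become
	// roughly 4*ceil(n/3) characters plus surrounding quotes.
	encoded, err := json.Marshal(payload)
	if err != nil {
		panic(err)
	}

	fmt.Printf("raw: %d bytes, JSON-encoded: %d bytes (x%.2f)\n",
		len(payload), len(encoded), float64(len(encoded))/float64(len(payload)))
	// Output: raw: 300000 bytes, JSON-encoded: 400002 bytes (x1.33)
}
```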
18 changes: 9 additions & 9 deletions vault/activity_log.go
@@ -60,18 +60,18 @@ const (
activitySegmentWriteTimeout = 1 * time.Minute

// Number of client records to store per segment. Each ClientRecord may
// consume upto 99 bytes; rounding it to 100bytes. Considering the storage
// limit of 512KB per storage entry, we can roughly store 512KB/100bytes =
// 5241 clients; rounding down to 5000 clients.
activitySegmentClientCapacity = 5000
// consume up to 99 bytes; rounding to 100 bytes. This []byte undergoes JSON marshalling
// before being written to storage, which increases its size by approximately 4/3. Considering the storage
// limit of 512KB per storage entry, we can roughly store 512KB/(100 bytes * 4/3), yielding approximately 3820 records.
ActivitySegmentClientCapacity = 3820

// Maximum number of segments per month. This allows for 700K entities per
// month; 700K/5K. These limits are geared towards controlling the storage
// month; 700K/3820 (ActivitySegmentClientCapacity). These limits are geared towards controlling the storage
// implications of persisting activity logs. If we hit a scenario where the
// storage consequences are less important in comparison to the accuracy of
// the client activity, these limits can be further relaxed or even be
// removed.
activityLogMaxSegmentPerMonth = 140
activityLogMaxSegmentPerMonth = 184

// trackedTWESegmentPeriod is a time period of a little over a month, and represents
// the amount of time that needs to pass after a 1.9 or later upgrade to result in
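
As a rough sanity check of the numbers quoted in the capacity comment above (assuming 512KB is treated as 512,000 bytes; the final rounding down to 3820 and up to 184 leaves a small safety margin):

```go
package main

import "fmt"

func main() {
	const entryLimit = 512_000.0 // assumed interpretation of the 512KB storage-entry limit
	const recordSize = 100.0     // each ClientRecord, rounded up from ~99 bytes
	const jsonFactor = 4.0 / 3.0 // inflation from JSON/base64 marshalling of the entry

	perSegment := entryLimit / (recordSize * jsonFactor)
	fmt.Printf("records per segment: ~%.0f, rounded down to 3820\n", perSegment) // ~3840

	segmentsPerMonth := 700_000.0 / 3820
	fmt.Printf("segments for 700K clients: ~%.1f, rounded up to 184\n", segmentsPerMonth) // ~183.2
}
```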
@@ -351,7 +351,7 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
}

// Will all new entities fit? If not, roll over to a new segment.
available := activitySegmentClientCapacity - len(a.currentSegment.currentClients.Clients)
available := ActivitySegmentClientCapacity - len(a.currentSegment.currentClients.Clients)
remaining := available - len(newEntities)
excess := 0
if remaining < 0 {
@@ -389,9 +389,9 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for

// Rotate to next segment
a.currentSegment.clientSequenceNumber += 1
if len(excessClients) > activitySegmentClientCapacity {
if len(excessClients) > ActivitySegmentClientCapacity {
a.logger.Warn("too many new active clients, dropping tail", "clients", len(excessClients))
excessClients = excessClients[:activitySegmentClientCapacity]
excessClients = excessClients[:ActivitySegmentClientCapacity]
}
a.currentSegment.currentClients.Clients = excessClients
err := a.saveCurrentSegmentInternal(ctx, force)
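
The rollover behavior in this hunk can be summarized with a simplified, hypothetical sketch (splitClients is not a real Vault function, and capacity stands in for ActivitySegmentClientCapacity): fill the current segment up to capacity, carry the excess into the next segment, and drop the tail if even the excess exceeds a full segment.

```go
package main

import "fmt"

// splitClients is a hypothetical, simplified illustration of the rollover
// logic above, not the actual ActivityLog implementation.
func splitClients(current, incoming []string, capacity int) (kept, next []string) {
	available := capacity - len(current)
	if available < 0 {
		available = 0
	}
	if len(incoming) <= available {
		// Everything fits in the current segment; no rollover needed.
		return append(current, incoming...), nil
	}
	// Fill the current segment to capacity and carry the rest over.
	kept = append(current, incoming[:available]...)
	next = incoming[available:]
	if len(next) > capacity {
		// Mirrors the "too many new active clients, dropping tail" warning.
		next = next[:capacity]
	}
	return kept, next
}

func main() {
	current := make([]string, 3000)
	incoming := make([]string, 2000)
	kept, next := splitClients(current, incoming, 3820)
	fmt.Println(len(kept), len(next)) // 3820 1180
}
```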
39 changes: 27 additions & 12 deletions vault/activity_log_test.go
@@ -664,8 +664,8 @@ func TestActivityLog_availableLogs(t *testing.T) {
// TestActivityLog_MultipleFragmentsAndSegments adds 4000 clients to a fragment
// and saves it and reads it. The test then adds 4000 more clients and calls
// receivedFragment with 200 more entities. The current segment is saved to
// storage and read back. The test verifies that there are 5000 clients in the
// first segment index, then the rest in the second index.
// storage and read back. The test verifies that there are ActivitySegmentClientCapacity clients in the
// first and second segment indexes, and the rest in the third index.
func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
core, _, _ := TestCoreUnsealedWithConfig(t, &CoreConfig{
ActivityLogConfig: ActivityLogCoreConfig{
@@ -682,14 +682,15 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
startTimestamp := a.GetStartTimestamp()
path0 := fmt.Sprintf("sys/counters/activity/log/entity/%d/0", startTimestamp)
path1 := fmt.Sprintf("sys/counters/activity/log/entity/%d/1", startTimestamp)
path2 := fmt.Sprintf("sys/counters/activity/log/entity/%d/2", startTimestamp)
tokenPath := fmt.Sprintf("sys/counters/activity/log/directtokens/%d/0", startTimestamp)

genID := func(i int) string {
return fmt.Sprintf("11111111-1111-1111-1111-%012d", i)
}
ts := time.Now().Unix()

// First 4000 should fit in one segment
// The first ActivitySegmentClientCapacity clients should fit in one segment
for i := 0; i < 4000; i++ {
a.AddEntityToFragment(genID(i), "root", ts)
}
@@ -702,7 +703,7 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
default:
}

// Save incomplete segment
// Save segment
err := a.saveCurrentSegmentToStorage(context.Background(), false)
if err != nil {
t.Fatalf("got error writing entities to storage: %v", err)
@@ -714,8 +715,8 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
if err != nil {
t.Fatalf("could not unmarshal protobuf: %v", err)
}
if len(entityLog0.Clients) != 4000 {
t.Fatalf("unexpected entity length. Expected %d, got %d", 4000, len(entityLog0.Clients))
if len(entityLog0.Clients) != ActivitySegmentClientCapacity {
t.Fatalf("unexpected entity length. Expected %d, got %d", ActivitySegmentClientCapacity, len(entityLog0.Clients))
}

// 4000 more local entities
@@ -775,17 +776,17 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
}

seqNum := a.GetEntitySequenceNumber()
if seqNum != 1 {
t.Fatalf("expected sequence number 1, got %v", seqNum)
if seqNum != 2 {
t.Fatalf("expected sequence number 2, got %v", seqNum)
}

protoSegment0 = readSegmentFromStorage(t, core, path0)
err = proto.Unmarshal(protoSegment0.Value, &entityLog0)
if err != nil {
t.Fatalf("could not unmarshal protobuf: %v", err)
}
if len(entityLog0.Clients) != activitySegmentClientCapacity {
t.Fatalf("unexpected client length. Expected %d, got %d", activitySegmentClientCapacity,
if len(entityLog0.Clients) != ActivitySegmentClientCapacity {
t.Fatalf("unexpected client length. Expected %d, got %d", ActivitySegmentClientCapacity,
len(entityLog0.Clients))
}

@@ -795,8 +796,19 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
if err != nil {
t.Fatalf("could not unmarshal protobuf: %v", err)
}
expectedCount := 8100 - activitySegmentClientCapacity
if len(entityLog1.Clients) != expectedCount {
if len(entityLog1.Clients) != ActivitySegmentClientCapacity {
t.Fatalf("unexpected entity length. Expected %d, got %d", ActivitySegmentClientCapacity,
len(entityLog1.Clients))
}

protoSegment2 := readSegmentFromStorage(t, core, path2)
entityLog2 := activity.EntityActivityLog{}
err = proto.Unmarshal(protoSegment2.Value, &entityLog2)
if err != nil {
t.Fatalf("could not unmarshal protobuf: %v", err)
}
expectedCount := 8100 - (ActivitySegmentClientCapacity * 2)
if len(entityLog2.Clients) != expectedCount {
t.Fatalf("unexpected entity length. Expected %d, got %d", expectedCount,
len(entityLog2.Clients))
}
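
As a quick check of the counts these assertions expect (assuming, as the loop below verifies, 8100 unique clients in total and a capacity of 3820): segments 0 and 1 each hold a full ActivitySegmentClientCapacity, segment 2 holds the remaining 460, and the current sequence number is 2.

```go
package main

import "fmt"

func main() {
	const capacity = 3820 // ActivitySegmentClientCapacity
	const total = 8100    // unique clients the test writes in total

	segments := (total + capacity - 1) / capacity // ceiling division: 3 segments
	lastSegment := total - (segments-1)*capacity  // 8100 - 2*3820 = 460

	fmt.Println("segments:", segments)                   // 3, so sequence numbers 0, 1 and 2
	fmt.Println("clients in last segment:", lastSegment) // 460
}
```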
@@ -808,6 +820,9 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
for _, e := range entityLog1.Clients {
entityPresent[e.ClientID] = struct{}{}
}
for _, e := range entityLog2.Clients {
entityPresent[e.ClientID] = struct{}{}
}
for i := 0; i < 8100; i++ {
expectedID := genID(i)
if _, present := entityPresent[expectedID]; !present {