
Commit
Fix segments fragments loss (#23781) (#23841)
* add ent changes

* add changelog

* make fmt

Co-authored-by: akshya96 <[email protected]>
1 parent c07fb7e commit 725de7b
Showing 3 changed files with 42 additions and 22 deletions.
changelog/23781.txt (3 additions, 0 deletions)
@@ -0,0 +1,3 @@
+ ```release-note:bug
+ core/activity: Fixes segment fragment loss due to exceeding the entry record size limit
+ ```
vault/activity_log.go (9 additions, 9 deletions)
@@ -60,18 +60,18 @@ const (
activitySegmentWriteTimeout = 1 * time.Minute

// Number of client records to store per segment. Each ClientRecord may
- // consume upto 99 bytes; rounding it to 100bytes. Considering the storage
- // limit of 512KB per storage entry, we can roughly store 512KB/100bytes =
- // 5241 clients; rounding down to 5000 clients.
- activitySegmentClientCapacity = 5000
+ // consume up to 99 bytes; round that up to 100 bytes. The record bytes
+ // undergo JSON marshalling before being written to storage, which increases
+ // the size by approximately 4/3. Given the storage limit of 512KB per
+ // storage entry, we can roughly store 512KB/(100 bytes * 4/3), yielding approximately 3820 records.
+ ActivitySegmentClientCapacity = 3820

// Maximum number of segments per month. This allows for 700K entities per
- // month; 700K/5K. These limits are geared towards controlling the storage
+ // month; 700K/3820 (ActivitySegmentClientCapacity). These limits are geared towards controlling the storage
// implications of persisting activity logs. If we hit a scenario where the
// storage consequences are less important in comparison to the accuracy of
// the client activity, these limits can be further relaxed or even be
// removed.
- activityLogMaxSegmentPerMonth = 140
+ activityLogMaxSegmentPerMonth = 184

// trackedTWESegmentPeriod is a time period of a little over a month, and represents
// the amount of time that needs to pass after a 1.9 or later upgrade to result in
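A quick sanity check on the two new constants (a minimal standalone sketch, not Vault code; the 512KB entry limit, 100-byte record size, 4/3 JSON inflation factor, and 700K-clients-per-month target are all taken from the comments in this hunk):

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	const storageEntryLimit = 512 * 1024 // 512KB per storage entry
	const recordSize = 100.0             // ClientRecord rounded up to 100 bytes
	const jsonInflation = 4.0 / 3.0      // JSON marshalling inflates the []byte ~4/3x

	ceiling := storageEntryLimit / (recordSize * jsonInflation)
	fmt.Printf("theoretical ceiling: %.0f records per segment\n", ceiling) // ~3932

	const capacity = 3820 // chosen value, leaving headroom under the ceiling
	segments := math.Ceil(700_000 / float64(capacity))
	fmt.Printf("segments needed for 700K clients: %.0f\n", segments) // 184
}
```

The chosen 3820 sits deliberately below the ~3932 theoretical ceiling, and the new 184 replaces the old 140 = 700K/5K.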
@@ -351,7 +351,7 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
}

// Will all new entities fit? If not, roll over to a new segment.
- available := activitySegmentClientCapacity - len(a.currentSegment.currentClients.Clients)
+ available := ActivitySegmentClientCapacity - len(a.currentSegment.currentClients.Clients)
remaining := available - len(newEntities)
excess := 0
if remaining < 0 {
@@ -389,9 +389,9 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for

// Rotate to next segment
a.currentSegment.clientSequenceNumber += 1
- if len(excessClients) > activitySegmentClientCapacity {
+ if len(excessClients) > ActivitySegmentClientCapacity {
a.logger.Warn("too many new active clients, dropping tail", "clients", len(excessClients))
- excessClients = excessClients[:activitySegmentClientCapacity]
+ excessClients = excessClients[:ActivitySegmentClientCapacity]
}
a.currentSegment.currentClients.Clients = excessClients
err := a.saveCurrentSegmentInternal(ctx, force)
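Both hunks above feed the same split-and-rollover logic: fill the current segment up to capacity, roll the overflow into the next segment, and drop the tail if the overflow alone exceeds a full segment. A minimal sketch of that arithmetic (a hypothetical splitForRollover helper, not the actual ActivityLog method):

```go
package main

import "fmt"

// capacity mirrors ActivitySegmentClientCapacity from the hunks above.
const capacity = 3820

// splitForRollover fills the current segment up to capacity and returns
// the overflow bound for the next segment, truncated to at most one
// segment's worth.
func splitForRollover(current, incoming []string) (kept, excess []string) {
	available := capacity - len(current)
	if available < 0 {
		available = 0 // defensive; a segment should never exceed capacity
	}
	if len(incoming) <= available {
		return incoming, nil // everything fits; no rollover needed
	}
	kept, excess = incoming[:available], incoming[available:]
	if len(excess) > capacity {
		// Mirrors the "too many new active clients, dropping tail" branch.
		excess = excess[:capacity]
	}
	return kept, excess
}

func main() {
	current := make([]string, 3800) // segment already holds 3800 clients
	incoming := make([]string, 100) // 100 new clients arrive
	kept, excess := splitForRollover(current, incoming)
	fmt.Println(len(kept), len(excess)) // 20 80
}
```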
vault/activity_log_test.go (30 additions, 13 deletions)
@@ -663,9 +663,11 @@ func TestActivityLog_availableLogs(t *testing.T) {
}
}

- // TestActivityLog_MultipleFragmentsAndSegments adds 4000 clients to a fragment and saves it and reads it. The test then
- // adds 4000 more clients and calls receivedFragment with 200 more entities. The current segment is saved to storage and
- // read back. The test verifies that there are 5000 clients in the first segment index, then the rest in the second index.
+ // TestActivityLog_MultipleFragmentsAndSegments adds 4000 clients to a
+ // fragment, then saves it and reads it back. The test then adds 4000 more
+ // clients and calls receivedFragment with 200 more entities. The current
+ // segment is saved to storage and read back. The test verifies that there
+ // are ActivitySegmentClientCapacity clients in the first and second segment
+ // indexes, with the rest in the third index.
func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
core, _, _ := TestCoreUnsealed(t)
a := core.activityLog
@@ -685,14 +687,15 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
startTimestamp := a.GetStartTimestamp()
path0 := fmt.Sprintf("sys/counters/activity/log/entity/%d/0", startTimestamp)
path1 := fmt.Sprintf("sys/counters/activity/log/entity/%d/1", startTimestamp)
+ path2 := fmt.Sprintf("sys/counters/activity/log/entity/%d/2", startTimestamp)
tokenPath := fmt.Sprintf("sys/counters/activity/log/directtokens/%d/0", startTimestamp)

genID := func(i int) string {
return fmt.Sprintf("11111111-1111-1111-1111-%012d", i)
}
ts := time.Now().Unix()

- // First 4000 should fit in one segment
+ // First ActivitySegmentClientCapacity should fit in one segment
for i := 0; i < 4000; i++ {
a.AddEntityToFragment(genID(i), "root", ts)
}
@@ -705,7 +708,7 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
default:
}

- // Save incomplete segment
+ // Save segment
err := a.saveCurrentSegmentToStorage(context.Background(), false)
if err != nil {
t.Fatalf("got error writing entities to storage: %v", err)
@@ -717,8 +720,8 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
if err != nil {
t.Fatalf("could not unmarshal protobuf: %v", err)
}
- if len(entityLog0.Clients) != 4000 {
- t.Fatalf("unexpected entity length. Expected %d, got %d", 4000, len(entityLog0.Clients))
+ if len(entityLog0.Clients) != ActivitySegmentClientCapacity {
+ t.Fatalf("unexpected entity length. Expected %d, got %d", ActivitySegmentClientCapacity, len(entityLog0.Clients))
}

// 4000 more local entities
@@ -778,17 +781,17 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
}

seqNum := a.GetEntitySequenceNumber()
- if seqNum != 1 {
- t.Fatalf("expected sequence number 1, got %v", seqNum)
+ if seqNum != 2 {
+ t.Fatalf("expected sequence number 2, got %v", seqNum)
}

protoSegment0 = readSegmentFromStorage(t, core, path0)
err = proto.Unmarshal(protoSegment0.Value, &entityLog0)
if err != nil {
t.Fatalf("could not unmarshal protobuf: %v", err)
}
- if len(entityLog0.Clients) != activitySegmentClientCapacity {
- t.Fatalf("unexpected client length. Expected %d, got %d", activitySegmentClientCapacity,
+ if len(entityLog0.Clients) != ActivitySegmentClientCapacity {
+ t.Fatalf("unexpected client length. Expected %d, got %d", ActivitySegmentClientCapacity,
len(entityLog0.Clients))
}

@@ -798,8 +801,19 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
if err != nil {
t.Fatalf("could not unmarshal protobuf: %v", err)
}
- expectedCount := 8100 - activitySegmentClientCapacity
- if len(entityLog1.Clients) != expectedCount {
+ if len(entityLog1.Clients) != ActivitySegmentClientCapacity {
+ t.Fatalf("unexpected entity length. Expected %d, got %d", ActivitySegmentClientCapacity,
+ len(entityLog1.Clients))
+ }
+
+ protoSegment2 := readSegmentFromStorage(t, core, path2)
+ entityLog2 := activity.EntityActivityLog{}
+ err = proto.Unmarshal(protoSegment2.Value, &entityLog2)
+ if err != nil {
+ t.Fatalf("could not unmarshal protobuf: %v", err)
+ }
+ expectedCount := 8100 - (ActivitySegmentClientCapacity * 2)
+ if len(entityLog2.Clients) != expectedCount {
t.Fatalf("unexpected entity length. Expected %d, got %d", expectedCount,
len(entityLog2.Clients))
}
@@ -811,6 +825,9 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
for _, e := range entityLog1.Clients {
entityPresent[e.ClientID] = struct{}{}
}
+ for _, e := range entityLog2.Clients {
+ entityPresent[e.ClientID] = struct{}{}
+ }
for i := 0; i < 8100; i++ {
expectedID := genID(i)
if _, present := entityPresent[expectedID]; !present {
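The updated expectations all follow from one division: the test writes 8100 clients in total, and at 3820 per segment that completely fills segment indexes 0 and 1, leaving the remainder in index 2 and a final sequence number of 2. A standalone check of those numbers:

```go
package main

import "fmt"

func main() {
	const capacity = 3820 // ActivitySegmentClientCapacity
	const total = 8100    // clients written over the course of the test

	fmt.Println(total / capacity) // 2 full segments: indexes 0 and 1
	fmt.Println(total % capacity) // 460 clients left for segment index 2
}
```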
