Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ The Lotus and Lotus-Miner v1.34.2 release includes numerous bug fixes, CLI enhan
- fix(miner): ensure sender account exists ([filecoin-project/lotus#13348](https://github.com/filecoin-project/lotus/pull/13348))
- fix(eth): properly return vm error in all gas estimation methods ([filecoin-project/lotus#13389](https://github.com/filecoin-project/lotus/pull/13389))
- chore: all actor cmd support --actor ([filecoin-project/lotus#13391](https://github.com/filecoin-project/lotus/pull/13391))
- fix(fr32): fix data corruption in multithreaded Pad/Unpad for non-aligned sizes

## 📝 Changelog

Expand Down
19 changes: 18 additions & 1 deletion storage/sealer/fr32/fr32.go
Comment thread
rjan90 marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,14 @@ func mtChunkCount(usz abi.PaddedPieceSize) uint64 {

func mt(in, out []byte, padLen int, op func(unpadded, padded []byte)) {
threads := mtChunkCount(abi.PaddedPieceSize(padLen))
threadBytes := abi.PaddedPieceSize(padLen / int(threads))

// Ensure threadBytes is aligned to 128-byte chunk boundaries.
// Each fr32 chunk is 128 padded bytes / 127 unpadded bytes.
chunksPerThread := (padLen / int(threads)) / 128
if chunksPerThread == 0 {
chunksPerThread = 1
}
threadBytes := abi.PaddedPieceSize(chunksPerThread * 128)

var wg sync.WaitGroup
wg.Add(int(threads))
Expand All @@ -53,6 +60,16 @@ func mt(in, out []byte, padLen int, op func(unpadded, padded []byte)) {
start := threadBytes * abi.PaddedPieceSize(thread)
end := start + threadBytes

// Last thread takes any remainder
if thread == int(threads)-1 {
end = abi.PaddedPieceSize(padLen)
}

// Skip if this thread has no work
if start >= abi.PaddedPieceSize(padLen) {
return
}

op(in[start.Unpadded():end.Unpadded()], out[start:end])
}(i)
}
Expand Down
129 changes: 129 additions & 0 deletions storage/sealer/fr32/fr32_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package fr32_test
import (
"bytes"
"crypto/rand"
"fmt"
"io"
"os"
"testing"
Expand Down Expand Up @@ -149,6 +150,134 @@ func TestRoundtrip16MRand(t *testing.T) {
require.Equal(t, ffi, buf)
}

// TestRoundtripMisalignedSizes pads and then unpads random inputs whose chunk
// counts do not split evenly across worker threads, and requires the result to
// match the input byte for byte.
//
// It guards against a regression in which a per-thread span that was not a
// multiple of 128 padded bytes caused partial chunks at thread boundaries to
// be skipped.
func TestRoundtripMisalignedSizes(t *testing.T) {
	// Anything larger than this is skipped to keep the test cheap.
	const maxPaddedBytes = 64 << 20

	// Each case is a count of fr32 chunks (127 unpadded / 128 padded bytes).
	// The counts are intended to be large enough to reach the multithreaded
	// code path while producing thread spans that are not 128-byte aligned.
	cases := []struct {
		label  string
		chunks int
	}{
		// 66061 chunks = 8455808 padded bytes. Split 16 ways that is 528488
		// bytes per thread, and 528488 = 4128*128 + 104, i.e. not a whole
		// number of chunks. This is the originally reported failing size.
		{label: "66061_chunks_8MiB_boundary", chunks: 66061},

		// Further counts meant to yield misaligned thread spans.
		{label: "prime_chunks_1009", chunks: 1009 * 8},   // 8072 chunks, ~1MB padded
		{label: "odd_chunks_8193", chunks: 8193},         // one more than 8192 (power of 2)
		{label: "odd_chunks_65537", chunks: 65537},       // one more than 65536 (power of 2)
		{label: "odd_chunks_100003", chunks: 100003},     // large prime
		{label: "boundary_chunks_66000", chunks: 66000},  // just below the original bug size
		{label: "boundary_chunks_70000", chunks: 70000},  // somewhat above the original bug size
	}

	for _, c := range cases {
		t.Run(c.label, func(t *testing.T) {
			rawLen, fr32Len := c.chunks*127, c.chunks*128
			if fr32Len > maxPaddedBytes {
				t.Skip("Size too large for this test")
			}

			// Random payload so that any dropped or shifted bytes show up.
			want := make([]byte, rawLen)
			_, err := rand.Read(want)
			require.NoError(t, err)

			encoded := make([]byte, fr32Len)
			fr32.Pad(want, encoded)

			got := make([]byte, rawLen)
			fr32.Unpad(encoded, got)

			require.Equal(t, want, got, "Roundtrip failed for %d chunks", c.chunks)
		})
	}
}

// TestUnpadMisalignedThreadBoundaries round-trips a deterministic byte pattern
// through Pad and Unpad at the size from the original bug report, and on
// failure reports the exact extent of the first corrupted region.
//
// It targets the multithreaded bug in which thread boundaries were not aligned
// to 128-byte fr32 chunks, causing data loss.
func TestUnpadMisalignedThreadBoundaries(t *testing.T) {
	// 66061 chunks is just over 8MiB:
	//   66061 * 127 = 8389747 unpadded bytes
	//   66061 * 128 = 8455808 padded bytes
	const chunks = 66061
	rawLen, fr32Len := chunks*127, chunks*128

	// A repeating 0x00..0xFF ramp makes the location of any corruption easy
	// to read off from the failure message.
	want := make([]byte, rawLen)
	for i := range want {
		want[i] = byte(i)
	}

	encoded := make([]byte, fr32Len)
	fr32.Pad(want, encoded)

	got := make([]byte, rawLen)
	fr32.Unpad(encoded, got)

	require.Equal(t, len(want), len(got), "Output length mismatch")

	// Locate the first differing byte. The original report saw corruption
	// near offsets such as 528384 and 1056768.
	first := -1
	for i := range want {
		if want[i] != got[i] {
			first = i
			break
		}
	}
	if first < 0 {
		return
	}

	// Extend to the end of the contiguous run of mismatching bytes.
	last := first
	for last < len(want) && want[last] != got[last] {
		last++
	}

	t.Fatalf("Data corruption at offset %d (0x%x) to %d (0x%x): expected 0x%02x, got 0x%02x (corrupt bytes: %d)",
		first, first, last, last,
		want[first], got[first], last-first)
}

// TestPadUnpadVariousSizesAboveMTTresh round-trips random data through Pad and
// Unpad for a range of chunk-aligned sizes from just over 512KB up to roughly
// 8MB, which are expected to exceed the multithreading threshold (MTTresh).
func TestPadUnpadVariousSizesAboveMTTresh(t *testing.T) {
	const (
		kib = 1024
		mib = 1024 * kib
	)

	// chunkAligned rounds base down to a whole number of 127-byte unpadded
	// chunks and then adds extra further chunks on top.
	chunkAligned := func(base, extra int) int {
		return base/127*127 + 127*extra
	}

	unpaddedSizes := []int{
		chunkAligned(513*kib, 0), // just above 512KB
		chunkAligned(1*mib, 0),   // ~1MB
		chunkAligned(2*mib, 100), // ~2MB plus extra chunks
		chunkAligned(4*mib, 333), // ~4MB plus an odd number of chunks
		chunkAligned(8*mib, 777), // ~8MB plus an odd number of chunks
	}

	for _, rawLen := range unpaddedSizes {
		// Every size above is a multiple of 127, so this is exact.
		fr32Len := rawLen / 127 * 128
		name := fmt.Sprintf("%d_bytes", rawLen)

		t.Run(name, func(t *testing.T) {
			want := make([]byte, rawLen)
			_, err := rand.Read(want)
			require.NoError(t, err)

			encoded := make([]byte, fr32Len)
			fr32.Pad(want, encoded)

			got := make([]byte, rawLen)
			fr32.Unpad(encoded, got)

			require.Equal(t, want, got)
		})
	}
}

func BenchmarkPadChunk(b *testing.B) {
var buf [128]byte
in := bytes.Repeat([]byte{0xff}, 127)
Expand Down
Loading