From 3521fcd6eb1936e43887a65214fdecbdbf306b09 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Fri, 22 Aug 2025 17:39:33 +0800 Subject: [PATCH 1/4] fix(zstd decompression): limit concurrency to 1 to prevent deadlock in zstd library --- encoding/codecv7_test.go | 30 ++++++++++++++++++++++++++++++ encoding/codecv7_types.go | 2 +- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/encoding/codecv7_test.go b/encoding/codecv7_test.go index 4b29ea7..ade96b1 100644 --- a/encoding/codecv7_test.go +++ b/encoding/codecv7_test.go @@ -4,8 +4,12 @@ import ( "encoding/hex" "encoding/json" "fmt" + "log" "math/big" "math/rand" + "net/http" + _ "net/http" + _ "net/http/pprof" "strings" "testing" @@ -17,6 +21,32 @@ import ( "github.com/stretchr/testify/require" ) +// TestDecodeAllDeadlock tests the decompression of random bytes to trigger deadlock in zstd library. + +func TestDecodeAllDeadlock(t *testing.T) { + //t.Skip("Skip test that triggers deadlock in zstd library") + + go func() { + log.Println(http.ListenAndServe("localhost:6060", nil)) + }() + + // generate some random bytes + randomBytes := make([]byte, maxBlobBytes) + rand.Read(randomBytes) + + c := NewDACodecV8() + + compressed, err := c.CompressScrollBatchBytes(randomBytes) + require.NoError(t, err) + + // repeatedly decompress the bytes to trigger deadlock in zstd library + for i := 0; i < 100000; i++ { + uncompressed, err := decompressV7Bytes(compressed) + require.NoError(t, err) + require.Equal(t, randomBytes, uncompressed) + } +} + // TestCodecV7DABlockEncodeDecode tests the encoding and decoding of daBlockV7. func TestCodecV7DABlockEncodeDecode(t *testing.T) { codecV7, err := CodecFromVersion(CodecV7) diff --git a/encoding/codecv7_types.go b/encoding/codecv7_types.go index 8e1a86b..2096040 100644 --- a/encoding/codecv7_types.go +++ b/encoding/codecv7_types.go @@ -482,7 +482,7 @@ func decompressV7Bytes(compressedBytes []byte) ([]byte, error) { compressedBytes = append(zstdMagicNumber, compressedBytes...) r := bytes.NewReader(compressedBytes) - zr, err := zstd.NewReader(r) + zr, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(1)) if err != nil { return nil, fmt.Errorf("failed to create zstd reader: %w", err) } From 6a97fc7c1ac77254333e87b6b4624763c5608d01 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Fri, 22 Aug 2025 17:40:45 +0800 Subject: [PATCH 2/4] skip test --- encoding/codecv7_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/encoding/codecv7_test.go b/encoding/codecv7_test.go index ade96b1..934f546 100644 --- a/encoding/codecv7_test.go +++ b/encoding/codecv7_test.go @@ -24,7 +24,7 @@ import ( // TestDecodeAllDeadlock tests the decompression of random bytes to trigger deadlock in zstd library. func TestDecodeAllDeadlock(t *testing.T) { - //t.Skip("Skip test that triggers deadlock in zstd library") + t.Skip("Skip test that triggers deadlock in zstd library") go func() { log.Println(http.ListenAndServe("localhost:6060", nil)) From d801a9c1ac80c14e275e989afa896ac3bb877d14 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Fri, 22 Aug 2025 17:46:28 +0800 Subject: [PATCH 3/4] use crypt/rand --- encoding/codecv7_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/encoding/codecv7_test.go b/encoding/codecv7_test.go index 934f546..038bffe 100644 --- a/encoding/codecv7_test.go +++ b/encoding/codecv7_test.go @@ -1,6 +1,7 @@ package encoding import ( + crand "crypto/rand" "encoding/hex" "encoding/json" "fmt" @@ -32,7 +33,8 @@ func TestDecodeAllDeadlock(t *testing.T) { // generate some random bytes randomBytes := make([]byte, maxBlobBytes) - rand.Read(randomBytes) + _, err := crand.Read(randomBytes) + require.NoError(t, err) c := NewDACodecV8() From fd86a7c02d23afd75bf38bc746adac7a4ee5dbff Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Fri, 22 Aug 2025 17:54:22 +0800 Subject: [PATCH 4/4] address review comments --- encoding/codecv7_test.go | 5 ++--- encoding/da.go | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/encoding/codecv7_test.go b/encoding/codecv7_test.go index 038bffe..12bf8d0 100644 --- a/encoding/codecv7_test.go +++ b/encoding/codecv7_test.go @@ -9,7 +9,6 @@ import ( "math/big" "math/rand" "net/http" - _ "net/http" _ "net/http/pprof" "strings" "testing" @@ -22,8 +21,8 @@ import ( "github.com/stretchr/testify/require" ) -// TestDecodeAllDeadlock tests the decompression of random bytes to trigger deadlock in zstd library. - +// TestDecodeAllDeadlock tests the decompression of random bytes to trigger deadlock in zstd library +// with setting of zstd.WithDecoderConcurrency(2). func TestDecodeAllDeadlock(t *testing.T) { t.Skip("Skip test that triggers deadlock in zstd library") diff --git a/encoding/da.go b/encoding/da.go index de19118..572bc9d 100644 --- a/encoding/da.go +++ b/encoding/da.go @@ -552,7 +552,7 @@ func decompressScrollBlobToBatch(compressedBytes []byte) ([]byte, error) { batchOfBytes := make([]byte, readBatchSize) r := bytes.NewReader(compressedBytes) - zr, err := zstd.NewReader(r) + zr, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(1)) if err != nil { return nil, err }