From 66d94de665fc432d250177b0af3d945a05ab2089 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 24 Jul 2025 20:54:25 +0200 Subject: [PATCH] [FIXED] Cipher conversion fails on compressed msg block Signed-off-by: Maurice van Veen --- server/filestore.go | 7 +- server/jetstream_test.go | 145 ++++++++++++++++++++------------------- 2 files changed, 81 insertions(+), 71 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 1fcdc812f9a..9936949a023 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1293,10 +1293,13 @@ func (mb *msgBlock) convertCipher() error { buf, _ := mb.loadBlock(nil) bek.XORKeyStream(buf, buf) - // Make sure we can parse with old cipher and key file. - if err = mb.indexCacheBuf(buf); err != nil { + // Check for compression, and make sure we can parse with old cipher and key file. + if nbuf, err := mb.decompressIfNeeded(buf); err != nil { + return err + } else if err = mb.indexCacheBuf(nbuf); err != nil { return err } + // Reset the cache since we just read everything in. mb.cache = nil diff --git a/server/jetstream_test.go b/server/jetstream_test.go index ada1dfa4610..9e51bde5016 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -16096,7 +16096,8 @@ func TestJetStreamLastSequenceBySubjectConcurrent(t *testing.T) { func TestJetStreamServerReencryption(t *testing.T) { storeDir := t.TempDir() - for i, algo := range []struct { + var i int + for _, algo := range []struct { from string to string }{ @@ -16105,40 +16106,42 @@ func TestJetStreamServerReencryption(t *testing.T) { {"chacha", "chacha"}, {"chacha", "aes"}, } { - t.Run(fmt.Sprintf("%s_to_%s", algo.from, algo.to), func(t *testing.T) { - streamName := fmt.Sprintf("TEST_%d", i) - subjectName := fmt.Sprintf("foo_%d", i) - expected := 30 - - checkStream := func(js nats.JetStreamContext) { - si, err := js.StreamInfo(streamName) - if err != nil { - t.Fatal(err) - } + for _, compression := range []StoreCompression{NoCompression, S2Compression} { + t.Run(fmt.Sprintf("%s_to_%s/%s", algo.from, algo.to, compression), func(t *testing.T) { + i++ + streamName := fmt.Sprintf("TEST_%d", i) + subjectName := fmt.Sprintf("foo_%d", i) + expected := 30 + + checkStream := func(js nats.JetStreamContext) { + si, err := js.StreamInfo(streamName) + if err != nil { + t.Fatal(err) + } - if si.State.Msgs != uint64(expected) { - t.Fatalf("Should be %d messages but got %d messages", expected, si.State.Msgs) - } + if si.State.Msgs != uint64(expected) { + t.Fatalf("Should be %d messages but got %d messages", expected, si.State.Msgs) + } - sub, err := js.PullSubscribe(subjectName, "") - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } + sub, err := js.PullSubscribe(subjectName, "") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } - c := 0 - for _, m := range fetchMsgs(t, sub, expected, 5*time.Second) { - m.AckSync() - c++ - } - if c != expected { - t.Fatalf("Should have read back %d messages but got %d messages", expected, c) + c := 0 + for _, m := range fetchMsgs(t, sub, expected, 5*time.Second) { + m.AckSync() + c++ + } + if c != expected { + t.Fatalf("Should have read back %d messages but got %d messages", expected, c) + } } - } - // First off, we start up using the original encryption key and algorithm. - // We'll create a stream and populate it with some messages. - t.Run("setup", func(t *testing.T) { - conf := createConfFile(t, []byte(fmt.Sprintf(` + // First off, we start up using the original encryption key and algorithm. + // We'll create a stream and populate it with some messages. + t.Run("setup", func(t *testing.T) { + conf := createConfFile(t, []byte(fmt.Sprintf(` server_name: S22 listen: 127.0.0.1:-1 jetstream: { @@ -16148,34 +16151,37 @@ func TestJetStreamServerReencryption(t *testing.T) { } `, "firstencryptionkey", algo.from, storeDir))) - s, _ := RunServerWithConfig(conf) - defer s.Shutdown() + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() - nc, js := jsClientConnect(t, s) - defer nc.Close() + nc, js := jsClientConnect(t, s) + defer nc.Close() - cfg := &nats.StreamConfig{ - Name: streamName, - Subjects: []string{subjectName}, - } - if _, err := js.AddStream(cfg); err != nil { - t.Fatalf("Unexpected error: %v", err) - } + cfg := &StreamConfig{ + Name: streamName, + Subjects: []string{subjectName}, + Storage: FileStorage, + Compression: compression, + } + if _, err := jsStreamCreate(t, nc, cfg); err != nil { + t.Fatalf("Unexpected error: %v", err) + } - for i := 0; i < expected; i++ { - if _, err := js.Publish(subjectName, []byte("ENCRYPTED PAYLOAD!!")); err != nil { - t.Fatalf("Unexpected publish error: %v", err) + payload := strings.Repeat("A", 512*1024) + for i := 0; i < expected; i++ { + if _, err := js.Publish(subjectName, []byte(payload)); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } } - } - checkStream(js) - }) + checkStream(js) + }) - // Next up, we will restart the server, this time with both the new key - // and algorithm and also the old key. At startup, the server will detect - // the change in encryption key and/or algorithm and re-encrypt the stream. - t.Run("reencrypt", func(t *testing.T) { - conf := createConfFile(t, []byte(fmt.Sprintf(` + // Next up, we will restart the server, this time with both the new key + // and algorithm and also the old key. At startup, the server will detect + // the change in encryption key and/or algorithm and re-encrypt the stream. + t.Run("reencrypt", func(t *testing.T) { + conf := createConfFile(t, []byte(fmt.Sprintf(` server_name: S22 listen: 127.0.0.1:-1 jetstream: { @@ -16186,20 +16192,20 @@ func TestJetStreamServerReencryption(t *testing.T) { } `, "secondencryptionkey", algo.to, "firstencryptionkey", storeDir))) - s, _ := RunServerWithConfig(conf) - defer s.Shutdown() + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() - nc, js := jsClientConnect(t, s) - defer nc.Close() + nc, js := jsClientConnect(t, s) + defer nc.Close() - checkStream(js) - }) + checkStream(js) + }) - // Finally, we'll restart the server using only the new key and algorithm. - // At this point everything should have been re-encrypted, so we should still - // be able to access the stream. - t.Run("restart", func(t *testing.T) { - conf := createConfFile(t, []byte(fmt.Sprintf(` + // Finally, we'll restart the server using only the new key and algorithm. + // At this point everything should have been re-encrypted, so we should still + // be able to access the stream. + t.Run("restart", func(t *testing.T) { + conf := createConfFile(t, []byte(fmt.Sprintf(` server_name: S22 listen: 127.0.0.1:-1 jetstream: { @@ -16209,15 +16215,16 @@ func TestJetStreamServerReencryption(t *testing.T) { } `, "secondencryptionkey", algo.to, storeDir))) - s, _ := RunServerWithConfig(conf) - defer s.Shutdown() + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() - nc, js := jsClientConnect(t, s) - defer nc.Close() + nc, js := jsClientConnect(t, s) + defer nc.Close() - checkStream(js) + checkStream(js) + }) }) - }) + } } }