Skip to content

Commit 0c8552c

Browse files
Evaluate service imports when publishing advisories (#4302)
This should fix #4275 by allowing advisories to be copied into other accounts via service imports. Signed-off-by: Neil Twigg <[email protected]>
2 parents 9cfe8b8 + 1434ee7 commit 0c8552c

File tree

2 files changed

+219
-8
lines changed

2 files changed

+219
-8
lines changed

server/client.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3127,20 +3127,14 @@ var needFlush = struct{}{}
31273127
// deliverMsg will deliver a message to a matching subscription and its underlying client.
31283128
// We process all connection/client types. mh is the part that will be protocol/client specific.
31293129
func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, subject, reply, mh, msg []byte, gwrply bool) bool {
3130-
// Check sub client and check echo
3131-
if sub.client == nil || c == sub.client && !sub.client.echo {
3130+
// Check sub client and check echo. Only do this if not a service import.
3131+
if sub.client == nil || (c == sub.client && !sub.client.echo && !sub.si) {
31323132
return false
31333133
}
31343134

31353135
client := sub.client
31363136
client.mu.Lock()
31373137

3138-
// Check echo
3139-
if c == client && !client.echo {
3140-
client.mu.Unlock()
3141-
return false
3142-
}
3143-
31443138
// Check if we have a subscribe deny clause. This will trigger us to check the subject
31453139
// for a match against the denied subjects.
31463140
if client.mperms != nil && client.checkDenySub(string(subject)) {

server/jetstream_test.go

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10723,6 +10723,223 @@ func TestJetStreamAccountImportBasics(t *testing.T) {
1072310723
}
1072410724
}
1072510725

10726+
// This tests whether we are able to aggregate all JetStream advisory events
10727+
// from all accounts into a single account. Config for this test uses
10728+
// service imports and exports as that allows for gathering all events
10729+
// without having to know the account name and without separate entries
10730+
// for each account in aggregate account config.
10731+
// This test fails as it is not receiving the api audit event ($JS.EVENT.ADVISORY.API).
10732+
func TestJetStreamAccountImportJSAdvisoriesAsService(t *testing.T) {
10733+
conf := createConfFile(t, []byte(`
10734+
listen=127.0.0.1:-1
10735+
no_auth_user: pp
10736+
jetstream: {max_mem_store: 64GB, max_file_store: 10TB}
10737+
accounts {
10738+
JS {
10739+
jetstream: enabled
10740+
users: [ {user: pp, password: foo} ]
10741+
imports [
10742+
{ service: { account: AGG, subject: '$JS.EVENT.ADVISORY.ACC.JS.>' }, to: '$JS.EVENT.ADVISORY.>' }
10743+
]
10744+
}
10745+
AGG {
10746+
users: [ {user: agg, password: foo} ]
10747+
exports: [
10748+
{ service: '$JS.EVENT.ADVISORY.ACC.*.>', response: Singleton, account_token_position: 5 }
10749+
]
10750+
}
10751+
}
10752+
`))
10753+
10754+
s, _ := RunServerWithConfig(conf)
10755+
if config := s.JetStreamConfig(); config != nil {
10756+
defer removeDir(t, config.StoreDir)
10757+
}
10758+
defer s.Shutdown()
10759+
10760+
// This should be the pp user, one which manages JetStream assets
10761+
ncJS, err := nats.Connect(s.ClientURL())
10762+
if err != nil {
10763+
t.Fatalf("Unexpected error during connect: %v", err)
10764+
}
10765+
defer ncJS.Close()
10766+
10767+
// This is the agg user, which should aggregate all JS advisory events.
10768+
ncAgg, err := nats.Connect(s.ClientURL(), nats.UserInfo("agg", "foo"))
10769+
if err != nil {
10770+
t.Fatalf("Unexpected error during connect: %v", err)
10771+
}
10772+
defer ncAgg.Close()
10773+
10774+
js, err := ncJS.JetStream()
10775+
if err != nil {
10776+
t.Fatalf("Unexpected error: %v", err)
10777+
}
10778+
10779+
// user from JS account should receive events on $JS.EVENT.ADVISORY.> subject
10780+
subJS, err := ncJS.SubscribeSync("$JS.EVENT.ADVISORY.>")
10781+
if err != nil {
10782+
t.Fatalf("Unexpected error: %v", err)
10783+
}
10784+
defer subJS.Unsubscribe()
10785+
10786+
// user from AGG account should receive events on mapped $JS.EVENT.ADVISORY.ACC.JS.> subject (with account name)
10787+
subAgg, err := ncAgg.SubscribeSync("$JS.EVENT.ADVISORY.ACC.JS.>")
10788+
if err != nil {
10789+
t.Fatalf("Unexpected error: %v", err)
10790+
}
10791+
10792+
// add stream using JS account
10793+
// this should trigger 2 events:
10794+
// - an action event on $JS.EVENT.ADVISORY.STREAM.CREATED.ORDERS
10795+
// - an api audit event on $JS.EVENT.ADVISORY.API
10796+
_, err = js.AddStream(&nats.StreamConfig{Name: "ORDERS", Subjects: []string{"ORDERS.*"}})
10797+
if err != nil {
10798+
t.Fatalf("Unexpected error adding stream: %v", err)
10799+
}
10800+
msg, err := subJS.NextMsg(time.Second)
10801+
if err != nil {
10802+
t.Fatalf("Unexpected error: %v", err)
10803+
}
10804+
if msg.Subject != "$JS.EVENT.ADVISORY.STREAM.CREATED.ORDERS" {
10805+
t.Fatalf("Unexpected subject: %q", msg.Subject)
10806+
}
10807+
msg, err = subJS.NextMsg(time.Second)
10808+
if err != nil {
10809+
t.Fatalf("Unexpected error: %v", err)
10810+
}
10811+
if msg.Subject != "$JS.EVENT.ADVISORY.API" {
10812+
t.Fatalf("Unexpected subject: %q", msg.Subject)
10813+
}
10814+
10815+
// same set of events should be received by AGG account
10816+
// on subjects containing account name (ACC.JS)
10817+
msg, err = subAgg.NextMsg(time.Second)
10818+
if err != nil {
10819+
t.Fatalf("Unexpected error: %v", err)
10820+
}
10821+
if msg.Subject != "$JS.EVENT.ADVISORY.ACC.JS.STREAM.CREATED.ORDERS" {
10822+
t.Fatalf("Unexpected subject: %q", msg.Subject)
10823+
}
10824+
10825+
// we get error here, since we do not get the api audit event
10826+
msg, err = subAgg.NextMsg(time.Second)
10827+
if err != nil {
10828+
t.Fatalf("Unexpected error: %v", err)
10829+
}
10830+
if msg.Subject != "$JS.EVENT.ADVISORY.ACC.JS.API" {
10831+
t.Fatalf("Unexpected subject: %q", msg.Subject)
10832+
}
10833+
}
10834+
10835+
// This tests whether we are able to aggregate all JetStream advisory events
10836+
// from all accounts into a single account. Config for this test uses
10837+
// stream imports and exports as that allows for gathering all events
10838+
// as long as there is a separate stream import entry for each account
10839+
// in aggregate account config.
10840+
func TestJetStreamAccountImportJSAdvisoriesAsStream(t *testing.T) {
10841+
conf := createConfFile(t, []byte(`
10842+
listen=127.0.0.1:-1
10843+
no_auth_user: pp
10844+
jetstream: {max_mem_store: 64GB, max_file_store: 10TB}
10845+
accounts {
10846+
JS {
10847+
jetstream: enabled
10848+
users: [ {user: pp, password: foo} ]
10849+
exports [
10850+
{ stream: '$JS.EVENT.ADVISORY.>' }
10851+
]
10852+
}
10853+
AGG {
10854+
users: [ {user: agg, password: foo} ]
10855+
imports: [
10856+
{ stream: { account: JS, subject: '$JS.EVENT.ADVISORY.>' }, to: '$JS.EVENT.ADVISORY.ACC.JS.>' }
10857+
]
10858+
}
10859+
}
10860+
`))
10861+
10862+
s, _ := RunServerWithConfig(conf)
10863+
if config := s.JetStreamConfig(); config != nil {
10864+
defer removeDir(t, config.StoreDir)
10865+
}
10866+
defer s.Shutdown()
10867+
10868+
// This should be the pp user, one which manages JetStream assets
10869+
ncJS, err := nats.Connect(s.ClientURL())
10870+
if err != nil {
10871+
t.Fatalf("Unexpected error during connect: %v", err)
10872+
}
10873+
defer ncJS.Close()
10874+
10875+
// This is the agg user, which should aggregate all JS advisory events.
10876+
ncAgg, err := nats.Connect(s.ClientURL(), nats.UserInfo("agg", "foo"))
10877+
if err != nil {
10878+
t.Fatalf("Unexpected error during connect: %v", err)
10879+
}
10880+
defer ncAgg.Close()
10881+
10882+
js, err := ncJS.JetStream()
10883+
if err != nil {
10884+
t.Fatalf("Unexpected error: %v", err)
10885+
}
10886+
10887+
// user from JS account should receive events on $JS.EVENT.ADVISORY.> subject
10888+
subJS, err := ncJS.SubscribeSync("$JS.EVENT.ADVISORY.>")
10889+
if err != nil {
10890+
t.Fatalf("Unexpected error: %v", err)
10891+
}
10892+
defer subJS.Unsubscribe()
10893+
10894+
// user from AGG account should receive events on mapped $JS.EVENT.ADVISORY.ACC.JS.> subject (with account name)
10895+
subAgg, err := ncAgg.SubscribeSync("$JS.EVENT.ADVISORY.ACC.JS.>")
10896+
if err != nil {
10897+
t.Fatalf("Unexpected error: %v", err)
10898+
}
10899+
10900+
// add stream using JS account
10901+
// this should trigger 2 events:
10902+
// - an action event on $JS.EVENT.ADVISORY.STREAM.CREATED.ORDERS
10903+
// - an api audit event on $JS.EVENT.ADVISORY.API
10904+
_, err = js.AddStream(&nats.StreamConfig{Name: "ORDERS", Subjects: []string{"ORDERS.*"}})
10905+
if err != nil {
10906+
t.Fatalf("Unexpected error adding stream: %v", err)
10907+
}
10908+
msg, err := subJS.NextMsg(time.Second)
10909+
if err != nil {
10910+
t.Fatalf("Unexpected error: %v", err)
10911+
}
10912+
if msg.Subject != "$JS.EVENT.ADVISORY.STREAM.CREATED.ORDERS" {
10913+
t.Fatalf("Unexpected subject: %q", msg.Subject)
10914+
}
10915+
msg, err = subJS.NextMsg(time.Second)
10916+
if err != nil {
10917+
t.Fatalf("Unexpected error: %v", err)
10918+
}
10919+
if msg.Subject != "$JS.EVENT.ADVISORY.API" {
10920+
t.Fatalf("Unexpected subject: %q", msg.Subject)
10921+
}
10922+
10923+
// same set of events should be received by AGG account
10924+
// on subjects containing account name (ACC.JS)
10925+
msg, err = subAgg.NextMsg(time.Second)
10926+
if err != nil {
10927+
t.Fatalf("Unexpected error: %v", err)
10928+
}
10929+
if msg.Subject != "$JS.EVENT.ADVISORY.ACC.JS.STREAM.CREATED.ORDERS" {
10930+
t.Fatalf("Unexpected subject: %q", msg.Subject)
10931+
}
10932+
10933+
// when using stream instead of service, we get all events
10934+
msg, err = subAgg.NextMsg(time.Second)
10935+
if err != nil {
10936+
t.Fatalf("Unexpected error: %v", err)
10937+
}
10938+
if msg.Subject != "$JS.EVENT.ADVISORY.ACC.JS.API" {
10939+
t.Fatalf("Unexpected subject: %q", msg.Subject)
10940+
}
10941+
}
10942+
1072610943
// This is for importing all of JetStream into another account for admin purposes.
1072710944
func TestJetStreamAccountImportAll(t *testing.T) {
1072810945
conf := createConfFile(t, []byte(`

0 commit comments

Comments
 (0)