Skip to content
This repository was archived by the owner on Sep 11, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions scheduler/routebroadcastscheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type RouteBroadcastScheduler struct {
externalServiceStart chan time.Duration
cfg *config.RouteEmitterConfig
logger lager.Logger
MaxGreetAttempts int
}

func NewRouteBroadcastScheduler(
Expand All @@ -44,7 +45,8 @@ func NewRouteBroadcastScheduler(
externalServiceStart: make(chan time.Duration),
cfg: cfg,

logger: logger.Session("route-broadcast-scheduler", lager.Data{"name": externalServiceName}),
logger: logger.Session("route-broadcast-scheduler", lager.Data{"name": externalServiceName}),
MaxGreetAttempts: 30,
}
}

Expand All @@ -60,22 +62,28 @@ func (s *RouteBroadcastScheduler) Run(signals <-chan os.Signal, ready chan<- str
return err
}

close(ready)
s.logger.Info("started")

var registerInterval time.Duration
retryGreetingTicker := s.clock.NewTicker(time.Second)

//keep trying to greet until we hear from the external service
greetAttempt := 0
GREET_LOOP:
for {
greetAttempt += 1
s.logger.Info("greeting-external-service")
err := s.greetExternalService(replyUuid.String())

if err != nil {
s.logger.Error("failed-to-greet-external-service", err)
return err
}

if greetAttempt == s.MaxGreetAttempts {
err = fmt.Errorf("attempted to greet '%v' %d times without success", s.externalServiceName, s.MaxGreetAttempts)
s.logger.Error("greeting-external-service.giving-up", err)
return err
}

select {
case registerInterval = <-s.externalServiceStart:
s.logger.Info("received-external-service-registry-interval", lager.Data{"interval": registerInterval.String()})
Expand All @@ -89,6 +97,9 @@ GREET_LOOP:
}
retryGreetingTicker.Stop()

close(ready)
s.logger.Info("started")

// now keep emitting at the desired interval
emitTicker := s.clock.NewTicker(registerInterval)

Expand Down
60 changes: 58 additions & 2 deletions scheduler/routebroadcastscheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scheduler_test
import (
"fmt"
"os"
"sync"
"time"

"code.cloudfoundry.org/clock/fakeclock"
Expand All @@ -28,6 +29,10 @@ var _ = Describe("RouteBroadcastScheduler", func() {
shutdown chan struct{}

natsStartMessages chan<- *nats.Msg

maxGreetAttempts int
blockTestOnProcessBecomingReady bool
expectProcessToExitWithFailure bool
)

testRouteBroadcastScheduler := func(prefix string) {
Expand All @@ -52,20 +57,28 @@ var _ = Describe("RouteBroadcastScheduler", func() {

return nil
})
maxGreetAttempts = 30
})

JustBeforeEach(func() {
logger := lagertest.NewTestLogger("test")
schedulerRunner = scheduler.NewRouteBroadcastScheduler(clock, natsClient, logger, prefix, cfg, emitCh)
schedulerRunner.MaxGreetAttempts = maxGreetAttempts

shutdown = make(chan struct{})

process = ifrit.Invoke(schedulerRunner)
if blockTestOnProcessBecomingReady {
process = ifrit.Invoke(schedulerRunner)
} else {
process = ifrit.Background(schedulerRunner)
}
})

AfterEach(func() {
process.Signal(os.Interrupt)
Eventually(process.Wait()).Should(Receive(BeNil()))
if !expectProcessToExitWithFailure {
Eventually(process.Wait()).Should(Receive(BeNil()))
}
close(shutdown)
close(natsStartMessages)
})
Expand Down Expand Up @@ -110,19 +123,62 @@ var _ = Describe("RouteBroadcastScheduler", func() {
})

Context("when the external service does not emit a *.start", func() {
BeforeEach(func() {
blockTestOnProcessBecomingReady = false
})

It("should keep greeting the external service until it gets an interval", func() {
isReady := false
var wg sync.WaitGroup
wg.Add(1)
go func() {
for {
select {
case <-process.Ready():
isReady = true
wg.Done()
return
}
}
}()

//get the first greeting
Eventually(greetings).Should(Receive())

//get the second greeting, and respond
clock.WaitForWatcherAndIncrement(time.Second)
var msg *nats.Msg
Eventually(greetings).Should(Receive(&msg))
Expect(isReady).To(Equal(false))

go natsClient.Publish(msg.Reply, []byte(`{"minimumRegisterIntervalInSeconds":1, "pruneThresholdInSeconds": 3}`))
wg.Wait()
Eventually(isReady).Should(Equal(true))

//should no longer be greeting the external service
Consistently(greetings).ShouldNot(Receive())
})

Context("when it has been greeting for a very long time", func() {
BeforeEach(func() {
maxGreetAttempts = 3
expectProcessToExitWithFailure = true
})

It("eventually fails", func() {
//get the first greeting
Eventually(greetings).Should(Receive())

//attempt to greet 3 times
clock.WaitForWatcherAndIncrement(time.Second)
clock.WaitForWatcherAndIncrement(time.Second)
clock.WaitForWatcherAndIncrement(time.Second)

//should exit with an error
err := <-process.Wait()
Expect(err).To(MatchError(ContainSubstring("attempted to greet")))
})
})
})

Context("after getting the first interval, when a second interval arrives", func() {
Expand Down