Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions depot/containerstore/containerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ type containerStore struct {

enableUnproxiedPortMappings bool
advertisePreferenceForInstanceAddress bool

jsonMarshaller func(any) ([]byte, error)
}

func New(
Expand All @@ -110,6 +112,7 @@ func New(
cellID string,
enableUnproxiedPortMappings bool,
advertisePreferenceForInstanceAddress bool,
jsonMarshaller func(any) ([]byte, error),
) ContainerStore {
return &containerStore{
containerConfig: containerConfig,
Expand All @@ -133,6 +136,7 @@ func New(

enableUnproxiedPortMappings: enableUnproxiedPortMappings,
advertisePreferenceForInstanceAddress: advertisePreferenceForInstanceAddress,
jsonMarshaller: jsonMarshaller,
}
}

Expand Down Expand Up @@ -167,6 +171,7 @@ func (cs *containerStore) Reserve(logger lager.Logger, req *executor.AllocationR
cs.cellID,
cs.enableUnproxiedPortMappings,
cs.advertisePreferenceForInstanceAddress,
cs.jsonMarshaller,
))

if err != nil {
Expand Down Expand Up @@ -251,9 +256,7 @@ func (cs *containerStore) Update(logger lager.Logger, req *executor.UpdateReques
return err
}

node.Update(logger, req)

return nil
return node.Update(logger, req)
}

func (cs *containerStore) Stop(logger lager.Logger, guid string) error {
Expand Down
116 changes: 116 additions & 0 deletions depot/containerstore/containerstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package containerstore_test

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -159,6 +160,7 @@ var _ = Describe("Container Store", func() {
cellID,
true,
advertisePreferenceForInstanceAddress,
json.Marshal,
)

metronClient.SendDurationStub = func(name string, value time.Duration, opts ...loggregator.EmitGaugeOption) error {
Expand Down Expand Up @@ -484,6 +486,7 @@ var _ = Describe("Container Store", func() {
cellID,
true,
advertisePreferenceForInstanceAddress,
json.Marshal,
)
})

Expand Down Expand Up @@ -705,6 +708,7 @@ var _ = Describe("Container Store", func() {
cellID,
true,
advertisePreferenceForInstanceAddress,
json.Marshal,
)
})

Expand Down Expand Up @@ -1193,6 +1197,7 @@ var _ = Describe("Container Store", func() {
cellID,
true,
advertisePreferenceForInstanceAddress,
json.Marshal,
)

portMapping := []executor.PortMapping{
Expand Down Expand Up @@ -1282,6 +1287,7 @@ var _ = Describe("Container Store", func() {
cellID,
false,
advertisePreferenceForInstanceAddress,
json.Marshal,
)
})

Expand Down Expand Up @@ -2192,6 +2198,98 @@ var _ = Describe("Container Store", func() {
Expect(logStreamer.UpdateTagsArgsForCall(0)).To(Equal(metricTags))
})

It("updates the log config property on the container", func() {
err := containerStore.Update(logger, updateReq)
Expect(err).NotTo(HaveOccurred())

Expect(gardenContainer.SetPropertyCallCount()).To(Equal(1))
name, value := gardenContainer.SetPropertyArgsForCall(0)
Expect(name).To(Equal("log_config"))

var parsedValue map[string]interface{}
err = json.Unmarshal([]byte(value), &parsedValue)
Expect(err).NotTo(HaveOccurred())
Expect(parsedValue["guid"]).To(Equal("container-guid"))
Expect(parsedValue["index"]).To(Equal(1.0))
Expect(parsedValue["source_name"]).To(Equal("test-source"))
Expect(parsedValue["tags"]).To(Equal(map[string]interface{}{"some-tag": "some-value"}))
})

Context("when Update is called concurrently", func() {
JustBeforeEach(func() {
updateReq.InternalRoutes = nil
})
It("locks access to the container info", func() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
err := containerStore.Update(logger, updateReq)
Expect(err).NotTo(HaveOccurred())
wg.Done()
}()
go func() {
err := containerStore.Update(logger, updateReq)
Expect(err).NotTo(HaveOccurred())
wg.Done()
}()
wg.Wait()
})
})

Context("when the log config cannot be serialized", func() {
var fm failingMarshaller

BeforeEach(func() {
containerStore = containerstore.New(
containerConfig,
&totalCapacity,
gardenClient,
dependencyManager,
volumeManager,
credManager,
logManager,
clock,
eventEmitter,
megatron,
"/var/vcap/data/cf-system-trusted-certs",
metronClient,
rootFSSizer,
false,
"/var/vcap/packages/healthcheck",
proxyManager,
cellID,
true,
advertisePreferenceForInstanceAddress,
fm.Marshal,
)
})

It("releases the info lock", func() {
fm.fail = true
err := containerStore.Update(logger, updateReq)
Expect(err).To(HaveOccurred())

fm.fail = false
err = containerStore.Update(logger, updateReq)
Expect(err).NotTo(HaveOccurred())
})
})

Context("when there is an error updating the log config property on the container", func() {
BeforeEach(func() {
gardenContainer.SetPropertyReturns(errors.New("some-error"))
})
It("logs the error", func() {
err := containerStore.Update(logger, updateReq)
Expect(err).To(HaveOccurred())

Expect(gardenContainer.SetPropertyCallCount()).To(Equal(1))
Expect(logger).To(gbytes.Say("failed-to-set-log-config-property"))
Expect(logger).To(gbytes.Say(updateReq.Guid))
Expect(logger).To(gbytes.Say("some-error"))
})
})

Context("when internal routes are not provided", func() {
BeforeEach(func() {
updateReq.InternalRoutes = nil
Expand Down Expand Up @@ -2238,6 +2336,12 @@ var _ = Describe("Container Store", func() {
Expect(logStreamer.UpdateTagsCallCount()).To(Equal(0))
})

It("does not update the log config property on the container", func() {
err := containerStore.Update(logger, updateReq)
Expect(err).NotTo(HaveOccurred())

Expect(gardenContainer.SetPropertyCallCount()).To(Equal(0))
})
})

})
Expand Down Expand Up @@ -2726,6 +2830,7 @@ var _ = Describe("Container Store", func() {
cellID,
true,
advertisePreferenceForInstanceAddress,
json.Marshal,
)

signalled := credManagerRunnerSignalled
Expand Down Expand Up @@ -3305,3 +3410,14 @@ var _ = Describe("Container Store", func() {
})
})
})

type failingMarshaller struct {
fail bool
}

func (m *failingMarshaller) Marshal(v any) ([]byte, error) {
if m.fail {
return []byte{}, errors.New("marshalling failed")
}
return json.Marshal(v)
}
32 changes: 28 additions & 4 deletions depot/containerstore/storenode.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package containerstore

import (
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -95,6 +94,8 @@ type storeNode struct {

startTime time.Time
regenerateCertsCh chan struct{}

jsonMarshaller func(any) ([]byte, error)
}

func newStoreNode(
Expand All @@ -117,6 +118,7 @@ func newStoreNode(
cellID string,
enableUnproxiedPortMappings bool,
advertisePreferenceForInstanceAddress bool,
jsonMarshaller func(any) ([]byte, error),
) *storeNode {
return &storeNode{
config: config,
Expand All @@ -142,6 +144,7 @@ func newStoreNode(
enableUnproxiedPortMappings: enableUnproxiedPortMappings,
advertisePreferenceForInstanceAddress: advertisePreferenceForInstanceAddress,
regenerateCertsCh: make(chan struct{}, 1),
jsonMarshaller: jsonMarshaller,
}
}

Expand Down Expand Up @@ -178,7 +181,7 @@ func (n *storeNode) Initialize(logger lager.Logger, req *executor.RunRequest) er
n.infoLock.Lock()
defer n.infoLock.Unlock()

err := n.info.TransistionToInitialize(req)
err := n.info.TransitionToInitialize(req)
if err != nil {
logger.Error("failed-to-initialize", err)
return err
Expand Down Expand Up @@ -309,7 +312,7 @@ func (n *storeNode) gardenProperties(container *executor.Container) (garden.Prop
}
}
properties[executor.ContainerOwnerProperty] = n.config.OwnerName
logConfig, err := json.Marshal(container.LogConfig)
logConfig, err := n.jsonMarshaller(container.LogConfig)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -595,16 +598,30 @@ func (n *storeNode) run(logger lager.Logger, logStreamer log_streamer.LogStreame
n.completeWithError(logger, err)
}

func (n *storeNode) Update(logger lager.Logger, req *executor.UpdateRequest) {
func (n *storeNode) Update(logger lager.Logger, req *executor.UpdateRequest) error {
logger = logger.Session("node-update")

n.acquireOpLock(logger)
defer n.releaseOpLock(logger)

n.infoLock.Lock()

if req.InternalRoutes != nil {
n.info.InternalRoutes = req.InternalRoutes
}

var logConfigJSON []byte
if req.MetricTags != nil {
n.info.LogConfig.Tags = req.MetricTags
n.info.MetricsConfig.Tags = req.MetricTags

var err error
logConfigJSON, err = n.jsonMarshaller(n.info.LogConfig)
if err != nil {
logger.Error("failed-to-serialize-log-config", err, lager.Data{"guid": req.Guid})
n.infoLock.Unlock()
return err
}
}

n.infoLock.Unlock()
Expand All @@ -615,7 +632,14 @@ func (n *storeNode) Update(logger lager.Logger, req *executor.UpdateRequest) {

if req.MetricTags != nil {
n.logStreamer.UpdateTags(req.MetricTags)

err := n.gardenContainer.SetProperty("log_config", string(logConfigJSON))
if err != nil {
logger.Error("failed-to-set-log-config-property", err, lager.Data{"guid": req.Guid})
return err
}
}
return nil
}

func (n *storeNode) Stop(logger lager.Logger) {
Expand Down
2 changes: 2 additions & 0 deletions initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/rand"
"crypto/tls"
"crypto/x509"
"encoding/json"
"encoding/pem"
"errors"
"fmt"
Expand Down Expand Up @@ -328,6 +329,7 @@ func Initialize(logger lager.Logger, config ExecutorConfig, cellID, zone string,
cellID,
config.EnableUnproxiedPortMappings,
config.AdvertisePreferenceForInstanceAddress,
json.Marshal,
)

depotClient := depot.NewClient(
Expand Down
2 changes: 1 addition & 1 deletion resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *Container) ValidateTransitionTo(newState State) bool {
}
}

func (c *Container) TransistionToInitialize(req *RunRequest) error {
func (c *Container) TransitionToInitialize(req *RunRequest) error {
if !c.ValidateTransitionTo(StateInitializing) {
return ErrInvalidTransition
}
Expand Down