diff --git a/depot/containerstore/containerstore.go b/depot/containerstore/containerstore.go index 8de98bb0..cfdb457e 100644 --- a/depot/containerstore/containerstore.go +++ b/depot/containerstore/containerstore.go @@ -88,6 +88,8 @@ type containerStore struct { enableUnproxiedPortMappings bool advertisePreferenceForInstanceAddress bool + + jsonMarshaller func(any) ([]byte, error) } func New( @@ -110,6 +112,7 @@ func New( cellID string, enableUnproxiedPortMappings bool, advertisePreferenceForInstanceAddress bool, + jsonMarshaller func(any) ([]byte, error), ) ContainerStore { return &containerStore{ containerConfig: containerConfig, @@ -133,6 +136,7 @@ func New( enableUnproxiedPortMappings: enableUnproxiedPortMappings, advertisePreferenceForInstanceAddress: advertisePreferenceForInstanceAddress, + jsonMarshaller: jsonMarshaller, } } @@ -167,6 +171,7 @@ func (cs *containerStore) Reserve(logger lager.Logger, req *executor.AllocationR cs.cellID, cs.enableUnproxiedPortMappings, cs.advertisePreferenceForInstanceAddress, + cs.jsonMarshaller, )) if err != nil { @@ -251,9 +256,7 @@ func (cs *containerStore) Update(logger lager.Logger, req *executor.UpdateReques return err } - node.Update(logger, req) - - return nil + return node.Update(logger, req) } func (cs *containerStore) Stop(logger lager.Logger, guid string) error { diff --git a/depot/containerstore/containerstore_test.go b/depot/containerstore/containerstore_test.go index 7adfc8f4..d490cbbe 100644 --- a/depot/containerstore/containerstore_test.go +++ b/depot/containerstore/containerstore_test.go @@ -2,6 +2,7 @@ package containerstore_test import ( "bytes" + "encoding/json" "errors" "fmt" "io/ioutil" @@ -159,6 +160,7 @@ var _ = Describe("Container Store", func() { cellID, true, advertisePreferenceForInstanceAddress, + json.Marshal, ) metronClient.SendDurationStub = func(name string, value time.Duration, opts ...loggregator.EmitGaugeOption) error { @@ -484,6 +486,7 @@ var _ = Describe("Container Store", func() { cellID, true, advertisePreferenceForInstanceAddress, + json.Marshal, ) }) @@ -705,6 +708,7 @@ var _ = Describe("Container Store", func() { cellID, true, advertisePreferenceForInstanceAddress, + json.Marshal, ) }) @@ -1193,6 +1197,7 @@ var _ = Describe("Container Store", func() { cellID, true, advertisePreferenceForInstanceAddress, + json.Marshal, ) portMapping := []executor.PortMapping{ @@ -1282,6 +1287,7 @@ var _ = Describe("Container Store", func() { cellID, false, advertisePreferenceForInstanceAddress, + json.Marshal, ) }) @@ -2192,6 +2198,98 @@ var _ = Describe("Container Store", func() { Expect(logStreamer.UpdateTagsArgsForCall(0)).To(Equal(metricTags)) }) + It("updates the log config property on the container", func() { + err := containerStore.Update(logger, updateReq) + Expect(err).NotTo(HaveOccurred()) + + Expect(gardenContainer.SetPropertyCallCount()).To(Equal(1)) + name, value := gardenContainer.SetPropertyArgsForCall(0) + Expect(name).To(Equal("log_config")) + + var parsedValue map[string]interface{} + err = json.Unmarshal([]byte(value), &parsedValue) + Expect(err).NotTo(HaveOccurred()) + Expect(parsedValue["guid"]).To(Equal("container-guid")) + Expect(parsedValue["index"]).To(Equal(1.0)) + Expect(parsedValue["source_name"]).To(Equal("test-source")) + Expect(parsedValue["tags"]).To(Equal(map[string]interface{}{"some-tag": "some-value"})) + }) + + Context("when Update is called concurrently", func() { + JustBeforeEach(func() { + updateReq.InternalRoutes = nil + }) + It("locks access to the container info", func() { + var wg sync.WaitGroup + wg.Add(2) + go func() { + err := containerStore.Update(logger, updateReq) + Expect(err).NotTo(HaveOccurred()) + wg.Done() + }() + go func() { + err := containerStore.Update(logger, updateReq) + Expect(err).NotTo(HaveOccurred()) + wg.Done() + }() + wg.Wait() + }) + }) + + Context("when the log config cannot be serialized", func() { + var fm failingMarshaller + + BeforeEach(func() { + containerStore = containerstore.New( + containerConfig, + &totalCapacity, + gardenClient, + dependencyManager, + volumeManager, + credManager, + logManager, + clock, + eventEmitter, + megatron, + "/var/vcap/data/cf-system-trusted-certs", + metronClient, + rootFSSizer, + false, + "/var/vcap/packages/healthcheck", + proxyManager, + cellID, + true, + advertisePreferenceForInstanceAddress, + fm.Marshal, + ) + }) + + It("releases the info lock", func() { + fm.fail = true + err := containerStore.Update(logger, updateReq) + Expect(err).To(HaveOccurred()) + + fm.fail = false + err = containerStore.Update(logger, updateReq) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Context("when there is an error updating the log config property on the container", func() { + BeforeEach(func() { + gardenContainer.SetPropertyReturns(errors.New("some-error")) + }) + It("logs the error", func() { + err := containerStore.Update(logger, updateReq) + Expect(err).To(HaveOccurred()) + + Expect(gardenContainer.SetPropertyCallCount()).To(Equal(1)) + Expect(logger).To(gbytes.Say("failed-to-set-log-config-property")) + Expect(logger).To(gbytes.Say(updateReq.Guid)) + Expect(logger).To(gbytes.Say("some-error")) + }) + }) + Context("when internal routes are not provided", func() { BeforeEach(func() { updateReq.InternalRoutes = nil @@ -2238,6 +2336,12 @@ var _ = Describe("Container Store", func() { Expect(logStreamer.UpdateTagsCallCount()).To(Equal(0)) }) + It("does not update the log config property on the container", func() { + err := containerStore.Update(logger, updateReq) + Expect(err).NotTo(HaveOccurred()) + + Expect(gardenContainer.SetPropertyCallCount()).To(Equal(0)) + }) }) }) @@ -2726,6 +2830,7 @@ var _ = Describe("Container Store", func() { cellID, true, advertisePreferenceForInstanceAddress, + json.Marshal, ) signalled := credManagerRunnerSignalled @@ -3305,3 +3410,14 @@ var _ = Describe("Container Store", func() { }) }) }) + +type failingMarshaller struct { + fail bool +} + +func (m *failingMarshaller) Marshal(v any) ([]byte, error) { + if m.fail { + return []byte{}, errors.New("marshalling failed") + } + return json.Marshal(v) +} diff --git a/depot/containerstore/storenode.go b/depot/containerstore/storenode.go index 67023cdd..5f33a52e 100644 --- a/depot/containerstore/storenode.go +++ b/depot/containerstore/storenode.go @@ -1,7 +1,6 @@ package containerstore import ( - "encoding/json" "errors" "fmt" "io" @@ -95,6 +94,8 @@ type storeNode struct { startTime time.Time regenerateCertsCh chan struct{} + + jsonMarshaller func(any) ([]byte, error) } func newStoreNode( @@ -117,6 +118,7 @@ func newStoreNode( cellID string, enableUnproxiedPortMappings bool, advertisePreferenceForInstanceAddress bool, + jsonMarshaller func(any) ([]byte, error), ) *storeNode { return &storeNode{ config: config, @@ -142,6 +144,7 @@ func newStoreNode( enableUnproxiedPortMappings: enableUnproxiedPortMappings, advertisePreferenceForInstanceAddress: advertisePreferenceForInstanceAddress, regenerateCertsCh: make(chan struct{}, 1), + jsonMarshaller: jsonMarshaller, } } @@ -178,7 +181,7 @@ func (n *storeNode) Initialize(logger lager.Logger, req *executor.RunRequest) er n.infoLock.Lock() defer n.infoLock.Unlock() - err := n.info.TransistionToInitialize(req) + err := n.info.TransitionToInitialize(req) if err != nil { logger.Error("failed-to-initialize", err) return err @@ -309,7 +312,7 @@ func (n *storeNode) gardenProperties(container *executor.Container) (garden.Prop } } properties[executor.ContainerOwnerProperty] = n.config.OwnerName - logConfig, err := json.Marshal(container.LogConfig) + logConfig, err := n.jsonMarshaller(container.LogConfig) if err != nil { return nil, err } @@ -595,16 +598,30 @@ func (n *storeNode) run(logger lager.Logger, logStreamer log_streamer.LogStreame n.completeWithError(logger, err) } -func (n *storeNode) Update(logger lager.Logger, req *executor.UpdateRequest) { +func (n *storeNode) Update(logger lager.Logger, req *executor.UpdateRequest) error { + logger = logger.Session("node-update") + + n.acquireOpLock(logger) + defer n.releaseOpLock(logger) + n.infoLock.Lock() if req.InternalRoutes != nil { n.info.InternalRoutes = req.InternalRoutes } + var logConfigJSON []byte if req.MetricTags != nil { n.info.LogConfig.Tags = req.MetricTags n.info.MetricsConfig.Tags = req.MetricTags + + var err error + logConfigJSON, err = n.jsonMarshaller(n.info.LogConfig) + if err != nil { + logger.Error("failed-to-serialize-log-config", err, lager.Data{"guid": req.Guid}) + n.infoLock.Unlock() + return err + } } n.infoLock.Unlock() @@ -615,7 +632,14 @@ func (n *storeNode) Update(logger lager.Logger, req *executor.UpdateRequest) { if req.MetricTags != nil { n.logStreamer.UpdateTags(req.MetricTags) + + err := n.gardenContainer.SetProperty("log_config", string(logConfigJSON)) + if err != nil { + logger.Error("failed-to-set-log-config-property", err, lager.Data{"guid": req.Guid}) + return err + } } + return nil } func (n *storeNode) Stop(logger lager.Logger) { diff --git a/initializer/initializer.go b/initializer/initializer.go index 59aadb01..784bbd1e 100644 --- a/initializer/initializer.go +++ b/initializer/initializer.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "crypto/tls" "crypto/x509" + "encoding/json" "encoding/pem" "errors" "fmt" @@ -328,6 +329,7 @@ func Initialize(logger lager.Logger, config ExecutorConfig, cellID, zone string, cellID, config.EnableUnproxiedPortMappings, config.AdvertisePreferenceForInstanceAddress, + json.Marshal, ) depotClient := depot.NewClient( diff --git a/resources.go b/resources.go index f9a7230a..e09fa4e2 100644 --- a/resources.go +++ b/resources.go @@ -72,7 +72,7 @@ func (c *Container) ValidateTransitionTo(newState State) bool { } } -func (c *Container) TransistionToInitialize(req *RunRequest) error { +func (c *Container) TransitionToInitialize(req *RunRequest) error { if !c.ValidateTransitionTo(StateInitializing) { return ErrInvalidTransition }