Skip to content

Commit

Permalink
changed hardware push request data to be type *Hardware
Browse files Browse the repository at this point in the history
  • Loading branch information
kqdeng committed Apr 27, 2020
1 parent 8ab34e8 commit 2f9e9fd
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 123 deletions.
8 changes: 7 additions & 1 deletion cli/tink/cmd/hardware/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@ var pushCmd = &cobra.Command{
},
Run: func(cmd *cobra.Command, args []string) {
for _, j := range args {
if _, err := client.HardwareClient.Push(context.Background(), &hardware.PushRequest{Data: j}); err != nil {
fmt.Println(j)
hw := hardware.Hardware{}
err := json.Unmarshal([]byte(j), &hw)
if err != nil {
log.Fatal(err)
}
if _, err := client.HardwareClient.Push(context.Background(), &hardware.PushRequest{Data: &hw}); err != nil {
log.Fatal(err)
}
}
Expand Down
36 changes: 14 additions & 22 deletions grpc-server/hardware.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,31 @@ func (s *server) Push(ctx context.Context, in *hardware.PushRequest) (*hardware.
// must be a copy so deferred cacheInFlight.Dec matches the Inc
labels = prometheus.Labels{"method": "Push", "op": ""}

var h struct {
ID string
State string
}
err := json.Unmarshal([]byte(in.Data), &h)
if err != nil {
hw := in.Data
if hw.Id == "" {
metrics.CacheTotals.With(labels).Inc()
metrics.CacheErrors.With(labels).Inc()
err = errors.Wrap(err, "unmarshal json")
err := errors.New("id must be set to a UUID")
logger.Error(err)
return &hardware.Empty{}, err
}

if h.ID == "" {
metrics.CacheTotals.With(labels).Inc()
metrics.CacheErrors.With(labels).Inc()
err = errors.New("id must be set to a UUID")
logger.Error(err)
return &hardware.Empty{}, err
}

logger.With("id", h.ID).Info("data pushed")
logger.With("id", hw.Id).Info("data pushed")

var fn func() error
msg := ""
if h.State != "deleted" {
data, err := json.Marshal(hw)
if err != nil {
logger.Error(err)
}
if hw.Metadata.State != "deleted" {
labels["op"] = "insert"
msg = "inserting into DB"
fn = func() error { return db.InsertIntoDB(ctx, s.db, in.Data) }
fn = func() error { return db.InsertIntoDB(ctx, s.db, string(data)) }
} else {
msg = "deleting from DB"
labels["op"] = "delete"
fn = func() error { return db.DeleteFromDB(ctx, s.db, h.ID) }
fn = func() error { return db.DeleteFromDB(ctx, s.db, hw.Id) }
}

metrics.CacheTotals.With(labels).Inc()
Expand All @@ -75,12 +67,12 @@ func (s *server) Push(ctx context.Context, in *hardware.PushRequest) (*hardware.
}

s.watchLock.RLock()
if ch := s.watch[h.ID]; ch != nil {
if ch := s.watch[hw.Id]; ch != nil {
select {
case ch <- in.Data:
case ch <- string(data):
default:
metrics.WatchMissTotal.Inc()
logger.With("id", h.ID).Info("skipping blocked watcher")
logger.With("id", hw.Id ).Info("skipping blocked watcher")
}
}
s.watchLock.RUnlock()
Expand Down
Loading

0 comments on commit 2f9e9fd

Please sign in to comment.