From 6e51251cc1036b553e15a1db5973453d2528c706 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 11:47:10 -0700 Subject: [PATCH 01/30] consul: Adding new directory structs for KVS --- consul/structs/structs.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 5b32320f28e..93b66c25e0a 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -172,6 +172,33 @@ type IndexedCheckServiceNodes struct { Nodes CheckServiceNodes } +// DirEntry is used to represent a directory entry. This is +// used for values in our Key-Value store. +type DirEntry struct { + CreateIndex uint64 + ModifyIndex uint64 + Key string + Flags uint64 + Value []byte +} +type DirEntries []*DirEntry + +type KVSOp string + +const ( + KVSSet KVSOp = "set" + KVSGet = "get" // Key must match + KVSList = "list" // Key is only a prefix + KVSDelete = "delete" + KVSCAS = "cas" // Check-and-set +) + +// KVSRequest is used to operate on the Key-Value store +type KVSRequest struct { + Op KVSOp // Which operation are we performing + DirEnt DirEntry // Which directory entry +} + // Decode is used to decode a MsgPack encoded object func Decode(buf []byte, out interface{}) error { var handle codec.MsgpackHandle From a365b30792aca3a1ae740c567491783319fd7822 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 11:48:32 -0700 Subject: [PATCH 02/30] consul: Adding kvs table to state store --- consul/state_store.go | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/consul/state_store.go b/consul/state_store.go index cf46b8adde2..d66c76d92e6 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -14,7 +14,8 @@ const ( dbNodes = "nodes" dbServices = "services" dbChecks = "checks" - dbMaxMapSize = 128 * 1024 * 1024 // 128MB maximum size + dbKVS = "kvs" + dbMaxMapSize = 512 * 1024 * 1024 // 512MB maximum size ) // The StateStore is responsible for maintaining all the Consul @@ -31,6 +32,7 @@ type StateStore struct { nodeTable *MDBTable serviceTable *MDBTable checkTable *MDBTable + kvsTable *MDBTable tables MDBTables watch map[*MDBTable]*NotifyGroup queryTables map[string]MDBTables @@ -183,8 +185,25 @@ func (s *StateStore) initialize() error { }, } + s.kvsTable = &MDBTable{ + Name: dbKVS, + Indexes: map[string]*MDBIndex{ + "id": &MDBIndex{ + Unique: true, + Fields: []string{"Key"}, + }, + }, + Decoder: func(buf []byte) interface{} { + out := new(structs.DirEntry) + if err := structs.Decode(buf, out); err != nil { + panic(err) + } + return out + }, + } + // Store the set of tables - s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable} + s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable, s.kvsTable} for _, table := range s.tables { table.Env = s.env table.Encoder = encoder From a6086e1750adc47b5f7e85c3726a869201f5a887 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 12:13:40 -0700 Subject: [PATCH 03/30] consul: Adding KVSDeleteTree operation --- consul/structs/structs.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 93b66c25e0a..b4c12f5abe0 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -186,11 +186,12 @@ type DirEntries []*DirEntry type KVSOp string const ( - KVSSet KVSOp = "set" - KVSGet = "get" // Key must match - KVSList = "list" // Key is only a prefix - KVSDelete = "delete" - KVSCAS = "cas" // Check-and-set + KVSSet KVSOp = "set" + KVSGet = "get" // Key must match + KVSList = "list" // Key is only a prefix + KVSDelete = "delete" + KVSDeleteTree = "delete-tree" + KVSCAS = "cas" // Check-and-set ) // KVSRequest is used to operate on the Key-Value store From 6744ee62be792587354539cf0ca6872cdbeaebbe Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 12:13:57 -0700 Subject: [PATCH 04/30] consul: Implement and test KV get and set --- consul/state_store.go | 63 ++++++++++++++++++++++++++++++ consul/state_store_test.go | 80 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+) diff --git a/consul/state_store.go b/consul/state_store.go index d66c76d92e6..b0496dcb651 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -705,6 +705,69 @@ func (s *StateStore) parseCheckServiceNodes(tx *MDBTxn, res []interface{}, err e return nodes } +// KVSSet is used to create or update a KV entry +func (s *StateStore) KVSSet(index uint64, d *structs.DirEntry) error { + // Start a new txn + tx, err := s.kvsTable.StartTxn(false, nil) + if err != nil { + return err + } + defer tx.Abort() + + // Get the existing node + res, err := s.kvsTable.GetTxn(tx, "id", d.Key) + if err != nil { + return err + } + + // Set the create and modify times + if len(res) == 0 { + d.CreateIndex = index + } else { + d.CreateIndex = res[0].(*structs.DirEntry).CreateIndex + } + d.ModifyIndex = index + + if err := s.kvsTable.InsertTxn(tx, d); err != nil { + return err + } + if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { + return err + } + defer s.watch[s.kvsTable].Notify() + return tx.Commit() +} + +// KVSGet is used to get a KV entry +func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) { + idx, res, err := s.kvsTable.Get("id", key) + var d *structs.DirEntry + if len(res) > 0 { + d = res[0].(*structs.DirEntry) + } + return idx, d, err +} + +// KVSList is used to list all KV entries with a prefix +func (s *StateStore) KVSList() (uint64, structs.DirEntries, error) { + return 0, nil, nil +} + +// KVSDelete is used to delete a KVS entry +func (s *StateStore) KVSDelete() error { + return nil +} + +// KVSDeleteTree is used to delete all keys with a given prefix +func (s *StateStore) KVSDeleteTree() error { + return nil +} + +// KVSCheckAndSet is used to perform an atomic check-and-set +func (s *StateStore) KVSCheckAndSet() error { + return nil +} + // Snapshot is used to create a point in time snapshot func (s *StateStore) Snapshot() (*StateSnapshot, error) { // Begin a new txn on all tables diff --git a/consul/state_store_test.go b/consul/state_store_test.go index de5ccd80588..87fd0ac02c2 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -933,3 +933,83 @@ func TestSS_Register_Deregister_Query(t *testing.T) { t.Fatalf("Bad: %v", nodes) } } + +func TestKVSSet_Get(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // Should not exist + idx, d, err := store.KVSGet("/foo") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 0 { + t.Fatalf("bad: %v", idx) + } + if d != nil { + t.Fatalf("bad: %v", d) + } + + // Create the entry + d = &structs.DirEntry{Key: "/foo", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Should exist exist + idx, d, err = store.KVSGet("/foo") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1000 { + t.Fatalf("bad: %v", idx) + } + if d.CreateIndex != 1000 { + t.Fatalf("bad: %v", d) + } + if d.ModifyIndex != 1000 { + t.Fatalf("bad: %v", d) + } + if d.Key != "/foo" { + t.Fatalf("bad: %v", d) + } + if d.Flags != 42 { + t.Fatalf("bad: %v", d) + } + if string(d.Value) != "test" { + t.Fatalf("bad: %v", d) + } + + // Update the entry + d = &structs.DirEntry{Key: "/foo", Flags: 43, Value: []byte("zip")} + if err := store.KVSSet(1010, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Should update + idx, d, err = store.KVSGet("/foo") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1010 { + t.Fatalf("bad: %v", idx) + } + if d.CreateIndex != 1000 { + t.Fatalf("bad: %v", d) + } + if d.ModifyIndex != 1010 { + t.Fatalf("bad: %v", d) + } + if d.Key != "/foo" { + t.Fatalf("bad: %v", d) + } + if d.Flags != 43 { + t.Fatalf("bad: %v", d) + } + if string(d.Value) != "zip" { + t.Fatalf("bad: %v", d) + } +} From f87c47424cb3d116ce72b32bb0c7a85887e089e9 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 12:24:06 -0700 Subject: [PATCH 05/30] consul: Implement KVSDelete --- consul/state_store.go | 22 ++++++++++++++++++++-- consul/state_store_test.go | 31 +++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/consul/state_store.go b/consul/state_store.go index b0496dcb651..7b0808528c6 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -754,8 +754,26 @@ func (s *StateStore) KVSList() (uint64, structs.DirEntries, error) { } // KVSDelete is used to delete a KVS entry -func (s *StateStore) KVSDelete() error { - return nil +func (s *StateStore) KVSDelete(index uint64, key string) error { + // Start a new txn + tx, err := s.kvsTable.StartTxn(false, nil) + if err != nil { + return err + } + defer tx.Abort() + + num, err := s.kvsTable.DeleteTxn(tx, "id", key) + if err != nil { + return err + } + + if num > 0 { + if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { + return err + } + defer s.watch[s.kvsTable].Notify() + } + return tx.Commit() } // KVSDeleteTree is used to delete all keys with a given prefix diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 87fd0ac02c2..c2f64dbc7bf 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -1013,3 +1013,34 @@ func TestKVSSet_Get(t *testing.T) { t.Fatalf("bad: %v", d) } } + +func TestKVSDelete(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // Create the entry + d := &structs.DirEntry{Key: "/foo", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Delete the entry + if err := store.KVSDelete(1020, "/foo"); err != nil { + t.Fatalf("err: %v", err) + } + + // Should not exist + idx, d, err := store.KVSGet("/foo") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1020 { + t.Fatalf("bad: %v", idx) + } + if d != nil { + t.Fatalf("bad: %v", d) + } +} From 8d7c5fc9cd75d63a1891a4b4939736f41fe8341a Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 12:37:11 -0700 Subject: [PATCH 06/30] consul: Implementing Check-And-Set --- consul/state_store.go | 47 ++++++++++++++++++++++++++-- consul/state_store_test.go | 63 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 2 deletions(-) diff --git a/consul/state_store.go b/consul/state_store.go index 7b0808528c6..3f1f9a14940 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -782,8 +782,51 @@ func (s *StateStore) KVSDeleteTree() error { } // KVSCheckAndSet is used to perform an atomic check-and-set -func (s *StateStore) KVSCheckAndSet() error { - return nil +func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, error) { + // Start a new txn + tx, err := s.kvsTable.StartTxn(false, nil) + if err != nil { + return false, err + } + defer tx.Abort() + + // Get the existing node + res, err := s.kvsTable.GetTxn(tx, "id", d.Key) + if err != nil { + return false, err + } + + // Get the existing node if any + var exist *structs.DirEntry + if len(res) > 0 { + exist = res[0].(*structs.DirEntry) + } + + // Use the ModifyIndex as the constraint. A modify of time of 0 + // means we are doing a set-if-not-exists, while any other value + // means we expect that modify time. + if d.ModifyIndex == 0 && exist != nil { + return false, nil + } else if d.ModifyIndex > 0 && (exist == nil || exist.ModifyIndex != d.ModifyIndex) { + return false, nil + } + + // Set the create and modify times + if exist == nil { + d.CreateIndex = index + } else { + d.CreateIndex = exist.CreateIndex + } + d.ModifyIndex = index + + if err := s.kvsTable.InsertTxn(tx, d); err != nil { + return false, err + } + if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { + return false, err + } + defer s.watch[s.kvsTable].Notify() + return true, tx.Commit() } // Snapshot is used to create a point in time snapshot diff --git a/consul/state_store_test.go b/consul/state_store_test.go index c2f64dbc7bf..c9a26e67639 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -1044,3 +1044,66 @@ func TestKVSDelete(t *testing.T) { t.Fatalf("bad: %v", d) } } + +func TestKVSCheckAndSet(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // CAS should fail, no entry + d := &structs.DirEntry{ + ModifyIndex: 100, + Key: "/foo", + Flags: 42, + Value: []byte("test"), + } + ok, err := store.KVSCheckAndSet(1000, d) + if err != nil { + t.Fatalf("err: %v", err) + } + if ok { + t.Fatalf("unexpected commit") + } + + // Constrain on not-exist, should work + d.ModifyIndex = 0 + ok, err = store.KVSCheckAndSet(1001, d) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("expected commit") + } + + // Constrain on not-exist, should fail + d.ModifyIndex = 0 + ok, err = store.KVSCheckAndSet(1002, d) + if err != nil { + t.Fatalf("err: %v", err) + } + if ok { + t.Fatalf("unexpected commit") + } + + // Constrain on a wrong modify time + d.ModifyIndex = 1000 + ok, err = store.KVSCheckAndSet(1003, d) + if err != nil { + t.Fatalf("err: %v", err) + } + if ok { + t.Fatalf("unexpected commit") + } + + // Constrain on a correct modify time + d.ModifyIndex = 1001 + ok, err = store.KVSCheckAndSet(1004, d) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("expected commit") + } +} From 59703cbfae829051fc52dd93a562dfd1649da86a Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 13:06:29 -0700 Subject: [PATCH 07/30] consul: Adding support for virtual indexes --- consul/mdb_table.go | 59 ++++++++++++++++++++-- consul/mdb_table_test.go | 103 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+), 4 deletions(-) diff --git a/consul/mdb_table.go b/consul/mdb_table.go index 966024c06f8..58f27812431 100644 --- a/consul/mdb_table.go +++ b/consul/mdb_table.go @@ -49,10 +49,13 @@ type MDBIndex struct { Unique bool // Controls if values are unique Fields []string // Fields are used to build the index IdxFunc IndexFunc // Can be used to provide custom indexing + Virtual bool // Virtual index does not exist, but can be used for queries + RealIndex string // Virtual indexes use a RealIndex for iteration - table *MDBTable - name string - dbiName string + table *MDBTable + name string + dbiName string + realIndex *MDBIndex } // MDBTxn is used to wrap an underlying transaction @@ -88,6 +91,17 @@ func DefaultIndexFunc(idx *MDBIndex, parts []string) string { return prefix } +// DefaultIndexPrefixFunc can be used with DefaultIndexFunc to scan +// for index prefix values. This should only be used as part of a +// virtual index. +func DefaultIndexPrefixFunc(idx *MDBIndex, parts []string) string { + if len(parts) == 0 { + return "_" + } + prefix := "_" + strings.Join(parts, "||") + return prefix +} + // Init is used to initialize the MDBTable and ensure it's ready func (t *MDBTable) Init() error { if t.Env == nil { @@ -111,6 +125,9 @@ func (t *MDBTable) Init() error { if id.AllowBlank { return fmt.Errorf("id index must not allow blanks") } + if id.Virtual { + return fmt.Errorf("id index cannot be virtual") + } // Create the table if err := t.createTable(); err != nil { @@ -221,6 +238,9 @@ EXTEND: mdbTxn.dbis[t.Name] = dbi for _, index := range t.Indexes { + if index.Virtual { + continue + } dbi, err := index.openDBI(tx) if err != nil { tx.Abort() @@ -237,6 +257,9 @@ func (t *MDBTable) objIndexKeys(obj interface{}) (map[string][]byte, error) { // Construct the indexes keys indexes := make(map[string][]byte) for name, index := range t.Indexes { + if index.Virtual { + continue + } key, err := index.keyFromObject(obj) if err != nil { return nil, err @@ -301,6 +324,9 @@ AFTER_DELETE: // Insert the new indexes for name, index := range t.Indexes { + if index.Virtual { + continue + } dbi := tx.dbis[index.dbiName] if err := tx.tx.Put(dbi, indexes[name], encRowId, 0); err != nil { return err @@ -427,6 +453,12 @@ func (t *MDBTable) deleteWithIndex(tx *MDBTxn, idx *MDBIndex, key []byte) (num i if name == idx.name { continue } + if idx.Virtual && name == idx.RealIndex { + continue + } + if otherIdx.Virtual { + continue + } dbi := tx.dbis[otherIdx.dbiName] if err := tx.tx.Del(dbi, indexes[name], encRowId); err != nil { panic(err) @@ -464,11 +496,23 @@ func (i *MDBIndex) init(table *MDBTable, name string) error { if err := i.createIndex(); err != nil { return err } + // Verify real index exists + if i.Virtual { + if realIndex, ok := table.Indexes[i.RealIndex]; !ok { + return fmt.Errorf("real index '%s' missing", i.RealIndex) + } else { + i.realIndex = realIndex + } + } return nil } // createIndex is used to ensure the index exists func (i *MDBIndex) createIndex() error { + // Do not create if this is a virtual index + if i.Virtual { + return nil + } tx, err := i.table.Env.BeginTxn(nil, 0) if err != nil { return err @@ -529,7 +573,14 @@ func (i *MDBIndex) keyFromParts(parts ...string) []byte { func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte, cb func(encRowId, res []byte) bool) error { table := tx.dbis[i.table.Name] - dbi := tx.dbis[i.dbiName] + + // If virtual, use the correct DBI + var dbi mdb.DBI + if i.Virtual { + dbi = tx.dbis[i.realIndex.dbiName] + } else { + dbi = tx.dbis[i.dbiName] + } cursor, err := tx.tx.CursorOpen(dbi) if err != nil { diff --git a/consul/mdb_table_test.go b/consul/mdb_table_test.go index 13bfff5b191..061240df288 100644 --- a/consul/mdb_table_test.go +++ b/consul/mdb_table_test.go @@ -781,3 +781,106 @@ func TestMDBTableDelete_Prefix(t *testing.T) { t.Fatalf("expect 2 result: %#v", res) } } + +func TestMDBTableVirtualIndex(t *testing.T) { + dir, env := testMDBEnv(t) + defer os.RemoveAll(dir) + defer env.Close() + + table := &MDBTable{ + Env: env, + Name: "test", + Indexes: map[string]*MDBIndex{ + "id": &MDBIndex{ + Unique: true, + Fields: []string{"First"}, + }, + "id_prefix": &MDBIndex{ + Virtual: true, + RealIndex: "id", + Fields: []string{"First"}, + IdxFunc: DefaultIndexPrefixFunc, + }, + }, + Encoder: MockEncoder, + Decoder: MockDecoder, + } + if err := table.Init(); err != nil { + t.Fatalf("err: %v", err) + } + + if table.lastRowID != 0 { + t.Fatalf("bad last row id: %d", table.lastRowID) + } + + objs := []*MockData{ + &MockData{ + Key: "1", + First: "Jack", + Last: "Smith", + Country: "USA", + }, + &MockData{ + Key: "2", + First: "John", + Last: "Wang", + Country: "USA", + }, + &MockData{ + Key: "3", + First: "James", + Last: "Torres", + Country: "Mexico", + }, + } + + // Insert some mock objects + for idx, obj := range objs { + if err := table.Insert(obj); err != nil { + t.Fatalf("err: %v", err) + } + if err := table.SetLastIndex(uint64(4 * idx)); err != nil { + t.Fatalf("err: %v", err) + } + } + + if table.lastRowID != 3 { + t.Fatalf("bad last row id: %d", table.lastRowID) + } + + if idx, _ := table.LastIndex(); idx != 8 { + t.Fatalf("bad last idx: %d", idx) + } + + _, res, err := table.Get("id_prefix", "J") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) != 3 { + t.Fatalf("expect 3 result: %#v", res) + } + + _, res, err = table.Get("id_prefix", "Ja") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) != 2 { + t.Fatalf("expect 2 result: %#v", res) + } + + num, err := table.Delete("id_prefix", "Ja") + if err != nil { + t.Fatalf("err: %v", err) + } + if num != 2 { + t.Fatalf("expect 2 result: %#v", num) + } + + _, res, err = table.Get("id_prefix", "J") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) != 1 { + t.Fatalf("expect 1 result: %#v", res) + } +} From dfb8c03659674dd099dd07bff19bc1b91146fa1a Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 13:12:35 -0700 Subject: [PATCH 08/30] consul: Adding support for KVSList --- consul/state_store.go | 16 +++++++++-- consul/state_store_test.go | 56 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 2 deletions(-) diff --git a/consul/state_store.go b/consul/state_store.go index 3f1f9a14940..2d9a4f5b80d 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -192,6 +192,12 @@ func (s *StateStore) initialize() error { Unique: true, Fields: []string{"Key"}, }, + "id_prefix": &MDBIndex{ + Virtual: true, + RealIndex: "id", + Fields: []string{"Key"}, + IdxFunc: DefaultIndexPrefixFunc, + }, }, Decoder: func(buf []byte) interface{} { out := new(structs.DirEntry) @@ -749,8 +755,13 @@ func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) { } // KVSList is used to list all KV entries with a prefix -func (s *StateStore) KVSList() (uint64, structs.DirEntries, error) { - return 0, nil, nil +func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) { + idx, res, err := s.kvsTable.Get("id_prefix", prefix) + ents := make(structs.DirEntries, len(res)) + for idx, r := range res { + ents[idx] = r.(*structs.DirEntry) + } + return idx, ents, err } // KVSDelete is used to delete a KVS entry @@ -778,6 +789,7 @@ func (s *StateStore) KVSDelete(index uint64, key string) error { // KVSDeleteTree is used to delete all keys with a given prefix func (s *StateStore) KVSDeleteTree() error { + // TODO: return nil } diff --git a/consul/state_store_test.go b/consul/state_store_test.go index c9a26e67639..36d19629fd5 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -1107,3 +1107,59 @@ func TestKVSCheckAndSet(t *testing.T) { t.Fatalf("expected commit") } } + +func TestKVS_List(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // Should not exist + idx, ents, err := store.KVSList("/web") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 0 { + t.Fatalf("bad: %v", idx) + } + if len(ents) != 0 { + t.Fatalf("bad: %v", ents) + } + + // Create the entries + d := &structs.DirEntry{Key: "/web/a", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/web/b", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1001, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/web/sub/c", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1002, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Should list + idx, ents, err = store.KVSList("/web") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1002 { + t.Fatalf("bad: %v", idx) + } + if len(ents) != 3 { + t.Fatalf("bad: %v", ents) + } + + if ents[0].Key != "/web/a" { + t.Fatalf("bad: %v", ents[0]) + } + if ents[1].Key != "/web/b" { + t.Fatalf("bad: %v", ents[1]) + } + if ents[2].Key != "/web/sub/c" { + t.Fatalf("bad: %v", ents[2]) + } +} From 860cfd7497aecf3de4bc243fda66d9aa49137083 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 13:20:08 -0700 Subject: [PATCH 09/30] consul: Support DeleteTree --- consul/state_store.go | 18 +++++++++------ consul/state_store_test.go | 46 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 7 deletions(-) diff --git a/consul/state_store.go b/consul/state_store.go index 2d9a4f5b80d..aa8a59fe0b9 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -766,6 +766,16 @@ func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) // KVSDelete is used to delete a KVS entry func (s *StateStore) KVSDelete(index uint64, key string) error { + return s.kvsDeleteWithIndex(index, "id", key) +} + +// KVSDeleteTree is used to delete all keys with a given prefix +func (s *StateStore) KVSDeleteTree(index uint64, prefix string) error { + return s.kvsDeleteWithIndex(index, "id_prefix", prefix) +} + +// kvsDeleteWithIndex does a delete with either the id or id_prefix +func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex, val string) error { // Start a new txn tx, err := s.kvsTable.StartTxn(false, nil) if err != nil { @@ -773,7 +783,7 @@ func (s *StateStore) KVSDelete(index uint64, key string) error { } defer tx.Abort() - num, err := s.kvsTable.DeleteTxn(tx, "id", key) + num, err := s.kvsTable.DeleteTxn(tx, tableIndex, val) if err != nil { return err } @@ -787,12 +797,6 @@ func (s *StateStore) KVSDelete(index uint64, key string) error { return tx.Commit() } -// KVSDeleteTree is used to delete all keys with a given prefix -func (s *StateStore) KVSDeleteTree() error { - // TODO: - return nil -} - // KVSCheckAndSet is used to perform an atomic check-and-set func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, error) { // Start a new txn diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 36d19629fd5..f37c8b5470c 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -1163,3 +1163,49 @@ func TestKVS_List(t *testing.T) { t.Fatalf("bad: %v", ents[2]) } } + +func TestKVSDeleteTree(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // Should not exist + err = store.KVSDeleteTree(1000, "/web") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create the entries + d := &structs.DirEntry{Key: "/web/a", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/web/b", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1001, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/web/sub/c", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1002, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Nuke the web tree + err = store.KVSDeleteTree(1010, "/web") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Nothing should list + idx, ents, err := store.KVSList("/web") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1010 { + t.Fatalf("bad: %v", idx) + } + if len(ents) != 0 { + t.Fatalf("bad: %v", ents) + } +} From 9cfea72d2ac7d7f50ed2ad88446e2055194f5d12 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 13:31:31 -0700 Subject: [PATCH 10/30] consul: Snapshot KVS store support --- consul/state_store.go | 14 ++++++++++++++ consul/state_store_test.go | 28 +++++++++++++++++++++++++++- 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/consul/state_store.go b/consul/state_store.go index aa8a59fe0b9..2d01be2c7d5 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -901,3 +901,17 @@ func (s *StateSnapshot) NodeChecks(node string) structs.HealthChecks { _, checks := s.store.parseHealthChecks(s.lastIndex, res, err) return checks } + +// KVSDump is used to list all KV entries +func (s *StateSnapshot) KVSDump() structs.DirEntries { + res, err := s.store.kvsTable.GetTxn(s.tx, "id") + if err != nil { + s.store.logger.Printf("[ERR] consul.state: Failed to get KVS entries: %v", err) + return nil + } + ents := make(structs.DirEntries, len(res)) + for idx, r := range res { + ents[idx] = r.(*structs.DirEntry) + } + return ents +} diff --git a/consul/state_store_test.go b/consul/state_store_test.go index f37c8b5470c..e03e0ad76d9 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -550,6 +550,16 @@ func TestStoreSnapshot(t *testing.T) { t.Fatalf("err: %v") } + // Add some KVS entries + d := &structs.DirEntry{Key: "/web/a", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(14, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/web/b", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(15, d); err != nil { + t.Fatalf("err: %v", err) + } + // Take a snapshot snap, err := store.Snapshot() if err != nil { @@ -558,7 +568,7 @@ func TestStoreSnapshot(t *testing.T) { defer snap.Close() // Check the last nodes - if idx := snap.LastIndex(); idx != 13 { + if idx := snap.LastIndex(); idx != 15 { t.Fatalf("bad: %v", idx) } @@ -591,6 +601,12 @@ func TestStoreSnapshot(t *testing.T) { t.Fatalf("bad: %v", checks[0]) } + // Check we have the entries + ents := snap.KVSDump() + if len(ents) != 2 { + t.Fatalf("missing KVS entries!") + } + // Make some changes! if err := store.EnsureService(14, "foo", &structs.NodeService{"db", "db", "slave", 8000}); err != nil { t.Fatalf("err: %v", err) @@ -612,6 +628,10 @@ func TestStoreSnapshot(t *testing.T) { t.Fatalf("err: %v") } + if err := store.KVSDelete(18, "/web/a"); err != nil { + t.Fatalf("err: %v") + } + // Check snapshot has old values nodes = snap.Nodes() if len(nodes) != 2 { @@ -639,6 +659,12 @@ func TestStoreSnapshot(t *testing.T) { if !reflect.DeepEqual(checks[0], check) { t.Fatalf("bad: %v", checks[0]) } + + // Check we have the entries + ents = snap.KVSDump() + if len(ents) != 2 { + t.Fatalf("missing KVS entries!") + } } func TestEnsureCheck(t *testing.T) { From f91e12fe300d6e387961e39df06e7ef6273547fc Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 13:41:43 -0700 Subject: [PATCH 11/30] consul: Adding FSM support for KVS operations --- consul/fsm.go | 29 +++++++++++++++++++++++++++++ consul/structs/structs.go | 1 + 2 files changed, 30 insertions(+) diff --git a/consul/fsm.go b/consul/fsm.go index 4646f7e4f1b..849d75d86a8 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -65,6 +65,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} { return c.decodeRegister(buf[1:], log.Index) case structs.DeregisterRequestType: return c.applyDeregister(buf[1:], log.Index) + case structs.KVSRequestType: + return c.applyKVSOperation(buf[1:], log.Index) default: panic(fmt.Errorf("failed to apply request: %#v", buf)) } @@ -131,6 +133,32 @@ func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} { return nil } +func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} { + var req structs.KVSRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + switch req.Op { + case structs.KVSSet: + return c.state.KVSSet(index, &req.DirEnt) + case structs.KVSDelete: + return c.state.KVSDelete(index, req.DirEnt.Key) + case structs.KVSDeleteTree: + return c.state.KVSDeleteTree(index, req.DirEnt.Key) + case structs.KVSCAS: + act, err := c.state.KVSCheckAndSet(index, &req.DirEnt) + if err != nil { + return err + } else { + return act + } + default: + c.logger.Printf("[WARN] consul.fsm: Invalid KVS operation '%s'", req.Op) + return fmt.Errorf("Invalid KVS operation '%s'", req.Op) + } + return nil +} + func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) { defer func(start time.Time) { c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start)) @@ -152,6 +180,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { if err != nil { return err } + c.state.Close() c.state = state // Create a decoder diff --git a/consul/structs/structs.go b/consul/structs/structs.go index b4c12f5abe0..5d898be9a90 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -18,6 +18,7 @@ type MessageType uint8 const ( RegisterRequestType MessageType = iota DeregisterRequestType + KVSRequestType ) const ( From c31232fb703753585d367f2c5ac6816a1db75cf5 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 13:56:16 -0700 Subject: [PATCH 12/30] consul: FSM support to snapshot/restore KVS values --- consul/fsm.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/consul/fsm.go b/consul/fsm.go index 849d75d86a8..739e5d92253 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -213,6 +213,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { } c.applyRegister(&req, header.LastIndex) + case structs.KVSRequestType: + var req structs.DirEntry + if err := dec.Decode(&req); err != nil { + return err + } + if err := c.state.KVSSet(req.CreateIndex, &req); err != nil { + return err + } + default: return fmt.Errorf("Unrecognized msg type: %v", msgType) } @@ -276,6 +285,21 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { } } } + + // Enable GC of the ndoes + nodes = nil + + // Dump the KVS entries + dirents := s.state.KVSDump() + for _, ent := range dirents { + // Register the node itself + sink.Write([]byte{byte(structs.KVSRequestType)}) + if err := encoder.Encode(ent); err != nil { + sink.Cancel() + return err + } + } + return nil } From 6ca5c7c49d16034d6dad4a5ba6ffb01b7f97bc26 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 14:13:03 -0700 Subject: [PATCH 13/30] consul: Testing FSM application of KVS commands --- consul/fsm_test.go | 198 ++++++++++++++++++++++++++++++++++++++ consul/structs/structs.go | 5 +- 2 files changed, 201 insertions(+), 2 deletions(-) diff --git a/consul/fsm_test.go b/consul/fsm_test.go index 5c210db06ea..24f7c7cbbfe 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -322,6 +322,10 @@ func TestFSM_SnapshotRestore(t *testing.T) { Status: structs.HealthPassing, ServiceID: "web", }) + fsm.state.KVSSet(8, &structs.DirEntry{ + Key: "/test", + Value: []byte("foo"), + }) // Snapshot snap, err := fsm.Snapshot() @@ -370,4 +374,198 @@ func TestFSM_SnapshotRestore(t *testing.T) { if len(checks) != 1 { t.Fatalf("Bad: %v", checks) } + + // Verify key is set + _, d, err := fsm.state.KVSGet("/test") + if err != nil { + t.Fatalf("err: %v", err) + } + if string(d.Value) != "foo" { + t.Fatalf("bad: %v", d) + } +} + +func TestFSM_KVSSet(t *testing.T) { + fsm, err := NewFSM(os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + defer fsm.Close() + + req := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Flags: 0, + Value: []byte("test"), + }, + } + buf, err := structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify key is set + _, d, err := fsm.state.KVSGet("/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if d == nil { + t.Fatalf("missing") + } +} + +func TestFSM_KVSDelete(t *testing.T) { + fsm, err := NewFSM(os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + defer fsm.Close() + + req := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Flags: 0, + Value: []byte("test"), + }, + } + buf, err := structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Run the delete + req.Op = structs.KVSDelete + buf, err = structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify key is not set + _, d, err := fsm.state.KVSGet("/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if d != nil { + t.Fatalf("key present") + } +} + +func TestFSM_KVSDeleteTree(t *testing.T) { + fsm, err := NewFSM(os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + defer fsm.Close() + + req := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Flags: 0, + Value: []byte("test"), + }, + } + buf, err := structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Run the delete tree + req.Op = structs.KVSDeleteTree + req.DirEnt.Key = "/test" + buf, err = structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify key is not set + _, d, err := fsm.state.KVSGet("/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if d != nil { + t.Fatalf("key present") + } +} + +func TestFSM_KVSCheckAndSet(t *testing.T) { + fsm, err := NewFSM(os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + defer fsm.Close() + + req := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Flags: 0, + Value: []byte("test"), + }, + } + buf, err := structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify key is set + _, d, err := fsm.state.KVSGet("/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if d == nil { + t.Fatalf("key missing") + } + + // Run the check-and-set + req.Op = structs.KVSCAS + req.DirEnt.ModifyIndex = d.ModifyIndex + req.DirEnt.Value = []byte("zip") + buf, err = structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = fsm.Apply(makeLog(buf)) + if resp.(bool) != true { + t.Fatalf("resp: %v", resp) + } + + // Verify key is updated + _, d, err = fsm.state.KVSGet("/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if string(d.Value) != "zip" { + t.Fatalf("bad: %v", d) + } } diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 5d898be9a90..8569f612c73 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -197,8 +197,9 @@ const ( // KVSRequest is used to operate on the Key-Value store type KVSRequest struct { - Op KVSOp // Which operation are we performing - DirEnt DirEntry // Which directory entry + Datacenter string + Op KVSOp // Which operation are we performing + DirEnt DirEntry // Which directory entry } // Decode is used to decode a MsgPack encoded object From 47ad40a925b7224c1efeb424a45a2f72d547045d Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 14:15:49 -0700 Subject: [PATCH 14/30] consul: Adding KVS RPC endpoint --- consul/kvs_endpoint.go | 8 ++++++++ consul/server.go | 3 +++ 2 files changed, 11 insertions(+) create mode 100644 consul/kvs_endpoint.go diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go new file mode 100644 index 00000000000..9d47bfe49b7 --- /dev/null +++ b/consul/kvs_endpoint.go @@ -0,0 +1,8 @@ +package consul + +import () + +// KVS endpoint is used to manipulate the Key-Value store +type KVS struct { + srv *Server +} diff --git a/consul/server.go b/consul/server.go index ccd28e98113..9a4a6a59f05 100644 --- a/consul/server.go +++ b/consul/server.go @@ -101,6 +101,7 @@ type endpoints struct { Health *Health Raft *Raft Status *Status + KVS *KVS } // NewServer is used to construct a new Consul server from the @@ -276,12 +277,14 @@ func (s *Server) setupRPC() error { s.endpoints.Raft = &Raft{s} s.endpoints.Catalog = &Catalog{s} s.endpoints.Health = &Health{s} + s.endpoints.KVS = &KVS{s} // Register the handlers s.rpcServer.Register(s.endpoints.Status) s.rpcServer.Register(s.endpoints.Raft) s.rpcServer.Register(s.endpoints.Catalog) s.rpcServer.Register(s.endpoints.Health) + s.rpcServer.Register(s.endpoints.KVS) list, err := net.ListenTCP("tcp", s.config.RPCAddr) if err != nil { From 3e9dc6d8b64e50ac0ad26d102c04c02274e022d8 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 16:00:23 -0700 Subject: [PATCH 15/30] consul: First pass at KVS endpoints for RPC --- consul/kvs_endpoint.go | 97 ++++++++++++++++++++++++++++++++++++++- consul/state_store.go | 2 + consul/structs/structs.go | 14 +++++- 3 files changed, 110 insertions(+), 3 deletions(-) diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index 9d47bfe49b7..1649405328e 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -1,8 +1,103 @@ package consul -import () +import ( + "fmt" + "github.com/armon/go-metrics" + "github.com/hashicorp/consul/consul/structs" + "time" +) // KVS endpoint is used to manipulate the Key-Value store type KVS struct { srv *Server } + +// Apply is used to apply a KVS request to the data store. This should +// only be used for operations that modify the data +func (c *Catalog) Apply(args *structs.KVSRequest, reply *bool) error { + if done, err := c.srv.forward("KVS.Apply", args.Datacenter, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"consul", "kvs", "apply"}, time.Now()) + + // Verify the args + if args.DirEnt.Key == "" { + return fmt.Errorf("Must provide key") + } + + // Apply the update + resp, err := c.srv.raftApply(structs.KVSRequestType, args) + if err != nil { + c.srv.logger.Printf("[ERR] consul.kvs: Apply failed: %v", err) + return err + } + if respErr, ok := resp.(error); ok { + return respErr + } + + // Check if the return type is a bool + if respBool, ok := resp.(bool); ok { + *reply = respBool + } + return nil +} + +// Get is used to lookup a single key +func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error { + if done, err := k.srv.forward("KVS.Get", args.Datacenter, args, reply); done { + return err + } + + // Get the local state + state := k.srv.fsm.State() + return k.srv.blockingRPC(&args.BlockingQuery, + state.QueryTables("KVSGet"), + func() (uint64, error) { + index, ent, err := state.KVSGet(args.Key) + if err != nil { + return 0, err + } + if ent == nil { + reply.Index = index + reply.Entries = nil + } else { + reply.Index = ent.ModifyIndex + reply.Entries = structs.DirEntries{ent} + } + return reply.Index, nil + }) +} + +// List is used to list all keys with a given prefix +func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error { + if done, err := k.srv.forward("KVS.List", args.Datacenter, args, reply); done { + return err + } + + // Get the local state + state := k.srv.fsm.State() + return k.srv.blockingRPC(&args.BlockingQuery, + state.QueryTables("KVSList"), + func() (uint64, error) { + index, ent, err := state.KVSList(args.Key) + if err != nil { + return 0, err + } + if len(ent) == 0 { + reply.Index = index + reply.Entries = nil + } else { + // Determine the maximum affected index + var maxIndex uint64 + for _, e := range ent { + if e.ModifyIndex > maxIndex { + maxIndex = e.ModifyIndex + } + } + + reply.Index = maxIndex + reply.Entries = ent + } + return reply.Index, nil + }) +} diff --git a/consul/state_store.go b/consul/state_store.go index 2d01be2c7d5..11b627a9d9b 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -231,6 +231,8 @@ func (s *StateStore) initialize() error { "NodeChecks": MDBTables{s.checkTable}, "ServiceChecks": MDBTables{s.checkTable}, "CheckServiceNodes": MDBTables{s.nodeTable, s.serviceTable, s.checkTable}, + "KVSGet": MDBTables{s.kvsTable}, + "KVSList": MDBTables{s.kvsTable}, } return nil } diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 8569f612c73..638749cfc09 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -188,8 +188,6 @@ type KVSOp string const ( KVSSet KVSOp = "set" - KVSGet = "get" // Key must match - KVSList = "list" // Key is only a prefix KVSDelete = "delete" KVSDeleteTree = "delete-tree" KVSCAS = "cas" // Check-and-set @@ -202,6 +200,18 @@ type KVSRequest struct { DirEnt DirEntry // Which directory entry } +// KeyRequest is used to request a key, or key prefix +type KeyRequest struct { + Datacenter string + Key string + BlockingQuery +} + +type IndexedDirEntries struct { + Index uint64 + Entries DirEntries +} + // Decode is used to decode a MsgPack encoded object func Decode(buf []byte, out interface{}) error { var handle codec.MsgpackHandle From fc5110405ce1284081af37b249f43d32f470642f Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 16:10:49 -0700 Subject: [PATCH 16/30] consul: Adding tests for endpoint method --- consul/kvs_endpoint.go | 8 ++--- consul/kvs_endpoint_test.go | 65 +++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 4 deletions(-) create mode 100644 consul/kvs_endpoint_test.go diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index 1649405328e..20d62878742 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -14,8 +14,8 @@ type KVS struct { // Apply is used to apply a KVS request to the data store. This should // only be used for operations that modify the data -func (c *Catalog) Apply(args *structs.KVSRequest, reply *bool) error { - if done, err := c.srv.forward("KVS.Apply", args.Datacenter, args, reply); done { +func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { + if done, err := k.srv.forward("KVS.Apply", args.Datacenter, args, reply); done { return err } defer metrics.MeasureSince([]string{"consul", "kvs", "apply"}, time.Now()) @@ -26,9 +26,9 @@ func (c *Catalog) Apply(args *structs.KVSRequest, reply *bool) error { } // Apply the update - resp, err := c.srv.raftApply(structs.KVSRequestType, args) + resp, err := k.srv.raftApply(structs.KVSRequestType, args) if err != nil { - c.srv.logger.Printf("[ERR] consul.kvs: Apply failed: %v", err) + k.srv.logger.Printf("[ERR] consul.kvs: Apply failed: %v", err) return err } if respErr, ok := resp.(error); ok { diff --git a/consul/kvs_endpoint_test.go b/consul/kvs_endpoint_test.go new file mode 100644 index 00000000000..6726bc0f7db --- /dev/null +++ b/consul/kvs_endpoint_test.go @@ -0,0 +1,65 @@ +package consul + +import ( + "github.com/hashicorp/consul/consul/structs" + "os" + "testing" + "time" +) + +func TestKVS_Apply(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + // Wait for leader + time.Sleep(100 * time.Millisecond) + + arg := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "test", + Flags: 42, + Value: []byte("test"), + }, + } + var out bool + if err := client.Call("KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Verify + state := s1.fsm.State() + _, d, err := state.KVSGet("test") + if err != nil { + t.Fatalf("err: %v", err) + } + if d == nil { + t.Fatalf("should not be nil") + } + + // Do a check and set + arg.Op = structs.KVSCAS + arg.DirEnt.ModifyIndex = d.ModifyIndex + arg.DirEnt.Flags = 43 + if err := client.Call("KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Check this was applied + if out != true { + t.Fatalf("bad: %v", out) + } + + // Verify + _, d, err = state.KVSGet("test") + if err != nil { + t.Fatalf("err: %v", err) + } + if d.Flags != 43 { + t.Fatalf("bad: %v", d) + } +} From 1b2dd17f93a414607d15753f0a5246b4200e859a Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 16:18:44 -0700 Subject: [PATCH 17/30] consul: Test the remaining KVS endpoints --- consul/kvs_endpoint_test.go | 108 ++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/consul/kvs_endpoint_test.go b/consul/kvs_endpoint_test.go index 6726bc0f7db..d116e82c593 100644 --- a/consul/kvs_endpoint_test.go +++ b/consul/kvs_endpoint_test.go @@ -63,3 +63,111 @@ func TestKVS_Apply(t *testing.T) { t.Fatalf("bad: %v", d) } } + +func TestKVS_Get(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + // Wait for leader + time.Sleep(100 * time.Millisecond) + + arg := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "test", + Flags: 42, + Value: []byte("test"), + }, + } + var out bool + if err := client.Call("KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + getR := structs.KeyRequest{ + Datacenter: "dc1", + Key: "test", + } + var dirent structs.IndexedDirEntries + if err := client.Call("KVS.Get", &getR, &dirent); err != nil { + t.Fatalf("err: %v", err) + } + + if dirent.Index == 0 { + t.Fatalf("Bad: %v", dirent) + } + if len(dirent.Entries) != 1 { + t.Fatalf("Bad: %v", dirent) + } + d := dirent.Entries[0] + if d.Flags != 42 { + t.Fatalf("bad: %v", d) + } + if string(d.Value) != "test" { + t.Fatalf("bad: %v", d) + } +} + +func TestKVSEndpoint_List(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + // Wait for leader + time.Sleep(100 * time.Millisecond) + + keys := []string{ + "/test/key1", + "/test/key2", + "/test/sub/key3", + } + + for _, key := range keys { + arg := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: key, + Flags: 1, + }, + } + var out bool + if err := client.Call("KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + } + + getR := structs.KeyRequest{ + Datacenter: "dc1", + Key: "/test", + } + var dirent structs.IndexedDirEntries + if err := client.Call("KVS.List", &getR, &dirent); err != nil { + t.Fatalf("err: %v", err) + } + + if dirent.Index == 0 { + t.Fatalf("Bad: %v", dirent) + } + if len(dirent.Entries) != 3 { + t.Fatalf("Bad: %v", dirent.Entries) + } + for i := 0; i < len(dirent.Entries); i++ { + d := dirent.Entries[i] + if d.Key != keys[i] { + t.Fatalf("bad: %v", d) + } + if d.Flags != 1 { + t.Fatalf("bad: %v", d) + } + if d.Value != nil { + t.Fatalf("bad: %v", d) + } + } +} From 6cfde2bf12409df4e491626d8e18dd14ff2e602a Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 17:12:10 -0700 Subject: [PATCH 18/30] agent: First pass at KVS endpoints --- command/agent/http.go | 2 + command/agent/http_api.md | 9 ++- command/agent/kvs_endpoint.go | 148 ++++++++++++++++++++++++++++++++++ 3 files changed, 158 insertions(+), 1 deletion(-) create mode 100644 command/agent/kvs_endpoint.go diff --git a/command/agent/http.go b/command/agent/http.go index 33e84601ae9..40e97da204d 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -88,6 +88,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/agent/service/register", s.wrap(s.AgentRegisterService)) s.mux.HandleFunc("/v1/agent/service/deregister", s.wrap(s.AgentDeregisterService)) + s.mux.HandleFunc("/v1/kv/", s.wrap(s.KVSEndpoint)) + if enableDebug { s.mux.HandleFunc("/debug/pprof/", pprof.Index) s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) diff --git a/command/agent/http_api.md b/command/agent/http_api.md index 1ac2f5ae33a..ff379faac61 100644 --- a/command/agent/http_api.md +++ b/command/agent/http_api.md @@ -8,6 +8,7 @@ register new services. The URLs are also versioned to allow for changes in the API. The current URLs supported are: +Catalog: * /v1/catalog/register : Registers a new service * /v1/catalog/deregister : Deregisters a service or node * /v1/catalog/datacenters : Lists known datacenters @@ -16,15 +17,17 @@ The current URLs supported are: * /v1/catalog/service// : Lists the nodes in a given service * /v1/catalog/node// : Lists the services provided by a node -* Health system: +Health system: * /v1/health/node/: Returns the health info of a node * /v1/health/checks/: Returns the checks of a service * /v1/health/service/: Returns the nodes and health info of a service * /v1/health/state/: Returns the checks in a given state +Status: * /v1/status/leader : Returns the current Raft leader * /v1/status/peers : Returns the current Raft peer set +Agent: * /v1/agent/checks: Returns the checks the local agent is managing * /v1/agent/services : Returns the services local agent is managing * /v1/agent/members : Returns the members as seen by the local serf agent @@ -37,3 +40,7 @@ The current URLs supported are: * /v1/agent/check/fail/ * /v1/agent/service/register * /v1/agent/service/deregister/ + +KVS: +* /v1/kv/ + diff --git a/command/agent/kvs_endpoint.go b/command/agent/kvs_endpoint.go new file mode 100644 index 00000000000..038052cc20d --- /dev/null +++ b/command/agent/kvs_endpoint.go @@ -0,0 +1,148 @@ +package agent + +import ( + "bytes" + "github.com/hashicorp/consul/consul/structs" + "io" + "net/http" + "strconv" + "strings" +) + +func (s *HTTPServer) KVSEndpoint(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Set default DC + args := structs.KeyRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { + return nil, nil + } + + // Pull out the key name, validation left to each sub-handler + args.Key = strings.TrimPrefix(req.URL.Path, "/v1/kv/") + + // Switch on the method + switch req.Method { + case "GET": + return s.KVSGet(resp, req, &args) + case "PUT": + return s.KVSPut(resp, req, &args) + case "DELETE": + return s.KVSDelete(resp, req, &args) + default: + resp.WriteHeader(405) + return nil, nil + } + return nil, nil +} + +// KVSGet handles a GET request +func (s *HTTPServer) KVSGet(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) { + // Check for recurse + method := "KVS.Get" + params := req.URL.Query() + if _, ok := params["recurse"]; ok { + method = "KVS.List" + } else if missingKey(resp, args) { + return nil, nil + } + + // Make the RPC + var out structs.IndexedDirEntries + if err := s.agent.RPC(method, &args, &out); err != nil { + return nil, err + } + setIndex(resp, out.Index) + return out.Entries, nil +} + +// KVSPut handles a PUT request +func (s *HTTPServer) KVSPut(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) { + if missingKey(resp, args) { + return nil, nil + } + applyReq := structs.KVSRequest{ + Datacenter: args.Datacenter, + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: args.Key, + Flags: 0, + Value: nil, + }, + } + + // Check for flags + params := req.URL.Query() + if _, ok := params["flags"]; ok { + flagVal, err := strconv.ParseUint(params.Get("flags"), 10, 64) + if err != nil { + return nil, err + } + applyReq.DirEnt.Flags = flagVal + } + + // Check for cas value + if _, ok := params["cas"]; ok { + casVal, err := strconv.ParseUint(params.Get("flags"), 10, 64) + if err != nil { + return nil, err + } + applyReq.DirEnt.ModifyIndex = casVal + applyReq.Op = structs.KVSCAS + } + + // Copy the value + buf := bytes.NewBuffer(nil) + if _, err := io.Copy(buf, req.Body); err != nil { + return nil, err + } + applyReq.DirEnt.Value = buf.Bytes() + + // Make the RPC + var out bool + if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil { + return nil, err + } + + // Only use the out value if this was a CAS + if applyReq.Op == structs.KVSSet { + return true, nil + } else { + return out, nil + } +} + +// KVSPut handles a DELETE request +func (s *HTTPServer) KVSDelete(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) { + if missingKey(resp, args) { + return nil, nil + } + applyReq := structs.KVSRequest{ + Datacenter: args.Datacenter, + Op: structs.KVSDelete, + DirEnt: structs.DirEntry{ + Key: args.Key, + }, + } + + // Check for recurse + params := req.URL.Query() + if _, ok := params["recurse"]; ok { + applyReq.Op = structs.KVSDeleteTree + } + + // Make the RPC + var out bool + if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil { + return nil, err + } + return nil, nil +} + +// missingKey checks if the key is missing +func missingKey(resp http.ResponseWriter, args *structs.KeyRequest) bool { + if args.Key == "" { + resp.WriteHeader(400) + resp.Write([]byte("Missing key name")) + return true + } + return false +} From 94df059026f292003d808201a68940dd1c611101 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 20:00:01 -0700 Subject: [PATCH 19/30] consul: Enable a recursive delete of all keys --- command/agent/kvs_endpoint.go | 5 ++--- consul/kvs_endpoint.go | 2 +- consul/state_store.go | 7 +++++-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/command/agent/kvs_endpoint.go b/command/agent/kvs_endpoint.go index 038052cc20d..f3a13c5c7e9 100644 --- a/command/agent/kvs_endpoint.go +++ b/command/agent/kvs_endpoint.go @@ -112,9 +112,6 @@ func (s *HTTPServer) KVSPut(resp http.ResponseWriter, req *http.Request, args *s // KVSPut handles a DELETE request func (s *HTTPServer) KVSDelete(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) { - if missingKey(resp, args) { - return nil, nil - } applyReq := structs.KVSRequest{ Datacenter: args.Datacenter, Op: structs.KVSDelete, @@ -127,6 +124,8 @@ func (s *HTTPServer) KVSDelete(resp http.ResponseWriter, req *http.Request, args params := req.URL.Query() if _, ok := params["recurse"]; ok { applyReq.Op = structs.KVSDeleteTree + } else if missingKey(resp, args) { + return nil, nil } // Make the RPC diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index 20d62878742..2b94b6ba476 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -21,7 +21,7 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { defer metrics.MeasureSince([]string{"consul", "kvs", "apply"}, time.Now()) // Verify the args - if args.DirEnt.Key == "" { + if args.DirEnt.Key == "" && args.Op != structs.KVSDeleteTree { return fmt.Errorf("Must provide key") } diff --git a/consul/state_store.go b/consul/state_store.go index 11b627a9d9b..ded7e13b0ba 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -773,11 +773,14 @@ func (s *StateStore) KVSDelete(index uint64, key string) error { // KVSDeleteTree is used to delete all keys with a given prefix func (s *StateStore) KVSDeleteTree(index uint64, prefix string) error { + if prefix == "" { + return s.kvsDeleteWithIndex(index, "id") + } return s.kvsDeleteWithIndex(index, "id_prefix", prefix) } // kvsDeleteWithIndex does a delete with either the id or id_prefix -func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex, val string) error { +func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts ...string) error { // Start a new txn tx, err := s.kvsTable.StartTxn(false, nil) if err != nil { @@ -785,7 +788,7 @@ func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex, val string) er } defer tx.Abort() - num, err := s.kvsTable.DeleteTxn(tx, tableIndex, val) + num, err := s.kvsTable.DeleteTxn(tx, tableIndex, parts...) if err != nil { return err } From ceb6964547cf4f4bb3a2669795448d1ac08f2da6 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 20:00:17 -0700 Subject: [PATCH 20/30] consul: Return 404 if no entries found --- command/agent/kvs_endpoint.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/command/agent/kvs_endpoint.go b/command/agent/kvs_endpoint.go index f3a13c5c7e9..5ce9ce205fc 100644 --- a/command/agent/kvs_endpoint.go +++ b/command/agent/kvs_endpoint.go @@ -51,6 +51,12 @@ func (s *HTTPServer) KVSGet(resp http.ResponseWriter, req *http.Request, args *s return nil, err } setIndex(resp, out.Index) + + // Check if we get a not found + if len(out.Entries) == 0 { + resp.WriteHeader(404) + return nil, nil + } return out.Entries, nil } From ca1c2d75b3152b55fcfcb40c049a3ebe586c7c3f Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 20:00:30 -0700 Subject: [PATCH 21/30] agnet: Fix parsing of cas flag --- command/agent/kvs_endpoint.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/command/agent/kvs_endpoint.go b/command/agent/kvs_endpoint.go index 5ce9ce205fc..af97aa5c164 100644 --- a/command/agent/kvs_endpoint.go +++ b/command/agent/kvs_endpoint.go @@ -87,7 +87,7 @@ func (s *HTTPServer) KVSPut(resp http.ResponseWriter, req *http.Request, args *s // Check for cas value if _, ok := params["cas"]; ok { - casVal, err := strconv.ParseUint(params.Get("flags"), 10, 64) + casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64) if err != nil { return nil, err } From 8f2ef0d7d28a89b5daa3962621840cff412d9cb3 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 20:00:46 -0700 Subject: [PATCH 22/30] consul: Fixing blocking query if set table is at index 0 --- consul/kvs_endpoint.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index 2b94b6ba476..0e884524f36 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -58,7 +58,13 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er return 0, err } if ent == nil { - reply.Index = index + // Must provide non-zero index to prevent blocking + // Index 1 is impossible anyways (due to Raft internals) + if index == 0 { + reply.Index = 1 + } else { + reply.Index = index + } reply.Entries = nil } else { reply.Index = ent.ModifyIndex @@ -84,7 +90,13 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e return 0, err } if len(ent) == 0 { - reply.Index = index + // Must provide non-zero index to prevent blocking + // Index 1 is impossible anyways (due to Raft internals) + if index == 0 { + reply.Index = 1 + } else { + reply.Index = index + } reply.Entries = nil } else { // Determine the maximum affected index From 512c6895e4baf479efe4d034941b1bc0523cfbbd Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 20:09:16 -0700 Subject: [PATCH 23/30] consul: Adding raft endpoint to force a snapshot --- consul/raft_endpoint.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/consul/raft_endpoint.go b/consul/raft_endpoint.go index 9db31202c9f..97b7adf7466 100644 --- a/consul/raft_endpoint.go +++ b/consul/raft_endpoint.go @@ -18,3 +18,8 @@ func (r *Raft) RemovePeer(args string, reply *struct{}) error { future := r.server.raft.RemovePeer(peer) return future.Error() } + +func (r *Raft) Snapshot(args struct{}, reply *struct{}) error { + future := r.server.raft.Snapshot() + return future.Error() +} From 526629e12cc220755a851785afa176aafe93eec6 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 20:45:57 -0700 Subject: [PATCH 24/30] agent: Adding tests for the KV endpoints --- command/agent/kvs_endpoint_test.go | 291 +++++++++++++++++++++++++++++ 1 file changed, 291 insertions(+) create mode 100644 command/agent/kvs_endpoint_test.go diff --git a/command/agent/kvs_endpoint_test.go b/command/agent/kvs_endpoint_test.go new file mode 100644 index 00000000000..6bb476e0a86 --- /dev/null +++ b/command/agent/kvs_endpoint_test.go @@ -0,0 +1,291 @@ +package agent + +import ( + "bytes" + "fmt" + "github.com/hashicorp/consul/consul/structs" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" +) + +func TestKVSEndpoint_PUT_GET_DELETE(t *testing.T) { + dir, srv := makeHTTPServer(t) + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + // Wait for a leader + time.Sleep(100 * time.Millisecond) + + keys := []string{ + "baz", + "bar", + "foo/sub1", + "foo/sub2", + "zip", + } + + for _, key := range keys { + buf := bytes.NewBuffer([]byte("test")) + req, err := http.NewRequest("PUT", "/v1/kv/"+key, buf) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + if res := obj.(bool); !res { + t.Fatalf("should work") + } + } + + for _, key := range keys { + req, err := http.NewRequest("GET", "/v1/kv/"+key, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + header := resp.Header().Get("X-Consul-Index") + if header == "" { + t.Fatalf("Bad: %v", header) + } + + res, ok := obj.(structs.DirEntries) + if !ok { + t.Fatalf("should work") + } + + if len(res) != 1 { + t.Fatalf("bad: %v", res) + } + + if res[0].Key != key { + t.Fatalf("bad: %v", res) + } + } + + for _, key := range keys { + req, err := http.NewRequest("DELETE", "/v1/kv/"+key, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + _, err = srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + } +} + +func TestKVSEndpoint_Recurse(t *testing.T) { + dir, srv := makeHTTPServer(t) + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + // Wait for a leader + time.Sleep(100 * time.Millisecond) + + keys := []string{ + "bar", + "baz", + "foo/sub1", + "foo/sub2", + "zip", + } + + for _, key := range keys { + buf := bytes.NewBuffer([]byte("test")) + req, err := http.NewRequest("PUT", "/v1/kv/"+key, buf) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + if res := obj.(bool); !res { + t.Fatalf("should work") + } + } + + { + // Get all the keys + req, err := http.NewRequest("GET", "/v1/kv/?recurse", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + header := resp.Header().Get("X-Consul-Index") + if header == "" { + t.Fatalf("Bad: %v", header) + } + + res, ok := obj.(structs.DirEntries) + if !ok { + t.Fatalf("should work") + } + + if len(res) != len(keys) { + t.Fatalf("bad: %v", res) + } + + for idx, key := range keys { + if res[idx].Key != key { + t.Fatalf("bad: %v %v", res[idx].Key, key) + } + } + } + + { + req, err := http.NewRequest("DELETE", "/v1/kv/?recurse", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + _, err = srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + } + + { + // Get all the keys + req, err := http.NewRequest("GET", "/v1/kv/?recurse", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + if obj != nil { + t.Fatalf("bad: %v", obj) + } + } +} + +func TestKVSEndpoint_CAS(t *testing.T) { + dir, srv := makeHTTPServer(t) + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + // Wait for a leader + time.Sleep(100 * time.Millisecond) + + { + buf := bytes.NewBuffer([]byte("test")) + req, err := http.NewRequest("PUT", "/v1/kv/test?flags=50", buf) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + if res := obj.(bool); !res { + t.Fatalf("should work") + } + } + + req, err := http.NewRequest("GET", "/v1/kv/test", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + d := obj.(structs.DirEntries)[0] + + // Check the flags + if d.Flags != 50 { + t.Fatalf("bad: %v", d) + } + + // Create a CAS request, bad index + { + buf := bytes.NewBuffer([]byte("zip")) + req, err := http.NewRequest("PUT", + fmt.Sprintf("/v1/kv/test?flags=42&cas=%d", d.ModifyIndex-1), buf) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + if res := obj.(bool); res { + t.Fatalf("should NOT work") + } + } + + // Create a CAS request, good index + { + buf := bytes.NewBuffer([]byte("zip")) + req, err := http.NewRequest("PUT", + fmt.Sprintf("/v1/kv/test?flags=42&cas=%d", d.ModifyIndex), buf) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + if res := obj.(bool); !res { + t.Fatalf("should work") + } + } + + // Verify the update + req, _ = http.NewRequest("GET", "/v1/kv/test", nil) + resp = httptest.NewRecorder() + obj, _ = srv.KVSEndpoint(resp, req) + d = obj.(structs.DirEntries)[0] + + if d.Flags != 42 { + t.Fatalf("bad: %v", d) + } + if string(d.Value) != "zip" { + t.Fatalf("bad: %v", d) + } +} From f7e76960bd6a32cc926806224a2741bbdac698ba Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 21:15:37 -0700 Subject: [PATCH 25/30] website: Update format of consul info --- .../source/docs/commands/info.html.markdown | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/website/source/docs/commands/info.html.markdown b/website/source/docs/commands/info.html.markdown index 0cb95f944e7..f66c83a1274 100644 --- a/website/source/docs/commands/info.html.markdown +++ b/website/source/docs/commands/info.html.markdown @@ -35,30 +35,34 @@ Here is an example output: num_peers = 2 state = Leader term = 4 - serf-lan: - event-queue = 0 - event-time = 2 + serf_lan: + event_queue = 0 + event_time = 2 failed = 0 - intent-queue = 0 + intent_queue = 0 left = 0 - member-time = 7 + member_time = 7 members = 3 - serf-wan: - event-queue = 0 - event-time = 1 + query_queue = 0 + query_time = 1 + serf_wan: + event_queue = 0 + event_time = 1 failed = 0 - intent-queue = 0 + intent_queue = 0 left = 0 - member-time = 1 + member_time = 1 members = 1 + query_queue = 0 + query_time = 1 There are currently the top-level keys for: * agent: Provides information about the agent * consul: Information about the consul library (client or server) * raft: Provides info about the Raft [consensus library](/docs/internals/consensus.html) -* serf-lan: Provides info about the LAN [gossip pool](/docs/internals/gossip.html) -* serf-wan: Provides info about the WAN [gossip pool](/docs/internals/gossip.html) +* serf_lan: Provides info about the LAN [gossip pool](/docs/internals/gossip.html) +* serf_wan: Provides info about the WAN [gossip pool](/docs/internals/gossip.html) ## Usage From cfa4ba2d345862e2249f206c5b38ff7f647d5aa7 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 31 Mar 2014 21:15:46 -0700 Subject: [PATCH 26/30] website: Document the Key/Value API --- website/source/docs/agent/http.html.markdown | 73 +++++++++++++++++++- 1 file changed, 70 insertions(+), 3 deletions(-) diff --git a/website/source/docs/agent/http.html.markdown b/website/source/docs/agent/http.html.markdown index 0f0bd7a38b1..b938e7ec944 100644 --- a/website/source/docs/agent/http.html.markdown +++ b/website/source/docs/agent/http.html.markdown @@ -7,11 +7,12 @@ sidebar_current: "docs-agent-http" # HTTP API The main interface to Consul is a RESTful HTTP API. The API can be -used for CRUD for nodes, services, and checks. The endpoints are +used for CRUD for nodes, services, checks, and configuration. The endpoints are versioned to enable changes without breaking backwards compatibility. -All endpoints fall into one of 4 categories: +All endpoints fall into one of 5 categories: +* kv - Key/Value store * agent - Agent control * catalog - Manages nodes and services * health - Manages health checks @@ -28,7 +29,7 @@ Queries that support this will mention it specifically, however the use of this feature is the same for all. If supported, the query will set an HTTP header "X-Consul-Index". This is an opaque handle that the client will use. -To cause a query to block, the query parameters "?wait=&index=" are added +To cause a query to block, the query parameters "?wait=\&index=\" are added to a request. The "?wait=" query parameter limits how long the query will potentially block for. It not set, it will default to 10 minutes. It can be specified in the form of "10s" or "5m", which is 10 seconds or 5 minutes respectively. The "?index=" parameter is an @@ -41,6 +42,72 @@ note is that when the query returns there is **no guarantee** of a change. It is possible that the timeout was reached, or that there was an idempotent write that does not affect the result. + +## KV + +The KV endpoint is used to expose a simple key/value store. This can be used +to store service configurations or other meta data in a simple way. It has only +a single endpoint: + + /v1/kv/ + +This is the only endpoint that is used with the Key/Value store. +It's use depends on the HTTP method. The `GET`, `PUT` and `DELETE` methods +are all supported. + +When using the `GET` method, Consul will return the specified key, +or if the "?recurse" query parameter is provided, it will return +all keys with the given prefix. + +Each object will look like: + + [ + { + "CreateIndex":100, + "ModifyIndex":200, + "Key":"zip", + "Flags":0, + "Value":"dGVzdA==" + } + ] + +The `CreateIndex` is the internal index value that represents +when the entry was created. The `ModifyIndex` is the last index +that modified this key. This index corresponds to the `X-Consul-Index` +header value that is returned. A blocking query can be used to wait for +a value to change. If "?recurse" is used, the `X-Consul-Index` corresponds +to the latest `ModifyIndex` and so a blocking query waits until any of the +listed keys are updated. + +The `Key` is simply the full path of the entry. `Flags` are an opaque +unsigned integer that can be attached to each entry. The use of this is +left totally to the user. Lastly, the `Value` is a base64 key value. + +If no entries are found, a 404 code is returned. + +When using the `PUT` method, Consul expects the request body to be the +value corresponding to the key. There are a number of parameters that can +be used with a PUT request: + +* ?flags=\ : This can be used to specify an unsigned value between + 0 and 2^64-1. It is opaque to the user, but a client application may + use it. + +* ?cas=\ : This flag is used to turn the `PUT` into a **Check-And-Set** + operation. This is very useful as it allows clients to build more complex + syncronization primitives on top. If the index is 0, then Consul will only + put the key if it does not already exist. If the index is non-zero, then + the key is only set if the index matches the `ModifyIndex` of that key. + +The return value is simply either `true` or `false`. If the CAS check fails, +then `false` will be returned. + +Lastly, the `DELETE` method can be used to delete a single key or all +keys sharing a prefix. If the "?recurse" query parameter is provided, +then all keys with the prefix are deleted, otherwise only the specified +key. + + ## Agent The Agent endpoints are used to interact with a local Consul agent. Usually, From 4ccb2d4a73b915dbf613a37fdaab314995188915 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 1 Apr 2014 11:29:55 -0700 Subject: [PATCH 27/30] consul: More efficient restore of KVS entriesg --- consul/fsm.go | 2 +- consul/state_store.go | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/consul/fsm.go b/consul/fsm.go index 739e5d92253..f8451913f27 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -218,7 +218,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { if err := dec.Decode(&req); err != nil { return err } - if err := c.state.KVSSet(req.CreateIndex, &req); err != nil { + if err := c.state.KVSRestore(&req); err != nil { return err } diff --git a/consul/state_store.go b/consul/state_store.go index ded7e13b0ba..be18995a150 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -746,6 +746,22 @@ func (s *StateStore) KVSSet(index uint64, d *structs.DirEntry) error { return tx.Commit() } +// KVSRestore is used to restore a DirEntry. It should only be used when +// doing a restore, otherwise KVSSet should be used. +func (s *StateStore) KVSRestore(d *structs.DirEntry) error { + // Start a new txn + tx, err := s.kvsTable.StartTxn(false, nil) + if err != nil { + return err + } + defer tx.Abort() + + if err := s.kvsTable.InsertTxn(tx, d); err != nil { + return err + } + return tx.Commit() +} + // KVSGet is used to get a KV entry func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) { idx, res, err := s.kvsTable.Get("id", key) From 8172526801bf4d9b224d9f9fbaedd0a62b6beb3b Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 1 Apr 2014 11:42:07 -0700 Subject: [PATCH 28/30] consul: Support a streaming transaction --- consul/mdb_table.go | 23 +++++++++++ consul/mdb_table_test.go | 88 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+) diff --git a/consul/mdb_table.go b/consul/mdb_table.go index 58f27812431..eeead56c34a 100644 --- a/consul/mdb_table.go +++ b/consul/mdb_table.go @@ -376,6 +376,29 @@ func (t *MDBTable) GetTxn(tx *MDBTxn, index string, parts ...string) ([]interfac return results, err } +// StreamTxn is like GetTxn but it streams the results over a channel. +// This can be used if the expected data set is very large. The stream +// is always closed on return. +func (t *MDBTable) StreamTxn(stream chan<- interface{}, tx *MDBTxn, index string, parts ...string) error { + // Always close the stream on return + defer close(stream) + + // Get the associated index + idx, key, err := t.getIndex(index, parts) + if err != nil { + return err + } + + // Stream the results + err = idx.iterate(tx, key, func(encRowId, res []byte) bool { + obj := t.Decoder(res) + stream <- obj + return false + }) + + return err +} + // getIndex is used to get the proper index, and also check the arity func (t *MDBTable) getIndex(index string, parts []string) (*MDBIndex, []byte, error) { // Get the index diff --git a/consul/mdb_table_test.go b/consul/mdb_table_test.go index 061240df288..d57a9bd6b2b 100644 --- a/consul/mdb_table_test.go +++ b/consul/mdb_table_test.go @@ -884,3 +884,91 @@ func TestMDBTableVirtualIndex(t *testing.T) { t.Fatalf("expect 1 result: %#v", res) } } + +func TestMDBTableStream(t *testing.T) { + dir, env := testMDBEnv(t) + defer os.RemoveAll(dir) + defer env.Close() + + table := &MDBTable{ + Env: env, + Name: "test", + Indexes: map[string]*MDBIndex{ + "id": &MDBIndex{ + Unique: true, + Fields: []string{"Key"}, + }, + "name": &MDBIndex{ + Fields: []string{"First", "Last"}, + }, + "country": &MDBIndex{ + Fields: []string{"Country"}, + }, + }, + Encoder: MockEncoder, + Decoder: MockDecoder, + } + if err := table.Init(); err != nil { + t.Fatalf("err: %v", err) + } + + objs := []*MockData{ + &MockData{ + Key: "1", + First: "Kevin", + Last: "Smith", + Country: "USA", + }, + &MockData{ + Key: "2", + First: "Kevin", + Last: "Wang", + Country: "USA", + }, + &MockData{ + Key: "3", + First: "Bernardo", + Last: "Torres", + Country: "Mexico", + }, + } + + // Insert some mock objects + for idx, obj := range objs { + if err := table.Insert(obj); err != nil { + t.Fatalf("err: %v", err) + } + if err := table.SetLastIndex(uint64(idx + 1)); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Start a readonly txn + tx, err := table.StartTxn(true, nil) + if err != nil { + panic(err) + } + defer tx.Abort() + + // Stream the records + streamCh := make(chan interface{}) + go func() { + if err := table.StreamTxn(streamCh, tx, "id"); err != nil { + t.Fatalf("err: %v", err) + } + }() + + // Verify we get them all + idx := 0 + for obj := range streamCh { + p := obj.(*MockData) + if !reflect.DeepEqual(p, objs[idx]) { + t.Fatalf("bad: %#v %#v", p, objs[idx]) + } + idx++ + } + + if idx != 3 { + t.Fatalf("bad index: %d", idx) + } +} From 9473bbe7bf39980a62ce031ecd2e2aa5083e43b8 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 1 Apr 2014 11:55:25 -0700 Subject: [PATCH 29/30] consul: Avoid loading all KV pairs during a snapshot --- consul/fsm.go | 29 ++++++++++++++++++++++++----- consul/state_store.go | 17 +++++------------ consul/state_store_test.go | 36 ++++++++++++++++++++++++++++++++++-- 3 files changed, 63 insertions(+), 19 deletions(-) diff --git a/consul/fsm.go b/consul/fsm.go index f8451913f27..5e8c980e2cf 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -290,11 +290,30 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { nodes = nil // Dump the KVS entries - dirents := s.state.KVSDump() - for _, ent := range dirents { - // Register the node itself - sink.Write([]byte{byte(structs.KVSRequestType)}) - if err := encoder.Encode(ent); err != nil { + streamCh := make(chan interface{}, 256) + errorCh := make(chan error) + go func() { + if err := s.state.KVSDump(streamCh); err != nil { + errorCh <- err + } + }() + +OUTER: + for { + select { + case raw := <-streamCh: + if raw == nil { + break OUTER + } + ent := raw.(*structs.DirEntry) + + sink.Write([]byte{byte(structs.KVSRequestType)}) + if err := encoder.Encode(ent); err != nil { + sink.Cancel() + return err + } + + case err := <-errorCh: sink.Cancel() return err } diff --git a/consul/state_store.go b/consul/state_store.go index be18995a150..4b08c046404 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -923,16 +923,9 @@ func (s *StateSnapshot) NodeChecks(node string) structs.HealthChecks { return checks } -// KVSDump is used to list all KV entries -func (s *StateSnapshot) KVSDump() structs.DirEntries { - res, err := s.store.kvsTable.GetTxn(s.tx, "id") - if err != nil { - s.store.logger.Printf("[ERR] consul.state: Failed to get KVS entries: %v", err) - return nil - } - ents := make(structs.DirEntries, len(res)) - for idx, r := range res { - ents[idx] = r.(*structs.DirEntry) - } - return ents +// KVSDump is used to list all KV entries. It takes a channel and streams +// back *struct.DirEntry objects. This will block and should be invoked +// in a goroutine. +func (s *StateSnapshot) KVSDump(stream chan<- interface{}) error { + return s.store.kvsTable.StreamTxn(stream, s.tx, "id") } diff --git a/consul/state_store_test.go b/consul/state_store_test.go index e03e0ad76d9..5e5bf3e8e1e 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -602,7 +602,23 @@ func TestStoreSnapshot(t *testing.T) { } // Check we have the entries - ents := snap.KVSDump() + streamCh := make(chan interface{}, 64) + doneCh := make(chan struct{}) + var ents []*structs.DirEntry + go func() { + for { + obj := <-streamCh + if obj == nil { + close(doneCh) + return + } + ents = append(ents, obj.(*structs.DirEntry)) + } + }() + if err := snap.KVSDump(streamCh); err != nil { + t.Fatalf("err: %v", err) + } + <-doneCh if len(ents) != 2 { t.Fatalf("missing KVS entries!") } @@ -661,7 +677,23 @@ func TestStoreSnapshot(t *testing.T) { } // Check we have the entries - ents = snap.KVSDump() + streamCh = make(chan interface{}, 64) + doneCh = make(chan struct{}) + ents = nil + go func() { + for { + obj := <-streamCh + if obj == nil { + close(doneCh) + return + } + ents = append(ents, obj.(*structs.DirEntry)) + } + }() + if err := snap.KVSDump(streamCh); err != nil { + t.Fatalf("err: %v", err) + } + <-doneCh if len(ents) != 2 { t.Fatalf("missing KVS entries!") } From 5ff482dec021d1059f6c38ecf9e328a959f49452 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 1 Apr 2014 12:10:58 -0700 Subject: [PATCH 30/30] consul: FSM snapshot can avoid type assertion --- consul/fsm.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/consul/fsm.go b/consul/fsm.go index 5e8c980e2cf..22854729fd3 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -305,10 +305,8 @@ OUTER: if raw == nil { break OUTER } - ent := raw.(*structs.DirEntry) - sink.Write([]byte{byte(structs.KVSRequestType)}) - if err := encoder.Encode(ent); err != nil { + if err := encoder.Encode(raw); err != nil { sink.Cancel() return err }