From 71500c80f219d38084a57824332ade553dcc39a7 Mon Sep 17 00:00:00 2001 From: Josh Rickmar Date: Wed, 8 Nov 2017 16:48:44 -0500 Subject: [PATCH] multi: Add initial committed filter (CF) support This change begins the work of bringing committed filters to the network consensus daemon. Committed filters are designed to enable light wallets without many of the privacy issues associated with server-side bloom filtering. The new gcs package provides the primitives for creating and matching against Golomb-coded sets (GCS) filters while the blockcf package provides creation of filters and filter entries for data structures found in blocks. The wire package has been updated to define a new protocol version and service flag for advertising CF support and includes types for the following new messages: cfheaders, cfilter, cftypes, getcfheaders, getcfilter, getcftypes. The peer package and server implementation have been updated to include support for the new protocol version and messages. Filters are created using a collision probability of 2^-20 and are saved to a new optional database index when running with committed filter support enabled (the default). At first startup, if support is not disabled, the index will be created and populated with filters and filter headers for all preexisting blocks, and new filters will be recorded for processed blocks. Multiple filter types are supported. The regular filter commits to output scripts and previous outpoints that any non-voting wallet will require access to. Scripts and previous outpoints that can only be spent by votes and revocations are not committed to the filter. The extended filter is a supplementary filter which commits to all transaction hashes and script data pushes from the input scripts of non-coinbase regular and ticket purchase transactions. Creating these filters is based on the algorithm defined by BIP0158 but is modified to only commit "regular" data in stake transactions to prevent committed filters from being used to create SPV voting wallets. 
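As an illustration of the intended client-side use (a sketch only, not part of this change), a light client that has fetched a block's regular filter through the new cfilter message can test its watched data against the filter locally. The sketch assumes the MatchAny method on gcs.Filter and the serialization produced by Filter.NBytes (a 4-byte big-endian N prefix followed by the compressed filter data); the helper name matchServedFilter is illustrative only:

    import (
        "encoding/binary"

        "github.com/decred/dcrd/chaincfg/chainhash"
        "github.com/decred/dcrd/gcs"
        "github.com/decred/dcrd/gcs/blockcf"
    )

    // matchServedFilter reports whether any watched entry (output script or
    // serialized outpoint) probabilistically matches a block's regular
    // committed filter, as served in a cfilter message's data payload.
    func matchServedFilter(blockHash *chainhash.Hash, nBytes []byte, watched [][]byte) (bool, error) {
        // An empty payload means the block commits no filter data.
        if len(nBytes) == 0 {
            return false, nil
        }
        if len(nBytes) < 4 {
            return false, gcs.ErrMisserialized
        }

        // Split the N prefix from the filter data and deserialize.
        n := binary.BigEndian.Uint32(nBytes)
        f, err := gcs.FromBytes(n, blockcf.P, nBytes[4:])
        if err != nil {
            return false, err
        }

        // Every block filter is keyed by the truncated block hash. A match
        // means the block should be fetched and inspected; false positives
        // occur with probability of roughly 2^-20.
        key := blockcf.Key(blockHash)
        return f.MatchAny(key, watched), nil
    }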
--- Gopkg.lock | 8 +- LICENSE | 5 +- blockchain/indexers/README.md | 3 + blockchain/indexers/cfindex.go | 292 +++++++++++++++++++++++++++ config.go | 12 ++ dcrd.go | 8 + dcrjson/chainsvrcmds.go | 32 +++ dcrjson/chainsvrcmds_test.go | 28 +++ dcrjson/jsonrpcerr.go | 1 + gcs/README.md | 9 + gcs/bits.go | 194 ++++++++++++++++++ gcs/blockcf/README.md | 8 + gcs/blockcf/blockcf.go | 197 ++++++++++++++++++ gcs/doc.go | 26 +++ gcs/gcs.go | 358 +++++++++++++++++++++++++++++++++ gcs/gcs_test.go | 199 ++++++++++++++++++ gcs/gcsbench_test.go | 100 +++++++++ gcs/uint64slice.go | 26 +++ peer/peer.go | 54 ++++- peer/peer_test.go | 45 +++++ rpcclient/chain.go | 100 +++++++++ rpcclient/extensions.go | 4 +- rpcclient/infrastructure.go | 7 + rpcserver.go | 82 ++++++++ rpcserverhelp.go | 14 ++ server.go | 273 ++++++++++++++++++++++++- wire/message.go | 24 +++ wire/message_test.go | 53 +++-- wire/msgcfheaders.go | 181 +++++++++++++++++ wire/msgcfilter.go | 124 ++++++++++++ wire/msgcftypes.go | 126 ++++++++++++ wire/msggetcfheaders.go | 138 +++++++++++++ wire/msggetcfilter.go | 76 +++++++ wire/msggetcftypes.go | 58 ++++++ wire/protocol.go | 15 +- wire/protocol_test.go | 3 +- 36 files changed, 2852 insertions(+), 31 deletions(-) create mode 100644 blockchain/indexers/cfindex.go create mode 100644 gcs/README.md create mode 100644 gcs/bits.go create mode 100644 gcs/blockcf/README.md create mode 100644 gcs/blockcf/blockcf.go create mode 100644 gcs/doc.go create mode 100644 gcs/gcs.go create mode 100644 gcs/gcs_test.go create mode 100644 gcs/gcsbench_test.go create mode 100644 gcs/uint64slice.go create mode 100644 wire/msgcfheaders.go create mode 100644 wire/msgcfilter.go create mode 100644 wire/msgcftypes.go create mode 100644 wire/msggetcfheaders.go create mode 100644 wire/msggetcfilter.go create mode 100644 wire/msggetcftypes.go diff --git a/Gopkg.lock b/Gopkg.lock index 1d39c28d02..332ef64fa2 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1,6 +1,12 @@ # This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. +[[projects]] + branch = "master" + name = "github.com/aead/siphash" + packages = ["."] + revision = "e404fcfc888570cadd1610538e2dbc89f66af814" + [[projects]] branch = "master" name = "github.com/agl/ed25519" @@ -122,6 +128,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "dac2b0bbb944e27f7652fb08881c0de11c94472c2041f5ff3a75a4064dae5439" + inputs-digest = "d190067efa8c61778ef133b4f695f26aa8e0333625b7f0172788009b68567f3d" solver-name = "gps-cdcl" solver-version = 1 diff --git a/LICENSE b/LICENSE index 684c60048b..71678d9f0a 100644 --- a/LICENSE +++ b/LICENSE @@ -1,7 +1,8 @@ ISC License -Copyright (c) 2013-2016 The btcsuite developers -Copyright (c) 2015-2016 The Decred developers +Copyright (c) 2013-2017 The btcsuite developers +Copyright (c) 2015-2018 The Decred developers +Copyright (c) 2017 The Lightning Network Developers Permission to use, copy, modify, and distribute this software for any purpose with or without fee is hereby granted, provided that the above diff --git a/blockchain/indexers/README.md b/blockchain/indexers/README.md index ffc5e15132..40aad1fd0d 100644 --- a/blockchain/indexers/README.md +++ b/blockchain/indexers/README.md @@ -23,6 +23,9 @@ via an RPC interface. 
- Stores a key with an empty value for every address that has ever existed and was seen by the client - Requires the transaction-by-hash index +- Committed Filter (cfindexparentbucket) Index + - Stores all committed filters and committed filter headers for all blocks in + the main chain ## Installation diff --git a/blockchain/indexers/cfindex.go b/blockchain/indexers/cfindex.go new file mode 100644 index 0000000000..88de84e617 --- /dev/null +++ b/blockchain/indexers/cfindex.go @@ -0,0 +1,292 @@ +// Copyright (c) 2017 The btcsuite developers +// Copyright (c) 2018 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package indexers + +import ( + "errors" + "fmt" + + "github.com/decred/dcrd/blockchain" + "github.com/decred/dcrd/chaincfg" + "github.com/decred/dcrd/chaincfg/chainhash" + "github.com/decred/dcrd/database" + "github.com/decred/dcrd/dcrutil" + "github.com/decred/dcrd/gcs" + "github.com/decred/dcrd/gcs/blockcf" + "github.com/decred/dcrd/wire" +) + +const ( + // cfIndexName is the human-readable name for the index. + cfIndexName = "committed filter index" +) + +// Committed filters come in two flavors: basic and extended. They are +// generated and dropped in pairs, and both are indexed by a block's hash. +// Besides holding different content, they also live in different buckets. +var ( + // cfIndexParentBucketKey is the name of the parent bucket used to house + // the index. The rest of the buckets live below this bucket. + cfIndexParentBucketKey = []byte("cfindexparentbucket") + + // cfIndexKeys is an array of db bucket names used to house indexes of + // block hashes to cfilters. + cfIndexKeys = [][]byte{ + []byte("cf0byhashidx"), + []byte("cf1byhashidx"), + } + + // cfHeaderKeys is an array of db bucket names used to house indexes of + // block hashes to cf headers. + cfHeaderKeys = [][]byte{ + []byte("cf0headerbyhashidx"), + []byte("cf1headerbyhashidx"), + } + + maxFilterType = uint8(len(cfHeaderKeys) - 1) ) + +// dbFetchFilter retrieves a block's basic or extended filter. A filter's +// absence is not considered an error. +func dbFetchFilter(dbTx database.Tx, key []byte, h *chainhash.Hash) []byte { + idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key) + return idx.Get(h[:]) +} + +// dbFetchFilterHeader retrieves a block's basic or extended filter header. +// A filter header's absence is not considered an error; a zero header is +// returned in its place. +func dbFetchFilterHeader(dbTx database.Tx, key []byte, h *chainhash.Hash) ([]byte, error) { + idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key) + + fh := idx.Get(h[:]) + if fh == nil { + return make([]byte, chainhash.HashSize), nil + } + if len(fh) != chainhash.HashSize { + return nil, fmt.Errorf("invalid filter header length %v", len(fh)) + } + + return fh, nil +} + +// dbStoreFilter stores a block's basic or extended filter. +func dbStoreFilter(dbTx database.Tx, key []byte, h *chainhash.Hash, f []byte) error { + idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key) + return idx.Put(h[:], f) +} + +// dbStoreFilterHeader stores a block's basic or extended filter header. +func dbStoreFilterHeader(dbTx database.Tx, key []byte, h *chainhash.Hash, fh []byte) error { + if len(fh) != chainhash.HashSize { + return fmt.Errorf("invalid filter header length %v", len(fh)) + } + idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key) + return idx.Put(h[:], fh) +} + +// dbDeleteFilter deletes a block's basic or extended filter.
+func dbDeleteFilter(dbTx database.Tx, key []byte, h *chainhash.Hash) error { + idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key) + return idx.Delete(h[:]) +} + +// dbDeleteFilterHeader deletes a block's basic or extended filter header. +func dbDeleteFilterHeader(dbTx database.Tx, key []byte, h *chainhash.Hash) error { + idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key) + return idx.Delete(h[:]) +} + +// CFIndex implements a committed filter (cf) by hash index. +type CFIndex struct { + db database.DB + chainParams *chaincfg.Params +} + +// Ensure the CFIndex type implements the Indexer interface. +var _ Indexer = (*CFIndex)(nil) + +// Init initializes the hash-based cf index. This is part of the Indexer +// interface. +func (idx *CFIndex) Init() error { + return nil // Nothing to do. +} + +// Key returns the database key to use for the index as a byte slice. This is +// part of the Indexer interface. +func (idx *CFIndex) Key() []byte { + return cfIndexParentBucketKey +} + +// Name returns the human-readable name of the index. This is part of the +// Indexer interface. +func (idx *CFIndex) Name() string { + return cfIndexName +} + +// Create is invoked when the indexer manager determines the index needs to +// be created for the first time. It creates buckets for the two hash-based cf +// indexes (regular, extended). +func (idx *CFIndex) Create(dbTx database.Tx) error { + meta := dbTx.Metadata() + + cfIndexParentBucket, err := meta.CreateBucket(cfIndexParentBucketKey) + if err != nil { + return err + } + + for _, bucketName := range cfIndexKeys { + _, err = cfIndexParentBucket.CreateBucket(bucketName) + if err != nil { + return err + } + } + + for _, bucketName := range cfHeaderKeys { + _, err = cfIndexParentBucket.CreateBucket(bucketName) + if err != nil { + return err + } + } + + firstHeader := make([]byte, chainhash.HashSize) + err = dbStoreFilterHeader(dbTx, cfHeaderKeys[wire.GCSFilterRegular], + &idx.chainParams.GenesisBlock.Header.PrevBlock, firstHeader) + if err != nil { + return err + } + + return dbStoreFilterHeader(dbTx, cfHeaderKeys[wire.GCSFilterExtended], + &idx.chainParams.GenesisBlock.Header.PrevBlock, firstHeader) +} + +// storeFilter stores a given filter, and performs the steps needed to +// generate the filter's header. +func storeFilter(dbTx database.Tx, block *dcrutil.Block, f *gcs.Filter, filterType wire.FilterType) error { + if uint8(filterType) > maxFilterType { + return errors.New("unsupported filter type") + } + + // Figure out which buckets to use. + fkey := cfIndexKeys[filterType] + hkey := cfHeaderKeys[filterType] + + // Start by storing the filter. + h := block.Hash() + var filterBytes []byte + if f != nil { + filterBytes = f.NBytes() + } + err := dbStoreFilter(dbTx, fkey, h, filterBytes) + if err != nil { + return err + } + + // Then fetch the previous block's filter header. + ph := &block.MsgBlock().Header.PrevBlock + pfh, err := dbFetchFilterHeader(dbTx, hkey, ph) + if err != nil { + return err + } + + // Construct the new block's filter header, and store it. + prevHeader, err := chainhash.NewHash(pfh) + if err != nil { + return err + } + fh := gcs.MakeHeaderForFilter(f, prevHeader) + return dbStoreFilterHeader(dbTx, hkey, h, fh[:]) +} + +// ConnectBlock is invoked by the index manager when a new block has been +// connected to the main chain. This indexer adds a hash-to-cf mapping for +// every passed block. This is part of the Indexer interface.
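+//
+// Both the regular and extended filters are generated and stored along with
+// their filter headers. A gcs.ErrNoData error from filter creation is
+// tolerated: in that case an empty filter is stored and the filter header
+// chain is still extended.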
+func (idx *CFIndex) ConnectBlock(dbTx database.Tx, block, parent *dcrutil.Block, view *blockchain.UtxoViewpoint) error { + f, err := blockcf.Regular(block.MsgBlock()) + if err != nil && err != gcs.ErrNoData { + return err + } + + err = storeFilter(dbTx, block, f, wire.GCSFilterRegular) + if err != nil { + return err + } + + f, err = blockcf.Extended(block.MsgBlock()) + if err != nil && err != gcs.ErrNoData { + return err + } + + return storeFilter(dbTx, block, f, wire.GCSFilterExtended) +} + +// DisconnectBlock is invoked by the index manager when a block has been +// disconnected from the main chain. This indexer removes the hash-to-cf +// mapping for every passed block. This is part of the Indexer interface. +func (idx *CFIndex) DisconnectBlock(dbTx database.Tx, block, parent *dcrutil.Block, view *blockchain.UtxoViewpoint) error { + for _, key := range cfIndexKeys { + err := dbDeleteFilter(dbTx, key, block.Hash()) + if err != nil { + return err + } + } + + for _, key := range cfHeaderKeys { + err := dbDeleteFilterHeader(dbTx, key, block.Hash()) + if err != nil { + return err + } + } + + return nil +} + +// FilterByBlockHash returns the serialized contents of a block's basic or +// extended committed filter. +func (idx *CFIndex) FilterByBlockHash(h *chainhash.Hash, filterType wire.FilterType) ([]byte, error) { + if uint8(filterType) > maxFilterType { + return nil, errors.New("unsupported filter type") + } + + var f []byte + err := idx.db.View(func(dbTx database.Tx) error { + f = dbFetchFilter(dbTx, cfIndexKeys[filterType], h) + return nil + }) + return f, err +} + +// FilterHeaderByBlockHash returns the serialized contents of a block's basic +// or extended committed filter header. +func (idx *CFIndex) FilterHeaderByBlockHash(h *chainhash.Hash, filterType wire.FilterType) ([]byte, error) { + if uint8(filterType) > maxFilterType { + return nil, errors.New("unsupported filter type") + } + + var fh []byte + err := idx.db.View(func(dbTx database.Tx) error { + var err error + fh, err = dbFetchFilterHeader(dbTx, + cfHeaderKeys[filterType], h) + return err + }) + return fh, err +} + +// NewCfIndex returns a new instance of an indexer that is used to create a +// mapping of the hashes of all blocks in the blockchain to their respective +// committed filters. +// +// It implements the Indexer interface which plugs into the IndexManager that +// in turn is used by the blockchain package. This allows the index to be +// seamlessly maintained along with the chain. +func NewCfIndex(db database.DB, chainParams *chaincfg.Params) *CFIndex { + return &CFIndex{db: db, chainParams: chainParams} +} + +// DropCfIndex drops the CF index from the provided database if it exists.
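+// It is invoked on startup, before the server is created, when the
+// --dropcfindex option is set (see the dcrd.go change below).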
+func DropCfIndex(db database.DB, interrupt <-chan struct{}) error { + return dropIndexMetadata(db, cfIndexParentBucketKey, cfIndexName) +} diff --git a/config.go b/config.go index 9fda612f1a..638bb6e2ae 100644 --- a/config.go +++ b/config.go @@ -58,6 +58,7 @@ const ( defaultSigCacheMaxSize = 100000 defaultTxIndex = false defaultNoExistsAddrIndex = false + defaultNoCFilters = false ) var ( @@ -159,6 +160,8 @@ type config struct { DropAddrIndex bool `long:"dropaddrindex" description:"Deletes the address-based transaction index from the database on start up and then exits."` NoExistsAddrIndex bool `long:"noexistsaddrindex" description:"Disable the exists address index, which tracks whether or not an address has even been used."` DropExistsAddrIndex bool `long:"dropexistsaddrindex" description:"Deletes the exists address index from the database on start up and then exits."` + NoCFilters bool `long:"nocfilters" description:"Disable compact filtering (CF) support"` + DropCFIndex bool `long:"dropcfindex" description:"Deletes the index used for compact filtering (CF) support from the database on start up and then exits."` PipeRx uint `long:"piperx" description:"File descriptor of read end pipe to enable parent -> child process communication"` PipeTx uint `long:"pipetx" description:"File descriptor of write end pipe to enable parent <- child process communication"` LifetimeEvents bool `long:"lifetimeevents" description:"Send lifetime notifications over the TX pipe"` @@ -447,6 +450,7 @@ func loadConfig() (*config, []string, error) { AddrIndex: defaultAddrIndex, AllowOldVotes: defaultAllowOldVotes, NoExistsAddrIndex: defaultNoExistsAddrIndex, + NoCFilters: defaultNoCFilters, } // Service options which are only added on Windows. @@ -901,6 +905,14 @@ func loadConfig() (*config, []string, error) { return nil, nil, err } + // --dropcfindex requires --nocfilters; the index cannot be dropped while + // it is still enabled. + if !cfg.NoCFilters && cfg.DropCFIndex { + err := errors.New("dropcfindex cannot be activated without nocfilters") + fmt.Fprintln(os.Stderr, err) + fmt.Fprintln(os.Stderr, usageMessage) + return nil, nil, err + } + // Check getwork keys are valid and saved parsed versions. cfg.miningAddrs = make([]dcrutil.Address, 0, len(cfg.GetWorkKeys)+ len(cfg.MiningAddrs)) diff --git a/dcrd.go b/dcrd.go index 55b1225211..899e07bfda 100644 --- a/dcrd.go +++ b/dcrd.go @@ -166,6 +166,14 @@ func dcrdMain(serverChan chan<- *server) error { return nil } + if cfg.DropCFIndex { + if err := indexers.DropCfIndex(db, interrupt); err != nil { + dcrdLog.Errorf("%v", err) + return err + } + + return nil + } // Create server and start it. lifetimeNotifier.notifyStartupEvent(lifetimeEventP2PServer) diff --git a/dcrjson/chainsvrcmds.go b/dcrjson/chainsvrcmds.go index 3e3c738f5c..e7918f08d8 100644 --- a/dcrjson/chainsvrcmds.go +++ b/dcrjson/chainsvrcmds.go @@ -309,6 +309,36 @@ func NewGetBlockTemplateCmd(request *TemplateRequest) *GetBlockTemplateCmd { } } +// GetCFilterCmd defines the getcfilter JSON-RPC command. +type GetCFilterCmd struct { + Hash string + FilterType string +} + +// NewGetCFilterCmd returns a new instance which can be used to issue a +// getcfilter JSON-RPC command. +func NewGetCFilterCmd(hash string, filterType string) *GetCFilterCmd { + return &GetCFilterCmd{ + Hash: hash, + FilterType: filterType, + } +} + +// GetCFilterHeaderCmd defines the getcfilterheader JSON-RPC command.
+type GetCFilterHeaderCmd struct { + Hash string + FilterType string +} + +// NewGetCFilterHeaderCmd returns a new instance which can be used to issue a +// getcfilterheader JSON-RPC command. +func NewGetCFilterHeaderCmd(hash string, filterType string) *GetCFilterHeaderCmd { + return &GetCFilterHeaderCmd{ + Hash: hash, + FilterType: filterType, + } +} + // GetChainTipsCmd defines the getchaintips JSON-RPC command. type GetChainTipsCmd struct{} @@ -735,6 +765,8 @@ func init() { MustRegisterCmd("getblockheader", (*GetBlockHeaderCmd)(nil), flags) MustRegisterCmd("getblocksubsidy", (*GetBlockSubsidyCmd)(nil), flags) MustRegisterCmd("getblocktemplate", (*GetBlockTemplateCmd)(nil), flags) + MustRegisterCmd("getcfilter", (*GetCFilterCmd)(nil), flags) + MustRegisterCmd("getcfilterheader", (*GetCFilterHeaderCmd)(nil), flags) MustRegisterCmd("getchaintips", (*GetChainTipsCmd)(nil), flags) MustRegisterCmd("getconnectioncount", (*GetConnectionCountCmd)(nil), flags) MustRegisterCmd("getdifficulty", (*GetDifficultyCmd)(nil), flags) diff --git a/dcrjson/chainsvrcmds_test.go b/dcrjson/chainsvrcmds_test.go index 9b735926b8..e74816a29d 100644 --- a/dcrjson/chainsvrcmds_test.go +++ b/dcrjson/chainsvrcmds_test.go @@ -331,6 +331,34 @@ func TestChainSvrCmds(t *testing.T) { }, }, }, + { + name: "getcfilter", + newCmd: func() (interface{}, error) { + return dcrjson.NewCmd("getcfilter", "123", "extended") + }, + staticCmd: func() interface{} { + return dcrjson.NewGetCFilterCmd("123", "extended") + }, + marshalled: `{"jsonrpc":"1.0","method":"getcfilter","params":["123","extended"],"id":1}`, + unmarshalled: &dcrjson.GetCFilterCmd{ + Hash: "123", + FilterType: "extended", + }, + }, + { + name: "getcfilterheader", + newCmd: func() (interface{}, error) { + return dcrjson.NewCmd("getcfilterheader", "123", "extended") + }, + staticCmd: func() interface{} { + return dcrjson.NewGetCFilterHeaderCmd("123", "extended") + }, + marshalled: `{"jsonrpc":"1.0","method":"getcfilterheader","params":["123","extended"],"id":1}`, + unmarshalled: &dcrjson.GetCFilterHeaderCmd{ + Hash: "123", + FilterType: "extended", + }, + }, { name: "getchaintips", newCmd: func() (interface{}, error) { diff --git a/dcrjson/jsonrpcerr.go b/dcrjson/jsonrpcerr.go index ac70baddc7..33fd06c1f8 100644 --- a/dcrjson/jsonrpcerr.go +++ b/dcrjson/jsonrpcerr.go @@ -71,6 +71,7 @@ const ( ErrRPCDifficulty RPCErrorCode = -5 ErrRPCOutOfRange RPCErrorCode = -1 ErrRPCNoTxInfo RPCErrorCode = -5 + ErrRPCNoCFIndex RPCErrorCode = -5 ErrRPCNoNewestBlockInfo RPCErrorCode = -5 ErrRPCInvalidTxVout RPCErrorCode = -5 ErrRPCRawTxString RPCErrorCode = -32602 diff --git a/gcs/README.md b/gcs/README.md new file mode 100644 index 0000000000..8b5e73fc2d --- /dev/null +++ b/gcs/README.md @@ -0,0 +1,9 @@ +gcs +========== + +[![GoDoc](https://godoc.org/github.com/decred/dcrd/gcs?status.png)](http://godoc.org/github.com/decred/dcrd/gcs) + +Package gcs provides an API for building and using a Golomb-coded set filter +similar to that described [here](http://giovanni.bajo.it/post/47119962313/golomb-coded-sets-smaller-than-bloom-filters). + +A comprehensive suite of tests is provided to ensure proper functionality. \ No newline at end of file diff --git a/gcs/bits.go b/gcs/bits.go new file mode 100644 index 0000000000..b0c84695f0 --- /dev/null +++ b/gcs/bits.go @@ -0,0 +1,194 @@ +// Copyright (c) 2018 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. 
+ +package gcs + +import ( + "io" +) + +type bitWriter struct { + bytes []byte + p *byte // Pointer to last byte + next byte // Next bit to write or skip +} + +// writeOne writes a one bit to the bit stream. +func (b *bitWriter) writeOne() { + if b.next == 0 { + b.bytes = append(b.bytes, 1<<7) + b.p = &b.bytes[len(b.bytes)-1] + b.next = 1 << 6 + return + } + + *b.p |= b.next + b.next >>= 1 +} + +// writeZero writes a zero bit to the bit stream. +func (b *bitWriter) writeZero() { + if b.next == 0 { + b.bytes = append(b.bytes, 0) + b.p = &b.bytes[len(b.bytes)-1] + b.next = 1 << 6 + return + } + + b.next >>= 1 +} + +// writeNBits writes n number of LSB bits of data to the bit stream in big +// endian format. Panics if n > 64. +func (b *bitWriter) writeNBits(data uint64, n uint) { + if n > 64 { + panic("gcs: cannot write more than 64 bits of a uint64") + } + + data <<= 64 - n + + // If byte is partially written, fill the rest + for n > 0 { + if b.next == 0 { + break + } + if data&(1<<63) != 0 { + b.writeOne() + } else { + b.writeZero() + } + n-- + data <<= 1 + } + + if n == 0 { + return + } + + // Write 8 bits at a time. + for n >= 8 { + b.bytes = append(b.bytes, byte(data>>56)) + n -= 8 + data <<= 8 + } + + // Write the remaining bits. + for n > 0 { + if data&(1<<63) != 0 { + b.writeOne() + } else { + b.writeZero() + } + n-- + data <<= 1 + } +} + +type bitReader struct { + bytes []byte + next byte // next bit to read in bytes[0] +} + +func newBitReader(bitstream []byte) bitReader { + return bitReader{ + bytes: bitstream, + next: 1 << 7, + } +} + +// readUnary returns the number of unread sequential one bits before the next +// zero bit. Errors with io.EOF if no zero bits are encountered. +func (b *bitReader) readUnary() (uint64, error) { + var value uint64 + + for { + if len(b.bytes) == 0 { + return value, io.EOF + } + + for b.next != 0 { + bit := b.bytes[0] & b.next + b.next >>= 1 + if bit == 0 { + return value, nil + } + value++ + } + + b.bytes = b.bytes[1:] + b.next = 1 << 7 + } +} + +// readNBits reads n number of LSB bits of data from the bit stream in big +// endian format. Panics if n > 64. +func (b *bitReader) readNBits(n uint) (uint64, error) { + if n > 64 { + panic("gcs: cannot read more than 64 bits as a uint64") + } + + if len(b.bytes) == 0 { + return 0, io.EOF + } + + var value uint64 + + // If byte is partially read, read the rest + if b.next != 1<<7 { + for n > 0 { + if b.next == 0 { + b.next = 1 << 7 + b.bytes = b.bytes[1:] + break + } + + n-- + if b.bytes[0]&b.next != 0 { + value |= 1 << n + } + b.next >>= 1 + } + } + + if n == 0 { + return value, nil + } + + // Read 8 bits at a time. + for n >= 8 { + if len(b.bytes) == 0 { + return 0, io.EOF + } + + n -= 8 + value |= uint64(b.bytes[0]) << n + b.bytes = b.bytes[1:] + } + + if len(b.bytes) == 0 { + if n != 0 { + return 0, io.EOF + } + return value, nil + } + + // Read the remaining bits. + for n > 0 { + if b.next == 0 { + b.bytes = b.bytes[1:] + if len(b.bytes) == 0 { + return 0, io.EOF + } + b.next = 1 << 7 + } + + n-- + if b.bytes[0]&b.next != 0 { + value |= 1 << n + } + b.next >>= 1 + } + + return value, nil +} diff --git a/gcs/blockcf/README.md b/gcs/blockcf/README.md new file mode 100644 index 0000000000..537da22483 --- /dev/null +++ b/gcs/blockcf/README.md @@ -0,0 +1,8 @@ +blockcf +========== + +[![GoDoc](https://godoc.org/github.com/decred/dcrd/gcs/blockcf?status.png)](http://godoc.org/github.com/decred/dcrd/gcs/blockcf) + +Package blockcf provides functions to build committed filters from blocks. 
Unlike the gcs package, which is a general implementation of Golomb-coded sets, +this package is tailored to creating the specific filters used for Decred blocks. \ No newline at end of file diff --git a/gcs/blockcf/blockcf.go b/gcs/blockcf/blockcf.go new file mode 100644 index 0000000000..a7d1199cf7 --- /dev/null +++ b/gcs/blockcf/blockcf.go @@ -0,0 +1,197 @@ +// Copyright (c) 2017 The btcsuite developers +// Copyright (c) 2017 The Lightning Network Developers +// Copyright (c) 2018 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +/* +Package blockcf provides functions for building committed filters for blocks +using Golomb-coded sets in a way that is useful for light clients such as SPV +wallets. + +Committed filters are a reversal of how bloom filters are typically used by a +light client: a consensus-validating full node commits to filters for every +block with a predetermined collision probability and light clients match against +the filters locally rather than uploading personal data to other nodes. If a +filter matches, the light client should fetch the entire block and further +inspect it for relevant transactions. +*/ +package blockcf + +import ( + "encoding/binary" + + "github.com/decred/dcrd/blockchain/stake" + "github.com/decred/dcrd/chaincfg/chainhash" + "github.com/decred/dcrd/gcs" + "github.com/decred/dcrd/txscript" + "github.com/decred/dcrd/wire" +) + +// P is the collision probability exponent used for block committed filters, +// giving a false positive probability of 2^-20. +const P = 20 + +// Entries describes all of the filter entries used to create a GCS filter and +// provides methods for appending data structures found in blocks. +type Entries [][]byte + +// AddOutPoint adds a serialized outpoint to an entries slice. +func (e *Entries) AddOutPoint(outpoint *wire.OutPoint) { + entry := make([]byte, chainhash.HashSize+4) + copy(entry, outpoint.Hash[:]) + binary.LittleEndian.PutUint32(entry[chainhash.HashSize:], outpoint.Index) + + *e = append(*e, entry) +} + +// AddHash adds a hash to an entries slice. +func (e *Entries) AddHash(hash *chainhash.Hash) { + *e = append(*e, hash[:]) +} + +// AddRegularPkScript adds the regular tx output script to an entries slice. +func (e *Entries) AddRegularPkScript(script []byte) { + *e = append(*e, script) +} + +// AddStakePkScript adds the output script without the stake opcode tag to an +// entries slice. +func (e *Entries) AddStakePkScript(script []byte) { + *e = append(*e, script[1:]) +} + +// AddSigScript adds any data pushes of a signature script to an entries slice. +func (e *Entries) AddSigScript(script []byte) { + // Ignore errors and add pushed data, if any + pushes, err := txscript.PushedData(script) + if err == nil && len(pushes) != 0 { + *e = append(*e, pushes...) + } +} + +// Key creates a block committed filter key by truncating a block hash to the +// key size. +func Key(hash *chainhash.Hash) [gcs.KeySize]byte { + var key [gcs.KeySize]byte + copy(key[:], hash[:]) + return key +} + +// Regular builds a regular GCS filter from a block. A regular GCS filter will +// contain all the previous regular outpoints spent within a block, as well as +// the data pushes within all the outputs created within a block which can be +// spent by regular transactions. +func Regular(block *wire.MsgBlock) (*gcs.Filter, error) { + var data Entries + + // Add "regular" data from stake transactions.
For each class of stake + // transaction, the following data is committed to the regular filter: + // + // ticket purchases: + // - all previous outpoints + // - all change output scripts + // + // votes: + // - all OP_SSGEN-tagged output scripts (all outputs after the first + // two -- these describe the block voted on and the vote choices) + // + // revocations: + // - all output scripts + // + // Because change outputs are required in a ticket purchase, even when + // unused, a special case is made that excludes their commitment when the + // output value is zero (provably unspendable). + // + // Output scripts are handled specially for stake transactions by slicing + // off the stake opcode tag (OP_SS*). This tag always appears as the first + // byte of the script and removing it allows users of the filter to only + // match against a normal P2PKH or P2SH script, instead of many extra + // matches for each tag. + for _, tx := range block.STransactions { + switch stake.DetermineTxType(tx) { + case stake.TxTypeSStx: // Ticket purchase + for _, in := range tx.TxIn { + data.AddOutPoint(&in.PreviousOutPoint) + } + for i := 2; i < len(tx.TxOut); i += 2 { // Iterate change outputs + out := tx.TxOut[i] + if out.Value != 0 { + data.AddStakePkScript(out.PkScript) + } + } + + case stake.TxTypeSSGen: // Vote + for _, out := range tx.TxOut[2:] { // Iterate generated coins + data.AddStakePkScript(out.PkScript) + } + + case stake.TxTypeSSRtx: // Revocation + for _, out := range tx.TxOut { + data.AddStakePkScript(out.PkScript) + } + } + } + + // For regular transactions, all previous outpoints except the coinbase's + // are committed, and all output scripts are committed. + for i, tx := range block.Transactions { + if i != 0 { + for _, txIn := range tx.TxIn { + data.AddOutPoint(&txIn.PreviousOutPoint) + } + } + for _, txOut := range tx.TxOut { + data.AddRegularPkScript(txOut.PkScript) + } + } + + // Create the key by truncating the block hash. + blockHash := block.BlockHash() + key := Key(&blockHash) + + return gcs.NewFilter(P, key, data) +} + +// Extended builds an extended GCS filter from a block. An extended filter +// supplements a regular basic filter by including all transaction hashes of +// regular and stake transactions, and adding the witness data (a.k.a. the +// signature script) found within every non-coinbase regular transaction. +func Extended(block *wire.MsgBlock) (*gcs.Filter, error) { + var data Entries + + // For each stake transaction, commit the transaction hash. If the + // transaction is a ticket purchase, commit pushes from the signature script + // (witness). + for _, tx := range block.STransactions { + txHash := tx.TxHash() + data.AddHash(&txHash) + + if stake.IsSStx(tx) { + for _, in := range tx.TxIn { + data.AddSigScript(in.SignatureScript) + } + } + } + + // For each regular transaction, commit the transaction hash. For all + // regular transactions except the coinbase, commit pushes to the signature + // script (witness). + coinbaseHash := block.Transactions[0].TxHash() + data.AddHash(&coinbaseHash) + for _, tx := range block.Transactions[1:] { + txHash := tx.TxHash() + data.AddHash(&txHash) + + for _, txIn := range tx.TxIn { + if txIn.SignatureScript != nil { + data.AddSigScript(txIn.SignatureScript) + } + } + } + + // Create the key by truncating the block hash. 
+ blockHash := block.BlockHash() + key := Key(&blockHash) + + return gcs.NewFilter(P, key, data) +} diff --git a/gcs/doc.go b/gcs/doc.go new file mode 100644 index 0000000000..c316380cdb --- /dev/null +++ b/gcs/doc.go @@ -0,0 +1,26 @@ +// Copyright (c) 2016-2017 The btcsuite developers +// Copyright (c) 2016-2017 The Lightning Network Developers +// Copyright (c) 2018 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +/* +Package gcs provides an API for building and using a Golomb-coded set filter. + +Golomb-Coded Set + +A Golomb-coded set is a probabilistic data structure used similarly to a Bloom +filter. A filter uses constant-size overhead plus on average P+2 bits per item +added to the filter, where 2^-P is the desired false positive (collision) +probability. + +GCS use in Decred + +GCS filters are a mechanism for storing and transmitting per-block filters. The +usage is intended to be the inverse of Bloom filters: a consensus-validating +full node commits to a single filter for every block and serves the filter to +SPV clients that match against the filter locally to determine if the block is +potentially relevant. The suggested collision probability for Decred use is +2^-20. +*/ +package gcs diff --git a/gcs/gcs.go b/gcs/gcs.go new file mode 100644 index 0000000000..a25a70a855 --- /dev/null +++ b/gcs/gcs.go @@ -0,0 +1,358 @@ +// Copyright (c) 2016-2017 The btcsuite developers +// Copyright (c) 2016-2017 The Lightning Network Developers +// Copyright (c) 2018 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package gcs + +import ( + "encoding/binary" + "errors" + "math" + "sort" + + "github.com/aead/siphash" + "github.com/dchest/blake256" + "github.com/decred/dcrd/chaincfg/chainhash" +) + +// Inspired by https://github.com/rasky/gcs + +var ( + // ErrNTooBig signifies that the filter can't handle N items. + ErrNTooBig = errors.New("N does not fit in uint32") + + // ErrPTooBig signifies that the filter can't handle `1/2**P` + // collision probability. + ErrPTooBig = errors.New("P is too large") + + // ErrNoData signifies that an empty slice was passed. + ErrNoData = errors.New("no data provided") + + // ErrMisserialized signifies a filter was misserialized and is missing the + // N and/or P parameters of a serialized filter. + ErrMisserialized = errors.New("misserialized filter") +) + +// KeySize is the size of the byte array required for key material for the +// SipHash keyed hash function. +const KeySize = siphash.KeySize + +// Filter describes an immutable filter that can be built from a set of data +// elements, serialized, deserialized, and queried in a thread-safe manner. The +// serialized form is compressed as a Golomb Coded Set (GCS), but does not +// include N or P to allow the user to encode the metadata separately if +// necessary. The hash function used is SipHash, a keyed function; the key used +// in building the filter is required in order to match filter values and is +// not included in the serialized form. +type Filter struct { + n uint32 + p uint8 + modulusNP uint64 + filterNData []byte // 4 bytes n big endian, remainder is filter data +} + +// NewFilter builds a new GCS filter with the collision probability of +// `1/(2**P)`, key `key`, and including every `[]byte` in `data` as a member of +// the set.
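+//
+// Internally, each member's hash is reduced modulo N*2^P, the values are
+// sorted, and each difference from the previous value is Golomb-Rice coded as
+// a unary quotient followed by a P-bit big-endian remainder, for an average
+// cost of roughly P+2 bits per member.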
+func NewFilter(P uint8, key [KeySize]byte, data [][]byte) (*Filter, error) { + // Some initial parameter checks: make sure we have data from which to + // build the filter, and make sure our parameters will fit the hash + // function we're using. + if len(data) == 0 { + return nil, ErrNoData + } + if len(data) > math.MaxInt32 { + return nil, ErrNTooBig + } + if P > 32 { + return nil, ErrPTooBig + } + + // Create the filter object and insert metadata. + modP := uint64(1 << P) + modPMask := modP - 1 + f := Filter{ + n: uint32(len(data)), + p: P, + modulusNP: uint64(len(data)) * modP, + } + + // Allocate filter data. + values := make(uint64Slice, 0, len(data)) + + // Insert the hash (modulo N*P) of each data element into a slice and + // sort the slice. + for _, d := range data { + v := siphash.Sum64(d, &key) % f.modulusNP + values = append(values, v) + } + sort.Sort(values) + + var b bitWriter + + // Write the sorted list of values into the filter bitstream, + // compressing it using Golomb coding. + var value, lastValue, remainder uint64 + for _, v := range values { + // Calculate the difference between this value and the last, + // modulo P. + remainder = (v - lastValue) & modPMask + + // Calculate the difference between this value and the last, + // divided by P. + value = (v - lastValue - remainder) >> f.p + lastValue = v + + // Write the P multiple into the bitstream in unary; the + // average should be around 1 (2 bits - 0b10). + for value > 0 { + b.writeOne() + value-- + } + b.writeZero() + + // Write the remainder as a big-endian integer with enough bits + // to represent the appropriate collision probability. + b.writeNBits(remainder, uint(f.p)) + } + + // Save the filter data internally as n + filter bytes + ndata := make([]byte, 4+len(b.bytes)) + binary.BigEndian.PutUint32(ndata, f.n) + copy(ndata[4:], b.bytes) + f.filterNData = ndata + + return &f, nil +} + +// FromBytes deserializes a GCS filter from a known N, P, and serialized filter +// as returned by Bytes(). +func FromBytes(N uint32, P uint8, d []byte) (*Filter, error) { + // Basic sanity check. + if P > 32 { + return nil, ErrPTooBig + } + + // Save the filter data internally as n + filter bytes + ndata := make([]byte, 4+len(d)) + binary.BigEndian.PutUint32(ndata, N) + copy(ndata[4:], d) + + f := &Filter{ + n: N, + p: P, + modulusNP: uint64(N) * uint64(1< lastValue2: + // Advance filter created from search terms or return + // false if we're at the end because nothing matched. + if i < len(values) { + lastValue2 = values[i] + i++ + } else { + return false + } + case lastValue2 > lastValue1: + // Advance filter we're searching or return false if + // we're at the end because nothing matched. + value, err := f.readFullUint64(&b) + if err != nil { + return false + } + lastValue1 += value + } + } + + // If we've made it this far, an element matched between filters so we + // return true. + return true +} + +// readFullUint64 reads a value represented by the sum of a unary multiple of +// the filter's P modulus (`2**P`) and a big-endian P-bit remainder. +func (f *Filter) readFullUint64(b *bitReader) (uint64, error) { + v, err := b.readUnary() + if err != nil { + return 0, err + } + + rem, err := b.readNBits(uint(f.p)) + if err != nil { + return 0, err + } + + // Add the multiple and the remainder. 
+ return v< 0 { + rpcsLog.Debugf("Found header of committed filter for %v", hash) + } else { + rpcsLog.Debugf("Could not find header of committed filter for %v: %v", + hash, err) + return nil, &dcrjson.RPCError{ + Code: dcrjson.ErrRPCBlockNotFound, + Message: "Block not found", + } + } + + hash.SetBytes(headerBytes) + return hash.String(), nil +} + // handleGetHeaders implements the getheaders command. func handleGetHeaders(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*dcrjson.GetHeadersCmd) diff --git a/rpcserverhelp.go b/rpcserverhelp.go index e276879d7e..5b460e24e1 100644 --- a/rpcserverhelp.go +++ b/rpcserverhelp.go @@ -409,6 +409,18 @@ var helpDescsEnUS = map[string]string{ "getblocktemplate--condition2": "mode=proposal, accepted", "getblocktemplate--result1": "An error string which represents why the proposal was rejected or nothing if accepted", + // GetCFilterCmd help. + "getcfilter--synopsis": "Returns the committed filter for a block", + "getcfilter--result0": "The committed filter serialized with the N value and encoded as a hex string", + "getcfilter-hash": "The block hash of the filter being queried", + "getcfilter-filtertype": "The type of committed filter to return", + + // GetCFilterHeaderCmd help. + "getcfilterheader--synopsis": "Returns the filter header hash committing to all filters in the chain up through a block", + "getcfilterheader--result0": "The filter header commitment hash", + "getcfilterheader-hash": "The block hash of the filter header being queried", + "getcfilterheader-filtertype": "The type of committed filter to return the header commitment for", + // GetChainTips help. "getchaintips--synopsis": "Returns information about all known chain tips the in the block tree.\n\n" + "The statuses in the result have the following meanings:\n" + @@ -929,6 +941,8 @@ var rpcResultTypes = map[string][]interface{}{ "getblockheader": {(*string)(nil), (*dcrjson.GetBlockHeaderVerboseResult)(nil)}, "getblocksubsidy": {(*dcrjson.GetBlockSubsidyResult)(nil)}, "getblocktemplate": {(*dcrjson.GetBlockTemplateResult)(nil), (*string)(nil), nil}, + "getcfilter": {(*string)(nil)}, + "getcfilterheader": {(*string)(nil)}, "getchaintips": {(*[]dcrjson.GetChainTipsResult)(nil)}, "getconnectioncount": {(*int32)(nil)}, "getcurrentnet": {(*uint32)(nil)}, diff --git a/server.go b/server.go index e680d30818..0e7a0841d7 100644 --- a/server.go +++ b/server.go @@ -28,6 +28,8 @@ import ( "github.com/decred/dcrd/connmgr" "github.com/decred/dcrd/database" "github.com/decred/dcrd/dcrutil" + "github.com/decred/dcrd/gcs" + "github.com/decred/dcrd/gcs/blockcf" "github.com/decred/dcrd/mempool" "github.com/decred/dcrd/mining" "github.com/decred/dcrd/peer" @@ -38,7 +40,7 @@ import ( const ( // defaultServices describes the default services that are supported by // the server. - defaultServices = wire.SFNodeNetwork | wire.SFNodeBloom + defaultServices = wire.SFNodeNetwork | wire.SFNodeBloom | wire.SFNodeCF // defaultRequiredServices describes the default services that are // required to be supported by outbound peers. @@ -54,7 +56,7 @@ const ( connectionRetryInterval = time.Second * 5 // maxProtocolVersion is the max protocol version the server supports. 
- maxProtocolVersion = wire.MaxBlockSizeVersion + maxProtocolVersion = wire.NodeCFVersion ) var ( @@ -178,6 +180,7 @@ type server struct { txIndex *indexers.TxIndex addrIndex *indexers.AddrIndex existsAddrIndex *indexers.ExistsAddrIndex + cfIndex *indexers.CFIndex } // serverPeer extends the peer to maintain state shared by the server and @@ -853,6 +856,225 @@ func (sp *serverPeer) OnGetHeaders(p *peer.Peer, msg *wire.MsgGetHeaders) { p.QueueMessage(&wire.MsgHeaders{Headers: blockHeaders}, nil) } +// OnGetCFilter is invoked when a peer receives a getcfilter wire message. +func (sp *serverPeer) OnGetCFilter(p *peer.Peer, msg *wire.MsgGetCFilter) { + // Disconnect and/or ban depending on the node cf services flag and + // negotiated protocol version. + if !sp.enforceNodeCFFlag(msg.Command()) { + return + } + + // Ignore getcfilter requests if cfg.NoCFilters is set or we're not in sync. + if cfg.NoCFilters || !sp.server.blockManager.IsCurrent() { + return + } + + // Check for understood filter type. + switch msg.FilterType { + case wire.GCSFilterRegular, wire.GCSFilterExtended: + default: + peerLog.Warnf("OnGetCFilter: unsupported filter type %v", + msg.FilterType) + return + } + + filterBytes, err := sp.server.cfIndex.FilterByBlockHash(&msg.BlockHash, + msg.FilterType) + if err != nil { + peerLog.Errorf("OnGetCFilter: failed to fetch cfilter: %v", err) + return + } + + // If the filter is not saved in the index (perhaps it was removed as a + // block was disconnected, or this has always been a sidechain block) build + // the filter on the spot. + if len(filterBytes) == 0 { + block, err := sp.server.blockManager.chain.FetchBlockByHash( + &msg.BlockHash) + if err != nil { + peerLog.Errorf("OnGetCFilter: failed to fetch non-mainchain "+ + "block %v: %v", &msg.BlockHash, err) + return + } + + var f *gcs.Filter + switch msg.FilterType { + case wire.GCSFilterRegular: + f, err = blockcf.Regular(block.MsgBlock()) + if err != nil { + peerLog.Errorf("OnGetCFilter: failed to build regular "+ + "cfilter for block %v: %v", &msg.BlockHash, err) + return + } + case wire.GCSFilterExtended: + f, err = blockcf.Extended(block.MsgBlock()) + if err != nil { + peerLog.Errorf("OnGetCFilter: failed to build extended "+ + "cfilter for block %v: %v", &msg.BlockHash, err) + return + } + default: + peerLog.Errorf("OnGetCFilter: unhandled filter type %d", + msg.FilterType) + return + } + + filterBytes = f.NBytes() + } + + peerLog.Tracef("Obtained CF for %v", &msg.BlockHash) + + filterMsg := wire.NewMsgCFilter(&msg.BlockHash, msg.FilterType, + filterBytes) + sp.QueueMessage(filterMsg, nil) +} + +// OnGetCFHeaders is invoked when a peer receives a getcfheaders wire message. +func (sp *serverPeer) OnGetCFHeaders(p *peer.Peer, msg *wire.MsgGetCFHeaders) { + // Disconnect and/or ban depending on the node cf services flag and + // negotiated protocol version. + if !sp.enforceNodeCFFlag(msg.Command()) { + return + } + + // Ignore getcfheaders requests if cfg.NoCFilters is set or we're not in + // sync. + if cfg.NoCFilters || !sp.server.blockManager.IsCurrent() { + return + } + + // Check for understood filter type. + switch msg.FilterType { + case wire.GCSFilterRegular, wire.GCSFilterExtended: + default: + peerLog.Warnf("OnGetCFHeaders: unsupported filter type %v", + msg.FilterType) + return + } + + // Attempt to look up the height of the provided stop hash.
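+	// If the stop hash is unknown, endIdx remains at its maximum value, so
+	// the header range below is bounded only by the locator and the
+	// per-message limit.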
+ chain := sp.server.blockManager.chain + endIdx := int64(math.MaxInt64) + height, err := chain.BlockHeightByHash(&msg.HashStop) + if err == nil { + endIdx = height + 1 + } + + // There are no block locators so a specific header is being requested + // as identified by the stop hash. + if len(msg.BlockLocatorHashes) == 0 { + // No blocks with the stop hash were found so there is nothing + // to do. Just return. This behavior mirrors the reference + // implementation. + if endIdx == math.MaxInt64 { + return + } + + // Fetch the raw committed filter header bytes from the + // database. + headerBytes, err := sp.server.cfIndex.FilterHeaderByBlockHash( + &msg.HashStop, msg.FilterType) + if err != nil || len(headerBytes) == 0 { + peerLog.Warnf("Could not obtain CF header for %v: %v", + msg.HashStop, err) + return + } + + // Deserialize the hash. + var header chainhash.Hash + err = header.SetBytes(headerBytes) + if err != nil { + peerLog.Warnf("Committed filter header deserialize "+ + "failed: %v", err) + return + } + + headersMsg := wire.NewMsgCFHeaders() + headersMsg.AddCFHeader(&header) + headersMsg.StopHash = msg.HashStop + headersMsg.FilterType = msg.FilterType + sp.QueueMessage(headersMsg, nil) + return + } + + // Find the most recent known block based on the block locator. + // Use the block after the genesis block if no other blocks in the + // provided locator are known. This does mean the client will start + // over with the genesis block if unknown block locators are provided. + // This mirrors the behavior in the reference implementation. + startIdx := int64(1) + for _, hash := range msg.BlockLocatorHashes { + height, err := chain.BlockHeightByHash(hash) + if err == nil { + // Start with the next hash since we know this one. + startIdx = height + 1 + break + } + } + + // Don't attempt to fetch more than we can put into a single message. + if endIdx-startIdx > wire.MaxCFHeadersPerMsg { + endIdx = startIdx + wire.MaxCFHeadersPerMsg + } + + // Fetch the inventory from the block database. + hashList, err := chain.HeightRange(startIdx, endIdx) + if err != nil { + peerLog.Warnf("Header lookup failed: %v", err) + return + } + if len(hashList) == 0 { + return + } + + // Generate cfheaders message and send it. + headersMsg := wire.NewMsgCFHeaders() + for i := range hashList { + // Fetch the raw committed filter header bytes from the + // database. + headerBytes, err := sp.server.cfIndex.FilterHeaderByBlockHash( + &hashList[i], msg.FilterType) + if (err != nil) || (len(headerBytes) == 0) { + peerLog.Warnf("Could not obtain CF header for %v: %v", + hashList[i], err) + return + } + + // Deserialize the hash. + var header chainhash.Hash + err = header.SetBytes(headerBytes) + if err != nil { + peerLog.Warnf("Committed filter header deserialize "+ + "failed: %v", err) + return + } + + headersMsg.AddCFHeader(&header) + } + + headersMsg.FilterType = msg.FilterType + headersMsg.StopHash = hashList[len(hashList)-1] + sp.QueueMessage(headersMsg, nil) +} + +// OnGetCFTypes is invoked when a peer receives a getcftypes wire message. +func (sp *serverPeer) OnGetCFTypes(p *peer.Peer, msg *wire.MsgGetCFTypes) { + // Disconnect and/or ban depending on the node cf services flag and + // negotiated protocol version. + if !sp.enforceNodeCFFlag(msg.Command()) { + return + } + + // Ignore getcftypes requests if cfg.NoCFilters is set.
+ if cfg.NoCFilters { + return + } + + cfTypesMsg := wire.NewMsgCFTypes([]wire.FilterType{ + wire.GCSFilterRegular, wire.GCSFilterExtended}) + sp.QueueMessage(cfTypesMsg, nil) +} + // enforceNodeBloomFlag disconnects the peer if the server is not configured to // allow bloom filters. Additionally, if the peer has negotiated to a protocol // version that is high enough to observe the bloom filter service support bit, @@ -888,6 +1110,42 @@ func (sp *serverPeer) enforceNodeBloomFlag(cmd string) bool { return true } +// enforceNodeCFFlag disconnects the peer if the server is not configured to +// allow committed filters. Additionally, if the peer has negotiated to a +// protocol version that is high enough to observe the committed filter service +// support bit, it will be banned since it is intentionally violating the +// protocol. +func (sp *serverPeer) enforceNodeCFFlag(cmd string) bool { + if sp.server.services&wire.SFNodeCF != wire.SFNodeCF { + // Ban the peer if the protocol version is high enough that the + // peer is knowingly violating the protocol and banning is + // enabled. + // + // NOTE: Even though the addBanScore function already examines + // whether or not banning is enabled, it is checked here as well + // to ensure the violation is logged and the peer is + // disconnected regardless. + if sp.ProtocolVersion() >= wire.NodeCFVersion && + !cfg.DisableBanning { + + // Disconnect the peer regardless of whether it was + // banned. + sp.addBanScore(100, 0, cmd) + sp.Disconnect() + return false + } + + // Disconnect the peer regardless of protocol version or banning + // state. + peerLog.Debugf("%s sent an unsupported %s request -- "+ + "disconnecting", sp, cmd) + sp.Disconnect() + return false + } + + return true +} + // OnFilterAdd is invoked when a peer receives a filteradd wire message and is // used by remote peers to add data to an already loaded bloom filter. The peer // will be disconnected if a filter is not loaded when this message is received. @@ -1623,6 +1881,9 @@ func newPeerConfig(sp *serverPeer) *peer.Config { OnGetData: sp.OnGetData, OnGetBlocks: sp.OnGetBlocks, OnGetHeaders: sp.OnGetHeaders, + OnGetCFilter: sp.OnGetCFilter, + OnGetCFHeaders: sp.OnGetCFHeaders, + OnGetCFTypes: sp.OnGetCFTypes, OnFilterAdd: sp.OnFilterAdd, OnFilterClear: sp.OnFilterClear, OnFilterLoad: sp.OnFilterLoad, @@ -2240,6 +2501,9 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param if cfg.NoPeerBloomFilters { services &^= wire.SFNodeBloom } + if cfg.NoCFilters { + services &^= wire.SFNodeCF + } amgr := addrmgr.New(cfg.DataDir, dcrdLookup) @@ -2420,6 +2684,11 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param s.existsAddrIndex = indexers.NewExistsAddrIndex(db, chainParams) indexes = append(indexes, s.existsAddrIndex) } + if !cfg.NoCFilters { + indxLog.Info("CF index is enabled") + s.cfIndex = indexers.NewCfIndex(db, chainParams) + indexes = append(indexes, s.cfIndex) + } // Create an index manager if any of the optional indexes are enabled.
var indexManager blockchain.IndexManager diff --git a/wire/message.go b/wire/message.go index da90c5ad2d..967ad8f093 100644 --- a/wire/message.go +++ b/wire/message.go @@ -53,6 +53,12 @@ const ( CmdReject = "reject" CmdSendHeaders = "sendheaders" CmdFeeFilter = "feefilter" + CmdGetCFilter = "getcfilter" + CmdGetCFHeaders = "getcfheaders" + CmdGetCFTypes = "getcftypes" + CmdCFilter = "cfilter" + CmdCFHeaders = "cfheaders" + CmdCFTypes = "cftypes" ) // Message is an interface that describes a Decred message. A type that @@ -143,6 +149,24 @@ func makeEmptyMessage(command string) (Message, error) { case CmdFeeFilter: msg = &MsgFeeFilter{} + case CmdGetCFilter: + msg = &MsgGetCFilter{} + + case CmdGetCFHeaders: + msg = &MsgGetCFHeaders{} + + case CmdGetCFTypes: + msg = &MsgGetCFTypes{} + + case CmdCFilter: + msg = &MsgCFilter{} + + case CmdCFHeaders: + msg = &MsgCFHeaders{} + + case CmdCFTypes: + msg = &MsgCFTypes{} + default: return nil, fmt.Errorf("unhandled command [%s]", command) } diff --git a/wire/message_test.go b/wire/message_test.go index 6fe37e1380..90928f0671 100644 --- a/wire/message_test.go +++ b/wire/message_test.go @@ -72,6 +72,13 @@ func TestMessage(t *testing.T) { msgFilterAdd := NewMsgFilterAdd([]byte{0x01}) msgFilterClear := NewMsgFilterClear() msgFilterLoad := NewMsgFilterLoad([]byte{0x01}, 10, 0, BloomUpdateNone) + msgGetCFilter := NewMsgGetCFilter(&chainhash.Hash{}, GCSFilterExtended) + msgGetCFHeaders := NewMsgGetCFHeaders() + msgGetCFTypes := NewMsgGetCFTypes() + msgCFilter := NewMsgCFilter(&chainhash.Hash{}, GCSFilterExtended, + []byte("payload")) + msgCFHeaders := NewMsgCFHeaders() + msgCFTypes := NewMsgCFTypes([]FilterType{GCSFilterExtended}) bh := NewBlockHeader( int32(0), // Version &chainhash.Hash{}, // PrevHash @@ -101,26 +108,32 @@ func TestMessage(t *testing.T) { dcrnet CurrencyNet // Network to use for wire encoding bytes int // Expected num bytes read/written }{ - {msgVersion, msgVersion, pver, MainNet, 125}, // [0] - {msgVerack, msgVerack, pver, MainNet, 24}, // [1] - {msgGetAddr, msgGetAddr, pver, MainNet, 24}, // [2] - {msgAddr, msgAddr, pver, MainNet, 25}, // [3] - {msgGetBlocks, msgGetBlocks, pver, MainNet, 61}, // [4] - {msgBlock, msgBlock, pver, MainNet, 522}, // [5] - {msgInv, msgInv, pver, MainNet, 25}, // [6] - {msgGetData, msgGetData, pver, MainNet, 25}, // [7] - {msgNotFound, msgNotFound, pver, MainNet, 25}, // [8] - {msgTx, msgTx, pver, MainNet, 39}, // [9] - {msgPing, msgPing, pver, MainNet, 32}, // [10] - {msgPong, msgPong, pver, MainNet, 32}, // [11] - {msgGetHeaders, msgGetHeaders, pver, MainNet, 61}, // [12] - {msgHeaders, msgHeaders, pver, MainNet, 25}, // [13] - {msgMemPool, msgMemPool, pver, MainNet, 24}, // [15] - {msgFilterAdd, msgFilterAdd, pver, MainNet, 26}, // [16] - {msgFilterClear, msgFilterClear, pver, MainNet, 24}, // [17] - {msgFilterLoad, msgFilterLoad, pver, MainNet, 35}, // [18] - {msgMerkleBlock, msgMerkleBlock, pver, MainNet, 215}, // [19] - {msgReject, msgReject, pver, MainNet, 79}, // [20] + {msgVersion, msgVersion, pver, MainNet, 125}, // [0] + {msgVerack, msgVerack, pver, MainNet, 24}, // [1] + {msgGetAddr, msgGetAddr, pver, MainNet, 24}, // [2] + {msgAddr, msgAddr, pver, MainNet, 25}, // [3] + {msgGetBlocks, msgGetBlocks, pver, MainNet, 61}, // [4] + {msgBlock, msgBlock, pver, MainNet, 522}, // [5] + {msgInv, msgInv, pver, MainNet, 25}, // [6] + {msgGetData, msgGetData, pver, MainNet, 25}, // [7] + {msgNotFound, msgNotFound, pver, MainNet, 25}, // [8] + {msgTx, msgTx, pver, MainNet, 39}, // [9] + {msgPing, msgPing, 
pver, MainNet, 32}, // [10] + {msgPong, msgPong, pver, MainNet, 32}, // [11] + {msgGetHeaders, msgGetHeaders, pver, MainNet, 61}, // [12] + {msgHeaders, msgHeaders, pver, MainNet, 25}, // [13] + {msgMemPool, msgMemPool, pver, MainNet, 24}, // [15] + {msgFilterAdd, msgFilterAdd, pver, MainNet, 26}, // [16] + {msgFilterClear, msgFilterClear, pver, MainNet, 24}, // [17] + {msgFilterLoad, msgFilterLoad, pver, MainNet, 35}, // [18] + {msgMerkleBlock, msgMerkleBlock, pver, MainNet, 215}, // [19] + {msgReject, msgReject, pver, MainNet, 79}, // [20] + {msgGetCFilter, msgGetCFilter, pver, MainNet, 57}, // [21] + {msgGetCFHeaders, msgGetCFHeaders, pver, MainNet, 58}, // [22] + {msgGetCFTypes, msgGetCFTypes, pver, MainNet, 24}, // [23] + {msgCFilter, msgCFilter, pver, MainNet, 65}, // [24] + {msgCFHeaders, msgCFHeaders, pver, MainNet, 58}, // [25] + {msgCFTypes, msgCFTypes, pver, MainNet, 26}, // [26] } t.Logf("Running %d tests", len(tests)) diff --git a/wire/msgcfheaders.go b/wire/msgcfheaders.go new file mode 100644 index 0000000000..64192a853e --- /dev/null +++ b/wire/msgcfheaders.go @@ -0,0 +1,181 @@ +// Copyright (c) 2017 The btcsuite developers +// Copyright (c) 2017 The Lightning Network Developers +// Copyright (c) 2018 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package wire + +import ( + "fmt" + "io" + + "github.com/decred/dcrd/chaincfg/chainhash" +) + +const ( + // MaxCFHeaderPayload is the maximum byte size of a committed + // filter header. + MaxCFHeaderPayload = chainhash.HashSize + + // MaxCFHeadersPerMsg is the maximum number of committed filter headers + // that can be in a single cfheaders message. + MaxCFHeadersPerMsg = 2000 +) + +// MsgCFHeaders implements the Message interface and represents a cfheaders +// message. It is used to deliver committed filter header information in +// response to a getcfheaders message (MsgGetCFHeaders). The maximum number of +// committed filter headers per message is currently 2000. See MsgGetCFHeaders +// for details on requesting the headers. +type MsgCFHeaders struct { + StopHash chainhash.Hash + FilterType FilterType + HeaderHashes []*chainhash.Hash +} + +// AddCFHeader adds a new committed filter header to the message. +func (msg *MsgCFHeaders) AddCFHeader(headerHash *chainhash.Hash) error { + if len(msg.HeaderHashes)+1 > MaxCFHeadersPerMsg { + str := fmt.Sprintf("too many committed filter headers in message "+ + "[max %v]", MaxCFHeadersPerMsg) + return messageError("MsgCFHeaders.AddCFHeader", str) + } + + msg.HeaderHashes = append(msg.HeaderHashes, headerHash) + return nil +} + +// BtcDecode decodes r using the wire protocol encoding into the receiver. +// This is part of the Message interface implementation. +func (msg *MsgCFHeaders) BtcDecode(r io.Reader, pver uint32) error { + if pver < NodeCFVersion { + str := fmt.Sprintf("cfheaders message invalid for protocol "+ + "version %d", pver) + return messageError("MsgCFHeaders.BtcDecode", str) + } + + // Read stop hash + err := readElement(r, &msg.StopHash) + if err != nil { + return err + } + + // Read filter type + err = readElement(r, (*uint8)(&msg.FilterType)) + if err != nil { + return err + } + + // Read number of filter headers + count, err := ReadVarInt(r, pver) + if err != nil { + return err + } + + // Limit to max committed filter headers per message.
+	if count > MaxCFHeadersPerMsg {
+		str := fmt.Sprintf("too many committed filter headers for "+
+			"message [count %v, max %v]", count,
+			MaxCFHeadersPerMsg)
+		return messageError("MsgCFHeaders.BtcDecode", str)
+	}
+
+	// Create a contiguous slice of headers to deserialize into in order to
+	// reduce the number of allocations.
+	headerHashes := make([]chainhash.Hash, count)
+	msg.HeaderHashes = make([]*chainhash.Hash, 0, count)
+	for i := uint64(0); i < count; i++ {
+		cfh := &headerHashes[i]
+		err := readElement(r, cfh)
+		if err != nil {
+			return err
+		}
+		msg.AddCFHeader(cfh)
+	}
+
+	return nil
+}
+
+// BtcEncode encodes the receiver to w using the wire protocol encoding.
+// This is part of the Message interface implementation.
+func (msg *MsgCFHeaders) BtcEncode(w io.Writer, pver uint32) error {
+	if pver < NodeCFVersion {
+		str := fmt.Sprintf("cfheaders message invalid for protocol "+
+			"version %d", pver)
+		return messageError("MsgCFHeaders.BtcEncode", str)
+	}
+
+	// Write stop hash
+	err := writeElement(w, &msg.StopHash)
+	if err != nil {
+		return err
+	}
+
+	// Write filter type
+	err = binarySerializer.PutUint8(w, uint8(msg.FilterType))
+	if err != nil {
+		return err
+	}
+
+	// Limit to max committed headers per message.
+	count := len(msg.HeaderHashes)
+	if count > MaxCFHeadersPerMsg {
+		str := fmt.Sprintf("too many committed filter headers for "+
+			"message [count %v, max %v]", count,
+			MaxCFHeadersPerMsg)
+		return messageError("MsgCFHeaders.BtcEncode", str)
+	}
+
+	err = WriteVarInt(w, pver, uint64(count))
+	if err != nil {
+		return err
+	}
+
+	for _, cfh := range msg.HeaderHashes {
+		err := writeElement(w, cfh)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// Deserialize decodes a filter header from r into the receiver using a format
+// that is suitable for long-term storage such as a database. This function
+// differs from BtcDecode in that BtcDecode decodes from the wire protocol as it
+// was sent across the network. The wire encoding can technically differ
+// depending on the protocol version and doesn't even really need to match the
+// format of a stored filter header at all. As of the time this comment was
+// written, the encoded filter header is the same in both instances, but there
+// is a distinct difference and separating the two allows the API to be flexible
+// enough to deal with changes.
+func (msg *MsgCFHeaders) Deserialize(r io.Reader) error {
+	// At the current time, there is no difference between the wire encoding
+	// and the stable long-term storage format. As a result, make use of
+	// BtcDecode. NodeCFVersion is passed since BtcDecode rejects any
+	// protocol version that predates committed filter support.
+	return msg.BtcDecode(r, NodeCFVersion)
+}
+
+// Command returns the protocol command string for the message. This is part
+// of the Message interface implementation.
+func (msg *MsgCFHeaders) Command() string {
+	return CmdCFHeaders
+}
+
+// MaxPayloadLength returns the maximum length the payload can be for the
+// receiver. This is part of the Message interface implementation.
+func (msg *MsgCFHeaders) MaxPayloadLength(pver uint32) uint32 {
+	// Hash size + filter type + num headers (varInt) +
+	// (header size * max headers).
+	return chainhash.HashSize + 1 + MaxVarIntPayload +
+		(MaxCFHeaderPayload * MaxCFHeadersPerMsg)
+}
+
+// NewMsgCFHeaders returns a new cfheaders message that conforms to the Message
+// interface. See MsgCFHeaders for details.
+func NewMsgCFHeaders() *MsgCFHeaders {
+	return &MsgCFHeaders{
+		HeaderHashes: make([]*chainhash.Hash, 0, MaxCFHeadersPerMsg),
+	}
+}
diff --git a/wire/msgcfilter.go b/wire/msgcfilter.go
new file mode 100644
index 0000000000..581f5d0921
--- /dev/null
+++ b/wire/msgcfilter.go
@@ -0,0 +1,124 @@
+// Copyright (c) 2017 The btcsuite developers
+// Copyright (c) 2017 The Lightning Network Developers
+// Copyright (c) 2018 The Decred developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package wire
+
+import (
+	"fmt"
+	"io"
+
+	"github.com/decred/dcrd/chaincfg/chainhash"
+)
+
+const (
+	// MaxCFilterDataSize is the maximum byte size of a committed filter.
+	// The maximum size is currently defined as 256KiB.
+	MaxCFilterDataSize = 256 * 1024
+)
+
+// MsgCFilter implements the Message interface and represents a cfilter message.
+// It is used to deliver a committed filter in response to a getcfilter
+// (MsgGetCFilter) message.
+type MsgCFilter struct {
+	BlockHash  chainhash.Hash
+	FilterType FilterType
+	Data       []byte
+}
+
+// BtcDecode decodes r using the wire protocol encoding into the receiver.
+// This is part of the Message interface implementation.
+func (msg *MsgCFilter) BtcDecode(r io.Reader, pver uint32) error {
+	if pver < NodeCFVersion {
+		str := fmt.Sprintf("cfilter message invalid for protocol "+
+			"version %d", pver)
+		return messageError("MsgCFilter.BtcDecode", str)
+	}
+
+	// Read the hash of the filter's block
+	err := readElement(r, &msg.BlockHash)
+	if err != nil {
+		return err
+	}
+
+	// Read filter type
+	err = readElement(r, (*uint8)(&msg.FilterType))
+	if err != nil {
+		return err
+	}
+
+	// Read filter data
+	msg.Data, err = ReadVarBytes(r, pver, MaxCFilterDataSize,
+		"cfilter data")
+	return err
+}
+
+// BtcEncode encodes the receiver to w using the wire protocol encoding. This is
+// part of the Message interface implementation.
+func (msg *MsgCFilter) BtcEncode(w io.Writer, pver uint32) error {
+	if pver < NodeCFVersion {
+		str := fmt.Sprintf("cfilter message invalid for protocol "+
+			"version %d", pver)
+		return messageError("MsgCFilter.BtcEncode", str)
+	}
+
+	size := len(msg.Data)
+	if size > MaxCFilterDataSize {
+		str := fmt.Sprintf("cfilter size too large for message "+
+			"[size %v, max %v]", size, MaxCFilterDataSize)
+		return messageError("MsgCFilter.BtcEncode", str)
+	}
+
+	err := writeElement(w, &msg.BlockHash)
+	if err != nil {
+		return err
+	}
+
+	err = binarySerializer.PutUint8(w, uint8(msg.FilterType))
+	if err != nil {
+		return err
+	}
+
+	return WriteVarBytes(w, pver, msg.Data)
+}
+
+// Deserialize decodes a filter from r into the receiver using a format that is
+// suitable for long-term storage such as a database. This function differs from
+// BtcDecode in that BtcDecode decodes from the wire protocol as it was sent
+// across the network. The wire encoding can technically differ depending on
+// the protocol version and doesn't even really need to match the format of a
+// stored filter at all. As of the time this comment was written, the encoded
+// filter is the same in both instances, but there is a distinct difference and
+// separating the two allows the API to be flexible enough to deal with changes.
+func (msg *MsgCFilter) Deserialize(r io.Reader) error {
+	// At the current time, there is no difference between the wire encoding
+	// and the stable long-term storage format. As a result, make use of
+	// BtcDecode. NodeCFVersion is passed since BtcDecode rejects any
+	// protocol version that predates committed filter support.
+	return msg.BtcDecode(r, NodeCFVersion)
+}
+
+// Command returns the protocol command string for the message. This is part
+// of the Message interface implementation.
+func (msg *MsgCFilter) Command() string {
+	return CmdCFilter
+}
+
+// MaxPayloadLength returns the maximum length the payload can be for the
+// receiver. This is part of the Message interface implementation.
+func (msg *MsgCFilter) MaxPayloadLength(pver uint32) uint32 {
+	return uint32(VarIntSerializeSize(MaxCFilterDataSize)) +
+		MaxCFilterDataSize + chainhash.HashSize + 1
+}
+
+// NewMsgCFilter returns a new cfilter message that conforms to the Message
+// interface. See MsgCFilter for details.
+func NewMsgCFilter(blockHash *chainhash.Hash, filterType FilterType,
+	data []byte) *MsgCFilter {
+
+	return &MsgCFilter{
+		BlockHash:  *blockHash,
+		FilterType: filterType,
+		Data:       data,
+	}
+}
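The encode/decode pairs above are easiest to sanity-check with a round trip. The following sketch is an editorial illustration, not part of the patch: it is written as it would appear in the wire package's own tests, and the zero block hash and filter bytes are placeholder values.

package wire

import (
	"bytes"
	"testing"

	"github.com/decred/dcrd/chaincfg/chainhash"
)

// TestCFilterWireRoundTrip encodes a cfilter message and decodes it back,
// verifying the filter type and data survive the trip.
func TestCFilterWireRoundTrip(t *testing.T) {
	want := NewMsgCFilter(&chainhash.Hash{}, GCSFilterRegular,
		[]byte{0x01, 0x02, 0x03})

	var buf bytes.Buffer
	if err := want.BtcEncode(&buf, ProtocolVersion); err != nil {
		t.Fatalf("BtcEncode: %v", err)
	}

	var got MsgCFilter
	if err := got.BtcDecode(&buf, ProtocolVersion); err != nil {
		t.Fatalf("BtcDecode: %v", err)
	}
	if got.FilterType != want.FilterType || !bytes.Equal(got.Data, want.Data) {
		t.Fatalf("round trip mismatch: got %+v, want %+v", got, want)
	}
}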
diff --git a/wire/msgcftypes.go b/wire/msgcftypes.go
new file mode 100644
index 0000000000..5c4df7c14a
--- /dev/null
+++ b/wire/msgcftypes.go
@@ -0,0 +1,126 @@
+// Copyright (c) 2017 The btcsuite developers
+// Copyright (c) 2017 The Lightning Network Developers
+// Copyright (c) 2018 The Decred developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package wire
+
+import (
+	"fmt"
+	"io"
+)
+
+// MaxFilterTypesPerMsg is the maximum number of filter types allowed per
+// message.
+const MaxFilterTypesPerMsg = 256
+
+// FilterType is used to represent a filter type.
+type FilterType uint8
+
+const (
+	// GCSFilterRegular is the regular filter type.
+	GCSFilterRegular FilterType = iota
+
+	// GCSFilterExtended is the extended filter type.
+	GCSFilterExtended
+)
+
+// MsgCFTypes is the cftypes message. It is used to advertise the committed
+// filter types supported by a node in response to a getcftypes message
+// (MsgGetCFTypes).
+type MsgCFTypes struct {
+	SupportedFilters []FilterType
+}
+
+// BtcDecode decodes r using the wire protocol encoding into the receiver.
+// This is part of the Message interface implementation.
+func (msg *MsgCFTypes) BtcDecode(r io.Reader, pver uint32) error {
+	if pver < NodeCFVersion {
+		str := fmt.Sprintf("cftypes message invalid for protocol "+
+			"version %d", pver)
+		return messageError("MsgCFTypes.BtcDecode", str)
+	}
+
+	// Read the number of filter types supported. The count may not exceed
+	// the total number of filters that can be represented by a FilterType
+	// byte.
+	count, err := ReadVarInt(r, pver)
+	if err != nil {
+		return err
+	}
+	if count > MaxFilterTypesPerMsg {
+		str := fmt.Sprintf("too many filter types for message "+
+			"[count %v, max %v]", count, MaxFilterTypesPerMsg)
+		return messageError("MsgCFTypes.BtcDecode", str)
+	}
+
+	// Read each filter type.
+	msg.SupportedFilters = make([]FilterType, count)
+	for i := range msg.SupportedFilters {
+		err = readElement(r, (*uint8)(&msg.SupportedFilters[i]))
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// BtcEncode encodes the receiver to w using the wire protocol encoding. This is
+// part of the Message interface implementation.
+func (msg *MsgCFTypes) BtcEncode(w io.Writer, pver uint32) error {
+	if pver < NodeCFVersion {
+		str := fmt.Sprintf("cftypes message invalid for protocol "+
+			"version %d", pver)
+		return messageError("MsgCFTypes.BtcEncode", str)
+	}
+
+	// Write the length of the supported filters slice. The slice is
+	// assumed to have already been deduplicated.
+	err := WriteVarInt(w, pver, uint64(len(msg.SupportedFilters)))
+	if err != nil {
+		return err
+	}
+
+	for i := range msg.SupportedFilters {
+		err = binarySerializer.PutUint8(w, uint8(msg.SupportedFilters[i]))
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// Deserialize decodes the supported filter types from r into the receiver
+// using a format that is suitable for long-term storage such as a database.
+// This function differs from BtcDecode in that BtcDecode decodes from the wire
+// protocol as it was sent across the network. The wire encoding can
+// technically differ depending on the protocol version and doesn't even really
+// need to match the stored format at all. As of the time this comment was
+// written, the encoding is the same in both instances, but there is a distinct
+// difference and separating the two allows the API to be flexible enough to
+// deal with changes.
+func (msg *MsgCFTypes) Deserialize(r io.Reader) error {
+	// At the current time, there is no difference between the wire encoding
+	// and the stable long-term storage format. As a result, make use of
+	// BtcDecode. NodeCFVersion is passed since BtcDecode rejects any
+	// protocol version that predates committed filter support.
+	return msg.BtcDecode(r, NodeCFVersion)
+}
+
+// Command returns the protocol command string for the message. This is part
+// of the Message interface implementation.
+func (msg *MsgCFTypes) Command() string {
+	return CmdCFTypes
+}
+
+// MaxPayloadLength returns the maximum length the payload can be for the
+// receiver. This is part of the Message interface implementation.
+func (msg *MsgCFTypes) MaxPayloadLength(pver uint32) uint32 {
+	// 3 bytes for the filter count (varint), and 1 byte for each of up to
+	// 256 filter types.
+	return uint32(VarIntSerializeSize(MaxFilterTypesPerMsg)) +
+		MaxFilterTypesPerMsg
+}
+
+// NewMsgCFTypes returns a new cftypes message that conforms to the Message
+// interface. See MsgCFTypes for details.
+func NewMsgCFTypes(filterTypes []FilterType) *MsgCFTypes {
+	return &MsgCFTypes{
+		SupportedFilters: filterTypes,
+	}
+}
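For context on how these types compose: a node that maintains both filter indexes would answer getcftypes with a two-entry cftypes message. A minimal sketch, not part of the patch; buildCFTypesReply is a hypothetical helper name.

package wire

import "bytes"

// buildCFTypesReply constructs and encodes a cftypes reply advertising both
// supported filter types. The encoded payload is a varint count (0x02)
// followed by one byte per filter type.
func buildCFTypesReply() ([]byte, error) {
	msg := NewMsgCFTypes([]FilterType{GCSFilterRegular, GCSFilterExtended})

	var buf bytes.Buffer
	if err := msg.BtcEncode(&buf, ProtocolVersion); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}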
diff --git a/wire/msggetcfheaders.go b/wire/msggetcfheaders.go
new file mode 100644
index 0000000000..1a2f33c01a
--- /dev/null
+++ b/wire/msggetcfheaders.go
@@ -0,0 +1,138 @@
+// Copyright (c) 2017 The btcsuite developers
+// Copyright (c) 2017 The Lightning Network Developers
+// Copyright (c) 2018 The Decred developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package wire
+
+import (
+	"fmt"
+	"io"
+
+	"github.com/decred/dcrd/chaincfg/chainhash"
+)
+
+// MsgGetCFHeaders is a message similar to MsgGetHeaders, but for committed
+// filter headers. The FilterType field may be set to request headers for the
+// chain of regular (0x00) or extended (0x01) filters.
+type MsgGetCFHeaders struct {
+	BlockLocatorHashes []*chainhash.Hash
+	HashStop           chainhash.Hash
+	FilterType         FilterType
+}
+
+// AddBlockLocatorHash adds a new block locator hash to the message.
+func (msg *MsgGetCFHeaders) AddBlockLocatorHash(hash *chainhash.Hash) error {
+	if len(msg.BlockLocatorHashes)+1 > MaxBlockLocatorsPerMsg {
+		str := fmt.Sprintf("too many block locator hashes for message "+
+			"[max %v]", MaxBlockLocatorsPerMsg)
+		return messageError("MsgGetCFHeaders.AddBlockLocatorHash", str)
+	}
+
+	msg.BlockLocatorHashes = append(msg.BlockLocatorHashes, hash)
+	return nil
+}
+
+// BtcDecode decodes r using the wire protocol encoding into the receiver.
+// This is part of the Message interface implementation.
+func (msg *MsgGetCFHeaders) BtcDecode(r io.Reader, pver uint32) error {
+	if pver < NodeCFVersion {
+		str := fmt.Sprintf("getcfheaders message invalid for protocol "+
+			"version %d", pver)
+		return messageError("MsgGetCFHeaders.BtcDecode", str)
+	}
+
+	// Read num block locator hashes and limit to max.
+	count, err := ReadVarInt(r, pver)
+	if err != nil {
+		return err
+	}
+	if count > MaxBlockLocatorsPerMsg {
+		str := fmt.Sprintf("too many block locator hashes for message "+
+			"[count %v, max %v]", count, MaxBlockLocatorsPerMsg)
+		return messageError("MsgGetCFHeaders.BtcDecode", str)
+	}
+
+	// Create a contiguous slice of hashes to deserialize into in order to
+	// reduce the number of allocations.
+	locatorHashes := make([]chainhash.Hash, count)
+	msg.BlockLocatorHashes = make([]*chainhash.Hash, 0, count)
+	for i := uint64(0); i < count; i++ {
+		hash := &locatorHashes[i]
+		err := readElement(r, hash)
+		if err != nil {
+			return err
+		}
+		msg.AddBlockLocatorHash(hash)
+	}
+
+	err = readElement(r, &msg.HashStop)
+	if err != nil {
+		return err
+	}
+
+	return readElement(r, (*uint8)(&msg.FilterType))
+}
+
+// BtcEncode encodes the receiver to w using the wire protocol encoding.
+// This is part of the Message interface implementation.
+func (msg *MsgGetCFHeaders) BtcEncode(w io.Writer, pver uint32) error {
+	if pver < NodeCFVersion {
+		str := fmt.Sprintf("getcfheaders message invalid for protocol "+
+			"version %d", pver)
+		return messageError("MsgGetCFHeaders.BtcEncode", str)
+	}
+
+	// Limit to max block locator hashes per message.
+	count := len(msg.BlockLocatorHashes)
+	if count > MaxBlockLocatorsPerMsg {
+		str := fmt.Sprintf("too many block locator hashes for message "+
+			"[count %v, max %v]", count, MaxBlockLocatorsPerMsg)
+		return messageError("MsgGetCFHeaders.BtcEncode", str)
+	}
+
+	err := WriteVarInt(w, pver, uint64(count))
+	if err != nil {
+		return err
+	}
+
+	for _, hash := range msg.BlockLocatorHashes {
+		err := writeElement(w, hash)
+		if err != nil {
+			return err
+		}
+	}
+
+	err = writeElement(w, &msg.HashStop)
+	if err != nil {
+		return err
+	}
+
+	return binarySerializer.PutUint8(w, uint8(msg.FilterType))
+}
+
+// Command returns the protocol command string for the message. This is part
+// of the Message interface implementation.
+func (msg *MsgGetCFHeaders) Command() string {
+	return CmdGetCFHeaders
+}
+
+// MaxPayloadLength returns the maximum length the payload can be for the
+// receiver. This is part of the Message interface implementation.
+func (msg *MsgGetCFHeaders) MaxPayloadLength(pver uint32) uint32 {
+	// Num block locator hashes (varInt) + max allowed
+	// block locators + hash stop + filter type 1 byte.
+	return MaxVarIntPayload + (MaxBlockLocatorsPerMsg *
+		chainhash.HashSize) + chainhash.HashSize + 1
+}
+
+// NewMsgGetCFHeaders returns a new getcfheaders message that conforms to the
+// Message interface using the passed parameters and defaults for the remaining
+// fields.
+func NewMsgGetCFHeaders() *MsgGetCFHeaders {
+	return &MsgGetCFHeaders{
+		BlockLocatorHashes: make([]*chainhash.Hash, 0,
+			MaxBlockLocatorsPerMsg),
+	}
+}
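Requesting filter headers mirrors the existing getheaders flow: the caller supplies a block locator, ordered from newest block backward, plus an optional stop hash. A sketch of building such a request follows; it is editorial rather than part of the patch, buildGetCFHeaders is a hypothetical helper, and leaving HashStop at its zero value follows the usual getheaders convention of placing no explicit stop.

package wire

import "github.com/decred/dcrd/chaincfg/chainhash"

// buildGetCFHeaders builds a getcfheaders request for the chain of regular
// filter headers from a block locator ordered newest block first. HashStop
// is left at its zero value.
func buildGetCFHeaders(locator []*chainhash.Hash) (*MsgGetCFHeaders, error) {
	msg := NewMsgGetCFHeaders()
	msg.FilterType = GCSFilterRegular
	for _, hash := range locator {
		if err := msg.AddBlockLocatorHash(hash); err != nil {
			return nil, err
		}
	}
	return msg, nil
}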
diff --git a/wire/msggetcfilter.go b/wire/msggetcfilter.go
new file mode 100644
index 0000000000..85a11343c3
--- /dev/null
+++ b/wire/msggetcfilter.go
@@ -0,0 +1,76 @@
+// Copyright (c) 2017 The btcsuite developers
+// Copyright (c) 2017 The Lightning Network Developers
+// Copyright (c) 2018 The Decred developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package wire
+
+import (
+	"fmt"
+	"io"
+
+	"github.com/decred/dcrd/chaincfg/chainhash"
+)
+
+// MsgGetCFilter implements the Message interface and represents a getcfilter
+// message. It is used to request a committed filter for a block.
+type MsgGetCFilter struct {
+	BlockHash  chainhash.Hash
+	FilterType FilterType
+}
+
+// BtcDecode decodes r using the wire protocol encoding into the receiver.
+// This is part of the Message interface implementation.
+func (msg *MsgGetCFilter) BtcDecode(r io.Reader, pver uint32) error {
+	if pver < NodeCFVersion {
+		str := fmt.Sprintf("getcfilter message invalid for protocol "+
+			"version %d", pver)
+		return messageError("MsgGetCFilter.BtcDecode", str)
+	}
+
+	err := readElement(r, &msg.BlockHash)
+	if err != nil {
+		return err
+	}
+	return readElement(r, (*uint8)(&msg.FilterType))
+}
+
+// BtcEncode encodes the receiver to w using the wire protocol encoding.
+// This is part of the Message interface implementation.
+func (msg *MsgGetCFilter) BtcEncode(w io.Writer, pver uint32) error {
+	if pver < NodeCFVersion {
+		str := fmt.Sprintf("getcfilter message invalid for protocol "+
+			"version %d", pver)
+		return messageError("MsgGetCFilter.BtcEncode", str)
+	}
+
+	err := writeElement(w, &msg.BlockHash)
+	if err != nil {
+		return err
+	}
+	return binarySerializer.PutUint8(w, uint8(msg.FilterType))
+}
+
+// Command returns the protocol command string for the message. This is part
+// of the Message interface implementation.
+func (msg *MsgGetCFilter) Command() string {
+	return CmdGetCFilter
+}
+
+// MaxPayloadLength returns the maximum length the payload can be for the
+// receiver. This is part of the Message interface implementation.
+func (msg *MsgGetCFilter) MaxPayloadLength(pver uint32) uint32 {
+	// Block hash + filter type.
+	return chainhash.HashSize + 1
+}
+
+// NewMsgGetCFilter returns a new getcfilter message that conforms to the
+// Message interface using the passed parameters and defaults for the remaining
+// fields.
+func NewMsgGetCFilter(blockHash *chainhash.Hash, filterType FilterType) *MsgGetCFilter {
+	return &MsgGetCFilter{
+		BlockHash:  *blockHash,
+		FilterType: filterType,
+	}
+}
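The getcfilter payload is fixed-size, which its MaxPayloadLength reflects. A small editorial sketch, with encodeGetCFilter as a hypothetical helper, showing that an encoded request is exactly a 32-byte block hash followed by a one-byte filter type:

package wire

import (
	"bytes"

	"github.com/decred/dcrd/chaincfg/chainhash"
)

// encodeGetCFilter encodes a getcfilter request for the extended filter of
// the given block. The resulting payload is chainhash.HashSize+1 == 33 bytes.
func encodeGetCFilter(blockHash *chainhash.Hash) ([]byte, error) {
	msg := NewMsgGetCFilter(blockHash, GCSFilterExtended)

	var buf bytes.Buffer
	if err := msg.BtcEncode(&buf, ProtocolVersion); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}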
diff --git a/wire/msggetcftypes.go b/wire/msggetcftypes.go
new file mode 100644
index 0000000000..8fe89e5788
--- /dev/null
+++ b/wire/msggetcftypes.go
@@ -0,0 +1,58 @@
+// Copyright (c) 2017 The btcsuite developers
+// Copyright (c) 2017 The Lightning Network Developers
+// Copyright (c) 2018 The Decred developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package wire
+
+import (
+	"fmt"
+	"io"
+)
+
+// MsgGetCFTypes is the getcftypes message. It is used to request the list of
+// committed filter types a peer supports, which is answered with a cftypes
+// message (MsgCFTypes).
+type MsgGetCFTypes struct{}
+
+// BtcDecode decodes r using the wire protocol encoding into the receiver.
+// This is part of the Message interface implementation.
+func (msg *MsgGetCFTypes) BtcDecode(r io.Reader, pver uint32) error {
+	if pver < NodeCFVersion {
+		str := fmt.Sprintf("getcftypes message invalid for protocol "+
+			"version %d", pver)
+		return messageError("MsgGetCFTypes.BtcDecode", str)
+	}
+
+	return nil
+}
+
+// BtcEncode encodes the receiver to w using the wire protocol encoding. This is
+// part of the Message interface implementation.
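Together, getcftypes and cftypes let a client probe a peer's filter support before requesting data. A sketch of the check a client might run on the reply; editorial, with supportsFilterType as a hypothetical helper:

package wire

// supportsFilterType reports whether a peer's cftypes reply advertises the
// given committed filter type.
func supportsFilterType(reply *MsgCFTypes, ft FilterType) bool {
	for _, supported := range reply.SupportedFilters {
		if supported == ft {
			return true
		}
	}
	return false
}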
+func (msg *MsgGetCFTypes) BtcEncode(w io.Writer, pver uint32) error {
+	if pver < NodeCFVersion {
+		str := fmt.Sprintf("getcftypes message invalid for protocol "+
+			"version %d", pver)
+		return messageError("MsgGetCFTypes.BtcEncode", str)
+	}
+
+	return nil
+}
+
+// Command returns the protocol command string for the message. This is part
+// of the Message interface implementation.
+func (msg *MsgGetCFTypes) Command() string {
+	return CmdGetCFTypes
+}
+
+// MaxPayloadLength returns the maximum length the payload can be for the
+// receiver. This is part of the Message interface implementation.
+func (msg *MsgGetCFTypes) MaxPayloadLength(pver uint32) uint32 {
+	// Empty message.
+	return 0
+}
+
+// NewMsgGetCFTypes returns a new getcftypes message that conforms to the
+// Message interface.
+func NewMsgGetCFTypes() *MsgGetCFTypes {
+	return &MsgGetCFTypes{}
+}
diff --git a/wire/protocol.go b/wire/protocol.go
index fa167a64a2..6500697658 100644
--- a/wire/protocol.go
+++ b/wire/protocol.go
@@ -17,9 +17,9 @@ const (
 	InitialProcotolVersion uint32 = 1
 
 	// ProtocolVersion is the latest protocol version this package supports.
-	ProtocolVersion uint32 = 5
+	ProtocolVersion uint32 = 6
 
-	// Node BloomVersion is the protocol version which added the SFNodeBloom
+	// NodeBloomVersion is the protocol version which added the SFNodeBloom
 	// service flag.
 	NodeBloomVersion uint32 = 2
 
@@ -34,6 +34,11 @@ const (
 	// FeeFilterVersion is the protocol version which added a new
 	// feefilter message.
 	FeeFilterVersion uint32 = 5
+
+	// NodeCFVersion is the protocol version which adds the SFNodeCF service
+	// flag and the cfheaders, cfilter, cftypes, getcfheaders, getcfilter and
+	// getcftypes messages.
+	NodeCFVersion uint32 = 6
 )
 
 // ServiceFlag identifies services supported by a Decred peer.
@@ -46,12 +51,17 @@ const (
 	// SFNodeBloom is a flag used to indicate a peer supports bloom
 	// filtering.
 	SFNodeBloom
+
+	// SFNodeCF is a flag used to indicate a peer supports committed
+	// filters (CFs).
+	SFNodeCF
 )
 
 // Map of service flags back to their constant names for pretty printing.
 var sfStrings = map[ServiceFlag]string{
 	SFNodeNetwork: "SFNodeNetwork",
 	SFNodeBloom:   "SFNodeBloom",
+	SFNodeCF:      "SFNodeCF",
 }
 
 // orderedSFStrings is an ordered list of service flags from highest to
@@ -59,6 +69,7 @@ var sfStrings = map[ServiceFlag]string{
 var orderedSFStrings = []ServiceFlag{
 	SFNodeNetwork,
 	SFNodeBloom,
+	SFNodeCF,
 }
 
 // String returns the ServiceFlag in human-readable form.
diff --git a/wire/protocol_test.go b/wire/protocol_test.go
index 811e588284..14b252f0e3 100644
--- a/wire/protocol_test.go
+++ b/wire/protocol_test.go
@@ -16,7 +16,8 @@ func TestServiceFlagStringer(t *testing.T) {
 		{0, "0x0"},
 		{SFNodeNetwork, "SFNodeNetwork"},
 		{SFNodeBloom, "SFNodeBloom"},
-		{0xffffffff, "SFNodeNetwork|SFNodeBloom|0xfffffffc"},
+		{SFNodeCF, "SFNodeCF"},
+		{0xffffffff, "SFNodeNetwork|SFNodeBloom|SFNodeCF|0xfffffff8"},
 	}
 
 	t.Logf("Running %d tests", len(tests))
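Finally, the protocol additions above imply a gating rule used by the peer and server changes elsewhere in this patch: the new CF messages are only valid for a remote peer that both advertises SFNodeCF and has negotiated at least NodeCFVersion. A sketch of that check as a standalone predicate; editorial, and not the literal peer-package code:

package wire

// peerSupportsCFilters is a hypothetical predicate capturing the conditions
// under which the new CF messages may be exchanged with a remote peer: the
// SFNodeCF service flag must be set and the negotiated protocol version must
// be at least NodeCFVersion.
func peerSupportsCFilters(services ServiceFlag, pver uint32) bool {
	return services&SFNodeCF == SFNodeCF && pver >= NodeCFVersion
}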