Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add clustering.replication.max.bytes config #308

Merged
merged 1 commit into from
Dec 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions documentation/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ the configuration file.
| replica.max.idle.wait | | The maximum amount of time a follower will wait before making a replication request once the follower is caught up with the leader. This value should always be less than `replica.max.lag.time` to avoid frequent shrinking of ISR for low-throughput streams. | duration | 10s | |
| replica.fetch.timeout | | Timeout duration for follower replication requests. | duration | 3s | |
| min.insync.replicas | | Specifies the minimum number of replicas that must acknowledge a stream write before it can be committed. If the ISR drops below this size, messages cannot be committed. | int | 1 | [1,...] |
| replication.max.bytes | | The maximum payload size, in bytes, a leader can send to followers for replication messages. This controls the amount of data that can be transferred for individual replication requests. If a leader receives a published message larger than this size, it will return an ack error to the client. Because replication is done over NATS, this cannot exceed the [`max_payload`](https://docs.nats.io/nats-server/configuration#limits) limit configured on the NATS cluster. Thus, this defaults to 1MB, which is the default value for `max_payload`. This should generally be set to match the value of `max_payload`. Setting it too low will preclude the replication of messages larger than it and negatively impact performance. This value should also be the same for all servers in the cluster. | int | 1048576 | |

### Activity Configuration Settings

Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/raft v1.1.2
github.com/liftbridge-io/go-liftbridge/v2 v2.0.2-0.20201118225953-b849cccb6467
github.com/liftbridge-io/liftbridge-api v1.4.1
github.com/liftbridge-io/liftbridge-api v1.4.2-0.20201228201911-4b2d99797dbb
github.com/liftbridge-io/nats-on-a-log v0.0.0-20200818183806-bb17516cf3a3
github.com/liftbridge-io/raft-boltdb v0.0.0-20200414234651-aaf6e08d8f73
github.com/mattn/go-colorable v0.1.7 // indirect
Expand All @@ -40,8 +40,8 @@ require (
github.com/urfave/cli v1.22.4
go.etcd.io/bbolt v1.3.5 // indirect
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de // indirect
golang.org/x/net v0.0.0-20201216054612-986b41b23924 // indirect
golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e // indirect
golang.org/x/net v0.0.0-20201224014010-6772e930b67b // indirect
golang.org/x/sys v0.0.0-20201223074533-0d417f636930 // indirect
golang.org/x/text v0.3.4 // indirect
google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d // indirect
google.golang.org/grpc v1.34.0
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ github.com/liftbridge-io/go-liftbridge/v2 v2.0.2-0.20201118225953-b849cccb6467 h
github.com/liftbridge-io/go-liftbridge/v2 v2.0.2-0.20201118225953-b849cccb6467/go.mod h1:24NMu02Ba2sMO2y+IYstP1UFKVA4a6/p54Lc7KccSLc=
github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201029165056-10f2aa65f256 h1:2pZtC3v6IBTwE70xfb/k0DPlOJ6BlXpthCUWrxCnhwo=
github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201029165056-10f2aa65f256/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8=
github.com/liftbridge-io/liftbridge-api v1.4.1 h1:7LUThKH8z9Nr1Es6Arec4r5yI3JFOz166el5WFepp7A=
github.com/liftbridge-io/liftbridge-api v1.4.1/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8=
github.com/liftbridge-io/liftbridge-api v1.4.2-0.20201228201911-4b2d99797dbb h1:XdwKrJh9gUJ+kaOwYfU6Kt+mZxqOeJQ8oLFbVjpoNuA=
github.com/liftbridge-io/liftbridge-api v1.4.2-0.20201228201911-4b2d99797dbb/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8=
github.com/liftbridge-io/nats-on-a-log v0.0.0-20200818183806-bb17516cf3a3 h1:O4mg1NEmukgY8hxen3grrG5RY34LadMTzpbjf8kM2tA=
github.com/liftbridge-io/nats-on-a-log v0.0.0-20200818183806-bb17516cf3a3/go.mod h1:wmIIYVq+psahPlB1rvtTkGiltdihsKJbqwE1DkIPwj4=
github.com/liftbridge-io/raft-boltdb v0.0.0-20200414234651-aaf6e08d8f73 h1:8r/ReB1ns87pVDwSnPj87HIbOu/5y0uDyGChx9mUGSQ=
Expand Down Expand Up @@ -386,8 +386,8 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201002202402-0a1ea396d57c h1:dk0ukUIHmGHqASjP0iue2261isepFCC6XRCSd1nHgDw=
golang.org/x/net v0.0.0-20201002202402-0a1ea396d57c/go.mod h1:iQL9McJNjoIa5mjH6nYTCTZXUN6RP+XW3eib7Ya3XcI=
golang.org/x/net v0.0.0-20201216054612-986b41b23924 h1:QsnDpLLOKwHBBDa8nDws4DYNc/ryVW2vCpxCs09d4PY=
golang.org/x/net v0.0.0-20201216054612-986b41b23924/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20201224014010-6772e930b67b h1:iFwSg7t5GZmB/Q5TjiEAsdoLDrdJRC1RiF2WhuV29Qw=
golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -426,8 +426,8 @@ golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e h1:AyodaIpKjppX+cBfTASF2E1US3H2JFBj920Ot3rtDjs=
golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201223074533-0d417f636930 h1:vRgIt+nup/B/BwIS0g2oC0haq0iqbV3ZA+u6+0TlNCo=
golang.org/x/sys v0.0.0-20201223074533-0d417f636930/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
39 changes: 39 additions & 0 deletions server/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ func (a *apiServer) Publish(ctx context.Context, req *client.PublishRequest) (
}

if e := a.ensureStreamNotReadonly(req.Stream, req.Partition); e != nil {
a.logger.Errorf("api: Failed to publish message: %v", e.Message)
return nil, convertPublishAsyncError(e)
}

Expand Down Expand Up @@ -315,6 +316,13 @@ func (a *apiServer) Publish(ctx context.Context, req *client.PublishRequest) (
return nil, err
}

if ack != nil {
if e := convertAckError(ack.AckError); e != nil {
a.logger.Errorf("api: Published message was rejected: %v", e.Message)
return nil, convertPublishAsyncError(e)
}
}

resp.Ack = ack
return resp, nil
}
Expand Down Expand Up @@ -800,6 +808,30 @@ func convertPublishAsyncError(err *client.PublishAsyncError) error {
return status.Error(code, err.Message)
}

func convertAckError(ackError client.Ack_Error) *client.PublishAsyncError {
var (
code client.PublishAsyncError_Code
message string
)
switch ackError {
case client.Ack_OK:
return nil
case client.Ack_INCORRECT_OFFSET:
code = client.PublishAsyncError_INCORRECT_OFFSET
message = "incorrect expected offset"
case client.Ack_TOO_LARGE:
code = client.PublishAsyncError_BAD_REQUEST
message = "message exceeds max replication size"
default:
code = client.PublishAsyncError_UNKNOWN
message = "unknown error"
}
return &client.PublishAsyncError{
Code: code,
Message: message,
}
}

// publishAsyncSession maintains state for long-lived PublishAsync RPCs.
type publishAsyncSession struct {
*apiServer
Expand Down Expand Up @@ -833,6 +865,13 @@ func (p *publishAsyncSession) dispatchAcks() error {
p.inflight = 0
}
p.mu.Unlock()

if e := convertAckError(ack.AckError); e != nil {
p.logger.Errorf("api: Published async message was rejected: %v", e.Message)
p.sendPublishAsyncError(ack.CorrelationId, e)
return
}

if err := p.stream.Send(&client.PublishResponse{CorrelationId: ack.CorrelationId, Ack: ack}); err != nil {
p.logger.Errorf("api: Failed to send PublishAsync response: %v", err)
}
Expand Down
20 changes: 20 additions & 0 deletions server/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ func TestStreamPublishSubscribe(t *testing.T) {

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1Config.Clustering.ReplicationMaxBytes = 1024
s1 := runServerWithConfig(t, s1Config)
defer s1.Stop()

Expand Down Expand Up @@ -647,6 +648,13 @@ func TestStreamPublishSubscribe(t *testing.T) {
case <-time.After(10 * time.Second):
t.Fatal("Did not receive all expected messages")
}

// Publishing a message whose size is larger than max replication size
// returns an error.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = client.Publish(ctx, name, make([]byte, 1024+1))
require.Error(t, err)
}

// Ensure legacy Publish endpoint works.
Expand All @@ -659,6 +667,7 @@ func TestLegacyPublish(t *testing.T) {

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1Config.Clustering.ReplicationMaxBytes = 1024
s1 := runServerWithConfig(t, s1Config)
defer s1.Stop()

Expand Down Expand Up @@ -708,6 +717,17 @@ func TestLegacyPublish(t *testing.T) {
})
require.NoError(t, err)
require.Nil(t, resp.Ack)

// Publishing a message whose size is larger than max replication size
// returns an error.
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = apiClient.Publish(ctx, &proto.PublishRequest{
Stream: "foo",
Partition: 1,
Value: make([]byte, 1024+1),
})
require.Error(t, err)
}

// Ensure publishing to a NATS subject works.
Expand Down
9 changes: 9 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
defaultReplicaMaxLagTime = 15 * time.Second
defaultReplicaMaxLeaderTimeout = 15 * time.Second
defaultReplicaMaxIdleWait = 10 * time.Second
defaultReplicationMaxBytes = 1024 * 1024 // 1MB
defaultRaftSnapshots = 2
defaultRaftCacheSize = 512
defaultMetadataCacheMaxAge = 2 * time.Minute
Expand Down Expand Up @@ -103,6 +104,7 @@ const (
configClusteringReplicaMaxIdleWait = "clustering.replica.max.idle.wait"
configClusteringReplicaFetchTimeout = "clustering.replica.fetch.timeout"
configClusteringMinInsyncReplicas = "clustering.min.insync.replicas"
configClusteringReplicationMaxBytes = "clustering.replication.max.bytes"

configActivityStreamEnabled = "activity.stream.enabled"
configActivityStreamPublishTimeout = "activity.stream.publish.timeout"
Expand Down Expand Up @@ -155,6 +157,7 @@ var configKeys = map[string]struct{}{
configClusteringReplicaMaxIdleWait: {},
configClusteringReplicaFetchTimeout: {},
configClusteringMinInsyncReplicas: {},
configClusteringReplicationMaxBytes: {},
configActivityStreamEnabled: {},
configActivityStreamPublishTimeout: {},
configActivityStreamPublishAckPolicy: {},
Expand Down Expand Up @@ -280,6 +283,7 @@ type ClusteringConfig struct {
ReplicaFetchTimeout time.Duration
ReplicaMaxIdleWait time.Duration
MinISR int
ReplicationMaxBytes int64
}

// ActivityStreamConfig contains settings for controlling activity stream
Expand Down Expand Up @@ -339,6 +343,7 @@ func NewDefaultConfig() *Config {
config.Clustering.RaftSnapshots = defaultRaftSnapshots
config.Clustering.RaftCacheSize = defaultRaftCacheSize
config.Clustering.MinISR = defaultMinInsyncReplicas
config.Clustering.ReplicationMaxBytes = defaultReplicationMaxBytes
config.Streams.SegmentMaxBytes = defaultMaxSegmentBytes
config.Streams.SegmentMaxAge = defaultMaxSegmentAge
config.Streams.RetentionMaxAge = defaultRetentionMaxAge
Expand Down Expand Up @@ -676,6 +681,10 @@ func parseClusteringConfig(config *Config, v *viper.Viper) error { // nolint: go
config.Clustering.MinISR = v.GetInt(configClusteringMinInsyncReplicas)
}

if v.IsSet(configClusteringReplicationMaxBytes) {
config.Clustering.ReplicationMaxBytes = v.GetInt64(configClusteringReplicationMaxBytes)
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions server/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func TestNewConfigFromFile(t *testing.T) {
require.Equal(t, 2*time.Second, config.Clustering.ReplicaMaxIdleWait)
require.Equal(t, 3*time.Second, config.Clustering.ReplicaFetchTimeout)
require.Equal(t, 1, config.Clustering.MinISR)
require.Equal(t, int64(1024), config.Clustering.ReplicationMaxBytes)

require.Equal(t, true, config.ActivityStream.Enabled)
require.Equal(t, time.Minute, config.ActivityStream.PublishTimeout)
Expand Down
1 change: 1 addition & 0 deletions server/configs/full.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ clustering:
idle.wait: 2s
fetch.timeout: 3s
min.insync.replicas: '1'
replication.max.bytes: 1024

activity.stream:
enabled: true
Expand Down
4 changes: 3 additions & 1 deletion server/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,9 @@ func (m *metadataAPI) fetchBrokerInfo(ctx context.Context, numPeers int) ([]*cli
if err != nil {
panic(err)
}
m.ncRaft.PublishRequest(m.getServerInfoInbox(), inbox, queryReq)
if err := m.ncRaft.PublishRequest(m.getServerInfoInbox(), inbox, queryReq); err != nil {
return nil, status.New(codes.Internal, err.Error())
}

// Gather responses.
for i := 0; i < numPeers; i++ {
Expand Down
51 changes: 48 additions & 3 deletions server/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,11 @@ func (p *partition) messageProcessingLoop(recvChan <-chan *nats.Msg, stop <-chan
p.mu.Unlock()

m := natsToProtoMessage(msg, leaderEpoch)
// Reject messages that are larger than the max replication size.
if int64(len(msg.Data)) > p.srv.config.Clustering.ReplicationMaxBytes {
p.sendTooLargeNack(m)
continue
}
msgBatch = append(msgBatch, m)
remaining := batchSize - 1

Expand All @@ -814,12 +819,18 @@ func (p *partition) messageProcessingLoop(recvChan <-chan *nats.Msg, stop <-chan
chanLen = remaining
}

added := 0
for i := 0; i < chanLen; i++ {
msg = <-recvChan
m := natsToProtoMessage(msg, leaderEpoch)
if int64(len(msg.Data)) > p.srv.config.Clustering.ReplicationMaxBytes {
p.sendTooLargeNack(m)
continue
}
msgBatch = append(msgBatch, m)
added++
}
remaining -= chanLen
remaining -= added
}

// Write uncommitted messages to log.
Expand Down Expand Up @@ -970,7 +981,38 @@ func (p *partition) sendAck(ack *client.Ack) {
if err != nil {
panic(err)
}
p.srv.ncAcks.Publish(ack.AckInbox, data)
if err := p.srv.ncAcks.Publish(ack.AckInbox, data); err != nil {
p.srv.logger.Errorf("Error sending ack for partition %s: %v", p, err)
}
}

// sendTooLargeNack publishes an ack containing an error indicating the message
// exceeded the max replication size to the specified AckInbox. If no AckInbox
// is set, this does nothing.
func (p *partition) sendTooLargeNack(msg *commitlog.Message) {
p.srv.logger.Errorf(
"Rejecting message received on partition %s that exceeds clustering.replication.max.bytes (%d)",
p, p.srv.config.Clustering.ReplicationMaxBytes)
if msg.AckInbox == "" {
return
}
ack := &client.Ack{
Stream: p.Stream,
PartitionSubject: p.Subject,
MsgSubject: string(msg.Headers["subject"]),
AckInbox: msg.AckInbox,
CorrelationId: msg.CorrelationID,
AckPolicy: msg.AckPolicy,
ReceptionTimestamp: msg.Timestamp,
AckError: client.Ack_TOO_LARGE,
}
data, err := proto.MarshalAck(ack)
if err != nil {
panic(err)
}
if err := p.srv.ncAcks.Publish(ack.AckInbox, data); err != nil {
p.srv.logger.Errorf("Error sending ack for partition %s: %v", p, err)
}
}

// replicationRequestLoop is a long-running loop which sends replication
Expand Down Expand Up @@ -1323,7 +1365,10 @@ func (p *partition) sendPartitionNotification(replica string) {
if err != nil {
panic(err)
}
p.srv.ncRepl.Publish(p.srv.getPartitionNotificationInbox(replica), req)
if err := p.srv.ncRepl.Publish(p.srv.getPartitionNotificationInbox(replica), req); err != nil {
p.srv.logger.Errorf("Error sending new data notification to replica %s for partition %s: %v",
replica, p, err)
}
}

// pauseReplication stops replication on the leader. This is for unit testing
Expand Down
4 changes: 3 additions & 1 deletion server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,9 @@ func (s *Server) detectBootstrapMisconfig() {
case <-s.shutdownCh:
return
case <-ticker.C:
s.ncRaft.PublishRequest(subj, inbox, srvID)
if err := s.ncRaft.PublishRequest(subj, inbox, srvID); err != nil {
s.logger.Errorf("Error publishing bootstrap misconfiguration detection message: %v", err)
}
}
}
}
Expand Down
18 changes: 6 additions & 12 deletions server/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,9 @@ import (
proto "github.com/liftbridge-io/liftbridge/server/protocol"
)

const (
// replicationMaxSize is the max payload size to send in replication
// messages to followers. The default NATS max message size is 1MB, so
// we'll use that.
replicationMaxSize = 1024 * 1024

// replicationOverhead is the non-data size overhead of replication
// messages: 8 bytes for the leader epoch and 8 bytes for the HW.
replicationOverhead = 16
)
// replicationOverhead is the non-data size overhead of replication messages: 8
// bytes for the leader epoch and 8 bytes for the HW.
const replicationOverhead = 16

// replicationRequest wraps a ReplicationRequest protobuf and a NATS subject
// where responses should be sent.
Expand Down Expand Up @@ -231,7 +224,7 @@ func (r *replicator) replicate(
message commitlog.SerializedMessage
err error
)
for offset < newestOffset && r.writer.Len() < replicationMaxSize {
for offset < newestOffset && int64(r.writer.Len()) < r.partition.srv.config.Clustering.ReplicationMaxBytes {
message, offset, _, _, err = reader.ReadMessage(ctx, r.headersBuf[:])
if err != nil {
r.partition.srv.logger.Errorf("Failed to read message while replicating: %v", err)
Expand All @@ -240,7 +233,8 @@ func (r *replicator) replicate(

// Check if this message will put us over the batch size limit. If it
// does, flush the batch now.
if uint32(len(message))+uint32(len(r.headersBuf))+uint32(r.writer.Len()) > replicationMaxSize {
batchSize := int64(len(message)) + int64(len(r.headersBuf)) + int64(r.writer.Len())
if batchSize > r.partition.srv.config.Clustering.ReplicationMaxBytes {
break
}

Expand Down
Loading