diff --git a/README.md b/README.md index b50402dbdc5..6c7e362ca1c 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,7 @@ -[](https://maven-badges.herokuapp.com/maven-central/io.vitess/vitess-jdbc) -[](https://travis-ci.org/youtube/vitess/builds) -[](https://codebeat.co/projects/github-com-youtube-vitess) -[](https://goreportcard.com/report/github.com/youtube/vitess) -# Vitess +[](https://travis-ci.org/youtube/vitess/builds) + +# Vitess (Slack Fork) Vitess is a database clustering system for horizontal scaling of MySQL through generalized sharding. diff --git a/go/vt/servenv/grpc_server.go b/go/vt/servenv/grpc_server.go index 34a8eabe753..e839195663b 100644 --- a/go/vt/servenv/grpc_server.go +++ b/go/vt/servenv/grpc_server.go @@ -57,10 +57,6 @@ var ( // GRPCCA is the CA to use if TLS is enabled GRPCCA *string - // GRPCMaxMessageSize is the maximum message size which the gRPC server will - // accept. Larger messages will be rejected. - GRPCMaxMessageSize *int - // GRPCServer is the global server to serve gRPC. GRPCServer *grpc.Server @@ -114,9 +110,10 @@ func createGRPCServer() { // grpc: received message length XXXXXXX exceeding the max size 4194304 // Note: For gRPC 1.0.0 it's sufficient to set the limit on the server only // because it's not enforced on the client side. - if GRPCMaxMessageSize != nil { - opts = append(opts, grpc.MaxRecvMsgSize(*GRPCMaxMessageSize)) - opts = append(opts, grpc.MaxSendMsgSize(*GRPCMaxMessageSize)) + if grpcutils.MaxMessageSize != nil { + log.Infof("Setting grpc max message size to %d", *grpcutils.MaxMessageSize) + opts = append(opts, grpc.MaxRecvMsgSize(*grpcutils.MaxMessageSize)) + opts = append(opts, grpc.MaxSendMsgSize(*grpcutils.MaxMessageSize)) } if GRPCMaxConnectionAge != nil { @@ -161,12 +158,11 @@ func RegisterGRPCFlags() { GRPCCert = flag.String("grpc_cert", "", "certificate to use, requires grpc_key, enables TLS") GRPCKey = flag.String("grpc_key", "", "key to use, requires grpc_cert, enables TLS") GRPCCA = flag.String("grpc_ca", "", "ca to use, requires TLS, and enforces client cert check") - // Note: We're using 4 MiB as default value because that's the default in the - // gRPC 1.0.0 Go server. - GRPCMaxMessageSize = flag.Int("grpc_max_message_size", 4*1024*1024, "Maximum allowed RPC message size. Larger messages will be rejected by gRPC with the error 'exceeding the max size'.") // Default is effectively infinity, as defined in grpc. GRPCMaxConnectionAge = flag.Duration("grpc_max_connection_age", time.Duration(math.MaxInt64), "Maximum age of a client connection before GoAway is sent.") GRPCMaxConnectionAgeGrace = flag.Duration("grpc_max_connection_age_grace", time.Duration(math.MaxInt64), "Additional grace period after grpc_max_connection_age, after which connections are forcibly closed.") + + grpcutils.RegisterFlags() } // GRPCCheckServiceMap returns if we should register a gRPC service diff --git a/go/vt/servenv/grpcutils/options.go b/go/vt/servenv/grpcutils/options.go new file mode 100644 index 00000000000..95ae7644cf8 --- /dev/null +++ b/go/vt/servenv/grpcutils/options.go @@ -0,0 +1,35 @@ +/* +Copyright 2017 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreedto in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpcutils + +import ( + "flag" +) + +var ( + defaultMaxMessageSize = 4 * 1024 * 1024 + // MaxMessageSize is the maximum message size which the gRPC server will + // accept. Larger messages will be rejected. + MaxMessageSize = &defaultMaxMessageSize +) + +// RegisterFlags registers the command line flags for common grpc options +func RegisterFlags() { + // Note: We're using 4 MiB as default value because that's the default in the + // gRPC 1.0.0 Go server. + MaxMessageSize = flag.Int("grpc_max_message_size", defaultMaxMessageSize, "Maximum allowed RPC message size. Larger messages will be rejected by gRPC with the error 'exceeding the max size'.") +} diff --git a/go/vt/vtgate/executor_dml_test.go b/go/vt/vtgate/executor_dml_test.go index aea2ac08f75..ff8048f3529 100644 --- a/go/vt/vtgate/executor_dml_test.go +++ b/go/vt/vtgate/executor_dml_test.go @@ -346,7 +346,7 @@ func TestInsertSharded(t *testing.T) { t.Errorf("sbc2.Queries: %+v, want nil\n", sbc2.Queries) } wantQueries = []*querypb.BoundQuery{{ - Sql: "insert into name_user_map(name, user_id) values (:name0, :user_id0)", + Sql: "insert ignore into name_user_map(name, user_id) values (:name0, :user_id0)", BindVariables: map[string]*querypb.BindVariable{ "name0": sqltypes.BytesBindVariable([]byte("myname")), "user_id0": sqltypes.Uint64BindVariable(1), @@ -377,7 +377,7 @@ func TestInsertSharded(t *testing.T) { t.Errorf("sbc1.Queries: %+v, want nil\n", sbc1.Queries) } wantQueries = []*querypb.BoundQuery{{ - Sql: "insert into name_user_map(name, user_id) values (:name0, :user_id0)", + Sql: "insert ignore into name_user_map(name, user_id) values (:name0, :user_id0)", BindVariables: map[string]*querypb.BindVariable{ "name0": sqltypes.BytesBindVariable([]byte("myname2")), "user_id0": sqltypes.Uint64BindVariable(3), @@ -410,7 +410,7 @@ func TestInsertComments(t *testing.T) { t.Errorf("sbc2.Queries: %+v, want nil\n", sbc2.Queries) } wantQueries = []*querypb.BoundQuery{{ - Sql: "insert into name_user_map(name, user_id) values (:name0, :user_id0) /* trailing */", + Sql: "insert ignore into name_user_map(name, user_id) values (:name0, :user_id0) /* trailing */", BindVariables: map[string]*querypb.BindVariable{ "name0": sqltypes.BytesBindVariable([]byte("myname")), "user_id0": sqltypes.Uint64BindVariable(1), @@ -450,7 +450,7 @@ func TestInsertGeneratorSharded(t *testing.T) { Sql: "select next :n values from user_seq", BindVariables: map[string]*querypb.BindVariable{"n": sqltypes.Int64BindVariable(1)}, }, { - Sql: "insert into name_user_map(name, user_id) values (:name0, :user_id0)", + Sql: "insert ignore into name_user_map(name, user_id) values (:name0, :user_id0)", BindVariables: map[string]*querypb.BindVariable{ "name0": sqltypes.BytesBindVariable([]byte("myname")), "user_id0": sqltypes.Uint64BindVariable(1), @@ -570,7 +570,7 @@ func TestInsertLookupOwned(t *testing.T) { t.Errorf("sbc.Queries:\n%+v, want\n%+v\n", sbc.Queries, wantQueries) } wantQueries = []*querypb.BoundQuery{{ - Sql: "insert into music_user_map(music_id, user_id) values (:music_id0, :user_id0)", + Sql: "insert ignore into music_user_map(music_id, user_id) values (:music_id0, :user_id0)", BindVariables: map[string]*querypb.BindVariable{ "music_id0": sqltypes.Int64BindVariable(3), "user_id0": sqltypes.Uint64BindVariable(2), @@ -610,7 +610,7 @@ func TestInsertLookupOwnedGenerator(t *testing.T) { Sql: "select next :n values from user_seq", BindVariables: map[string]*querypb.BindVariable{"n": sqltypes.Int64BindVariable(1)}, }, { - Sql: "insert into music_user_map(music_id, user_id) values (:music_id0, :user_id0)", + Sql: "insert ignore into music_user_map(music_id, user_id) values (:music_id0, :user_id0)", BindVariables: map[string]*querypb.BindVariable{ "music_id0": sqltypes.Int64BindVariable(4), "user_id0": sqltypes.Uint64BindVariable(2), @@ -879,7 +879,7 @@ func TestMultiInsertSharded(t *testing.T) { } wantQueries1 = []*querypb.BoundQuery{{ - Sql: "insert into name_user_map(name, user_id) values (:name0, :user_id0), (:name1, :user_id1)", + Sql: "insert ignore into name_user_map(name, user_id) values (:name0, :user_id0), (:name1, :user_id1)", BindVariables: map[string]*querypb.BindVariable{ "name0": sqltypes.BytesBindVariable([]byte("myname1")), "user_id0": sqltypes.Uint64BindVariable(1), @@ -917,7 +917,7 @@ func TestMultiInsertSharded(t *testing.T) { t.Errorf("sbc2.Queries: %+v, want nil\n", sbc2.Queries) } wantQueries = []*querypb.BoundQuery{{ - Sql: "insert into name_user_map(name, user_id) values (:name0, :user_id0), (:name1, :user_id1)", + Sql: "insert ignore into name_user_map(name, user_id) values (:name0, :user_id0), (:name1, :user_id1)", BindVariables: map[string]*querypb.BindVariable{ "name0": sqltypes.BytesBindVariable([]byte("myname1")), "user_id0": sqltypes.Uint64BindVariable(1), @@ -963,7 +963,7 @@ func TestMultiInsertGenerator(t *testing.T) { Sql: "select next :n values from user_seq", BindVariables: map[string]*querypb.BindVariable{"n": sqltypes.Int64BindVariable(2)}, }, { - Sql: "insert into music_user_map(music_id, user_id) values (:music_id0, :user_id0), (:music_id1, :user_id1)", + Sql: "insert ignore into music_user_map(music_id, user_id) values (:music_id0, :user_id0), (:music_id1, :user_id1)", BindVariables: map[string]*querypb.BindVariable{ "user_id0": sqltypes.Uint64BindVariable(2), "music_id0": sqltypes.Int64BindVariable(1), @@ -1017,7 +1017,7 @@ func TestMultiInsertGeneratorSparse(t *testing.T) { Sql: "select next :n values from user_seq", BindVariables: map[string]*querypb.BindVariable{"n": sqltypes.Int64BindVariable(2)}, }, { - Sql: "insert into music_user_map(music_id, user_id) values (:music_id0, :user_id0), (:music_id1, :user_id1), (:music_id2, :user_id2)", + Sql: "insert ignore into music_user_map(music_id, user_id) values (:music_id0, :user_id0), (:music_id1, :user_id1), (:music_id2, :user_id2)", BindVariables: map[string]*querypb.BindVariable{ "user_id0": sqltypes.Uint64BindVariable(2), "music_id0": sqltypes.Int64BindVariable(1), diff --git a/go/vt/vtgate/grpcvtgateconn/conn.go b/go/vt/vtgate/grpcvtgateconn/conn.go index e5e56507b8d..e4d3f0f92dc 100644 --- a/go/vt/vtgate/grpcvtgateconn/conn.go +++ b/go/vt/vtgate/grpcvtgateconn/conn.go @@ -58,7 +58,7 @@ func dial(ctx context.Context, addr string, timeout time.Duration) (vtgateconn.I if err != nil { return nil, err } - cc, err := grpc.Dial(addr, opt, grpc.WithBlock(), grpc.WithTimeout(timeout)) + cc, err := grpc.Dial(addr, opt, grpc.WithBlock(), grpc.WithTimeout(timeout), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(*grpcutils.MaxMessageSize), grpc.MaxCallSendMsgSize(*grpcutils.MaxMessageSize))) if err != nil { return nil, err } diff --git a/go/vt/vtgate/vindexes/lookup_hash_test.go b/go/vt/vtgate/vindexes/lookup_hash_test.go index ce37c17520a..b8e4f7baba7 100644 --- a/go/vt/vtgate/vindexes/lookup_hash_test.go +++ b/go/vt/vtgate/vindexes/lookup_hash_test.go @@ -137,7 +137,7 @@ func TestLookupHashCreate(t *testing.T) { t.Error(err) } wantQuery := &querypb.BoundQuery{ - Sql: "insert into t(fromc,toc) values(:fromc0,:toc0)", + Sql: "insert ignore into t(fromc,toc) values(:fromc0,:toc0)", BindVariables: map[string]*querypb.BindVariable{ "fromc0": sqltypes.Int64BindVariable(1), "toc0": sqltypes.Uint64BindVariable(1), diff --git a/go/vt/vtgate/vindexes/lookup_hash_unique_test.go b/go/vt/vtgate/vindexes/lookup_hash_unique_test.go index 9a1c8a1168e..b68e1096b5a 100644 --- a/go/vt/vtgate/vindexes/lookup_hash_unique_test.go +++ b/go/vt/vtgate/vindexes/lookup_hash_unique_test.go @@ -74,7 +74,7 @@ func TestLookupHashUniqueCreate(t *testing.T) { t.Error(err) } wantQuery := &querypb.BoundQuery{ - Sql: "insert into t(fromc,toc) values(:fromc0,:toc0)", + Sql: "insert ignore into t(fromc,toc) values(:fromc0,:toc0)", BindVariables: map[string]*querypb.BindVariable{ "fromc0": sqltypes.Int64BindVariable(1), "toc0": sqltypes.Uint64BindVariable(1), diff --git a/go/vt/vtgate/vindexes/lookup_internal.go b/go/vt/vtgate/vindexes/lookup_internal.go index 97c76485b96..1ec88f0d39d 100644 --- a/go/vt/vtgate/vindexes/lookup_internal.go +++ b/go/vt/vtgate/vindexes/lookup_internal.go @@ -45,7 +45,7 @@ func (lkp *lookup) Init(lookupQueryParams map[string]string, isHashed bool) { lkp.To = toCol lkp.sel = fmt.Sprintf("select %s from %s where %s = :%s", toCol, table, fromCol, fromCol) lkp.ver = fmt.Sprintf("select %s from %s where %s = :%s and %s = :%s", fromCol, table, fromCol, fromCol, toCol, toCol) - lkp.ins = fmt.Sprintf("insert into %s(%s, %s) values", table, fromCol, toCol) + lkp.ins = fmt.Sprintf("insert ignore into %s(%s, %s) values", table, fromCol, toCol) lkp.del = fmt.Sprintf("delete from %s where %s = :%s and %s = :%s", table, fromCol, fromCol, toCol, toCol) lkp.isHashedIndex = isHashed } @@ -158,7 +158,7 @@ func (lkp *lookup) Create(vcursor VCursor, ids []sqltypes.Value, ksids [][]byte) if len(ids) != len(ksids) { return fmt.Errorf("lookup.Create:length of ids %v doesn't match length of ksids %v", len(ids), len(ksids)) } - insBuffer.WriteString("insert into ") + insBuffer.WriteString("insert ignore into ") insBuffer.WriteString(lkp.Table) insBuffer.WriteString("(") insBuffer.WriteString(lkp.From) diff --git a/go/vt/vtgate/vindexes/lookup_test.go b/go/vt/vtgate/vindexes/lookup_test.go index 6edd5d8508c..e4c1661907d 100644 --- a/go/vt/vtgate/vindexes/lookup_test.go +++ b/go/vt/vtgate/vindexes/lookup_test.go @@ -123,7 +123,7 @@ func TestLookupUniqueCreate(t *testing.T) { t.Error(err) } wantQuery := &querypb.BoundQuery{ - Sql: "insert into t(fromc,toc) values(:fromc0,:toc0)", + Sql: "insert ignore into t(fromc,toc) values(:fromc0,:toc0)", BindVariables: map[string]*querypb.BindVariable{ "fromc0": sqltypes.Int64BindVariable(1), "toc0": sqltypes.BytesBindVariable([]byte("test")), @@ -221,7 +221,7 @@ func TestLookupNonUniqueCreate(t *testing.T) { t.Error(err) } wantQuery := &querypb.BoundQuery{ - Sql: "insert into t(fromc,toc) values(:fromc0,:toc0)", + Sql: "insert ignore into t(fromc,toc) values(:fromc0,:toc0)", BindVariables: map[string]*querypb.BindVariable{ "fromc0": sqltypes.Int64BindVariable(1), "toc0": sqltypes.BytesBindVariable([]byte("test")), diff --git a/go/vt/vttablet/grpctabletconn/conn.go b/go/vt/vttablet/grpctabletconn/conn.go index c87319c6c23..4e9d885b7b7 100644 --- a/go/vt/vttablet/grpctabletconn/conn.go +++ b/go/vt/vttablet/grpctabletconn/conn.go @@ -77,6 +77,7 @@ func DialTablet(tablet *topodatapb.Tablet, timeout time.Duration) (queryservice. if timeout > 0 { opts = append(opts, grpc.WithBlock(), grpc.WithTimeout(timeout)) } + opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(*grpcutils.MaxMessageSize), grpc.MaxCallSendMsgSize(*grpcutils.MaxMessageSize))) cc, err := grpc.Dial(addr, opts...) if err != nil { return nil, err diff --git a/go/vt/vttablet/tabletserver/txlogz.go b/go/vt/vttablet/tabletserver/txlogz.go index 02cafe21422..f4493779e0b 100644 --- a/go/vt/vttablet/tabletserver/txlogz.go +++ b/go/vt/vttablet/tabletserver/txlogz.go @@ -85,6 +85,17 @@ func txlogzHandler(w http.ResponseWriter, req *http.Request) { return } + io.WriteString(w, ` + + +
+/txlogz has been redacted for your protection
+ + + `) + return; + timeout, limit := parseTimeoutLimitParams(req) ch := tabletenv.TxLogger.Subscribe("txlogz") defer tabletenv.TxLogger.Unsubscribe(ch) diff --git a/go/vt/vttablet/tabletserver/txlogz_test.go b/go/vt/vttablet/tabletserver/txlogz_test.go index 4290731a33e..ce743bb4bea 100644 --- a/go/vt/vttablet/tabletserver/txlogz_test.go +++ b/go/vt/vttablet/tabletserver/txlogz_test.go @@ -32,6 +32,14 @@ func testHandler(req *http.Request, t *testing.T) { response := httptest.NewRecorder() tabletenv.TxLogger.Send("test msg") txlogzHandler(response, req) + + if !strings.Contains(response.Body.String(), "Redacted") { + t.Fatalf("should have been redacted") + } + + // skip the rest of the test since it is now always redacted + return + if !strings.Contains(response.Body.String(), "error") { t.Fatalf("should show an error page since transaction log format is invalid.") }