Commit 7b88642

craig[bot], Jason Chan, ajwerner, jaylim-crl, and aayushshah15 committed
82555: sql: fix CREATE TABLE LIKE with implicit pk r=jasonmchan a=jasonmchan

Previously, `CREATE TABLE LIKE` copied implicitly created columns (e.g. for the rowid default primary key and hash sharded index). Defaults for some of these columns were not properly copied over in some cases, causing unexpected constraint violations to surface. This commit fixes this by not copying such columns; instead, they will be freshly created. Follow-up work is needed for REGIONAL BY ROW.

Fixes #82401

Release note: None

82569: sql/schemachanger/rel,scplan/rules: add support for rules, _; adopt r=ajwerner a=ajwerner

The first commit extends the `rel` language with support for rules and `_` and adopts it for the dep rules. The second commit contains further cleanup and adopts it in the op rules.

Release note: None

82652: ccl/sqlproxyccl: fix inaccurate CurConnCount metric due to goroutine leak r=JeffSwenson a=jaylim-crl

Previously, there was a possibility where a processor could return from resuming because the client's connection was closed _before_ waitResumed even had the chance to wake up and check the resumed field. When that happened, the connection goroutine would be blocked forever, and the CurConnCount metric would never be decremented, even though the connection had already been terminated.

When the client's connection is closed, the forwarder's context is cancelled as well. The ideal behavior would be to terminate all waiters when that happens, but the current code does not do that. This commit fixes that issue by adding a new closed state to the processors and ensuring that a processor is closed whenever resume returns with an error. waitResumed can then check this state before going back to waiting.

Release note: None

82683: server: don't re-run node decommissioning callback r=aayushshah15 a=aayushshah15

This commit fixes a bug from #80993. Without this commit, nodes might re-run the callback to enqueue a decommissioning node's ranges into their replicate queues if they received a gossip update from that decommissioning node that was perceived to be newer. Re-running this callback on every newer gossip update from a decommissioning node would be too expensive for nodes with a lot of replicas.

Release note: None

Co-authored-by: Jason Chan <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
Co-authored-by: Jay <[email protected]>
Co-authored-by: Aayush Shah <[email protected]>
5 parents: e8aeb0e + 19ce4ec + 2461c08 + 5a4517f + 30795be

40 files changed: +1603, -1221 lines

pkg/ccl/sqlproxyccl/forwarder.go

Lines changed: 28 additions & 5 deletions
@@ -202,6 +202,8 @@ func (f *forwarder) Context() context.Context {
 //
 // Close implements the balancer.ConnectionHandle interface.
 func (f *forwarder) Close() {
+    // Cancelling the forwarder's context and connections will automatically
+    // cause the processors to exit, and close themselves.
     f.ctxCancel()

     // Whenever Close is called while both of the processors are suspended, the

@@ -389,7 +391,10 @@ func makeLogicalClockFn() func() uint64 {
 // cancellation of dials.
 var aLongTimeAgo = timeutil.Unix(1, 0)

-var errProcessorResumed = errors.New("processor has already been resumed")
+var (
+    errProcessorResumed = errors.New("processor has already been resumed")
+    errProcessorClosed = errors.New("processor has been closed")
+)

 // processor must always be constructed through newProcessor.
 type processor struct {

@@ -402,6 +407,7 @@ type processor struct {
     mu struct {
         syncutil.Mutex
         cond *sync.Cond
+        closed bool
         resumed bool
         inPeek bool
         suspendReq bool // Indicates that a suspend has been requested.

@@ -424,13 +430,15 @@ func newProcessor(logicalClockFn func() uint64, src, dst *interceptor.PGConn) *p

 // resume starts the processor and blocks during the processing. When the
 // processing has been terminated, this returns nil if the processor can be
-// resumed again in the future. If an error (except errProcessorResumed) was
-// returned, the processor should not be resumed again, and the forwarder should
-// be closed.
-func (p *processor) resume(ctx context.Context) error {
+// resumed again in the future. If an error was returned, the processor should
+// not be resumed again, and the forwarder must be closed.
+func (p *processor) resume(ctx context.Context) (retErr error) {
     enterResume := func() error {
         p.mu.Lock()
         defer p.mu.Unlock()
+        if p.mu.closed {
+            return errProcessorClosed
+        }
         if p.mu.resumed {
             return errProcessorResumed
         }

@@ -441,6 +449,10 @@ func (p *processor) resume(ctx context.Context) error {
     exitResume := func() {
         p.mu.Lock()
         defer p.mu.Unlock()
+        // If there's an error, close the processor.
+        if retErr != nil {
+            p.mu.closed = true
+        }
         p.mu.resumed = false
         p.mu.cond.Broadcast()
     }

@@ -495,6 +507,9 @@ func (p *processor) resume(ctx context.Context) error {
     }

     if err := enterResume(); err != nil {
+        if errors.Is(err, errProcessorResumed) {
+            return nil
+        }
         return err
     }
     defer exitResume()

@@ -524,6 +539,9 @@ func (p *processor) waitResumed(ctx context.Context) error {
         if ctx.Err() != nil {
             return ctx.Err()
         }
+        if p.mu.closed {
+            return errProcessorClosed
+        }
         p.mu.cond.Wait()
     }
     return nil

@@ -536,6 +554,11 @@ func (p *processor) suspend(ctx context.Context) error {
     p.mu.Lock()
     defer p.mu.Unlock()

+    // If the processor has been closed, it cannot be suspended at all.
+    if p.mu.closed {
+        return errProcessorClosed
+    }
+
     defer func() {
         if p.mu.suspendReq {

pkg/ccl/sqlproxyccl/forwarder_test.go

Lines changed: 9 additions & 9 deletions
@@ -521,12 +521,15 @@ func TestSuspendResumeProcessor(t *testing.T) {
             interceptor.NewPGConn(serverProxy),
         )
         require.EqualError(t, p.resume(ctx), context.Canceled.Error())
+        p.mu.Lock()
+        require.True(t, p.mu.closed)
+        p.mu.Unlock()

         // Set resumed to true to simulate suspend loop.
         p.mu.Lock()
         p.mu.resumed = true
         p.mu.Unlock()
-        require.EqualError(t, p.suspend(ctx), context.Canceled.Error())
+        require.EqualError(t, p.suspend(ctx), errProcessorClosed.Error())
     })

     t.Run("wait_for_resumed", func(t *testing.T) {

@@ -586,15 +589,15 @@ func TestSuspendResumeProcessor(t *testing.T) {
             interceptor.NewPGConn(serverProxy),
         )

-        // Ensure that everything will return a resumed error except 1.
+        // Ensure that two resume calls will return right away.
         errCh := make(chan error, 2)
         go func() { errCh <- p.resume(ctx) }()
         go func() { errCh <- p.resume(ctx) }()
         go func() { errCh <- p.resume(ctx) }()
         err := <-errCh
-        require.EqualError(t, err, errProcessorResumed.Error())
+        require.NoError(t, err)
         err = <-errCh
-        require.EqualError(t, err, errProcessorResumed.Error())
+        require.NoError(t, err)

         // Suspend the last goroutine.
         err = p.waitResumed(ctx)

@@ -604,7 +607,7 @@ func TestSuspendResumeProcessor(t *testing.T) {

         // Validate suspension.
         err = <-errCh
-        require.Nil(t, err)
+        require.NoError(t, err)
         p.mu.Lock()
         require.False(t, p.mu.resumed)
         require.False(t, p.mu.inPeek)

@@ -694,10 +697,7 @@ func TestSuspendResumeProcessor(t *testing.T) {
         // Wait until all resume calls except 1 have returned.
         for i := 0; i < concurrency-1; i++ {
             err := <-errResumeCh
-            // If error is not nil, it has to be an already resumed error.
-            if err != nil {
-                require.EqualError(t, err, errProcessorResumed.Error())
-            }
+            require.NoError(t, err)
         }

         // Wait until the last one returns. We can guarantee that this is for

pkg/ccl/sqlproxyccl/proxy_handler.go

Lines changed: 1 addition & 1 deletion
@@ -407,7 +407,7 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn *proxyConn
     if err := f.run(fe.Conn, crdbConn); err != nil {
         // Don't send to the client here for the same reason below.
         handler.metrics.updateForError(err)
-        return err
+        return errors.Wrap(err, "running forwarder")
     }

     // Block until an error is received, or when the stopper starts quiescing,

pkg/ccl/sqlproxyccl/proxy_handler_test.go

Lines changed: 70 additions & 0 deletions
@@ -1325,6 +1325,76 @@ func TestConnectionMigration(t *testing.T) {
     }, 10*time.Second, 100*time.Millisecond)
 }

+// TestCurConnCountMetric ensures that the CurConnCount metric is accurate.
+// Previously, there was a regression where the CurConnCount metric wasn't
+// decremented whenever the connections were closed due to a goroutine leak.
+func TestCurConnCountMetric(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    ctx := context.Background()
+
+    // Start KV server.
+    params, _ := tests.CreateTestServerParams()
+    s, _, _ := serverutils.StartServer(t, params)
+    defer s.Stopper().Stop(ctx)
+
+    // Start a single SQL pod.
+    tenantID := serverutils.TestTenantID()
+    tenants := startTestTenantPods(ctx, t, s, tenantID, 1)
+    defer func() {
+        for _, tenant := range tenants {
+            tenant.Stopper().Stop(ctx)
+        }
+    }()
+
+    // Register the SQL pod in the directory server.
+    tds := tenantdirsvr.NewTestStaticDirectoryServer(s.Stopper(), nil /* timeSource */)
+    tds.CreateTenant(tenantID, "tenant-cluster")
+    tds.AddPod(tenantID, &tenant.Pod{
+        TenantID: tenantID.ToUint64(),
+        Addr: tenants[0].SQLAddr(),
+        State: tenant.RUNNING,
+        StateTimestamp: timeutil.Now(),
+    })
+    require.NoError(t, tds.Start(ctx))
+
+    opts := &ProxyOptions{SkipVerify: true, DisableConnectionRebalancing: true}
+    opts.testingKnobs.directoryServer = tds
+    proxy, addr := newSecureProxyServer(ctx, t, s.Stopper(), opts)
+    connectionString := fmt.Sprintf("postgres://testuser:hunter2@%s/?sslmode=require&options=--cluster=tenant-cluster-%s", addr, tenantID)
+
+    // Open 500 connections to the SQL pod.
+    const numConns = 500
+    var wg sync.WaitGroup
+    wg.Add(numConns)
+    for i := 0; i < numConns; i++ {
+        go func() {
+            defer wg.Done()
+
+            // Opens a new connection, runs SELECT 1, and closes it right away.
+            // Ignore all connection errors.
+            conn, err := pgx.Connect(ctx, connectionString)
+            if err != nil {
+                return
+            }
+            _ = conn.Ping(ctx)
+            _ = conn.Close(ctx)
+        }()
+    }
+    wg.Wait()
+
+    // Ensure that the CurConnCount metric gets decremented to 0 whenever all
+    // the connections are closed.
+    testutils.SucceedsSoon(t, func() error {
+        val := proxy.metrics.CurConnCount.Value()
+        if val == 0 {
+            return nil
+        }
+        return errors.Newf("expected CurConnCount=0, but got %d", val)
+    })
+}
+
 func TestClusterNameAndTenantFromParams(t *testing.T) {
     defer leaktest.AfterTest(t)()
     defer log.Scope(t).Close(t)
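
The final assertion in TestCurConnCountMetric amounts to polling a gauge until it drains to zero or a deadline passes. Below is a minimal, self-contained sketch of that shape; the gauge type and waitForZero helper are illustrative assumptions, not the actual proxy metrics type or the testutils.SucceedsSoon API.

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// gauge is a stand-in for a connection-count metric.
type gauge struct{ v int64 }

func (g *gauge) Inc()         { atomic.AddInt64(&g.v, 1) }
func (g *gauge) Dec()         { atomic.AddInt64(&g.v, -1) }
func (g *gauge) Value() int64 { return atomic.LoadInt64(&g.v) }

// waitForZero polls the gauge until it reads zero or the timeout expires,
// the same shape as the SucceedsSoon assertion in the test above.
func waitForZero(g *gauge, timeout, interval time.Duration) error {
    deadline := time.Now().Add(timeout)
    for {
        if g.Value() == 0 {
            return nil
        }
        if time.Now().After(deadline) {
            return fmt.Errorf("expected gauge=0, but got %d", g.Value())
        }
        time.Sleep(interval)
    }
}

func main() {
    var connCount gauge
    connCount.Inc() // a connection is opened...
    go func() {
        time.Sleep(50 * time.Millisecond)
        connCount.Dec() // ...and closed shortly after
    }()
    fmt.Println(waitForZero(&connCount, 5*time.Second, 10*time.Millisecond)) // <nil>
}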

pkg/server/admin_test.go

Lines changed: 13 additions & 0 deletions
@@ -2444,6 +2444,19 @@ func TestDecommissionEnqueueReplicas(t *testing.T) {

         // Ensure that the scratch range's replica was proactively enqueued.
         require.Equal(t, <-enqueuedRangeIDs, tc.LookupRangeOrFatal(t, scratchKey).RangeID)
+
+        // Check that the node was marked as decommissioning in each of the nodes'
+        // decommissioningNodeMap. This needs to be wrapped in a SucceedsSoon to
+        // deal with gossip propagation delays.
+        testutils.SucceedsSoon(t, func() error {
+            for i := 0; i < tc.NumServers(); i++ {
+                srv := tc.Server(i)
+                if _, exists := srv.DecommissioningNodeMap()[decommissioningSrv.NodeID()]; !exists {
+                    return errors.Newf("node %d not detected to be decommissioning", decommissioningSrv.NodeID())
+                }
+            }
+            return nil
+        })
     }

     decommissionAndCheck(2 /* decommissioningSrvIdx */)

pkg/server/decommission.go

Lines changed: 13 additions & 0 deletions
@@ -54,6 +54,7 @@ func (t *decommissioningNodeMap) makeOnNodeDecommissioningCallback(
             // Nothing more to do.
             return
         }
+        t.nodes[decommissioningNodeID] = struct{}{}

         logLimiter := log.Every(5 * time.Second) // avoid log spam
         if err := stores.VisitStores(func(store *kvserver.Store) error {

@@ -216,3 +217,15 @@ func (s *Server) Decommission(
     }
     return nil
 }
+
+// DecommissioningNodeMap returns the set of node IDs that are decommissioning
+// from the perspective of the server.
+func (s *Server) DecommissioningNodeMap() map[roachpb.NodeID]interface{} {
+    s.decomNodeMap.RLock()
+    defer s.decomNodeMap.RUnlock()
+    nodes := make(map[roachpb.NodeID]interface{})
+    for key, val := range s.decomNodeMap.nodes {
+        nodes[key] = val
+    }
+    return nodes
+}
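
The single line added to makeOnNodeDecommissioningCallback is what makes the callback idempotent per node: the node ID is recorded in the map before the expensive per-store enqueue work, so any later gossip update for the same node hits the early return. A minimal sketch of that guard, using illustrative types rather than the real decommissioningNodeMap:

package main

import (
    "fmt"
    "sync"
)

type nodeID int

// tracker is a simplified stand-in: it remembers which nodes have already
// been handled so the expensive enqueue work runs at most once per node.
type tracker struct {
    mu    sync.Mutex
    nodes map[nodeID]struct{}
}

func (t *tracker) onNodeDecommissioning(id nodeID, enqueueRanges func()) {
    t.mu.Lock()
    if _, ok := t.nodes[id]; ok {
        t.mu.Unlock()
        return // already handled; nothing more to do
    }
    t.nodes[id] = struct{}{} // record the node (analogous to the line added in the diff above)
    t.mu.Unlock()
    enqueueRanges()
}

func main() {
    tr := &tracker{nodes: make(map[nodeID]struct{})}
    runs := 0
    // Simulate three "newer" gossip updates for the same decommissioning node.
    for i := 0; i < 3; i++ {
        tr.onNodeDecommissioning(7, func() { runs++ })
    }
    fmt.Println("enqueue ran", runs, "time(s)") // enqueue ran 1 time(s)
}

However many gossip updates arrive for node 7 in the sketch, the enqueue closure runs exactly once.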

pkg/server/server.go

Lines changed: 2 additions & 0 deletions
@@ -123,6 +123,7 @@ type Server struct {
     admin *adminServer
     status *statusServer
     drain *drainServer
+    decomNodeMap *decommissioningNodeMap
     authentication *authenticationServer
     migrationServer *migrationServer
     tsDB *ts.DB

@@ -844,6 +845,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
         admin: sAdmin,
         status: sStatus,
         drain: drain,
+        decomNodeMap: decomNodeMap,
         authentication: sAuth,
         tsDB: tsDB,
         tsServer: &sTS,

pkg/sql/create_table.go

Lines changed: 34 additions & 13 deletions
@@ -2509,18 +2509,11 @@ func replaceLikeTableOpts(n *tree.CreateTable, params runParams) (tree.TableDefs
     // This is required to ensure the newly created table still works as expected
     // as these columns are required for certain features to work when used
     // as an index.
+    // TODO(#82672): We shouldn't need this. This is only still required for
+    // the REGIONAL BY ROW column.
     shouldCopyColumnDefaultSet := make(map[string]struct{})
     if opts.Has(tree.LikeTableOptIndexes) {
         for _, idx := range td.NonDropIndexes() {
-            // Copy the rowid default if it was created implicitly by not specifying
-            // PRIMARY KEY.
-            if idx.Primary() && td.IsPrimaryIndexDefaultRowID() {
-                for i := 0; i < idx.NumKeyColumns(); i++ {
-                    shouldCopyColumnDefaultSet[idx.GetKeyColumnName(i)] = struct{}{}
-                }
-            }
-            // Copy any implicitly created columns (e.g. hash-sharded indexes,
-            // REGIONAL BY ROW).
             for i := 0; i < idx.ExplicitColumnStartIdx(); i++ {
                 for i := 0; i < idx.NumKeyColumns(); i++ {
                     shouldCopyColumnDefaultSet[idx.GetKeyColumnName(i)] = struct{}{}

@@ -2530,12 +2523,15 @@ func replaceLikeTableOpts(n *tree.CreateTable, params runParams) (tree.TableDefs
     }

     defs := make(tree.TableDefs, 0)
-    // Add all columns. Columns are always added.
+    // Add user-defined columns.
     for i := range td.Columns {
         c := &td.Columns[i]
-        if c.Inaccessible {
-            // Inaccessible columns automatically get added by
-            // the system; we don't need to add them ourselves here.
+        implicit, err := isImplicitlyCreatedBySystem(td, c)
+        if err != nil {
+            return nil, err
+        }
+        if implicit {
+            // Don't add system-created implicit columns.
             continue
         }
         def := tree.ColumnTableDef{

@@ -2615,6 +2611,11 @@ func replaceLikeTableOpts(n *tree.CreateTable, params runParams) (tree.TableDefs
     }
     if opts.Has(tree.LikeTableOptIndexes) {
         for _, idx := range td.NonDropIndexes() {
+            if idx.Primary() && td.IsPrimaryIndexDefaultRowID() {
+                // We won't copy over the default rowid primary index; instead
+                // we'll just generate a new one.
+                continue
+            }
             indexDef := tree.IndexTableDef{
                 Name: tree.Name(idx.GetName()),
                 Inverted: idx.GetType() == descpb.IndexDescriptor_INVERTED,

@@ -2883,3 +2884,23 @@ func validateUniqueConstraintParamsForCreateTableAs(n *tree.CreateTable) error {
     }
     return nil
 }
+
+// Checks if the column was automatically added by the system (e.g. for a rowid
+// primary key or hash sharded index).
+func isImplicitlyCreatedBySystem(td *tabledesc.Mutable, c *descpb.ColumnDescriptor) (bool, error) {
+    // TODO(#82672): add check for REGIONAL BY ROW column
+    if td.IsPrimaryIndexDefaultRowID() && c.ID == td.GetPrimaryIndex().GetKeyColumnID(0) {
+        return true, nil
+    }
+    col, err := td.FindColumnWithID(c.ID)
+    if err != nil {
+        return false, err
+    }
+    if td.IsShardColumn(col) {
+        return true, nil
+    }
+    if c.Inaccessible {
+        return true, nil
+    }
+    return false, nil
+}
