diff --git a/server/backend/backend.go b/server/backend/backend.go index 04d2eb241..67892e7f0 100644 --- a/server/backend/backend.go +++ b/server/backend/backend.go @@ -228,6 +228,12 @@ func (b *Backend) Shutdown() error { b.EventWebhookManager.Close() b.ClusterClient.Close() + if mongoClient, ok := b.DB.(*mongo.Client); ok { + if err := mongoClient.CloseClientInfoCache(); err != nil { + logging.DefaultLogger().Error("close client info cache: " + err.Error()) + } + } + if err := b.MsgBroker.Close(); err != nil { logging.DefaultLogger().Error(err) } diff --git a/server/backend/database/database.go b/server/backend/database/database.go index b9fb72da5..c67baa195 100644 --- a/server/backend/database/database.go +++ b/server/backend/database/database.go @@ -186,6 +186,10 @@ type Database interface { // DeactivateClient deactivates the client of the given refKey. DeactivateClient(ctx context.Context, refKey types.ClientRefKey) (*ClientInfo, error) + // DeactivateClientForHousekeeping deactivates the client for housekeeping purposes. + // This method bypasses cache and directly updates the database. + DeactivateClientForHousekeeping(ctx context.Context, refKey types.ClientRefKey) (*ClientInfo, error) + // TryAttaching updates the status of the document to Attaching to prevent // deactivating the client while the document is being attached. TryAttaching(ctx context.Context, refKey types.ClientRefKey, docID types.ID) (*ClientInfo, error) diff --git a/server/backend/database/memory/database.go b/server/backend/database/memory/database.go index 295e51f5a..f4513625e 100644 --- a/server/backend/database/memory/database.go +++ b/server/backend/database/memory/database.go @@ -912,6 +912,55 @@ func (d *DB) DeactivateClient(_ context.Context, refKey types.ClientRefKey) (*da return clientInfo, nil } +// DeactivateClientForHousekeeping deactivates the client for housekeeping purposes. +// This method bypasses any in-memory cache and directly updates the database. +// Returns (nil, nil) if the client does not exist, is not in the given project, +// or is already deactivated (i.e., no state change was performed). +func (d *DB) DeactivateClientForHousekeeping( + _ context.Context, + refKey types.ClientRefKey, +) (*database.ClientInfo, error) { + if err := refKey.ClientID.Validate(); err != nil { + return nil, err + } + + txn := d.db.Txn(true) + defer txn.Abort() + + raw, err := txn.First(tblClients, "id", refKey.ClientID.String()) + if err != nil { + return nil, fmt.Errorf("find client by id: %w", err) + } + + // Client not found, return nil to indicate no action needed + if raw == nil { + return nil, nil + } + + // Client not in project, return nil to indicate no action needed + clientInfo := raw.(*database.ClientInfo).DeepCopy() + if err := clientInfo.CheckIfInProject(refKey.ProjectID); err != nil { + return nil, nil + } + + // Only deactivate if currently activated + if clientInfo.Status != database.ClientActivated { + return nil, nil + } + + // clientInfo.Status = database.ClientDeactivated + // clientInfo.UpdatedAt = gotime.Now() + + clientInfo.Deactivate() + + if err := txn.Insert(tblClients, clientInfo); err != nil { + return nil, fmt.Errorf("update client: %w", err) + } + txn.Commit() + + return clientInfo, nil +} + // FindClientInfoByRefKey finds a client by the given refKey. func (d *DB) FindClientInfoByRefKey(_ context.Context, refKey types.ClientRefKey) (*database.ClientInfo, error) { if err := refKey.ClientID.Validate(); err != nil { diff --git a/server/backend/database/mongo/client.go b/server/backend/database/mongo/client.go index 370d55de6..7585d58e6 100644 --- a/server/backend/database/mongo/client.go +++ b/server/backend/database/mongo/client.go @@ -21,6 +21,7 @@ import ( "bytes" "context" "fmt" + "sort" "strings" gotime "time" @@ -51,9 +52,10 @@ type Client struct { config *Config client *mongo.Client - docCache *lru.Cache[types.DocRefKey, *database.DocInfo] - changeCache *lru.Cache[types.DocRefKey, *ChangeStore] - vectorCache *lru.Cache[types.DocRefKey, *cmap.Map[types.ID, time.VersionVector]] + docCache *lru.Cache[types.DocRefKey, *database.DocInfo] + changeCache *lru.Cache[types.DocRefKey, *ChangeStore] + vectorCache *lru.Cache[types.DocRefKey, *cmap.Map[types.ID, time.VersionVector]] + clientInfoCache *ClientInfoCache } // Dial creates an instance of Client and dials the given MongoDB. @@ -115,17 +117,25 @@ func Dial(conf *Config) (*Client, error) { logging.DefaultLogger().Infof("MongoDB connected, URI: %s, DB: %s", conf.ConnectionURI, conf.YorkieDatabase) - return &Client{ + clientInstance := &Client{ config: conf, client: client, docCache: docCache, changeCache: changeCache, vectorCache: vectorCache, - }, nil + } + + // Create client info cache with client reference + clientInstance.clientInfoCache = NewClientInfoCache(nil, clientInstance) + + return clientInstance, nil } // Close all resources of this client. func (c *Client) Close() error { + // Note: clientInfoCache is closed separately in Backend.Shutdown() + // to ensure it's closed before DB disconnection + if err := c.client.Disconnect(context.Background()); err != nil { return fmt.Errorf("close MongoDB client: %w", err) } @@ -136,6 +146,14 @@ func (c *Client) Close() error { return nil } +// CloseClientInfoCache closes the client info cache. +func (c *Client) CloseClientInfoCache() error { + if c.clientInfoCache != nil { + return c.clientInfoCache.Close() + } + return nil +} + // TryLeadership attempts to acquire or renew leadership with the given lease duration. // If leaseToken is empty, it attempts to acquire new leadership. // If leaseToken is provided, it attempts to renew the existing lease. @@ -788,14 +806,16 @@ func (c *Client) ActivateClient( metadata map[string]string, ) (*database.ClientInfo, error) { now := gotime.Now() + + c.clientInfoCache.recordActivateClientMiss() res, err := c.collection(ColClients).UpdateOne(ctx, bson.M{ "project_id": projectID, "key": key, - "metadata": metadata, }, bson.M{ "$set": bson.M{ StatusKey: database.ClientActivated, "updated_at": now, + "metadata": metadata, }, }, options.UpdateOne().SetUpsert(true)) if err != nil { @@ -824,6 +844,10 @@ func (c *Client) ActivateClient( return nil, fmt.Errorf("activate client of %s: %w", key, err) } + if err := c.clientInfoCache.Set(clientInfo.RefKey(), &clientInfo); err != nil { + return nil, fmt.Errorf("set client info in cache: %w", err) + } + return &clientInfo, nil } @@ -834,7 +858,48 @@ func (c *Client) TryAttaching( refKey types.ClientRefKey, docID types.ID, ) (*database.ClientInfo, error) { + // 1. Check cache first for latest client info + if cached := c.clientInfoCache.Get(refKey); cached != nil { + c.clientInfoCache.recordTryAttachingHit() + + // Check if client is activated + if cached.Status != database.ClientActivated { + return nil, fmt.Errorf("try to attach document: %w", database.ErrClientNotFound) + } + + // Check if document is already attached + if docInfo, exists := cached.Documents[docID]; exists && docInfo.Status == database.DocumentAttached { + return nil, fmt.Errorf("conditions not satisfied to attach document: %w", database.ErrClientNotFound) + } + + // Update document status in cache and DB + updatedClientInfo := cached.DeepCopy() + if updatedClientInfo.Documents == nil { + updatedClientInfo.Documents = make(map[types.ID]*database.ClientDocInfo) + } + updatedClientInfo.Documents[docID] = &database.ClientDocInfo{ + Status: database.DocumentAttaching, + ServerSeq: 0, + ClientSeq: 0, + } + updatedClientInfo.UpdatedAt = gotime.Now() + + // Update in DB first (write-through) + if err := c.updateDocumentStatusInDBDirect(refKey, docID, database.DocumentAttaching); err != nil { + return nil, fmt.Errorf("update document status in DB: %w", err) + } + + // Then update cache + if err := c.clientInfoCache.Set(refKey, updatedClientInfo); err != nil { + return nil, fmt.Errorf("update client info in cache: %w", err) + } + + return updatedClientInfo, nil + } + + // 2. If not in cache, proceed with DB operation // client must be activated and document must not be attached + c.clientInfoCache.recordTryAttachingMiss() result := c.collection(ColClients).FindOneAndUpdate( ctx, bson.M{ @@ -862,6 +927,10 @@ func (c *Client) TryAttaching( return nil, fmt.Errorf("try attaching %s to %s : %w", docID, refKey.ClientID, err) } + if err := c.clientInfoCache.Set(refKey, info); err != nil { + return nil, fmt.Errorf("set client info in cache: %w", err) + } + return info, nil } @@ -872,6 +941,48 @@ func (c *Client) DeactivateClient( ) (*database.ClientInfo, error) { now := gotime.Now() + // 1. Check cache first for latest client info + if cached := c.clientInfoCache.Get(refKey); cached != nil { + c.clientInfoCache.recordDeactivateClientHit() + + // Check if client is activated + if cached.Status != database.ClientActivated { + return nil, fmt.Errorf( + "conditions not satisfied to deactivate client: %w", + database.ErrClientNotFound, + ) + } + + // Check if any documents are attached + for _, docInfo := range cached.Documents { + if docInfo.Status == database.DocumentAttaching || docInfo.Status == database.DocumentAttached { + return nil, fmt.Errorf( + "conditions not satisfied to deactivate client: %w", + database.ErrClientNotFound, + ) + } + } + + // Update client status in cache and DB + updatedClientInfo := cached.DeepCopy() + updatedClientInfo.Status = database.ClientDeactivated + updatedClientInfo.UpdatedAt = now + + // Update in DB first (write-through) + if err := c.updateStatusInDBDirect(refKey, database.ClientDeactivated); err != nil { + return nil, fmt.Errorf("update client status in DB: %w", err) + } + + // Then update cache + if err := c.clientInfoCache.Set(refKey, updatedClientInfo); err != nil { + return nil, fmt.Errorf("update client info in cache: %w", err) + } + + return updatedClientInfo, nil + } + + // 2. If not in cache, proceed with DB operation + c.clientInfoCache.recordDeactivateClientMiss() result := c.collection(ColClients).FindOneAndUpdate( ctx, bson.M{ @@ -908,18 +1019,68 @@ func (c *Client) DeactivateClient( return nil, fmt.Errorf("deactivate client of %s: %w", refKey.ClientID, err) } + if err := c.clientInfoCache.Set(refKey, &info); err != nil { + return nil, fmt.Errorf("set client info in cache: %w", err) + } + + return &info, nil +} + +// DeactivateClientForHousekeeping deactivates the client for housekeeping purposes. +// This method bypasses cache and directly updates the database. +func (c *Client) DeactivateClientForHousekeeping( + ctx context.Context, + refKey types.ClientRefKey, +) (*database.ClientInfo, error) { + now := gotime.Now() + + // Direct DB operation without cache checks + result := c.collection(ColClients).FindOneAndUpdate( + ctx, + bson.M{ + "project_id": refKey.ProjectID, + "_id": refKey.ClientID, + "status": database.ClientActivated, + }, + bson.M{ + "$set": bson.M{ + "status": database.ClientDeactivated, + "updated_at": now, + }, + }, + options.FindOneAndUpdate().SetReturnDocument(options.After), + ) + + info := database.ClientInfo{} + if err := result.Decode(&info); err != nil { + if err == mongo.ErrNoDocuments { + // Client not found or already deactivated, return nil to indicate no action needed + return nil, nil + } + return nil, fmt.Errorf("decode client info: %w", err) + } + + // Update cache with the deactivated client info + if err := c.clientInfoCache.Set(refKey, &info); err != nil { + return nil, fmt.Errorf("set client info in cache: %w", err) + } + return &info, nil } // FindClientInfoByRefKey finds the client of the given refKey. func (c *Client) FindClientInfoByRefKey(ctx context.Context, refKey types.ClientRefKey) (*database.ClientInfo, error) { - result := c.collection(ColClients).FindOneAndUpdate(ctx, bson.M{ + // 1. Try to get from cache first + if cached := c.clientInfoCache.Get(refKey); cached != nil { + c.clientInfoCache.recordFindClientInfoHit() + return cached, nil + } + + // 2. If not in cache, get from database (read-only) + c.clientInfoCache.recordFindClientInfoMiss() + result := c.collection(ColClients).FindOne(ctx, bson.M{ "project_id": refKey.ProjectID, "_id": refKey.ClientID, - }, bson.M{ - "$set": bson.M{ - "updated_at": gotime.Now(), - }, }) clientInfo := database.ClientInfo{} @@ -929,6 +1090,10 @@ func (c *Client) FindClientInfoByRefKey(ctx context.Context, refKey types.Client } } + if err := c.clientInfoCache.Set(refKey, &clientInfo); err != nil { + return nil, fmt.Errorf("set client info in cache: %w", err) + } + return &clientInfo, nil } @@ -939,7 +1104,7 @@ func (c *Client) UpdateClientInfoAfterPushPull( clientInfo *database.ClientInfo, docInfo *database.DocInfo, ) error { - clientDocInfo, ok := clientInfo.Documents[docInfo.ID] + _, ok := clientInfo.Documents[docInfo.ID] if !ok { return fmt.Errorf( "update client of %s after PushPull %s: %w", @@ -947,50 +1112,92 @@ func (c *Client) UpdateClientInfoAfterPushPull( ) } - attached, err := clientInfo.IsAttached(docInfo.ID) + _, err := clientInfo.IsAttached(docInfo.ID) if err != nil { return err } - var updater bson.M - if attached { - updater = bson.M{ - "$max": bson.M{ - clientDocInfoKey(docInfo.ID, "server_seq"): clientDocInfo.ServerSeq, - clientDocInfoKey(docInfo.ID, "client_seq"): clientDocInfo.ClientSeq, - }, - "$set": bson.M{ - clientDocInfoKey(docInfo.ID, StatusKey): clientDocInfo.Status, - "updated_at": clientInfo.UpdatedAt, - }, - "$addToSet": bson.M{ - "attached_docs": docInfo.ID, - }, + // Get cached client info if available + cached := c.clientInfoCache.Get(clientInfo.RefKey()) + + if cached != nil { + // Update status using write-through strategy + if cached.Status != clientInfo.Status { + if err := c.clientInfoCache.UpdateStatus(clientInfo.RefKey(), clientInfo.Status); err != nil { + return fmt.Errorf("update client status in cache: %w", err) + } + } + + // Update document status using write-through strategy + if clientDocInfo, exists := clientInfo.Documents[docInfo.ID]; exists { + if cachedDocInfo, exists := cached.Documents[docInfo.ID]; !exists || cachedDocInfo.Status != clientDocInfo.Status { + if err := c.clientInfoCache.UpdateDocumentStatus( + clientInfo.RefKey(), docInfo.ID, clientDocInfo.Status, + ); err != nil { + return fmt.Errorf("update document status in cache: %w", err) + } + } + } + + // Update checkpoint using write-back strategy + if clientDocInfo, exists := clientInfo.Documents[docInfo.ID]; exists { + checkpoint := change.Checkpoint{ + ServerSeq: clientDocInfo.ServerSeq, + ClientSeq: clientDocInfo.ClientSeq, + } + if err := c.clientInfoCache.UpdateCheckpoint(clientInfo.RefKey(), docInfo.ID, checkpoint); err != nil { + return fmt.Errorf("update checkpoint in cache: %w", err) + } } } else { - updater = bson.M{ - "$set": bson.M{ - clientDocInfoKey(docInfo.ID, "server_seq"): 0, - clientDocInfoKey(docInfo.ID, "client_seq"): 0, - clientDocInfoKey(docInfo.ID, StatusKey): clientDocInfo.Status, - "updated_at": clientInfo.UpdatedAt, - }, - "$pull": bson.M{ - "attached_docs": docInfo.ID, - }, + // If not in cache, update status and document status using write-through + if err := c.clientInfoCache.UpdateStatus(clientInfo.RefKey(), clientInfo.Status); err != nil { + return fmt.Errorf("update client status in cache: %w", err) } - } - res, err := c.collection(ColClients).UpdateOne(ctx, bson.M{ - "project_id": clientInfo.ProjectID, - "_id": clientInfo.ID, - }, updater) + if clientDocInfo, exists := clientInfo.Documents[docInfo.ID]; exists { + if err := c.clientInfoCache.UpdateDocumentStatus(clientInfo.RefKey(), docInfo.ID, clientDocInfo.Status); err != nil { + return fmt.Errorf("update document status in cache: %w", err) + } + } - if err != nil { - return fmt.Errorf("update client of %s after PushPull %s: %w", clientInfo.ID, docInfo.ID, err) - } - if res.MatchedCount == 0 { - return fmt.Errorf("update client of %s after PushPull %s: %w", clientInfo.ID, docInfo.ID, database.ErrClientNotFound) + // Load full client info from DB and cache it + result := c.collection(ColClients).FindOne(ctx, bson.M{ + "project_id": clientInfo.ProjectID, + "_id": clientInfo.ID, + }) + if result.Err() == mongo.ErrNoDocuments { + return fmt.Errorf("update client of %s after PushPull %s: %w", clientInfo.ID, docInfo.ID, database.ErrClientNotFound) + } + if result.Err() != nil { + return fmt.Errorf("update client of %s after PushPull %s: %w", clientInfo.ID, docInfo.ID, result.Err()) + } + + var dbClientInfo database.ClientInfo + if err := result.Decode(&dbClientInfo); err != nil { + return fmt.Errorf("decode client info: %w", err) + } + + // Update checkpoint in the loaded client info + if clientDocInfo, exists := clientInfo.Documents[docInfo.ID]; exists { + if dbClientInfo.Documents == nil { + dbClientInfo.Documents = make(map[types.ID]*database.ClientDocInfo) + } + if dbDocInfo, exists := dbClientInfo.Documents[docInfo.ID]; exists { + dbDocInfo.ServerSeq = clientDocInfo.ServerSeq + dbDocInfo.ClientSeq = clientDocInfo.ClientSeq + } else { + dbClientInfo.Documents[docInfo.ID] = &database.ClientDocInfo{ + Status: clientDocInfo.Status, + ServerSeq: clientDocInfo.ServerSeq, + ClientSeq: clientDocInfo.ClientSeq, + } + } + } + + if err := c.clientInfoCache.Set(clientInfo.RefKey(), &dbClientInfo); err != nil { + return fmt.Errorf("set client info in cache: %w", err) + } } return nil @@ -1089,6 +1296,30 @@ func (c *Client) FindAttachedClientInfosByRefKey( ctx context.Context, docRefKey types.DocRefKey, ) ([]*database.ClientInfo, error) { + // 1. Check cache first for all clients in the project + cachedClients := c.clientInfoCache.GetByProject(docRefKey.ProjectID) + + var attachedClients []*database.ClientInfo + for _, clientInfo := range cachedClients { + if clientInfo.Status != database.ClientActivated { + continue + } + + if docInfo, exists := clientInfo.Documents[docRefKey.DocID]; exists { + if docInfo.Status == database.DocumentAttached { + attachedClients = append(attachedClients, clientInfo) + } + } + } + + if len(attachedClients) > 0 { + sort.Slice(attachedClients, func(i, j int) bool { + return attachedClients[i].ID.String() < attachedClients[j].ID.String() + }) + return attachedClients, nil + } + + // 2. If not found in cache, check database filter := bson.M{ "project_id": docRefKey.ProjectID, "status": database.ClientActivated, @@ -1263,6 +1494,24 @@ func (c *Client) UpdateDocInfoStatusToRemoved( return fmt.Errorf("update %s to removed: %w", refKey, result.Err()) } + // Invalidate only clients attached to this document + attachedClients, err := c.FindAttachedClientInfosByRefKey(ctx, refKey) + if err == nil { + for _, client := range attachedClients { + if err := c.clientInfoCache.Invalidate(client.RefKey()); err != nil { + logging.DefaultLogger().Errorf("failed to invalidate client cache for %s: %v", client.RefKey(), err) + } + } + } else { + logging.DefaultLogger().Warnf( + "failed to find attached clients for %s, falling back to full cache invalidation: %v", + refKey, err) + // Fallback to full invalidation if we can't find attached clients + if err := c.clientInfoCache.InvalidateAll(); err != nil { + return fmt.Errorf("invalidate all client caches: %w", err) + } + } + return nil } @@ -1897,6 +2146,22 @@ func (c *Client) IsDocumentAttached( docRefKey types.DocRefKey, excludeClientID types.ID, ) (bool, error) { + // 1. Check cache first for all clients in the project + cachedClients := c.clientInfoCache.GetByProject(docRefKey.ProjectID) + + for _, clientInfo := range cachedClients { + if excludeClientID != "" && clientInfo.ID == excludeClientID { + continue + } + + if docInfo, exists := clientInfo.Documents[docRefKey.DocID]; exists { + if docInfo.Status == database.DocumentAttached || docInfo.Status == database.DocumentAttaching { + return true, nil + } + } + } + + // 2. If not found in cache, check database filter := bson.M{ "project_id": docRefKey.ProjectID, "attached_docs": docRefKey.DocID, @@ -2076,6 +2341,24 @@ func (c *Client) PurgeDocument( return nil, fmt.Errorf("delete document of %s: %w", docRefKey, err) } + // Invalidate only clients attached to this document + attachedClients, err := c.FindAttachedClientInfosByRefKey(ctx, docRefKey) + if err == nil { + for _, client := range attachedClients { + if err := c.clientInfoCache.Invalidate(client.RefKey()); err != nil { + logging.DefaultLogger().Errorf("failed to invalidate client cache for %s: %v", client.RefKey(), err) + } + } + } else { + logging.DefaultLogger().Warnf( + "failed to find attached clients for %s, falling back to full cache invalidation: %v", + docRefKey, err) + // Fallback to full invalidation if we can't find attached clients + if err := c.clientInfoCache.InvalidateAll(); err != nil { + return nil, fmt.Errorf("invalidate all client caches: %w", err) + } + } + return res, nil } @@ -2166,3 +2449,68 @@ func (c *Client) IsSchemaAttached( } return true, nil } + +// GetClientInfoCacheMetrics returns metrics from the client info cache +func (c *Client) GetClientInfoCacheMetrics() *CacheMetrics { + if c.clientInfoCache == nil { + return nil + } + return c.clientInfoCache.GetMetrics() +} + +// LogClientInfoCacheMetrics logs the current cache metrics +func (c *Client) LogClientInfoCacheMetrics() { + if c.clientInfoCache == nil { + return + } + c.clientInfoCache.LogMetrics() +} + +// updateDocumentStatusInDBDirect updates document status in database directly +func (c *Client) updateDocumentStatusInDBDirect(refKey types.ClientRefKey, docID types.ID, status string) error { + updateDoc := bson.M{ + "$set": bson.M{ + "updated_at": gotime.Now(), + }, + } + + // Add document-specific updates + docKey := clientDocInfoKey(docID, StatusKey) + updateDoc["$set"].(bson.M)[docKey] = status + + // Reset sequence numbers for detached/removed/attaching documents + if status == database.DocumentDetached || status == database.DocumentRemoved || status == database.DocumentAttaching { + serverSeqKey := clientDocInfoKey(docID, "server_seq") + clientSeqKey := clientDocInfoKey(docID, "client_seq") + updateDoc["$set"].(bson.M)[serverSeqKey] = int64(0) + updateDoc["$set"].(bson.M)[clientSeqKey] = uint32(0) + } + + _, err := c.collection(ColClients).UpdateOne( + context.Background(), + bson.M{ + "project_id": refKey.ProjectID, + "_id": refKey.ClientID, + }, + updateDoc, + ) + return err +} + +// updateStatusInDBDirect updates client status in database directly +func (c *Client) updateStatusInDBDirect(refKey types.ClientRefKey, status string) error { + _, err := c.collection(ColClients).UpdateOne( + context.Background(), + bson.M{ + "project_id": refKey.ProjectID, + "_id": refKey.ClientID, + }, + bson.M{ + "$set": bson.M{ + "status": status, + "updated_at": gotime.Now(), + }, + }, + ) + return err +} diff --git a/server/backend/database/mongo/client_cache.go b/server/backend/database/mongo/client_cache.go new file mode 100644 index 000000000..f614245a8 --- /dev/null +++ b/server/backend/database/mongo/client_cache.go @@ -0,0 +1,1102 @@ +/* + * Copyright 2025 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mongo + +import ( + "context" + "fmt" + "strings" + "sync" + "sync/atomic" + "time" + + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + + "github.com/yorkie-team/yorkie/api/types" + "github.com/yorkie-team/yorkie/pkg/document/change" + "github.com/yorkie-team/yorkie/server/backend/database" + "github.com/yorkie-team/yorkie/server/logging" +) + +// CacheMetrics tracks performance metrics for the cache +type CacheMetrics struct { + // Overall cache performance + TotalHits int64 + TotalMisses int64 + + // Operation-specific hit rates + ActivateClientHits int64 + ActivateClientMisses int64 + DeactivateClientHits int64 + DeactivateClientMisses int64 + TryAttachingHits int64 + TryAttachingMisses int64 + FindClientInfoHits int64 + FindClientInfoMisses int64 + + // TTL-related metrics + TTLExpirations int64 + TTLFlushErrors int64 +} + +// CacheConfig defines configuration parameters for the cache +type CacheConfig struct { + // BaseFlushInterval is the base interval for flushing cache to DB + BaseFlushInterval time.Duration + // MaxFlushInterval is the maximum interval for flushing cache to DB + MaxFlushInterval time.Duration + // MinFlushInterval is the minimum interval for flushing cache to DB + MinFlushInterval time.Duration + // MaxCacheSize is the maximum number of cached entries + MaxCacheSize int + // WritePressureThreshold is the threshold for write pressure detection + WritePressureThreshold int + // PressureCheckInterval is the interval for checking write pressure + PressureCheckInterval time.Duration + // EnableFlushCleanup enables cache cleanup after flush + EnableFlushCleanup bool + // TTL is the time-to-live for cached entries + TTL time.Duration +} + +// DefaultCacheConfig returns default cache configuration +func DefaultCacheConfig() *CacheConfig { + return &CacheConfig{ + BaseFlushInterval: 7 * time.Second, + MaxFlushInterval: 10 * time.Second, + MinFlushInterval: 5 * time.Second, + MaxCacheSize: 20000, + WritePressureThreshold: 500, + PressureCheckInterval: 10 * time.Second, + EnableFlushCleanup: true, + TTL: 10 * time.Second, + } +} + +// CachedClientInfo wraps ClientInfo with cache-specific metadata +type CachedClientInfo struct { + ClientInfo *database.ClientInfo + UpdatedAt time.Time + Dirty bool + LastFlush time.Time + ExpiresAt time.Time +} + +// WritePressure tracks write pressure metrics for adaptive flushing +type WritePressure struct { + ActiveWrites int64 + PendingWrites int64 + LastFlushTime time.Time + FlushInterval time.Duration + PressureLevel float64 +} + +// ClientInfoCache manages in-memory caching of ClientInfo objects +type ClientInfoCache struct { + mu sync.RWMutex + cache map[types.ClientRefKey]*CachedClientInfo + flushCh chan struct{} + stopCh chan struct{} + config *CacheConfig + client *Client + writePressure *WritePressure + pressureMu sync.RWMutex + metrics *CacheMetrics +} + +// NewClientInfoCache creates a new ClientInfoCache instance +func NewClientInfoCache(config *CacheConfig, client *Client) *ClientInfoCache { + if config == nil { + config = DefaultCacheConfig() + } + + cache := &ClientInfoCache{ + cache: make(map[types.ClientRefKey]*CachedClientInfo), + flushCh: make(chan struct{}, 1), + stopCh: make(chan struct{}), + config: config, + client: client, + writePressure: &WritePressure{}, + metrics: &CacheMetrics{}, + } + + // Start background goroutines + go cache.adaptiveFlush() + go cache.ttlCleanup() + + // Start metrics logging + cache.StartMetricsLogging() + + return cache +} + +// handleExpiredEntry processes an expired cache entry +// Status information is discarded, checkpoint information is flushed to DB using max operator +func (c *ClientInfoCache) handleExpiredEntry(refKey types.ClientRefKey, cached *CachedClientInfo) { + atomic.AddInt64(&c.metrics.TTLExpirations, 1) + + // Create a copy of client info with only checkpoint information + checkpointInfo := &database.ClientInfo{ + ID: cached.ClientInfo.ID, + ProjectID: cached.ClientInfo.ProjectID, + Key: cached.ClientInfo.Key, + Status: cached.ClientInfo.Status, + Documents: make(map[types.ID]*database.ClientDocInfo), + Metadata: cached.ClientInfo.Metadata, + CreatedAt: cached.ClientInfo.CreatedAt, + UpdatedAt: cached.ClientInfo.UpdatedAt, + } + + // Copy only checkpoint information (ServerSeq, ClientSeq) from documents + for docID, docInfo := range cached.ClientInfo.Documents { + checkpointInfo.Documents[docID] = &database.ClientDocInfo{ + ServerSeq: docInfo.ServerSeq, + ClientSeq: docInfo.ClientSeq, + Status: docInfo.Status, + } + } + + // Flush checkpoint information to DB using max operator + if err := c.flushSingleToDB(refKey, checkpointInfo); err != nil { + atomic.AddInt64(&c.metrics.TTLFlushErrors, 1) + logging.DefaultLogger().Errorf("Failed to flush expired entry checkpoint to DB: %v", err) + } +} + +// Get retrieves a ClientInfo from the cache +func (c *ClientInfoCache) Get(refKey types.ClientRefKey) *database.ClientInfo { + c.mu.RLock() + defer c.mu.RUnlock() + + if cached, exists := c.cache[refKey]; exists { + // Check if entry has expired + if time.Now().After(cached.ExpiresAt) { + // Schedule expiration handling in background + go c.handleExpiration(refKey) + c.recordMiss() + return nil + } + + c.recordHit() + return cached.ClientInfo.DeepCopy() + } + + c.recordMiss() + return nil +} + +// handleExpiration handles expired entry cleanup in background +func (c *ClientInfoCache) handleExpiration(refKey types.ClientRefKey) { + c.mu.Lock() + defer c.mu.Unlock() + + if cached, exists := c.cache[refKey]; exists && time.Now().After(cached.ExpiresAt) { + c.handleExpiredEntry(refKey, cached) + delete(c.cache, refKey) + } +} + +// GetByKey retrieves a ClientInfo from the cache by project ID and key +func (c *ClientInfoCache) GetByKey(projectID types.ID, key string) *database.ClientInfo { + c.mu.RLock() + defer c.mu.RUnlock() + + for refKey, cached := range c.cache { + if refKey.ProjectID == projectID && cached.ClientInfo.Key == key { + // Check if entry has expired + if time.Now().After(cached.ExpiresAt) { + // Schedule expiration handling in background + go c.handleExpiration(refKey) + c.recordMiss() + return nil + } + + c.recordHit() + return cached.ClientInfo.DeepCopy() + } + } + + c.recordMiss() + return nil +} + +// GetByProject retrieves all ClientInfo for a specific project from the cache +func (c *ClientInfoCache) GetByProject(projectID types.ID) []*database.ClientInfo { + c.mu.RLock() + defer c.mu.RUnlock() + + var clients []*database.ClientInfo + var expiredKeys []types.ClientRefKey + + now := time.Now() + for refKey, cached := range c.cache { + if refKey.ProjectID == projectID { + // Check if entry has expired + if now.After(cached.ExpiresAt) { + expiredKeys = append(expiredKeys, refKey) + continue + } + clients = append(clients, cached.ClientInfo.DeepCopy()) + } + } + + // Handle expired entries in background if any found + if len(expiredKeys) > 0 { + go c.handleBatchExpiration(expiredKeys) + } + + return clients +} + +// handleBatchExpiration handles multiple expired entries in batch +func (c *ClientInfoCache) handleBatchExpiration(refKeys []types.ClientRefKey) { + c.mu.Lock() + defer c.mu.Unlock() + + now := time.Now() + for _, refKey := range refKeys { + if cached, exists := c.cache[refKey]; exists && now.After(cached.ExpiresAt) { + c.handleExpiredEntry(refKey, cached) + delete(c.cache, refKey) + } + } +} + +// Set stores a ClientInfo in the cache +func (c *ClientInfoCache) Set(refKey types.ClientRefKey, clientInfo *database.ClientInfo) error { + c.mu.Lock() + defer c.mu.Unlock() + + now := time.Now() + c.cache[refKey] = &CachedClientInfo{ + ClientInfo: clientInfo.DeepCopy(), + UpdatedAt: now, + Dirty: false, + LastFlush: now, + ExpiresAt: now.Add(c.config.TTL), + } + + // Evict oldest if cache is full + if len(c.cache) > c.config.MaxCacheSize { + if err := c.evictOldest(); err != nil { + return fmt.Errorf("failed to evict oldest entry: %w", err) + } + } + return nil +} + +// UpdateClientInfo updates an existing cached ClientInfo with new data +func (c *ClientInfoCache) UpdateClientInfo(refKey types.ClientRefKey, clientInfo *database.ClientInfo) error { + c.mu.Lock() + defer c.mu.Unlock() + + if cached, exists := c.cache[refKey]; exists { + // Apply $max logic for sequence numbers (same as MongoDB's $max operator) + merged := c.mergeClientInfo(cached.ClientInfo, clientInfo) + cached.ClientInfo = merged + cached.Dirty = true + cached.UpdatedAt = time.Now() + cached.ExpiresAt = time.Now().Add(c.config.TTL) + return nil + } + + now := time.Now() + c.cache[refKey] = &CachedClientInfo{ + ClientInfo: clientInfo.DeepCopy(), + UpdatedAt: now, + Dirty: true, + LastFlush: now, + ExpiresAt: now.Add(c.config.TTL), + } + return nil +} + +// mergeClientInfo merges two ClientInfo objects, applying $max logic for sequence numbers +// and handling explicit resets for detached/removed documents +func (c *ClientInfoCache) mergeClientInfo(existing, new *database.ClientInfo) *database.ClientInfo { + merged := existing.DeepCopy() + + // Merge documents using $max logic + for docID, newDocInfo := range new.Documents { + if existingDocInfo, exists := merged.Documents[docID]; exists { + // Reset sequence numbers when document status changes + if newDocInfo.Status == database.DocumentDetached || newDocInfo.Status == database.DocumentRemoved { + existingDocInfo.ServerSeq = 0 + existingDocInfo.ClientSeq = 0 + existingDocInfo.Status = newDocInfo.Status + } else { + // Apply $max logic for sequence numbers + if newDocInfo.ServerSeq > existingDocInfo.ServerSeq { + existingDocInfo.ServerSeq = newDocInfo.ServerSeq + } + if newDocInfo.ClientSeq > existingDocInfo.ClientSeq { + existingDocInfo.ClientSeq = newDocInfo.ClientSeq + } + existingDocInfo.Status = newDocInfo.Status + } + } else { + // Add new document info + merged.Documents[docID] = &database.ClientDocInfo{ + ServerSeq: newDocInfo.ServerSeq, + ClientSeq: newDocInfo.ClientSeq, + Status: newDocInfo.Status, + } + } + } + + // Update other fields + merged.Status = new.Status + merged.UpdatedAt = new.UpdatedAt + + return merged +} + +// Invalidate removes an entry from the cache +func (c *ClientInfoCache) Invalidate(refKey types.ClientRefKey) error { + c.mu.Lock() + defer c.mu.Unlock() + + if entry, exists := c.cache[refKey]; exists { + // Force flush if entry is dirty before removal + if entry.Dirty { + if err := c.flushSingleToDB(refKey, entry.ClientInfo); err != nil { + return fmt.Errorf("failed to flush dirty entry: %w", err) + } + } + delete(c.cache, refKey) + } + return nil +} + +// InvalidateAll clears all entries from the cache +func (c *ClientInfoCache) InvalidateAll() error { + c.mu.Lock() + defer c.mu.Unlock() + + // Force flush all dirty entries before clearing + if err := c.FlushToDB(); err != nil { + return fmt.Errorf("failed to flush all dirty entries: %w", err) + } + c.cache = make(map[types.ClientRefKey]*CachedClientInfo) + return nil +} + +// IsDirty checks if an entry needs flushing to DB +func (c *ClientInfoCache) IsDirty(refKey types.ClientRefKey) bool { + c.mu.RLock() + defer c.mu.RUnlock() + + if cached, exists := c.cache[refKey]; exists { + return cached.Dirty + } + return false +} + +// evictOldest removes the oldest entry from the cache +func (c *ClientInfoCache) evictOldest() error { + var oldestKey types.ClientRefKey + var oldestTime time.Time + var oldestEntry *CachedClientInfo + + for refKey, entry := range c.cache { + if oldestEntry == nil || entry.UpdatedAt.Before(oldestTime) { + oldestKey = refKey + oldestTime = entry.UpdatedAt + oldestEntry = entry + } + } + + if oldestEntry != nil { + // Force flush if oldest entry is dirty before eviction + if oldestEntry.Dirty { + if err := c.flushSingleToDB(oldestKey, oldestEntry.ClientInfo); err != nil { + return fmt.Errorf("failed to flush oldest dirty entry: %w", err) + } + } + delete(c.cache, oldestKey) + } + return nil +} + +// cleanupAfterFlush removes flushed entries from cache +func (c *ClientInfoCache) cleanupAfterFlush() { + c.mu.Lock() + defer c.mu.Unlock() + + // Remove entries that were just flushed (Dirty = false and recently flushed) + removedCount := 0 + now := time.Now() + + for refKey, entry := range c.cache { + // Remove entries that were just flushed (not dirty and recently flushed) + if !entry.Dirty && now.Sub(entry.LastFlush) < time.Second { + delete(c.cache, refKey) + removedCount++ + } + } + + // Log cleanup metrics + if removedCount > 0 { + logging.DefaultLogger().Infof( + "Cache cleanup after flush: removed %d flushed entries", + removedCount, + ) + } +} + +// adaptiveFlush runs background goroutine for periodic flushing +func (c *ClientInfoCache) adaptiveFlush() { + ticker := time.NewTicker(c.config.PressureCheckInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + c.measureWritePressure() + interval := c.calculateFlushInterval() + if c.shouldFlush(interval) { + if err := c.FlushToDB(); err != nil { + logging.DefaultLogger().Error("flush to db: " + err.Error()) + } + } + case <-c.stopCh: + return + } + } +} + +// ttlCleanup runs background goroutine for periodic TTL cleanup +func (c *ClientInfoCache) ttlCleanup() { + ticker := time.NewTicker(c.config.TTL) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + c.mu.Lock() + now := time.Now() + for refKey, cached := range c.cache { + if now.After(cached.ExpiresAt) { + c.handleExpiredEntry(refKey, cached) + delete(c.cache, refKey) + } + } + c.mu.Unlock() + case <-c.stopCh: + return + } + } +} + +// measureWritePressure measures current write pressure +func (c *ClientInfoCache) measureWritePressure() { + c.mu.RLock() + pendingWrites := 0 + for _, entry := range c.cache { + if entry.Dirty { + pendingWrites++ + } + } + c.mu.RUnlock() + + c.pressureMu.Lock() + c.writePressure.PendingWrites = int64(pendingWrites) + + // Calculate pressure level (0.0 to 1.0) + if c.config.WritePressureThreshold > 0 { + c.writePressure.PressureLevel = float64(pendingWrites) / float64(c.config.WritePressureThreshold) + if c.writePressure.PressureLevel > 1.0 { + c.writePressure.PressureLevel = 1.0 + } + } + c.pressureMu.Unlock() +} + +// calculateFlushInterval calculates adaptive flush interval based on pressure +func (c *ClientInfoCache) calculateFlushInterval() time.Duration { + c.pressureMu.RLock() + pressureLevel := c.writePressure.PressureLevel + c.pressureMu.RUnlock() + + // Higher pressure = shorter interval + ratio := 1.0 - pressureLevel + interval := c.config.MinFlushInterval + + time.Duration(float64(c.config.MaxFlushInterval-c.config.MinFlushInterval)*ratio) + + return interval +} + +// shouldFlush determines if it's time to flush based on interval and last flush time +func (c *ClientInfoCache) shouldFlush(interval time.Duration) bool { + c.pressureMu.RLock() + lastFlush := c.writePressure.LastFlushTime + c.pressureMu.RUnlock() + + return time.Since(lastFlush) >= interval +} + +// FlushToDB flushes dirty entries to the database +func (c *ClientInfoCache) FlushToDB() error { + c.mu.Lock() + dirtyClients := make(map[types.ClientRefKey]*database.ClientInfo) + + for refKey, cached := range c.cache { + if cached.Dirty { + // Take a deep copy snapshot to avoid data races with concurrent updates + // after releasing the lock. + dirtyClients[refKey] = cached.ClientInfo.DeepCopy() + cached.Dirty = false + cached.LastFlush = time.Now() + } + } + c.mu.Unlock() + + if len(dirtyClients) == 0 { + return nil + } + + // Build bulk update operations + updates := make([]mongo.WriteModel, 0, len(dirtyClients)) + for refKey, clientInfo := range dirtyClients { + update := c.buildUpdateModel(refKey, clientInfo) + updates = append(updates, update) + } + + // Execute bulk update + if len(updates) > 0 { + result, err := c.client.collection(ColClients).BulkWrite(context.Background(), updates) + if err != nil { + return fmt.Errorf("bulk write client info (%d operations): %w", len(updates), err) + } + + if result.ModifiedCount != int64(len(updates)) { + logging.DefaultLogger().Warnf( + "Bulk write partial success: expected %d modifications, got %d", + len(updates), result.ModifiedCount, + ) + } + } + + // Update last flush time + c.pressureMu.Lock() + c.writePressure.LastFlushTime = time.Now() + c.pressureMu.Unlock() + + // Cleanup cache after successful flush + if c.config.EnableFlushCleanup { + c.cleanupAfterFlush() + } + + return nil +} + +// buildUpdateModel builds a MongoDB update model for the given client info +func (c *ClientInfoCache) buildUpdateModel(refKey types.ClientRefKey, + clientInfo *database.ClientInfo) mongo.WriteModel { + // Build the update document + updateDoc := bson.M{ + "$set": bson.M{ + "updated_at": clientInfo.UpdatedAt, + }, + "$max": bson.M{}, + } + + // Add document-specific updates + for docID, docInfo := range clientInfo.Documents { + docKey := clientDocInfoKey(docID, "server_seq") + updateDoc["$max"].(bson.M)[docKey] = docInfo.ServerSeq + + docKey = clientDocInfoKey(docID, "client_seq") + updateDoc["$max"].(bson.M)[docKey] = docInfo.ClientSeq + + docKey = clientDocInfoKey(docID, StatusKey) + updateDoc["$set"].(bson.M)[docKey] = docInfo.Status + } + + // Update attached_docs array with correct state + attachedDocs := clientInfo.AttachedDocuments() + updateDoc["$set"].(bson.M)["attached_docs"] = attachedDocs + + return mongo.NewUpdateOneModel(). + SetFilter(bson.M{ + "project_id": refKey.ProjectID, + "_id": refKey.ClientID, + }). + SetUpdate(updateDoc) +} + +// flushSingleToDB flushes a single client info entry to the database +func (c *ClientInfoCache) flushSingleToDB(refKey types.ClientRefKey, clientInfo *database.ClientInfo) error { + update := c.buildUpdateModel(refKey, clientInfo) + + _, err := c.client.collection(ColClients).BulkWrite(context.Background(), []mongo.WriteModel{update}) + if err != nil { + return fmt.Errorf("flush single client info: %w", err) + } + + return nil +} + +// Close stops the cache and flushes any remaining dirty entries +func (c *ClientInfoCache) Close() error { + close(c.stopCh) + + // Flush any remaining dirty entries + return c.FlushToDB() +} + +// StartMetricsLogging starts periodic metrics logging +func (c *ClientInfoCache) StartMetricsLogging() { + go c.periodicMetricsLogging() +} + +// periodicMetricsLogging logs metrics periodically +func (c *ClientInfoCache) periodicMetricsLogging() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + c.LogMetrics() + case <-c.stopCh: + return + } + } +} + +// Metrics recording functions +func (c *ClientInfoCache) recordHit() { + atomic.AddInt64(&c.metrics.TotalHits, 1) +} + +func (c *ClientInfoCache) recordMiss() { + atomic.AddInt64(&c.metrics.TotalMisses, 1) +} + +func (c *ClientInfoCache) recordActivateClientMiss() { + atomic.AddInt64(&c.metrics.ActivateClientMisses, 1) + atomic.AddInt64(&c.metrics.TotalMisses, 1) +} + +func (c *ClientInfoCache) recordDeactivateClientHit() { + atomic.AddInt64(&c.metrics.DeactivateClientHits, 1) + atomic.AddInt64(&c.metrics.TotalHits, 1) +} + +func (c *ClientInfoCache) recordDeactivateClientMiss() { + atomic.AddInt64(&c.metrics.DeactivateClientMisses, 1) + atomic.AddInt64(&c.metrics.TotalMisses, 1) +} + +func (c *ClientInfoCache) recordTryAttachingHit() { + atomic.AddInt64(&c.metrics.TryAttachingHits, 1) + atomic.AddInt64(&c.metrics.TotalHits, 1) +} + +func (c *ClientInfoCache) recordTryAttachingMiss() { + atomic.AddInt64(&c.metrics.TryAttachingMisses, 1) + atomic.AddInt64(&c.metrics.TotalMisses, 1) +} + +func (c *ClientInfoCache) recordFindClientInfoHit() { + atomic.AddInt64(&c.metrics.FindClientInfoHits, 1) + atomic.AddInt64(&c.metrics.TotalHits, 1) +} + +func (c *ClientInfoCache) recordFindClientInfoMiss() { + atomic.AddInt64(&c.metrics.FindClientInfoMisses, 1) + atomic.AddInt64(&c.metrics.TotalMisses, 1) +} + +// GetMetrics returns current cache metrics +func (c *ClientInfoCache) GetMetrics() *CacheMetrics { + // Use atomic loads to read metrics without locking + return &CacheMetrics{ + TotalHits: atomic.LoadInt64(&c.metrics.TotalHits), + TotalMisses: atomic.LoadInt64(&c.metrics.TotalMisses), + ActivateClientHits: atomic.LoadInt64(&c.metrics.ActivateClientHits), + ActivateClientMisses: atomic.LoadInt64(&c.metrics.ActivateClientMisses), + DeactivateClientHits: atomic.LoadInt64(&c.metrics.DeactivateClientHits), + DeactivateClientMisses: atomic.LoadInt64(&c.metrics.DeactivateClientMisses), + TryAttachingHits: atomic.LoadInt64(&c.metrics.TryAttachingHits), + TryAttachingMisses: atomic.LoadInt64(&c.metrics.TryAttachingMisses), + FindClientInfoHits: atomic.LoadInt64(&c.metrics.FindClientInfoHits), + FindClientInfoMisses: atomic.LoadInt64(&c.metrics.FindClientInfoMisses), + TTLExpirations: atomic.LoadInt64(&c.metrics.TTLExpirations), + TTLFlushErrors: atomic.LoadInt64(&c.metrics.TTLFlushErrors), + } +} + +// LogMetrics logs current cache metrics +func (c *ClientInfoCache) LogMetrics() { + metrics := c.GetMetrics() + + // Calculate overall hit rate + totalRequests := metrics.TotalHits + metrics.TotalMisses + overallHitRate := 0.0 + if totalRequests > 0 { + overallHitRate = float64(metrics.TotalHits) / float64(totalRequests) * 100 + } + + // Calculate operation-specific hit rates + activateTotal := metrics.ActivateClientHits + metrics.ActivateClientMisses + activateHitRate := 0.0 + if activateTotal > 0 { + activateHitRate = float64(metrics.ActivateClientHits) / float64(activateTotal) * 100 + } + + deactivateTotal := metrics.DeactivateClientHits + metrics.DeactivateClientMisses + deactivateHitRate := 0.0 + if deactivateTotal > 0 { + deactivateHitRate = float64(metrics.DeactivateClientHits) / float64(deactivateTotal) * 100 + } + + tryAttachingTotal := metrics.TryAttachingHits + metrics.TryAttachingMisses + tryAttachingHitRate := 0.0 + if tryAttachingTotal > 0 { + tryAttachingHitRate = float64(metrics.TryAttachingHits) / float64(tryAttachingTotal) * 100 + } + + findClientTotal := metrics.FindClientInfoHits + metrics.FindClientInfoMisses + findClientHitRate := 0.0 + if findClientTotal > 0 { + findClientHitRate = float64(metrics.FindClientInfoHits) / float64(findClientTotal) * 100 + } + + // Calculate cache usage ratio + c.mu.RLock() + currentCacheSize := len(c.cache) + maxCacheSize := c.config.MaxCacheSize + cacheUsageRatio := 0.0 + if maxCacheSize > 0 { + cacheUsageRatio = float64(currentCacheSize) / float64(maxCacheSize) * 100 + } + c.mu.RUnlock() + + logging.DefaultLogger().Infof( + "Cache Metrics - Overall Hit Rate: %.2f%% (%d/%d), Cache Usage: %.2f%% (%d/%d)", + overallHitRate, metrics.TotalHits, totalRequests, + cacheUsageRatio, currentCacheSize, maxCacheSize, + ) + + logging.DefaultLogger().Infof( + "Operation Hit Rates - ActivateClient: %.2f%% (%d/%d), "+ + "DeactivateClient: %.2f%% (%d/%d), TryAttaching: %.2f%% (%d/%d), "+ + "FindClientInfo: %.2f%% (%d/%d)", + activateHitRate, metrics.ActivateClientHits, activateTotal, + deactivateHitRate, metrics.DeactivateClientHits, deactivateTotal, + tryAttachingHitRate, metrics.TryAttachingHits, tryAttachingTotal, + findClientHitRate, metrics.FindClientInfoHits, findClientTotal, + ) + + logging.DefaultLogger().Infof( + "TTL Metrics - Expirations: %d, Flush Errors: %d", + metrics.TTLExpirations, metrics.TTLFlushErrors, + ) +} + +// UpdateCheckpoint updates only the checkpoint (ServerSeq, ClientSeq) for a specific document +// This uses Write-back strategy with max operator for consistency +func (c *ClientInfoCache) UpdateCheckpoint(refKey types.ClientRefKey, docID types.ID, cp change.Checkpoint) error { + c.mu.Lock() + defer c.mu.Unlock() + + if cached, exists := c.cache[refKey]; exists { + // Update checkpoint in cache with max operator + if cached.ClientInfo.Documents == nil { + cached.ClientInfo.Documents = make(map[types.ID]*database.ClientDocInfo) + } + + if docInfo, exists := cached.ClientInfo.Documents[docID]; exists { + // Apply max operator for sequence numbers + if cp.ServerSeq > docInfo.ServerSeq { + docInfo.ServerSeq = cp.ServerSeq + } + if cp.ClientSeq > docInfo.ClientSeq { + docInfo.ClientSeq = cp.ClientSeq + } + } else { + // Create new document info + cached.ClientInfo.Documents[docID] = &database.ClientDocInfo{ + Status: database.DocumentAttached, + ServerSeq: cp.ServerSeq, + ClientSeq: cp.ClientSeq, + } + } + + cached.Dirty = true + cached.UpdatedAt = time.Now() + cached.ExpiresAt = time.Now().Add(c.config.TTL) + return nil + } + + // Cache miss - load from DB and update + clientInfo, err := c.loadClientInfoFromDB(refKey) + if err != nil { + return fmt.Errorf("load client info from DB: %w", err) + } + + // Update checkpoint + if clientInfo.Documents == nil { + clientInfo.Documents = make(map[types.ID]*database.ClientDocInfo) + } + + if docInfo, exists := clientInfo.Documents[docID]; exists { + if cp.ServerSeq > docInfo.ServerSeq { + docInfo.ServerSeq = cp.ServerSeq + } + if cp.ClientSeq > docInfo.ClientSeq { + docInfo.ClientSeq = cp.ClientSeq + } + } else { + clientInfo.Documents[docID] = &database.ClientDocInfo{ + Status: database.DocumentAttached, + ServerSeq: cp.ServerSeq, + ClientSeq: cp.ClientSeq, + } + } + + // Store in cache + now := time.Now() + c.cache[refKey] = &CachedClientInfo{ + ClientInfo: clientInfo, + UpdatedAt: now, + Dirty: true, + LastFlush: now, + ExpiresAt: now.Add(c.config.TTL), + } + return nil +} + +// UpdateStatus updates the client status using CAS Write-through strategy +func (c *ClientInfoCache) UpdateStatus(refKey types.ClientRefKey, status string) error { + // Check if update is needed by comparing with cache first + c.mu.Lock() + if cached, exists := c.cache[refKey]; exists { + if cached.ClientInfo.Status == status { + // No change needed, skip database update + c.mu.Unlock() + return nil + } + } + c.mu.Unlock() + + // Update in DB (Write-through) + if err := c.updateStatusInDB(refKey, status); err != nil { + return fmt.Errorf("update status in DB: %w", err) + } + + // Then update cache + c.mu.Lock() + defer c.mu.Unlock() + + if cached, exists := c.cache[refKey]; exists { + cached.ClientInfo.Status = status + cached.UpdatedAt = time.Now() + // Not dirty since already written to DB + cached.Dirty = false + } else { + // If not in cache, load from DB and cache it + clientInfo, err := c.loadClientInfoFromDB(refKey) + if err != nil { + return fmt.Errorf("load client info from DB: %w", err) + } + + // Update the status in the loaded client info + clientInfo.Status = status + + now := time.Now() + c.cache[refKey] = &CachedClientInfo{ + ClientInfo: clientInfo, + UpdatedAt: now, + Dirty: false, + LastFlush: now, + ExpiresAt: now.Add(c.config.TTL), + } + } + return nil +} + +// UpdateDocumentStatus updates the document status using CAS Write-through strategy +// For DocumentAttaching status, it also resets ServerSeq and ClientSeq to 0 +func (c *ClientInfoCache) UpdateDocumentStatus(refKey types.ClientRefKey, docID types.ID, status string) error { + // Check if update is needed by comparing with cache first + c.mu.Lock() + if cached, exists := c.cache[refKey]; exists { + if cached.ClientInfo.Documents != nil { + if docInfo, exists := cached.ClientInfo.Documents[docID]; exists { + if docInfo.Status == status { + // No change needed, skip database update + c.mu.Unlock() + return nil + } + } + } + } + c.mu.Unlock() + + // Update in DB (Write-through) + if err := c.updateDocumentStatusInDB(refKey, docID, status); err != nil { + return fmt.Errorf("update document status in DB: %w", err) + } + + // Then update cache + c.mu.Lock() + defer c.mu.Unlock() + + if cached, exists := c.cache[refKey]; exists { + if cached.ClientInfo.Documents == nil { + cached.ClientInfo.Documents = make(map[types.ID]*database.ClientDocInfo) + } + + if docInfo, exists := cached.ClientInfo.Documents[docID]; exists { + docInfo.Status = status + // Reset sequence numbers for detached/removed/attaching documents + if status == database.DocumentDetached || status == database.DocumentRemoved || + status == database.DocumentAttaching { + docInfo.ServerSeq = 0 + docInfo.ClientSeq = 0 + } + } else { + cached.ClientInfo.Documents[docID] = &database.ClientDocInfo{ + Status: status, + ServerSeq: 0, + ClientSeq: 0, + } + } + + cached.UpdatedAt = time.Now() + // Not dirty since already written to DB + cached.Dirty = false + } else { + // If not in cache, try to load from DB first + clientInfo, err := c.loadClientInfoFromDB(refKey) + if err != nil { + // If client not found in DB, this might be a new document attachment + // In this case, we need to create a minimal client info structure + if strings.Contains(err.Error(), "client not found") { + return fmt.Errorf("cannot update document status for non-existent client %s: %w", + refKey.ClientID, err) + } + return fmt.Errorf("load client info from DB: %w", err) + } + + // Update the document status in the loaded client info + if clientInfo.Documents == nil { + clientInfo.Documents = make(map[types.ID]*database.ClientDocInfo) + } + + if docInfo, exists := clientInfo.Documents[docID]; exists { + docInfo.Status = status + // Reset sequence numbers for detached/removed/attaching documents + if status == database.DocumentDetached || status == database.DocumentRemoved || + status == database.DocumentAttaching { + docInfo.ServerSeq = 0 + docInfo.ClientSeq = 0 + } + } else { + clientInfo.Documents[docID] = &database.ClientDocInfo{ + Status: status, + ServerSeq: 0, + ClientSeq: 0, + } + } + + now := time.Now() + c.cache[refKey] = &CachedClientInfo{ + ClientInfo: clientInfo, + UpdatedAt: now, + Dirty: false, + LastFlush: now, + ExpiresAt: now.Add(c.config.TTL), + } + } + return nil +} + +// loadClientInfoFromDB loads client info from database +func (c *ClientInfoCache) loadClientInfoFromDB(refKey types.ClientRefKey) (*database.ClientInfo, error) { + result := c.client.collection(ColClients).FindOne(context.Background(), bson.M{ + "project_id": refKey.ProjectID, + "_id": refKey.ClientID, + }) + + if result.Err() != nil { + if result.Err() == mongo.ErrNoDocuments { + return nil, fmt.Errorf("%s: %w", refKey.ClientID, database.ErrClientNotFound) + } + return nil, fmt.Errorf("find client by id: %w", result.Err()) + } + + var clientInfo database.ClientInfo + if err := result.Decode(&clientInfo); err != nil { + return nil, fmt.Errorf("decode client info: %w", err) + } + + return &clientInfo, nil +} + +// updateStatusInDB updates client status in database +func (c *ClientInfoCache) updateStatusInDB(refKey types.ClientRefKey, status string) error { + _, err := c.client.collection(ColClients).UpdateOne( + context.Background(), + bson.M{ + "project_id": refKey.ProjectID, + "_id": refKey.ClientID, + }, + bson.M{ + "$set": bson.M{ + "status": status, + "updated_at": time.Now(), + }, + }, + ) + return err +} + +// updateDocumentStatusInDB updates document status in database +func (c *ClientInfoCache) updateDocumentStatusInDB(refKey types.ClientRefKey, docID types.ID, status string) error { + updateDoc := bson.M{ + "$set": bson.M{ + "updated_at": time.Now(), + }, + } + + // Add document-specific updates + docKey := clientDocInfoKey(docID, StatusKey) + updateDoc["$set"].(bson.M)[docKey] = status + + // Reset sequence numbers for detached/removed/attaching documents + if status == database.DocumentDetached || status == database.DocumentRemoved || status == database.DocumentAttaching { + serverSeqKey := clientDocInfoKey(docID, "server_seq") + clientSeqKey := clientDocInfoKey(docID, "client_seq") + updateDoc["$set"].(bson.M)[serverSeqKey] = int64(0) + updateDoc["$set"].(bson.M)[clientSeqKey] = uint32(0) + } + + _, err := c.client.collection(ColClients).UpdateOne( + context.Background(), + bson.M{ + "project_id": refKey.ProjectID, + "_id": refKey.ClientID, + }, + updateDoc, + ) + return err +} diff --git a/server/clients/clients.go b/server/clients/clients.go index c68e02b31..0ef66de91 100644 --- a/server/clients/clients.go +++ b/server/clients/clients.go @@ -91,6 +91,63 @@ func Deactivate( return be.DB.DeactivateClient(ctx, refKey) } +// DeactivateForHousekeeping deactivates the given client for housekeeping purposes. +// This function doesn't require the client to be activated and handles already deactivated clients. +func DeactivateForHousekeeping( + ctx context.Context, + be *backend.Backend, + project *types.Project, + refKey types.ClientRefKey, +) (*database.ClientInfo, error) { + info, err := FindClientInfoForDeactivation(ctx, be, refKey) + if err != nil { + return nil, err + } + + // The client may have disappeared between candidate selection and now. + if info == nil { + return nil, nil + } + + // If client is already deactivated, return the info without further processing + if info.Status == database.ClientDeactivated { + return info, nil + } + + // Process attached documents + for docID, clientDocInfo := range info.Documents { + if clientDocInfo.Status != database.DocumentAttached { + continue + } + + // TODO(hackerwins): Solve N+1 + docInfo, err := be.DB.FindDocInfoByRefKey(ctx, types.DocRefKey{ + ProjectID: project.ID, + DocID: docID, + }) + if err != nil { + return nil, err + } + + actorID, err := info.ID.ToActorID() + if err != nil { + return nil, err + } + + if err := be.ClusterClient.DetachDocument( + ctx, + project, + actorID, + docID, + docInfo.Key, + ); err != nil { + return nil, err + } + } + + return be.DB.DeactivateClientForHousekeeping(ctx, refKey) +} + // AttachDocument attaches the given document to the client. func AttachDocument( ctx context.Context, @@ -141,3 +198,22 @@ func FindActiveClientInfo( return info, nil } + +// FindClientInfoForDeactivation finds the client info for deactivation. +// This function is used by housekeeping and doesn't require the client to be activated. +func FindClientInfoForDeactivation( + ctx context.Context, + be *backend.Backend, + refKey types.ClientRefKey, +) (*database.ClientInfo, error) { + info, err := be.DB.FindClientInfoByRefKey(ctx, refKey) + if err != nil { + // Treat missing client as no-op for housekeeping + if errors.Is(err, database.ErrClientNotFound) { + return nil, nil + } + return nil, err + } + + return info, nil +} diff --git a/server/clients/housekeeping.go b/server/clients/housekeeping.go index fbd354e3e..41eddcef9 100644 --- a/server/clients/housekeeping.go +++ b/server/clients/housekeeping.go @@ -59,7 +59,7 @@ func DeactivateInactives( deactivatedCount := 0 for _, pair := range candidates { - if _, err := Deactivate(ctx, be, pair.Project.ToProject(), pair.Client.RefKey()); err != nil { + if _, err := DeactivateForHousekeeping(ctx, be, pair.Project.ToProject(), pair.Client.RefKey()); err != nil { return database.DefaultProjectID, err } diff --git a/test/integration/routing_test.go b/test/integration/routing_test.go new file mode 100644 index 000000000..c0e21d8f8 --- /dev/null +++ b/test/integration/routing_test.go @@ -0,0 +1,276 @@ +package integration + +import ( + "context" + "testing" + "time" + + "connectrpc.com/connect" + "github.com/stretchr/testify/assert" + + "github.com/yorkie-team/yorkie/client" + "github.com/yorkie-team/yorkie/pkg/document" + "github.com/yorkie-team/yorkie/pkg/document/json" + "github.com/yorkie-team/yorkie/pkg/document/presence" + "github.com/yorkie-team/yorkie/server/backend" + "github.com/yorkie-team/yorkie/server/backend/database" + "github.com/yorkie-team/yorkie/server/backend/database/mongo" + "github.com/yorkie-team/yorkie/server/backend/housekeeping" + "github.com/yorkie-team/yorkie/server/profiling/prometheus" + "github.com/yorkie-team/yorkie/test/helper" +) + +// attachWithRetry tries Attach with small backoff on FailedPrecondition(conflict) +func attachWithRetry(ctx context.Context, c *client.Client, d *document.Document, tries int) error { + var lastErr error + for i := 0; i < tries; i++ { + if err := c.Attach(ctx, d); err != nil { + lastErr = err + if connect.CodeOf(err) == connect.CodeFailedPrecondition { + time.Sleep(time.Duration(50*(i+1)) * time.Millisecond) + continue + } + return err + } + return nil + } + return lastErr +} + +func TestClientRouting(t *testing.T) { + ctx := context.Background() + + const clientKey = "C1-routing-test" + docKey := helper.TestDocKey(t) + + // Start two servers A and B sharing the same DB. + svrA := helper.TestServer() + assert.NoError(t, svrA.Start()) + defer func() { _ = svrA.Shutdown(true) }() + + svrB := helper.TestServer() + assert.NoError(t, svrB.Start()) + defer func() { _ = svrB.Shutdown(true) }() + + // t0: Connect to A with fixed client key + cliA, err := client.Dial(svrA.RPCAddr(), client.WithKey(clientKey)) + assert.NoError(t, err) + assert.NoError(t, cliA.Activate(ctx)) + + docA := document.New(docKey) + assert.NoError(t, cliA.Attach(ctx, docA)) + + for i := 0; i < 2; i++ { + err := docA.Update(func(r *json.Object, p *presence.Presence) error { + r.SetString("k", "vA") + return nil + }) + assert.NoError(t, err) + assert.NoError(t, cliA.Sync(ctx)) + } + // Ensure the server has the latest state from A + assert.NoError(t, cliA.Detach(ctx, docA)) + // Small delay to reduce cross-server write races + time.Sleep(150 * time.Millisecond) + + // t1: Connect to B with SAME client key + cliB, err := client.Dial(svrB.RPCAddr(), client.WithKey(clientKey)) + assert.NoError(t, err) + assert.NoError(t, cliB.Activate(ctx)) + docB := document.New(docKey) + assert.NoError(t, attachWithRetry(ctx, cliB, docB, 5)) + assert.NoError(t, cliB.Sync(ctx)) + gotB := docB.Marshal() + assert.Contains(t, gotB, "\"k\":\"vA\"") + + for i := 0; i < 2; i++ { + err := docB.Update(func(r *json.Object, p *presence.Presence) error { + r.SetString("k", "vB") + return nil + }) + assert.NoError(t, err) + assert.NoError(t, cliB.Sync(ctx)) + } + assert.NoError(t, cliB.Detach(ctx, docB)) + // Small delay to reduce cross-server write races + time.Sleep(150 * time.Millisecond) + + // t2: Reconnect to A with SAME client key + cliA2, err := client.Dial(svrA.RPCAddr(), client.WithKey(clientKey)) + assert.NoError(t, err) + assert.NoError(t, cliA2.Activate(ctx)) + docA2 := document.New(docKey) + assert.NoError(t, attachWithRetry(ctx, cliA2, docA2, 5)) + assert.NoError(t, cliA2.Sync(ctx)) + gotA2 := docA2.Marshal() + assert.Contains(t, gotA2, "\"k\":\"vB\"") + + err = docA2.Update(func(r *json.Object, p *presence.Presence) error { + r.SetString("k", "vA2") + return nil + }) + assert.NoError(t, err) + assert.NoError(t, cliA2.Sync(ctx)) +} + +func TestClusterRoutingWithMongoDB(t *testing.T) { + ctx := context.Background() + const clientKey = "C1-cluster-routing-test" + + // Create two backends sharing the same MongoDB + metA, err := prometheus.NewMetrics() + assert.NoError(t, err) + + backendA, err := backend.New( + &backend.Config{ + AdminUser: helper.AdminUser, + AdminPassword: helper.AdminPassword, + UseDefaultProject: helper.UseDefaultProject, + ClientDeactivateThreshold: helper.ClientDeactivateThreshold, + SnapshotThreshold: helper.SnapshotThreshold, + SnapshotCacheSize: helper.SnapshotCacheSize, + AuthWebhookCacheSize: helper.AuthWebhookSize, + AuthWebhookCacheTTL: helper.AuthWebhookCacheTTL.String(), + AuthWebhookMaxWaitInterval: helper.AuthWebhookMaxWaitInterval.String(), + AuthWebhookMinWaitInterval: helper.AuthWebhookMinWaitInterval.String(), + AuthWebhookRequestTimeout: helper.AuthWebhookRequestTimeout.String(), + EventWebhookMaxWaitInterval: helper.EventWebhookMaxWaitInterval.String(), + EventWebhookMinWaitInterval: helper.EventWebhookMinWaitInterval.String(), + EventWebhookRequestTimeout: helper.EventWebhookRequestTimeout.String(), + ProjectCacheSize: helper.ProjectCacheSize, + ProjectCacheTTL: helper.ProjectCacheTTL.String(), + AdminTokenDuration: helper.AdminTokenDuration, + }, &mongo.Config{ + ConnectionURI: helper.MongoConnectionURI, + YorkieDatabase: helper.TestDBName(), + ConnectionTimeout: helper.MongoConnectionTimeout, + PingTimeout: helper.MongoPingTimeout, + }, &housekeeping.Config{ + Interval: helper.HousekeepingInterval.String(), + CandidatesLimitPerProject: helper.HousekeepingCandidatesLimitPerProject, + ProjectFetchSize: helper.HousekeepingProjectFetchSize, + }, metA, nil, nil) + assert.NoError(t, err) + defer func() { _ = backendA.Shutdown() }() + + metB, err := prometheus.NewMetrics() + assert.NoError(t, err) + + backendB, err := backend.New( + &backend.Config{ + AdminUser: helper.AdminUser, + AdminPassword: helper.AdminPassword, + UseDefaultProject: helper.UseDefaultProject, + ClientDeactivateThreshold: helper.ClientDeactivateThreshold, + SnapshotThreshold: helper.SnapshotThreshold, + SnapshotCacheSize: helper.SnapshotCacheSize, + AuthWebhookCacheSize: helper.AuthWebhookSize, + AuthWebhookCacheTTL: helper.AuthWebhookCacheTTL.String(), + AuthWebhookMaxWaitInterval: helper.AuthWebhookMaxWaitInterval.String(), + AuthWebhookMinWaitInterval: helper.AuthWebhookMinWaitInterval.String(), + AuthWebhookRequestTimeout: helper.AuthWebhookRequestTimeout.String(), + EventWebhookMaxWaitInterval: helper.EventWebhookMaxWaitInterval.String(), + EventWebhookMinWaitInterval: helper.EventWebhookMinWaitInterval.String(), + EventWebhookRequestTimeout: helper.EventWebhookRequestTimeout.String(), + ProjectCacheSize: helper.ProjectCacheSize, + ProjectCacheTTL: helper.ProjectCacheTTL.String(), + AdminTokenDuration: helper.AdminTokenDuration, + }, &mongo.Config{ + ConnectionURI: helper.MongoConnectionURI, + YorkieDatabase: helper.TestDBName(), + ConnectionTimeout: helper.MongoConnectionTimeout, + PingTimeout: helper.MongoPingTimeout, + }, &housekeeping.Config{ + Interval: helper.HousekeepingInterval.String(), + CandidatesLimitPerProject: helper.HousekeepingCandidatesLimitPerProject, + ProjectFetchSize: helper.HousekeepingProjectFetchSize, + }, metB, nil, nil) + assert.NoError(t, err) + defer func() { _ = backendB.Shutdown() }() + + // Start both backends + assert.NoError(t, backendA.Start(ctx)) + assert.NoError(t, backendB.Start(ctx)) + + // Get project info + projectInfo, err := backendA.DB.FindProjectInfoByID(ctx, database.DefaultProjectID) + assert.NoError(t, err) + project := projectInfo.ToProject() + + t.Run("client routing with cache behavior", func(t *testing.T) { + // t0: Activate client on backend A + clientInfoA, err := backendA.DB.ActivateClient(ctx, project.ID, clientKey, map[string]string{}) + assert.NoError(t, err) + assert.Equal(t, database.ClientActivated, clientInfoA.Status) + + // t1: Check if backend B can see the activated client (cache miss -> DB lookup) + clientInfoB, err := backendB.DB.FindClientInfoByRefKey(ctx, clientInfoA.RefKey()) + assert.NoError(t, err) + assert.Equal(t, database.ClientActivated, clientInfoB.Status) + assert.Equal(t, clientInfoA.ID, clientInfoB.ID) + + // t2: Deactivate client on backend A + _, err = backendA.DB.DeactivateClient(ctx, clientInfoA.RefKey()) + assert.NoError(t, err) + // Small delay to work ttl cleanup + time.Sleep(11 * time.Second) + + // t3: Check if backend B can see the deactivated client (cache miss -> DB lookup) + clientInfoBAfterDeactivate, err := backendB.DB.FindClientInfoByRefKey(ctx, clientInfoA.RefKey()) + assert.NoError(t, err) + assert.Equal(t, database.ClientDeactivated, clientInfoBAfterDeactivate.Status) + + // t4: Try to activate same client key on backend B (should work since client is deactivated) + clientInfoB2, err := backendB.DB.ActivateClient(ctx, project.ID, clientKey, map[string]string{}) + assert.NoError(t, err) + assert.Equal(t, database.ClientActivated, clientInfoB2.Status) + assert.Equal(t, clientInfoA.ID, clientInfoB2.ID, "Same client key should result in same client ID") + }) + + t.Run("client routing with document attachment", func(t *testing.T) { + docKey := helper.TestDocKey(t) + + // t0: Activate client on backend A + clientInfoA, err := backendA.DB.ActivateClient(ctx, project.ID, clientKey+"-doc", map[string]string{}) + assert.NoError(t, err) + + // t1: Create document on backend A + docInfoA, err := backendA.DB.FindOrCreateDocInfo(ctx, clientInfoA.RefKey(), docKey) + assert.NoError(t, err) + + // t2: Attach document on backend A + clientInfoA, err = backendA.DB.TryAttaching(ctx, clientInfoA.RefKey(), docInfoA.ID) + assert.NoError(t, err) + // Small delay to work ttl cleanup + time.Sleep(11 * time.Second) + + // t3: Check if backend B can see the attached document (cache miss -> DB lookup) + clientInfoB, err := backendB.DB.FindClientInfoByRefKey(ctx, clientInfoA.RefKey()) + assert.NoError(t, err) + assert.Equal(t, database.DocumentAttaching, clientInfoB.Documents[docInfoA.ID].Status) + }) + + t.Run("client routing with housekeeping deactivation", func(t *testing.T) { + // t0: Activate client on backend A + clientInfoA, err := backendA.DB.ActivateClient(ctx, project.ID, clientKey+"-housekeeping", map[string]string{}) + assert.NoError(t, err) + assert.Equal(t, database.ClientActivated, clientInfoA.Status) + + // t1: Deactivate client using housekeeping on backend B + _, err = backendB.DB.DeactivateClientForHousekeeping(ctx, clientInfoA.RefKey()) + assert.NoError(t, err) + // Small delay to work ttl cleanup + time.Sleep(11 * time.Second) + + // t2: Check if backend A can see the deactivated client (cache miss -> DB lookup) + clientInfoAAfterHousekeeping, err := backendA.DB.FindClientInfoByRefKey(ctx, clientInfoA.RefKey()) + assert.NoError(t, err) + assert.Equal(t, database.ClientDeactivated, clientInfoAAfterHousekeeping.Status) + + // t3: Try to activate again on backend A (should work since client is deactivated) + clientInfoA2, err := backendA.DB.ActivateClient(ctx, project.ID, clientKey+"-housekeeping", map[string]string{}) + assert.NoError(t, err) + assert.Equal(t, database.ClientActivated, clientInfoA2.Status) + assert.Equal(t, clientInfoA.ID, clientInfoA2.ID) + }) +}