Skip to content

Commit

Permalink
feat: add results to update methods
Browse files Browse the repository at this point in the history
  • Loading branch information
maxbolgarin committed Dec 16, 2024
1 parent e3d9298 commit 5088c68
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 71 deletions.
12 changes: 8 additions & 4 deletions async.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ func (ac *AsyncCollection) Insert(queueKey, taskName string, records ...any) {
// Tasks in different queues will be executed in parallel.
func (ac *AsyncCollection) Upsert(queueKey, taskName string, record any, filter M) {
ac.push(queueKey, taskName, "upsert", func(ctx context.Context) error {
return ac.coll.Upsert(ctx, record, filter)
_, err := ac.coll.Upsert(ctx, record, filter)
return err
})
}

Expand Down Expand Up @@ -158,7 +159,8 @@ func (ac *AsyncCollection) UpdateOne(queueKey, taskName string, filter, update M
// Tasks in different queues will be executed in parallel.
func (ac *AsyncCollection) UpdateMany(queueKey, taskName string, filter, update M) {
ac.push(queueKey, taskName, "update_many", func(ctx context.Context) error {
return ac.coll.UpdateMany(ctx, filter, update)
_, err := ac.coll.UpdateMany(ctx, filter, update)
return err
})
}

Expand Down Expand Up @@ -199,7 +201,8 @@ func (ac *AsyncCollection) DeleteOne(queueKey, taskName string, filter M) {
// Tasks in different queues will be executed in parallel.
func (ac *AsyncCollection) DeleteMany(queueKey, taskName string, filter M) {
ac.push(queueKey, taskName, "delete_many", func(ctx context.Context) error {
return ac.coll.DeleteMany(ctx, filter)
_, err := ac.coll.DeleteMany(ctx, filter)
return err
})
}

Expand All @@ -211,7 +214,8 @@ func (ac *AsyncCollection) DeleteMany(queueKey, taskName string, filter M) {
// the whole operation continues.
func (ac *AsyncCollection) BulkWrite(queueKey, taskName string, models []mongo.WriteModel, isOrdered bool) {
ac.push(queueKey, taskName, "bulk_write", func(ctx context.Context) error {
return ac.coll.BulkWrite(ctx, models, isOrdered)
_, err := ac.coll.BulkWrite(ctx, models, isOrdered)
return err
})
}

Expand Down
51 changes: 31 additions & 20 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,25 @@ func (m *Collection) Insert(ctx context.Context, records ...any) (ids []bson.Obj
}

// Upsert replaces a document in the collection or inserts it if it doesn't exist.
// It returns ErrNotFound if no document is updated.
func (m *Collection) Upsert(ctx context.Context, record any, filter M) error {
// It returns ID of the interserted document.
// If existing document is updated (no new inserted), it returns nil ID and nil error.
// If no document is updated, it returns nil ID and ErrNotFound.
func (m *Collection) Upsert(ctx context.Context, record any, filter M) (*bson.ObjectID, error) {
opts := options.Replace().SetUpsert(true)
upd, err := m.coll.ReplaceOne(ctx, filter.Prepare(), record, opts)
if err != nil {
return HandleMongoError(err)
return nil, HandleMongoError(err)
}
if upd != nil && upd.MatchedCount == 0 {
return ErrNotFound
if upd != nil {
if upd.MatchedCount == 0 {
return nil, ErrNotFound
}
if upd.UpsertedID != nil {
id := upd.UpsertedID.(bson.ObjectID)
return &id, nil
}
}
return nil
return nil, nil
}

// ReplaceOne replaces a document in the collection.
Expand Down Expand Up @@ -236,16 +244,17 @@ func (m *Collection) UpdateOne(ctx context.Context, filter, update M) error {
// Update map/document must contain key beginning with '$', e.g. {$set: {key1: value1}}.
// Modifiers operate on fields. For example: {$mod: {<field>: ...}}.
// You can use predefined options from mongox, e.g. mongox.M{mongox.Inc: mongox.M{"number": 1}}.
// It returns number of updated documents.
// It returns ErrNotFound if no document is updated.
func (m *Collection) UpdateMany(ctx context.Context, filter, update M) error {
func (m *Collection) UpdateMany(ctx context.Context, filter, update M) (int, error) {
updateResult, err := m.coll.UpdateMany(ctx, filter.Prepare(), update.Prepare())
if err != nil {
return HandleMongoError(err)
return 0, HandleMongoError(err)
}
if updateResult != nil && updateResult.MatchedCount == 0 {
return ErrNotFound
return 0, ErrNotFound
}
return nil
return int(updateResult.ModifiedCount), nil
}

// UpdateOneFromDiff sets fields in a document in the collection using diff structure.
Expand Down Expand Up @@ -292,34 +301,36 @@ func (m *Collection) DeleteOne(ctx context.Context, filter M) error {
}

// DeleteMany deletes many documents in the collection based on the filter.
// It returns number of deleted documents.
// It returns ErrNotFound if no document is deleted.
func (m *Collection) DeleteMany(ctx context.Context, filter M) error {
func (m *Collection) DeleteMany(ctx context.Context, filter M) (int, error) {
del, err := m.coll.DeleteMany(ctx, filter.Prepare())
if err != nil {
return HandleMongoError(err)
return 0, HandleMongoError(err)
}
if del != nil && del.DeletedCount == 0 {
return ErrNotFound
return 0, ErrNotFound
}
return nil
return int(del.DeletedCount), nil
}

// BulkWrite executes bulk write operations in the collection.
// Use [BulkBuilder] to create models for bulk write operations.
// IsOrdered==true means that all operations are executed in the order they are added to the [BulkBuilder]
// and if any of them fails, the whole operation fails.
// and if any of them fails, the whole operation fails. Error is not returning.
// IsOrdered==false means that all operations are executed in parallel and if any of them fails,
// the whole operation continues.
func (m *Collection) BulkWrite(ctx context.Context, models []mongo.WriteModel, isOrdered bool) error {
// the whole operation continues. Error is not returning.
// It returns ErrNotFound if no document is matched/inserted/updated/deleted.
func (m *Collection) BulkWrite(ctx context.Context, models []mongo.WriteModel, isOrdered bool) (mongo.BulkWriteResult, error) {
opts := options.BulkWrite().SetOrdered(isOrdered)
res, err := m.coll.BulkWrite(ctx, models, opts)
if err != nil {
return HandleMongoError(err)
return mongo.BulkWriteResult{}, HandleMongoError(err)
}
if res != nil && res.MatchedCount+res.DeletedCount+res.InsertedCount+res.ModifiedCount == 0 {
return ErrNotFound
return mongo.BulkWriteResult{}, ErrNotFound
}
return nil
return lang.Deref(res), nil
}

func (m *Collection) find(ctx context.Context, dest any, filter bson.D, rawOpts ...FindOptions) error {
Expand Down
21 changes: 13 additions & 8 deletions generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,18 @@ func Distinct[T any](ctx context.Context, coll *Collection, field string, filter
return result, nil
}

// Insert inserts a document(s) into the collection.
// Insert inserts a document(s) into the collection
// It returns IDs of the inserted documents.
// Internally InsertMany uses bulk write.
func Insert(ctx context.Context, coll *Collection, record ...any) ([]bson.ObjectID, error) {
return coll.Insert(ctx, record...)
}

// Upsert replaces a document in the collection or inserts it if it doesn't exist.
// It returns ErrNotFound if no document is updated.
func Upsert(ctx context.Context, coll *Collection, record any, filter M) error {
// It returns ID of the inserted document.
// If existing document is updated (no new inserted), it returns nil ID and nil error.
// If no document is updated, it returns nil ID and ErrNotFound.
func Upsert(ctx context.Context, coll *Collection, record any, filter M) (*bson.ObjectID, error) {
return coll.Upsert(ctx, record, filter)
}

Expand Down Expand Up @@ -106,8 +108,9 @@ func UpdateOne(ctx context.Context, coll *Collection, filter, update M) error {
// Update map/document must contain key beginning with '$', e.g. {$set: {key1: value1}}.
// Modifiers operate on fields. For example: {$mod: {<field>: ...}}.
// You can use predefined options from mongox, e.g. mongox.M{mongox.Inc: mongox.M{"number": 1}}.
// It returns number of updated documents.
// It returns ErrNotFound if no document is updated.
func UpdateMany(ctx context.Context, coll *Collection, filter, update M) error {
func UpdateMany(ctx context.Context, coll *Collection, filter, update M) (int, error) {
return coll.UpdateMany(ctx, filter, update)
}

Expand Down Expand Up @@ -139,17 +142,19 @@ func DeleteOne(ctx context.Context, coll *Collection, filter M) error {
}

// DeleteMany deletes documents in the collection based on the filter.
// It returns number of deleted documents.
// It returns ErrNotFound if no document is deleted.
func DeleteMany(ctx context.Context, coll *Collection, filter M) error {
func DeleteMany(ctx context.Context, coll *Collection, filter M) (int, error) {
return coll.DeleteMany(ctx, filter)
}

// BulkWrite executes bulk write operations in the collection.
// Use [BulkBuilder] to create models for bulk write operations.
// IsOrdered==true means that all operations are executed in the order they are added to the [BulkBuilder]
// and if any of them fails, the whole operation fails.
// and if any of them fails, the whole operation fails. Error is not returning.
// IsOrdered==false means that all operations are executed in parallel and if any of them fails,
// the whole operation continues.
func BulkWrite(ctx context.Context, coll *Collection, models []mongo.WriteModel, isOrdered bool) error {
// the whole operation continues. Error is not returning.
// It returns ErrNotFound if no document is matched/inserted/updated/deleted.
func BulkWrite(ctx context.Context, coll *Collection, models []mongo.WriteModel, isOrdered bool) (mongo.BulkWriteResult, error) {
return coll.BulkWrite(ctx, models, isOrdered)
}
Loading

0 comments on commit 5088c68

Please sign in to comment.