Skip to content
This repository has been archived by the owner on Nov 3, 2023. It is now read-only.

Commit

Permalink
measured rep transaction type (#200)
Browse files Browse the repository at this point in the history
* measured rep transaction type

* updated CHANGELOG.md

* fix prom cardinality issue
  • Loading branch information
ortuman authored Jan 24, 2022
1 parent 50dfbc1 commit 92c2042
Show file tree
Hide file tree
Showing 12 changed files with 104 additions and 51 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
## jackal - main / unreleased

* [ENHANCEMENT] Added memory ballast. #198
* [BUGFIX] Fix S2S db key check when nop KV is used. #199
* [CHANGE] Introduced measured repository transaction type. #200
* [BUGFIX] Fix S2S db key check when nop KV is used. #199
11 changes: 6 additions & 5 deletions pkg/storage/measured/blocklist.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,34 @@ import (
)

type measuredBlockListRep struct {
rep repository.BlockList
rep repository.BlockList
inTx bool
}

func (m *measuredBlockListRep) UpsertBlockListItem(ctx context.Context, item *blocklistmodel.Item) (err error) {
t0 := time.Now()
err = m.rep.UpsertBlockListItem(ctx, item)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}

func (m *measuredBlockListRep) DeleteBlockListItem(ctx context.Context, item *blocklistmodel.Item) (err error) {
t0 := time.Now()
err = m.rep.DeleteBlockListItem(ctx, item)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}

func (m *measuredBlockListRep) FetchBlockListItems(ctx context.Context, username string) (blockList []*blocklistmodel.Item, err error) {
t0 := time.Now()
blockList, err = m.rep.FetchBlockListItems(ctx, username)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}

func (m *measuredBlockListRep) DeleteBlockListItems(ctx context.Context, username string) (err error) {
t0 := time.Now()
err = m.rep.DeleteBlockListItems(ctx, username)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}
9 changes: 5 additions & 4 deletions pkg/storage/measured/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,27 @@ import (
)

type measuredCapabilitiesRep struct {
rep repository.Capabilities
rep repository.Capabilities
inTx bool
}

func (m *measuredCapabilitiesRep) UpsertCapabilities(ctx context.Context, caps *capsmodel.Capabilities) (err error) {
t0 := time.Now()
err = m.rep.UpsertCapabilities(ctx, caps)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}

func (m *measuredCapabilitiesRep) CapabilitiesExist(ctx context.Context, node, ver string) (ok bool, err error) {
t0 := time.Now()
ok, err = m.rep.CapabilitiesExist(ctx, node, ver)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}

func (m *measuredCapabilitiesRep) FetchCapabilities(ctx context.Context, node, ver string) (caps *capsmodel.Capabilities, err error) {
t0 := time.Now()
caps, err = m.rep.FetchCapabilities(ctx, node, ver)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}
9 changes: 5 additions & 4 deletions pkg/storage/measured/last.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,27 @@ import (
)

type measuredLastRep struct {
rep repository.Last
rep repository.Last
inTx bool
}

func (m *measuredLastRep) UpsertLast(ctx context.Context, last *lastmodel.Last) error {
t0 := time.Now()
err := m.rep.UpsertLast(ctx, last)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return err
}

func (m *measuredLastRep) FetchLast(ctx context.Context, username string) (last *lastmodel.Last, err error) {
t0 := time.Now()
last, err = m.rep.FetchLast(ctx, username)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}

func (m *measuredLastRep) DeleteLast(ctx context.Context, username string) error {
t0 := time.Now()
err := m.rep.DeleteLast(ctx, username)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return err
}
7 changes: 5 additions & 2 deletions pkg/storage/measured/measured.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ func New(rep repository.Repository) repository.Repository {
// InTransaction generates a repository transaction and completes it after it's being used by f function.
// In case f returns no error tx transaction will be committed.
func (m *Measured) InTransaction(ctx context.Context, f func(ctx context.Context, tx repository.Transaction) error) error {
return m.rep.InTransaction(ctx, f)
return m.rep.InTransaction(ctx, func(ctx context.Context, tx repository.Transaction) error {
return f(ctx, newMeasuredTx(tx))
})
}

// Start initializes repository.
Expand All @@ -73,11 +75,12 @@ func (m *Measured) Stop(ctx context.Context) error {
return m.rep.Stop(ctx)
}

func reportOpMetric(opType string, durationInSecs float64, success bool) {
func reportOpMetric(opType string, durationInSecs float64, success bool, inTx bool) {
metricLabel := prometheus.Labels{
"instance": instance.ID(),
"type": opType,
"success": strconv.FormatBool(success),
"tx": strconv.FormatBool(inTx),
}
repOperations.With(metricLabel).Inc()
repOperationDurationBucket.With(metricLabel).Observe(durationInSecs)
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/measured/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var (
Name: "operations_total",
Help: "The total number of repository operations.",
},
[]string{"instance", "type", "success"},
[]string{"instance", "type", "success", "tx"},
)
repOperationDurationBucket = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand All @@ -34,7 +34,7 @@ var (
Help: "Bucketed histogram of repository operation duration.",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 24),
},
[]string{"instance", "type", "success"},
[]string{"instance", "type", "success", "tx"},
)
)

Expand Down
11 changes: 6 additions & 5 deletions pkg/storage/measured/offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,34 @@ import (
)

type measuredOfflineRep struct {
rep repository.Offline
rep repository.Offline
inTx bool
}

func (m *measuredOfflineRep) InsertOfflineMessage(ctx context.Context, message *stravaganza.Message, username string) error {
t0 := time.Now()
err := m.rep.InsertOfflineMessage(ctx, message, username)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return err
}

func (m *measuredOfflineRep) CountOfflineMessages(ctx context.Context, username string) (int, error) {
t0 := time.Now()
count, err := m.rep.CountOfflineMessages(ctx, username)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return count, err
}

func (m *measuredOfflineRep) FetchOfflineMessages(ctx context.Context, username string) ([]*stravaganza.Message, error) {
t0 := time.Now()
ms, err := m.rep.FetchOfflineMessages(ctx, username)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return ms, err
}

func (m *measuredOfflineRep) DeleteOfflineMessages(ctx context.Context, username string) error {
t0 := time.Now()
err := m.rep.DeleteOfflineMessages(ctx, username)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return err
}
9 changes: 5 additions & 4 deletions pkg/storage/measured/private.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,27 @@ import (
)

type measuredPrivateRep struct {
rep repository.Private
rep repository.Private
inTx bool
}

func (m *measuredPrivateRep) FetchPrivate(ctx context.Context, namespace, username string) (private stravaganza.Element, err error) {
t0 := time.Now()
private, err = m.rep.FetchPrivate(ctx, namespace, username)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}

func (m *measuredPrivateRep) UpsertPrivate(ctx context.Context, private stravaganza.Element, namespace, username string) (err error) {
t0 := time.Now()
err = m.rep.UpsertPrivate(ctx, private, namespace, username)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}

func (m *measuredPrivateRep) DeletePrivates(ctx context.Context, username string) (err error) {
t0 := time.Now()
err = m.rep.DeletePrivates(ctx, username)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}
31 changes: 16 additions & 15 deletions pkg/storage/measured/roster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,103 +23,104 @@ import (
)

type measuredRosterRep struct {
rep repository.Roster
rep repository.Roster
inTx bool
}

func (m *measuredRosterRep) TouchRosterVersion(ctx context.Context, username string) (int, error) {
t0 := time.Now()
ver, err := m.rep.TouchRosterVersion(ctx, username)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return ver, err
}

func (m *measuredRosterRep) FetchRosterVersion(ctx context.Context, username string) (int, error) {
t0 := time.Now()
ver, err := m.rep.FetchRosterVersion(ctx, username)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return ver, err
}

func (m *measuredRosterRep) UpsertRosterItem(ctx context.Context, ri *rostermodel.Item) error {
t0 := time.Now()
err := m.rep.UpsertRosterItem(ctx, ri)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return err
}

func (m *measuredRosterRep) DeleteRosterItem(ctx context.Context, username, jid string) error {
t0 := time.Now()
err := m.rep.DeleteRosterItem(ctx, username, jid)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return err
}

func (m *measuredRosterRep) DeleteRosterItems(ctx context.Context, username string) error {
t0 := time.Now()
err := m.rep.DeleteRosterItems(ctx, username)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return err
}

func (m *measuredRosterRep) FetchRosterItems(ctx context.Context, username string) ([]*rostermodel.Item, error) {
t0 := time.Now()
items, err := m.rep.FetchRosterItems(ctx, username)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return items, err
}

func (m *measuredRosterRep) FetchRosterItemsInGroups(ctx context.Context, username string, groups []string) ([]*rostermodel.Item, error) {
t0 := time.Now()
items, err := m.rep.FetchRosterItemsInGroups(ctx, username, groups)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return items, err
}

func (m *measuredRosterRep) FetchRosterItem(ctx context.Context, username, jid string) (*rostermodel.Item, error) {
t0 := time.Now()
itm, err := m.rep.FetchRosterItem(ctx, username, jid)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return itm, err
}

func (m *measuredRosterRep) UpsertRosterNotification(ctx context.Context, rn *rostermodel.Notification) error {
t0 := time.Now()
err := m.rep.UpsertRosterNotification(ctx, rn)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return err
}

func (m *measuredRosterRep) DeleteRosterNotification(ctx context.Context, contact, jid string) error {
t0 := time.Now()
err := m.rep.DeleteRosterNotification(ctx, contact, jid)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return err
}

func (m *measuredRosterRep) DeleteRosterNotifications(ctx context.Context, contact string) error {
t0 := time.Now()
err := m.rep.DeleteRosterNotifications(ctx, contact)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return err
}

func (m *measuredRosterRep) FetchRosterNotification(ctx context.Context, contact string, jid string) (*rostermodel.Notification, error) {
t0 := time.Now()
rn, err := m.rep.FetchRosterNotification(ctx, contact, jid)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return rn, err
}

func (m *measuredRosterRep) FetchRosterNotifications(ctx context.Context, contact string) ([]*rostermodel.Notification, error) {
t0 := time.Now()
rns, err := m.rep.FetchRosterNotifications(ctx, contact)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return rns, err
}

func (m *measuredRosterRep) FetchRosterGroups(ctx context.Context, username string) ([]string, error) {
t0 := time.Now()
groups, err := m.rep.FetchRosterGroups(ctx, username)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return groups, err
}
41 changes: 41 additions & 0 deletions pkg/storage/measured/tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2021 The jackal Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package measuredrepository

import "github.com/ortuman/jackal/pkg/storage/repository"

type measuredTx struct {
repository.User
repository.Last
repository.Capabilities
repository.Offline
repository.BlockList
repository.Private
repository.Roster
repository.VCard
}

func newMeasuredTx(tx repository.Transaction) *measuredTx {
return &measuredTx{
User: &measuredUserRep{rep: tx, inTx: true},
Last: &measuredLastRep{rep: tx, inTx: true},
Capabilities: &measuredCapabilitiesRep{rep: tx, inTx: true},
Offline: &measuredOfflineRep{rep: tx, inTx: true},
BlockList: &measuredBlockListRep{rep: tx, inTx: true},
Private: &measuredPrivateRep{rep: tx, inTx: true},
Roster: &measuredRosterRep{rep: tx, inTx: true},
VCard: &measuredVCardRep{rep: tx, inTx: true},
}
}
Loading

0 comments on commit 92c2042

Please sign in to comment.