Skip to content

Commit

Permalink
log duration for aggregation func
Browse files Browse the repository at this point in the history
  • Loading branch information
dannyvankooten committed Dec 24, 2018
1 parent eb2eb72 commit 23793bd
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 6 deletions.
14 changes: 11 additions & 3 deletions pkg/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"net/url"
"strings"
"time"

"github.com/usefathom/fathom/pkg/datastore"
"github.com/usefathom/fathom/pkg/models"
Expand All @@ -18,6 +19,7 @@ type Aggregator struct {
type Report struct {
Processed int
PoolEmpty bool
Duration time.Duration
}

type results struct {
Expand All @@ -35,6 +37,8 @@ func New(db datastore.Datastore) *Aggregator {

// Run processes the pageviews which are ready to be processed and adds them to daily aggregation
func (agg *Aggregator) Run() Report {
startTime := time.Now()

// Get unprocessed pageviews
limit := 10000
pageviews, err := agg.database.GetProcessablePageviews(limit)
Expand All @@ -58,8 +62,6 @@ func (agg *Aggregator) Run() Report {
Referrers: map[string]*models.ReferrerStats{},
}

log.Debugf("processing %d pageviews", len(pageviews))

sites, err := agg.database.GetSites()
if err != nil {
log.Error(err)
Expand Down Expand Up @@ -155,10 +157,16 @@ func (agg *Aggregator) Run() Report {
log.Error(err)
}

return Report{
endTime := time.Now()
dur := endTime.Sub(startTime)

report := Report{
Processed: n,
PoolEmpty: n < limit,
Duration: dur,
}
log.Debugf("processed %d pageviews. took: %s, pool empty: %v", report.Processed, report.Duration, report.PoolEmpty)
return report
}

// parseReferrer parses the referrer string & normalizes it
Expand Down
7 changes: 5 additions & 2 deletions pkg/api/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,15 @@ func (c *Collector) aggregate() {

agg := aggregator.New(c.Store)
timeout := 1 * time.Minute
report = agg.Run()
agg.Run()

for {
select {
case <-time.After(timeout):
// keep running aggregate until pageview pool is empty
// run aggregator at least once
report = agg.Run()

// if pool is not empty yet, keep running
for !report.PoolEmpty {
report = agg.Run()
}
Expand Down
1 change: 0 additions & 1 deletion pkg/cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ func server(c *cli.Context) error {
// set debug log level if --debug was passed
if c.Bool("debug") {
log.SetLevel(log.DebugLevel)
h = handlers.LoggingHandler(log.StandardLogger().Writer(), h)
} else {
log.SetLevel(log.WarnLevel)
}
Expand Down

0 comments on commit 23793bd

Please sign in to comment.