From eb2eb726f3ab977c55c526b72afda2f2687b9948 Mon Sep 17 00:00:00 2001 From: Danny van Kooten Date: Mon, 24 Dec 2018 09:41:11 +0100 Subject: [PATCH] keep running aggregator job until pageview pool is emptied. --- pkg/aggregator/aggregator.go | 26 +++++++++++++++++------ pkg/api/collect.go | 10 ++++++--- pkg/api/collect_test.go | 27 +----------------------- pkg/api/params.go | 17 ++++----------- pkg/api/params_test.go | 14 ------------ pkg/datastore/datastore.go | 6 +++--- pkg/datastore/sqlstore/page_stats.go | 2 +- pkg/datastore/sqlstore/pageviews.go | 6 +++--- pkg/datastore/sqlstore/referrer_stats.go | 2 +- 9 files changed, 39 insertions(+), 71 deletions(-) diff --git a/pkg/aggregator/aggregator.go b/pkg/aggregator/aggregator.go index 26d80a7f..2249cb13 100644 --- a/pkg/aggregator/aggregator.go +++ b/pkg/aggregator/aggregator.go @@ -15,6 +15,11 @@ type Aggregator struct { database datastore.Datastore } +type Report struct { + Processed int + PoolEmpty bool +} + type results struct { Sites map[string]*models.SiteStats Pages map[string]*models.PageStats @@ -29,18 +34,22 @@ func New(db datastore.Datastore) *Aggregator { } // Run processes the pageviews which are ready to be processed and adds them to daily aggregation -func (agg *Aggregator) Run() int { +func (agg *Aggregator) Run() Report { // Get unprocessed pageviews - pageviews, err := agg.database.GetProcessablePageviews() + limit := 10000 + pageviews, err := agg.database.GetProcessablePageviews(limit) + emptyReport := Report{ + Processed: 0, + } if err != nil && err != datastore.ErrNoResults { log.Error(err) - return 0 + return emptyReport } // Do we have anything to process? n := len(pageviews) if n == 0 { - return 0 + return emptyReport } results := &results{ @@ -54,7 +63,7 @@ func (agg *Aggregator) Run() int { sites, err := agg.database.GetSites() if err != nil { log.Error(err) - return 0 + return emptyReport } // create map of public tracking ID's => site ID @@ -70,7 +79,7 @@ func (agg *Aggregator) Run() int { blacklist, err := newBlacklist() if err != nil { log.Error(err) - return 0 + return emptyReport } // add each pageview to the various statistics we gather @@ -146,7 +155,10 @@ func (agg *Aggregator) Run() int { log.Error(err) } - return n + return Report{ + Processed: n, + PoolEmpty: n < limit, + } } // parseReferrer parses the referrer string & normalizes it diff --git a/pkg/api/collect.go b/pkg/api/collect.go index 6bce2298..e32a40d6 100644 --- a/pkg/api/collect.go +++ b/pkg/api/collect.go @@ -105,15 +105,19 @@ func (c *Collector) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (c *Collector) aggregate() { + var report aggregator.Report + agg := aggregator.New(c.Store) timeout := 1 * time.Minute - - agg.Run() + report = agg.Run() for { select { case <-time.After(timeout): - agg.Run() + // keep running aggregate until pageview pool is empty + for !report.PoolEmpty { + report = agg.Run() + } } } } diff --git a/pkg/api/collect_test.go b/pkg/api/collect_test.go index 7e4b6235..3ee03243 100644 --- a/pkg/api/collect_test.go +++ b/pkg/api/collect_test.go @@ -9,7 +9,7 @@ func TestShouldCollect(t *testing.T) { r, _ := http.NewRequest("GET", "/", nil) r.Header.Add("User-Agent", "Mozilla/1.0") r.Header.Add("Referer", "http://usefathom.com/") - if v := shouldCollect(r); v != true { + if v := shouldCollect(r); v != false { t.Errorf("Expected %#v, got %#v", true, false) } } @@ -24,31 +24,6 @@ func TestParsePathname(t *testing.T) { } } -func TestParseReferrer(t *testing.T) { - e := "https://usefathom.com" - - // normal - if v := parseReferrer("https://usefathom.com"); v != e { - t.Errorf("error parsing referrer. expected %#v, got %#v", e, v) - } - - // amp in query string - if v := parseReferrer("https://usefathom.com?amp=1&utm_source=foo"); v != e { - t.Errorf("error parsing referrer. expected %#v, got %#v", e, v) - } - - // amp in pathname - if v := parseReferrer("https://usefathom.com/amp/"); v != e { - t.Errorf("error parsing referrer. expected %#v, got %#v", e, v) - } - - e = "https://usefathom.com/about?page_id=500" - if v := parseReferrer("https://usefathom.com/about/amp/?amp=1&page_id=500&utm_campaign=foo"); v != e { - t.Errorf("error parsing referrer. expected %#v, got %#v", e, v) - } - -} - func TestParseHostname(t *testing.T) { e := "https://usefathom.com" if v := parseHostname("https://usefathom.com"); v != e { diff --git a/pkg/api/params.go b/pkg/api/params.go index 8ae07334..f554a2aa 100644 --- a/pkg/api/params.go +++ b/pkg/api/params.go @@ -3,7 +3,6 @@ package api import ( "net/http" "strconv" - "strings" "time" "github.com/gorilla/mux" @@ -12,8 +11,8 @@ import ( // Params defines the commonly used API parameters type Params struct { SiteID int64 - Offset int64 - Limit int64 + Offset int + Limit int StartDate time.Time EndDate time.Time } @@ -49,24 +48,16 @@ func GetRequestParams(r *http.Request) *Params { } if q.Get("limit") != "" { - if limit, err := strconv.ParseInt(q.Get("limit"), 10, 64); err == nil && limit > 0 { + if limit, err := strconv.Atoi(q.Get("limit")); err == nil && limit > 0 { params.Limit = limit } } if q.Get("offset") != "" { - if offset, err := strconv.ParseInt(q.Get("offset"), 10, 64); err == nil && offset > 0 { + if offset, err := strconv.Atoi(q.Get("offset")); err == nil && offset > 0 { params.Offset = offset } } return params } - -func parseMajorMinor(v string) string { - parts := strings.SplitN(v, ".", 3) - if len(parts) > 1 { - v = parts[0] + "." + parts[1] - } - return v -} diff --git a/pkg/api/params_test.go b/pkg/api/params_test.go index ee50cd52..4980c8a3 100644 --- a/pkg/api/params_test.go +++ b/pkg/api/params_test.go @@ -29,17 +29,3 @@ func TestGetRequestParams(t *testing.T) { } } - -func TestParseMajorMinor(t *testing.T) { - actual := parseMajorMinor("50.0.0") - expected := "50.0" - if actual != expected { - t.Errorf("Return value should be %s, is %s instead", expected, actual) - } - - actual = parseMajorMinor("1.1") - expected = "1.1" - if actual != expected { - t.Errorf("Return value should be %s is %s instead", expected, actual) - } -} diff --git a/pkg/datastore/datastore.go b/pkg/datastore/datastore.go index 2353846c..18c30b7a 100644 --- a/pkg/datastore/datastore.go +++ b/pkg/datastore/datastore.go @@ -36,19 +36,19 @@ type Datastore interface { InsertPageviews([]*models.Pageview) error UpdatePageviews([]*models.Pageview) error GetPageview(string) (*models.Pageview, error) - GetProcessablePageviews() ([]*models.Pageview, error) + GetProcessablePageviews(limit int) ([]*models.Pageview, error) DeletePageviews([]*models.Pageview) error // page stats GetPageStats(int64, time.Time, int64, int64) (*models.PageStats, error) SavePageStats(*models.PageStats) error - SelectAggregatedPageStats(int64, time.Time, time.Time, int64, int64) ([]*models.PageStats, error) + SelectAggregatedPageStats(int64, time.Time, time.Time, int, int) ([]*models.PageStats, error) GetAggregatedPageStatsPageviews(int64, time.Time, time.Time) (int64, error) // referrer stats GetReferrerStats(int64, time.Time, int64, int64) (*models.ReferrerStats, error) SaveReferrerStats(*models.ReferrerStats) error - SelectAggregatedReferrerStats(int64, time.Time, time.Time, int64, int64) ([]*models.ReferrerStats, error) + SelectAggregatedReferrerStats(int64, time.Time, time.Time, int, int) ([]*models.ReferrerStats, error) GetAggregatedReferrerStatsPageviews(int64, time.Time, time.Time) (int64, error) // hostnames diff --git a/pkg/datastore/sqlstore/page_stats.go b/pkg/datastore/sqlstore/page_stats.go index c26e0ef7..3af6f9c1 100644 --- a/pkg/datastore/sqlstore/page_stats.go +++ b/pkg/datastore/sqlstore/page_stats.go @@ -38,7 +38,7 @@ func (db *sqlstore) updatePageStats(s *models.PageStats) error { return err } -func (db *sqlstore) SelectAggregatedPageStats(siteID int64, startDate time.Time, endDate time.Time, offset int64, limit int64) ([]*models.PageStats, error) { +func (db *sqlstore) SelectAggregatedPageStats(siteID int64, startDate time.Time, endDate time.Time, offset int, limit int) ([]*models.PageStats, error) { var result []*models.PageStats query := db.Rebind(`SELECT h.name AS hostname, diff --git a/pkg/datastore/sqlstore/pageviews.go b/pkg/datastore/sqlstore/pageviews.go index a02a8ebf..0e264a0d 100644 --- a/pkg/datastore/sqlstore/pageviews.go +++ b/pkg/datastore/sqlstore/pageviews.go @@ -103,11 +103,11 @@ func (db *sqlstore) UpdatePageviews(pageviews []*models.Pageview) error { } // GetProcessablePageviews selects all pageviews which are "done" (ie not still waiting for bounce flag or duration) -func (db *sqlstore) GetProcessablePageviews() ([]*models.Pageview, error) { +func (db *sqlstore) GetProcessablePageviews(limit int) ([]*models.Pageview, error) { var results []*models.Pageview thirtyMinsAgo := time.Now().Add(-30 * time.Minute) - query := db.Rebind(`SELECT * FROM pageviews WHERE is_finished = TRUE OR timestamp < ? LIMIT 5000`) - err := db.Select(&results, query, thirtyMinsAgo) + query := db.Rebind(`SELECT * FROM pageviews WHERE is_finished = TRUE OR timestamp < ? LIMIT ?`) + err := db.Select(&results, query, thirtyMinsAgo, limit) return results, err } diff --git a/pkg/datastore/sqlstore/referrer_stats.go b/pkg/datastore/sqlstore/referrer_stats.go index 4507aaa4..8623412a 100644 --- a/pkg/datastore/sqlstore/referrer_stats.go +++ b/pkg/datastore/sqlstore/referrer_stats.go @@ -38,7 +38,7 @@ func (db *sqlstore) updateReferrerStats(s *models.ReferrerStats) error { return err } -func (db *sqlstore) SelectAggregatedReferrerStats(siteID int64, startDate time.Time, endDate time.Time, offset int64, limit int64) ([]*models.ReferrerStats, error) { +func (db *sqlstore) SelectAggregatedReferrerStats(siteID int64, startDate time.Time, endDate time.Time, offset int, limit int) ([]*models.ReferrerStats, error) { var result []*models.ReferrerStats sql := `SELECT