Skip to content

Commit

Permalink
keep running aggregator job until pageview pool is emptied.
Browse files Browse the repository at this point in the history
  • Loading branch information
dannyvankooten committed Dec 24, 2018
1 parent 634baac commit eb2eb72
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 71 deletions.
26 changes: 19 additions & 7 deletions pkg/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ type Aggregator struct {
database datastore.Datastore
}

// Report summarizes the outcome of a single Aggregator.Run call.
type Report struct {
// Processed is the number of pageviews fetched for the run. It is 0 when
// Run returns early (on error or when there was nothing to process).
Processed int
// PoolEmpty is true when the run fetched fewer pageviews than its batch
// limit. Note that the early-return report leaves this at its zero value
// (false), so callers should also look at Processed before re-running.
PoolEmpty bool
}

type results struct {
Sites map[string]*models.SiteStats
Pages map[string]*models.PageStats
Expand All @@ -29,18 +34,22 @@ func New(db datastore.Datastore) *Aggregator {
}

// Run processes the pageviews which are ready to be processed and adds them to daily aggregation
func (agg *Aggregator) Run() int {
func (agg *Aggregator) Run() Report {
// Get unprocessed pageviews
pageviews, err := agg.database.GetProcessablePageviews()
limit := 10000
pageviews, err := agg.database.GetProcessablePageviews(limit)
emptyReport := Report{
Processed: 0,
}
if err != nil && err != datastore.ErrNoResults {
log.Error(err)
return 0
return emptyReport
}

// Do we have anything to process?
n := len(pageviews)
if n == 0 {
return 0
return emptyReport
}

results := &results{
Expand All @@ -54,7 +63,7 @@ func (agg *Aggregator) Run() int {
sites, err := agg.database.GetSites()
if err != nil {
log.Error(err)
return 0
return emptyReport
}

// create map of public tracking ID's => site ID
Expand All @@ -70,7 +79,7 @@ func (agg *Aggregator) Run() int {
blacklist, err := newBlacklist()
if err != nil {
log.Error(err)
return 0
return emptyReport
}

// add each pageview to the various statistics we gather
Expand Down Expand Up @@ -146,7 +155,10 @@ func (agg *Aggregator) Run() int {
log.Error(err)
}

return n
return Report{
Processed: n,
PoolEmpty: n < limit,
}
}

// parseReferrer parses the referrer string & normalizes it
Expand Down
10 changes: 7 additions & 3 deletions pkg/api/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,19 @@ func (c *Collector) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

// aggregate runs the aggregator immediately and then once per minute, forever.
// It never returns, so it is presumably meant to be started in its own
// goroutine (verify against the caller).
//
// Each time it fires, the aggregator is run back-to-back until the pageview
// pool is drained, so a backlog larger than one batch is worked off without
// waiting a full minute between batches.
func (c *Collector) aggregate() {
	agg := aggregator.New(c.Store)
	timeout := 1 * time.Minute

	// drain keeps running the aggregator until the pool is reported empty.
	// The report is refreshed on every run so a stale value can never decide
	// the loop. A run that processed nothing also ends the drain: Run returns
	// a report with Processed == 0 and PoolEmpty == false on errors and on an
	// empty pool, which would otherwise make this loop spin without pause.
	drain := func() {
		for {
			report := agg.Run()
			if report.PoolEmpty || report.Processed == 0 {
				return
			}
		}
	}

	drain()

	for {
		// Wait a full interval after the previous drain finished.
		time.Sleep(timeout)
		drain()
	}
}
Expand Down
27 changes: 1 addition & 26 deletions pkg/api/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ func TestShouldCollect(t *testing.T) {
r, _ := http.NewRequest("GET", "/", nil)
r.Header.Add("User-Agent", "Mozilla/1.0")
r.Header.Add("Referer", "http://usefathom.com/")
if v := shouldCollect(r); v != true {
if v := shouldCollect(r); v != false {
t.Errorf("Expected %#v, got %#v", true, false)
}
}
Expand All @@ -24,31 +24,6 @@ func TestParsePathname(t *testing.T) {
}
}

// TestParseReferrer feeds parseReferrer a handful of referrer URLs and checks
// the normalized result for each one.
func TestParseReferrer(t *testing.T) {
	const base = "https://usefathom.com"

	cases := []struct {
		input    string
		expected string
	}{
		// normal
		{"https://usefathom.com", base},
		// amp in query string
		{"https://usefathom.com?amp=1&utm_source=foo", base},
		// amp in pathname
		{"https://usefathom.com/amp/", base},
		// amp in both pathname and query string, with a query parameter that must survive
		{"https://usefathom.com/about/amp/?amp=1&page_id=500&utm_campaign=foo", "https://usefathom.com/about?page_id=500"},
	}

	for _, tc := range cases {
		if got := parseReferrer(tc.input); got != tc.expected {
			t.Errorf("error parsing referrer. expected %#v, got %#v", tc.expected, got)
		}
	}
}

func TestParseHostname(t *testing.T) {
e := "https://usefathom.com"
if v := parseHostname("https://usefathom.com"); v != e {
Expand Down
17 changes: 4 additions & 13 deletions pkg/api/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package api
import (
"net/http"
"strconv"
"strings"
"time"

"github.com/gorilla/mux"
Expand All @@ -12,8 +11,8 @@ import (
// Params defines the commonly used API parameters
type Params struct {
SiteID int64
Offset int64
Limit int64
Offset int
Limit int
StartDate time.Time
EndDate time.Time
}
Expand Down Expand Up @@ -49,24 +48,16 @@ func GetRequestParams(r *http.Request) *Params {
}

if q.Get("limit") != "" {
if limit, err := strconv.ParseInt(q.Get("limit"), 10, 64); err == nil && limit > 0 {
if limit, err := strconv.Atoi(q.Get("limit")); err == nil && limit > 0 {
params.Limit = limit
}
}

if q.Get("offset") != "" {
if offset, err := strconv.ParseInt(q.Get("offset"), 10, 64); err == nil && offset > 0 {
if offset, err := strconv.Atoi(q.Get("offset")); err == nil && offset > 0 {
params.Offset = offset
}
}

return params
}

// parseMajorMinor truncates a dotted version string to its first two
// components, e.g. "50.0.0" becomes "50.0". Strings with fewer than two dots
// are returned unchanged.
func parseMajorMinor(v string) string {
	firstDot := strings.IndexByte(v, '.')
	if firstDot < 0 {
		return v
	}

	// Look for the second dot in the remainder after the first one.
	rest := v[firstDot+1:]
	secondDot := strings.IndexByte(rest, '.')
	if secondDot < 0 {
		return v
	}

	// Keep everything up to, but not including, the second dot.
	return v[:firstDot+1+secondDot]
}
14 changes: 0 additions & 14 deletions pkg/api/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,3 @@ func TestGetRequestParams(t *testing.T) {
}

}

// TestParseMajorMinor checks that parseMajorMinor drops the patch component
// of a three-part version and leaves a two-part version untouched.
func TestParseMajorMinor(t *testing.T) {
	cases := []struct {
		input    string
		expected string
		format   string
	}{
		{"50.0.0", "50.0", "Return value should be %s, is %s instead"},
		{"1.1", "1.1", "Return value should be %s is %s instead"},
	}

	for _, tc := range cases {
		if actual := parseMajorMinor(tc.input); actual != tc.expected {
			t.Errorf(tc.format, tc.expected, actual)
		}
	}
}
6 changes: 3 additions & 3 deletions pkg/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@ type Datastore interface {
InsertPageviews([]*models.Pageview) error
UpdatePageviews([]*models.Pageview) error
GetPageview(string) (*models.Pageview, error)
GetProcessablePageviews() ([]*models.Pageview, error)
GetProcessablePageviews(limit int) ([]*models.Pageview, error)
DeletePageviews([]*models.Pageview) error

// page stats
GetPageStats(int64, time.Time, int64, int64) (*models.PageStats, error)
SavePageStats(*models.PageStats) error
SelectAggregatedPageStats(int64, time.Time, time.Time, int64, int64) ([]*models.PageStats, error)
SelectAggregatedPageStats(int64, time.Time, time.Time, int, int) ([]*models.PageStats, error)
GetAggregatedPageStatsPageviews(int64, time.Time, time.Time) (int64, error)

// referrer stats
GetReferrerStats(int64, time.Time, int64, int64) (*models.ReferrerStats, error)
SaveReferrerStats(*models.ReferrerStats) error
SelectAggregatedReferrerStats(int64, time.Time, time.Time, int64, int64) ([]*models.ReferrerStats, error)
SelectAggregatedReferrerStats(int64, time.Time, time.Time, int, int) ([]*models.ReferrerStats, error)
GetAggregatedReferrerStatsPageviews(int64, time.Time, time.Time) (int64, error)

// hostnames
Expand Down
2 changes: 1 addition & 1 deletion pkg/datastore/sqlstore/page_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (db *sqlstore) updatePageStats(s *models.PageStats) error {
return err
}

func (db *sqlstore) SelectAggregatedPageStats(siteID int64, startDate time.Time, endDate time.Time, offset int64, limit int64) ([]*models.PageStats, error) {
func (db *sqlstore) SelectAggregatedPageStats(siteID int64, startDate time.Time, endDate time.Time, offset int, limit int) ([]*models.PageStats, error) {
var result []*models.PageStats
query := db.Rebind(`SELECT
h.name AS hostname,
Expand Down
6 changes: 3 additions & 3 deletions pkg/datastore/sqlstore/pageviews.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ func (db *sqlstore) UpdatePageviews(pageviews []*models.Pageview) error {
}

// GetProcessablePageviews selects up to limit pageviews which are "done" (i.e. no longer waiting for the bounce flag or duration)
func (db *sqlstore) GetProcessablePageviews() ([]*models.Pageview, error) {
func (db *sqlstore) GetProcessablePageviews(limit int) ([]*models.Pageview, error) {
var results []*models.Pageview
thirtyMinsAgo := time.Now().Add(-30 * time.Minute)
query := db.Rebind(`SELECT * FROM pageviews WHERE is_finished = TRUE OR timestamp < ? LIMIT 5000`)
err := db.Select(&results, query, thirtyMinsAgo)
query := db.Rebind(`SELECT * FROM pageviews WHERE is_finished = TRUE OR timestamp < ? LIMIT ?`)
err := db.Select(&results, query, thirtyMinsAgo, limit)
return results, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/datastore/sqlstore/referrer_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (db *sqlstore) updateReferrerStats(s *models.ReferrerStats) error {
return err
}

func (db *sqlstore) SelectAggregatedReferrerStats(siteID int64, startDate time.Time, endDate time.Time, offset int64, limit int64) ([]*models.ReferrerStats, error) {
func (db *sqlstore) SelectAggregatedReferrerStats(siteID int64, startDate time.Time, endDate time.Time, offset int, limit int) ([]*models.ReferrerStats, error) {
var result []*models.ReferrerStats

sql := `SELECT
Expand Down

0 comments on commit eb2eb72

Please sign in to comment.