Skip to content

Commit

Permalink
datastore: Remove all timeouts
Browse files Browse the repository at this point in the history
Remove the arbitrary timeouts in the datastore package and rely on the
passed context to timeout.

Signed-off-by: crozzy <[email protected]>
  • Loading branch information
crozzy committed Jun 21, 2023
1 parent 96e5ea4 commit bfaf2af
Show file tree
Hide file tree
Showing 15 changed files with 35 additions and 106 deletions.
17 changes: 5 additions & 12 deletions datastore/postgres/indexdistributions.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/quay/claircore"
"github.com/quay/claircore/indexer"
"github.com/quay/claircore/pkg/microbatch"
Expand Down Expand Up @@ -79,23 +80,17 @@ func (s *IndexerStore) IndexDistributions(ctx context.Context, dists []*claircor
)

// obtain a transaction scoped batch
tctx, done := context.WithTimeout(ctx, 5*time.Second)
tx, err := s.pool.Begin(tctx)
done()
tx, err := s.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("store:indexDistributions failed to create transaction: %v", err)
}
defer tx.Rollback(ctx)

tctx, done = context.WithTimeout(ctx, 5*time.Second)
insertDistStmt, err := tx.Prepare(tctx, "insertDistStmt", insert)
done()
insertDistStmt, err := tx.Prepare(ctx, "insertDistStmt", insert)
if err != nil {
return fmt.Errorf("failed to create statement: %w", err)
}
tctx, done = context.WithTimeout(ctx, 5*time.Second)
insertDistScanArtifactWithStmt, err := tx.Prepare(tctx, "insertDistScanArtifactWith", insertWith)
done()
insertDistScanArtifactWithStmt, err := tx.Prepare(ctx, "insertDistScanArtifactWith", insertWith)
if err != nil {
return fmt.Errorf("failed to create statement: %w", err)
}
Expand Down Expand Up @@ -157,9 +152,7 @@ func (s *IndexerStore) IndexDistributions(ctx context.Context, dists []*claircor
indexDistributionsCounter.WithLabelValues("insertWith_batch").Add(1)
indexDistributionsDuration.WithLabelValues("insertWith_batch").Observe(time.Since(start).Seconds())

tctx, done = context.WithTimeout(ctx, 5*time.Second)
err = tx.Commit(tctx)
done()
err = tx.Commit(ctx)
if err != nil {
return fmt.Errorf("store:indexDistributions failed to commit tx: %w", err)
}
Expand Down
12 changes: 3 additions & 9 deletions datastore/postgres/indexmanifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,13 @@ func (s *IndexerStore) IndexManifest(ctx context.Context, ir *claircore.IndexRep
}

// obtain a transaction scoped batch
tctx, done := context.WithTimeout(ctx, 5*time.Second)
tx, err := s.pool.Begin(tctx)
done()
tx, err := s.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("postgres: indexManifest failed to create transaction: %w", err)
}
defer tx.Rollback(ctx)

tctx, done = context.WithTimeout(ctx, 5*time.Second)
queryStmt, err := tx.Prepare(tctx, "queryStmt", query)
done()
queryStmt, err := tx.Prepare(ctx, "queryStmt", query)
if err != nil {
return fmt.Errorf("failed to create statement: %w", err)
}
Expand Down Expand Up @@ -127,9 +123,7 @@ func (s *IndexerStore) IndexManifest(ctx context.Context, ir *claircore.IndexRep
indexManifestCounter.WithLabelValues("query_batch").Add(1)
indexManifestDuration.WithLabelValues("query_batch").Observe(time.Since(start).Seconds())

tctx, done = context.WithTimeout(ctx, 15*time.Second)
err = tx.Commit(tctx)
done()
err = tx.Commit(ctx)
if err != nil {
return fmt.Errorf("failed to commit tx: %w", err)
}
Expand Down
16 changes: 4 additions & 12 deletions datastore/postgres/indexpackage.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,23 +100,17 @@ func (s *IndexerStore) IndexPackages(ctx context.Context, pkgs []*claircore.Pack

ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/indexPackages")
// obtain a transaction scoped batch
tctx, done := context.WithTimeout(ctx, 5*time.Second)
tx, err := s.pool.Begin(tctx)
done()
tx, err := s.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("store:indexPackage failed to create transaction: %w", err)
}
defer tx.Rollback(ctx)

tctx, done = context.WithTimeout(ctx, 5*time.Second)
insertPackageStmt, err := tx.Prepare(tctx, "insertPackageStmt", insert)
done()
insertPackageStmt, err := tx.Prepare(ctx, "insertPackageStmt", insert)
if err != nil {
return fmt.Errorf("failed to create statement: %w", err)
}
tctx, done = context.WithTimeout(ctx, 5*time.Second)
insertPackageScanArtifactWithStmt, err := tx.Prepare(tctx, "insertPackageScanArtifactWith", insertWith)
done()
insertPackageScanArtifactWithStmt, err := tx.Prepare(ctx, "insertPackageScanArtifactWith", insertWith)
if err != nil {
return fmt.Errorf("failed to create statement: %w", err)
}
Expand Down Expand Up @@ -198,9 +192,7 @@ func (s *IndexerStore) IndexPackages(ctx context.Context, pkgs []*claircore.Pack
Int("inserted", len(pkgs)-skipCt).
Msg("scanartifacts inserted")

tctx, done = context.WithTimeout(ctx, 5*time.Second)
err = tx.Commit(tctx)
done()
err = tx.Commit(ctx)
if err != nil {
return fmt.Errorf("store:indexPackages failed to commit tx: %w", err)
}
Expand Down
2 changes: 0 additions & 2 deletions datastore/postgres/indexreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ func (s *IndexerStore) IndexReport(ctx context.Context, hash claircore.Digest) (
// then type convert back to scanner.domain object
var jsr jsonbIndexReport

ctx, done := context.WithTimeout(ctx, 5*time.Second)
defer done()
start := time.Now()
err := s.pool.QueryRow(ctx, query, hash).Scan(&jsr)
switch {
Expand Down
17 changes: 5 additions & 12 deletions datastore/postgres/indexrepository.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/quay/claircore"
"github.com/quay/claircore/indexer"
"github.com/quay/claircore/pkg/microbatch"
Expand Down Expand Up @@ -72,23 +73,17 @@ func (s *IndexerStore) IndexRepositories(ctx context.Context, repos []*claircore
`
)
// obtain a transaction scoped batch
tctx, done := context.WithTimeout(ctx, 5*time.Second)
tx, err := s.pool.Begin(tctx)
done()
tx, err := s.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("store:indexRepositories failed to create transaction: %w", err)
}
defer tx.Rollback(ctx)

tctx, done = context.WithTimeout(ctx, 5*time.Second)
insertRepoStmt, err := tx.Prepare(tctx, "insertRepoStmt", insert)
done()
insertRepoStmt, err := tx.Prepare(ctx, "insertRepoStmt", insert)
if err != nil {
return fmt.Errorf("failed to create insert repo statement: %w", err)
}
tctx, done = context.WithTimeout(ctx, 5*time.Second)
insertRepoScanArtifactWithStmt, err := tx.Prepare(tctx, "insertRepoScanArtifactWith", insertWith)
done()
insertRepoScanArtifactWithStmt, err := tx.Prepare(ctx, "insertRepoScanArtifactWith", insertWith)
if err != nil {
return fmt.Errorf("failed to create insert repo scanartifact statement: %w", err)
}
Expand Down Expand Up @@ -142,9 +137,7 @@ func (s *IndexerStore) IndexRepositories(ctx context.Context, repos []*claircore
indexRepositoriesCounter.WithLabelValues("insertWith_batch").Add(1)
indexRepositoriesDuration.WithLabelValues("insertWith_batch").Observe(time.Since(start).Seconds())

tctx, done = context.WithTimeout(ctx, 15*time.Second)
err = tx.Commit(tctx)
done()
err = tx.Commit(ctx)
if err != nil {
return fmt.Errorf("store:indexRepositories failed to commit tx: %w", err)
}
Expand Down
2 changes: 0 additions & 2 deletions datastore/postgres/layerscanned.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ SELECT
return false, err
}

ctx, done := context.WithTimeout(ctx, 10*time.Second)
defer done()
var ok bool

start := time.Now()
Expand Down
2 changes: 0 additions & 2 deletions datastore/postgres/manifestscanned.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ func (s *IndexerStore) ManifestScanned(ctx context.Context, hash claircore.Diges
// get a map of the found ids which have scanned this package
foundIDs := map[int64]struct{}{}

ctx, done := context.WithTimeout(ctx, 10*time.Second)
defer done()
start := time.Now()
rows, err := s.pool.Query(ctx, selectScanned, hash)
if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions datastore/postgres/packagesbylayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ WHERE
return nil, err
}

ctx, done := context.WithTimeout(ctx, 15*time.Second)
defer done()
start := time.Now()
rows, err := s.pool.Query(ctx, query, hash, scannerIDs)
switch {
Expand Down
20 changes: 5 additions & 15 deletions datastore/postgres/persistmanifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,49 +65,39 @@ func (s *IndexerStore) PersistManifest(ctx context.Context, manifest claircore.M
`
)

tctx, done := context.WithTimeout(ctx, 5*time.Second)
tx, err := s.pool.Begin(tctx)
done()
tx, err := s.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to create transaction: %w", err)
}
defer tx.Rollback(ctx)

tctx, done = context.WithTimeout(ctx, 5*time.Second)
start := time.Now()
_, err = tx.Exec(tctx, insertManifest, manifest.Hash)
done()
_, err = tx.Exec(ctx, insertManifest, manifest.Hash)
if err != nil {
return fmt.Errorf("failed to insert manifest: %w", err)
}
persistManifestCounter.WithLabelValues("insertManifest").Add(1)
persistManifestDuration.WithLabelValues("insertManifest").Observe(time.Since(start).Seconds())

for i, layer := range manifest.Layers {
tctx, done = context.WithTimeout(ctx, 5*time.Second)
start := time.Now()
_, err = tx.Exec(tctx, insertLayer, layer.Hash)
done()
_, err = tx.Exec(ctx, insertLayer, layer.Hash)
if err != nil {
return fmt.Errorf("failed to insert layer: %w", err)
}
persistManifestCounter.WithLabelValues("insertLayer").Add(1)
persistManifestDuration.WithLabelValues("insertLayer").Observe(time.Since(start).Seconds())

tctx, done = context.WithTimeout(ctx, 5*time.Second)
start = time.Now()
_, err = tx.Exec(tctx, insertManifestLayer, manifest.Hash, layer.Hash, i)
done()
_, err = tx.Exec(ctx, insertManifestLayer, manifest.Hash, layer.Hash, i)
if err != nil {
return fmt.Errorf("failed to insert manifest -> layer link: %w", err)
}
persistManifestCounter.WithLabelValues("insertManifestLayer").Add(1)
persistManifestDuration.WithLabelValues("insertManifestLayer").Observe(time.Since(start).Seconds())
}

tctx, done = context.WithTimeout(ctx, 15*time.Second)
err = tx.Commit(tctx)
done()
err = tx.Commit(ctx)
if err != nil {
return fmt.Errorf("failed to commit tx: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion datastore/postgres/querybuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func buildGetQuery(record *claircore.IndexRecord, opts *datastore.GetOpts) (stri
"repo_uri",
"fixed_in_version",
"updater",
).From("latest_vuln").Where(exps...)
).From("vuln").Where(exps...)

sql, _, err := query.ToSQL()
if err != nil {
Expand Down
10 changes: 2 additions & 8 deletions datastore/postgres/registerscanners.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,10 @@ SELECT

var ok bool
var err error
var tctx context.Context
var done context.CancelFunc
for _, v := range vs {
tctx, done = context.WithTimeout(ctx, time.Second)
start := time.Now()
err = s.pool.QueryRow(tctx, exists, v.Name(), v.Version(), v.Kind()).
err = s.pool.QueryRow(ctx, exists, v.Name(), v.Version(), v.Kind()).
Scan(&ok)
done()
if err != nil {
return fmt.Errorf("failed getting id for scanner %q: %w", v.Name(), err)
}
Expand All @@ -78,10 +74,8 @@ SELECT
continue
}

tctx, done = context.WithTimeout(ctx, time.Second)
start = time.Now()
_, err = s.pool.Exec(tctx, insert, v.Name(), v.Version(), v.Kind())
done()
_, err = s.pool.Exec(ctx, insert, v.Name(), v.Version(), v.Kind())
if err != nil {
return fmt.Errorf("failed to insert scanner %q: %w", v.Name(), err)
}
Expand Down
2 changes: 0 additions & 2 deletions datastore/postgres/repositoriesbylayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ WHERE
return nil, fmt.Errorf("unable to select scanners: %w", err)
}

ctx, done := context.WithTimeout(ctx, 15*time.Second)
defer done()
start := time.Now()
rows, err := s.pool.Query(ctx, query, hash, scannerIDs)
switch {
Expand Down
16 changes: 4 additions & 12 deletions datastore/postgres/setindexfinished.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,16 @@ DO
return fmt.Errorf("failed to select package scanner id: %w", err)
}

tctx, done := context.WithTimeout(ctx, 5*time.Second)
tx, err := s.pool.Begin(tctx)
done()
tx, err := s.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to create transaction: %w", err)
}
defer tx.Rollback(ctx)

// link extracted scanner IDs with incoming manifest
for _, id := range scannerIDs {
tctx, done := context.WithTimeout(ctx, 5*time.Second)
start := time.Now()
_, err := tx.Exec(tctx, insertManifestScanned, ir.Hash, id)
done()
_, err := tx.Exec(ctx, insertManifestScanned, ir.Hash, id)
if err != nil {
return fmt.Errorf("failed to link manifest with scanner list: %w", err)
}
Expand All @@ -106,19 +102,15 @@ DO
// we cast claircore.IndexReport to jsonbIndexReport in order to obtain the value/scan
// implementations

tctx, done = context.WithTimeout(ctx, 5*time.Second)
start := time.Now()
_, err = tx.Exec(tctx, upsertIndexReport, ir.Hash, jsonbIndexReport(*ir))
done()
_, err = tx.Exec(ctx, upsertIndexReport, ir.Hash, jsonbIndexReport(*ir))
if err != nil {
return fmt.Errorf("failed to upsert scan result: %w", err)
}
setIndexedFinishedCounter.WithLabelValues("upsertIndexReport").Add(1)
setIndexedFinishedDuration.WithLabelValues("upsertIndexReport").Observe(time.Since(start).Seconds())

tctx, done = context.WithTimeout(ctx, 15*time.Second)
err = tx.Commit(tctx)
done()
err = tx.Commit(ctx)
if err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
Expand Down
2 changes: 0 additions & 2 deletions datastore/postgres/setindexreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ DO
// we cast scanner.IndexReport to jsonbIndexReport in order to obtain the value/scan
// implementations

ctx, done := context.WithTimeout(ctx, 30*time.Second)
defer done()
start := time.Now()
_, err := s.pool.Exec(ctx, query, ir.Hash, jsonbIndexReport(*ir))
if err != nil {
Expand Down
19 changes: 6 additions & 13 deletions datastore/postgres/setlayerscanned.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,34 +39,27 @@ func (s *IndexerStore) SetLayerScanned(ctx context.Context, hash claircore.Diges
ctx = zlog.ContextWithValues(ctx, "scanner", vs.Name())
const query = `
WITH
scanner
AS (
SELECT
id
FROM
scanner
WHERE
name = $2 AND version = $3 AND kind = $4
),
layer AS (SELECT id FROM layer WHERE hash = $1)
INSERT
INTO
scanned_layer (layer_id, scanner_id)
VALUES
(
(SELECT id AS layer_id FROM layer),
(SELECT id AS scanner_id FROM scanner)
$2
)
ON CONFLICT
(layer_id, scanner_id)
DO
NOTHING;
`

ctx, done := context.WithTimeout(ctx, 15*time.Second)
defer done()
scannerID, err := s.selectScanner(ctx, vs)
if err != nil {
return fmt.Errorf("error getting the scanner ID: %w", err)
}
start := time.Now()
_, err := s.pool.Exec(ctx, query, hash, vs.Name(), vs.Version(), vs.Kind())
_, err = s.pool.Exec(ctx, query, hash, scannerID)
if err != nil {
return fmt.Errorf("error setting layer scanned: %w", err)
}
Expand Down

0 comments on commit bfaf2af

Please sign in to comment.