Skip to content

Commit

Permalink
feat: do not use finalizer to stop metrics cache loop (#450)
Browse files Browse the repository at this point in the history
* feat: do not use finalizer to stop metrics cache loop

Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy authored Oct 15, 2021
1 parent 0112945 commit f81b33d
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 2 deletions.
2 changes: 0 additions & 2 deletions manager/controllers/pipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// PipelineReconciler reconciles a Pipeline object.
Expand Down Expand Up @@ -92,7 +91,6 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
},
Spec: step,
}
controllerutil.AddFinalizer(obj, stepFinalizer)
if err := r.Client.Create(ctx, obj); err != nil {
if apierr.IsAlreadyExists(err) {
old := &dfv1.Step{}
Expand Down
52 changes: 52 additions & 0 deletions manager/controllers/scaling/metrics_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ import (
lru "github.com/hashicorp/golang-lru"
pmodel "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
// deadKeyCheckThreashold is the number of consecutive failed metrics pulls
// (counted per key in MetricsCacheHandler.deadKeys) after which the
// background sweeper in Start checks whether the corresponding Step object
// has been deleted, and stops watching the key if it no longer exists.
// NOTE(review): identifier misspells "Threshold" — rename to
// deadKeyCheckThreshold once all references can be updated together.
deadKeyCheckThreashold = 50
)

var (
httpClient *http.Client
metricsCache *lru.Cache
Expand All @@ -45,6 +51,9 @@ type MetricsCacheHandler struct {
stepList *list.List
lock sync.RWMutex
workers int

// counters for unavailable keys
deadKeys *sync.Map
}

func NewMetricsCacheHandler(client client.Client, workers int) *MetricsCacheHandler {
Expand All @@ -55,6 +64,8 @@ func NewMetricsCacheHandler(client client.Client, workers int) *MetricsCacheHand
stepMap: make(map[string]*list.Element),
stepList: list.New(),
lock: sync.RWMutex{},

deadKeys: new(sync.Map),
}
}

Expand Down Expand Up @@ -98,6 +109,44 @@ func (m *MetricsCacheHandler) Start(ctx context.Context) {
go m.pullMetrics(ctx, i, keyCh)
}

// Background sweeper: periodically scans deadKeys (keys whose metrics
// endpoint has repeatedly been unavailable) and stops watching any key
// whose backing Step object has been deleted. Runs until ctx is canceled;
// this replaces the previous finalizer-based cleanup.
go func(cctx context.Context) {
defer runtime.HandleCrash()
// 180s between sweeps; a key also needs deadKeyCheckThreashold failed
// pulls before it is examined, so cleanup is deliberately lazy.
ticker := time.NewTicker(180 * time.Second)
defer ticker.Stop()
for {
select {
case <-cctx.Done():
return
case <-ticker.C:
m.deadKeys.Range(func(k, v interface{}) bool {
times := v.(int)
// Below the failure threshold: leave the counter alone and keep scanning.
if times < deadKeyCheckThreashold {
return true
}
// Key format: namespace/name/headless-svc-name — assumes at least two
// "/"-separated segments; s[0]/s[1] would panic otherwise (TODO confirm
// producers always use this format).
key := fmt.Sprint(k)
s := strings.Split(key, "/")
if err := m.client.Get(cctx, client.ObjectKey{Namespace: s[0], Name: s[1]}, &dfv1.Step{}); err != nil {
if apierrors.IsNotFound(err) {
// Step is gone: stop watching its metrics and drop the counter.
logger.Info("no corresponding step is found", "key", key)
if err := m.StopWatching(key); err != nil {
// Keep the key so the next sweep retries StopWatching.
logger.Error(err, "failed to stop watching", "key", key)
return true
}
m.deadKeys.Delete(k)
} else {
// Transient API error: log and retry on the next sweep.
logger.Error(err, "failed to query step object", "key", key)
}
return true
}
// Step still exists — the endpoint outage was temporary (e.g. scaled to 0),
// so reset by removing it from the dead keys.
m.deadKeys.Delete(k)
return true
})
}
}
}(ctx)

assign := func() {
m.lock.Lock()
defer m.lock.Unlock()
Expand Down Expand Up @@ -145,6 +194,9 @@ func (m *MetricsCacheHandler) pullMetrics(ctx context.Context, id int, keyCh <-c
if pending, err := getPendingMetric(key); err != nil {
if errors.Is(err, errMetricsEndpointUnavailable) {
logger.Info("metrics endpoint unavailable, might have been scaled to 0", "key", key)
if v, existing := m.deadKeys.LoadOrStore(key, 1); existing {
m.deadKeys.Store(key, v.(int)+1)
}
} else {
logger.Error(err, "failed to get pending messages", "key", key)
}
Expand Down

0 comments on commit f81b33d

Please sign in to comment.