Skip to content

Commit

Permalink
cleanup scheduler logic
Browse files Browse the repository at this point in the history
  • Loading branch information
radeksimko committed Jun 28, 2022
1 parent 9942f19 commit 91b16de
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 131 deletions.
48 changes: 0 additions & 48 deletions internal/langserver/handlers/indexers.go

This file was deleted.

29 changes: 15 additions & 14 deletions internal/langserver/handlers/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
idecoder "github.com/hashicorp/terraform-ls/internal/decoder"
"github.com/hashicorp/terraform-ls/internal/document"
"github.com/hashicorp/terraform-ls/internal/filesystem"
"github.com/hashicorp/terraform-ls/internal/job"
"github.com/hashicorp/terraform-ls/internal/langserver/diagnostics"
"github.com/hashicorp/terraform-ls/internal/langserver/notifier"
"github.com/hashicorp/terraform-ls/internal/langserver/session"
Expand All @@ -41,8 +42,8 @@ type service struct {
sessCtx context.Context
stopSession context.CancelFunc

closedDirIndexer *scheduler.Scheduler
openDirIndexer *scheduler.Scheduler
lowPrioIndexer *scheduler.Scheduler
highPrioIndexer *scheduler.Scheduler

closedDirWalker *module.Walker
openDirWalker *module.Walker
Expand Down Expand Up @@ -437,15 +438,15 @@ func (svc *service) configureSessionDependencies(ctx context.Context, cfgOpts *s
sendModuleTelemetry(svc.stateStore, svc.telemetry),
}

svc.closedDirIndexer = scheduler.NewScheduler(&closedDirJobStore{svc.stateStore.JobStore}, 1)
svc.closedDirIndexer.SetLogger(svc.logger)
svc.closedDirIndexer.Start(svc.sessCtx)
svc.logger.Printf("running closed dir scheduler")
svc.lowPrioIndexer = scheduler.NewScheduler(svc.stateStore.JobStore, 1, job.LowPriority)
svc.lowPrioIndexer.SetLogger(svc.logger)
svc.lowPrioIndexer.Start(svc.sessCtx)
svc.logger.Printf("started low priority scheduler")

svc.openDirIndexer = scheduler.NewScheduler(&openDirJobStore{svc.stateStore.JobStore}, 1)
svc.openDirIndexer.SetLogger(svc.logger)
svc.openDirIndexer.Start(svc.sessCtx)
svc.logger.Printf("running open dir scheduler")
svc.highPrioIndexer = scheduler.NewScheduler(svc.stateStore.JobStore, 1, job.HighPriority)
svc.highPrioIndexer.SetLogger(svc.logger)
svc.highPrioIndexer.Start(svc.sessCtx)
svc.logger.Printf("started high priority scheduler")

cc, err := ilsp.ClientCapabilities(ctx)
if err == nil {
Expand Down Expand Up @@ -532,11 +533,11 @@ func (svc *service) shutdown() {
svc.logger.Printf("openDirWalker stopped")
}

if svc.closedDirIndexer != nil {
svc.closedDirIndexer.Stop()
if svc.lowPrioIndexer != nil {
svc.lowPrioIndexer.Stop()
}
if svc.openDirIndexer != nil {
svc.openDirIndexer.Stop()
if svc.highPrioIndexer != nil {
svc.highPrioIndexer.Stop()
}
}

Expand Down
8 changes: 5 additions & 3 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,24 @@ type Scheduler struct {
logger *log.Logger
jobStorage JobStorage
parallelism int
priority job.JobPriority
stopFunc context.CancelFunc
}

type JobStorage interface {
job.JobStore
AwaitNextJob(ctx context.Context) (job.ID, job.Job, error)
AwaitNextJob(ctx context.Context, priority job.JobPriority) (job.ID, job.Job, error)
FinishJob(id job.ID, jobErr error, deferredJobIds ...job.ID) error
}

func NewScheduler(jobStorage JobStorage, parallelism int) *Scheduler {
func NewScheduler(jobStorage JobStorage, parallelism int, priority job.JobPriority) *Scheduler {
discardLogger := log.New(ioutil.Discard, "", 0)

return &Scheduler{
logger: discardLogger,
jobStorage: jobStorage,
parallelism: parallelism,
priority: priority,
stopFunc: func() {},
}
}
Expand All @@ -54,7 +56,7 @@ func (s *Scheduler) Stop() {

func (s *Scheduler) eval(ctx context.Context) {
for {
id, nextJob, err := s.jobStorage.AwaitNextJob(ctx)
id, nextJob, err := s.jobStorage.AwaitNextJob(ctx, s.priority)
if err != nil {
if errors.Is(err, context.Canceled) {
return
Expand Down
50 changes: 5 additions & 45 deletions internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestScheduler_closedOnly(t *testing.T) {

ctx := context.Background()

s := NewScheduler(&closedDirJobs{js: ss.JobStore}, 2)
s := NewScheduler(ss.JobStore, 2, job.LowPriority)
s.SetLogger(testLogger())
s.Start(ctx)
t.Cleanup(func() {
Expand Down Expand Up @@ -151,14 +151,14 @@ func TestScheduler_closedAndOpen(t *testing.T) {
t.Cleanup(cancelFunc)
}

cs := NewScheduler(&closedDirJobs{js: ss.JobStore}, 1)
cs := NewScheduler(ss.JobStore, 1, job.LowPriority)
cs.SetLogger(testLogger())
cs.Start(ctx)
t.Cleanup(func() {
cs.Stop()
})

os := NewScheduler(&openDirJobs{js: ss.JobStore}, 1)
os := NewScheduler(ss.JobStore, 1, job.HighPriority)
os.SetLogger(testLogger())
os.Start(ctx)
t.Cleanup(func() {
Expand Down Expand Up @@ -197,7 +197,7 @@ func BenchmarkScheduler_EnqueueAndWaitForJob_closedOnly(b *testing.B) {
tmpDir := b.TempDir()
ctx := context.Background()

s := NewScheduler(&closedDirJobs{js: ss.JobStore}, 1)
s := NewScheduler(ss.JobStore, 1, job.LowPriority)
s.Start(ctx)
b.Cleanup(func() {
s.Stop()
Expand Down Expand Up @@ -238,7 +238,7 @@ func TestScheduler_defer(t *testing.T) {

ctx := context.Background()

s := NewScheduler(&closedDirJobs{js: ss.JobStore}, 2)
s := NewScheduler(ss.JobStore, 2, job.LowPriority)
s.SetLogger(testLogger())
s.Start(ctx)
t.Cleanup(func() {
Expand Down Expand Up @@ -326,43 +326,3 @@ func testLogger() *log.Logger {

return log.New(ioutil.Discard, "", 0)
}

type closedDirJobs struct {
js *state.JobStore
}

func (js *closedDirJobs) EnqueueJob(newJob job.Job) (job.ID, error) {
return js.js.EnqueueJob(newJob)
}

func (js *closedDirJobs) AwaitNextJob(ctx context.Context) (job.ID, job.Job, error) {
return js.js.AwaitNextJob(ctx, job.LowPriority)
}

func (js *closedDirJobs) FinishJob(id job.ID, jobErr error, deferredJobIds ...job.ID) error {
return js.js.FinishJob(id, jobErr, deferredJobIds...)
}

func (js *closedDirJobs) WaitForJobs(ctx context.Context, jobIds ...job.ID) error {
return js.js.WaitForJobs(ctx, jobIds...)
}

type openDirJobs struct {
js *state.JobStore
}

func (js *openDirJobs) EnqueueJob(newJob job.Job) (job.ID, error) {
return js.js.EnqueueJob(newJob)
}

func (js *openDirJobs) AwaitNextJob(ctx context.Context) (job.ID, job.Job, error) {
return js.js.AwaitNextJob(ctx, job.HighPriority)
}

func (js *openDirJobs) FinishJob(id job.ID, jobErr error, deferredJobIds ...job.ID) error {
return js.js.FinishJob(id, jobErr, deferredJobIds...)
}

func (js *openDirJobs) WaitForJobs(ctx context.Context, jobIds ...job.ID) error {
return js.js.WaitForJobs(ctx, jobIds...)
}
22 changes: 1 addition & 21 deletions internal/terraform/module/module_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func TestWalker_complexModules(t *testing.T) {
ExecPath: "tf-mock",
})

s := scheduler.NewScheduler(&closedJobStore{ss.JobStore}, 1)
s := scheduler.NewScheduler(ss.JobStore, 1, job.LowPriority)
ss.SetLogger(testLogger())
s.Start(ctx)

Expand Down Expand Up @@ -352,23 +352,3 @@ func testLogger() *log.Logger {

return log.New(ioutil.Discard, "", 0)
}

type closedJobStore struct {
js *state.JobStore
}

func (js *closedJobStore) EnqueueJob(newJob job.Job) (job.ID, error) {
return js.js.EnqueueJob(newJob)
}

func (js *closedJobStore) AwaitNextJob(ctx context.Context) (job.ID, job.Job, error) {
return js.js.AwaitNextJob(ctx, job.LowPriority)
}

func (js *closedJobStore) FinishJob(id job.ID, jobErr error, deferredJobIds ...job.ID) error {
return js.js.FinishJob(id, jobErr, deferredJobIds...)
}

func (js *closedJobStore) WaitForJobs(ctx context.Context, jobIds ...job.ID) error {
return js.js.WaitForJobs(ctx, jobIds...)
}

0 comments on commit 91b16de

Please sign in to comment.