Skip to content

Commit

Permalink
Merge pull request #930 from bbklab/fix-launch-tasks
Browse files Browse the repository at this point in the history
fix the mess of launch tasks logic
  • Loading branch information
bbklab authored Sep 12, 2017
2 parents c432a16 + 7892100 commit 02fa8c9
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 115 deletions.
134 changes: 133 additions & 1 deletion integration-test/swan_api_create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,145 @@ func (s *ApiSuite) TestCreateApp(c *check.C) {
costPrintln("TestCreateApp() removed", startAt)
}

func (s *ApiSuite) TestCreateNoMatchedAgentsApp(c *check.C) {
// Purge
//
startAt := time.Now()
err := s.purge(time.Second*60, c)
c.Assert(err, check.IsNil)
fmt.Println("TestCreateNoMatchedAgentsApp() purged")

// New Create App
//
startAt = time.Now()
ver := demoVersion().setName("demo").setCount(10).setCPU(0.01).setMem(5).setConstraint("vcluster", "==", "xxxxx").
setProxy(true, "www.xxx.com", "", false).Get()
id := s.createApp(ver, c)
err = s.waitApp(id, types.OpStatusNoop, time.Second*180, c)
c.Assert(err, check.IsNil)
costPrintln("TestCreateNoMatchedAgentsApp() created", startAt)

// verify app
startAt = time.Now()
app := s.inspectApp(id, c)
c.Assert(app.Name, check.Equals, "demo")
c.Assert(app.TaskCount, check.Equals, 10)
c.Assert(app.VersionCount, check.Equals, 1)
c.Assert(len(app.Version), check.Equals, 1)
c.Assert(app.ErrMsg, check.Not(check.Equals), "")
match, _ := regexp.MatchString("no satisfied agent", app.ErrMsg)
c.Assert(match, check.Equals, true)

// verify app versions
vers := s.listAppVersions(id, c)
c.Assert(len(vers), check.Equals, 1)
c.Assert(vers[0].CPUs, check.Equals, 0.01)
c.Assert(vers[0].Mem, check.Equals, float64(5))
c.Assert(vers[0].Instances, check.Equals, int32(10))
c.Assert(vers[0].RunAs, check.Equals, app.RunAs)

// verify app tasks
tasks := s.listAppTasks(id, c)
c.Assert(len(tasks), check.Equals, 10)
for _, task := range tasks {
c.Assert(task.Status, check.Equals, "pending")
}

// verify proxy record
proxy := s.listAppProxies(id, c)
c.Assert(proxy, check.Not(check.IsNil))
c.Assert(proxy.Alias, check.Equals, "")
c.Assert(len(proxy.Backends), check.Equals, 0)
c.Assert(proxy.Listen, check.Equals, "")
c.Assert(proxy.Sticky, check.Equals, false)

// verify dns records
dns := s.listAppDNS(id, c)
c.Assert(len(dns), check.Equals, 0)

costPrintln("TestCreateNoMatchedAgentsApp() failure stats verified", startAt)

// Remove
//
startAt = time.Now()
err = s.removeApp(id, time.Second*10, c)
c.Assert(err, check.IsNil)
costPrintln("TestCreateNoMatchedAgentsApp() removed", startAt)
}

func (s *ApiSuite) TestCreateOverQuotaResourceApp(c *check.C) {
// Purge
//
startAt := time.Now()
err := s.purge(time.Second*60, c)
c.Assert(err, check.IsNil)
fmt.Println("TestCreateOverQuotaResourceApp() purged")

// New Create App
//
startAt = time.Now()
ver := demoVersion().setName("demo").setCount(10).setCPU(0.01).setMem(500000).
setProxy(true, "www.xxx.com", "", false).Get()
id := s.createApp(ver, c)
err = s.waitApp(id, types.OpStatusNoop, time.Second*180, c)
c.Assert(err, check.IsNil)
costPrintln("TestCreateOverQuotaResourceApp() created", startAt)

// verify app
startAt = time.Now()
app := s.inspectApp(id, c)
c.Assert(app.Name, check.Equals, "demo")
c.Assert(app.TaskCount, check.Equals, 10)
c.Assert(app.VersionCount, check.Equals, 1)
c.Assert(len(app.Version), check.Equals, 1)
c.Assert(app.ErrMsg, check.Not(check.Equals), "")
match, _ := regexp.MatchString("resource not enough", app.ErrMsg)
c.Assert(match, check.Equals, true)

// verify app versions
vers := s.listAppVersions(id, c)
c.Assert(len(vers), check.Equals, 1)
c.Assert(vers[0].CPUs, check.Equals, 0.01)
c.Assert(vers[0].Mem, check.Equals, float64(500000))
c.Assert(vers[0].Instances, check.Equals, int32(10))
c.Assert(vers[0].RunAs, check.Equals, app.RunAs)

// verify app tasks
tasks := s.listAppTasks(id, c)
c.Assert(len(tasks), check.Equals, 10)
for _, task := range tasks {
c.Assert(task.Status, check.Equals, "pending")
}

// verify proxy record
proxy := s.listAppProxies(id, c)
c.Assert(proxy, check.Not(check.IsNil))
c.Assert(proxy.Alias, check.Equals, "")
c.Assert(len(proxy.Backends), check.Equals, 0)
c.Assert(proxy.Listen, check.Equals, "")
c.Assert(proxy.Sticky, check.Equals, false)

// verify dns records
dns := s.listAppDNS(id, c)
c.Assert(len(dns), check.Equals, 0)

costPrintln("TestCreateOverQuotaResourceApp() failure stats verified", startAt)

// Remove
//
startAt = time.Now()
err = s.removeApp(id, time.Second*10, c)
c.Assert(err, check.IsNil)
costPrintln("TestCreateOverQuotaResourceApp() removed", startAt)
}

func (s *ApiSuite) TestCreateInvalidApp(c *check.C) {
// Purge
//
startAt := time.Now()
err := s.purge(time.Second*60, c)
c.Assert(err, check.IsNil)
fmt.Println("TestCreateInvalidApp() purged")
costPrintln("TestCreateInvalidApp() purged", startAt)

// Invalid Create Request
//
Expand Down
1 change: 1 addition & 0 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func New(cfg *config.ManagerConfig) (*Manager, error) {

filters := []mesos.Filter{
filter.NewConstraintsFilter(),
filter.NewResourceFilter(),
}
sched.InitFilters(filters)

Expand Down
22 changes: 9 additions & 13 deletions mesos/filter.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
package mesos

import (
//"github.com/Dataman-Cloud/swan/mesos/filter"
"github.com/Dataman-Cloud/swan/types"
)

type Filter interface {
Filter(config *types.TaskConfig, agents []*Agent) []*Agent
Filter(config *types.TaskConfig, replicas int, agents []*Agent) ([]*Agent, error)
}

//func NewFilter() []Filter {
// filters := []Filter{
// filter.NewResourceFilter(),
// }
//
// return filters
//}

func ApplyFilters(filters []Filter, config *types.TaskConfig, agents []*Agent) []*Agent {
// the returned agents contains at least one proper agent
func ApplyFilters(filters []Filter, config *types.TaskConfig, replicas int, agents []*Agent) ([]*Agent, error) {
accepted := agents

var err error
for _, filter := range filters {
accepted = filter.Filter(config, accepted)
accepted, err = filter.Filter(config, replicas, accepted)
if err != nil {
return nil, err
}
}

return accepted
return accepted, nil
}
20 changes: 15 additions & 5 deletions mesos/filter/constraints.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
package filter

import (
"errors"

"github.com/Dataman-Cloud/swan/mesos"
"github.com/Dataman-Cloud/swan/types"
)

var (
errNoSatisfiedAgent = errors.New("no satisfied agent")
)

type constraintsFilter struct{}

func NewConstraintsFilter() *constraintsFilter {
return &constraintsFilter{}
}

func (f *constraintsFilter) Filter(config *types.TaskConfig, agents []*mesos.Agent) []*mesos.Agent {
constraints := config.Constraints

candidates := make([]*mesos.Agent, 0)
func (f *constraintsFilter) Filter(config *types.TaskConfig, replicas int, agents []*mesos.Agent) ([]*mesos.Agent, error) {
var (
constraints = config.Constraints
candidates = make([]*mesos.Agent, 0)
)

for _, agent := range agents {
match := true
Expand All @@ -31,5 +38,8 @@ func (f *constraintsFilter) Filter(config *types.TaskConfig, agents []*mesos.Age
}
}

return candidates
if len(candidates) == 0 {
return nil, errNoSatisfiedAgent
}
return candidates, nil
}
28 changes: 20 additions & 8 deletions mesos/filter/resource.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,42 @@
package filter

import (
"errors"

"github.com/Dataman-Cloud/swan/mesos"
"github.com/Dataman-Cloud/swan/types"
)

var (
errResourceNotEnough = errors.New("resource not enough")
)

type resourceFilter struct {
}

func NewResourceFilter() *resourceFilter {
return &resourceFilter{}
}

func (f *resourceFilter) Filter(config *types.TaskConfig, agents []*mesos.Agent) []*mesos.Agent {
// multiplicate with replicas to calculate total resource requirments
func (f *resourceFilter) Filter(config *types.TaskConfig, replicas int, agents []*mesos.Agent) ([]*mesos.Agent, error) {
candidates := make([]*mesos.Agent, 0)

for _, agent := range agents {
cpus, mem, disk, ports := agent.Resources()

if cpus >= config.CPUs &&
mem >= config.Mem &&
disk >= config.Disk &&
len(ports) >= len(config.PortMappings) {
var (
cpus, mem, disk, ports = agent.Resources() // avaliable agent resources
cpusReq = config.CPUs * float64(replicas) // total resources requirements ...
memReq = config.Mem * float64(replicas)
diskReq = config.Disk * float64(replicas)
portsReq = len(config.PortMappings) * replicas // FIXME LATER
)
if cpus >= cpusReq && mem >= memReq && disk >= diskReq && len(ports) >= portsReq {
candidates = append(candidates, agent)
}
}

return candidates
if len(candidates) == 0 {
return nil, errResourceNotEnough
}
return candidates, nil
}
Loading

0 comments on commit 02fa8c9

Please sign in to comment.