Skip to content

Commit

Permalink
Fixed formatting issues for style check (apache#4175).
Browse files Browse the repository at this point in the history
  • Loading branch information
Devin Bost committed Jan 20, 2020
1 parent a40e229 commit 8ac03f9
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 43 deletions.
30 changes: 14 additions & 16 deletions pulsar-function-go/pf/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@ import (
)

type goInstance struct {
function function
context *FunctionContext
producer pulsar.Producer
consumers map[string]pulsar.Consumer
client pulsar.Client
function function
context *FunctionContext
producer pulsar.Producer
consumers map[string]pulsar.Consumer
client pulsar.Client
lastHealthCheckTs int64
}


// newGoInstance init goInstance and init function context
func newGoInstance() *goInstance {
goInstance := &goInstance{
Expand All @@ -51,19 +50,19 @@ func newGoInstance() *goInstance {
return goInstance
}

func (gi *goInstance) processSpawnerHealthCheckTimer(tkr *time.Ticker){
func (gi *goInstance) processSpawnerHealthCheckTimer(tkr *time.Ticker) {
log.Info("Starting processSpawnerHealthCheckTimer")
now := time.Now()
maxIdleTime := gi.context.GetMaxIdleTime()
timeSinceLastCheck := now.UnixNano() - gi.lastHealthCheckTs
if (timeSinceLastCheck) > (maxIdleTime) {
if (timeSinceLastCheck) > (maxIdleTime) {
log.Error("Haven't received health check from spawner in a while. Stopping instance...")
gi.close()
tkr.Stop()
}
}

func (gi *goInstance) startScheduler(){
func (gi *goInstance) startScheduler() {
if gi.context.instanceConf.expectedHealthCheckInterval > 0 {
log.Info("Starting Scheduler")
go func() {
Expand All @@ -80,8 +79,8 @@ func (gi *goInstance) startScheduler(){
func (gi *goInstance) startFunction(function function) error {
gi.function = function

// start proccess spawner health check timer
now := time.Now();
// start process spawner health check timer
now := time.Now()
gi.lastHealthCheckTs = now.UnixNano()

gi.startScheduler()
Expand All @@ -107,11 +106,11 @@ func (gi *goInstance) startFunction(function function) error {
return err
}

idleDuration := getIdleTimeout(time.Millisecond * gi.context.instanceConf.killAfterIdleMs)
idleDuration := getIdleTimeout(time.Millisecond * gi.context.instanceConf.killAfterIdle)
idleTimer := time.NewTimer(idleDuration)
defer idleTimer.Stop()

servicer := InstanceControlServicer{goInstance:gi}
servicer := InstanceControlServicer{goInstance: gi}
servicer.serve(gi)

CLOSE:
Expand Down Expand Up @@ -364,7 +363,6 @@ func (gi *goInstance) close() {
}
}


func (gi *goInstance) healthCheck() *pb.HealthCheckResult {
now := time.Now()
gi.lastHealthCheckTs = now.UnixNano()
Expand All @@ -377,11 +375,11 @@ func (gi *goInstance) getFunctionStatus() *pb.FunctionStatus {
}

func (gi *goInstance) getAndResetMetrics() *pb.MetricsData {
return nil // Not implemented until we add the statistics features
return nil // Not implemented until we add the statistics features
}

func (gi *goInstance) resetMetrics() *empty.Empty {
return nil // Not implemented until we add the statistics features
return nil // Not implemented until we add the statistics features
}

func (gi *goInstance) getMetrics() *pb.MetricsData {
Expand Down
34 changes: 17 additions & 17 deletions pulsar-function-go/pf/instanceConf.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ import (
// This is the config passed to the Golang Instance. Contains all the information
// passed to run functions
type instanceConf struct {
instanceID int
funcID string
funcVersion string
funcDetails pb.FunctionDetails
maxBufTuples int
port int
clusterName string
pulsarServiceURL string
killAfterIdleMs time.Duration
instanceID int
funcID string
funcVersion string
funcDetails pb.FunctionDetails
maxBufTuples int
port int
clusterName string
pulsarServiceURL string
killAfterIdle time.Duration
expectedHealthCheckInterval int32
}

Expand All @@ -49,14 +49,14 @@ func newInstanceConf() *instanceConf {
panic("config file is nil.")
}
instanceConf := &instanceConf{
instanceID: cfg.InstanceID,
funcID: cfg.FuncID,
funcVersion: cfg.FuncVersion,
maxBufTuples: cfg.MaxBufTuples,
port: cfg.Port,
clusterName: cfg.ClusterName,
pulsarServiceURL: cfg.PulsarServiceURL,
killAfterIdleMs: cfg.KillAfterIdleMs,
instanceID: cfg.InstanceID,
funcID: cfg.FuncID,
funcVersion: cfg.FuncVersion,
maxBufTuples: cfg.MaxBufTuples,
port: cfg.Port,
clusterName: cfg.ClusterName,
pulsarServiceURL: cfg.PulsarServiceURL,
killAfterIdle: cfg.KillAfterIdleMs,
expectedHealthCheckInterval: cfg.ExpectedHealthCheckInterval,
funcDetails: pb.FunctionDetails{
Tenant: cfg.Tenant,
Expand Down
25 changes: 15 additions & 10 deletions pulsar-function-go/pf/instanceControlServicer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,36 @@ package pf
import (
"context"
"fmt"
"github.com/golang/protobuf/ptypes/empty"
"net"
"google.golang.org/grpc"
log "github.com/apache/pulsar/pulsar-function-go/logutil"
pb "github.com/apache/pulsar/pulsar-function-go/pb"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"
"net"
)


type InstanceControlServicer struct {
goInstance *goInstance
}
func (icServicer *InstanceControlServicer) GetFunctionStatus(ctx context.Context, req *empty.Empty) (*pb.FunctionStatus, error) {

func (icServicer *InstanceControlServicer) GetFunctionStatus(
ctx context.Context, req *empty.Empty) (*pb.FunctionStatus, error) {
return icServicer.goInstance.getFunctionStatus(), nil
//return nil, status.Errorf(codes.Unimplemented, "method GetFunctionStatus not implemented")
}
func (icServicer *InstanceControlServicer) GetAndResetMetrics(ctx context.Context, req *empty.Empty) (*pb.MetricsData, error) {
func (icServicer *InstanceControlServicer) GetAndResetMetrics(
ctx context.Context, req *empty.Empty) (*pb.MetricsData, error) {
return icServicer.goInstance.getAndResetMetrics(), nil
}
func (icServicer *InstanceControlServicer) ResetMetrics(ctx context.Context, req *empty.Empty) (*empty.Empty, error) {
func (icServicer *InstanceControlServicer) ResetMetrics(
ctx context.Context, req *empty.Empty) (*empty.Empty, error) {
return icServicer.goInstance.resetMetrics(), nil
}
func (icServicer *InstanceControlServicer) GetMetrics(ctx context.Context, req *empty.Empty) (*pb.MetricsData, error) {
func (icServicer *InstanceControlServicer) GetMetrics(
ctx context.Context, req *empty.Empty) (*pb.MetricsData, error) {
return icServicer.goInstance.getMetrics(), nil
}
func (icServicer *InstanceControlServicer) HealthCheck(ctx context.Context, req *empty.Empty) (*pb.HealthCheckResult, error) {
func (icServicer *InstanceControlServicer) HealthCheck(
ctx context.Context, req *empty.Empty) (*pb.HealthCheckResult, error) {
return icServicer.goInstance.healthCheck(), nil
}

Expand All @@ -68,4 +73,4 @@ func (icServicer *InstanceControlServicer) serve(goInstance *goInstance) *grpc.S
}
}()
return grpcServer
}
}

0 comments on commit 8ac03f9

Please sign in to comment.