diff --git a/pulsar-function-go/pf/instance.go b/pulsar-function-go/pf/instance.go index 4438d365e7711..235f29b14fe77 100644 --- a/pulsar-function-go/pf/instance.go +++ b/pulsar-function-go/pf/instance.go @@ -31,15 +31,14 @@ import ( ) type goInstance struct { - function function - context *FunctionContext - producer pulsar.Producer - consumers map[string]pulsar.Consumer - client pulsar.Client + function function + context *FunctionContext + producer pulsar.Producer + consumers map[string]pulsar.Consumer + client pulsar.Client lastHealthCheckTs int64 } - // newGoInstance init goInstance and init function context func newGoInstance() *goInstance { goInstance := &goInstance{ @@ -51,19 +50,19 @@ func newGoInstance() *goInstance { return goInstance } -func (gi *goInstance) processSpawnerHealthCheckTimer(tkr *time.Ticker){ +func (gi *goInstance) processSpawnerHealthCheckTimer(tkr *time.Ticker) { log.Info("Starting processSpawnerHealthCheckTimer") now := time.Now() maxIdleTime := gi.context.GetMaxIdleTime() timeSinceLastCheck := now.UnixNano() - gi.lastHealthCheckTs - if (timeSinceLastCheck) > (maxIdleTime) { + if (timeSinceLastCheck) > (maxIdleTime) { log.Error("Haven't received health check from spawner in a while. Stopping instance...") gi.close() tkr.Stop() } } -func (gi *goInstance) startScheduler(){ +func (gi *goInstance) startScheduler() { if gi.context.instanceConf.expectedHealthCheckInterval > 0 { log.Info("Starting Scheduler") go func() { @@ -80,8 +79,8 @@ func (gi *goInstance) startScheduler(){ func (gi *goInstance) startFunction(function function) error { gi.function = function - // start proccess spawner health check timer - now := time.Now(); + // start process spawner health check timer + now := time.Now() gi.lastHealthCheckTs = now.UnixNano() gi.startScheduler() @@ -107,11 +106,11 @@ func (gi *goInstance) startFunction(function function) error { return err } - idleDuration := getIdleTimeout(time.Millisecond * gi.context.instanceConf.killAfterIdleMs) + idleDuration := getIdleTimeout(time.Millisecond * gi.context.instanceConf.killAfterIdle) idleTimer := time.NewTimer(idleDuration) defer idleTimer.Stop() - servicer := InstanceControlServicer{goInstance:gi} + servicer := InstanceControlServicer{goInstance: gi} servicer.serve(gi) CLOSE: @@ -364,7 +363,6 @@ func (gi *goInstance) close() { } } - func (gi *goInstance) healthCheck() *pb.HealthCheckResult { now := time.Now() gi.lastHealthCheckTs = now.UnixNano() @@ -377,11 +375,11 @@ func (gi *goInstance) getFunctionStatus() *pb.FunctionStatus { } func (gi *goInstance) getAndResetMetrics() *pb.MetricsData { - return nil // Not implemented until we add the statistics features + return nil // Not implemented until we add the statistics features } func (gi *goInstance) resetMetrics() *empty.Empty { - return nil // Not implemented until we add the statistics features + return nil // Not implemented until we add the statistics features } func (gi *goInstance) getMetrics() *pb.MetricsData { diff --git a/pulsar-function-go/pf/instanceConf.go b/pulsar-function-go/pf/instanceConf.go index 0f5259113c844..892aefb73112d 100644 --- a/pulsar-function-go/pf/instanceConf.go +++ b/pulsar-function-go/pf/instanceConf.go @@ -30,15 +30,15 @@ import ( // This is the config passed to the Golang Instance. Contains all the information // passed to run functions type instanceConf struct { - instanceID int - funcID string - funcVersion string - funcDetails pb.FunctionDetails - maxBufTuples int - port int - clusterName string - pulsarServiceURL string - killAfterIdleMs time.Duration + instanceID int + funcID string + funcVersion string + funcDetails pb.FunctionDetails + maxBufTuples int + port int + clusterName string + pulsarServiceURL string + killAfterIdle time.Duration expectedHealthCheckInterval int32 } @@ -49,14 +49,14 @@ func newInstanceConf() *instanceConf { panic("config file is nil.") } instanceConf := &instanceConf{ - instanceID: cfg.InstanceID, - funcID: cfg.FuncID, - funcVersion: cfg.FuncVersion, - maxBufTuples: cfg.MaxBufTuples, - port: cfg.Port, - clusterName: cfg.ClusterName, - pulsarServiceURL: cfg.PulsarServiceURL, - killAfterIdleMs: cfg.KillAfterIdleMs, + instanceID: cfg.InstanceID, + funcID: cfg.FuncID, + funcVersion: cfg.FuncVersion, + maxBufTuples: cfg.MaxBufTuples, + port: cfg.Port, + clusterName: cfg.ClusterName, + pulsarServiceURL: cfg.PulsarServiceURL, + killAfterIdle: cfg.KillAfterIdleMs, expectedHealthCheckInterval: cfg.ExpectedHealthCheckInterval, funcDetails: pb.FunctionDetails{ Tenant: cfg.Tenant, diff --git a/pulsar-function-go/pf/instanceControlServicer.go b/pulsar-function-go/pf/instanceControlServicer.go index 4f032a65d4cab..7ea6a8569ec04 100644 --- a/pulsar-function-go/pf/instanceControlServicer.go +++ b/pulsar-function-go/pf/instanceControlServicer.go @@ -22,31 +22,36 @@ package pf import ( "context" "fmt" - "github.com/golang/protobuf/ptypes/empty" - "net" - "google.golang.org/grpc" log "github.com/apache/pulsar/pulsar-function-go/logutil" pb "github.com/apache/pulsar/pulsar-function-go/pb" + "github.com/golang/protobuf/ptypes/empty" + "google.golang.org/grpc" + "net" ) - type InstanceControlServicer struct { goInstance *goInstance } -func (icServicer *InstanceControlServicer) GetFunctionStatus(ctx context.Context, req *empty.Empty) (*pb.FunctionStatus, error) { + +func (icServicer *InstanceControlServicer) GetFunctionStatus( + ctx context.Context, req *empty.Empty) (*pb.FunctionStatus, error) { return icServicer.goInstance.getFunctionStatus(), nil //return nil, status.Errorf(codes.Unimplemented, "method GetFunctionStatus not implemented") } -func (icServicer *InstanceControlServicer) GetAndResetMetrics(ctx context.Context, req *empty.Empty) (*pb.MetricsData, error) { +func (icServicer *InstanceControlServicer) GetAndResetMetrics( + ctx context.Context, req *empty.Empty) (*pb.MetricsData, error) { return icServicer.goInstance.getAndResetMetrics(), nil } -func (icServicer *InstanceControlServicer) ResetMetrics(ctx context.Context, req *empty.Empty) (*empty.Empty, error) { +func (icServicer *InstanceControlServicer) ResetMetrics( + ctx context.Context, req *empty.Empty) (*empty.Empty, error) { return icServicer.goInstance.resetMetrics(), nil } -func (icServicer *InstanceControlServicer) GetMetrics(ctx context.Context, req *empty.Empty) (*pb.MetricsData, error) { +func (icServicer *InstanceControlServicer) GetMetrics( + ctx context.Context, req *empty.Empty) (*pb.MetricsData, error) { return icServicer.goInstance.getMetrics(), nil } -func (icServicer *InstanceControlServicer) HealthCheck(ctx context.Context, req *empty.Empty) (*pb.HealthCheckResult, error) { +func (icServicer *InstanceControlServicer) HealthCheck( + ctx context.Context, req *empty.Empty) (*pb.HealthCheckResult, error) { return icServicer.goInstance.healthCheck(), nil } @@ -68,4 +73,4 @@ func (icServicer *InstanceControlServicer) serve(goInstance *goInstance) *grpc.S } }() return grpcServer -} \ No newline at end of file +}