Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add method to control if a phase can be run synchronously #800

Merged
merged 3 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions api/internal/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ type Consumer struct {
FilterConfigs map[string]*fmModel.ParsedFilterConfig

// fields that generated from the configuration
CanSkipMethod map[string]bool
FilterNames []string
InitOnce sync.Once
CanSkipMethod map[string]bool
CanSkipMethodOnce sync.Once
CanSyncRunMethod map[string]bool
// CanSyncRunMethod share the same sync.Once with CanSkipMethodOnce
}

func (c *Consumer) Unmarshal(s string) error {
Expand Down Expand Up @@ -92,9 +94,10 @@ func (c *Consumer) InitConfigs() error {
}

c.FilterConfigs[name] = &fmModel.ParsedFilterConfig{
Name: name,
ParsedConfig: conf,
Factory: p.Factory,
Name: name,
ParsedConfig: conf,
Factory: p.Factory,
SyncRunPhases: p.ConfigParser.NonBlockingPhases(),
}
}

Expand Down
63 changes: 63 additions & 0 deletions api/pkg/filtermanager/api/phase.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright The HTNN Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

type Phase int

const (
PhaseDecodeHeaders Phase = 0x01
PhaseDecodeData Phase = 0x02
PhaseDecodeTrailers Phase = 0x04
PhaseDecodeRequest Phase = 0x08
PhaseEncodeHeaders Phase = 0x10
PhaseEncodeData Phase = 0x20
PhaseEncodeTrailers Phase = 0x40
PhaseEncodeResponse Phase = 0x80
PhaseOnLog Phase = 0x100
)

var (
AllPhases = PhaseDecodeHeaders | PhaseDecodeData | PhaseDecodeTrailers | PhaseDecodeRequest |
PhaseEncodeHeaders | PhaseEncodeData | PhaseEncodeTrailers | PhaseEncodeResponse | PhaseOnLog
)

func (p Phase) Contains(phases Phase) bool {
return p&phases == phases
}

func MethodToPhase(meth string) Phase {
switch meth {
case "DecodeHeaders":
return PhaseDecodeHeaders
case "DecodeData":
return PhaseDecodeData
case "DecodeTrailers":
return PhaseDecodeTrailers
case "DecodeRequest":
return PhaseDecodeRequest
case "EncodeHeaders":
return PhaseEncodeHeaders
case "EncodeData":
return PhaseEncodeData
case "EncodeTrailers":
return PhaseEncodeTrailers
case "EncodeResponse":
return PhaseEncodeResponse
case "OnLog":
return PhaseOnLog
default:
return 0

Check warning on line 61 in api/pkg/filtermanager/api/phase.go

View check run for this annotation

Codecov / codecov/patch

api/pkg/filtermanager/api/phase.go#L58-L61

Added lines #L58 - L61 were not covered by tests
}
}
7 changes: 4 additions & 3 deletions api/pkg/filtermanager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,10 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC
})
} else {
conf.parsed = append(conf.parsed, &model.ParsedFilterConfig{
Name: proto.Name,
ParsedConfig: config,
Factory: plugin.Factory,
Name: proto.Name,
ParsedConfig: config,
Factory: plugin.Factory,
SyncRunPhases: plugin.ConfigParser.NonBlockingPhases(),
})

_, ok := pkgPlugins.LoadPlugin(name).(pkgPlugins.ConsumerPlugin)
Expand Down
Loading
Loading