Skip to content

Commit bf085f0

Browse files
committed
add method to control if a phase can be run synchronously
Signed-off-by: spacewander <[email protected]>
1 parent 886be64 commit bf085f0

File tree

9 files changed

+487
-242
lines changed

9 files changed

+487
-242
lines changed

api/internal/consumer/consumer.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,12 @@ type Consumer struct {
4545
FilterConfigs map[string]*fmModel.ParsedFilterConfig
4646

4747
// fields that generated from the configuration
48-
CanSkipMethod map[string]bool
4948
FilterNames []string
5049
InitOnce sync.Once
50+
CanSkipMethod map[string]bool
5151
CanSkipMethodOnce sync.Once
52+
CanSyncRunMethod map[string]bool
53+
// CanSyncRunMethod share the same sync.Once with CanSkipMethodOnce
5254
}
5355

5456
func (c *Consumer) Unmarshal(s string) error {
@@ -95,6 +97,7 @@ func (c *Consumer) InitConfigs() error {
9597
Name: name,
9698
ParsedConfig: conf,
9799
Factory: p.Factory,
100+
CanSyncRun: p.ConfigParser.IsNonBlocking(),
98101
}
99102
}
100103

api/pkg/filtermanager/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC
235235
Name: proto.Name,
236236
ParsedConfig: config,
237237
Factory: plugin.Factory,
238+
CanSyncRun: plugin.ConfigParser.IsNonBlocking(),
238239
})
239240

240241
_, ok := pkgPlugins.LoadPlugin(name).(pkgPlugins.ConsumerPlugin)

0 commit comments

Comments
 (0)