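Motivation
Pulsar Functions can provide the number of instances of the running function (see, for example, ["Get num instances"](https://pulsar.apache.org/docs/2.10.x/window-functions-context/#get-num-instances)), but the data this relies on must currently be passed in via command-line arguments and cannot easily be changed dynamically.
In particular, when Pulsar Functions run in Kubernetes, passing in new data by updating the command-line arguments triggers a reschedule (a pod restart) of the Pulsar instance.
Therefore, we need a more dynamic way for functions to access data (mainly FunctionDetails), and the same mechanism can also be used to pass the function's other parameters.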
Goal
Allow Pulsar Functions to read parameters from a specified configuration file
Allow Pulsar Functions to receive state information more gracefully, without going through command-line arguments
The function runtimes that need to support this feature are the Python, Java, and Golang runtimes.
API Changes
No response
Implementation
This mechanism lets a function fetch the parameters and data it requires dynamically, without restarting the function to load the latest values.
A more appropriate approach is to provide the function with a local config file where the data and parameters are stored. The provider of the data and parameters (usually the initiator of the function, e.g., function-worker) writes the contents of the parameters and the data (FunctionDetails) to a fixed local config file and passes the path of this config file to the function.
exec --config-file /path/to/config-file
The function reads the value of --config-file, checks whether the target config file exists, and parses its contents if it does.
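The hand-off above (the worker writes the file, the function checks for it and parses it) can be sketched in Python as follows. This is a minimal sketch, not the proposal's implementation: the key=value file format and the helper names `write_config_atomically` and `load_config_file` are assumptions for illustration. The writer uses a temporary file followed by a rename so that a reader never sees a half-written file. This matters once the file is re-read at runtime.

```python
import os
import tempfile


def write_config_atomically(path: str, entries: dict) -> None:
    """Worker side (hypothetical helper): write key=value pairs, then swap the file into place."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".config-", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            for key, value in entries.items():
                f.write(f"{key}={value}\n")
        # On POSIX, os.replace is an atomic rename within one filesystem, so a
        # reader sees either the old file or the new one, never a partial write.
        os.replace(tmp_path, path)
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def load_config_file(path):
    """Function side (hypothetical helper): return {} if no path was given or the file is missing."""
    if path is None or not os.path.isfile(path):
        return {}
    entries = {}
    with open(path, encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith("#"):
                continue  # skip blank lines and comments
            key, sep, value = line.partition("=")
            if sep:
                entries[key.strip()] = value.strip()
    return entries
```

All values come back as strings; converting them to typed fields is left to the merge step.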
The rules for applying parameters or data to the function are as follows:
Parameters or data passed on the command line take priority over the corresponding entries in the configuration file.
If a mandatory parameter or data item is provided by neither source, an exception is thrown.
If an optional parameter or data item is provided by neither source, its default value is used.
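The three rules can be expressed as a single resolution function. The Python sketch below is illustrative only: `resolve_config` and `MissingConfigError` are hypothetical names, and it assumes that "not passed" is represented by an absent key or `None`.

```python
class MissingConfigError(ValueError):
    """Raised when a mandatory parameter is provided by neither source."""


def resolve_config(cli_args: dict, file_config: dict, required, defaults: dict) -> dict:
    """Apply the rules: command line > file, mandatory check, defaults for optional keys.

    cli_args holds only values actually passed on the command line
    (an absent key or None means "not passed").
    """
    resolved = {}
    keys = set(file_config) | set(cli_args) | set(required) | set(defaults)
    for key in keys:
        cli_value = cli_args.get(key)
        if cli_value is not None:
            resolved[key] = cli_value          # rule 1: command line wins
        elif file_config.get(key) is not None:
            resolved[key] = file_config[key]   # otherwise fall back to the file
    missing = sorted(k for k in required if k not in resolved)
    if missing:
        # rule 2: mandatory values must come from at least one source
        raise MissingConfigError("missing mandatory config: " + ", ".join(missing))
    for key, value in defaults.items():
        resolved.setdefault(key, value)        # rule 3: defaults for optional keys
    return resolved
```

Because defaults are applied last, a default can never override a value that was actually supplied.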
The work consists of two major steps:
Support reading configs from file
Monitor state config changes
Support reading configs from file
Python Runtime
Add the argument config_file (default None). When it is set, the runtime reads configuration content from the file at that path.
During initialization, command-line arguments take priority over configuration file entries.
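One way to wire `config_file` into a Python entry point is sketched below with the standard `argparse` module. It is a sketch under assumptions, not the existing Python instance code: the options other than `--config_file` and the `parse_with_config_file` helper are hypothetical. It shows that giving every option `default=None` lets the runtime tell "not passed" apart from "passed", so file values fill only the gaps.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Python instance (sketch)")
    # default=None on every option lets us tell "not passed" apart from
    # "passed with the same value as the default".
    parser.add_argument("--config_file", default=None,
                        help="path to a config file; None means no file is used")
    parser.add_argument("--function_id", default=None)      # hypothetical option
    parser.add_argument("--parallelism", type=int, default=None)  # hypothetical option
    return parser


def parse_with_config_file(argv, read_file) -> dict:
    """Return the merged config; read_file(path) -> dict loads the config file."""
    args = build_parser().parse_args(argv)
    cli_values = {k: v for k, v in vars(args).items()
                  if v is not None and k != "config_file"}
    file_values = read_file(args.config_file) if args.config_file else {}
    # In {**a, **b}, keys from b win, so command-line values override file values.
    return {**file_values, **cli_values}
```

Required-field checks and defaults would run on the returned dict afterwards, after the rules listed earlier.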
Java Runtime
We propose to let JavaInstanceStarter read its configuration from a file as follows:
Create a class JavaInstanceConfiguration that extends PulsarConfiguration and holds all the command-line args. Subclasses of PulsarConfiguration can be read using PulsarConfigurationLoader.
In JavaInstanceStarter, add a new command-line arg, --config_file, that specifies the config file location.
First read the "command-line args", then read the "file configs". Finally, merge the two with priority "command-line args" > "file configs", followed by required-field validation and default-value completion.
All JCommander configs will be non-required; the actual required-field validation is done in a later step by a custom method.
If a config field is provided neither through JCommander nor by the file, it is initialized to null. This way we can distinguish fields that were not provided from fields that were provided with a value equal to the default. The default value for each config field is then filled in at a later step by a custom method.
JavaInstanceStarter is used by all three runtime implementations: KubernetesRuntime, ProcessRuntime, and ThreadRuntime. We can then modify the Pulsar Worker to use this new feature when running JavaInstanceStarter.
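The null-means-unset approach described above does not depend on the language. The Python dataclass below mirrors it and is not the actual JavaInstanceConfiguration. The class, field names, and helper functions are hypothetical. Every field starts as `None`. The merge step lets a non-`None` command-line value win. Validation and default completion run as separate steps, so a field that was explicitly provided with the default value is still recorded as provided.

```python
from dataclasses import dataclass, fields, replace
from typing import Optional, Set, Tuple

REQUIRED_FIELDS = ("function_id",)                          # hypothetical
DEFAULTS = {"parallelism": 1, "max_buffered_tuples": 1024}  # hypothetical


@dataclass
class InstanceConfig:
    # None means "not provided by any source" (the Java null equivalent).
    function_id: Optional[str] = None
    parallelism: Optional[int] = None
    max_buffered_tuples: Optional[int] = None


def merge(cli: InstanceConfig, file: InstanceConfig) -> InstanceConfig:
    """Field by field, a non-None command-line value wins over the file value."""
    values = {}
    for f in fields(InstanceConfig):
        cli_value = getattr(cli, f.name)
        values[f.name] = cli_value if cli_value is not None else getattr(file, f.name)
    return InstanceConfig(**values)


def validate_and_fill_defaults(config: InstanceConfig) -> Tuple[InstanceConfig, Set[str]]:
    """Check required fields, record what was provided, then fill defaults."""
    missing = [name for name in REQUIRED_FIELDS if getattr(config, name) is None]
    if missing:
        raise ValueError("missing required config fields: " + ", ".join(missing))
    provided = {f.name for f in fields(config) if getattr(config, f.name) is not None}
    filled = replace(config, **{k: v for k, v in DEFAULTS.items()
                                if getattr(config, k) is None})
    return filled, provided
```

In the usage below, `parallelism=1` is passed on the command line and equals the default, yet it still appears in `provided`. A scheme that pre-populated defaults could not make that distinction.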
Golang Runtime
The current Golang runtime reads its configuration from conf/config.yaml.
However, the configuration items in conf/config.yaml are structured differently from those in the Java and Python runtimes, so we may be able to align their configuration styles as part of this proposal.
An alternative policy for the coexistence of configuration files and command-line arguments is exclusivity: when a configuration file is applied, configuration is read only from that file and any arguments passed on the command line are ignored. This is still to be discussed.
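The difference between the merge policy and the exclusivity alternative is easiest to see side by side. This Python sketch is illustrative only. `Policy` and `effective_config` are hypothetical names, and "a file is applied" is modeled as a boolean flag.

```python
from enum import Enum


class Policy(Enum):
    MERGE = "merge"          # command line overrides the file; the file fills gaps
    EXCLUSIVE = "exclusive"  # an applied config file is the only source


def effective_config(cli: dict, file: dict, file_applied: bool, policy: Policy) -> dict:
    if policy is Policy.EXCLUSIVE and file_applied:
        return dict(file)  # command-line arguments are ignored entirely
    merged = dict(file)
    merged.update({k: v for k, v in cli.items() if v is not None})
    return merged
```

Under exclusivity, a stale command-line value can never shadow the file. The trade-off is that the command line no longer works as a quick override.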
Monitor state config changes
At present, this requirement comes mainly from Java functions. Getting the number of function instances is provided in the Window Functions context, and only the Java runtime currently supports it.
Java instances need to know the parallelism of the function they belong to. However, this information is passed in only at startup. If the parallelism changes, we have to change the instance's startup command, which reschedules all instances (process restart or pod restart). This is not desirable.
One way to avoid the reschedule is to leave the instance's startup command unchanged when the parallelism changes. The first goal above (reading configs from a file) makes this possible.
In addition, Pulsar Functions need to support fetching the current parallelism dynamically, so that instances do not have to be reinitialized when the number of function instances changes (whether through manual changes or autoscaling mechanisms such as [HPA](https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/)).
What configs should be modifiable at runtime?
Not every config in FunctionDetails should be modifiable at runtime. Some configs are used only during initialization, and changes to them (such as className or processingGuarantee) should trigger an instance reschedule or process restart (Discussion here). We should not support changing such configs at runtime without a restart.
Configs like parallelism should be modifiable at runtime. Rather than a config entry, parallelism is really a piece of context state exposed to the Java runtime.
States that need to be modifiable at runtime:
parallelism
Implementation Plan: use file to detect config changes
Based on the observations above, our initial scope is limited to FunctionDetails.parallelism, which is used by Context.getNumInstances().
If other config options need to be supported later, we will follow a similar approach:
Users provide a config file storing key-value config pairs.
Users provide the location of the config file when starting the instance.
There are two alternatives:
Whenever the user queries a config value through the context, for example Context#getNumInstances(), the runtime reads the file and retrieves the newest value.
A polling thread monitors the file for changes; whenever the file content changes, the instance state is updated accordingly.
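Both alternatives are sketched in Python below. This is not the Java runtime's Context. `read_parallelism` and `ParallelismWatcher` are hypothetical names, and the file is assumed to hold a `parallelism=N` line. Alternative 1 re-reads the file on every query. Alternative 2 uses a daemon thread to refresh a cached value, so queries never touch the filesystem.

```python
import threading


def read_parallelism(path: str) -> int:
    """Alternative 1: read the file on every query, so the value is always the newest."""
    with open(path, encoding="utf-8") as f:
        for raw in f:
            key, sep, value = raw.strip().partition("=")
            if sep and key.strip() == "parallelism":
                return int(value.strip())
    raise KeyError("parallelism not found in " + path)


class ParallelismWatcher:
    """Alternative 2: a background thread polls the file and updates cached state."""

    def __init__(self, path: str, interval_seconds: float = 1.0):
        self._path = path
        self._interval = interval_seconds
        self._lock = threading.Lock()
        self._num_instances = read_parallelism(path)
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> "ParallelismWatcher":
        self._thread.start()
        return self

    def stop(self) -> None:
        self._stop_event.set()
        self._thread.join()

    def get_num_instances(self) -> int:
        with self._lock:
            return self._num_instances

    def poll_once(self) -> bool:
        """Re-read the file once; return True if the cached value changed."""
        value = read_parallelism(self._path)
        with self._lock:
            changed = value != self._num_instances
            self._num_instances = value
        return changed

    def _run(self) -> None:
        # Event.wait doubles as an interruptible sleep between polls.
        while not self._stop_event.wait(self._interval):
            try:
                self.poll_once()
            except (OSError, ValueError, KeyError):
                pass  # keep the last good value if the file is briefly unreadable
```

Alternative 1 is simpler and never stale, but it pays a file read per query. Alternative 2 bounds staleness by the polling interval. Checking the file's modification time before re-reading would save reads, but coarse timestamp granularity can hide two writes made in quick succession.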
Alternatives
Included in the section above.
Anything else?
Concerns:
Consistency: the function may rely on all instances seeing the same config value. For example, if the function uses parallelism to compute the work owned by each instance and two instances see different parallelism values, could this cause data inconsistency? (This is similar to rebalancing when parallelism is increased.)
User code initialization: if user code uses the config value only at initialization to create some objects, those objects will not change after initialization, even if the config value is later updated.
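The consistency concern can be made concrete with a small Python sketch. It assumes a hypothetical scheme in which each instance owns the keys whose stable hash modulo the parallelism equals its instance id. If instance 0 still sees the old parallelism while the others have picked up the new value, some keys get two owners and others get none.

```python
import zlib


def owns(key: str, instance_id: int, num_instances: int) -> bool:
    """Hypothetical split: instance i owns keys whose stable hash % num_instances == i."""
    # zlib.crc32 is stable across processes, unlike Python's salted built-in hash().
    return zlib.crc32(key.encode("utf-8")) % num_instances == instance_id


def owners(key: str, views: dict) -> list:
    """views maps instance_id -> the parallelism value that instance currently sees."""
    return [i for i, n in sorted(views.items()) if owns(key, i, n)]


keys = [f"key-{n}" for n in range(100)]

# Every instance sees parallelism 3: each key has exactly one owner.
consistent = {0: 3, 1: 3, 2: 3}
# Scaling from 2 to 3, but instance 0 has not yet observed the new value.
mixed = {0: 2, 1: 3, 2: 3}

for label, views in (("consistent", consistent), ("mixed", mixed)):
    counts = [len(owners(k, views)) for k in keys]
    print(label, "double-owned:", counts.count(2), "unowned:", counts.count(0))
```

During the window in which the views disagree, double-owned keys may be processed twice and unowned keys may be skipped. This matches the rebalancing analogy in the concern above.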
Credits to @tpiperatgod for initiating this PIP, and thanks to @freeznet for giving feedback on it.