Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 23 additions & 17 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@ import (

// Configuration variables
var (
listeningAddress string
metricsEndpoint string
scrapeURIs []string
fixProcessCount bool
k8sAutoTracking bool
namespace string
podLabels string
port string
listeningAddress string
metricsEndpoint string
scrapeURIs []string
fixProcessCount bool
k8sAutoTracking bool
k8sHeadless bool
k8sHeadlessService string
namespace string
podLabels string
port string
)

// serverCmd represents the server command
Expand All @@ -64,7 +66,7 @@ to quickly create a Cobra application.`,
log.Info("Kubernetes auto-tracking enabled. Watching for pod changes...")

go func() {
if err := pm.DiscoverPods(exporter, namespace, podLabels, port); err != nil {
if err := pm.DiscoverPods(exporter, namespace, podLabels, port, k8sHeadless, k8sHeadlessService); err != nil {
log.Error(err)
}
}()
Expand Down Expand Up @@ -152,21 +154,25 @@ func init() {

// Kubernetes
serverCmd.Flags().BoolVar(&k8sAutoTracking, "k8s.autotracking", false, "Enable automatic tracking of PHP-FPM pods in Kubernetes.")
serverCmd.Flags().BoolVar(&k8sHeadless, "k8s.headless", false, "Enable the use of headless service in the autodiscovery instead of pod IP.")
serverCmd.Flags().StringVar(&k8sHeadlessService, "k8s.headlessService", "", "The name of the headless service used in the autodiscovery.")
serverCmd.Flags().StringVarP(&namespace, "k8s.namespace", "n", "", "Kubernetes namespace to monitor (defaults to all namespaces if not set)")
serverCmd.Flags().StringVarP(&podLabels, "k8s.pod-labels", "l", "php-fpm-exporter/collect=true", "Kubernetes pod labels as a list of key-value pairs")
serverCmd.Flags().StringVarP(&port, "k8s.port", "p", "9000", "Kubernetes pod port")

// Workaround since vipers BindEnv is currently not working as expected (see https://github.com/spf13/viper/issues/461)

envs := map[string]string{
"PHP_FPM_WEB_LISTEN_ADDRESS": "web.listen-address",
"PHP_FPM_WEB_TELEMETRY_PATH": "web.telemetry-path",
"PHP_FPM_SCRAPE_URI": "phpfpm.scrape-uri",
"PHP_FPM_FIX_PROCESS_COUNT": "phpfpm.fix-process-count",
"PHP_FPM_K8S_AUTOTRACKING": "k8s.autotracking",
"PHP_FPM_K8S_NAMESPACE": "k8s.namespace",
"PHP_FPM_K8S_POD_LABELS": "k8s.pod-labels",
"PHP_FPM_K8S_POD_PORT": "k8s.port",
"PHP_FPM_WEB_LISTEN_ADDRESS": "web.listen-address",
"PHP_FPM_WEB_TELEMETRY_PATH": "web.telemetry-path",
"PHP_FPM_SCRAPE_URI": "phpfpm.scrape-uri",
"PHP_FPM_FIX_PROCESS_COUNT": "phpfpm.fix-process-count",
"PHP_FPM_K8S_AUTOTRACKING": "k8s.autotracking",
"PHP_FPM_K8S_HEADLESS": "k8s.headless",
"PHP_FPM_K8S_HEADLESS_SERVICE": "k8s.headlessService",
"PHP_FPM_K8S_NAMESPACE": "k8s.namespace",
"PHP_FPM_K8S_POD_LABELS": "k8s.pod-labels",
"PHP_FPM_K8S_POD_PORT": "k8s.port",
}

mapEnvVars(envs, serverCmd)
Expand Down
27 changes: 19 additions & 8 deletions phpfpm/pod_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"k8s.io/client-go/tools/watch"
)

const uriTemplate string = "tcp://%s:%s/status"
const (
uriTemplate string = "tcp://%s:%s/status"
)

// customWatcher is a custom implementation of the cache.Watcher interface,
// designed to watch Kubernetes pods based on specific label selectors and namespace.
Expand Down Expand Up @@ -79,15 +81,15 @@ func listPods(clientset *kubernetes.Clientset, namespace string, podLabels strin
// initializePodEnlisting retrieves all pods matching the specified criteria and appends their URIs to the PoolManager's PodPhases.
// This function is invoked prior to starting the NewRetryWatcher to capture the initial state of existing pods
// and to obtain the ResourceVersion required for initializing the NewRetryWatcher.
func (pm *PoolManager) initialPodEnlisting(exporter *Exporter, podList *v1.PodList, port string) (string, error) {
func (pm *PoolManager) initialPodEnlisting(exporter *Exporter, podList *v1.PodList, port string, headless bool, headlessService string) (string, error) {

log.Infof("Found %d pod(s) during initial list", len(podList.Items))
for _, pod := range podList.Items {
podName := pod.Name
currentPhase := pod.Status.Phase
log.Debugf("Processing pod from initial list: %s, phase: %s", podName, currentPhase)

uri := fmt.Sprintf(uriTemplate, pod.Status.PodIP, port)
uri := buildPodDNSURI(&pod, port, headless, headlessService)
pm.processPodAdded(exporter, &pod, uri)
}
return podList.ResourceVersion, nil
Expand Down Expand Up @@ -143,7 +145,7 @@ func (pm *PoolManager) processPodDeleted(exporter *Exporter, pod *v1.Pod, uri st
// DiscoverPods begins by listing the pods that match the specified labels within the given namespace.
// It then starts a watch session in a separate goroutine.
// The list operation is performed first to retrieve the initial ResourceVersion, which is required to initialize a NewRetryWatcher.
func (pm *PoolManager) DiscoverPods(exporter *Exporter, namespace string, podLabels string, port string) error {
func (pm *PoolManager) DiscoverPods(exporter *Exporter, namespace string, podLabels string, port string, headless bool, headlessService string) error {
// Get the Kubernetes client
clientset, err := k8sGetClient()
if err != nil {
Expand All @@ -153,12 +155,12 @@ func (pm *PoolManager) DiscoverPods(exporter *Exporter, namespace string, podLab
watcher := newWatcher(clientset, namespace, podLabels)

podList, err := listPods(clientset, namespace, podLabels)
initialResourceVersion, err := pm.initialPodEnlisting(exporter, podList, port)
initialResourceVersion, err := pm.initialPodEnlisting(exporter, podList, port, headless, headlessService)
if err != nil {
return err
}

go pm.watchPodEvents(exporter, watcher, initialResourceVersion, port)
go pm.watchPodEvents(exporter, watcher, initialResourceVersion, port, headless, headlessService)
return nil
}

Expand All @@ -167,7 +169,7 @@ func (pm *PoolManager) DiscoverPods(exporter *Exporter, namespace string, podLab
// - For "modified" events, it verifies if the pod is in the running state before appending its URI to the pool manager.
// - For "deleted" events, the pod's URI is removed from the pool manager's PodPhases.
// Note: There is an unresolved issue with timeout errors when a pod is deleted, which requires further investigation and handling.
func (pm *PoolManager) watchPodEvents(exporter *Exporter, watcher cache.Watcher, resourceVersion string, port string) {
func (pm *PoolManager) watchPodEvents(exporter *Exporter, watcher cache.Watcher, resourceVersion string, port string, headless bool, headlessService string) {
retryWatcher, err := watch.NewRetryWatcher(resourceVersion, watcher)
if err != nil {
log.Errorf("Failed to create Retry Watcher: %v", err)
Expand All @@ -183,7 +185,7 @@ func (pm *PoolManager) watchPodEvents(exporter *Exporter, watcher cache.Watcher,
continue
}

uri := fmt.Sprintf(uriTemplate, pod.Status.PodIP, port)
uri := buildPodDNSURI(pod, port, headless, headlessService)
log.Debugf("Received event for pod %s: type=%s, phase=%s", pod.Name, event.Type, pod.Status.Phase)

switch event.Type {
Expand All @@ -196,3 +198,12 @@ func (pm *PoolManager) watchPodEvents(exporter *Exporter, watcher cache.Watcher,
}
}
}

func buildPodDNSURI(pod *v1.Pod, port string, headless bool, headlessService string) string {
if headless {
hostname := fmt.Sprintf("%s.%s.%s.svc.cluster.local", pod.Name, headlessService, pod.Namespace)
return fmt.Sprintf(uriTemplate, hostname, port)
} else {
return fmt.Sprintf(uriTemplate, pod.Status.PodIP, port)
}
}
Loading