@@ -12,6 +12,7 @@ import (
1212 "fmt"
1313 "net/http"
1414 "os"
15+ "path"
1516 "path/filepath"
1617 "regexp"
1718 "slices"
@@ -1984,6 +1985,39 @@ func (r *tester) previewTransform(ctx context.Context, transformId string) ([]co
19841985 return preview .Documents , nil
19851986}
19861987
1988+ func (r * tester ) resetTransform (ctx context.Context , transformId string ) error {
1989+ resp , err := r .esAPI .TransformResetTransform (transformId ,
1990+ r .esAPI .TransformResetTransform .WithContext (ctx ),
1991+ r .esAPI .TransformResetTransform .WithForce (true ),
1992+ )
1993+ if err != nil {
1994+ return err
1995+ }
1996+ defer resp .Body .Close ()
1997+
1998+ if resp .IsError () {
1999+ return fmt .Errorf ("failed to reset transform %q: %s" , transformId , resp .String ())
2000+ }
2001+
2002+ return nil
2003+ }
2004+
2005+ func (r * tester ) startTransform (ctx context.Context , transformId string ) error {
2006+ resp , err := r .esAPI .TransformStartTransform (transformId ,
2007+ r .esAPI .TransformStartTransform .WithContext (ctx ),
2008+ )
2009+ if err != nil {
2010+ return err
2011+ }
2012+ defer resp .Body .Close ()
2013+
2014+ if resp .IsError () {
2015+ return fmt .Errorf ("failed to start transform %q: %s" , transformId , resp .String ())
2016+ }
2017+
2018+ return nil
2019+ }
2020+
19872021func (r * tester ) scheduleTransform (ctx context.Context , transformId string ) error {
19882022 resp , err := r .esAPI .TransformScheduleNowTransform (transformId ,
19892023 r .esAPI .TransformScheduleNowTransform .WithContext (ctx ),
@@ -2053,9 +2087,45 @@ func (r *tester) getTransformStats(ctx context.Context, transformId string) (*tr
20532087 return & response .Transforms [0 ], nil
20542088}
20552089
2090+ func (r * tester ) checkTransformAuditMessages (ctx context.Context , transformId string ) error {
2091+ // XXX: This is an internal API, are these audit messages available somewhere else?
2092+ const internalTransformsPath = "/internal/transform/transforms"
2093+ messagesPath := path .Join (internalTransformsPath , transformId , "messages" )
2094+ query := "?sortField=timestamp&sortDirection=desc" // Required
2095+ statusCode , body , err := r .kibanaClient .SendRequest (ctx , http .MethodGet , messagesPath + query , nil )
2096+ if err != nil {
2097+ return fmt .Errorf ("could not get transform audit messages: %w" , err )
2098+ }
2099+ if statusCode >= 400 {
2100+ return fmt .Errorf ("could not get transform audit messages: status code %d, body: %s" , statusCode , body )
2101+ }
2102+
2103+ var resp struct {
2104+ Messages []struct {
2105+ TransformID string `json:"transform_id"`
2106+ Message string `json:"message"`
2107+ Level string `json:"level"`
2108+ Timestamp int `json:"timestamp"`
2109+ NodeName string `json:"node_name"`
2110+ } `json:"messages"`
2111+ }
2112+ err = json .Unmarshal (body , & resp )
2113+ if err != nil {
2114+ return fmt .Errorf ("could not decode response: %w" , err )
2115+ }
2116+
2117+ for _ , message := range resp .Messages {
2118+ if message .Level == "error" {
2119+ return fmt .Errorf ("failure found in transform: %s" , message .Message )
2120+ }
2121+ }
2122+ return nil
2123+ }
2124+
20562125// checkRunningTransformHealth checks the following for a given transform:
20572126// - That it is started.
20582127// - That it can execute at least once during the check.
2128+ // - That it hasn't generated any error message.
20592129// - That it is healthy after executing at least once.
20602130func (r * tester ) checkRunningTransformHealth (ctx context.Context , transformId string ) error {
20612131 const (
@@ -2065,6 +2135,18 @@ func (r *tester) checkRunningTransformHealth(ctx context.Context, transformId st
20652135 lastSearchTime := 0
20662136 last := - 1
20672137 running := false
2138+
2139+ // Reset transform to clean any previous state.
2140+ /* XXX: It fails to create the index after reset :?
2141+ err := r.resetTransform(ctx, transformId)
2142+ if err != nil {
2143+ return fmt.Errorf("failed to reset transform: %w", err)
2144+ }
2145+ err = r.startTransform(ctx, transformId)
2146+ if err != nil {
2147+ return fmt.Errorf("failed to start transform after reset: %w", err)
2148+ }
2149+ */
20682150 ok , err := wait .UntilTrue (ctx , func (ctx context.Context ) (bool , error ) {
20692151 stats , err := r .getTransformStats (ctx , transformId )
20702152 if err != nil {
@@ -2106,6 +2188,12 @@ func (r *tester) checkRunningTransformHealth(ctx context.Context, transformId st
21062188 return false , nil
21072189 }
21082190
2191+ // We need to check the audit messages in case a document is removed but caused issues.
2192+ err = r .checkTransformAuditMessages (ctx , transformId )
2193+ if err != nil {
2194+ return false , err
2195+ }
2196+
21092197 err = healthError (stats .Health )
21102198 return err == nil , err
21112199 }, period , timeout )
0 commit comments