diff --git a/config.yaml b/config.yaml index d68649a..aaaf87f 100644 --- a/config.yaml +++ b/config.yaml @@ -69,16 +69,16 @@ FlinkApplicationDeployment: - ServiceFlinkListApplicationDeployments - ServiceFlinkStopApplicationDeployment FlinkApplication: - - ServiceFlinkCreateApplicationVersion - - ServiceFlinkDeleteApplicationVersion - - ServiceFlinkGetApplicationVersion - - ServiceFlinkValidateApplicationVersion -FlinkApplicationVersion: - ServiceFlinkCreateApplication - ServiceFlinkDeleteApplication - ServiceFlinkGetApplication - ServiceFlinkListApplications - ServiceFlinkUpdateApplication +FlinkApplicationVersion: + - ServiceFlinkCreateApplicationVersion + - ServiceFlinkDeleteApplicationVersion + - ServiceFlinkGetApplicationVersion + - ServiceFlinkValidateApplicationVersion FlinkJob: - ServiceFlinkJobDetails - ServiceFlinkJobsList diff --git a/handler/flinkapplication/flinkapplication.go b/handler/flinkapplication/flinkapplication.go index f81ce84..db3a652 100644 --- a/handler/flinkapplication/flinkapplication.go +++ b/handler/flinkapplication/flinkapplication.go @@ -10,25 +10,30 @@ import ( ) type Handler interface { - // ServiceFlinkCreateApplicationVersion create a Flink ApplicationVersion - // POST /project/{project}/service/{service_name}/flink/application/{application_id}/version - // https://api.aiven.io/doc/#tag/Service:_Flink/operation/ServiceFlinkCreateApplicationVersion - ServiceFlinkCreateApplicationVersion(ctx context.Context, project string, serviceName string, applicationId string, in *ServiceFlinkCreateApplicationVersionIn) (*ServiceFlinkCreateApplicationVersionOut, error) + // ServiceFlinkCreateApplication create a Flink Application + // POST /project/{project}/service/{service_name}/flink/application + // https://api.aiven.io/doc/#tag/Service:_Flink/operation/ServiceFlinkCreateApplication + ServiceFlinkCreateApplication(ctx context.Context, project string, serviceName string, in *ServiceFlinkCreateApplicationIn) (*ServiceFlinkCreateApplicationOut, error) - // ServiceFlinkDeleteApplicationVersion delete a Flink ApplicationVersion - // DELETE /project/{project}/service/{service_name}/flink/application/{application_id}/version/{application_version_id} - // https://api.aiven.io/doc/#tag/Service:_Flink/operation/ServiceFlinkDeleteApplicationVersion - ServiceFlinkDeleteApplicationVersion(ctx context.Context, project string, serviceName string, applicationId string, applicationVersionId string) (*ServiceFlinkDeleteApplicationVersionOut, error) + // ServiceFlinkDeleteApplication delete a Flink Application + // DELETE /project/{project}/service/{service_name}/flink/application/{application_id} + // https://api.aiven.io/doc/#tag/Service:_Flink/operation/ServiceFlinkDeleteApplication + ServiceFlinkDeleteApplication(ctx context.Context, project string, serviceName string, applicationId string) (*ServiceFlinkDeleteApplicationOut, error) - // ServiceFlinkGetApplicationVersion get a Flink ApplicationVersion - // GET /project/{project}/service/{service_name}/flink/application/{application_id}/version/{application_version_id} - // https://api.aiven.io/doc/#tag/Service:_Flink/operation/ServiceFlinkGetApplicationVersion - ServiceFlinkGetApplicationVersion(ctx context.Context, project string, serviceName string, applicationId string, applicationVersionId string) (*ServiceFlinkGetApplicationVersionOut, error) + // ServiceFlinkGetApplication get a Flink Application + // GET /project/{project}/service/{service_name}/flink/application/{application_id} + // https://api.aiven.io/doc/#tag/Service:_Flink/operation/ServiceFlinkGetApplication + ServiceFlinkGetApplication(ctx context.Context, project string, serviceName string, applicationId string) (*ServiceFlinkGetApplicationOut, error) - // ServiceFlinkValidateApplicationVersion validate a Flink ApplicationVersion - // POST /project/{project}/service/{service_name}/flink/application/{application_id}/version/validate - // https://api.aiven.io/doc/#tag/Service:_Flink/operation/ServiceFlinkValidateApplicationVersion - ServiceFlinkValidateApplicationVersion(ctx context.Context, project string, serviceName string, applicationId string, in *ServiceFlinkValidateApplicationVersionIn) (*ServiceFlinkValidateApplicationVersionOut, error) + // ServiceFlinkListApplications get all Flink Applications + // GET /project/{project}/service/{service_name}/flink/application + // https://api.aiven.io/doc/#tag/Service:_Flink/operation/ServiceFlinkListApplications + ServiceFlinkListApplications(ctx context.Context, project string, serviceName string) ([]ApplicationOut, error) + + // ServiceFlinkUpdateApplication update a Flink Application + // PUT /project/{project}/service/{service_name}/flink/application/{application_id} + // https://api.aiven.io/doc/#tag/Service:_Flink/operation/ServiceFlinkUpdateApplication + ServiceFlinkUpdateApplication(ctx context.Context, project string, serviceName string, applicationId string, in *ServiceFlinkUpdateApplicationIn) (*ServiceFlinkUpdateApplicationOut, error) } func NewHandler(doer doer) FlinkApplicationHandler { @@ -43,40 +48,50 @@ type FlinkApplicationHandler struct { doer doer } -func (h *FlinkApplicationHandler) ServiceFlinkCreateApplicationVersion(ctx context.Context, project string, serviceName string, applicationId string, in *ServiceFlinkCreateApplicationVersionIn) (*ServiceFlinkCreateApplicationVersionOut, error) { - path := fmt.Sprintf("/project/%s/service/%s/flink/application/%s/version", project, serviceName, applicationId) - b, err := h.doer.Do(ctx, "ServiceFlinkCreateApplicationVersion", "POST", path, in) - out := new(ServiceFlinkCreateApplicationVersionOut) +func (h *FlinkApplicationHandler) ServiceFlinkCreateApplication(ctx context.Context, project string, serviceName string, in *ServiceFlinkCreateApplicationIn) (*ServiceFlinkCreateApplicationOut, error) { + path := fmt.Sprintf("/project/%s/service/%s/flink/application", project, serviceName) + b, err := h.doer.Do(ctx, "ServiceFlinkCreateApplication", "POST", path, in) + out := new(ServiceFlinkCreateApplicationOut) err = json.Unmarshal(b, out) if err != nil { return nil, err } return out, nil } -func (h *FlinkApplicationHandler) ServiceFlinkDeleteApplicationVersion(ctx context.Context, project string, serviceName string, applicationId string, applicationVersionId string) (*ServiceFlinkDeleteApplicationVersionOut, error) { - path := fmt.Sprintf("/project/%s/service/%s/flink/application/%s/version/%s", project, serviceName, applicationId, applicationVersionId) - b, err := h.doer.Do(ctx, "ServiceFlinkDeleteApplicationVersion", "DELETE", path, nil) - out := new(ServiceFlinkDeleteApplicationVersionOut) +func (h *FlinkApplicationHandler) ServiceFlinkDeleteApplication(ctx context.Context, project string, serviceName string, applicationId string) (*ServiceFlinkDeleteApplicationOut, error) { + path := fmt.Sprintf("/project/%s/service/%s/flink/application/%s", project, serviceName, applicationId) + b, err := h.doer.Do(ctx, "ServiceFlinkDeleteApplication", "DELETE", path, nil) + out := new(ServiceFlinkDeleteApplicationOut) err = json.Unmarshal(b, out) if err != nil { return nil, err } return out, nil } -func (h *FlinkApplicationHandler) ServiceFlinkGetApplicationVersion(ctx context.Context, project string, serviceName string, applicationId string, applicationVersionId string) (*ServiceFlinkGetApplicationVersionOut, error) { - path := fmt.Sprintf("/project/%s/service/%s/flink/application/%s/version/%s", project, serviceName, applicationId, applicationVersionId) - b, err := h.doer.Do(ctx, "ServiceFlinkGetApplicationVersion", "GET", path, nil) - out := new(ServiceFlinkGetApplicationVersionOut) +func (h *FlinkApplicationHandler) ServiceFlinkGetApplication(ctx context.Context, project string, serviceName string, applicationId string) (*ServiceFlinkGetApplicationOut, error) { + path := fmt.Sprintf("/project/%s/service/%s/flink/application/%s", project, serviceName, applicationId) + b, err := h.doer.Do(ctx, "ServiceFlinkGetApplication", "GET", path, nil) + out := new(ServiceFlinkGetApplicationOut) err = json.Unmarshal(b, out) if err != nil { return nil, err } return out, nil } -func (h *FlinkApplicationHandler) ServiceFlinkValidateApplicationVersion(ctx context.Context, project string, serviceName string, applicationId string, in *ServiceFlinkValidateApplicationVersionIn) (*ServiceFlinkValidateApplicationVersionOut, error) { - path := fmt.Sprintf("/project/%s/service/%s/flink/application/%s/version/validate", project, serviceName, applicationId) - b, err := h.doer.Do(ctx, "ServiceFlinkValidateApplicationVersion", "POST", path, in) - out := new(ServiceFlinkValidateApplicationVersionOut) +func (h *FlinkApplicationHandler) ServiceFlinkListApplications(ctx context.Context, project string, serviceName string) ([]ApplicationOut, error) { + path := fmt.Sprintf("/project/%s/service/%s/flink/application", project, serviceName) + b, err := h.doer.Do(ctx, "ServiceFlinkListApplications", "GET", path, nil) + out := new(serviceFlinkListApplicationsOut) + err = json.Unmarshal(b, out) + if err != nil { + return nil, err + } + return out.Applications, nil +} +func (h *FlinkApplicationHandler) ServiceFlinkUpdateApplication(ctx context.Context, project string, serviceName string, applicationId string, in *ServiceFlinkUpdateApplicationIn) (*ServiceFlinkUpdateApplicationOut, error) { + path := fmt.Sprintf("/project/%s/service/%s/flink/application/%s", project, serviceName, applicationId) + b, err := h.doer.Do(ctx, "ServiceFlinkUpdateApplication", "PUT", path, in) + out := new(ServiceFlinkUpdateApplicationOut) err = json.Unmarshal(b, out) if err != nil { return nil, err @@ -84,35 +99,20 @@ func (h *FlinkApplicationHandler) ServiceFlinkValidateApplicationVersion(ctx con return out, nil } -type ColumnOut struct { - DataType string `json:"data_type"` - Extras string `json:"extras,omitempty"` - Key string `json:"key,omitempty"` - Name string `json:"name"` - Nullable bool `json:"nullable"` - Watermark string `json:"watermark,omitempty"` -} -type PositionOut struct { - CharacterNumber int `json:"character_number"` - EndCharacterNumber int `json:"end_character_number"` - EndLineNumber int `json:"end_line_number"` - LineNumber int `json:"line_number"` -} -type ServiceFlinkCreateApplicationVersionIn struct { +type ApplicationOut struct { + CreatedAt *time.Time `json:"created_at,omitempty"` + CreatedBy string `json:"created_by,omitempty"` + Id string `json:"id"` + Name string `json:"name"` + UpdatedAt *time.Time `json:"updated_at,omitempty"` + UpdatedBy string `json:"updated_by,omitempty"` +} +type ApplicationVersionIn struct { Sinks []SinkIn `json:"sinks"` Sources []SourceIn `json:"sources"` Statement string `json:"statement"` } -type ServiceFlinkCreateApplicationVersionOut struct { - CreatedAt time.Time `json:"created_at"` - CreatedBy string `json:"created_by"` - Id string `json:"id"` - Sinks []SinkOut `json:"sinks"` - Sources []SourceOut `json:"sources"` - Statement string `json:"statement"` - Version int `json:"version"` -} -type ServiceFlinkDeleteApplicationVersionOut struct { +type ApplicationVersionOut struct { CreatedAt time.Time `json:"created_at"` CreatedBy string `json:"created_by"` Id string `json:"id"` @@ -121,25 +121,73 @@ type ServiceFlinkDeleteApplicationVersionOut struct { Statement string `json:"statement"` Version int `json:"version"` } -type ServiceFlinkGetApplicationVersionOut struct { - CreatedAt time.Time `json:"created_at"` - CreatedBy string `json:"created_by"` - Id string `json:"id"` - Sinks []SinkOut `json:"sinks"` - Sources []SourceOut `json:"sources"` - Statement string `json:"statement"` - Version int `json:"version"` -} -type ServiceFlinkValidateApplicationVersionIn struct { - Sinks []SinkIn `json:"sinks"` - Sources []SourceIn `json:"sources"` - Statement string `json:"statement,omitempty"` +type ColumnOut struct { + DataType string `json:"data_type"` + Extras string `json:"extras,omitempty"` + Key string `json:"key,omitempty"` + Name string `json:"name"` + Nullable bool `json:"nullable"` + Watermark string `json:"watermark,omitempty"` } -type ServiceFlinkValidateApplicationVersionOut struct { - Sinks []SinkOutAlt `json:"sinks"` - Sources []SourceOutAlt `json:"sources"` - Statement string `json:"statement,omitempty"` - StatementError *StatementErrorOut `json:"statement_error,omitempty"` +type CurrentDeploymentOut struct { + CreatedAt time.Time `json:"created_at"` + CreatedBy string `json:"created_by"` + ErrorMsg string `json:"error_msg,omitempty"` + Id string `json:"id"` + JobId string `json:"job_id,omitempty"` + LastSavepoint string `json:"last_savepoint,omitempty"` + Parallelism int `json:"parallelism"` + RestartEnabled bool `json:"restart_enabled"` + StartingSavepoint string `json:"starting_savepoint,omitempty"` + Status string `json:"status"` + VersionId string `json:"version_id"` +} +type ServiceFlinkCreateApplicationIn struct { + ApplicationVersion *ApplicationVersionIn `json:"application_version,omitempty"` + Name string `json:"name"` +} +type ServiceFlinkCreateApplicationOut struct { + ApplicationVersions []ApplicationVersionOut `json:"application_versions"` + CreatedAt time.Time `json:"created_at"` + CreatedBy string `json:"created_by"` + CurrentDeployment *CurrentDeploymentOut `json:"current_deployment,omitempty"` + Id string `json:"id"` + Name string `json:"name"` + UpdatedAt time.Time `json:"updated_at"` + UpdatedBy string `json:"updated_by"` +} +type ServiceFlinkDeleteApplicationOut struct { + ApplicationVersions []ApplicationVersionOut `json:"application_versions"` + CreatedAt time.Time `json:"created_at"` + CreatedBy string `json:"created_by"` + CurrentDeployment *CurrentDeploymentOut `json:"current_deployment,omitempty"` + Id string `json:"id"` + Name string `json:"name"` + UpdatedAt time.Time `json:"updated_at"` + UpdatedBy string `json:"updated_by"` +} +type ServiceFlinkGetApplicationOut struct { + ApplicationVersions []ApplicationVersionOut `json:"application_versions"` + CreatedAt time.Time `json:"created_at"` + CreatedBy string `json:"created_by"` + CurrentDeployment *CurrentDeploymentOut `json:"current_deployment,omitempty"` + Id string `json:"id"` + Name string `json:"name"` + UpdatedAt time.Time `json:"updated_at"` + UpdatedBy string `json:"updated_by"` +} +type ServiceFlinkUpdateApplicationIn struct { + Name string `json:"name"` +} +type ServiceFlinkUpdateApplicationOut struct { + ApplicationVersions []ApplicationVersionOut `json:"application_versions"` + CreatedAt time.Time `json:"created_at"` + CreatedBy string `json:"created_by"` + CurrentDeployment *CurrentDeploymentOut `json:"current_deployment,omitempty"` + Id string `json:"id"` + Name string `json:"name"` + UpdatedAt time.Time `json:"updated_at"` + UpdatedBy string `json:"updated_by"` } type SinkIn struct { CreateTable string `json:"create_table"` @@ -153,15 +201,6 @@ type SinkOut struct { TableId string `json:"table_id"` TableName string `json:"table_name"` } -type SinkOutAlt struct { - Columns []ColumnOut `json:"columns,omitempty"` - CreateTable string `json:"create_table"` - IntegrationId string `json:"integration_id,omitempty"` - Message string `json:"message,omitempty"` - Options map[string]any `json:"options,omitempty"` - Position *PositionOut `json:"position,omitempty"` - TableName string `json:"table_name,omitempty"` -} type SourceIn struct { CreateTable string `json:"create_table"` IntegrationId string `json:"integration_id,omitempty"` @@ -174,16 +213,6 @@ type SourceOut struct { TableId string `json:"table_id"` TableName string `json:"table_name"` } -type SourceOutAlt struct { - Columns []ColumnOut `json:"columns,omitempty"` - CreateTable string `json:"create_table"` - IntegrationId string `json:"integration_id,omitempty"` - Message string `json:"message,omitempty"` - Options map[string]any `json:"options,omitempty"` - Position *PositionOut `json:"position,omitempty"` - TableName string `json:"table_name,omitempty"` -} -type StatementErrorOut struct { - Message string `json:"message"` - Position *PositionOut `json:"position,omitempty"` +type serviceFlinkListApplicationsOut struct { + Applications []ApplicationOut `json:"applications"` } diff --git a/handler/flinkapplicationversion/flinkapplicationversion.go b/handler/flinkapplicationversion/flinkapplicationversion.go index 8b542a2..f21e2bc 100644 --- a/handler/flinkapplicationversion/flinkapplicationversion.go +++ b/handler/flinkapplicationversion/flinkapplicationversion.go @@ -10,30 +10,25 @@ import ( ) type Handler interface { - // ServiceFlinkCreateApplication create a Flink Application - // POST /project/{project}/service/{service_name}/flink/application - // https://api.aiven.io/doc/#tag/Service:_Flink/operation/ServiceFlinkCreateApplication - ServiceFlinkCreateApplication(ctx context.Context, project string, serviceName string, in *ServiceFlinkCreateApplicationIn) (*ServiceFlinkCreateApplicationOut, error) + // ServiceFlinkCreateApplicationVersion create a Flink ApplicationVersion + // POST /project/{project}/service/{service_name}/flink/application/{application_id}/version + // https://api.aiven.io/doc/#tag/Service:_Flink/operation/ServiceFlinkCreateApplicationVersion + ServiceFlinkCreateApplicationVersion(ctx context.Context, project string, serviceName string, applicationId string, in *ServiceFlinkCreateApplicationVersionIn) (*ServiceFlinkCreateApplicationVersionOut, error) - // ServiceFlinkDeleteApplication delete a Flink Application - // DELETE /project/{project}/service/{service_name}/flink/application/{application_id} - // https://api.aiven.io/doc/#tag/Service:_Flink/operation/ServiceFlinkDeleteApplication - ServiceFlinkDeleteApplication(ctx context.Context, project string, serviceName string, applicationId string) (*ServiceFlinkDeleteApplicationOut, error) + // ServiceFlinkDeleteApplicationVersion delete a Flink ApplicationVersion + // DELETE /project/{project}/service/{service_name}/flink/application/{application_id}/version/{application_version_id} + // https://api.aiven.io/doc/#tag/Service:_Flink/operation/ServiceFlinkDeleteApplicationVersion + ServiceFlinkDeleteApplicationVersion(ctx context.Context, project string, serviceName string, applicationId string, applicationVersionId string) (*ServiceFlinkDeleteApplicationVersionOut, error) - // ServiceFlinkGetApplication get a Flink Application - // GET /project/{project}/service/{service_name}/flink/application/{application_id} - // https://api.aiven.io/doc/#tag/Service:_Flink/operation/ServiceFlinkGetApplication - ServiceFlinkGetApplication(ctx context.Context, project string, serviceName string, applicationId string) (*ServiceFlinkGetApplicationOut, error) + // ServiceFlinkGetApplicationVersion get a Flink ApplicationVersion + // GET /project/{project}/service/{service_name}/flink/application/{application_id}/version/{application_version_id} + // https://api.aiven.io/doc/#tag/Service:_Flink/operation/ServiceFlinkGetApplicationVersion + ServiceFlinkGetApplicationVersion(ctx context.Context, project string, serviceName string, applicationId string, applicationVersionId string) (*ServiceFlinkGetApplicationVersionOut, error) - // ServiceFlinkListApplications get all Flink Applications - // GET /project/{project}/service/{service_name}/flink/application - // https://api.aiven.io/doc/#tag/Service:_Flink/operation/ServiceFlinkListApplications - ServiceFlinkListApplications(ctx context.Context, project string, serviceName string) ([]ApplicationOut, error) - - // ServiceFlinkUpdateApplication update a Flink Application - // PUT /project/{project}/service/{service_name}/flink/application/{application_id} - // https://api.aiven.io/doc/#tag/Service:_Flink/operation/ServiceFlinkUpdateApplication - ServiceFlinkUpdateApplication(ctx context.Context, project string, serviceName string, applicationId string, in *ServiceFlinkUpdateApplicationIn) (*ServiceFlinkUpdateApplicationOut, error) + // ServiceFlinkValidateApplicationVersion validate a Flink ApplicationVersion + // POST /project/{project}/service/{service_name}/flink/application/{application_id}/version/validate + // https://api.aiven.io/doc/#tag/Service:_Flink/operation/ServiceFlinkValidateApplicationVersion + ServiceFlinkValidateApplicationVersion(ctx context.Context, project string, serviceName string, applicationId string, in *ServiceFlinkValidateApplicationVersionIn) (*ServiceFlinkValidateApplicationVersionOut, error) } func NewHandler(doer doer) FlinkApplicationVersionHandler { @@ -48,50 +43,40 @@ type FlinkApplicationVersionHandler struct { doer doer } -func (h *FlinkApplicationVersionHandler) ServiceFlinkCreateApplication(ctx context.Context, project string, serviceName string, in *ServiceFlinkCreateApplicationIn) (*ServiceFlinkCreateApplicationOut, error) { - path := fmt.Sprintf("/project/%s/service/%s/flink/application", project, serviceName) - b, err := h.doer.Do(ctx, "ServiceFlinkCreateApplication", "POST", path, in) - out := new(ServiceFlinkCreateApplicationOut) +func (h *FlinkApplicationVersionHandler) ServiceFlinkCreateApplicationVersion(ctx context.Context, project string, serviceName string, applicationId string, in *ServiceFlinkCreateApplicationVersionIn) (*ServiceFlinkCreateApplicationVersionOut, error) { + path := fmt.Sprintf("/project/%s/service/%s/flink/application/%s/version", project, serviceName, applicationId) + b, err := h.doer.Do(ctx, "ServiceFlinkCreateApplicationVersion", "POST", path, in) + out := new(ServiceFlinkCreateApplicationVersionOut) err = json.Unmarshal(b, out) if err != nil { return nil, err } return out, nil } -func (h *FlinkApplicationVersionHandler) ServiceFlinkDeleteApplication(ctx context.Context, project string, serviceName string, applicationId string) (*ServiceFlinkDeleteApplicationOut, error) { - path := fmt.Sprintf("/project/%s/service/%s/flink/application/%s", project, serviceName, applicationId) - b, err := h.doer.Do(ctx, "ServiceFlinkDeleteApplication", "DELETE", path, nil) - out := new(ServiceFlinkDeleteApplicationOut) +func (h *FlinkApplicationVersionHandler) ServiceFlinkDeleteApplicationVersion(ctx context.Context, project string, serviceName string, applicationId string, applicationVersionId string) (*ServiceFlinkDeleteApplicationVersionOut, error) { + path := fmt.Sprintf("/project/%s/service/%s/flink/application/%s/version/%s", project, serviceName, applicationId, applicationVersionId) + b, err := h.doer.Do(ctx, "ServiceFlinkDeleteApplicationVersion", "DELETE", path, nil) + out := new(ServiceFlinkDeleteApplicationVersionOut) err = json.Unmarshal(b, out) if err != nil { return nil, err } return out, nil } -func (h *FlinkApplicationVersionHandler) ServiceFlinkGetApplication(ctx context.Context, project string, serviceName string, applicationId string) (*ServiceFlinkGetApplicationOut, error) { - path := fmt.Sprintf("/project/%s/service/%s/flink/application/%s", project, serviceName, applicationId) - b, err := h.doer.Do(ctx, "ServiceFlinkGetApplication", "GET", path, nil) - out := new(ServiceFlinkGetApplicationOut) +func (h *FlinkApplicationVersionHandler) ServiceFlinkGetApplicationVersion(ctx context.Context, project string, serviceName string, applicationId string, applicationVersionId string) (*ServiceFlinkGetApplicationVersionOut, error) { + path := fmt.Sprintf("/project/%s/service/%s/flink/application/%s/version/%s", project, serviceName, applicationId, applicationVersionId) + b, err := h.doer.Do(ctx, "ServiceFlinkGetApplicationVersion", "GET", path, nil) + out := new(ServiceFlinkGetApplicationVersionOut) err = json.Unmarshal(b, out) if err != nil { return nil, err } return out, nil } -func (h *FlinkApplicationVersionHandler) ServiceFlinkListApplications(ctx context.Context, project string, serviceName string) ([]ApplicationOut, error) { - path := fmt.Sprintf("/project/%s/service/%s/flink/application", project, serviceName) - b, err := h.doer.Do(ctx, "ServiceFlinkListApplications", "GET", path, nil) - out := new(serviceFlinkListApplicationsOut) - err = json.Unmarshal(b, out) - if err != nil { - return nil, err - } - return out.Applications, nil -} -func (h *FlinkApplicationVersionHandler) ServiceFlinkUpdateApplication(ctx context.Context, project string, serviceName string, applicationId string, in *ServiceFlinkUpdateApplicationIn) (*ServiceFlinkUpdateApplicationOut, error) { - path := fmt.Sprintf("/project/%s/service/%s/flink/application/%s", project, serviceName, applicationId) - b, err := h.doer.Do(ctx, "ServiceFlinkUpdateApplication", "PUT", path, in) - out := new(ServiceFlinkUpdateApplicationOut) +func (h *FlinkApplicationVersionHandler) ServiceFlinkValidateApplicationVersion(ctx context.Context, project string, serviceName string, applicationId string, in *ServiceFlinkValidateApplicationVersionIn) (*ServiceFlinkValidateApplicationVersionOut, error) { + path := fmt.Sprintf("/project/%s/service/%s/flink/application/%s/version/validate", project, serviceName, applicationId) + b, err := h.doer.Do(ctx, "ServiceFlinkValidateApplicationVersion", "POST", path, in) + out := new(ServiceFlinkValidateApplicationVersionOut) err = json.Unmarshal(b, out) if err != nil { return nil, err @@ -99,20 +84,26 @@ func (h *FlinkApplicationVersionHandler) ServiceFlinkUpdateApplication(ctx conte return out, nil } -type ApplicationOut struct { - CreatedAt *time.Time `json:"created_at,omitempty"` - CreatedBy string `json:"created_by,omitempty"` - Id string `json:"id"` - Name string `json:"name"` - UpdatedAt *time.Time `json:"updated_at,omitempty"` - UpdatedBy string `json:"updated_by,omitempty"` -} -type ApplicationVersionIn struct { +type ColumnOut struct { + DataType string `json:"data_type"` + Extras string `json:"extras,omitempty"` + Key string `json:"key,omitempty"` + Name string `json:"name"` + Nullable bool `json:"nullable"` + Watermark string `json:"watermark,omitempty"` +} +type PositionOut struct { + CharacterNumber int `json:"character_number"` + EndCharacterNumber int `json:"end_character_number"` + EndLineNumber int `json:"end_line_number"` + LineNumber int `json:"line_number"` +} +type ServiceFlinkCreateApplicationVersionIn struct { Sinks []SinkIn `json:"sinks"` Sources []SourceIn `json:"sources"` Statement string `json:"statement"` } -type ApplicationVersionOut struct { +type ServiceFlinkCreateApplicationVersionOut struct { CreatedAt time.Time `json:"created_at"` CreatedBy string `json:"created_by"` Id string `json:"id"` @@ -121,73 +112,34 @@ type ApplicationVersionOut struct { Statement string `json:"statement"` Version int `json:"version"` } -type ColumnOut struct { - DataType string `json:"data_type"` - Extras string `json:"extras,omitempty"` - Key string `json:"key,omitempty"` - Name string `json:"name"` - Nullable bool `json:"nullable"` - Watermark string `json:"watermark,omitempty"` +type ServiceFlinkDeleteApplicationVersionOut struct { + CreatedAt time.Time `json:"created_at"` + CreatedBy string `json:"created_by"` + Id string `json:"id"` + Sinks []SinkOut `json:"sinks"` + Sources []SourceOut `json:"sources"` + Statement string `json:"statement"` + Version int `json:"version"` +} +type ServiceFlinkGetApplicationVersionOut struct { + CreatedAt time.Time `json:"created_at"` + CreatedBy string `json:"created_by"` + Id string `json:"id"` + Sinks []SinkOut `json:"sinks"` + Sources []SourceOut `json:"sources"` + Statement string `json:"statement"` + Version int `json:"version"` +} +type ServiceFlinkValidateApplicationVersionIn struct { + Sinks []SinkIn `json:"sinks"` + Sources []SourceIn `json:"sources"` + Statement string `json:"statement,omitempty"` } -type CurrentDeploymentOut struct { - CreatedAt time.Time `json:"created_at"` - CreatedBy string `json:"created_by"` - ErrorMsg string `json:"error_msg,omitempty"` - Id string `json:"id"` - JobId string `json:"job_id,omitempty"` - LastSavepoint string `json:"last_savepoint,omitempty"` - Parallelism int `json:"parallelism"` - RestartEnabled bool `json:"restart_enabled"` - StartingSavepoint string `json:"starting_savepoint,omitempty"` - Status string `json:"status"` - VersionId string `json:"version_id"` -} -type ServiceFlinkCreateApplicationIn struct { - ApplicationVersion *ApplicationVersionIn `json:"application_version,omitempty"` - Name string `json:"name"` -} -type ServiceFlinkCreateApplicationOut struct { - ApplicationVersions []ApplicationVersionOut `json:"application_versions"` - CreatedAt time.Time `json:"created_at"` - CreatedBy string `json:"created_by"` - CurrentDeployment *CurrentDeploymentOut `json:"current_deployment,omitempty"` - Id string `json:"id"` - Name string `json:"name"` - UpdatedAt time.Time `json:"updated_at"` - UpdatedBy string `json:"updated_by"` -} -type ServiceFlinkDeleteApplicationOut struct { - ApplicationVersions []ApplicationVersionOut `json:"application_versions"` - CreatedAt time.Time `json:"created_at"` - CreatedBy string `json:"created_by"` - CurrentDeployment *CurrentDeploymentOut `json:"current_deployment,omitempty"` - Id string `json:"id"` - Name string `json:"name"` - UpdatedAt time.Time `json:"updated_at"` - UpdatedBy string `json:"updated_by"` -} -type ServiceFlinkGetApplicationOut struct { - ApplicationVersions []ApplicationVersionOut `json:"application_versions"` - CreatedAt time.Time `json:"created_at"` - CreatedBy string `json:"created_by"` - CurrentDeployment *CurrentDeploymentOut `json:"current_deployment,omitempty"` - Id string `json:"id"` - Name string `json:"name"` - UpdatedAt time.Time `json:"updated_at"` - UpdatedBy string `json:"updated_by"` -} -type ServiceFlinkUpdateApplicationIn struct { - Name string `json:"name"` -} -type ServiceFlinkUpdateApplicationOut struct { - ApplicationVersions []ApplicationVersionOut `json:"application_versions"` - CreatedAt time.Time `json:"created_at"` - CreatedBy string `json:"created_by"` - CurrentDeployment *CurrentDeploymentOut `json:"current_deployment,omitempty"` - Id string `json:"id"` - Name string `json:"name"` - UpdatedAt time.Time `json:"updated_at"` - UpdatedBy string `json:"updated_by"` +type ServiceFlinkValidateApplicationVersionOut struct { + Sinks []SinkOutAlt `json:"sinks"` + Sources []SourceOutAlt `json:"sources"` + Statement string `json:"statement,omitempty"` + StatementError *StatementErrorOut `json:"statement_error,omitempty"` } type SinkIn struct { CreateTable string `json:"create_table"` @@ -201,6 +153,15 @@ type SinkOut struct { TableId string `json:"table_id"` TableName string `json:"table_name"` } +type SinkOutAlt struct { + Columns []ColumnOut `json:"columns,omitempty"` + CreateTable string `json:"create_table"` + IntegrationId string `json:"integration_id,omitempty"` + Message string `json:"message,omitempty"` + Options map[string]any `json:"options,omitempty"` + Position *PositionOut `json:"position,omitempty"` + TableName string `json:"table_name,omitempty"` +} type SourceIn struct { CreateTable string `json:"create_table"` IntegrationId string `json:"integration_id,omitempty"` @@ -213,6 +174,16 @@ type SourceOut struct { TableId string `json:"table_id"` TableName string `json:"table_name"` } -type serviceFlinkListApplicationsOut struct { - Applications []ApplicationOut `json:"applications"` +type SourceOutAlt struct { + Columns []ColumnOut `json:"columns,omitempty"` + CreateTable string `json:"create_table"` + IntegrationId string `json:"integration_id,omitempty"` + Message string `json:"message,omitempty"` + Options map[string]any `json:"options,omitempty"` + Position *PositionOut `json:"position,omitempty"` + TableName string `json:"table_name,omitempty"` +} +type StatementErrorOut struct { + Message string `json:"message"` + Position *PositionOut `json:"position,omitempty"` }