Skip to content

Commit

Permalink
fix(MPT): Fix templated pipeline triggers. (spinnaker#370)
Browse files Browse the repository at this point in the history
  • Loading branch information
jtk54 authored Oct 23, 2018
1 parent 8b473a7 commit eec2773
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public class Pipeline {
@JsonProperty
boolean limitConcurrent;

@JsonProperty
boolean plan;

@JsonProperty
List<Trigger> triggers;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,22 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.netflix.spinnaker.echo.model.Pipeline;
import com.netflix.spinnaker.security.AuthenticatedRequest;
import java.util.Collection;
import java.util.Map;
import retrofit.http.Body;
import retrofit.http.GET;
import retrofit.http.Header;
import retrofit.http.POST;
import retrofit.http.Query;
import rx.Observable;

import java.util.Collection;

public interface OrcaService {
@POST("/orchestrate")
Observable<TriggerResponse> trigger(@Body Pipeline pipeline);

@POST("/orchestrate")
Map plan(@Body Map pipelineConfig);

@POST("/orchestrate")
Observable<TriggerResponse> trigger(@Body Pipeline pipeline, @Header(AuthenticatedRequest.SPINNAKER_USER) String runAsUser);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
package com.netflix.spinnaker.echo.pipelinetriggers.orca;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.echo.model.Pipeline;
import com.netflix.spinnaker.echo.pipelinetriggers.orca.OrcaService.TriggerResponse;
import com.netflix.spinnaker.fiat.shared.FiatStatus;
import com.netflix.spinnaker.security.AuthenticatedRequest;
import com.netflix.spinnaker.security.User;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import retrofit.RetrofitError;
import retrofit.RetrofitError.Kind;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -29,6 +30,7 @@ public class PipelineInitiator {
private final Registry registry;
private final OrcaService orca;
private final FiatStatus fiatStatus;
private final ObjectMapper objectMapper;
private final boolean enabled;
private final int retryCount;
private final long retryDelayMillis;
Expand All @@ -37,12 +39,14 @@ public class PipelineInitiator {
public PipelineInitiator(Registry registry,
OrcaService orca,
FiatStatus fiatStatus,
ObjectMapper objectMapper,
@Value("${orca.enabled:true}") boolean enabled,
@Value("${orca.pipelineInitiatorRetryCount:5}") int retryCount,
@Value("${orca.pipelineInitiatorRetryDelayMillis:5000}") long retryDelayMillis) {
this.registry = registry;
this.orca = orca;
this.fiatStatus = fiatStatus;
this.objectMapper = objectMapper;
this.enabled = enabled;
this.retryCount = retryCount;
this.retryDelayMillis = retryDelayMillis;
Expand All @@ -58,34 +62,45 @@ public void initialize() {
public void startPipeline(Pipeline pipeline) {
if (enabled) {
log.info("Triggering {} due to {}", pipeline, pipeline.getTrigger());
registry.counter("orca.requests").increment();

Observable<OrcaService.TriggerResponse> orcaResponse = createTriggerObservable(pipeline)
.retryWhen(new RetryWithDelay(retryCount, retryDelayMillis))
.doOnNext(this::onOrcaResponse)
.doOnError(throwable -> onOrcaError(pipeline, throwable));

if (pipeline.getTrigger() != null && pipeline.getTrigger().isPropagateAuth()) {
// If the trigger is one that should propagate authentication, just directly call Orca as the request interceptor
// will pass along the current headers.
orcaResponse.subscribe();
} else {
// If we should not propagate authentication, create an empty User object for the request
User korkUser = new User();
if (fiatStatus.isEnabled() && pipeline.getTrigger() != null) {
korkUser.setEmail(pipeline.getTrigger().getRunAsUser());
}
try {
AuthenticatedRequest.propagate(() -> orcaResponse.subscribe(), korkUser).call();
} catch (Exception e) {
log.error("Unable to trigger pipeline {}: {}", pipeline, e);
}
if (pipeline.getType().equals("templatedPipeline")) { // TODO(jacobkiefer): Constantize.
log.debug("Planning templated pipeline {} before triggering", pipeline.getId());
pipeline = pipeline.withPlan(true);
Map resolvedPipelineMap = orca.plan(objectMapper.convertValue(pipeline, Map.class));
pipeline = objectMapper.convertValue(resolvedPipelineMap, Pipeline.class);
}
triggerPipeline(pipeline);
registry.counter("orca.requests").increment();

} else {
log.info("Would trigger {} due to {} but triggering is disabled", pipeline, pipeline.getTrigger());
}
}

private void triggerPipeline(Pipeline pipeline) {
Observable<OrcaService.TriggerResponse> orcaResponse = createTriggerObservable(pipeline)
.retryWhen(new RetryWithDelay(retryCount, retryDelayMillis))
.doOnNext(this::onOrcaResponse)
.doOnError(throwable -> onOrcaError(pipeline, throwable));

if (pipeline.getTrigger() != null && pipeline.getTrigger().isPropagateAuth()) {
// If the trigger is one that should propagate authentication, just directly call Orca as the request interceptor
// will pass along the current headers.
orcaResponse.subscribe();
} else {
// If we should not propagate authentication, create an empty User object for the request
User korkUser = new User();
if (fiatStatus.isEnabled() && pipeline.getTrigger() != null) {
korkUser.setEmail(pipeline.getTrigger().getRunAsUser());
}
try {
AuthenticatedRequest.propagate(() -> orcaResponse.subscribe(), korkUser).call();
} catch (Exception e) {
log.error("Unable to trigger pipeline {}: {}", pipeline, e);
}
}
}

private Observable<OrcaService.TriggerResponse> createTriggerObservable(Pipeline pipeline) {
return orca.trigger(pipeline);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.netflix.spinnaker.echo.pipelinetriggers.orca

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spectator.api.NoopRegistry
import com.netflix.spinnaker.echo.model.Pipeline
import com.netflix.spinnaker.fiat.shared.FiatStatus
Expand All @@ -12,13 +13,14 @@ class PipelineInitiatorSpec extends Specification {
def registry = new NoopRegistry()
def orca = Mock(OrcaService)
def fiatStatus = Mock(FiatStatus)
def objectMapper = Mock(ObjectMapper)


@Unroll
def "calls orca #orcaCalls times when enabled=#enabled flag"() {
given:
def pipelineInitiator = new PipelineInitiator(registry, orca, fiatStatus, enabled, 5, 5000)
def pipeline = Pipeline.builder().application("application").name("name").id("id").build()
def pipelineInitiator = new PipelineInitiator(registry, orca, fiatStatus, objectMapper, enabled, 5, 5000)
def pipeline = Pipeline.builder().application("application").name("name").id("id").type("pipeline").build()

when:
pipelineInitiator.startPipeline(pipeline)
Expand All @@ -32,4 +34,31 @@ class PipelineInitiatorSpec extends Specification {
true || 1
false || 0
}

@Unroll
def "calls orca #orcaCalls to plan pipeline if templated"() {
given:
def pipelineInitiator = new PipelineInitiator(registry, orca, fiatStatus, objectMapper, true, 5, 5000)
def pipeline = Pipeline.builder()
.application("application")
.name("name")
.id("id")
.type(type)
.build()
def pipelineMap = pipeline as Map

when:
pipelineInitiator.startPipeline(pipeline)

then:
1 * fiatStatus.isEnabled() >> { return true }
orcaCalls * orca.plan(_) >> pipelineMap
objectMapper.convertValue(pipelineMap, Pipeline.class) >> pipeline
1 * orca.trigger(_) >> empty()

where:
type || orcaCalls
"pipeline" || 0
"templatedPipeline" || 1
}
}

0 comments on commit eec2773

Please sign in to comment.