Skip to content

Commit 3f9d901

Browse files
Fix review comments
1 parent a85c622 commit 3f9d901

File tree

7 files changed

+161
-238
lines changed

7 files changed

+161
-238
lines changed

modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java

+9
Original file line numberDiff line numberDiff line change
@@ -434,12 +434,21 @@ public void handleScatterGatherFinishEvent(MessageContext messageContext) {
434434
TracingScope tracingScope = tracingScopeManager.getTracingScope(messageContext);
435435
synchronized (tracingScope.getSpanStore()) {
436436
cleanupContinuationStateSequences(tracingScope.getSpanStore(), messageContext);
437+
cleanUpActiveSpans(tracingScope.getSpanStore(), messageContext);
437438
SpanWrapper outerLevelSpanWrapper = tracingScope.getSpanStore().getOuterLevelSpanWrapper();
438439
tracingScope.getSpanStore().finishSpan(outerLevelSpanWrapper, messageContext);
439440
tracingScopeManager.cleanupTracingScope(tracingScope.getTracingScopeId());
440441
}
441442
}
442443

444+
private void cleanUpActiveSpans(SpanStore spanStore, MessageContext messageContext) {
445+
List<SpanWrapper> activeSpanWrappers = spanStore.getActiveSpanWrappers();
446+
for (int i = activeSpanWrappers.size() - 1; i > 0; i--) {
447+
SpanWrapper spanWrapper = activeSpanWrappers.get(i);
448+
spanStore.finishSpan(spanWrapper, messageContext);
449+
}
450+
}
451+
443452
@Override
444453
public void handleStateStackInsertion(MessageContext synCtx, String seqName, SequenceType seqType) {
445454
TracingScope tracingScope = tracingScopeManager.getTracingScope(synCtx);

modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java

+14-5
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,15 @@ public class ScatterGatherMediatorFactory extends AbstractMediatorFactory {
5454
= new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "scatter-gather");
5555
private static final QName ELEMENT_AGGREGATE_Q
5656
= new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "aggregation");
57-
private static final QName ATT_VALUE_TO_AGGREGATE = new QName("value");
57+
private static final QName ATT_AGGREGATE_EXPRESSION = new QName("expression");
5858
private static final QName ATT_CONDITION = new QName("condition");
5959
private static final QName ATT_TIMEOUT = new QName("timeout");
6060
private static final QName ATT_MIN_MESSAGES = new QName("min-messages");
6161
private static final QName ATT_MAX_MESSAGES = new QName("max-messages");
6262
private static final QName SEQUENCE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "sequence");
6363
private static final QName PARALLEL_EXEC_Q = new QName("parallel-execution");
6464
private static final QName RESULT_TARGET_Q = new QName("result-target");
65+
private static final QName ROOT_ELEMENT_Q = new QName("root-element");
6566
private static final QName CONTENT_TYPE_Q = new QName("content-type");
6667

6768
private static final SequenceMediatorFactory fac = new SequenceMediatorFactory();
@@ -87,7 +88,15 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
8788
if ("JSON".equals(contentTypeAttr.getAttributeValue())) {
8889
mediator.setContentType(ScatterGather.JSON_TYPE);
8990
} else if ("XML".equals(contentTypeAttr.getAttributeValue())) {
90-
mediator.setContentType(ScatterGather.XML_TYPE);
91+
OMAttribute rootElementAttr = elem.getAttribute(ROOT_ELEMENT_Q);
92+
if (rootElementAttr != null && StringUtils.isNotBlank(rootElementAttr.getAttributeValue())) {
93+
mediator.setRootElementName(rootElementAttr.getAttributeValue());
94+
mediator.setContentType(ScatterGather.XML_TYPE);
95+
} else {
96+
String msg = "The 'root-element' attribute is required for the configuration of a " +
97+
"Scatter Gather mediator when the 'content-type' is 'XML'";
98+
throw new SynapseException(msg);
99+
}
91100
} else {
92101
String msg = "The 'content-type' attribute should be either 'JSON' or 'XML'";
93102
throw new SynapseException(msg);
@@ -119,16 +128,16 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
119128

120129
OMElement aggregateElement = elem.getFirstChildWithName(ELEMENT_AGGREGATE_Q);
121130
if (aggregateElement != null) {
122-
OMAttribute aggregateExpr = aggregateElement.getAttribute(ATT_VALUE_TO_AGGREGATE);
131+
OMAttribute aggregateExpr = aggregateElement.getAttribute(ATT_AGGREGATE_EXPRESSION);
123132
if (aggregateExpr != null) {
124133
try {
125134
mediator.setAggregationExpression(
126-
SynapsePathFactory.getSynapsePath(aggregateElement, ATT_VALUE_TO_AGGREGATE));
135+
SynapsePathFactory.getSynapsePath(aggregateElement, ATT_AGGREGATE_EXPRESSION));
127136
} catch (JaxenException e) {
128137
handleException("Unable to load the aggregating expression", e);
129138
}
130139
} else {
131-
String msg = "The 'value' attribute is required for the configuration of a Scatter Gather mediator";
140+
String msg = "The 'expression' attribute is required for the configuration of a Scatter Gather mediator";
132141
throw new SynapseException(msg);
133142
}
134143

modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.synapse.config.xml;
2020

2121
import org.apache.axiom.om.OMElement;
22+
import org.apache.commons.lang3.StringUtils;
2223
import org.apache.synapse.Mediator;
2324
import org.apache.synapse.mediators.eip.Target;
2425
import org.apache.synapse.mediators.v2.ScatterGather;
@@ -47,11 +48,15 @@ public OMElement serializeSpecificMediator(Mediator m) {
4748
"result-target", nullNS, scatterGatherMediator.getResultTarget()));
4849
scatterGatherElement.addAttribute(fac.createOMAttribute(
4950
"content-type", nullNS, scatterGatherMediator.getContentType()));
51+
if (StringUtils.isNotBlank(scatterGatherMediator.getRootElementName())) {
52+
scatterGatherElement.addAttribute(fac.createOMAttribute(
53+
"root-element", nullNS, scatterGatherMediator.getRootElementName()));
54+
}
5055

5156
OMElement aggregationElement = fac.createOMElement("aggregation", synNS);
5257

5358
SynapsePathSerializer.serializePath(
54-
scatterGatherMediator.getAggregationExpression(), aggregationElement, "value");
59+
scatterGatherMediator.getAggregationExpression(), aggregationElement, "expression");
5560

5661
if (scatterGatherMediator.getCorrelateExpression() != null) {
5762
SynapsePathSerializer.serializePath(

modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java

+14-18
Original file line numberDiff line numberDiff line change
@@ -97,24 +97,20 @@ public void run() {
9797
debugManager.advertiseMediationFlowStartPoint(synCtx);
9898
}
9999

100+
boolean result = seq.mediate(synCtx);
100101
// If this is a scatter message, then we need to use the continuation state and continue the mediation
101-
if (isScatterMessage(synCtx)) {
102-
boolean result = seq.mediate(synCtx);
103-
if (result) {
104-
SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(synCtx);
105-
if (seqContinuationState == null) {
106-
return;
107-
}
108-
SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(synCtx, seqContinuationState);
109-
110-
FlowContinuableMediator mediator =
111-
(FlowContinuableMediator) sequenceMediator.getChild(seqContinuationState.getPosition());
112-
113-
synCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, true);
114-
mediator.mediate(synCtx, seqContinuationState);
102+
if (isScatterMessage(synCtx) && result) {
103+
SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(synCtx);
104+
if (seqContinuationState == null) {
105+
return;
115106
}
116-
} else {
117-
seq.mediate(synCtx);
107+
SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(synCtx, seqContinuationState);
108+
109+
FlowContinuableMediator mediator =
110+
(FlowContinuableMediator) sequenceMediator.getChild(seqContinuationState.getPosition());
111+
112+
synCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, true);
113+
mediator.mediate(synCtx, seqContinuationState);
118114
}
119115
//((Axis2MessageContext)synCtx).getAxis2MessageContext().getEnvelope().discard();
120116

@@ -188,7 +184,7 @@ public void setStatisticsCloseEventListener(StatisticsCloseEventListener statist
188184
*/
189185
private static boolean isScatterMessage(MessageContext synCtx) {
190186

191-
Boolean isSkipContinuationState = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES);
192-
return isSkipContinuationState != null && isSkipContinuationState;
187+
Boolean isScatterMessage = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES);
188+
return isScatterMessage != null && isScatterMessage;
193189
}
194190
}

modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.ArrayList;
3232
import java.util.List;
3333
import java.util.TimerTask;
34+
import java.util.concurrent.locks.ReentrantLock;
3435

3536
/**
3637
* An instance of this class is created to manage each aggregation group, and it holds
@@ -53,7 +54,7 @@ public class Aggregate extends TimerTask {
5354
private AggregateMediator aggregateMediator = null;
5455
private ScatterGather scatterGatherMediator = null;
5556
private List<MessageContext> messages = new ArrayList<MessageContext>();
56-
private boolean locked = false;
57+
private ReentrantLock lock = new ReentrantLock();
5758
private boolean completed = false;
5859
private SynapseEnvironment synEnv = null;
5960

@@ -313,15 +314,15 @@ public void run() {
313314
}
314315

315316
public synchronized boolean getLock() {
316-
if (!locked) {
317-
locked = true;
318-
return true;
319-
}
320-
return false;
317+
318+
return lock.tryLock();
321319
}
322320

323321
public synchronized void releaseLock() {
324-
locked = false;
322+
323+
if (lock.isHeldByCurrentThread()) {
324+
lock.unlock();
325+
}
325326
}
326327

327328
public boolean isCompleted() {
@@ -331,5 +332,4 @@ public boolean isCompleted() {
331332
public void setCompleted(boolean completed) {
332333
this.completed = completed;
333334
}
334-
335335
}

0 commit comments

Comments
 (0)