2020import java .util .List ;
2121import java .util .concurrent .atomic .AtomicReference ;
2222import java .util .function .Supplier ;
23+ import java .util .stream .Collectors ;
2324
25+ import reactor .core .publisher .Flux ;
2426import reactor .core .publisher .Mono ;
2527
2628import org .springframework .http .HttpCookie ;
3941 */
4042public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
4143
44+ /**
45+ * COMMITTING -> COMMITTED is the period after doCommit is called but before
46+ * the response status and headers have been applied to the underlying
47+ * response during which time pre-commit actions can still make changes to
48+ * the response status and headers.
49+ */
50+ private enum State {NEW , COMMITTING , COMMITTED }
51+
4252 private final HttpHeaders headers ;
4353
4454 private final MultiValueMap <String , HttpCookie > cookies ;
4555
4656 private AtomicReference <State > state = new AtomicReference <>(State .NEW );
4757
48- private final List <Supplier <? extends Mono <Void >>> beforeCommitActions = new ArrayList <>(4 );
58+ private final List <Supplier <? extends Mono <Void >>> commitActions = new ArrayList <>(4 );
4959
5060
5161 public AbstractClientHttpRequest () {
@@ -61,50 +71,74 @@ public AbstractClientHttpRequest(HttpHeaders headers) {
6171
6272 @ Override
6373 public HttpHeaders getHeaders () {
64- if (State .COMITTED .equals (this .state .get ())) {
74+ if (State .COMMITTED .equals (this .state .get ())) {
6575 return HttpHeaders .readOnlyHttpHeaders (this .headers );
6676 }
6777 return this .headers ;
6878 }
6979
7080 @ Override
7181 public MultiValueMap <String , HttpCookie > getCookies () {
72- if (State .COMITTED .equals (this .state .get ())) {
82+ if (State .COMMITTED .equals (this .state .get ())) {
7383 return CollectionUtils .unmodifiableMultiValueMap (this .cookies );
7484 }
7585 return this .cookies ;
7686 }
7787
78- protected Mono <Void > applyBeforeCommit () {
79- Mono <Void > mono = Mono .empty ();
80- if (this .state .compareAndSet (State .NEW , State .COMMITTING )) {
81- for (Supplier <? extends Mono <Void >> action : this .beforeCommitActions ) {
82- mono = mono .then (() -> action .get ());
83- }
84- return mono
85- .otherwise (ex -> {
86- // Ignore errors from beforeCommit actions
87- return Mono .empty ();
88- })
89- .then (() -> {
90- this .state .set (State .COMITTED );
91- writeHeaders ();
92- writeCookies ();
93- return Mono .empty ();
94- });
88+ /**
89+ * A variant of {@link #doCommit(Supplier)} for a request without body.
90+ * @return a completion publisher
91+ */
92+ protected Mono <Void > doCommit () {
93+ return (this .state .get () == State .NEW ? doCommit (null ) : Mono .empty ());
94+ }
95+
96+ /**
97+ * Apply {@link #beforeCommit(Supplier) beforeCommit} actions, apply the
98+ * request headers/cookies, and write the request body.
99+ * @param writeAction the action to write the request body or {@code null}
100+ * @return a completion publisher
101+ */
102+ protected Mono <Void > doCommit (Supplier <? extends Mono <Void >> writeAction ) {
103+
104+ if (!this .state .compareAndSet (AbstractClientHttpRequest .State .NEW , AbstractClientHttpRequest .State .COMMITTING )) {
105+ return Mono .empty ();
106+ }
107+
108+ this .commitActions .add (() -> {
109+ applyHeaders ();
110+ applyCookies ();
111+ this .state .set (AbstractClientHttpRequest .State .COMMITTED );
112+ return Mono .empty ();
113+ });
114+
115+ if (writeAction != null ) {
116+ this .commitActions .add (writeAction );
95117 }
96- return mono ;
118+
119+ List <? extends Mono <Void >> actions = this .commitActions .stream ()
120+ .map (Supplier ::get ).collect (Collectors .toList ());
121+
122+ return Flux .concat (actions ).next ();
97123 }
98124
99125 @ Override
100126 public void beforeCommit (Supplier <? extends Mono <Void >> action ) {
101127 Assert .notNull (action );
102- this .beforeCommitActions .add (action );
128+ this .commitActions .add (action );
103129 }
104130
105- protected abstract void writeHeaders ();
131+ /**
132+ * Implement this method to apply header changes from {@link #getHeaders()}
133+ * to the underlying response. This method is called once only.
134+ */
135+ protected abstract void applyHeaders ();
136+
137+ /**
138+ * Implement this method to add cookies from {@link #getHeaders()} to the
139+ * underlying response. This method is called once only.
140+ */
141+ protected abstract void applyCookies ();
106142
107- protected abstract void writeCookies ();
108143
109- private enum State {NEW , COMMITTING , COMITTED }
110144}
0 commit comments