15
15
// under the License.
16
16
17
17
import ballerina /http ;
18
- import ballerina /mime ;
19
18
import ballerina /log ;
19
+ import ballerina /mime ;
20
20
21
21
isolated service class HttpService {
22
22
* http : Service ;
23
-
23
+
24
24
private final HttpToWebsubhubAdaptor adaptor ;
25
- private final readonly & ClientConfiguration clientConfig ;
26
25
private final string hub;
27
- private final int defaultHubLeaseSeconds;
28
- private final boolean isSubscriptionAvailable;
29
- private final boolean isSubscriptionValidationAvailable;
30
- private final boolean isUnsubscriptionAvailable;
31
- private final boolean isUnsubscriptionValidationAvailable;
32
- private final Controller hubController ;
26
+ private final int defaultLeaseSeconds;
27
+ private final SubscriptionHandler subscriptionHandler ;
33
28
34
29
isolated function init(HttpToWebsubhubAdaptor adaptor , string hubUrl , int leaseSeconds ,
35
- *ClientConfiguration clientConfig ) {
30
+ *ClientConfiguration clientConfig ) {
36
31
self .adaptor = adaptor ;
37
- self .clientConfig = clientConfig .cloneReadOnly ();
38
32
self .hub = hubUrl ;
39
- self .defaultHubLeaseSeconds = leaseSeconds ;
40
- string [] methodNames = adaptor .getServiceMethodNames ();
41
- self .isSubscriptionAvailable = isMethodAvailable (" onSubscription" , methodNames );
42
- self .isSubscriptionValidationAvailable = isMethodAvailable (" onSubscriptionValidation" , methodNames );
43
- self .isUnsubscriptionAvailable = isMethodAvailable (" onUnsubscription" , methodNames );
44
- self .isUnsubscriptionValidationAvailable = isMethodAvailable (" onUnsubscriptionValidation" , methodNames );
45
- self .hubController = new ;
33
+ self .defaultLeaseSeconds = leaseSeconds ;
34
+ self .subscriptionHandler = new (adaptor , clientConfig );
46
35
}
47
36
48
37
isolated resource function post .(http : Caller caller , http : Request request , http : Headers headers ) returns Error ? {
@@ -51,40 +40,34 @@ isolated service class HttpService {
51
40
if params is error {
52
41
response .statusCode = http : STATUS_BAD_REQUEST ;
53
42
response .setTextPayload (params .message ());
54
- return respondToRequest (caller , response );
55
- } else {
56
- string ? mode = params [HUB_MODE ];
57
- match mode {
58
- MODE_REGISTER => {
59
- http : Response | error result = processTopicRegistration (headers , params , self .adaptor );
60
- return handleResult (caller , result );
61
- }
62
- MODE_DEREGISTER => {
63
- http : Response | error result = processTopicDeregistration (headers , params , self .adaptor );
64
- return handleResult (caller , result );
65
- }
66
- MODE_SUBSCRIBE => {
67
- return self .handleSubscription (caller , headers , params );
68
- }
69
- MODE_UNSUBSCRIBE => {
70
- return self .handleUnsubscription (caller , headers , params );
71
- }
72
- MODE_PUBLISH => {
73
- http : Response | error result = processContentPublish (request , headers , params , self .adaptor );
74
- if result is error {
75
- response .statusCode = http : STATUS_BAD_REQUEST ;
76
- response .setTextPayload (result .message ());
77
- return respondToRequest (caller , response );
78
- } else {
79
- return respondToRequest (caller , result );
80
- }
81
- }
82
- _ => {
83
- response .statusCode = http : STATUS_BAD_REQUEST ;
84
- string errorMessage = " The request does not include valid `hub.mode` form param." ;
85
- response .setTextPayload (errorMessage );
86
- return respondToRequest (caller , response );
87
- }
43
+ return respondWithResult (caller , response );
44
+ }
45
+
46
+ string ? mode = params [HUB_MODE ];
47
+ match mode {
48
+ MODE_REGISTER => {
49
+ http : Response | error result = processTopicRegistration (headers , params , self .adaptor );
50
+ return respondWithResult (caller , result );
51
+ }
52
+ MODE_DEREGISTER => {
53
+ http : Response | error result = processTopicDeregistration (headers , params , self .adaptor );
54
+ return respondWithResult (caller , result );
55
+ }
56
+ MODE_SUBSCRIBE => {
57
+ return self .processSubscription (caller , headers , params );
58
+ }
59
+ MODE_UNSUBSCRIBE => {
60
+ return self .processUnsubscription (caller , headers , params );
61
+ }
62
+ MODE_PUBLISH => {
63
+ http : Response | error result = processContentPublish (request , headers , params , self .adaptor );
64
+ return respondWithResult (caller , result );
65
+ }
66
+ _ => {
67
+ response .statusCode = http : STATUS_BAD_REQUEST ;
68
+ string errorMessage = " The request does not include valid `hub.mode` form param." ;
69
+ response .setTextPayload (errorMessage );
70
+ return respondWithResult (caller , response );
88
71
}
89
72
}
90
73
}
@@ -124,79 +107,74 @@ isolated service class HttpService {
124
107
return params ;
125
108
}
126
109
127
- isolated function handleSubscription(http : Caller caller , http : Headers headers , map < string > params ) returns Error ? {
128
- Subscription | error subscription = createSubscriptionMessage (self .hub , self .defaultHubLeaseSeconds , params );
129
- if subscription is Subscription {
130
- http : Response | Redirect result = processSubscription (subscription , headers , self .adaptor , self .isSubscriptionAvailable );
131
- if result is Redirect {
132
- error ? redirectError = caller -> redirect (new http : Response (), result .code , result .redirectUrls );
133
- if redirectError is error {
134
- log : printError (" Error occurred while redirecting the subscription" , 'error = redirectError );
135
- }
136
- } else {
137
- int currentStatusCode = result .statusCode ;
138
- if currentStatusCode == http : STATUS_ACCEPTED {
139
- check respondToRequest (caller , result );
140
- error ? verificationResult = processSubscriptionVerification (headers , self .adaptor , subscription ,
141
- self .isSubscriptionValidationAvailable , self .clientConfig );
142
- if verificationResult is error {
143
- log : printError (" Error occurred while processing subscription" , 'error = verificationResult );
144
- }
145
- return ;
146
- }
147
- return respondToRequest (caller , result );
148
- }
149
- } else {
110
+ isolated function processSubscription(http : Caller caller , http : Headers headers , map < string > params )
111
+ returns Error ? {
112
+
113
+ Subscription | error subscription = createSubscriptionMessage (self .hub , self .defaultLeaseSeconds , params );
114
+ if subscription is error {
150
115
http : Response response = new ;
151
116
response .statusCode = http : STATUS_BAD_REQUEST ;
152
117
response .setTextPayload (subscription .message ());
153
- return respondToRequest (caller , response );
118
+ return respondWithResult (caller , response );
119
+ }
120
+
121
+ http : Response | Redirect result = self .subscriptionHandler .intiateSubscription (subscription , headers );
122
+ if result is Redirect {
123
+ error ? redirectError = caller -> redirect (new http : Response (), result .code , result .redirectUrls );
124
+ if redirectError is error {
125
+ log : printError (" Error occurred while redirecting the subscription" , 'error = redirectError );
126
+ }
127
+ return ;
128
+ }
129
+
130
+ check respondWithResult (caller , result );
131
+ if result .statusCode != http : STATUS_ACCEPTED {
132
+ return ;
133
+ }
134
+
135
+ error ? verification = self .subscriptionHandler .verifySubscription (subscription , headers );
136
+ if verification is error {
137
+ log : printError (" Error occurred while processing subscription" , 'error = verification );
154
138
}
155
139
}
156
140
157
- isolated function handleUnsubscription(http : Caller caller , http : Headers headers , map < string > params ) returns Error ? {
141
+ isolated function processUnsubscription(http : Caller caller , http : Headers headers , map < string > params )
142
+ returns Error ? {
143
+
158
144
Unsubscription | error unsubscription = createUnsubscriptionMessage (params );
159
- if unsubscription is Unsubscription {
160
- http : Response result = processUnsubscription (unsubscription , headers , self .adaptor , self .isUnsubscriptionAvailable );
161
- int currentStatusCode = result .statusCode ;
162
- if currentStatusCode == http : STATUS_ACCEPTED {
163
- check respondToRequest (caller , result );
164
- error ? verificationResult = processUnSubscriptionVerification (headers , self .adaptor , unsubscription ,
165
- self .isUnsubscriptionValidationAvailable , self .clientConfig );
166
- if verificationResult is error {
167
- log : printError (" Error occurred while processing unsubscription" , 'error = verificationResult );
168
- }
169
- return ;
170
- }
171
- return respondToRequest (caller , result );
172
- } else {
145
+ if unsubscription is error {
173
146
http : Response response = new ;
174
147
response .statusCode = http : STATUS_BAD_REQUEST ;
175
148
response .setTextPayload (unsubscription .message ());
176
- return respondToRequest (caller , response );
149
+ return respondWithResult (caller , response );
177
150
}
178
- }
179
- }
180
151
181
- isolated function isMethodAvailable(string methodName , string [] methods ) returns boolean {
182
- return methods .indexOf (methodName ) is int ;
152
+ http : Response result = self .subscriptionHandler .initiateUnsubscription (unsubscription , headers );
153
+ check respondWithResult (caller , result );
154
+ if result .statusCode != http : STATUS_ACCEPTED {
155
+ return ;
156
+ }
157
+
158
+ error ? verification = self .subscriptionHandler .verifyUnsubscription (unsubscription , headers );
159
+ if verification is error {
160
+ log : printError (" Error occurred while processing unsubscription" , 'error = verification );
161
+ }
162
+ }
183
163
}
184
164
185
- isolated function handleResult(http : Caller caller , http : Response | error result ) returns Error ? {
165
+ isolated function respondWithResult(http : Caller caller , http : Response | error result ) returns Error ? {
166
+ http : ListenerError ? respondError = ();
186
167
if result is error {
187
168
http : Response response = new ;
188
169
response .statusCode = http : STATUS_BAD_REQUEST ;
189
170
response .setTextPayload (result .message ());
190
- return respondToRequest ( caller , response );
171
+ respondError = caller -> respond ( response );
191
172
} else {
192
- return respondToRequest ( caller , result );
173
+ respondError = caller -> respond ( result );
193
174
}
194
- }
195
175
196
- isolated function respondToRequest(http : Caller caller , http : Response response ) returns Error ? {
197
- http : ListenerError ? responseError = caller -> respond (response );
198
- if responseError is http : ListenerError {
199
- return error Error (
200
- " Error occurred while responding to the request " , responseError , statusCode = http : STATUS_INTERNAL_SERVER_ERROR );
176
+ if respondError is http : ListenerError {
177
+ return error Error (" Error occurred while responding to the request " , respondError ,
178
+ statusCode = http : STATUS_INTERNAL_SERVER_ERROR );
201
179
}
202
180
}
0 commit comments