1818 */
1919package org .elasticsearch .repositories .s3 ;
2020
21- import com .amazonaws .services .s3 .AmazonS3 ;
22- import com .amazonaws .services .s3 .model .CannedAccessControlList ;
23- import com .amazonaws .services .s3 .model .StorageClass ;
21+ import com .sun .net .httpserver .HttpExchange ;
22+ import com .sun .net .httpserver .HttpHandler ;
23+ import com .sun .net .httpserver .HttpServer ;
24+ import org .elasticsearch .common .SuppressForbidden ;
25+ import org .elasticsearch .common .bytes .BytesReference ;
26+ import org .elasticsearch .common .io .Streams ;
27+ import org .elasticsearch .common .network .InetAddresses ;
28+ import org .elasticsearch .common .regex .Regex ;
29+ import org .elasticsearch .common .settings .MockSecureSettings ;
30+ import org .elasticsearch .common .settings .Setting ;
2431import org .elasticsearch .common .settings .Settings ;
25- import org .elasticsearch .common .unit .ByteSizeUnit ;
26- import org .elasticsearch .common .unit .ByteSizeValue ;
27- import org .elasticsearch .common .xcontent .NamedXContentRegistry ;
28- import org .elasticsearch .env .Environment ;
32+ import org .elasticsearch .mocksocket .MockHttpServer ;
2933import org .elasticsearch .plugins .Plugin ;
30- import org .elasticsearch .repositories .Repository ;
3134import org .elasticsearch .repositories .blobstore .ESBlobStoreRepositoryIntegTestCase ;
32- import org .elasticsearch .threadpool .ThreadPool ;
35+ import org .elasticsearch .rest .RestStatus ;
36+ import org .elasticsearch .rest .RestUtils ;
3337import org .junit .After ;
38+ import org .junit .AfterClass ;
39+ import org .junit .Before ;
3440import org .junit .BeforeClass ;
3541
42+ import java .io .IOException ;
43+ import java .io .InputStreamReader ;
44+ import java .net .InetAddress ;
45+ import java .net .InetSocketAddress ;
46+ import java .nio .charset .StandardCharsets ;
47+ import java .util .ArrayList ;
3648import java .util .Collection ;
3749import java .util .Collections ;
38- import java .util .Locale ;
50+ import java .util .HashMap ;
51+ import java .util .Iterator ;
52+ import java .util .List ;
3953import java .util .Map ;
4054import java .util .concurrent .ConcurrentHashMap ;
4155import java .util .concurrent .ConcurrentMap ;
4256
57+ import static java .nio .charset .StandardCharsets .UTF_8 ;
58+ import static org .hamcrest .Matchers .nullValue ;
59+
60+ @ SuppressForbidden (reason = "this test uses a HttpServer to emulate an S3 endpoint" )
4361public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase {
4462
45- private static final ConcurrentMap <String , byte []> blobs = new ConcurrentHashMap <>();
46- private static String bucket ;
47- private static ByteSizeValue bufferSize ;
48- private static boolean serverSideEncryption ;
49- private static String cannedACL ;
50- private static String storageClass ;
63+ private static HttpServer httpServer ;
5164
5265 @ BeforeClass
53- public static void setUpRepositorySettings () {
54- bucket = randomAlphaOfLength (randomIntBetween (1 , 10 )).toLowerCase (Locale .ROOT );
55- bufferSize = new ByteSizeValue (randomIntBetween (5 , 50 ), ByteSizeUnit .MB );
56- serverSideEncryption = randomBoolean ();
57- if (randomBoolean ()) {
58- cannedACL = randomFrom (CannedAccessControlList .values ()).toString ();
59- }
60- if (randomBoolean ()) {
61- storageClass = randomValueOtherThan (StorageClass .Glacier , () -> randomFrom (StorageClass .values ())).toString ();
62- }
66+ public static void startHttpServer () throws Exception {
67+ httpServer = MockHttpServer .createHttp (new InetSocketAddress (InetAddress .getLoopbackAddress (), 0 ), 0 );
68+ httpServer .start ();
69+ }
70+
71+ @ Before
72+ public void setUpHttpServer () {
73+ httpServer .createContext ("/bucket" , new InternalHttpHandler ());
74+ }
75+
76+ @ AfterClass
77+ public static void stopHttpServer () {
78+ httpServer .stop (0 );
79+ httpServer = null ;
6380 }
6481
6582 @ After
66- public void wipeRepository () {
67- blobs . clear ( );
83+ public void tearDownHttpServer () {
84+ httpServer . removeContext ( "/bucket" );
6885 }
6986
7087 @ Override
@@ -75,11 +92,8 @@ protected String repositoryType() {
7592 @ Override
7693 protected Settings repositorySettings () {
7794 return Settings .builder ()
78- .put (S3Repository .BUCKET_SETTING .getKey (), bucket )
79- .put (S3Repository .BUFFER_SIZE_SETTING .getKey (), bufferSize )
80- .put (S3Repository .SERVER_SIDE_ENCRYPTION_SETTING .getKey (), serverSideEncryption )
81- .put (S3Repository .CANNED_ACL_SETTING .getKey (), cannedACL )
82- .put (S3Repository .STORAGE_CLASS_SETTING .getKey (), storageClass )
95+ .put (S3Repository .BUCKET_SETTING .getKey (), "bucket" )
96+ .put (S3Repository .CLIENT_NAME .getKey (), "test" )
8397 .build ();
8498 }
8599
@@ -88,22 +102,131 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
88102 return Collections .singletonList (TestS3RepositoryPlugin .class );
89103 }
90104
105+ @ Override
106+ protected Settings nodeSettings (int nodeOrdinal ) {
107+ final MockSecureSettings secureSettings = new MockSecureSettings ();
108+ secureSettings .setString (S3ClientSettings .ACCESS_KEY_SETTING .getConcreteSettingForNamespace ("test" ).getKey (), "access" );
109+ secureSettings .setString (S3ClientSettings .SECRET_KEY_SETTING .getConcreteSettingForNamespace ("test" ).getKey (), "secret" );
110+
111+ final InetSocketAddress address = httpServer .getAddress ();
112+ final String endpoint = "http://" + InetAddresses .toUriString (address .getAddress ()) + ":" + address .getPort ();
113+
114+ return Settings .builder ()
115+ .put (Settings .builder ()
116+ .put (S3ClientSettings .ENDPOINT_SETTING .getConcreteSettingForNamespace ("test" ).getKey (), endpoint )
117+ .put (S3ClientSettings .DISABLE_CHUNKED_ENCODING .getConcreteSettingForNamespace ("test" ).getKey (), true )
118+ .build ())
119+ .put (super .nodeSettings (nodeOrdinal ))
120+ .setSecureSettings (secureSettings )
121+ .build ();
122+ }
123+
91124 public static class TestS3RepositoryPlugin extends S3RepositoryPlugin {
92125
93126 public TestS3RepositoryPlugin (final Settings settings ) {
94127 super (settings );
95128 }
96129
97130 @ Override
98- public Map <String , Repository .Factory > getRepositories (final Environment env , final NamedXContentRegistry registry ,
99- final ThreadPool threadPool ) {
100- return Collections .singletonMap (S3Repository .TYPE ,
101- metadata -> new S3Repository (metadata , registry , new S3Service () {
102- @ Override
103- AmazonS3 buildClient (S3ClientSettings clientSettings ) {
104- return new MockAmazonS3 (blobs , bucket , serverSideEncryption , cannedACL , storageClass );
131+ public List <Setting <?>> getSettings () {
132+ final List <Setting <?>> settings = new ArrayList <>(super .getSettings ());
133+ // Disable chunked encoding as it simplifies a lot the request parsing on the httpServer side
134+ settings .add (S3ClientSettings .DISABLE_CHUNKED_ENCODING );
135+ return settings ;
136+ }
137+ }
138+
139+ /**
140+ * Minimal HTTP handler that acts as a S3 compliant server
141+ */
142+ @ SuppressForbidden (reason = "this test uses a HttpServer to emulate an S3 endpoint" )
143+ private static class InternalHttpHandler implements HttpHandler {
144+
145+ private final ConcurrentMap <String , BytesReference > blobs = new ConcurrentHashMap <>();
146+
147+ @ Override
148+ public void handle (final HttpExchange exchange ) throws IOException {
149+ final String request = exchange .getRequestMethod () + " " + exchange .getRequestURI ().toString ();
150+ try {
151+ if (Regex .simpleMatch ("PUT /bucket/*" , request )) {
152+ blobs .put (exchange .getRequestURI ().toString (), Streams .readFully (exchange .getRequestBody ()));
153+ exchange .sendResponseHeaders (RestStatus .OK .getStatus (), -1 );
154+
155+ } else if (Regex .simpleMatch ("GET /bucket/?prefix=*" , request )) {
156+ final Map <String , String > params = new HashMap <>();
157+ RestUtils .decodeQueryString (exchange .getRequestURI ().getQuery (), 0 , params );
158+ assertThat ("Test must be adapted for GET Bucket (List Objects) Version 2" , params .get ("list-type" ), nullValue ());
159+
160+ final StringBuilder list = new StringBuilder ();
161+ list .append ("<?xml version=\" 1.0\" encoding=\" UTF-8\" ?>" );
162+ list .append ("<ListBucketResult>" );
163+ final String prefix = params .get ("prefix" );
164+ if (prefix != null ) {
165+ list .append ("<Prefix>" ).append (prefix ).append ("</Prefix>" );
166+ }
167+ for (Map .Entry <String , BytesReference > blob : blobs .entrySet ()) {
168+ if (prefix == null || blob .getKey ().startsWith ("/bucket/" + prefix )) {
169+ list .append ("<Contents>" );
170+ list .append ("<Key>" ).append (blob .getKey ().replace ("/bucket/" , "" )).append ("</Key>" );
171+ list .append ("<Size>" ).append (blob .getValue ().length ()).append ("</Size>" );
172+ list .append ("</Contents>" );
105173 }
106- }, threadPool ));
174+ }
175+ list .append ("</ListBucketResult>" );
176+
177+ byte [] response = list .toString ().getBytes (StandardCharsets .UTF_8 );
178+ exchange .getResponseHeaders ().add ("Content-Type" , "application/xml" );
179+ exchange .sendResponseHeaders (RestStatus .OK .getStatus (), response .length );
180+ exchange .getResponseBody ().write (response );
181+
182+ } else if (Regex .simpleMatch ("GET /bucket/*" , request )) {
183+ final BytesReference blob = blobs .get (exchange .getRequestURI ().toString ());
184+ if (blob != null ) {
185+ exchange .getResponseHeaders ().add ("Content-Type" , "application/octet-stream" );
186+ exchange .sendResponseHeaders (RestStatus .OK .getStatus (), blob .length ());
187+ blob .writeTo (exchange .getResponseBody ());
188+ } else {
189+ exchange .sendResponseHeaders (RestStatus .NOT_FOUND .getStatus (), -1 );
190+ }
191+
192+ } else if (Regex .simpleMatch ("DELETE /bucket/*" , request )) {
193+ int deletions = 0 ;
194+ for (Iterator <Map .Entry <String , BytesReference >> iterator = blobs .entrySet ().iterator (); iterator .hasNext (); ) {
195+ Map .Entry <String , BytesReference > blob = iterator .next ();
196+ if (blob .getKey ().startsWith (exchange .getRequestURI ().toString ())) {
197+ iterator .remove ();
198+ deletions ++;
199+ }
200+ }
201+ exchange .sendResponseHeaders ((deletions > 0 ? RestStatus .OK : RestStatus .NO_CONTENT ).getStatus (), -1 );
202+
203+ } else if (Regex .simpleMatch ("POST /bucket/?delete" , request )) {
204+ final String requestBody = Streams .copyToString (new InputStreamReader (exchange .getRequestBody (), UTF_8 ));
205+
206+ final StringBuilder deletes = new StringBuilder ();
207+ deletes .append ("<?xml version=\" 1.0\" encoding=\" UTF-8\" ?>" );
208+ deletes .append ("<DeleteResult>" );
209+ for (Iterator <Map .Entry <String , BytesReference >> iterator = blobs .entrySet ().iterator (); iterator .hasNext (); ) {
210+ Map .Entry <String , BytesReference > blob = iterator .next ();
211+ String key = blob .getKey ().replace ("/bucket/" , "" );
212+ if (requestBody .contains ("<Key>" + key + "</Key>" )) {
213+ deletes .append ("<Deleted><Key>" ).append (key ).append ("</Key></Deleted>" );
214+ iterator .remove ();
215+ }
216+ }
217+ deletes .append ("</DeleteResult>" );
218+
219+ byte [] response = deletes .toString ().getBytes (StandardCharsets .UTF_8 );
220+ exchange .getResponseHeaders ().add ("Content-Type" , "application/xml" );
221+ exchange .sendResponseHeaders (RestStatus .OK .getStatus (), response .length );
222+ exchange .getResponseBody ().write (response );
223+
224+ } else {
225+ exchange .sendResponseHeaders (RestStatus .INTERNAL_SERVER_ERROR .getStatus (), -1 );
226+ }
227+ } finally {
228+ exchange .close ();
229+ }
107230 }
108231 }
109232}
0 commit comments