diff --git a/sdk/core/azure-core/pom.xml b/sdk/core/azure-core/pom.xml index e7f2494f7fd8..ab88f5934cd7 100644 --- a/sdk/core/azure-core/pom.xml +++ b/sdk/core/azure-core/pom.xml @@ -265,7 +265,6 @@ --add-opens com.azure.core/com.azure.core.implementation.models.jsonflatten=com.fasterxml.jackson.databind --add-opens com.azure.core/com.azure.core.implementation.models.jsonflatten=ALL-UNNAMED --add-opens com.azure.core/com.azure.core.implementation.serializer=ALL-UNNAMED - --add-opens com.azure.core/com.azure.core.implementation.util=ALL-UNNAMED --add-opens com.azure.core/com.azure.core.models=ALL-UNNAMED --add-opens com.azure.core/com.azure.core.util=ALL-UNNAMED --add-opens com.azure.core/com.azure.core.util.jsonpatch=ALL-UNNAMED diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/http/HttpRequest.java b/sdk/core/azure-core/src/main/java/com/azure/core/http/HttpRequest.java index 1f903a7d5982..4e78b3e06d6d 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/http/HttpRequest.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/http/HttpRequest.java @@ -3,14 +3,13 @@ package com.azure.core.http; -import com.azure.core.implementation.util.FluxByteBufferContent; -import com.azure.core.util.RequestContent; import com.azure.core.util.logging.ClientLogger; import reactor.core.publisher.Flux; import java.net.MalformedURLException; import java.net.URL; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; /** * The outgoing Http request. It provides ways to construct {@link HttpRequest} with {@link HttpMethod}, {@link URL}, @@ -22,7 +21,7 @@ public class HttpRequest { private HttpMethod httpMethod; private URL url; private HttpHeaders headers; - private RequestContent requestContent; + private Flux body; /** * Create a new HttpRequest instance. @@ -31,7 +30,7 @@ public class HttpRequest { * @param url the target address to send the request to */ public HttpRequest(HttpMethod httpMethod, URL url) { - this(httpMethod, url, new HttpHeaders(), (RequestContent) null); + this(httpMethod, url, new HttpHeaders(), null); } /** @@ -60,22 +59,10 @@ public HttpRequest(HttpMethod httpMethod, String url) { * @param body the request content */ public HttpRequest(HttpMethod httpMethod, URL url, HttpHeaders headers, Flux body) { - this(httpMethod, url, headers, new FluxByteBufferContent(body)); - } - - /** - * Creates a new {@link HttpRequest} instance. - * - * @param httpMethod The HTTP request method. - * @param url The target address to send the request. - * @param headers The HTTP headers of the request. - * @param requestContent The {@link RequestContent}. - */ - public HttpRequest(HttpMethod httpMethod, URL url, HttpHeaders headers, RequestContent requestContent) { this.httpMethod = httpMethod; this.url = url; this.headers = headers; - this.requestContent = requestContent; + this.body = body; } /** @@ -173,7 +160,7 @@ public HttpRequest setHeader(String name, String value) { * @return the content to be send */ public Flux getBody() { - return (requestContent == null) ? null : requestContent.asFluxByteBuffer(); + return body; } /** @@ -185,7 +172,8 @@ public Flux getBody() { * @return this HttpRequest */ public HttpRequest setBody(String content) { - return setRequestContent(RequestContent.fromString(content)); + final byte[] bodyBytes = content.getBytes(StandardCharsets.UTF_8); + return setBody(bodyBytes); } /** @@ -211,36 +199,7 @@ public HttpRequest setBody(byte[] content) { * @return this HttpRequest */ public HttpRequest setBody(Flux content) { - this.requestContent = new FluxByteBufferContent(content); - return this; - } - - /** - * Gets the HttpRequest's {@link RequestContent}. - * - * @return The {@link RequestContent}. - */ - public RequestContent getRequestContent() { - return this.requestContent; - } - - /** - * Sets the {@link RequestContent}. - *

- * If {@link RequestContent#getLength()} returns null for the passed {@link RequestContent} the caller must set the - * Content-Length header to indicate the length of the content, or use Transfer-Encoding: chunked. Otherwise, {@link - * RequestContent#getLength()} will be used to set the Content-Length header. - * - * @param requestContent The {@link RequestContent}. - * @return The updated HttpRequest object. - */ - public HttpRequest setRequestContent(RequestContent requestContent) { - Long requestContentLength = requestContent.getLength(); - if (requestContentLength != null) { - setContentLength(requestContentLength); - } - - this.requestContent = requestContent; + this.body = content; return this; } @@ -259,6 +218,6 @@ private void setContentLength(long contentLength) { */ public HttpRequest copy() { final HttpHeaders bufferedHeaders = new HttpHeaders(headers); - return new HttpRequest(httpMethod, url, bufferedHeaders, requestContent); + return new HttpRequest(httpMethod, url, bufferedHeaders, body); } } diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/ArrayContent.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/ArrayContent.java deleted file mode 100644 index cae8dd1510fa..000000000000 --- a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/ArrayContent.java +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.core.implementation.util; - -import com.azure.core.util.CoreUtils; -import com.azure.core.util.RequestContent; -import reactor.core.publisher.Flux; - -import java.nio.ByteBuffer; - -/** - * A {@link RequestContent} implementation which is backed by a {@code byte[]}. - */ -public final class ArrayContent extends RequestContent { - private final byte[] content; - private final int offset; - private final int length; - - /** - * Creates a new instance of {@link ArrayContent}. - * - * @param content The {@code byte[]} content. - * @param offset The offset in the array to begin reading data. - * @param length The length of the content. - */ - public ArrayContent(byte[] content, int offset, int length) { - this.content = CoreUtils.clone(content); - this.offset = offset; - this.length = length; - } - - @Override - public Flux asFluxByteBuffer() { - return Flux.defer(() -> Flux.just(ByteBuffer.wrap(content, offset, length))); - } - - @Override - public Long getLength() { - return (long) length; - } -} diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/ByteBufferContent.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/ByteBufferContent.java deleted file mode 100644 index 14c892da06a3..000000000000 --- a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/ByteBufferContent.java +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.core.implementation.util; - -import com.azure.core.util.RequestContent; -import reactor.core.publisher.Flux; - -import java.nio.ByteBuffer; - -/** - * A {@link RequestContent} implementation which is backed by a {@link ByteBuffer}. - */ -public final class ByteBufferContent extends RequestContent { - private final ByteBuffer byteBuffer; - private final long length; - - /** - * Creates a new instance of {@link ByteBufferContent}. - * - * @param byteBuffer The {@link ByteBuffer} content. - */ - public ByteBufferContent(ByteBuffer byteBuffer) { - this.byteBuffer = byteBuffer; - this.length = byteBuffer.remaining(); - } - - @Override - public Flux asFluxByteBuffer() { - // Duplicate the ByteBuffer so that each invocation of this method uses a fully readable ByteBuffer. - return Flux.defer(() -> Flux.just(byteBuffer.duplicate())); - } - - @Override - public Long getLength() { - return length; - } -} diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/FileContent.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/FileContent.java deleted file mode 100644 index 251fbf0456b4..000000000000 --- a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/FileContent.java +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.core.implementation.util; - -import com.azure.core.util.RequestContent; -import com.azure.core.util.logging.ClientLogger; -import reactor.core.Exceptions; -import reactor.core.publisher.Flux; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.file.Path; - -/** - * A {@link RequestContent} implementation which is backed by a file. - */ -public final class FileContent extends RequestContent { - private final ClientLogger logger = new ClientLogger(FileContent.class); - - private final Path file; - private final long offset; - private final long length; - private final int chunkSize; - - /** - * Creates a new instance of {@link FileContent}. - * - * @param file The {@link Path} content. - * @param offset The offset in the {@link Path} to begin reading data. - * @param length The length of the content. - * @param chunkSize The requested size for each read of the path. - */ - public FileContent(Path file, long offset, long length, int chunkSize) { - this.file = file; - this.offset = offset; - this.length = length; - this.chunkSize = chunkSize; - } - - @Override - public Flux asFluxByteBuffer() { - return Flux.using(() -> FileChannel.open(file), channel -> Flux.generate(() -> 0, (count, sink) -> { - if (count == length) { - sink.complete(); - return count; - } - - int readCount = (int) Math.min(chunkSize, length - count); - try { - sink.next(channel.map(FileChannel.MapMode.READ_ONLY, offset + count, readCount)); - } catch (IOException ex) { - sink.error(ex); - } - - return count + readCount; - }), channel -> { - try { - channel.close(); - } catch (IOException ex) { - throw logger.logExceptionAsError(Exceptions.propagate(ex)); - } - }); - } - - @Override - public Long getLength() { - return length; - } -} diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/FluxByteBufferContent.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/FluxByteBufferContent.java deleted file mode 100644 index 14a4b793c7ff..000000000000 --- a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/FluxByteBufferContent.java +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.core.implementation.util; - -import com.azure.core.util.RequestContent; -import reactor.core.publisher.Flux; - -import java.nio.ByteBuffer; - -/** - * A {@link RequestContent} implementation which is backed by a {@link Flux} of {@link ByteBuffer}. - */ -public class FluxByteBufferContent extends RequestContent { - private final Flux content; - private final Long length; - - /** - * Creates a new instance of {@link FluxByteBufferContent}. - * - * @param content The {@link Flux} of {@link ByteBuffer} content. - */ - public FluxByteBufferContent(Flux content) { - this(content, null); - } - - /** - * Creates a new instance of {@link FluxByteBufferContent}. - * - * @param content The {@link Flux} of {@link ByteBuffer} content. - * @param length The length of the content, may be null. - */ - public FluxByteBufferContent(Flux content, Long length) { - this.content = content; - this.length = length; - } - - @Override - public Flux asFluxByteBuffer() { - return content; - } - - @Override - public Long getLength() { - return length; - } -} diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/InputStreamContent.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/InputStreamContent.java deleted file mode 100644 index 2cb6f81d0cda..000000000000 --- a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/InputStreamContent.java +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.core.implementation.util; - -import com.azure.core.util.FluxUtil; -import com.azure.core.util.RequestContent; -import reactor.core.publisher.Flux; - -import java.io.InputStream; -import java.nio.ByteBuffer; - -/** - * A {@link RequestContent} implementation which is backed by an {@link InputStream}. - */ -public class InputStreamContent extends RequestContent { - private final InputStream content; - private final Long length; - private final int chunkSize; - - /** - * Creates a new instance of {@link InputStreamContent}. - * - * @param content The {@link InputStream} content. - * @param length The length of the content, may be null. - * @param chunkSize The requested size for each {@link InputStream#read(byte[])}. - */ - public InputStreamContent(InputStream content, Long length, int chunkSize) { - this.content = content; - this.length = length; - this.chunkSize = chunkSize; - } - - @Override - public Flux asFluxByteBuffer() { - return FluxUtil.toFluxByteBuffer(content, chunkSize); - } - - @Override - public Long getLength() { - return length; - } -} diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/SerializableContent.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/SerializableContent.java deleted file mode 100644 index 11d6478a36bd..000000000000 --- a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/SerializableContent.java +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.core.implementation.util; - -import com.azure.core.util.RequestContent; -import com.azure.core.util.serializer.ObjectSerializer; -import reactor.core.publisher.Flux; - -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicReference; - -/** - * A {@link RequestContent} implementation which is backed by a serializable object. - */ -public final class SerializableContent extends RequestContent { - private final Object serializable; - private final ObjectSerializer objectSerializer; - - private final AtomicReference serializedObject = new AtomicReference<>(); - - /** - * Creates a new instance of {@link SerializableContent}. - * - * @param serializable The serializable {@link Object} content. - * @param objectSerializer The {@link ObjectSerializer} that will serialize the {@link Object} content. - */ - public SerializableContent(Object serializable, ObjectSerializer objectSerializer) { - this.serializable = serializable; - this.objectSerializer = objectSerializer; - } - - @Override - public Flux asFluxByteBuffer() { - serializedObject.compareAndSet(null, objectSerializer.serializeToBytes(serializable)); - - return Flux.defer(() -> Flux.just(ByteBuffer.wrap(serializedObject.get()).asReadOnlyBuffer())); - } - - @Override - public Long getLength() { - return null; - } -} diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/package-info.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/package-info.java deleted file mode 100644 index 5880dc06a054..000000000000 --- a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/package-info.java +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -/** - * Package containing implementation utilities. - */ -package com.azure.core.implementation.util; diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/util/RequestContent.java b/sdk/core/azure-core/src/main/java/com/azure/core/util/RequestContent.java deleted file mode 100644 index 18b0d5754f16..000000000000 --- a/sdk/core/azure-core/src/main/java/com/azure/core/util/RequestContent.java +++ /dev/null @@ -1,304 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.core.util; - -import com.azure.core.implementation.util.ArrayContent; -import com.azure.core.implementation.util.ByteBufferContent; -import com.azure.core.implementation.util.FileContent; -import com.azure.core.implementation.util.FluxByteBufferContent; -import com.azure.core.implementation.util.InputStreamContent; -import com.azure.core.implementation.util.SerializableContent; -import com.azure.core.util.logging.ClientLogger; -import com.azure.core.util.serializer.JsonSerializerProviders; -import com.azure.core.util.serializer.ObjectSerializer; -import reactor.core.publisher.Flux; - -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.nio.file.Path; -import java.util.Objects; - -/** - * Represents the content sent as part of a request. - */ -public abstract class RequestContent { - private static final ClientLogger LOGGER = new ClientLogger(RequestContent.class); - - /** - * Converts the {@link RequestContent} into a {@code Flux} for use in reactive streams. - * - * @return The {@link RequestContent} as a {@code Flux}. - */ - public abstract Flux asFluxByteBuffer(); - - /** - * Gets the length of the {@link RequestContent} if it is able to be calculated. - *

- * If the content length isn't able to be calculated null will be returned. - * - * @return The length of the {@link RequestContent} if it is able to be calculated, otherwise null. - */ - public abstract Long getLength(); - - /** - * Creates a {@link RequestContent} that uses {@code byte[]} as its data. - * - * @param bytes The bytes that will be the {@link RequestContent} data. - * @return A new {@link RequestContent}. - * @throws NullPointerException If {@code bytes} is null. - */ - public static RequestContent fromBytes(byte[] bytes) { - Objects.requireNonNull(bytes, "'bytes' cannot be null."); - return fromBytes(bytes, 0, bytes.length); - } - - /** - * Creates a {@link RequestContent} that uses {@code byte[]} as its data. - * - * @param bytes The bytes that will be the {@link RequestContent} data. - * @param offset Offset in the bytes where the data will begin. - * @param length Length of the data. - * @return A new {@link RequestContent}. - * @throws NullPointerException If {@code bytes} is null. - * @throws IllegalArgumentException If {@code offset} or {@code length} are negative or {@code offset} plus {@code - * length} is greater than {@code bytes.length}. - */ - public static RequestContent fromBytes(byte[] bytes, int offset, int length) { - Objects.requireNonNull(bytes, "'bytes' cannot be null."); - if (offset < 0) { - throw LOGGER.logExceptionAsError(new IllegalArgumentException("'offset' cannot be negative.")); - } - if (length < 0) { - throw LOGGER.logExceptionAsError(new IllegalArgumentException("'length' cannot be negative.")); - } - if (offset + length > bytes.length) { - throw LOGGER.logExceptionAsError(new IllegalArgumentException( - "'offset' plus 'length' cannot be greater than 'bytes.length'.")); - } - - return new ArrayContent(bytes, offset, length); - } - - /** - * Creates a {@link RequestContent} that uses {@link String} as its data. - *

- * The passed {@link String} is converted using {@link StandardCharsets#UTF_8}, if another character set is required - * use {@link #fromBytes(byte[])} and pass {@link String#getBytes(Charset)} using the required character set. - * - * @param content The string that will be the {@link RequestContent} data. - * @return A new {@link RequestContent}. - * @throws NullPointerException If {@code content} is null. - */ - public static RequestContent fromString(String content) { - Objects.requireNonNull(content, "'content' cannot be null."); - return fromBytes(content.getBytes(StandardCharsets.UTF_8)); - } - - /** - * Creates a {@link RequestContent} that uses {@link BinaryData} as its data. - * - * @param content The {@link BinaryData} that will be the {@link RequestContent} data. - * @return A new {@link RequestContent}. - * @throws NullPointerException If {@code content} is null. - */ - public static RequestContent fromBinaryData(BinaryData content) { - Objects.requireNonNull(content, "'content' cannot be null."); - return new ByteBufferContent(content.toByteBuffer()); - } - - /** - * Creates a {@link RequestContent} that uses {@link Path} as its data. - * - * @param file The {@link Path} that will be the {@link RequestContent} data. - * @return A new {@link RequestContent}. - * @throws NullPointerException If {@code file} is null. - */ - public static RequestContent fromFile(Path file) { - Objects.requireNonNull(file, "'file' cannot be null."); - return fromFile(file, 0, file.toFile().length(), 8092); - } - - /** - * Creates a {@link RequestContent} that uses {@link Path} as its data. - * - * @param file The {@link Path} that will be the {@link RequestContent} data. - * @param offset Offset in the {@link Path} where the data will begin. - * @param length Length of the data. - * @param chunkSize The requested size for each read of the path. - * @return A new {@link RequestContent}. - * @throws NullPointerException If {@code file} is null. - * @throws IllegalArgumentException If {@code offset} or {@code length} are negative or {@code offset} plus {@code - * length} is greater than the file size or {@code chunkSize} is less than or equal to 0. - */ - public static RequestContent fromFile(Path file, long offset, long length, int chunkSize) { - Objects.requireNonNull(file, "'file' cannot be null."); - if (offset < 0) { - throw LOGGER.logExceptionAsError(new IllegalArgumentException("'offset' cannot be negative.")); - } - if (length < 0) { - throw LOGGER.logExceptionAsError(new IllegalArgumentException("'length' cannot be negative.")); - } - if (offset + length > file.toFile().length()) { - throw LOGGER.logExceptionAsError(new IllegalArgumentException( - "'offset' plus 'length' cannot be greater than the file's size.")); - } - if (chunkSize <= 0) { - throw LOGGER.logExceptionAsError(new IllegalArgumentException( - "'chunkSize' cannot be less than or equal to 0.")); - } - - return new FileContent(file, offset, length, chunkSize); - } - - /** - * Creates a {@link RequestContent} that uses a serialized {@link Object} as its data. - *

- * This uses an {@link ObjectSerializer} found on the classpath. - *

- * The {@link RequestContent} returned has a null {@link #getLength()}, if the length of the content is needed use - * {@link BinaryData#fromObject(Object)} and {@link RequestContent#fromBinaryData(BinaryData)} to create the request - * content. - * - * @param serializable An {@link Object} that will be serialized to be the {@link RequestContent} data. - * @return A new {@link RequestContent}. - */ - public static RequestContent fromObject(Object serializable) { - return fromObject(serializable, JsonSerializerProviders.createInstance(true)); - } - - /** - * Creates a {@link RequestContent} that uses a serialized {@link Object} as its data. - *

- * The {@link RequestContent} returned has a null {@link #getLength()}, if the length of the content is needed use - * {@link BinaryData#fromObject(Object, ObjectSerializer)} and {@link RequestContent#fromBinaryData(BinaryData)} to - * create the request content. - * - * @param serializable An {@link Object} that will be serialized to be the {@link RequestContent} data. - * @param serializer The {@link ObjectSerializer} that will serialize the {@link Object}. - * @return A new {@link RequestContent}. - * @throws NullPointerException If {@code serializer} is null. - */ - public static RequestContent fromObject(Object serializable, ObjectSerializer serializer) { - Objects.requireNonNull(serializer, "'serializer' cannot be null."); - return new SerializableContent(serializable, serializer); - } - - /** - * Creates a {@link RequestContent} that uses a {@link Flux} of {@link ByteBuffer} as its data. - *

- * {@link RequestContent#getLength()} will be null if this factory method is used, if the length needs to be - * non-null use {@link RequestContent#fromFlux(Flux, long)}. - *

- * The {@link RequestContent} created by this factory method doesn't buffer the passed {@link Flux} of {@link - * ByteBuffer}, if the content must be replay-able the passed {@link Flux} of {@link ByteBuffer} must be replay-able - * as well. - * - * @param content The {@link Flux} of {@link ByteBuffer} that will be the {@link RequestContent} data. - * @return A new {@link RequestContent}. - * @throws NullPointerException If {@code content} is null. - */ - public static RequestContent fromFlux(Flux content) { - Objects.requireNonNull(content, "'content' cannot be null."); - return new FluxByteBufferContent(content); - } - - /** - * Creates a {@link RequestContent} that uses a {@link Flux} of {@link ByteBuffer} as its data. - *

- * The {@link RequestContent} created by this factory method doesn't buffer the passed {@link Flux} of {@link - * ByteBuffer}, if the content must be replay-able the passed {@link Flux} of {@link ByteBuffer} must be replay-able - * as well. - * - * @param content The {@link Flux} of {@link ByteBuffer} that will be the {@link RequestContent} data. - * @param length The length of the content. - * @return A new {@link RequestContent}. - * @throws NullPointerException If {@code content} is null. - * @throws IllegalStateException If {@code length} is less than 0. - */ - public static RequestContent fromFlux(Flux content, long length) { - Objects.requireNonNull(content, "'content' cannot be null."); - if (length < 0) { - throw LOGGER.logExceptionAsError(new IllegalArgumentException("'length' cannot be less than 0.")); - } - - return new FluxByteBufferContent(content, length); - } - - /** - * Creates a {@link RequestContent} that uses a {@link BufferedFluxByteBuffer} as its data. - *

- * {@link RequestContent#getLength()} will be null if this factory method is used, if the length needs to be - * non-null use {@link RequestContent#fromBufferedFlux(BufferedFluxByteBuffer, long)}. - * - * @param content The {@link BufferedFluxByteBuffer} that will be the {@link RequestContent} data. - * @return A new {@link RequestContent}. - * @throws NullPointerException If {@code content} is null. - */ - static RequestContent fromBufferedFlux(BufferedFluxByteBuffer content) { - Objects.requireNonNull(content, "'content' cannot be null."); - return new FluxByteBufferContent(content); - } - - /** - * Creates a {@link RequestContent} that uses a {@link BufferedFluxByteBuffer} as its data. - * - * @param content The {@link BufferedFluxByteBuffer} that will be the {@link RequestContent} data. - * @param length The length of the content. - * @return A new {@link RequestContent}. - * @throws NullPointerException If {@code content} is null. - * @throws IllegalStateException If {@code length} is less than 0. - */ - static RequestContent fromBufferedFlux(BufferedFluxByteBuffer content, long length) { - Objects.requireNonNull(content, "'content' cannot be null."); - if (length < 0) { - throw LOGGER.logExceptionAsError(new IllegalArgumentException("'length' cannot be less than 0.")); - } - - return new FluxByteBufferContent(content, length); - } - - /** - * Creates a {@link RequestContent} that uses an {@link InputStream} as its data. - *

- * {@link RequestContent#getLength()} will be null if this factory method is used, if the length needs to be - * non-null use {@link RequestContent#fromInputStream(InputStream, long, int)}. - * - * @param content The {@link InputStream} that will be the {@link RequestContent} data. - * @return A new {@link RequestContent}. - * @throws NullPointerException If {@code inputStream} is null. - */ - public static RequestContent fromInputStream(InputStream content) { - return fromInputStreamInternal(content, null, 8092); - } - - /** - * Creates a {@link RequestContent} that uses an {@link InputStream} as its data. - * - * @param content The {@link InputStream} that will be the {@link RequestContent} data. - * @param length The length of the content. - * @param chunkSize The requested size for each {@link InputStream#read(byte[])}. - * @return A new {@link RequestContent}. - * @throws NullPointerException If {@code inputStream} is null. - * @throws IllegalArgumentException If {@code length} is less than 0 or {@code chunkSize} is less than or equal to - * 0. - */ - public static RequestContent fromInputStream(InputStream content, long length, int chunkSize) { - return fromInputStreamInternal(content, length, chunkSize); - } - - private static RequestContent fromInputStreamInternal(InputStream content, Long length, int chunkSize) { - Objects.requireNonNull(content, "'content' cannot be null."); - if (length != null && length < 0) { - throw LOGGER.logExceptionAsError(new IllegalArgumentException("'length' cannot be less than 0.")); - } - if (chunkSize <= 0) { - throw LOGGER.logExceptionAsError(new IllegalArgumentException( - "'chunkSize' cannot be less than or equal to 0.")); - } - - return new InputStreamContent(content, length, chunkSize); - } -} diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/credential/TokenCacheTests.java b/sdk/core/azure-core/src/test/java/com/azure/core/credential/TokenCacheTests.java index ff14ec86e8ad..d25a41e42b39 100644 --- a/sdk/core/azure-core/src/test/java/com/azure/core/credential/TokenCacheTests.java +++ b/sdk/core/azure-core/src/test/java/com/azure/core/credential/TokenCacheTests.java @@ -31,7 +31,7 @@ public void testOnlyOneThreadRefreshesToken() throws Exception { Flux.range(1, 10) .flatMap(i -> Mono.just(OffsetDateTime.now()) // Runs cache.getToken() on 10 different threads - .publishOn(Schedulers.newParallel("pool", 10)) + .publishOn(Schedulers.parallel()) .flatMap(start -> cache.getToken() .map(t -> Duration.between(start, OffsetDateTime.now()).toMillis()) .doOnNext(millis -> { @@ -69,7 +69,7 @@ public void testLongRunningWontOverflow() throws Exception { .take(100) .flatMap(i -> Mono.just(OffsetDateTime.now()) // Runs cache.getToken() on 10 different threads - .subscribeOn(Schedulers.newParallel("pool", 10)) + .subscribeOn(Schedulers.parallel()) .flatMap(start -> cache.getToken() .map(t -> Duration.between(start, OffsetDateTime.now()).toMillis()) .doOnNext(millis -> { diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/util/FileContentTests.java b/sdk/core/azure-core/src/test/java/com/azure/core/implementation/util/FileContentTests.java deleted file mode 100644 index 9637fbfbe003..000000000000 --- a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/util/FileContentTests.java +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.core.implementation.util; - -import org.junit.jupiter.api.Test; -import reactor.test.StepVerifier; - -import java.io.IOException; -import java.nio.MappedByteBuffer; -import java.nio.file.FileSystem; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.spi.FileSystemProvider; -import java.util.Objects; - -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -/** - * Tests {@link FileContent}. - */ -public class FileContentTests { - @Test - public void fileChannelOpenErrorReturnsReactively() { - Path notARealPath = Paths.get("fake"); - FileContent fileContent = new FileContent(notARealPath, 0, 1024, 8092); - - StepVerifier.create(fileContent.asFluxByteBuffer()) - .verifyError(IOException.class); - } - - @Test - public void fileChannelCloseErrorReturnsReactively() throws IOException { - MyFileChannel myFileChannel = spy(MyFileChannel.class); - when(myFileChannel.map(any(), anyLong(), anyLong())).thenReturn(mock(MappedByteBuffer.class)); - doThrow(IOException.class).when(myFileChannel).implCloseChannel(); - - FileSystemProvider fileSystemProvider = mock(FileSystemProvider.class); - when(fileSystemProvider.newFileChannel(any(), any(), any())).thenReturn(myFileChannel); - - FileSystem fileSystem = mock(FileSystem.class); - when(fileSystem.provider()).thenReturn(fileSystemProvider); - - Path path = mock(Path.class); - when(path.getFileSystem()).thenReturn(fileSystem); - - FileContent fileContent = new FileContent(path, 0, 1024, 8092); - StepVerifier.create(fileContent.asFluxByteBuffer()) - .thenConsumeWhile(Objects::nonNull) - .verifyError(IOException.class); - } - - @Test - public void fileChannelIsClosedWhenMapErrors() throws IOException { - MyFileChannel myFileChannel = spy(MyFileChannel.class); - when(myFileChannel.map(any(), anyLong(), anyLong())).thenThrow(IOException.class); - - FileSystemProvider fileSystemProvider = mock(FileSystemProvider.class); - when(fileSystemProvider.newFileChannel(any(), any(), any())).thenReturn(myFileChannel); - - FileSystem fileSystem = mock(FileSystem.class); - when(fileSystem.provider()).thenReturn(fileSystemProvider); - - Path path = mock(Path.class); - when(path.getFileSystem()).thenReturn(fileSystem); - - FileContent fileContent = new FileContent(path, 0, 1024, 8092); - StepVerifier.create(fileContent.asFluxByteBuffer()) - .thenConsumeWhile(Objects::nonNull) - .verifyError(IOException.class); - - assertFalse(myFileChannel.isOpen()); - } -} diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/util/MyFileChannel.java b/sdk/core/azure-core/src/test/java/com/azure/core/implementation/util/MyFileChannel.java deleted file mode 100644 index 21d08ee7f9d8..000000000000 --- a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/util/MyFileChannel.java +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.core.implementation.util; - -import java.io.IOException; -import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; - -public abstract class MyFileChannel extends FileChannel { - // Needed by Mockito - public MyFileChannel() { - super(); - } - - @Override - public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException { - return null; - } - - @Override - protected void implCloseChannel() throws IOException { - } -} diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/util/RequestContentTests.java b/sdk/core/azure-core/src/test/java/com/azure/core/util/RequestContentTests.java deleted file mode 100644 index b4fdf1dca469..000000000000 --- a/sdk/core/azure-core/src/test/java/com/azure/core/util/RequestContentTests.java +++ /dev/null @@ -1,235 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.core.util; - -import org.junit.jupiter.api.function.Executable; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import reactor.core.publisher.Flux; -import reactor.test.StepVerifier; - -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.security.SecureRandom; -import java.time.Duration; -import java.util.Arrays; -import java.util.stream.Stream; - -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * Tests {@link RequestContent}. - */ -public class RequestContentTests { - @ParameterizedTest - @MethodSource("expectedContentSupplier") - public void expectedContent(RequestContent content, boolean checkLength, byte[] expected) { - if (checkLength) { - assertEquals(expected.length, content.getLength()); - } - - StepVerifier.create(FluxUtil.collectBytesInByteBufferStream(content.asFluxByteBuffer())) - .assertNext(bytes -> assertArrayEquals(expected, bytes)) - .expectComplete() - .verify(Duration.ofSeconds(30)); - } - - private static Stream expectedContentSupplier() throws IOException { - byte[] emptyBytes = new byte[0]; - - SecureRandom random = new SecureRandom(); - byte[] randomBytes = new byte[1024 * 1024]; - random.nextBytes(randomBytes); - - String emptyString = new String(emptyBytes, StandardCharsets.UTF_8); - String randomString = new String(randomBytes, StandardCharsets.UTF_8); - // This is done as using random bytes to make a String may result in the String being longer than the initial - // byte array as missing code point bytes are packed. - byte[] randomStringBytes = randomString.getBytes(StandardCharsets.UTF_8); - - Path emptyTempFile = Files.createTempFile("emptyTempFile", ".txt"); - emptyTempFile.toFile().deleteOnExit(); - Files.write(emptyTempFile, emptyBytes); - - Path randomTempFile = Files.createTempFile("randomTempFile", ".txt"); - randomTempFile.toFile().deleteOnExit(); - Files.write(randomTempFile, randomBytes); - - Flux emptyFlux = Flux.defer(() -> Flux.just(ByteBuffer.wrap(emptyBytes))); - Flux randomFlux = Flux.defer(() -> Flux.just(ByteBuffer.wrap(randomBytes))); - Flux chunkedRandomFlux = Flux.generate(() -> 0, (offset, sink) -> { - if (offset == randomBytes.length) { - sink.complete(); - return offset; - } - - int nextLength = Math.min(1024, randomBytes.length - offset); - sink.next(ByteBuffer.wrap(Arrays.copyOfRange(randomBytes, offset, offset + nextLength))); - return offset + nextLength; - }); - - return Stream.of( - Arguments.of(RequestContent.fromBytes(emptyBytes), true, emptyBytes), - Arguments.of(RequestContent.fromBytes(emptyBytes, 0, 0), true, emptyBytes), - - Arguments.of(RequestContent.fromBytes(randomBytes), true, randomBytes), - Arguments.of(RequestContent.fromBytes(randomBytes, 0, randomBytes.length), true, randomBytes), - Arguments.of(RequestContent.fromBytes(randomBytes, 1024, 1024), true, - Arrays.copyOfRange(randomBytes, 1024, 2048)), - - Arguments.of(RequestContent.fromString(emptyString), true, emptyBytes), - Arguments.of(RequestContent.fromString(randomString), true, randomStringBytes), - - Arguments.of(RequestContent.fromFile(emptyTempFile), true, emptyBytes), - Arguments.of(RequestContent.fromFile(emptyTempFile, 0, 0, 8092), true, emptyBytes), - - Arguments.of(RequestContent.fromFile(randomTempFile), true, randomBytes), - Arguments.of(RequestContent.fromFile(randomTempFile, 0, randomBytes.length, 8092), true, randomBytes), - Arguments.of(RequestContent.fromFile(randomTempFile, 1024, 1024, 8092), true, - Arrays.copyOfRange(randomBytes, 1024, 2048)), - - Arguments.of(RequestContent.fromObject(emptyString), false, "\"\"".getBytes(StandardCharsets.UTF_8)), - - Arguments.of(RequestContent.fromFlux(emptyFlux), false, emptyBytes), - Arguments.of(RequestContent.fromFlux(emptyFlux, 0), true, emptyBytes), - - Arguments.of(RequestContent.fromFlux(randomFlux), false, randomBytes), - Arguments.of(RequestContent.fromFlux(randomFlux, randomBytes.length), true, randomBytes), - - Arguments.of(RequestContent.fromFlux(chunkedRandomFlux), false, randomBytes), - Arguments.of(RequestContent.fromFlux(chunkedRandomFlux, randomBytes.length), true, randomBytes), - - Arguments.of(RequestContent.fromBufferedFlux(new BufferedFluxByteBuffer(emptyFlux)), false, emptyBytes), - Arguments.of(RequestContent.fromBufferedFlux(new BufferedFluxByteBuffer(emptyFlux), 0), true, emptyBytes), - - Arguments.of(RequestContent.fromBufferedFlux(new BufferedFluxByteBuffer(randomFlux)), false, randomBytes), - Arguments.of(RequestContent.fromBufferedFlux(new BufferedFluxByteBuffer(randomFlux), randomBytes.length), - true, randomBytes), - - Arguments.of(RequestContent.fromBufferedFlux(new BufferedFluxByteBuffer(chunkedRandomFlux)), false, - randomBytes), - Arguments.of( - RequestContent.fromBufferedFlux(new BufferedFluxByteBuffer(chunkedRandomFlux), randomBytes.length), - true, randomBytes), - - Arguments.of(RequestContent.fromInputStream(new ByteArrayInputStream(emptyBytes)), false, emptyBytes), - Arguments.of(RequestContent.fromInputStream(new ByteArrayInputStream(emptyBytes), 0, 8092), true, - emptyBytes), - - Arguments.of(RequestContent.fromInputStream(new ByteArrayInputStream(randomBytes)), false, randomBytes), - Arguments.of(RequestContent.fromInputStream(new ByteArrayInputStream(randomBytes), randomBytes.length, - 8092), true, randomBytes) - ); - } - - @ParameterizedTest - @MethodSource("invalidArgumentSupplier") - public void invalidArgument(Executable requestContentSupplier, Class expectedException) { - assertThrows(expectedException, requestContentSupplier); - } - - private static Stream invalidArgumentSupplier() { - byte[] dummyBytes = new byte[0]; - - File mockFile = mock(File.class); - when(mockFile.length()).thenReturn(0L); - - Path mockPath = mock(Path.class); - when(mockPath.toFile()).thenReturn(mockFile); - - return Stream.of( - // bytes cannot be null - Arguments.of(createExecutable(() -> RequestContent.fromBytes(null)), NullPointerException.class), - Arguments.of(createExecutable(() -> RequestContent.fromBytes(null, 0, 0)), NullPointerException.class), - - // offset cannot be negative - Arguments.of(createExecutable(() -> RequestContent.fromBytes(dummyBytes, -1, 0)), - IllegalArgumentException.class), - - // length cannot be negative - Arguments.of(createExecutable(() -> RequestContent.fromBytes(dummyBytes, 0, -1)), - IllegalArgumentException.class), - - // offset + length cannot be greater than bytes.length - Arguments.of(createExecutable(() -> RequestContent.fromBytes(dummyBytes, 0, 1)), - IllegalArgumentException.class), - - // content cannot be null - Arguments.of(createExecutable(() -> RequestContent.fromString(null)), NullPointerException.class), - - // content cannot be null - Arguments.of(createExecutable(() -> RequestContent.fromBinaryData(null)), NullPointerException.class), - - // file cannot be null - Arguments.of(createExecutable(() -> RequestContent.fromFile(null)), NullPointerException.class), - Arguments.of(createExecutable(() -> RequestContent.fromFile(null, 0, 0, 0)), NullPointerException.class), - - // offset cannot be negative - Arguments.of(createExecutable(() -> RequestContent.fromFile(mockPath, -1, 0, 0)), - IllegalArgumentException.class), - - // length cannot be negative - Arguments.of(createExecutable(() -> RequestContent.fromFile(mockPath, 0, -1, 0)), - IllegalArgumentException.class), - - // offset + length cannot be greater than file size - Arguments.of(createExecutable(() -> RequestContent.fromFile(mockPath, 0, 1, 0)), - IllegalArgumentException.class), - - // chunkSize cannot be less than or equal to 0 - Arguments.of(createExecutable(() -> RequestContent.fromFile(mockPath, 0, 0, -1)), - IllegalArgumentException.class), - Arguments.of(createExecutable(() -> RequestContent.fromFile(mockPath, 0, 0, 0)), - IllegalArgumentException.class), - - // serializer cannot be null - Arguments.of(createExecutable(() -> RequestContent.fromObject(null, null)), NullPointerException.class), - - // content cannot be null - Arguments.of(createExecutable(() -> RequestContent.fromFlux(null)), NullPointerException.class), - - // length cannot be negative - Arguments.of(createExecutable(() -> RequestContent.fromFlux(Flux.empty(), -1)), - IllegalArgumentException.class), - - // content cannot be null - Arguments.of(createExecutable(() -> RequestContent.fromBufferedFlux(null)), NullPointerException.class), - - // length cannot be negative - Arguments.of( - createExecutable(() -> RequestContent.fromBufferedFlux(new BufferedFluxByteBuffer(Flux.empty()), -1)), - IllegalArgumentException.class), - - // content cannot be null - Arguments.of(createExecutable(() -> RequestContent.fromInputStream(null)), NullPointerException.class), - - // length cannot be negative - Arguments.of( - createExecutable(() -> RequestContent.fromInputStream(new ByteArrayInputStream(dummyBytes), -1, 0)), - IllegalArgumentException.class), - - // chunkSize cannot be zero or negative - Arguments.of( - createExecutable(() -> RequestContent.fromInputStream(new ByteArrayInputStream(dummyBytes), 0, -1)), - IllegalArgumentException.class), - Arguments.of( - createExecutable(() -> RequestContent.fromInputStream(new ByteArrayInputStream(dummyBytes), 0, 0)), - IllegalArgumentException.class) - ); - } - - private static Executable createExecutable(Runnable runnable) { - return runnable::run; - } -}