Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
bc7f532
Clients and code samples for LeaseClients
alzimmermsft Sep 13, 2019
99948a4
Updated tests
alzimmermsft Sep 13, 2019
4b13ea4
Updating unit tests
alzimmermsft Sep 13, 2019
b36de0d
Using builder pattern for LeaseClients
alzimmermsft Sep 16, 2019
eeec369
Merge branch 'master' into AzStorage_LeaseClient
alzimmermsft Sep 16, 2019
0ba893a
Fix unit tests
alzimmermsft Sep 16, 2019
52f0a11
Cleaning up documentation
alzimmermsft Sep 16, 2019
52f9134
Fix linting issues and more unit tests
alzimmermsft Sep 16, 2019
1b18093
Merge branch 'master' into AzStorage_LeaseClient
alzimmermsft Sep 16, 2019
dcd005c
Some moving of clients
alzimmermsft Sep 16, 2019
65ea68c
Moved test files into correct folder
alzimmermsft Sep 16, 2019
28d6e77
Merge branch 'AzStorage_LeaseClient' into AzStorage_MoveSubBlobs
alzimmermsft Sep 16, 2019
a91f4ae
Moved sub-blob clients to specialized namespace, added BlobBaseClients
alzimmermsft Sep 17, 2019
d77b4cd
Refactoring unit tests
alzimmermsft Sep 17, 2019
63f0e1b
More cleaning up
alzimmermsft Sep 17, 2019
a619012
More cleaning up
alzimmermsft Sep 18, 2019
2f595bb
Merged in master
alzimmermsft Sep 19, 2019
51da041
Cleaning up linting issues
alzimmermsft Sep 20, 2019
307fd0d
Addressing PR feedback
alzimmermsft Sep 20, 2019
10d8ca6
Merged in master
alzimmermsft Sep 20, 2019
a49a98a
Merged in master
alzimmermsft Sep 23, 2019
d300657
Merged in master
alzimmermsft Sep 23, 2019
cc60e01
Merge branch 'master' into AzStorage_MoveSubBlobs
alzimmermsft Sep 24, 2019
8919c8c
Cleaning up Spotbug and linting issues
alzimmermsft Sep 24, 2019
43d31e6
Updated SpecializedBlobClientBuilder to be a full-fledged builder
alzimmermsft Sep 24, 2019
0b4b624
Merged in master
alzimmermsft Sep 25, 2019
d67ad67
Merged in master
alzimmermsft Sep 26, 2019
c4ba6ae
Updating CPK variable name and moving specialized test classes to spe…
alzimmermsft Sep 26, 2019
8cfcece
Cleaning up tests and linting
alzimmermsft Sep 26, 2019
b0538ce
Fix test order and added exception comment
alzimmermsft Sep 26, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,8 @@
<suppress checks="com.azure.tools.checkstyle.checks.ThrowFromClientLoggerCheck" files=".*[/\\]com[/\\]azure[/\\]core[/\\]util[/\\]logging[/\\]*"/>

<!-- Suppress IO exception for now, which need code owner's attention -->
<suppress checks="com.azure.tools.checkstyle.checks.ThrowFromClientLoggerCheck" files="com.azure.storage.blob.BlobInputStream.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.ThrowFromClientLoggerCheck" files="com.azure.storage.blob.BlobOutputStream.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.ThrowFromClientLoggerCheck" files="com.azure.storage.blob.BlockBlobClient.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.ThrowFromClientLoggerCheck" files="com.azure.storage.blob.PageBlobAsyncClient.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.ThrowFromClientLoggerCheck" files="com.azure.storage.blob.specialized.BlobInputStream.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.ThrowFromClientLoggerCheck" files="com.azure.storage.blob.specialized.BlobOutputStream.java"/>

<!-- Suppress requirement that abstract builder classes require the ServiceClientBuilder annotation -->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientBuilderCheck" files="com.azure.storage.common.BaseClientBuilder.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,8 @@
<!-- Incorrect flagging, if the response is null a StorageException would have been thrown -->
<Match>
<Or>
<Class name="com.azure.storage.blob.BlobInputStream"/>
<Class name="com.azure.storage.blob.BlobOutputStream$AppendBlobOutputStream"/>
<Class name="com.azure.storage.blob.specialized.BlobInputStream"/>
<Class name="com.azure.storage.blob.specialized.BlobOutputStream$AppendBlobOutputStream"/>
<Class name="com.azure.storage.queue.QueueServiceClient"/>
</Or>
<Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
Expand Down Expand Up @@ -579,7 +579,7 @@

<!-- This is a false positive, it is possible for usingUserDelegation to be false. -->
<Match>
<Class name="com.azure.storage.blob.BlobServiceSASSignatureValues"/>
<Class name="com.azure.storage.blob.specialized.BlobServiceSASSignatureValues"/>
<Bug pattern="UC_USELESS_CONDITION"/>
</Match>

Expand Down
2 changes: 2 additions & 0 deletions eng/spotbugs-aggregate-report/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
<source>..\..\sdk\core\azure-core-http-netty\src\main\java\com</source>
<source>..\..\sdk\core\azure-core-http-netty\src\samples\java\com</source>
<source>..\..\sdk\core\azure-core-http-okhttp\src\main\java\com</source>
<source>..\..\sdk\core\azure-core-http-okhttp\src\samples\java\com</source>
<source>..\..\sdk\core\azure-core-management\src\main\java\com</source>
<source>..\..\sdk\core\azure-core-test\src\main\java\com</source>
<source>..\..\sdk\eventhubs\azure-messaging-eventhubs\src\main\java\com</source>
Expand All @@ -79,6 +80,7 @@
<!-- <source>..\..\sdk\keyvault\azure-keyvault-secrets\src\main\java\com</source>-->
<!-- <source>..\..\sdk\keyvault\azure-keyvault-secrets\src\samples\java\com</source>-->
<source>..\..\sdk\storage\azure-storage-common\src\main\java\com</source>
<source>..\..\sdk\storage\azure-storage-common\src\samples\java\com</source>
<source>..\..\sdk\storage\azure-storage-blob\src\main\java\com</source>
<source>..\..\sdk\storage\azure-storage-blob\src\samples\java\com</source>
<source>..\..\sdk\storage\azure-storage-blob-cryptography\src\main\java\com</source>
Expand Down
1 change: 1 addition & 0 deletions pom.client.xml
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@
-snippetpath ${project.basedir}/sdk/keyvault/azure-keyvault-keys/src/samples/java
-snippetpath ${project.basedir}/sdk/keyvault/azure-keyvault-secrets/src/samples/java
-snippetpath ${project.basedir}/sdk/storage/azure-storage-blob/src/samples/java
-snippetpath ${project.basedir}/sdk/storage/azure-storage-common/src/samples/java
-snippetpath ${project.basedir}/sdk/storage/azure-storage-file/src/samples/java
-snippetpath ${project.basedir}/sdk/storage/azure-storage-queue/src/samples/java
</additionalOptions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public void simpleInstantiation() {
// END: com.azure.core.http.okhttp.instantiation-simple
}

private void proxySample() {
public void proxySample() {
// BEGIN: com.azure.core.http.okhttp.OkHttpAsyncHttpClientBuilder#proxy
final String proxyHost = "<proxy-host>"; // e.g. localhost
final int proxyPort = 9999; // Proxy port
Expand All @@ -38,7 +38,7 @@ private void proxySample() {

}

private void proxyBasicAuthenticationSample() {
public void proxyBasicAuthenticationSample() {

// BEGIN: com.azure.core.http.okhttp.OkHttpAsyncHttpClientBuilder#setProxyAuthenticator
final String proxyHost = "<proxy-host>"; // e.g. localhost
Expand All @@ -60,7 +60,7 @@ private void proxyBasicAuthenticationSample() {

}

private void connectionTimeoutSample() {
public void connectionTimeoutSample() {

// BEGIN: com.azure.core.http.okhttp.OkHttpAsyncHttpClientBuilder#connectionTimeout
final Duration connectionTimeout = Duration.ofSeconds(250); // connection timeout of 250 seconds
Expand All @@ -71,7 +71,7 @@ private void connectionTimeoutSample() {

}

private void readTimeoutSample() {
public void readTimeoutSample() {

// BEGIN: com.azure.core.http.okhttp.OkHttpAsyncHttpClientBuilder#readTimeout
final Duration readTimeout = Duration.ofSeconds(100); // read timeout of 100 seconds
Expand All @@ -82,7 +82,7 @@ private void readTimeoutSample() {

}

private void usingExistingHttpClientSample() {
public void usingExistingHttpClientSample() {

// BEGIN: com.azure.core.http.okhttp.using-existing-okhttp
// Create an OkHttpClient with connection timeout of 250 seconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.messaging.eventhubs.checkpointstore.blob;

import com.azure.core.http.rest.Response;
import com.azure.core.implementation.util.ImplUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.EventProcessor;
Expand Down Expand Up @@ -42,9 +43,10 @@ public class BlobPartitionManager implements PartitionManager {
private static final String OFFSET = "Offset";
private static final String OWNER_ID = "OwnerId";
private static final String ETAG = "eTag";
private static final String CLAIM_ERROR = "Couldn't claim ownership of partition {}, error {}";

private static final String BLOB_PATH_SEPARATOR = "/";
private static final ByteBuffer UPLOAD_DATA = ByteBuffer.wrap("" .getBytes(UTF_8));
private static final ByteBuffer UPLOAD_DATA = ByteBuffer.wrap("".getBytes(UTF_8));

private final ContainerAsyncClient containerAsyncClient;
private final ClientLogger logger = new ClientLogger(BlobPartitionManager.class);
Expand Down Expand Up @@ -90,57 +92,48 @@ public Flux<PartitionOwnership> listOwnership(String eventHubName, String consum
@Override
public Flux<PartitionOwnership> claimOwnership(PartitionOwnership... requestedPartitionOwnerships) {

return Flux.fromArray(requestedPartitionOwnerships).flatMap(
partitionOwnership -> {

String partitionId = partitionOwnership.getPartitionId();
String blobName = getBlobName(partitionOwnership.getEventHubName(),
partitionOwnership.getConsumerGroupName(), partitionId);

if (!blobClients.containsKey(blobName)) {
blobClients.put(blobName, containerAsyncClient.getBlobAsyncClient(blobName));
}

BlobAsyncClient blobAsyncClient = blobClients.get(blobName);

Metadata metadata = new Metadata();
metadata.put(OWNER_ID, partitionOwnership.getOwnerId());
Long offset = partitionOwnership.getOffset();
metadata.put(OFFSET, offset == null ? null : String.valueOf(offset));
Long sequenceNumber = partitionOwnership.getSequenceNumber();
metadata.put(SEQUENCE_NUMBER, sequenceNumber == null ? null : String.valueOf(sequenceNumber));
BlobAccessConditions blobAccessConditions = new BlobAccessConditions();
if (ImplUtils.isNullOrEmpty(partitionOwnership.getETag())) {
// New blob should be created
blobAccessConditions.setModifiedAccessConditions(new ModifiedAccessConditions()
.setIfNoneMatch("*"));
return blobAsyncClient.asBlockBlobAsyncClient()
.uploadWithResponse(Flux.just(UPLOAD_DATA), 0, null, metadata, null,
blobAccessConditions)
.flatMapMany(response -> {
partitionOwnership.setETag(response.getHeaders().get(ETAG).getValue());
return Mono.just(partitionOwnership);
}, error -> {
logger.info("Couldn't claim ownership of partition {}, error {}", partitionId,
error.getMessage());
return Mono.empty();
}, Mono::empty);
} else {
// update existing blob
blobAccessConditions.setModifiedAccessConditions(new ModifiedAccessConditions()
.setIfMatch(partitionOwnership.getETag()));
return blobAsyncClient.setMetadataWithResponse(metadata, blobAccessConditions)
.flatMapMany(response -> {
partitionOwnership.setETag(response.getHeaders().get(ETAG).getValue());
return Mono.just(partitionOwnership);
}, error -> {
logger.info("Couldn't claim ownership of partition {}, error {}", partitionId,
error.getMessage());
return Mono.empty();
}, () -> Mono.empty());
}
return Flux.fromArray(requestedPartitionOwnerships).flatMap(partitionOwnership -> {
String partitionId = partitionOwnership.getPartitionId();
String blobName = getBlobName(partitionOwnership.getEventHubName(),
partitionOwnership.getConsumerGroupName(), partitionId);

if (!blobClients.containsKey(blobName)) {
blobClients.put(blobName, containerAsyncClient.getBlobAsyncClient(blobName));
}

BlobAsyncClient blobAsyncClient = blobClients.get(blobName);

Metadata metadata = new Metadata();
metadata.put(OWNER_ID, partitionOwnership.getOwnerId());
Long offset = partitionOwnership.getOffset();
metadata.put(OFFSET, offset == null ? null : String.valueOf(offset));
Long sequenceNumber = partitionOwnership.getSequenceNumber();
metadata.put(SEQUENCE_NUMBER, sequenceNumber == null ? null : String.valueOf(sequenceNumber));
BlobAccessConditions blobAccessConditions = new BlobAccessConditions();
if (ImplUtils.isNullOrEmpty(partitionOwnership.getETag())) {
// New blob should be created
blobAccessConditions.setModifiedAccessConditions(new ModifiedAccessConditions().setIfNoneMatch("*"));
return blobAsyncClient.asBlockBlobAsyncClient()
.uploadWithResponse(Flux.just(UPLOAD_DATA), 0, null, metadata, null, blobAccessConditions)
.flatMapMany(response -> updateOwnershipETag(response, partitionOwnership), error -> {
logger.info(CLAIM_ERROR, partitionId, error.getMessage());
return Mono.empty();
}, Mono::empty);
} else {
// update existing blob
blobAccessConditions.setModifiedAccessConditions(new ModifiedAccessConditions()
.setIfMatch(partitionOwnership.getETag()));
return blobAsyncClient.setMetadataWithResponse(metadata, blobAccessConditions)
.flatMapMany(response -> updateOwnershipETag(response, partitionOwnership), error -> {
logger.info(CLAIM_ERROR, partitionId, error.getMessage());
return Mono.empty();
}, Mono::empty);
}
);
});
}

private Mono<PartitionOwnership> updateOwnershipETag(Response<?> response, PartitionOwnership ownership) {
return Mono.just(ownership.setETag(response.getHeaders().get(ETAG).getValue()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import com.azure.storage.blob.BlobAsyncClient;
import com.azure.storage.blob.BlockBlobAsyncClient;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.azure.storage.blob.ContainerAsyncClient;
import com.azure.storage.blob.models.BlobAccessConditions;
import com.azure.storage.blob.models.BlobItem;
Expand Down
9 changes: 9 additions & 0 deletions sdk/storage/azure-storage-blob/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,15 @@
</dependency>
</dependencies>

<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
<testResources>
<testResource>
<directory>${basedir}/src/test/resources</directory>
</testResource>
</testResources>
</build>

<profiles>
<profile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,68 @@

package com.azure.storage.blob;

import com.azure.core.http.HttpPipeline;
import com.azure.core.http.policy.UserAgentPolicy;
import com.azure.storage.blob.models.CpkInfo;
import com.azure.storage.blob.models.CustomerProvidedKey;
import com.azure.storage.common.BaseClientBuilder;
import com.azure.storage.common.Constants;
import com.azure.storage.common.policy.ResponseValidationPolicyBuilder;

abstract class BaseBlobClientBuilder<T extends BaseClientBuilder<T>> extends BaseClientBuilder<T> {
/**
* Base builder for Azure Storage Blobs.
* @param <T> Generic type that extends {@link BaseClientBuilder}.
*/
public abstract class BaseBlobClientBuilder<T extends BaseClientBuilder<T>> extends BaseClientBuilder<T> {

private static final String BLOB_ENDPOINT_MIDFIX = "blob";

protected CpkInfo cpk;
protected CpkInfo customerProvidedKey;

@SuppressWarnings("unchecked")
/**
* Sets the {@link CustomerProvidedKey customer provided key} that is used to encrypt blob contents on the server.
*
* @param key Customer provided key containing the encryption key
* @return the updated builder object
*/
public T customerProvidedKey(CustomerProvidedKey key) {
cpk = new CpkInfo()
.setEncryptionKey(key.getKey())
.setEncryptionKeySha256(key.getKeySHA256())
.setEncryptionAlgorithm(key.getEncryptionAlgorithm());
if (key == null) {
customerProvidedKey = null;
} else {
customerProvidedKey = new CpkInfo()
.setEncryptionKey(key.getKey())
.setEncryptionKeySha256(key.getKeySHA256())
.setEncryptionAlgorithm(key.getEncryptionAlgorithm());
}

return (T) this;
return getClazz().cast(this);
}

/**
* Gets the {@link UserAgentPolicy user agent policy} that is used to set the User-Agent header for each request.
*
* @return the {@code UserAgentPolicy} that will be used in the {@link HttpPipeline}.
*/
@Override
protected final UserAgentPolicy getUserAgentPolicy() {
return new UserAgentPolicy(BlobConfiguration.NAME, BlobConfiguration.VERSION, super.getConfiguration());
}

/**
* Gets the midfix used to create the resource URL.
*
* @return the Azure Storage Blob midfix.
*/
@Override
protected final String getServiceUrlMidfix() {
return BLOB_ENDPOINT_MIDFIX;
}

/**
* Configures the response validation rules that are applied to each request/response.
*
* @param builder Builder to assemble assertions together.
*/
@Override
protected final void applyServiceSpecificValidations(ResponseValidationPolicyBuilder builder) {
// CPK
Expand Down
Loading