diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java index d987ad084d3ef..a120c297ea7e9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java @@ -49,6 +49,7 @@ public interface CommonStatisticNames { String OP_DELETE = "op_delete"; String OP_EXISTS = "op_exists"; String OP_GET_CONTENT_SUMMARY = "op_get_content_summary"; + String OP_GET_DELEGATION_TOKEN = "op_get_delegation_token"; String OP_GET_FILE_CHECKSUM = "op_get_file_checksum"; String OP_GET_FILE_STATUS = "op_get_file_status"; String OP_GET_STATUS = "op_get_status"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index ccc546352908b..aaaa9362be684 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -134,6 +134,10 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES) private String azureAtomicDirs; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE, + DefaultValue = DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE) + private boolean enableConditionalCreateOverwrite; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION) private boolean createRemoteFileSystemDuringInitialization; @@ -427,6 +431,10 @@ public String getAzureAtomicRenameDirs() { return this.azureAtomicDirs; } + public boolean isConditionalCreateOverwriteEnabled() { + return this.enableConditionalCreateOverwrite; + } + public boolean getCreateRemoteFileSystemDuringInitialization() { return this.createRemoteFileSystemDuringInitialization; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java new file mode 100644 index 0000000000000..57cc3eada4847 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricStringBuilder; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsTag; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableMetric; + +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*; + +/** + * Instrumentation of Abfs counters. + */ +public class AbfsCountersImpl implements AbfsCounters { + + /** + * Single context for all the Abfs counters to separate them from other + * counters. + */ + private static final String CONTEXT = "AbfsContext"; + /** + * The name of a field added to metrics records that uniquely identifies a + * specific FileSystem instance. + */ + private static final String REGISTRY_ID = "AbfsID"; + /** + * The name of a field added to metrics records that indicates the hostname + * portion of the FS URL. + */ + private static final String METRIC_BUCKET = "AbfsBucket"; + + private final MetricsRegistry registry = + new MetricsRegistry("abfsMetrics").setContext(CONTEXT); + + private static final AbfsStatistic[] STATISTIC_LIST = { + CALL_CREATE, + CALL_OPEN, + CALL_GET_FILE_STATUS, + CALL_APPEND, + CALL_CREATE_NON_RECURSIVE, + CALL_DELETE, + CALL_EXIST, + CALL_GET_DELEGATION_TOKEN, + CALL_LIST_STATUS, + CALL_MKDIRS, + CALL_RENAME, + DIRECTORIES_CREATED, + DIRECTORIES_DELETED, + FILES_CREATED, + FILES_DELETED, + ERROR_IGNORED, + CONNECTIONS_MADE, + SEND_REQUESTS, + GET_RESPONSES, + BYTES_SENT, + BYTES_RECEIVED, + READ_THROTTLES, + WRITE_THROTTLES + }; + + public AbfsCountersImpl(URI uri) { + UUID fileSystemInstanceId = UUID.randomUUID(); + registry.tag(REGISTRY_ID, + "A unique identifier for the instance", + fileSystemInstanceId.toString()); + registry.tag(METRIC_BUCKET, "Hostname from the FS URL", uri.getHost()); + + for (AbfsStatistic stats : STATISTIC_LIST) { + createCounter(stats); + } + } + + /** + * Look up a Metric from registered set. + * + * @param name name of metric. + * @return the metric or null. + */ + private MutableMetric lookupMetric(String name) { + return getRegistry().get(name); + } + + /** + * Look up counter by name. + * + * @param name name of counter. + * @return counter if found, else null. + */ + private MutableCounterLong lookupCounter(String name) { + MutableMetric metric = lookupMetric(name); + if (metric == null) { + return null; + } + if (!(metric instanceof MutableCounterLong)) { + throw new IllegalStateException("Metric " + name + + " is not a MutableCounterLong: " + metric); + } + return (MutableCounterLong) metric; + } + + /** + * Create a counter in the registry. + * + * @param stats AbfsStatistic whose counter needs to be made. + * @return counter or null. + */ + private MutableCounterLong createCounter(AbfsStatistic stats) { + return registry.newCounter(stats.getStatName(), + stats.getStatDescription(), 0L); + } + + /** + * {@inheritDoc} + * + * Increment a statistic with some value. + * + * @param statistic AbfsStatistic need to be incremented. + * @param value long value to be incremented by. + */ + @Override + public void incrementCounter(AbfsStatistic statistic, long value) { + MutableCounterLong counter = lookupCounter(statistic.getStatName()); + if (counter != null) { + counter.incr(value); + } + } + + /** + * Getter for MetricRegistry. + * + * @return MetricRegistry or null. + */ + private MetricsRegistry getRegistry() { + return registry; + } + + /** + * {@inheritDoc} + * + * Method to aggregate all the counters in the MetricRegistry and form a + * string with prefix, separator and suffix. + * + * @param prefix string that would be before metric. + * @param separator string that would be between metric name and value. + * @param suffix string that would be after metric value. + * @param all gets all the values even if unchanged. + * @return a String with all the metrics and their values. + */ + @Override + public String formString(String prefix, String separator, String suffix, + boolean all) { + + MetricStringBuilder metricStringBuilder = new MetricStringBuilder(null, + prefix, separator, suffix); + registry.snapshot(metricStringBuilder, all); + return metricStringBuilder.toString(); + } + + /** + * {@inheritDoc} + * + * Creating a map of all the counters for testing. + * + * @return a map of the metrics. + */ + @VisibleForTesting + @Override + public Map toMap() { + MetricsToMap metricBuilder = new MetricsToMap(null); + registry.snapshot(metricBuilder, true); + return metricBuilder.getMap(); + } + + protected static class MetricsToMap extends MetricsRecordBuilder { + private final MetricsCollector parent; + private final Map map = + new HashMap<>(); + + MetricsToMap(MetricsCollector parent) { + this.parent = parent; + } + + @Override + public MetricsRecordBuilder tag(MetricsInfo info, String value) { + return this; + } + + @Override + public MetricsRecordBuilder add(MetricsTag tag) { + return this; + } + + @Override + public MetricsRecordBuilder add(AbstractMetric metric) { + return this; + } + + @Override + public MetricsRecordBuilder setContext(String value) { + return this; + } + + @Override + public MetricsRecordBuilder addCounter(MetricsInfo info, int value) { + return tuple(info, value); + } + + @Override + public MetricsRecordBuilder addCounter(MetricsInfo info, long value) { + return tuple(info, value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, int value) { + return tuple(info, value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, long value) { + return tuple(info, value); + } + + public MetricsToMap tuple(MetricsInfo info, long value) { + return tuple(info.name(), value); + } + + public MetricsToMap tuple(String name, long value) { + map.put(name, value); + return this; + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, float value) { + return tuple(info, (long) value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, double value) { + return tuple(info, (long) value); + } + + @Override + public MetricsCollector parent() { + return parent; + } + + /** + * Get the map. + * + * @return the map of metrics. + */ + public Map getMap() { + return map; + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java new file mode 100644 index 0000000000000..2935cd754315d --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames; + +/** + * Statistic which are collected in Abfs. + * Available as metrics in {@link AbfsCountersImpl}. + */ +public enum AbfsStatistic { + + CALL_CREATE(CommonStatisticNames.OP_CREATE, + "Calls of create()."), + CALL_OPEN(CommonStatisticNames.OP_OPEN, + "Calls of open()."), + CALL_GET_FILE_STATUS(CommonStatisticNames.OP_GET_FILE_STATUS, + "Calls of getFileStatus()."), + CALL_APPEND(CommonStatisticNames.OP_APPEND, + "Calls of append()."), + CALL_CREATE_NON_RECURSIVE(CommonStatisticNames.OP_CREATE_NON_RECURSIVE, + "Calls of createNonRecursive()."), + CALL_DELETE(CommonStatisticNames.OP_DELETE, + "Calls of delete()."), + CALL_EXIST(CommonStatisticNames.OP_EXISTS, + "Calls of exist()."), + CALL_GET_DELEGATION_TOKEN(CommonStatisticNames.OP_GET_DELEGATION_TOKEN, + "Calls of getDelegationToken()."), + CALL_LIST_STATUS(CommonStatisticNames.OP_LIST_STATUS, + "Calls of listStatus()."), + CALL_MKDIRS(CommonStatisticNames.OP_MKDIRS, + "Calls of mkdirs()."), + CALL_RENAME(CommonStatisticNames.OP_RENAME, + "Calls of rename()."), + DIRECTORIES_CREATED("directories_created", + "Total number of directories created through the object store."), + DIRECTORIES_DELETED("directories_deleted", + "Total number of directories deleted through the object store."), + FILES_CREATED("files_created", + "Total number of files created through the object store."), + FILES_DELETED("files_deleted", + "Total number of files deleted from the object store."), + ERROR_IGNORED("error_ignored", + "Errors caught and ignored."), + + //Network statistics. + CONNECTIONS_MADE("connections_made", + "Total number of times a connection was made with the data store."), + SEND_REQUESTS("send_requests", + "Total number of times http requests were sent to the data store."), + GET_RESPONSES("get_responses", + "Total number of times a response was received."), + BYTES_SENT("bytes_sent", + "Total bytes uploaded."), + BYTES_RECEIVED("bytes_received", + "Total bytes received."), + READ_THROTTLES("read_throttles", + "Total number of times a read operation is throttled."), + WRITE_THROTTLES("write_throttles", + "Total number of times a write operation is throttled."); + + private String statName; + private String statDescription; + + /** + * Constructor of AbfsStatistic to set statistic name and description. + * + * @param statName Name of the statistic. + * @param statDescription Description of the statistic. + */ + AbfsStatistic(String statName, String statDescription) { + this.statName = statName; + this.statDescription = statDescription; + } + + /** + * Getter for statistic name. + * + * @return Name of statistic. + */ + public String getStatName() { + return statName; + } + + /** + * Getter for statistic description. + * + * @return Description of statistic. + */ + public String getStatDescription() { + return statDescription; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 7180a4769ce85..b40f215133f8b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -69,6 +70,7 @@ import org.apache.hadoop.fs.azurebfs.extensions.AbfsAuthorizationException; import org.apache.hadoop.fs.azurebfs.extensions.AbfsAuthorizer; import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager; +import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; @@ -78,6 +80,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*; + /** * A {@link org.apache.hadoop.fs.FileSystem} for reading and writing files stored on Windows Azure @@ -93,6 +97,7 @@ public class AzureBlobFileSystem extends FileSystem { private boolean delegationTokenEnabled = false; private AbfsDelegationTokenManager delegationTokenManager; private AbfsAuthorizer authorizer; + private AbfsCounters abfsCounters; @Override public void initialize(URI uri, Configuration configuration) @@ -104,9 +109,10 @@ public void initialize(URI uri, Configuration configuration) LOG.debug("Initializing AzureBlobFileSystem for {}", uri); this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); - this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration); + abfsCounters = new AbfsCountersImpl(uri); + this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), + configuration, abfsCounters); final AbfsConfiguration abfsConfiguration = abfsStore.getAbfsConfiguration(); - this.setWorkingDirectory(this.getHomeDirectory()); if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) { @@ -142,6 +148,11 @@ public String toString() { sb.append("uri=").append(uri); sb.append(", user='").append(abfsStore.getUser()).append('\''); sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\''); + if (abfsCounters != null) { + sb.append(", Statistics: {").append(abfsCounters.formString("{", "=", + "}", true)); + sb.append("}"); + } sb.append('}'); return sb.toString(); } @@ -158,7 +169,7 @@ public URI getUri() { @Override public FSDataInputStream open(final Path path, final int bufferSize) throws IOException { LOG.debug("AzureBlobFileSystem.open path: {} bufferSize: {}", path, bufferSize); - + statIncrement(CALL_OPEN); Path qualifiedPath = makeQualified(path); performAbfsAuthCheck(FsAction.READ, qualifiedPath); @@ -180,6 +191,7 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, fi overwrite, blockSize); + statIncrement(CALL_CREATE); trailingPeriodCheck(f); Path qualifiedPath = makeQualified(f); @@ -188,6 +200,7 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, fi try { OutputStream outputStream = abfsStore.createFile(qualifiedPath, overwrite, permission == null ? FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf())); + statIncrement(FILES_CREATED); return new FSDataOutputStream(outputStream, statistics); } catch(AzureBlobFileSystemException ex) { checkException(f, ex); @@ -201,6 +214,7 @@ public FSDataOutputStream createNonRecursive(final Path f, final FsPermission pe final boolean overwrite, final int bufferSize, final short replication, final long blockSize, final Progressable progress) throws IOException { + statIncrement(CALL_CREATE_NON_RECURSIVE); final Path parent = f.getParent(); final FileStatus parentFileStatus = tryGetFileStatus(parent); @@ -244,7 +258,7 @@ public FSDataOutputStream append(final Path f, final int bufferSize, final Progr "AzureBlobFileSystem.append path: {} bufferSize: {}", f.toString(), bufferSize); - + statIncrement(CALL_APPEND); Path qualifiedPath = makeQualified(f); performAbfsAuthCheck(FsAction.WRITE, qualifiedPath); @@ -260,7 +274,7 @@ public FSDataOutputStream append(final Path f, final int bufferSize, final Progr public boolean rename(final Path src, final Path dst) throws IOException { LOG.debug( "AzureBlobFileSystem.rename src: {} dst: {}", src.toString(), dst.toString()); - + statIncrement(CALL_RENAME); trailingPeriodCheck(dst); Path parentFolder = src.getParent(); @@ -328,7 +342,7 @@ public boolean rename(final Path src, final Path dst) throws IOException { public boolean delete(final Path f, final boolean recursive) throws IOException { LOG.debug( "AzureBlobFileSystem.delete path: {} recursive: {}", f.toString(), recursive); - + statIncrement(CALL_DELETE); Path qualifiedPath = makeQualified(f); performAbfsAuthCheck(FsAction.WRITE, qualifiedPath); @@ -354,7 +368,7 @@ public boolean delete(final Path f, final boolean recursive) throws IOException public FileStatus[] listStatus(final Path f) throws IOException { LOG.debug( "AzureBlobFileSystem.listStatus path: {}", f.toString()); - + statIncrement(CALL_LIST_STATUS); Path qualifiedPath = makeQualified(f); performAbfsAuthCheck(FsAction.READ, qualifiedPath); @@ -367,6 +381,24 @@ public FileStatus[] listStatus(final Path f) throws IOException { } } + /** + * Increment of an Abfs statistic. + * + * @param statistic AbfsStatistic that needs increment. + */ + private void statIncrement(AbfsStatistic statistic) { + incrementStatistic(statistic); + } + + /** + * Method for incrementing AbfsStatistic by a long value. + * + * @param statistic the Statistic to be incremented. + */ + private void incrementStatistic(AbfsStatistic statistic) { + abfsCounters.incrementCounter(statistic, 1); + } + /** * Performs a check for (.) until root in the path to throw an exception. * The purpose is to differentiate between dir/dir1 and dir/dir1. @@ -396,7 +428,7 @@ private void trailingPeriodCheck(Path path) throws IllegalArgumentException { public boolean mkdirs(final Path f, final FsPermission permission) throws IOException { LOG.debug( "AzureBlobFileSystem.mkdirs path: {} permissions: {}", f, permission); - + statIncrement(CALL_MKDIRS); trailingPeriodCheck(f); final Path parentFolder = f.getParent(); @@ -411,6 +443,7 @@ public boolean mkdirs(final Path f, final FsPermission permission) throws IOExce try { abfsStore.createDirectory(qualifiedPath, permission == null ? FsPermission.getDirDefault() : permission, FsPermission.getUMask(getConf())); + statIncrement(DIRECTORIES_CREATED); return true; } catch (AzureBlobFileSystemException ex) { checkException(f, ex, AzureServiceErrorCode.PATH_ALREADY_EXISTS); @@ -427,12 +460,13 @@ public synchronized void close() throws IOException { super.close(); LOG.debug("AzureBlobFileSystem.close"); this.isClosed = true; + LOG.debug("Closing Abfs: " + toString()); } @Override public FileStatus getFileStatus(final Path f) throws IOException { LOG.debug("AzureBlobFileSystem.getFileStatus path: {}", f); - + statIncrement(CALL_GET_FILE_STATUS); Path qualifiedPath = makeQualified(f); performAbfsAuthCheck(FsAction.READ, qualifiedPath); @@ -570,6 +604,11 @@ private boolean deleteRoot() throws IOException { @Override public Void call() throws Exception { delete(fs.getPath(), fs.isDirectory()); + if (fs.isDirectory()) { + statIncrement(DIRECTORIES_DELETED); + } else { + statIncrement(FILES_DELETED); + } return null; } }); @@ -943,11 +982,25 @@ public void access(final Path path, final FsAction mode) throws IOException { } } + /** + * Incrementing exists() calls from superclass for statistic collection. + * + * @param f source path. + * @return true if the path exists. + * @throws IOException + */ + @Override + public boolean exists(Path f) throws IOException { + statIncrement(CALL_EXIST); + return super.exists(f); + } + private FileStatus tryGetFileStatus(final Path f) { try { return getFileStatus(f); } catch (IOException ex) { LOG.debug("File not found {}", f); + statIncrement(ERROR_IGNORED); return null; } } @@ -964,6 +1017,7 @@ private boolean fileSystemExists() throws IOException { // there is not way to get the storage error code // workaround here is to check its status code. } catch (FileNotFoundException e) { + statIncrement(ERROR_IGNORED); return false; } } @@ -1135,6 +1189,7 @@ private Throwable getRootCause(Throwable throwable) { */ @Override public synchronized Token getDelegationToken(final String renewer) throws IOException { + statIncrement(CALL_GET_DELEGATION_TOKEN); return this.delegationTokenEnabled ? this.delegationTokenManager.getDelegationToken(renewer) : super.getDelegationToken(renewer); } @@ -1199,4 +1254,9 @@ private void performAbfsAuthCheck(FsAction action, Path... paths) } } } + + @VisibleForTesting + Map getInstrumentationMap() { + return abfsCounters.toMap(); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 0b2b3b1c57cb4..fb2ae9de2172d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -61,6 +61,7 @@ import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException; @@ -73,9 +74,13 @@ import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer; import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl; import org.apache.hadoop.fs.azurebfs.services.AbfsPermission; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AuthType; @@ -129,8 +134,9 @@ public class AzureBlobFileSystemStore { private final UserGroupInformation userGroupInformation; private final IdentityTransformer identityTransformer; - public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration) - throws IOException { + public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, + Configuration configuration, + AbfsCounters abfsCounters) throws IOException { this.uri = uri; String[] authorityParts = authorityParts(uri); final String fileSystemName = authorityParts[0]; @@ -160,7 +166,7 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration c this.authType = abfsConfiguration.getAuthType(accountName); boolean usingOauth = (authType == AuthType.OAuth); boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme; - initializeClient(uri, fileSystemName, accountName, useHttps); + initializeClient(uri, fileSystemName, accountName, useHttps, abfsCounters); this.identityTransformer = new IdentityTransformer(abfsConfiguration.getRawConfiguration()); } @@ -354,17 +360,107 @@ public OutputStream createFile(final Path path, final boolean overwrite, final F umask.toString(), isNamespaceEnabled); - client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite, - isNamespaceEnabled ? getOctalNotation(permission) : null, - isNamespaceEnabled ? getOctalNotation(umask) : null); + // if "fs.azure.enable.conditional.create.overwrite" is enabled and + // is a create request with overwrite=true, create will follow different + // flow. + boolean triggerConditionalCreateOverwrite = false; + if (overwrite + && abfsConfiguration.isConditionalCreateOverwriteEnabled()) { + triggerConditionalCreateOverwrite = true; + } + + if (triggerConditionalCreateOverwrite) { + conditionalCreateOverwriteFile(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), + isNamespaceEnabled ? getOctalNotation(permission) : null, + isNamespaceEnabled ? getOctalNotation(umask) : null + ); + + } else { + client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, + overwrite, + isNamespaceEnabled ? getOctalNotation(permission) : null, + isNamespaceEnabled ? getOctalNotation(umask) : null, + null); + } return new AbfsOutputStream( client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0, - abfsConfiguration.getWriteBufferSize(), - abfsConfiguration.isFlushEnabled(), - abfsConfiguration.isOutputStreamFlushDisabled()); + populateAbfsOutputStreamContext()); + } + + private AbfsOutputStreamContext populateAbfsOutputStreamContext() { + return new AbfsOutputStreamContext() + .withWriteBufferSize(abfsConfiguration.getWriteBufferSize()) + .enableFlush(abfsConfiguration.isFlushEnabled()) + .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled()) + .withStreamStatistics(new AbfsOutputStreamStatisticsImpl()) + .build(); + } + + /** + * Conditional create overwrite flow ensures that create overwrites is done + * only if there is match for eTag of existing file. + * @param relativePath + * @param permission + * @param umask + * @return + * @throws AzureBlobFileSystemException + */ + private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePath, + final String permission, + final String umask) throws AzureBlobFileSystemException { + AbfsRestOperation op; + + try { + // Trigger a create with overwrite=false first so that eTag fetch can be + // avoided for cases when no pre-existing file is present (major portion + // of create file traffic falls into the case of no pre-existing file). + op = client.createPath(relativePath, true, + false, permission, umask, null); + } catch (AbfsRestOperationException e) { + if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { + // File pre-exists, fetch eTag + try { + op = client.getPathStatus(relativePath); + } catch (AbfsRestOperationException ex) { + if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { + // Is a parallel access case, as file which was found to be + // present went missing by this request. + throw new ConcurrentWriteOperationDetectedException( + "Parallel access to the create path detected. Failing request " + + "to honor single writer semantics"); + } else { + throw ex; + } + } + + String eTag = op.getResult() + .getResponseHeader(HttpHeaderConfigurations.ETAG); + + try { + // overwrite only if eTag matches with the file properties fetched befpre + op = client.createPath(relativePath, true, + true, permission, umask, eTag); + } catch (AbfsRestOperationException ex) { + if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) { + // Is a parallel access case, as file with eTag was just queried + // and precondition failure can happen only when another file with + // different etag got created. + throw new ConcurrentWriteOperationDetectedException( + "Parallel access to the create path detected. Failing request " + + "to honor single writer semantics"); + } else { + throw ex; + } + } + } else { + throw e; + } + } + + return op; } public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask) @@ -379,7 +475,7 @@ public void createDirectory(final Path path, final FsPermission permission, fina client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), false, true, isNamespaceEnabled ? getOctalNotation(permission) : null, - isNamespaceEnabled ? getOctalNotation(umask) : null); + isNamespaceEnabled ? getOctalNotation(umask) : null, null); } public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statistics statistics) @@ -402,11 +498,18 @@ public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statist null); } - // Add statistics for InputStream return new AbfsInputStream(client, statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength, - abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), - abfsConfiguration.getTolerateOobAppends(), eTag); + populateAbfsInputStreamContext(), + eTag); + } + + private AbfsInputStreamContext populateAbfsInputStreamContext() { + return new AbfsInputStreamContext() + .withReadBufferSize(abfsConfiguration.getReadBufferSize()) + .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth()) + .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends()) + .build(); } public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws @@ -435,9 +538,7 @@ public OutputStream openFileForWrite(final Path path, final boolean overwrite) t client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), offset, - abfsConfiguration.getWriteBufferSize(), - abfsConfiguration.isFlushEnabled(), - abfsConfiguration.isOutputStreamFlushDisabled()); + populateAbfsOutputStreamContext()); } public void rename(final Path source, final Path destination) throws @@ -912,7 +1013,8 @@ public boolean isAtomicRenameKey(String key) { return isKeyForDirectorySet(key, azureAtomicRenameDirSet); } - private void initializeClient(URI uri, String fileSystemName, String accountName, boolean isSecure) throws AzureBlobFileSystemException { + private void initializeClient(URI uri, String fileSystemName, + String accountName, boolean isSecure, AbfsCounters abfsCounters) throws AzureBlobFileSystemException { if (this.client != null) { return; } @@ -945,7 +1047,7 @@ private void initializeClient(URI uri, String fileSystemName, String accountName this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()), - tokenProvider); + tokenProvider, abfsCounters); } private String getOctalNotation(FsPermission fsPermission) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index b9e8fadfd3105..36c929ac6f547 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -50,6 +50,10 @@ public final class ConfigurationKeys { public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling"; public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https"; public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key"; + /** This config ensures that during create overwrite an existing file will be + * overwritten only if there is a match on the eTag of existing file. + */ + public static final String FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = "fs.azure.enable.conditional.create.overwrite"; public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth"; /** Provides a config control to enable or disable ABFS Flush operations - * HFlush and HSync. Default is true. **/ diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 0a0ad379bb560..526df03600d86 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -54,6 +54,7 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false; public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase"; + public static final boolean DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = true; public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1; public static final boolean DEFAULT_ENABLE_FLUSH = true; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java new file mode 100644 index 0000000000000..79813ddfe6400 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.contracts.exceptions; + +/** + * Thrown when a concurrent write operation is detected. + */ +@org.apache.hadoop.classification.InterfaceAudience.Public +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class ConcurrentWriteOperationDetectedException + extends AzureBlobFileSystemException { + + public ConcurrentWriteOperationDetectedException(String message) { + super(message); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index b961b5ceebc92..33abffa9c89be 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -59,12 +59,14 @@ public class AbfsClient { private final String userAgent; private final AccessTokenProvider tokenProvider; + private final AbfsCounters abfsCounters; public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, final AbfsConfiguration abfsConfiguration, final ExponentialRetryPolicy exponentialRetryPolicy, - final AccessTokenProvider tokenProvider) { + final AccessTokenProvider tokenProvider, + final AbfsCounters abfsCounters) { this.baseUrl = baseUrl; this.sharedKeyCredentials = sharedKeyCredentials; String baseUrlString = baseUrl.toString(); @@ -85,6 +87,7 @@ public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredent this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName); this.tokenProvider = tokenProvider; + this.abfsCounters = abfsCounters; } public String getFileSystem() { @@ -217,7 +220,7 @@ public AbfsRestOperation deleteFilesystem() throws AzureBlobFileSystemException } public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite, - final String permission, final String umask) throws AzureBlobFileSystemException { + final String permission, final String umask, final String eTag) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); if (!overwrite) { requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR)); @@ -231,6 +234,10 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_UMASK, umask)); } + if (eTag != null && !eTag.isEmpty()) { + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag)); + } + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY); @@ -533,7 +540,8 @@ private URL createRequestUrl(final String query) throws AzureBlobFileSystemExcep return createRequestUrl(EMPTY_STRING, query); } - private URL createRequestUrl(final String path, final String query) + @VisibleForTesting + protected URL createRequestUrl(final String path, final String query) throws AzureBlobFileSystemException { final String base = baseUrl.toString(); String encodedPath = path; @@ -608,4 +616,12 @@ String initializeUserAgent(final AbfsConfiguration abfsConfiguration, URL getBaseUrl() { return baseUrl; } + + /** + * Getter for abfsCounters from AbfsClient. + * @return AbfsCounters instance. + */ + protected AbfsCounters getAbfsCounters() { + return abfsCounters; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java index f1e5aaae6835c..e1a799b7a2648 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java @@ -114,16 +114,19 @@ public void addBytesTransferred(long count, boolean isFailedOperation) { /** * Suspends the current storage operation, as necessary, to reduce throughput. + * @return true if Thread sleeps(Throttling occurs) else false. */ - public void suspendIfNecessary() { + public boolean suspendIfNecessary() { int duration = sleepDuration; if (duration > 0) { try { Thread.sleep(duration); + return true; } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } + return false; } @VisibleForTesting @@ -269,4 +272,4 @@ static class AbfsOperationMetrics { this.operationsSuccessful = new AtomicLong(); } } -} \ No newline at end of file +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java index 1c6ce17a38c3c..7303e833418db 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.azurebfs.AbfsStatistic; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; /** @@ -103,17 +104,24 @@ static void updateMetrics(AbfsRestOperationType operationType, * uses this to suspend the request, if necessary, to minimize errors and * maximize throughput. */ - static void sendingRequest(AbfsRestOperationType operationType) { + static void sendingRequest(AbfsRestOperationType operationType, + AbfsCounters abfsCounters) { if (!isAutoThrottlingEnabled) { return; } switch (operationType) { case ReadFile: - singleton.readThrottler.suspendIfNecessary(); + if (singleton.readThrottler.suspendIfNecessary() + && abfsCounters != null) { + abfsCounters.incrementCounter(AbfsStatistic.READ_THROTTLES, 1); + } break; case Append: - singleton.writeThrottler.suspendIfNecessary(); + if (singleton.writeThrottler.suspendIfNecessary() + && abfsCounters != null) { + abfsCounters.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1); + } break; default: break; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java new file mode 100644 index 0000000000000..87b1af4f0620c --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.util.Map; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.azurebfs.AbfsStatistic; + +/** + * An interface for Abfs counters. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface AbfsCounters { + + /** + * Increment a AbfsStatistic by a long value. + * + * @param statistic AbfsStatistic to be incremented. + * @param value the value to increment the statistic by. + */ + void incrementCounter(AbfsStatistic statistic, long value); + + /** + * Form a String of the all the statistics and present in an organized manner. + * + * @param prefix the prefix to be set. + * @param separator the separator between the statistic name and value. + * @param suffix the suffix to be used. + * @param all enable all the statistics to be displayed or not. + * @return String of all the statistics and their values. + */ + String formString(String prefix, String separator, String suffix, + boolean all); + + /** + * Convert all the statistics into a key-value pair map to be used for + * testing. + * + * @return map with statistic name as key and statistic value as the map + * value. + */ + @VisibleForTesting + Map toMap(); + +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index fe48cb9323712..d0b9d4bd54663 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -55,21 +55,19 @@ public class AbfsInputStream extends FSInputStream { private boolean closed = false; public AbfsInputStream( - final AbfsClient client, - final Statistics statistics, - final String path, - final long contentLength, - final int bufferSize, - final int readAheadQueueDepth, - final boolean tolerateOobAppends, - final String eTag) { + final AbfsClient client, + final Statistics statistics, + final String path, + final long contentLength, + final AbfsInputStreamContext abfsInputStreamContext, + final String eTag) { this.client = client; this.statistics = statistics; this.path = path; this.contentLength = contentLength; - this.bufferSize = bufferSize; - this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors(); - this.tolerateOobAppends = tolerateOobAppends; + this.bufferSize = abfsInputStreamContext.getReadBufferSize(); + this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth(); + this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends(); this.eTag = eTag; this.readAheadEnabled = true; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java new file mode 100644 index 0000000000000..cba719101694c --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +/** + * Class to hold extra input stream configs. + */ +public class AbfsInputStreamContext extends AbfsStreamContext { + + private int readBufferSize; + + private int readAheadQueueDepth; + + private boolean tolerateOobAppends; + + public AbfsInputStreamContext() { + } + + public AbfsInputStreamContext withReadBufferSize(final int readBufferSize) { + this.readBufferSize = readBufferSize; + return this; + } + + public AbfsInputStreamContext withReadAheadQueueDepth( + final int readAheadQueueDepth) { + this.readAheadQueueDepth = (readAheadQueueDepth >= 0) + ? readAheadQueueDepth + : Runtime.getRuntime().availableProcessors(); + return this; + } + + public AbfsInputStreamContext withTolerateOobAppends( + final boolean tolerateOobAppends) { + this.tolerateOobAppends = tolerateOobAppends; + return this; + } + + public AbfsInputStreamContext build() { + // Validation of parameters to be done here. + return this; + } + + public int getReadBufferSize() { + return readBufferSize; + } + + public int getReadAheadQueueDepth() { + return readAheadQueueDepth; + } + + public boolean isTolerateOobAppends() { + return tolerateOobAppends; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index e68400f048144..f07c2f7ed7c4d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -35,6 +35,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; @@ -47,6 +49,7 @@ * The BlobFsOutputStream for Rest AbfsClient. */ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCapabilities { + private final AbfsClient client; private final String path; private long position; @@ -76,25 +79,30 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa private final ElasticByteBufferPool byteBufferPool = new ElasticByteBufferPool(); + private final AbfsOutputStreamStatistics outputStreamStatistics; + + private static final Logger LOG = + LoggerFactory.getLogger(AbfsOutputStream.class); + public AbfsOutputStream( final AbfsClient client, final String path, final long position, - final int bufferSize, - final boolean supportFlush, - final boolean disableOutputStreamFlush) { + AbfsOutputStreamContext abfsOutputStreamContext) { this.client = client; this.path = path; this.position = position; this.closed = false; - this.supportFlush = supportFlush; - this.disableOutputStreamFlush = disableOutputStreamFlush; + this.supportFlush = abfsOutputStreamContext.isEnableFlush(); + this.disableOutputStreamFlush = abfsOutputStreamContext + .isDisableOutputStreamFlush(); this.lastError = null; this.lastFlushOffset = 0; - this.bufferSize = bufferSize; + this.bufferSize = abfsOutputStreamContext.getWriteBufferSize(); this.buffer = byteBufferPool.getBuffer(false, bufferSize).array(); this.bufferIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); + this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics(); this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); @@ -262,6 +270,9 @@ public synchronized void close() throws IOException { threadExecutor.shutdownNow(); } } + if (LOG.isDebugEnabled()) { + LOG.debug("Closing AbfsOutputStream ", toString()); + } } @Override @@ -285,16 +296,20 @@ private synchronized void writeCurrentBufferToService() throws IOException { if (bufferIndex == 0) { return; } + outputStreamStatistics.writeCurrentBuffer(); final byte[] bytes = buffer; final int bytesLength = bufferIndex; + outputStreamStatistics.bytesToUpload(bytesLength); buffer = byteBufferPool.getBuffer(false, bufferSize).array(); bufferIndex = 0; final long offset = position; position += bytesLength; if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { + long start = System.currentTimeMillis(); waitForTaskToComplete(); + outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis()); } final Future job = completionService.submit(new Callable() { @@ -307,6 +322,11 @@ public Void call() throws Exception { } }); + if (job.isCancelled()) { + outputStreamStatistics.uploadFailed(bytesLength); + } else { + outputStreamStatistics.uploadSuccessful(bytesLength); + } writeOperations.add(new WriteOperation(job, offset, bytesLength)); // Try to shrink the queue @@ -368,6 +388,8 @@ private synchronized void shrinkWriteOperationQueue() throws IOException { writeOperations.peek().task.get(); lastTotalAppendOffset += writeOperations.peek().length; writeOperations.remove(); + // Incrementing statistics to indicate queue has been shrunk. + outputStreamStatistics.queueShrunk(); } } catch (Exception e) { if (e.getCause() instanceof AzureBlobFileSystemException) { @@ -415,4 +437,38 @@ private static class WriteOperation { public synchronized void waitForPendingUploads() throws IOException { waitForTaskToComplete(); } + + /** + * Getter method for AbfsOutputStream statistics. + * + * @return statistics for AbfsOutputStream. + */ + @VisibleForTesting + public AbfsOutputStreamStatistics getOutputStreamStatistics() { + return outputStreamStatistics; + } + + /** + * Getter to get the size of the task queue. + * + * @return the number of writeOperations in AbfsOutputStream. + */ + @VisibleForTesting + public int getWriteOperationsSize() { + return writeOperations.size(); + } + + /** + * Appending AbfsOutputStream statistics to base toString(). + * + * @return String with AbfsOutputStream statistics. + */ + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(super.toString()); + sb.append("AbfsOuputStream@").append(this.hashCode()).append("){"); + sb.append(outputStreamStatistics.toString()); + sb.append("}"); + return sb.toString(); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java new file mode 100644 index 0000000000000..e0aefbf33b2e0 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +/** + * Class to hold extra output stream configs. + */ +public class AbfsOutputStreamContext extends AbfsStreamContext { + + private int writeBufferSize; + + private boolean enableFlush; + + private boolean disableOutputStreamFlush; + + private AbfsOutputStreamStatistics streamStatistics; + + public AbfsOutputStreamContext() { + } + + public AbfsOutputStreamContext withWriteBufferSize( + final int writeBufferSize) { + this.writeBufferSize = writeBufferSize; + return this; + } + + public AbfsOutputStreamContext enableFlush(final boolean enableFlush) { + this.enableFlush = enableFlush; + return this; + } + + public AbfsOutputStreamContext disableOutputStreamFlush( + final boolean disableOutputStreamFlush) { + this.disableOutputStreamFlush = disableOutputStreamFlush; + return this; + } + + public AbfsOutputStreamContext withStreamStatistics( + final AbfsOutputStreamStatistics streamStatistics) { + this.streamStatistics = streamStatistics; + return this; + } + + public AbfsOutputStreamContext build() { + // Validation of parameters to be done here. + return this; + } + + public int getWriteBufferSize() { + return writeBufferSize; + } + + public boolean isEnableFlush() { + return enableFlush; + } + + public boolean isDisableOutputStreamFlush() { + return disableOutputStreamFlush; + } + + public AbfsOutputStreamStatistics getStreamStatistics() { + return streamStatistics; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java new file mode 100644 index 0000000000000..c9fe0dd45525d --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Interface for {@link AbfsOutputStream} statistics. + */ +@InterfaceStability.Unstable +public interface AbfsOutputStreamStatistics { + + /** + * Number of bytes to be uploaded. + * + * @param bytes number of bytes to upload. + */ + void bytesToUpload(long bytes); + + /** + * Records a successful upload and the number of bytes uploaded. + * + * @param bytes number of bytes that were successfully uploaded. + */ + void uploadSuccessful(long bytes); + + /** + * Records that upload is failed and the number of bytes. + * + * @param bytes number of bytes that failed to upload. + */ + void uploadFailed(long bytes); + + /** + * Time spent in waiting for tasks to be completed in the blocking queue. + * + * @param start millisecond at which the wait for task to be complete begins. + * @param end millisecond at which the wait is completed for the task. + */ + void timeSpentTaskWait(long start, long end); + + /** + * Number of times task queue is shrunk. + */ + void queueShrunk(); + + /** + * Number of times buffer is written to the service after a write operation. + */ + void writeCurrentBuffer(); + + /** + * Method to form a string of all AbfsOutputStream statistics and their + * values. + * + * @return AbfsOutputStream statistics. + */ + @Override + String toString(); + +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java new file mode 100644 index 0000000000000..cd5a29e217ce5 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +/** + * OutputStream statistics implementation for Abfs. + */ +public class AbfsOutputStreamStatisticsImpl + implements AbfsOutputStreamStatistics { + private long bytesToUpload; + private long bytesUploadSuccessful; + private long bytesUploadFailed; + /** + * Counter to get the total time spent while waiting for tasks to complete + * in the blocking queue inside the thread executor. + */ + private long timeSpentOnTaskWait; + /** + * Counter to get the total number of queue shrink operations done {@code + * AbfsOutputStream#shrinkWriteOperationQueue()} by AbfsOutputStream to + * remove the write operations which were successfully done by + * AbfsOutputStream from the task queue. + */ + private long queueShrunkOps; + /** + * Counter to get the total number of times the current buffer is written + * to the service {@code AbfsOutputStream#writeCurrentBufferToService()} via + * AbfsClient and appended to the data store by AbfsRestOperation. + */ + private long writeCurrentBufferOperations; + + /** + * Records the need to upload bytes and increments the total bytes that + * needs to be uploaded. + * + * @param bytes total bytes to upload. Negative bytes are ignored. + */ + @Override + public void bytesToUpload(long bytes) { + if (bytes > 0) { + bytesToUpload += bytes; + } + } + + /** + * Records the total bytes successfully uploaded through AbfsOutputStream. + * + * @param bytes number of bytes that were successfully uploaded. Negative + * bytes are ignored. + */ + @Override + public void uploadSuccessful(long bytes) { + if (bytes > 0) { + bytesUploadSuccessful += bytes; + } + } + + /** + * Records the total bytes failed to upload through AbfsOutputStream. + * + * @param bytes number of bytes failed to upload. Negative bytes are ignored. + */ + @Override + public void uploadFailed(long bytes) { + if (bytes > 0) { + bytesUploadFailed += bytes; + } + } + + /** + * {@inheritDoc} + * + * Records the total time spent waiting for a task to complete. + * + * When the thread executor has a task queue + * {@link java.util.concurrent.BlockingQueue} of size greater than or + * equal to 2 times the maxConcurrentRequestCounts then, it waits for a + * task in that queue to finish, then do the next task in the queue. + * + * This time spent while waiting for the task to be completed is being + * recorded in this counter. + * + * @param startTime time(in milliseconds) before the wait for task to be + * completed is begin. + * @param endTime time(in milliseconds) after the wait for the task to be + * completed is done. + */ + @Override + public void timeSpentTaskWait(long startTime, long endTime) { + timeSpentOnTaskWait += endTime - startTime; + } + + /** + * {@inheritDoc} + * + * Records the number of times AbfsOutputStream try to remove the completed + * write operations from the beginning of write operation task queue. + */ + @Override + public void queueShrunk() { + queueShrunkOps++; + } + + /** + * {@inheritDoc} + * + * Records the number of times AbfsOutputStream writes the buffer to the + * service via the AbfsClient and appends the buffer to the service. + */ + @Override + public void writeCurrentBuffer() { + writeCurrentBufferOperations++; + } + + public long getBytesToUpload() { + return bytesToUpload; + } + + public long getBytesUploadSuccessful() { + return bytesUploadSuccessful; + } + + public long getBytesUploadFailed() { + return bytesUploadFailed; + } + + public long getTimeSpentOnTaskWait() { + return timeSpentOnTaskWait; + } + + public long getQueueShrunkOps() { + return queueShrunkOps; + } + + public long getWriteCurrentBufferOperations() { + return writeCurrentBufferOperations; + } + + /** + * String to show AbfsOutputStream statistics values in AbfsOutputStream. + * + * @return String with AbfsOutputStream statistics. + */ + @Override public String toString() { + final StringBuilder outputStreamStats = new StringBuilder( + "OutputStream Statistics{"); + outputStreamStats.append(", bytes_upload=").append(bytesToUpload); + outputStreamStats.append(", bytes_upload_successfully=") + .append(bytesUploadSuccessful); + outputStreamStats.append(", bytes_upload_failed=") + .append(bytesUploadFailed); + outputStreamStats.append(", time_spent_task_wait=") + .append(timeSpentOnTaskWait); + outputStreamStats.append(", queue_shrunk_ops=").append(queueShrunkOps); + outputStreamStats.append(", write_current_buffer_ops=") + .append(writeCurrentBufferOperations); + outputStreamStats.append("}"); + return outputStreamStats.toString(); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 54fe14ab2b4a5..9d299d75dc45e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -27,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.azurebfs.AbfsStatistic; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; @@ -62,6 +63,7 @@ public class AbfsRestOperation { private int bufferLength; private AbfsHttpOperation result; + private AbfsCounters abfsCounters; public AbfsHttpOperation getResult() { return result; @@ -87,6 +89,7 @@ public AbfsHttpOperation getResult() { this.requestHeaders = requestHeaders; this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method) || AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method)); + this.abfsCounters = client.getAbfsCounters(); } /** @@ -114,6 +117,7 @@ public AbfsHttpOperation getResult() { this.buffer = buffer; this.bufferOffset = bufferOffset; this.bufferLength = bufferLength; + this.abfsCounters = client.getAbfsCounters(); } /** @@ -149,6 +153,7 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS try { // initialize the HTTP request and open the connection httpOperation = new AbfsHttpOperation(url, method, requestHeaders); + incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1); // sign the HTTP request if (client.getAccessToken() == null) { @@ -161,14 +166,23 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS client.getAccessToken()); } - AbfsClientThrottlingIntercept.sendingRequest(operationType); + AbfsClientThrottlingIntercept.sendingRequest(operationType, abfsCounters); if (hasRequestBody) { // HttpUrlConnection requires httpOperation.sendRequest(buffer, bufferOffset, bufferLength); + incrementCounter(AbfsStatistic.SEND_REQUESTS, 1); + incrementCounter(AbfsStatistic.BYTES_SENT, bufferLength); } httpOperation.processResponse(buffer, bufferOffset, bufferLength); + incrementCounter(AbfsStatistic.GET_RESPONSES, 1); + //Only increment bytesReceived counter when the status code is 2XX. + if (httpOperation.getStatusCode() >= HttpURLConnection.HTTP_OK + && httpOperation.getStatusCode() <= HttpURLConnection.HTTP_PARTIAL) { + incrementCounter(AbfsStatistic.BYTES_RECEIVED, + httpOperation.getBytesReceived()); + } } catch (IOException ex) { if (ex instanceof UnknownHostException) { LOG.warn(String.format("Unknown host name: %s. Retrying to resolve the host name...", httpOperation.getUrl().getHost())); @@ -208,4 +222,16 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS return true; } + + /** + * Incrementing Abfs counters with a long value. + * + * @param statistic the Abfs statistic that needs to be incremented. + * @param value the value to be incremented by. + */ + private void incrementCounter(AbfsStatistic statistic, long value) { + if (abfsCounters != null) { + abfsCounters.incrementCounter(statistic, value); + } + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java new file mode 100644 index 0000000000000..ee77f595fed4b --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +/** + * Base stream configuration class which is going + * to store common configs among input and output streams. + */ +public abstract class AbfsStreamContext { +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index 04be4f458c0ce..d23b41e00e5e3 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.URI; import java.util.Hashtable; +import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; @@ -33,6 +34,8 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore; import org.apache.hadoop.fs.azure.NativeAzureFileSystem; @@ -41,6 +44,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.utils.UriUtils; import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import static org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.WASB_ACCOUNT_NAME_DOMAIN_SUFFIX; @@ -237,6 +241,11 @@ protected String getTestUrl() { protected void setFileSystemName(String fileSystemName) { this.fileSystemName = fileSystemName; } + + protected String getMethodName() { + return methodName.getMethodName(); + } + protected String getFileSystemName() { return fileSystemName; } @@ -341,4 +350,36 @@ protected Path path(String filepath) throws IOException { new Path(getTestPath(), filepath)); } + /** + * Generic create File and enabling AbfsOutputStream Flush. + * + * @param fs AzureBlobFileSystem that is initialised in the test. + * @param path Path of the file to be created. + * @return AbfsOutputStream for writing. + * @throws AzureBlobFileSystemException + */ + protected AbfsOutputStream createAbfsOutputStreamWithFlushEnabled( + AzureBlobFileSystem fs, + Path path) throws AzureBlobFileSystemException { + AzureBlobFileSystemStore abfss = fs.getAbfsStore(); + abfss.getAbfsConfiguration().setDisableOutputStreamFlush(false); + + return (AbfsOutputStream) abfss.createFile(path, + true, FsPermission.getDefault(), FsPermission.getUMask(fs.getConf())); + } + + /** + * Custom assertion for AbfsStatistics which have statistics, expected + * value and map of statistics and value as its parameters. + * @param statistic the AbfsStatistics which needs to be asserted. + * @param expectedValue the expected value of the statistics. + * @param metricMap map of (String, Long) with statistics name as key and + * statistics value as map value. + */ + protected long assertAbfsStatistics(AbfsStatistic statistic, + long expectedValue, Map metricMap) { + assertEquals("Mismatch in " + statistic.getStatName(), expectedValue, + (long) metricMap.get(statistic.getStatName())); + return expectedValue; + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsTestWithTimeout.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsTestWithTimeout.java index fee90abeabc9e..ab7bc82370615 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsTestWithTimeout.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsTestWithTimeout.java @@ -23,6 +23,8 @@ import org.junit.Rule; import org.junit.rules.TestName; import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_TIMEOUT; @@ -31,6 +33,9 @@ * This class does not attempt to bind to Azure. */ public class AbstractAbfsTestWithTimeout extends Assert { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractAbfsTestWithTimeout.class); + /** * The name of the current method. */ @@ -67,4 +72,16 @@ public void nameThread() { protected int getTestTimeoutMillis() { return TEST_TIMEOUT; } + + /** + * Describe a test in the logs. + * + * @param text text to print + * @param args arguments to format in the printing + */ + protected void describe(String text, Object... args) { + LOG.info("\n\n{}: {}\n", + methodName.getMethodName(), + String.format(text, args)); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java new file mode 100644 index 0000000000000..b2a17cf64544e --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.junit.Test; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; + +public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestAbfsNetworkStatistics.class); + private static final int LARGE_OPERATIONS = 10; + + public ITestAbfsNetworkStatistics() throws Exception { + } + + /** + * Testing connections_made, send_request and bytes_send statistics in + * {@link AbfsRestOperation}. + */ + @Test + public void testAbfsHttpSendStatistics() throws IOException { + describe("Test to check correct values of statistics after Abfs http send " + + "request is done."); + + AzureBlobFileSystem fs = getFileSystem(); + Map metricMap; + Path sendRequestPath = path(getMethodName()); + String testNetworkStatsString = "http_send"; + long connectionsMade, requestsSent, bytesSent; + + /* + * Creating AbfsOutputStream will result in 1 connection made and 1 send + * request. + */ + try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs, + sendRequestPath)) { + out.write(testNetworkStatsString.getBytes()); + + /* + * Flushes all outstanding data (i.e. the current unfinished packet) + * from the client into the service on all DataNode replicas. + */ + out.hflush(); + + metricMap = fs.getInstrumentationMap(); + + /* + * Testing the network stats with 1 write operation. + * + * connections_made : 3(getFileSystem()) + 1(AbfsOutputStream) + 2(flush). + * + * send_requests : 1(getFileSystem()) + 1(AbfsOutputStream) + 2(flush). + * + * bytes_sent : bytes wrote in AbfsOutputStream. + */ + connectionsMade = assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE, + 6, metricMap); + requestsSent = assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, 4, + metricMap); + bytesSent = assertAbfsStatistics(AbfsStatistic.BYTES_SENT, + testNetworkStatsString.getBytes().length, metricMap); + + } + + // To close the AbfsOutputStream 1 connection is made and 1 request is sent. + connectionsMade++; + requestsSent++; + + + try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs, + sendRequestPath)) { + + // Is a file overwrite case + long createRequestCalls = 1; + long createTriggeredGFSForETag = 0; + if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) { + createRequestCalls += 1; + createTriggeredGFSForETag = 1; + } + + for (int i = 0; i < LARGE_OPERATIONS; i++) { + out.write(testNetworkStatsString.getBytes()); + + /* + * 1 flush call would create 2 connections and 2 send requests. + * when hflush() is called it will essentially trigger append() and + * flush() inside AbfsRestOperation. Both of which calls + * executeHttpOperation() method which creates a connection and sends + * requests. + */ + out.hflush(); + } + + metricMap = fs.getInstrumentationMap(); + + /* + * Testing the network stats with Large amount of bytes sent. + * + * connections made : connections_made(Last assertion) + 1 + * (AbfsOutputStream) + LARGE_OPERATIONS * 2(flush). + * + * send requests : requests_sent(Last assertion) + 1(AbfsOutputStream) + + * LARGE_OPERATIONS * 2(flush). + * + * bytes sent : bytes_sent(Last assertion) + LARGE_OPERATIONS * (bytes + * wrote each time). + * + */ + + connectionsMade += createRequestCalls + createTriggeredGFSForETag; + requestsSent += createRequestCalls; + assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE, + connectionsMade + LARGE_OPERATIONS * 2, metricMap); + assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, + requestsSent + LARGE_OPERATIONS * 2, metricMap); + assertAbfsStatistics(AbfsStatistic.BYTES_SENT, + bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length), + metricMap); + + } + + } + + /** + * Testing get_response and bytes_received in {@link AbfsRestOperation}. + */ + @Test + public void testAbfsHttpResponseStatistics() throws IOException { + describe("Test to check correct values of statistics after Http " + + "Response is processed."); + + AzureBlobFileSystem fs = getFileSystem(); + Path getResponsePath = path(getMethodName()); + Map metricMap; + String testResponseString = "some response"; + long getResponses, bytesReceived; + + FSDataOutputStream out = null; + FSDataInputStream in = null; + try { + + /* + * Creating a File and writing some bytes in it. + * + * get_response : 3(getFileSystem) + 1(OutputStream creation) + 2 + * (Writing data in Data store). + * + */ + out = fs.create(getResponsePath); + out.write(testResponseString.getBytes()); + out.hflush(); + + // open would require 1 get response. + in = fs.open(getResponsePath); + // read would require 1 get response and also get the bytes received. + int result = in.read(); + + // Confirming read isn't -1. + LOG.info("Result of read operation : {}", result); + + metricMap = fs.getInstrumentationMap(); + + /* + * Testing values of statistics after writing and reading a buffer. + * + * get_responses - 6(above operations) + 1(open()) + 1 (read()). + * + * bytes_received - This should be equal to bytes sent earlier. + */ + getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, 8, + metricMap); + // Testing that bytes received is equal to bytes sent. + long bytesSend = metricMap.get(AbfsStatistic.BYTES_SENT.getStatName()); + bytesReceived = assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, + bytesSend, + metricMap); + + } finally { + IOUtils.cleanupWithLogger(LOG, out, in); + } + + // To close the streams 1 response is received. + getResponses++; + + try { + + /* + * Creating a file and writing buffer into it. + * This is a file recreate, so it will trigger + * 2 extra calls if create overwrite is off by default. + * Also recording the buffer for future read() call. + * This creating outputStream and writing requires 2 * + * (LARGE_OPERATIONS) get requests. + */ + StringBuilder largeBuffer = new StringBuilder(); + out = fs.create(getResponsePath); + + long createRequestCalls = 1; + if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) { + createRequestCalls += 2; + } + + for (int i = 0; i < LARGE_OPERATIONS; i++) { + out.write(testResponseString.getBytes()); + out.hflush(); + largeBuffer.append(testResponseString); + } + + // Open requires 1 get_response. + in = fs.open(getResponsePath); + + /* + * Reading the file which was written above. This read() call would + * read bytes equal to the bytes that was written above. + * Get response would be 1 only. + */ + in.read(0, largeBuffer.toString().getBytes(), 0, + largeBuffer.toString().getBytes().length); + + metricMap = fs.getInstrumentationMap(); + + /* + * Testing the statistics values after writing and reading a large buffer. + * + * get_response : get_responses(Last assertion) + 1 + * (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing + * LARGE_OPERATIONS times) + 1(open()) + 1(read()) + + * 1 (createOverwriteTriggeredGetForeTag). + * + * bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS * + * bytes wrote each time (bytes_received is equal to bytes wrote in the + * File). + * + */ + assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, + bytesReceived + LARGE_OPERATIONS * (testResponseString.getBytes().length), + metricMap); + assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, + getResponses + 2 + createRequestCalls + 2 * LARGE_OPERATIONS, + metricMap); + + } finally { + IOUtils.cleanupWithLogger(LOG, out, in); + } + } + + /** + * Testing bytes_received counter value when a response failure occurs. + */ + @Test + public void testAbfsHttpResponseFailure() throws IOException { + describe("Test to check the values of bytes received counter when a " + + "response is failed"); + + AzureBlobFileSystem fs = getFileSystem(); + Path responseFailurePath = path(getMethodName()); + Map metricMap; + FSDataOutputStream out = null; + + try { + //create an empty file + out = fs.create(responseFailurePath); + //Re-creating the file again on same path with false overwrite, this + // would cause a response failure with status code 409. + out = fs.create(responseFailurePath, false); + } catch (FileAlreadyExistsException faee) { + metricMap = fs.getInstrumentationMap(); + // Assert after catching the 409 error to check the counter values. + assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, 0, metricMap); + } finally { + IOUtils.cleanupWithLogger(LOG, out); + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java new file mode 100644 index 0000000000000..09cbfde1bebfb --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; + +import org.junit.Test; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl; + +/** + * Test AbfsOutputStream statistics. + */ +public class ITestAbfsOutputStreamStatistics + extends AbstractAbfsIntegrationTest { + private static final int OPERATIONS = 10; + + public ITestAbfsOutputStreamStatistics() throws Exception { + } + + /** + * Tests to check bytes uploaded successfully in {@link AbfsOutputStream}. + */ + @Test + public void testAbfsOutputStreamUploadingBytes() throws IOException { + describe("Testing bytes uploaded successfully by AbfsOutputSteam"); + final AzureBlobFileSystem fs = getFileSystem(); + Path uploadBytesFilePath = path(getMethodName()); + String testBytesToUpload = "bytes"; + + try ( + AbfsOutputStream outForSomeBytes = createAbfsOutputStreamWithFlushEnabled( + fs, uploadBytesFilePath) + ) { + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatisticsForUploadBytes = + getAbfsOutputStreamStatistics(outForSomeBytes); + + //Test for zero bytes To upload. + assertEquals("Mismatch in bytes to upload", 0, + abfsOutputStreamStatisticsForUploadBytes.getBytesToUpload()); + + outForSomeBytes.write(testBytesToUpload.getBytes()); + outForSomeBytes.flush(); + abfsOutputStreamStatisticsForUploadBytes = + getAbfsOutputStreamStatistics(outForSomeBytes); + + //Test for bytes to upload. + assertEquals("Mismatch in bytes to upload", + testBytesToUpload.getBytes().length, + abfsOutputStreamStatisticsForUploadBytes.getBytesToUpload()); + + //Test for successful bytes uploaded. + assertEquals("Mismatch in successful bytes uploaded", + testBytesToUpload.getBytes().length, + abfsOutputStreamStatisticsForUploadBytes.getBytesUploadSuccessful()); + + } + + try ( + AbfsOutputStream outForLargeBytes = createAbfsOutputStreamWithFlushEnabled( + fs, uploadBytesFilePath)) { + + for (int i = 0; i < OPERATIONS; i++) { + outForLargeBytes.write(testBytesToUpload.getBytes()); + } + outForLargeBytes.flush(); + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + getAbfsOutputStreamStatistics(outForLargeBytes); + + //Test for bytes to upload. + assertEquals("Mismatch in bytes to upload", + OPERATIONS * (testBytesToUpload.getBytes().length), + abfsOutputStreamStatistics.getBytesToUpload()); + + //Test for successful bytes uploaded. + assertEquals("Mismatch in successful bytes uploaded", + OPERATIONS * (testBytesToUpload.getBytes().length), + abfsOutputStreamStatistics.getBytesUploadSuccessful()); + + } + } + + /** + * Tests to check correct values of queue shrunk operations in + * AbfsOutputStream. + * + * After writing data, AbfsOutputStream doesn't upload the data until + * flushed. Hence, flush() method is called after write() to test queue + * shrink operations. + */ + @Test + public void testAbfsOutputStreamQueueShrink() throws IOException { + describe("Testing queue shrink operations by AbfsOutputStream"); + final AzureBlobFileSystem fs = getFileSystem(); + Path queueShrinkFilePath = path(getMethodName()); + String testQueueShrink = "testQueue"; + + try (AbfsOutputStream outForOneOp = createAbfsOutputStreamWithFlushEnabled( + fs, queueShrinkFilePath)) { + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + getAbfsOutputStreamStatistics(outForOneOp); + + //Test for shrinking queue zero time. + assertEquals("Mismatch in queue shrunk operations", 0, + abfsOutputStreamStatistics.getQueueShrunkOps()); + + } + + /* + * After writing in the loop we flush inside the loop to ensure the write + * operation done in that loop is considered to be done which would help + * us triggering the shrinkWriteOperationQueue() method each time after + * the write operation. + * If we call flush outside the loop, then it will take all the write + * operations inside the loop as one write operation. + * + */ + try ( + AbfsOutputStream outForLargeOps = createAbfsOutputStreamWithFlushEnabled( + fs, queueShrinkFilePath)) { + for (int i = 0; i < OPERATIONS; i++) { + outForLargeOps.write(testQueueShrink.getBytes()); + outForLargeOps.flush(); + } + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + getAbfsOutputStreamStatistics(outForLargeOps); + /* + * After a write operation is done, it is in a task queue where it is + * removed. Hence, to get the correct expected value we get the size of + * the task queue from AbfsOutputStream and subtract it with total + * write operations done to get the number of queue shrinks done. + * + */ + assertEquals("Mismatch in queue shrunk operations", + OPERATIONS - outForLargeOps.getWriteOperationsSize(), + abfsOutputStreamStatistics.getQueueShrunkOps()); + } + + } + + /** + * Tests to check correct values of write current buffer operations done by + * AbfsOutputStream. + * + * After writing data, AbfsOutputStream doesn't upload data till flush() is + * called. Hence, flush() calls were made after write(). + */ + @Test + public void testAbfsOutputStreamWriteBuffer() throws IOException { + describe("Testing write current buffer operations by AbfsOutputStream"); + final AzureBlobFileSystem fs = getFileSystem(); + Path writeBufferFilePath = path(getMethodName()); + String testWriteBuffer = "Buffer"; + + try (AbfsOutputStream outForOneOp = createAbfsOutputStreamWithFlushEnabled( + fs, writeBufferFilePath)) { + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + getAbfsOutputStreamStatistics(outForOneOp); + + //Test for zero time writing buffer to service. + assertEquals("Mismatch in write current buffer operations", 0, + abfsOutputStreamStatistics.getWriteCurrentBufferOperations()); + + outForOneOp.write(testWriteBuffer.getBytes()); + outForOneOp.flush(); + + abfsOutputStreamStatistics = getAbfsOutputStreamStatistics(outForOneOp); + + //Test for one time writing buffer to service. + assertEquals("Mismatch in write current buffer operations", 1, + abfsOutputStreamStatistics.getWriteCurrentBufferOperations()); + } + + try ( + AbfsOutputStream outForLargeOps = createAbfsOutputStreamWithFlushEnabled( + fs, writeBufferFilePath)) { + + /* + * Need to flush each time after we write to actually write the data + * into the data store and thus, get the writeCurrentBufferToService() + * method triggered and increment the statistic. + */ + for (int i = 0; i < OPERATIONS; i++) { + outForLargeOps.write(testWriteBuffer.getBytes()); + outForLargeOps.flush(); + } + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + getAbfsOutputStreamStatistics(outForLargeOps); + //Test for 10 times writing buffer to service. + assertEquals("Mismatch in write current buffer operations", + OPERATIONS, + abfsOutputStreamStatistics.getWriteCurrentBufferOperations()); + } + } + + /** + * Method to get the AbfsOutputStream statistics. + * + * @param out AbfsOutputStream whose statistics is needed. + * @return AbfsOutputStream statistics implementation class to get the + * values of the counters. + */ + private static AbfsOutputStreamStatisticsImpl getAbfsOutputStreamStatistics( + AbfsOutputStream out) { + return (AbfsOutputStreamStatisticsImpl) out.getOutputStreamStatistics(); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java new file mode 100644 index 0000000000000..42205807c1b3e --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.Map; + +import org.junit.Test; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; +import org.apache.hadoop.fs.permission.FsPermission; + +/** + * Tests AzureBlobFileSystem Statistics. + */ +public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest { + + private static final int NUMBER_OF_OPS = 10; + + public ITestAbfsStatistics() throws Exception { + } + + /** + * Testing the initial value of statistics. + */ + @Test + public void testInitialStatsValues() throws IOException { + describe("Testing the initial values of Abfs counters"); + + AbfsCounters abfsCounters = + new AbfsCountersImpl(getFileSystem().getUri()); + Map metricMap = abfsCounters.toMap(); + + for (Map.Entry entry : metricMap.entrySet()) { + String key = entry.getKey(); + Long value = entry.getValue(); + + //Verify if initial value of statistic is 0. + checkInitialValue(key, value); + } + } + + /** + * Testing statistics by creating files and directories. + */ + @Test + public void testCreateStatistics() throws IOException { + describe("Testing counter values got by creating directories and files in" + + " Abfs"); + + AzureBlobFileSystem fs = getFileSystem(); + Path createFilePath = path(getMethodName()); + Path createDirectoryPath = path(getMethodName() + "Dir"); + + fs.mkdirs(createDirectoryPath); + fs.createNonRecursive(createFilePath, FsPermission + .getDefault(), false, 1024, (short) 1, 1024, null); + + Map metricMap = fs.getInstrumentationMap(); + /* + Test of statistic values after creating a directory and a file ; + getFileStatus is called 1 time after creating file and 1 time at time of + initialising. + */ + assertAbfsStatistics(AbfsStatistic.CALL_CREATE, 1, metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_CREATE_NON_RECURSIVE, 1, metricMap); + assertAbfsStatistics(AbfsStatistic.FILES_CREATED, 1, metricMap); + assertAbfsStatistics(AbfsStatistic.DIRECTORIES_CREATED, 1, metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_MKDIRS, 1, metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_GET_FILE_STATUS, 2, metricMap); + + //re-initialising Abfs to reset statistic values. + fs.initialize(fs.getUri(), fs.getConf()); + + /* + Creating 10 directories and files; Directories and files can't be created + with same name, hence + i to give unique names. + */ + for (int i = 0; i < NUMBER_OF_OPS; i++) { + fs.mkdirs(path(getMethodName() + "Dir" + i)); + fs.createNonRecursive(path(getMethodName() + i), + FsPermission.getDefault(), false, 1024, (short) 1, + 1024, null); + } + + metricMap = fs.getInstrumentationMap(); + /* + Test of statistics values after creating 10 directories and files; + getFileStatus is called 1 time at initialise() plus number of times file + is created. + */ + assertAbfsStatistics(AbfsStatistic.CALL_CREATE, NUMBER_OF_OPS, metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_CREATE_NON_RECURSIVE, NUMBER_OF_OPS, + metricMap); + assertAbfsStatistics(AbfsStatistic.FILES_CREATED, NUMBER_OF_OPS, metricMap); + assertAbfsStatistics(AbfsStatistic.DIRECTORIES_CREATED, NUMBER_OF_OPS, + metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_MKDIRS, NUMBER_OF_OPS, metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_GET_FILE_STATUS, + 1 + NUMBER_OF_OPS, metricMap); + } + + /** + * Testing statistics by deleting files and directories. + */ + @Test + public void testDeleteStatistics() throws IOException { + describe("Testing counter values got by deleting directory and files " + + "in Abfs"); + + AzureBlobFileSystem fs = getFileSystem(); + /* + This directory path needs to be root for triggering the + directories_deleted counter. + */ + Path createDirectoryPath = path("/"); + Path createFilePath = path(getMethodName()); + + /* + creating a directory and a file inside that directory. + The directory is root. Hence, no parent. This allows us to invoke + deleteRoot() method to see the population of directories_deleted and + files_deleted counters. + */ + fs.mkdirs(createDirectoryPath); + fs.create(path(createDirectoryPath + getMethodName())); + fs.delete(createDirectoryPath, true); + + Map metricMap = fs.getInstrumentationMap(); + + /* + Test for op_delete, files_deleted, op_list_status. + since directory is delete recursively op_delete is called 2 times. + 1 file is deleted, 1 listStatus() call is made. + */ + assertAbfsStatistics(AbfsStatistic.CALL_DELETE, 2, metricMap); + assertAbfsStatistics(AbfsStatistic.FILES_DELETED, 1, metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_LIST_STATUS, 1, metricMap); + + /* + creating a root directory and deleting it recursively to see if + directories_deleted is called or not. + */ + fs.mkdirs(createDirectoryPath); + fs.create(createFilePath); + fs.delete(createDirectoryPath, true); + metricMap = fs.getInstrumentationMap(); + + //Test for directories_deleted. + assertAbfsStatistics(AbfsStatistic.DIRECTORIES_DELETED, 1, metricMap); + } + + /** + * Testing statistics of open, append, rename and exists method calls. + */ + @Test + public void testOpenAppendRenameExists() throws IOException { + describe("Testing counter values on calling open, append and rename and " + + "exists methods on Abfs"); + + AzureBlobFileSystem fs = getFileSystem(); + Path createFilePath = path(getMethodName()); + Path destCreateFilePath = path(getMethodName() + "New"); + + fs.create(createFilePath); + fs.open(createFilePath); + fs.append(createFilePath); + assertTrue(fs.rename(createFilePath, destCreateFilePath)); + + Map metricMap = fs.getInstrumentationMap(); + //Testing single method calls to open, append and rename. + assertAbfsStatistics(AbfsStatistic.CALL_OPEN, 1, metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_APPEND, 1, metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_RENAME, 1, metricMap); + + //Testing if file exists at path. + assertTrue(String.format("File with name %s should exist", + destCreateFilePath), + fs.exists(destCreateFilePath)); + assertFalse(String.format("File with name %s should not exist", + createFilePath), + fs.exists(createFilePath)); + + metricMap = fs.getInstrumentationMap(); + //Testing exists() calls. + assertAbfsStatistics(AbfsStatistic.CALL_EXIST, 2, metricMap); + + //re-initialising Abfs to reset statistic values. + fs.initialize(fs.getUri(), fs.getConf()); + + fs.create(destCreateFilePath); + + for (int i = 0; i < NUMBER_OF_OPS; i++) { + fs.open(destCreateFilePath); + fs.append(destCreateFilePath); + } + + metricMap = fs.getInstrumentationMap(); + + //Testing large number of method calls to open, append. + assertAbfsStatistics(AbfsStatistic.CALL_OPEN, NUMBER_OF_OPS, metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_APPEND, NUMBER_OF_OPS, metricMap); + + for (int i = 0; i < NUMBER_OF_OPS; i++) { + // rename and then back to earlier name for no error while looping. + assertTrue(fs.rename(destCreateFilePath, createFilePath)); + assertTrue(fs.rename(createFilePath, destCreateFilePath)); + + //check if first name is existing and 2nd is not existing. + assertTrue(String.format("File with name %s should exist", + destCreateFilePath), + fs.exists(destCreateFilePath)); + assertFalse(String.format("File with name %s should not exist", + createFilePath), + fs.exists(createFilePath)); + + } + + metricMap = fs.getInstrumentationMap(); + + /* + Testing exists() calls and rename calls. Since both were called 2 + times in 1 loop. 2*numberOfOps is expectedValue. + */ + assertAbfsStatistics(AbfsStatistic.CALL_RENAME, 2 * NUMBER_OF_OPS, + metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_EXIST, 2 * NUMBER_OF_OPS, + metricMap); + + } + + /** + * Method to check initial value of the statistics which should be 0. + * + * @param statName name of the statistic to be checked. + * @param statValue value of the statistic. + */ + private void checkInitialValue(String statName, long statValue) { + assertEquals("Mismatch in " + statName, 0, statValue); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java index e315ad2f6e312..bdd022a8f7bb2 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java @@ -21,19 +21,47 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.FilterOutputStream; +import java.io.OutputStream; +import java.lang.reflect.Field; import java.util.EnumSet; +import java.util.UUID; +import java.util.concurrent.Callable; import org.apache.hadoop.test.LambdaTestUtils; import org.junit.Test; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; +import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; + +import static java.net.HttpURLConnection.HTTP_CONFLICT; +import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; +import static java.net.HttpURLConnection.HTTP_NOT_FOUND; +import static java.net.HttpURLConnection.HTTP_OK; +import static java.net.HttpURLConnection.HTTP_PRECON_FAILED; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile; import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE; /** * Test create operation. @@ -202,4 +230,257 @@ public void call() throws Exception { }); } + /** + * Tests if the number of connections made for: + * 1. create overwrite=false of a file that doesnt pre-exist + * 2. create overwrite=false of a file that pre-exists + * 3. create overwrite=true of a file that doesnt pre-exist + * 4. create overwrite=true of a file that pre-exists + * matches the expectation when run against both combinations of + * fs.azure.enable.conditional.create.overwrite=true and + * fs.azure.enable.conditional.create.overwrite=false + * @throws Throwable + */ + @Test + public void testDefaultCreateOverwriteFileTest() throws Throwable { + testCreateFileOverwrite(true); + testCreateFileOverwrite(false); + } + + public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) + throws Throwable { + final AzureBlobFileSystem currentFs = getFileSystem(); + Configuration config = new Configuration(this.getRawConfiguration()); + config.set("fs.azure.enable.conditional.create.overwrite", + Boolean.toString(enableConditionalCreateOverwrite)); + + final AzureBlobFileSystem fs = + (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), + config); + + long totalConnectionMadeBeforeTest = fs.getInstrumentationMap() + .get(CONNECTIONS_MADE.getStatName()); + + int createRequestCount = 0; + final Path nonOverwriteFile = new Path("/NonOverwriteTest_FileName_" + + UUID.randomUUID().toString()); + + // Case 1: Not Overwrite - File does not pre-exist + // create should be successful + fs.create(nonOverwriteFile, false); + + // One request to server to create path should be issued + createRequestCount++; + + assertAbfsStatistics( + CONNECTIONS_MADE, + totalConnectionMadeBeforeTest + createRequestCount, + fs.getInstrumentationMap()); + + // Case 2: Not Overwrite - File pre-exists + intercept(FileAlreadyExistsException.class, new Callable() { + @Override + public FSDataOutputStream call() throws Exception { + return fs.create(nonOverwriteFile, false); + } + }); + + // One request to server to create path should be issued + createRequestCount++; + + assertAbfsStatistics( + CONNECTIONS_MADE, + totalConnectionMadeBeforeTest + createRequestCount, + fs.getInstrumentationMap()); + + final Path overwriteFilePath = new Path("/OverwriteTest_FileName_" + + UUID.randomUUID().toString()); + + // Case 3: Overwrite - File does not pre-exist + // create should be successful + fs.create(overwriteFilePath, true); + + // One request to server to create path should be issued + createRequestCount++; + + assertAbfsStatistics( + CONNECTIONS_MADE, + totalConnectionMadeBeforeTest + createRequestCount, + fs.getInstrumentationMap()); + + // Case 4: Overwrite - File pre-exists + fs.create(overwriteFilePath, true); + + if (enableConditionalCreateOverwrite) { + // Three requests will be sent to server to create path, + // 1. create without overwrite + // 2. GetFileStatus to get eTag + // 3. create with overwrite + createRequestCount += 3; + } else { + createRequestCount++; + } + + assertAbfsStatistics( + CONNECTIONS_MADE, + totalConnectionMadeBeforeTest + createRequestCount, + fs.getInstrumentationMap()); + } + + /** + * Test negative scenarios with Create overwrite=false as default + * With create overwrite=true ending in 3 calls: + * A. Create overwrite=false + * B. GFS + * C. Create overwrite=true + * + * Scn1: A fails with HTTP409, leading to B which fails with HTTP404, + * detect parallel access + * Scn2: A fails with HTTP409, leading to B which fails with HTTP500, + * fail create with HTTP500 + * Scn3: A fails with HTTP409, leading to B and then C, + * which fails with HTTP412, detect parallel access + * Scn4: A fails with HTTP409, leading to B and then C, + * which fails with HTTP500, fail create with HTTP500 + * Scn5: A fails with HTTP500, fail create with HTTP500 + */ + @Test + public void testNegativeScenariosForCreateOverwriteDisabled() + throws Throwable { + + final AzureBlobFileSystem currentFs = getFileSystem(); + Configuration config = new Configuration(this.getRawConfiguration()); + config.set("fs.azure.enable.conditional.create.overwrite", + Boolean.toString(true)); + + final AzureBlobFileSystem fs = + (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), + config); + + // Get mock AbfsClient with current config + AbfsClient + mockClient + = TestAbfsClient.getMockAbfsClient( + fs.getAbfsStore().getClient(), + fs.getAbfsStore().getAbfsConfiguration()); + + AzureBlobFileSystemStore abfsStore = fs.getAbfsStore(); + abfsStore = setAzureBlobSystemStoreField(abfsStore, "client", mockClient); + + AbfsRestOperation successOp = mock( + AbfsRestOperation.class); + AbfsHttpOperation http200Op = mock( + AbfsHttpOperation.class); + when(http200Op.getStatusCode()).thenReturn(HTTP_OK); + when(successOp.getResult()).thenReturn(http200Op); + + AbfsRestOperationException conflictResponseEx + = getMockAbfsRestOperationException(HTTP_CONFLICT); + AbfsRestOperationException serverErrorResponseEx + = getMockAbfsRestOperationException(HTTP_INTERNAL_ERROR); + AbfsRestOperationException fileNotFoundResponseEx + = getMockAbfsRestOperationException(HTTP_NOT_FOUND); + AbfsRestOperationException preConditionResponseEx + = getMockAbfsRestOperationException(HTTP_PRECON_FAILED); + + doThrow(conflictResponseEx) // Scn1: GFS fails with Http404 + .doThrow(conflictResponseEx) // Scn2: GFS fails with Http500 + .doThrow( + conflictResponseEx) // Scn3: create overwrite=true fails with Http412 + .doThrow( + conflictResponseEx) // Scn4: create overwrite=true fails with Http500 + .doThrow( + serverErrorResponseEx) // Scn5: create overwrite=false fails with Http500 + .when(mockClient) + .createPath(any(String.class), eq(true), eq(false), any(String.class), + any(String.class), (String) eq(null)); + + doThrow(fileNotFoundResponseEx) // Scn1: GFS fails with Http404 + .doThrow(serverErrorResponseEx) // Scn2: GFS fails with Http500 + .doReturn(successOp) // Scn3: create overwrite=true fails with Http412 + .doReturn(successOp) // Scn4: create overwrite=true fails with Http500 + .when(mockClient) + .getPathStatus(any(String.class)); + + doThrow( + preConditionResponseEx) // Scn3: create overwrite=true fails with Http412 + .doThrow( + serverErrorResponseEx) // Scn4: create overwrite=true fails with Http500 + .when(mockClient) + .createPath(any(String.class), eq(true), eq(true), any(String.class), + any(String.class), (String) eq(null)); + + // Scn1: GFS fails with Http404 + // Sequence of events expected: + // 1. create overwrite=false - fail with conflict + // 2. GFS - fail with File Not found + // Create will fail with ConcurrentWriteOperationDetectedException + validateCreateFileException(ConcurrentWriteOperationDetectedException.class, + abfsStore); + + // Scn2: GFS fails with Http500 + // Sequence of events expected: + // 1. create overwrite=false - fail with conflict + // 2. GFS - fail with Server error + // Create will fail with 500 + validateCreateFileException(AbfsRestOperationException.class, abfsStore); + + // Scn3: create overwrite=true fails with Http412 + // Sequence of events expected: + // 1. create overwrite=false - fail with conflict + // 2. GFS - pass + // 3. create overwrite=true - fail with Pre-Condition + // Create will fail with ConcurrentWriteOperationDetectedException + validateCreateFileException(ConcurrentWriteOperationDetectedException.class, + abfsStore); + + // Scn4: create overwrite=true fails with Http500 + // Sequence of events expected: + // 1. create overwrite=false - fail with conflict + // 2. GFS - pass + // 3. create overwrite=true - fail with Server error + // Create will fail with 500 + validateCreateFileException(AbfsRestOperationException.class, abfsStore); + + // Scn5: create overwrite=false fails with Http500 + // Sequence of events expected: + // 1. create overwrite=false - fail with server error + // Create will fail with 500 + validateCreateFileException(AbfsRestOperationException.class, abfsStore); + } + + private AzureBlobFileSystemStore setAzureBlobSystemStoreField( + final AzureBlobFileSystemStore abfsStore, + final String fieldName, + Object fieldObject) throws Exception { + + Field abfsClientField = AzureBlobFileSystemStore.class.getDeclaredField( + fieldName); + abfsClientField.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(abfsClientField, + abfsClientField.getModifiers() & ~java.lang.reflect.Modifier.FINAL); + abfsClientField.set(abfsStore, fieldObject); + return abfsStore; + } + + private void validateCreateFileException(final Class exceptionClass, final AzureBlobFileSystemStore abfsStore) + throws Exception { + final FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, + FsAction.ALL); + final FsPermission umask = new FsPermission(FsAction.NONE, FsAction.NONE, + FsAction.NONE); + final Path testPath = new Path("testFile"); + intercept(exceptionClass, new Callable() { + @Override + public OutputStream call() throws Exception { + return abfsStore.createFile(testPath, true, permission, umask); + } + }); + } + + private AbfsRestOperationException getMockAbfsRestOperationException(int status) { + return new AbfsRestOperationException(status, "", "", new Exception()); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java index 382d3966485f1..de476a6abce9b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java @@ -18,12 +18,18 @@ package org.apache.hadoop.fs.azurebfs; +import java.util.UUID; + import org.junit.Test; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE; + /** * Test mkdir operation. */ @@ -45,4 +51,58 @@ public void testCreateDirWithExistingDir() throws Exception { public void testCreateRoot() throws Exception { assertMkdirs(getFileSystem(), new Path("/")); } + + /** + * Test mkdir for possible values of fs.azure.disable.default.create.overwrite + * @throws Exception + */ + @Test + public void testDefaultCreateOverwriteDirTest() throws Throwable { + // the config fs.azure.disable.default.create.overwrite should have no + // effect on mkdirs + testCreateDirOverwrite(true); + testCreateDirOverwrite(false); + } + + public void testCreateDirOverwrite(boolean enableConditionalCreateOverwrite) + throws Throwable { + final AzureBlobFileSystem currentFs = getFileSystem(); + Configuration config = new Configuration(this.getRawConfiguration()); + config.set("fs.azure.enable.conditional.create.overwrite", + Boolean.toString(enableConditionalCreateOverwrite)); + + final AzureBlobFileSystem fs = + (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), + config); + + long totalConnectionMadeBeforeTest = fs.getInstrumentationMap() + .get(CONNECTIONS_MADE.getStatName()); + + int mkdirRequestCount = 0; + final Path dirPath = new Path("/DirPath_" + + UUID.randomUUID().toString()); + + // Case 1: Dir does not pre-exist + fs.mkdirs(dirPath); + + // One request to server + mkdirRequestCount++; + + assertAbfsStatistics( + CONNECTIONS_MADE, + totalConnectionMadeBeforeTest + mkdirRequestCount, + fs.getInstrumentationMap()); + + // Case 2: Dir pre-exists + // Mkdir on existing Dir path will not lead to failure + fs.mkdirs(dirPath); + + // One request to server + mkdirRequestCount++; + + assertAbfsStatistics( + CONNECTIONS_MADE, + totalConnectionMadeBeforeTest + mkdirRequestCount, + fs.getInstrumentationMap()); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java new file mode 100644 index 0000000000000..0639cf2f82b9a --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.Map; + +import org.junit.Test; + +import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; + +public class TestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest { + + private static final int LARGE_OPERATIONS = 1000; + + public TestAbfsNetworkStatistics() throws Exception { + } + + /** + * Test to check correct values of read and write throttling statistics in + * {@code AbfsClientThrottlingAnalyzer}. + */ + @Test + public void testAbfsThrottlingStatistics() throws IOException { + describe("Test to check correct values of read throttle and write " + + "throttle statistics in Abfs"); + + AbfsCounters statistics = + new AbfsCountersImpl(getFileSystem().getUri()); + + /* + * Calling the throttle methods to check correct summation and values of + * the counters. + */ + for (int i = 0; i < LARGE_OPERATIONS; i++) { + statistics.incrementCounter(AbfsStatistic.READ_THROTTLES, 1); + statistics.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1); + } + + Map metricMap = statistics.toMap(); + + /* + * Test to check read and write throttle statistics gave correct values for + * 1000 calls. + */ + assertAbfsStatistics(AbfsStatistic.READ_THROTTLES, LARGE_OPERATIONS, + metricMap); + assertAbfsStatistics(AbfsStatistic.WRITE_THROTTLES, LARGE_OPERATIONS, + metricMap); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java new file mode 100644 index 0000000000000..58f00233710f8 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.util.Random; + +import org.junit.Test; + +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl; + +/** + * Unit tests for AbfsOutputStream statistics. + */ +public class TestAbfsOutputStreamStatistics + extends AbstractAbfsIntegrationTest { + + private static final int LOW_RANGE_FOR_RANDOM_VALUE = 49; + private static final int HIGH_RANGE_FOR_RANDOM_VALUE = 9999; + private static final int OPERATIONS = 10; + + public TestAbfsOutputStreamStatistics() throws Exception { + } + + /** + * Tests to check number of bytes failed to upload in + * {@link AbfsOutputStream}. + */ + @Test + public void testAbfsOutputStreamBytesFailed() { + describe("Testing number of bytes failed during upload in AbfsOutputSteam"); + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + new AbfsOutputStreamStatisticsImpl(); + + //Test for zero bytes uploaded. + assertEquals("Mismatch in number of bytes failed to upload", 0, + abfsOutputStreamStatistics.getBytesUploadFailed()); + + //Populating small random value for bytesFailed. + int randomBytesFailed = new Random().nextInt(LOW_RANGE_FOR_RANDOM_VALUE); + abfsOutputStreamStatistics.uploadFailed(randomBytesFailed); + //Test for bytes failed to upload. + assertEquals("Mismatch in number of bytes failed to upload", + randomBytesFailed, abfsOutputStreamStatistics.getBytesUploadFailed()); + + //Reset statistics for the next test. + abfsOutputStreamStatistics = new AbfsOutputStreamStatisticsImpl(); + + /* + * Entering multiple random values for bytesFailed to check correct + * summation of values. + */ + int expectedBytesFailed = 0; + for (int i = 0; i < OPERATIONS; i++) { + randomBytesFailed = new Random().nextInt(HIGH_RANGE_FOR_RANDOM_VALUE); + abfsOutputStreamStatistics.uploadFailed(randomBytesFailed); + expectedBytesFailed += randomBytesFailed; + } + //Test for bytes failed to upload. + assertEquals("Mismatch in number of bytes failed to upload", + expectedBytesFailed, abfsOutputStreamStatistics.getBytesUploadFailed()); + } + + /** + * Tests to check time spent on waiting for tasks to be complete on a + * blocking queue in {@link AbfsOutputStream}. + */ + @Test + public void testAbfsOutputStreamTimeSpentOnWaitTask() { + describe("Testing time Spent on waiting for task to be completed in " + + "AbfsOutputStream"); + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + new AbfsOutputStreamStatisticsImpl(); + + //Test for initial value of timeSpentWaitTask. + assertEquals("Mismatch in time spent on waiting for tasks to complete", 0, + abfsOutputStreamStatistics.getTimeSpentOnTaskWait()); + + int smallRandomStartTime = + new Random().nextInt(LOW_RANGE_FOR_RANDOM_VALUE); + int smallRandomEndTime = + new Random().nextInt(LOW_RANGE_FOR_RANDOM_VALUE) + + smallRandomStartTime; + int smallDiff = smallRandomEndTime - smallRandomStartTime; + abfsOutputStreamStatistics + .timeSpentTaskWait(smallRandomStartTime, smallRandomEndTime); + //Test for small random value of timeSpentWaitTask. + assertEquals("Mismatch in time spent on waiting for tasks to complete", + smallDiff, abfsOutputStreamStatistics.getTimeSpentOnTaskWait()); + + //Reset statistics for the next test. + abfsOutputStreamStatistics = new AbfsOutputStreamStatisticsImpl(); + + /* + * Entering multiple values for timeSpentTaskWait() to check the + * summation is happening correctly. Also calculating the expected result. + */ + int expectedRandomDiff = 0; + for (int i = 0; i < OPERATIONS; i++) { + int largeRandomStartTime = + new Random().nextInt(HIGH_RANGE_FOR_RANDOM_VALUE); + int largeRandomEndTime = new Random().nextInt(HIGH_RANGE_FOR_RANDOM_VALUE) + + largeRandomStartTime; + abfsOutputStreamStatistics + .timeSpentTaskWait(largeRandomStartTime, largeRandomEndTime); + expectedRandomDiff += largeRandomEndTime - largeRandomStartTime; + } + + /* + * Test to check correct value of timeSpentTaskWait after multiple + * random values are passed in it. + */ + assertEquals("Mismatch in time spent on waiting for tasks to complete", + expectedRandomDiff, + abfsOutputStreamStatistics.getTimeSpentOnTaskWait()); + } + + /** + * Unit Tests to check correct values of queue shrunk operations in + * AbfsOutputStream. + * + */ + @Test + public void testAbfsOutputStreamQueueShrink() { + describe("Testing queue shrink operations by AbfsOutputStream"); + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + new AbfsOutputStreamStatisticsImpl(); + + //Test for shrinking queue zero time. + assertEquals("Mismatch in queue shrunk operations", 0, + abfsOutputStreamStatistics.getQueueShrunkOps()); + + abfsOutputStreamStatistics.queueShrunk(); + + //Test for shrinking queue 1 time. + assertEquals("Mismatch in queue shrunk operations", 1, + abfsOutputStreamStatistics.getQueueShrunkOps()); + + //Reset statistics for the next test. + abfsOutputStreamStatistics = new AbfsOutputStreamStatisticsImpl(); + + /* + * Entering random values for queueShrunkOps and checking the correctness + * of summation for the statistic. + */ + int randomQueueValues = new Random().nextInt(HIGH_RANGE_FOR_RANDOM_VALUE); + for (int i = 0; i < randomQueueValues * OPERATIONS; i++) { + abfsOutputStreamStatistics.queueShrunk(); + } + /* + * Test for random times incrementing queue shrunk operations. + */ + assertEquals("Mismatch in queue shrunk operations", + randomQueueValues * OPERATIONS, + abfsOutputStreamStatistics.getQueueShrunkOps()); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java new file mode 100644 index 0000000000000..f831d2d4cd26b --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.Map; + +import org.junit.Test; + +import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; + +/** + * Unit tests for Abfs common counters. + */ +public class TestAbfsStatistics extends AbstractAbfsIntegrationTest { + + private static final int LARGE_OPS = 100; + + public TestAbfsStatistics() throws Exception { + } + + /** + * Tests for op_get_delegation_token and error_ignore counter values. + */ + @Test + public void testInitializeStats() throws IOException { + describe("Testing the counter values after Abfs is initialised"); + + AbfsCounters instrumentation = + new AbfsCountersImpl(getFileSystem().getUri()); + + //Testing summation of the counter values. + for (int i = 0; i < LARGE_OPS; i++) { + instrumentation.incrementCounter(AbfsStatistic.CALL_GET_DELEGATION_TOKEN, 1); + instrumentation.incrementCounter(AbfsStatistic.ERROR_IGNORED, 1); + } + + Map metricMap = instrumentation.toMap(); + + assertAbfsStatistics(AbfsStatistic.CALL_GET_DELEGATION_TOKEN, LARGE_OPS, + metricMap); + assertAbfsStatistics(AbfsStatistic.ERROR_IGNORED, LARGE_OPS, metricMap); + + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java index 420093307033c..b5a22badb2bff 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.lang.reflect.Field; import java.net.URL; import java.util.regex.Pattern; @@ -30,6 +31,11 @@ import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.apache.hadoop.util.VersionInfo; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + /** * Test useragent of abfs client. * @@ -43,7 +49,7 @@ private void validateUserAgent(String expectedPattern, AbfsConfiguration config, boolean includeSSLProvider) { AbfsClient client = new AbfsClient(baseUrl, null, - config, null, null); + config, null, null, null); String sslProviderName = null; if (includeSSLProvider) { sslProviderName = SSLSocketFactoryEx.getDefaultFactory().getProviderName(); @@ -91,4 +97,53 @@ public void verifyUserAgentWithSSLProvider() throws Exception { validateUserAgent(expectedUserAgentPattern, new URL("https://azure.com"), abfsConfiguration, true); } + + public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance, + AbfsConfiguration abfsConfig) throws Exception { + + AbfsClient client = mock(AbfsClient.class); + when(client.getRetryPolicy()).thenReturn( + new ExponentialRetryPolicy(1)); + + when(client.createDefaultUriQueryBuilder()).thenCallRealMethod(); + when(client.createRequestUrl(anyString(), anyString())).thenCallRealMethod(); + when(client.getAccessToken()).thenCallRealMethod(); + when(client.getSharedKeyCredentials()).thenCallRealMethod(); + when(client.createDefaultHeaders()).thenCallRealMethod(); + + // override baseurl + client = TestAbfsClient.setAbfsClientField(client, "abfsConfiguration", + abfsConfig); + + // override baseurl + client = TestAbfsClient.setAbfsClientField(client, "baseUrl", + baseAbfsClientInstance.getBaseUrl()); + + // override auth provider + client = TestAbfsClient.setAbfsClientField(client, "tokenProvider", + abfsConfig.getTokenProvider()); + + // override user agent + String userAgent = "APN/1.0 Azure Blob FS/3.4.0-SNAPSHOT (PrivateBuild " + + "JavaJRE 1.8.0_252; Linux 5.3.0-59-generic/amd64; openssl-1.0; " + + "UNKNOWN/UNKNOWN) MSFT"; + client = TestAbfsClient.setAbfsClientField(client, "userAgent", userAgent); + + return client; + } + + private static AbfsClient setAbfsClientField( + final AbfsClient client, + final String fieldName, + Object fieldObject) throws Exception { + + Field field = AbfsClient.class.getDeclaredField(fieldName); + field.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, + field.getModifiers() & ~java.lang.reflect.Modifier.FINAL); + field.set(client, fieldObject); + return client; + } }