Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl;
import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
Expand Down Expand Up @@ -426,6 +427,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext() {
.withWriteBufferSize(abfsConfiguration.getWriteBufferSize())
.enableFlush(abfsConfiguration.isFlushEnabled())
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
Expand All @@ -50,6 +52,7 @@
* The BlobFsOutputStream for Rest AbfsClient.
*/
public class AbfsOutputStream extends OutputStream implements Syncable, StreamCapabilities {

private final AbfsClient client;
private final String path;
private long position;
Expand Down Expand Up @@ -80,6 +83,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
= new ElasticByteBufferPool();

private final Statistics statistics;
private final AbfsOutputStreamStatistics outputStreamStatistics;

private static final Logger LOG =
LoggerFactory.getLogger(AbfsOutputStream.class);

public AbfsOutputStream(
final AbfsClient client,
Expand All @@ -101,6 +108,7 @@ public AbfsOutputStream(
this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
this.bufferIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics();

this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();

Expand Down Expand Up @@ -278,6 +286,9 @@ public synchronized void close() throws IOException {
threadExecutor.shutdownNow();
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Closing AbfsOutputStream ", toString());
}
}

private synchronized void flushInternal(boolean isClose) throws IOException {
Expand All @@ -296,16 +307,20 @@ private synchronized void writeCurrentBufferToService() throws IOException {
if (bufferIndex == 0) {
return;
}
outputStreamStatistics.writeCurrentBuffer();

final byte[] bytes = buffer;
final int bytesLength = bufferIndex;
outputStreamStatistics.bytesToUpload(bytesLength);
buffer = byteBufferPool.getBuffer(false, bufferSize).array();
bufferIndex = 0;
final long offset = position;
position += bytesLength;

if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
long start = System.currentTimeMillis();
waitForTaskToComplete();
outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis());
}

final Future<Void> job = completionService.submit(new Callable<Void>() {
Expand All @@ -324,6 +339,11 @@ public Void call() throws Exception {
}
});

if (job.isCancelled()) {
outputStreamStatistics.uploadFailed(bytesLength);
} else {
outputStreamStatistics.uploadSuccessful(bytesLength);
}
writeOperations.add(new WriteOperation(job, offset, bytesLength));

// Try to shrink the queue
Expand Down Expand Up @@ -388,6 +408,8 @@ private synchronized void shrinkWriteOperationQueue() throws IOException {
writeOperations.peek().task.get();
lastTotalAppendOffset += writeOperations.peek().length;
writeOperations.remove();
// Incrementing statistics to indicate queue has been shrunk.
outputStreamStatistics.queueShrunk();
}
} catch (Exception e) {
if (e.getCause() instanceof AzureBlobFileSystemException) {
Expand Down Expand Up @@ -435,4 +457,38 @@ private static class WriteOperation {
public synchronized void waitForPendingUploads() throws IOException {
waitForTaskToComplete();
}

/**
* Getter method for AbfsOutputStream statistics.
*
* @return statistics for AbfsOutputStream.
*/
@VisibleForTesting
public AbfsOutputStreamStatistics getOutputStreamStatistics() {
return outputStreamStatistics;
}

/**
* Getter to get the size of the task queue.
*
* @return the number of writeOperations in AbfsOutputStream.
*/
@VisibleForTesting
public int getWriteOperationsSize() {
return writeOperations.size();
}

/**
* Appending AbfsOutputStream statistics to base toString().
*
* @return String with AbfsOutputStream statistics.
*/
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(super.toString());
sb.append("AbfsOuputStream@").append(this.hashCode()).append("){");
sb.append(outputStreamStatistics.toString());
sb.append("}");
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {

private boolean disableOutputStreamFlush;

private AbfsOutputStreamStatistics streamStatistics;

public AbfsOutputStreamContext() {
}

Expand All @@ -49,6 +51,12 @@ public AbfsOutputStreamContext disableOutputStreamFlush(
return this;
}

public AbfsOutputStreamContext withStreamStatistics(
final AbfsOutputStreamStatistics streamStatistics) {
this.streamStatistics = streamStatistics;
return this;
}

public AbfsOutputStreamContext build() {
// Validation of parameters to be done here.
return this;
Expand All @@ -65,4 +73,8 @@ public boolean isEnableFlush() {
public boolean isDisableOutputStreamFlush() {
return disableOutputStreamFlush;
}

public AbfsOutputStreamStatistics getStreamStatistics() {
return streamStatistics;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.services;

import org.apache.hadoop.classification.InterfaceStability;

/**
* Interface for {@link AbfsOutputStream} statistics.
*/
@InterfaceStability.Unstable
public interface AbfsOutputStreamStatistics {

/**
* Number of bytes to be uploaded.
*
* @param bytes number of bytes to upload.
*/
void bytesToUpload(long bytes);

/**
* Records a successful upload and the number of bytes uploaded.
*
* @param bytes number of bytes that were successfully uploaded.
*/
void uploadSuccessful(long bytes);

/**
* Records that upload is failed and the number of bytes.
*
* @param bytes number of bytes that failed to upload.
*/
void uploadFailed(long bytes);

/**
* Time spent in waiting for tasks to be completed in the blocking queue.
*
* @param start millisecond at which the wait for task to be complete begins.
* @param end millisecond at which the wait is completed for the task.
*/
void timeSpentTaskWait(long start, long end);

/**
* Number of times task queue is shrunk.
*/
void queueShrunk();

/**
* Number of times buffer is written to the service after a write operation.
*/
void writeCurrentBuffer();

/**
* Method to form a string of all AbfsOutputStream statistics and their
* values.
*
* @return AbfsOutputStream statistics.
*/
@Override
String toString();

}
Loading