Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface OMMultiTenantManager {

String OZONE_TENANT_RANGER_POLICY_DESCRIPTION =
"Created by Ozone. WARNING: "
+ "Changes will be lost when this tenant is deleted.";

String OZONE_TENANT_RANGER_ROLE_DESCRIPTION =
"Managed by Ozone. WARNING: "
+ "Changes will be overridden. "
+ "Use Ozone tenant CLI to manage users in this tenant role instead.";

/* TODO: Outdated
* Init multi-tenant manager. Performs initialization e.g.
* - Initialize Multi-Tenant-Gatekeeper-Plugin
Expand Down Expand Up @@ -341,15 +351,6 @@ static boolean checkAndEnableMultiTenancy(
return true;
}

String OZONE_TENANT_RANGER_POLICY_DESCRIPTION =
"Created by Ozone. WARNING: "
+ "Changes will be lost when this tenant is deleted.";

String OZONE_TENANT_RANGER_ROLE_DESCRIPTION =
"Managed by Ozone. WARNING: "
+ "Changes will be overridden. "
+ "Use Ozone tenant CLI to manage users in this tenant role instead.";

/**
* Returns default VolumeAccess policy given tenant and role names.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,6 @@ public class OMPerformanceMetrics {
private static final String SOURCE_NAME =
OMPerformanceMetrics.class.getSimpleName();

public static OMPerformanceMetrics register() {
MetricsSystem ms = DefaultMetricsSystem.instance();
return ms.register(SOURCE_NAME,
"OzoneManager Request Performance",
new OMPerformanceMetrics());
}

public static void unregister() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(SOURCE_NAME);
}

@Metric(about = "Overall lookupKey in nanoseconds")
private MutableRate lookupLatencyNs;

Expand Down Expand Up @@ -160,6 +148,18 @@ public static void unregister() {
@Metric(about = "Latency of each iteration of OpenKeyCleanupService in ms")
private MutableGaugeLong openKeyCleanupServiceLatencyMs;

public static OMPerformanceMetrics register() {
MetricsSystem ms = DefaultMetricsSystem.instance();
return ms.register(SOURCE_NAME,
"OzoneManager Request Performance",
new OMPerformanceMetrics());
}

public static void unregister() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(SOURCE_NAME);
}

public void addLookupLatency(long latencyInNs) {
lookupLatencyNs.add(latencyInNs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,6 @@ public final class OMPolicyProvider extends PolicyProvider {
private static final Supplier<OMPolicyProvider> SUPPLIER =
MemoizedSupplier.valueOf(OMPolicyProvider::new);

private OMPolicyProvider() {
}

@Private
@Unstable
public static OMPolicyProvider getInstance() {
return SUPPLIER.get();
}

private static final List<Service> OM_SERVICES =
Arrays.asList(
new Service(OZONE_OM_SECURITY_CLIENT_PROTOCOL_ACL,
Expand All @@ -67,6 +58,15 @@ public static OMPolicyProvider getInstance() {
ReconfigureProtocol.class)
);

private OMPolicyProvider() {
}

@Private
@Unstable
public static OMPolicyProvider getInstance() {
return SUPPLIER.get();
}

@Override
public Service[] getServices() {
return OM_SERVICES.toArray(new Service[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,6 @@
* Helper class for fetching List Status for a path.
*/
public class OzoneListStatusHelper {
/**
* Interface to get the File Status for a path.
*/
@FunctionalInterface
public interface GetFileStatusHelper {
OzoneFileStatus apply(OmKeyArgs args, String clientAddress,
boolean skipFileNotFoundError) throws IOException;
}

private static final Logger LOG =
LoggerFactory.getLogger(OzoneListStatusHelper.class);

Expand All @@ -79,6 +70,15 @@ OzoneFileStatus apply(OmKeyArgs args, String clientAddress,
this.omDefaultReplication = omDefaultReplication;
}

/**
* Interface to get the File Status for a path.
*/
@FunctionalInterface
public interface GetFileStatusHelper {
OzoneFileStatus apply(OmKeyArgs args, String clientAddress,
boolean skipFileNotFoundError) throws IOException;
}

public Collection<OzoneFileStatus> listStatusFSO(OmKeyArgs args,
String startKey, long numEntries, String clientAddress,
boolean allowPartialPrefixes) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,22 +481,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl

private String omHostName;

/**
* OM Startup mode.
*/
public enum StartupOption {
REGUALR,
BOOTSTRAP,
FORCE_BOOTSTRAP
}

private enum State {
INITIALIZED,
BOOTSTRAPPING,
RUNNING,
STOPPED
}

// Used in MiniOzoneCluster testing
private State omState;
private Thread emptier;
Expand Down Expand Up @@ -5157,4 +5141,20 @@ public void compactOMDB(String columnFamily) throws IOException {
public OMExecutionFlow getOmExecutionFlow() {
return omExecutionFlow;
}

/**
* OM Startup mode.
*/
public enum StartupOption {
REGUALR,
BOOTSTRAP,
FORCE_BOOTSTRAP
}

private enum State {
INITIALIZED,
BOOTSTRAPPING,
RUNNING,
STOPPED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public class SstFilteringService extends BackgroundService

private AtomicBoolean running;

private final BootstrapStateHandler.Lock lock = new BootstrapStateHandler.Lock();

public static boolean isSstFiltered(OzoneConfiguration ozoneConfiguration, SnapshotInfo snapshotInfo) {
Path sstFilteredFile = Paths.get(OmSnapshotManager.getSnapshotPath(ozoneConfiguration,
snapshotInfo), SST_FILTERED_FILE);
Expand All @@ -101,9 +103,6 @@ public SstFilteringService(long interval, TimeUnit unit, long serviceTimeout,
running = new AtomicBoolean(false);
}

private final BootstrapStateHandler.Lock lock =
new BootstrapStateHandler.Lock();

@Override
public void start() {
running.set(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@
@Metrics(about = "OzoneManager HA Metrics", context = OzoneConsts.OZONE)
public final class OMHAMetrics implements MetricsSource {

public static final String SOURCE_NAME = OMHAMetrics.class.getSimpleName();
private final OMHAMetricsInfo omhaMetricsInfo = new OMHAMetricsInfo();
private MetricsRegistry metricsRegistry;

private String currNodeId;
private String leaderId;

/**
* Private nested class to hold the values
* of MetricsInfo for OMHAMetrics.
Expand Down Expand Up @@ -72,14 +79,6 @@ public void setNodeId(String nodeId) {
}
}

public static final String SOURCE_NAME =
OMHAMetrics.class.getSimpleName();
private final OMHAMetricsInfo omhaMetricsInfo = new OMHAMetricsInfo();
private MetricsRegistry metricsRegistry;

private String currNodeId;
private String leaderId;

private OMHAMetrics(String currNodeId, String leaderId) {
this.currNodeId = currNodeId;
this.leaderId = leaderId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,18 @@
* This class is used for OM Audit logs.
*/
public final class OMAuditLogger {
private OMAuditLogger() {
}

private static final Map<Type, OMAction> CMD_AUDIT_ACTION_MAP = new HashMap<>();
private static final Logger LOG = LoggerFactory.getLogger(OMAuditLogger.class);

private static final Map<Type, OMAction> CMD_AUDIT_ACTION_MAP = new HashMap<>();

static {
init();
}

private OMAuditLogger() {
}

private static void init() {
CMD_AUDIT_ACTION_MAP.put(Type.CreateVolume, OMAction.CREATE_VOLUME);
CMD_AUDIT_ACTION_MAP.put(Type.DeleteVolume, OMAction.DELETE_VOLUME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,35 @@ public final class OzoneManagerDoubleBuffer {
private static final Logger LOG =
LoggerFactory.getLogger(OzoneManagerDoubleBuffer.class);

private Queue<Entry> currentBuffer;
private Queue<Entry> readyBuffer;
/**
* Limit the number of un-flushed transactions for {@link OzoneManagerStateMachine}.
*/
private final Semaphore unFlushedTransactions;

/** To flush the buffers. */
private final Daemon daemon;
/** Is the {@link #daemon} running? */
private final AtomicBoolean isRunning = new AtomicBoolean(false);
/** Notify flush operations are completed by the {@link #daemon}. */
private final FlushNotifier flushNotifier;

private final OMMetadataManager omMetadataManager;

private final Consumer<TermIndex> updateLastAppliedIndex;

private final S3SecretManager s3SecretManager;

private final boolean isTracingEnabled;

private final OzoneManagerDoubleBufferMetrics metrics = OzoneManagerDoubleBufferMetrics.create();

/** Accumulative count (for testing and debug only). */
private final AtomicLong flushedTransactionCount = new AtomicLong();
/** The number of flush iterations (for testing and debug only). */
private final AtomicLong flushIterations = new AtomicLong();

/** Entry for {@link #currentBuffer} and {@link #readyBuffer}. */
private static class Entry {
private final TermIndex termIndex;
Expand Down Expand Up @@ -160,35 +189,6 @@ static Semaphore newSemaphore(int permits) {
return permits > 0 ? new Semaphore(permits) : null;
}

private Queue<Entry> currentBuffer;
private Queue<Entry> readyBuffer;
/**
* Limit the number of un-flushed transactions for {@link OzoneManagerStateMachine}.
*/
private final Semaphore unFlushedTransactions;

/** To flush the buffers. */
private final Daemon daemon;
/** Is the {@link #daemon} running? */
private final AtomicBoolean isRunning = new AtomicBoolean(false);
/** Notify flush operations are completed by the {@link #daemon}. */
private final FlushNotifier flushNotifier;

private final OMMetadataManager omMetadataManager;

private final Consumer<TermIndex> updateLastAppliedIndex;

private final S3SecretManager s3SecretManager;

private final boolean isTracingEnabled;

private final OzoneManagerDoubleBufferMetrics metrics = OzoneManagerDoubleBufferMetrics.create();

/** Accumulative count (for testing and debug only). */
private final AtomicLong flushedTransactionCount = new AtomicLong();
/** The number of flush iterations (for testing and debug only). */
private final AtomicLong flushIterations = new AtomicLong();

private OzoneManagerDoubleBuffer(Builder b) {
this.currentBuffer = new ConcurrentLinkedQueue<>();
this.readyBuffer = new ConcurrentLinkedQueue<>();
Expand Down Expand Up @@ -613,21 +613,6 @@ public void awaitFlush() throws InterruptedException {
}

static class FlushNotifier {
static class Entry {
private final CompletableFuture<Integer> future = new CompletableFuture<>();
private int count;

private CompletableFuture<Integer> await() {
count++;
return future;
}

private int complete() {
Preconditions.assertTrue(future.complete(count));
return future.join();
}
}

/** The size of the map is at most two since it uses {@link #flushCount} + 2 in {@link #await()} .*/
private final Map<Integer, Entry> flushFutures = new TreeMap<>();
private int awaitCount;
Expand All @@ -652,5 +637,20 @@ synchronized int notifyFlush() {
LOG.debug("notifyFlush {}, awaitCount: {} -> {}", flush, await, awaitCount);
return await;
}

static class Entry {
private final CompletableFuture<Integer> future = new CompletableFuture<>();
private int count;

private CompletableFuture<Integer> await() {
count++;
return future;
}

private int complete() {
Preconditions.assertTrue(future.complete(count));
return future.join();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,6 @@ public class OzoneManagerRatisServerConfig {
)
private long logAppenderWaitTimeMin;

public long getLogAppenderWaitTimeMin() {
return logAppenderWaitTimeMin;
}

public void setLogAppenderWaitTimeMin(long logAppenderWaitTimeMin) {
this.logAppenderWaitTimeMin = logAppenderWaitTimeMin;
}

@Config(key = "retrycache.expirytime",
defaultValue = "300s",
type = ConfigType.TIME,
Expand All @@ -60,6 +52,14 @@ public void setLogAppenderWaitTimeMin(long logAppenderWaitTimeMin) {
)
private long retryCacheTimeout = Duration.ofSeconds(300).toMillis();

public long getLogAppenderWaitTimeMin() {
return logAppenderWaitTimeMin;
}

public void setLogAppenderWaitTimeMin(long logAppenderWaitTimeMin) {
this.logAppenderWaitTimeMin = logAppenderWaitTimeMin;
}

public long getRetryCacheTimeout() {
return retryCacheTimeout;
}
Expand Down
Loading