Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ private void setVolumeAcl(ObjectStore objectStore, String volumeName,
private void checkUser(MiniOzoneCluster cluster, UserGroupInformation user,
List<String> expectVol, boolean expectListAllSuccess) throws IOException {

UserGroupInformation.setLoginUser(user);
OzoneClient client = cluster.getClient();
ObjectStore objectStore = client.getObjectStore();

Expand Down Expand Up @@ -190,6 +189,59 @@ private void checkUser(MiniOzoneCluster cluster, UserGroupInformation user,
}
}

/**
* Check if listVolume of other users than the login user works as expected.
* ozone.om.volume.listall.allowed = true
* Everyone should be able to list other users' volumes with this config.
*/
@Test
public void testListVolumeWithOtherUsersListAllAllowed() throws Exception {
// ozone.acl.enabled = true, ozone.om.volume.listall.allowed = true
MiniOzoneCluster cluster = startCluster(true, true);

// Login as user1, list other users' volumes
UserGroupInformation.setLoginUser(user1);
checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
true);
checkUser(cluster, adminUser, Arrays.asList("volume1", "volume2", "volume3",
"volume4", "volume5"), true);

UserGroupInformation.setLoginUser(user2);
checkUser(cluster, user1, Arrays.asList("volume1", "volume4", "volume5"),
true);
checkUser(cluster, adminUser, Arrays.asList("volume1", "volume2", "volume3",
"volume4", "volume5"), true);

stopCluster(cluster);
}

/**
* Check if listVolume of other users than the login user works as expected.
* ozone.om.volume.listall.allowed = false
* Only admin should be able to list other users' volumes with this config.
*/
@Test
public void testListVolumeWithOtherUsersListAllDisallowed() throws Exception {
// ozone.acl.enabled = true, ozone.om.volume.listall.allowed = false
MiniOzoneCluster cluster = startCluster(true, false);

// Login as user1, list other users' volumes, expect failure
UserGroupInformation.setLoginUser(user1);
checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
false);
checkUser(cluster, adminUser, Arrays.asList("volume1", "volume2", "volume3",
"volume4", "volume5"), false);

// While admin should be able to list volumes just fine.
UserGroupInformation.setLoginUser(adminUser);
checkUser(cluster, user1, Arrays.asList("volume1", "volume4", "volume5"),
true);
checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
true);

stopCluster(cluster);
}

@Test
public void testAclEnabledListAllAllowed() throws Exception {
// ozone.acl.enabled = true, ozone.om.volume.listall.allowed = true
Expand All @@ -207,10 +259,15 @@ public void testAclEnabledListAllAllowed() throws Exception {
public void testAclEnabledListAllDisallowed() throws Exception {
// ozone.acl.enabled = true, ozone.om.volume.listall.allowed = false
MiniOzoneCluster cluster = startCluster(true, false);
// The default user is adminUser as set in init(),
// listall always succeeds if we use that UGI, we should use non-admin here
UserGroupInformation.setLoginUser(user1);
checkUser(cluster, user1, Arrays.asList("volume1", "volume4", "volume5"),
false);
UserGroupInformation.setLoginUser(user2);
checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
false);
UserGroupInformation.setLoginUser(adminUser);
checkUser(cluster, adminUser, Arrays.asList("volume1", "volume2", "volume3",
"volume4", "volume5"), true);
stopCluster(cluster);
Expand All @@ -231,8 +288,11 @@ public void testAclDisabledListAllAllowed() throws Exception {
public void testAclDisabledListAllDisallowed() throws Exception {
// ozone.acl.enabled = false, ozone.om.volume.listall.allowed = false
MiniOzoneCluster cluster = startCluster(false, false);
// If ACL is disabled, all permission checks are disabled in Ozone by design
UserGroupInformation.setLoginUser(user1);
checkUser(cluster, user1, Arrays.asList("volume1", "volume3", "volume5"),
true);
UserGroupInformation.setLoginUser(user2);
checkUser(cluster, user2, Arrays.asList("volume2", "volume4"),
true); // listall will succeed since acl is disabled
stopCluster(cluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1583,11 +1583,11 @@ private void checkAcls(ResourceType resType, StoreType store,
*
* @return true if permission granted, false if permission denied.
*/
private boolean hasAcls(ResourceType resType, StoreType store,
ACLType acl, String vol, String bucket, String key) {
private boolean hasAcls(String userName, ResourceType resType,
StoreType store, ACLType acl, String vol, String bucket, String key) {
try {
return checkAcls(resType, store, acl, vol, bucket, key,
ProtobufRpcEngine.Server.getRemoteUser(),
UserGroupInformation.createRemoteUser(userName),
ProtobufRpcEngine.Server.getRemoteIp(),
ProtobufRpcEngine.Server.getRemoteIp().getHostName(),
false);
Expand Down Expand Up @@ -1849,8 +1849,8 @@ public List<OmVolumeArgs> listVolumeByUser(String userName, String prefix,
List<OmVolumeArgs> result = new ArrayList<>();
// Filter all volumes by LIST ACL
for (OmVolumeArgs volumeArgs : listAllVolumes) {
if (hasAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.LIST,
volumeArgs.getVolume(), null, null)) {
if (hasAcls(userName, ResourceType.VOLUME, StoreType.OZONE,
ACLType.LIST, volumeArgs.getVolume(), null, null)) {
result.add(volumeArgs);
}
}
Expand Down