Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class TestRequestAttributes {
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY5, addRandomRequestAttributes());
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY6, addRandomRequestAttributes());
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY7, addRandomRequestAttributes());
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY8, new HashMap<>());
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY8, new HashMap<String, byte[]>());
}
private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(100);
private static final byte[] FAMILY = Bytes.toBytes("0");
Expand Down Expand Up @@ -316,20 +316,26 @@ public Optional<RegionObserver> getRegionObserver() {
@Override
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
List<Cell> result) throws IOException {
validateRequestAttributes(getRequestAttributesForRowKey(get.getRow()));
if (!isValidRequestAttributes(getRequestAttributesForRowKey(get.getRow()))) {
throw new IOException("Incorrect request attributes");
}
}

@Override
public boolean preScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
validateRequestAttributes(REQUEST_ATTRIBUTES_SCAN);
if (!isValidRequestAttributes(REQUEST_ATTRIBUTES_SCAN)) {
throw new IOException("Incorrect request attributes");
}
return hasNext;
}

@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit)
throws IOException {
validateRequestAttributes(getRequestAttributesForRowKey(put.getRow()));
if (!isValidRequestAttributes(getRequestAttributesForRowKey(put.getRow()))) {
throw new IOException("Incorrect request attributes");
}
}

private Map<String, byte[]> getRequestAttributesForRowKey(byte[] rowKey) {
Expand All @@ -341,20 +347,21 @@ private Map<String, byte[]> getRequestAttributesForRowKey(byte[] rowKey) {
return null;
}

private void validateRequestAttributes(Map<String, byte[]> requestAttributes) {
private boolean isValidRequestAttributes(Map<String, byte[]> requestAttributes) {
RpcCall rpcCall = RpcServer.getCurrentCall().get();
Map<String, byte[]> attrs = rpcCall.getRequestAttributes();
if (attrs.size() != requestAttributes.size()) {
return;
return false;
}
for (Map.Entry<String, byte[]> attr : attrs.entrySet()) {
if (!requestAttributes.containsKey(attr.getKey())) {
return;
return false;
}
if (!Arrays.equals(requestAttributes.get(attr.getKey()), attr.getValue())) {
return;
return false;
}
}
return true;
}
}
}