Skip to content

Commit

Permalink
[fix][fn] Prevent create state table from API calls for non-exists in…
Browse files Browse the repository at this point in the history
…stances (#22107)
  • Loading branch information
freeznet authored and Technoboy- committed Feb 25, 2024
1 parent f01a3a5 commit 603f17b
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,12 @@ public FunctionState getFunctionState(final String tenant,
throw new RestException(Status.BAD_REQUEST, e.getMessage());
}

FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
log.warn("getFunctionState does not exist @ /{}/{}/{}", tenant, namespace, functionName);
throw new RestException(Status.NOT_FOUND, String.format("'%s' is not found", functionName));
}

try {
DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, functionName);
StateValue value = store.getStateValue(key);
Expand Down Expand Up @@ -1219,6 +1225,12 @@ public void putFunctionState(final String tenant,
throw new RestException(Status.BAD_REQUEST, e.getMessage());
}

FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
log.warn("putFunctionState does not exist @ /{}/{}/{}", tenant, namespace, functionName);
throw new RestException(Status.NOT_FOUND, String.format("'%s' is not found", functionName));
}

try {
DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, functionName);
ByteBuffer data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ public void testSourceState() throws Exception {
assertEquals(e.getStatusCode(), 404);
}

// query a non-exist instance should get a 404 error
{
PulsarAdminException e = expectThrows(PulsarAdminException.class, () -> {
admin.functions().getFunctionState("public", "default", "non-exist", "non-exist");
});
assertEquals(e.getStatusCode(), 404);
}

Awaitility.await().ignoreExceptions().untilAsserted(() -> {
FunctionState functionState = admin.functions().getFunctionState("public", "default", sourceName, "now");
assertTrue(functionState.getStringValue().matches("val1-.*"));
Expand Down Expand Up @@ -204,6 +212,14 @@ public void testSinkState() throws Exception {
assertEquals(e.getStatusCode(), 404);
}

// query a non-exist instance should get a 404 error
{
PulsarAdminException e = expectThrows(PulsarAdminException.class, () -> {
admin.functions().getFunctionState("public", "default", "non-exist", "non-exist");
});
assertEquals(e.getStatusCode(), 404);
}

for (int i = 0; i < numMessages; i++) {
producer.send("foo");
}
Expand All @@ -226,6 +242,20 @@ public void testSinkState() throws Exception {
getSinkInfoNotFound(sinkName);
}

@Test(groups = {"python_state", "state", "function", "python_function"})
public void testNonExistFunction() throws Exception {
String functionName = "non-exist-function-" + randomName(8);
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
// query a non-exist instance should get a 404 error
{
PulsarAdminException e = expectThrows(PulsarAdminException.class, () -> {
admin.functions().getFunctionState("public", "default", functionName, "non-exist");
});
assertEquals(e.getStatusCode(), 404);
}
}
}

@Test(groups = {"java_state", "state", "function", "java_function"})
public void testBytes2StringNotUTF8() {
byte[] valueBytes = Base64.getDecoder().decode(VALUE_BASE64);
Expand Down

0 comments on commit 603f17b

Please sign in to comment.