Skip to content

Commit e8dceec

Browse files
committed
use ConfigurationUtil for KafkaSourceConfig
Signed-off-by: Yupeng Fu <[email protected]>
1 parent 6fb0c1b commit e8dceec

File tree

3 files changed

+327
-5
lines changed

3 files changed

+327
-5
lines changed
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.core.util;
10+
11+
import org.opensearch.OpenSearchException;
12+
import org.opensearch.OpenSearchParseException;
13+
14+
import java.util.Map;
15+
16+
/**
17+
* Utility class for parsing configurations.
18+
*
19+
* @opensearch.internal
20+
*/
21+
public class ConfigurationUtils {
22+
23+
private ConfigurationUtils() {}
24+
25+
/**
26+
* Returns and removes the specified optional property from the specified configuration map.
27+
* <p>
28+
* If the property value isn't of type string a {@link OpenSearchParseException} is thrown.
29+
*/
30+
public static String readOptionalStringProperty(Map<String, Object> configuration, String propertyName) {
31+
Object value = configuration.get(propertyName);
32+
return readString(propertyName, value);
33+
}
34+
35+
/**
36+
* Returns and removes the specified property from the specified configuration map.
37+
* <p>
38+
* If the property value isn't of type string an {@link OpenSearchParseException} is thrown.
39+
* If the property is missing an {@link OpenSearchParseException} is thrown
40+
*/
41+
public static String readStringProperty(Map<String, Object> configuration, String propertyName) {
42+
return readStringProperty(configuration, propertyName, null);
43+
}
44+
45+
/**
46+
* Returns the specified property from the specified configuration map.
47+
* <p>
48+
* If the property value isn't of type string a {@link OpenSearchParseException} is thrown.
49+
* If the property is missing and no default value has been specified a {@link OpenSearchParseException} is thrown
50+
*/
51+
public static String readStringProperty(Map<String, Object> configuration, String propertyName, String defaultValue) {
52+
Object value = configuration.get(propertyName);
53+
if (value == null && defaultValue != null) {
54+
return defaultValue;
55+
} else if (value == null) {
56+
throw newConfigurationException(propertyName, "required property is missing");
57+
}
58+
return readString(propertyName, value);
59+
}
60+
61+
public static OpenSearchException newConfigurationException(String propertyName, String reason) {
62+
String msg;
63+
if (propertyName == null) {
64+
msg = reason;
65+
} else {
66+
msg = "[" + propertyName + "] " + reason;
67+
}
68+
OpenSearchParseException exception = new OpenSearchParseException(msg);
69+
addMetadataToException(exception, propertyName);
70+
return exception;
71+
}
72+
73+
private static String readString(String propertyName, Object value) {
74+
if (value == null) {
75+
return null;
76+
}
77+
if (value instanceof String) {
78+
return (String) value;
79+
}
80+
throw newConfigurationException(propertyName, "property isn't a string, but of type [" + value.getClass().getName() + "]");
81+
}
82+
83+
private static void addMetadataToException(OpenSearchException exception, String propertyName) {
84+
if (propertyName != null) {
85+
exception.addMetadata("opensearch.property_name", propertyName);
86+
}
87+
}
88+
89+
/**
90+
* Returns the specified property from the specified configuration map.
91+
* <p>
92+
* If the property value isn't of type string or int a {@link OpenSearchParseException} is thrown.
93+
* If the property is missing and no default value has been specified a {@link OpenSearchParseException} is thrown
94+
*/
95+
public static String readStringOrIntProperty(Map<String, Object> configuration, String propertyName, String defaultValue) {
96+
Object value = configuration.get(propertyName);
97+
if (value == null && defaultValue != null) {
98+
return defaultValue;
99+
} else if (value == null) {
100+
throw newConfigurationException(propertyName, "required property is missing");
101+
}
102+
return readStringOrInt(propertyName, value);
103+
}
104+
105+
private static String readStringOrInt(String propertyName, Object value) {
106+
if (value == null) {
107+
return null;
108+
}
109+
if (value instanceof String) {
110+
return (String) value;
111+
} else if (value instanceof Integer) {
112+
return String.valueOf(value);
113+
}
114+
throw newConfigurationException(propertyName, "property isn't a string or int, but of type [" + value.getClass().getName() + "]");
115+
}
116+
117+
/**
118+
* Returns the specified property from the specified configuration map.
119+
* <p>
120+
* If the property value isn't of type string or int a {@link OpenSearchParseException} is thrown.
121+
*/
122+
public static String readOptionalStringOrIntProperty(Map<String, Object> configuration, String propertyName) {
123+
Object value = configuration.get(propertyName);
124+
if (value == null) {
125+
return null;
126+
}
127+
return readStringOrInt(propertyName, value);
128+
}
129+
130+
public static boolean readBooleanProperty(Map<String, Object> configuration, String propertyName, boolean defaultValue) {
131+
Object value = configuration.get(propertyName);
132+
if (value == null) {
133+
return defaultValue;
134+
} else {
135+
return readBoolean(propertyName, value).booleanValue();
136+
}
137+
}
138+
139+
private static Boolean readBoolean(String propertyName, Object value) {
140+
if (value == null) {
141+
return null;
142+
}
143+
if (value instanceof Boolean) {
144+
return (boolean) value;
145+
}
146+
throw newConfigurationException(propertyName, "property isn't a boolean, but of type [" + value.getClass().getName() + "]");
147+
}
148+
149+
/**
150+
* Returns the specified property from the specified configuration map.
151+
* <p>
152+
* If the property value isn't of type int a {@link OpenSearchParseException} is thrown.
153+
* If the property is missing an {@link OpenSearchParseException} is thrown
154+
*/
155+
public static Integer readIntProperty(Map<String, Object> configuration, String propertyName, Integer defaultValue) {
156+
Object value = configuration.get(propertyName);
157+
if (value == null) {
158+
return defaultValue;
159+
}
160+
try {
161+
return Integer.parseInt(value.toString());
162+
} catch (Exception e) {
163+
throw newConfigurationException(propertyName, "property cannot be converted to an int [" + value + "]");
164+
}
165+
}
166+
167+
/**
168+
* Returns the specified property from the specified configuration map.
169+
* <p>
170+
* If the property value isn't of type int a {@link OpenSearchParseException} is thrown.
171+
* If the property is missing an {@link OpenSearchParseException} is thrown
172+
*/
173+
public static Double readDoubleProperty(Map<String, Object> configuration, String propertyName) {
174+
Object value = configuration.get(propertyName);
175+
if (value == null) {
176+
throw newConfigurationException(propertyName, "required property is missing");
177+
}
178+
try {
179+
return Double.parseDouble(value.toString());
180+
} catch (Exception e) {
181+
throw newConfigurationException(propertyName, "property cannot be converted to a double [" + value + "]");
182+
}
183+
}
184+
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.core.util;
10+
11+
import org.opensearch.OpenSearchParseException;
12+
import org.opensearch.test.OpenSearchTestCase;
13+
import org.junit.Before;
14+
15+
import java.util.Arrays;
16+
import java.util.HashMap;
17+
import java.util.Map;
18+
19+
import static org.hamcrest.Matchers.equalTo;
20+
21+
public class ConfigurationUtilsTests extends OpenSearchTestCase {
22+
private Map<String, Object> config;
23+
24+
@Before
25+
public void setConfig() {
26+
config = new HashMap<>();
27+
config.put("foo", "bar");
28+
config.put("boolVal", true);
29+
config.put("null", null);
30+
config.put("arr", Arrays.asList("1", "2", "3"));
31+
config.put("ip", "127.0.0.1");
32+
config.put("num", 1);
33+
config.put("double", 1.0);
34+
}
35+
36+
public void testReadStringProperty() {
37+
String val = ConfigurationUtils.readStringProperty(config, "foo");
38+
assertThat(val, equalTo("bar"));
39+
String val1 = ConfigurationUtils.readStringProperty(config, "foo1", "none");
40+
assertThat(val1, equalTo("none"));
41+
try {
42+
ConfigurationUtils.readStringProperty(config, "foo1", null);
43+
} catch (OpenSearchParseException e) {
44+
assertThat(e.getMessage(), equalTo("[foo1] required property is missing"));
45+
}
46+
}
47+
48+
public void testOptionalReadStringProperty() {
49+
String val = ConfigurationUtils.readOptionalStringProperty(config, "foo");
50+
assertThat(val, equalTo("bar"));
51+
String val1 = ConfigurationUtils.readOptionalStringProperty(config, "foo1");
52+
assertThat(val, equalTo("bar"));
53+
assertThat(val1, equalTo(null));
54+
}
55+
56+
public void testReadStringPropertyInvalidType() {
57+
try {
58+
ConfigurationUtils.readStringProperty(config, "arr");
59+
} catch (OpenSearchParseException e) {
60+
assertThat(e.getMessage(), equalTo("[arr] property isn't a string, but of type [java.util.Arrays$ArrayList]"));
61+
}
62+
}
63+
64+
public void testReadBooleanProperty() {
65+
Boolean val = ConfigurationUtils.readBooleanProperty(config, "boolVal", false);
66+
assertThat(val, equalTo(true));
67+
}
68+
69+
public void testReadNullBooleanProperty() {
70+
Boolean val = ConfigurationUtils.readBooleanProperty(config, "null", false);
71+
assertThat(val, equalTo(false));
72+
}
73+
74+
public void testReadBooleanPropertyInvalidType() {
75+
try {
76+
ConfigurationUtils.readBooleanProperty(config, "arr", true);
77+
} catch (OpenSearchParseException e) {
78+
assertThat(e.getMessage(), equalTo("[arr] property isn't a boolean, but of type [java.util.Arrays$ArrayList]"));
79+
}
80+
}
81+
82+
public void testReadStringOrIntProperty() {
83+
String val1 = ConfigurationUtils.readStringOrIntProperty(config, "foo", null);
84+
String val2 = ConfigurationUtils.readStringOrIntProperty(config, "num", null);
85+
assertThat(val1, equalTo("bar"));
86+
assertThat(val2, equalTo("1"));
87+
}
88+
89+
public void testOptionalReadStringOrIntProperty() {
90+
String val1 = ConfigurationUtils.readOptionalStringOrIntProperty(config, "foo");
91+
String val2 = ConfigurationUtils.readOptionalStringOrIntProperty(config, "num");
92+
String val3 = ConfigurationUtils.readOptionalStringOrIntProperty(config, "num1");
93+
assertThat(val1, equalTo("bar"));
94+
assertThat(val2, equalTo("1"));
95+
assertThat(val3, equalTo(null));
96+
}
97+
98+
public void testReadIntProperty() {
99+
int val = ConfigurationUtils.readIntProperty(config, "num", null);
100+
assertThat(val, equalTo(1));
101+
try {
102+
ConfigurationUtils.readIntProperty(config, "foo", 2);
103+
} catch (OpenSearchParseException e) {
104+
assertThat(e.getMessage(), equalTo("[foo] property cannot be converted to an int [bar]"));
105+
}
106+
try {
107+
ConfigurationUtils.readIntProperty(config, "foo1", 2);
108+
} catch (OpenSearchParseException e) {
109+
assertThat(e.getMessage(), equalTo("required property is missing"));
110+
}
111+
}
112+
113+
public void testReadDoubleProperty() {
114+
double val = ConfigurationUtils.readDoubleProperty(config, "double");
115+
assertThat(val, equalTo(1.0));
116+
try {
117+
ConfigurationUtils.readDoubleProperty(config, "foo");
118+
} catch (OpenSearchParseException e) {
119+
assertThat(e.getMessage(), equalTo("[foo] property cannot be converted to a double [bar]"));
120+
}
121+
try {
122+
ConfigurationUtils.readDoubleProperty(config, "foo1");
123+
} catch (OpenSearchParseException e) {
124+
assertThat(e.getMessage(), equalTo("[foo1] required property is missing"));
125+
}
126+
}
127+
128+
public void testReadStringOrIntPropertyInvalidType() {
129+
try {
130+
ConfigurationUtils.readStringOrIntProperty(config, "arr", null);
131+
} catch (OpenSearchParseException e) {
132+
assertThat(e.getMessage(), equalTo("[arr] property isn't a string or int, but of type [java.util.Arrays$ArrayList]"));
133+
}
134+
}
135+
136+
}

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,17 @@
88

99
package org.opensearch.plugin.kafka;
1010

11+
import org.opensearch.core.util.ConfigurationUtils;
12+
1113
import java.util.Map;
12-
import java.util.Objects;
1314

1415
/**
1516
* Class encapsulating the configuration of a Kafka source.
1617
*/
1718
public class KafkaSourceConfig {
19+
private final String PROP_TOPIC = "topic";
20+
private final String PROP_BOOTSTRAP_SERVERS = "bootstrap_servers";
21+
1822
private final String topic;
1923
private final String bootstrapServers;
2024

@@ -23,10 +27,8 @@ public class KafkaSourceConfig {
2327
* @param params the configuration parameters
2428
*/
2529
public KafkaSourceConfig(Map<String, Object> params) {
26-
// TODO: better parsing and validation
27-
this.topic = (String) Objects.requireNonNull(params.get("topic"));
28-
this.bootstrapServers = (String) Objects.requireNonNull(params.get("bootstrap_servers"));
29-
assert this.bootstrapServers != null;
30+
this.topic = ConfigurationUtils.readStringProperty(params, PROP_TOPIC);
31+
this.bootstrapServers = ConfigurationUtils.readStringProperty(params, PROP_BOOTSTRAP_SERVERS);
3032
}
3133

3234
/**

0 commit comments

Comments
 (0)