diff --git a/common/build.gradle b/common/build.gradle index 533fccd9b2e..369a649cde0 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -35,7 +35,7 @@ dependencies { api "org.antlr:antlr4-runtime:4.7.1" api group: 'com.google.guava', name: 'guava', version: '31.0.1-jre' api group: 'org.apache.logging.log4j', name: 'log4j-core', version:'2.17.1' - api group: 'org.apache.commons', name: 'commons-lang3', version: '3.10' + api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' testImplementation group: 'junit', name: 'junit', version: '4.13.2' testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.9.1' diff --git a/core/build.gradle b/core/build.gradle index 0b8ffc422cf..04ee45da772 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -37,12 +37,14 @@ dependencies { api group: 'com.google.guava', name: 'guava', version: '31.0.1-jre' api group: 'org.springframework', name: 'spring-context', version: "${spring_version}" api group: 'org.springframework', name: 'spring-beans', version: "${spring_version}" - api group: 'org.apache.commons', name: 'commons-lang3', version: '3.10' + api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' api group: 'com.facebook.presto', name: 'presto-matching', version: '0.240' api group: 'org.apache.commons', name: 'commons-math3', version: '3.6.1' api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}" api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}" + api group: 'com.google.code.gson', name: 'gson', version: '2.8.9' + implementation 'com.amazonaws:aws-encryption-sdk-java:2.4.0' api project(':common') testImplementation('org.junit.jupiter:junit-jupiter:5.6.2') diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index 228b54ba0ca..45d139018b1 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -64,7 +64,7 @@ import org.opensearch.sql.data.model.ExprMissingValue; import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.datasource.DataSourceService; -import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.exception.SemanticCheckException; import org.opensearch.sql.expression.DSL; import org.opensearch.sql.expression.Expression; @@ -134,9 +134,9 @@ public LogicalPlan analyze(UnresolvedPlan unresolved, AnalysisContext context) { @Override public LogicalPlan visitRelation(Relation node, AnalysisContext context) { QualifiedName qualifiedName = node.getTableQualifiedName(); - Set allowedDataSourceNames = dataSourceService.getDataSources() + Set allowedDataSourceNames = dataSourceService.getMaskedDataSourceMetadataInfo() .stream() - .map(DataSource::getName) + .map(DataSourceMetadata::getName) .collect(Collectors.toSet()); DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver = new DataSourceSchemaIdentifierNameResolver(qualifiedName.getParts(), @@ -182,9 +182,9 @@ public LogicalPlan visitRelationSubquery(RelationSubquery node, AnalysisContext @Override public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext context) { QualifiedName qualifiedName = node.getFunctionName(); - Set allowedDataSourceNames = dataSourceService.getDataSources() + Set allowedDataSourceNames = dataSourceService.getMaskedDataSourceMetadataInfo() .stream() - .map(DataSource::getName) + .map(DataSourceMetadata::getName) .collect(Collectors.toSet()); DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver = new DataSourceSchemaIdentifierNameResolver(qualifiedName.getParts(), diff --git a/core/src/main/java/org/opensearch/sql/datasource/DataSourceMetadataStorage.java b/core/src/main/java/org/opensearch/sql/datasource/DataSourceMetadataStorage.java new file mode 100644 index 00000000000..59837c198a2 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/datasource/DataSourceMetadataStorage.java @@ -0,0 +1,25 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.datasource; + +import java.util.List; +import java.util.Optional; +import org.opensearch.sql.datasource.model.DataSourceMetadata; + +/** + * Interface for DataSourceMetadata Storage. + */ +public interface DataSourceMetadataStorage { + + List getDataSourceMetadata(); + + Optional getDataSourceMetadata(String datasourceName); + + void createDataSourceMetadata(DataSourceMetadata dataSourceMetadata); + +} diff --git a/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java b/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java index 37e6f8e085a..f8429e796ec 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java +++ b/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java @@ -19,7 +19,7 @@ public interface DataSourceService { * * @return set of {@link DataSource}. */ - Set getDataSources(); + Set getMaskedDataSourceMetadataInfo(); /** * Returns {@link DataSource} with corresponding to the DataSource name. @@ -36,6 +36,7 @@ public interface DataSourceService { */ void addDataSource(DataSourceMetadata... metadatas); + /** * remove all the registered {@link DataSource}. */ diff --git a/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java b/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java index 274024e548e..cfe258002e8 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java +++ b/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java @@ -5,14 +5,16 @@ package org.opensearch.sql.datasource; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; +import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; + +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.HashSet; import java.util.Map; -import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -import org.opensearch.sql.common.utils.StringUtils; import org.opensearch.sql.datasource.model.DataSource; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; @@ -28,43 +30,63 @@ */ public class DataSourceServiceImpl implements DataSourceService { - private static String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*"; - - private final ConcurrentHashMap dataSourceMap; + private final ConcurrentHashMap dataSourceMap; private final Map dataSourceFactoryMap; + private final DataSourceMetadataStorage dataSourceMetadataStorage; + + private final String clusterName; + /** * Construct from the set of {@link DataSourceFactory} at bootstrap time. */ - public DataSourceServiceImpl(Set dataSourceFactories) { + public DataSourceServiceImpl(DataSourceMetadataStorage dataSourceMetadataStorage, + Set dataSourceFactories, + String clusterName) { + this.dataSourceMetadataStorage = dataSourceMetadataStorage; dataSourceFactoryMap = dataSourceFactories.stream() .collect(Collectors.toMap(DataSourceFactory::getDataSourceType, f -> f)); dataSourceMap = new ConcurrentHashMap<>(); + this.clusterName = clusterName; + } @Override - public Set getDataSources() { - return Set.copyOf(dataSourceMap.values()); + public Set getMaskedDataSourceMetadataInfo() { + return new HashSet<>(this.dataSourceMetadataStorage.getDataSourceMetadata()); } @Override public DataSource getDataSource(String dataSourceName) { - if (!dataSourceMap.containsKey(dataSourceName)) { - throw new IllegalArgumentException( - String.format("DataSource with name %s doesn't exist.", dataSourceName)); + Optional dataSourceMetadataOptional + = this.dataSourceMetadataStorage.getDataSourceMetadata(dataSourceName); + if (dataSourceMetadataOptional.isEmpty()) { + if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) { + return dataSourceMap.get(DataSourceMetadata.defaultOpenSearchDataSourceMetadata()); + } else { + throw new IllegalArgumentException( + String.format("DataSource with name %s doesn't exist.", dataSourceName)); + } + } else if (!dataSourceMap.containsKey(dataSourceMetadataOptional.get())) { + clearDataSource(dataSourceMetadataOptional.get()); + addDataSource(dataSourceMetadataOptional.get()); } - return dataSourceMap.get(dataSourceName); + return dataSourceMap.get(dataSourceMetadataOptional.get()); + } + + private void clearDataSource(DataSourceMetadata dataSourceMetadata) { + dataSourceMap.entrySet() + .removeIf(entry -> entry.getKey().getName().equals(dataSourceMetadata.getName())); } @Override public void addDataSource(DataSourceMetadata... metadatas) { for (DataSourceMetadata metadata : metadatas) { - validateDataSourceMetaData(metadata); - dataSourceMap.put( - metadata.getName(), - dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata)); + AccessController.doPrivileged((PrivilegedAction) () -> dataSourceMap.put( + metadata, + dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata, clusterName))); } } @@ -72,28 +94,4 @@ public void addDataSource(DataSourceMetadata... metadatas) { public void clear() { dataSourceMap.clear(); } - - /** - * This can be moved to a different validator class when we introduce more connectors. - * - * @param metadata {@link DataSourceMetadata}. - */ - private void validateDataSourceMetaData(DataSourceMetadata metadata) { - Preconditions.checkArgument( - !Strings.isNullOrEmpty(metadata.getName()), - "Missing Name Field from a DataSource. Name is a required parameter."); - Preconditions.checkArgument( - !dataSourceMap.containsKey(metadata.getName()), - StringUtils.format( - "Datasource name should be unique, Duplicate datasource found %s.", - metadata.getName())); - Preconditions.checkArgument( - metadata.getName().matches(DATASOURCE_NAME_REGEX), - StringUtils.format( - "DataSource Name: %s contains illegal characters. Allowed characters: a-zA-Z0-9_-*@.", - metadata.getName())); - Preconditions.checkArgument( - !Objects.isNull(metadata.getProperties()), - "Missing properties field in catalog configuration. Properties are required parameters."); - } } diff --git a/core/src/main/java/org/opensearch/sql/datasource/encryptor/CredentialInfoEncryptor.java b/core/src/main/java/org/opensearch/sql/datasource/encryptor/CredentialInfoEncryptor.java new file mode 100644 index 00000000000..d71c0b15e50 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/datasource/encryptor/CredentialInfoEncryptor.java @@ -0,0 +1,16 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.datasource.encryptor; + +public interface CredentialInfoEncryptor { + + String encrypt(String plainText); + + String decrypt(String encryptedText); + +} diff --git a/core/src/main/java/org/opensearch/sql/datasource/encryptor/CredentialInfoEncryptorImpl.java b/core/src/main/java/org/opensearch/sql/datasource/encryptor/CredentialInfoEncryptorImpl.java new file mode 100644 index 00000000000..512df77cfc6 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/datasource/encryptor/CredentialInfoEncryptorImpl.java @@ -0,0 +1,55 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.datasource.encryptor; + +import com.amazonaws.encryptionsdk.AwsCrypto; +import com.amazonaws.encryptionsdk.CommitmentPolicy; +import com.amazonaws.encryptionsdk.CryptoResult; +import com.amazonaws.encryptionsdk.jce.JceMasterKey; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import javax.crypto.spec.SecretKeySpec; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public class CredentialInfoEncryptorImpl implements CredentialInfoEncryptor { + + private final String masterKey; + + @Override + public String encrypt(String plainText) { + + final AwsCrypto crypto = AwsCrypto.builder() + .withCommitmentPolicy(CommitmentPolicy.RequireEncryptRequireDecrypt) + .build(); + + JceMasterKey jceMasterKey + = JceMasterKey.getInstance(new SecretKeySpec(masterKey.getBytes(), "AES"), "Custom", "", + "AES/GCM/NoPadding"); + + final CryptoResult encryptResult = crypto.encryptData(jceMasterKey, + plainText.getBytes(StandardCharsets.UTF_8)); + return Base64.getEncoder().encodeToString(encryptResult.getResult()); + } + + @Override + public String decrypt(String encryptedText) { + final AwsCrypto crypto = AwsCrypto.builder() + .withCommitmentPolicy(CommitmentPolicy.RequireEncryptRequireDecrypt) + .build(); + + JceMasterKey jceMasterKey + = JceMasterKey.getInstance(new SecretKeySpec(masterKey.getBytes(), "AES"), "Custom", "", + "AES/GCM/NoPadding"); + + final CryptoResult decryptedResult + = crypto.decryptData(jceMasterKey, Base64.getDecoder().decode(encryptedText)); + return new String(decryptedResult.getResult()); + } + +} diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScan.java b/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScan.java index 14cd09e1629..786a1c49240 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScan.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScan.java @@ -18,7 +18,7 @@ import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.model.ExprValueUtils; import org.opensearch.sql.datasource.DataSourceService; -import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.storage.TableScanOperator; /** @@ -47,14 +47,15 @@ public String explain() { @Override public void open() { List exprValues = new ArrayList<>(); - Set dataSources = dataSourceService.getDataSources(); - for (DataSource dataSource : dataSources) { + Set dataSourceMetadataSet + = dataSourceService.getMaskedDataSourceMetadataInfo(); + for (DataSourceMetadata dataSourceMetadata : dataSourceMetadataSet) { exprValues.add( new ExprTupleValue(new LinkedHashMap<>(ImmutableMap.of( "DATASOURCE_NAME", - ExprValueUtils.stringValue(dataSource.getName()), + ExprValueUtils.stringValue(dataSourceMetadata.getName()), "CONNECTOR_TYPE", - ExprValueUtils.stringValue(dataSource.getConnectorType().name()))))); + ExprValueUtils.stringValue(dataSourceMetadata.getConnector().name()))))); } iterator = exprValues.iterator(); } diff --git a/core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java b/core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java index 20d263e601f..7f64d44c959 100644 --- a/core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java +++ b/core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java @@ -27,5 +27,5 @@ public interface DataSourceFactory { /** * Create {@link DataSource}. */ - DataSource createDataSource(DataSourceMetadata metadata); + DataSource createDataSource(DataSourceMetadata metadata, String clusterName); } diff --git a/integ-test/build.gradle b/integ-test/build.gradle index b792ecb18f5..540bd666b36 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -75,6 +75,8 @@ configurations.all { resolutionStrategy.force "org.apache.httpcomponents:httpcore:4.4.13" resolutionStrategy.force "joda-time:joda-time:2.10.12" resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36" + resolutionStrategy.force "org.apache.httpcomponents:httpcore:4.4.15" + resolutionStrategy.force "org.apache.httpcomponents:httpclient:4.5.13" } dependencies { diff --git a/legacy/build.gradle b/legacy/build.gradle index db9d6138f06..d3ee13370ee 100644 --- a/legacy/build.gradle +++ b/legacy/build.gradle @@ -90,7 +90,7 @@ dependencies { } implementation group: 'com.google.guava', name: 'guava', version: '31.0.1-jre' implementation group: 'org.json', name: 'json', version:'20180813' - implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.10' + implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}" // add geo module as dependency. https://github.com/opensearch-project/OpenSearch/pull/4180/. implementation group: 'org.opensearch.plugin', name: 'geo', version: "${opensearch_version}" diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactory.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactory.java index 011f6236fb1..c8bf9f452b3 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactory.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactory.java @@ -27,7 +27,7 @@ public DataSourceType getDataSourceType() { } @Override - public DataSource createDataSource(DataSourceMetadata metadata) { + public DataSource createDataSource(DataSourceMetadata metadata, String clusterName) { return new DataSource(metadata.getName(), DataSourceType.OPENSEARCH, new OpenSearchStorageEngine(client, settings)); } diff --git a/plugin/build.gradle b/plugin/build.gradle index 4e9093cbb2e..2b66abf6a37 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -98,6 +98,8 @@ configurations.all { resolutionStrategy.force "com.squareup.okhttp3:okhttp:4.9.3" resolutionStrategy.force "joda-time:joda-time:2.10.12" resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36" + resolutionStrategy.force "org.apache.httpcomponents:httpcore:4.4.15" + resolutionStrategy.force "org.apache.httpcomponents:httpclient:4.5.13" } compileJava { options.compilerArgs.addAll(["-processor", 'lombok.launch.AnnotationProcessorHider$AnnotationProcessor']) @@ -112,6 +114,7 @@ dependencies { api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}" api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}" + implementation group: 'commons-io', name: 'commons-io', version: '2.8.0' api project(":ppl") api project(':legacy') diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index fab14966d8d..63aa5060b8b 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -51,8 +51,11 @@ import org.opensearch.script.ScriptContext; import org.opensearch.script.ScriptEngine; import org.opensearch.script.ScriptService; +import org.opensearch.sql.datasource.DataSourceMetadataStorage; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.DataSourceServiceImpl; +import org.opensearch.sql.datasource.encryptor.CredentialInfoEncryptor; +import org.opensearch.sql.datasource.encryptor.CredentialInfoEncryptorImpl; import org.opensearch.sql.datasource.model.DataSource; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.legacy.esdomain.LocalClusterState; @@ -68,13 +71,19 @@ import org.opensearch.sql.opensearch.storage.script.ExpressionScriptEngine; import org.opensearch.sql.opensearch.storage.serialization.DefaultExpressionSerializer; import org.opensearch.sql.plugin.config.OpenSearchPluginConfig; +import org.opensearch.sql.plugin.datasource.OpenSearchDataSourceMetadataStorage; import org.opensearch.sql.plugin.datasource.DataSourceSettings; import org.opensearch.sql.plugin.rest.RestPPLQueryAction; import org.opensearch.sql.plugin.rest.RestPPLStatsAction; import org.opensearch.sql.plugin.rest.RestQuerySettingsAction; +import org.opensearch.sql.plugin.rest.datasource.RestDataSourceQueryAction; import org.opensearch.sql.plugin.transport.PPLQueryAction; import org.opensearch.sql.plugin.transport.TransportPPLQueryAction; import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; +import org.opensearch.sql.plugin.transport.datasource.TransportCreateDataSourceAction; +import org.opensearch.sql.plugin.transport.datasource.TransportGetDataSourceAction; +import org.opensearch.sql.plugin.transport.datasource.model.CreateDataSourceActionResponse; +import org.opensearch.sql.plugin.transport.datasource.model.GetDataSourceActionResponse; import org.opensearch.sql.ppl.config.PPLServiceConfig; import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory; import org.opensearch.sql.sql.config.SQLServiceConfig; @@ -102,6 +111,10 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, Rel private DataSourceService dataSourceService; + private DataSourceMetadataStorage dataSourceMetadataStorage; + + private CredentialInfoEncryptor credentialInfoEncryptor; + public String name() { return "sql"; } @@ -130,7 +143,8 @@ public List getRestHandlers( new RestSqlAction(settings, applicationContext), new RestSqlStatsAction(settings, restController), new RestPPLStatsAction(settings, restController), - new RestQuerySettingsAction(settings, restController)); + new RestQuerySettingsAction(settings, restController), + new RestDataSourceQueryAction()); } /** @@ -141,7 +155,11 @@ public List getRestHandlers( return Arrays.asList( new ActionHandler<>( new ActionType<>(PPLQueryAction.NAME, TransportPPLQueryResponse::new), - TransportPPLQueryAction.class)); + TransportPPLQueryAction.class), + new ActionHandler<>(new ActionType<>(TransportCreateDataSourceAction.NAME, + CreateDataSourceActionResponse::new), TransportCreateDataSourceAction.class), + new ActionHandler<>(new ActionType<>(TransportGetDataSourceAction.NAME, + GetDataSourceActionResponse::new), TransportGetDataSourceAction.class)); } @Override @@ -160,15 +178,19 @@ public Collection createComponents( this.clusterService = clusterService; this.pluginSettings = new OpenSearchSettings(clusterService.getClusterSettings()); this.client = (NodeClient) client; + String masterKey = DataSourceSettings + .DATASOURCE_MASTER_SECRET_KEY.get(clusterService.getSettings()); + this.credentialInfoEncryptor = new CredentialInfoEncryptorImpl(masterKey); + this.dataSourceMetadataStorage + = new OpenSearchDataSourceMetadataStorage(client, clusterService, credentialInfoEncryptor); this.dataSourceService = - new DataSourceServiceImpl( + new DataSourceServiceImpl(dataSourceMetadataStorage, new ImmutableSet.Builder() .add(new OpenSearchDataSourceFactory( new OpenSearchNodeClient(this.client), pluginSettings)) .add(new PrometheusStorageFactory()) - .build()); + .build(), client.settings().get("cluster.name", "opensearch")); dataSourceService.addDataSource(defaultOpenSearchDataSourceMetadata()); - loadDataSources(dataSourceService, clusterService.getSettings()); LocalClusterState.state().setClusterService(clusterService); LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings); @@ -181,6 +203,10 @@ public Collection createComponents( org.opensearch.sql.common.setting.Settings.class, () -> pluginSettings); applicationContext.registerBean( DataSourceService.class, () -> dataSourceService); + applicationContext.registerBean( + DataSourceMetadataStorage.class, () -> dataSourceMetadataStorage); + applicationContext.registerBean( + CredentialInfoEncryptor.class, () -> credentialInfoEncryptor); applicationContext.register(OpenSearchPluginConfig.class); applicationContext.register(PPLServiceConfig.class); applicationContext.register(SQLServiceConfig.class); @@ -210,6 +236,7 @@ public List> getSettings() { .addAll(LegacyOpenDistroSettings.legacySettings()) .addAll(OpenSearchSettings.pluginSettings()) .add(DataSourceSettings.DATASOURCE_CONFIG) + .add(DataSourceSettings.DATASOURCE_MASTER_SECRET_KEY) .build(); } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceSettings.java b/plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceSettings.java index 9a3466df459..a451ad30be9 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceSettings.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceSettings.java @@ -14,4 +14,10 @@ public class DataSourceSettings { public static final Setting DATASOURCE_CONFIG = SecureSetting.secureFile( "plugins.query.federation.datasources.config", null); + + public static final Setting DATASOURCE_MASTER_SECRET_KEY = Setting.simpleString( + "plugins.query.datasources.encryption.masterkey", + "0000000000000000", + Setting.Property.NodeScope, + Setting.Property.Dynamic); } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/datasource/OpenSearchDataSourceMetadataStorage.java b/plugin/src/main/java/org/opensearch/sql/plugin/datasource/OpenSearchDataSourceMetadataStorage.java new file mode 100644 index 00000000000..4ed20679589 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/datasource/OpenSearchDataSourceMetadataStorage.java @@ -0,0 +1,242 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.datasource; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.commons.io.IOUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionFuture; +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.SearchHit; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.sql.common.utils.StringUtils; +import org.opensearch.sql.datasource.DataSourceMetadataStorage; +import org.opensearch.sql.datasource.encryptor.CredentialInfoEncryptor; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.auth.AuthenticationType; +import org.opensearch.sql.opensearch.security.SecurityAccess; + +public class OpenSearchDataSourceMetadataStorage implements DataSourceMetadataStorage { + + private static final String DATASOURCE_INDEX_NAME = ".ql-datasources"; + private static final String DATASOURCE_INDEX_MAPPING_FILE_NAME = "datasources-index-mapping.yml"; + private static final String DATASOURCE_INDEX_SETTINGS_FILE_NAME + = "datasources-index-settings.yml"; + private static String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*"; + private static final Logger LOG = LogManager.getLogger(); + + private final Client client; + private final ClusterService clusterService; + private final CredentialInfoEncryptor credentialInfoEncryptor; + + public OpenSearchDataSourceMetadataStorage(Client client, ClusterService clusterService, + CredentialInfoEncryptor credentialInfoEncryptor) { + this.client = client; + this.clusterService = clusterService; + this.credentialInfoEncryptor = credentialInfoEncryptor; + } + + @Override + public List getDataSourceMetadata() { + if (!this.clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) { + createDataSourcesIndex(); + } + return searchInElasticSearch(QueryBuilders.matchAllQuery()); + } + + @Override + public Optional getDataSourceMetadata(String datasourceName) { + if (!this.clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) { + createDataSourcesIndex(); + } + return searchInElasticSearch(QueryBuilders.termQuery("name", datasourceName)) + .stream() + .findFirst() + .map(this::decryptConfidentialInfo); + } + + private DataSourceMetadata decryptConfidentialInfo(DataSourceMetadata dataSourceMetadata) { + Map propertiesMap = dataSourceMetadata.getProperties(); + Optional authTypeOptional + = propertiesMap.keySet().stream().filter(s -> s.endsWith("auth.type")) + .findFirst() + .map(propertiesMap::get) + .map(AuthenticationType::get); + switch (authTypeOptional.get()) { + case BASICAUTH: + Optional passwordKey = propertiesMap.keySet().stream() + .filter(s -> s.endsWith("auth.password")) + .findFirst(); + propertiesMap.put(passwordKey.get(), + this.credentialInfoEncryptor.decrypt(propertiesMap.get(passwordKey.get()))); + break; + case AWSSIGV4AUTH: + Optional accessKey = propertiesMap.keySet().stream() + .filter(s -> s.endsWith("auth.access_key")) + .findFirst(); + propertiesMap.put(accessKey.get(), + this.credentialInfoEncryptor.decrypt(propertiesMap.get(accessKey.get()))); + Optional secretKey = propertiesMap.keySet().stream() + .filter(s -> s.endsWith("auth.secret_key")) + .findFirst(); + propertiesMap.put(secretKey.get(), + this.credentialInfoEncryptor.decrypt(propertiesMap.get(secretKey.get()))); + break; + default: + break; + } + return dataSourceMetadata; + } + + @Override + public void createDataSourceMetadata(DataSourceMetadata dataSourceMetadata) { + validateDataSourceMetaData(dataSourceMetadata); + encryptConfidentialData(dataSourceMetadata); + if (!this.clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) { + createDataSourcesIndex(); + } + IndexRequest indexRequest = new IndexRequest(DATASOURCE_INDEX_NAME); + indexRequest.id(dataSourceMetadata.getName()); + indexRequest.create(true); + ObjectMapper objectMapper = new ObjectMapper(); + ActionFuture indexResponseActionFuture = SecurityAccess.doPrivileged(() -> { + indexRequest.source(objectMapper.writeValueAsString(dataSourceMetadata), XContentType.JSON); + return client.index(indexRequest); + }); + IndexResponse indexResponse = indexResponseActionFuture.actionGet(); + if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) { + LOG.debug("DatasourceMetadata : {} successfully created", dataSourceMetadata); + } + } + + private void createDataSourcesIndex() { + try { + InputStream mappingFileStream = OpenSearchDataSourceMetadataStorage.class.getClassLoader() + .getResourceAsStream(DATASOURCE_INDEX_MAPPING_FILE_NAME); + InputStream settingsFileStream = OpenSearchDataSourceMetadataStorage.class.getClassLoader() + .getResourceAsStream(DATASOURCE_INDEX_SETTINGS_FILE_NAME); + CreateIndexRequest createIndexRequest = new CreateIndexRequest(DATASOURCE_INDEX_NAME); + createIndexRequest + .mapping(IOUtils.toString(mappingFileStream, StandardCharsets.UTF_8), + XContentType.YAML) + .settings(IOUtils.toString(settingsFileStream, StandardCharsets.UTF_8), + XContentType.YAML); + ActionFuture createIndexResponseActionFuture + = client.admin().indices().create(createIndexRequest); + CreateIndexResponse createIndexResponse = createIndexResponseActionFuture.actionGet(); + if (createIndexResponse.isAcknowledged()) { + LOG.info("Index: {} creation Acknowledged", DATASOURCE_INDEX_NAME); + } else { + throw new IllegalStateException( + String.format("Index: %s creation not acknowledged", DATASOURCE_INDEX_NAME )); + } + } catch (Throwable e) { + throw new RuntimeException( + "Internal server error while creating" + DATASOURCE_INDEX_NAME + " index"); + } + } + + private void encryptConfidentialData(DataSourceMetadata dataSourceMetadata) { + Map propertiesMap = dataSourceMetadata.getProperties(); + Optional authTypeOptional + = propertiesMap.keySet().stream().filter(s -> s.endsWith("auth.type")) + .findFirst() + .map(propertiesMap::get) + .map(AuthenticationType::get); + switch (authTypeOptional.get()) { + case BASICAUTH: + Optional passwordKey = propertiesMap.keySet().stream() + .filter(s -> s.endsWith("auth.password")) + .findFirst(); + propertiesMap.put(passwordKey.get(), + this.credentialInfoEncryptor.encrypt(propertiesMap.get(passwordKey.get())).toString()); + break; + case AWSSIGV4AUTH: + Optional accessKey = propertiesMap.keySet().stream() + .filter(s -> s.endsWith("auth.access_key")) + .findFirst(); + propertiesMap.put(accessKey.get(), + this.credentialInfoEncryptor.encrypt(propertiesMap.get(accessKey.get()))); + Optional secretKey = propertiesMap.keySet().stream() + .filter(s -> s.endsWith("auth.secret_key")) + .findFirst(); + propertiesMap.put(secretKey.get(), + this.credentialInfoEncryptor.encrypt(propertiesMap.get(secretKey.get()))); + break; + + default: + break; + } + } + + + /** + * This can be moved to a different validator class when we introduce more connectors. + * + * @param metadata {@link DataSourceMetadata}. + */ + private void validateDataSourceMetaData(DataSourceMetadata metadata) { + Preconditions.checkArgument( + !Strings.isNullOrEmpty(metadata.getName()), + "Missing Name Field from a DataSource. Name is a required parameter."); + Preconditions.checkArgument( + metadata.getName().matches(DATASOURCE_NAME_REGEX), + StringUtils.format( + "DataSource Name: %s contains illegal characters. Allowed characters: a-zA-Z0-9_-*@.", + metadata.getName())); + Preconditions.checkArgument( + !Objects.isNull(metadata.getProperties()), + "Missing properties field in catalog configuration. Properties are required parameters."); + } + + + private List searchInElasticSearch(QueryBuilder query) { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(DATASOURCE_INDEX_NAME); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(query); + searchRequest.source(searchSourceBuilder); + ActionFuture searchResponseActionFuture = client.search(searchRequest); + SearchResponse searchResponse = searchResponseActionFuture.actionGet(); + if (searchResponse.status().getStatus() != 200) { + throw new RuntimeException("Internal server error while fetching datasource metadata information"); + } else { + ObjectMapper objectMapper = new ObjectMapper(); + return Arrays.stream(searchResponse + .getHits() + .getHits()) + .map(SearchHit::getSourceAsString) + .map(datasourceString + -> SecurityAccess.doPrivileged(() -> objectMapper.readValue(datasourceString, DataSourceMetadata.class))) + .collect(Collectors.toList()); + } + } + +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/datasource/RestDataSourceQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/datasource/RestDataSourceQueryAction.java new file mode 100644 index 00000000000..42ad5a775e9 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/datasource/RestDataSourceQueryAction.java @@ -0,0 +1,129 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.rest.datasource; + +import static org.opensearch.rest.RestRequest.Method.GET; +import static org.opensearch.rest.RestRequest.Method.POST; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.gson.Gson; +import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestStatus; +import org.opensearch.rest.action.RestToXContentListener; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.opensearch.security.SecurityAccess; +import org.opensearch.sql.plugin.transport.datasource.TransportCreateDataSourceAction; +import org.opensearch.sql.plugin.transport.datasource.TransportGetDataSourceAction; +import org.opensearch.sql.plugin.transport.datasource.model.CreateDataSourceActionRequest; +import org.opensearch.sql.plugin.transport.datasource.model.CreateDataSourceActionResponse; +import org.opensearch.sql.plugin.transport.datasource.model.GetDataSourceActionRequest; +import org.opensearch.sql.plugin.transport.datasource.model.GetDataSourceActionResponse; + +public class RestDataSourceQueryAction extends BaseRestHandler { + + public static final String DATASOURCE_ACTIONS = "datasource_actions"; + public static final String BASE_DATASOURCE_ACTION_URL = "/_plugins/_query/_datasources"; + + private static final Logger LOG = LogManager.getLogger(); + + @Override + public String getName() { + return DATASOURCE_ACTIONS; + } + + @Override + public List routes() { + return ImmutableList.of( + /** + * Create a new datasource + * Request URL: POST + * Request body: Ref [org.opensearch.sql.plugin.transport.datasource.model.CreateDataSourceActionRequest] + * Response body: Ref [org.opensearch.sql.plugin.transport.datasource.model.CreateDataSourceActionResponse] + */ + new Route(POST, BASE_DATASOURCE_ACTION_URL), + + /** + * GET datasources + * Request URL: GET + * Request body: Ref [org.opensearch.sql.plugin.transport.datasource.model.GetDataSourceActionRequest] + * Response body: Ref [org.opensearch.sql.plugin.transport.datasource.model.GetDataSourceActionResponse] + */ + new Route(GET, BASE_DATASOURCE_ACTION_URL) + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient) { + switch (restRequest.method()) { + case POST: + return executePostRequest(restRequest, nodeClient); + case GET: + return executeGetRequest(restRequest, nodeClient); + default: + return restChannel + -> restChannel.sendResponse(new BytesRestResponse(RestStatus.METHOD_NOT_ALLOWED, + String.valueOf(restRequest.method()))); + } + } + + private RestChannelConsumer executePostRequest(RestRequest restRequest, + NodeClient nodeClient) { + ObjectMapper objectMapper = new ObjectMapper(); + DataSourceMetadata dataSourceMetadata + = SecurityAccess.doPrivileged( + () -> objectMapper.readValue(restRequest.content().utf8ToString(), DataSourceMetadata.class)); + return restChannel -> nodeClient.execute(TransportCreateDataSourceAction.ACTION_TYPE, + new CreateDataSourceActionRequest(dataSourceMetadata), + new ActionListener<>() { + @Override + public void onResponse(CreateDataSourceActionResponse createDataSourceActionResponse) { + restChannel.sendResponse( + new BytesRestResponse(RestStatus.OK, "application/json; charset=UTF-8", + createDataSourceActionResponse.getResult())); + } + @Override + public void onFailure(Exception e) { + restChannel.sendResponse( + new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, + "application/json; charset=UTF-8", + e.getMessage())); + } + }); + } + + private RestChannelConsumer executeGetRequest(RestRequest restRequest, + NodeClient nodeClient) { + return restChannel -> nodeClient.execute(TransportGetDataSourceAction.ACTION_TYPE, + new GetDataSourceActionRequest(), + new ActionListener<>() { + @Override + public void onResponse(GetDataSourceActionResponse getDataSourceActionResponse) { + restChannel.sendResponse( + new BytesRestResponse(RestStatus.OK, "application/json; charset=UTF-8", + getDataSourceActionResponse.getResult())); + } + @Override + public void onFailure(Exception e) { + restChannel.sendResponse( + new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, + "application/json; charset=UTF-8", + e.getMessage())); + } + }); + } + +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportCreateDataSourceAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportCreateDataSourceAction.java new file mode 100644 index 00000000000..4685ec841ca --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportCreateDataSourceAction.java @@ -0,0 +1,53 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.transport.datasource; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionType; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.common.inject.Inject; +import org.opensearch.sql.datasource.DataSourceMetadataStorage; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.opensearch.security.SecurityAccess; +import org.opensearch.sql.plugin.transport.datasource.model.CreateDataSourceActionRequest; +import org.opensearch.sql.plugin.transport.datasource.model.CreateDataSourceActionResponse; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; + +public class TransportCreateDataSourceAction extends HandledTransportAction { + + public static final String NAME = "cluster:admin/opensearch/datasources/create"; + public static final ActionType + ACTION_TYPE = new ActionType<>(NAME, CreateDataSourceActionResponse::new); + + private AnnotationConfigApplicationContext applicationContext; + + @Inject + public TransportCreateDataSourceAction(TransportService transportService, + ActionFilters actionFilters, + AnnotationConfigApplicationContext applicationContext) { + super(TransportCreateDataSourceAction.NAME, transportService, actionFilters, CreateDataSourceActionRequest::new); + this.applicationContext = applicationContext; + } + + @Override + protected void doExecute(Task task, CreateDataSourceActionRequest request, + ActionListener actionListener) { + DataSourceService dataSourceService = this.applicationContext.getBean(DataSourceService.class); + DataSourceMetadataStorage dataSourceMetadataStorage = this.applicationContext.getBean(DataSourceMetadataStorage.class); + SecurityAccess.doPrivileged(() -> { + dataSourceMetadataStorage.createDataSourceMetadata(request.getDataSourceMetadata()); + dataSourceService.addDataSource(request.getDataSourceMetadata()); + return null; + }); + actionListener.onResponse(new CreateDataSourceActionResponse("Created DataSource")); + } + +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportGetDataSourceAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportGetDataSourceAction.java new file mode 100644 index 00000000000..f4ea55396d3 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportGetDataSourceAction.java @@ -0,0 +1,70 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.transport.datasource; + +import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; + +import java.util.List; +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionType; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.common.inject.Inject; +import org.opensearch.sql.datasource.DataSourceMetadataStorage; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.opensearch.security.SecurityAccess; +import org.opensearch.sql.plugin.transport.datasource.model.GetDataSourceActionRequest; +import org.opensearch.sql.plugin.transport.datasource.model.GetDataSourceActionResponse; +import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; + +public class TransportGetDataSourceAction + extends HandledTransportAction { + + public static final String NAME = "cluster:admin/opensearch/datasources/read"; + public static final ActionType + ACTION_TYPE = new ActionType<>(NAME, GetDataSourceActionResponse::new); + + + + private AnnotationConfigApplicationContext applicationContext; + + @Inject + public TransportGetDataSourceAction(TransportService transportService, + ActionFilters actionFilters, + AnnotationConfigApplicationContext applicationContext) { + super(TransportGetDataSourceAction.NAME, transportService, actionFilters, + GetDataSourceActionRequest::new); + this.applicationContext = applicationContext; + } + + @Override + protected void doExecute(Task task, GetDataSourceActionRequest request, + ActionListener actionListener) { + + try { + DataSourceMetadataStorage dataSourceMetadataStorage + = this.applicationContext.getBean(DataSourceMetadataStorage.class); + List dataSourceMetadataSet + = SecurityAccess.doPrivileged(dataSourceMetadataStorage::getDataSourceMetadata); + String responseContent = + new JsonResponseFormatter>(PRETTY) { + @Override + protected Object buildJsonObject(List response) { + return response; + } + }.format(dataSourceMetadataSet); + actionListener.onResponse(new GetDataSourceActionResponse(responseContent)); + } catch (Exception e) { + actionListener.onFailure(e); + } + + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/model/CreateDataSourceActionRequest.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/model/CreateDataSourceActionRequest.java new file mode 100644 index 00000000000..453661f276b --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/model/CreateDataSourceActionRequest.java @@ -0,0 +1,38 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.transport.datasource.model; + +import java.io.IOException; +import lombok.Getter; +import org.json.JSONObject; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; + +public class CreateDataSourceActionRequest + extends ActionRequest { + + @Getter + private DataSourceMetadata dataSourceMetadata; + + /** Constructor of CreateDataSourceActionRequest from StreamInput. */ + public CreateDataSourceActionRequest(StreamInput in) throws IOException { + super(in); + } + + public CreateDataSourceActionRequest(DataSourceMetadata dataSourceMetadata) { + this.dataSourceMetadata = dataSourceMetadata; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/model/CreateDataSourceActionResponse.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/model/CreateDataSourceActionResponse.java new file mode 100644 index 00000000000..483a116ce26 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/model/CreateDataSourceActionResponse.java @@ -0,0 +1,33 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.transport.datasource.model; + +import java.io.IOException; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.opensearch.action.ActionResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +@RequiredArgsConstructor +public class CreateDataSourceActionResponse + extends ActionResponse { + + @Getter + private final String result; + + public CreateDataSourceActionResponse(StreamInput in) throws IOException { + super(in); + result = in.readString(); + } + + @Override + public void writeTo(StreamOutput streamOutput) throws IOException { + streamOutput.writeString(result); + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/model/GetDataSourceActionRequest.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/model/GetDataSourceActionRequest.java new file mode 100644 index 00000000000..7724d82063c --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/model/GetDataSourceActionRequest.java @@ -0,0 +1,29 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.transport.datasource.model; + +import java.io.IOException; +import lombok.NoArgsConstructor; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; + +@NoArgsConstructor +public class GetDataSourceActionRequest extends ActionRequest { + + /** Constructor of GetDataSourceActionRequest from StreamInput. */ + public GetDataSourceActionRequest(StreamInput in) throws IOException { + super(in); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/model/GetDataSourceActionResponse.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/model/GetDataSourceActionResponse.java new file mode 100644 index 00000000000..8f24b07f07c --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/model/GetDataSourceActionResponse.java @@ -0,0 +1,33 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.transport.datasource.model; + +import java.io.IOException; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.opensearch.action.ActionResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +@RequiredArgsConstructor +public class GetDataSourceActionResponse extends ActionResponse { + + @Getter + private final String result; + + public GetDataSourceActionResponse(StreamInput in) throws IOException { + super(in); + result = in.readString(); + } + + @Override + public void writeTo(StreamOutput streamOutput) throws IOException { + streamOutput.writeString(result); + } + +} diff --git a/plugin/src/main/resources/datasources-index-mapping.yml b/plugin/src/main/resources/datasources-index-mapping.yml new file mode 100644 index 00000000000..584ef004812 --- /dev/null +++ b/plugin/src/main/resources/datasources-index-mapping.yml @@ -0,0 +1,19 @@ +--- +## +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +## + +# Schema file for the observability index +# Since we only search based on "access", sort on lastUpdatedTimeMs & createdTimeMs, +# other fields are not used in mapping to avoid index on those fields. +# Also "dynamic" is set to "false" so that other fields can be added. +dynamic: false +properties: + name: + type: text + fields: + keyword: + type: keyword + connectorType: + type: keyword \ No newline at end of file diff --git a/plugin/src/main/resources/datasources-index-settings.yml b/plugin/src/main/resources/datasources-index-settings.yml new file mode 100644 index 00000000000..c01b2b33764 --- /dev/null +++ b/plugin/src/main/resources/datasources-index-settings.yml @@ -0,0 +1,11 @@ +--- +## +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +## + +# Settings file for the observability index +index: + number_of_shards: "1" + auto_expand_replicas: "0-2" + number_of_replicas: "0" \ No newline at end of file diff --git a/prometheus/build.gradle b/prometheus/build.gradle index 7cf1e56085e..ca70813e58b 100644 --- a/prometheus/build.gradle +++ b/prometheus/build.gradle @@ -22,6 +22,8 @@ dependencies { implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-cbor', version: "${versions.jackson}" implementation group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3' implementation 'com.github.babbel:okhttp-aws-signer:1.0.2' + implementation group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.1' + implementation group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.1' implementation group: 'org.json', name: 'json', version: '20180813' testImplementation('org.junit.jupiter:junit-jupiter:5.6.2') diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/authinterceptors/AwsSigningInterceptor.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/authinterceptors/AwsSigningInterceptor.java index f3d91c55a28..7ebbaf5f2a6 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/authinterceptors/AwsSigningInterceptor.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/authinterceptors/AwsSigningInterceptor.java @@ -7,6 +7,9 @@ package org.opensearch.sql.prometheus.authinterceptors; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; import com.babbel.mobile.android.commons.okhttpawssigner.OkHttpAwsV4Signer; import java.io.IOException; import java.time.ZoneId; @@ -16,29 +19,29 @@ import okhttp3.Interceptor; import okhttp3.Request; import okhttp3.Response; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class AwsSigningInterceptor implements Interceptor { private OkHttpAwsV4Signer okHttpAwsV4Signer; - private String accessKey; + private AWSCredentialsProvider awsCredentialsProvider; - private String secretKey; + private static final Logger LOG = LogManager.getLogger(); /** * AwsSigningInterceptor which intercepts http requests * and adds required headers for sigv4 authentication. * - * @param accessKey accessKey. - * @param secretKey secretKey. + * @param awsCredentialsProvider AWSCredentialsProvider. * @param region region. * @param serviceName serviceName. */ - public AwsSigningInterceptor(@NonNull String accessKey, @NonNull String secretKey, + public AwsSigningInterceptor(@NonNull AWSCredentialsProvider awsCredentialsProvider, @NonNull String region, @NonNull String serviceName) { this.okHttpAwsV4Signer = new OkHttpAwsV4Signer(region, serviceName); - this.accessKey = accessKey; - this.secretKey = secretKey; + this.awsCredentialsProvider = awsCredentialsProvider; } @Override @@ -48,11 +51,23 @@ public Response intercept(Interceptor.Chain chain) throws IOException { DateTimeFormatter timestampFormat = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'") .withZone(ZoneId.of("GMT")); - Request newRequest = request.newBuilder() + Request.Builder newRequestBuilder = request.newBuilder() .addHeader("x-amz-date", timestampFormat.format(ZonedDateTime.now())) - .addHeader("host", request.url().host()) - .build(); - Request signed = okHttpAwsV4Signer.sign(newRequest, accessKey, secretKey); + .addHeader("host", request.url().host()); + AWSCredentials awsCredentials = awsCredentialsProvider.getCredentials(); + + if (awsCredentialsProvider instanceof STSAssumeRoleSessionCredentialsProvider) { + newRequestBuilder.addHeader("x-amz-security-token", + ((STSAssumeRoleSessionCredentialsProvider) awsCredentialsProvider).getCredentials().getSessionToken()); + LOG.info("x-amz-security-token: {}", + ((STSAssumeRoleSessionCredentialsProvider) awsCredentialsProvider).getCredentials().getSessionToken()); + } + Request newRequest = newRequestBuilder.build(); + LOG.info(awsCredentialsProvider.getClass()); + LOG.info("Access Key: {}", awsCredentials.getAWSAccessKeyId()); + LOG.info("Secret Key: {}", awsCredentials.getAWSSecretKey()); + Request signed = okHttpAwsV4Signer.sign(newRequest, + awsCredentials.getAWSAccessKeyId(), awsCredentials.getAWSSecretKey()); return chain.proceed(signed); } diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java index 4e8b30af2fa..c7ed8a8e877 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java @@ -7,6 +7,8 @@ package org.opensearch.sql.prometheus.storage; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; import java.net.URI; import java.net.URISyntaxException; import java.util.HashSet; @@ -41,18 +43,19 @@ public DataSourceType getDataSourceType() { } @Override - public DataSource createDataSource(DataSourceMetadata metadata) { + public DataSource createDataSource(DataSourceMetadata metadata, String clusterName) { return new DataSource( metadata.getName(), DataSourceType.PROMETHEUS, - getStorageEngine(metadata.getName(), metadata.getProperties())); + getStorageEngine(metadata.getName(), metadata.getProperties(), clusterName)); } - StorageEngine getStorageEngine(String catalogName, Map requiredConfig) { + StorageEngine getStorageEngine(String catalogName, Map requiredConfig, + String clusterName) { validateFieldsInConfig(requiredConfig, Set.of(URI)); PrometheusClient prometheusClient; try { - prometheusClient = new PrometheusClientImpl(getHttpClient(requiredConfig), + prometheusClient = new PrometheusClientImpl(getHttpClient(requiredConfig, clusterName), new URI(requiredConfig.get(URI))); } catch (URISyntaxException e) { throw new RuntimeException( @@ -62,7 +65,8 @@ StorageEngine getStorageEngine(String catalogName, Map requiredC } - private OkHttpClient getHttpClient(Map config) { + private OkHttpClient getHttpClient(Map config, String clusterName) { + OkHttpClient.Builder okHttpClient = new OkHttpClient.Builder(); okHttpClient.callTimeout(1, TimeUnit.MINUTES); okHttpClient.connectTimeout(30, TimeUnit.SECONDS); @@ -75,7 +79,8 @@ private OkHttpClient getHttpClient(Map config) { } else if (AuthenticationType.AWSSIGV4AUTH.equals(authenticationType)) { validateFieldsInConfig(config, Set.of(REGION, ACCESS_KEY, SECRET_KEY)); okHttpClient.addInterceptor(new AwsSigningInterceptor( - config.get(ACCESS_KEY), config.get(SECRET_KEY), + new AWSStaticCredentialsProvider( + new BasicAWSCredentials(config.get(ACCESS_KEY), config.get(SECRET_KEY))), config.get(REGION), "aps")); } else { throw new IllegalArgumentException(