Commit c7de58c: fix insert data bug

FANNG1 committed May 6, 2024
1 parent 14297cd

Showing 8 changed files with 250 additions and 124 deletions.

PropertiesConverter.java

@@ -6,9 +6,13 @@
 package com.datastrato.gravitino.spark.connector;
 
 import java.util.Map;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /** Transform table properties between Gravitino and Spark. */
 public interface PropertiesConverter {
+  Map<String, String> toSparkCatalogProperties(
+      CaseInsensitiveStringMap options, Map<String, String> properties);
+
   Map<String, String> toGravitinoTableProperties(Map<String, String> properties);
 
   Map<String, String> toSparkTableProperties(Map<String, String> properties);
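
The new toSparkCatalogProperties hook gives every connector a single place to combine Gravitino catalog properties with Spark session options into the map used to initialize the underlying Spark catalog. A minimal sketch of an implementation, assuming it lives in the same package as the interface; the class name is invented and it performs no real translation, it only shows the contract:

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// Hypothetical pass-through converter, not part of this commit.
public class PassThroughPropertiesConverter implements PropertiesConverter {
  @Override
  public Map<String, String> toSparkCatalogProperties(
      CaseInsensitiveStringMap options, Map<String, String> properties) {
    // Gravitino-managed properties first; Spark options win on key collisions.
    Map<String, String> all = new HashMap<>(properties);
    all.putAll(options);
    return all;
  }

  @Override
  public Map<String, String> toGravitinoTableProperties(Map<String, String> properties) {
    return new HashMap<>(properties);
  }

  @Override
  public Map<String, String> toSparkTableProperties(Map<String, String> properties) {
    return new HashMap<>(properties);
  }
}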

GravitinoHiveCatalog.java

@@ -6,15 +6,11 @@
 package com.datastrato.gravitino.spark.connector.hive;
 
 import com.datastrato.gravitino.rel.Table;
-import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
 import com.datastrato.gravitino.spark.connector.PropertiesConverter;
 import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
 import com.datastrato.gravitino.spark.connector.catalog.BaseCatalog;
 import com.datastrato.gravitino.spark.connector.table.SparkBaseTable;
-import com.google.common.base.Preconditions;
-import java.util.HashMap;
 import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -25,17 +21,9 @@ public class GravitinoHiveCatalog extends BaseCatalog {
   @Override
   protected TableCatalog createAndInitSparkCatalog(
       String name, CaseInsensitiveStringMap options, Map<String, String> properties) {
-    Preconditions.checkArgument(properties != null, "Hive Catalog properties should not be null");
-    String metastoreUri = properties.get(GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI);
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(metastoreUri),
-        "Couldn't get "
-            + GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI
-            + " from hive catalog properties");
-
     TableCatalog hiveCatalog = new HiveTableCatalog();
-    HashMap<String, String> all = new HashMap<>(options);
-    all.put(GravitinoSparkConfig.SPARK_HIVE_METASTORE_URI, metastoreUri);
+    Map<String, String> all =
+        HivePropertiesConverter.getInstance().toSparkCatalogProperties(options, properties);
     hiveCatalog.initialize(name, new CaseInsensitiveStringMap(all));
 
     return hiveCatalog;
@@ -54,7 +42,7 @@ protected SparkBaseTable createSparkTable(
 
   @Override
   protected PropertiesConverter getPropertiesConverter() {
-    return new HivePropertiesConverter();
+    return HivePropertiesConverter.getInstance();
   }
 
   @Override
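
The merged map is handed to TableCatalog.initialize wrapped in Spark's CaseInsensitiveStringMap, so the Kyuubi Hive catalog's later lookups ignore key case. A quick standalone illustration of that wrapper's behavior, assuming Spark SQL 3.x on the classpath (the URI value is a placeholder):

import java.util.Map;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class CaseInsensitiveDemo {
  public static void main(String[] args) {
    CaseInsensitiveStringMap conf =
        new CaseInsensitiveStringMap(Map.of("Hive.Metastore.Uris", "thrift://example-host:9083"));
    // Lookups ignore key case, so either spelling resolves the value.
    System.out.println(conf.get("hive.metastore.uris")); // thrift://example-host:9083
  }
}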

HivePropertiesConverter.java

@@ -6,18 +6,32 @@
 package com.datastrato.gravitino.spark.connector.hive;
 
 import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata;
+import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
 import com.datastrato.gravitino.spark.connector.PropertiesConverter;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
+import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import javax.ws.rs.NotSupportedException;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /** Transform hive catalog properties between Spark and Gravitino. */
 public class HivePropertiesConverter implements PropertiesConverter {
+  public static class HivePropertiesConverterHolder {
+    private static final HivePropertiesConverter INSTANCE = new HivePropertiesConverter();
+  }
+
+  private HivePropertiesConverter() {}
+
+  public static HivePropertiesConverter getInstance() {
+    return HivePropertiesConverterHolder.INSTANCE;
+  }
 
   // Transform Spark hive file format to Gravitino hive file format
   static final Map<String, String> fileFormatMap =
@@ -48,6 +62,22 @@ public class HivePropertiesConverter implements PropertiesConverter {
           HivePropertiesConstants.GRAVITINO_HIVE_TABLE_LOCATION,
           HivePropertiesConstants.SPARK_HIVE_LOCATION);
 
+  @Override
+  public Map<String, String> toSparkCatalogProperties(
+      CaseInsensitiveStringMap options, Map<String, String> properties) {
+    Preconditions.checkArgument(properties != null, "Hive Catalog properties should not be null");
+    String metastoreUri = properties.get(GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(metastoreUri),
+        "Couldn't get "
+            + GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI
+            + " from hive catalog properties");
+    HashMap<String, String> all = new HashMap<>(properties);
+    all.putAll(options);
+    all.put(GravitinoSparkConfig.SPARK_HIVE_METASTORE_URI, metastoreUri);
+    return all;
+  }
+
   /**
    * CREATE TABLE xxx STORED AS PARQUET will save "hive.stored-as" = "PARQUET" in property.
    *
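
Two details in this file carry the fix. First, the merge order in toSparkCatalogProperties: Gravitino catalog properties go in first, Spark session options override them on collision, and the metastore URI is applied last so it always wins; the code this replaces in GravitinoHiveCatalog built the map from the options and the URI alone, dropping the other Gravitino properties. Second, the converter is now a lazily constructed singleton using the initialization-on-demand holder idiom: the nested holder class is not loaded until getInstance() first references it, and the JVM's class-initialization locking makes construction thread-safe without explicit synchronization. A standalone sketch of the idiom, with invented names:

// Initialization-on-demand holder; names are illustrative, not from the patch.
public class Settings {
  private Settings() {} // blocks outside instantiation

  // Holder is not loaded, and INSTANCE not built, until getInstance() runs.
  private static class Holder {
    static final Settings INSTANCE = new Settings();
  }

  public static Settings getInstance() {
    return Holder.INSTANCE; // JVM guarantees one-time, thread-safe initialization
  }
}

The patch's holder class is public where the textbook form above keeps it private; private is the safer default, since the holder is an implementation detail.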

GravitinoIcebergCatalog.java

@@ -10,11 +10,7 @@
 import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
 import com.datastrato.gravitino.spark.connector.catalog.BaseCatalog;
 import com.datastrato.gravitino.spark.connector.table.SparkBaseTable;
-import com.google.common.base.Preconditions;
-import java.util.HashMap;
-import java.util.Locale;
 import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
@@ -36,32 +32,10 @@ public class GravitinoIcebergCatalog extends BaseCatalog implements FunctionCatalog {
   @Override
   protected TableCatalog createAndInitSparkCatalog(
       String name, CaseInsensitiveStringMap options, Map<String, String> properties) {
-    Preconditions.checkArgument(
-        properties != null, "Iceberg Catalog properties should not be null");
-
-    String catalogBackend =
-        properties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND);
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(catalogBackend), "Iceberg Catalog backend should not be empty.");
-
-    HashMap<String, String> all = new HashMap<>(options);
-
-    switch (catalogBackend.toLowerCase(Locale.ENGLISH)) {
-      case IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE:
-        initHiveProperties(catalogBackend, properties, all);
-        break;
-      case IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_JDBC:
-        initJdbcProperties(catalogBackend, properties, all);
-        break;
-      default:
-        // SparkCatalog does not support Memory type catalog
-        throw new IllegalArgumentException(
-            "Unsupported Iceberg Catalog backend: " + catalogBackend);
-    }
-
+    Map<String, String> all =
+        IcebergPropertiesConverter.getInstance().toSparkCatalogProperties(options, properties);
     TableCatalog icebergCatalog = new SparkCatalog();
     icebergCatalog.initialize(name, new CaseInsensitiveStringMap(all));
-
     return icebergCatalog;
   }
 
@@ -78,7 +52,7 @@ protected SparkBaseTable createSparkTable(
 
   @Override
   protected PropertiesConverter getPropertiesConverter() {
-    return new IcebergPropertiesConverter();
+    return IcebergPropertiesConverter.getInstance();
   }
 
   @Override
@@ -95,79 +69,4 @@ public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
   public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
     return ((SparkCatalog) sparkCatalog).loadFunction(ident);
   }
-
-  private void initHiveProperties(
-      String catalogBackend,
-      Map<String, String> gravitinoProperties,
-      HashMap<String, String> icebergProperties) {
-    String metastoreUri =
-        gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI);
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(metastoreUri),
-        "Couldn't get "
-            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI
-            + " from Iceberg Catalog properties");
-    String hiveWarehouse =
-        gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE);
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(hiveWarehouse),
-        "Couldn't get "
-            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE
-            + " from Iceberg Catalog properties");
-    icebergProperties.put(
-        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_TYPE,
-        catalogBackend.toLowerCase(Locale.ENGLISH));
-    icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, metastoreUri);
-    icebergProperties.put(
-        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, hiveWarehouse);
-  }
-
-  private void initJdbcProperties(
-      String catalogBackend,
-      Map<String, String> gravitinoProperties,
-      HashMap<String, String> icebergProperties) {
-    String jdbcUri =
-        gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI);
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(jdbcUri),
-        "Couldn't get "
-            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI
-            + " from Iceberg Catalog properties");
-    String jdbcWarehouse =
-        gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE);
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(jdbcWarehouse),
-        "Couldn't get "
-            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE
-            + " from Iceberg Catalog properties");
-    String jdbcUser = gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_JDBC_USER);
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(jdbcUser),
-        "Couldn't get "
-            + IcebergPropertiesConstants.GRAVITINO_JDBC_USER
-            + " from Iceberg Catalog properties");
-    String jdbcPassword =
-        gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_JDBC_PASSWORD);
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(jdbcPassword),
-        "Couldn't get "
-            + IcebergPropertiesConstants.GRAVITINO_JDBC_PASSWORD
-            + " from Iceberg Catalog properties");
-    String jdbcDriver =
-        gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_DRIVER);
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(jdbcDriver),
-        "Couldn't get "
-            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_DRIVER
-            + " from Iceberg Catalog properties");
-    icebergProperties.put(
-        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_TYPE,
-        catalogBackend.toLowerCase(Locale.ROOT));
-    icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, jdbcUri);
-    icebergProperties.put(
-        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, jdbcWarehouse);
-    icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_USER, jdbcUser);
-    icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_PASSWORD, jdbcPassword);
-    icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_DRIVER, jdbcDriver);
-  }
 }
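
All of the removed validation and backend-specific wiring now sits behind IcebergPropertiesConverter.getInstance().toSparkCatalogProperties, and that converter's diff is not among the files shown here. The following is therefore only a condensed, hedged sketch of the dispatch it presumably performs; the literal property keys and the require() helper are assumptions standing in for IcebergPropertiesConstants and Preconditions:

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hedged sketch, not the converter's actual code.
public class IcebergDispatchSketch {
  static Map<String, String> toSparkCatalogProperties(
      Map<String, String> options, Map<String, String> properties) {
    String backend = require(properties, "catalog-backend"); // assumed key name
    String type = backend.toLowerCase(Locale.ROOT);
    Map<String, String> all = new HashMap<>(options);
    switch (type) {
      case "hive":
        all.put("uri", require(properties, "uri"));
        all.put("warehouse", require(properties, "warehouse"));
        break;
      case "jdbc":
        for (String key :
            new String[] {"uri", "warehouse", "jdbc-user", "jdbc-password", "jdbc-driver"}) {
          all.put(key, require(properties, key));
        }
        break;
      default: // SparkCatalog does not support a Memory type catalog
        throw new IllegalArgumentException("Unsupported Iceberg Catalog backend: " + backend);
    }
    all.put("type", type);
    return all;
  }

  private static String require(Map<String, String> props, String key) {
    String value = props.get(key);
    if (value == null || value.isBlank()) {
      throw new IllegalArgumentException(
          "Couldn't get " + key + " from Iceberg Catalog properties");
    }
    return value;
  }
}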

IcebergPropertiesConstants.java

@@ -18,22 +18,28 @@ public class IcebergPropertiesConstants {
   public static final String GRAVITINO_ICEBERG_CATALOG_WAREHOUSE =
       IcebergCatalogPropertiesMetadata.WAREHOUSE;
 
+  @VisibleForTesting
+  public static final String ICEBERG_CATALOG_WAREHOUSE = GRAVITINO_ICEBERG_CATALOG_WAREHOUSE;
+
   @VisibleForTesting
   public static final String GRAVITINO_ICEBERG_CATALOG_URI = IcebergCatalogPropertiesMetadata.URI;
 
+  @VisibleForTesting public static final String ICEBERG_CATALOG_URI = GRAVITINO_ICEBERG_CATALOG_URI;
+
   public static final String GRAVITINO_JDBC_USER =
       IcebergCatalogPropertiesMetadata.GRAVITINO_JDBC_USER;
-  public static final String GRAVITINO_ICEBERG_JDBC_USER =
-      IcebergCatalogPropertiesMetadata.ICEBERG_JDBC_USER;
+  public static final String ICEBERG_JDBC_USER = IcebergCatalogPropertiesMetadata.ICEBERG_JDBC_USER;
   public static final String GRAVITINO_JDBC_PASSWORD =
       IcebergCatalogPropertiesMetadata.GRAVITINO_JDBC_PASSWORD;
-  public static final String GRAVITINO_ICEBERG_JDBC_PASSWORD =
+  public static final String ICEBERG_JDBC_PASSWORD =
       IcebergCatalogPropertiesMetadata.ICEBERG_JDBC_PASSWORD;
   public static final String GRAVITINO_ICEBERG_JDBC_DRIVER =
       IcebergCatalogPropertiesMetadata.GRAVITINO_JDBC_DRIVER;
 
-  public static final String GRAVITINO_ICEBERG_CATALOG_TYPE = "type";
-  public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive";
-  public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_JDBC = "jdbc";
+  public static final String ICEBERG_CATALOG_TYPE = "type";
+  public static final String ICEBERG_CATALOG_BACKEND_HIVE = "hive";
+  public static final String ICEBERG_CATALOG_BACKEND_JDBC = "jdbc";
 
   private IcebergPropertiesConstants() {}