Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@ public class IcebergResource extends Resource {

private static final String ICEBERG_CATALOG = "starrocks.catalog-type";
private static final String ICEBERG_METASTORE_URIS = "iceberg.catalog.hive.metastore.uris";
private static final String ICEBERG_IMPL = "iceberg.catalog-impl";

@SerializedName(value = "catalogType")
private String catalogType;

@SerializedName(value = "metastoreURIs")
private String metastoreURIs;

@SerializedName(value = "catalogImpl")
private String catalogImpl;

@SerializedName(value = "properties")
private Map<String, String> properties;

Expand All @@ -64,6 +68,17 @@ protected void setProperties(Map<String, String> properties) throws DdlException
throw new DdlException(ICEBERG_METASTORE_URIS + " must be set in properties");
}
break;
case CUSTOM_CATALOG:
catalogImpl = properties.get(ICEBERG_IMPL);
if (StringUtils.isBlank(catalogImpl)) {
throw new DdlException(ICEBERG_IMPL + " must be set in properties");
}
try {
Thread.currentThread().getContextClassLoader().loadClass(catalogImpl);
Copy link
Contributor

@stephen-shelby stephen-shelby Apr 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this just to verify that the JVM can load the custom class at the time the custom Iceberg resource is created? If the FE restarts, loadClass() will not be called again.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there other considerations here? In fact, if the custom jar were loaded dynamically, there would be no need to restart the FE at all.

} catch (ClassNotFoundException e) {
throw new DdlException("Unknown class: " + catalogImpl);
}
break;
default:
throw new DdlException("Unexpected catalog type: " + catalogType);
}
Expand All @@ -76,6 +91,9 @@ protected void getProcNodeData(BaseProcResult result) {
case HIVE_CATALOG:
result.addRow(Lists.newArrayList(name, lowerCaseType, ICEBERG_METASTORE_URIS, metastoreURIs));
break;
case CUSTOM_CATALOG:
result.addRow(Lists.newArrayList(name, lowerCaseType, ICEBERG_IMPL, catalogImpl));
break;
default:
LOG.warn("Unexpected catalog type: " + catalogType);
break;
Expand All @@ -86,6 +104,10 @@ public String getHiveMetastoreURIs() {
return metastoreURIs;
}

/**
 * Returns the fully-qualified class name of the custom Iceberg catalog
 * implementation (the value of the "iceberg.catalog-impl" resource property).
 */
public String getIcebergImpl() {
    return this.catalogImpl;
}

/**
 * Returns the catalog type of this resource, parsed from its stored
 * string form into the {@code IcebergCatalogType} enum.
 */
public IcebergCatalogType getCatalogType() {
    return IcebergCatalogType.fromString(this.catalogType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class IcebergTable extends Table {

private static final String ICEBERG_CATALOG = "starrocks.catalog-type";
private static final String ICEBERG_METASTORE_URIS = "iceberg.catalog.hive.metastore.uris";
private static final String ICEBERG_IMPL = "iceberg.catalog-impl";
private static final String ICEBERG_DB = "database";
private static final String ICEBERG_TABLE = "table";
private static final String ICEBERG_RESOURCE = "resource";
Expand Down Expand Up @@ -84,6 +85,14 @@ public IcebergCatalogType getCatalogType() {
return IcebergCatalogType.valueOf(icebergProperties.get(ICEBERG_CATALOG));
}

/**
 * Returns the custom catalog implementation class name stored under the
 * "iceberg.catalog-impl" property, or {@code null} when the property is absent
 * (i.e. this table is not backed by a custom catalog).
 */
public String getCatalogImpl() {
    return this.icebergProperties.get(ICEBERG_IMPL);
}

/**
 * Returns the raw property map backing this table (catalog type, metastore
 * URIs, custom catalog options, ...).
 *
 * NOTE(review): this exposes the internal mutable map directly, so callers
 * could mutate table state. Consider returning an unmodifiable view — TODO
 * confirm no caller relies on mutating the returned map first.
 */
public Map<String, String> getIcebergProperties() {
    return icebergProperties;
}

public String getIcebergHiveMetastoreUris() {
return icebergProperties.get(ICEBERG_METASTORE_URIS);
}
Expand Down Expand Up @@ -140,16 +149,32 @@ private void validate(Map<String, String> properties) throws DdlException {
IcebergCatalogType type = icebergResource.getCatalogType();
icebergProperties.put(ICEBERG_CATALOG, type.name());
LOG.info("Iceberg table type is " + type.name());
IcebergCatalog icebergCatalog;
switch (type) {
case HIVE_CATALOG:
icebergProperties.put(ICEBERG_METASTORE_URIS, icebergResource.getHiveMetastoreURIs());
icebergCatalog = IcebergUtil.getIcebergHiveCatalog(icebergResource.getHiveMetastoreURIs());
break;
case CUSTOM_CATALOG:
icebergProperties.put(ICEBERG_IMPL, icebergResource.getIcebergImpl());
for (String key : copiedProps.keySet()) {
icebergProperties.put(key, copiedProps.remove(key));
}
icebergCatalog = IcebergUtil.getIcebergCustomCatalog(icebergResource.getIcebergImpl(), icebergProperties);
break;
default:
throw new DdlException("unsupported catalog type " + type.name());
}
this.resourceName = resourceName;

IcebergCatalog catalog = IcebergUtil.getIcebergCatalog(type, icebergResource.getHiveMetastoreURIs());
validateColumn(icebergCatalog);

if (!copiedProps.isEmpty()) {
throw new DdlException("Unknown table properties: " + copiedProps.toString());
}
}

private void validateColumn(IcebergCatalog catalog) throws DdlException {
org.apache.iceberg.Table icebergTable = catalog.loadTable(IcebergUtil.getIcebergTableIdentifier(db, table));
// TODO: use TypeUtil#indexByName to handle nested field
Map<String, Types.NestedField> icebergColumns = icebergTable.schema().columns().stream()
Expand All @@ -168,10 +193,7 @@ private void validate(Map<String, String> properties) throws DdlException {
"iceberg extern table not support no-nullable column: [" + icebergColumn.name() + "]");
}
}

if (!copiedProps.isEmpty()) {
throw new DdlException("Unknown table properties: " + copiedProps.toString());
}
LOG.debug("successfully validating columns for " + catalog);
}

private boolean validateColumnType(Type icebergType, com.starrocks.catalog.Type type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hadoop.SerializableConfiguration;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

import java.util.Map;

/**
* Most code of this file was copied from CatalogLoader.java of flink in iceberg codebase.
* Please see https://github.com/apache/iceberg.
* // TODO: add hadoop or custom catalogloader here.
* // TODO: add hadoop catalogloader here.
*/
public interface CatalogLoader {

Expand All @@ -30,6 +31,10 @@ static CatalogLoader hive(String name, Configuration hadoopConf, Map<String, Str
return new HiveCatalogLoader(name, hadoopConf, properties);
}

/**
 * Creates a loader for a user-supplied catalog implementation.
 *
 * @param name        catalog name passed through to the loaded catalog
 * @param hadoopConf  hadoop configuration handed to the catalog
 * @param properties  initialization properties for the catalog
 * @param catalogImpl fully-qualified implementation class name; must not be
 *                    null (enforced in the CustomCatalogLoader constructor)
 */
static CatalogLoader custom(String name, Configuration hadoopConf, Map<String, String> properties, String catalogImpl) {
    return new CustomCatalogLoader(name, hadoopConf, properties, catalogImpl);
}

class HiveCatalogLoader implements CatalogLoader {
private final String catalogName;
private final SerializableConfiguration hadoopConf;
Expand Down Expand Up @@ -62,4 +67,36 @@ public String toString() {
}
}

/**
 * {@link CatalogLoader} for a user-provided Iceberg {@link Catalog}
 * implementation, identified by its fully-qualified class name and loaded
 * via Iceberg's {@code CatalogUtil.loadCatalog}.
 *
 * Fields are wrapped in serializable forms (SerializableConfiguration,
 * plain HashMap) so the loader itself can be serialized.
 */
class CustomCatalogLoader implements CatalogLoader {

    private final SerializableConfiguration hadoopConf;
    private final Map<String, String> properties;
    private final String name;
    // Fully-qualified class name of the Catalog implementation to load.
    private final String catalogImpl;

    private CustomCatalogLoader(
            String name,
            Configuration conf,
            Map<String, String> properties,
            String catalogImpl) {
        this.hadoopConf = new SerializableConfiguration(conf);
        this.properties = Maps.newHashMap(properties); // wrap into a hashmap for serialization
        this.name = name;
        // Fail fast: without an impl class name there is nothing to load.
        this.catalogImpl = Preconditions.checkNotNull(catalogImpl,
                "Cannot initialize custom Catalog, impl class name is null");
    }

    @Override
    public Catalog loadCatalog() {
        // Delegates reflective class loading and initialization to Iceberg.
        return CatalogUtil.loadCatalog(catalogImpl, name, properties, hadoopConf.get());
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("name", name)
                .add("catalogImpl", catalogImpl)
                .toString();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

public enum IcebergCatalogType {
HIVE_CATALOG,
CUSTOM_CATALOG,
UNKNOWN;
// TODO: add more iceberg catalog type

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.google.common.collect.ImmutableMap;
import com.starrocks.catalog.IcebergTable;
import com.starrocks.external.hive.HdfsFileFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
Expand Down Expand Up @@ -42,22 +43,34 @@ public static TableIdentifier getIcebergTableIdentifier(String db, String table)
public static IcebergCatalog getIcebergCatalog(IcebergTable table)
throws StarRocksIcebergException {
IcebergCatalogType catalogType = table.getCatalogType();
return getIcebergCatalog(catalogType, table.getIcebergHiveMetastoreUris());
}

/**
* Returns the corresponding catalog implementation.
*/
public static IcebergCatalog getIcebergCatalog(IcebergCatalogType catalogType, String metastoreUris)
throws StarRocksIcebergException {
switch (catalogType) {
case HIVE_CATALOG: return IcebergHiveCatalog.getInstance(metastoreUris);
case HIVE_CATALOG:
return getIcebergHiveCatalog(table.getIcebergHiveMetastoreUris());
case CUSTOM_CATALOG:
return getIcebergCustomCatalog(table.getCatalogImpl(), table.getIcebergProperties());
default:
throw new StarRocksIcebergException(
"Unexpected catalog type: " + catalogType.toString());
}
}

/**
 * Returns the Hive-backed {@link IcebergCatalog} for the given metastore.
 * Presumably cached per-URI — see {@code IcebergHiveCatalog.getInstance};
 * TODO confirm caching semantics there.
 *
 * @param metastoreUris thrift URI(s) of the Hive metastore
 * @throws StarRocksIcebergException if the catalog cannot be obtained
 */
public static IcebergCatalog getIcebergHiveCatalog(String metastoreUris)
        throws StarRocksIcebergException {
    return IcebergHiveCatalog.getInstance(metastoreUris);
}

/**
 * Instantiates a user-provided Iceberg catalog implementation.
 *
 * @param catalogImpl       fully-qualified class name of the custom catalog
 *                          (the "iceberg.catalog-impl" resource property)
 * @param icebergProperties properties forwarded to catalog initialization
 * @return the loaded catalog
 * @throws StarRocksIcebergException if the loaded class is not an
 *         {@link IcebergCatalog}
 */
public static IcebergCatalog getIcebergCustomCatalog(String catalogImpl, Map<String, String> icebergProperties)
        throws StarRocksIcebergException {
    // Load first, then type-check: a bare cast would surface a raw
    // ClassCastException with no hint about which class or property is wrong.
    Object catalog = CatalogLoader.custom(String.format("Custom-%s", catalogImpl),
            new Configuration(), icebergProperties, catalogImpl).loadCatalog();
    if (!(catalog instanceof IcebergCatalog)) {
        throw new StarRocksIcebergException(
                "Custom catalog class " + catalogImpl + " does not implement IcebergCatalog");
    }
    return (IcebergCatalog) catalog;
}

/**
* Get hdfs file format in StarRocks use iceberg file format.
* @param format
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,34 @@ public void testFromStmt(@Mocked Catalog catalog, @Injectable Auth auth) throws
Assert.assertEquals(metastoreURIs, resource.getHiveMetastoreURIs());
}

@Test
public void testCustomStmt(@Mocked Catalog catalog, @Injectable Auth auth) throws UserException {
    // Stub auth so the CREATE RESOURCE statement passes the ADMIN privilege check.
    new Expectations() {
        {
            catalog.getAuth();
            result = auth;
            auth.checkGlobalPriv((ConnectContext) any, PrivPredicate.ADMIN);
            result = true;
        }
    };

    String name = "iceberg1";
    String type = "iceberg";
    String catalogType = "CUSTOM";
    String catalogImpl = "com.starrocks.external.iceberg.IcebergHiveCatalog";
    Map<String, String> properties = Maps.newHashMap();
    properties.put("type", type);
    properties.put("starrocks.catalog-type", catalogType);
    properties.put("iceberg.catalog-impl", catalogImpl);
    CreateResourceStmt stmt = new CreateResourceStmt(true, name, properties);
    stmt.analyze(analyzer);
    IcebergResource resource = (IcebergResource) Resource.fromStmt(stmt);
    // Assert against the setup variables (not repeated literals) so the
    // assertions cannot drift from the values used above.
    Assert.assertEquals(name, resource.getName());
    Assert.assertEquals(type, resource.getType().name().toLowerCase());
    Assert.assertEquals(IcebergCatalogType.fromString(catalogType), resource.getCatalogType());
    Assert.assertEquals(catalogImpl, resource.getIcebergImpl());
}

@Test
public void testSerialization() throws Exception {
Resource resource = new IcebergResource("iceberg0");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import com.google.common.collect.Maps;
import com.starrocks.common.DdlException;
import com.starrocks.external.iceberg.IcebergCatalog;
import com.starrocks.external.iceberg.IcebergCatalogType;
import com.starrocks.external.iceberg.IcebergCustomCatalogTest;
import com.starrocks.external.iceberg.IcebergUtil;
import mockit.Expectations;
import mockit.Mock;
Expand Down Expand Up @@ -64,7 +64,7 @@ public void testWithResourceName(@Mocked Catalog catalog,

new MockUp<IcebergUtil>() {
@Mock
public IcebergCatalog getIcebergCatalog(IcebergCatalogType type, String uris) {
public IcebergCatalog getIcebergHiveCatalog(String uris) {
return icebergCatalog;
}
};
Expand Down Expand Up @@ -95,6 +95,54 @@ public IcebergCatalog getIcebergCatalog(IcebergCatalogType type, String uris) {
Assert.assertEquals(db, table.getDb());
}

// Verifies that an IcebergTable can be created against a resource of
// catalog-type "custom", with the catalog lookup and schema load mocked out.
@Test
public void testCustomWithResourceName(@Mocked Catalog catalog,
                                       @Mocked ResourceMgr resourceMgr,
                                       @Mocked IcebergCatalog icebergCatalog,
                                       @Mocked Table iTable) throws DdlException {
    // Build a CUSTOM-type resource pointing at a test catalog implementation.
    Resource icebergResource = new IcebergResource(resourceName);
    Map<String, String> resourceProperties = Maps.newHashMap();
    resourceProperties.put("starrocks.catalog-type", "custom");
    resourceProperties.put("iceberg.catalog-impl", IcebergCustomCatalogTest.IcebergCustomTestingCatalog.class.getName());
    icebergResource.setProperties(resourceProperties);

    // Minimal iceberg schema returned by the mocked table below.
    List<Types.NestedField> fields = new ArrayList<>();
    fields.add(Types.NestedField.of(1, false, "col1", new Types.LongType()));
    Schema schema = new Schema(fields);

    // Short-circuit the custom catalog construction with the mocked catalog.
    new MockUp<IcebergUtil>() {
        @Mock
        public IcebergCatalog getIcebergCustomCatalog(String catalogImpl, Map<String, String> icebergProperties) {
            return icebergCatalog;
        }
    };

    new Expectations() {
        {
            com.starrocks.catalog.Catalog.getCurrentCatalog();
            result = catalog;
            minTimes = 0;

            catalog.getResourceMgr();
            result = resourceMgr;

            // Resolve the table's "resource" property to the resource built above.
            resourceMgr.getResource("iceberg0");
            result = icebergResource;

            icebergCatalog.loadTable((TableIdentifier) any);
            result = iTable;

            iTable.schema();
            result = schema;
        }
    };

    properties.put("resource", resourceName);
    IcebergTable table = new IcebergTable(1000, "iceberg_table", columns, properties);
    Assert.assertEquals(tableName, table.getTable());
    Assert.assertEquals(db, table.getDb());
}

@Test(expected = DdlException.class)
public void testNoDb() throws DdlException {
properties.remove("database");
Expand Down
Loading