diff --git a/catalog/catalog.go b/catalog/catalog.go index 01b0e28fc..2866276fc 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -36,9 +36,7 @@ import ( "strings" "github.com/apache/iceberg-go" - "github.com/apache/iceberg-go/catalog/internal" iceinternal "github.com/apache/iceberg-go/internal" - "github.com/apache/iceberg-go/io" "github.com/apache/iceberg-go/table" ) @@ -216,51 +214,3 @@ func getUpdatedPropsAndUpdateSummary(currentProps iceberg.Properties, removals [ return updatedProps, summary, nil } - -//lint:ignore U1000 this is linked to by catalogs via go:linkname but we don't want to export it -func updateAndStageTable(ctx context.Context, current *table.Table, ident table.Identifier, reqs []table.Requirement, updates []table.Update, cat table.CatalogIO) (*table.StagedTable, error) { - var ( - baseMeta table.Metadata - metadataLoc string - ) - - if current != nil { - for _, r := range reqs { - if err := r.Validate(current.Metadata()); err != nil { - return nil, err - } - } - - baseMeta = current.Metadata() - metadataLoc = current.MetadataLocation() - } else { - var err error - baseMeta, err = table.NewMetadata(iceberg.NewSchema(0), nil, table.UnsortedSortOrder, "", nil) - if err != nil { - return nil, err - } - } - - updated, err := internal.UpdateTableMetadata(baseMeta, updates, metadataLoc) - if err != nil { - return nil, err - } - - provider, err := table.LoadLocationProvider(updated.Location(), updated.Properties()) - if err != nil { - return nil, err - } - - newVersion := internal.ParseMetadataVersion(metadataLoc) + 1 - newLocation, err := provider.NewTableMetadataFileLocation(newVersion) - if err != nil { - return nil, err - } - - fs, err := io.LoadFS(ctx, updated.Properties(), newLocation) - if err != nil { - return nil, err - } - - return &table.StagedTable{Table: table.New(ident, updated, newLocation, fs, cat)}, nil -} diff --git a/catalog/glue/glue.go b/catalog/glue/glue.go index 75b19eb23..ab15c2d04 100644 --- a/catalog/glue/glue.go +++ b/catalog/glue/glue.go @@ -65,6 +65,10 @@ const ( Endpoint = "glue.endpoint" MaxRetries = "glue.max-retries" RetryMode = "glue.retry-mode" + + icebergFieldIDKey = "iceberg.field.id" + icebergFieldOptionalKey = "iceberg.field.optional" + icebergFieldCurrentKey = "iceberg.field.current" ) var _ catalog.Catalog = (*Catalog)(nil) @@ -130,6 +134,7 @@ type Catalog struct { glueSvc glueAPI catalogId *string awsCfg *aws.Config + props iceberg.Properties } // NewCatalog creates a new instance of glue.Catalog with the given options. @@ -151,6 +156,7 @@ func NewCatalog(opts ...Option) *Catalog { glueSvc: glue.NewFromConfig(glueOps.awsConfig), catalogId: catalogId, awsCfg: &glueOps.awsConfig, + props: iceberg.Properties(glueOps.awsProperties), } } @@ -232,46 +238,55 @@ func (c *Catalog) CatalogType() catalog.Type { } // CreateTable creates a new Iceberg table in the Glue catalog. -// AWS Glue will create a new table and a new metadata file in S3 with the format: metadataLocation/metadata/00000-00000-00000-00000-00000.metadata.json. +// This function will create the metadata file in S3 using the catalog and table properties, +// to determine the bucket and key for the metadata location. func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) { + staged, err := internal.CreateStagedTable(ctx, c.props, c.LoadNamespaceProperties, identifier, schema, opts...) + if err != nil { + return nil, err + } + database, tableName, err := identifierToGlueTable(identifier) if err != nil { return nil, err } - var cfg catalog.CreateTableCfg - for _, opt := range opts { - opt(&cfg) + + wfs, ok := staged.FS().(io.WriteFileIO) + if !ok { + return nil, errors.New("loaded filesystem IO does not support writing") } - if cfg.Location == "" { - return nil, errors.New("metadata location is required for table creation") + + if err := internal.WriteTableMetadata(staged.Metadata(), wfs, staged.MetadataLocation()); err != nil { + return nil, err } - parameters := map[string]string{} - for k, v := range cfg.Properties { - parameters[k] = v + + var tableDescription *string + if desc := staged.Properties().Get("Description", ""); desc != "" { + tableDescription = aws.String(desc) } + tableInput := &types.TableInput{ - Name: aws.String(tableName), - Parameters: parameters, - TableType: aws.String("EXTERNAL_TABLE"), + Name: aws.String(tableName), + Parameters: map[string]string{ + tableTypePropsKey: glueTypeIceberg, + metadataLocationPropsKey: staged.MetadataLocation(), + }, + TableType: aws.String("EXTERNAL_TABLE"), StorageDescriptor: &types.StorageDescriptor{ - Location: aws.String(cfg.Location), - Columns: schemaToGlueColumns(schema), + Location: aws.String(staged.Metadata().Location()), + Columns: schemaToGlueColumns(schema, true), }, + Description: tableDescription, } _, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{ CatalogId: c.catalogId, DatabaseName: aws.String(database), TableInput: tableInput, - OpenTableFormatInput: &types.OpenTableFormatInput{ - IcebergInput: &types.IcebergInput{ - MetadataOperation: types.MetadataOperationCreate, - }, - }, }) if err != nil { return nil, fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err) } - createdTable, err := c.LoadTable(ctx, identifier, cfg.Properties) + createdTable, err := c.LoadTable(ctx, identifier, nil) if err != nil { // Attempt to clean up the table if loading fails _, cleanupErr := c.glueSvc.DeleteTable(ctx, &glue.DeleteTableInput{ @@ -313,7 +328,7 @@ func (c *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier TableType: aws.String("EXTERNAL_TABLE"), StorageDescriptor: &types.StorageDescriptor{ Location: aws.String(metadataLocation), - Columns: schemaToGlueColumns(metadata.Schema()), + Columns: schemaToGlueColumns(metadata.Schema(), true), }, } _, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{ @@ -333,9 +348,6 @@ func (c *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier return c.LoadTable(ctx, identifier, nil) } -//go:linkname updateAndStageTable github.com/apache/iceberg-go/catalog.updateAndStageTable -func updateAndStageTable(ctx context.Context, current *table.Table, ident table.Identifier, reqs []table.Requirement, updates []table.Update, cat table.CatalogIO) (*table.StagedTable, error) - func (c *Catalog) CommitTable(ctx context.Context, tbl *table.Table, requirements []table.Requirement, updates []table.Update) (table.Metadata, string, error) { // Load current table database, tableName, err := identifierToGlueTable(tbl.Identifier()) @@ -348,7 +360,7 @@ func (c *Catalog) CommitTable(ctx context.Context, tbl *table.Table, requirement } // Create a staging table with the updates applied - staged, err := updateAndStageTable(ctx, tbl, tbl.Identifier(), requirements, updates, c) + staged, err := internal.UpdateAndStageTable(ctx, tbl, tbl.Identifier(), requirements, updates, c) if err != nil { return nil, "", err } @@ -764,7 +776,7 @@ func buildGlueTableInput(ctx context.Context, database string, tableName string, existingColumnMap[*column.Name] = *column.Comment } var glueColumns []types.Column - for _, column := range schemaToGlueColumns(staged.Metadata().CurrentSchema()) { + for _, column := range schemaToGlueColumns(staged.Metadata().CurrentSchema(), true) { col := types.Column{ Name: column.Name, Comment: column.Comment, diff --git a/catalog/glue/schema.go b/catalog/glue/schema.go index 27b2f6d64..167b41d95 100644 --- a/catalog/glue/schema.go +++ b/catalog/glue/schema.go @@ -19,6 +19,7 @@ package glue import ( "fmt" + "strconv" "strings" "github.com/apache/iceberg-go" @@ -27,22 +28,27 @@ import ( ) // schemaToGlueColumns converts an Iceberg schema to a list of Glue columns. -func schemaToGlueColumns(schema *iceberg.Schema) []types.Column { +func schemaToGlueColumns(schema *iceberg.Schema, isCurrent bool) []types.Column { var columns []types.Column for _, field := range schema.Fields() { - columns = append(columns, fieldToGlueColumn(field)) + columns = append(columns, fieldToGlueColumn(field, isCurrent)) } return columns } // fieldToGlueColumn converts an Iceberg nested field to a Glue column. -func fieldToGlueColumn(field iceberg.NestedField) types.Column { +func fieldToGlueColumn(field iceberg.NestedField, isCurrent bool) types.Column { column := types.Column{ Name: aws.String(field.Name), Comment: aws.String(field.Doc), + Type: aws.String(icebergTypeToGlueType(field.Type)), + Parameters: map[string]string{ + icebergFieldIDKey: strconv.Itoa(field.ID), + icebergFieldOptionalKey: strconv.FormatBool(!field.Required), + icebergFieldCurrentKey: strconv.FormatBool(isCurrent), + }, } - column.Type = aws.String(icebergTypeToGlueType(field.Type)) return column } diff --git a/catalog/glue/schema_test.go b/catalog/glue/schema_test.go index ff85c86a0..13825d5bb 100644 --- a/catalog/glue/schema_test.go +++ b/catalog/glue/schema_test.go @@ -130,6 +130,11 @@ func TestFieldToGlueColumn(t *testing.T) { Name: aws.String("simple_field"), Type: aws.String("string"), Comment: aws.String("A simple string field"), + Parameters: map[string]string{ + icebergFieldIDKey: "1", + icebergFieldOptionalKey: "false", + icebergFieldCurrentKey: "true", + }, }, }, { @@ -138,22 +143,28 @@ func TestFieldToGlueColumn(t *testing.T) { ID: 2, Name: "price", Type: iceberg.DecimalTypeOf(10, 2), - Required: true, + Required: false, Doc: "Price with 2 decimal places", }, expected: types.Column{ Name: aws.String("price"), Type: aws.String("decimal(10,2)"), Comment: aws.String("Price with 2 decimal places"), + Parameters: map[string]string{ + icebergFieldIDKey: "2", + icebergFieldOptionalKey: "true", + icebergFieldCurrentKey: "true", + }, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - result := fieldToGlueColumn(tt.field) - assert.Equal(t, aws.ToString(tt.expected.Name), aws.ToString(result.Name)) - assert.Equal(t, aws.ToString(tt.expected.Type), aws.ToString(result.Type)) - assert.Equal(t, aws.ToString(tt.expected.Comment), aws.ToString(result.Comment)) + result := fieldToGlueColumn(tt.field, true) + assert.Equal(t, tt.expected.Name, result.Name) + assert.Equal(t, tt.expected.Type, result.Type) + assert.Equal(t, tt.expected.Comment, result.Comment) + assert.Equal(t, tt.expected.Parameters, result.Parameters) }) } } @@ -191,7 +202,7 @@ func TestNestedStructType(t *testing.T) { Required: true, Doc: "User address", } - column := fieldToGlueColumn(field) + column := fieldToGlueColumn(field, true) assert.Equal(t, "address", aws.ToString(column.Name)) assert.Equal(t, expected, aws.ToString(column.Type)) assert.Equal(t, "User address", aws.ToString(column.Comment)) @@ -212,7 +223,7 @@ func TestNestedListType(t *testing.T) { Required: false, Doc: "User tags", } - column := fieldToGlueColumn(field) + column := fieldToGlueColumn(field, false) assert.Equal(t, "tags", aws.ToString(column.Name)) assert.Equal(t, "array", aws.ToString(column.Type)) assert.Equal(t, "User tags", aws.ToString(column.Comment)) @@ -235,7 +246,7 @@ func TestNestedMapType(t *testing.T) { Required: true, Doc: "User properties", } - column := fieldToGlueColumn(field) + column := fieldToGlueColumn(field, true) assert.Equal(t, "properties", aws.ToString(column.Name)) assert.Equal(t, "map", aws.ToString(column.Type)) assert.Equal(t, "User properties", aws.ToString(column.Comment)) @@ -281,7 +292,7 @@ func TestComplexNestedTypes(t *testing.T) { Required: true, Doc: "A complex nested field", } - column := fieldToGlueColumn(field) + column := fieldToGlueColumn(field, true) assert.Equal(t, "complex_field", aws.ToString(column.Name)) assert.Equal(t, expected, aws.ToString(column.Type)) assert.Equal(t, "A complex nested field", aws.ToString(column.Comment)) @@ -338,7 +349,7 @@ func TestSchemaToGlueColumns(t *testing.T) { }, } schema := iceberg.NewSchema(1, fields...) - columns := schemaToGlueColumns(schema) + columns := schemaToGlueColumns(schema, true) assert.Equal(t, 4, len(columns)) assert.Equal(t, "id", aws.ToString(columns[0].Name)) assert.Equal(t, "bigint", aws.ToString(columns[0].Type)) diff --git a/catalog/internal/utils.go b/catalog/internal/utils.go index 204f7eaa3..7e4c8ca5b 100644 --- a/catalog/internal/utils.go +++ b/catalog/internal/utils.go @@ -22,11 +22,15 @@ import ( "encoding/json" "errors" "fmt" + "maps" + "net/url" "path" "regexp" "strconv" + "strings" "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/catalog" "github.com/apache/iceberg-go/io" "github.com/apache/iceberg-go/table" "github.com/google/uuid" @@ -37,6 +41,16 @@ func GetMetadataLoc(location string, newVersion uint) string { location, newVersion, uuid.New().String()) } +func WriteTableMetadata(metadata table.Metadata, fs io.WriteFileIO, loc string) error { + out, err := fs.Create(loc) + if err != nil { + return nil + } + defer out.Close() + + return json.NewEncoder(out).Encode(metadata) +} + func WriteMetadata(ctx context.Context, metadata table.Metadata, loc string, props iceberg.Properties) error { fs, err := io.LoadFS(ctx, props, loc) if err != nil { @@ -90,6 +104,75 @@ func UpdateTableMetadata(base table.Metadata, updates []table.Update, metadataLo return bldr.Build() } +func CreateStagedTable(ctx context.Context, catprops iceberg.Properties, nspropsFn GetNamespacePropsFn, ident table.Identifier, sc *iceberg.Schema, opts ...catalog.CreateTableOpt) (table.StagedTable, error) { + var cfg catalog.CreateTableCfg + for _, opt := range opts { + opt(&cfg) + } + + dbIdent := catalog.NamespaceFromIdent(ident) + tblname := catalog.TableNameFromIdent(ident) + dbname := strings.Join(dbIdent, ".") + + loc, err := ResolveTableLocation(ctx, cfg.Location, dbname, tblname, catprops, nspropsFn) + if err != nil { + return table.StagedTable{}, err + } + + provider, err := table.LoadLocationProvider(loc, cfg.Properties) + if err != nil { + return table.StagedTable{}, err + } + + metadataLoc, err := provider.NewTableMetadataFileLocation(0) + if err != nil { + return table.StagedTable{}, err + } + + metadata, err := table.NewMetadata(sc, cfg.PartitionSpec, cfg.SortOrder, loc, cfg.Properties) + if err != nil { + return table.StagedTable{}, err + } + + ioProps := maps.Clone(catprops) + maps.Copy(ioProps, cfg.Properties) + fs, err := io.LoadFS(ctx, ioProps, metadataLoc) + if err != nil { + return table.StagedTable{}, err + } + + return table.StagedTable{ + Table: table.New(ident, metadata, metadataLoc, fs, nil), + }, nil +} + +type GetNamespacePropsFn func(context.Context, table.Identifier) (iceberg.Properties, error) + +func ResolveTableLocation(ctx context.Context, loc, dbname, tablename string, catprops iceberg.Properties, nsprops GetNamespacePropsFn) (string, error) { + if len(loc) == 0 { + dbprops, err := nsprops(ctx, strings.Split(dbname, ".")) + if err != nil { + return "", err + } + + return getDefaultWarehouseLocation(dbname, tablename, dbprops, catprops) + } + + return strings.TrimSuffix(loc, "/"), nil +} + +func getDefaultWarehouseLocation(dbname, tablename string, nsprops, catprops iceberg.Properties) (string, error) { + if dblocation := nsprops.Get("location", ""); dblocation != "" { + return url.JoinPath(dblocation, tablename) + } + + if warehousepath := catprops.Get("warehouse", ""); warehousepath != "" { + return url.JoinPath(warehousepath, dbname+".db", tablename) + } + + return "", errors.New("no default path set, please specify a location when creating a table") +} + // (\d+) -> version number // - -> separator // ([\w-]{36}) -> UUID (36 characters, including hyphens) @@ -116,3 +199,50 @@ func ParseMetadataVersion(location string) int { return v } + +func UpdateAndStageTable(ctx context.Context, current *table.Table, ident table.Identifier, reqs []table.Requirement, updates []table.Update, cat table.CatalogIO) (*table.StagedTable, error) { + var ( + baseMeta table.Metadata + metadataLoc string + ) + + if current != nil { + for _, r := range reqs { + if err := r.Validate(current.Metadata()); err != nil { + return nil, err + } + } + + baseMeta = current.Metadata() + metadataLoc = current.MetadataLocation() + } else { + var err error + baseMeta, err = table.NewMetadata(iceberg.NewSchema(0), nil, table.UnsortedSortOrder, "", nil) + if err != nil { + return nil, err + } + } + + updated, err := UpdateTableMetadata(baseMeta, updates, metadataLoc) + if err != nil { + return nil, err + } + + provider, err := table.LoadLocationProvider(updated.Location(), updated.Properties()) + if err != nil { + return nil, err + } + + newVersion := ParseMetadataVersion(metadataLoc) + 1 + newLocation, err := provider.NewTableMetadataFileLocation(newVersion) + if err != nil { + return nil, err + } + + fs, err := io.LoadFS(ctx, updated.Properties(), newLocation) + if err != nil { + return nil, err + } + + return &table.StagedTable{Table: table.New(ident, updated, newLocation, fs, cat)}, nil +} diff --git a/catalog/sql/sql.go b/catalog/sql/sql.go index 9373f27b9..bb8a083f3 100644 --- a/catalog/sql/sql.go +++ b/catalog/sql/sql.go @@ -24,7 +24,6 @@ import ( "fmt" "iter" "maps" - "net/url" "slices" "strings" "sync" @@ -258,31 +257,6 @@ func (c *Catalog) namespaceExists(ctx context.Context, ns string) (bool, error) }) } -func (c *Catalog) getDefaultWarehouseLocation(ctx context.Context, nsname, tableName string) (string, error) { - dbprops, err := c.LoadNamespaceProperties(ctx, strings.Split(nsname, ".")) - if err != nil { - return "", err - } - - if dblocation := dbprops.Get("location", ""); dblocation != "" { - return url.JoinPath(dblocation, tableName) - } - - if warehousepath := c.props.Get("warehouse", ""); warehousepath != "" { - return url.JoinPath(warehousepath, nsname+".db", tableName) - } - - return "", errors.New("no default path set, please specify a location when creating a table") -} - -func (c *Catalog) resolveTableLocation(ctx context.Context, loc, nsname, tablename string) (string, error) { - if len(loc) == 0 { - return c.getDefaultWarehouseLocation(ctx, nsname, tablename) - } - - return strings.TrimSuffix(loc, "/"), nil -} - func checkValidNamespace(ident table.Identifier) error { if len(ident) < 1 { return fmt.Errorf("%w: empty namespace identifier", catalog.ErrNoSuchNamespace) @@ -292,9 +266,9 @@ func checkValidNamespace(ident table.Identifier) error { } func (c *Catalog) CreateTable(ctx context.Context, ident table.Identifier, sc *iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) { - var cfg catalog.CreateTableCfg - for _, opt := range opts { - opt(&cfg) + staged, err := internal.CreateStagedTable(ctx, c.props, c.LoadNamespaceProperties, ident, sc, opts...) + if err != nil { + return nil, err } nsIdent := catalog.NamespaceFromIdent(ident) @@ -309,18 +283,12 @@ func (c *Catalog) CreateTable(ctx context.Context, ident table.Identifier, sc *i return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, ns) } - loc, err := c.resolveTableLocation(ctx, cfg.Location, ns, tblIdent) - if err != nil { - return nil, err - } - - metadataLocation := internal.GetMetadataLoc(loc, 0) - metadata, err := table.NewMetadata(sc, cfg.PartitionSpec, cfg.SortOrder, loc, cfg.Properties) - if err != nil { - return nil, err + wfs, ok := staged.FS().(io.WriteFileIO) + if !ok { + return nil, errors.New("loaded filesystem IO does not support writing") } - if err := internal.WriteMetadata(ctx, metadata, metadataLocation, c.props); err != nil { + if err := internal.WriteTableMetadata(staged.Metadata(), wfs, staged.MetadataLocation()); err != nil { return nil, err } @@ -329,7 +297,7 @@ func (c *Catalog) CreateTable(ctx context.Context, ident table.Identifier, sc *i CatalogName: c.name, TableNamespace: ns, TableName: tblIdent, - MetadataLocation: sql.NullString{String: metadataLocation, Valid: true}, + MetadataLocation: sql.NullString{String: staged.MetadataLocation(), Valid: true}, }).Exec(ctx) if err != nil { return fmt.Errorf("failed to create table: %w", err) @@ -341,12 +309,9 @@ func (c *Catalog) CreateTable(ctx context.Context, ident table.Identifier, sc *i return nil, err } - return c.LoadTable(ctx, ident, cfg.Properties) + return c.LoadTable(ctx, ident, staged.Properties()) } -//go:linkname updateAndStageTable github.com/apache/iceberg-go/catalog.updateAndStageTable -func updateAndStageTable(ctx context.Context, current *table.Table, ident table.Identifier, reqs []table.Requirement, updates []table.Update, cat table.CatalogIO) (*table.StagedTable, error) - func (c *Catalog) CommitTable(ctx context.Context, tbl *table.Table, reqs []table.Requirement, updates []table.Update) (table.Metadata, string, error) { ns := catalog.NamespaceFromIdent(tbl.Identifier()) tblName := catalog.TableNameFromIdent(tbl.Identifier()) @@ -356,7 +321,7 @@ func (c *Catalog) CommitTable(ctx context.Context, tbl *table.Table, reqs []tabl return nil, "", err } - staged, err := updateAndStageTable(ctx, current, tbl.Identifier(), reqs, updates, c) + staged, err := internal.UpdateAndStageTable(ctx, current, tbl.Identifier(), reqs, updates, c) if err != nil { return nil, "", err }