Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 0 additions & 50 deletions catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ import (
"strings"

"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog/internal"
iceinternal "github.com/apache/iceberg-go/internal"
"github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/table"
)

Expand Down Expand Up @@ -216,51 +214,3 @@ func getUpdatedPropsAndUpdateSummary(currentProps iceberg.Properties, removals [

return updatedProps, summary, nil
}

// updateAndStageTable builds a staged table from an optional current table and
// a list of updates.
//
// When current is non-nil, every requirement in reqs is validated against
// current.Metadata() and the first validation error is returned as-is; the
// current metadata and metadata location then serve as the base. When current
// is nil, reqs are not consulted and the base is metadata created from an
// empty schema with ID 0, an unsorted sort order, and an empty location.
//
// The updates are applied to the base via internal.UpdateTableMetadata. The
// new metadata file location is requested from the location provider using
// the version parsed from the previous metadata location plus one, and the
// filesystem is loaded for that location from the updated properties.
//
// Any error from these steps is returned unwrapped together with a nil result.
//
//lint:ignore U1000 this is linked to by catalogs via go:linkname but we don't want to export it
func updateAndStageTable(ctx context.Context, current *table.Table, ident table.Identifier, reqs []table.Requirement, updates []table.Update, cat table.CatalogIO) (*table.StagedTable, error) {
	var (
		base    table.Metadata
		prevLoc string
	)

	if current == nil {
		// No existing table: start from placeholder metadata and leave the
		// previous metadata location empty.
		empty, err := table.NewMetadata(iceberg.NewSchema(0), nil, table.UnsortedSortOrder, "", nil)
		if err != nil {
			return nil, err
		}

		base = empty
	} else {
		for i := range reqs {
			if err := reqs[i].Validate(current.Metadata()); err != nil {
				return nil, err
			}
		}

		base, prevLoc = current.Metadata(), current.MetadataLocation()
	}

	newMeta, err := internal.UpdateTableMetadata(base, updates, prevLoc)
	if err != nil {
		return nil, err
	}

	locProvider, err := table.LoadLocationProvider(newMeta.Location(), newMeta.Properties())
	if err != nil {
		return nil, err
	}

	// The staged metadata file is numbered one past the version encoded in
	// the previous metadata location.
	nextLoc, err := locProvider.NewTableMetadataFileLocation(internal.ParseMetadataVersion(prevLoc) + 1)
	if err != nil {
		return nil, err
	}

	fileIO, err := io.LoadFS(ctx, newMeta.Properties(), nextLoc)
	if err != nil {
		return nil, err
	}

	staged := &table.StagedTable{Table: table.New(ident, newMeta, nextLoc, fileIO, cat)}

	return staged, nil
}
64 changes: 38 additions & 26 deletions catalog/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ const (
Endpoint = "glue.endpoint"
MaxRetries = "glue.max-retries"
RetryMode = "glue.retry-mode"

icebergFieldIDKey = "iceberg.field.id"
icebergFieldOptionalKey = "iceberg.field.optional"
icebergFieldCurrentKey = "iceberg.field.current"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot about this one. Will it ever be false? 🤔 The current schema should be stored in Glue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting: apache/iceberg#3888

)

var _ catalog.Catalog = (*Catalog)(nil)
Expand Down Expand Up @@ -130,6 +134,7 @@ type Catalog struct {
glueSvc glueAPI
catalogId *string
awsCfg *aws.Config
props iceberg.Properties
}

// NewCatalog creates a new instance of glue.Catalog with the given options.
Expand All @@ -151,6 +156,7 @@ func NewCatalog(opts ...Option) *Catalog {
glueSvc: glue.NewFromConfig(glueOps.awsConfig),
catalogId: catalogId,
awsCfg: &glueOps.awsConfig,
props: iceberg.Properties(glueOps.awsProperties),
}
}

Expand Down Expand Up @@ -232,46 +238,55 @@ func (c *Catalog) CatalogType() catalog.Type {
}

// CreateTable creates a new Iceberg table in the Glue catalog.
// AWS Glue will create a new table and a new metadata file in S3 with the format: metadataLocation/metadata/00000-00000-00000-00000-00000.metadata.json.
// This function will create the metadata file in S3, using the catalog and table properties
// to determine the bucket and key for the metadata location.
func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) {
staged, err := internal.CreateStagedTable(ctx, c.props, c.LoadNamespaceProperties, identifier, schema, opts...)
if err != nil {
return nil, err
}

database, tableName, err := identifierToGlueTable(identifier)
if err != nil {
return nil, err
}
var cfg catalog.CreateTableCfg
for _, opt := range opts {
opt(&cfg)

wfs, ok := staged.FS().(io.WriteFileIO)
if !ok {
return nil, errors.New("loaded filesystem IO does not support writing")
}
if cfg.Location == "" {
return nil, errors.New("metadata location is required for table creation")

if err := internal.WriteTableMetadata(staged.Metadata(), wfs, staged.MetadataLocation()); err != nil {
return nil, err
}
parameters := map[string]string{}
for k, v := range cfg.Properties {
parameters[k] = v

var tableDescription *string
if desc := staged.Properties().Get("Description", ""); desc != "" {
tableDescription = aws.String(desc)
}

tableInput := &types.TableInput{
Name: aws.String(tableName),
Parameters: parameters,
TableType: aws.String("EXTERNAL_TABLE"),
Name: aws.String(tableName),
Parameters: map[string]string{
tableTypePropsKey: glueTypeIceberg,
metadataLocationPropsKey: staged.MetadataLocation(),
},
TableType: aws.String("EXTERNAL_TABLE"),
StorageDescriptor: &types.StorageDescriptor{
Location: aws.String(cfg.Location),
Columns: schemaToGlueColumns(schema),
Location: aws.String(staged.Metadata().Location()),
Columns: schemaToGlueColumns(schema, true),
},
Description: tableDescription,
}
_, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{
CatalogId: c.catalogId,
DatabaseName: aws.String(database),
TableInput: tableInput,
OpenTableFormatInput: &types.OpenTableFormatInput{
IcebergInput: &types.IcebergInput{
MetadataOperation: types.MetadataOperationCreate,
},
},
})
if err != nil {
return nil, fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err)
}
createdTable, err := c.LoadTable(ctx, identifier, cfg.Properties)
createdTable, err := c.LoadTable(ctx, identifier, nil)
if err != nil {
// Attempt to clean up the table if loading fails
_, cleanupErr := c.glueSvc.DeleteTable(ctx, &glue.DeleteTableInput{
Expand Down Expand Up @@ -313,7 +328,7 @@ func (c *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier
TableType: aws.String("EXTERNAL_TABLE"),
StorageDescriptor: &types.StorageDescriptor{
Location: aws.String(metadataLocation),
Columns: schemaToGlueColumns(metadata.Schema()),
Columns: schemaToGlueColumns(metadata.Schema(), true),
},
}
_, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{
Expand All @@ -333,9 +348,6 @@ func (c *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier
return c.LoadTable(ctx, identifier, nil)
}

// updateAndStageTable is declared here without a body; the go:linkname
// directive below binds it to the unexported function of the same name in
// github.com/apache/iceberg-go/catalog.
//
//go:linkname updateAndStageTable github.com/apache/iceberg-go/catalog.updateAndStageTable
func updateAndStageTable(ctx context.Context, current *table.Table, ident table.Identifier, reqs []table.Requirement, updates []table.Update, cat table.CatalogIO) (*table.StagedTable, error)

func (c *Catalog) CommitTable(ctx context.Context, tbl *table.Table, requirements []table.Requirement, updates []table.Update) (table.Metadata, string, error) {
// Load current table
database, tableName, err := identifierToGlueTable(tbl.Identifier())
Expand All @@ -348,7 +360,7 @@ func (c *Catalog) CommitTable(ctx context.Context, tbl *table.Table, requirement
}

// Create a staging table with the updates applied
staged, err := updateAndStageTable(ctx, tbl, tbl.Identifier(), requirements, updates, c)
staged, err := internal.UpdateAndStageTable(ctx, tbl, tbl.Identifier(), requirements, updates, c)
if err != nil {
return nil, "", err
}
Expand Down Expand Up @@ -764,7 +776,7 @@ func buildGlueTableInput(ctx context.Context, database string, tableName string,
existingColumnMap[*column.Name] = *column.Comment
}
var glueColumns []types.Column
for _, column := range schemaToGlueColumns(staged.Metadata().CurrentSchema()) {
for _, column := range schemaToGlueColumns(staged.Metadata().CurrentSchema(), true) {
col := types.Column{
Name: column.Name,
Comment: column.Comment,
Expand Down
14 changes: 10 additions & 4 deletions catalog/glue/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package glue

import (
"fmt"
"strconv"
"strings"

"github.com/apache/iceberg-go"
Expand All @@ -27,22 +28,27 @@ import (
)

// schemaToGlueColumns converts an Iceberg schema to a list of Glue columns.
// It produces one column per field returned by schema.Fields(), in that
// order, and passes isCurrent unchanged to fieldToGlueColumn for every field.
// The returned slice is nil when the schema has no fields.
func schemaToGlueColumns(schema *iceberg.Schema) []types.Column {
func schemaToGlueColumns(schema *iceberg.Schema, isCurrent bool) []types.Column {
var columns []types.Column
for _, field := range schema.Fields() {
columns = append(columns, fieldToGlueColumn(field))
columns = append(columns, fieldToGlueColumn(field, isCurrent))
}

return columns
}

// fieldToGlueColumn converts an Iceberg nested field to a Glue column.
// The column's Name and Comment are taken from field.Name and field.Doc, and
// its Type is the result of icebergTypeToGlueType(field.Type). The column's
// Parameters carry three string-encoded entries: the field ID, whether the
// field is optional (the negation of field.Required), and the isCurrent flag
// supplied by the caller.
func fieldToGlueColumn(field iceberg.NestedField) types.Column {
func fieldToGlueColumn(field iceberg.NestedField, isCurrent bool) types.Column {
column := types.Column{
Name: aws.String(field.Name),
Comment: aws.String(field.Doc),
Type: aws.String(icebergTypeToGlueType(field.Type)),
Parameters: map[string]string{
icebergFieldIDKey: strconv.Itoa(field.ID),
icebergFieldOptionalKey: strconv.FormatBool(!field.Required),
icebergFieldCurrentKey: strconv.FormatBool(isCurrent),
},
}
column.Type = aws.String(icebergTypeToGlueType(field.Type))

return column
}
Expand Down
31 changes: 21 additions & 10 deletions catalog/glue/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ func TestFieldToGlueColumn(t *testing.T) {
Name: aws.String("simple_field"),
Type: aws.String("string"),
Comment: aws.String("A simple string field"),
Parameters: map[string]string{
icebergFieldIDKey: "1",
icebergFieldOptionalKey: "false",
icebergFieldCurrentKey: "true",
},
},
},
{
Expand All @@ -138,22 +143,28 @@ func TestFieldToGlueColumn(t *testing.T) {
ID: 2,
Name: "price",
Type: iceberg.DecimalTypeOf(10, 2),
Required: true,
Required: false,
Doc: "Price with 2 decimal places",
},
expected: types.Column{
Name: aws.String("price"),
Type: aws.String("decimal(10,2)"),
Comment: aws.String("Price with 2 decimal places"),
Parameters: map[string]string{
icebergFieldIDKey: "2",
icebergFieldOptionalKey: "true",
icebergFieldCurrentKey: "true",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := fieldToGlueColumn(tt.field)
assert.Equal(t, aws.ToString(tt.expected.Name), aws.ToString(result.Name))
assert.Equal(t, aws.ToString(tt.expected.Type), aws.ToString(result.Type))
assert.Equal(t, aws.ToString(tt.expected.Comment), aws.ToString(result.Comment))
result := fieldToGlueColumn(tt.field, true)
assert.Equal(t, tt.expected.Name, result.Name)
assert.Equal(t, tt.expected.Type, result.Type)
assert.Equal(t, tt.expected.Comment, result.Comment)
assert.Equal(t, tt.expected.Parameters, result.Parameters)
})
}
}
Expand Down Expand Up @@ -191,7 +202,7 @@ func TestNestedStructType(t *testing.T) {
Required: true,
Doc: "User address",
}
column := fieldToGlueColumn(field)
column := fieldToGlueColumn(field, true)
assert.Equal(t, "address", aws.ToString(column.Name))
assert.Equal(t, expected, aws.ToString(column.Type))
assert.Equal(t, "User address", aws.ToString(column.Comment))
Expand All @@ -212,7 +223,7 @@ func TestNestedListType(t *testing.T) {
Required: false,
Doc: "User tags",
}
column := fieldToGlueColumn(field)
column := fieldToGlueColumn(field, false)
assert.Equal(t, "tags", aws.ToString(column.Name))
assert.Equal(t, "array<string>", aws.ToString(column.Type))
assert.Equal(t, "User tags", aws.ToString(column.Comment))
Expand All @@ -235,7 +246,7 @@ func TestNestedMapType(t *testing.T) {
Required: true,
Doc: "User properties",
}
column := fieldToGlueColumn(field)
column := fieldToGlueColumn(field, true)
assert.Equal(t, "properties", aws.ToString(column.Name))
assert.Equal(t, "map<string,int>", aws.ToString(column.Type))
assert.Equal(t, "User properties", aws.ToString(column.Comment))
Expand Down Expand Up @@ -281,7 +292,7 @@ func TestComplexNestedTypes(t *testing.T) {
Required: true,
Doc: "A complex nested field",
}
column := fieldToGlueColumn(field)
column := fieldToGlueColumn(field, true)
assert.Equal(t, "complex_field", aws.ToString(column.Name))
assert.Equal(t, expected, aws.ToString(column.Type))
assert.Equal(t, "A complex nested field", aws.ToString(column.Comment))
Expand Down Expand Up @@ -338,7 +349,7 @@ func TestSchemaToGlueColumns(t *testing.T) {
},
}
schema := iceberg.NewSchema(1, fields...)
columns := schemaToGlueColumns(schema)
columns := schemaToGlueColumns(schema, true)
assert.Equal(t, 4, len(columns))
assert.Equal(t, "id", aws.ToString(columns[0].Name))
assert.Equal(t, "bigint", aws.ToString(columns[0].Type))
Expand Down
Loading
Loading