feat(sdk): refactor TDF architecture with streaming support and segment-based writing #2652
base: main
Conversation
feat(sdk): refactor TDF architecture with streaming support and segment-based writing

This commit introduces a comprehensive refactoring of the TDF SDK architecture to support streaming TDF creation with out-of-order segment writing, designed specifically for S3 multipart upload integration.

Key Changes:

**Architecture Refactoring:**
- Restructure TDF components into dedicated packages (tdf/, tdf/keysplit/)
- Move core TDF types (manifest, assertions, writer) into tdf/ package
- Implement new keysplit package with XOR-based key splitting algorithm
- Add segment-based caching and locator system for streaming operations

**New Streaming Writer Implementation:**
- Add StreamingWriter API with 0-based segment indexing
- Implement SegmentWriter for out-of-order segment writing
- Support dynamic segment expansion beyond initial expected count
- Add comprehensive test coverage for sequential and out-of-order scenarios
- Include memory cleanup functionality for uploaded segments

**Archive Package Cleanup:**
- Remove legacy TDF3Writer and associated test files
- Consolidate to SegmentWriter as the primary archive implementation
- Add ZIP64 support for large TDF files
- Implement deterministic segment output (segment 0 includes ZIP header)

**Key Features:**
- Out-of-order segment writing with deterministic assembly
- S3 multipart upload compatibility with part-to-segment mapping
- Streaming TDF creation without full payload buffering
- Comprehensive error handling and validation
- Thread-safe operations with proper mutex protection

**Test Coverage:**
- Fix streaming writer tests to use 0-based indexing consistently
- Add validation tests for segment index boundaries
- Include benchmarks for sequential vs out-of-order writing
- Test context cancellation and error conditions

This refactoring enables efficient streaming TDF creation for large files while maintaining backward compatibility with existing TDF readers.
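The core idea of out-of-order segment writing with deterministic assembly can be illustrated with a small, self-contained sketch. This is not the SDK's actual API; the type and function names below are hypothetical and exist only to show the pattern.

```go
// Conceptual sketch only: segments arrive in any order (e.g. as S3 multipart
// parts complete), are cached by index, and are assembled deterministically
// at finalize time.
package main

import (
	"fmt"
	"sort"
)

type segmentCache struct {
	segments map[int][]byte
}

func (c *segmentCache) writeSegment(index int, data []byte) error {
	if index < 0 {
		return fmt.Errorf("invalid segment index %d", index)
	}
	if _, exists := c.segments[index]; exists {
		return fmt.Errorf("duplicate segment %d", index)
	}
	c.segments[index] = data
	return nil
}

func (c *segmentCache) finalize() []byte {
	indices := make([]int, 0, len(c.segments))
	for idx := range c.segments {
		indices = append(indices, idx)
	}
	sort.Ints(indices) // deterministic logical order regardless of write order
	var out []byte
	for _, idx := range indices {
		out = append(out, c.segments[idx]...)
	}
	return out
}

func main() {
	c := &segmentCache{segments: map[int][]byte{}}
	// Segments arrive out of order.
	_ = c.writeSegment(2, []byte("C"))
	_ = c.writeSegment(0, []byte("A"))
	_ = c.writeSegment(1, []byte("B"))
	fmt.Println(string(c.finalize())) // ABC
}
```

In the PR's writer, each call additionally returns the segment's output bytes so it can be uploaded as an S3 part as soon as it is produced, with segment 0 carrying the ZIP local file header as described above.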
/gemini review
Code Review
This pull request introduces a major and well-structured refactoring of the TDF SDK to support streaming. The changes are comprehensive and move the SDK in a good direction. I've found a few critical issues related to the new streaming writer's memory cleanup and data handling that could lead to corrupted files or runtime errors. I've also included a suggestion to improve performance. Please see the detailed comments below.
```go
// AddSegment adds a segment to the metadata
func (sm *SegmentMetadata) AddSegment(index int, data []byte, originalSize uint64, originalCRC32 uint32) error {
	if index < 0 {
		return ErrInvalidSegment
	}

	// Allow dynamic expansion beyond ExpectedCount for streaming use cases
	if index >= sm.ExpectedCount {
		sm.ExpectedCount = index + 1
	}

	if _, exists := sm.Segments[index]; exists {
		return ErrDuplicateSegment
	}

	sm.Segments[index] = &SegmentEntry{
		Index:   index,
		Data:    data,
		Size:    originalSize,
		CRC32:   originalCRC32,
		Written: time.Now(),
	}

	sm.TotalSize += originalSize
	// Recalculate total CRC32 for all segments in logical order
	sm.TotalCRC32 = sm.calculateTotalCRC32()

	return nil
}
```
The `AddSegment` function calls `calculateTotalCRC32` on every invocation, and `calculateTotalCRC32` iterates over all expected segments to recompute the total CRC from scratch. Across many segments this work grows quadratically: with 10,000 segments it amounts to tens of millions of redundant segment visits.

The total CRC is only needed during `Finalize`, so I suggest moving the calculation there and performing it once. This would involve:

- Removing `sm.TotalCRC32 = sm.calculateTotalCRC32()` from `AddSegment`.
- Calling `sw.metadata.CalculateTotalCRC32()` (after making it public) inside `segmentWriter.Finalize()`, just before the CRC is needed.
This change will significantly improve the performance of writing segments.
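A minimal sketch of that suggestion, assuming the `SegmentMetadata`/`SegmentEntry` shapes shown in the snippet above and an `import "hash/crc32"`; the public method name `CalculateTotalCRC32` is the reviewer's proposed rename, not existing API.

```go
// CalculateTotalCRC32 computes the running CRC once, at finalize time, by
// feeding each stored segment's data into the checksum in logical order.
func (sm *SegmentMetadata) CalculateTotalCRC32() uint32 {
	total := uint32(0)
	for i := 0; i < sm.ExpectedCount; i++ {
		if entry, ok := sm.Segments[i]; ok {
			// crc32.Update extends the running checksum with this segment's bytes.
			total = crc32.Update(total, crc32.IEEETable, entry.Data)
		}
	}
	return total
}
```

A later commit in this PR goes further and adds a crc_combine-style helper so segment data does not have to be held in memory at all.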
Pull Request Overview
This PR refactors the TDF SDK architecture to support streaming TDF creation with segment-based writing and out-of-order segment support, specifically designed for S3 multipart upload integration. The refactoring restructures TDF components into dedicated packages, implements a new streaming writer API with dynamic segment expansion, and replaces legacy TDF3Writer with a more efficient SegmentWriter architecture.
- Restructures code into dedicated tdf/ and tdf/keysplit/ packages for better modularity
- Implements new streaming TDF writer with segment-based caching and out-of-order writing capabilities
- Adds XOR-based key splitting algorithm with comprehensive attribute rule support (allOf, anyOf, hierarchy)
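For readers unfamiliar with the technique named in the last bullet, here is a self-contained sketch of XOR-based key splitting. It is illustrative only and not the `keysplit` package's actual API: the data key is split into n shares whose XOR reproduces the key, and any subset of fewer than n shares reveals nothing about it.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// splitKey returns n shares whose XOR equals key; the first n-1 shares are
// random and the last one is key XOR (all other shares).
func splitKey(key []byte, n int) ([][]byte, error) {
	shares := make([][]byte, n)
	last := make([]byte, len(key))
	copy(last, key)
	for i := 0; i < n-1; i++ {
		share := make([]byte, len(key))
		if _, err := rand.Read(share); err != nil {
			return nil, err
		}
		for j := range last {
			last[j] ^= share[j]
		}
		shares[i] = share
	}
	shares[n-1] = last
	return shares, nil
}

// combineShares XORs all shares together to recover the original key.
func combineShares(shares [][]byte) []byte {
	key := make([]byte, len(shares[0]))
	for _, s := range shares {
		for j := range key {
			key[j] ^= s[j]
		}
	}
	return key
}

func main() {
	key := []byte("0123456789abcdef0123456789abcdef") // 32-byte example key
	shares, _ := splitKey(key, 3)
	fmt.Printf("recombined equals original: %v\n", string(combineShares(shares)) == string(key))
}
```

In the SDK, shares like these would then be wrapped for the relevant key access servers according to the attribute rules (allOf, anyOf, hierarchy) mentioned above.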
Reviewed Changes
Copilot reviewed 39 out of 39 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| sdk/version.go | Removes TDFSpecVersion constant, moving it to new tdf package |
| sdk/tdf_test.go | Updates test imports and assertion constants to use new tdf package |
| sdk/tdf/writer_test.go | Creates empty test file placeholder for new writer package |
| sdk/tdf/writer_e2e_test.go | Comprehensive end-to-end tests for new streaming writer functionality |
| sdk/tdf/writer.go | Main streaming writer implementation with segment-based operations |
| sdk/tdf/version.go | Moves TDF version constants to dedicated tdf package |
| sdk/tdf/reader.go | Placeholder for new reader implementation |
| sdk/tdf/options.go | Configuration options for writer and reader components |
| sdk/tdf/manifest.go | Refactored manifest structures and signature calculation |
| sdk/tdf/keysplit/*.go | Complete XOR key splitting implementation with attribute resolution |
| sdk/tdf/key_access*.go | Key wrapping and access object creation functionality |
| sdk/tdf/assertion*.go | Moves assertion logic to tdf package |
| sdk/tdf.go | Updates main SDK to use new tdf package and segment locator |
Comments suppressed due to low confidence (1)
```diff
 	if config.useHex {
-		s.InDelta(float64(expectedTdfSizeWithHex), float64(tdfObj.size), 36.0)
+		s.InDelta(float64(expectedTdfSizeWithHex), float64(tdfObj.size), 60.0)
 	} else {
-		s.InDelta(float64(expectedTdfSize), float64(tdfObj.size), 36.0)
+		s.InDelta(float64(expectedTdfSize), float64(tdfObj.size), 60.0)
```
Copilot AI (Aug 25, 2025)
The delta tolerance for TDF size assertions has been increased from 36.0 to 60.0. This significant increase (67% larger tolerance) suggests potential instability in size calculations or test brittleness. Consider investigating why a larger tolerance is needed and whether this indicates an underlying issue with size determinism.
/gemini review
Code Review
This pull request introduces a significant and valuable refactoring to support streaming TDF creation. The new architecture with SegmentWriter for out-of-order segment writing is a great improvement. My review has identified a critical issue concerning the calculation of ZIP archive metadata (CRC32 and uncompressed size), which could lead to invalid TDF files. The fix requires changes across a few of the new files to ensure plaintext information is available to the archive writer. I have also pointed out a minor issue regarding a debug statement left in a test file. Overall, this is a solid contribution that will be excellent once the correctness issue is addressed.
```go
// SegmentWriter handles out-of-order segments with deterministic output
type SegmentWriter interface {
	Writer
	WriteSegment(ctx context.Context, index int, data []byte) ([]byte, error)
```
The WriteSegment signature is insufficient for correct ZIP archive generation. It only accepts the segment data (which is encrypted ciphertext), but lacks information about the original plaintext data, such as its size and CRC32 checksum. This leads to incorrect metadata in the generated ZIP file, specifically the uncompressed size and CRC32 fields, which are fundamental to the ZIP format's integrity.
To fix this, the signature should be updated to include the original plaintext data. This change is the first step to addressing related issues in segment_writer.go, zip_primitives.go, and tdf.go.
```diff
-	WriteSegment(ctx context.Context, index int, data []byte) ([]byte, error)
+	WriteSegment(ctx context.Context, index int, data []byte, originalData []byte) ([]byte, error)
```
```go
	// Calculate original data CRC32 (before encryption)
	originalCRC := crc32.ChecksumIEEE(data)
	originalSize := uint64(len(data))

	// Create segment buffer for this segment's output
	segmentBuf := sw.getBuffer()
	defer sw.putBuffer(segmentBuf)

	buffer := bytes.NewBuffer(segmentBuf)

	// Deterministic behavior: segment 0 gets ZIP header, others get raw data
	if index == 0 {
		// Segment 0: Write local file header + encrypted data
		if err := sw.writeLocalFileHeader(buffer); err != nil {
			return nil, &Error{Op: "write-segment", Type: "segment", Err: err}
		}
	}

	// All segments: write the encrypted data
	if _, err := buffer.Write(data); err != nil {
		return nil, &Error{Op: "write-segment", Type: "segment", Err: err}
	}

	// AddSegment now handles memory management efficiently with contiguous processing
	// Only unprocessed segments are stored, processed segments are immediately freed
	if err := sw.metadata.AddSegment(index, data, originalSize, originalCRC); err != nil {
		return nil, &Error{Op: "write-segment", Type: "segment", Err: err}
	}

	// Update payload entry metadata
	sw.payloadEntry.Size += originalSize
	sw.payloadEntry.CompressedSize += uint64(len(data)) // Encrypted size
	sw.payloadEntry.CRC32 = crc32.Update(sw.payloadEntry.CRC32, crc32IEEETable, data)
```
As a consequence of the insufficient WriteSegment interface (see my comment in sdk/internal/archive/writer.go), this function incorrectly calculates ZIP metadata (uncompressed size and CRC32) based on the encrypted ciphertext instead of the original plaintext. This will result in a corrupt ZIP file.
Specifically:

- Incorrect CRC32 and size calculation: on lines 79-80, `originalCRC` and `originalSize` are calculated from the `data` parameter, which is the encrypted ciphertext.
- Incorrect payload size: on line 108, `sw.payloadEntry.Size` (which should be the uncompressed size) is incremented by the size of the encrypted data.
- Incorrect payload CRC32: on line 110, the running CRC32 for the payload is updated with the encrypted data.
After updating the interface, this function must be changed to use the originalData for all size and CRC calculations related to the uncompressed payload.
Gemini is confused. It believes the encryption at play here is what's defined by the zip spec (zipcrypto). When using encryption according to the zip specification, the CRC tag is calculated over the plaintext.
That obviously doesn't apply to us here.
```go
	segmentBytes, err := tdfWriter.WriteSegment(ctx, segmentIndex, cipherData)
	if err != nil {
		return nil, fmt.Errorf("WriteSegment failed: %w", err)
	}
```
The call to tdfWriter.WriteSegment only passes the encrypted cipherData. As noted in other comments, the SegmentWriter needs the original plaintext data to generate a valid ZIP archive.
After modifying the WriteSegment interface in sdk/internal/archive/writer.go, this call must be updated to pass the plaintext data as well. For example:
```go
segmentBytes, err := tdfWriter.WriteSegment(ctx, segmentIndex, cipherData, readBuf.Bytes()[:readSize])
```

The debug statement mentioned in the review summary was left in a test file:

```go
	if err := os.WriteFile("/tmp/debug_out_of_order.zip", allBytes, 0o644); err == nil {
		t.Logf("Debug ZIP written to /tmp/debug_out_of_order.zip")
	}
```
review of tdf writer, still testable with S4 but blocks some features.
```go
	}

	// Check for duplicate segments using map lookup
	if _, exists := w.segments[index]; exists {
```
Do we need to block this? It would be nice to be able to overwrite segments.
```go
//	uploadToS3(segment0, "part-000")
//	uploadToS3(segment1, "part-001")
func (w *Writer) WriteSegment(ctx context.Context, index int, data []byte) (*SegmentResult, error) {
	w.mutex.Lock()
```
Blocking should only happen where it's required; parallel encrypt performance would be killed by this.
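A rough sketch of the narrower critical section being suggested. The type and field names are illustrative, not the PR's actual writer: expensive per-segment work happens outside the mutex, and the lock is held only while touching shared state.

```go
package sketch

import (
	"fmt"
	"sync"
)

// sketchWriter stands in for the streaming writer; only the shared segment
// map needs mutex protection.
type sketchWriter struct {
	mu       sync.Mutex
	segments map[int][]byte
}

func (w *sketchWriter) writeSegment(index int, data []byte) ([]byte, error) {
	// Expensive per-segment work (encryption, serialization, hashing) happens
	// here, before the lock is taken, so concurrent callers are not serialized.
	segmentBytes := append([]byte(nil), data...) // placeholder for real processing

	w.mu.Lock()
	defer w.mu.Unlock()
	if _, exists := w.segments[index]; exists {
		return nil, fmt.Errorf("segment %d already written", index)
	}
	w.segments[index] = segmentBytes
	return segmentBytes, nil
}
```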
```go
//
// Performance note: Finalization is O(n) where n is the number of segments.
// Memory usage is proportional to manifest size, not total data size.
func (w *Writer) Finalize(ctx context.Context, opts ...Option[*WriterFinalizeConfig]) (*FinalizeResult, error) {
```
Can a config option be added for passing in your own list of segments? Certain segments may be dropped from the end result.
sdk/tdf/writer.go (outdated)
```go
	}

	// Copy segments to manifest in proper order (map -> slice)
	for i := 0; i <= w.maxSegmentIndex; i++ {
```
Rather than maxSegmentIndex, it would be nice to loop through a sorted list of indices, for the case of an extremely sparse map.
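A small sketch of that alternative, assuming a `map[int][]byte` of written segments and an `import "sort"`; the helper name is hypothetical.

```go
// sortedSegmentIndices returns only the indices that were actually written,
// in ascending order, so a sparse map doesn't force a scan over the gaps.
func sortedSegmentIndices(segments map[int][]byte) []int {
	indices := make([]int, 0, len(segments))
	for idx := range segments {
		indices = append(indices, idx)
	}
	sort.Ints(indices)
	return indices
}
```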
sdk/tdf/writer.go (outdated)
```go
	var aggregateHash bytes.Buffer
	// Calculate totals and iterate through segments in order (0 to maxSegmentIndex)
	var totalPlaintextSize, totalEncryptedSize int64
	for i := 0; i <= w.maxSegmentIndex; i++ {
```
Actually, it seems like sparse maps aren't supported.
- allow passing in attributes on new writer
- allow passing in default kas on new writer
- add crc_combine func instead of holding segments in memory
fix: consistent get manifest
/gemini review
Code Review
This pull request introduces a major refactoring of the TDF SDK to support streaming TDF creation with out-of-order segment writing. The changes are extensive, introducing new packages for key splitting (keysplit), segment management (segment), and a new archive writer (archive.SegmentWriter). The old TDFWriter is removed and replaced with this more flexible and memory-efficient implementation. The changes are well-tested with new unit tests, integration tests, and benchmarks.
My review focuses on the new streaming architecture. I've found a few issues:
- A potential bug in the `StreamingWriter` where it doesn't perform the documented 1-based to 0-based index conversion.
- Some minor inconsistencies in error handling and implementation details.
- A non-deterministic test.
Overall, this is a very solid and impressive refactoring. The new architecture is much more suitable for large file and streaming use cases. The code is well-structured and the test coverage is excellent.
```go
func (w *StreamingWriter) WriteSegment(ctx context.Context, segmentIndex int, data []byte) (*tdf.SegmentResult, error) {
	if segmentIndex < 0 {
		return nil, ErrStreamingWriterInvalidPart
	}

	return w.writer.WriteSegment(ctx, segmentIndex, data)
}
```
The documentation for StreamingWriter states that it "handles the conversion between S3 part numbers (1-based) and TDF segment indices (0-based)". However, this implementation of WriteSegment passes the segmentIndex directly to the underlying writer without any conversion. If the segmentIndex parameter is intended to be a 1-based part number, it should be converted to a 0-based index before being passed to w.writer.WriteSegment (i.e., segmentIndex - 1). This discrepancy between documentation and implementation can lead to incorrect segment ordering and TDF corruption.
```diff
-func (w *StreamingWriter) WriteSegment(ctx context.Context, segmentIndex int, data []byte) (*tdf.SegmentResult, error) {
-	if segmentIndex < 0 {
-		return nil, ErrStreamingWriterInvalidPart
-	}
-	return w.writer.WriteSegment(ctx, segmentIndex, data)
-}
+func (w *StreamingWriter) WriteSegment(ctx context.Context, partNumber int, data []byte) (*tdf.SegmentResult, error) {
+	if partNumber <= 0 {
+		return nil, ErrStreamingWriterInvalidPart
+	}
+	// Convert 1-based part number to 0-based segment index
+	segmentIndex := partNumber - 1
+	return w.writer.WriteSegment(ctx, segmentIndex, data)
+}
```
```go
// WithSegments restricts finalization to a contiguous prefix of segments.
// Indices must form [0..K] with no gaps or duplicates, and no later segments
// may have been written.
```
The documentation for WithSegments states that indices must form a contiguous prefix [0..K] with no gaps. However, the underlying implementation in tdf.Writer supports sparse and out-of-order indices. This discrepancy can be misleading for users of this higher-level API. The documentation should be updated to accurately reflect the supported behavior: that it accepts a specific set of segment indices to be included in the final TDF, and the provided order will be the final logical order.
```go
	return &SplitResult{
		Splits: []Split{{
			ID: generateSplitID(),
```
When no attributes are provided and a default KAS is used, a single split is generated. In this case, a new UUID is generated for the SplitID. However, in other single-split scenarios (e.g., from createDefaultSplitPlan), the SplitID is intentionally left empty. This inconsistency can be confusing. For single-split TDFs, it's clearer to have a consistent (or consistently empty) SplitID. Consider using an empty string for the SplitID here to align with the behavior in createDefaultSplitPlan.
```diff
-			ID: generateSplitID(),
+			ID: "", // Single split does not require a unique ID
```
Proposed Changes
This commit introduces a comprehensive refactoring of the TDF SDK architecture
to support streaming TDF creation with out-of-order segment writing, designed
specifically for S3 multipart upload integration.
Key changes span the architecture refactoring, the new streaming writer implementation, the archive package cleanup, key features, and test coverage, as detailed in the commit message above.
This refactoring enables efficient streaming TDF creation for large files
while maintaining backward compatibility with existing TDF readers.
Checklist
Testing Instructions