refactor: OneDegreeGraphScanIterator implements graph proxy partition scan iterator #611

@kitalkuyo-gita

Background

OneDegreeGraphScanIterator does not yet support graph proxies that are partitioned by label. Scan iterators for label-partitioned proxies are a pending feature, and this issue proposes how to implement them.

Interaction Sequence Diagrams

Before Refactoring - Current Implementation

sequenceDiagram
    participant Client as Client
    participant Proxy as SyncGraphRocksdbProxy
    participant Iterator as OneDegreeGraphScanIterator
    participant VertexIter as VertexScanIterator
    participant EdgeIter as EdgeScanIterator
    
    Client->>Proxy: getOneDegreeGraphIterator(pushdown)
    Proxy->>Proxy: flush()
    Proxy->>VertexIter: getVertexIterator(pushdown)
    Proxy->>EdgeIter: getEdgeIterator(pushdown)
    Proxy->>Iterator: new OneDegreeGraphScanIterator(keyType, vertexIter, edgeIter, pushdown)
    Iterator-->>Proxy: iterator instance
    Proxy-->>Client: OneDegreeGraphIterator
    
    loop Scan Data
        Client->>Iterator: hasNext()
        Iterator->>VertexIter: hasNext()/next()
        Iterator->>EdgeIter: hasNext()/next()
        Iterator->>Iterator: merge vertex and edge data
        Iterator-->>Client: true/false
        Client->>Iterator: next()
        Iterator-->>Client: OneDegreeGraph
    end

After Refactoring - Label Partition Support

sequenceDiagram
    participant Client as Client
    participant Proxy as SyncGraphLabelPartitionProxy
    participant Iterator as OneDegreeGraphScanIterator
    participant PartitionMgr as PartitionManager
    participant VertexIter as MultiPartitionVertexIterator
    participant EdgeIter as MultiPartitionEdgeIterator
    participant CF1 as ColumnFamily_Label1
    participant CF2 as ColumnFamily_Label2
    
    Client->>Proxy: getOneDegreeGraphIterator(pushdown)
    Proxy->>Proxy: flush()
    Proxy->>PartitionMgr: parseLabels(filter)
    PartitionMgr-->>Proxy: labelList
    
    par Create Partition Iterators in Parallel
        Proxy->>CF1: getVertexIterator(label1)
        Proxy->>CF2: getVertexIterator(label2)
        Proxy->>CF1: getEdgeIterator(label1)
        Proxy->>CF2: getEdgeIterator(label2)
    end
    
    Proxy->>VertexIter: new MultiPartitionVertexIterator(cfIterators)
    Proxy->>EdgeIter: new MultiPartitionEdgeIterator(cfIterators)
    Proxy->>Iterator: new OneDegreeGraphScanIterator(keyType, vertexIter, edgeIter, pushdown)
    Iterator-->>Proxy: iterator instance
    Proxy-->>Client: OneDegreeGraphIterator
    
    loop Scan Data
        Client->>Iterator: hasNext()
        Iterator->>VertexIter: hasNext()/next()
        VertexIter->>CF1: scan partition 1
        VertexIter->>CF2: scan partition 2
        Iterator->>EdgeIter: hasNext()/next()
        EdgeIter->>CF1: scan partition 1
        EdgeIter->>CF2: scan partition 2
        Iterator->>Iterator: cross-partition data merge
        Iterator-->>Client: true/false
        Client->>Iterator: next()
        Iterator-->>Client: OneDegreeGraph
    end

Before/After Comparison

Change Point 1: Partition Awareness

Before: The current implementation contains only TODO comments and has no partition scanning support

After: Supports parallel scanning of multiple label partitions and selects the partitions to scan based on the filter conditions
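
A minimal sketch of the partition selection step, assuming the labels referenced by the pushdown filter have already been extracted into a set. `LabelPartitionSelector` and its parameters are hypothetical names for illustration, not existing project APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: picks which label partitions a scan has to touch. */
final class LabelPartitionSelector {

    private LabelPartitionSelector() {
    }

    /**
     * @param knownLabels  labels that have a partition (for example one column family each)
     * @param filterLabels labels extracted from the filter; empty means "no label constraint"
     * @return the partitions to scan, in the order of knownLabels
     */
    static List<String> select(List<String> knownLabels, Set<String> filterLabels) {
        if (filterLabels == null || filterLabels.isEmpty()) {
            // No label constraint: every partition has to be scanned.
            return new ArrayList<>(knownLabels);
        }
        List<String> selected = new ArrayList<>();
        for (String label : knownLabels) {
            if (filterLabels.contains(label)) {
                selected.add(label);
            }
        }
        return selected;
    }
}
```

Labels in the filter that have no partition are ignored here; whether that should instead raise an error is a design decision left open.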

Change Point 2: Data Source Management

Before: Uses single vertex and edge iterators

After: Uses multi-partition iterators supporting cross-partition data aggregation

Change Point 3: Scanning Strategy

Before: Sequential scanning of single data source

After: Parallel scanning of multiple partitions with streaming data merge

Benefits Analysis

1. Performance Benefits

  • Parallel Scanning: Multiple partitions are scanned simultaneously, improving overall throughput
  • Intelligent Filtering: Only partitions matching the label filter conditions are scanned, reducing I/O overhead
  • Memory Optimization: Streaming processing avoids buffering large amounts of data in memory

2. Scalability Benefits

  • Partition Isolation: Different label data physically isolated, facilitating independent scaling
  • Load Balancing: Dynamic scanning strategy adjustment based on partition sizes
  • Fault Tolerance: Single partition failure doesn't affect other partition access

3. Maintainability Benefits

  • Code Reuse: Reuses existing partition proxy architecture
  • Unified Interface: Maintains compatibility with existing iterator interfaces
  • Test-Friendly: Independent testing at partition level

Required Interface Changes

1. Core Iterator Interface Extensions

// Additions needed in OneDegreeGraphScanIterator
public class OneDegreeGraphScanIterator<K, VV, EV> {
    // New partition-aware constructor
    public OneDegreeGraphScanIterator(
        IType<K> keyType,
        List<CloseableIterator<IVertex<K, VV>>> vertexIterators,
        List<CloseableIterator<IEdge<K, EV>>> edgeIterators,
        IStatePushDown pushdown) { ... }
    
    // New partition merge logic
    private OneDegreeGraph<K, VV, EV> mergeFromPartitions() { ... }
}

2. Partition Proxy Interface Enhancements

The existing implementation needs the following enhancement:

// Enhanced SyncGraphLabelPartitionProxy
public class SyncGraphLabelPartitionProxy<K, VV, EV> {
    // New multi-partition iterator support
    public CloseableIterator<OneDegreeGraph<K, VV, EV>> getOneDegreeGraphIterator(
        IStatePushDown pushdown) {
        // Implement partition-aware one-degree graph scanning
    }
}

3. Multi-Partition Iterator Interface

// New interface
public interface IMultiPartitionIterator<T> extends CloseableIterator<T> {
    List<String> getActivePartitions();
    void addPartition(String partitionName, CloseableIterator<T> iterator);
    void removePartition(String partitionName);
}
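
One possible implementation of the interface proposed above, as a sketch only. It drains partitions one after another in insertion order; `SequentialMultiPartitionIterator` and the `wrap` helper are hypothetical, and the `CloseableIterator` shown is a local stand-in whose shape may differ from the project's interface.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Local stand-in for the project's CloseableIterator; the real interface may differ.
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
    @Override
    void close();

    // Convenience for this sketch: wraps an in-memory iterator with a no-op close().
    static <T> CloseableIterator<T> wrap(Iterator<T> source) {
        return new CloseableIterator<T>() {
            public boolean hasNext() { return source.hasNext(); }
            public T next() { return source.next(); }
            public void close() { }
        };
    }
}

// The proposed interface, repeated so the sketch compiles on its own.
interface IMultiPartitionIterator<T> extends CloseableIterator<T> {
    List<String> getActivePartitions();
    void addPartition(String partitionName, CloseableIterator<T> iterator);
    void removePartition(String partitionName);
}

/** Hypothetical implementation: reads partitions sequentially in insertion order. */
class SequentialMultiPartitionIterator<T> implements IMultiPartitionIterator<T> {
    private final Map<String, CloseableIterator<T>> partitions = new LinkedHashMap<>();

    @Override
    public List<String> getActivePartitions() {
        return new ArrayList<>(partitions.keySet());
    }

    @Override
    public void addPartition(String partitionName, CloseableIterator<T> iterator) {
        CloseableIterator<T> previous = partitions.put(partitionName, iterator);
        if (previous != null) {
            previous.close(); // replacing a partition releases the old iterator
        }
    }

    @Override
    public void removePartition(String partitionName) {
        CloseableIterator<T> removed = partitions.remove(partitionName);
        if (removed != null) {
            removed.close();
        }
    }

    @Override
    public boolean hasNext() {
        // Close and drop exhausted partitions until one with data is found.
        Iterator<CloseableIterator<T>> it = partitions.values().iterator();
        while (it.hasNext()) {
            CloseableIterator<T> current = it.next();
            if (current.hasNext()) {
                return true;
            }
            current.close();
            it.remove();
        }
        return false;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        // After hasNext() the first remaining partition is guaranteed to have data.
        return partitions.values().iterator().next().next();
    }

    @Override
    public void close() {
        partitions.values().forEach(CloseableIterator::close);
        partitions.clear();
    }
}
```

This variant is not thread-safe and does not interleave partitions; it only shows how the three partition-management methods could fit together with plain iteration.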

4. ProxyBuilder Interface Updates

The existing TODO comments indicate that async graph proxy support for label partitions still needs to be completed:

public class ProxyBuilder {
    // Complete async support for label partitions
    public static <K, VV, EV> IGraphRocksdbProxy<K, VV, EV> build(...) {
        if (partitionType == PartitionType.LABEL) {
            // Remove TODO, implement async graph proxy
            return new AsyncGraphLabelPartitionProxy<>(rocksdbClient, encoder, config);
        }
        // Other partition types: existing handling unchanged (omitted here)
    }
}

Technical Architecture Analysis

1. Existing Partition Proxy Architecture

The system has implemented two main partition proxies:

  • SyncGraphDtPartitionProxy: A graph proxy partitioned by timestamp
  • SyncGraphLabelPartitionProxy: A graph proxy partitioned by label

These proxies are built and managed using ProxyBuilder.

2. Scan Iterator System

Current scan iterators include:

  • VertexScanIterator: Scans vertices
  • EdgeScanIterator: Scans edges
  • OneDegreeGraphScanIterator: Merges the vertex and edge scans into one-degree graphs

Detailed Implementation Plan

Phase 1: Interface Design and Extension

  1. Extending the IOneDegreeGraphIterator interface
     • Adding partition-aware method signatures
     • Supporting multi-partition parallel scans
     • Adding partition filtering conditions
  2. Designing a partition scan strategy
     • Defining the scan order for label partitions
     • Implementing data merging logic between partitions
     • Handling cross-partition one-degree graph construction

Phase 2: Core Implementation

  1. Enhancing the OneDegreeGraphScanIterator class
     • Implementing support for SyncGraphLabelPartitionProxy
     • Adding partition-aware scan logic
     • Implementing a multi-partition data aggregation algorithm
  2. Partition scan algorithm design
     • Parallel scan: Scan multiple label partitions simultaneously
     • Data merging: Merge vertex and edge data from different partitions into a complete one-degree graph
     • Memory optimization: Use stream processing to avoid running out of memory
  3. Error handling and fault tolerance
     • Degradation strategy for unavailable partitions
     • Data consistency check
     • Exception recovery mechanism
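
A sketch of how the parallel open step could be combined with a degradation strategy for unavailable partitions. `ParallelPartitionOpener`, `Opened`, and the per-label tasks are hypothetical; only `java.util.concurrent` calls are real APIs.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical helper: opens one iterator per label partition on a thread pool. */
final class ParallelPartitionOpener {

    /** Usable iterators plus the labels whose partition could not be opened. */
    static final class Opened<T> {
        final Map<String, Iterator<T>> iterators = new LinkedHashMap<>();
        final List<String> failedLabels = new ArrayList<>();
    }

    private ParallelPartitionOpener() {
    }

    /**
     * @param openers one task per label; each task creates the iterator for that partition
     * @param threads size of the temporary thread pool
     */
    static <T> Opened<T> openAll(Map<String, Callable<Iterator<T>>> openers, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Submit everything first so the partitions are opened concurrently.
            Map<String, Future<Iterator<T>>> pending = new LinkedHashMap<>();
            for (Map.Entry<String, Callable<Iterator<T>>> e : openers.entrySet()) {
                pending.put(e.getKey(), pool.submit(e.getValue()));
            }
            Opened<T> result = new Opened<>();
            for (Map.Entry<String, Future<Iterator<T>>> e : pending.entrySet()) {
                try {
                    result.iterators.put(e.getKey(), e.getValue().get());
                } catch (ExecutionException failure) {
                    // Degrade instead of failing the whole scan: record the label and continue.
                    result.failedLabels.add(e.getKey());
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while opening partitions", interrupted);
                }
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }
}
```

The caller decides what to do with `failedLabels`: returning partial results is only acceptable if the consistency check allows it, otherwise the scan should fail.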

Phase 3: Performance Optimization

  1. Caching strategy
     • Implementing partition-level caching
     • Intelligent prefetching algorithm
     • LRU cache eviction strategy
  2. Concurrency control
     • Partition-level read/write locks
     • Lock-free data structure optimization
     • Thread pool management
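
For the LRU eviction item, a minimal sketch built on `java.util.LinkedHashMap` in access order. `LruPartitionCache` is a hypothetical name, and what is cached per partition is left open.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical partition-level cache with least-recently-used eviction. */
class LruPartitionCache<K, V> extends LinkedHashMap<K, V> {
    private final int capacity;

    LruPartitionCache(int capacity) {
        // accessOrder = true makes get() move an entry to the most-recently-used end.
        super(16, 0.75f, true);
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Invoked by put(); returning true evicts the least recently used entry.
        return size() > capacity;
    }
}
```

`LinkedHashMap` is not thread-safe, so this sketch would need external synchronization or a different structure once the concurrency control items are addressed.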

Phase 4: Integration Testing

  1. Unit testing
     • Independent scan testing of each partition
     • Cross-partition data consistency testing
     • Boundary condition testing
  2. Performance testing
     • Large-scale dataset scanning performance
     • Concurrent access stress testing
     • Memory efficiency testing
  3. Integration testing
     • Compatibility with existing RocksDB storage
     • Verifying multi-version graph support
     • Integration testing with Paimon storage

Technical Challenges and Solutions

1. Cross-Partition Data Consistency

Challenge: Ensure that data read from partitions with different labels can be correctly combined into a one-degree graph
Solution: Implement a merging algorithm keyed on vertex ID, so that each edge is associated with the correct vertex
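
The association rule can be sketched as follows, assuming an edge belongs to the one-degree graph of its source vertex. All `Simple*` types and `VertexIdMerger` are simplified, hypothetical stand-ins for the project's vertex, edge, and one-degree graph classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified stand-ins for the project's graph types. */
final class SimpleVertex {
    final long id;
    final String label;
    SimpleVertex(long id, String label) { this.id = id; this.label = label; }
}

final class SimpleEdge {
    final long srcId;
    final long targetId;
    final String label;
    SimpleEdge(long srcId, long targetId, String label) {
        this.srcId = srcId; this.targetId = targetId; this.label = label;
    }
}

final class SimpleOneDegreeGraph {
    final long key;
    SimpleVertex vertex; // stays null if only edges were found for this key
    final List<SimpleEdge> edges = new ArrayList<>();
    SimpleOneDegreeGraph(long key) { this.key = key; }
}

final class VertexIdMerger {
    private VertexIdMerger() {
    }

    /** Joins vertices and edges from any number of partitions on the vertex ID. */
    static List<SimpleOneDegreeGraph> merge(List<List<SimpleVertex>> vertexPartitions,
                                            List<List<SimpleEdge>> edgePartitions) {
        // TreeMap keeps the result ordered by vertex ID regardless of partition order.
        Map<Long, SimpleOneDegreeGraph> byId = new TreeMap<>();
        for (List<SimpleVertex> partition : vertexPartitions) {
            for (SimpleVertex v : partition) {
                byId.computeIfAbsent(v.id, id -> new SimpleOneDegreeGraph(id)).vertex = v;
            }
        }
        for (List<SimpleEdge> partition : edgePartitions) {
            for (SimpleEdge e : partition) {
                // Assumption: an edge is attached to the graph of its source vertex.
                byId.computeIfAbsent(e.srcId, id -> new SimpleOneDegreeGraph(id)).edges.add(e);
            }
        }
        return new ArrayList<>(byId.values());
    }
}
```

This version buffers all partitions in memory to keep the join rule easy to read; a production iterator would apply the same rule while streaming.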

2. Performance Optimization

Challenge: Scanning multiple partitions may be slower than scanning a single data source
Solution: Scan partitions in parallel to recover throughput, and merge the results as a stream to keep memory usage low
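
The streaming merge half of this strategy could look like the k-way merge below. It assumes every partition iterator already yields elements in sorted key order, which has to be verified for the actual scan iterators. `SortedPartitionMergeIterator` is a hypothetical name.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

/**
 * Hypothetical k-way merge: combines per-partition iterators that are each
 * already sorted into one sorted stream, buffering one element per partition.
 */
class SortedPartitionMergeIterator<T extends Comparable<T>> implements Iterator<T> {

    /** Pairs the next unread element of a partition with the iterator it came from. */
    private static final class Head<E extends Comparable<E>> implements Comparable<Head<E>> {
        final E value;
        final Iterator<E> source;

        Head(E value, Iterator<E> source) {
            this.value = value;
            this.source = source;
        }

        @Override
        public int compareTo(Head<E> other) {
            return value.compareTo(other.value);
        }
    }

    private final PriorityQueue<Head<T>> heads = new PriorityQueue<>();

    SortedPartitionMergeIterator(List<? extends Iterator<T>> partitions) {
        for (Iterator<T> partition : partitions) {
            if (partition.hasNext()) {
                heads.add(new Head<>(partition.next(), partition));
            }
        }
    }

    @Override
    public boolean hasNext() {
        return !heads.isEmpty();
    }

    @Override
    public T next() {
        Head<T> smallest = heads.poll();
        if (smallest == null) {
            throw new NoSuchElementException();
        }
        // Refill from the partition that just produced the smallest element.
        if (smallest.source.hasNext()) {
            heads.add(new Head<>(smallest.source.next(), smallest.source));
        }
        return smallest.value;
    }
}
```

Memory use is proportional to the number of partitions rather than the amount of data, which is the property the solution relies on.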

3. Compatibility Guarantee

Challenge: Maintain backward compatibility with existing code
Solution: Adopt the adapter pattern to provide a unified interface for different partition types
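
As an illustration of the adapter idea, an unpartitioned source can be presented as a source with exactly one partition, so the partition-aware code path serves both cases. `PartitionedScanSource` and `SingleIteratorAdapter` are hypothetical names, not existing project types.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Hypothetical unified view: every scan source exposes per-partition iterators. */
interface PartitionedScanSource<T> {
    List<Iterator<T>> partitionIterators();
}

/** Adapter: lets an existing single-iterator source be used where a partitioned one is expected. */
final class SingleIteratorAdapter<T> implements PartitionedScanSource<T> {
    private final Iterator<T> delegate;

    SingleIteratorAdapter(Iterator<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public List<Iterator<T>> partitionIterators() {
        // An unpartitioned store is treated as exactly one partition.
        return Collections.singletonList(delegate);
    }
}
```

Existing callers that pass a single vertex or edge iterator would keep working through the adapter, while label-partitioned proxies supply one iterator per label.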
