diff --git a/.github/workflows/labeler/label-scope-conf.yml b/.github/workflows/labeler/label-scope-conf.yml
index 67f3c40cb37..d03ce21b576 100644
--- a/.github/workflows/labeler/label-scope-conf.yml
+++ b/.github/workflows/labeler/label-scope-conf.yml
@@ -327,3 +327,9 @@ sensorsdata:
       - changed-files:
           - any-glob-to-any-file: seatunnel-connectors-v2/connector-sensorsdata/**
           - all-globs-to-all-files: '!seatunnel-connectors-v2/connector-!(sensorsdata)/**'
+
+hugegraph:
+  - all:
+      - changed-files:
+          - any-glob-to-any-file: seatunnel-connectors-v2/connector-hugegraph/**
+          - all-globs-to-all-files: '!seatunnel-connectors-v2/connector-!(hugegraph)/**'
diff --git a/config/plugin_config b/config/plugin_config
index 8d3dd7dbe2f..09a6bb25577 100644
--- a/config/plugin_config
+++ b/config/plugin_config
@@ -96,3 +96,4 @@ connector-qdrant
 connector-typesense
 connector-cdc-opengauss
 connector-sensorsdata
+connector-hugegraph
\ No newline at end of file
diff --git a/docs/en/connector-v2/changelog/connector-hugegraph.md b/docs/en/connector-v2/changelog/connector-hugegraph.md
new file mode 100644
index 00000000000..5bdc416dcf9
--- /dev/null
+++ b/docs/en/connector-v2/changelog/connector-hugegraph.md
@@ -0,0 +1,7 @@
+<details><summary> Change Log </summary>
+
+| Change | Commit | Version |
+| --- | --- | --- |
+|[Feature][Connector-V2] Support sink connector for Apache HugeGraph|https://github.com/apache/seatunnel/pull/10002/commits/002a653d11f48c3f76b47db23f5f2a68bc9d690c| 2.3.12 |
+
+</details>
diff --git a/docs/en/connector-v2/sink/HugeGraph.md b/docs/en/connector-v2/sink/HugeGraph.md
new file mode 100644
index 00000000000..6f1ef4bdfad
--- /dev/null
+++ b/docs/en/connector-v2/sink/HugeGraph.md
@@ -0,0 +1,180 @@
+import ChangeLog from '../changelog/connector-hugegraph.md';
+
+# HugeGraph Sink Connector
+
+`Sink: HugeGraph`
+
+## Description
+
+The HugeGraph sink connector allows you to write data from SeaTunnel to Apache HugeGraph, a fast and scalable graph database.
+
+The connector can write rows as either vertices or edges, providing a flexible mapping from relational data models to graph structures. It is designed for high-performance data loading.
+
+## Features
+
+- **Batch Writing**: Data is written in batches for high throughput.
+- **Flexible Mapping**: Supports flexible mapping of source fields to vertex/edge properties.
+- **Vertex and Edge Writing**: Can write data as either vertices or edges.
+- **Automatic Schema Creation**: Can automatically create graph schema elements (property keys, vertex labels, edge labels) if they do not exist.
+
+## Configuration Options
+
+| Name | Type | Required | Default Value | Description |
+| --- | --- | --- | --- | --- |
+| `host` | String | Yes | - | The host of the HugeGraph server. |
+| `port` | Integer | Yes | - | The port of the HugeGraph server. |
+| `graph_name` | String | Yes | - | The name of the graph to write to. |
+| `graph_space` | String | Yes | - | The graph space of the graph to be operated on. |
+| `username` | String | No | - | The username for HugeGraph authentication. |
+| `password` | String | No | - | The password for HugeGraph authentication. |
+| `batch_size` | Integer | No | 500 | The number of records to buffer before writing to HugeGraph in a single batch. |
+| `batch_interval_ms` | Integer | No | 5000 | The maximum time in milliseconds to wait before flushing a batch. |
+| `max_retries` | Integer | No | 3 | The maximum number of times to retry a failed write operation. |
+| `retry_backoff_ms` | Integer | No | 5000 | The backoff time between retries in milliseconds. |
+
+## Sink Options
+
+| Name | Type | Required | Default Value | Description |
+| --- | --- | --- | --- | --- |
+| `schema_config` | Object | Yes | - | The configuration for mapping the input data to HugeGraph's schema (a vertex or an edge). |
+| `selected_fields` | `List<String>` | No | - | A list of fields to be selected from the input data. If not specified, all fields will be used. |
+| `ignored_fields` | `List<String>` | No | - | A list of fields to be ignored from the input data. Mutually exclusive with `selected_fields`. |
+
+### Schema Configuration (`schema_config`)
+
+The `schema_config` object defines the mapping from the source data to a specific vertex or edge label in HugeGraph.
+
+| Name | Type | Required | Default Value | Description |
+| --- | --- | --- | --- | --- |
+| `type` | String | Yes | - | The type of graph element to map to. Must be `VERTEX` or `EDGE`. |
+| `label` | String | Yes | - | The label of the vertex or edge in HugeGraph. |
+| `properties` | `List<String>` | No | - | A list of source field names to write as properties of the vertex or edge. |
+| `ttl` | Long | No | - | The time-to-live for the vertex or edge in seconds. |
+| `ttlStartTime` | String | No | - | The start time for the TTL. |
+| `enableLabelIndex` | Boolean | No | `false` | Whether to enable the label index for this label. |
+| `userdata` | `Map<String, Object>` | No | - | User-defined data associated with the label. |
+| `idStrategy` | String | For Vertex | - | The ID generation strategy for vertices. Supported values: `PRIMARY_KEY`, `CUSTOMIZE_STRING`, `CUSTOMIZE_NUMBER`, `CUSTOMIZE_UUID`, `AUTOMATIC`. |
+| `idFields` | `List<String>` | For Vertex | - | A list of source field names used to generate the vertex ID. |
+| `sourceConfig` | Object | For Edge | - | An object defining the mapping for the edge's source vertex. See `Source/Target Config` below. |
+| `targetConfig` | Object | For Edge | - | An object defining the mapping for the edge's target vertex. See `Source/Target Config` below. |
+| `frequency` | String | For Edge | - | The frequency of the edge, e.g., `SINGLE`, `MULTIPLE`. |
+| `mapping` | Object | No | - | An object defining advanced field and value mappings. See `Mapping Config` below. |
+
+### Source/Target Config (`sourceConfig` and `targetConfig`)
+
+This object is used within an `EDGE` schema to define how to identify the source and target vertices.
+
+| Name | Type | Required | Default Value | Description |
+| --- | --- | --- | --- | --- |
+| `label` | String | Yes | - | The label of the source or target vertex. |
+| `idFields` | `List<String>` | Yes | - | A list of source field names from the input row used to construct the ID of the source/target vertex. The values will be concatenated to form the vertex ID. |
+
+### Mapping Config (`mapping`)
+
+This object provides advanced control over how fields and values are mapped to properties.
+
+| Name | Type | Required | Default Value | Description |
+| --- | --- | --- | --- | --- |
+| `fieldMapping` | `Map<String, String>` | No | - | A map where the key is the source field name and the value is the target property name in HugeGraph. If not specified, the source field name is used as the target property name. |
+| `valueMapping` | `Map<String, Object>` | No | - | A map to transform specific field values. The key is the original value from the source, and the value is the new value to be written. |
+| `nullableKeys` | `List<String>` | No | - | A list of property keys that can have null values. |
+| `nullValues` | `List<String>` | No | - | A list of string values that should be treated as `null`. Any field containing one of these values will not be written. |
+| `dateFormat` | String | No | `yyyy-MM-dd` | The date format for parsing date strings. |
+| `timeZone` | String | No | `GMT+8` | The time zone for date parsing. |
+| `sortKeys` | `List<String>` | For Edge | - | A list of property keys used to distinguish multiple edges between the same source and target vertices. |
+
+## Usage Examples
+
+### 1. Writing Vertices
+
+This example shows how to read from a `FakeSource` and write `person` vertices to HugeGraph. The vertex ID is based on the `name` field.
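+
+If the target graph does not yet contain the `person` and `knows` schema used by these examples, it can be created up front with the HugeGraph Java client (`hugegraph-client`, the same library this connector builds on). The snippet below is a minimal, illustrative sketch; the server URL, graph name, and property types are assumptions chosen to match the examples:
+
+```java
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.driver.SchemaManager;
+
+public class SchemaSetup {
+    public static void main(String[] args) {
+        // Connection details are placeholders matching the example jobs below
+        HugeClient client = HugeClient.builder("http://localhost:8080", "hugegraph").build();
+        SchemaManager schema = client.schema();
+
+        // Property keys used by both examples
+        schema.propertyKey("name").asText().ifNotExist().create();
+        schema.propertyKey("age").asInt().ifNotExist().create();
+        schema.propertyKey("since").asInt().ifNotExist().create();
+
+        // "person" vertices keyed by name (PRIMARY_KEY id strategy)
+        schema.vertexLabel("person")
+                .properties("name", "age")
+                .primaryKeys("name")
+                .ifNotExist()
+                .create();
+
+        // "knows" edges between two person vertices
+        schema.edgeLabel("knows")
+                .sourceLabel("person")
+                .targetLabel("person")
+                .properties("since")
+                .ifNotExist()
+                .create();
+
+        client.close();
+    }
+}
+```
+
+With the schema in place, the vertex-loading job looks like this: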
+ +```hocon +env { + job.mode = "BATCH" +} + +source { + FakeSource { + plugin_input = "fake_source" + schema = { + fields = { + name = "string" + age = "int" + } + } + } +} + +sink { + HugeGraph { + host = "localhost" + port = 8080 + graph_name = "hugegraph" + graph_space = "default" + selected_fields = ["name", "age"] + schema_config = { + type = "VERTEX" + label = "person" + idStrategy = "PRIMARY_KEY" + idFields = ["name"] + properties = ["name", "age"] + } + } +} +``` + +### 2. Writing Edges + +This example syncs a relationship table to `knows` edges in HugeGraph. The source table contains the names of the two people who know each other and the year they met. + +```hocon +env { + job.mode = "BATCH" +} + +source { + FakeSource { + plugin_input = "fake_source" + schema = { + fields = { + person1_name = "string" + person2_name = "string" + since = "int" + } + } + } +} + +sink { + HugeGraph { + host = "localhost" + port = 8080 + graph_name = "hugegraph" + graph_space = "default" + schema_config = { + type = "EDGE" + label = "knows" + sourceConfig = { + label = "person" + idFields = ["person1_name"] + } + targetConfig = { + label = "person" + idFields = ["person2_name"] + } + properties = ["since"] + mapping = { + fieldMapping = { + person1_name = "name" + person2_name = "name" + } + } + } + } +} +``` + +## Changelog + + diff --git a/docs/zh/connector-v2/changelog/connector-hugegraph.md b/docs/zh/connector-v2/changelog/connector-hugegraph.md new file mode 100644 index 00000000000..5bdc416dcf9 --- /dev/null +++ b/docs/zh/connector-v2/changelog/connector-hugegraph.md @@ -0,0 +1,7 @@ +
<details><summary> Change Log </summary>
+
+| Change | Commit | Version |
+| --- | --- | --- |
+|[Feature][Connector-V2] Support sink connector for Apache HugeGraph|https://github.com/apache/seatunnel/pull/10002/commits/002a653d11f48c3f76b47db23f5f2a68bc9d690c| 2.3.12 |
+
+</details>
diff --git a/docs/zh/connector-v2/sink/HugeGraph.md b/docs/zh/connector-v2/sink/HugeGraph.md new file mode 100644 index 00000000000..a6bba58e0cb --- /dev/null +++ b/docs/zh/connector-v2/sink/HugeGraph.md @@ -0,0 +1,180 @@ +import ChangeLog from '../changelog/connector-hugegraph.md'; + +# HugeGraph Sink Connector + +`Sink: HugeGraph` + +## 描述 + +HugeGraph sink连接器允许您将数据从SeaTunnel写入Apache HugeGraph,这是一个快速且可扩展的图数据库。 + +该连接器支持将数据作为顶点或边写入,提供了从关系数据模型到图结构的灵活映射。它专为高性能数据加载而设计。 + +## 特性 + +- **批量写入**: 数据分批写入,以实现高吞吐量。 +- **灵活映射**: 支持将源字段灵活映射到顶点/边属性。 +- **顶点和边写入**: 可以将数据作为顶点或边写入。 +- **自动创建Schema**: 如果不存在,可以自动创建图Schema元素(属性键、顶点标签、边标签)。 + +## 配置选项 + +| 名称 | 类型 | 是否必须 | 默认值 | 描述 | +| ------------------- | ------- | -------- | ------ | ---------------------------------------------------------------------- | +| `host` | String | 是 | - | HugeGraph服务器的主机。 | +| `port` | Integer | 是 | - | HugeGraph服务器的端口。 | +| `graph_name` | String | 是 | - | 要写入的图的名称。 | +| `graph_space` | String | 是 | - | 要操作的图的图空间。 | +| `username` | String | 否 | - | 用于HugeGraph身份验证的用户名。 | +| `password` | String | 否 | - | 用于HugeGraph身份验证的密码。 | +| `batch_size` | Integer | 否 | 500 | 在单批次写入HugeGraph之前缓冲的记录数。 | +| `batch_interval_ms` | Integer | 否 | 5000 | 刷新批次前等待的最大时间(毫秒)。 | +| `max_retries` | Integer | 否 | 3 | 重试失败写入操作的最大次数。 | +| `retry_backoff_ms` | Integer | 否 | 5000 | 重试之间的退避时间(毫秒)。 | + +## Sink选项 + +| 名称 | 类型 | 是否必须 | 默认值 | 描述 | +| ------------------ | ------ | -------- | ------ | -------------------------------------------------------------------- | +| `schema_config` | Object | 是 | - | 将输入数据映射到HugeGraph的Schema(顶点或边)的配置。 | +| `selected_fields` | List | 否 | - | 要从输入数据中选择的字段列表。如果未指定,将使用所有字段。 | +| `ignored_fields` | List | 否 | - | 要从输入数据中忽略的字段列表。与`selected_fields`互斥。 | + +### Schema配置 (`schema_config`) + +`schema_config`列表中的每个对象都定义了从源数据到HugeGraph中特定顶点或边标签的映射。 + +| 名称 | 类型 | 是否必须 | 默认值 | 描述 | +| ------------------ | ------------------- | -------- | ------- |------------------------------------------------------------| +| `type` | String | 是 | - | 要映射到的图元素的类型。必须是`VERTEX`或`EDGE`。 | +| `label` | String | 是 | - | HugeGraph中顶点或边的标签。 | +| `properties` | `List` | 否 | - | 顶点或边的源字段名称列表。 | +| `ttl` | Long | 否 | - | 顶点或边的生存时间(秒)。 | +| `ttlStartTime` | String | 否 | - | TTL的开始时间。 | +| `enableLabelIndex` | Boolean | 否 | `false` | 是否为此标签启用标签索引。 | +| `userdata` | `Map` | 否 | - | 与标签关联的用户定义数据。 | +| `idStrategy` | String | 对于顶点 | - | 顶点的ID生成策略。支持的值:`PRIMARY_KEY`、`CUSTOMIZE_UUID`、`AUTOMATIC`。 | +| `idFields` | `List` | 对于顶点 | - | 用于生成顶点ID的源字段名称列表。 | +| `sourceConfig` | Object | 对于边 | - | 定义边的源顶点映射的对象。请参阅下面的`Source/Target Config`。 | +| `targetConfig` | Object | 对于边 | - | 定义边的目标顶点映射的对象。请参阅下面的`Source/Target Config`。 | +| `frequency` | String | 对于边 | - | 边的频率,例如`SINGLE`、`MULTIPLE`。 | +| `mapping` | Object | 否 | - | 定义高级字段和值映射的对象。请参阅下面的`Mapping Config`。 | + +### Source/Target配置 (`sourceConfig` 和 `targetConfig`) + +此对象在`EDGE` Schema中使用,用于定义如何识别源顶点和目标顶点。 + +| 名称 | 类型 | 是否必须 | 默认值 | 描述 | +| ---------- | ------------ | -------- | ------ | -------------------------------------------------------------------------------------------------------------------------------------------- | +| `label` | String | 是 | - | 源或目标顶点的标签。 | +| `idFields` | `List` | 是 | - | 用于构造源/目标顶点ID的输入行中的源字段名称列表。这些值将被连接起来形成顶点ID。 | + +### Mapping配置 (`mapping`) + +此对象提供对字段和值如何映射到属性的高级控制。 + +| 名称 | 类型 | 是否必须 | 默认值 | 描述 | +| ----------------- | ------------------ | -------- | ------------ | 
------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `fieldMapping` | `Map` | 否 | - | 一个映射,其中键是源字段名,值是HugeGraph中的目标属性名。如果未指定,则使用源字段名作为目标属性名。 | +| `valueMapping` | `Map` | 否 | - | 用于转换特定字段值的映射。键是源的原始值,值是要写入的新值。 | +| `nullableKeys` | `List` | 否 | - | 可以具有null值的属性键列表。 | +| `nullValues` | `List` | 否 | - | 应被视为`null`的字符串值列表。任何包含这些值的字段都不会被写入。 | +| `dateFormat` | String | 否 | `yyyy-MM-dd` | 用于解析日期字符串的日期格式。 | +| `timeZone` | String | 否 | `GMT+8` | 用于日期解析的时区。 | +| `sortKeys` | `List` | 对于边 | - | 用于对具有相同源和目标顶点的边进行排序的属性键列表。 | + +## 使用示例 + +### 1. 写入顶点 + +此示例展示了如何从`FakeSource`读取数据并将`person`顶点写入HugeGraph。顶点ID基于`name`字段。 + +```hocon +env { + job.mode = "BATCH" +} + +source { + FakeSource { + plugin_input = "fake_source" + schema = { + fields = { + name = "string" + age = "int" + } + } + } +} + +sink { + HugeGraph { + host = "localhost" + port = 8080 + graph_name = "hugegraph" + graph_space = "default" + selected_fields = ["name", "age"] + schema_config = { + type = "VERTEX" + label = "person" + idStrategy = "PRIMARY_KEY" + idFields = ["name"] + properties = ["name", "age"] + } + } +} +``` + +### 2. 写入边 + +此示例将一个关系表同步为HugeGraph中的`knows`边。源表包含相互认识的两个人的姓名以及他们相识的年份。 + +```hocon +env { + job.mode = "BATCH" +} + +source { + FakeSource { + plugin_input = "fake_source" + schema = { + fields = { + person1_name = "string" + person2_name = "string" + since = "int" + } + } + } +} + +sink { + HugeGraph { + host = "localhost" + port = 8080 + graph_name = "hugegraph" + graph_space = "default" + schema_config = { + type = "EDGE" + label = "knows" + sourceConfig = { + label = "person" + idFields = ["person1_name"] + } + targetConfig = { + label = "person" + idFields = ["person2_name"] + } + properties = ["since"] + mapping = { + fieldMapping = { + person1_name = "name" + person2_name = "name" + } + } + } + } +} +``` + +## Changelog + + diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 8a604a04773..4172161f5e4 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -149,6 +149,7 @@ seatunnel.source.GraphQL = connector-graphql seatunnel.sink.GraphQL = connector-graphql seatunnel.sink.Aerospike = connector-aerospike seatunnel.sink.SensorsData = connector-sensorsdata +seatunnel.sink.HugeGraph = connector-hugegraph # For custom transforms, make sure to use the seatunnel.transform.[PluginIdentifier]=[JarPerfix] naming convention. For example: # seatunnel.transform.Sql = seatunnel-transforms-v2 diff --git a/pom.xml b/pom.xml index 11df2d57c7e..eacd81f028c 100644 --- a/pom.xml +++ b/pom.xml @@ -145,6 +145,7 @@ 2.12.15 9.4.56.v20240826 4.0.4 + 1.5.0 false true @@ -167,6 +168,7 @@ 2.31.30 15.0.1 4.12.0 + 3.6.0 @@ -526,6 +528,12 @@ ${arrow.version} + + org.apache.hugegraph + hugegraph-client + ${hugegraph.client.version} + + diff --git a/seatunnel-connectors-v2/CONNECTOR_ARCHITECTURE.md b/seatunnel-connectors-v2/CONNECTOR_ARCHITECTURE.md new file mode 100644 index 00000000000..4bef58f78ca --- /dev/null +++ b/seatunnel-connectors-v2/CONNECTOR_ARCHITECTURE.md @@ -0,0 +1,357 @@ +# SeaTunnel Connector 架构设计分析 + +## 一、核心数据结构设计 + +### 1. 
SeaTunnelRow - 统一数据模型 + +#### 数据结构组成 +```java +public final class SeaTunnelRow implements Serializable { + // 核心字段 + private String tableId = ""; // 表标识符(多表支持) + private RowKind rowKind = RowKind.INSERT; // 行变更类型(CDC支持) + private final Object[] fields; // 实际数据数组 + private Map options; // 扩展选项 + private volatile int size; // 缓存的字节大小 +} +``` + +#### RowKind 枚举(CDC支持) +- `INSERT (+I)`: 插入操作 +- `UPDATE_BEFORE (-U)`: 更新前的旧数据 +- `UPDATE_AFTER (+U)`: 更新后的新数据 +- `DELETE (-D)`: 删除操作 + +#### 关键特性 +1. **轻量级设计**: 基于 Object[] 数组,避免复杂的对象嵌套 +2. **多表支持**: 通过 tableId 区分不同表的数据 +3. **CDC原生支持**: RowKind 内置变更类型 +4. **字节大小计算**: 支持内存管理和批量控制 +5. **灵活扩展**: options Map 支持自定义元数据 + +### 2. SeaTunnelRowType - Schema定义 + +```java +public class SeaTunnelRowType implements CompositeType { + private final String[] fieldNames; // 字段名称 + private final SeaTunnelDataType[] fieldTypes; // 字段类型 +} +``` + +支持的数据类型: +- 基础类型:BOOLEAN, BYTE, SHORT, INT, BIGINT, FLOAT, DOUBLE, STRING +- 时间类型:DATE, TIME, TIMESTAMP, TIMESTAMP_TZ +- 复杂类型:ARRAY, MAP, ROW(嵌套) +- 特殊类型:DECIMAL, BYTES, NULL + +## 二、Connector 核心接口设计 + +### Source 端架构 + +``` +SeaTunnelSource + ├── createEnumerator() // 创建分片枚举器 + ├── createReader() // 创建数据读取器 + ├── restoreEnumerator() // 从检查点恢复 + ├── getBoundedness() // 有界/无界流 + └── getProducedCatalogTables() // 表元数据 +``` + +### Sink 端架构 + +``` +SeaTunnelSink + ├── createWriter() // 创建写入器 + ├── createCommitter() // 创建提交器(事务) + ├── getSaveModeHandler() // 保存模式处理 + └── getWriteCatalogTable() // 写入表信息 +``` + +## 三、实现新的 KV Connector 关键点 + +### 1. 核心实现要点 + +#### Source 实现 +```java +public class MyKVSource extends AbstractSingleSplitSource { + @Override + public AbstractSingleSplitReader createReader( + SingleSplitReaderContext context) { + return new MyKVSourceReader(parameters, context, deserializationSchema); + } +} +``` + +#### Sink 实现 +```java +public class MyKVSink extends AbstractSimpleSink { + @Override + public MyKVSinkWriter createWriter(SinkWriter.Context context) { + return new MyKVSinkWriter(seaTunnelRowType, parameters); + } +} +``` + +### 2. 关键设计决策 + +#### A. Key 生成策略 +```java +// 1. 固定 Key +String key = "fixed_key"; + +// 2. 字段值作为 Key +String key = row.getField(keyFieldIndex).toString(); + +// 3. 模板化 Key(支持占位符) +String keyTemplate = "user:{id}:profile:{type}"; +String key = generateKey(keyTemplate, row, fieldNames); +``` + +#### B. 值序列化策略 +```java +// JSON 序列化(默认) +SerializationSchema schema = new JsonSerializationSchema(rowType); +byte[] value = schema.serialize(row); + +// 自定义字段映射 +String value = row.getField(valueFieldIndex).toString(); + +// Hash 类型(多字段) +Map hashValue = new HashMap<>(); +hashValue.put(hashKeyField, row.getField(hashKeyIndex).toString()); +hashValue.put(hashValueField, row.getField(hashValueIndex).toString()); +``` + +#### C. 批量操作优化 +```java +public class MyKVSinkWriter extends AbstractSinkWriter { + private final List keyBuffer; + private final List valueBuffer; + private final int batchSize; + + @Override + public void write(SeaTunnelRow element) { + keyBuffer.add(generateKey(element)); + valueBuffer.add(generateValue(element)); + + if (keyBuffer.size() >= batchSize) { + flush(); + } + } + + private void flush() { + // 批量写入实现 + client.batchWrite(keyBuffer, valueBuffer); + clearBuffers(); + } +} +``` + +### 3. 主要难点及解决方案 + +#### 难点1:数据类型映射 +**问题**:结构化数据 → KV 存储的转换 +**解决方案**: +```java +// 1. 
扁平化嵌套结构 +private Map flattenRow(SeaTunnelRow row, SeaTunnelRowType type) { + Map flat = new HashMap<>(); + for (int i = 0; i < type.getTotalFields(); i++) { + String key = type.getFieldName(i); + Object value = row.getField(i); + if (value instanceof SeaTunnelRow) { + // 递归处理嵌套 + flat.putAll(flattenRow((SeaTunnelRow) value, ...)); + } else { + flat.put(key, convertToString(value)); + } + } + return flat; +} +``` + +#### 难点2:事务一致性 +**问题**:KV 数据库通常不支持事务 +**解决方案**: +```java +// 1. 批量原子操作 +public void commit() { + // 使用 Pipeline/Batch 确保批量原子性 + try (Pipeline pipeline = client.pipelined()) { + for (int i = 0; i < keys.size(); i++) { + pipeline.set(keys.get(i), values.get(i)); + } + pipeline.sync(); + } +} + +// 2. 两阶段提交(如果支持) +public class MyKVSinkCommitter implements SinkCommitter { + @Override + public List commit(List commitInfos) { + // 实现提交逻辑 + } +} +``` + +#### 难点3:Schema 演进 +**问题**:KV 存储通常无 Schema +**解决方案**: +```java +// 1. 版本化 Key +String versionedKey = key + ":v" + schemaVersion; + +// 2. 元数据存储 +client.set(key + ":schema", JsonUtils.toJsonString(schema)); + +// 3. 兼容性读取 +public SeaTunnelRow read(String key) { + String schemaJson = client.get(key + ":schema"); + Schema schema = parseSchema(schemaJson); + String value = client.get(key); + return deserialize(value, schema); +} +``` + +#### 难点4:性能优化 +**问题**:网络 RTT 开销 +**解决方案**: +```java +// 1. 连接池管理 +private final GenericObjectPool connectionPool; + +// 2. 异步写入 +CompletableFuture future = CompletableFuture.runAsync(() -> { + client.write(key, value); +}); + +// 3. 批量预聚合 +Map> groupedData = data.stream() + .collect(Collectors.groupingBy(this::generateKey)); +``` + +### 4. Factory 模式实现 + +```java +@AutoService(Factory.class) +public class MyKVSinkFactory implements TableSinkFactory { + @Override + public String factoryIdentifier() { + return "MyKV"; + } + + @Override + public TableSink createSink(TableSinkFactoryContext context) { + return () -> new MyKVSink(context.getOptions(), context.getCatalogTable()); + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(HOST, PORT, KEY_FIELD) + .optional(BATCH_SIZE, EXPIRE_TIME, VALUE_FORMAT) + .conditional(MODE, CLUSTER, NODES) + .build(); + } +} +``` + +## 四、最佳实践建议 + +### 1. 配置设计 +```hocon +sink { + MyKV { + # 连接配置 + host = "localhost" + port = 6379 + auth = "password" + + # Key 配置 + key = "data:{id}:{date}" # 支持占位符 + support_custom_key = true + + # 值配置 + data_type = "hash" # string/hash/list/set/zset + value_field = "content" + format = "json" + + # 性能配置 + batch_size = 100 + connection_pool_size = 10 + timeout_ms = 5000 + + # 可靠性配置 + max_retries = 3 + expire_seconds = 3600 + } +} +``` + +### 2. 错误处理 +```java +public class MyKVSinkWriter { + private void writeWithRetry(String key, String value) { + int attempts = 0; + Exception lastException = null; + + while (attempts < maxRetries) { + try { + client.write(key, value); + return; + } catch (Exception e) { + lastException = e; + attempts++; + if (attempts < maxRetries) { + Thread.sleep(getBackoffTime(attempts)); + } + } + } + + handleWriteFailure(key, value, lastException); + } +} +``` + +### 3. 
监控指标 +```java +public class MyKVSinkWriter { + // 关键指标 + private final Counter writtenRecords; + private final Counter failedRecords; + private final Histogram batchSize; + private final Timer writeLatency; + + private void recordMetrics() { + writtenRecords.inc(batch.size()); + batchSize.update(batch.size()); + try (Timer.Context ignored = writeLatency.time()) { + flush(); + } + } +} +``` + +## 五、参考实现 + +### 已有 KV Connector 特点 + +| Connector | Key策略 | 值格式 | 批量支持 | 事务支持 | 特色功能 | +|-----------|---------|--------|----------|----------|----------| +| Redis | 模板化/自定义 | JSON/String | Pipeline | 批量原子 | 多数据类型 | +| Cassandra | 主键映射 | Row → CQL | Batch Statement | 批量CQL | Schema管理 | +| DynamoDB | 分区键+排序键 | Item | BatchWrite | 条件写入 | 自动重试 | +| MongoDB | _id 字段 | Document | BulkWrite | 事务(4.0+) | 嵌套文档 | + +## 六、总结 + +实现新的 KV Connector 需要重点关注: + +1. **数据模型转换**:SeaTunnelRow → KV 的映射策略 +2. **Key 生成机制**:灵活且高效的 Key 设计 +3. **批量操作**:减少网络开销,提高吞吐量 +4. **错误恢复**:重试、容错、一致性保证 +5. **性能优化**:连接池、异步、预聚合 +6. **配置友好**:直观的参数设计 + +通过遵循 SeaTunnel 的设计模式和接口规范,可以快速实现高质量的 KV Connector。 diff --git a/seatunnel-connectors-v2/connector-hugegraph/pom.xml b/seatunnel-connectors-v2/connector-hugegraph/pom.xml new file mode 100644 index 00000000000..d8cf4077fd0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hugegraph/pom.xml @@ -0,0 +1,73 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-connectors-v2 + ${revision} + + + connector-hugegraph + SeaTunnel : Connectors V2 : HugeGraph + + + 1.5.0 + + + + + org.apache.seatunnel + connector-common + ${project.version} + + + + org.apache.hugegraph + hugegraph-client + ${hugegraph.client.version} + + + + org.apache.hugegraph + hugegraph-common + ${hugegraph.client.version} + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + ${hugegraph.client.version} + + + + + + + + diff --git a/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/buffer/BatchBuffer.java b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/buffer/BatchBuffer.java new file mode 100644 index 00000000000..7ccc4f26c71 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/buffer/BatchBuffer.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hugegraph.buffer;
+
+import org.apache.seatunnel.connectors.seatunnel.hugegraph.client.HugeGraphClient;
+
+import org.apache.hugegraph.structure.GraphElement;
+import org.apache.hugegraph.structure.graph.Edge;
+import org.apache.hugegraph.structure.graph.Vertex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class BatchBuffer implements AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BatchBuffer.class);
+
+    private final List<GraphElement> buffer = new ArrayList<>();
+    private final int batchSize;
+    private final ScheduledExecutorService scheduler;
+    private final ScheduledFuture<?> scheduledFuture;
+
+    private volatile boolean closed = false;
+    private volatile Exception flushException;
+    private final HugeGraphClient client;
+
+    public BatchBuffer(HugeGraphClient client, int batchSize, long batchIntervalMs) {
+
+        this.batchSize = batchSize;
+        this.client = client;
+
+        if (batchIntervalMs > 0) {
+            this.scheduler =
+                    Executors.newSingleThreadScheduledExecutor(
+                            runnable -> {
+                                Thread thread = new Thread(runnable, "hugegraph-sink-flusher");
+                                thread.setDaemon(true);
+                                return thread;
+                            });
+            this.scheduledFuture =
+                    this.scheduler.scheduleAtFixedRate(
+                            () -> {
+                                try {
+                                    flush();
+                                } catch (Exception e) {
+                                    flushException = e;
+                                }
+                            },
+                            batchIntervalMs,
+                            batchIntervalMs,
+                            TimeUnit.MILLISECONDS);
+        } else {
+            this.scheduler = null;
+            this.scheduledFuture = null;
+        }
+    }
+
+    public synchronized void add(GraphElement element) throws IOException {
+        checkFlushException();
+        if (closed) {
+            throw new IOException("BatchBuffer is already closed.");
+        }
+
+        try {
+            buffer.add(element);
+            if (buffer.size() >= batchSize) {
+                doFlush();
+            }
+        } catch (Exception e) {
+            throw new IOException("Failed to add element and flush", e);
+        }
+    }
+
+    public synchronized void flush() throws IOException {
+        checkFlushException();
+        if (closed && buffer.isEmpty()) {
+            return;
+        }
+        doFlush();
+    }
+
+    private void doFlush() throws IOException {
+        if (buffer.isEmpty()) {
+            return;
+        }
+        try {
+            GraphElement firstElement = buffer.get(0);
+            if (firstElement instanceof Vertex) {
+                List<Vertex> vertices =
+                        buffer.stream()
+                                .map(element -> (Vertex) element)
+                                .collect(Collectors.toList());
+                client.batchWriteVertices(vertices);
+            } else {
+                List<Edge> edges =
+                        buffer.stream().map(element -> (Edge) element).collect(Collectors.toList());
+                client.batchWriteEdges(edges);
+            }
+
+            buffer.clear();
+        } catch (Exception e) {
+            LOG.error("Failed to write batch data to HugeGraph", e);
+            throw new IOException("Failed to write batch data to HugeGraph", e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            closed = true;
+        }
+
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(false);
+        }
+        if (scheduler != null) {
+            scheduler.shutdown();
+            try {
+                if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+                    scheduler.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                scheduler.shutdownNow();
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        LOG.info("Closing BatchBuffer, performing final flush...");
+        flush();
+        checkFlushException();
+        LOG.info("BatchBuffer closed.");
+    }
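+
+    // add() and flush() are synchronized, so the writer thread and the scheduled flusher
+    // never mutate the buffer concurrently. doFlush() dispatches on the first element's
+    // type and assumes a homogeneous buffer (all Vertex or all Edge), which holds as long
+    // as each BatchBuffer serves a single schema_config mapping.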
+ + private void checkFlushException() throws IOException { + if (flushException != null) { + throw new IOException("An error occurred during asynchronous flush", flushException); + } + } +} diff --git a/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/client/HugeGraphClient.java b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/client/HugeGraphClient.java new file mode 100644 index 00000000000..628f8ec9c05 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/client/HugeGraphClient.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hugegraph.client; + +import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.HugeGraphSinkConfig; + +import org.apache.hugegraph.driver.HugeClient; +import org.apache.hugegraph.driver.SchemaManager; +import org.apache.hugegraph.exception.ServerException; +import org.apache.hugegraph.rest.ClientException; +import org.apache.hugegraph.structure.constant.IdStrategy; +import org.apache.hugegraph.structure.graph.Edge; +import org.apache.hugegraph.structure.graph.Vertex; +import org.apache.hugegraph.structure.schema.EdgeLabel; +import org.apache.hugegraph.structure.schema.PropertyKey; +import org.apache.hugegraph.structure.schema.VertexLabel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import lombok.Getter; + +import java.io.IOException; +import java.util.List; + +public final class HugeGraphClient { + + // TODO: Add handling for schema fetch failures. + private static final Logger LOG = LoggerFactory.getLogger(HugeGraphClient.class); + + private final HugeClient client; + @Getter private final SchemaManager schema; + + public HugeGraphClient(HugeGraphSinkConfig config) { + this.client = createClient(config); + this.schema = client.schema(); + } + + private static HugeClient createClient(HugeGraphSinkConfig config) { + int maxRetries = config.getMaxRetries() > 0 ? config.getMaxRetries() : 3; + long retryIntervalMillis = + config.getRetryBackoffMs() > 0 ? 
config.getRetryBackoffMs() : 5000L; + + for (int attempt = 1; attempt <= maxRetries; attempt++) { + try { + String url = String.format("http://%s:%d", config.getHost(), config.getPort()); + LOG.debug( + "Creating new HugeClient for url: {}, graph: {}", + url, + config.getGraphName()); + + HugeClient client = + HugeClient.builder(url, config.getGraphName()) + .configUser(config.getUsername(), config.getPassword()) + .configIdleTime(60) + .build(); + + client.graph().listVertices(); + LOG.info( + "Successfully created and validated HugeClient instance on attempt {}/{}.", + attempt, + maxRetries); + return client; + } catch (Exception e) { + LOG.error( + "Failed to create HugeClient on attempt {}/{}. Error: {}", + attempt, + maxRetries, + e.getMessage()); + if (attempt < maxRetries) { + try { + LOG.info("Will retry in {} ms...", retryIntervalMillis); + Thread.sleep(retryIntervalMillis); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IllegalStateException( + "Client creation was interrupted during retry wait.", ie); + } + } + } + } + throw new IllegalStateException( + "Failed to create HugeClient after " + maxRetries + " attempts."); + } + + public PropertyKey getPropertyKey(String propertyName) { + return schema.getPropertyKey(propertyName); + } + + public String getVertexLabel(String label) { + VertexLabel vertexLabel = this.client.schema().getVertexLabel(label); + return String.valueOf(vertexLabel.id()); + } + + public String getEdgeLabel(String label) { + EdgeLabel edgeLabel = this.client.schema().getEdgeLabel(label); + return String.valueOf(edgeLabel.id()); + } + + public IdStrategy getIdStrategy(String label) { + VertexLabel vertexLabel = this.client.schema().getVertexLabel(label); + return vertexLabel.idStrategy(); + } + + public void writeVertex(Vertex vertex) throws IOException { + try { + this.client.graph().addVertex(vertex); + } catch (ServerException | ClientException e) { + LOG.error( + "Failed to write vertex (will trigger task restart). Error: {}", + e.getMessage(), + e); + throw new IOException("Failed to write vertex, triggering task restart", e); + } catch (Exception e) { + LOG.error( + "Unknown error writing vertex (will trigger task restart). Error: {}", + e.getMessage(), + e); + throw new IOException("Unknown error writing vertex", e); + } + } + + public void writeEdge(Edge edge) throws IOException { + try { + this.client.graph().addEdge(edge); + } catch (ServerException | ClientException e) { + LOG.error( + "Failed to write edge (will trigger task restart). Error: {}", + e.getMessage(), + e); + throw new IOException("Failed to write edge, triggering task restart", e); + } catch (Exception e) { + LOG.error( + "Unknown error writing edge (will trigger task restart). Error: {}", + e.getMessage(), + e); + throw new IOException("Unknown error writing edge", e); + } + } + + public void deleteVertex(Object vertexId) throws IOException { + try { + this.client.graph().removeVertex(vertexId); + } catch (ServerException | ClientException e) { + LOG.error( + "Failed to delete vertex (will trigger task restart). Error: {}", + e.getMessage(), + e); + throw new IOException("Failed to delete vertex, triggering task restart", e); + } catch (Exception e) { + LOG.error( + "Unknown error deleting vertex (will trigger task restart). 
Error: {}",
+                    e.getMessage(),
+                    e);
+            throw new IOException("Unknown error deleting vertex", e);
+        }
+    }
+
+    public void deleteEdge(String edgeId) throws IOException {
+        try {
+            this.client.graph().removeEdge(edgeId);
+        } catch (ServerException | ClientException e) {
+            LOG.error(
+                    "Failed to delete edge (will trigger task restart). Error: {}",
+                    e.getMessage(),
+                    e);
+            throw new IOException("Failed to delete edge, triggering task restart", e);
+        } catch (Exception e) {
+            LOG.error(
+                    "Unknown error deleting edge (will trigger task restart). Error: {}",
+                    e.getMessage(),
+                    e);
+            throw new IOException("Unknown error deleting edge", e);
+        }
+    }
+
+    public void deleteVertexWithEdges(Object vertexId) throws IOException {
+        try {
+            List<Edge> edges = this.client.graph().getEdges(vertexId);
+            for (Edge edge : edges) {
+                this.client.graph().removeEdge(edge.id());
+            }
+            this.client.graph().removeVertex(vertexId);
+        } catch (ServerException | ClientException e) {
+            LOG.error(
+                    "Failed to delete vertex (will trigger task restart). Error: {}",
+                    e.getMessage(),
+                    e);
+            throw new IOException("Failed to delete vertex, triggering task restart", e);
+        } catch (Exception e) {
+            LOG.error(
+                    "Unknown error deleting vertex (will trigger task restart). Error: {}",
+                    e.getMessage(),
+                    e);
+            throw new IOException("Unknown error deleting vertex", e);
+        }
+    }
+
+    public void batchWriteVertices(List<Vertex> buffer) throws IOException {
+        try {
+            this.client.graph().addVertices(buffer);
+        } catch (ServerException | ClientException e) {
+            LOG.error(
+                    "Failed to batch write vertex (will trigger task restart). Error: {}",
+                    e.getMessage(),
+                    e);
+            throw new IOException("Failed to batch write vertex, triggering task restart", e);
+        } catch (Exception e) {
+            LOG.error(
+                    "Unknown error batch writing vertex (will trigger task restart). Error: {}",
+                    e.getMessage(),
+                    e);
+            throw new IOException("Unknown error batch writing vertex", e);
+        }
+    }
+
+    public void batchWriteEdges(List<Edge> buffer) throws IOException {
+        try {
+            this.client.graph().addEdges(buffer);
+        } catch (ServerException | ClientException e) {
+            LOG.error(
+                    "Failed to batch write edge (will trigger task restart). Error: {}",
+                    e.getMessage(),
+                    e);
+            throw new IOException("Failed to batch write edge, triggering task restart", e);
+        } catch (Exception e) {
+            LOG.error(
+                    "Unknown error batch writing edge (will trigger task restart). Error: {}",
+                    e.getMessage(),
+                    e);
+            throw new IOException("Unknown error batch writing edge", e);
+        }
+    }
+
+    public void close() {
+        if (this.client != null) {
+            LOG.info("Closing HugeClient instance.");
+            this.client.close();
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/config/HugeGraphOptions.java b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/config/HugeGraphOptions.java
new file mode 100644
index 00000000000..5dcaa6c1e08
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/config/HugeGraphOptions.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hugegraph.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class HugeGraphOptions {
+
+    public static final String PLUGIN_NAME = "HugeGraph";
+
+    public static final Option<String> HOST =
+            Options.key("host")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("HugeGraph server host");
+
+    public static final Option<Integer> PORT =
+            Options.key("port").intType().noDefaultValue().withDescription("HugeGraph server port");
+
+    public static final Option<String> GRAPH_NAME =
+            Options.key("graph_name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The name of the graph to be operated on");
+
+    public static final Option<String> GRAPH_SPACE =
+            Options.key("graph_space")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The graph space of the graph to be operated on");
+
+    public static final Option<String> USERNAME =
+            Options.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("HugeGraph username");
+
+    public static final Option<String> PASSWORD =
+            Options.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("HugeGraph password");
+
+    public static final Option<Integer> BATCH_SIZE =
+            Options.key("batch_size").intType().defaultValue(500).withDescription("The batch size");
+
+    public static final Option<Integer> BATCH_INTERVAL_MS =
+            Options.key("batch_interval_ms")
+                    .intType()
+                    .defaultValue(5000)
+                    .withDescription("The batch flush interval in milliseconds");
+
+    public static final Option<Integer> MAX_RETRIES =
+            Options.key("max_retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription("The maximum number of retries");
+
+    public static final Option<Integer> RETRY_BACKOFF_MS =
+            Options.key("retry_backoff_ms")
+                    .intType()
+                    .defaultValue(5000)
+                    .withDescription("The retry backoff time in milliseconds");
+}
diff --git a/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/config/HugeGraphSinkConfig.java b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/config/HugeGraphSinkConfig.java
new file mode 100644
index 00000000000..dcb0fb32c30
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/config/HugeGraphSinkConfig.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hugegraph.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class HugeGraphSinkConfig implements Serializable {
+
+    private String host;
+    private int port;
+    private String graphName;
+    private String graphSpace;
+    private String username;
+    private String password;
+    private SchemaConfig schemaConfig;
+    private int batchSize;
+    private int batchIntervalMs;
+    private int maxRetries;
+    private int retryBackoffMs;
+
+    // mapping config
+    private List<String> selectedFields;
+    private List<String> ignoredFields;
+
+    public static HugeGraphSinkConfig of(ReadonlyConfig config) {
+        HugeGraphSinkConfig sinkConfig = new HugeGraphSinkConfig();
+
+        sinkConfig.setHost(config.get(HugeGraphOptions.HOST));
+        sinkConfig.setPort(config.get(HugeGraphOptions.PORT));
+        sinkConfig.setGraphName(config.get(HugeGraphOptions.GRAPH_NAME));
+        sinkConfig.setBatchSize(
+                config.getOptional(HugeGraphOptions.BATCH_SIZE)
+                        .orElse(HugeGraphOptions.BATCH_SIZE.defaultValue()));
+        sinkConfig.setBatchIntervalMs(
+                config.getOptional(HugeGraphOptions.BATCH_INTERVAL_MS)
+                        .orElse(HugeGraphOptions.BATCH_INTERVAL_MS.defaultValue()));
+        sinkConfig.setMaxRetries(
+                config.getOptional(HugeGraphOptions.MAX_RETRIES)
+                        .orElse(HugeGraphOptions.MAX_RETRIES.defaultValue()));
+        sinkConfig.setRetryBackoffMs(
+                config.getOptional(HugeGraphOptions.RETRY_BACKOFF_MS)
+                        .orElse(HugeGraphOptions.RETRY_BACKOFF_MS.defaultValue()));
+        sinkConfig.setSchemaConfig(config.get(HugeGraphSinkOptions.SCHEMA_CONFIG));
+
+        config.getOptional(HugeGraphSinkOptions.SELECTED_FIELDS)
+                .ifPresent(sinkConfig::setSelectedFields);
+        config.getOptional(HugeGraphSinkOptions.IGNORED_FIELDS)
+                .ifPresent(sinkConfig::setIgnoredFields);
+
+        config.getOptional(HugeGraphOptions.GRAPH_SPACE).ifPresent(sinkConfig::setGraphSpace);
+        config.getOptional(HugeGraphOptions.USERNAME).ifPresent(sinkConfig::setUsername);
+        config.getOptional(HugeGraphOptions.PASSWORD).ifPresent(sinkConfig::setPassword);
+
+        return sinkConfig;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/config/HugeGraphSinkOptions.java b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/config/HugeGraphSinkOptions.java
new file mode 100644
index 00000000000..815ca789289
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/config/HugeGraphSinkOptions.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hugegraph.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.List;
+
+public class HugeGraphSinkOptions {
+
+    public static final Option<List<String>> SELECTED_FIELDS =
+            Options.key("selected_fields")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription("Selected Fields");
+
+    public static final Option<List<String>> IGNORED_FIELDS =
+            Options.key("ignored_fields")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription("Ignored Fields");
+
+    public static final Option<SchemaConfig> SCHEMA_CONFIG =
+            Options.key("schema_config")
+                    .objectType(SchemaConfig.class)
+                    .noDefaultValue()
+                    .withDescription(
+                            "Schema configuration object that describes the mapping to a vertex or edge.");
+}
diff --git a/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/config/MappingConfig.java b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/config/MappingConfig.java
new file mode 100644
index 00000000000..c71c2ba9142
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/config/MappingConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hugegraph.config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Data
+public class MappingConfig implements Serializable {
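+
+    // Example (mirroring the sink docs): fieldMapping = { person1_name: "name" } renames a
+    // source column to its HugeGraph property name, while valueMapping rewrites individual
+    // source values before they are written.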
+    private Map<String, String> fieldMapping;
+    private Map<String, Object> valueMapping;
+    private List<String> nullableKeys;
+    private List<String> nullValues;
+    private List<String> sortKeys;
+
+    // Time config
+    private String dateFormat;
+    private String timeZone;
+}
diff --git a/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/config/SchemaConfig.java b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/config/SchemaConfig.java
new file mode 100644
index 00000000000..4dfe3cddfe2
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/config/SchemaConfig.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hugegraph.config;
+
+import org.apache.hugegraph.structure.constant.Frequency;
+import org.apache.hugegraph.structure.constant.IdStrategy;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Data
+public class SchemaConfig implements Serializable {
+
+    // General config
+    private LabelType type;
+    private String label;
+    private String tablePath;
+
+    // Property Config
+    private List<String> properties;
+
+    // General Label Config
+    private Long ttl;
+    private String ttlStartTime;
+    private String enableLabelIndex;
+    private Map<String, Object> userdata;
+
+    // VertexLabel config
+    private IdStrategy idStrategy;
+    private List<String> idFields;
+
+    // EdgeLabel Config
+    private SourceTargetConfig sourceConfig;
+    private SourceTargetConfig targetConfig;
+    private Frequency frequency;
+
+    // Mapping Config
+    private MappingConfig mapping;
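+
+    // Example binding (taken from the sink docs): schema_config { type = "VERTEX",
+    //   label = "person", idStrategy = "PRIMARY_KEY", idFields = ["name"],
+    //   properties = ["name", "age"] } populates the fields above.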
+
+    public enum LabelType {
+        VERTEX,
+        EDGE
+    }
+
+    @Data
+    public static class SourceTargetConfig implements Serializable {
+        private String label;
+        private List<String> idFields;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/mapper/EdgeMapper.java b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/mapper/EdgeMapper.java
new file mode 100644
index 00000000000..8c4a55b9d8c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/mapper/EdgeMapper.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hugegraph.mapper;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.hugegraph.client.HugeGraphClient;
+import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.MappingConfig;
+import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.SchemaConfig;
+import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.SchemaConfig.SourceTargetConfig;
+import org.apache.seatunnel.connectors.seatunnel.hugegraph.utils.DataTypeUtil;
+
+import org.apache.hugegraph.structure.constant.IdStrategy;
+import org.apache.hugegraph.structure.graph.Edge;
+import org.apache.hugegraph.structure.schema.PropertyKey;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public class EdgeMapper implements GraphDataMapper {
+
+    private final SchemaConfig schemaConfig;
+    private final MappingConfig mappingConfig;
+    private final Map<String, Integer> fieldsIndex;
+    private final HugeGraphClient client;
+    private final Object labelId;
+    private final Map<String, PropertyKey> propertyKeyCache;
+
+    public EdgeMapper(
+            SchemaConfig schemaConfig, Map<String, Integer> fieldsIndex, HugeGraphClient client) {
+        this.schemaConfig = schemaConfig;
+        this.mappingConfig = getMappingConfig();
+        this.client = client;
+        this.labelId = client.getEdgeLabel(schemaConfig.getLabel());
+        this.fieldsIndex = fieldsIndex;
+        this.propertyKeyCache = getPropertyKeyCache();
+    }
+
+    private MappingConfig getMappingConfig() {
+        MappingConfig mapping =
+                schemaConfig.getMapping() == null ? new MappingConfig() : schemaConfig.getMapping();
+        if (mapping.getFieldMapping() == null) {
+            mapping.setFieldMapping(Collections.emptyMap());
+        }
+        if (mapping.getValueMapping() == null) {
+            mapping.setValueMapping(Collections.emptyMap());
+        }
+        schemaConfig.setMapping(mapping);
+        return mapping;
+    }
+
+    private HashMap<String, PropertyKey> getPropertyKeyCache() {
+        HashMap<String, PropertyKey> cache = new HashMap<>();
+        Map<String, String> fieldMapping = mappingConfig.getFieldMapping();
+        for (String fieldName : fieldsIndex.keySet()) {
+            String propertyName = fieldMapping.getOrDefault(fieldName, fieldName);
+            cache.put(propertyName, client.getPropertyKey(propertyName));
+        }
+        return cache;
+    }
+
+    @Override
+    public Edge map(SeaTunnelRow row) {
+        // 1. Build source and target vertex IDs
+        Object sourceId = buildVertexId(row, schemaConfig.getSourceConfig());
+        Object targetId = buildVertexId(row, schemaConfig.getTargetConfig());
+
+        // If source or target ID can't be built, we can't create the edge
+        if (sourceId == null || targetId == null) {
+            return null;
+        }
+
+        // 2. Create edge and set identifiers
+        Edge edge = new Edge(schemaConfig.getLabel());
+        edge.sourceId(sourceId);
+        edge.targetId(targetId);
+        edge.sourceLabel(schemaConfig.getSourceConfig().getLabel());
+        edge.targetLabel(schemaConfig.getTargetConfig().getLabel());
+
+        // 3. Set properties
+        Set<String> idFields = new HashSet<>();
+        idFields.addAll(schemaConfig.getSourceConfig().getIdFields());
+        idFields.addAll(schemaConfig.getTargetConfig().getIdFields());
+
+        Map<String, String> fieldMapping = new HashMap<>(mappingConfig.getFieldMapping());
+
+        for (Map.Entry<String, Integer> fieldEntry : fieldsIndex.entrySet()) {
+            String fieldName = fieldEntry.getKey();
+            String propertyName = fieldMapping.getOrDefault(fieldName, fieldName);
+            Object rawValue = row.getField(fieldEntry.getValue());
+            PropertyKey propertyKey = propertyKeyCache.get(propertyName);
+
+            // Skip fields used for source/target vertex IDs
+            if (idFields.contains(fieldName) || isConsideredNull(rawValue)) {
+                continue;
+            }
+
+            Object fieldValue =
+                    DataTypeUtil.convert(
+                            rawValue,
+                            propertyKey,
+                            mappingConfig.getDateFormat(),
+                            mappingConfig.getTimeZone());
+
+            edge.property(propertyName, getMappedValue(fieldValue));
+        }
+        return edge;
+    }
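+
+    // Derives the referenced vertex id according to the label's id strategy. For
+    // PRIMARY_KEY labels the id is spliced as "labelId:pk1!pk2..." (see spliceVertexId),
+    // which is intended to match HugeGraph's primary-key id encoding; AUTOMATIC ids
+    // cannot be derived from the row, so null is returned and no edge is produced.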
+    private Object buildVertexId(SeaTunnelRow row, SourceTargetConfig config) {
+
+        String vertexLabelId = client.getVertexLabel(config.getLabel());
+        IdStrategy strategy = client.getIdStrategy(config.getLabel());
+        if (strategy == null || strategy == IdStrategy.AUTOMATIC) {
+            return null;
+        }
+
+        List<String> idFields = config.getIdFields();
+        switch (strategy) {
+            case PRIMARY_KEY:
+                List<Object> pkValues = getFieldValues(row, idFields);
+                if (pkValues.size() != idFields.size()
+                        || pkValues.stream().anyMatch(this::isConsideredNull)) {
+                    return null;
+                }
+                return spliceVertexId(vertexLabelId, pkValues);
+            case CUSTOMIZE_STRING:
+                List<Object> stringValues = getFieldValues(row, idFields);
+                if (stringValues.size() != idFields.size()
+                        || stringValues.stream().anyMatch(this::isConsideredNull)) {
+                    return null;
+                }
+                return stringValues.stream().map(String::valueOf).collect(Collectors.joining(":"));
+            case CUSTOMIZE_NUMBER:
+                List<Object> numberValues = getFieldValues(row, idFields);
+                if (numberValues.size() != 1) {
+                    return null;
+                }
+                Object numValue = numberValues.get(0);
+                if (isConsideredNull(numValue)) {
+                    return null;
+                }
+                if (numValue instanceof Number) {
+                    return ((Number) numValue).longValue();
+                } else {
+                    return Long.parseLong(String.valueOf(numValue));
+                }
+            case CUSTOMIZE_UUID:
+                List<Object> uuidValues = getFieldValues(row, idFields);
+                if (uuidValues.size() != 1) {
+                    return null;
+                }
+                Object uuidValue = uuidValues.get(0);
+                if (isConsideredNull(uuidValue)) {
+                    return null;
+                }
+                return UUID.fromString(String.valueOf(uuidValue));
+            default:
+                throw new UnsupportedOperationException("Unsupported IdStrategy: " + strategy);
+        }
+    }
+
+    private List<Object> getFieldValues(SeaTunnelRow row, List<String> fields) {
+        List<Object> values = new ArrayList<>(fields.size());
+        Map<String, String> fieldMapping = mappingConfig.getFieldMapping();
+        for (String fieldName : fields) {
+
+            Integer index = fieldsIndex.get(fieldName);
+            if (index == null) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Field '%s' specified in id_fields not found in row schema. Available fields: %s",
+                                fieldName, fieldsIndex.keySet()));
+            }
+
+            Object rawValue = row.getField(index);
+            if (isConsideredNull(rawValue)) {
+                continue;
+            }
+
+            String propertyName = fieldMapping.getOrDefault(fieldName, fieldName);
+            PropertyKey propertyKey = propertyKeyCache.get(propertyName);
+
+            Object fieldValue =
+                    DataTypeUtil.convert(
+                            rawValue,
+                            propertyKey,
+                            mappingConfig.getDateFormat(),
+                            mappingConfig.getTimeZone());
+
+            values.add(getMappedValue(fieldValue));
+        }
+        return values;
+    }
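+
+    // Both real nulls and any value listed in mapping.nullValues count as "missing":
+    // getFieldValues() silently drops them, and the id builders above detect the omission
+    // via their size checks against idFields.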
Available fields: %s", + fieldName, fieldsIndex.keySet())); + } + + Object rawValue = row.getField(index); + if (isConsideredNull(rawValue)) { + continue; + } + + String propertyName = fieldMapping.getOrDefault(fieldName, fieldName); + PropertyKey propertyKey = propertyKeyCache.get(propertyName); + + Object fieldValue = + DataTypeUtil.convert( + rawValue, + propertyKey, + mappingConfig.getDateFormat(), + mappingConfig.getTimeZone()); + + values.add(getMappedValue(fieldValue)); + } + return values; + } + + private boolean isConsideredNull(Object value) { + if (value == null) { + return true; + } + List nullValues = mappingConfig.getNullValues(); + if (nullValues == null || nullValues.isEmpty()) { + return false; + } + return nullValues.contains(String.valueOf(value)); + } + + private Object getMappedValue(Object originalValue) { + Map valueMapping = mappingConfig.getValueMapping(); + if (valueMapping.isEmpty()) { + return originalValue; + } + return valueMapping.getOrDefault(originalValue, originalValue); + } + + private String spliceVertexId(String vertexLabelId, List primaryValues) { + String joinedValues = + primaryValues.stream().map(Object::toString).collect(Collectors.joining("!")); + return String.format("%s:%s", vertexLabelId, joinedValues); + } + + private String getSortedKeyValues(SeaTunnelRow row) { + List sortedKeys = mappingConfig.getSortKeys(); + if (sortedKeys == null || sortedKeys.isEmpty()) { + return String.valueOf(labelId); + } + List skValues = getFieldValues(row, sortedKeys); + return skValues.stream().map(Object::toString).collect(Collectors.joining(",")); + } + + @Override + public Object extractId(SeaTunnelRow row) { + Object sourceId = buildVertexId(row, schemaConfig.getSourceConfig()); + Object targetId = buildVertexId(row, schemaConfig.getTargetConfig()); + String sortedKeyValues = getSortedKeyValues(row); + return String.format("S%s>%s>%s>>S%s", sourceId, labelId, sortedKeyValues, targetId); + } +} diff --git a/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/mapper/GraphDataMapper.java b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/mapper/GraphDataMapper.java new file mode 100644 index 00000000000..b5a669625b1 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/mapper/GraphDataMapper.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.hugegraph.mapper; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import org.apache.hugegraph.structure.GraphElement; + +import java.io.Serializable; + +public interface GraphDataMapper extends Serializable { + + /** + * Maps a SeaTunnelRow to a HugeGraph GraphElement (Vertex or Edge). + * + * @param row The input SeaTunnelRow. + * @return The resulting GraphElement. + */ + GraphElement map(SeaTunnelRow row); + + /** + * Extracts the ID from a SeaTunnelRow. + * + * @param row The input SeaTunnelRow. + * @return The extracted ID object. + */ + Object extractId(SeaTunnelRow row); +} diff --git a/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/mapper/VertexMapper.java b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/mapper/VertexMapper.java new file mode 100644 index 00000000000..e47103204bc --- /dev/null +++ b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/mapper/VertexMapper.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.hugegraph.mapper; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.client.HugeGraphClient; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.MappingConfig; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.SchemaConfig; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.utils.DataTypeUtil; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.utils.E; + +import org.apache.hugegraph.structure.constant.IdStrategy; +import org.apache.hugegraph.structure.graph.Vertex; +import org.apache.hugegraph.structure.schema.PropertyKey; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +public class VertexMapper implements GraphDataMapper { + + private final SchemaConfig schemaConfig; + private final MappingConfig mappingConfig; + private final Map fieldsIndex; + private final String labelId; + private final HugeGraphClient client; + private final Map propertyKeyCache; + + public VertexMapper( + SchemaConfig schemaConfig, Map fieldsIndex, HugeGraphClient client) { + this.schemaConfig = schemaConfig; + this.mappingConfig = getMappingConfig(); + this.client = client; + this.labelId = client.getVertexLabel(schemaConfig.getLabel()); + this.fieldsIndex = fieldsIndex; + this.propertyKeyCache = getPropertyKeyCache(); + } + + private MappingConfig getMappingConfig() { + MappingConfig mapping = + schemaConfig.getMapping() == null ? new MappingConfig() : schemaConfig.getMapping(); + if (mapping.getFieldMapping() == null) { + mapping.setFieldMapping(Collections.emptyMap()); + } + if (mapping.getValueMapping() == null) { + mapping.setValueMapping(Collections.emptyMap()); + } + schemaConfig.setMapping(mapping); + return mapping; + } + + private HashMap getPropertyKeyCache() { + HashMap cache = new HashMap<>(); + Map fieldMapping = mappingConfig.getFieldMapping(); + for (String fieldName : fieldsIndex.keySet()) { + String propertyName = fieldMapping.getOrDefault(fieldName, fieldName); + cache.put(propertyName, client.getPropertyKey(propertyName)); + } + return cache; + } + + @Override + public Vertex map(SeaTunnelRow row) { + String label = schemaConfig.getLabel(); + E.checkArgument(label != null && !label.isEmpty(), "Vertex label can't be null or empty."); + Vertex vertex = new Vertex(label); + + // 1. Set vertex ID + Object id = extractId(row); + if (id == null && schemaConfig.getIdStrategy() != IdStrategy.AUTOMATIC) { + return null; + } + + if (id != null && schemaConfig.getIdStrategy() != IdStrategy.PRIMARY_KEY) { + vertex.id(id); + } + + // 2. 
Set properties + Map fieldMapping = mappingConfig.getFieldMapping(); + + for (Map.Entry fieldEntry : fieldsIndex.entrySet()) { + + String fieldName = fieldEntry.getKey(); + String propertyName = fieldMapping.getOrDefault(fieldName, fieldName); + Object rawValue = row.getField(fieldEntry.getValue()); + PropertyKey propertyKey = propertyKeyCache.get(propertyName); + + if (isConsideredNull(rawValue)) { + continue; + } + + Object fieldValue = + DataTypeUtil.convert( + rawValue, + propertyKey, + mappingConfig.getDateFormat(), + mappingConfig.getTimeZone()); + + vertex.property(propertyName, getMappedValue(fieldValue)); + } + return vertex; + } + + @Override + public Object extractId(SeaTunnelRow row) { + IdStrategy strategy = schemaConfig.getIdStrategy(); + if (strategy == null || strategy == IdStrategy.AUTOMATIC) { + return null; + } + + List idFields = schemaConfig.getIdFields(); + E.checkArgument( + idFields != null && !idFields.isEmpty(), + "The 'idFields' must be specified for ID strategy '%s'.", + strategy); + + switch (strategy) { + case PRIMARY_KEY: + List pkValues = getFieldValues(row, idFields); + if (pkValues.size() != idFields.size() + || pkValues.stream().anyMatch(this::isConsideredNull)) { + return null; + } + return spliceVertexId(pkValues); + case CUSTOMIZE_STRING: + List stringValues = getFieldValues(row, idFields); + if (stringValues.size() != idFields.size() + || stringValues.stream().anyMatch(this::isConsideredNull)) { + return null; + } + return stringValues.stream().map(String::valueOf).collect(Collectors.joining(":")); + case CUSTOMIZE_NUMBER: + List numberValues = getFieldValues(row, idFields); + if (numberValues.size() != 1) { + return null; + } + Object numValue = numberValues.get(0); + if (isConsideredNull(numValue)) { + return null; + } + if (numValue instanceof Number) { + return ((Number) numValue).longValue(); + } else { + return Long.parseLong(String.valueOf(numValue)); + } + case CUSTOMIZE_UUID: + List uuidValues = getFieldValues(row, idFields); + if (uuidValues.size() != 1) { + return null; + } + Object uuidValue = uuidValues.get(0); + if (isConsideredNull(uuidValue)) { + return null; + } + return UUID.fromString(String.valueOf(uuidValue)); + default: + throw new UnsupportedOperationException("Unsupported IdStrategy: " + strategy); + } + } + + private List getFieldValues(SeaTunnelRow row, List fields) { + List values = new ArrayList<>(fields.size()); + Map fieldMapping = mappingConfig.getFieldMapping(); + for (String fieldName : fields) { + + Integer index = fieldsIndex.get(fieldName); + if (index == null) { + throw new IllegalArgumentException( + String.format( + "Field '%s' specified in id_fields not found in row schema. 
Available fields: %s", + fieldName, fieldsIndex.keySet())); + } + + Object rawValue = row.getField(index); + if (isConsideredNull(rawValue)) { + continue; + } + + String propertyName = fieldMapping.getOrDefault(fieldName, fieldName); + PropertyKey propertyKey = propertyKeyCache.get(propertyName); + + Object fieldValue = + DataTypeUtil.convert( + rawValue, + propertyKey, + mappingConfig.getDateFormat(), + mappingConfig.getTimeZone()); + + values.add(getMappedValue(fieldValue)); + } + return values; + } + + private boolean isConsideredNull(Object value) { + if (value == null) { + return true; + } + List nullValues = mappingConfig.getNullValues(); + if (nullValues == null || nullValues.isEmpty()) { + return false; + } + return nullValues.contains(String.valueOf(value)); + } + + private Object getMappedValue(Object originalValue) { + Map valueMapping = mappingConfig.getValueMapping(); + if (valueMapping.isEmpty()) { + return originalValue; + } + return valueMapping.getOrDefault(originalValue, originalValue); + } + + private String spliceVertexId(List primaryValues) { + String joinedValues = + primaryValues.stream().map(Object::toString).collect(Collectors.joining("!")); + return String.format("%s:%s", labelId, joinedValues); + } +} diff --git a/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/sink/HugeGraphSink.java b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/sink/HugeGraphSink.java new file mode 100644 index 00000000000..91f594a04b1 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/sink/HugeGraphSink.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.hugegraph.sink; + +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.HugeGraphOptions; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.HugeGraphSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.utils.SchemaValidator; + +import java.io.IOException; +import java.util.Optional; + +public class HugeGraphSink extends AbstractSimpleSink { + + private final HugeGraphSinkConfig config; + private final CatalogTable catalogTable; + private final SeaTunnelRowType rowType; + + public HugeGraphSink(HugeGraphSinkConfig config, CatalogTable catalogTable) { + this.config = config; + this.catalogTable = catalogTable; + this.rowType = catalogTable.getSeaTunnelRowType(); + + // TODO: Discuss where to implement this in the future, maybe the catalog + SchemaValidator validator = new SchemaValidator(config, rowType); + validator.validateSchema(); + } + + @Override + public String getPluginName() { + return HugeGraphOptions.PLUGIN_NAME; + } + + @Override + public HugeGraphSinkWriter createWriter(SinkWriter.Context context) throws IOException { + return new HugeGraphSinkWriter(config, rowType); + } + + @Override + public Optional getWriteCatalogTable() { + return Optional.ofNullable(catalogTable); + } +} diff --git a/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/sink/HugeGraphSinkFactory.java b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/sink/HugeGraphSinkFactory.java new file mode 100644 index 00000000000..56e75ab214c --- /dev/null +++ b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/sink/HugeGraphSinkFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.hugegraph.sink; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.connector.TableSink; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.HugeGraphOptions; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.HugeGraphSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.HugeGraphSinkOptions; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class HugeGraphSinkFactory implements TableSinkFactory { + + @Override + public String factoryIdentifier() { + return HugeGraphOptions.PLUGIN_NAME; + } + + @Override + public TableSink createSink(TableSinkFactoryContext context) { + HugeGraphSinkConfig sinkConfig = HugeGraphSinkConfig.of(context.getOptions()); + return () -> new HugeGraphSink(sinkConfig, context.getCatalogTable()); + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + // connection config + .required(HugeGraphOptions.HOST, HugeGraphOptions.PORT, HugeGraphOptions.GRAPH_NAME) + .optional( + HugeGraphOptions.GRAPH_SPACE, + HugeGraphOptions.USERNAME, + HugeGraphOptions.PASSWORD) + // mapping config + .exclusive( + HugeGraphSinkOptions.SELECTED_FIELDS, HugeGraphSinkOptions.IGNORED_FIELDS) + .required(HugeGraphSinkOptions.SCHEMA_CONFIG) + // batch config + .optional(HugeGraphOptions.BATCH_SIZE, HugeGraphOptions.BATCH_INTERVAL_MS) + // error operation + .optional(HugeGraphOptions.MAX_RETRIES, HugeGraphOptions.RETRY_BACKOFF_MS) + .build(); + } +} diff --git a/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/sink/HugeGraphSinkWriter.java b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/sink/HugeGraphSinkWriter.java new file mode 100644 index 00000000000..d3ff4064c5b --- /dev/null +++ b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/sink/HugeGraphSinkWriter.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.hugegraph.sink; + +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.buffer.BatchBuffer; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.client.HugeGraphClient; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.HugeGraphSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.SchemaConfig; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.SchemaConfig.LabelType; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.mapper.EdgeMapper; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.mapper.GraphDataMapper; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.mapper.VertexMapper; + +import org.apache.hugegraph.structure.GraphElement; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class HugeGraphSinkWriter extends AbstractSinkWriter + implements SupportMultiTableSinkWriter { + + private static final Logger LOG = LoggerFactory.getLogger(HugeGraphSinkWriter.class); + + private final HugeGraphSinkConfig sinkConfig; + private final GraphDataMapper mapper; + private final HugeGraphClient client; + private final BatchBuffer buffer; + + public HugeGraphSinkWriter(HugeGraphSinkConfig sinkConfig, SeaTunnelRowType rowType) { + this.sinkConfig = sinkConfig; + this.client = new HugeGraphClient(sinkConfig); + this.mapper = getMapper(rowType); + this.buffer = + new BatchBuffer( + this.client, sinkConfig.getBatchSize(), sinkConfig.getBatchIntervalMs()); + } + + private GraphDataMapper getMapper(SeaTunnelRowType rowType) { + SchemaConfig schemaConfig = sinkConfig.getSchemaConfig(); + List selectedFields = sinkConfig.getSelectedFields(); + List ignoredFields = sinkConfig.getIgnoredFields(); + Map originalFieldsIndex = + IntStream.range(0, rowType.getTotalFields()) + .boxed() + .collect(Collectors.toMap(rowType::getFieldName, i -> i)); + + Map finalFieldsIndex = new LinkedHashMap<>(); + + if (selectedFields != null && !selectedFields.isEmpty()) { + for (String field : selectedFields) { + Integer originalIndex = originalFieldsIndex.get(field); + if (originalIndex != null) { + finalFieldsIndex.put(field, originalIndex); + } + } + } else if (ignoredFields != null && !ignoredFields.isEmpty()) { + Set ignoreSet = new HashSet<>(ignoredFields); + for (Map.Entry entry : originalFieldsIndex.entrySet()) { + String fieldName = entry.getKey(); + Integer originalIndex = entry.getValue(); + + if (!ignoreSet.contains(fieldName)) { + finalFieldsIndex.put(fieldName, originalIndex); + } + } + } else { + finalFieldsIndex = originalFieldsIndex; + } + + if (schemaConfig.getType() == LabelType.VERTEX) { + return new VertexMapper(schemaConfig, finalFieldsIndex, client); + } else { + return new EdgeMapper(schemaConfig, finalFieldsIndex, client); + } + } + + @Override + public void write(SeaTunnelRow row) throws IOException { + switch (row.getRowKind()) { + case INSERT: + case UPDATE_AFTER: + handleUpsert(row); + break; + case DELETE: 
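+                // DELETE and UPDATE_BEFORE share one path: handleDelete() flushes the
+                // pending batch first, so the delete is applied after any buffered
+                // writes for the same element (preserving arrival order).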
+ case UPDATE_BEFORE: + handleDelete(row); + break; + default: + LOG.warn("Unsupported row kind: {}", row.getRowKind()); + break; + } + } + + private void handleUpsert(SeaTunnelRow row) throws IOException { + try { + GraphElement element = mapper.map(row); + if (element == null) { + LOG.warn("Cannot create graph element: required ID fields missing for row {}", row); + return; + } + buffer.add(element); + } catch (Exception e) { + if (e instanceof IOException) { + throw (IOException) e; + } + throw new IOException(e); + } + } + + private void handleDelete(SeaTunnelRow row) throws IOException { + try { + buffer.flush(); + if (sinkConfig.getSchemaConfig().getType() == LabelType.VERTEX) { + Object vertexId = mapper.extractId(row); + if (vertexId == null) { + LOG.warn("Cannot delete vertex: ID extraction failed for row {}", row); + return; + } + client.deleteVertexWithEdges(vertexId); + } else { + String edgeId = (String) mapper.extractId(row); + if (edgeId == null) { + LOG.warn("Cannot delete edge: ID extraction failed for row {}", row); + return; + } + client.deleteEdge(edgeId); + } + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public Optional prepareCommit() { + try { + buffer.flush(); + } catch (IOException e) { + LOG.error("Failed to flush data during prepareCommit, failing checkpoint.", e); + throw new RuntimeException("Failed to flush data during prepareCommit()", e); + } + return Optional.empty(); + } + + @Override + public void close() throws IOException { + if (buffer != null) { + buffer.close(); + } + + if (client != null) { + client.close(); + } + } +} diff --git a/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/utils/DataTypeUtil.java b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/utils/DataTypeUtil.java new file mode 100644 index 00000000000..d310f24df67 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/utils/DataTypeUtil.java @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hugegraph.utils;
+
+import org.apache.hugegraph.structure.constant.Cardinality;
+import org.apache.hugegraph.structure.constant.DataType;
+import org.apache.hugegraph.structure.schema.PropertyKey;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+public final class DataTypeUtil {
+
+    private static final Set<String> ACCEPTABLE_TRUE;
+
+    static {
+        ACCEPTABLE_TRUE = new HashSet<>();
+        ACCEPTABLE_TRUE.add("true");
+        ACCEPTABLE_TRUE.add("1");
+        ACCEPTABLE_TRUE.add("yes");
+        ACCEPTABLE_TRUE.add("y");
+    }
+
+    private static final Set<String> ACCEPTABLE_FALSE;
+
+    static {
+        ACCEPTABLE_FALSE = new HashSet<>();
+        ACCEPTABLE_FALSE.add("false");
+        ACCEPTABLE_FALSE.add("0");
+        ACCEPTABLE_FALSE.add("no");
+        ACCEPTABLE_FALSE.add("n");
+    }
+
+    public static Object convert(
+            Object value, PropertyKey propertyKey, String dateFormat, String timeZone) {
+        E.checkArgumentNotNull(value, "The value to be converted can't be null");
+
+        String key = propertyKey.name();
+        DataType dataType = propertyKey.dataType();
+        Cardinality cardinality = propertyKey.cardinality();
+        switch (cardinality) {
+            case SINGLE:
+                return parseSingleValue(key, value, dataType, dateFormat, timeZone);
+            case SET:
+            case LIST:
+                return parseMultiValues(key, value, dataType, cardinality, dateFormat, timeZone);
+            default:
+                throw new AssertionError(
+                        String.format("Unsupported cardinality: '%s'", cardinality));
+        }
+    }
+
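+    // Usage sketch (hypothetical values, not part of this change): for a property
+    // key declared as dataType=TEXT with cardinality=SET, convert() splits the raw
+    // string, converts each element, and de-duplicates while keeping order:
+    //
+    //   PropertyKey tags = schema.getPropertyKey("tags"); // assumed TEXT / SET
+    //   Object v = DataTypeUtil.convert("[a, b, a]", tags, null, null);
+    //   // v is a LinkedHashSet containing "a" and "b"
+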
+    /**
+     * Parses a multi-valued field. Accepted collection formats: "obj1,obj2,...,objN" or
+     * "[obj1,obj2,...,objN]". TODO: after parsing to JSON, the order of the collection may
+     * change in some cases (such as LIST).
+     */
+    private static Object parseMultiValues(
+            String key,
+            Object values,
+            DataType dataType,
+            Cardinality cardinality,
+            String dateFormat,
+            String timeZone) {
+        // A value that already arrived as a well-typed collection needs no re-parsing
+        if (values instanceof Collection
+                && checkCollectionDataType(key, (Collection<?>) values, dataType)) {
+            return values;
+        }
+
+        E.checkState(
+                values instanceof String,
+                "The value(key='%s') must be String type, but got '%s'(%s)",
+                key,
+                values,
+                values.getClass());
+        String rawValue = (String) values;
+        List<String> valueColl = split(key, rawValue);
+        Collection<Object> results =
+                cardinality == Cardinality.LIST ? new ArrayList<>() : new LinkedHashSet<>();
+        valueColl.forEach(
+                value -> results.add(parseSingleValue(key, value, dataType, dateFormat, timeZone)));
+        E.checkArgument(
+                checkCollectionDataType(key, results, dataType),
+                "Not all collection elems %s match with data type %s",
+                results,
+                dataType);
+        return results;
+    }
+
+    public static List<Object> splitField(String key, Object rawColumnValue) {
+        E.checkArgument(rawColumnValue != null, "The value to be split can't be null");
+        if (rawColumnValue instanceof Collection) {
+            return new ArrayList<>((Collection<?>) rawColumnValue);
+        }
+        return new ArrayList<>(split(key, rawColumnValue.toString()));
+    }
+
+    public static UUID parseUUID(String key, Object rawValue) {
+        if (rawValue instanceof UUID) {
+            return (UUID) rawValue;
+        } else if (rawValue instanceof String) {
+            String value = ((String) rawValue).trim();
+            if (value.contains("-")) {
+                return UUID.fromString(value);
+            }
+            // UUID represented by a 32-char hex string without dashes
+            E.checkArgument(value.length() == 32, "Invalid UUID value(key='%s') '%s'", key, value);
+            String high = value.substring(0, 16);
+            String low = value.substring(16);
+            return new UUID(Long.parseUnsignedLong(high, 16), Long.parseUnsignedLong(low, 16));
+        }
+        throw new IllegalArgumentException(
+                String.format(
+                        "Failed to convert value(key='%s') '%s'(%s) to UUID",
+                        key, rawValue, rawValue.getClass()));
+    }
+
+    private static Object parseSingleValue(
+            String key, Object rawValue, DataType dataType, String dateFormat, String timeZone) {
+        Object value = trimString(rawValue);
+        if (value == null) {
+            return null;
+        }
+
+        if (dataType.isNumber()) {
+            return parseNumber(key, value, dataType);
+        }
+
+        switch (dataType) {
+            case TEXT:
+                return value.toString();
+            case BOOLEAN:
+                return parseBoolean(key, value);
+            case DATE:
+                return parseDate(key, value, dateFormat, timeZone);
+            case UUID:
+                return parseUUID(key, value);
+            default:
+                E.checkArgument(
+                        checkDataType(key, value, dataType),
+                        "The value(key='%s') '%s'(%s) does not match data type %s and "
+                                + "can't be converted to it",
+                        key,
+                        value,
+                        value.getClass(),
+                        dataType);
+        }
+        return value;
+    }
+
+    private static Object trimString(Object rawValue) {
+        if (rawValue instanceof String) {
+            return ((String) rawValue).trim();
+        }
+        return rawValue;
+    }
+
+    private static Boolean parseBoolean(String key, Object rawValue) {
+        if (rawValue instanceof Boolean) {
+            return (Boolean) rawValue;
+        }
+        if (rawValue instanceof String) {
+            String value = ((String) rawValue).toLowerCase();
+            if (ACCEPTABLE_TRUE.contains(value)) {
+                return true;
+            } else if (ACCEPTABLE_FALSE.contains(value)) {
+                return false;
+            } else {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Failed to convert '%s'(key='%s') to Boolean, "
+                                        + "the acceptable boolean strings are %s or %s",
+                                rawValue, key, ACCEPTABLE_TRUE, ACCEPTABLE_FALSE));
+            }
+        }
+        throw new IllegalArgumentException(
+                String.format(
+                        "Failed to convert value(key='%s') '%s'(%s) to Boolean",
+                        key, rawValue, rawValue.getClass()));
+    }
+
+    private static Number parseNumber(String key, Object value, DataType dataType) {
+        E.checkState(dataType.isNumber(), "The target data type must be number");
+        try {
+            switch (dataType) {
+                case BYTE:
+                    return Byte.parseByte(value.toString());
+                case INT:
+                    return Integer.parseInt(value.toString());
+                case LONG:
+                    return parseLong(value.toString());
+                case FLOAT:
+                    return Float.parseFloat(value.toString());
+                case
DOUBLE: + return Double.parseDouble(value.toString()); + default: + throw new AssertionError( + String.format( + "Number type only contains Byte, " + + "Integer, Long, Float, Double, " + + "but got %s", + dataType.clazz())); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + String.format( + "Failed to convert value(key=%s) " + "'%s'(%s) to Number", + key, value, value.getClass()), + e); + } + } + + private static long parseLong(String rawValue) { + if (rawValue.startsWith("-")) { + return Long.parseLong(rawValue); + } else { + return Long.parseUnsignedLong(rawValue); + } + } + + private static Date parseDate(String key, Object value) { + if (value == null) { + return null; + } + if (value instanceof Date) { + return (Date) value; + } + + if (value instanceof LocalDateTime) { + return Date.from(((LocalDateTime) value).atZone(ZoneId.systemDefault()).toInstant()); + } + + if (value instanceof java.time.LocalDate) { + return Date.from( + ((java.time.LocalDate) value).atStartOfDay(ZoneId.systemDefault()).toInstant()); + } + + if (value instanceof Number) { + return new Date(((Number) value).longValue()); + } + + if (value instanceof String) { + String s = ((String) value).trim(); + if (s.isEmpty()) { + return null; + } + // 1. Try to parse as long timestamp + try { + return new Date(Long.parseLong(s)); + } catch (NumberFormatException e) { + // Not a timestamp, proceed to parse as date string + } + + try { + return org.apache.hugegraph.util.DateUtil.parse(s); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format( + "Failed to convert string value(key='%s') '%s' to Date " + + "using HugeGraph DateUtil.", + key, value), + e); + } + } + + throw new IllegalArgumentException( + String.format( + "Failed to convert value(key='%s') " + "'%s'(%s) to Date", + key, value, value.getClass())); + } + + private static Date parseDate(String key, Object value, String dateFormat, String timeZone) { + if (value instanceof Date) { + return (Date) value; + } + + ZoneId zoneId; + try { + if (timeZone != null && !timeZone.isEmpty()) { + zoneId = ZoneId.of(timeZone); + } else { + zoneId = ZoneId.systemDefault(); + } + } catch (Exception e) { + throw new IllegalArgumentException( + String.format("Invalid timeZone string provided: '%s'", timeZone), e); + } + + if (value instanceof LocalDateTime) { + return Date.from(((LocalDateTime) value).atZone(zoneId).toInstant()); + } + + if (value instanceof java.time.LocalDate) { + return Date.from(((java.time.LocalDate) value).atStartOfDay(zoneId).toInstant()); + } + + if (value instanceof Number) { + return new Date(((Number) value).longValue()); + + } else if (value instanceof String) { + String strValue = ((String) value).trim(); + if ("timestamp".equals(dateFormat)) { + try { + return new Date(Long.parseLong(strValue)); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + String.format("Invalid timestamp value '%s'", value), e); + } + } + + if (dateFormat == null || dateFormat.isEmpty()) { + // Fallback for when no format is provided. 
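+                // Assumption: with no date_format configured, a bare string is only
+                // accepted when it parses as epoch milliseconds; anything else fails
+                // below with a hint to configure date_format.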
+ try { + return new Date(Long.parseLong(strValue)); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "Date format must be provided to parse a date string that is not a timestamp.", + e); + } + } + + try { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(dateFormat); + LocalDateTime ldt = LocalDateTime.parse(strValue, formatter); + ZonedDateTime zdt = ldt.atZone(zoneId); + return Date.from(zdt.toInstant()); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format( + "Failed to parse date string '%s' with format '%s'", + value, dateFormat), + e); + } + } + throw new IllegalArgumentException( + String.format( + "Failed to convert value(key='%s') " + "'%s'(%s) to Date", + key, value, value.getClass())); + } + + private static List split(String key, String rawValue) { + List valueColl = new ArrayList<>(); + if (rawValue == null || rawValue.isEmpty()) { + return valueColl; + } + + String value = rawValue.trim(); + String startSymbol = "["; + String endSymbol = "]"; + if (value.startsWith(startSymbol) && value.endsWith(endSymbol)) { + value = value.substring(startSymbol.length(), value.length() - endSymbol.length()); + } + + String elemDelimiter = ","; + // TODO: use a configurable list format + com.google.common.base.Splitter.on(elemDelimiter) + .trimResults() + .omitEmptyStrings() + .split(value) + .forEach(valueColl::add); + return valueColl; + } + + /** Check the type of the value valid */ + private static boolean checkDataType(String key, Object value, DataType dataType) { + if (value instanceof Number && dataType.isNumber()) { + return parseNumber(key, value, dataType) != null; + } + return dataType.clazz().isInstance(value); + } + + /** Check the type of all the values (maybe some list properties) valid */ + private static boolean checkCollectionDataType( + String key, Collection values, DataType dataType) { + for (Object value : values) { + if (!checkDataType(key, value, dataType)) { + return false; + } + } + return true; + } +} diff --git a/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/utils/E.java b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/utils/E.java new file mode 100644 index 00000000000..4415f2846e7 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/utils/E.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.hugegraph.utils; + +import javax.annotation.Nullable; + +import java.util.Collection; + +public final class E { + + public static void checkNotNull(Object object, String elem) { + if (object == null) { + throw new NullPointerException(String.format("The '%s' can't be null", elem)); + } + } + + public static void checkNotNull(Object object, String elem, String owner) { + if (object == null) { + throw new NullPointerException( + String.format("The '%s' of '%s' can't be null", elem, owner)); + } + } + + public static void checkNotEmpty(Collection collection, String elem) { + if (collection == null) { + throw new NullPointerException(String.format("The '%s' can't be null", elem)); + } + if (collection.isEmpty()) { + throw new IllegalArgumentException(String.format("The '%s' can't be empty", elem)); + } + } + + public static void checkNotEmpty(Collection collection, String elem, String owner) { + if (collection == null) { + throw new NullPointerException( + String.format("The '%s' of '%s' can't be null", elem, owner)); + } + if (collection.isEmpty()) { + throw new IllegalArgumentException( + String.format("The '%s' of '%s' can't be empty", elem, owner)); + } + } + + public static void checkArgument( + boolean expression, @Nullable String message, @Nullable Object... args) { + if (!expression) { + String formattedMessage = + (message == null || args == null || args.length == 0) + ? (message != null ? message : "") + : String.format(message, args); + throw new IllegalArgumentException(formattedMessage); + } + } + + public static void checkArgumentNotNull( + Object object, @Nullable String message, @Nullable Object... args) { + checkArgument(object != null, message, args); + } + + public static void checkState( + boolean expression, @Nullable String message, @Nullable Object... args) { + if (!expression) { + String formattedMessage = + (message == null || args == null || args.length == 0) + ? (message != null ? message : "") + : String.format(message, args); + throw new IllegalStateException(formattedMessage); + } + } +} diff --git a/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/utils/SchemaValidator.java b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/utils/SchemaValidator.java new file mode 100644 index 00000000000..2ca1c6f80f0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hugegraph/src/main/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/utils/SchemaValidator.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.hugegraph.utils; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.client.HugeGraphClient; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.HugeGraphSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.MappingConfig; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.SchemaConfig; +import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.SchemaConfig.LabelType; + +import org.apache.hugegraph.driver.SchemaManager; +import org.apache.hugegraph.structure.constant.Cardinality; +import org.apache.hugegraph.structure.constant.DataType; +import org.apache.hugegraph.structure.schema.EdgeLabel; +import org.apache.hugegraph.structure.schema.PropertyKey; +import org.apache.hugegraph.structure.schema.VertexLabel; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +/** Validates the SeaTunnel schema against the HugeGraph schema. */ +public final class SchemaValidator { + + private final HugeGraphSinkConfig sinkConfig; + private final SeaTunnelRowType rowType; + private final HugeGraphClient client; + + public SchemaValidator(HugeGraphSinkConfig config, SeaTunnelRowType rowType) { + this.sinkConfig = config; + this.rowType = rowType; + this.client = new HugeGraphClient(sinkConfig); + } + + public void validateSchema() { + try { + SchemaConfig schemaConfig = sinkConfig.getSchemaConfig(); + if (schemaConfig.getType() == LabelType.VERTEX) { + validateVertex(schemaConfig); + } else if (schemaConfig.getType() == LabelType.EDGE) { + validateEdge(schemaConfig); + } else { + throw new IllegalArgumentException( + "Unsupported schema type: " + schemaConfig.getType()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + client.close(); + } + } + + private void validateVertex(SchemaConfig schemaConfig) { + SchemaManager schema = this.client.getSchema(); + String label = schemaConfig.getLabel(); + VertexLabel vertexLabel = schema.getVertexLabel(label); + if (vertexLabel == null) { + throw new IllegalArgumentException( + String.format("Vertex label '%s' does not exist in HugeGraph.", label)); + } + validateLabelProperties(schema, label, schemaConfig, vertexLabel.properties()); + } + + private void validateEdge(SchemaConfig schemaConfig) { + SchemaManager schema = this.client.getSchema(); + String label = schemaConfig.getLabel(); + EdgeLabel edgeLabel = schema.getEdgeLabel(label); + if (edgeLabel == null) { + throw new IllegalArgumentException( + String.format("Edge label '%s' does not exist in HugeGraph.", label)); + } + validateSourceTarget(schemaConfig, edgeLabel); + validateLabelProperties(schema, label, schemaConfig, edgeLabel.properties()); + } + + private void validateSourceTarget(SchemaConfig schemaConfig, EdgeLabel edgeLabel) { + String label = schemaConfig.getLabel(); + String schemaSource = edgeLabel.sourceLabel(); + if (!schemaSource.equals(schemaConfig.getSourceConfig().getLabel())) { + throw new IllegalArgumentException( + String.format( + "EdgeLabel[%s] sourceLabel mismatch: schema=%s, config=%s", + label, schemaSource, schemaConfig.getSourceConfig())); + } + + String schemaTarget = edgeLabel.targetLabel(); + if (!schemaTarget.equals(schemaConfig.getTargetConfig().getLabel())) { + throw new IllegalArgumentException( + String.format( + 
"EdgeLabel[%s] targetLabel mismatch: schema=%s, config=%s", + label, schemaTarget, schemaConfig.getTargetConfig())); + } + } + + /** + * Validates if the properties from SeaTunnelRowType are compatible with the HugeGraph schema. + */ + private void validateLabelProperties( + SchemaManager schema, + String label, + SchemaConfig schemaConfig, + Set hugegraphProperties) { + + MappingConfig mappingConfig = schemaConfig.getMapping(); + Map fieldMapping = + mappingConfig == null || mappingConfig.getFieldMapping() == null + ? Collections.emptyMap() + : mappingConfig.getFieldMapping(); + + for (int i = 0; i < rowType.getTotalFields(); i++) { + String fieldName = rowType.getFieldName(i); + SeaTunnelDataType seaTunnelType = rowType.getFieldType(i); + String propertyName = fieldMapping.getOrDefault(fieldName, fieldName); + + // 1. Check if the property exists in HugeGraph + if (!hugegraphProperties.contains(propertyName)) { + throw new IllegalArgumentException( + String.format( + "Property '%s' for label '%s' is defined in the connector config, but does not exist in the HugeGraph schema.", + propertyName, label)); + } + + // 2. Check for data type compatibility + PropertyKey propertyKey = schema.getPropertyKey(propertyName); + DataType hugeGraphType = propertyKey.dataType(); + Cardinality cardinality = propertyKey.cardinality(); + + if (!isCompatible(seaTunnelType, hugeGraphType, cardinality)) { + throw new IllegalArgumentException( + String.format( + "Data type mismatch for property '%s' on label '%s'. " + + "SeaTunnel type '%s' is not compatible with HugeGraph type '%s'.", + propertyName, label, seaTunnelType, hugeGraphType)); + } + } + } + + /** Checks if a SeaTunnelDataType is compatible with a HugeGraph DataType. */ + private boolean isCompatible( + SeaTunnelDataType seaTunnelType, DataType hugeGraphType, Cardinality cardinality) { + switch (seaTunnelType.getSqlType()) { + case BYTES: + return hugeGraphType == DataType.BLOB; + case TINYINT: + case SMALLINT: + case INT: + return hugeGraphType == DataType.INT; + case BIGINT: + return hugeGraphType == DataType.LONG; + case FLOAT: + return hugeGraphType == DataType.FLOAT; + case DOUBLE: + return hugeGraphType == DataType.DOUBLE; + case BOOLEAN: + return hugeGraphType == DataType.BOOLEAN; + case DATE: + case TIMESTAMP: + return hugeGraphType == DataType.DATE; + case ARRAY: + SeaTunnelDataType elementType = + ((ArrayType) seaTunnelType).getElementType(); + if (cardinality != Cardinality.SINGLE) { + return isCompatible(elementType, hugeGraphType, Cardinality.LIST); + } else { + return false; + } + case MAP: + case DECIMAL: // Decimal is mapped to TEXT to preserve precision + case ROW: + case TIME: + case NULL: + case STRING: + return hugeGraphType == DataType.TEXT; + default: + // Unsupported types are considered incompatible. + return false; + } + } +} diff --git a/seatunnel-connectors-v2/connector-hugegraph/src/test/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/config/HugeGraphSinkConfigTest.java b/seatunnel-connectors-v2/connector-hugegraph/src/test/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/config/HugeGraphSinkConfigTest.java new file mode 100644 index 00000000000..bd815076003 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hugegraph/src/test/java/org/apache/seatunnel/connectors/seatunnel/hugegraph/config/HugeGraphSinkConfigTest.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hugegraph.config; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import org.apache.hugegraph.structure.constant.IdStrategy; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.when; + +class HugeGraphSinkConfigTest { + // Automatically create mock objects using @Mock annotation + @Mock private ReadonlyConfig mockConfig; + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + } + + @Test + void testOf_shouldCreateConfigFromReadonlyConfig() { + // --- 1. Arrange --- + // Define and stub the expected values from mockConfig. + String expectedHost = "127.0.0.1"; + int expectedPort = 8080; + String expectedGraph = "my_graph"; + String expectedUsername = "test_user"; + String expectedProperty = "{test_password}"; + + // Required fields stubbing + when(mockConfig.get(HugeGraphOptions.HOST)).thenReturn(expectedHost); + when(mockConfig.get(HugeGraphOptions.PORT)).thenReturn(expectedPort); + when(mockConfig.get(HugeGraphOptions.GRAPH_NAME)).thenReturn(expectedGraph); + when(mockConfig.getOptional(HugeGraphOptions.BATCH_SIZE)).thenReturn(Optional.of(1024)); + when(mockConfig.getOptional(HugeGraphOptions.BATCH_INTERVAL_MS)) + .thenReturn(Optional.of(500)); + when(mockConfig.getOptional(HugeGraphOptions.MAX_RETRIES)).thenReturn(Optional.of(5)); + when(mockConfig.getOptional(HugeGraphOptions.RETRY_BACKOFF_MS)) + .thenReturn(Optional.of(200)); + + // Optional fields stubbing + when(mockConfig.getOptional(HugeGraphOptions.USERNAME)) + .thenReturn(Optional.of(expectedUsername)); + when(mockConfig.getOptional(HugeGraphOptions.PASSWORD)).thenReturn(Optional.empty()); + when(mockConfig.getOptional(HugeGraphOptions.GRAPH_SPACE)).thenReturn(Optional.empty()); + when(mockConfig.getOptional(HugeGraphSinkOptions.SELECTED_FIELDS)) + .thenReturn(Optional.empty()); + when(mockConfig.getOptional(HugeGraphSinkOptions.IGNORED_FIELDS)) + .thenReturn(Optional.empty()); + + // --- 2. Act --- + // Call the static method under test. + HugeGraphSinkConfig actualSinkConfig = HugeGraphSinkConfig.of(mockConfig); + + // --- 3. Assert --- + // Verify that the values in the returned sinkConfig object are as expected. 
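+        // (PASSWORD was stubbed as Optional.empty() above, hence the assertNull below)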
+ assertNotNull(actualSinkConfig); + assertEquals(expectedHost, actualSinkConfig.getHost()); + assertEquals(expectedPort, actualSinkConfig.getPort()); + assertEquals(expectedGraph, actualSinkConfig.getGraphName()); + assertEquals(1024, actualSinkConfig.getBatchSize()); + + assertEquals(expectedUsername, actualSinkConfig.getUsername()); + assertNull(actualSinkConfig.getPassword()); + } + + @Test + void testDefaultValues() { + // 1. Arrange: Create a map with only required fields, omitting those with defaults + Map configMap = new HashMap<>(); + configMap.put("host", "127.0.0.1"); + configMap.put("port", 8080); + configMap.put("graph_name", "hugegraph"); + + // Note: batch_size, batch_interval_ms, max_retries, retry_backoff_ms are omitted + + // 2. Act: Create ReadonlyConfig and parse it + ReadonlyConfig config = ReadonlyConfig.fromMap(configMap); + HugeGraphSinkConfig sinkConfig = HugeGraphSinkConfig.of(config); + + // 3. Assert: Verify that the omitted fields are populated with their default values + assertNotNull(sinkConfig); + assertEquals( + HugeGraphOptions.BATCH_SIZE.defaultValue(), + sinkConfig.getBatchSize(), + "Batch size should fall back to the default value"); + assertEquals( + HugeGraphOptions.BATCH_INTERVAL_MS.defaultValue(), + sinkConfig.getBatchIntervalMs(), + "Batch interval should fall back to the default value"); + assertEquals( + HugeGraphOptions.MAX_RETRIES.defaultValue(), + sinkConfig.getMaxRetries(), + "Max retries should fall back to the default value"); + assertEquals( + HugeGraphOptions.RETRY_BACKOFF_MS.defaultValue(), + sinkConfig.getRetryBackoffMs(), + "Retry backoff should fall back to the default value"); + } + + @Test + void testFullConfigMapping() { + // 1. Arrange: Create a comprehensive configuration map + Map configMap = new HashMap<>(); + configMap.put("host", "192.168.1.1"); + configMap.put("port", 8888); + configMap.put("graph_name", "full_graph"); + configMap.put("graph_space", "full_space"); + configMap.put("username", "admin"); + configMap.put("password", "pa$$w0rd"); + configMap.put("batch_size", 100); + configMap.put("batch_interval_ms", 2000); + configMap.put("max_retries", 10); + configMap.put("retry_backoff_ms", 1000); + configMap.put("selected_fields", Collections.singletonList("name")); + configMap.put("ignored_fields", Collections.singletonList("id")); + + Map propertyMapping = new HashMap<>(); + propertyMapping.put("name", "vertex_name"); + configMap.put("property_mapping", propertyMapping); + + Map schema = new HashMap<>(); + schema.put("type", "VERTEX"); + schema.put("label", "device"); + schema.put("idStrategy", "CUSTOMIZE_UUID"); + schema.put("idFields", Collections.singletonList("device_id")); + configMap.put("schema_config", schema); + + // 2. Act: Create ReadonlyConfig and parse it + ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap); + HugeGraphSinkConfig sinkConfig = HugeGraphSinkConfig.of(readonlyConfig); + + // 3. 
Assert: Verify all fields are correctly parsed + assertNotNull(sinkConfig); + assertEquals("192.168.1.1", sinkConfig.getHost()); + assertEquals(8888, sinkConfig.getPort()); + assertEquals("full_graph", sinkConfig.getGraphName()); + assertEquals("full_space", sinkConfig.getGraphSpace()); + assertEquals("admin", sinkConfig.getUsername()); + assertEquals("pa$$w0rd", sinkConfig.getPassword()); + assertEquals(100, sinkConfig.getBatchSize()); + assertEquals(2000, sinkConfig.getBatchIntervalMs()); + assertEquals(10, sinkConfig.getMaxRetries()); + assertEquals(1000, sinkConfig.getRetryBackoffMs()); + + // Assert collections and maps + assertEquals(1, sinkConfig.getSelectedFields().size()); + assertEquals("name", sinkConfig.getSelectedFields().get(0)); + assertEquals(1, sinkConfig.getIgnoredFields().size()); + assertEquals("id", sinkConfig.getIgnoredFields().get(0)); + + // Assert nested schema object + assertNotNull(sinkConfig.getSchemaConfig()); + assertEquals(SchemaConfig.LabelType.VERTEX, sinkConfig.getSchemaConfig().getType()); + assertEquals("device", sinkConfig.getSchemaConfig().getLabel()); + assertEquals(IdStrategy.CUSTOMIZE_UUID, sinkConfig.getSchemaConfig().getIdStrategy()); + assertEquals( + Collections.singletonList("device_id"), sinkConfig.getSchemaConfig().getIdFields()); + } + + @Test + void testEdgeSchemaConfigParsing() { + // 1. Arrange: Create a configuration map for an edge schema + Map configMap = new HashMap<>(); + configMap.put("host", "localhost"); + configMap.put("port", 8080); + configMap.put("graph_name", "edge_graph"); + + Map schema = new HashMap<>(); + schema.put("type", "EDGE"); + schema.put("label", "knows"); + schema.put("tablePath", "db1.person_friends"); + + Map sourceConfig = new HashMap<>(); + sourceConfig.put("label", "person"); + sourceConfig.put("idFields", Collections.singletonList("person_id")); + schema.put("sourceConfig", sourceConfig); + + Map targetConfig = new HashMap<>(); + targetConfig.put("label", "person"); + targetConfig.put("idFields", Collections.singletonList("friend_id")); + schema.put("targetConfig", targetConfig); + + configMap.put("schema_config", schema); + + // 2. Act: Create ReadonlyConfig and parse it + ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap); + HugeGraphSinkConfig sinkConfig = HugeGraphSinkConfig.of(readonlyConfig); + + // 3. 
+    @Test
+    void testEdgeSchemaConfigParsing() {
+        // 1. Arrange: Create a configuration map for an edge schema
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put("host", "localhost");
+        configMap.put("port", 8080);
+        configMap.put("graph_name", "edge_graph");
+
+        Map<String, Object> schema = new HashMap<>();
+        schema.put("type", "EDGE");
+        schema.put("label", "knows");
+        schema.put("tablePath", "db1.person_friends");
+
+        Map<String, Object> sourceConfig = new HashMap<>();
+        sourceConfig.put("label", "person");
+        sourceConfig.put("idFields", Collections.singletonList("person_id"));
+        schema.put("sourceConfig", sourceConfig);
+
+        Map<String, Object> targetConfig = new HashMap<>();
+        targetConfig.put("label", "person");
+        targetConfig.put("idFields", Collections.singletonList("friend_id"));
+        schema.put("targetConfig", targetConfig);
+
+        configMap.put("schema_config", schema);
+
+        // 2. Act: Create ReadonlyConfig and parse it
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap);
+        HugeGraphSinkConfig sinkConfig = HugeGraphSinkConfig.of(readonlyConfig);
+
+        // 3. Assert: Verify the edge schema fields are correctly parsed
+        assertNotNull(sinkConfig);
+        assertNotNull(sinkConfig.getSchemaConfig());
+        SchemaConfig schemaConfig = sinkConfig.getSchemaConfig();
+
+        assertEquals(SchemaConfig.LabelType.EDGE, schemaConfig.getType());
+        assertEquals("knows", schemaConfig.getLabel());
+        assertEquals("db1.person_friends", schemaConfig.getTablePath());
+
+        assertNotNull(schemaConfig.getSourceConfig());
+        assertEquals("person", schemaConfig.getSourceConfig().getLabel());
+        assertEquals(
+                Collections.singletonList("person_id"),
+                schemaConfig.getSourceConfig().getIdFields());
+
+        assertNotNull(schemaConfig.getTargetConfig());
+        assertEquals("person", schemaConfig.getTargetConfig().getLabel());
+        assertEquals(
+                Collections.singletonList("friend_id"),
+                schemaConfig.getTargetConfig().getIdFields());
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 9cfbdcd62db..c587da3740d 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -44,6 +44,7 @@
         <module>connector-hive</module>
         <module>connector-file</module>
         <module>connector-hudi</module>
+        <module>connector-hugegraph</module>
         <module>connector-assert</module>
         <module>connector-kudu</module>
         <module>connector-email</module>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 9367cfb29a8..8f79459a541 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -644,6 +644,13 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-hugegraph</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>com.aliyun.phoenix</groupId>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hugegraph-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hugegraph-e2e/pom.xml
new file mode 100644
index 00000000000..3bc973c3517
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hugegraph-e2e/pom.xml
@@ -0,0 +1,112 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-hugegraph-e2e</artifactId>
+    <name>SeaTunnel : E2E : Connector V2 : HugeGraph</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-hugegraph</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>${testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-jdbc-e2e-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mock-server</groupId>
+            <artifactId>mockserver-netty-no-dependencies</artifactId>
+            <version>5.14.0</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>${testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.rest-assured</groupId>
+            <artifactId>rest-assured</artifactId>
+            <version>5.3.0</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hugegraph-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hugegraph/HugeGraphIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hugegraph-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hugegraph/HugeGraphIT.java
new file mode 100644
index 00000000000..bc8bdc518b1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hugegraph-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hugegraph/HugeGraphIT.java
@@ -0,0 +1,672 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.hugegraph;
+
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.HugeGraphSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.MappingConfig;
+import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.SchemaConfig;
+import org.apache.seatunnel.connectors.seatunnel.hugegraph.config.SchemaConfig.SourceTargetConfig;
+import org.apache.seatunnel.connectors.seatunnel.hugegraph.sink.HugeGraphSinkWriter;
+
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.exception.ServerException;
+import org.apache.hugegraph.structure.constant.IdStrategy;
+import org.apache.hugegraph.structure.graph.Edge;
+import org.apache.hugegraph.structure.graph.Vertex;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Testcontainers
+public class HugeGraphIT {
+
+    private static final String HUGE_GRAPH_IMAGE = "hugegraph/hugegraph:latest";
+    private static final String GRAPH_NAME = "hugegraph";
+    private static final String VERTEX_LABEL_PERSON = "person_for_test";
+    private static final String VERTEX_LABEL_ALL_TYPES = "vertex_all_types_for_test";
+    private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE =
+            new SeaTunnelRowType(
+                    new String[] {"name", "age"},
+                    new SeaTunnelDataType[] {
+                        org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE,
+                        org.apache.seatunnel.api.table.type.BasicType.INT_TYPE
+                    });
+    private static final DateTimeFormatter formatter =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+    private static HugeClient hugeClient;
+
+    @Container
+    private static final GenericContainer<?> HUGE_GRAPH_CONTAINER =
+            new GenericContainer<>(DockerImageName.parse(HUGE_GRAPH_IMAGE))
+                    .withExposedPorts(8080)
+                    .waitingFor(Wait.forHttp("/graphs").forPort(8080).forStatusCode(200))
+                    .withStartupTimeout(Duration.ofMinutes(3));
+
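+    // The container's mapped port is only known once the container has started, so the
+    // client is built in @BeforeAll rather than at field initialization.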
+    @BeforeAll
+    public static void setup() {
+        String host = HUGE_GRAPH_CONTAINER.getHost();
+        Integer port = HUGE_GRAPH_CONTAINER.getMappedPort(8080);
+        String url = String.format("http://%s:%d", host, port);
+        hugeClient = HugeClient.builder(url, GRAPH_NAME).build();
+        setupSchema();
+    }
+
+    @AfterAll
+    public static void cleanup() {
+        if (hugeClient != null) {
+            hugeClient.close();
+        }
+    }
+
+    @BeforeEach
+    public void clearGraph() {
+        // Clear all vertices and edges before each test using GraphsManager.clearGraph()
+        try {
+            hugeClient.graphs().clearGraph(GRAPH_NAME, "I'm sure to delete all data");
+            // Clearing also drops the schema, so it has to be recreated
+            setupSchema();
+        } catch (Exception e) {
+            // Ignore errors during clear
+        }
+    }
+
+    private static void setupSchema() {
+        hugeClient.schema().propertyKey("name").asText().ifNotExist().create();
+        hugeClient.schema().propertyKey("age").asInt().ifNotExist().create();
+        hugeClient
+                .schema()
+                .vertexLabel(VERTEX_LABEL_PERSON)
+                .idStrategy(IdStrategy.PRIMARY_KEY)
+                .primaryKeys("name")
+                .properties("name", "age")
+                .ifNotExist()
+                .create();
+
+        hugeClient.schema().propertyKey("duration").asFloat().ifNotExist().create();
+        hugeClient
+                .schema()
+                .edgeLabel("knows")
+                .sourceLabel(VERTEX_LABEL_PERSON)
+                .targetLabel(VERTEX_LABEL_PERSON)
+                .properties("duration")
+                .ifNotExist()
+                .create();
+
+        // Schema for the all-types vertex
+        hugeClient.schema().propertyKey("id_field").asText().ifNotExist().create();
+        hugeClient.schema().propertyKey("prop_string").asText().ifNotExist().create();
+        hugeClient.schema().propertyKey("prop_long").asLong().ifNotExist().create();
+        hugeClient.schema().propertyKey("prop_double").asDouble().ifNotExist().create();
+        hugeClient.schema().propertyKey("prop_boolean").asBoolean().ifNotExist().create();
+        hugeClient.schema().propertyKey("prop_date").asDate().ifNotExist().create();
+
+        hugeClient
+                .schema()
+                .vertexLabel(VERTEX_LABEL_ALL_TYPES)
+                .idStrategy(IdStrategy.CUSTOMIZE_STRING)
+                .properties(
+                        "id_field",
+                        "prop_string",
+                        "prop_long",
+                        "prop_double",
+                        "prop_boolean",
+                        "prop_date")
+                .ifNotExist()
+                .create();
+
+        hugeClient.schema().propertyKey("lang").asText().ifNotExist().create();
+
+        hugeClient
+                .schema()
+                .vertexLabel("person_pk_for_edge")
+                .idStrategy(IdStrategy.PRIMARY_KEY)
+                .primaryKeys("name")
+                .properties("name")
+                .ifNotExist()
+                .create();
+
+        hugeClient
+                .schema()
+                .vertexLabel("software_cs_for_edge")
+                .idStrategy(IdStrategy.CUSTOMIZE_STRING)
+                .properties("lang")
+                .ifNotExist()
+                .create();
+
+        hugeClient
+                .schema()
+                .edgeLabel("transfer")
+                .sourceLabel("person_pk_for_edge")
+                .targetLabel("software_cs_for_edge")
+                .properties("prop_string", "prop_long", "prop_double", "prop_boolean", "prop_date")
+                .ifNotExist()
+                .create();
+    }
+
+    private HugeGraphSinkWriter createSinkWriter(
+            SchemaConfig schemaConfig, SeaTunnelRowType rowType) throws IOException {
+        HugeGraphSinkConfig config = new HugeGraphSinkConfig();
+        config.setHost(HUGE_GRAPH_CONTAINER.getHost());
+        config.setPort(HUGE_GRAPH_CONTAINER.getMappedPort(8080));
+        config.setGraphName(GRAPH_NAME);
+        config.setSchemaConfig(schemaConfig);
+        return new HugeGraphSinkWriter(config, rowType);
+    }
+
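+    // For reference, a writer built this way corresponds roughly to the following sink
+    // block in a SeaTunnel job config (illustrative only; host/port come from the
+    // container at runtime):
+    //
+    // sink {
+    //   HugeGraph {
+    //     host = "localhost"
+    //     port = 8080
+    //     graph_name = "hugegraph"
+    //     schema_config = { type = "VERTEX", label = "person_for_test", ... }
+    //   }
+    // }
+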
+    @Test
+    public void testInsert() throws IOException {
+        SchemaConfig schemaConfig = new SchemaConfig();
+        schemaConfig.setType(SchemaConfig.LabelType.VERTEX);
+        schemaConfig.setLabel(VERTEX_LABEL_PERSON);
+        schemaConfig.setIdStrategy(IdStrategy.PRIMARY_KEY);
+        schemaConfig.setIdFields(Collections.singletonList("name"));
+
+        HugeGraphSinkWriter writer = createSinkWriter(schemaConfig, SEATUNNEL_ROW_TYPE);
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {"marko", 29});
+        row.setRowKind(RowKind.INSERT);
+        writer.write(row);
+        writer.close();
+
+        // Verify using the REST API
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("name", "marko");
+        List<Vertex> vertices =
+                hugeClient.graph().listVertices(VERTEX_LABEL_PERSON, properties, 10);
+        assertEquals(1, vertices.size());
+        assertEquals(29, vertices.get(0).property("age"));
+    }
+
+    @Test
+    public void testEdgeInsert() throws IOException {
+        // 1. Insert source and target vertices
+        Vertex marko =
+                new Vertex(VERTEX_LABEL_PERSON).property("name", "marko").property("age", 29);
+        Vertex david =
+                new Vertex(VERTEX_LABEL_PERSON).property("name", "david").property("age", 30);
+        hugeClient.graph().addVertex(marko);
+        hugeClient.graph().addVertex(david);
+
+        // 2. Define edge row type
+        SeaTunnelRowType edgeRowType =
+                new SeaTunnelRowType(
+                        new String[] {"src_name", "tgt_name", "duration"},
+                        new SeaTunnelDataType[] {
+                            org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE,
+                            org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE,
+                            org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE
+                        });
+
+        // 3. Configure SchemaConfig for edge
+        SchemaConfig schemaConfig = new SchemaConfig();
+        schemaConfig.setType(SchemaConfig.LabelType.EDGE);
+        schemaConfig.setLabel("knows");
+
+        SourceTargetConfig sourceConfig = new SourceTargetConfig();
+        sourceConfig.setLabel(VERTEX_LABEL_PERSON);
+        sourceConfig.setIdFields(Collections.singletonList("src_name"));
+
+        SourceTargetConfig targetConfig = new SourceTargetConfig();
+        targetConfig.setLabel(VERTEX_LABEL_PERSON);
+        targetConfig.setIdFields(Collections.singletonList("tgt_name"));
+
+        schemaConfig.setSourceConfig(sourceConfig);
+        schemaConfig.setTargetConfig(targetConfig);
+
+        MappingConfig mappingConfig = new MappingConfig();
+        Map<String, String> map = new HashMap<>();
+        map.put("duration", "duration");
+        map.put("src_name", "name");
+        map.put("tgt_name", "name");
+        mappingConfig.setFieldMapping(map);
+        schemaConfig.setMapping(mappingConfig);
+
+        // 4. Create writer with the edge row type
+        HugeGraphSinkWriter writer = createSinkWriter(schemaConfig, edgeRowType);
+        // 5. Create and write row
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {"marko", "david", 1.5});
+        row.setRowKind(RowKind.INSERT);
+        writer.write(row);
+        writer.close();
+
+        // 6. Verify edge creation
+        List<Edge> edges = hugeClient.graph().listEdges("knows");
+        assertEquals(1, edges.size());
+        Edge createdEdge = edges.get(0);
+        assertEquals(1.5, createdEdge.property("duration"));
+
+        // Also verify source and target
+        Vertex sourceVertex = hugeClient.graph().getVertex(createdEdge.sourceId());
+        Vertex targetVertex = hugeClient.graph().getVertex(createdEdge.targetId());
+        assertEquals("marko", sourceVertex.property("name"));
+        assertEquals("david", targetVertex.property("name"));
+
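+        // HugeGraph edges with the default SINGLE frequency are identified by
+        // (source id, edge label, target id), so writing the same pair again is
+        // expected to overwrite the existing edge rather than create a second one.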
+        // 7. Verify overwrite behavior: re-writing the edge updates its properties in place
+        writer = createSinkWriter(schemaConfig, edgeRowType);
+        row = new SeaTunnelRow(new Object[] {"marko", "david", 11.0});
+        row.setRowKind(RowKind.INSERT);
+        writer.write(row);
+        writer.close();
+
+        List<Edge> edgesAfterOverwrite = hugeClient.graph().listEdges("knows");
+        assertEquals(1, edgesAfterOverwrite.size());
+        Edge overwrittenEdge = edgesAfterOverwrite.get(0);
+        assertEquals(11.0, overwrittenEdge.property("duration"));
+    }
+
+    @Test
+    public void testUpdate() throws IOException {
+        // First, insert a vertex using the REST API
+        Vertex vadas = new Vertex(VERTEX_LABEL_PERSON);
+        vadas.property("name", "vadas");
+        vadas.property("age", 27);
+        hugeClient.graph().addVertex(vadas);
+
+        MappingConfig mappingConfig = new MappingConfig();
+        Map<String, String> map = new HashMap<>();
+        map.put("name", "name");
+        map.put("age", "age");
+        mappingConfig.setFieldMapping(map);
+        SchemaConfig schemaConfig = new SchemaConfig();
+        schemaConfig.setType(SchemaConfig.LabelType.VERTEX);
+        schemaConfig.setLabel(VERTEX_LABEL_PERSON);
+        schemaConfig.setIdStrategy(IdStrategy.PRIMARY_KEY);
+        schemaConfig.setIdFields(Collections.singletonList("name"));
+        schemaConfig.setMapping(mappingConfig);
+
+        HugeGraphSinkWriter writer = createSinkWriter(schemaConfig, SEATUNNEL_ROW_TYPE);
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {"vadas", 28});
+        row.setRowKind(RowKind.UPDATE_AFTER);
+        writer.write(row);
+        writer.close();
+
+        // Verify using the REST API
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("name", "vadas");
+        List<Vertex> vertices =
+                hugeClient.graph().listVertices(VERTEX_LABEL_PERSON, properties, 10);
+        assertEquals(1, vertices.size());
+        assertEquals(28, vertices.get(0).property("age"));
+    }
+
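+    /**
+     * DELETE rows for an EDGE schema are matched purely by the resolved source and
+     * target vertex ids; no edge properties are required.
+     */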
+    @Test
+    public void testEdgeDelete() throws IOException {
+        // 1. Insert vertices and an edge to be deleted
+        Vertex marko =
+                new Vertex(VERTEX_LABEL_PERSON).property("name", "marko").property("age", 29);
+        Vertex david =
+                new Vertex(VERTEX_LABEL_PERSON).property("name", "david").property("age", 30);
+        marko = hugeClient.graph().addVertex(marko);
+        david = hugeClient.graph().addVertex(david);
+
+        Edge edge = new Edge("knows").source(marko).target(david).property("duration", 12.3);
+        hugeClient.graph().addEdge(edge);
+
+        // Verify the edge exists before deleting it
+        assertEquals(1, hugeClient.graph().listEdges("knows").size());
+
+        // 2. Define edge row type (only source/target fields are needed for identification)
+        SeaTunnelRowType edgeRowType =
+                new SeaTunnelRowType(
+                        new String[] {"src_name", "tgt_name"},
+                        new SeaTunnelDataType[] {
+                            org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE,
+                            org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE
+                        });
+
+        // 3. Configure SchemaConfig for edge deletion
+        SchemaConfig schemaConfig = new SchemaConfig();
+        schemaConfig.setType(SchemaConfig.LabelType.EDGE);
+        schemaConfig.setLabel("knows");
+
+        SourceTargetConfig sourceConfig = new SourceTargetConfig();
+        sourceConfig.setLabel(VERTEX_LABEL_PERSON);
+        sourceConfig.setIdFields(Collections.singletonList("src_name"));
+        SourceTargetConfig targetConfig = new SourceTargetConfig();
+        targetConfig.setLabel(VERTEX_LABEL_PERSON);
+        targetConfig.setIdFields(Collections.singletonList("tgt_name"));
+        schemaConfig.setSourceConfig(sourceConfig);
+        schemaConfig.setTargetConfig(targetConfig);
+
+        MappingConfig mappingConfig = new MappingConfig();
+        Map<String, String> map = new HashMap<>();
+        map.put("duration", "duration");
+        map.put("src_name", "name");
+        map.put("tgt_name", "name");
+        mappingConfig.setFieldMapping(map);
+        schemaConfig.setMapping(mappingConfig);
+
+        // 4. Create writer
+        HugeGraphSinkWriter writer = createSinkWriter(schemaConfig, edgeRowType);
+        // 5. Create and write DELETE row
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {"marko", "david"});
+        row.setRowKind(RowKind.DELETE);
+        writer.write(row);
+        writer.close();
+
+        // 6. Verify edge is deleted
+        Assertions.assertTrue(hugeClient.graph().listEdges("knows").isEmpty());
+    }
+
+    @Test
+    public void testDelete() throws IOException {
+        // First, insert a vertex using the REST API
+        Vertex josh = new Vertex(VERTEX_LABEL_PERSON);
+        josh.property("name", "josh");
+        josh.property("age", 32);
+        hugeClient.graph().addVertex(josh);
+
+        SchemaConfig schemaConfig = new SchemaConfig();
+        schemaConfig.setType(SchemaConfig.LabelType.VERTEX);
+        schemaConfig.setLabel(VERTEX_LABEL_PERSON);
+        schemaConfig.setIdStrategy(IdStrategy.PRIMARY_KEY);
+        schemaConfig.setIdFields(Collections.singletonList("name"));
+
+        HugeGraphSinkWriter writer = createSinkWriter(schemaConfig, SEATUNNEL_ROW_TYPE);
+        // The row only needs to contain the ID fields for a delete operation
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {"josh", 32});
+        row.setRowKind(RowKind.DELETE);
+        writer.write(row);
+        writer.close();
+
+        // Verify using the REST API
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("name", "josh");
+        List<Vertex> vertices =
+                hugeClient.graph().listVertices(VERTEX_LABEL_PERSON, properties, 10);
+        Assertions.assertTrue(vertices.isEmpty(), "Vertex should have been deleted");
+    }
+
+    @Test
+    public void testVertexWithCustomizedIdAndAllTypes() throws IOException {
+        // 1. Define RowType for a vertex with various data types
+        SeaTunnelRowType allTypesRowType =
+                new SeaTunnelRowType(
+                        new String[] {
+                            "id_field",
+                            "prop_string",
+                            "prop_long",
+                            "prop_double",
+                            "prop_boolean",
+                            "prop_date_1"
+                        },
+                        new SeaTunnelDataType[] {
+                            org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE,
+                            org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE,
+                            org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE,
+                            org.apache.seatunnel.api.table.type.BasicType.DOUBLE_TYPE,
+                            org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE,
+                            org.apache.seatunnel.api.table.type.LocalTimeType.LOCAL_DATE_TIME_TYPE
+                        });
+
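+        // prop_date_1 is deliberately named differently from the schema's prop_date
+        // property so that the field mapping below is exercised.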
+        // 2. Configure SchemaConfig for the new vertex type
+        MappingConfig mappingConfig = new MappingConfig();
+        Map<String, String> map = new HashMap<>();
+        map.put("prop_date_1", "prop_date");
+        mappingConfig.setFieldMapping(map); // 'id_field' will be used as the custom ID
+        mappingConfig.setTimeZone("UTC");
+
+        SchemaConfig schemaConfig = new SchemaConfig();
+        schemaConfig.setType(SchemaConfig.LabelType.VERTEX);
+        schemaConfig.setLabel(VERTEX_LABEL_ALL_TYPES);
+        schemaConfig.setIdStrategy(IdStrategy.CUSTOMIZE_STRING);
+        schemaConfig.setIdFields(Collections.singletonList("id_field"));
+        schemaConfig.setMapping(mappingConfig);
+
+        // 3. INSERT operation
+        HugeGraphSinkWriter writer = createSinkWriter(schemaConfig, allTypesRowType);
+        LocalDateTime insertDate = LocalDateTime.of(2023, 1, 1, 12, 0, 0);
+        Object[] insertData =
+                new Object[] {"custom_id_1", "hello", 2147483648L, 123.45, true, insertDate};
+        SeaTunnelRow insertRow = new SeaTunnelRow(insertData);
+        insertRow.setRowKind(RowKind.INSERT);
+        writer.write(insertRow);
+        writer.close();
+
+        // 4. Verify INSERT
+        Vertex insertedVertex = hugeClient.graph().getVertex("custom_id_1");
+        Assertions.assertNotNull(insertedVertex);
+        assertEquals(VERTEX_LABEL_ALL_TYPES, insertedVertex.label());
+        assertEquals("hello", insertedVertex.property("prop_string"));
+        assertEquals(2147483648L, insertedVertex.property("prop_long"));
+        assertEquals(123.45, insertedVertex.property("prop_double"));
+        assertEquals(true, insertedVertex.property("prop_boolean"));
+        // The server returns the date as a formatted string; compare epoch millis in UTC
+        Date expectedDate = Date.from(insertDate.atZone(ZoneOffset.UTC).toInstant());
+        LocalDateTime insertDateTime =
+                LocalDateTime.parse((String) insertedVertex.property("prop_date"), formatter);
+        long insertTimeStampUtc = insertDateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
+        Assertions.assertEquals(expectedDate.getTime(), insertTimeStampUtc);
+
+        // 5. UPDATE operation
+        writer = createSinkWriter(schemaConfig, allTypesRowType);
+        LocalDateTime updateDate = LocalDateTime.of(2024, 2, 2, 1, 1, 1);
+        Object[] updateData =
+                new Object[] {"custom_id_1", "world", 2000000L, 543.21, false, updateDate};
+        SeaTunnelRow updateRow = new SeaTunnelRow(updateData);
+        updateRow.setRowKind(RowKind.UPDATE_AFTER);
+        writer.write(updateRow);
+        writer.close();
+
+        // 6. Verify UPDATE
+        Vertex updatedVertex = hugeClient.graph().getVertex("custom_id_1");
+        Assertions.assertNotNull(updatedVertex);
+        assertEquals("world", updatedVertex.property("prop_string"));
+        assertEquals(2000000L, ((Number) updatedVertex.property("prop_long")).longValue());
+        assertEquals(543.21, updatedVertex.property("prop_double"));
+        assertEquals(false, updatedVertex.property("prop_boolean"));
+
+        Date expectedUpdateDate = Date.from(updateDate.atZone(ZoneOffset.UTC).toInstant());
+        LocalDateTime updatedDateTime =
+                LocalDateTime.parse((String) updatedVertex.property("prop_date"), formatter);
+        long updatedTimeStampMillisUtc = updatedDateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
+        Assertions.assertEquals(expectedUpdateDate.getTime(), updatedTimeStampMillisUtc);
+
+        // 7. DELETE operation
+        writer = createSinkWriter(schemaConfig, allTypesRowType);
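+        // A fresh writer is created for each operation: with the default batch_size of
+        // 500, a single row is only guaranteed to be flushed when the writer is closed.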
+        // For delete, only the ID field is required; the remaining fields can be null.
+        Object[] deleteData = new Object[] {"custom_id_1", null, null, null, null, null};
+        SeaTunnelRow deleteRow = new SeaTunnelRow(deleteData);
+        deleteRow.setRowKind(RowKind.DELETE);
+        writer.write(deleteRow);
+        writer.close();
+
+        // 8. Verify DELETE
+        ServerException serverException =
+                assertThrows(
+                        ServerException.class,
+                        () -> {
+                            hugeClient.graph().getVertex("custom_id_1");
+                        });
+
+        String expectedErrorMessage = "Vertex 'custom_id_1' does not exist";
+        assertEquals(expectedErrorMessage, serverException.getMessage());
+    }
+
+    @Test
+    public void testEdgeWithComplexTypesAndIdStrategies() throws IOException {
+        // 1. Insert source and target vertices
+        Vertex person = new Vertex("person_pk_for_edge").property("name", "person1");
+        hugeClient.graph().addVertex(person);
+
+        Vertex software = new Vertex("software_cs_for_edge");
+        software.id("software1");
+        software.property("lang", "java");
+        hugeClient.graph().addVertex(software);
+
+        // 2. Define edge row type with all properties
+        SeaTunnelRowType edgeRowType =
+                new SeaTunnelRowType(
+                        new String[] {
+                            "src_name",
+                            "tgt_id",
+                            "prop_string",
+                            "prop_long",
+                            "prop_double",
+                            "prop_boolean",
+                            "prop_date"
+                        },
+                        new SeaTunnelDataType[] {
+                            org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE,
+                            org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE,
+                            org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE,
+                            org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE,
+                            org.apache.seatunnel.api.table.type.BasicType.DOUBLE_TYPE,
+                            org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE,
+                            org.apache.seatunnel.api.table.type.LocalTimeType.LOCAL_DATE_TIME_TYPE
+                        });
+
+        // 3. Configure SchemaConfig for edge
+        SchemaConfig schemaConfig = new SchemaConfig();
+        schemaConfig.setType(SchemaConfig.LabelType.EDGE);
+        schemaConfig.setLabel("transfer");
+
+        SourceTargetConfig sourceConfig = new SourceTargetConfig();
+        sourceConfig.setLabel("person_pk_for_edge");
+        sourceConfig.setIdFields(Collections.singletonList("src_name"));
+
+        SourceTargetConfig targetConfig = new SourceTargetConfig();
+        targetConfig.setLabel("software_cs_for_edge");
+        targetConfig.setIdFields(Collections.singletonList("tgt_id"));
+
+        schemaConfig.setSourceConfig(sourceConfig);
+        schemaConfig.setTargetConfig(targetConfig);
+
+        MappingConfig mappingConfig = new MappingConfig();
+        Map<String, String> map = new HashMap<>();
+        map.put("src_name", "name");
+        map.put("tgt_id", "lang");
+        mappingConfig.setFieldMapping(map);
+        schemaConfig.setMapping(mappingConfig);
+
+        // 4. INSERT operation
+        HugeGraphSinkWriter writer = createSinkWriter(schemaConfig, edgeRowType);
+        LocalDateTime insertDate = LocalDateTime.of(2023, 1, 1, 12, 0, 0);
+        Object[] insertData =
+                new Object[] {
+                    "person1", "software1", "transfer_v1", 100L, 123.45, true, insertDate
+                };
+        SeaTunnelRow insertRow = new SeaTunnelRow(insertData);
+        insertRow.setRowKind(RowKind.INSERT);
+        writer.write(insertRow);
+        writer.close();
+
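+        // Mixed id strategies: the PRIMARY_KEY source vertex id is derived from its
+        // "name" property, while the CUSTOMIZE_STRING target id is taken from tgt_id.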
+        // 5. Verify INSERT
+        List<Edge> edges = hugeClient.graph().listEdges("transfer");
+        assertEquals(1, edges.size());
+        Edge createdEdge = edges.get(0);
+        assertEquals("transfer_v1", createdEdge.property("prop_string"));
+        assertEquals(100L, ((Number) createdEdge.property("prop_long")).longValue());
+        assertEquals(123.45, createdEdge.property("prop_double"));
+        assertEquals(true, createdEdge.property("prop_boolean"));
+
+        // Verify source and target
+        Vertex sourceVertex = hugeClient.graph().getVertex(createdEdge.sourceId());
+        Vertex targetVertex = hugeClient.graph().getVertex(createdEdge.targetId());
+        assertEquals("person1", sourceVertex.property("name"));
+        assertEquals("software1", targetVertex.id());
+
+        // 6. UPDATE operation
+        writer = createSinkWriter(schemaConfig, edgeRowType);
+        LocalDateTime updateDate = LocalDateTime.of(2024, 2, 2, 1, 1, 1);
+        Object[] updateData =
+                new Object[] {
+                    "person1", "software1", "transfer_v2", 200L, 543.21, false, updateDate
+                };
+        SeaTunnelRow updateRow = new SeaTunnelRow(updateData);
+        updateRow.setRowKind(RowKind.UPDATE_AFTER);
+        writer.write(updateRow);
+        writer.close();
+
+        // 7. Verify UPDATE
+        edges = hugeClient.graph().listEdges("transfer");
+        assertEquals(1, edges.size());
+        Edge updatedEdge = edges.get(0);
+        assertEquals("transfer_v2", updatedEdge.property("prop_string"));
+        assertEquals(200L, ((Number) updatedEdge.property("prop_long")).longValue());
+        assertEquals(543.21, updatedEdge.property("prop_double"));
+        assertEquals(false, updatedEdge.property("prop_boolean"));
+
+        // 8. DELETE operation
+        SeaTunnelRowType edgeDeleteRowType =
+                new SeaTunnelRowType(
+                        new String[] {"src_name", "tgt_id"},
+                        new SeaTunnelDataType[] {
+                            org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE,
+                            org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE
+                        });
+
+        writer = createSinkWriter(schemaConfig, edgeDeleteRowType);
+        Object[] deleteData = new Object[] {"person1", "software1"};
+        SeaTunnelRow deleteRow = new SeaTunnelRow(deleteData);
+        deleteRow.setRowKind(RowKind.DELETE);
+        writer.write(deleteRow);
+        writer.close();
+
+        // 9. Verify DELETE
+        Assertions.assertTrue(hugeClient.graph().listEdges("transfer").isEmpty());
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 4cdb7af240d..107715940e1 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -88,6 +88,7 @@
         <module>connector-graphql-e2e</module>
        <module>connector-aerospike-e2e</module>
         <module>connector-sensorsdata-e2e</module>
+        <module>connector-hugegraph-e2e</module>