Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-v2][Mongodb]Refactor mongodb connector #4620

Merged
merged 12 commits into from
May 16, 2023

Conversation

MonsterChenzhuo
Copy link
Contributor

@MonsterChenzhuo MonsterChenzhuo commented Apr 19, 2023

Purpose of this pull request

close:#4280

Check list

@MonsterChenzhuo MonsterChenzhuo changed the title [Feature][Connector-v2][Mongodb]Refactor mongodb source connector [Feature][Connector-v2][Mongodb]Refactor mongodb connector Apr 19, 2023
@MonsterChenzhuo
Copy link
Contributor Author

I will commit the changes to the release-note.md file after the PR review is completed, in order to prevent conflicts that may cause the CI to not run.

@MonsterChenzhuo
Copy link
Contributor Author

cc @TyrantLucifer @hailin0 PTAL, thanks.

@@ -106,75 +83,6 @@ public void initConnection() {
client = MongoClients.create(url);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add more test case to verify refactor is make sense.

import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;

@Slf4j
@DisabledOnContainer(
value = {},
type = {EngineType.FLINK, EngineType.SPARK})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why disable spark and flink?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TyrantLucifer I apologize for the inconvenience. Due to the fact that the Flink 1.13 image cannot run on the Mac M1, I have encountered issues with running Flink CI locally, even though I have successfully validated it with Zeta, Flink, and Spark. When I submit the code directly to GitHub, it often fails for various reasons. My plan is to first use this annotation to limit the scope and get Flink running smoothly. After that, I will remove the annotation.

c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_tinyint = int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change it? Connector should support all type that SeaTunnel has defined, although mongo does not distinguish, you should and convert logic in your connector.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TyrantLucifer You can take a look at the documentation of mongodb connector. The data type of mongodb and the data type of st are not fully mapped.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

图片

For example, source, write mongdb through the row of st, and the BSON of mongodb is not distinguished by TINYINT and SMALLINT.


private GenericContainer<?> mongodbContainer;
private MongoClient client;

private static final Random random = new Random();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static parameter should on the top.

rules =
{
row_rules = [
{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add more check rules.

database = "test_db"
collection = "source_table"
collection = "sink_table"
no-timeout = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unify the parameter naming style

}

source {
MongoDB {
uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
Mongodb {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change this name

@MonsterChenzhuo
Copy link
Contributor Author

图片

图片

I tested 400w mongodb table reading today.

@MonsterChenzhuo
Copy link
Contributor Author

图片

100G 460w Line data. Mongodb record test EngineServer three nodes, 10 concurrent execution.

@MonsterChenzhuo
Copy link
Contributor Author

图片

图片

Test writing 1000w pieces of data

docs/en/connector-v2/sink/MongoDB.md Outdated Show resolved Hide resolved
docs/en/connector-v2/sink/MongoDB.md Outdated Show resolved Hide resolved
docs/en/connector-v2/source/MongoDB.md Outdated Show resolved Hide resolved
Comment on lines 41 to 43
MongodbConfig.COLLECTION,
MongodbConfig.DATABASE,
MongodbConfig.COLLECTION,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicate config COLLECTION ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing uri ?

@MonsterChenzhuo
Copy link
Contributor Author

MonsterChenzhuo commented May 9, 2023

@hailin0 PTAL, thanks.
There is a problem with github in the afternoon. The code has not been fully pushed up. Now it is comprehensive. Please take a look.

doBulkWrite();
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@Override
public Optional<Void> prepareCommit() {
doBulkWrite();
return Optional.empty();
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

图片

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hailin0 Does your review mean what is described in the picture above?

}
}

void doBulkWrite() throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
void doBulkWrite() throws IOException {
synchronized void doBulkWrite() throws IOException {

@MonsterChenzhuo MonsterChenzhuo force-pushed the mongodb2 branch 2 times, most recently from ac75c9c to 84bd528 Compare May 10, 2023 02:54
@MonsterChenzhuo
Copy link
Contributor Author

@hailin0 @EricJoy2048 PTAL, thanks.

Copy link
Member

@EricJoy2048 EricJoy2048 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, PTAL @hailin0 @TyrantLucifer

hailin0
hailin0 previously approved these changes May 12, 2023
Copy link
Member

@hailin0 hailin0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@TyrantLucifer TyrantLucifer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide multi split read e2e examples.

case ROW:
return createRowConverter((SeaTunnelRowType) type);
default:
throw new UnsupportedOperationException("Not support to parse type: " + type);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MongoDBConnectorException


private static final long serialVersionUID = 1L;

private static String[] upsertKey;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why static

.withDatabase(database)
.withCollection(collection)
.withFlushSize(
pluginConfig.hasPath(MongodbConfig.BUFFER_FLUSH_MAX_ROWS.key())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use if-else instead

boolean useSimpleTextSchema = CatalogTableUtil.buildSimpleTextSchema().equals(rowType);
return new MongodbSinkWriter(rowType, useSimpleTextSchema, params);
return new MongodbWriter(
new RowDataDocumentSerializer(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add context into writer, it will be used to dirty records collect

rowType.getFieldNames(),
rowType,
pluginConfig.hasPath(MongodbConfig.FLAT_SYNC_STRING.key())
? pluginConfig.getBoolean(MongodbConfig.FLAT_SYNC_STRING.key())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same as above

# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same as above

# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same as above

# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same as above

# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same as above

# See the License for the specific language governing permissions and
# limitations under the License.
#
######
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same as above

/** A builder class for creating {@link MongodbClientProvider}. */
public class MongodbCollectionProvider {

public static Builder getBuilder() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static Builder getBuilder() {
public static Builder builder() {

@TyrantLucifer
Copy link
Member

TyrantLucifer commented May 12, 2023

IMO, we will implement ddl change in the future. so the write logic of all connectors should not use async write, write action should bundle with checkpoint and preCommit stage cc @hailin0 @EricJoy2048

@MonsterChenzhuo
Copy link
Contributor Author

@TyrantLucifer PTAL, thanks.

@MonsterChenzhuo MonsterChenzhuo force-pushed the mongodb2 branch 2 times, most recently from 98d82d8 to 7d76fd0 Compare May 15, 2023 12:41
@@ -67,36 +58,145 @@
@Slf4j
public class MongodbIT extends TestSuiteBase implements TestResource {

private static final String MONGODB_IMAGE = "mongo:6.0.5";
private static final Random random = new Random();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private static final Random random = new Random();
private static final Random RANDOM = new Random();

Copy link
Member

@TyrantLucifer TyrantLucifer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@hailin0 hailin0 merged commit 5b1a843 into apache:dev May 16, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants