From 826a889633f994e84b51eb34d76638e6e24a128f Mon Sep 17 00:00:00 2001 From: wgzhao Date: Thu, 23 May 2024 17:47:47 +0800 Subject: [PATCH] try to fix mongo reader when deploy mongodb cluster as replica set, it reports timeout, refs #1036 --- .../reader/mongodbreader/MongoDBReader.java | 3 +- .../util/CollectionSplitUtil.java | 3 +- .../reader/mongodbreader/util/MongoUtil.java | 50 +++++++++---------- 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/MongoDBReader.java b/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/MongoDBReader.java index 997b3b0f6..821792096 100644 --- a/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/MongoDBReader.java +++ b/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/MongoDBReader.java @@ -19,7 +19,8 @@ package com.wgzhao.addax.plugin.reader.mongodbreader; -import com.mongodb.MongoClient; + +import com.mongodb.client.MongoClient; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; diff --git a/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/util/CollectionSplitUtil.java b/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/util/CollectionSplitUtil.java index d6b6bbbca..bfbcdfc16 100644 --- a/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/util/CollectionSplitUtil.java +++ b/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/util/CollectionSplitUtil.java @@ -19,8 +19,9 @@ package com.wgzhao.addax.plugin.reader.mongodbreader.util; -import com.mongodb.MongoClient; + import com.mongodb.MongoCommandException; +import com.mongodb.client.MongoClient; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.wgzhao.addax.common.exception.AddaxException; diff --git a/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/util/MongoUtil.java b/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/util/MongoUtil.java index 73e37f99d..3b5d0e0d5 100644 --- a/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/util/MongoUtil.java +++ b/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/util/MongoUtil.java @@ -19,18 +19,16 @@ package com.wgzhao.addax.plugin.reader.mongodbreader.util; -import com.mongodb.MongoClientOptions; +import com.mongodb.MongoClientSettings; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; import com.wgzhao.addax.common.exception.AddaxException; -import com.wgzhao.addax.common.util.Configuration; -import com.wgzhao.addax.plugin.reader.mongodbreader.KeyConstant; import com.wgzhao.addax.plugin.reader.mongodbreader.MongoDBReaderErrorCode; -import com.mongodb.MongoClient; import com.mongodb.MongoCredential; import com.mongodb.ServerAddress; import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -45,22 +43,7 @@ private MongoUtil() {} public static MongoClient initMongoClient(List addressList) { - - if (addressList == null || addressList.isEmpty()) { - throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_VALUE, "不合法参数"); - } - try { - return new MongoClient(parseServerAddress(addressList)); - } - catch (UnknownHostException e) { - throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS, "不合法的地址"); - } - catch (NumberFormatException e) { - throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_VALUE, "不合法参数"); - } - catch (Exception e) { - throw AddaxException.asAddaxException(MongoDBReaderErrorCode.UNKNOWN_EXCEPTION, "未知异常"); - } + return initCredentialMongoClient(addressList, "", "", null); } public static MongoClient initCredentialMongoClient(List addressList, String userName, String password, String database) @@ -70,11 +53,26 @@ public static MongoClient initCredentialMongoClient(List addressList, St throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_VALUE, "不合法参数"); } try { - MongoCredential credential = MongoCredential.createCredential(userName, database, password.toCharArray()); - return new MongoClient(parseServerAddress(addressList), Collections.singletonList(credential)); - } - catch (UnknownHostException e) { - throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS, "不合法的地址"); + MongoCredential credential = null; + if (! userName.isEmpty() && ! password.isEmpty()) { + credential = MongoCredential.createCredential(userName, database, password.toCharArray()); + } +// return new MongoClient(parseServerAddress(addressList), Collections.singletonList(credential)); + + MongoClientSettings.Builder mongoBuilder = MongoClientSettings.builder() + .applyToClusterSettings(builder -> { + try { + builder.hosts(parseServerAddress(addressList)); + } + catch (UnknownHostException e) { + throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS, "不合法的地址"); + } + }); + if (credential != null) { + mongoBuilder.credential(credential); + } + return MongoClients.create(mongoBuilder.build()); + } catch (NumberFormatException e) { throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_VALUE, "不合法参数");