From 826a889633f994e84b51eb34d76638e6e24a128f Mon Sep 17 00:00:00 2001 From: wgzhao Date: Thu, 23 May 2024 17:47:47 +0800 Subject: [PATCH 1/2] try to fix mongo reader when deploy mongodb cluster as replica set, it reports timeout, refs #1036 --- .../reader/mongodbreader/MongoDBReader.java | 3 +- .../util/CollectionSplitUtil.java | 3 +- .../reader/mongodbreader/util/MongoUtil.java | 50 +++++++++---------- 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/MongoDBReader.java b/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/MongoDBReader.java index 997b3b0f6..821792096 100644 --- a/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/MongoDBReader.java +++ b/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/MongoDBReader.java @@ -19,7 +19,8 @@ package com.wgzhao.addax.plugin.reader.mongodbreader; -import com.mongodb.MongoClient; + +import com.mongodb.client.MongoClient; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; diff --git a/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/util/CollectionSplitUtil.java b/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/util/CollectionSplitUtil.java index d6b6bbbca..bfbcdfc16 100644 --- a/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/util/CollectionSplitUtil.java +++ b/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/util/CollectionSplitUtil.java @@ -19,8 +19,9 @@ package com.wgzhao.addax.plugin.reader.mongodbreader.util; -import com.mongodb.MongoClient; + import com.mongodb.MongoCommandException; +import com.mongodb.client.MongoClient; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.wgzhao.addax.common.exception.AddaxException; diff --git a/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/util/MongoUtil.java b/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/util/MongoUtil.java index 73e37f99d..3b5d0e0d5 100644 --- a/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/util/MongoUtil.java +++ b/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/util/MongoUtil.java @@ -19,18 +19,16 @@ package com.wgzhao.addax.plugin.reader.mongodbreader.util; -import com.mongodb.MongoClientOptions; +import com.mongodb.MongoClientSettings; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; import com.wgzhao.addax.common.exception.AddaxException; -import com.wgzhao.addax.common.util.Configuration; -import com.wgzhao.addax.plugin.reader.mongodbreader.KeyConstant; import com.wgzhao.addax.plugin.reader.mongodbreader.MongoDBReaderErrorCode; -import com.mongodb.MongoClient; import com.mongodb.MongoCredential; import com.mongodb.ServerAddress; import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -45,22 +43,7 @@ private MongoUtil() {} public static MongoClient initMongoClient(List addressList) { - - if (addressList == null || addressList.isEmpty()) { - throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_VALUE, "不合法参数"); - } - try { - return new MongoClient(parseServerAddress(addressList)); - } - catch (UnknownHostException e) { - throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS, "不合法的地址"); - } - catch (NumberFormatException e) { - throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_VALUE, "不合法参数"); - } - catch (Exception e) { - throw AddaxException.asAddaxException(MongoDBReaderErrorCode.UNKNOWN_EXCEPTION, "未知异常"); - } + return initCredentialMongoClient(addressList, "", "", null); } public static MongoClient initCredentialMongoClient(List addressList, String userName, String password, String database) @@ -70,11 +53,26 @@ public static MongoClient initCredentialMongoClient(List addressList, St throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_VALUE, "不合法参数"); } try { - MongoCredential credential = MongoCredential.createCredential(userName, database, password.toCharArray()); - return new MongoClient(parseServerAddress(addressList), Collections.singletonList(credential)); - } - catch (UnknownHostException e) { - throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS, "不合法的地址"); + MongoCredential credential = null; + if (! userName.isEmpty() && ! password.isEmpty()) { + credential = MongoCredential.createCredential(userName, database, password.toCharArray()); + } +// return new MongoClient(parseServerAddress(addressList), Collections.singletonList(credential)); + + MongoClientSettings.Builder mongoBuilder = MongoClientSettings.builder() + .applyToClusterSettings(builder -> { + try { + builder.hosts(parseServerAddress(addressList)); + } + catch (UnknownHostException e) { + throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS, "不合法的地址"); + } + }); + if (credential != null) { + mongoBuilder.credential(credential); + } + return MongoClients.create(mongoBuilder.build()); + } catch (NumberFormatException e) { throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_VALUE, "不合法参数"); From 51cac286a0e2a72b43020afe2d0ffed72e0b8bd7 Mon Sep 17 00:00:00 2001 From: wgzhao Date: Fri, 24 May 2024 09:21:16 +0800 Subject: [PATCH 2/2] fix read timeout when mongo replica cluster --- .../reader/mongodbreader/KeyConstant.java | 5 +---- .../reader/mongodbreader/MongoDBReader.java | 20 +++++++------------ .../reader/mongodbreader/util/MongoUtil.java | 3 --- 3 files changed, 8 insertions(+), 20 deletions(-) diff --git a/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/KeyConstant.java b/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/KeyConstant.java index f487207bd..5021c1563 100644 --- a/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/KeyConstant.java +++ b/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/KeyConstant.java @@ -44,10 +44,7 @@ public class KeyConstant public static final String UPPER_BOUND = "upperBound"; public static final String IS_OBJECT_ID = "isObjectId"; - /* - * 批量获取的记录数 - */ -// public static final String BATCH_SIZE = "batchSize" + /** * MongoDB的_id */ diff --git a/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/MongoDBReader.java b/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/MongoDBReader.java index 821792096..7373e9c34 100644 --- a/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/MongoDBReader.java +++ b/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/MongoDBReader.java @@ -66,9 +66,9 @@ public static class Job private MongoClient mongoClient; - private boolean isNullOrEmpty(String obj) + private boolean notNullAndEmpty(String obj) { - return obj == null || obj.isEmpty(); + return obj != null && !obj.isEmpty(); } @Override @@ -98,7 +98,7 @@ public void init() throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_VALUE, "The configuration column must be required and DOES NOT support \"*\" yet"); } - if (!isNullOrEmpty((userName)) && !isNullOrEmpty((password))) { + if (notNullAndEmpty((userName)) && notNullAndEmpty((password))) { this.mongoClient = MongoUtil.initCredentialMongoClient(addressList, userName, password, authDb); } else { @@ -130,9 +130,9 @@ public static class Task private boolean isObjectId = true; private int fetchSize; - private boolean isNullOrEmpty(String obj) + private boolean notNullAndEmpty(String obj) { - return obj == null || obj.isEmpty(); + return obj != null && !obj.isEmpty(); } @Override @@ -162,7 +162,7 @@ else if (upperBound.equals("max")) { filter.append(KeyConstant.MONGO_PRIMARY_ID, new Document("$gte", isObjectId ? new ObjectId(lowerBound.toString()) : lowerBound) .append("$lt", isObjectId ? new ObjectId(upperBound.toString()) : upperBound)); } - if (!isNullOrEmpty((query))) { + if (notNullAndEmpty((query))) { Document queryFilter = Document.parse(query); filter = new Document("$and", Arrays.asList(filter, queryFilter)); } @@ -251,7 +251,7 @@ public void init() this.collection = connConf.getString(KeyConstant.MONGO_COLLECTION_NAME); String authDb = connConf.getString(KeyConstant.MONGO_AUTH_DB, this.database); List addressList = connConf.getList(KeyConstant.MONGO_ADDRESS, Object.class); - if (!isNullOrEmpty((userName)) && !isNullOrEmpty((password))) { + if (notNullAndEmpty((userName)) && notNullAndEmpty((password))) { this.mongoClient = MongoUtil.initCredentialMongoClient(addressList, userName, password, authDb); } else { @@ -265,10 +265,4 @@ public void destroy() // } } - - public static void main(String[] args) - { - String a = "12"; - System.out.println(Double.parseDouble(a)); - } } diff --git a/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/util/MongoUtil.java b/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/util/MongoUtil.java index 3b5d0e0d5..4035d0458 100644 --- a/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/util/MongoUtil.java +++ b/plugin/reader/mongodbreader/src/main/java/com/wgzhao/addax/plugin/reader/mongodbreader/util/MongoUtil.java @@ -29,7 +29,6 @@ import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; /** @@ -57,8 +56,6 @@ public static MongoClient initCredentialMongoClient(List addressList, St if (! userName.isEmpty() && ! password.isEmpty()) { credential = MongoCredential.createCredential(userName, database, password.toCharArray()); } -// return new MongoClient(parseServerAddress(addressList), Collections.singletonList(credential)); - MongoClientSettings.Builder mongoBuilder = MongoClientSettings.builder() .applyToClusterSettings(builder -> { try {