From 7b11021bade97235038829a019e1a7f9b77fdbe8 Mon Sep 17 00:00:00 2001 From: wgzhao Date: Mon, 14 Jun 2021 11:21:25 +0800 Subject: [PATCH] Httpreader support sock proxy (#177) * httpreader plugin add support for socks proxy --- docs/src/main/sphinx/reader/httpreader.md | 12 +- .../plugin/reader/httpreader/HttpReader.java | 123 +++++++++++++----- 2 files changed, 105 insertions(+), 30 deletions(-) diff --git a/docs/src/main/sphinx/reader/httpreader.md b/docs/src/main/sphinx/reader/httpreader.md index 6dde03a69..a99eff57c 100644 --- a/docs/src/main/sphinx/reader/httpreader.md +++ b/docs/src/main/sphinx/reader/httpreader.md @@ -333,8 +333,18 @@ bin/datax.py job/httpreader2stream.json } } ``` +如果是 `sock` 代理 (V4,v5),则可以写 -`host` 是代理地址,包含代理类型,目前仅支持 `http` 代理,其他类型比如 `socks5` 不支持。 如果代理需要认证,则可以配置 `auth` , 它由 用户名和密码组成,两者之间用冒号(:) 隔开。 +```json +{ + "proxy": { + "host": "socks://127.0.0.1:8080", + "auth": "user:pass" + } +} +``` + +`host` 是代理地址,包含代理类型,目前仅支持 `http` 代理和 `socks`(V4, V5均可) 代理。 如果代理需要认证,则可以配置 `auth` , 它由 用户名和密码组成,两者之间用冒号(:) 隔开。 ### 限制说明 diff --git a/plugin/reader/httpreader/src/main/java/com/wgzhao/datax/plugin/reader/httpreader/HttpReader.java b/plugin/reader/httpreader/src/main/java/com/wgzhao/datax/plugin/reader/httpreader/HttpReader.java index a1034a7d4..0a2e6b660 100644 --- a/plugin/reader/httpreader/src/main/java/com/wgzhao/datax/plugin/reader/httpreader/HttpReader.java +++ b/plugin/reader/httpreader/src/main/java/com/wgzhao/datax/plugin/reader/httpreader/HttpReader.java @@ -37,11 +37,15 @@ import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; -import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.client.utils.URIBuilder; +import org.apache.http.config.Registry; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.ConnectTimeoutException; +import org.apache.http.conn.socket.ConnectionSocketFactory; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.conn.ssl.TrustSelfSignedStrategy; @@ -49,6 +53,8 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.protocol.HttpContext; import org.apache.http.ssl.SSLContextBuilder; import org.apache.http.util.EntityUtils; @@ -56,6 +62,10 @@ import javax.net.ssl.SSLContext; import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Proxy; +import java.net.Socket; +import java.net.SocketTimeoutException; import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.Charset; @@ -102,7 +112,8 @@ public static class Task { private Configuration readerSliceConfig = null; private URIBuilder uriBuilder; - private HttpHost proxy; + private HttpHost proxy = null; + private HttpClientContext context = null; private String username; private String password; private String proxyAuth; @@ -194,13 +205,13 @@ else if (object instanceof JSONObject) { if (columns.size() == 1 && "*".equals(columns.get(0))) { // 没有给定key的情况下,提取JSON的第一层key作为字段处理 columns.remove(0); - for (Object obj: JSONPath.keySet(jsonObject, "/")) { + for (Object obj : JSONPath.keySet(jsonObject, "/")) { columns.add(obj.toString()); } } // first, check key exists or not ? for (String k : columns) { - if (! jsonObject.containsKey(k)) { + if (!jsonObject.containsKey(k)) { throw DataXException.asDataXException( HttpReaderErrorCode.ILLEGAL_VALUE, "您尝试从结果中获取key为 '" + k + "'的结果,但实际结果中不存在该key值" @@ -234,13 +245,11 @@ private void createProxy(Configuration proxyConf) { String host = proxyConf.getString(Key.HOST); this.proxyAuth = proxyConf.getString(Key.AUTH); - if (host.startsWith("socks")) { - throw DataXException.asDataXException( - HttpReaderErrorCode.NOT_SUPPORT, "sockes 代理暂时不支持" - ); - } URI uri = URI.create(host); - this.proxy = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme()); + this.context = HttpClientContext.create(); + this.context.setAttribute("proxy", uri); + + } private CloseableHttpResponse createCloseableHttpResponse(String method) @@ -249,26 +258,18 @@ private CloseableHttpResponse createCloseableHttpResponse(String method) Map headers = readerSliceConfig.getMap(Key.HEADERS, new HashMap<>()); CloseableHttpClient httpClient; CloseableHttpResponse response; + if ("get".equalsIgnoreCase(method)) { HttpGet request = new HttpGet(uriBuilder.build()); headers.forEach((k, v) -> request.setHeader(k, v.toString())); httpClient = createCloseableHttpClient(); - - RequestConfig config = RequestConfig.custom() - .setProxy(proxy) - .build(); - request.setConfig(config); - response = httpClient.execute(request); + response = httpClient.execute(request, this.context); } else if ("post".equalsIgnoreCase(method)) { HttpPost request = new HttpPost(uriBuilder.build()); headers.forEach((k, v) -> request.setHeader(k, v.toString())); httpClient = createCloseableHttpClient(); - RequestConfig config = RequestConfig.custom() - .setProxy(proxy) - .build(); - request.setConfig(config); - response = httpClient.execute(request); + response = httpClient.execute(request, this.context); } else { throw DataXException.asDataXException( @@ -282,6 +283,14 @@ private CloseableHttpClient createCloseableHttpClient() { HttpClientBuilder httpClientBuilder = HttpClients.custom(); CredentialsProvider provider = null; + + Registry reg = RegistryBuilder + .create() + .register("http", new MyConnectionSocketFactory()) + .register("https", new MyConnectionSocketFactory()) + .build(); + PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(reg); + httpClientBuilder.setConnectionManager(cm); if (this.password != null) { httpClientBuilder = HttpClientBuilder.create(); // setup BasicAuth @@ -298,17 +307,14 @@ private CloseableHttpClient createCloseableHttpClient() if (this.proxyAuth != null) { String[] up = this.proxyAuth.split(":"); - provider = new BasicCredentialsProvider(); - if (up.length == 1) { - provider.setCredentials(new AuthScope(this.proxy), new UsernamePasswordCredentials(up[0], null)); - } + System.setProperty("java.net.socks.username", up[0]); + System.setProperty("http.proxyUser", up[0]); if (up.length == 2) { - provider.setCredentials(new AuthScope(this.proxy), new UsernamePasswordCredentials(up[0], up[1])); + System.setProperty("java.net.socks.password", up[1]); + System.setProperty("http.proxyPassword", up[1]); } } - if (provider != null) { - httpClientBuilder.setDefaultCredentialsProvider(provider); - } + httpClientBuilder.setSSLSocketFactory(ignoreSSLErrors()); return httpClientBuilder.build(); @@ -337,4 +343,63 @@ private SSLConnectionSocketFactory ignoreSSLErrors() } } } + + static class MyConnectionSocketFactory + implements + ConnectionSocketFactory + { + + @Override + public Socket createSocket(final HttpContext context) + throws IOException + { + Proxy proxy = null; + URI uri = (URI) context.getAttribute("proxy"); + if (uri == null) { + return null; + } + + InetSocketAddress socksaddr = new InetSocketAddress(uri.getHost(), uri.getPort()); + String proxyType = uri.getScheme(); + if (proxyType.startsWith("socks")) { + proxy = new Proxy(Proxy.Type.SOCKS, socksaddr); + } + else if (proxyType.startsWith("http")) { + proxy = new Proxy(Proxy.Type.HTTP, socksaddr); + } + if (proxy == null) { + return null; + } else { + return new Socket(proxy); + } + } + + @Override + public Socket connectSocket(final int connectTimeout, + final Socket socket, final HttpHost host, + final InetSocketAddress remoteAddress, + final InetSocketAddress localAddress, + final HttpContext context) + throws IOException + { + Socket sock; + if (socket != null) { + sock = socket; + } + else { + sock = createSocket(context); + } + if (localAddress != null) { + sock.bind(localAddress); + } + try { + sock.connect(remoteAddress, connectTimeout); + } + catch (SocketTimeoutException ex) { + throw new ConnectTimeoutException(ex, host, + remoteAddress.getAddress()); + } + return sock; + } + } }