Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
HttpReader: support SOCKS proxy (#177)
Browse files Browse the repository at this point in the history
* httpreader plugin add support for socks proxy
wgzhao authored Jun 14, 2021

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 74fd205 commit 7b11021
Showing 2 changed files with 105 additions and 30 deletions.
12 changes: 11 additions & 1 deletion docs/src/main/sphinx/reader/httpreader.md
Original file line number Diff line number Diff line change
@@ -333,8 +333,18 @@ bin/datax.py job/httpreader2stream.json
}
}
```
如果是 `socks` 代理(V4、V5 均可),则可以写:

`host` 是代理地址,包含代理类型,目前仅支持 `http` 代理,其他类型比如 `socks5` 不支持。 如果代理需要认证,则可以配置 `auth` , 它由 用户名和密码组成,两者之间用冒号(:) 隔开。
```json
{
"proxy": {
"host": "socks://127.0.0.1:8080",
"auth": "user:pass"
}
}
```

`host` 是代理地址,包含代理类型,目前支持 `http` 代理和 `socks`(V4、V5 均可)代理。如果代理需要认证,则可以配置 `auth`,它由用户名和密码组成,两者之间用冒号(:)隔开。

### 限制说明

Original file line number Diff line number Diff line change
@@ -37,25 +37,35 @@
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.HttpContext;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.util.EntityUtils;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
@@ -102,7 +112,8 @@ public static class Task
{
private Configuration readerSliceConfig = null;
private URIBuilder uriBuilder;
private HttpHost proxy;
private HttpHost proxy = null;
private HttpClientContext context = null;
private String username;
private String password;
private String proxyAuth;
@@ -194,13 +205,13 @@ else if (object instanceof JSONObject) {
if (columns.size() == 1 && "*".equals(columns.get(0))) {
// 没有给定key的情况下,提取JSON的第一层key作为字段处理
columns.remove(0);
for (Object obj: JSONPath.keySet(jsonObject, "/")) {
for (Object obj : JSONPath.keySet(jsonObject, "/")) {
columns.add(obj.toString());
}
}
// first, check key exists or not ?
for (String k : columns) {
if (! jsonObject.containsKey(k)) {
if (!jsonObject.containsKey(k)) {
throw DataXException.asDataXException(
HttpReaderErrorCode.ILLEGAL_VALUE,
"您尝试从结果中获取key为 '" + k + "'的结果,但实际结果中不存在该key值"
@@ -234,13 +245,11 @@ private void createProxy(Configuration proxyConf)
{
String host = proxyConf.getString(Key.HOST);
this.proxyAuth = proxyConf.getString(Key.AUTH);
if (host.startsWith("socks")) {
throw DataXException.asDataXException(
HttpReaderErrorCode.NOT_SUPPORT, "sockes 代理暂时不支持"
);
}
URI uri = URI.create(host);
this.proxy = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
this.context = HttpClientContext.create();
this.context.setAttribute("proxy", uri);


}

private CloseableHttpResponse createCloseableHttpResponse(String method)
@@ -249,26 +258,18 @@ private CloseableHttpResponse createCloseableHttpResponse(String method)
Map<String, Object> headers = readerSliceConfig.getMap(Key.HEADERS, new HashMap<>());
CloseableHttpClient httpClient;
CloseableHttpResponse response;

if ("get".equalsIgnoreCase(method)) {
HttpGet request = new HttpGet(uriBuilder.build());
headers.forEach((k, v) -> request.setHeader(k, v.toString()));
httpClient = createCloseableHttpClient();

RequestConfig config = RequestConfig.custom()
.setProxy(proxy)
.build();
request.setConfig(config);
response = httpClient.execute(request);
response = httpClient.execute(request, this.context);
}
else if ("post".equalsIgnoreCase(method)) {
HttpPost request = new HttpPost(uriBuilder.build());
headers.forEach((k, v) -> request.setHeader(k, v.toString()));
httpClient = createCloseableHttpClient();
RequestConfig config = RequestConfig.custom()
.setProxy(proxy)
.build();
request.setConfig(config);
response = httpClient.execute(request);
response = httpClient.execute(request, this.context);
}
else {
throw DataXException.asDataXException(
@@ -282,6 +283,14 @@ private CloseableHttpClient createCloseableHttpClient()
{
HttpClientBuilder httpClientBuilder = HttpClients.custom();
CredentialsProvider provider = null;

Registry<ConnectionSocketFactory> reg = RegistryBuilder
.<ConnectionSocketFactory>create()
.register("http", new MyConnectionSocketFactory())
.register("https", new MyConnectionSocketFactory())
.build();
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(reg);
httpClientBuilder.setConnectionManager(cm);
if (this.password != null) {
httpClientBuilder = HttpClientBuilder.create();
// setup BasicAuth
@@ -298,17 +307,14 @@ private CloseableHttpClient createCloseableHttpClient()

if (this.proxyAuth != null) {
String[] up = this.proxyAuth.split(":");
provider = new BasicCredentialsProvider();
if (up.length == 1) {
provider.setCredentials(new AuthScope(this.proxy), new UsernamePasswordCredentials(up[0], null));
}
System.setProperty("java.net.socks.username", up[0]);
System.setProperty("http.proxyUser", up[0]);
if (up.length == 2) {
provider.setCredentials(new AuthScope(this.proxy), new UsernamePasswordCredentials(up[0], up[1]));
System.setProperty("java.net.socks.password", up[1]);
System.setProperty("http.proxyPassword", up[1]);
}
}
if (provider != null) {
httpClientBuilder.setDefaultCredentialsProvider(provider);
}

httpClientBuilder.setSSLSocketFactory(ignoreSSLErrors());

return httpClientBuilder.build();
@@ -337,4 +343,63 @@ private SSLConnectionSocketFactory ignoreSSLErrors()
}
}
}

/**
 * Socket factory that routes connections through the proxy published in the
 * request context under the {@code "proxy"} attribute. The attribute is expected
 * to be a {@link URI} such as {@code socks://127.0.0.1:1080} or
 * {@code http://127.0.0.1:8080}.
 *
 * <p>When the context carries no proxy attribute, a plain direct socket is
 * created, so the factory also works for jobs that do not configure a proxy.
 *
 * <p>NOTE(review): this class only creates and connects plain sockets; it does
 * not perform a TLS handshake itself. Confirm that is acceptable for every
 * scheme this factory is registered under.
 */
static class MyConnectionSocketFactory
        implements ConnectionSocketFactory
{
    /** Name of the context attribute that holds the proxy URI. */
    private static final String PROXY_ATTRIBUTE = "proxy";

    /**
     * Creates a new, unconnected socket, bound to the configured proxy if any.
     *
     * @param context request context; may carry the proxy URI under {@code "proxy"}
     * @return an unconnected socket, never {@code null}
     * @throws IOException if the proxy URI has an unsupported scheme or lacks host/port
     */
    @Override
    public Socket createSocket(final HttpContext context)
            throws IOException
    {
        final URI uri = context == null ? null : (URI) context.getAttribute(PROXY_ATTRIBUTE);
        if (uri == null) {
            // No proxy configured: fall back to a direct connection instead of
            // returning null, which callers would dereference.
            return new Socket();
        }

        final String scheme = uri.getScheme();
        final Proxy.Type proxyType;
        // Prefix match (case-insensitive) so socks, socks4, socks5, http and https all resolve.
        if (scheme != null && scheme.regionMatches(true, 0, "socks", 0, 5)) {
            proxyType = Proxy.Type.SOCKS;
        }
        else if (scheme != null && scheme.regionMatches(true, 0, "http", 0, 4)) {
            proxyType = Proxy.Type.HTTP;
        }
        else {
            // Fail loudly rather than silently bypassing a proxy the user asked for.
            throw new IOException("Unsupported proxy scheme '" + scheme + "', expected http or socks");
        }

        final String proxyHost = uri.getHost();
        final int proxyPort = uri.getPort();
        if (proxyHost == null || proxyPort < 0) {
            // URI.getPort() yields -1 when the port is omitted; report it clearly.
            throw new IOException("Proxy address must contain both host and port, got host="
                    + proxyHost + ", port=" + proxyPort);
        }
        return new Socket(new Proxy(proxyType, new InetSocketAddress(proxyHost, proxyPort)));
    }

    /**
     * Connects the given socket (or a freshly created one) to the remote address.
     *
     * @param connectTimeout connect timeout in milliseconds
     * @param socket socket to connect, or {@code null} to create one via {@link #createSocket}
     * @param host target host, used only for error reporting
     * @param remoteAddress address to connect to
     * @param localAddress local address to bind to, or {@code null} for any
     * @param context request context passed on to {@link #createSocket}
     * @return the connected socket
     * @throws ConnectTimeoutException if the connect attempt times out
     * @throws IOException on any other I/O failure; the socket is closed before rethrowing
     */
    @Override
    public Socket connectSocket(final int connectTimeout,
            final Socket socket, final HttpHost host,
            final InetSocketAddress remoteAddress,
            final InetSocketAddress localAddress,
            final HttpContext context)
            throws IOException
    {
        final Socket sock = socket != null ? socket : createSocket(context);
        try {
            if (localAddress != null) {
                sock.bind(localAddress);
            }
            sock.connect(remoteAddress, connectTimeout);
        }
        catch (SocketTimeoutException ex) {
            closeQuietly(sock);
            throw new ConnectTimeoutException(ex, host,
                    remoteAddress.getAddress());
        }
        catch (IOException ex) {
            // Do not leak the file descriptor when bind/connect fails.
            closeQuietly(sock);
            throw ex;
        }
        return sock;
    }

    /** Closes the socket, ignoring any failure, so the original error is the one reported. */
    private static void closeQuietly(final Socket sock)
    {
        try {
            sock.close();
        }
        catch (IOException ignored) {
            // best effort: the caller is already propagating the primary failure
        }
    }
}
}

0 comments on commit 7b11021

Please sign in to comment.