Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Did you forget to close a response body? #1

Closed
peteryuanpan opened this issue Dec 30, 2020 · 3 comments
Closed

Did you forget to close a response body? #1

peteryuanpan opened this issue Dec 30, 2020 · 3 comments

Comments

@peteryuanpan
Copy link
Owner

peteryuanpan commented Dec 30, 2020

执行参数:downlog -date 2020-12-01-20 -domains wdl1.cache.wps.cn -dest ./log/ -worker 100 -overwrite

主要报错

十二月 30, 2020 8:43:21 下午 okhttp3.internal.platform.Platform log
警告: A connection to http://172.168.5.13:9999/ was leaked. Did you forget to close a response body?

解决方案参考

@peteryuanpan
Copy link
Owner Author

peteryuanpan commented Dec 30, 2020

进一步调试

java.util.logging.Logger okhttplogger = java.util.logging.Logger.getLogger(OkHttpClient.class.getName());
okhttplogger.setLevel(Level.FINE);

报错如下

2020-12-30 21:08:58.170 [pool-1-thread-31] c.p.e.d.ExecuteDownLogAbstract[93] - retry times 0, Error: java.lang.IllegalArgumentException, ErrorMessage: Self-suppression not permitted, failed to download url http://cdnlog-sh-public.oss-cn-shanghai.aliyuncs.com/v1.l1cache/234581/wdl1.cache.wps.cn/2020_12_01/wdl1.cache.wps.cn_2020_12_01_200415_200430.gz?Expires=1609938357&OSSAccessKeyId=LTAIviCc6zy8x3xa&Signature=wvZnMqxx1fMZJm4Ypoby3Cn05jc%3D
java.lang.IllegalArgumentException: Self-suppression not permitted
	at java.lang.Throwable.addSuppressed(Throwable.java:1082)
	at okhttp3.ResponseBody.$closeResource(ResponseBody.java:137)
	at okhttp3.ResponseBody.bytes(ResponseBody.java:137)
	at com.peter.execute.downlog.ExecuteDownLogAbstract.lambda$execute$0(ExecuteDownLogAbstract.java:89)
	at java.lang.Thread.run(Thread.java:748)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space
十二月 30, 2020 9:09:32 下午 okhttp3.internal.platform.Platform log
警告: A connection to http://172.168.5.13:9999/ was leaked. Did you forget to close a response body?
java.lang.Throwable: response.body().close()
	at okhttp3.internal.platform.Platform.getStackTraceForCloseable(Platform.java:149)
	at okhttp3.internal.connection.Transmitter.callStart(Transmitter.java:116)
	at okhttp3.RealCall.execute(RealCall.java:78)
	at com.peter.execute.ExecuteHttp.send(ExecuteHttp.java:22)
	at com.peter.execute.ExecuteHttp.get(ExecuteHttp.java:34)
	at com.peter.execute.downlog.ExecuteDownLogAbstract.lambda$execute$0(ExecuteDownLogAbstract.java:84)
	at java.lang.Thread.run(Thread.java:748)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

从 Caused by: java.lang.OutOfMemoryError: Java heap space 看出来,是OOM问题

增加VM options:-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=C:\Users\Admin\Desktop
输出了 dump 文件,通过 Eclipse Memory Analyse 工具分析,确认应该是 byte[] 对象过多导致的OOM

增加VM options:-Xms10240M -Xmx10240M -Xmn5120M
将新生代扩增到5GB,老年代到5GB

再次运行,全部下载完成

2020-12-30 22:22:30.186 [main] c.p.e.d.ExecuteDownLog[62] - Finished downloading urls
2020-12-30 22:22:30.190 [main] c.p.e.d.ExecuteDownLog[63] - TotalSize: 6.79 GB
2020-12-30 22:22:30.191 [main] c.p.e.d.ExecuteDownLog[64] - TotalTimeCost: 628.705 second
2020-12-30 22:22:30.191 [main] c.p.e.d.ExecuteDownLog[65] - AverageSpeed: 11.05 MB/s
2020-12-30 22:22:30.191 [main] c.p.e.d.ExecuteDownLog[66] - SuccessNumber: 240
2020-12-30 22:22:30.191 [main] c.p.e.d.ExecuteDownLog[67] - FailedNumber: 0

有趣的是,使用 jvisualvm 监控,新生代中 Eden区与Survivor区 起初是接近 8:1:1 的关系的,但到了后期会变成 1:1:1 的关系,也许是本机内存不够导致的,本机内存只有16GB,到了最后已经跑到90%了

起初
image

后期
image

@peteryuanpan peteryuanpan changed the title okhttp3.internal.platform.Platform error Did you forget to close a response body? Dec 30, 2020
@peteryuanpan
Copy link
Owner Author

ExecuteDownLogAbstract 的源码(能复现报错的)

package com.peter.execute.downlog;

import com.peter.execute.ExecuteHttp;
import com.peter.model.DownLog;
import okhttp3.Request;
import okhttp3.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public abstract class ExecuteDownLogAbstract implements ExecuteDownLogInterface {

    private static final Logger logger = LoggerFactory.getLogger(ExecuteDownLogAbstract.class);

    protected void beforeRequestSend(Request request) {
        // for subclass
    }

    @Override
    public ExecuteDownLog.Response execute(DownLog downLog, String[] urls) throws Exception {
        ExecuteDownLog.Response resp = new ExecuteDownLog.Response();

        final long begin = System.currentTimeMillis();
        final AtomicInteger idx = new AtomicInteger(-1);
        final AtomicLong totalSize = new AtomicLong(0);
        final AtomicInteger successNumber = new AtomicInteger(0);
        final int worker = downLog.getWorker();
        final String dest = downLog.getDest();
        final CountDownLatch countDownLatch = new CountDownLatch(worker);
        final ExecuteHttp http = ExecuteHttp.getInstance();

        logger.debug("urls.length: " + urls.length);
        logger.debug("worker: " + worker);
        logger.debug("dest: " + dest);

        final File destF = new File(dest);
        if (!destF.exists())
            destF.mkdirs();

        final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            worker, worker,
            0, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()
        );

        for (int w = 0; w < worker; w ++) {
            threadPool.submit(new Thread(() -> {
                while (true) {
                    int i = idx.incrementAndGet();
                    if (i >= urls.length)
                        break;
                    String filePath = destF.getPath() + "/" + urls[i].substring(urls[i].lastIndexOf('/') + 1, urls[i].indexOf('?'));
                    logger.debug("filePath: " + filePath);
                    if (!downLog.isOverwrite() && new File(filePath).exists()) {
                        logger.info("Skip to download url " + urls[i] + " because file " + filePath + " exists");
                        successNumber.addAndGet(1);
                        continue;
                    }
                    Request request = null;
                    Response response = null;
                    byte[] bytes = null;
                    int j = 0;
                    try {
                        while (j <= downLog.getRetry()) {
                            request = http.getRequest(urls[i]);
                            beforeRequestSend(request);
                            logger.debug("retry times " + j + ", " + request.toString());
                            try {
                                response = http.get(request);
                                logger.debug("retry times " + j + ", " + response.toString());
                                if (response.code() == 200) {
                                    bytes = response.body().bytes(); // important
                                    break;
                                }
                            } catch (Exception e) {
                                logger.info("retry times " + j + ", Error: " + e.getClass().getName() + ", ErrorMessage: " + e.getMessage() + ", failed to download url " + urls[i]);
                            }
                            j ++;
                        }
                        if (j <= downLog.getRetry()) { // code = 200
                            OutputStream outputStream = new FileOutputStream(filePath);
                            outputStream.write(bytes);
                            outputStream.flush();
                            outputStream.close();
                            logger.info("Success to download url " + urls[i] + " size " + ExecuteDownLog.getSize(bytes.length));
                            totalSize.addAndGet(bytes.length);
                            successNumber.addAndGet(1);
                        } else
                            logger.info("Failed to download url " +  urls[i] + " after retrying " + downLog.getRetry());
                    } catch (Exception e) {
                        logger.error(e.getClass().getName() + " " + e.getMessage());
                    } finally {
                        if (response != null)
                            response.close();
                    }
                }
                countDownLatch.countDown();
            }));
        }

        threadPool.shutdown();
        countDownLatch.await();

        resp.totalSize = totalSize.get();
        resp.totalTimeCost = (System.currentTimeMillis() - begin) / 1000.0;
        resp.successNumber = successNumber.get();
        resp.failedNumber = urls.length - successNumber.get();
        return resp;
    }
}

@peteryuanpan
Copy link
Owner Author

修改后的 ExecuteDownLogAbstract 源码(也能复现报错,但修改后的更好)

package com.peter.execute.downlog;

import com.peter.execute.ExecuteHttp;
import com.peter.model.DownLog;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public abstract class ExecuteDownLogAbstract implements ExecuteDownLogInterface {

    private static final Logger logger = LoggerFactory.getLogger(ExecuteDownLogAbstract.class);

    protected void beforeRequestSend(Request request) {
        // for subclass
    }

    @Override
    public ExecuteDownLog.Response execute(DownLog downLog, String[] urls) throws Exception {
        ExecuteDownLog.Response resp = new ExecuteDownLog.Response();

        final long begin = System.currentTimeMillis();
        final AtomicInteger idx = new AtomicInteger(-1);
        final AtomicLong totalSize = new AtomicLong(0);
        final AtomicInteger successNumber = new AtomicInteger(0);
        final int worker = downLog.getWorker();
        final String dest = downLog.getDest();
        final CountDownLatch countDownLatch = new CountDownLatch(worker);
        final ExecuteHttp http = ExecuteHttp.getInstance();

        logger.debug("urls.length: " + urls.length);
        logger.debug("worker: " + worker);
        logger.debug("dest: " + dest);

        final File destF = new File(dest);
        if (!destF.exists())
            destF.mkdirs();

        final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            worker, worker,
            0, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()
        );

        for (int w = 0; w < worker; w ++) {
            threadPool.submit(new Thread(() -> {
                while (true) {
                    // step 0: 获取idx,判断overwrite
                    int i = idx.incrementAndGet();
                    if (i >= urls.length)
                        break;
                    String filePath = destF.getPath() + "/" + urls[i].substring(urls[i].lastIndexOf('/') + 1, urls[i].indexOf('?'));
                    logger.debug("filePath: " + filePath);
                    if (!downLog.isOverwrite() && new File(filePath).exists()) {
                        logger.info("Skip to download url " + urls[i] + " because file " + filePath + " exists");
                        successNumber.addAndGet(1);
                        continue;
                    }
                    // step 1: http请求处理,获取数据bytes
                    Response response = null;
                    byte[] bytes = null;
                    int j = 0;
                    while (j <= downLog.getRetry()) {
                        Request request = http.getRequest(urls[i]);
                        beforeRequestSend(request);
                        logger.debug("retry times " + j + ", " + request.toString());
                        try {
                            response = http.get(request);
                            logger.debug("retry times " + j + ", " + response.toString());
                            if (response.code() == 200) {
                                ResponseBody respbody = response.body();
                                assert respbody != null;
                                bytes = respbody.bytes(); // important
                                break;
                            }
                        } catch (Exception e) {
                            logger.info("retry times " + j + ", Error: " + e.getClass().getName() + ", ErrorMessage: " + e.getMessage() + ", failed to download url " + urls[i]);
                            if (!(e instanceof IOException))
                                e.printStackTrace();
                        } finally {
                            if (response != null)
                                response.close(); // close response.body()
                        }
                        j ++;
                    }
                    // step 2: 输出数据bytes到文件filePath
                    OutputStream outputStream = null;
                    try {
                        if (j <= downLog.getRetry()) { // code = 200
                            outputStream = new FileOutputStream(filePath);
                            assert bytes != null;
                            outputStream.write(bytes);
                            outputStream.flush();
                            logger.info("Success to download url " + urls[i] + " size " + ExecuteDownLog.getSize(bytes.length));
                            totalSize.addAndGet(bytes.length);
                            successNumber.addAndGet(1);
                        } else
                            logger.info("Failed to download url " +  urls[i] + " after retrying " + downLog.getRetry());
                    } catch (Exception e) {
                        logger.error(e.getClass().getName() + " " + e.getMessage());
                    } finally {
                        if (outputStream != null) {
                            try {
                                outputStream.close();
                            } catch (Exception e) {
                                logger.error(e.getClass().getName() + " " + e.getMessage());
                            }
                        }
                    }
                }
                countDownLatch.countDown();
            }));
        }

        threadPool.shutdown();
        countDownLatch.await();

        resp.totalSize = totalSize.get();
        resp.totalTimeCost = (System.currentTimeMillis() - begin) / 1000.0;
        resp.successNumber = successNumber.get();
        resp.failedNumber = urls.length - successNumber.get();
        return resp;
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant