
Memory leak in MessageExchangeStore #429

Closed
sbernard31 opened this issue Oct 3, 2017 · 50 comments

Comments

@sbernard31
Contributor

It seems that the InMemoryMessageExchangeStore leaks.
Analyzing a heap dump shows that the leak is in the exchangesByToken map and mainly concerns observe requests with block2.

The strange thing is that most of the exchanges have a response set, so it seems they are "complete" but were not removed from the map...

Here is some old discussion about that:
https://dev.eclipse.org/mhonarc/lists/cf-dev/msg01404.html

PR #418 aims to reproduce it via JUnit.

@sbernard31
Contributor Author

While waiting for a fix, a possible workaround is to use a MessageExchangeStore with an expirable cache (like Guava or cache2k) instead of a classic map.

ExpirableMessageExchangeStore.java using Guava:
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.eclipse.californium.core.coap.Message;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.network.Exchange;
import org.eclipse.californium.core.network.Exchange.KeyMID;
import org.eclipse.californium.core.network.Exchange.KeyToken;
import org.eclipse.californium.core.network.InMemoryMessageIdProvider;
import org.eclipse.californium.core.network.InMemoryRandomTokenProvider;
import org.eclipse.californium.core.network.MessageExchangeStore;
import org.eclipse.californium.core.network.MessageIdProvider;
import org.eclipse.californium.core.network.TokenProvider;
import org.eclipse.californium.core.network.config.NetworkConfig;
import org.eclipse.californium.core.network.deduplication.Deduplicator;
import org.eclipse.californium.core.network.deduplication.DeduplicatorFactory;
import org.eclipse.californium.elements.util.DaemonThreadFactory;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

/**
 * A copy of {@code InMemoryMessageExchangeStore} with expiration for the {@code Exchange} maps.
 *
 */
public class ExpirableMessageExchangeStore implements MessageExchangeStore {

	private static final long DEFAULT_EXPIRATION =  60L * 30; // in seconds
	private static final Logger LOGGER = Logger.getLogger(ExpirableMessageExchangeStore.class.getName());
	// for all
	private final Cache<KeyMID, Exchange> exchangesByMID;
	// for outgoing
	private final Cache<KeyToken, Exchange> exchangesByToken;

	private final NetworkConfig config;
	private final TokenProvider tokenProvider;
	private boolean running = false;
	private volatile Deduplicator deduplicator;
	private volatile MessageIdProvider messageIdProvider;
	private ScheduledFuture<?> statusLogger;
	private ScheduledExecutorService scheduler;

	/**
	 * Creates a new store for configuration values, with the default expiration set to 30 minutes.
	 * 
	 * @param config the configuration to use.
	 * 
	 */
	public ExpirableMessageExchangeStore(final NetworkConfig config) {
		this(config, new InMemoryRandomTokenProvider(config), DEFAULT_EXPIRATION);
		LOGGER.log(Level.CONFIG, "using default TokenProvider {0}", InMemoryRandomTokenProvider.class.getName());
	}

	/**
	 * Creates a new store for configuration values.
	 * 
	 * @param config the configuration to use.
	 * @param tokenProvider the TokenProvider which provides CoAP tokens that
	 *            are guaranteed to be not in use.
	 * @param expirationInSec expiration in seconds
	 */
	public ExpirableMessageExchangeStore(final NetworkConfig config, TokenProvider tokenProvider, long expirationInSec) {
		if (config == null) {
			throw new NullPointerException("Configuration must not be null");
		}
		if (tokenProvider == null) {
			throw new NullPointerException("TokenProvider must not be null");
		}
		this.tokenProvider = tokenProvider;
		this.config = config;
		
		exchangesByMID = CacheBuilder.newBuilder().expireAfterWrite(expirationInSec, TimeUnit.SECONDS).build();
		exchangesByToken = CacheBuilder.newBuilder().expireAfterWrite(expirationInSec, TimeUnit.SECONDS).build();
	}

	private void startStatusLogging() {

		final Level healthStatusLevel = Level
				.parse(config.getString(NetworkConfig.Keys.HEALTH_STATUS_PRINT_LEVEL, Level.FINEST.getName()));
		final int healthStatusInterval = config.getInt(NetworkConfig.Keys.HEALTH_STATUS_INTERVAL, 60); // seconds
		// this is a useful health metric
		// that could later be exported to some kind of monitoring interface
		if (LOGGER.isLoggable(healthStatusLevel)) {
			this.scheduler = Executors
					.newSingleThreadScheduledExecutor(new DaemonThreadFactory("MessageExchangeStore"));
			statusLogger = scheduler.scheduleAtFixedRate(new Runnable() {

				@Override
				public void run() {
					LOGGER.log(healthStatusLevel, dumpCurrentLoadLevels());
				}
			}, healthStatusInterval, healthStatusInterval, TimeUnit.SECONDS);
		}
	}

	private String dumpCurrentLoadLevels() {
		StringBuilder b = new StringBuilder("MessageExchangeStore contents: ");
		b.append(exchangesByMID.size()).append(" exchanges by MID, ");
		b.append(exchangesByToken.size()).append(" exchanges by token, ");
		return b.toString();
	}

	/**
	 * Sets the object to use for detecting duplicate incoming messages.
	 * 
	 * @param deduplicator the deduplicator.
	 * @throws NullPointerException if deduplicator is {@code null}.
	 * @throws IllegalStateException if this store is already running.
	 */
	public synchronized void setDeduplicator(final Deduplicator deduplicator) {
		if (running) {
			throw new IllegalStateException("Cannot set Deduplicator when store is already started");
		} else if (deduplicator == null) {
			throw new NullPointerException("Deduplicator must not be null");
		} else {
			this.deduplicator = deduplicator;
		}
	}

	/**
	 * Sets the provider to use for creating message IDs for outbound messages.
	 * 
	 * @param provider the provider.
	 * @throws NullPointerException if provider is {@code null}.
	 * @throws IllegalStateException if this store is already running.
	 */
	public synchronized void setMessageIdProvider(final MessageIdProvider provider) {
		if (running) {
			throw new IllegalStateException("Cannot set messageIdProvider when store is already started");
		} else if (provider == null) {
			throw new NullPointerException("Message ID Provider must not be null");
		} else {
			this.messageIdProvider = provider;
		}
	}

	@Override
	public boolean isEmpty() {
		LOGGER.finer(dumpCurrentLoadLevels());
		return exchangesByMID.size() == 0 && exchangesByToken.size() == 0 && deduplicator.isEmpty();
	}

	@Override
	public int assignMessageId(final Message message) {
		int mid = message.getMID();
		if (Message.NONE == mid) {
			InetSocketAddress dest = new InetSocketAddress(message.getDestination(), message.getDestinationPort());
			mid = messageIdProvider.getNextMessageId(dest);
			if (Message.NONE == mid) {
				LOGGER.log(Level.WARNING, "Cannot send message to {0}, all MIDs are in use", dest);
			} else {
				message.setMID(mid);
			}
		}
		return mid;
	}

	private int registerWithMessageId(final Exchange exchange, final Message message) {

		int mid = message.getMID();
		if (Message.NONE == mid) {
			mid = assignMessageId(message);
			if (Message.NONE != mid) {
				KeyMID key = KeyMID.fromOutboundMessage(message);
				if (exchangesByMID.asMap().putIfAbsent(key, exchange) != null) {
					LOGGER.log(Level.WARNING,
							"newly generated MID [{0}] already in use, overwriting already registered exchange", mid);
				}
			}
		} else {
			Exchange existingExchange = exchangesByMID.asMap().putIfAbsent(KeyMID.fromOutboundMessage(message), exchange);
			if (existingExchange != null) {
				if (existingExchange != exchange) {
					throw new IllegalArgumentException(String
							.format("message ID [%d] already in use, cannot register exchange", message.getMID()));
				} else if (exchange.getFailedTransmissionCount() == 0) {
					throw new IllegalArgumentException(String.format(
							"message with already registered ID [%d] is not a re-transmission, cannot register exchange",
							message.getMID()));
				}
			}
		}
		return mid;
	}

	private void registerWithToken(final Exchange exchange) {
		Request request = exchange.getCurrentRequest();
		KeyToken idByToken;
		if (request.getToken() == null) {
			idByToken = tokenProvider.getUnusedToken(request);
			request.setToken(idByToken.getToken());
		} else {
			idByToken = KeyToken.fromOutboundMessage(request);
			// ongoing requests may reuse token
			if (!(exchange.getFailedTransmissionCount() > 0 || request.getOptions().hasBlock1()
					|| request.getOptions().hasBlock2() || request.getOptions().hasObserve())
					&& tokenProvider.isTokenInUse(idByToken)) {
				LOGGER.log(Level.WARNING, "Manual token overrides existing open request: {0}", idByToken);
			}
		}
		exchangesByToken.put(idByToken, exchange);
	}

	@Override
	public boolean registerOutboundRequest(final Exchange exchange) {

		if (exchange == null) {
			throw new NullPointerException("exchange must not be null");
		} else if (exchange.getCurrentRequest() == null) {
			throw new IllegalArgumentException("exchange does not contain a request");
		} else {
			int mid = registerWithMessageId(exchange, exchange.getCurrentRequest());
			if (Message.NONE != mid) {
				registerWithToken(exchange);
				return true;
			} else {
				return false;
			}
		}
	}

	@Override
	public boolean registerOutboundRequestWithTokenOnly(final Exchange exchange) {
		if (exchange == null) {
			throw new NullPointerException("exchange must not be null");
		} else if (exchange.getCurrentRequest() == null) {
			throw new IllegalArgumentException("exchange does not contain a request");
		} else {
			registerWithToken(exchange);
			return true;
		}
	}

	@Override
	public void remove(final KeyToken token, final Exchange exchange) {
		boolean removed = exchangesByToken.asMap().remove(token, exchange);
		if (removed) {
			LOGGER.log(Level.FINE, "removing exchange for token {0}", new Object[] { token });
		}
	}

	@Override
	public Exchange remove(final KeyMID messageId, final Exchange exchange) {
		Exchange removedExchange;
		if (null == exchange) {
			removedExchange = exchangesByMID.asMap().remove(messageId);
		} else if (exchangesByMID.asMap().remove(messageId, exchange)) {
			removedExchange = exchange;
		} else {
			removedExchange = null;
		}
		if (null != removedExchange) {
			LOGGER.log(Level.FINE, "removing exchange for MID {0}", new Object[] { messageId });
		}
		return removedExchange;
	}

	@Override
	public Exchange get(final KeyToken token) {
		if (token == null) {
			return null;
		} else {
			return exchangesByToken.getIfPresent(token);
		}
	}

	@Override
	public Exchange get(final KeyMID messageId) {
		if (messageId == null) {
			return null;
		} else {
			return exchangesByMID.getIfPresent(messageId);
		}
	}

	@Override
	public boolean registerOutboundResponse(final Exchange exchange) {
		if (exchange == null) {
			throw new NullPointerException("exchange must not be null");
		} else if (exchange.getCurrentResponse() == null) {
			throw new IllegalArgumentException("exchange does not contain a response");
		} else {
			return registerWithMessageId(exchange, exchange.getCurrentResponse()) > Message.NONE;
		}
	}

	@Override
	public synchronized void start() {
		if (!running) {
			startStatusLogging();
			if (deduplicator == null) {
				DeduplicatorFactory factory = DeduplicatorFactory.getDeduplicatorFactory();
				this.deduplicator = factory.createDeduplicator(config);
			}
			this.deduplicator.start();
			if (messageIdProvider == null) {
				LOGGER.log(Level.CONFIG, "no MessageIdProvider set, using default {0}",
						InMemoryMessageIdProvider.class.getName());
				messageIdProvider = new InMemoryMessageIdProvider(config);
			}
			running = true;
		}
	}

	/**
	 * Stops this store and purges all registered exchanges.
	 */
	@Override
	public synchronized void stop() {
		if (running) {
			if (statusLogger != null) {
				statusLogger.cancel(false);
			}
			deduplicator.stop();
			exchangesByMID.invalidateAll();
			exchangesByToken.invalidateAll();
			running = false;
		}
	}

	@Override
	public Exchange findPrevious(final KeyMID messageId, final Exchange exchange) {
		return deduplicator.findPrevious(messageId, exchange);
	}

	@Override
	public Exchange find(final KeyMID messageId) {
		return deduplicator.find(messageId);
	}

	@Override
	public List<Exchange> findByToken(byte[] token) {
		List<Exchange> result = new ArrayList<>();
		if (token != null) {
			for (Entry<KeyToken, Exchange> entry : exchangesByToken.asMap().entrySet()) {
				if (entry.getValue().isOfLocalOrigin()) {
					Request request = entry.getValue().getRequest();
					if (request != null && Arrays.equals(token, request.getToken())) {
						result.add(entry.getValue());
					}
				}
			}
		}
		return result;
	}

	@Override
	public void releaseToken(KeyToken keyToken) {
		tokenProvider.releaseToken(keyToken);
	}
}
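
For completeness, here is how such a store could be plugged into an endpoint. This is only a sketch assuming the CoapEndpoint.Builder API of the later 2.0.0 milestones (earlier milestones wired the store through CoapEndpoint constructors instead); the class name ExpirableStoreExample is just for illustration:

import java.net.InetSocketAddress;

import org.eclipse.californium.core.CoapServer;
import org.eclipse.californium.core.network.CoapEndpoint;
import org.eclipse.californium.core.network.config.NetworkConfig;

public class ExpirableStoreExample {

	public static void main(String[] args) {
		NetworkConfig config = NetworkConfig.getStandard();
		// build an endpoint using the expirable store instead of the default in-memory one
		CoapEndpoint.Builder builder = new CoapEndpoint.Builder();
		builder.setInetSocketAddress(new InetSocketAddress(5683));
		builder.setNetworkConfig(config);
		builder.setMessageExchangeStore(new ExpirableMessageExchangeStore(config));

		CoapServer server = new CoapServer(config);
		server.addEndpoint(builder.build());
		server.start();
	}
}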

It's also possible to add a removal listener to monitor the leaks:

// additionally requires:
// import com.google.common.cache.RemovalCause;
// import com.google.common.cache.RemovalNotification;
exchangesByToken = CacheBuilder.newBuilder()
		.expireAfterWrite(expirationInSec, TimeUnit.SECONDS)
		.removalListener((RemovalNotification<KeyToken, Exchange> notification) -> {
			if (notification.getCause() == RemovalCause.EXPIRED) {
				// an entry expired without being removed explicitly: monitor the leak here
			}
		}).build();
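
Note that such an expiration only bounds the leak, it does not fix it: expireAfterWrite cannot distinguish leaked exchanges from long-living legitimate ones, so the expiration has to be chosen comfortably larger than the longest expected exchange lifetime (here 30 minutes by default).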

@boaks
Contributor

boaks commented Oct 5, 2017

I will spend more time on this next week.
If I remember well, this occurs in your "real test setup", so it's not easy for me to check it on my own, right?

@sbernard31
Contributor Author

If I remember well, this occurs in your "real test setup"

Exactly, and we didn't succeed in reproducing it. #418 is probably a good start. I should have time tomorrow to look at this more deeply. I will let you know.

@sbernard31
Contributor Author

Without surprise, we also observe a memory leak in InMemoryRandomTokenProvider, which should be cleaned up when the exchange completes.

On our side, as a workaround we use a stateless random token provider. Our number of clients and requests doesn't expose us to a high risk of collisions.
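
A minimal sketch of what such a stateless provider could look like, implementing the TokenProvider operations used by the store above (the class name is hypothetical and the exact interface signatures may differ between milestones): isTokenInUse always answers false and releaseToken is a no-op, so nothing can ever accumulate.

import java.security.SecureRandom;

import org.eclipse.californium.core.coap.Message;
import org.eclipse.californium.core.network.Exchange.KeyToken;
import org.eclipse.californium.core.network.TokenProvider;

public class StatelessRandomTokenProvider implements TokenProvider {

	private static final int TOKEN_LENGTH = 8; // bytes, the maximum CoAP token length
	private final SecureRandom random = new SecureRandom();

	@Override
	public KeyToken getUnusedToken(Message message) {
		// purely random token, no reservation: with 64 bits of randomness and a
		// moderate number of in-flight requests, collisions are very unlikely
		byte[] token = new byte[TOKEN_LENGTH];
		random.nextBytes(token);
		message.setToken(token);
		return KeyToken.fromOutboundMessage(message);
	}

	@Override
	public boolean isTokenInUse(KeyToken keyToken) {
		// stateless: nothing is ever marked "in use"
		return false;
	}

	@Override
	public void releaseToken(KeyToken keyToken) {
		// nothing to release, which is exactly what avoids the leak
	}
}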

I still think that using an "endpoint identifier" + stateless random generation (#173 (comment)) would be better than this reserve/release pattern, but that is probably another debate.

@sophokles73
Contributor

@sbernard31
Is this issue fixed with #441?

@sbernard31
Contributor Author

I'm not sure yet. I have some metrics in production which allow me to monitor that, but we haven't deployed a version with #441 for now.

I'll leave it open while waiting, and I'll check this. But if you prefer, we can close it and I will reopen it only if it is not fixed.

@boaks
Contributor

boaks commented Jan 30, 2018

Adding a "reverse observe" to the cf-extplugtest-server and cf-extplugtest-client/BenchmarkClient, I'm able to reproduce the leaks.

(Re-)Extending the logging in InMemoryMessageExchangeStore with dump() (see PR #326; logging in the unit tests only, as in #504, is just not useful for this kind of debugging) shows that the exchanges are completed, but still stored under a "previous, no longer available token".

Until now I didn't really find the root cause, but a cure that extends the ExchangeObserver with updateCurrentRequest and removes the exchange when the token changes seems to solve it.

I will continue to analyse the root cause, but for me the above cure looks much simpler than the "current complete" approach. Therefore I would also go ahead and clean that up.
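
To illustrate the cure, here is a minimal sketch under the assumption that the observer gets a callback when the current request (and so the token) changes; the callback and class names are hypothetical, the real ExchangeObserver API may differ:

import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.network.Exchange;
import org.eclipse.californium.core.network.Exchange.KeyToken;
import org.eclipse.californium.core.network.MessageExchangeStore;

public class TokenChangeCleanup {

	private final MessageExchangeStore store;

	public TokenChangeCleanup(MessageExchangeStore store) {
		this.store = store;
	}

	// called when the exchange switches to a new current request: remove the
	// entry stored under the previous, no longer available token
	public void updateCurrentRequest(Exchange exchange, Request previousRequest) {
		if (previousRequest != null && previousRequest.getToken() != null) {
			KeyToken staleKey = KeyToken.fromOutboundMessage(previousRequest);
			store.remove(staleKey, exchange);
		}
	}
}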

Does anyone have larger changes open in "core"?

@sbernard31
Contributor Author

Did you observe the leak in the BlockwiseLayer too?

@boaks
Contributor

boaks commented Jan 30, 2018

Currently I have only focused on the message exchange store.
But since the token used as key is no longer available from the requests of the exchange, any "late clean up" is doomed to fail ;-(.

@boaks
Contributor

boaks commented Jan 30, 2018

Seems that a blockwise notify is sometimes processed with the status of a normal blockwise response. The notify uses a different token (response.getOptions().hasObserve()) to load the remaining blocks, but when it is assembled, status.isNotification() is additionally checked and inhibits the call to completeCurrentRequest, and the currentRequest is then overwritten with the original request (which uses a different token).

In the logs it seems that this happens if the processing of handleBlock2Response is delayed until after receiveResponse (cleanup status).

So, I would go for using the extended ExchangeObserver and leave the status handling as it is for now.

@sbernard31
Contributor Author

Do you have any news or any work in progress about this?

@boaks
Contributor

boaks commented Feb 1, 2018

See PR #539.

Currently I'm working on improving the fix.

@boaks
Contributor

boaks commented Feb 2, 2018

Cleaning up the exchange house-keeping shows that there are too many race conditions when the same exchange is processed in parallel. Therefore I started to add a striped executor to the CoapEndpoint. This looks much better, but unfortunately I'm not ready yet :-).

I could provide an "early preview" of that work, but it would be too early to start discussing details.
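
For readers unfamiliar with the term: "striping" means that all tasks belonging to the same exchange are serialized onto the same single-threaded executor, which removes the races on one exchange without a global lock. A minimal, generic sketch of the idea (not the actual CoapEndpoint code):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StripedExecutor {

	private final ExecutorService[] stripes;

	public StripedExecutor(int concurrency) {
		stripes = new ExecutorService[concurrency];
		for (int i = 0; i < concurrency; i++) {
			// one thread per stripe serializes all tasks mapped to that stripe
			stripes[i] = Executors.newSingleThreadExecutor();
		}
	}

	// tasks with the same key (e.g. the exchange) always run on the same
	// thread, in submission order, so they can never race with each other
	public void execute(Object stripeKey, Runnable task) {
		int index = Math.floorMod(stripeKey.hashCode(), stripes.length);
		stripes[index].execute(task);
	}
}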

@boaks
Contributor

boaks commented Feb 12, 2018

With PR #551 my tests didn't show any leaks.
It contains:

  • Correction of exchange house-keeping (encapsulate the house-keeping into exchange/exchange-observer).
  • Cleanup stuff (e.g. remove the response.last).
  • Use a striped executor to solve threading issues around the exchange.
  • Redesign of the blockwise layer, especially the synchronization.
  • Enhancement of ExtendedTestServer and BenchmarkClient to prove the fixes.

Remove the Californium???.properties files before starting the test to ensure the right setup.

ExtendedTestServer: -d64 -Xmx2g -loopback -noPlugtest
BenchmarkClient: -d64 -Xmx2g coap://localhost:5783/reverse-observe?obs=25&res=feed-CON&rlen=400 100 3 x 2000 -500 1000
BenchmarkClient: -d64 -Xmx2g coap://localhost:5783/reverse-observe?obs=25&res=feed-NON&rlen=400 100 3 x 2000 -500 1000
BenchmarkClient: -d64 -Xmx2g coap://localhost:5783/benchmark 500 20000

Tests blockwise notifies (NON and CON) and normal requests. After the clients finish, wait until the pending requests time out; then the stores should be empty (validated by logging the health status).

@sbernard31
Contributor Author

I tried to reproduce the leak using commit 05b12ed
and the command: "coap://localhost:5783/reverse-observe?obs=25&res=feed&rlen=400" 100 3 x 2000 -500 1000

I only succeeded 2 times out of more than 15 tries.
Not directly linked, but I faced several

possible MID reuse before lifetime end for token [[099c046d69eecc77]], expected MID 59908 but received 59905

a few

11:02:19.738 ERROR [CoapEndpoint]: exception in protocol stage thread: MID must be a 16 bit unsigned int: -1
java.lang.IllegalArgumentException: MID must be a 16 bit unsigned int: -1
	at org.eclipse.californium.core.network.Exchange$KeyMID.<init>(Exchange.java:669)
	at org.eclipse.californium.core.network.Exchange$KeyMID.fromOutboundMessage(Exchange.java:741)
	at org.eclipse.californium.core.network.UdpMatcher.receiveResponse(UdpMatcher.java:277)
	at org.eclipse.californium.core.network.CoapEndpoint$InboxImpl.receiveResponse(CoapEndpoint.java:877)
	at org.eclipse.californium.core.network.CoapEndpoint$InboxImpl.receiveMessage(CoapEndpoint.java:796)

and a lot of

ignoring block2 response with wrong block number 0 (expected 1)

(I didn't investigate)


Then I tried with commit b84062e
and "coap://localhost:5783/reverse-observe?obs=25&res=feed-CON&rlen=400" 100 3 x 2000 -500 1000

I don't see any leak either, but this is not very meaningful, as I was not able to reproduce it reliably before, and I only tested 2 times because these tests take far longer.

I still face possible MID reuse before lifetime end for token and ignoring block2 response with wrong block number 0 (expected 1). I didn't face the MID -1 error, but it seems more infrequent.


I don't know if it's relevant, as the test code changed between b84062e and 05b12ed,
but it seems there is a performance impact (or at least an impact on test execution time).

before:
172966 notifies (794 notifies/s, 100 clients)
182360 notifies (939 notifies/s, 100 clients)
192080 notifies (972 notifies/s, 100 clients)
Benchmark clients finished.
300 requests sent, 300 expected
300 requests in 805ms, 372 reqs/s
200000 notifies sent, 200000 expected, 4419 observe requests
200000 notifies in 220410ms, 907 notifies/s
100 clients with 2 to 3 requests.

after:
193595 notifies (264 notifies/s, 100 clients)
196203 notifies (260 notifies/s, 100 clients)
198756 notifies (255 notifies/s, 100 clients)
Benchmark clients finished.
300 requests sent, 300 expected
300 requests in 939ms, 319 reqs/s
200000 notifies sent, 200000 expected, 9323 observe requests
200000 notifies in 706818ms, 282 notifies/s
100 clients with 2 to 4 requests.

I would like to know if you observe that too?

I will now try to read all the changes. 😓

@boaks
Contributor

boaks commented Feb 13, 2018

05b12ed calls CoapResource.changed() every 250ms;
b84062e creates random intervals between -500 (meaning 500ms after the transfer of the response completes) and 1000ms (1000ms after the last call of changed(), regardless of whether the response has been transferred).
So the different results don't show a performance impact.

About the ignoring block2 response with wrong block number: I've seen them, but I didn't analyse it, because notifies arriving during the transfer of other notifies will cause that. The leaks on my PCs depend more on the logging levels and settings, as these make the race conditions more probable. The ExtendedTestServer showed them.

I didn't recognize the possible MID reuse before lifetime end for token, but I will check for them.

@boaks
Contributor

boaks commented Feb 13, 2018

20:02:54.603 WARN [BlockwiseLayer]: ignoring block2 response with wrong block number 0 (expected 1): CON-2.05 MID=43163, Token=[31a2cb6e024138ba], OptionSet={"Observe":18, "Content-Format":"text/plain", "Block2":"(szx=2/64, m=true, num=0)", "Size2":400}, "hello 67 feed-183982****".. 64 bytes, (was CON-2.05 MID=43164, Token=[b3040e2bc0460b81], OptionSet={"Observe":18, "Content-Format":"text/plain", "Block2":"(szx=2/64, m=true, num=0)", "Size2":400}, "hello 67 feed-183980****".. 64 bytes)

Seems to be caused by a new observe with a different token.

@boaks
Contributor

boaks commented Feb 13, 2018

java.lang.IllegalArgumentException: MID must be a 16 bit unsigned int: -1
	at org.eclipse.californium.core.network.Exchange$KeyMID.<init>(Exchange.java:669)
	at org.eclipse.californium.core.network.Exchange$KeyMID.fromOutboundMessage(Exchange.java:741)
	at org.eclipse.californium.core.network.UdpMatcher.receiveResponse(UdpMatcher.java:277)

Seems to be a race condition when sending a new current request while receiving a response (reusing the token in blockwise causes such issues). Even with the striped exchange execution, this section is not protected.
I will try to fix it tomorrow.

@boaks
Contributor

boaks commented Feb 14, 2018

I pushed a fix for the MID issues.

@sbernard31
Contributor Author

sbernard31 commented Feb 14, 2018

05b12ed calls CoapResource.changed() every 250ms;
b84062e creates random intervals between -500 (meaning 500ms after the transfer of the response completes) and 1000ms (1000ms after the last call of changed(), regardless of whether the response has been transferred).
So the different results don't show a performance impact.

It would be great if the commit history were organized so that we could easily test before and after with exactly the same tests.

(By the way, I have trouble with the logback config, as ext-plugtest-* depends on cf-plugtest-* and both have a logback.xml config.)

@boaks
Contributor

boaks commented Feb 14, 2018

It would be great if the commit history were organized so that we could easily test before and after with exactly the same tests.

Sorry, I will not be able to spend much more time on this issue beyond the two weeks already spent.
My target was to fix the leak, not to benchmark the observes/notifies.

(By the way, I have trouble with the logback config, as ext-plugtest-* depends on cf-plugtest-* and both have a logback.xml config.)

Yes, I noticed this too. But the solution hints I found on the net didn't work or would take too much time for now. If you know how to fix it, go for it :-).

boaks pushed a commit that referenced this issue Jul 2, 2018
Extract CleanupMessageObserver from ExchangeCleanupLayer and add CON
response to cleanup into it. Add also a CleanupMessageObserver to
blockwise notifies in BlockwiseLayer.

Signed-off-by: Achim Kraus <[email protected]>
boaks mentioned this issue Jul 2, 2018
sbernard31 added a commit that referenced this issue Jul 2, 2018
The idea is to keep the "cleaning exchange" code closer to the "adding exchange" one. Consequence: we don't need ExchangeCleanupLayer anymore.
@sbernard31
Contributor Author

We integrated Californium 2.0.0-M11. It behaves better regarding the memory leak,
but I still have some residual leak: something like less than 1 leak per hour.

To summarize the progress:
2.0.0-M9 -> several hundred leaks per minute
2.0.0-M10 -> 1 or 2 leaks per minute
2.0.0-M11 -> less than 1 leak per hour

I will investigate this last(?) leak, but for now I don't have any clue :-/

@boaks
Contributor

boaks commented Jul 23, 2018

2.0.0-M11 -> less than 1 leak per hour

Not that bad :-). Still in the exchange store?

@sbernard31
Contributor Author

Still in the exchange store?

Yep

@boaks
Contributor

boaks commented Jul 23, 2018

DTLSConnector:

	@Override
	public final void send(final RawData msg) {
		if (msg == null) {
			throw new NullPointerException("Message must not be null");
		} else if (!running.get()) {
			throw new IllegalStateException("connector must be started before sending messages is possible");
		} else if (msg.getSize() > MAX_PLAINTEXT_FRAGMENT_LENGTH) {
			throw new IllegalArgumentException("Message data must not exceed "
					+ MAX_PLAINTEXT_FRAGMENT_LENGTH + " bytes");
		} else {
			if (pendingOutboundMessages.decrementAndGet() >= 0) {
				...
			}
			else {
				pendingOutboundMessages.incrementAndGet();
				LOGGER.warn("Outbound message overflow! Dropping outbound message to peer [{}]",
						msg.getInetSocketAddress());
			}
		}
	}

Does this "overflow" occur?
If so, maybe adding an onError() there would help.

@sbernard31
Contributor Author

sbernard31 commented Jul 23, 2018

Does this "overflow" occur?

No, I don't see warnings like this.

@sbernard31
Contributor Author

sbernard31 commented Jul 23, 2018

I have a lead!

I'm logging the exchanges which are leaking. For now, I notice these are only exchanges for CoAP requests which are acknowledged but never get a response (no observe, no blockwise).

In our server application, we have LWM2M features and pure CoAP features.
The LWM2M features use the Leshan sender and so benefit from a response timeout.
For our pure CoAP features, we use the Californium API directly, and we don't do anything if we never get a response...
That means if a request is acknowledged and we never receive the separate response, the exchange stays in the store forever.

If the next logs confirm this, it means the issue is not in Californium! 🎉 🎊 🎉

I added a CoAP API in Leshan to be able to send pure CoAP requests to a device registered with LWM2M. This API handles response timeouts, so switching to it should fix the leak.
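
A minimal sketch of the kind of application-level guard that was missing (the class name and timeout are illustrative; Leshan's actual API differs): cancel the request if no response arrives within a deadline, since cancelling completes the exchange and lets the store release it.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.network.Endpoint;

public class ResponseTimeoutGuard {

	private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

	public void sendWithTimeout(Endpoint endpoint, final Request request, long timeoutSeconds) {
		endpoint.sendRequest(request);
		// if the separate response never arrives, cancel the request:
		// cancelling completes the exchange so it can be removed from the store
		scheduler.schedule(new Runnable() {
			@Override
			public void run() {
				if (request.getResponse() == null) {
					request.cancel();
				}
			}
		}, timeoutSeconds, TimeUnit.SECONDS);
	}
}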

@sbernard31
Contributor Author

All the logs show that the leaks follow the pattern described above, so I'm pretty confident that the last remaining leaks are not caused by Californium issues. \o/

@boaks
Contributor

boaks commented Sep 24, 2018

Any news on this?

@sbernard31
Contributor Author

Unfortunately, the new CoAP API in Leshan is still not used in our product, so I cannot check whether my supposition is true. As I'm pretty confident, we can close this if you want, and I will reopen it if I was wrong.

@sbernard31
Contributor Author

We recently integrated the new CoAP API of Leshan (with response timeout) into production. So far, 4 days without any leak observed.

@sbernard31
Contributor Author

No leak in 10 days now. I think we can close this!

@phani50

phani50 commented Mar 19, 2019

Hi,

While doing performance tests, we are facing the issue below:

2019-03-19T12:04:38.386Z org.eclipse.californium.core.network.CoapEndpoint SEVERE: Exception in protocol stage thread: message with already registered ID [51340] is not a re-transmission, cannot register exchange

I don't understand what is happening here. Can you please help me?

@boaks
Contributor

boaks commented Mar 19, 2019

Please open a new issue.
Provide the requested information (branch, logs) as best you can.
