Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Executor Service utilisation #1

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions src/main/java/pe/pi/sctp4j/sctp/small/CachedBlockingSCTPStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2017 pi.pe gmbh .
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package pe.pi.sctp4j.sctp.small;

import com.phono.srtplight.Log;
import pe.pi.sctp4j.sctp.SCTPMessage;
import pe.pi.sctp4j.sctp.SCTPStream;
import pe.pi.sctp4j.sctp.dataChannel.DECP.DCOpen;
import pe.pi.sctp4j.sctp.messages.DataChunk;

import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
*
* @author Westhawk Ltd<[email protected]>
*/
public class CachedBlockingSCTPStream extends SCTPStream {

private HashMap<Integer, SCTPMessage> undeliveredOutboundMessages = new HashMap();
private final CachedThreadedAssociation _ta;
private final ExecutorService _ex;

CachedBlockingSCTPStream(CachedThreadedAssociation a, Integer id) {
super(a, id);
_ex = a.getExecutorService();
_ta = a;
}

@Override
synchronized public void send(String message) throws Exception {
SCTPMessage m = _ta.makeMessage(message, this);
undeliveredOutboundMessages.put(m.getSeq(), m);
_ta.sendAndBlock(m);
}

@Override
synchronized public void send(byte[] message) throws Exception {
SCTPMessage m = _ta.makeMessage(message, this);
undeliveredOutboundMessages.put(m.getSeq(), m);
_ta.sendAndBlock(m);
}

@Override
public void send(DCOpen message) throws Exception {
SCTPMessage m = _ta.makeMessage(message, this);
undeliveredOutboundMessages.put(m.getSeq(), m);
Log.debug("About to send message for dcep size is " + m.getData().length);
_ta.sendAndBlock(m);
}

@Override
public void deliverMessage(SCTPMessage message) {
_ex.execute(message);
}

@Override
protected void alOnDCEPStream(SCTPStream _stream, String label, int _pPid) throws Exception {
_ex.execute(() -> {
try {
super.alOnDCEPStream(_stream, label, _pPid);
} catch (Exception ex) {
Log.error("can't notify DCEPStream " + ex.getMessage());
}
});
}

@Override
public void delivered(DataChunk d) {
int f = d.getFlags();
if ((f & DataChunk.ENDFLAG) > 0) {
int ssn = d.getSSeqNo();
SCTPMessage st = undeliveredOutboundMessages.remove(ssn);
if (st != null) {
st.acked();
}
}
}

@Override
public boolean idle() {
return undeliveredOutboundMessages.isEmpty();
}

@Override
public void close() throws Exception {
super.close();
if ((_ex != null) && (!_ex.isShutdown())) {
_ex.shutdownNow();
Log.debug("shutdown of Stream-" + this.getNum() + "-Exec");
}
}
}
Loading