Skip to content

Commit

Permalink
remove ByteBufferWithInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
russgold committed Jul 22, 2012
1 parent fa18444 commit 12c3490
Show file tree
Hide file tree
Showing 20 changed files with 227 additions and 453 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
package com.sun.corba.ee.impl.encoding;

import java.nio.ByteBuffer;
import com.sun.corba.ee.impl.encoding.ByteBufferWithInfo;
import com.sun.corba.ee.impl.protocol.giopmsgheaders.FragmentMessage;
import com.sun.corba.ee.impl.protocol.giopmsgheaders.Message;

Expand All @@ -55,18 +54,18 @@ public interface BufferManagerRead
* in a fragment map (when collecting - GIOP 1.2 phase 1) or
* in an active server requests map (when streaming - GIOP 1.2 phase 2).
*
* As a model for implementation see IIOPInputStream's
* As a model for implementation see IIOPInputStream's
* constructor of the same name. There are going to be some variations.
*
*/

public void processFragment ( ByteBuffer byteBuffer,
public void processFragment ( ByteBuffer byteBuffer,
FragmentMessage header);


/**
* Case: called from CDRInputStream constructor before unmarshaling.
*
*
* Does:
*
* this.bufQ.get()
Expand All @@ -78,7 +77,7 @@ public void processFragment ( ByteBuffer byteBuffer,
/**
* Invoked when we run out of data to read. Obtains more data from the stream.
*/
ByteBufferWithInfo underflow (ByteBufferWithInfo bbwi);
ByteBuffer underflow(ByteBuffer byteBuffer);

/**
* Returns true if this buffer manager reads fragments when it underflows.
Expand All @@ -104,5 +103,5 @@ public void processFragment ( ByteBuffer byteBuffer,
/*
* Close BufferManagerRead and perform any oustanding cleanup.
*/
public void close(ByteBufferWithInfo bbwi);
public void close(ByteBuffer byteBuffer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
package com.sun.corba.ee.impl.encoding;

import java.nio.ByteBuffer;
import com.sun.corba.ee.spi.orb.ORB;

import com.sun.corba.ee.impl.protocol.giopmsgheaders.FragmentMessage;
import com.sun.corba.ee.impl.protocol.giopmsgheaders.Message;
import com.sun.corba.ee.spi.logging.ORBUtilSystemException;
Expand All @@ -60,7 +60,7 @@ public void processFragment (ByteBuffer byteBuffer, FragmentMessage header)

public void init(Message msg) {}

public ByteBufferWithInfo underflow (ByteBufferWithInfo bbwi) {
public ByteBuffer underflow(ByteBuffer byteBuffer) {
throw wrapper.unexpectedEof() ;
}

Expand Down Expand Up @@ -88,7 +88,7 @@ public void mark(RestorableInputStream is) {
}

// This will never happen
public void fragmentationOccured(ByteBufferWithInfo newFragment) {}
public void fragmentationOccured(ByteBuffer byteBuffer) {}

public void reset() {

Expand All @@ -101,5 +101,5 @@ public void reset() {
}

// Nothing to close and cleanup.
public void close(ByteBufferWithInfo bbwi) {}
public void close(ByteBuffer byteBuffer) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,13 @@ private void bufferMessage(String msg, int bbAddr, String tail) {}

@Transport
public void processFragment(ByteBuffer byteBuffer, FragmentMessage msg) {
ByteBufferWithInfo bbwi = new ByteBufferWithInfo(byteBuffer);
bbwi.position(msg.getHeaderLength());
byteBuffer.position(msg.getHeaderLength());

synchronized (fragmentQueue) {
if (orb.transportDebugFlag) {
logBufferMessage("processFragment() - queuing ByteByffer id (", byteBuffer, ") to fragment queue.");
}
fragmentQueue.enqueue(bbwi);
fragmentQueue.enqueue(byteBuffer);
endOfStream = !msg.moreFragmentsToFollow();
fragmentQueue.notify();
}
Expand All @@ -106,9 +105,9 @@ private void underflowMessage(String msg, int rid) {
}

@Transport
public ByteBufferWithInfo underflow(ByteBufferWithInfo bbwi) {
public ByteBuffer underflow(ByteBuffer byteBuffer) {

ByteBufferWithInfo result;
ByteBuffer result;

synchronized (fragmentQueue) {

Expand Down Expand Up @@ -143,10 +142,10 @@ public ByteBufferWithInfo underflow(ByteBufferWithInfo bbwi) {
result = fragmentQueue.dequeue();

// VERY IMPORTANT
// Release bbwi.byteBuffer to the ByteBufferPool only if
// Release byteBuffer to the ByteBufferPool only if
// this BufferManagerStream is not marked for potential restore.
if (!markEngaged && bbwi != null) {
getByteBufferPool().releaseByteBuffer(bbwi.getByteBuffer());
if (!markEngaged && byteBuffer != null) {
getByteBufferPool().releaseByteBuffer(byteBuffer);
}
}
return result;
Expand All @@ -163,14 +162,13 @@ public void init(Message msg) {
}
}

// Release any queued ByteBufferWithInfo's byteBuffers to the
// ByteBufferPoool
// Release any queued byteBuffers to the ByteBufferPoool
@Transport
public void close(ByteBufferWithInfo bbwi) {
public void close(ByteBuffer byteBuffer) {
int inputBbAddress = 0;

if (bbwi != null) {
inputBbAddress = System.identityHashCode(bbwi.getByteBuffer());
if (byteBuffer != null) {
inputBbAddress = System.identityHashCode(byteBuffer);
}
ByteBufferPool byteBufferPool = getByteBufferPool();

Expand All @@ -184,11 +182,11 @@ public void close(ByteBufferWithInfo bbwi) {
// on the stack. If one is found to equal, it will
// not be released to the ByteBufferPool.

ByteBufferWithInfo abbwi;
ByteBuffer aBuffer;
while (fragmentQueue.size() != 0) {
abbwi = fragmentQueue.dequeue();
if (abbwi != null) {
byteBufferPool.releaseByteBuffer(abbwi.getByteBuffer());
aBuffer = fragmentQueue.dequeue();
if (aBuffer != null) {
byteBufferPool.releaseByteBuffer(aBuffer);
}
}
}
Expand All @@ -204,10 +202,10 @@ public void close(ByteBufferWithInfo bbwi) {
// on the stack. If one is found to equal, it will
// not be released to the ByteBufferPool.

for (ByteBufferWithInfo abbwi : fragmentStack) {
if (abbwi != null) {
if (inputBbAddress != System.identityHashCode(abbwi.getByteBuffer())) {
byteBufferPool.releaseByteBuffer(abbwi.getByteBuffer());
for (ByteBuffer aBuffer : fragmentStack) {
if (aBuffer != null) {
if (inputBbAddress != System.identityHashCode(aBuffer)) {
byteBufferPool.releaseByteBuffer(aBuffer);
}
}
}
Expand All @@ -230,7 +228,7 @@ protected ByteBufferPool getByteBufferPool() {

// List of fragment ByteBufferWithInfos received since
// the mark was engaged.
private LinkedList<ByteBufferWithInfo> fragmentStack = null;
private LinkedList<ByteBuffer> fragmentStack = null;
private RestorableInputStream inputStream = null;

// Original state of the stream
Expand All @@ -250,20 +248,16 @@ public void mark(RestorableInputStream inputStream) {
}

// Collects fragments received since the mark was engaged.
public void fragmentationOccured(ByteBufferWithInfo newFragment) {
public void fragmentationOccured(ByteBuffer newFrament) {
if (!markEngaged) {
return;
}

if (fragmentStack == null) {
fragmentStack =
new LinkedList<ByteBufferWithInfo>();
fragmentStack = new LinkedList<ByteBuffer>();
}

ByteBufferWithInfo bbwi = newFragment.duplicate();
bbwi.limit(newFragment.limit());
bbwi.position(newFragment.position());
fragmentStack.addFirst(bbwi);
fragmentStack.addFirst(newFrament.duplicate());
}

public void reset() {
Expand All @@ -280,8 +274,8 @@ public void reset() {
if (fragmentStack != null && fragmentStack.size() != 0) {

synchronized (fragmentQueue) {
for (ByteBufferWithInfo bbwi : fragmentStack) {
fragmentQueue.push(bbwi);
for (ByteBuffer aBuffer : fragmentStack) {
fragmentQueue.push(aBuffer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@

import com.sun.corba.ee.spi.logging.ORBUtilSystemException;

import java.nio.ByteBuffer;

/**
* Defines the contract between the BufferManager and
* CDR stream on the writing side. The CDR stream
Expand Down Expand Up @@ -85,7 +87,7 @@ public boolean sentFullMessage() {
/*
* Invoked when we run out of room to write. Must either expand the buffer or send it as a fragment and clear it.
*/
public abstract ByteBufferWithInfo overflow(ByteBufferWithInfo bbwi, int numBytesNeeded);
protected abstract ByteBuffer overflow(ByteBuffer byteBuffer, int numBytesNeeded);

/**
* Returns true if this buffer manager fragments when an overflow occurs.
Expand Down Expand Up @@ -120,6 +122,7 @@ public boolean sentFullMessage() {

public abstract void sendMessage ();


/**
* A reference to the connection level stream will be required when
* sending fragments.
Expand All @@ -128,7 +131,6 @@ public void setOutputObject(Object outputObject) {
this.outputObject = outputObject;
}


/**
* Close the BufferManagerWrite and do any outstanding cleanup.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,21 @@ public int getBufferSize() {
return orb.getORBData().getGIOPBufferSize();
}

public ByteBufferWithInfo overflow(ByteBufferWithInfo bbwi, int numBytesNeeded) {
int newLength = bbwi.limit() * 2;
@Override
protected ByteBuffer overflow(ByteBuffer byteBuffer, int numBytesNeeded) {
int newLength = byteBuffer.limit() * 2;

while (bbwi.position() + numBytesNeeded >= newLength)
while (byteBuffer.position() + numBytesNeeded >= newLength)
newLength = newLength * 2;

ByteBufferPool byteBufferPool = orb.getByteBufferPool();
ByteBuffer newBB = byteBufferPool.getByteBuffer(newLength);

bbwi.flip();
newBB.put(bbwi.toByteBuffer());

byteBufferPool.releaseByteBuffer(bbwi.getByteBuffer());
byteBuffer.flip();
newBB.put(byteBuffer);

return new ByteBufferWithInfo(newBB);
byteBufferPool.releaseByteBuffer(byteBuffer);
return newBB;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
*/
package com.sun.corba.ee.impl.encoding;

import java.nio.ByteBuffer;
import java.util.EmptyStackException;

import sun.corba.Bridge;
Expand Down Expand Up @@ -80,16 +81,15 @@ public int getBufferSize() {
return orb.getORBData().getGIOPFragmentSize();
}

public ByteBufferWithInfo overflow(ByteBufferWithInfo bbwi, int numBytesNeeded)
{
protected ByteBuffer overflow(ByteBuffer byteBuffer, int numBytesNeeded) {
// Set the fragment's moreFragments field to true
MessageBase.setFlag(bbwi, Message.MORE_FRAGMENTS_BIT);
MessageBase.setFlag(byteBuffer, Message.MORE_FRAGMENTS_BIT);

try {
sendFragment(false);
} catch (SystemException se) {
// REVISIT: this part similar to
// CorbaClientRequestDispatchImpl.beginRequest()
// REVISIT: this part similar to
// CorbaClientRequestDispatchImpl.beginRequest()
// and CorbaClientRequestDelegate.request()
ContactInfoListIterator itr;
try {
Expand All @@ -98,15 +98,15 @@ public ByteBufferWithInfo overflow(ByteBufferWithInfo bbwi, int numBytesNeeded)
// server side, don't reportException
throw se;
}

// bug 6382377: must not lose exception in PI
orb.getPIHandler().invokeClientPIEndingPoint( ReplyMessage.SYSTEM_EXCEPTION, se ) ;

boolean retry = itr.reportException(null, se);
if (retry) {
Bridge bridge = Bridge.get();
bridge.throwException(new RemarshalException());
} else {
} else {
// re-throw the SystemException
throw se;
}
Expand All @@ -117,18 +117,18 @@ public ByteBufferWithInfo overflow(ByteBufferWithInfo bbwi, int numBytesNeeded)
// REVISIT - need to account for case when needed > available
// even after fragmenting. This is the large array case, so
// the caller should retry when it runs out of space.
bbwi.position(0);
bbwi.limit(bbwi.capacity());
byteBuffer.position(0);
byteBuffer.limit(byteBuffer.capacity());

// Now we must marshal in the fragment header/GIOP header

// REVISIT - we can optimize this by not creating the fragment message
// each time.
// each time.

FragmentMessage header = ((CDROutputObject)outputObject).getMessageHeader().createFragmentMessage();

header.write(((CDROutputObject)outputObject));
return bbwi;
return byteBuffer;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,24 @@
*/
package com.sun.corba.ee.impl.encoding;

import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.LinkedList;

/**
* Simple unsynchronized queue implementation for ByteBufferWithInfos.
* Simple unsynchronized queue implementation for ByteBuffer.
*/
public class BufferQueue
{
private LinkedList<ByteBufferWithInfo> list =
new LinkedList<ByteBufferWithInfo>();
private LinkedList<ByteBuffer> list =
new LinkedList<ByteBuffer>();

public void enqueue(ByteBufferWithInfo item)
public void enqueue(ByteBuffer item)
{
list.addLast(item);
}

public ByteBufferWithInfo dequeue() throws NoSuchElementException
public ByteBuffer dequeue() throws NoSuchElementException
{
return list.removeFirst();
}
Expand All @@ -66,9 +66,8 @@ public int size()
return list.size();
}

// Adds the given ByteBufferWithInfo to the front
// of the queue.
public void push(ByteBufferWithInfo item)
// Adds the given ByteBuffer to the front of the queue.
public void push(ByteBuffer item)
{
list.addFirst(item);
}
Expand Down
Loading

0 comments on commit 12c3490

Please sign in to comment.