Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
adfbe3d
register pipe listener
lambdai Jun 22, 2020
5fd9c16
bb client pipe
lambdai Jun 23, 2020
19537cd
add buffer source socket
lambdai Jun 23, 2020
6b015ea
create peer pipe
lambdai Jun 24, 2020
79dfdc1
end2end connect write read but not close
lambdai Jun 25, 2020
f9a97c8
fix buffer source socket
lambdai Jun 29, 2020
33d6c35
fix connection accounting
lambdai Jun 29, 2020
e22c068
add close for client and server pipe
lambdai Jun 30, 2020
0e2c2af
Merge branch 'master' into hostconn
lambdai Aug 6, 2020
facee70
fix build
lambdai Aug 6, 2020
acdd921
introduce new internal address type
lambdai Aug 6, 2020
efd6236
InternalAddressIntegrationTest
lambdai Aug 7, 2020
b14eaf6
add enableXXX
lambdai Aug 13, 2020
38e30fb
debugging hooks
lambdai Aug 15, 2020
61e0013
internal address integration test
lambdai Aug 15, 2020
d4952ef
add WritablePeer replacing peer buffer
lambdai Aug 15, 2020
575f5db
make socket closed, crashing at destroy connection referring peer
lambdai Aug 15, 2020
a4ee041
split pipe_connection_impl
lambdai Aug 16, 2020
302b0f0
Buffersourcesocket: read_end_stream_ and 0 buffer is not an error
lambdai Aug 16, 2020
5c8abd7
TcpProxyDownstreamDisconnect is flaky
lambdai Aug 16, 2020
54eb37f
deliver FIN to filter manager
lambdai Aug 16, 2020
238e09c
add large write test
lambdai Aug 16, 2020
6cb71e1
pipe connection test
lambdai Aug 17, 2020
ee4cebd
client close test
lambdai Aug 17, 2020
04b3e03
fix readDisable and add tests
lambdai Aug 18, 2020
7b57443
add KickUndone
lambdai Aug 18, 2020
11320af
ReadDisableAfterCloseHandledGracefully
lambdai Aug 18, 2020
86baffb
add check peer overhighmark and fix WriteWatermarks test
lambdai Aug 18, 2020
9a0cebb
split readmark test and pass 1st
lambdai Aug 19, 2020
94219f3
pass all readdisabled test
lambdai Aug 19, 2020
8fca27a
fix tests, working on separate inline read at internal address
lambdai Aug 20, 2020
f2c43e3
removing inline rw, internal integration test failure b/c loop timeout
lambdai Aug 22, 2020
91624e7
fix internal address integration test
lambdai Aug 22, 2020
1664639
fixing pipe conn impl test: closable
lambdai Aug 23, 2020
a87051b
fix ReadEnableDispatches, need to set write ready as edge trigger
lambdai Aug 23, 2020
525c20b
fixing pipe conn test by edge trigger writable event
lambdai Aug 24, 2020
b77de92
fixing
lambdai Aug 24, 2020
8caf229
cleanup
lambdai Aug 24, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/envoy/config/core/v3/address.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ message Pipe {
uint32 mode = 2 [(validate.rules).uint32 = {lte: 511}];
}

message EnvoyInternalAddress {
// A special listener as the peer address.
string peer_listener_name = 1 [(validate.rules).string = {min_bytes: 1}];
}

// [#next-free-field: 7]
message SocketAddress {
option (udpa.annotations.versioning).previous_message_type = "envoy.api.v2.core.SocketAddress";
Expand Down Expand Up @@ -129,6 +134,8 @@ message Address {
SocketAddress socket_address = 1;

Pipe pipe = 2;

EnvoyInternalAddress envoy_internal_address = 3;
}
}

Expand Down
10 changes: 10 additions & 0 deletions api/envoy/config/core/v4alpha/address.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions generated_api_shadow/envoy/config/core/v3/address.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions generated_api_shadow/envoy/config/core/v4alpha/address.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions include/envoy/buffer/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,31 @@ class BufferFragment {
*/
virtual void done() PURE;
};
class BufferIdGenerator {
public:
static int nextId() {
static int next_id_ = 0;
return ++next_id_;
}
};

class TestBufferIdGenerator {
public:
static int nextId() {
static int next_id_ = 1000;
return ++next_id_;
}
};
/**
* A basic buffer abstraction.
*/
class Instance {
public:
Instance() : buffer_id_(BufferIdGenerator::nextId()) {}
Instance(int) : buffer_id_(TestBufferIdGenerator::nextId()) {}

int buffer_id_;
int bid() const { return buffer_id_; }
virtual ~Instance() = default;

/**
Expand Down
14 changes: 12 additions & 2 deletions include/envoy/event/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "envoy/stream_info/stream_info.h"
#include "envoy/thread/thread.h"

#include "common/common/macros.h"

namespace Envoy {
namespace Event {

Expand Down Expand Up @@ -113,6 +115,14 @@ class Dispatcher {
Network::TransportSocketPtr&& transport_socket,
const Network::ConnectionSocket::OptionsSharedPtr& options) PURE;

virtual Network::ClientConnectionPtr
createUserspacePipe(Network::Address::InstanceConstSharedPtr address,
Network::Address::InstanceConstSharedPtr local_address) {
UNREFERENCED_PARAMETER(address);
UNREFERENCED_PARAMETER(local_address);
return nullptr;
}

/**
* Creates an async DNS resolver. The resolver should only be used on the thread that runs this
* dispatcher.
Expand Down Expand Up @@ -151,8 +161,8 @@ class Dispatcher {
* @return Network::ListenerPtr a new listener that is owned by the caller.
*/
virtual Network::ListenerPtr createListener(Network::SocketSharedPtr&& socket,
Network::ListenerCallbacks& cb,
bool bind_to_port) PURE;
Network::ListenerCallbacks& cb, bool bind_to_port,
const std::string& name) PURE;

/**
* Creates a logical udp listener on a specific port.
Expand Down
43 changes: 32 additions & 11 deletions include/envoy/network/address.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class Ipv6 {
virtual absl::uint128 address() const PURE;

/**
* @return true if address is Ipv6 and Ipv4 compatibility is disabled, false otherwise
* @return true if address is Ipv6 and Ipv4 compatibility is disabled, false
* otherwise
*/
virtual bool v6only() const PURE;
};
Expand All @@ -69,25 +70,27 @@ class Ip {
virtual bool isAnyAddress() const PURE;

/**
* @return whether this address is a valid unicast address, i.e., not an wild card, broadcast, or
* multicast address.
* @return whether this address is a valid unicast address, i.e., not an wild
* card, broadcast, or multicast address.
*/
virtual bool isUnicastAddress() const PURE;

/**
* @return Ipv4 address data IFF version() == IpVersion::v4, otherwise nullptr.
* @return Ipv4 address data IFF version() == IpVersion::v4, otherwise
* nullptr.
*/
virtual const Ipv4* ipv4() const PURE;

/**
* @return Ipv6 address data IFF version() == IpVersion::v6, otherwise nullptr.
* @return Ipv6 address data IFF version() == IpVersion::v6, otherwise
* nullptr.
*/
virtual const Ipv6* ipv6() const PURE;

/**
* @return the port associated with the address. Port may be zero if not specified, not
* determinable before socket creation, or not applicable.
* The port is in host byte order.
* @return the port associated with the address. Port may be zero if not
* specified, not determinable before socket creation, or not applicable. The
* port is in host byte order.
*/
virtual uint32_t port() const PURE;

Expand All @@ -114,7 +117,17 @@ class Pipe {
virtual mode_t mode() const PURE;
};

enum class Type { Ip, Pipe };
class EnvoyInternalAddress {
public:
virtual ~EnvoyInternalAddress() = default;

/**
* @return an listener name as the peer address.
*/
virtual const std::string& listenerName() const PURE;
};

enum class Type { Ip, Pipe, EnvoyInternal };

/**
* Interface for all network addresses.
Expand Down Expand Up @@ -153,15 +166,23 @@ class Instance {
virtual const std::string& logicalName() const PURE;

/**
* @return the IP address information IFF type() == Type::Ip, otherwise nullptr.
* @return the IP address information IFF type() == Type::Ip, otherwise
* nullptr.
*/
virtual const Ip* ip() const PURE;

/**
* @return the pipe address information IFF type() == Type::Pipe, otherwise nullptr.
* @return the pipe address information IFF type() == Type::Pipe, otherwise
* nullptr.
*/
virtual const Pipe* pipe() const PURE;

/**
* @return the envoy internal address information IFF type() ==
* Type::EnvoyInternal, otherwise nullptr.
*/
virtual const EnvoyInternalAddress* envoyInternalAddress() const PURE;

/**
* @return the underlying structure wherein the address is stored
*/
Expand Down
6 changes: 6 additions & 0 deletions include/envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include "envoy/network/listen_socket.h"
#include "envoy/stats/scope.h"

#include "common/common/macros.h"

namespace Envoy {
namespace Network {

Expand Down Expand Up @@ -169,6 +171,10 @@ class ListenerCallbacks {
*/
virtual void onAccept(ConnectionSocketPtr&& socket) PURE;

virtual void setupNewConnection(ConnectionPtr server_conn, ConnectionSocketPtr socket) {
UNREFERENCED_PARAMETER(server_conn);
UNREFERENCED_PARAMETER(socket);
}
/**
* Called when a new connection is rejected.
*/
Expand Down
71 changes: 71 additions & 0 deletions include/envoy/network/transport_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
#include "envoy/network/proxy_protocol.h"
#include "envoy/ssl/connection.h"

#include "common/common/assert.h"
#include "common/common/logger.h"

#include "absl/types/optional.h"

namespace Envoy {
Expand Down Expand Up @@ -98,6 +101,20 @@ class TransportSocketCallbacks {
*/
class TransportSocket {
public:
bool see_write_end_stream_{false};

void lambdaiCheckSeeEndStream(bool end_stream) {
if (see_write_end_stream_) {
RELEASE_ASSERT(end_stream,
"once end_stream is seen, the follow up write must provide end_stream = true");
} else {
if (end_stream) {
see_write_end_stream_ = true;
ENVOY_LOG_MISC(debug, "lambdai: first see end_stream in this buffer socket");
}
}
}

virtual ~TransportSocket() = default;

/**
Expand Down Expand Up @@ -239,5 +256,59 @@ class TransportSocketFactory {

using TransportSocketFactoryPtr = std::unique_ptr<TransportSocketFactory>;

class WritablePeer {
public:
virtual ~WritablePeer() = default;

/**
* Set the flag to indicate no further write.
*/
virtual void setWriteEnd() PURE;

/**
* Notify that consumable data arrives. The consumable data can be either data to read, or the end
* of stream event.
*/
virtual void maybeSetNewData() PURE;

/**
* @return the buffer to be written.
*/
virtual Buffer::Instance* getWriteBuffer() PURE;

/**
* @return false more data is acceptable.
*/
virtual bool isOverHighWatermark() const PURE;

virtual bool triggeredHighToLowWatermark() const PURE;
virtual void clearTriggeredHighToLowWatermark() PURE;
virtual void setTriggeredHighToLowWatermark() PURE;
};

class ReadableSource {
public:
virtual ~ReadableSource() = default;

/**
* Read the flag to indicate no further write. Used by early close detection.
*/
virtual bool isPeerShutDownWrite() const PURE;

virtual bool isOverHighWatermark() const PURE;
virtual bool isReadable() const PURE;
};

class EventSchedulable {
public:
virtual ~EventSchedulable() = default;

virtual void scheduleNextEvent() PURE;

virtual void scheduleWriteEvent() PURE;
virtual void scheduleReadEvent() PURE;
virtual void scheduleClosedEvent() PURE;
};

} // namespace Network
} // namespace Envoy
Loading