Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions library/cc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ envoy_cc_library(
"bridge_utility.cc",
"engine.cc",
"engine_builder.cc",
"engine_callbacks.cc",
"headers.cc",
"headers_builder.cc",
"log_level.cc",
Expand All @@ -33,6 +34,7 @@ envoy_cc_library(
"bridge_utility.h",
"engine.h",
"engine_builder.h",
"engine_callbacks.h",
"envoy_error.h",
"headers.h",
"headers_builder.h",
Expand Down
23 changes: 9 additions & 14 deletions library/cc/engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,17 @@
namespace Envoy {
namespace Platform {

Engine::Engine(envoy_engine_t engine, const std::string& configuration, LogLevel log_level)
: engine_(engine), terminated_(false) {
run_engine(this->engine_, configuration.c_str(), logLevelToString(log_level).c_str());

this->stream_client_ = std::make_shared<StreamClient>(this->engine_);
this->pulse_client_ = std::make_shared<PulseClient>();
}

Engine::~Engine() {
if (!this->terminated_) {
terminate_engine(this->engine_);
}
Engine::Engine(envoy_engine_t engine) : engine_(engine), terminated_(false) {}

// we lazily construct the stream and pulse clients
// because they either require or will require a weak ptr
// which can't be provided from inside of the constructor
// because of how std::enable_shared_from_this works
StreamClientSharedPtr Engine::streamClient() {
return std::make_shared<StreamClient>(this->shared_from_this());
}

StreamClientSharedPtr Engine::streamClient() { return this->stream_client_; }
PulseClientSharedPtr Engine::pulseClient() { return this->pulse_client_; }
PulseClientSharedPtr Engine::pulseClient() { return std::make_shared<PulseClient>(); }

void Engine::terminate() {
if (this->terminated_) {
Expand Down
19 changes: 7 additions & 12 deletions library/cc/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,25 @@
namespace Envoy {
namespace Platform {

struct EngineCallbacks {
std::function<void()> on_engine_running;
// unused:
// std::function<void()> on_exit;
};

using EngineCallbacksSharedPtr = std::shared_ptr<EngineCallbacks>;
class StreamClient;
using StreamClientSharedPtr = std::shared_ptr<StreamClient>;

class Engine {
class Engine : public std::enable_shared_from_this<Engine> {
public:
~Engine();

StreamClientSharedPtr streamClient();
PulseClientSharedPtr pulseClient();

void terminate();

private:
Engine(envoy_engine_t engine, const std::string& configuration, LogLevel log_level);
Engine(envoy_engine_t engine);

// required to access private constructor
friend class EngineBuilder;
// required to use envoy_engine_t without exposing it publicly
friend class StreamPrototype;

envoy_engine_t engine_;
EngineCallbacksSharedPtr callbacks_;
StreamClientSharedPtr stream_client_;
PulseClientSharedPtr pulse_client_;
bool terminated_;
Expand Down
33 changes: 8 additions & 25 deletions library/cc/engine_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,6 @@
namespace Envoy {
namespace Platform {

namespace {

void c_on_engine_running(void* context) {
EngineCallbacks* engine_callbacks = static_cast<EngineCallbacks*>(context);
engine_callbacks->on_engine_running();
}

void c_on_exit(void* context) {
// NOTE: this function is intentionally empty
// as we don't actually do any post-processing on exit.
(void)context;
}

} // namespace

EngineBuilder::EngineBuilder(std::string config_template) : config_template_(config_template) {}
EngineBuilder::EngineBuilder() : EngineBuilder(std::string(config_template)) {}

Expand Down Expand Up @@ -120,16 +105,14 @@ EngineSharedPtr EngineBuilder::build() {
.release = envoy_noop_const_release,
.context = nullptr,
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would need to verify, but by convention, it should be safe to simply calloc this (or any other envoy mobile struct) if you're not going to use it.


envoy_engine_callbacks envoy_callbacks{
.on_engine_running = &c_on_engine_running,
.on_exit = &c_on_exit,
.context = this->callbacks_.get(),
};

Engine* engine =
new Engine(init_engine(envoy_callbacks, null_logger), config_str, this->log_level_);
return EngineSharedPtr(engine);
auto envoy_engine = init_engine(this->callbacks_->asEnvoyEngineCallbacks(), null_logger);
run_engine(envoy_engine, config_str.c_str(), logLevelToString(this->log_level_).c_str());

// we can't construct via std::make_shared
// because Engine is only constructible as a friend
Engine* engine = new Engine(envoy_engine);
auto engine_ptr = EngineSharedPtr(engine);
return engine_ptr;
}

} // namespace Platform
Expand Down
1 change: 1 addition & 0 deletions library/cc/engine_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <string>

#include "engine.h"
#include "engine_callbacks.h"
#include "log_level.h"

namespace Envoy {
Expand Down
29 changes: 29 additions & 0 deletions library/cc/engine_callbacks.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include "engine_callbacks.h"

namespace Envoy {
namespace Platform {

namespace {

void c_on_engine_running(void* context) {
auto engine_callbacks = *static_cast<EngineCallbacksSharedPtr*>(context);
engine_callbacks->on_engine_running();
}

void c_on_exit(void* context) {
auto engine_callbacks_ptr = static_cast<EngineCallbacksSharedPtr*>(context);
delete engine_callbacks_ptr;
}

} // namespace

envoy_engine_callbacks EngineCallbacks::asEnvoyEngineCallbacks() {
return envoy_engine_callbacks{
.on_engine_running = &c_on_engine_running,
.on_exit = &c_on_exit,
.context = new EngineCallbacksSharedPtr(this->shared_from_this()),
};
}

} // namespace Platform
} // namespace Envoy
23 changes: 23 additions & 0 deletions library/cc/engine_callbacks.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#pragma once

#include <functional>
#include <memory>

#include "engine.h"
#include "library/common/types/c_types.h"

namespace Envoy {
namespace Platform {

struct EngineCallbacks : public std::enable_shared_from_this<EngineCallbacks> {
std::function<void()> on_engine_running;
// unused:
// std::function<void()> on_exit;

envoy_engine_callbacks asEnvoyEngineCallbacks();
};

using EngineCallbacksSharedPtr = std::shared_ptr<EngineCallbacks>;

} // namespace Platform
} // namespace Envoy
5 changes: 1 addition & 4 deletions library/cc/stream.cc
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
#include "stream.h"

#include <iostream>

#include "bridge_utility.h"
#include "library/common/main_interface.h"
#include "library/common/types/c_types.h"

namespace Envoy {
namespace Platform {

Stream::Stream(envoy_stream_t handle, StreamCallbacksSharedPtr callbacks)
: handle_(handle), callbacks_(callbacks) {}
Stream::Stream(envoy_stream_t handle) : handle_(handle) {}

Stream& Stream::sendHeaders(RequestHeadersSharedPtr headers, bool end_stream) {
envoy_headers raw_headers = rawHeaderMapAsEnvoyHeaders(headers->allHeaders());
Expand Down
6 changes: 4 additions & 2 deletions library/cc/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
namespace Envoy {
namespace Platform {

class Engine;
using EngineSharedPtr = std::shared_ptr<Engine>;

class Stream {
public:
Stream(envoy_stream_t handle, StreamCallbacksSharedPtr callbacks);
Stream(envoy_stream_t handle);

Stream& sendHeaders(RequestHeadersSharedPtr headers, bool end_stream);
Stream& sendData(envoy_data data);
Expand All @@ -22,7 +25,6 @@ class Stream {

private:
envoy_stream_t handle_;
StreamCallbacksSharedPtr callbacks_;
};

using StreamSharedPtr = std::shared_ptr<Stream>;
Expand Down
26 changes: 16 additions & 10 deletions library/cc/stream_callbacks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Platform {
namespace {

void* c_on_headers(envoy_headers headers, bool end_stream, void* context) {
auto stream_callbacks = static_cast<StreamCallbacks*>(context);
auto stream_callbacks = *static_cast<StreamCallbacksSharedPtr*>(context);
if (stream_callbacks->on_headers.has_value()) {
auto raw_headers = envoyHeadersAsRawHeaderMap(headers);
ResponseHeadersBuilder builder;
Expand All @@ -27,7 +27,7 @@ void* c_on_headers(envoy_headers headers, bool end_stream, void* context) {
}

void* c_on_data(envoy_data data, bool end_stream, void* context) {
auto stream_callbacks = static_cast<StreamCallbacks*>(context);
auto stream_callbacks = *static_cast<StreamCallbacksSharedPtr*>(context);
if (stream_callbacks->on_data.has_value()) {
auto on_data = stream_callbacks->on_data.value();
on_data(data, end_stream);
Expand All @@ -36,7 +36,7 @@ void* c_on_data(envoy_data data, bool end_stream, void* context) {
}

void* c_on_trailers(envoy_headers metadata, void* context) {
auto stream_callbacks = static_cast<StreamCallbacks*>(context);
auto stream_callbacks = *static_cast<StreamCallbacksSharedPtr*>(context);
if (stream_callbacks->on_trailers.has_value()) {
auto raw_headers = envoyHeadersAsRawHeaderMap(metadata);
ResponseTrailersBuilder builder;
Expand All @@ -50,7 +50,8 @@ void* c_on_trailers(envoy_headers metadata, void* context) {
}

void* c_on_error(envoy_error raw_error, void* context) {
auto stream_callbacks = static_cast<StreamCallbacks*>(context);
auto stream_callbacks_ptr = static_cast<StreamCallbacksSharedPtr*>(context);
auto stream_callbacks = *stream_callbacks_ptr;
if (stream_callbacks->on_error.has_value()) {
EnvoyErrorSharedPtr error = std::make_shared<EnvoyError>();
error->error_code = raw_error.error_code;
Expand All @@ -61,25 +62,30 @@ void* c_on_error(envoy_error raw_error, void* context) {
auto on_error = stream_callbacks->on_error.value();
on_error(error);
}
return context;
delete stream_callbacks_ptr;
return nullptr;
}

void* c_on_complete(void* context) {
auto stream_callbacks = static_cast<StreamCallbacks*>(context);
auto stream_callbacks_ptr = static_cast<StreamCallbacksSharedPtr*>(context);
auto stream_callbacks = *stream_callbacks_ptr;
if (stream_callbacks->on_complete.has_value()) {
auto on_complete = stream_callbacks->on_complete.value();
on_complete();
}
return context;
delete stream_callbacks_ptr;
return nullptr;
}

void* c_on_cancel(void* context) {
auto stream_callbacks = static_cast<StreamCallbacks*>(context);
auto stream_callbacks_ptr = static_cast<StreamCallbacksSharedPtr*>(context);
auto stream_callbacks = *stream_callbacks_ptr;
if (stream_callbacks->on_cancel.has_value()) {
auto on_cancel = stream_callbacks->on_cancel.value();
on_cancel();
}
return context;
delete stream_callbacks_ptr;
return nullptr;
}

} // namespace
Expand All @@ -93,7 +99,7 @@ envoy_http_callbacks StreamCallbacks::asEnvoyHttpCallbacks() {
.on_error = &c_on_error,
.on_complete = &c_on_complete,
.on_cancel = &c_on_cancel,
.context = this,
.context = new StreamCallbacksSharedPtr(this->shared_from_this()),
};
}

Expand Down
6 changes: 5 additions & 1 deletion library/cc/stream_callbacks.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,22 @@
#include "library/common/types/c_types.h"
#include "response_headers.h"
#include "response_trailers.h"
#include "stream.h"

namespace Envoy {
namespace Platform {

class Stream;
using StreamSharedPtr = std::shared_ptr<Stream>;

using OnHeadersCallback = std::function<void(ResponseHeadersSharedPtr headers, bool end_stream)>;
using OnDataCallback = std::function<void(envoy_data data, bool end_stream)>;
using OnTrailersCallback = std::function<void(ResponseTrailersSharedPtr trailers)>;
using OnErrorCallback = std::function<void(EnvoyErrorSharedPtr error)>;
using OnCompleteCallback = std::function<void()>;
using OnCancelCallback = std::function<void()>;

struct StreamCallbacks {
struct StreamCallbacks : public std::enable_shared_from_this<StreamCallbacks> {
absl::optional<OnHeadersCallback> on_headers;
absl::optional<OnDataCallback> on_data;
absl::optional<OnTrailersCallback> on_trailers;
Expand Down
2 changes: 1 addition & 1 deletion library/cc/stream_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace Envoy {
namespace Platform {

StreamClient::StreamClient(envoy_engine_t engine) : engine_(engine) {}
StreamClient::StreamClient(EngineSharedPtr engine) : engine_(engine) {}

StreamPrototypeSharedPtr StreamClient::newStreamPrototype() {
return std::make_shared<StreamPrototype>(this->engine_);
Expand Down
11 changes: 9 additions & 2 deletions library/cc/stream_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,26 @@

#include <memory>

#include "engine.h"
#include "stream_prototype.h"

namespace Envoy {
namespace Platform {

class Engine;
using EngineSharedPtr = std::shared_ptr<Engine>;

class StreamPrototype;
using StreamPrototypeSharedPtr = std::shared_ptr<StreamPrototype>;

class StreamClient {
public:
StreamClient(envoy_engine_t engine);
StreamClient(EngineSharedPtr engine);

StreamPrototypeSharedPtr newStreamPrototype();

private:
envoy_engine_t engine_;
EngineSharedPtr engine_;
};

using StreamClientSharedPtr = std::shared_ptr<StreamClient>;
Expand Down
9 changes: 4 additions & 5 deletions library/cc/stream_prototype.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
namespace Envoy {
namespace Platform {

StreamPrototype::StreamPrototype(envoy_engine_t engine) : engine_(engine) {
StreamPrototype::StreamPrototype(EngineSharedPtr engine) : engine_(engine) {
this->callbacks_ = std::make_shared<StreamCallbacks>();
}

StreamSharedPtr StreamPrototype::start() {
auto stream = init_stream(this->engine_);
start_stream(stream, this->callbacks_->asEnvoyHttpCallbacks());

return std::make_shared<Stream>(stream, this->callbacks_);
auto envoy_stream = init_stream(this->engine_->engine_);
start_stream(envoy_stream, this->callbacks_->asEnvoyHttpCallbacks());
return std::make_shared<Stream>(envoy_stream);
}

StreamPrototype& StreamPrototype::setOnHeaders(OnHeadersCallback closure) {
Expand Down
Loading