|  | 
|  | 1 | +// | 
|  | 2 | +// | 
|  | 3 | +// Tencent is pleased to support the open source community by making tRPC available. | 
|  | 4 | +// | 
|  | 5 | +// Copyright (C) 2023 Tencent. | 
|  | 6 | +// All rights reserved. | 
|  | 7 | +// | 
|  | 8 | +// If you have downloaded a copy of the tRPC source code from Tencent, | 
|  | 9 | +// please note that tRPC source code is licensed under the  Apache 2.0 License, | 
|  | 10 | +// A copy of the Apache 2.0 License is included in this file. | 
|  | 11 | +// | 
|  | 12 | +//#include <iostream> | 
|  | 13 | +#include <string> | 
|  | 14 | +#include <memory> | 
|  | 15 | +#include <chrono> | 
|  | 16 | + | 
|  | 17 | +#include "gflags/gflags.h" | 
|  | 18 | +#include "trpc/client/make_client_context.h" | 
|  | 19 | +#include "trpc/client/trpc_client.h" | 
|  | 20 | +#include "trpc/client/sse/http_sse_proxy.h" | 
|  | 21 | +#include "trpc/common/config/trpc_config.h" | 
|  | 22 | +#include "trpc/common/runtime_manager.h" | 
|  | 23 | +#include "trpc/common/status.h" | 
|  | 24 | +#include "trpc/coroutine/fiber.h" | 
|  | 25 | +#include "trpc/util/log/logging.h" | 
|  | 26 | +#include "trpc/util/http/sse/sse_event.h" | 
|  | 27 | + | 
|  | 28 | +DEFINE_string(service_name, "sse_client", "callee service name"); | 
|  | 29 | +DEFINE_string(client_config, "trpc_cpp_fiber.yaml", "path to client config"); | 
|  | 30 | +DEFINE_string(addr, "127.0.0.1:24856", "server ip:port"); | 
|  | 31 | +DEFINE_string(path, "/sse/test", "SSE URL path"); | 
|  | 32 | + | 
|  | 33 | +namespace http::sse_demo { | 
|  | 34 | + | 
|  | 35 | +using HttpSseProxyPtr = std::shared_ptr<::trpc::HttpSseProxy>; | 
|  | 36 | + | 
|  | 37 | +// Callback-based SSE client using streaming approach | 
|  | 38 | +bool StartSseClient(const HttpSseProxyPtr& proxy) { | 
|  | 39 | +    std::string url = "http://" + FLAGS_addr + FLAGS_path; | 
|  | 40 | +    TRPC_FMT_INFO("StartSseClient connecting to {}", url); | 
|  | 41 | + | 
|  | 42 | +    auto ctx = ::trpc::MakeClientContext(proxy); | 
|  | 43 | +     | 
|  | 44 | +    // Set very long timeout for the context | 
|  | 45 | +    ctx->SetTimeout(120000); // 120 seconds | 
|  | 46 | + | 
|  | 47 | +    // Create the reader | 
|  | 48 | +    auto reader = proxy->Get(ctx, url); | 
|  | 49 | +    if (!reader.IsValid()) { | 
|  | 50 | +        TRPC_FMT_ERROR("Failed to create SSE stream reader"); | 
|  | 51 | +        return false; | 
|  | 52 | +    } | 
|  | 53 | + | 
|  | 54 | +    // Event callback | 
|  | 55 | +    auto event_callback = [](const ::trpc::http::sse::SseEvent& event) { | 
|  | 56 | +        std::string id_str = event.id.has_value() ? event.id.value() : ""; | 
|  | 57 | +        TRPC_FMT_INFO("Received SSE event - id: {}, event: {}, data: {}", | 
|  | 58 | +                      id_str, event.event_type, event.data); | 
|  | 59 | +    }; | 
|  | 60 | + | 
|  | 61 | +    // Start streaming with the new non-blocking approach | 
|  | 62 | +    ::trpc::Status status = reader.StartStreaming(event_callback, 30000); // 30 second timeout for reads | 
|  | 63 | +    if (!status.OK()) { | 
|  | 64 | +        TRPC_FMT_ERROR("Failed to start SSE streaming: {}", status.ToString()); | 
|  | 65 | +        return false; | 
|  | 66 | +    } | 
|  | 67 | + | 
|  | 68 | +    TRPC_FMT_INFO("SSE client started successfully with streaming (callback-based)"); | 
|  | 69 | +     | 
|  | 70 | +    // Wait for events to be received | 
|  | 71 | +    ::trpc::FiberSleepFor(std::chrono::seconds(15)); // Wait for callback to print events | 
|  | 72 | + | 
|  | 73 | +    return true; | 
|  | 74 | +} | 
|  | 75 | + | 
|  | 76 | +// Manual read SSE client - using streaming approach | 
|  | 77 | +bool GetSseClient(const HttpSseProxyPtr& proxy) { | 
|  | 78 | +    std::string url = "http://" + FLAGS_addr + FLAGS_path; | 
|  | 79 | +    TRPC_FMT_INFO("GetSseClient connecting to {}", url); | 
|  | 80 | +    TRPC_FMT_DEBUG("Fiber Scheduler running: {}", ::trpc::IsRunningInFiberWorker()); | 
|  | 81 | + | 
|  | 82 | +    auto ctx = ::trpc::MakeClientContext(proxy); | 
|  | 83 | +     | 
|  | 84 | +    // Set very long timeout for the context | 
|  | 85 | +    ctx->SetTimeout(120000); // 120 seconds | 
|  | 86 | + | 
|  | 87 | +    // Create the reader | 
|  | 88 | +    ::trpc::HttpSseStreamReader reader = proxy->Get(ctx, url); | 
|  | 89 | +    if (!reader.IsValid()) { | 
|  | 90 | +        TRPC_FMT_ERROR("Failed to create SSE stream reader"); | 
|  | 91 | +        return false; | 
|  | 92 | +    } | 
|  | 93 | + | 
|  | 94 | +    // For manual reading, we'll use the streaming approach but with a different pattern | 
|  | 95 | +    TRPC_FMT_INFO("Using streaming approach for manual SSE reading"); | 
|  | 96 | +     | 
|  | 97 | +    // We'll use a flag to control the reading loop | 
|  | 98 | +    bool should_continue = true; | 
|  | 99 | +    int event_count = 0; | 
|  | 100 | +    const int max_events = 10; | 
|  | 101 | +     | 
|  | 102 | +    // Start streaming with a callback that stores events | 
|  | 103 | +    auto event_callback = [&should_continue, &event_count, max_events](const ::trpc::http::sse::SseEvent& event) { | 
|  | 104 | +        std::string id_str = event.id.has_value() ? event.id.value() : ""; | 
|  | 105 | +        TRPC_FMT_INFO("Received SSE event - id: {}, event: {}, data: {}", | 
|  | 106 | +                      id_str, event.event_type, event.data); | 
|  | 107 | +         | 
|  | 108 | +        event_count++; | 
|  | 109 | +        if (event_count >= max_events) { | 
|  | 110 | +            should_continue = false; | 
|  | 111 | +        } | 
|  | 112 | +    }; | 
|  | 113 | + | 
|  | 114 | +    // Start streaming | 
|  | 115 | +    ::trpc::Status status = reader.StartStreaming(event_callback, 30000); // 30 second timeout for reads | 
|  | 116 | +    if (!status.OK()) { | 
|  | 117 | +        TRPC_FMT_ERROR("Failed to start SSE streaming: {}", status.ToString()); | 
|  | 118 | +        return false; | 
|  | 119 | +    } | 
|  | 120 | + | 
|  | 121 | +    TRPC_FMT_INFO("SSE streaming started successfully (manual reading)"); | 
|  | 122 | +     | 
|  | 123 | +    // Wait for events to be received | 
|  | 124 | +    for (int i = 0; i < 15 && should_continue; i++) { | 
|  | 125 | +        ::trpc::FiberSleepFor(std::chrono::seconds(1)); | 
|  | 126 | +    } | 
|  | 127 | + | 
|  | 128 | +    return true; | 
|  | 129 | +} | 
|  | 130 | + | 
|  | 131 | +int Run() { | 
|  | 132 | +    bool final_ok = true; | 
|  | 133 | + | 
|  | 134 | +    ::trpc::ServiceProxyOption option; | 
|  | 135 | +    option.name = FLAGS_service_name; | 
|  | 136 | +    option.codec_name = "http"; | 
|  | 137 | +    option.network = "tcp"; | 
|  | 138 | +    option.conn_type = "long";  // Long connection | 
|  | 139 | +    option.timeout = 180000;    // 180 seconds timeout | 
|  | 140 | +    option.selector_name = "direct"; | 
|  | 141 | +    option.target = FLAGS_addr; | 
|  | 142 | + | 
|  | 143 | +    auto sse_client = ::trpc::GetTrpcClient()->GetProxy<::trpc::HttpSseProxy>(FLAGS_service_name, option); | 
|  | 144 | + | 
|  | 145 | +    TRPC_FMT_INFO("Testing SSE client with Start API (callback-based)"); | 
|  | 146 | +    if (!StartSseClient(sse_client)) final_ok = false; | 
|  | 147 | + | 
|  | 148 | +    ::trpc::FiberSleepFor(std::chrono::seconds(3)); | 
|  | 149 | + | 
|  | 150 | +    TRPC_FMT_INFO("Testing SSE client with Get API (manual reading)"); | 
|  | 151 | +    if (!GetSseClient(sse_client)) final_ok = false; | 
|  | 152 | + | 
|  | 153 | +    std::cout << "Final SSE result: " << final_ok << std::endl; | 
|  | 154 | +    return final_ok ? 0 : -1; | 
|  | 155 | +} | 
|  | 156 | + | 
|  | 157 | +}  // namespace http::sse_demo | 
|  | 158 | + | 
|  | 159 | +void ParseClientConfig(int argc, char* argv[]) { | 
|  | 160 | +    google::ParseCommandLineFlags(&argc, &argv, true); | 
|  | 161 | +    google::CommandLineFlagInfo info; | 
|  | 162 | +    if (GetCommandLineFlagInfo("client_config", &info) && info.is_default) { | 
|  | 163 | +        std::cerr << "start client with client_config, for example: " << argv[0] | 
|  | 164 | +                  << " --client_config=/client/client_config/filepath" << std::endl; | 
|  | 165 | +        exit(-1); | 
|  | 166 | +    } | 
|  | 167 | +    std::cout << "FLAGS_service_name: " << FLAGS_service_name << std::endl; | 
|  | 168 | +    std::cout << "FLAGS_client_config: " << FLAGS_client_config << std::endl; | 
|  | 169 | +    std::cout << "FLAGS_addr: " << FLAGS_addr << std::endl; | 
|  | 170 | +    std::cout << "FLAGS_path: " << FLAGS_path << std::endl; | 
|  | 171 | +} | 
|  | 172 | + | 
|  | 173 | +int main(int argc, char* argv[]) { | 
|  | 174 | +    ParseClientConfig(argc, argv); | 
|  | 175 | + | 
|  | 176 | +    if (::trpc::TrpcConfig::GetInstance()->Init(FLAGS_client_config) != 0) { | 
|  | 177 | +        std::cerr << "load client_config failed." << std::endl; | 
|  | 178 | +        return -1; | 
|  | 179 | +    } | 
|  | 180 | + | 
|  | 181 | +    return ::trpc::RunInTrpcRuntime([]() { return http::sse_demo::Run(); }); | 
|  | 182 | +} | 
0 commit comments