Skip to content

Commit

Permalink
grpc-online-streaming (#506)
Browse files Browse the repository at this point in the history
* add grpc submodel

* common change

* change for grpc

* compile ok

* fix bug on grpc

* remove io thread, and fix code format

* fix bug

* code style

* code style

* fix bug

* Update sherpa/cpp_api/grpc/online-grpc-client-impl.cc

Co-authored-by: Fangjun Kuang <[email protected]>

* Update sherpa/cpp_api/grpc/online-grpc-client-impl.h

Co-authored-by: Fangjun Kuang <[email protected]>

* Update sherpa/cpp_api/grpc/online-grpc-client.cc

Co-authored-by: Fangjun Kuang <[email protected]>

* Update sherpa/cpp_api/grpc/online-grpc-client.cc

Co-authored-by: Fangjun Kuang <[email protected]>

* Update sherpa/cpp_api/grpc/online-grpc-client.cc

Co-authored-by: Fangjun Kuang <[email protected]>

* Update sherpa/cpp_api/grpc/online-grpc-client.cc

Co-authored-by: Fangjun Kuang <[email protected]>

* Update sherpa/cpp_api/grpc/online-grpc-client.cc

Co-authored-by: Fangjun Kuang <[email protected]>

* Update sherpa/cpp_api/grpc/online-grpc-server-impl.cc

Co-authored-by: Fangjun Kuang <[email protected]>

* code style and modify client input

* fix bug

* Update sherpa/cpp_api/grpc/online-grpc-client.cc

Co-authored-by: Fangjun Kuang <[email protected]>

* Apply suggestions from code review

Co-authored-by: Fangjun Kuang <[email protected]>

* minor change

* minor change

---------

Co-authored-by: Fangjun Kuang <[email protected]>
  • Loading branch information
y00281951 and csukuangfj authored Jan 12, 2024
1 parent 62f9697 commit 35748c6
Show file tree
Hide file tree
Showing 11 changed files with 1,112 additions and 1 deletion.
11 changes: 10 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ set(CMAKE_CXX_EXTENSIONS OFF)
option(SHERPA_ENABLE_TESTS "Whether to build tests" OFF)
option(SHERPA_ENABLE_PORTAUDIO "Whether to build with portaudio" ON)
option(SHERPA_ENABLE_WEBSOCKET "Whether to build with websocket" ON)
option(SHERPA_ENABLE_GRPC "Whether to build with grpc" OFF)
option(BUILD_SHARED_LIBS "Whether to build shared libraries" ON)

message(STATUS "SHERPA_ENABLE_TESTS: ${SHERPA_ENABLE_TESTS}")
message(STATUS "SHERPA_ENABLE_PORTAUDIO: ${SHERPA_ENABLE_PORTAUDIO}")
message(STATUS "SHERPA_ENABLE_WEBSOCKET: ${SHERPA_ENABLE_WEBSOCKET}")
message(STATUS "SHERPA_ENABLE_GRPC: ${SHERPA_ENABLE_GRPC}")

if(BUILD_SHARED_LIBS AND MSVC)
set(CMAKE_WINDOWS_EXPORT_ALL_SYMBOLS ON)
Expand Down Expand Up @@ -147,11 +149,18 @@ if(SHERPA_ENABLE_PORTAUDIO)
include(portaudio)
endif()

if(SHERPA_ENABLE_WEBSOCKET)
if(SHERPA_ENABLE_WEBSOCKET OR SHERPA_ENABLE_GRPC)
include(asio)
endif()

if(SHERPA_ENABLE_WEBSOCKET)
include(websocketpp)
endif()

if(SHERPA_ENABLE_GRPC)
include(grpc)
endif()

if(SHERPA_ENABLE_TESTS)
enable_testing()
include(googletest)
Expand Down
17 changes: 17 additions & 0 deletions cmake/grpc.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
function(download_grpc)
message(STATUS "Using gRPC via add_subdirectory")
include(FetchContent)
#SET(CMAKE_CXX_FLAGS "-DBUILD_SHARED_LIBS=ON")

set(ABSL_ENABLE_INSTALL ON)
FetchContent_Declare(gRPC
GIT_REPOSITORY https://github.com/grpc/grpc
GIT_TAG v1.57.0
)
set(FETCHCONTENT_QUIET OFF)
FetchContent_MakeAvailable(gRPC)

message(STATUS "grpc is downloaded to ${grpc_SOURCE_DIR}")
endfunction()

download_grpc()
4 changes: 4 additions & 0 deletions sherpa/cpp_api/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,7 @@ add_subdirectory(bin)
if(SHERPA_ENABLE_WEBSOCKET)
add_subdirectory(websocket)
endif()

if(SHERPA_ENABLE_GRPC)
add_subdirectory(grpc)
endif()
117 changes: 117 additions & 0 deletions sherpa/cpp_api/grpc/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
add_definitions(-DASIO_STANDALONE)

# compile sherpo.proto
set(PROTO_DIR "${CMAKE_CURRENT_BINARY_DIR}")
set(PROTO_IN "${CMAKE_CURRENT_SOURCE_DIR}")
set(grpc_BINARY_DIR ${CMAKE_RUNTIME_OUTPUT_DIRECTORY})
set(grpc_LIBRARY_DIR ${CMAKE_LIBRARY_OUTPUT_DIRECTORY})
include_directories(${CMAKE_BINARY_DIR})
add_custom_command(
OUTPUT ${PROTO_DIR}/sherpa.pb.cc
${PROTO_DIR}/sherpa.pb.h
${PROTO_DIR}/sherpa.grpc.pb.cc
${PROTO_DIR}/sherpa.grpc.pb.h
COMMAND ${grpc_BINARY_DIR}/protoc
ARGS --grpc_out "${PROTO_DIR}"
--cpp_out "${PROTO_DIR}"
-I "${PROTO_IN}"
--plugin=protoc-gen-grpc=${grpc_BINARY_DIR}/grpc_cpp_plugin
sherpa.proto)

add_executable(sherpa-online-grpc-server
online-grpc-server.cc
online-grpc-server-impl.cc
${PROTO_DIR}/sherpa.pb.cc
${PROTO_DIR}/sherpa.grpc.pb.cc
)
target_link_libraries(sherpa-online-grpc-server sherpa_cpp_api grpc++ grpc++_reflection)

if(NOT WIN32)
target_link_libraries(sherpa-online-grpc-server -pthread)
target_compile_options(sherpa-online-grpc-server PRIVATE -Wno-deprecated-declarations)
endif()

add_executable(sherpa-online-grpc-client
online-grpc-client.cc
online-grpc-client-impl.cc
${PROTO_DIR}/sherpa.pb.cc
${PROTO_DIR}/sherpa.grpc.pb.cc
)

target_link_libraries(sherpa-online-grpc-client
sherpa_core
kaldi_native_io_core
grpc++
grpc++_reflection
)

if(NOT WIN32)
target_link_libraries(sherpa-online-grpc-client -pthread)
endif()

set(bins
sherpa-online-grpc-server
sherpa-online-grpc-client
)

if(NOT WIN32)
if(NOT DEFINED ENV{VIRTUAL_ENV})
message(STATUS "Outside a virtual environment")
execute_process(
COMMAND "${PYTHON_EXECUTABLE}" -c "import site; print(';'.join(site.getsitepackages()))"
OUTPUT_STRIP_TRAILING_WHITESPACE
OUTPUT_VARIABLE path_list
)
else()
message(STATUS "Inside a virtual environment")
execute_process(
COMMAND "${PYTHON_EXECUTABLE}" -c "from distutils.sysconfig import get_python_lib; print(get_python_lib())"
OUTPUT_STRIP_TRAILING_WHITESPACE
OUTPUT_VARIABLE PYTHON_SITE_PACKAGE_DIR
)
set(path_list ${PYTHON_SITE_PACKAGE_DIR})
endif()

message(STATUS "path list: ${path_list}")
foreach(p IN LISTS path_list)
foreach(exe IN LISTS bins)
target_link_libraries(${exe} "-Wl,-rpath,${p}/sherpa/lib")
target_link_libraries(${exe} "-Wl,-rpath,${p}/../lib")
endforeach()
endforeach()

foreach(exe IN LISTS bins)
target_link_libraries(${exe} "-Wl,-rpath,${SHERPA_RPATH_ORIGIN}/../lib")
endforeach()

# add additional paths
set(additional_paths
${SHERPA_RPATH_ORIGIN}/../lib/python${PYTHON_VERSION_MAJOR}.${PYTHON_VERSION_MINOR}/site-packages/torch/lib
${SHERPA_RPATH_ORIGIN}/../lib/python${PYTHON_VERSION_MAJOR}.${PYTHON_VERSION_MINOR}/site-packages/torch/lib64
${SHERPA_RPATH_ORIGIN}/../lib/python${PYTHON_VERSION_MAJOR}.${PYTHON_VERSION_MINOR}/site-packages/k2/lib
${SHERPA_RPATH_ORIGIN}/../lib/python${PYTHON_VERSION_MAJOR}.${PYTHON_VERSION_MINOR}/site-packages/k2/lib64
${SHERPA_RPATH_ORIGIN}/../lib/python${PYTHON_VERSION_MAJOR}.${PYTHON_VERSION_MINOR}/site-packages/kaldifeat/lib
${SHERPA_RPATH_ORIGIN}/../lib/python${PYTHON_VERSION_MAJOR}.${PYTHON_VERSION_MINOR}/site-packages/kaldifeat/lib64
${SHERPA_RPATH_ORIGIN}/../lib/python${PYTHON_VERSION_MAJOR}.${PYTHON_VERSION_MINOR}/site-packages/sherpa/lib
${SHERPA_RPATH_ORIGIN}/../lib/python${PYTHON_VERSION_MAJOR}.${PYTHON_VERSION_MINOR}/site-packages/sherpa/lib64
${SHERPA_RPATH_ORIGIN}/../lib/python${PYTHON_VERSION_MAJOR}.${PYTHON_VERSION_MINOR}/dist-packages/torch/lib
${SHERPA_RPATH_ORIGIN}/../lib/python${PYTHON_VERSION_MAJOR}.${PYTHON_VERSION_MINOR}/dist-packages/torch/lib64
${SHERPA_RPATH_ORIGIN}/../lib/python${PYTHON_VERSION_MAJOR}.${PYTHON_VERSION_MINOR}/dist-packages/k2/lib
${SHERPA_RPATH_ORIGIN}/../lib/python${PYTHON_VERSION_MAJOR}.${PYTHON_VERSION_MINOR}/dist-packages/k2/lib64
${SHERPA_RPATH_ORIGIN}/../lib/python${PYTHON_VERSION_MAJOR}.${PYTHON_VERSION_MINOR}/dist-packages/kaldifeat/lib
${SHERPA_RPATH_ORIGIN}/../lib/python${PYTHON_VERSION_MAJOR}.${PYTHON_VERSION_MINOR}/dist-packages/kaldifeat/lib64
${SHERPA_RPATH_ORIGIN}/../lib/python${PYTHON_VERSION_MAJOR}.${PYTHON_VERSION_MINOR}/dist-packages/sherpa/lib
${SHERPA_RPATH_ORIGIN}/../lib/python${PYTHON_VERSION_MAJOR}.${PYTHON_VERSION_MINOR}/dist-packages/sherpa/lib64
)
message(STATUS "additional_paths: ${additional_paths}")
foreach(p IN LISTS additional_paths)
foreach(exe IN LISTS bins)
target_link_libraries(${exe} "-Wl,-rpath,${p}")
target_link_libraries(${exe} "-Wl,-rpath,${p}")
endforeach()
endforeach()
endif()

install(TARGETS ${bins}
DESTINATION bin
)
85 changes: 85 additions & 0 deletions sherpa/cpp_api/grpc/online-grpc-client-impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) 2021 Ximalaya Speech Team (Xiang Lyu)
// 2023 y00281951
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "sherpa/cpp_api/grpc/online-grpc-client-impl.h"
#include "sherpa/csrc/log.h"

namespace sherpa {
using grpc::Channel;
using grpc::ClientContext;
using grpc::ClientReaderWriter;
using grpc::Status;

GrpcClient::GrpcClient(const std::string& host,
int32_t port,
int32_t nbest,
const std::string& reqid)
: host_(host),
port_(port),
nbest_(nbest),
reqid_(reqid) {
Connect();
t_ = std::make_unique<std::thread>(&GrpcClient::ReadLoopFunc, this);
}

void GrpcClient::Connect() {
channel_ = grpc::CreateChannel(host_ + ":" + std::to_string(port_),
grpc::InsecureChannelCredentials());
stub_ = ASR::NewStub(channel_);
context_ = std::make_unique<ClientContext>();
stream_ = stub_->Recognize(context_.get());
request_ = std::make_unique<Request>();
response_ = std::make_unique<Response>();
request_->mutable_decode_config()->set_nbest_config(nbest_);
request_->mutable_decode_config()->set_reqid(reqid_);
stream_->Write(*request_);
}

void GrpcClient::SendBinaryData(const void* data, size_t size) {
const int16_t* pdata = reinterpret_cast<const int16_t*>(data);
request_->set_audio_data(pdata, size);
stream_->Write(*request_);
}

void GrpcClient::ReadLoopFunc() {
try {
while (stream_->Read(response_.get())) {
for (int32_t i = 0; i < response_->nbest_size(); i++) {
// you can also traverse wordpieces like demonstrated above
SHERPA_LOG(INFO) << i + 1 << "best " << response_->nbest(i).sentence();
}
if (response_->status() != Response_Status_ok) {
break;
}
if (response_->type() == Response_Type_speech_end) {
done_ = true;
break;
}
}
} catch (std::exception const& e) {
SHERPA_LOG(ERROR) << e.what();
}
}

void GrpcClient::Join() {
stream_->WritesDone();
t_->join();
Status status = stream_->Finish();
if (!status.ok()) {
SHERPA_LOG(INFO) << "Recognize rpc failed.";
}
}
} // namespace sherpa

72 changes: 72 additions & 0 deletions sherpa/cpp_api/grpc/online-grpc-client-impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) 2021 Ximalaya Speech Team (Xiang Lyu)
// 2023 y00281951
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef SHERPA_CPP_API_GRPC_ONLINE_GRPC_CLIENT_IMPL_H_
#define SHERPA_CPP_API_GRPC_ONLINE_GRPC_CLIENT_IMPL_H_

#include <iostream>
#include <memory>
#include <string>
#include <thread> // NOLINT

#include "grpc/grpc.h"
#include "grpcpp/channel.h"
#include "grpcpp/client_context.h"
#include "grpcpp/create_channel.h"

#include "sherpa/csrc/log.h"
#include "sherpa/cpp_api/grpc/sherpa.grpc.pb.h"

namespace sherpa {

using grpc::Channel;
using grpc::ClientContext;
using grpc::ClientReaderWriter;

class GrpcClient {
public:
GrpcClient(const std::string& host,
int32_t port,
int32_t nbest,
const std::string& reqid);

void SendBinaryData(const void* data, size_t size);
void SetKey(const std::string& key) { key_ = key; }
void Join();
bool Done() const { return done_; }


private:
void ReadLoopFunc();
void Connect();
std::string host_;
int32_t port_;
int32_t nbest_;
std::string reqid_;
std::string key_;
bool done_ = false;

std::shared_ptr<Channel> channel_;
std::unique_ptr<ASR::Stub> stub_;
std::unique_ptr<ClientContext> context_;
std::unique_ptr<ClientReaderWriter<Request, Response>> stream_;
std::unique_ptr<Request> request_;
std::unique_ptr<Response> response_;
std::unique_ptr<std::thread> t_;
};

} // namespace sherpa

#endif // SHERPA_CPP_API_GRPC_ONLINE_GRPC_CLIENT_IMPL_H_
Loading

0 comments on commit 35748c6

Please sign in to comment.