Skip to content

Commit

Permalink
adding support for sideband streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
asumit committed Oct 24, 2024
1 parent 4e6c3eb commit 9b1f514
Show file tree
Hide file tree
Showing 10 changed files with 500 additions and 2 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@
[submodule "third_party/utfcpp"]
path = third_party/utfcpp
url = https://github.com/nemtrif/utfcpp
[submodule "third_party/grpc-sideband"]
path = third_party/grpc-sideband
url = https://github.com/ni/grpc-sideband.git
26 changes: 25 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ endif()
option(USE_NILRT_LEGACY_TOOLCHAIN "Enable to use tools and libraries from a NILRT compile toolchain." OFF)
option(USE_PYTHON_VIRTUALENV "Enable to use the automatically-generated python venv, local to this project source." ON)
option(USE_SUBMODULE_LIBS "Enable to link against the submodule libraries, rather than their native-OS equivalents." ON)
option(INCLUDE_SIDEBAND_RDMA "Include support for RDMA sideband transfers" OFF)
option(SIDEBAND_STATIC "Build static library, disabled by default" ON)

#----------------------------------------------------------------------
# Setup build dependencies, according to the toolchain options.
#----------------------------------------------------------------------

add_subdirectory(third_party/grpc-sideband ${CMAKE_CURRENT_BINARY_DIR}/grpc-sideband)
if(USE_SUBMODULE_LIBS)
# The archetypical WIN32 build case
# protobuf_INSTALL must be turned OFF whenever building it as a cmake subdir.
Expand Down Expand Up @@ -117,6 +119,8 @@ include_directories(
"./generated"
"./imports/include"
"./source"
"./third_party/grpc-sideband/src"
"./third_party/grpc-sideband/moniker_src"
)
if(WIN32)
link_directories("./imports/lib/win64")
Expand Down Expand Up @@ -272,6 +276,7 @@ get_filename_component(nidevice_proto "imports/protobuf/nidevice.proto" ABSOLUTE
get_filename_component(deviceid_restricted_proto "source/protobuf_restricted/deviceid_restricted.proto" ABSOLUTE)
get_filename_component(debugsessionproperties_restricted_proto "source/protobuf_restricted/debugsessionproperties_restricted.proto" ABSOLUTE)
get_filename_component(calibrationoperations_restricted_proto "source/protobuf_restricted/calibrationoperations_restricted.proto" ABSOLUTE)
get_filename_component(data_moniker_proto "source/protobuf/data_moniker.proto" ABSOLUTE)
get_filename_component(session_proto_path "${session_proto}" PATH)

#----------------------------------------------------------------------
Expand Down Expand Up @@ -299,6 +304,7 @@ function(GenerateGrpcSources)
-I "${proto_path}"
${protobuf_includes_arg}
-I ${CMAKE_SOURCE_DIR}/imports/protobuf
-I ${CMAKE_SOURCE_DIR}/source/protobuf
--plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}"
"${proto_file}"
DEPENDS "${proto_file}" "${session_proto}")
Expand Down Expand Up @@ -326,6 +332,10 @@ set(calibrationoperations_restricted_proto_srcs "${proto_srcs_dir}/calibrationop
set(calibrationoperations_restricted_proto_hdrs "${proto_srcs_dir}/calibrationoperations_restricted.pb.h")
set(calibrationoperations_restricted_grpc_srcs "${proto_srcs_dir}/calibrationoperations_restricted.grpc.pb.cc")
set(calibrationoperations_restricted_grpc_hdrs "${proto_srcs_dir}/calibrationoperations_restricted.grpc.pb.h")
set(data_moniker_proto_srcs "${proto_srcs_dir}/data_moniker.pb.cc")
set(data_moniker_proto_hdrs "${proto_srcs_dir}/data_moniker.pb.h")
set(data_moniker_grpc_srcs "${proto_srcs_dir}/data_moniker.grpc.pb.cc")
set(data_moniker_grpc_hdrs "${proto_srcs_dir}/data_moniker.grpc.pb.h")

GenerateGrpcSources(
PROTO
Expand Down Expand Up @@ -400,6 +410,16 @@ set(nidriver_service_library_hdrs
"${calibrationoperations_restricted_grpc_hdrs}"
)

GenerateGrpcSources(
PROTO
${data_moniker_proto}
OUTPUT
"${data_moniker_proto_srcs}"
"${data_moniker_proto_hdrs}"
"${data_moniker_grpc_srcs}"
"${data_moniker_grpc_hdrs}"
)

foreach(api ${nidrivers})
GenerateGrpcSources(
PROTO
Expand Down Expand Up @@ -444,6 +464,9 @@ add_executable(ni_grpc_device_server
"source/server/syscfg_library.cpp"
"source/server/syscfg_resource_accessor.cpp"
"source/server/syscfg_session_handler.cpp"
"source/server/data_moniker_service.cpp"
${data_moniker_proto_srcs}
${data_moniker_grpc_srcs}
${nidevice_proto_srcs}
${session_proto_srcs}
${session_grpc_srcs}
Expand Down Expand Up @@ -528,6 +551,7 @@ set(server_lib_deps
${_REFLECTION}
${_UTF8CPP}
${CMAKE_DL_LIBS}
ni_grpc_sideband
nlohmann_json::nlohmann_json
)

Expand Down
79 changes: 79 additions & 0 deletions source/protobuf/data_moniker.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
syntax = "proto3";

option cc_enable_arenas = true;
option csharp_namespace = "NationalInstruments.DataMonikers";

package ni.data_monikers;

import "google/protobuf/any.proto";

service DataMoniker {
rpc BeginSidebandStream(BeginMonikerSidebandStreamRequest) returns (BeginMonikerSidebandStreamResponse) {};
rpc StreamReadWrite(stream MonikerWriteRequest) returns (stream MonikerReadResponse) {};
rpc StreamRead(MonikerList) returns (stream MonikerReadResponse) {};
rpc StreamWrite(stream MonikerWriteRequest) returns (stream StreamWriteResponse) {};
}

enum SidebandStrategy
{
UNKNOWN = 0;
GRPC = 1;
SHARED_MEMORY = 2;
DOUBLE_BUFFERED_SHARED_MEMORY = 3;
SOCKETS = 4;
SOCKETS_LOW_LATENCY = 5;
HYPERVISOR_SOCKETS = 6;
RDMA = 7;
RDMA_LOW_LATENCY = 8;
}

message BeginMonikerSidebandStreamRequest {
SidebandStrategy strategy = 1;
MonikerList monikers = 2;
}

message BeginMonikerSidebandStreamResponse {
SidebandStrategy strategy = 1;
string connection_url = 2;
string sideband_identifier = 3;
sint64 buffer_size = 4;
}

message Moniker {
string service_location = 1;
string data_source = 2;
int64 data_instance = 3;
}

message MonikerWriteRequest {
oneof write_data {
MonikerList monikers = 1;
MonikerValues data = 2;
}
}

message MonikerReadResponse {
MonikerValues data = 1;
}

message MonikerList {
repeated Moniker read_monikers = 2;
repeated Moniker write_monikers = 3;
}

message MonikerValues {
repeated google.protobuf.Any values = 1;
}

message SidebandWriteRequest {
bool cancel = 1;
MonikerValues values = 2;
}

message SidebandReadResponse {
bool cancel = 1;
MonikerValues values = 2;
}

message StreamWriteResponse {
}
35 changes: 34 additions & 1 deletion source/server/core_server.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <grpcpp/grpcpp.h>
#include <register_all_services.h>

#include <sideband_data.h>
#include <mutex>
#include <thread>

#include "feature_toggles.h"
#include "logging.h"
Expand All @@ -12,6 +13,7 @@
#if defined(__GNUC__)
#include "linux/daemonize.h"
#include "linux/syslog_logging.h"
#include <sys/mman.h>
#endif
#if defined(_WIN32)
#include "windows/console_ctrl_handler.h"
Expand All @@ -24,6 +26,7 @@ using FeatureState = nidevice_grpc::FeatureToggles::FeatureState;
struct ServerConfiguration {
std::string config_file_path;
std::string server_address;
std::string sideband_ip;
std::string server_cert;
std::string server_key;
std::string root_cert;
Expand All @@ -41,6 +44,7 @@ static ServerConfiguration GetConfiguration(const std::string& config_file_path)

config.config_file_path = server_config_parser.get_config_file_path();
config.server_address = server_config_parser.parse_address();
config.sideband_ip = server_config_parser.parse_sideband_ip();
config.server_cert = server_config_parser.parse_server_cert();
config.server_key = server_config_parser.parse_server_key();
config.root_cert = server_config_parser.parse_root_cert();
Expand Down Expand Up @@ -103,6 +107,10 @@ static void RunServer(const ServerConfiguration& config)
server = builder.BuildAndStart();
}

auto sideband_socket_thread = new std::thread(RunSidebandSocketsAccept, config.sideband_ip.c_str(), 50055);
// auto sideband_rdma_send_thread = new std::thread(AcceptSidebandRdmaSendRequests);
// auto sideband_rdma_recv_thread = new std::thread(AcceptSidebandRdmaReceiveRequests);

if (!server) {
nidevice_grpc::logging::log(
nidevice_grpc::logging::Level_Error,
Expand Down Expand Up @@ -221,6 +229,14 @@ Options parse_options(int argc, char** argv)
return options;
}

static void SysFsWrite(const std::string& fileName, const std::string& value)
{
std::ofstream fout;
fout.open(fileName);
fout << value;
fout.close();
}

int main(int argc, char** argv)
{
auto options = parse_options(argc, argv);
Expand All @@ -239,6 +255,23 @@ int main(int argc, char** argv)
#if defined(_WIN32)
nidevice_grpc::set_console_ctrl_handler(&StopServer);
#endif
#ifndef _WIN32
SysFsWrite("/dev/cgroup/cpuset/system_set/cpus", "0-5");
SysFsWrite("/dev/cgroup/cpuset/LabVIEW_ScanEngine_set", "0-5");
SysFsWrite("/dev/cgroup/cpuset/LabVIEW_tl_set/cpus", "6-8");
SysFsWrite("/dev/cgroup/cpuset/LabVIEW_tl_set/cpu_exclusive", "1");

sched_param schedParam;
schedParam.sched_priority = 95;
sched_setscheduler(0, SCHED_FIFO, &schedParam);

cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
CPU_SET(6, &cpuSet);
sched_setaffinity(0, sizeof(cpu_set_t), &cpuSet);

mlockall(MCL_CURRENT|MCL_FUTURE);
#endif

RunServer(config);
return EXIT_SUCCESS;
Expand Down
5 changes: 5 additions & 0 deletions source/server/core_services_registrar.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "debug_session_properties_restricted_service_registrar.h"
#include "server_reset_observer_registrar_interface.h"
#include "session_utilities_service_registrar.h"
#include "data_moniker_service.h"

namespace nidevice_grpc {
void register_core_services(
Expand All @@ -25,5 +26,9 @@ void register_core_services(
server_builder,
feature_toggles,
*server_reset_observer_registrar));

auto moniker_service = std::make_shared<ni::data_monikers::DataMonikerService>();
server_builder.RegisterService(moniker_service.get());
service_vector->push_back(moniker_service);
}
} // namespace nidevice_grpc
Loading

0 comments on commit 9b1f514

Please sign in to comment.