Skip to content

Commit

Permalink
modified the logic and renamed the struct
Browse files Browse the repository at this point in the history
  • Loading branch information
Raghav Rawat committed Dec 5, 2024
1 parent c4eec83 commit 083c3fd
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 60 deletions.
6 changes: 3 additions & 3 deletions source/config/localhost_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
"root_cert": ""
},
"core_configuration": {
"sideband_read_write_core": 8,
"stream_write_core": 1,
"server_run_core": 6
"sideband_read_write_core": 0,
"stream_write_core": 0,
"server_run_core": 0
}
}
14 changes: 0 additions & 14 deletions source/server/core_configuration.h

This file was deleted.

20 changes: 5 additions & 15 deletions source/server/core_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include "logging.h"
#include "server_configuration_parser.h"
#include "server_security_configuration.h"
#include "core_configuration.h"
#include "streaming_core_configuration.h"

#if defined(__GNUC__)
#include <sys/mman.h>
Expand Down Expand Up @@ -38,7 +38,7 @@ struct ServerConfiguration {
int max_message_size;
int sideband_port;
nidevice_grpc::FeatureToggles feature_toggles;
CoreConfiguration core_config;
StreamingCoreConfiguration streaming_core_config;
};

static ServerConfiguration GetConfiguration(const std::string& config_file_path)
Expand All @@ -53,7 +53,7 @@ static ServerConfiguration GetConfiguration(const std::string& config_file_path)
config.server_address = server_config_parser.parse_address();
config.sideband_address = server_config_parser.parse_sideband_address();
config.sideband_port = server_config_parser.parse_sideband_port();
config.core_config = server_config_parser.parse_core_configuration();
config.streaming_core_config = server_config_parser.parse_streaming_core_configuration();
config.server_cert = server_config_parser.parse_server_cert();
config.server_key = server_config_parser.parse_server_key();
config.root_cert = server_config_parser.parse_root_cert();
Expand All @@ -72,16 +72,6 @@ static ServerConfiguration GetConfiguration(const std::string& config_file_path)
static std::mutex server_mutex;
static std::unique_ptr<grpc::Server> server;
static bool shutdown = false;
int s_SidebandReadWriteCore;
int s_StreamWriteCore;
int s_ServerRunCore;

void SetCoreConfiguration(CoreConfiguration core_config)
{
s_SidebandReadWriteCore = core_config.sideband_read_write_core;
s_StreamWriteCore = core_config.stream_write_core;
s_ServerRunCore = core_config.server_run_core;
}

static void StopServer()
{
Expand Down Expand Up @@ -126,7 +116,7 @@ static void RunServer(const ServerConfiguration& config)
server = builder.BuildAndStart();
if (ni::data_monikers::is_sideband_streaming_enabled(config.feature_toggles)) {
auto sideband_socket_thread = new std::thread(RunSidebandSocketsAccept, config.sideband_address.c_str(), config.sideband_port);
SetCoreConfiguration(config.core_config);
ni::data_monikers::configure_streaming_cores_for_cpu_pinning(config.streaming_core_config);
// auto sideband_rdma_send_thread = new std::thread(AcceptSidebandRdmaSendRequests);
// auto sideband_rdma_recv_thread = new std::thread(AcceptSidebandRdmaReceiveRequests);
}
Expand Down Expand Up @@ -286,7 +276,7 @@ int main(int argc, char** argv)
schedParam.sched_priority = 95;
sched_setscheduler(0, SCHED_FIFO, &schedParam);

if (s_ServerRunCore >= 0) {
if (config.streaming_core_config.server_run_core >= 0) {
cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
CPU_SET(s_ServerRunCore, &cpuSet);
Expand Down
12 changes: 10 additions & 2 deletions source/server/data_moniker_service.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//---------------------------------------------------------------------
//---------------------------------------------------------------------
#include "data_moniker_service.h"
#include "streaming_core_configuration.h"

#include <sideband_data.h>
#include <sideband_grpc.h>
Expand Down Expand Up @@ -45,6 +46,13 @@ static void SysFsWrite(const std::string& fileName, const std::string& value)

namespace ni::data_monikers {

static StreamingCoreConfiguration s_StreamingCoreConfig;

void configure_streaming_cores_for_cpu_pinning(const StreamingCoreConfiguration& streaming_core_config)
{
s_StreamingCoreConfig = streaming_core_config;
}

bool is_sideband_streaming_enabled(const nidevice_grpc::FeatureToggles& feature_toggles)
{
return feature_toggles.is_feature_enabled("sideband_streaming", nidevice_grpc::FeatureToggles::CodeReadiness::kNextRelease);
Expand Down Expand Up @@ -109,7 +117,7 @@ void DataMonikerService::RunSidebandReadWriteLoop(string sidebandIdentifier, ::S
{
#ifndef _WIN32
if ((strategy == ::SidebandStrategy::RDMA_LOW_LATENCY ||
strategy == ::SidebandStrategy::SOCKETS_LOW_LATENCY) && s_SidebandReadWriteCore >= 0) {
strategy == ::SidebandStrategy::SOCKETS_LOW_LATENCY) && s_StreamingCoreConfig.sideband_read_write_core >= 0) {
pid_t threadId = syscall(SYS_gettid);
::SysFsWrite("/dev/cgroup/cpuset/LabVIEW_tl_set/tasks", std::to_string(threadId));

Expand Down Expand Up @@ -240,7 +248,7 @@ Status DataMonikerService::StreamRead(ServerContext* context, const MonikerList*
Status DataMonikerService::StreamWrite(ServerContext* context, ServerReaderWriter<StreamWriteResponse, MonikerWriteRequest>* stream)
{
#ifndef _WIN32
if(s_StreamWriteCore >= 0) {
if(s_StreamingCoreConfig.stream_write_core >= 0) {
cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
CPU_SET(s_StreamWriteCore, &cpuSet);
Expand Down
2 changes: 2 additions & 0 deletions source/server/data_moniker_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
#include <map>
#include <sideband_data.h>
#include "feature_toggles.h"
#include "streaming_core_configuration.h"

//---------------------------------------------------------------------
//---------------------------------------------------------------------
namespace ni::data_monikers
{
void configure_streaming_cores_for_cpu_pinning(const StreamingCoreConfiguration& streaming_core_config);
bool is_sideband_streaming_enabled(const nidevice_grpc::FeatureToggles& feature_toggles);
//---------------------------------------------------------------------
//---------------------------------------------------------------------
Expand Down
36 changes: 18 additions & 18 deletions source/server/server_configuration_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <sstream>

#include "feature_toggles.h"
#include "core_configuration.h"
#include "streaming_core_configuration.h"

#if defined(_MSC_VER)
#include <windows.h>
Expand All @@ -20,7 +20,7 @@ static const char* kAddressJsonKey = "address";
static const char* kPortJsonKey = "port";
static const char* kSidebandAddressJsonKey = "sideband_address";
static const char* kSidebandPortJsonKey = "sideband_port";
static const char* kCoreConfigurationKey = "core_configuration";
static const char* kStreamingCoreConfigurationKey = "streaming_core_configuration";
static const char* kSidebandReadWriteCoreKey = "sideband_read_write_core";
static const char* kStreamWriteCoreKey = "stream_write_core";
static const char* kServerRunCoreKey = "server_run_core";
Expand Down Expand Up @@ -292,26 +292,26 @@ int ServerConfigurationParser::parse_port_with_key(const std::string& key) const
return parsed_port;
}

CoreConfiguration ServerConfigurationParser::parse_core_configuration() const
StreamingCoreConfiguration ServerConfigurationParser::parse_streaming_core_configuration() const
{
CoreConfiguration core_config;
StreamingCoreConfiguration streaming_core_config;

auto core_config_it = config_file_.find(kCoreConfigurationKey);
auto core_config_it = config_file_.find(kStreamingCoreConfigurationKey);
if (core_config_it != config_file_.end()) {
core_config.sideband_read_write_core = parse_core_with_key(kSidebandReadWriteCoreKey);
core_config.stream_write_core = parse_core_with_key(kStreamWriteCoreKey);
core_config.server_run_core = parse_core_with_key(kServerRunCoreKey);
streaming_core_config.sideband_read_write_core = parse_streaming_core_with_key(kSidebandReadWriteCoreKey);
streaming_core_config.stream_write_core = parse_streaming_core_with_key(kStreamWriteCoreKey);
streaming_core_config.server_run_core = parse_streaming_core_with_key(kServerRunCoreKey);
}
else{
core_config.sideband_read_write_core = -1;
core_config.stream_write_core = -1;
core_config.server_run_core = -1;
streaming_core_config.sideband_read_write_core = 0;
streaming_core_config.stream_write_core = 0;
streaming_core_config.server_run_core = 0;
}

return core_config;
return streaming_core_config;
}

int ServerConfigurationParser::parse_core_with_key(const std::string& key) const
int ServerConfigurationParser::parse_streaming_core_with_key(const std::string& key) const
{
int parsed_core = -1;

Expand All @@ -321,12 +321,12 @@ int ServerConfigurationParser::parse_core_with_key(const std::string& key) const
parsed_core = it->get<int>();
}
catch (const nlohmann::json::type_error& ex) {
throw WrongCoreTypeException(ex.what());
throw WrongStreamingCoreTypeException(ex.what());
}
}

if (parsed_core < -1) {
throw InvalidCoreException();
if (parsed_core <= -1) {
throw InvalidStreamingCoreException();
}

return parsed_core;
Expand All @@ -352,7 +352,7 @@ ServerConfigurationParser::InvalidPortException::InvalidPortException()
{
}

ServerConfigurationParser::InvalidCoreException::InvalidCoreException()
ServerConfigurationParser::InvalidStreamingCoreException::InvalidStreamingCoreException()
: std::runtime_error(kInvalidCoreMessage)
{
}
Expand All @@ -367,7 +367,7 @@ ServerConfigurationParser::WrongPortTypeException::WrongPortTypeException(const
{
}

ServerConfigurationParser::WrongCoreTypeException::WrongCoreTypeException(const std::string& type_error_details)
ServerConfigurationParser::WrongStreamingCoreTypeException::WrongStreamingCoreTypeException(const std::string& type_error_details)
: std::runtime_error(kWrongCoreTypeMessage + type_error_details)
{
}
Expand Down
16 changes: 8 additions & 8 deletions source/server/server_configuration_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
#include <nlohmann/json.hpp>

#include "feature_toggles.h"
#include "core_configuration.h"
#include "streaming_core_configuration.h"

namespace nidevice_grpc {

static const char* kConfigFileNotFoundMessage = "The server configuration file was not found at: ";
static const char* kInvalidAddressMessage = "The specified address is not valid.\n Use a valid IPv4 or IPv6 address. Valid values include localhost, 192.168.1.1, [::], [::1], etc.";
static const char* kWrongAddressTypeMessage = "The server address must be specified in the server's configuration file as a string: \n\n";
static const char* kInvalidPortMessage = "The specified port number must between 0 and 65535.";
static const char* kInvalidCoreMessage = "The specified core number must be -1 or greater. -1 indicates that any available core can be used.";
static const char* kInvalidCoreMessage = "The specified core number must be greater than or equal to 0.";
static const char* kMalformedJsonMessage = "The JSON in the server configuration file is malformed: \n\n";
static const char* kWrongPortTypeMessage = "The server port must be specified in the server's configuration file as an integer: \n\n";
static const char* kWrongCoreTypeMessage = "The cpu core must be specified in the server's configuration file as an integer: \n\n";
Expand Down Expand Up @@ -45,7 +45,7 @@ class ServerConfigurationParser {
std::string parse_root_cert() const;
int parse_max_message_size() const;
int parse_sideband_port() const;
CoreConfiguration parse_core_configuration() const;
StreamingCoreConfiguration parse_streaming_core_configuration() const;
FeatureToggles parse_feature_toggles() const;
FeatureToggles::CodeReadiness parse_code_readiness() const;

Expand All @@ -65,8 +65,8 @@ class ServerConfigurationParser {
InvalidPortException();
};

struct InvalidCoreException : public std::runtime_error {
InvalidCoreException();
struct InvalidStreamingCoreException : public std::runtime_error {
InvalidStreamingCoreException();
};

struct MalformedJsonException : public std::runtime_error {
Expand All @@ -77,8 +77,8 @@ class ServerConfigurationParser {
WrongPortTypeException(const std::string& type_error_details);
};

struct WrongCoreTypeException : public std::runtime_error {
WrongCoreTypeException(const std::string& type_error_details);
struct WrongStreamingCoreTypeException : public std::runtime_error {
WrongStreamingCoreTypeException(const std::string& type_error_details);
};

struct UnspecifiedPortException : public std::runtime_error {
Expand Down Expand Up @@ -117,7 +117,7 @@ class ServerConfigurationParser {
std::string parse_bind_address() const;
int parse_port() const;
int parse_port_with_key(const std::string& key) const;
int parse_core_with_key(const std::string& key) const;
int parse_streaming_core_with_key(const std::string& key) const;

std::string config_file_path_;
nlohmann::json config_file_;
Expand Down
10 changes: 10 additions & 0 deletions source/server/streaming_core_configuration.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#ifndef STREAMING_CORE_CONFIGURATION_H
#define STREAMING_CORE_CONFIGURATION_H

struct StreamingCoreConfiguration {
int sideband_read_write_core;
int stream_write_core;
int server_run_core;
};

#endif // STREAMING_CORE_CONFIGURATION_H

0 comments on commit 083c3fd

Please sign in to comment.