Skip to content

Commit

Permalink
update client: now communicating with server via stream
Browse files Browse the repository at this point in the history
  • Loading branch information
mcencini committed Oct 15, 2024
1 parent d0b9548 commit 1420503
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 21 deletions.
69 changes: 54 additions & 15 deletions src/pulseclient/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
"SERVER_PROCESS_NAME": "external_server.py",
"file_path_simulation": "params.dat",
"file_path_production": "/srv/nfs/psd/usr/psd/pulseq/temp/params.dat",
"output_path_simulation": "sequence.bin",
"output_path_production": "/srv/nfs/psd/usr/psd/pulseq/temp/sequence.bin",
}

# Define the default config file location
Expand All @@ -32,6 +34,7 @@
def load_config():
"""
Load configuration from the pulseclient.ini file.
First, check the PULSECLIENT_CONFIG environment variable for the file location.
If not set or file does not exist, fall back to the default config location.
If no config file is found, use default values.
Expand Down Expand Up @@ -93,6 +96,16 @@ def load_config():
"file_path_production",
fallback=config["file_path_production"],
),
"output_path_simulation": parser.get(
"settings",
"output_path_simulation",
fallback=config["output_path_simulation"],
),
"output_path_production": parser.get(
"settings",
"output_path_production",
fallback=config["output_path_production"],
),
}
)
else:
Expand All @@ -102,9 +115,7 @@ def load_config():


def is_server_running(config):
"""
Check if the server is running on a remote machine via SSH.
"""
"""Check if the server is running on a remote machine via SSH."""
try:
# Build SSH command to check if the process is running
ssh_command = [
Expand Down Expand Up @@ -133,9 +144,7 @@ def is_server_running(config):


def start_server(config):
"""
Start the server on a remote machine using SSH.
"""
"""Start the server on a remote machine using SSH."""
if is_server_running(config):
return

Expand Down Expand Up @@ -163,9 +172,7 @@ def start_server(config):


def is_file_complete(file_path, config):
"""
Check if the file is complete and ready for processing by comparing file size over time.
"""
"""Check if the file is complete and ready for processing by comparing file size over time."""
try:
file_size = os.stat(file_path).st_size
time.sleep(config["CHECK_INTERVAL"]) # Wait a bit to see if the size changes
Expand All @@ -178,9 +185,7 @@ def is_file_complete(file_path, config):


def send_file_to_server(file_path, config):
"""
Send the file over a socket connection to the external server.
"""
"""Send the file over a socket connection to the external server."""
try:
# Open a socket connection to the server
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Expand All @@ -205,14 +210,48 @@ def send_file_to_server(file_path, config):
print("Failed to send file to server: %s" % str(e))


def watch_file(file_path, config):
def send_buffer_to_server(data_buffer, config, response_file_path):
"""
Watch the file for changes and send it to the server once it's complete.
Send the byte buffer over a socket connection to the external server.
After sending the buffer, waits for the server's response to write it down to a file.
"""
try:
# Open a socket connection to the server
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((config["SERVER_IP"], config["SERVER_PORT"]))
print(
"Connected to server %s:%d" % (config["SERVER_IP"], config["SERVER_PORT"])
)

# Send the entire data buffer over the socket using sendall
sock.sendall(data_buffer)
print("Data buffer sent to the server successfully.")

# Now, receive the response from the server
with open(response_file_path, "wb") as response_file:
while True:
received_data = sock.recv(1024)
if not received_data:
break
response_file.write(received_data)
print(
"Response received from server and written to %s." % response_file_path
)

except Exception as e:
print("Failed to communicate with server: %s" % str(e))

finally:
sock.close() # Ensure the socket is closed


def watch_file(file_path, config, output_path):
"""Watch the file for changes and send it to the server once it's complete."""
while True:
if os.path.exists(file_path) and is_file_complete(file_path, config):
print("File detected and is ready: %s" % file_path)
send_file_to_server(file_path, config)
send_buffer_to_server(file_path, config, output_path)
break

# Wait before checking again
Expand Down
12 changes: 8 additions & 4 deletions src/pulseclient/start_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@

def main():
"""
Main entry point for the CLI. Parses an integer argument to determine the mode of operation:
Main entry point for the CLI.
Parses an integer argument to determine the mode of operation:
- 1 for simulation (uses configured simulation file path),
- 0 for production (uses configured production file path).
If no argument is provided, the default is 1 (simulate mode).
"""
# Create an argument parser for CLI
Expand Down Expand Up @@ -39,16 +43,16 @@ def main():
# Determine the file path based on the mode
if args.mode == 1:
file_path = config["file_path_simulation"]
print(f"Running in simulate mode (using {file_path})")
output_path = config["output_path_simulation"]
else:
file_path = config["file_path_production"]
print(f"Running in production mode (using {file_path})")
output_path = config["output_path_production"]

# Start the server (on a remote machine)
start_server(config)

# Watch for the file and send it once ready
watch_file(file_path, config)
watch_file(file_path, config, output_path)


if __name__ == "__main__":
Expand Down
67 changes: 65 additions & 2 deletions tests/test_pulseclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,21 @@
import time
import unittest

from io import BytesIO # Used to mock binary file handling

# Conditional imports based on Python version
if sys.version_info[0] == 2:
from mock import patch, mock_open, MagicMock # For Python 2.7
from mock import call, patch, mock_open, MagicMock # For Python 2.7
else:
from unittest.mock import patch, mock_open, MagicMock # For Python 3.x
from unittest.mock import call, patch, mock_open, MagicMock # For Python 3.x

from pulseclient.lib import (
load_config,
is_server_running,
start_server,
is_file_complete,
send_file_to_server,
send_buffer_to_server,
watch_file,
)

Expand Down Expand Up @@ -82,3 +85,63 @@ def test_start_server_when_not_running(self, mock_popen):
with patch("pulseclient.lib.is_server_running", return_value=False):
mock_process = MagicMock()
mock_process.returnc


class TestSendBufferToServer(unittest.TestCase):

@patch("builtins.open", create=True)
@patch("socket.socket")
def test_send_buffer_to_server(self, mock_socket, mock_open):
"""
Test that send_buffer_to_server correctly sends data to the server
and writes the server's response to a file.
"""

# Setup
data_buffer = b"Test byte buffer data to be sent"
config = {"SERVER_IP": "127.0.0.1", "SERVER_PORT": 5000}
response_file_path = "test_response.bin"
expected_server_response = [
b"Server response data part 1",
b"Server response data part 2",
]

# Mock the socket connection and behavior
mock_sock_instance = mock_socket.return_value

# Simulate server sending data in parts, followed by an empty string (end of stream)
mock_sock_instance.recv.side_effect = expected_server_response + [
b""
] # Simulate end of transmission

# Mock file open to return a file-like mock object
mock_file = MagicMock()
mock_open.return_value.__enter__.return_value = mock_file

# Run the function
send_buffer_to_server(data_buffer, config, response_file_path)

# Assertions:
# 1. Assert socket connection was made to the correct IP and port
mock_socket.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM)
mock_sock_instance.connect.assert_called_once_with(("127.0.0.1", 5000))

# 2. Assert data was sent through the socket
mock_sock_instance.sendall.assert_called() # Ensure some data was sent

# 3. Assert file writes occurred correctly
# Check that the file was opened in 'wb' mode
mock_open.assert_called_once_with(response_file_path, "wb")

# Check the data that was written to the file
# Now using assert_has_calls to allow for multiple calls
mock_file.write.assert_has_calls(
[
call(b"Server response data part 1"),
call(b"Server response data part 2"),
],
any_order=True,
)

# # 4. Assert socket was closed
mock_sock_instance.close.assert_called_once()

0 comments on commit 1420503

Please sign in to comment.