Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TEST][NO-MERGE] Stress test sockets #2

Draft
wants to merge 5 commits into
base: bell-db/v0.9.7-socket
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package protocbridge.frontend

import org.apache.commons.io.IOUtils
import org.scalatest.exceptions.TestFailedException
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.must.Matchers
import protocbridge.{ExtraEnv, ProtocCodeGenerator}

import java.io.ByteArrayOutputStream
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future, TimeoutException}
import scala.sys.process.ProcessIO
import scala.util.Random

Expand All @@ -30,20 +35,22 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers {
writeInput.close()
},
processOutput => {
val buffer = new Array[Byte](4096)
var bytesRead = 0
while (bytesRead != -1) {
bytesRead = processOutput.read(buffer)
if (bytesRead != -1) {
actualOutput.write(buffer, 0, bytesRead)
}
}
IOUtils.copy(processOutput, actualOutput)
processOutput.close()
},
_.close()
processError => {
IOUtils.copy(processError, System.err)
processError.close()
}
)
)
process.exitValue()
try {
Await.result(Future { process.exitValue() }, 5.seconds)
} catch {
case _: TimeoutException =>
System.err.println(s"Timeout")
process.destroy()
}
frontend.cleanup(state)
(state, actualOutput.toByteArray)
}
Expand All @@ -62,9 +69,27 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers {
toReceive
}
}
// Repeat 100,000 times since named pipes on macOS are flaky.
val repeatCount = 100000
for (i <- 1 until repeatCount) {
if (i % 100 == 1) println(s"Running iteration $i of $repeatCount")
val (state, response) =
testPluginFrontend(frontend, fakeGenerator, env, toSend)
try {
response mustBe toReceive
} catch {
case e: TestFailedException =>
System.err.println(s"""Failed on iteration $i of $repeatCount: ${e.getMessage}""")
}
}
val (state, response) =
testPluginFrontend(frontend, fakeGenerator, env, toSend)
response mustBe toReceive
try {
response mustBe toReceive
} catch {
case e: TestFailedException =>
System.err.println(s"""Failed on iteration $repeatCount of $repeatCount: ${e.getMessage}""")
}
state
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package protocbridge.frontend
class PosixPluginFrontendSpec extends OsSpecificFrontendSpec {
if (!PluginFrontend.isWindows && !PluginFrontend.isMac) {
it must "execute a program that forwards input and output to given stream" in {
testSuccess(PosixPluginFrontend)
testSuccess(MacPluginFrontend)
}

it must "not hang if there is an OOM in generator" in {
testFailure(PosixPluginFrontend)
testFailure(MacPluginFrontend)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package protocbridge.frontend
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.must.Matchers

import java.lang.management.ManagementFactory
import java.net.ServerSocket
import scala.collection.mutable
import scala.sys.process._
import scala.util.{Failure, Success, Try}

class SocketAllocationSpec extends AnyFlatSpec with Matchers {
it must "allocate an unused port" in {
val repeatCount = 100000

val currentPid = getCurrentPid
val portConflictCount = mutable.Map[Int, Int]()

for (i <- 1 to repeatCount) {
if (i % 100 == 1) println(s"Running iteration $i of $repeatCount")

val serverSocket = new ServerSocket(0) // Bind to any available port.
try {
val port = serverSocket.getLocalPort
Try {
s"lsof -i :$port -t".!!.trim
} match {
case Success(output) =>
if (output.nonEmpty) {
val pids = output.split("\n").filterNot(_ == currentPid.toString)
if (pids.nonEmpty) {
System.err.println("Port conflict detected on port " + port + " with PIDs: " + pids.mkString(", "))
portConflictCount(port) = portConflictCount.getOrElse(port, 0) + 1
}
}
case Failure(_) => // Ignore failure and continue
}
} finally {
serverSocket.close()
}
}

assert(portConflictCount.isEmpty, s"Found the following ports in use out of $repeatCount: $portConflictCount")
}

private def getCurrentPid: Int = {
val jvmName = ManagementFactory.getRuntimeMXBean.getName
val pid = jvmName.split("@")(0)
pid.toInt
}
}
68 changes: 68 additions & 0 deletions port_conflict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import os
import socket
import subprocess
import sys

def is_port_in_use(port, current_pid):
"""
Check if the given port is in use by other processes, excluding the current process.

:param port: Port number to check
:param pid: Current process ID to exclude from the result
:return: True if the port is in use by another process, False otherwise
"""
try:
# Run lsof command to check if any process is using the port
result = subprocess.run(
['lsof', '-i', f':{port}', '-t'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
output = result.stdout.strip()

if output:
# Check if the output contains lines with processes other than the current one
return [
line
for line in output.split('\n')
if line != str(current_pid)
]
return []
except subprocess.CalledProcessError as e:
print(f"Error checking port: {e}", file=sys.stderr)
return []

def main():
repeat_count = 10000

current_pid = os.getpid() # Get the current process ID
port_conflict_count = {}

for i in range(1, repeat_count + 1):
if i % 100 == 1:
print(f"Running iteration {i} of {repeat_count}")

# Bind to an available port (port 0)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('', 0)) # Bind to port 0 to get an available port
port = sock.getsockname()[1] # Get the actual port number assigned

# Check if the port is in use by any other process
pids = is_port_in_use(port, current_pid)
if pids:
print(f"Port conflict detected on port {port} with PIDs: {', '.join(pids)}", file=sys.stderr)
port_conflict_count[port] = port_conflict_count.get(port, 0) + 1

# Close the socket after checking
sock.close()

if port_conflict_count:
print("Ports that were found to be in use and their collision counts:")
for port, count in port_conflict_count.items():
print(f"Port {port} was found in use {count} times")
else:
print("No ports were found to be in use.")

if __name__ == '__main__':
main()