From 0e212514b2ea0fc4217669b23faf67b4bda7a400 Mon Sep 17 00:00:00 2001 From: Joe Eli McIlvain Date: Fri, 4 Mar 2022 15:14:10 -0800 Subject: [PATCH] Add `TCP` library code. --- LICENSE.md | 2 +- README.md | 4 +- manifest.savi | 23 +++++++ spec/Main.savi | 5 ++ spec/TCP.Spec.savi | 93 ++++++++++++++++++++++++++ src/TCP.ConnectionEngine.savi | 80 +++++++++++++++++++++++ src/TCP.Listener.savi | 119 ++++++++++++++++++++++++++++++++++ src/_Lib.savi | 12 ++++ src/_NetAddress.savi | 39 +++++++++++ 9 files changed, 374 insertions(+), 3 deletions(-) create mode 100644 manifest.savi create mode 100644 spec/Main.savi create mode 100644 spec/TCP.Spec.savi create mode 100644 src/TCP.ConnectionEngine.savi create mode 100644 src/TCP.Listener.savi create mode 100644 src/_Lib.savi create mode 100644 src/_NetAddress.savi diff --git a/LICENSE.md b/LICENSE.md index a82eb76..d400902 100644 --- a/LICENSE.md +++ b/LICENSE.md @@ -1,4 +1,4 @@ -Copyright 2018 Joe Eli McIlvain +Copyright 2021 Joe Eli McIlvain Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: diff --git a/README.md b/README.md index 16476b8..95a6264 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,3 @@ -A base repository for Savi language libraries, with common CI actions configured. +# TCP -See the [Guide](https://github.com/savi-lang/base-standard-library/wiki/Guide) for details on how it works and how to use it for your own libraries. +TCP networking implementation for the Savi standard library. diff --git a/manifest.savi b/manifest.savi new file mode 100644 index 0000000..27438c5 --- /dev/null +++ b/manifest.savi @@ -0,0 +1,23 @@ +:manifest lib TCP + :sources "src/*.savi" + + :dependency ByteStream v0 + :from "github:savi-lang/ByteStream" + + :dependency IO v0 + :from "github:savi-lang/IO" + :depends on ByteStream + :depends on OSError + + :dependency OSError v0 + +:manifest bin "spec" + :copies TCP + :sources "spec/*.savi" + + :dependency Spec v0 + :from "github:savi-lang/Spec" + :depends on Map + + :transitive dependency Map v0 + :from "github:savi-lang/Map" diff --git a/spec/Main.savi b/spec/Main.savi new file mode 100644 index 0000000..879f760 --- /dev/null +++ b/spec/Main.savi @@ -0,0 +1,5 @@ +:actor Main + :new (env) + Spec.Process.run(env, [ + Spec.Run(TCP.Spec).new(env) + ]) diff --git a/spec/TCP.Spec.savi b/spec/TCP.Spec.savi new file mode 100644 index 0000000..d5fc2e2 --- /dev/null +++ b/spec/TCP.Spec.savi @@ -0,0 +1,93 @@ +:class iso TCP.Spec.Listener.Notify + :is TCP.Listener.Notify + :let env Env + :new (@env) + + :fun ref listening(listen TCP.Listener'ref) + TCP.Spec.EchoClient.new(@env, Inspect[listen.local_port]) + @env.err.print("[Listener] Listening") + + :fun ref not_listening(listen TCP.Listener'ref) None + @env.err.print("[Listener] Not listening:") + @env.err.print(listen.listen_error.name) + + :fun ref closed(listen TCP.Listener'ref): None + @env.err.print("[Listener] Stopped listening") + + :fun ref connected!(listen TCP.Listener'ref, ticket TCP.Listener.AcceptTicket) + TCP.Spec.Echoer.new(@env, listen, --ticket) + +:actor TCP.Spec.Echoer + :is IO.Actor(IO.Action) + :let env Env + :let io TCP.ConnectionEngine + :new (@env, listen, ticket) + @io = TCP.ConnectionEngine.accept(@, listen, --ticket) + @env.err.print("[Echoer] Accepted") + + :fun ref _io_react(action IO.Action) + case action == ( + | IO.Action.Read | + @io.pending_reads -> (bytes_available | + @io.read_stream.advance_to_end + bytes val = @io.read_stream.extract_token + @env.err.print("[Echoer] Received:") + @env.err.print(bytes.as_string) + @io.write_stream << bytes.clone // TODO: is clone still needed? + try @io.flush! // TODO: should we flush automatically on close below? + @io.close + ) + | IO.Action.Closed | + @env.err.print("[Echoer] Closed") + try @io.listen.as!(TCP.Listener).dispose + ) + @ + +:actor TCP.Spec.EchoClient + :is IO.Actor(IO.Action) + :let env Env + :let io TCP.ConnectionEngine + :new (@env, service) + @io = TCP.ConnectionEngine.connect(@, "localhost", service) + + // TODO: Can we make this trigger _io_react with IO.Action.OpenFailed + // automatically via the same mechanism we will use for queuing later + // pending reads, instead of checking for this error case here? + if (@io.connect_error != OSError.None) ( + @env.err.print("[EchoClient] Failed to connect:") + @env.err.print(@io.connect_error.name) + ) + + :fun ref _io_react(action IO.Action) + case action == ( + | IO.Action.Opened | + @env.err.print("[EchoClient] Connected") + @io.write_stream << b"Hello, World!" + try @io.flush! + + | IO.Action.OpenFailed | + @env.err.print("[EchoClient] Failed to connect:") + @env.err.print(@io.connect_error.name) + + | IO.Action.Read | + @io.pending_reads -> (bytes_available | + if (bytes_available >= b"Hello, World!".size) ( + @io.read_stream.advance_to_end + @env.err.print("[EchoClient] Received:") + @env.err.print(@io.read_stream.extract_token.as_string) + @io.close + ) + ) + + | IO.Action.Closed | + @env.err.print("[EchoClient] Closed") + try @io.listen.as!(TCP.Listener).dispose + ) + @ + +:class TCP.Spec + :is Spec + :const describes: "TCP" + + :it "can listen, connect, send, respond, disconnect, and stop listening" + TCP.Listener.new(TCP.Spec.Listener.Notify.new(@env)) diff --git a/src/TCP.ConnectionEngine.savi b/src/TCP.ConnectionEngine.savi new file mode 100644 index 0000000..a0ee072 --- /dev/null +++ b/src/TCP.ConnectionEngine.savi @@ -0,0 +1,80 @@ +:class TCP.ConnectionEngine + :is IO.Engine(IO.Action) + :var io IO.CoreEngine + :var listen (TCP.Listener | None): None + :var connect_error OSError: OSError.None + :let read_stream: ByteStream.Reader.new + :let write_stream ByteStream.Writer + + :fun non connect( + // TODO: TCPConnectionAuth, rather than ambient authority. + actor AsioEventNotify + host String + service String + from String = "" + ) + try ( + @_new_with_io(IO.CoreEngine.new_tcp_connect!(actor, host, service, from)) + | + invalid = @_new_with_io(IO.CoreEngine.new) + invalid.connect_error = OSError.EINVAL + invalid + ) + + :fun non accept( + actor AsioEventNotify + listen TCP.Listener + ticket TCP.Listener.AcceptTicket + ) + io = IO.CoreEngine.new_from_fd_rw(actor, ticket._fd) + new = @_new_with_io(io) + new.listen = listen + new + + :new _new_with_io(@io) + @write_stream = ByteStream.Writer.new(@io) + + :fun ref deferred_actions + :yields IO.Action for None + // TODO + @ + + :fun ref react(event CPointer(AsioEvent), flags U32, arg U32) @ + :yields IO.Action + @io.react(event, flags, arg) -> (action | + case action == ( + | IO.Action.Closed | + try @listen.as!(TCP.Listener)._conn_closed + + // TODO: windows complete writes, flush-after-mute (pending writes logic from Pony) + // | IO.Action.Write | + // ... + ) + yield action + ) + @ + + :fun ref close + @io.close + @ + + :fun ref flush! + @write_stream.flush! + + :fun ref pending_reads + :yields USize for None + if Platform.windows ( + None // TODO: @_windows_complete_reads(arg) + | + @_pending_reads_unix -> (bytes_available | yield bytes_available) + ) + @ + + :fun ref _pending_reads_unix None + :yields USize for None + while @io.is_readable ( + try ( + bytes_read = @read_stream.receive_from!(@io) + if (bytes_read > 0) (yield @read_stream.bytes_ahead_of_marker) + ) + ) diff --git a/src/TCP.Listener.savi b/src/TCP.Listener.savi new file mode 100644 index 0000000..cb6ac71 --- /dev/null +++ b/src/TCP.Listener.savi @@ -0,0 +1,119 @@ +:trait TCP.Listener.Notify + :fun ref listening(listen TCP.Listener'ref): None + :fun ref not_listening(listen TCP.Listener'ref) None + :fun ref closed(listen TCP.Listener'ref): None + :fun ref connected!( + listen TCP.Listener'ref + ticket TCP.Listener.AcceptTicket + ) IO.Actor(IO.Action) + +// TODO: Is there another way to protect the fd by making it non-forgeable, +// while avoiding the overhead of an allocation and pointer indirection? +:class iso TCP.Listener.AcceptTicket + :var _fd U32 + :new iso _new(@_fd) + +:actor TCP.Listener + :let notify TCP.Listener.Notify + :var listen_error OSError: OSError.None + + :var _fd U32: -1 + :var _event CPointer(AsioEvent): CPointer(AsioEvent).null + + :var _count USize: 0 + :var _limit USize + :var _read_buffer_size USize + :var _yield_after_reading USize + :var _yield_after_writing USize + + :var _closed Bool: False + :var _paused Bool: False + + :fun local_port: _NetAddress._for_fd(@_fd).port + + :new ( + // TODO: TCP.Listener.Auth, rather than ambient authority. + notify TCP.Listener.Notify'iso + host String = "" + service String = "0" + @_limit = 0 + @_read_buffer_size = 16384 + @_yield_after_reading = 16384 + @_yield_after_writing = 16384 + ) + new_notify TCP.Listener.Notify'ref = --notify // TODO: should not be needed + @notify = new_notify + + event = _LibPonyOS.pony_os_listen_tcp(@, host.cstring, service.cstring) + if event.is_not_null ( + @_event = event + @_fd = AsioEvent.fd(@_event) + error = _LibPonyOS.pony_os_errno + new_notify.listening(@) + | + @listen_error = _LibPonyOS.pony_os_errno + @_closed = True + new_notify.not_listening(@) + ) + + :: This is a special behaviour that hooks into the AsioEventNotify runtime, + :: called whenever an event handle we're subscribed to receives an event. + :be _event_notify(event CPointer(AsioEvent), flags U32, arg U32) + if (@_event === event) ( + if AsioEvent.is_readable(flags) ( + @_accept(arg) + ) + if AsioEvent.is_disposable(flags) ( + AsioEvent.destroy(@_event) + @_event = CPointer(AsioEvent).null + ) + ) + + :be _accept(ns U32 = 0) + if Platform.windows ( + None // TODO + | + if @_closed.not ( + try ( + while (@_limit == 0 || @_count < @_limit) ( + conn_fd = _LibPonyOS.pony_os_accept(@_event) + case conn_fd == ( + | 0 | error! // EWOULDBLOCK, don't try again + | -1 | None // Some other error, so we can try again + | @_spawn(conn_fd) + ) + ) + @_paused = True + ) + ) + ) + + :fun ref _spawn(fd U32) + try ( + @notify.connected!(@, TCP.Listener.AcceptTicket._new(fd)) + @_count += 1 + | + _LibPonyOS.pony_os_socket_close(fd) + ) + + :be _conn_closed + @_count -= 1 + + // If releasing this connection takes us below the limit, + // unpause acceptance and try to accept more connections. + if (@_paused && @_count < @_limit) ( + @_paused = False + @_accept + ) + + :be dispose: @close + :fun ref close + if (@_closed.not && @_event.is_not_null) ( + // When not on windows, unsubscribe immediately here instead of later. + if Platform.windows.not AsioEvent.unsubscribe(@_event) + + _LibPonyOS.pony_os_socket_close(@_fd) + @_fd = -1 + + @notify.closed(@) + ) diff --git a/src/_Lib.savi b/src/_Lib.savi new file mode 100644 index 0000000..c97f75a --- /dev/null +++ b/src/_Lib.savi @@ -0,0 +1,12 @@ +:ffi _LibC + :fun ntohs(network_short U16) U16 + :fun ntohl(network_long U32) U32 + +:ffi _LibPonyOS + :fun pony_os_listen_tcp(owner AsioEventNotify, host CPointer(U8), service CPointer(U8)) CPointer(AsioEvent) + :fun pony_os_accept(event CPointer(AsioEvent)) U32 + :fun pony_os_socket_close(fd U32) None + :fun pony_os_errno OSError + :fun pony_os_sockname(fd U32, net_addr _NetAddress'ref) None + :fun pony_os_ipv4(net_addr _NetAddress'box) Bool + :fun pony_os_ipv6(net_addr _NetAddress'box) Bool diff --git a/src/_NetAddress.savi b/src/_NetAddress.savi new file mode 100644 index 0000000..24560f4 --- /dev/null +++ b/src/_NetAddress.savi @@ -0,0 +1,39 @@ + + +:class val _NetAddress + :is Equatable(_NetAddress) + + :let _family U16: 0 + :let _port U16: 0 :: Port number in network byte order. + :let _ipv4 U32: 0 :: Bits for an IPv4 address in network byte order. + :let _ipv6a U32: 0 :: Bits 0-32 of an IPv6 address in network byte order. + :let _ipv6b U32: 0 :: Bits 33-64 of an IPv6 address in network byte order. + :let _ipv6c U32: 0 :: Bits 65-96 of an IPv6 address in network byte order. + :let _ipv6d U32: 0 :: Bits 97-128 of an IPv6 address in network byte order. + :let _scope U32: 0 :: IPv6 scope (unicast, anycast, multicast, etc...). + + :new _for_fd(fd): _LibPonyOS.pony_os_sockname(fd, @) + + :fun is_ipv4: _LibPonyOS.pony_os_ipv4(@) + :fun is_ipv6: _LibPonyOS.pony_os_ipv6(@) + + :fun port: _LibC.ntohs(@_port) // (converted to host byte order) + :fun scope: _LibC.ntohl(@_scope) // (converted to host byte order) + :fun ipv4_addr: _LibC.ntohl(@_ipv4) // (converted to host byte order) + // TODO: ipv6_addr (needs tuple return value) + // TODO: family (needs Platform.big_endian) + + :fun "=="(other _NetAddress'box) + @_family == other._family + && @_port == other._port + && ( + if @is_ipv4 ( + @_ipv4 == other._ipv4 + | + @_ipv6a == other._ipv6a + && @_ipv6b == other._ipv6b + && @_ipv6c == other._ipv6c + && @_ipv6d == other._ipv6d + ) + ) + && @_scope == other._scope