Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add Wasm compatibility and allow it to run async #69

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ Cargo.lock

# These are backup files generated by rustfmt
**/*.rs.bk
.cargo/config.toml
40 changes: 25 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version = "0.3.3"
edition = "2021"
authors = [
"Gao Xiang Kang <1148118271@qq.com>",
"Jovi Hsu <jv.hsu@outlook.com>"
"Jovi Hsu <jv.hsu@outlook.com>",
]
description = "In addition to encryption library, pure RUST implementation of SSH-2.0 client protocol"
keywords = ["ssh", "sshAgreement", "sshClient"]
Expand All @@ -25,28 +25,38 @@ num-bigint = { version = "0.4", features = ["rand"] }
strum = "0.24"
strum_macros = "0.24"
# the crate rsa has removed the internal hash implement from 0.7.0
sha1 = { version = "0.10.5", default-features = false, features = ["oid"], optional = true }
sha2 = { version = "0.10.6", default-features = false, features = ["oid"]}
sha1 = { version = "0.10.5", default-features = false, features = [
"oid",
], optional = true }
sha2 = { version = "0.10.6", default-features = false, features = ["oid"] }
rsa = "^0.7"
aes = { version = "0.7", features = ["ctr"] }
ssh-key = { version = "0.5.1", features = ["rsa", "ed25519"]}
ssh-key = { version = "0.5.1", features = ["rsa", "ed25519"] }
signature = "1.6.4"
ring = "0.16.20"
filetime = "0.2"
ring = "0.16.20"
instant = "0.1.12"
async-std = "1.12.0"
async-recursion = "1.0.4"

# async
# [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
# tokio = { version = "^1", features = ["full"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2", features = ["js"] }
# tokio = { version = "^1", features = [
# "sync",
# "macros",
# "io-util",
# "rt",
# "time"
# ]}
futures = "0.3.28"
web-sys = { version = "0.3.64", features = [
"Window",
"WebSocket",
"ErrorEvent",
"MessageEvent",
"BinaryType",
"FileReader",
"Blob",
"ProgressEvent",
"console",
"Crypto",
] }
wasm-bindgen = "0.2.87"
js-sys = "0.3"

[dev-dependencies]
paste = "1"
Expand Down
3 changes: 3 additions & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[tasks.test-win]
command = "cargo"
args = ["test","${@}","--target","x86_64-pc-windows-msvc","--", "--nocapture","--exact"]
250 changes: 250 additions & 0 deletions src/async_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
#[cfg(target_family = "wasm")]
pub mod browser_web_sokcet{
use async_std::io::{Read, Write};
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use futures::StreamExt;
use std::marker::Send;
use std::task::Waker;
use std::{cell::RefCell, rc::Rc, task::Poll};
use wasm_bindgen::{prelude::Closure, JsCast};
use web_sys::{Blob, FileReader, MessageEvent, WebSocket};

#[cfg(target_family = "wasm")]
pub struct WebSocketStream {
ws: Rc<RefCell<WebSocket>>,
read_receiver: UnboundedReceiver<Vec<u8>>,
temp_read_buffer: Vec<u8>,
// Save the waker so we can wake up the task when we receive a message
read_waker: Rc<RefCell<Option<Waker>>>,
write_waker: Rc<RefCell<Option<Waker>>>,
}

#[cfg(target_family = "wasm")]
impl Read for WebSocketStream {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<std::io::Result<usize>> {
let ready_state = self.ws.as_ref().borrow_mut().ready_state();
log::debug!("read message is called, readystate = {:?}", ready_state);
let this = self.as_mut().get_mut();
if ready_state == 0 {
*(this.read_waker.borrow_mut()) = Some(cx.waker().clone());
return Poll::Pending;
}

if ready_state == 2 || ready_state == 3 {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::NotConnected,
format!("websocket closing or closed"),
)));
}

match this.read_receiver.poll_next_unpin(cx) {
Poll::Ready(Some(data)) => {
this.temp_read_buffer.extend_from_slice(&data);
}
Poll::Ready(None) => {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"WebSocket closed",
)));
}
Poll::Pending => {
if this.temp_read_buffer.is_empty() {
log::debug!("read pending");
return Poll::Pending;
}
}
}

if !this.temp_read_buffer.is_empty() {
// if temp buffer still has something in it
let len = buf.len().min(this.temp_read_buffer.len());
// Copy from temp_read_buffer to buf
buf[..len].copy_from_slice(&this.temp_read_buffer[..len]);

// Remove the bytes we have just copied from temp_read_buffer
this.temp_read_buffer.drain(..len);
return Poll::Ready(Ok(len));
}
Poll::Ready(Ok(0))
}
}

#[cfg(target_family = "wasm")]
impl Write for WebSocketStream {
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
let ready_state = self.ws.as_ref().borrow_mut().ready_state();
log::debug!("write called state = {:?}", &ready_state,);

if ready_state == 0 {
*(self.get_mut().write_waker.borrow_mut()) = Some(cx.waker().clone());
return Poll::Pending;
}

if ready_state == 2 || ready_state == 3 {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::NotConnected,
format!("websocket closing or closed"),
)));
}

if let Err(_) = self.ws.as_ref().borrow_mut().send_with_u8_array(buf) {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
format!("websocket error"),
)));
}

log::debug!("write successful");
return Poll::Ready(Ok(buf.len()));
}

fn poll_flush(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
return Poll::Ready(Ok(())); // we don't need to flush webscoket
}

fn poll_close(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
if let Ok(_) = self.ws.as_ref().borrow_mut().close() {
return Poll::Ready(Ok(()));
}
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::ConnectionAborted,
"Failed to close WebSocket",
)));
}
}

#[cfg(target_family = "wasm")]
impl WebSocketStream {
#[allow(dead_code)]
pub fn new(connection_str: String) -> Result<WebSocketStream, Box<dyn std::error::Error>> {
let ws = WebSocket::new(&connection_str.as_str()).expect("websocket creation failed");
let ws_ref = Rc::new(RefCell::new(ws));
let (sender, reader) = unbounded::<Vec<u8>>();

let read_waker = Rc::new(RefCell::new(None::<Waker>));
let read_waker_clone = read_waker.clone();
let onmessage_callback = {
Closure::wrap(Box::new(move |e: MessageEvent| {
log::debug!("received msg from websoket");
let sender_copy = sender.clone();
let waker_double_clone = read_waker_clone.clone();
if let Ok(blob) = e.data().dyn_into::<Blob>() {
let file_reader = FileReader::new().unwrap();
let file_reader_clone = file_reader.clone();
let onloadend_cb = {
Closure::<dyn FnMut(_)>::new(move |_e: web_sys::ProgressEvent| {
log::debug!("{:?}", e);
let array =
js_sys::Uint8Array::new(&&file_reader_clone.result().unwrap());
sender_copy
.unbounded_send(array.to_vec())
.expect("Failed to write message to channel");
if let Some(waker_ref) = waker_double_clone.borrow_mut().as_ref() {
waker_ref.wake_by_ref();
}
})
};
file_reader.set_onloadend(Some(onloadend_cb.as_ref().unchecked_ref()));
file_reader
.read_as_array_buffer(&blob)
.expect("blob not readable");
onloadend_cb.forget();

log::debug!("msg is blob");
} else {
panic!("not recognized data type");
}
}) as Box<dyn FnMut(_)>)
};

ws_ref
.as_ref()
.borrow_mut()
.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
onmessage_callback.forget();

let onerr_callback = {
Closure::wrap(Box::new(move |e: MessageEvent| {
log::debug!("{:?}", e);
}) as Box<dyn FnMut(_)>)
};

ws_ref
.as_ref()
.borrow_mut()
.set_onerror(Some(onerr_callback.as_ref().unchecked_ref()));
onerr_callback.forget();

let write_waker = Rc::new(RefCell::new(None::<Waker>));
let onconnect_callback = {
let write_waker_clone = write_waker.clone();
let read_waker_clone = read_waker.clone();
Closure::wrap(Box::new(move |_e: MessageEvent| {
log::debug!("websocket connected");
if let Some(write_ref) = write_waker_clone.borrow_mut().as_ref() {
write_ref.wake_by_ref();
}

if let Some(read_waker) = read_waker_clone.borrow_mut().as_ref() {
read_waker.wake_by_ref();
}
}) as Box<dyn FnMut(_)>)
};

ws_ref
.as_ref()
.borrow_mut()
.set_onopen(Some(onconnect_callback.as_ref().unchecked_ref()));
onconnect_callback.forget();

let onclose_callback = {
let write_waker_clone = write_waker.clone();
let read_waker_clone = read_waker.clone();
Closure::wrap(Box::new(move |_e: MessageEvent| {
log::debug!("websocket closed");
if let Some(write_ref) = write_waker_clone.borrow_mut().as_ref() {
write_ref.wake_by_ref();
}

if let Some(read_waker) = read_waker_clone.borrow_mut().as_ref() {
read_waker.wake_by_ref();
}
}) as Box<dyn FnMut(_)>)
};

ws_ref
.as_ref()
.borrow_mut()
.set_onclose(Some(onclose_callback.as_ref().unchecked_ref()));
onclose_callback.forget();

return Ok(WebSocketStream {
ws: ws_ref.clone(),
read_receiver: reader,
read_waker: read_waker,
write_waker: write_waker,
temp_read_buffer: Vec::new(),
});
}
}

#[cfg(target_family = "wasm")]
unsafe impl Send for WebSocketStream {}

#[cfg(target_family = "wasm")]
impl Unpin for WebSocketStream {}
}
Loading