Skip to content

Commit

Permalink
feat: create Bytes from std::sync::Arc
Browse files Browse the repository at this point in the history
...via infallible (`Bytes::from_arc_projection`) and fallible
(`Bytes::try_from_arc_projection`) constructors.

#359 (comment)
#437 (comment)
  • Loading branch information
scottlamb committed Mar 6, 2025
1 parent 19d1427 commit 22defe0
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 14 deletions.
148 changes: 134 additions & 14 deletions src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use core::mem::{self, ManuallyDrop};
use core::ops::{Deref, RangeBounds};
use core::ptr::NonNull;
use core::{cmp, fmt, hash, ptr, slice, usize};
use std::sync::Arc;

use alloc::{
alloc::{dealloc, Layout},
Expand Down Expand Up @@ -289,6 +290,74 @@ impl Bytes {
ret
}

/// Creates a new `Bytes` from an [`Arc<T>`] owner and a function that
/// returns the buffer given a reference to the contained `T`.
///
/// `T` must be [`Sized`] rather than a trait object or slice.
///
/// The returned `Bytes` can be cloned via `Arc` referencing counting,
/// whereas `Bytes` created via conversion from [`Vec`] must perform a new
/// allocation internally on first clone to hold the reference count.
/// This optimization is most significant if many `Bytes` instances are
/// created from the same `owner` and subsequently cloned.
///
/// ```
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// struct Pieces(Vec<Vec<u8>>);
/// let pieces = Arc::new(Pieces(vec![b"hello".to_vec(), b"world".to_vec()]));
/// let bytes: Vec<Bytes> = (0..2).map(|i| {
/// Bytes::from_arc_projection(pieces.clone(), |p| &p.0[i])
/// }).collect();
/// let bytes_cloned = bytes.clone();
/// assert_eq!(bytes[0], b"hello"[..]);
/// assert_eq!(bytes_cloned[1], b"world"[..]);
/// ```
///
/// See also [`Bytes::try_from_arc_projection`] for a fallible version.
pub fn from_arc_projection<T: Sync + 'static>(
owner: Arc<T>,
projection: impl FnOnce(&T) -> &[u8],
) -> Self {
let buf = projection(&*owner);
Bytes {
ptr: buf.as_ptr(),
len: buf.len(),
data: AtomicPtr::new(Arc::into_raw(owner) as *mut ()),
vtable: arcproj_vtable::<T>(),
}
}

/// Tries to creates a new `Bytes` from an [`Arc`] owner and a function that
/// returns the buffer given a reference to the contained `T` or fails.
///
/// This is similar to [`Bytes::from_arc_projection`] but fallible.
///
/// ```
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// struct Pieces(Vec<Vec<u8>>);
/// let pieces = Arc::new(Pieces(vec![b"hello".to_vec(), b"world".to_vec()]));
/// let bytes: Vec<Result<Bytes, &str>> = (0..3).map(|i| {
/// Bytes::try_from_arc_projection(pieces.clone(), |p| {
/// p.0.get(i).map(|v| &**v).ok_or("out of bounds")
/// })
/// }).collect();
/// assert_eq!(bytes, [Ok(b"hello"[..].into()), Ok(b"world"[..].into()), Err("out of bounds")]);
/// ```
pub fn try_from_arc_projection<T: Sync + 'static, E>(
owner: Arc<T>,
projection: impl FnOnce(&T) -> Result<&[u8], E>,
) -> Result<Self, E> {
let buf = projection(&*owner)?;
Ok(Bytes {
ptr: buf.as_ptr(),
len: buf.len(),
data: AtomicPtr::new(Arc::into_raw(owner) as *mut ()),
vtable: arcproj_vtable::<T>(),
})
}

/// Returns the number of bytes contained in this `Bytes`.
///
/// # Examples
Expand Down Expand Up @@ -322,8 +391,9 @@ impl Bytes {
/// Returns true if this is the only reference to the data and
/// `Into<BytesMut>` would avoid cloning the underlying buffer.
///
/// Always returns false if the data is backed by a [static slice](Bytes::from_static),
/// or an [owner](Bytes::from_owner).
/// Always returns false if the data is backed by a
/// [static slice](Bytes::from_static), [owner](Bytes::from_owner),
/// or [Arc projection](Bytes::from_arc_projection).
///
/// The result of this method may be invalidated immediately if another
/// thread clones this value while this is being called. Ensure you have
Expand Down Expand Up @@ -627,8 +697,9 @@ impl Bytes {
/// If `self` is not unique for the entire original buffer, this will fail
/// and return self.
///
/// This will also always fail if the buffer was constructed via either
/// [from_owner](Bytes::from_owner) or [from_static](Bytes::from_static).
/// Always fails if the data is backed by a
/// [static slice](Bytes::from_static), [owner](Bytes::from_owner),
/// or [Arc projection](Bytes::from_arc_projection).
///
/// # Examples
///
Expand Down Expand Up @@ -1073,13 +1144,17 @@ impl fmt::Debug for Vtable {
}
}

fn never_unique(_: &AtomicPtr<()>) -> bool {
false
}

// ===== impl StaticVtable =====

const STATIC_VTABLE: Vtable = Vtable {
clone: static_clone,
to_vec: static_to_vec,
to_mut: static_to_mut,
is_unique: static_is_unique,
is_unique: never_unique,
drop: static_drop,
};

Expand All @@ -1098,10 +1173,6 @@ unsafe fn static_to_mut(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesM
BytesMut::from(slice)
}

fn static_is_unique(_: &AtomicPtr<()>) -> bool {
false
}

unsafe fn static_drop(_: &mut AtomicPtr<()>, _: *const u8, _: usize) {
// nothing to drop for &'static [u8]
}
Expand Down Expand Up @@ -1152,10 +1223,6 @@ unsafe fn owned_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Byte
BytesMut::from_vec(owned_to_vec(data, ptr, len))
}

unsafe fn owned_is_unique(_data: &AtomicPtr<()>) -> bool {
false
}

unsafe fn owned_drop_impl(owned: *mut ()) {
let lifetime = owned.cast::<OwnedLifetime>();
let ref_cnt = &(*lifetime).ref_cnt;
Expand Down Expand Up @@ -1183,7 +1250,7 @@ static OWNED_VTABLE: Vtable = Vtable {
clone: owned_clone,
to_vec: owned_to_vec,
to_mut: owned_to_mut,
is_unique: owned_is_unique,
is_unique: never_unique,
drop: owned_drop,
};

Expand Down Expand Up @@ -1489,6 +1556,59 @@ unsafe fn shallow_clone_arc(shared: *mut Shared, ptr: *const u8, len: usize) ->
}
}

fn arcproj_vtable<T: Sync>() -> &'static Vtable {
// Produce vtable via const promotion to &'static.
// <https://users.rust-lang.org/t/custom-vtables-with-integers/78508/2>
trait V {
const VTABLE: Vtable;
}
impl<T: Sync> V for T {
const VTABLE: Vtable = Vtable {
clone: arcproj_clone::<T>,
to_vec: arcproj_to_vec::<T>,
to_mut: arcproj_to_mut::<T>,
is_unique: never_unique,
drop: arcproj_drop::<T>,
};
}
&<T as V>::VTABLE
}

unsafe fn arcproj_clone<T: Sync>(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
let arc = data.load(Ordering::Relaxed);

// Replicate `Arc::increment_strong_count`, which has a MSRV of 1.51.0.
let _ = std::mem::ManuallyDrop::new(Arc::<T>::from_raw(arc as *const T)).clone();

Bytes {
ptr,
len,
data: AtomicPtr::new(arc),
vtable: arcproj_vtable::<T>(),
}
}

unsafe fn arcproj_to_vec<T: Sync>(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
let vec = slice::from_raw_parts(ptr, len).to_vec();
arcproj_drop_impl::<T>(data);
vec
}

unsafe fn arcproj_to_mut<T: Sync>(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
let out = BytesMut::from(slice::from_raw_parts(ptr, len));
arcproj_drop_impl::<T>(data);
out
}

unsafe fn arcproj_drop<T: Sync>(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) {
arcproj_drop_impl::<T>(data);
}

unsafe fn arcproj_drop_impl<T: Sync>(data: &AtomicPtr<()>) {
// Replicate `Arc::decrement_strong_count`, which has a MSRV of 1.51.0.
drop(Arc::from_raw(data.load(Ordering::Relaxed) as *const T));
}

#[cold]
unsafe fn shallow_clone_vec(
atom: &AtomicPtr<()>,
Expand Down
99 changes: 99 additions & 0 deletions tests/test_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1647,3 +1647,102 @@ fn owned_safe_drop_on_as_ref_panic() {
assert!(result.is_err());
assert_eq!(drop_counter.get(), 1);
}

#[test]
fn arcproj_is_unique_always_false() {
let b1 = Bytes::from_arc_projection(Arc::new([1, 2, 3, 4, 5, 6, 7]), |v| &v[..]);
assert!(!b1.is_unique()); // even if ref_cnt == 1
let b2 = b1.clone();
assert!(!b1.is_unique());
assert!(!b2.is_unique());
drop(b1);
assert!(!b2.is_unique()); // even if ref_cnt == 1
}

#[test]
fn arcproj_buf_sharing() {
let buf = [1, 2, 3, 4, 5, 6, 7];
let b1 = Bytes::from_arc_projection(Arc::new(buf), |v| &v[..]);
let b2 = b1.clone();
assert_eq!(&buf[..], &b1[..]);
assert_eq!(&buf[..], &b2[..]);
assert_eq!(b1.as_ptr(), b2.as_ptr());
assert_eq!(b1.len(), b2.len());
assert_eq!(b1.len(), buf.len());
}

#[test]
fn arcproj_buf_slicing() {
let b1 = Bytes::from_arc_projection(Arc::new(Vec::from(SHORT)), |v| &v[..]);
assert_eq!(SHORT, &b1[..]);
let b2 = b1.slice(1..(b1.len() - 1));
assert_eq!(&SHORT[1..(SHORT.len() - 1)], b2);
assert_eq!(unsafe { b1.as_ptr().add(1) }, b2.as_ptr());
assert_eq!(SHORT.len() - 2, b2.len());
}

#[test]
fn arcproj_dropped_exactly_once() {
let buf: [u8; 5] = [1, 2, 3, 4, 5];
let drop_counter = SharedAtomicCounter::new();
let owner = OwnedTester::new(buf, drop_counter.clone());
let b1 = Bytes::from_arc_projection(Arc::new(owner), |o| o.as_ref());
let b2 = b1.clone();
assert_eq!(drop_counter.get(), 0);
drop(b1);
assert_eq!(drop_counter.get(), 0);
let b3 = b2.slice(1..b2.len() - 1);
drop(b2);
assert_eq!(drop_counter.get(), 0);
drop(b3);
assert_eq!(drop_counter.get(), 1);
}

#[test]
fn arcproj_to_mut() {
let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let drop_counter = SharedAtomicCounter::new();
let owner = OwnedTester::new(buf, drop_counter.clone());
let b1 = Bytes::from_arc_projection(Arc::new(owner), |o| o.as_ref());

// Holding an owner will fail converting to a BytesMut,
// even when the bytes instance has a ref_cnt == 1.
let b1 = b1.try_into_mut().unwrap_err();

// That said, it's still possible, just not cheap.
let bm1: BytesMut = b1.into();
let new_buf = &bm1[..];
assert_eq!(new_buf, &buf[..]);

// `.into::<BytesMut>()` has correctly dropped the owner
assert_eq!(drop_counter.get(), 1);
}

#[test]
fn arcproj_into_vec() {
let drop_counter = SharedAtomicCounter::new();
let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let owner = OwnedTester::new(buf, drop_counter.clone());
let b1 = Bytes::from_arc_projection(Arc::new(owner), |o| o.as_ref());

let v1: Vec<u8> = b1.into();
assert_eq!(&v1[..], &buf[..]);
// into() vec will copy out of the owner and drop it
assert_eq!(drop_counter.get(), 1);
}

#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn arcproj_safe_drop_on_as_ref_panic() {
let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let drop_counter = SharedAtomicCounter::new();
let mut owner = OwnedTester::new(buf, drop_counter.clone());
owner.panic_as_ref = true;

let result = panic::catch_unwind(AssertUnwindSafe(|| {
let _ = Bytes::from_arc_projection(Arc::new(owner), |o| o.as_ref());
}));

assert!(result.is_err());
assert_eq!(drop_counter.get(), 1);
}

0 comments on commit 22defe0

Please sign in to comment.