From 274ef0fd00f5794ef008f1dd28df867c6f098ab9 Mon Sep 17 00:00:00 2001 From: hayabusa-cloud Date: Mon, 16 Sep 2024 21:25:32 +0900 Subject: [PATCH] feat: implement Linux 6.6 io-uring basic usages --- buffers.go | 7 + context.go | 38 ---- uring_linux.go | 337 ++++++++++++++++++++++++++----- uring_linux_buffer_rings.go | 169 ++++++++++++++++ uring_linux_buffers.go | 383 ++++++++++++++++++++++++++++++++++++ uring_linux_ops.go | 123 +++++++----- uring_linux_test.go | 2 +- 7 files changed, 917 insertions(+), 142 deletions(-) create mode 100644 uring_linux_buffer_rings.go create mode 100644 uring_linux_buffers.go diff --git a/buffers.go b/buffers.go index 2d33316..a9cf204 100644 --- a/buffers.go +++ b/buffers.go @@ -118,6 +118,13 @@ func NewHugeBuffer() HugeBuffer { return HugeBuffer{} } // NewGiantBuffer returns a new instance of GiantBuffer. func NewGiantBuffer() GiantBuffer { return GiantBuffer{} } +// BufferType is an interface that represents different types of buffers, including +// PicoBuffer, NanoBuffer, MicroBuffer, SmallBuffer, MediumBuffer, LargeBuffer, +// HugeBuffer, and GiantBuffer. Each buffer type is a byte array with a specific size. +type BufferType interface { + PicoBuffer | NanoBuffer | MicroBuffer | SmallBuffer | MediumBuffer | LargeBuffer | HugeBuffer | GiantBuffer +} + // PicoBuffer represents a byte array with size of BufferSizePico type PicoBuffer [BufferSizePico]byte diff --git a/context.go b/context.go index 5699c12..d796953 100644 --- a/context.go +++ b/context.go @@ -58,41 +58,3 @@ func ContextUserdata[T any](ctx context.Context) (ret T) { return } - -type fdGetter interface { - getFD() (fd int) -} - -type fdSetter interface { - setFD(fd int) -} - -type fdCtx struct { - context.Context - fd int -} - -func (ctx *fdCtx) getFD() (fd int) { - return ctx.fd -} - -func (ctx *fdCtx) setFD(fd int) { - ctx.fd = fd -} - -func contextWithFD(parent context.Context, fd int) context.Context { - if fdc, ok := parent.(fdSetter); ok { - fdc.setFD(fd) - return parent - } - - return &fdCtx{Context: parent, fd: fd} -} - -func contextFD(ctx context.Context) int { - if fdc, ok := ctx.(fdGetter); ok { - return fdc.getFD() - } - - return -1 -} diff --git a/uring_linux.go b/uring_linux.go index 3349b36..a71f174 100644 --- a/uring_linux.go +++ b/uring_linux.go @@ -15,9 +15,35 @@ import ( ) const ( - IORING_SETUP_IOPOLL = 1 << 0 - IORING_SETUP_SQPOLL = 1 << 1 - IORING_SETUP_SQ_AFF = 1 << 2 + _ = 1 << (iota + 7) + _ + UringEntriesPico + UringEntriesNano + UringEntriesMicro + UringEntriesSmall + UringEntriesMedium + UringEntriesLarge + UringEntriesHuge +) + +const ( + IORING_SETUP_IOPOLL = 1 << 0 + IORING_SETUP_SQPOLL = 1 << 1 + IORING_SETUP_SQ_AFF = 1 << 2 + IORING_SETUP_CQSIZE = 1 << 3 + IORING_SETUP_CLAMP = 1 << 4 + IORING_SETUP_ATTACH_WQ = 1 << 5 + IORING_SETUP_R_DISABLED = 1 << 6 + IORING_SETUP_SUBMIT_ALL = 1 << 7 + IORING_SETUP_COOP_TASKRUN = 1 << 8 + IORING_SETUP_TASKRUN_FLAG = 1 << 9 + IORING_SETUP_SQE128 = 1 << 10 + IORING_SETUP_CQE32 = 1 << 11 + IORING_SETUP_SINGLE_ISSUER = 1 << 12 + IORING_SETUP_DEFER_TASKRUN = 1 << 13 + IORING_SETUP_NO_MMAP = 1 << 14 + IORING_SETUP_REGISTERED_FD_ONLY = 1 << 15 + IORING_SETUP_NO_SQARRAY = 1 << 16 ) const ( @@ -46,9 +72,12 @@ const ( ) const ( - IORING_OFF_SQ_RING int64 = 0 - IORING_OFF_CQ_RING int64 = 0x8000000 - IORING_OFF_SQES int64 = 0x10000000 + IORING_OFF_SQ_RING int64 = 0 + IORING_OFF_CQ_RING int64 = 0x8000000 + IORING_OFF_SQES int64 = 0x10000000 + IORING_OFF_PBUF_RING = 0x80000000 + IORING_OFF_PBUF_SHIFT = 16 + IORING_OFF_MMAP_MASK = 0xf8000000 ) const ( @@ -121,6 +150,9 @@ const ( IORING_UNREGISTER_PBUF_RING IORING_REGISTER_SYNC_CANCEL IORING_REGISTER_FILE_ALLOC_RANGE + IORING_REGISTER_PBUF_STATUS + IORING_REGISTER_NAPI + IORING_UNREGISTER_NAPI ) const ( @@ -144,13 +176,21 @@ type ioUring struct { cq ioUringCq ringFd int ops []ioUringProbeOp - bufs Buffers + ctxs []ioUringCtx + bufs [][]byte } +type ( + ioUringFd int32 + ioUringUserdata []byte +) + type ioUringCtx struct { - context.Context - op uint8 - flags uint8 + op uint8 + flags uint8 + buf bufferGroupIndex // buf_index or buf_group + fd ioUringFd + userdata ioUringUserdata } func (ctx *ioUringCtx) getOp() (uint8, uint8) { @@ -184,7 +224,7 @@ func newIoUring(entries int, opts ...func(params *ioUringParams)) (*ioUring, err ringSz: params.cqOff.cqes + uint32(unsafe.Sizeof(uint32(0)))*params.cqEntries, }, ringFd: fd, - bufs: Buffers{}, + bufs: [][]byte{}, } b, err := unix.Mmap(uring.ringFd, IORING_OFF_SQ_RING, int(uring.sq.ringSz), unix.PROT_READ|unix.PROT_WRITE|unix.PROT_EXEC, unix.MAP_SHARED|unix.MAP_POPULATE) @@ -212,9 +252,26 @@ func newIoUring(entries int, opts ...func(params *ioUringParams)) (*ioUring, err uring.cq.kOverflow = (*uint32)(unsafe.Pointer(ptr + uintptr(params.cqOff.overflow))) uring.cq.cqes = unsafe.Slice((*ioUringCqe)(unsafe.Pointer(ptr+uintptr(params.cqOff.cqes))), int(params.cqEntries)) + uring.ctxs = make([]ioUringCtx, *uring.cq.kRingEntries) + return uring, nil } +type ioUringProbe struct { + lastOp uint8 + opsLen uint8 + resv uint16 + resv2 [3]uint32 + ops [256]ioUringProbeOp +} + +type ioUringProbeOp struct { + op uint8 + resv uint8 + flags uint16 + resv2 uint32 +} + func (ur *ioUring) registerProbe(probe *ioUringProbe) error { addr, n := uintptr(unsafe.Pointer(probe)), uintptr(len(probe.ops)) _, _, errno := unix.Syscall6(unix.SYS_IO_URING_REGISTER, uintptr(ur.ringFd), IORING_REGISTER_PROBE, addr, n, 0, 0) @@ -232,16 +289,24 @@ func (ur *ioUring) registerProbe(probe *ioUringProbe) error { return nil } -func (ur *ioUring) registerBuffers(n, size int) error { +func (ur *ioUring) registerBuffers(addr unsafe.Pointer, n, size int) error { if ur.bufs != nil && len(ur.bufs) > 0 { panic("io-uring buffers already registered") } if n < 1 || size < 1 || 0 != n&(n-1) || 0 != size&(size-1) { return ErrInvalidParam } - ur.bufs = NewBuffers(n, size) - addr, n := ioVecFromBytesSlice(ur.bufs) - _, _, errno := unix.Syscall6(unix.SYS_IO_URING_REGISTER, uintptr(ur.ringFd), IORING_REGISTER_BUFFERS, addr, uintptr(n), 0, 0) + vectors := make([]ioVec, 0, n) + ur.bufs = make([][]byte, 0, n) + for i := range n { + base := unsafe.Add(addr, i*size) + vectors = append(vectors, ioVec{Base: (*byte)(base), Len: uint64(size)}) + ur.bufs = append(ur.bufs, unsafe.Slice((*byte)(base), size)) + } + data := uintptr(unsafe.Pointer(unsafe.SliceData(vectors))) + reg := ioUringRSrcRegister{nr: uint32(n), data: uint64(data)} + regPtr, regSize := uintptr(unsafe.Pointer(®)), unsafe.Sizeof(reg) + _, _, errno := unix.Syscall6(unix.SYS_IO_URING_REGISTER, uintptr(ur.ringFd), IORING_REGISTER_BUFFERS2, regPtr, regSize, 0, 0) if errno != 0 { return errFromUnixErrno(errno) } @@ -257,8 +322,44 @@ func (ur *ioUring) unregisterBuffers() error { if errno != 0 { return errFromUnixErrno(errno) } - ur.bufs = Buffers{} + ur.bufs = [][]byte{} + + return nil +} + +func (ur *ioUring) registerBufRing(entries int, groupID uint16) (*ioUringBufRing, error) { + if entries < 1 || entries > (1<<15) { + panic("entries must be between 1 and 32768") + } + entries-- + entries |= entries >> 1 + entries |= entries >> 2 + entries |= entries >> 4 + entries |= entries >> 8 + entries++ + s := AlignedMem(entries * int(unsafe.Sizeof(ioUringBuf{}))) + ringAddr := uintptr(unsafe.Pointer(unsafe.SliceData(s))) + r := (*ioUringBufRing)(unsafe.Pointer(ringAddr)) + reg := ioUringBufReg{ + ringAddr: uint64(ringAddr), + ringEntries: uint32(entries), + bgid: groupID, + } + addr := uintptr(unsafe.Pointer(®)) + _, _, errno := unix.Syscall6(unix.SYS_IO_URING_REGISTER, uintptr(ur.ringFd), IORING_REGISTER_PBUF_RING, addr, 1, 0, 0) + if errno != 0 { + return r, errFromUnixErrno(errno) + } + return r, nil +} +func (ur *ioUring) unregisterBufRing(groupID uint16) error { + reg := ioUringBufReg{bgid: groupID} + addr := uintptr(unsafe.Pointer(®)) + _, _, errno := unix.Syscall6(unix.SYS_IO_URING_REGISTER, uintptr(ur.ringFd), IORING_UNREGISTER_PBUF_RING, addr, 1, 0, 0) + if errno != 0 { + return errFromUnixErrno(errno) + } return nil } @@ -285,7 +386,15 @@ func (ur *ioUring) feature(feat uint32) bool { return feat == ur.params.features&feat } -func (ur *ioUring) submit(ctx context.Context, op, flags uint8, fn func(e *ioUringSqe)) error { +func (ur *ioUring) enable() error { + _, _, errno := unix.Syscall6(unix.SYS_IO_URING_REGISTER, uintptr(ur.ringFd), IORING_REGISTER_ENABLE_RINGS, 0, 0, 0, 0) + if errno != 0 { + return errFromUnixErrno(errno) + } + return nil +} + +func (ur *ioUring) directSubmission(ctx context.Context, fn func(e *ioUringSqe)) error { ur.sl.Lock() defer ur.sl.Unlock() @@ -295,11 +404,37 @@ func (ur *ioUring) submit(ctx context.Context, op, flags uint8, fn func(e *ioUri } e := &ur.sq.sqes[t&*ur.sq.kRingMask] - e.opcode = op - e.flags = flags fn(e) - userData := ioUringCtx{Context: ctx, op: op, flags: flags} - e.userData = uint64(uintptr(unsafe.Pointer(&userData))) + c := &ur.ctxs[t&*ur.cq.kRingMask] + c.op = e.opcode + c.flags = e.flags + c.buf = ContextUserdata[bufferGroupIndex](ctx) + c.fd = ContextUserdata[ioUringFd](ctx) + c.userdata = ContextUserdata[ioUringUserdata](ctx) + e.userData = uint64(uintptr(unsafe.Pointer(c))) + *ur.sq.kTail++ + + return nil +} + +func (ur *ioUring) indirectSubmission(ctx context.Context, fn func(e *ioUringSqe)) error { + ur.sl.Lock() + defer ur.sl.Unlock() + + h, t := *ur.sq.kHead, *ur.sq.kTail + if (t+1)&*ur.sq.kRingMask == h { + return ErrTemporarilyUnavailable + } + + e := &ur.sq.sqes[t&*ur.sq.kRingMask] + fn(e) + c := &ur.ctxs[t&*ur.cq.kRingMask] + c.op = e.opcode + c.flags = e.flags + c.buf = ContextUserdata[bufferGroupIndex](ctx) + c.fd = ContextUserdata[ioUringFd](ctx) + c.userdata = ContextUserdata[ioUringUserdata](ctx) + e.userData = uint64(uintptr(unsafe.Pointer(c))) ur.sq.array[t&*ur.sq.kRingMask] = t & *ur.sq.kRingMask *ur.sq.kTail++ @@ -307,26 +442,43 @@ func (ur *ioUring) submit(ctx context.Context, op, flags uint8, fn func(e *ioUri return nil } -func (ur *ioUring) submit3(ctx context.Context, op uint8, flags uint8, fd int, addr uint64, n int) error { - return ur.submit(ctx, op, flags, func(e *ioUringSqe) { +func (ur *ioUring) submit3(ctx context.Context, op uint8, flags uint8, ioprio uint16, fd int, addr uint64, n int) error { + setSqe := func(e *ioUringSqe) { + e.opcode = op + e.flags = flags + e.ioprio = ioprio e.fd = int32(fd) e.addr = addr e.len = uint32(n) - }) + } + if ur.params.flags&IORING_SETUP_NO_SQARRAY == IORING_SETUP_NO_SQARRAY { + return ur.directSubmission(ctx, setSqe) + } + return ur.indirectSubmission(ctx, setSqe) } -func (ur *ioUring) submit6(ctx context.Context, op uint8, flags uint8, fd int, off uint64, addr uint64, n int, uflags uint32) error { - return ur.submit(ctx, op, flags, func(e *ioUringSqe) { +func (ur *ioUring) submit6(ctx context.Context, op uint8, flags uint8, ioprio uint16, fd int, off uint64, addr uint64, n int, uflags uint32) error { + setSqe := func(e *ioUringSqe) { + e.opcode = op + e.flags = flags + e.ioprio = ioprio e.fd = int32(fd) e.off = off e.addr = addr e.len = uint32(n) e.uflags = uflags - }) + } + if ur.params.flags&IORING_SETUP_NO_SQARRAY == IORING_SETUP_NO_SQARRAY { + return ur.directSubmission(ctx, setSqe) + } + return ur.indirectSubmission(ctx, setSqe) } -func (ur *ioUring) submit9(ctx context.Context, op uint8, flags uint8, fd int, off uint64, spliceOffIn uint64, n int, uflags uint32, bufGroup uint16, personality uint16, spliceFdIn int) error { - return ur.submit(ctx, op, flags, func(e *ioUringSqe) { +func (ur *ioUring) submit9(ctx context.Context, op uint8, flags uint8, ioprio uint16, fd int, off uint64, spliceOffIn uint64, n int, uflags uint32, bufGroup uint16, personality uint16, spliceFdIn int) error { + setSqe := func(e *ioUringSqe) { + e.opcode = op + e.flags = flags + e.ioprio = ioprio e.fd = int32(fd) e.off = off e.addr = spliceOffIn @@ -336,18 +488,25 @@ func (ur *ioUring) submit9(ctx context.Context, op uint8, flags uint8, fd int, o e.bufIndex = bufGroup e.personality = personality e.spliceFdIn = int32(spliceFdIn) - }) + } + if ur.params.flags&IORING_SETUP_NO_SQARRAY == IORING_SETUP_NO_SQARRAY { + return ur.directSubmission(ctx, setSqe) + } + return ur.indirectSubmission(ctx, setSqe) } func (ur *ioUring) enter() error { - if atomic.LoadUint32(ur.sq.kFlags)&IORING_SQ_NEED_WAKEUP != 0 { + if atomic.LoadUint32(ur.sq.kFlags)&IORING_SQ_NEED_WAKEUP == IORING_SQ_NEED_WAKEUP { _, err := ioUringEnter(ur.ringFd, uintptr(ur.params.sqEntries), 0, IORING_ENTER_SQ_WAKEUP) if err != nil { return err } } + ur.sl.Lock() + defer ur.sl.Unlock() if (ur.params.flags&IORING_SETUP_SQPOLL == 0) && *ur.sq.kHead != *ur.sq.kTail { - _, err := ioUringEnter(ur.ringFd, uintptr(ur.params.sqEntries), 0, 0) + n := (*ur.sq.kTail - *ur.sq.kHead) & *ur.sq.kRingMask + _, err := ioUringEnter(ur.ringFd, uintptr(n), 0, IORING_ENTER_GETEVENTS) if err != nil { return err } @@ -360,7 +519,8 @@ func (ur *ioUring) poll(n int) error { if ur.params.flags&IORING_SETUP_IOPOLL == 0 { return nil } - _, err := ioUringEnter(ur.ringFd, 0, uintptr(n), IORING_ENTER_GETEVENTS) + submit := (*ur.sq.kTail - *ur.sq.kHead) & *ur.sq.kRingMask + _, err := ioUringEnter(ur.ringFd, uintptr(submit), uintptr(n), IORING_ENTER_GETEVENTS) return err } @@ -384,21 +544,87 @@ func (ur *ioUring) wait() (*ioUringCqe, error) { return nil, ErrTemporarilyUnavailable } -type ioUringProbe struct { - lastOp uint8 - opsLen uint8 - resv uint16 - resv2 [3]uint32 - ops [256]ioUringProbeOp +func (ur *ioUring) cqAdvance(nr uint32) { + if nr == 0 { + return + } + ur.cq.advance(nr) } -type ioUringProbeOp struct { - op uint8 - resv uint8 - flags uint16 - resv2 uint32 +func (ur *ioUring) bufRingInit(br *ioUringBufRing) { + br.tail = 0 } +func (ur *ioUring) bufRingAdd(br *ioUringBufRing, addr uintptr, n int, bid uint16, mask, offset uintptr) { + add := ioUringBufSize * ((uintptr(br.tail) + offset) & mask) + buf := (*ioUringBuf)(unsafe.Add(unsafe.Pointer(br), add)) + buf.addr = uint64(addr) + buf.len = uint32(n) + buf.bid = bid +} + +func (ur *ioUring) bufRingAdvance(br *ioUringBufRing, count int) { + br.tail += uint16(count) +} + +func (ur *ioUring) bufRingAvailable(br *ioUringBufRing, bgid uint16) int { + head, ret := uint16(0), 0 + ret = ur.bufRingHead(bgid, &head) + if ret > 0 { + return ret + } + return int(br.tail - head) +} + +func (ur *ioUring) bufRingHead(groupID uint16, head *uint16) int { + status := ioUringBufStatus{bufGroup: uint32(groupID)} + + ret, _, errno := unix.Syscall6(unix.SYS_IO_URING_REGISTER, uintptr(ur.ringFd), IORING_REGISTER_PBUF_STATUS, uintptr(unsafe.Pointer(&status)), 1, 0, 0) + if ret != 0 { + return int(errno) + } + *head = uint16(status.head) + return 0 +} + +func (ur *ioUring) bufRingCQAdvance(br *ioUringBufRing, count int) { + ur.bufRingAdvance(br, count) + ur.cqAdvance(uint32(count)) +} + +type ioUringRSrcRegister struct { + nr uint32 + resv uint32 + resv2 uint64 + data uint64 + tags uint64 +} + +type ioUringBufReg struct { + ringAddr uint64 + ringEntries uint32 + bgid uint16 + pad uint16 + _ [3]uint64 +} + +type ioUringBufStatus struct { + bufGroup uint32 + head uint32 + _ [8]uint32 +} + +type ioUringBuf struct { + addr uint64 + len uint32 + bid uint16 + tail uint16 +} + +var ioUringBufSize = unsafe.Sizeof(ioUringBuf{}) + +type ioUringBufRing ioUringBuf + type ioUringSq struct { kHead *uint32 kTail *uint32 @@ -439,15 +665,19 @@ type ioUringCq struct { ringSz uint32 } +func (cq *ioUringCq) advance(nr uint32) { + atomic.AddUint32(cq.kTail, nr) +} + type ioUringCqe struct { userData uint64 res int32 flags uint32 } -func (cqe *ioUringCqe) Context() context.Context { +func (cqe *ioUringCqe) context() *ioUringCtx { if cqe.userData == 0 { - return context.Background() + return nil } return (*ioUringCtx)(unsafe.Pointer(uintptr(cqe.userData))) } @@ -488,10 +718,20 @@ type ioUringParams struct { } var ( - ioUringDefaultParams = &ioUringParams{} + ioUringDefaultParams = &ioUringParams{} + ioUringDisabledOptions = func(params *ioUringParams) { + params.flags |= IORING_SETUP_R_DISABLED + } + ioUringNoSQArrayOptions = func(params *ioUringParams) { + params.flags |= IORING_SETUP_NO_SQARRAY + } ioUringIoPollOptions = func(params *ioUringParams) { params.flags |= IORING_SETUP_IOPOLL + params.flags &= ^uint32(IORING_SETUP_COOP_TASKRUN) + params.flags &= ^uint32(IORING_SETUP_TASKRUN_FLAG) + params.flags &= ^uint32(IORING_SETUP_DEFER_TASKRUN) } + // sq poll mode is not recommended ioUringSqPollOptions = func(params *ioUringParams) { params.flags |= IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF params.sqThreadCPU = ioUringDefaultSqThreadCPU @@ -510,8 +750,7 @@ func ioUringSetup(entries uint32, params *ioUringParams) (fd int, err error) { err = errFromUnixErrno(errno) return } - fd, err = int(r1), nil - return + return int(r1), nil } func ioUringEnter(fd int, toSubmit uintptr, minComplete uintptr, flags uintptr) (n int, err error) { diff --git a/uring_linux_buffer_rings.go b/uring_linux_buffer_rings.go new file mode 100644 index 0000000..3acb0d3 --- /dev/null +++ b/uring_linux_buffer_rings.go @@ -0,0 +1,169 @@ +// ©Hayabusa Cloud Co., Ltd. 2024. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +//go:build linux + +package sox + +import ( + "sync/atomic" + "unsafe" +) + +func newUringBufferRings() *uringBufferRings { + ret := uringBufferRings{ + rings: make([]*ioUringBufRing, 0), + counts: make([]uintptr, 0), + masks: make([]uintptr, 0), + } + return &ret +} + +type uringBufferRings struct { + rings []*ioUringBufRing + locks []Spinlock + counts []uintptr + masks []uintptr +} + +func (rings *uringBufferRings) registerBuffers(ur *ioUring, b *uringProvideBuffers) error { + for i := range b.gn { + gid := uint16(i) + b.gidOffset + ptr := unsafe.Add(b.ptr, b.size*b.n*i) + br, err := rings.registerGroup(ur, b.n, gid, ptr, b.size) + if err != nil { + return err + } + rings.rings = append(rings.rings, br) + rings.locks = append(rings.locks, Spinlock{}) + rings.counts = append(rings.counts, uintptr(b.n)) + rings.masks = append(rings.masks, uintptr(b.n-1)) + } + return nil +} + +func (rings *uringBufferRings) registerGroups(ur *ioUring, g *uringProvideBufferGroups) error { + for i := range g.scale { + gid := uint16(i)*bufferGroupIndexEnd + bufferGroupIndexPico + g.gidOffset + ptr := unsafe.Pointer(&g.picoBuffers[i*bufferNumPico]) + br, err := rings.registerGroup(ur, bufferNumPico, gid, ptr, BufferSizePico) + if err != nil { + return err + } + rings.rings = append(rings.rings, br) + rings.locks = append(rings.locks, Spinlock{}) + rings.counts = append(rings.counts, bufferNumPico) + rings.masks = append(rings.masks, bufferNumPico-1) + + gid = uint16(i)*bufferGroupIndexEnd + bufferGroupIndexNano + g.gidOffset + ptr = unsafe.Pointer(&g.nanoBuffers[i*bufferNumNano]) + br, err = rings.registerGroup(ur, bufferNumNano, gid, ptr, BufferSizeNano) + if err != nil { + return err + } + rings.rings = append(rings.rings, br) + rings.locks = append(rings.locks, Spinlock{}) + rings.counts = append(rings.counts, bufferNumNano) + rings.masks = append(rings.masks, bufferNumNano-1) + + gid = uint16(i)*bufferGroupIndexEnd + bufferGroupIndexMicro + g.gidOffset + ptr = unsafe.Pointer(&g.microBuffers[i*bufferNumMicro]) + br, err = rings.registerGroup(ur, bufferNumMicro, gid, ptr, BufferSizeMicro) + if err != nil { + return err + } + rings.rings = append(rings.rings, br) + rings.locks = append(rings.locks, Spinlock{}) + rings.counts = append(rings.counts, bufferNumMicro) + rings.masks = append(rings.masks, bufferNumMicro-1) + + gid = uint16(i)*bufferGroupIndexEnd + bufferGroupIndexSmall + g.gidOffset + ptr = unsafe.Pointer(&g.smallBuffers[i*bufferNumSmall]) + br, err = rings.registerGroup(ur, bufferNumSmall, gid, ptr, BufferSizeSmall) + if err != nil { + return err + } + rings.rings = append(rings.rings, br) + rings.locks = append(rings.locks, Spinlock{}) + rings.counts = append(rings.counts, bufferNumSmall) + rings.masks = append(rings.masks, bufferNumSmall-1) + + gid = uint16(i)*bufferGroupIndexEnd + bufferGroupIndexMedium + g.gidOffset + ptr = unsafe.Pointer(&g.mediumBuffers[i*bufferNumMedium]) + br, err = rings.registerGroup(ur, bufferNumMedium, gid, ptr, BufferSizeMedium) + if err != nil { + return err + } + rings.rings = append(rings.rings, br) + rings.locks = append(rings.locks, Spinlock{}) + rings.counts = append(rings.counts, bufferNumMedium) + rings.masks = append(rings.masks, bufferNumMedium-1) + + gid = uint16(i)*bufferGroupIndexEnd + bufferGroupIndexLarge + g.gidOffset + ptr = unsafe.Pointer(&g.largeBuffers[i*bufferNumLarge]) + br, err = rings.registerGroup(ur, bufferNumLarge, gid, ptr, BufferSizeLarge) + if err != nil { + return err + } + rings.rings = append(rings.rings, br) + rings.locks = append(rings.locks, Spinlock{}) + rings.counts = append(rings.counts, bufferNumLarge) + rings.masks = append(rings.masks, bufferNumLarge-1) + + gid = uint16(i)*bufferGroupIndexEnd + bufferGroupIndexHuge + g.gidOffset + ptr = unsafe.Pointer(&g.hugeBuffers[i*bufferNumHuge]) + br, err = rings.registerGroup(ur, bufferNumHuge, gid, ptr, BufferSizeHuge) + if err != nil { + return err + } + rings.rings = append(rings.rings, br) + rings.locks = append(rings.locks, Spinlock{}) + rings.counts = append(rings.counts, bufferNumHuge) + rings.masks = append(rings.masks, bufferNumHuge-1) + + gid = uint16(i)*bufferGroupIndexEnd + bufferGroupIndexGiant + g.gidOffset + ptr = unsafe.Pointer(&g.giantBuffers[i*bufferNumGiant]) + br, err = rings.registerGroup(ur, bufferNumGiant, gid, ptr, BufferSizeGiant) + if err != nil { + return err + } + rings.rings = append(rings.rings, br) + rings.locks = append(rings.locks, Spinlock{}) + rings.counts = append(rings.counts, bufferNumGiant) + rings.masks = append(rings.masks, bufferNumGiant-1) + } + return nil +} + +func (rings *uringBufferRings) registerGroup(ur *ioUring, entries int, gid uint16, ptr unsafe.Pointer, size int) (*ioUringBufRing, error) { + r, err := ur.registerBufRing(entries, gid) + if err != nil { + return nil, err + } + ur.bufRingInit(r) + mask := entries - 1 + base := uintptr(ptr) + for i := range entries { + ur.bufRingAdd(r, base+uintptr(i*size), size, uint16(i), uintptr(mask), uintptr(i)) + } + return r, nil +} + +func (rings *uringBufferRings) provide(ur *ioUring, gidOffset, group uint16, addr unsafe.Pointer, size int, index uint32) { + br := rings.rings[group-gidOffset] + offset := atomic.AddUintptr(&rings.counts[group-gidOffset], 1) + ur.bufRingAdd(br, uintptr(addr), size, uint16(index), rings.masks[group-gidOffset], offset-1) +} + +func (rings *uringBufferRings) advance(ur *ioUring) { + for i, r := range rings.rings { + if r == nil { + break + } + rings.locks[i].Lock() + ur.bufRingAdvance(r, int(rings.counts[i])) + rings.counts[i] = 0 + rings.locks[i].Unlock() + } +} diff --git a/uring_linux_buffers.go b/uring_linux_buffers.go new file mode 100644 index 0000000..d1d63aa --- /dev/null +++ b/uring_linux_buffers.go @@ -0,0 +1,383 @@ +// ©Hayabusa Cloud Co., Ltd. 2024. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +//go:build linux + +package sox + +import ( + "context" + "math" + "unsafe" +) + +type bufferGroupIndex uint16 + +const ( + bufferGroupIndexPico = iota + bufferGroupIndexNano + bufferGroupIndexMicro + bufferGroupIndexSmall + bufferGroupIndexMedium + bufferGroupIndexLarge + bufferGroupIndexHuge + bufferGroupIndexGiant + bufferGroupIndexEnd + + bufferGroupIndexMask = bufferGroupIndexEnd - 1 +) + +const ( + bufferNumPico = 1 << (15 - iota/2) + bufferNumNano + bufferNumMicro + bufferNumSmall + bufferNumMedium = 1 << (21 - iota*2) + bufferNumLarge + bufferNumHuge + bufferNumGiant +) + +const ( + registerBufferNum = UringEntriesLarge + registerBufferSize = BufferSizeHuge + registerBufferDefaultMem = 1 << 24 +) + +type ( + + // RegisterBuffer represents a fixed-size buffer used for registering with the I/O ring. + RegisterBuffer [registerBufferSize]byte + + // RegisterBufferPool represents a pool of fixed-size buffers used for registering with the I/O ring. + RegisterBufferPool = BoundedPool[RegisterBuffer] +) + +// NewRegisterBufferPool creates a new instance of RegisterBufferPool with the specified capacity. +func NewRegisterBufferPool(capacity int) *RegisterBufferPool { + return NewBoundedPool[RegisterBuffer](capacity) +} + +func newUringProvideBuffers(size, n int) *uringProvideBuffers { + if size < 1 || size > (1<<24) { + panic("size must be between 1 and 16777216") + } + if n < 1 || n > (1<<24) { + panic("n must be between 1 and 16777216") + } + size-- + size |= size >> 1 + size |= size >> 2 + size |= size >> 4 + size |= size >> 8 + size |= size >> 16 + size++ + mask := n - 1 + mask |= mask >> 1 + mask |= mask >> 2 + mask |= mask >> 4 + mask |= mask >> 8 + mask |= mask >> 16 + n = mask + 1 + mask |= math.MaxInt16 + gn := 1 + if n > (1 << 15) { + gn, n = n>>15, 1<<15 + } + mem := AlignedMem(int(size) * n) + ptr := unsafe.Pointer(unsafe.SliceData(mem)) + return &uringProvideBuffers{ + gn: gn, + n: n, + gMask: gn - 1, + mask: mask, + mem: mem, + ptr: ptr, + size: size, + } +} + +type uringProvideBuffers struct { + gn int + n int + gMask int + mask int + mem []byte + ptr unsafe.Pointer + size int + gidOffset uint16 +} + +func (g *uringProvideBuffers) setGIDOffset(offset int) { + g.gidOffset = uint16(offset) +} + +func (g *uringProvideBuffers) register(ctx context.Context, ur *ioUring, flags uint8) error { + for i := range g.gn { + addr := unsafe.Add(g.ptr, g.size*g.n*i) + gid := uint16(i) + g.gidOffset + err := ur.provideBuffers(ctx, flags, g.n, 0, addr, g.size, gid) + if err != nil { + return err + } + } + return nil +} + +func (g *uringProvideBuffers) provide(ctx context.Context, ur *ioUring, flags uint8, group uint16, data []byte, index uint32) error { + ptr, n := unsafe.Pointer(unsafe.SliceData(data)), len(data) + err := ur.provideBuffers(ctx, flags, 1, int(index), ptr, n, group) + if err != nil { + return err + } + return nil +} + +func (g *uringProvideBuffers) bufGroup(f PollFd) uint16 { + return uint16(f.Fd()&g.gMask) + g.gidOffset +} + +func (g *uringProvideBuffers) buf(bufGroup bufferGroupIndex, bufIndex uint32) []byte { + i := (int(bufGroup)-int(g.gidOffset))*g.n + int(bufIndex) + return unsafe.Slice((*byte)(unsafe.Add(g.ptr, g.size*i)), g.size) +} + +func (g *uringProvideBuffers) data(f PollFd, bufIndex uint32, length int) []byte { + i := (f.Fd()&g.gMask)*g.n + int(bufIndex) + return unsafe.Slice((*byte)(unsafe.Add(g.ptr, g.size*i)), length) +} + +func newUringBufferGroups(scale int) *uringProvideBufferGroups { + if scale < 1 || scale > (1<<12) { + panic("scale must be between 1 and 4096") + } + mask := scale - 1 + mask |= mask >> 1 + mask |= mask >> 2 + mask |= mask >> 4 + mask |= mask >> 8 + scale = mask + 1 + + pico := AlignedMem(BufferSizePico * bufferNumPico * scale) + nano := AlignedMem(BufferSizeNano * bufferNumNano * scale) + micro := AlignedMem(BufferSizeMicro * bufferNumMicro * scale) + small := AlignedMem(BufferSizeSmall * bufferNumSmall * scale) + medium := AlignedMem(BufferSizeMedium * bufferNumMedium * scale) + large := AlignedMem(BufferSizeLarge * bufferNumLarge * scale) + huge := AlignedMem(BufferSizeHuge * bufferNumHuge * scale) + giant := AlignedMem(BufferSizeGiant * bufferNumGiant * scale) + + return &uringProvideBufferGroups{ + scale: scale, + scaleMask: mask, + + picoMem: pico, + nanoMem: nano, + microMem: micro, + smallMem: small, + mediumMem: medium, + largeMem: large, + hugeMem: huge, + giantMem: giant, + + picoBuffers: sliceOfPicoArray(pico, 0, len(pico)/BufferSizePico), + nanoBuffers: sliceOfNanoArray(nano, 0, len(nano)/BufferSizeNano), + microBuffers: sliceOfMicroArray(micro, 0, len(micro)/BufferSizeMicro), + smallBuffers: sliceOfSmallArray(small, 0, len(small)/BufferSizeSmall), + mediumBuffers: sliceOfMediumArray(medium, 0, len(medium)/BufferSizeMedium), + largeBuffers: sliceOfLargeArray(large, 0, len(large)/BufferSizeLarge), + hugeBuffers: sliceOfHugeArray(huge, 0, len(huge)/BufferSizeHuge), + giantBuffers: sliceOfGiantArray(giant, 0, len(giant)/BufferSizeGiant), + } +} + +type uringProvideBufferGroups struct { + scale int + scaleMask int + + picoMem []byte + nanoMem []byte + microMem []byte + smallMem []byte + mediumMem []byte + largeMem []byte + hugeMem []byte + giantMem []byte + + _ [0][1]byte + picoBuffers []PicoBuffer + nanoBuffers []NanoBuffer + microBuffers []MicroBuffer + smallBuffers []SmallBuffer + mediumBuffers []MediumBuffer + largeBuffers []LargeBuffer + hugeBuffers []HugeBuffer + giantBuffers []GiantBuffer + + gidOffset uint16 +} + +func (g *uringProvideBufferGroups) setGIDOffset(offset int) { + g.gidOffset = uint16(offset) +} + +func (g *uringProvideBufferGroups) register(ctx context.Context, ur *ioUring, flags uint8) error { + for i := range g.scale { + addr := unsafe.Pointer(&g.picoBuffers[i*bufferNumPico]) + gid := uint16(i*bufferGroupIndexEnd+bufferGroupIndexPico) + g.gidOffset + err := ur.provideBuffers(ctx, flags, bufferNumPico, 0, addr, BufferSizePico, gid) + if err != nil { + return err + } + addr = unsafe.Pointer(&g.nanoBuffers[i*bufferNumNano]) + gid = uint16(i*bufferGroupIndexEnd+bufferGroupIndexNano) + g.gidOffset + err = ur.provideBuffers(ctx, flags, bufferNumNano, 0, addr, BufferSizeNano, gid) + if err != nil { + return err + } + addr = unsafe.Pointer(&g.microBuffers[i*bufferNumMicro]) + gid = uint16(i*bufferGroupIndexEnd+bufferGroupIndexMicro) + g.gidOffset + err = ur.provideBuffers(ctx, flags, bufferNumMicro, 0, addr, BufferSizeMicro, gid) + if err != nil { + return err + } + addr = unsafe.Pointer(&g.smallBuffers[i*bufferNumSmall]) + gid = uint16(i*bufferGroupIndexEnd+bufferGroupIndexSmall) + g.gidOffset + err = ur.provideBuffers(ctx, flags, bufferNumSmall, 0, addr, BufferSizeSmall, gid) + if err != nil { + return err + } + addr = unsafe.Pointer(&g.mediumBuffers[i*bufferNumMedium]) + gid = uint16(i*bufferGroupIndexEnd+bufferGroupIndexMedium) + g.gidOffset + err = ur.provideBuffers(ctx, flags, bufferNumMedium, 0, addr, BufferSizeMedium, gid) + if err != nil { + return err + } + addr = unsafe.Pointer(&g.largeBuffers[i*bufferNumLarge]) + gid = uint16(i*bufferGroupIndexEnd+bufferGroupIndexLarge) + g.gidOffset + err = ur.provideBuffers(ctx, flags, bufferNumLarge, 0, addr, BufferSizeLarge, gid) + if err != nil { + return err + } + addr = unsafe.Pointer(&g.hugeBuffers[i*bufferNumHuge]) + gid = uint16(i*bufferGroupIndexEnd+bufferGroupIndexHuge) + g.gidOffset + err = ur.provideBuffers(ctx, flags, bufferNumHuge, 0, addr, BufferSizeHuge, gid) + if err != nil { + return err + } + addr = unsafe.Pointer(&g.giantBuffers[i*bufferNumGiant]) + gid = uint16(i*bufferGroupIndexEnd+bufferGroupIndexGiant) + g.gidOffset + err = ur.provideBuffers(ctx, flags, bufferNumGiant, 0, addr, BufferSizeGiant, gid) + if err != nil { + return err + } + } + return nil +} + +func (g *uringProvideBufferGroups) provide(ctx context.Context, ur *ioUring, flags uint8, group uint16, data []byte, index uint32) error { + ptr, n := unsafe.Pointer(unsafe.SliceData(data)), len(data) + err := ur.provideBuffers(ctx, flags, 1, int(index), ptr, n, group) + if err != nil { + return err + } + return nil +} + +func (g *uringProvideBufferGroups) bufGroup(f PollFd) uint16 { + offset := uint16(f.Fd() & g.scaleMask) + return bufferGroupIndexEnd*offset + bufferGroupIndexMedium + g.gidOffset +} + +func (g *uringProvideBufferGroups) bufGroupBySize(f PollFd, size int) uint16 { + offset := uint16(f.Fd() & g.scaleMask) + if size <= BufferSizePico { + return bufferGroupIndexEnd*offset + bufferGroupIndexPico + g.gidOffset + } else if size <= BufferSizeNano { + return bufferGroupIndexEnd*offset + bufferGroupIndexNano + g.gidOffset + } else if size <= BufferSizeMicro { + return bufferGroupIndexEnd*offset + bufferGroupIndexMicro + g.gidOffset + } else if size <= BufferSizeSmall { + return bufferGroupIndexEnd*offset + bufferGroupIndexSmall + g.gidOffset + } else if size <= BufferSizeMedium { + return bufferGroupIndexEnd*offset + bufferGroupIndexMedium + g.gidOffset + } else if size <= BufferSizeLarge { + return bufferGroupIndexEnd*offset + bufferGroupIndexLarge + g.gidOffset + } else if size <= BufferSizeHuge { + return bufferGroupIndexEnd*offset + bufferGroupIndexHuge + g.gidOffset + } else if size <= BufferSizeGiant { + return bufferGroupIndexEnd*offset + bufferGroupIndexGiant + g.gidOffset + } + return bufferGroupIndexEnd*(offset+1) + g.gidOffset +} + +func (g *uringProvideBufferGroups) bufGroupByData(f PollFd, b []byte) uint16 { + if b == nil { + return g.bufGroup(f) + } + return g.bufGroupBySize(f, len(b)) +} + +func (g *uringProvideBufferGroups) buf(bufGroup bufferGroupIndex, bufIndex uint32) []byte { + switch (uint16(bufGroup) - g.gidOffset) & bufferGroupIndexMask { + case bufferGroupIndexPico: + i := (uint32(bufGroup)-uint32(g.gidOffset))/bufferGroupIndexEnd*bufferNumPico + bufIndex + return g.picoBuffers[i][:] + case bufferGroupIndexNano: + i := (uint32(bufGroup)-uint32(g.gidOffset))/bufferGroupIndexEnd*bufferNumNano + bufIndex + return g.nanoBuffers[i][:] + case bufferGroupIndexMicro: + i := (uint32(bufGroup)-uint32(g.gidOffset))/bufferGroupIndexEnd*bufferNumMicro + bufIndex + return g.microBuffers[i][:] + case bufferGroupIndexSmall: + i := (uint32(bufGroup)-uint32(g.gidOffset))/bufferGroupIndexEnd*bufferNumSmall + bufIndex + return g.smallBuffers[i][:] + case bufferGroupIndexMedium: + i := (uint32(bufGroup)-uint32(g.gidOffset))/bufferGroupIndexEnd*bufferNumMedium + bufIndex + return g.mediumBuffers[i][:] + case bufferGroupIndexLarge: + i := (uint32(bufGroup)-uint32(g.gidOffset))/bufferGroupIndexEnd*bufferNumLarge + bufIndex + return g.largeBuffers[i][:] + case bufferGroupIndexHuge: + i := (uint32(bufGroup)-uint32(g.gidOffset))/bufferGroupIndexEnd*bufferNumHuge + bufIndex + return g.hugeBuffers[i][:] + case bufferGroupIndexGiant: + i := (uint32(bufGroup)-uint32(g.gidOffset))/bufferGroupIndexEnd*bufferNumGiant + bufIndex + return g.giantBuffers[i][:] + default: + return []byte{} + } +} + +func (g *uringProvideBufferGroups) picoData(f PollFd, bufIndex uint32, length int) []byte { + return g.picoBuffers[(f.Fd()&g.scaleMask)*bufferNumPico+int(bufIndex)][:length] +} + +func (g *uringProvideBufferGroups) nanoData(f PollFd, bufIndex uint32, length int) []byte { + return g.nanoBuffers[(f.Fd()&g.scaleMask)*bufferNumNano+int(bufIndex)][:length] +} + +func (g *uringProvideBufferGroups) microData(f PollFd, bufIndex uint32, length int) []byte { + return g.microBuffers[(f.Fd()&g.scaleMask)*bufferNumMicro+int(bufIndex)][:length] +} + +func (g *uringProvideBufferGroups) smallData(f PollFd, bufIndex uint32, length int) []byte { + return g.smallBuffers[(f.Fd()&g.scaleMask)*bufferNumSmall+int(bufIndex)][:length] +} + +func (g *uringProvideBufferGroups) mediumData(f PollFd, bufIndex uint32, length int) []byte { + return g.mediumBuffers[(f.Fd()&g.scaleMask)*bufferNumMedium+int(bufIndex)][:length] +} + +func (g *uringProvideBufferGroups) largeData(f PollFd, bufIndex uint32, length int) []byte { + return g.largeBuffers[(f.Fd()&g.scaleMask)*bufferNumLarge+int(bufIndex)][:length] +} + +func (g *uringProvideBufferGroups) hugeData(f PollFd, bufIndex uint32, length int) []byte { + return g.hugeBuffers[(f.Fd()&g.scaleMask)*bufferNumHuge+int(bufIndex)][:length] +} + +func (g *uringProvideBufferGroups) giantData(f PollFd, bufIndex uint32, length int) []byte { + return g.giantBuffers[(f.Fd()&g.scaleMask)*bufferNumGiant+int(bufIndex)][:length] +} diff --git a/uring_linux_ops.go b/uring_linux_ops.go index 8a88e4c..bf1dfa0 100644 --- a/uring_linux_ops.go +++ b/uring_linux_ops.go @@ -72,7 +72,7 @@ const ( ) func (ur *ioUring) nop(ctx context.Context, flags uint8, fd int) error { - return ur.submit3(contextWithFD(ctx, fd), IORING_OP_NOP, flags, fd, 0, 0) + return ur.submit3(ContextWithUserdata(ctx, ioUringFd(fd)), IORING_OP_NOP, flags, 0, fd, 0, 0) } func (ur *ioUring) readv(ctx context.Context, flags uint8, fd int, iov [][]byte) error { @@ -82,7 +82,7 @@ func (ur *ioUring) readv(ctx context.Context, flags uint8, fd int, iov [][]byte) opcode := IORING_OP_READV addr, n := ioVecFromBytesSlice(iov) - return ur.submit6(contextWithFD(ctx, fd), opcode, flags, fd, 0, uint64(addr), n, unix.MSG_WAITALL) + return ur.submit6(ContextWithUserdata(ctx, ioUringFd(fd)), opcode, flags, 0, fd, 0, uint64(addr), n, unix.MSG_WAITALL) } func (ur *ioUring) writev(ctx context.Context, flags uint8, fd int, iov [][]byte) error { @@ -93,11 +93,11 @@ func (ur *ioUring) writev(ctx context.Context, flags uint8, fd int, iov [][]byte opcode := IORING_OP_WRITEV addr, n := ioVecFromBytesSlice(iov) - return ur.submit3(contextWithFD(ctx, fd), opcode, flags, fd, uint64(addr), n) + return ur.submit3(ContextWithUserdata(ctx, ioUringFd(fd)), opcode, flags, 0, fd, uint64(addr), n) } func (ur *ioUring) fsync(ctx context.Context, flags uint8, fd int) error { - return ur.submit3(contextWithFD(ctx, fd), IORING_OP_FSYNC, flags, fd, 0, 0) + return ur.submit3(ContextWithUserdata(ctx, ioUringFd(fd)), IORING_OP_FSYNC, flags, 0, fd, 0, 0) } func (ur *ioUring) readFixed(ctx context.Context, flags uint8, fd int, i int) (buf []byte, err error) { @@ -107,7 +107,7 @@ func (ur *ioUring) readFixed(ctx context.Context, flags uint8, fd int, i int) (b opcode := IORING_OP_READ_FIXED addr := uint64(uintptr(unsafe.Pointer(unsafe.SliceData(ur.bufs[i])))) - return ur.bufs[i], ur.submit9(contextWithFD(ctx, fd), opcode, flags, fd, 0, addr, len(ur.bufs[i]), unix.MSG_WAITALL, uint16(i), 0, 0) + return ur.bufs[i], ur.submit9(ContextWithUserdata(ctx, ioUringFd(fd)), opcode, flags, 0, fd, 0, addr, len(ur.bufs[i]), unix.MSG_WAITALL, uint16(i), 0, 0) } func (ur *ioUring) writeFixed(ctx context.Context, flags uint8, fd int, i int, n int) error { @@ -117,22 +117,22 @@ func (ur *ioUring) writeFixed(ctx context.Context, flags uint8, fd int, i int, n opcode := IORING_OP_WRITE_FIXED addr := uint64(uintptr(unsafe.Pointer(unsafe.SliceData(ur.bufs[i])))) - return ur.submit9(contextWithFD(ctx, fd), opcode, flags, fd, 0, addr, n, 0, uint16(i), 0, 0) + return ur.submit9(ContextWithUserdata(ctx, ioUringFd(fd)), opcode, flags, 0, fd, 0, addr, n, 0, uint16(i), 0, 0) } func (ur *ioUring) pollAdd(ctx context.Context, flags uint8, fd int, how int, events int) error { - return ur.submit6(ctx, IORING_OP_POLL_ADD, flags, fd, 0, 0, how, uint32(events)) + return ur.submit6(ctx, IORING_OP_POLL_ADD, flags, 0, fd, 0, 0, how, uint32(events)) } func (ur *ioUring) pollRemove(ctx context.Context, flags uint8) error { - return ur.submit3(ctx, IORING_OP_POLL_REMOVE, flags, 0, 0, 0) + return ur.submit3(ctx, IORING_OP_POLL_REMOVE, flags, 0, 0, 0, 0) } func (ur *ioUring) syncFileRange(ctx context.Context, flags uint8, fd int, off int64, n int, uflags int) error { - return ur.submit6(ctx, IORING_OP_SYNC_FILE_RANGE, flags, fd, uint64(off), 0, n, uint32(uflags)) + return ur.submit6(ctx, IORING_OP_SYNC_FILE_RANGE, flags, 0, fd, uint64(off), 0, n, uint32(uflags)) } -func (ur *ioUring) sendmsg(ctx context.Context, flags uint8, fd int, buffers [][]byte, oob []byte, to unix.Sockaddr) error { +func (ur *ioUring) sendmsg(ctx context.Context, flags uint8, ioprio uint16, fd int, buffers [][]byte, oob []byte, to unix.Sockaddr) error { saPtr, saN, err := unsafe.Pointer(uintptr(0)), 0, error(nil) if to != nil { saPtr, saN, err = sockaddr(to) @@ -155,10 +155,10 @@ func (ur *ioUring) sendmsg(ctx context.Context, flags uint8, fd int, buffers [][ msg.Controllen = uint64(len(oob)) } - return ur.submit6(contextWithFD(ctx, fd), opcode, flags, fd, 0, uint64(addr), 1, 0) + return ur.submit6(ContextWithUserdata(ctx, ioUringFd(fd)), opcode, flags, ioprio, fd, 0, uint64(addr), 1, 0) } -func (ur *ioUring) recvmsg(ctx context.Context, flags uint8, fd int, buffers [][]byte, oob []byte) error { +func (ur *ioUring) recvmsg(ctx context.Context, flags uint8, ioprio uint16, fd int, buffers [][]byte, oob []byte) error { from := unix.RawSockaddrAny{} opcode := IORING_OP_RECVMSG addr, n := ioVecFromBytesSlice(buffers) @@ -175,7 +175,7 @@ func (ur *ioUring) recvmsg(ctx context.Context, flags uint8, fd int, buffers [][ msg.Controllen = uint64(len(oob)) } - return ur.submit6(contextWithFD(ctx, fd), opcode, flags, fd, 0, uint64(addr), 1, unix.MSG_WAITALL) + return ur.submit6(ContextWithUserdata(ctx, ioUringFd(fd)), opcode, flags, ioprio, fd, 0, uint64(addr), 1, unix.MSG_WAITALL) } const ( @@ -192,32 +192,38 @@ const ( func (ur *ioUring) timeout(ctx context.Context, flags uint8, cnt int, ts *unix.Timespec, uflags int) error { addr := uint64(uintptr(unsafe.Pointer(ts))) - return ur.submit6(ctx, IORING_OP_TIMEOUT, flags, 0, uint64(cnt), addr, 1, uint32(uflags)) + return ur.submit6(ctx, IORING_OP_TIMEOUT, flags, 0, 0, uint64(cnt), addr, 1, uint32(uflags)) } func (ur *ioUring) timeoutUpdate(ctx context.Context, flags uint8, userData unsafe.Pointer, ts *unix.Timespec, uflags int) error { addr := uint64(uintptr(userData)) addr2 := uint64(uintptr(unsafe.Pointer(ts))) - return ur.submit6(ctx, IORING_OP_TIMEOUT_REMOVE, flags, 0, addr2, addr, 0, uint32(uflags|IORING_TIMEOUT_UPDATE)) + return ur.submit6(ctx, IORING_OP_TIMEOUT_REMOVE, flags, 0, 0, addr2, addr, 0, uint32(uflags|IORING_TIMEOUT_UPDATE)) } func (ur *ioUring) timeoutRemove(ctx context.Context, flags uint8, userData unsafe.Pointer, uflags int) error { addr := uint64(uintptr(userData)) - return ur.submit6(ctx, IORING_OP_TIMEOUT_REMOVE, flags, 0, 0, addr, 0, uint32(uflags)) + return ur.submit6(ctx, IORING_OP_TIMEOUT_REMOVE, flags, 0, 0, 0, addr, 0, uint32(uflags)) } -func (ur *ioUring) accept(ctx context.Context, flags uint8, fd int) error { - return ur.submit6(contextWithFD(ctx, fd), IORING_OP_ACCEPT, flags, fd, 0, 0, 0, unix.SOCK_NONBLOCK|unix.SOCK_CLOEXEC) +const ( + IORING_ACCEPT_MULTISHOT = 1 << 0 + IORING_ACCEPT_DONTWAIT = 1 << 1 + IORING_ACCEPT_POLL_FIRST = 1 << 2 +) + +func (ur *ioUring) accept(ctx context.Context, flags uint8, ioprio uint16, fd int) error { + return ur.submit6(ContextWithUserdata(ctx, ioUringFd(fd)), IORING_OP_ACCEPT, flags, ioprio, fd, 0, 0, 0, unix.SOCK_NONBLOCK|unix.SOCK_CLOEXEC) } func (ur *ioUring) asyncCancel(ctx context.Context, flags uint8, userdata *ioUringCtx) error { addr := uint64(uintptr(unsafe.Pointer(userdata))) - return ur.submit3(ctx, IORING_OP_ASYNC_CANCEL, flags, 0, addr, 0) + return ur.submit3(ctx, IORING_OP_ASYNC_CANCEL, flags, 0, 0, addr, 0) } func (ur *ioUring) linkTimeout(ctx context.Context, flags uint8, ts *unix.Timespec, uflags int) error { addr := uint64(uintptr(unsafe.Pointer(ts))) - return ur.submit6(ctx, IORING_OP_LINK_TIMEOUT, flags, 0, 0, addr, 1, uint32(uflags)) + return ur.submit6(ctx, IORING_OP_LINK_TIMEOUT, flags, 0, 0, 0, addr, 1, uint32(uflags)) } func (ur *ioUring) connect(ctx context.Context, flags uint8, fd int, sa unix.Sockaddr) error { @@ -226,11 +232,11 @@ func (ur *ioUring) connect(ctx context.Context, flags uint8, fd int, sa unix.Soc return err } - return ur.submit6(contextWithFD(ctx, fd), IORING_OP_CONNECT, flags, fd, uint64(n), uint64(uintptr(ptr)), 0, 0) + return ur.submit6(ContextWithUserdata(ctx, ioUringFd(fd)), IORING_OP_CONNECT, flags, 0, fd, uint64(n), uint64(uintptr(ptr)), 0, 0) } func (ur *ioUring) fAllocate(ctx context.Context, flags uint8, fd int, mode uint32, off int64, len int64) error { - return ur.submit6(contextWithFD(ctx, fd), IORING_OP_FALLOCATE, flags, fd, uint64(off), uint64(len), int(mode), 0) + return ur.submit6(ContextWithUserdata(ctx, ioUringFd(fd)), IORING_OP_FALLOCATE, flags, 0, fd, uint64(off), uint64(len), int(mode), 0) } func (ur *ioUring) openAt(ctx context.Context, flags uint8, dirfd int, pathname string, uflags int, mode uint32) error { @@ -240,11 +246,11 @@ func (ur *ioUring) openAt(ctx context.Context, flags uint8, dirfd int, pathname } addr := uint64(uintptr(unsafe.Pointer(ptr))) - return ur.submit6(contextWithFD(ctx, dirfd), IORING_OP_OPENAT, flags, dirfd, 0, addr, int(mode), uint32(uflags|unix.O_LARGEFILE)) + return ur.submit6(ContextWithUserdata(ctx, ioUringFd(dirfd)), IORING_OP_OPENAT, flags, 0, dirfd, 0, addr, int(mode), uint32(uflags|unix.O_LARGEFILE)) } func (ur *ioUring) close(ctx context.Context, flags uint8, fd int) error { - return ur.submit3(contextWithFD(ctx, fd), IORING_OP_CLOSE, flags, fd, 0, 0) + return ur.submit3(ContextWithUserdata(ctx, ioUringFd(fd)), IORING_OP_CLOSE, flags, 0, fd, 0, 0) } func (ur *ioUring) statx(ctx context.Context, flags uint8, dirfd int, path string, uflags int, mask int, stat *unix.Statx_t) error { @@ -255,7 +261,7 @@ func (ur *ioUring) statx(ctx context.Context, flags uint8, dirfd int, path strin addr := uint64(uintptr(unsafe.Pointer(ptr))) off := uint64(uintptr(unsafe.Pointer(stat))) - return ur.submit6(contextWithFD(ctx, dirfd), IORING_OP_STATX, flags, dirfd, off, addr, mask, uint32(uflags)) + return ur.submit6(ContextWithUserdata(ctx, ioUringFd(dirfd)), IORING_OP_STATX, flags, 0, dirfd, off, addr, mask, uint32(uflags)) } func (ur *ioUring) read(ctx context.Context, flags uint8, fd int, p []byte) error { @@ -266,14 +272,14 @@ func (ur *ioUring) read(ctx context.Context, flags uint8, fd int, p []byte) erro opcode := IORING_OP_READ addr := uint64(uintptr(unsafe.Pointer(unsafe.SliceData(p)))) - return ur.submit3(contextWithFD(ctx, fd), opcode, flags, fd, addr, len(p)) + return ur.submit3(ContextWithUserdata(ctx, ioUringFd(fd)), opcode, flags, 0, fd, addr, len(p)) } func (ur *ioUring) readWithBufferSelect(ctx context.Context, flags uint8, fd int, n int, group uint16) error { opcode := IORING_OP_READ flags |= IOSQE_BUFFER_SELECT - return ur.submit9(contextWithFD(ctx, fd), opcode, flags, fd, 0, 0, n, 0, group, 0, 0) + return ur.submit9(ContextWithUserdata(ctx, ioUringFd(fd)), opcode, flags, 0, fd, 0, 0, n, 0, group, 0, 0) } func (ur *ioUring) write(ctx context.Context, flags uint8, fd int, p []byte, n int) error { @@ -284,18 +290,26 @@ func (ur *ioUring) write(ctx context.Context, flags uint8, fd int, p []byte, n i opcode := IORING_OP_WRITE addr := uint64(uintptr(unsafe.Pointer(unsafe.SliceData(p)))) - return ur.submit3(contextWithFD(ctx, fd), opcode, flags, fd, addr, n) + return ur.submit3(ContextWithUserdata(ctx, ioUringFd(fd)), opcode, flags, 0, fd, addr, n) } func (ur *ioUring) fadvise(ctx context.Context, flags uint8, fd int, offset int64, n int, advice int) error { - return ur.submit6(contextWithFD(ctx, fd), IORING_OP_FADVISE, flags, fd, uint64(offset), 0, n, uint32(advice)) + return ur.submit6(ContextWithUserdata(ctx, ioUringFd(fd)), IORING_OP_FADVISE, flags, 0, fd, uint64(offset), 0, n, uint32(advice)) } func (ur *ioUring) madvise(ctx context.Context, flags uint8, b []byte, advice int) error { addr := uint64(uintptr(unsafe.Pointer(unsafe.SliceData(b)))) - return ur.submit6(ctx, IORING_OP_MADVISE, flags, 0, 0, addr, len(b), uint32(advice)) + return ur.submit6(ctx, IORING_OP_MADVISE, flags, 0, 0, 0, addr, len(b), uint32(advice)) } +const ( + IORING_RECVSEND_POLL_FIRST = 1 << iota + IORING_RECV_MULTISHOT + IORING_RECVSEND_FIXED_BUF + IORING_SEND_ZC_REPORT_USAGE + IORING_RECVSEND_BUNDLE +) + func (ur *ioUring) send(ctx context.Context, flags uint8, fd int, p []byte) error { if p == nil || len(p) < 1 { return ErrInvalidParam @@ -303,24 +317,25 @@ func (ur *ioUring) send(ctx context.Context, flags uint8, fd int, p []byte) erro opcode := IORING_OP_SEND addr := uint64(uintptr(unsafe.Pointer(unsafe.SliceData(p)))) - return ur.submit3(contextWithFD(ctx, fd), opcode, flags, fd, addr, len(p)) + return ur.submit3(ContextWithUserdata(ctx, ioUringFd(fd)), opcode, flags, 0, fd, addr, len(p)) } -func (ur *ioUring) receive(ctx context.Context, flags uint8, fd int, p []byte) error { +func (ur *ioUring) receive(ctx context.Context, flags uint8, ioprio uint16, fd int, p []byte) error { if p == nil || len(p) < 1 { return ErrInvalidParam } opcode := IORING_OP_RECV addr := uint64(uintptr(unsafe.Pointer(unsafe.SliceData(p)))) - return ur.submit6(contextWithFD(ctx, fd), opcode, flags, fd, 0, addr, len(p), unix.MSG_WAITALL) + return ur.submit6(ContextWithUserdata(ctx, ioUringFd(fd)), opcode, flags, ioprio, fd, 0, addr, len(p), unix.MSG_WAITALL) } -func (ur *ioUring) receiveWithBufferSelect(ctx context.Context, flags uint8, fd int, n int, group uint16) error { +func (ur *ioUring) receiveWithBufferSelect(ctx context.Context, flags uint8, ioprio uint16, fd int, n int, group uint16) error { opcode := IORING_OP_RECV flags |= IOSQE_BUFFER_SELECT + ctx = ContextWithUserdata(ctx, bufferGroupIndex(group)) - return ur.submit9(contextWithFD(ctx, fd), opcode, flags, fd, 0, 0, n, 0, group, 0, 0) + return ur.submit9(ContextWithUserdata(ctx, ioUringFd(fd)), opcode, flags, ioprio, fd, 0, 0, n, 0, group, 0, 0) } func (ur *ioUring) openAt2(ctx context.Context, flags uint8, dirfd int, pathname string, how *unix.OpenHow) error { @@ -331,7 +346,7 @@ func (ur *ioUring) openAt2(ctx context.Context, flags uint8, dirfd int, pathname addr := uint64(uintptr(unsafe.Pointer(ptr))) off := uint64(uintptr(unsafe.Pointer(how))) - return ur.submit6(contextWithFD(ctx, dirfd), IORING_OP_OPENAT2, flags, dirfd, off, addr, unix.SizeofOpenHow, 0) + return ur.submit6(ContextWithUserdata(ctx, ioUringFd(dirfd)), IORING_OP_OPENAT2, flags, 0, dirfd, off, addr, unix.SizeofOpenHow, 0) } func (ur *ioUring) epollCtl(ctx context.Context, flags uint8, epfd int, op int, fd int, events uint32) error { @@ -339,19 +354,19 @@ func (ur *ioUring) epollCtl(ctx context.Context, flags uint8, epfd int, op int, opcode := IORING_OP_EPOLL_CTL addr := uint64(uintptr(unsafe.Pointer(&e))) - return ur.submit6(contextWithFD(ctx, epfd), flags, opcode, epfd, uint64(fd), addr, op, 0) + return ur.submit6(ContextWithUserdata(ctx, ioUringFd(epfd)), opcode, flags, 0, epfd, uint64(fd), addr, op, 0) } func (ur *ioUring) epollAdd(ctx context.Context, epfd int, fd int, events uint32) error { - return ur.epollCtl(contextWithFD(ctx, epfd), 0, epfd, unix.EPOLL_CTL_ADD, fd, events) + return ur.epollCtl(ContextWithUserdata(ctx, ioUringFd(epfd)), 0, epfd, unix.EPOLL_CTL_ADD, fd, events) } func (ur *ioUring) epollMod(ctx context.Context, epfd int, fd int, events uint32) error { - return ur.epollCtl(contextWithFD(ctx, epfd), 0, epfd, unix.EPOLL_CTL_MOD, fd, events) + return ur.epollCtl(ContextWithUserdata(ctx, ioUringFd(epfd)), 0, epfd, unix.EPOLL_CTL_MOD, fd, events) } func (ur *ioUring) epollDel(ctx context.Context, epfd int, fd int) error { - return ur.epollCtl(contextWithFD(ctx, epfd), 0, epfd, unix.EPOLL_CTL_DEL, fd, 0) + return ur.epollCtl(ContextWithUserdata(ctx, ioUringFd(epfd)), 0, epfd, unix.EPOLL_CTL_DEL, fd, 0) } func (ur *ioUring) splice(ctx context.Context, flags uint8, rfd int, rOff *int64, wfd int, wOff *int64, n int, uflags int) error { @@ -362,23 +377,23 @@ func (ur *ioUring) splice(ctx context.Context, flags uint8, rfd int, rOff *int64 if wOff != nil { wOffVal = uint64(*wOff) } - return ur.submit9(contextWithFD(ctx, wfd), IORING_OP_SPLICE, flags, wfd, wOffVal, rOffVal, n, uint32(uflags), 0, 0, rfd) + return ur.submit9(ContextWithUserdata(ctx, ioUringFd(wfd)), IORING_OP_SPLICE, flags, 0, wfd, wOffVal, rOffVal, n, uint32(uflags), 0, 0, rfd) } func (ur *ioUring) provideBuffers(ctx context.Context, flags uint8, num int, starting int, addr unsafe.Pointer, size int, group uint16) error { - return ur.submit9(ctx, IORING_OP_PROVIDE_BUFFERS, flags, num, uint64(starting), uint64(uintptr(addr)), size, 0, group, 0, 0) + return ur.submit9(ctx, IORING_OP_PROVIDE_BUFFERS, flags, 0, num, uint64(starting), uint64(uintptr(addr)), size, 0, group, 0, 0) } func (ur *ioUring) removeBuffers(ctx context.Context, flags uint8, num int, group uint16) error { - return ur.submit9(ctx, IORING_OP_REMOVE_BUFFERS, flags, num, 0, 0, 0, 0, group, 0, 0) + return ur.submit9(ctx, IORING_OP_REMOVE_BUFFERS, flags, 0, num, 0, 0, 0, 0, group, 0, 0) } func (ur *ioUring) tee(ctx context.Context, flags uint8, rfd int, wfd int, n int, uflags int) error { - return ur.submit9(contextWithFD(ctx, wfd), IORING_OP_TEE, flags, wfd, 0, 0, n, uint32(uflags), 0, 0, rfd) + return ur.submit9(ContextWithUserdata(ctx, ioUringFd(wfd)), IORING_OP_TEE, flags, 0, wfd, 0, 0, n, uint32(uflags), 0, 0, rfd) } func (ur *ioUring) shutdown(ctx context.Context, flags uint8, fd int, how int) error { - return ur.submit3(contextWithFD(ctx, fd), IORING_OP_SHUTDOWN, flags, fd, 0, how) + return ur.submit3(ContextWithUserdata(ctx, ioUringFd(fd)), IORING_OP_SHUTDOWN, flags, 0, fd, 0, how) } func (ur *ioUring) renameAt(ctx context.Context, flags uint8, oldPath, newPath string, uflags int) error { @@ -393,7 +408,7 @@ func (ur *ioUring) renameAt(ctx context.Context, flags uint8, oldPath, newPath s } newAddr := uint64(uintptr(unsafe.Pointer(newPtr))) - return ur.submit6(ctx, IORING_OP_RENAMEAT, flags, unix.AT_FDCWD, newAddr, oldAddr, unix.AT_FDCWD, uint32(uflags)) + return ur.submit6(ctx, IORING_OP_RENAMEAT, flags, 0, unix.AT_FDCWD, newAddr, oldAddr, unix.AT_FDCWD, uint32(uflags)) } func (ur *ioUring) unlinkAt(ctx context.Context, flags uint8, dirfd int, pathname string, uflags int) error { @@ -403,7 +418,7 @@ func (ur *ioUring) unlinkAt(ctx context.Context, flags uint8, dirfd int, pathnam } addr := uint64(uintptr(unsafe.Pointer(ptr))) - return ur.submit6(contextWithFD(ctx, dirfd), IORING_OP_UNLINKAT, flags, dirfd, 0, addr, 0, uint32(uflags)) + return ur.submit6(ContextWithUserdata(ctx, ioUringFd(dirfd)), IORING_OP_UNLINKAT, flags, 0, dirfd, 0, addr, 0, uint32(uflags)) } func (ur *ioUring) mkdirAt(ctx context.Context, flags uint8, dirfd int, pathname string, uflags int, mode uint32) error { @@ -413,7 +428,7 @@ func (ur *ioUring) mkdirAt(ctx context.Context, flags uint8, dirfd int, pathname } addr := uint64(uintptr(unsafe.Pointer(ptr))) - return ur.submit6(contextWithFD(ctx, dirfd), IORING_OP_MKDIRAT, flags, dirfd, 0, addr, int(mode), uint32(uflags)) + return ur.submit6(ContextWithUserdata(ctx, ioUringFd(dirfd)), IORING_OP_MKDIRAT, flags, 0, dirfd, 0, addr, int(mode), uint32(uflags)) } func (ur *ioUring) symlinkAt(ctx context.Context, flags uint8, oldPath string, newDirfd int, newPath string) error { @@ -429,7 +444,7 @@ func (ur *ioUring) symlinkAt(ctx context.Context, flags uint8, oldPath string, n } newAddr := uint64(uintptr(unsafe.Pointer(ptr))) - return ur.submit6(contextWithFD(ctx, newDirfd), IORING_OP_SYMLINKAT, flags, newDirfd, newAddr, oldAddr, 0, 0) + return ur.submit6(ContextWithUserdata(ctx, ioUringFd(newDirfd)), IORING_OP_SYMLINKAT, flags, 0, newDirfd, newAddr, oldAddr, 0, 0) } func (ur *ioUring) linkAt(ctx context.Context, flags uint8, oldDirfd int, oldPath string, newDirfd int, newPath string, uflags int) error { @@ -445,20 +460,20 @@ func (ur *ioUring) linkAt(ctx context.Context, flags uint8, oldDirfd int, oldPat } newAddr := uint64(uintptr(unsafe.Pointer(ptr))) - return ur.submit6(contextWithFD(ctx, newDirfd), IORING_OP_LINKAT, flags, oldDirfd, newAddr, oldAddr, newDirfd, uint32(uflags)) + return ur.submit6(ContextWithUserdata(ctx, ioUringFd(newDirfd)), IORING_OP_LINKAT, flags, 0, oldDirfd, newAddr, oldAddr, newDirfd, uint32(uflags)) } func (ur *ioUring) msgRing(ctx context.Context, flags uint8, targetFd int, userData int64, res int32) error { - return ur.submit6(contextWithFD(ctx, targetFd), IORING_OP_MSG_RING, flags, targetFd, uint64(userData), 0, int(res), 0) + return ur.submit6(ContextWithUserdata(ctx, ioUringFd(targetFd)), IORING_OP_MSG_RING, flags, 0, targetFd, uint64(userData), 0, int(res), 0) } func (ur *ioUring) socket(ctx context.Context, flags uint8, domain, typ, proto int, fileIndex uint32) error { - return ur.submit9(ctx, IORING_OP_SOCKET, flags, domain, uint64(typ), 0, proto, 0, 0, 0, int(fileIndex)) + return ur.submit9(ctx, IORING_OP_SOCKET, flags, 0, domain, uint64(typ), 0, proto, 0, 0, 0, int(fileIndex)) } func (ur *ioUring) sendZeroCopy(ctx context.Context, flags uint8, fd int, p []byte, msgFlags uint32) error { addr := uint64(uintptr(unsafe.Pointer(unsafe.SliceData(p)))) - return ur.submit6(contextWithFD(ctx, fd), IORING_OP_SEND_ZC, flags, fd, 0, addr, len(p), msgFlags) + return ur.submit6(ContextWithUserdata(ctx, ioUringFd(fd)), IORING_OP_SEND_ZC, flags, 0, fd, 0, addr, len(p), msgFlags) } func (ur *ioUring) sendToZeroCopy(ctx context.Context, flags uint8, target Sockaddr, p []byte, msgFlags uint32) error { @@ -467,5 +482,5 @@ func (ur *ioUring) sendToZeroCopy(ctx context.Context, flags uint8, target Socka return err } addr := uint64(uintptr(unsafe.Pointer(unsafe.SliceData(p)))) - return ur.submit9(ctx, IORING_OP_SEND_ZC, flags, 0, uint64(uintptr(addr2)), addr, len(p), msgFlags, 0, 0, addrLen<<16) + return ur.submit9(ctx, IORING_OP_SEND_ZC, flags, 0, 0, uint64(uintptr(addr2)), addr, len(p), msgFlags, 0, 0, addrLen<<16) } diff --git a/uring_linux_test.go b/uring_linux_test.go index cdeb57c..fa06ab2 100644 --- a/uring_linux_test.go +++ b/uring_linux_test.go @@ -181,7 +181,7 @@ func TestIOUring_BasicUsage(t *testing.T) { } rb := make([]byte, len(wb)) - err = ur.receive(context.TODO(), 0, so[0].fd, rb) + err = ur.receive(context.TODO(), 0, 0, so[0].fd, rb) if err != nil { t.Errorf("submit recv: %v", err) return