Skip to content

Commit

Permalink
transcoding update
Browse files Browse the repository at this point in the history
  • Loading branch information
MutsiMutsi committed May 1, 2024
1 parent a689ed9 commit 5ffbbe2
Show file tree
Hide file tree
Showing 4 changed files with 362 additions and 58 deletions.
79 changes: 64 additions & 15 deletions clientbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,23 @@ import (
"github.com/golang/protobuf/proto"
"github.com/nknorg/nkn-sdk-go"
"github.com/nknorg/nkn-sdk-go/payloads"
"github.com/nknorg/nkngomobile"
)

var clientSendIndex = 0

func getNextClient() *nkn.Client {
clientId := clientSendIndex % NUM_SUB_CLIENTS
client := client.GetClient(clientId)
clientSendIndex++

if client == nil {
client = getNextClient()
}

return client
}

func publish(data []byte) {
//Foreach chunk generate a message id and predefine the payload to reuse
msgId, _ := nkn.RandomBytes(nkn.MessageIDSize)
Expand All @@ -24,9 +37,53 @@ func publish(data []byte) {

//Send VIEWER_SUB_CLIENTS times everytime with the next subclient in queue
for i := 0; i < VIEWER_SUB_CLIENTS; i++ {
clientId := clientSendIndex % NUM_SUB_CLIENTS
go client.GetClient(clientId).SendPayload(viewerSubClientAddresses[i], msgPayload, segmentSendConfig)
clientSendIndex++
go getNextClient().SendPayload(viewerSubClientAddresses[i], msgPayload, segmentSendConfig)
}
}

func publishQualityLevels(qualityData ...[][]byte) {
qualityLevels := len(qualityData)
qualityAddrStrings := make([][]string, qualityLevels)
qualityNknAddrStrings := make([][]*nkngomobile.StringArray, qualityLevels)

// Build address slices
for i := range qualityData {
qualityNknAddrStrings[i] = make([]*nkngomobile.StringArray, VIEWER_SUB_CLIENTS)
}

// Build viewer lists for each quality
for k, _ := range viewers.messages {
qualityLevel := min(viewers.viewerQuality[k], qualityLevels)
qualityAddrStrings[qualityLevel] = append(qualityAddrStrings[qualityLevel], k)
}

// Convert to multiclient recipient nkn addreses
for q := 0; q < qualityLevels; q++ {
// Preprocess addresses for this quality level (similar to original code)
for j := 0; j < VIEWER_SUB_CLIENTS; j++ {
prefixedAddresses := make([]string, len(qualityAddrStrings[q]))
for k, viewer := range qualityAddrStrings[q] {
prefixedAddresses[k] = "__" + strconv.Itoa(j) + "__." + viewer
}
qualityNknAddrStrings[q][j] = nkn.NewStringArray(prefixedAddresses...)
}
}

// Send the chunks to each quality level
for q := 0; q < qualityLevels; q++ {
for _, v := range qualityData[q] {
msgId, _ := nkn.RandomBytes(nkn.MessageIDSize)
msgPayload := &payloads.Payload{
Type: payloads.PayloadType_BINARY,
NoReply: true,
MessageId: msgId,
Data: v,
}

for i := 0; i < VIEWER_SUB_CLIENTS; i++ {
go getNextClient().SendPayload(qualityNknAddrStrings[q][i], msgPayload, segmentSendConfig)
}
}
}
}

Expand All @@ -48,9 +105,7 @@ func publishText(text string) {

//Send VIEWER_SUB_CLIENTS times everytime with the next subclient in queue
for i := 0; i < VIEWER_SUB_CLIENTS; i++ {
clientId := clientSendIndex % NUM_SUB_CLIENTS
go client.GetClient(clientId).SendPayload(viewerSubClientAddresses[i], msgPayload, segmentSendConfig)
clientSendIndex++
go getNextClient().SendPayload(viewerSubClientAddresses[i], msgPayload, segmentSendConfig)
}
}

Expand All @@ -64,13 +119,11 @@ func sendToClient(address string, data []byte) {
}

for i := 0; i < VIEWER_SUB_CLIENTS; i++ {
clientId := clientSendIndex % NUM_SUB_CLIENTS
go client.GetClient(clientId).SendPayload(nkn.NewStringArray("__"+strconv.Itoa(i)+"__."+address), msgPayload, &nkn.MessageConfig{
go getNextClient().SendPayload(nkn.NewStringArray("__"+strconv.Itoa(i)+"__."+address), msgPayload, &nkn.MessageConfig{
Unencrypted: true,
NoReply: true,
MaxHoldingSeconds: 0,
})
clientId++
}
}

Expand All @@ -81,13 +134,11 @@ func reply(data []byte, msg *nkn.Message) {
}

for i := 0; i < VIEWER_SUB_CLIENTS; i++ {
clientId := clientSendIndex % NUM_SUB_CLIENTS
go client.GetClient(clientId).SendPayload(nkn.NewStringArray("__"+strconv.Itoa(i)+"__."+msg.Src), payload, &nkn.MessageConfig{
go getNextClient().SendPayload(nkn.NewStringArray("__"+strconv.Itoa(i)+"__."+msg.Src), payload, &nkn.MessageConfig{
Unencrypted: true,
NoReply: true,
MaxHoldingSeconds: 0,
})
clientId++
}
}

Expand All @@ -98,12 +149,10 @@ func replyText(text string, msg *nkn.Message) {
}

for i := 0; i < VIEWER_SUB_CLIENTS; i++ {
clientId := clientSendIndex % NUM_SUB_CLIENTS
go client.GetClient(clientId).SendPayload(nkn.NewStringArray("__"+strconv.Itoa(i)+"__."+msg.Src), payload, &nkn.MessageConfig{
go getNextClient().SendPayload(nkn.NewStringArray("__"+strconv.Itoa(i)+"__."+msg.Src), payload, &nkn.MessageConfig{
Unencrypted: true,
NoReply: true,
MaxHoldingSeconds: 0,
})
clientId++
}
}
85 changes: 82 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,24 @@ import (
"errors"
"fmt"
"os"
"sort"
"strconv"
"strings"

"github.com/nknorg/nkn-sdk-go"
)

// Config represents the configuration data
type Config struct {
Seed string `json:"seed"`
Title string `json:"title"`
Owner string `json:"owner"`
Seed string `json:"seed"`
Title string `json:"title"`
Owner string `json:"owner"`
Transcoders []string `json:transcoders`
}

type Transcode struct {
Resolution int
Framerate int
}

// NewConfig reads the configuration file from a specified location and populates defaults
Expand Down Expand Up @@ -68,6 +77,76 @@ func NewConfig(configFile string) (*Config, error) {
return &cfg, nil
}

func getTranscoders(config *Config) []Transcode {
var transcoders = make([]Transcode, 0)

for _, v := range config.Transcoders {
transcodeStr := strings.Split(v, "p")
resolution, err := strconv.Atoi(transcodeStr[0])
if err != nil {
fmt.Println("Skipping invalid transcode value in config:", v)
continue
}

if sourceResolution <= resolution {
fmt.Println("Skipping transcode value in config:", v, "stream source is smaller:", sourceResolution)
continue
}

framerate := 30
if len(transcodeStr) == 2 && len(transcodeStr[1]) > 0 {
framerate, err = strconv.Atoi(transcodeStr[1])
if err != nil {
fmt.Println("Skipping invalid transcode value in config:", v)
continue
}
}

if framerate > sourceFramerate {
framerate = sourceFramerate
fmt.Println("Lowering transcode framerate value in config:", v, "stream source framerate:", sourceFramerate)
}

if sourceResolution == resolution && framerate == sourceFramerate {
fmt.Println("Skipping transcode value in config:", v, "stream source resolution and framerate are equal")
continue
}

transcoders = append(transcoders, Transcode{
Resolution: resolution,
Framerate: framerate,
})
}

return removeDuplicateTranscodes(transcoders)
}

func removeDuplicateTranscodes(transcodes []Transcode) []Transcode {
// Sort the unique slice by resolution (descending) and then framerate (descending)
sort.SliceStable(transcodes, func(i, j int) bool {
if transcodes[i].Resolution != transcodes[j].Resolution {
return transcodes[i].Resolution > transcodes[j].Resolution // Descending order for resolution
}
return transcodes[i].Framerate > transcodes[j].Framerate // Descending order for framerate
})

// Create a map to store seen resolutions and their corresponding framerates
seen := make(map[int]int)
var unique []Transcode

// Loop through the original slice
for _, transcode := range transcodes {
// Check if the resolution is already seen
if prevFramerate, ok := seen[transcode.Resolution]; !ok || transcode.Framerate > prevFramerate {
// If not seen or framerate is higher, update seen map and add to unique slice
seen[transcode.Resolution] = transcode.Framerate
unique = append(unique, transcode)
}
}

return unique
}

func generateMediaMTXConfig() {
if _, err := os.Stat("mediamtx.yml"); errors.Is(err, os.ErrNotExist) {
os.WriteFile("mediamtx.yml", []byte(mediaMTXDefaults), 0644)
Expand Down
Loading

0 comments on commit 5ffbbe2

Please sign in to comment.