From 5ffbbe21606e4e04a5188391c9be46efd14a866b Mon Sep 17 00:00:00 2001 From: Mitchel Disveld Date: Wed, 1 May 2024 18:41:48 +0200 Subject: [PATCH] transcoding update --- clientbalancer.go | 79 ++++++++++++--- config.go | 85 +++++++++++++++- main.go | 241 +++++++++++++++++++++++++++++++++++++++------- viewers.go | 15 +-- 4 files changed, 362 insertions(+), 58 deletions(-) diff --git a/clientbalancer.go b/clientbalancer.go index 0e3813c..525c1d8 100644 --- a/clientbalancer.go +++ b/clientbalancer.go @@ -8,10 +8,23 @@ import ( "github.com/golang/protobuf/proto" "github.com/nknorg/nkn-sdk-go" "github.com/nknorg/nkn-sdk-go/payloads" + "github.com/nknorg/nkngomobile" ) var clientSendIndex = 0 +func getNextClient() *nkn.Client { + clientId := clientSendIndex % NUM_SUB_CLIENTS + client := client.GetClient(clientId) + clientSendIndex++ + + if client == nil { + client = getNextClient() + } + + return client +} + func publish(data []byte) { //Foreach chunk generate a message id and predefine the payload to reuse msgId, _ := nkn.RandomBytes(nkn.MessageIDSize) @@ -24,9 +37,53 @@ func publish(data []byte) { //Send VIEWER_SUB_CLIENTS times everytime with the next subclient in queue for i := 0; i < VIEWER_SUB_CLIENTS; i++ { - clientId := clientSendIndex % NUM_SUB_CLIENTS - go client.GetClient(clientId).SendPayload(viewerSubClientAddresses[i], msgPayload, segmentSendConfig) - clientSendIndex++ + go getNextClient().SendPayload(viewerSubClientAddresses[i], msgPayload, segmentSendConfig) + } +} + +func publishQualityLevels(qualityData ...[][]byte) { + qualityLevels := len(qualityData) + qualityAddrStrings := make([][]string, qualityLevels) + qualityNknAddrStrings := make([][]*nkngomobile.StringArray, qualityLevels) + + // Build address slices + for i := range qualityData { + qualityNknAddrStrings[i] = make([]*nkngomobile.StringArray, VIEWER_SUB_CLIENTS) + } + + // Build viewer lists for each quality + for k, _ := range viewers.messages { + qualityLevel := min(viewers.viewerQuality[k], qualityLevels) + qualityAddrStrings[qualityLevel] = append(qualityAddrStrings[qualityLevel], k) + } + + // Convert to multiclient recipient nkn addreses + for q := 0; q < qualityLevels; q++ { + // Preprocess addresses for this quality level (similar to original code) + for j := 0; j < VIEWER_SUB_CLIENTS; j++ { + prefixedAddresses := make([]string, len(qualityAddrStrings[q])) + for k, viewer := range qualityAddrStrings[q] { + prefixedAddresses[k] = "__" + strconv.Itoa(j) + "__." + viewer + } + qualityNknAddrStrings[q][j] = nkn.NewStringArray(prefixedAddresses...) + } + } + + // Send the chunks to each quality level + for q := 0; q < qualityLevels; q++ { + for _, v := range qualityData[q] { + msgId, _ := nkn.RandomBytes(nkn.MessageIDSize) + msgPayload := &payloads.Payload{ + Type: payloads.PayloadType_BINARY, + NoReply: true, + MessageId: msgId, + Data: v, + } + + for i := 0; i < VIEWER_SUB_CLIENTS; i++ { + go getNextClient().SendPayload(qualityNknAddrStrings[q][i], msgPayload, segmentSendConfig) + } + } } } @@ -48,9 +105,7 @@ func publishText(text string) { //Send VIEWER_SUB_CLIENTS times everytime with the next subclient in queue for i := 0; i < VIEWER_SUB_CLIENTS; i++ { - clientId := clientSendIndex % NUM_SUB_CLIENTS - go client.GetClient(clientId).SendPayload(viewerSubClientAddresses[i], msgPayload, segmentSendConfig) - clientSendIndex++ + go getNextClient().SendPayload(viewerSubClientAddresses[i], msgPayload, segmentSendConfig) } } @@ -64,13 +119,11 @@ func sendToClient(address string, data []byte) { } for i := 0; i < VIEWER_SUB_CLIENTS; i++ { - clientId := clientSendIndex % NUM_SUB_CLIENTS - go client.GetClient(clientId).SendPayload(nkn.NewStringArray("__"+strconv.Itoa(i)+"__."+address), msgPayload, &nkn.MessageConfig{ + go getNextClient().SendPayload(nkn.NewStringArray("__"+strconv.Itoa(i)+"__."+address), msgPayload, &nkn.MessageConfig{ Unencrypted: true, NoReply: true, MaxHoldingSeconds: 0, }) - clientId++ } } @@ -81,13 +134,11 @@ func reply(data []byte, msg *nkn.Message) { } for i := 0; i < VIEWER_SUB_CLIENTS; i++ { - clientId := clientSendIndex % NUM_SUB_CLIENTS - go client.GetClient(clientId).SendPayload(nkn.NewStringArray("__"+strconv.Itoa(i)+"__."+msg.Src), payload, &nkn.MessageConfig{ + go getNextClient().SendPayload(nkn.NewStringArray("__"+strconv.Itoa(i)+"__."+msg.Src), payload, &nkn.MessageConfig{ Unencrypted: true, NoReply: true, MaxHoldingSeconds: 0, }) - clientId++ } } @@ -98,12 +149,10 @@ func replyText(text string, msg *nkn.Message) { } for i := 0; i < VIEWER_SUB_CLIENTS; i++ { - clientId := clientSendIndex % NUM_SUB_CLIENTS - go client.GetClient(clientId).SendPayload(nkn.NewStringArray("__"+strconv.Itoa(i)+"__."+msg.Src), payload, &nkn.MessageConfig{ + go getNextClient().SendPayload(nkn.NewStringArray("__"+strconv.Itoa(i)+"__."+msg.Src), payload, &nkn.MessageConfig{ Unencrypted: true, NoReply: true, MaxHoldingSeconds: 0, }) - clientId++ } } diff --git a/config.go b/config.go index 51c8f77..1265cc1 100644 --- a/config.go +++ b/config.go @@ -6,15 +6,24 @@ import ( "errors" "fmt" "os" + "sort" + "strconv" + "strings" "github.com/nknorg/nkn-sdk-go" ) // Config represents the configuration data type Config struct { - Seed string `json:"seed"` - Title string `json:"title"` - Owner string `json:"owner"` + Seed string `json:"seed"` + Title string `json:"title"` + Owner string `json:"owner"` + Transcoders []string `json:transcoders` +} + +type Transcode struct { + Resolution int + Framerate int } // NewConfig reads the configuration file from a specified location and populates defaults @@ -68,6 +77,76 @@ func NewConfig(configFile string) (*Config, error) { return &cfg, nil } +func getTranscoders(config *Config) []Transcode { + var transcoders = make([]Transcode, 0) + + for _, v := range config.Transcoders { + transcodeStr := strings.Split(v, "p") + resolution, err := strconv.Atoi(transcodeStr[0]) + if err != nil { + fmt.Println("Skipping invalid transcode value in config:", v) + continue + } + + if sourceResolution <= resolution { + fmt.Println("Skipping transcode value in config:", v, "stream source is smaller:", sourceResolution) + continue + } + + framerate := 30 + if len(transcodeStr) == 2 && len(transcodeStr[1]) > 0 { + framerate, err = strconv.Atoi(transcodeStr[1]) + if err != nil { + fmt.Println("Skipping invalid transcode value in config:", v) + continue + } + } + + if framerate > sourceFramerate { + framerate = sourceFramerate + fmt.Println("Lowering transcode framerate value in config:", v, "stream source framerate:", sourceFramerate) + } + + if sourceResolution == resolution && framerate == sourceFramerate { + fmt.Println("Skipping transcode value in config:", v, "stream source resolution and framerate are equal") + continue + } + + transcoders = append(transcoders, Transcode{ + Resolution: resolution, + Framerate: framerate, + }) + } + + return removeDuplicateTranscodes(transcoders) +} + +func removeDuplicateTranscodes(transcodes []Transcode) []Transcode { + // Sort the unique slice by resolution (descending) and then framerate (descending) + sort.SliceStable(transcodes, func(i, j int) bool { + if transcodes[i].Resolution != transcodes[j].Resolution { + return transcodes[i].Resolution > transcodes[j].Resolution // Descending order for resolution + } + return transcodes[i].Framerate > transcodes[j].Framerate // Descending order for framerate + }) + + // Create a map to store seen resolutions and their corresponding framerates + seen := make(map[int]int) + var unique []Transcode + + // Loop through the original slice + for _, transcode := range transcodes { + // Check if the resolution is already seen + if prevFramerate, ok := seen[transcode.Resolution]; !ok || transcode.Framerate > prevFramerate { + // If not seen or framerate is higher, update seen map and add to unique slice + seen[transcode.Resolution] = transcode.Framerate + unique = append(unique, transcode) + } + } + + return unique +} + func generateMediaMTXConfig() { if _, err := os.Stat("mediamtx.yml"); errors.Is(err, os.ErrNotExist) { os.WriteFile("mediamtx.yml", []byte(mediaMTXDefaults), 0644) diff --git a/main.go b/main.go index 1d55df7..fce3a83 100644 --- a/main.go +++ b/main.go @@ -4,11 +4,13 @@ import ( "bytes" "encoding/binary" "encoding/hex" + "encoding/json" "fmt" "log" "os" "os/exec" "strconv" + "strings" "time" "github.com/bluenviron/mediamtx/core" @@ -36,6 +38,12 @@ var segmentSendConfig = &nkn.MessageConfig{ var lastRtmpSegment = time.Time{} +var sourceResolution int +var sourceFramerate int +var sourceCodec string + +var transcoders []Transcode + func main() { fmt.Println("Welcome to go-novon a golang client for RTMP streaming to novon") fmt.Println("") @@ -82,12 +90,19 @@ func createClient() *nkn.MultiClient { for i := 0; i < NUM_SUB_CLIENTS-(NUM_SUB_CLIENTS/20); i++ { <-client.OnConnect.C } - fmt.Println("connected to NKN") - fmt.Println("Your address", client.Address()) + log.Println("connected to NKN") + log.Println("Your address", client.Address()) return client } +type ChannelInfo struct { + Panels string `json:"panels"` + Viewers int `json:"viewers"` + Role string `json:"role"` + QualityLevels []Transcode `json:"qualityLevels"` +} + func receiveMessages() { go func() { for { @@ -102,6 +117,38 @@ func receiveMessages() { continue } + //Always reply to panel, this can be displayed when we are not broadcasting. + if len(msg.Data) == 11 && string(msg.Data[:]) == "channelinfo" { + + role := "" + if msg.Src == config.Owner { + role = "owner" + } + + qualityLevels := make([]Transcode, 0) + qualityLevels = append(qualityLevels, Transcode{ + Resolution: sourceResolution, + Framerate: sourceFramerate, + }) + + qualityLevels = append(qualityLevels, transcoders...) + + response := ChannelInfo{ + Panels: panels, + Viewers: len(viewerAddresses), + Role: role, + QualityLevels: qualityLevels, + } + + json, err := json.Marshal(response) + if err != nil { + log.Println("error on creating channel info response", err.Error()) + } + + go replyText(string(json), msg) + continue + } + //If we're not broadcasting don't reply to anything. if !isBroadcasting() { time.Sleep(time.Millisecond * 100) @@ -111,7 +158,7 @@ func receiveMessages() { if len(msg.Data) == 4 && string(msg.Data[:]) == "ping" { isNew := viewers.AddOrUpdateAddress(msg.Src) if isNew { - fmt.Println("viewer joined: ", msg.Src) + log.Println("viewer joined: ", msg.Src) } //Send last segment to newly joined if isNew { @@ -127,12 +174,15 @@ func receiveMessages() { go replyText(strconv.Itoa(len(viewerAddresses)), msg) } else if len(msg.Data) == 10 && string(msg.Data[:]) == "donationid" { go replyText(generateDonationEntry(), msg) - } else if len(msg.Data) == 7 && string(msg.Data[:]) == "getrole" { - role := "" - if msg.Src == config.Owner { - role = "owner" - } - go replyText(role, msg) + } else if len(msg.Data) == 8 && string(msg.Data[:]) == "quality0" { + viewers.viewerQuality[msg.Src] = 0 + go replyText(strconv.Itoa(segmentId), msg) + } else if len(msg.Data) == 8 && string(msg.Data[:]) == "quality1" { + viewers.viewerQuality[msg.Src] = 1 + go replyText(strconv.Itoa(segmentId), msg) + } else if len(msg.Data) == 8 && string(msg.Data[:]) == "quality2" { + viewers.viewerQuality[msg.Src] = 2 + go replyText(strconv.Itoa(segmentId), msg) } else { DecodeMessage(msg) } @@ -195,30 +245,79 @@ func ChunkByByteSizeWithMetadata(data []byte, chunkSize int, segmentId int) [][] } func publishTSPart(segment []byte) { - lastRtmpSegment = time.Now() - //Segment the data to max CHUNK_SIZE chunks - chunks := ChunkByByteSizeWithMetadata(segment, CHUNK_SIZE, segmentId) - segmentId++ + if !isBroadcasting() { + info, err := probeVideoInfo(segment) + if err != nil { + panic(err) + } + + sourceCodec = info["codec"] + sourceResolution, _ = strconv.Atoi(strings.Split(info["resolution"], "x")[1]) + sourceFramerate, _ = strconv.Atoi(strings.Split(info["framerate"], "/")[0]) - fmt.Println("Broadcasting -", "viewers:", len(viewerAddresses), "size:", len(segment), "chunks:", len(chunks)) + log.Println("Receiving codec:", sourceCodec, "resolution:", sourceResolution, "framerate:", sourceFramerate) - if len(viewerAddresses) > 0 { - for _, chunk := range chunks { - publish(chunk) + transcoders = getTranscoders(config) + for _, v := range transcoders { + log.Println("Stream will be transcoded in:", v.Resolution, "p", v.Framerate) } } - if (segmentId-1)%10 == 0 { - go screengrabSegment(segment) - } + lastRtmpSegment = time.Now() + //os.WriteFile("test.ts", segment, os.FileMode(0644)) + + go func() { + sourceChunks := ChunkByByteSizeWithMetadata(segment, CHUNK_SIZE, segmentId) + transcodedChunksArray := make([][][]byte, 0) + transcodedChunksArray = append(transcodedChunksArray, sourceChunks) + + //No transcoding, publish to all viewers in source quality. + if len(transcoders) == 0 { + log.Println("Broadcasting -", "viewers:", len(viewerAddresses), "source size:", len(segment), "source chunks:", len(sourceChunks)) + for i := 0; i < len(sourceChunks); i++ { + go publish(sourceChunks[i]) + } + segmentId++ + } else { - lastSegment = chunks + startTranscoderTime := time.Now() + log.Println("Broadcasting -", "viewers:", len(viewerAddresses), "source size:", len(segment), "source chunks:", len(sourceChunks)) + for _, t := range transcoders { + + beginTime := time.Now() + segment = resizeSegment(t, segment) + timeSpent := time.Since(beginTime).Milliseconds() + + tChunks := ChunkByByteSizeWithMetadata(segment, CHUNK_SIZE, segmentId) + transcodedChunksArray = append(transcodedChunksArray, tChunks) + log.Printf("Transcoded -%v@%v size: %v, chunks: %v, timeSpent: %v\n", t.Resolution, t.Framerate, len(segment), len(tChunks), timeSpent) + } + segmentId++ + + if len(viewerAddresses) > 0 { + publishQualityLevels(transcodedChunksArray...) + } + + totalTranscodingMs := time.Since(startTranscoderTime).Milliseconds() + if totalTranscodingMs > 1000 && totalTranscodingMs < 2000 { + log.Printf("WARNING: Total transcoding time '%vms' approaching segment duration, consider less transcoding configurations.", totalTranscodingMs) + } else if totalTranscodingMs > 2000 { + log.Printf("DANGER: Total transcoding time '%vms' exceeds segment duration, stream will suffer interrupts, reduce or remove transcoding configurations.", totalTranscodingMs) + } + } + + if (segmentId-1)%10 == 0 { + go screengrabSegment(segment) + } + + //For fastest join times we take the lowest quality level + lastSegment = transcodedChunksArray[len(transcodedChunksArray)-1] + }() } func screengrabSegment(segment []byte) { // Output image file - outputFile := "screenshot.jpg" width := "256" height := "144" @@ -226,9 +325,10 @@ func screengrabSegment(segment []byte) { cmd := exec.Command("ffmpeg", "-i", "-", // read from stdin (pipe) "-vframes", "1", - "-y", // overwrite output file "-vf", fmt.Sprintf("scale=%s:%s", width, height), // resize filter - outputFile) + "-f", + "image2pipe", + "-") var stdinPipe, stderrPipe bytes.Buffer cmd.Stdin = &stdinPipe @@ -237,22 +337,95 @@ func screengrabSegment(segment []byte) { // Write MPEG-TS data to stdin pipe stdinPipe.Write(segment) - err := cmd.Run() + var err error + thumbnail, err = cmd.Output() if err != nil { - fmt.Println("Error capturing screenshot:", err) - fmt.Println("FFmpeg stderr:", stderrPipe.String()) + log.Println("Error capturing screenshot:", err) + log.Println("FFmpeg stderr:", stderrPipe.String()) return } - // Read the temporary image file into memory - thumbnail, err = os.ReadFile(outputFile) + log.Println("Screenshot captured successfully.") +} + +func resizeSegment(transcode Transcode, segment []byte) []byte { + //ultrafast superfast veryfast faster fast medium (default) slow slower veryslow + + // Command arguments for ffmpeg + cmd := exec.Command("ffmpeg", + "-hwaccel", "auto", + "-i", "-", // read from stdin (pipe) + "-c:v", "libx264", // specify video encoder (optional) + "-crf", "30", // set constant rate factor (quality) + "-preset", "ultrafast", // set encoding preset for faster processing + "-acodec", "copy", + "-filter:v", fmt.Sprintf("scale=-2:%d,fps=%d", transcode.Resolution, transcode.Framerate), + "-copyts", + "-f", "mpegts", + "-") + + var stdinPipe, stderrPipe bytes.Buffer + cmd.Stdin = &stdinPipe + cmd.Stderr = &stderrPipe + + // Write MPEG-TS data to stdin pipe + stdinPipe.Write(segment) + + resizedSegment, err := cmd.Output() + if err != nil { - fmt.Println("Error reading temporary image file:", err) - return + log.Println("Error capturing screenshot:", err) + log.Println("FFmpeg stderr:", stderrPipe.String()) + return nil + } + + return resizedSegment +} + +func probeVideoInfo(segment []byte) (map[string]string, error) { + // Create ffprobe command with pipe input + cmd := exec.Command("ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", "-i", "-") + + var stdinPipe bytes.Buffer + cmd.Stdin = &stdinPipe + + // Write MPEG-TS data to stdin pipe + stdinPipe.Write(segment) + + // Capture ffprobe output + out, err := cmd.CombinedOutput() + if err != nil { + return nil, fmt.Errorf("error probing video info: %w", err) + } + + // Parse ffprobe JSON output + var info map[string]interface{} + err = json.Unmarshal(out, &info) + if err != nil { + return nil, fmt.Errorf("error parsing ffprobe output: %w", err) + } + + // Extract relevant info (modify as needed) + result := map[string]string{} + if streams, ok := info["streams"].([]interface{}); ok { + for _, stream := range streams { + if streamMap, ok := stream.(map[string]interface{}); ok { + if codecType, ok := streamMap["codec_type"].(string); ok && codecType == "video" { + result["codec"] = streamMap["codec_name"].(string) + if width, ok := streamMap["width"].(float64); ok { + result["resolution"] = fmt.Sprintf("%.0fx%.0f", width, streamMap["height"].(float64)) + } + if r_frame_rate, ok := streamMap["r_frame_rate"].(string); ok { + result["framerate"] = r_frame_rate + } + break // Extract info only from the first video stream + } + } + } } - fmt.Println("Screenshot captured successfully:", outputFile) + return result, nil } func checkFfmpegInstalled() { @@ -262,12 +435,12 @@ func checkFfmpegInstalled() { err := cmd.Run() if err != nil { // Handle ffmpeg not found error - fmt.Println("Error: ffmpeg is not installed. Please install ffmpeg and try again.") + log.Println("Error: ffmpeg is not installed. Please install ffmpeg and try again.") return } // ffmpeg is available, continue with your application logic - fmt.Println("ffmpeg is installed. Proceeding...") + log.Println("ffmpeg is installed. Proceeding...") } func isBroadcasting() bool { diff --git a/viewers.go b/viewers.go index 0d0fca3..baec64b 100644 --- a/viewers.go +++ b/viewers.go @@ -15,9 +15,10 @@ var viewerSubClientAddresses [VIEWER_SUB_CLIENTS]*nkngomobile.StringArray // Viewers is a thread-safe collection of message addresses with last receive timestamps. type Viewers struct { - messages map[string]*messageData - mutex sync.RWMutex - timeout time.Duration + messages map[string]*messageData + viewerQuality map[string]int + mutex sync.RWMutex + timeout time.Duration } // messageData holds the last received time for an address. @@ -28,9 +29,10 @@ type messageData struct { // NewViewers creates a new Viewers with a specified timeout duration. func NewViewers(timeout time.Duration) *Viewers { return &Viewers{ - messages: make(map[string]*messageData), - mutex: sync.RWMutex{}, - timeout: timeout, + messages: make(map[string]*messageData), + viewerQuality: make(map[string]int), + mutex: sync.RWMutex{}, + timeout: timeout, } } @@ -43,6 +45,7 @@ func (ms *Viewers) AddOrUpdateAddress(address string) (isNew bool) { if !ok { data = &messageData{lastTime: time.Now()} ms.messages[address] = data + ms.viewerQuality[address] = 1 ms.SetAddresses() } else { data.lastTime = time.Now()