Files
BatchuKVM/server/service/stream/direct/streamer.go

124 lines
2.5 KiB
Go

package direct
import (
"NanoKVM-Server/common"
"NanoKVM-Server/service/stream"
"bytes"
"encoding/binary"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
)
// Streamer broadcasts H.264 frames to a set of websocket clients from a
// single background goroutine (see run).
type Streamer struct {
// mutex is taken when clients is added to, removed from, or counted.
mutex sync.RWMutex
// clients is used as a set of connected websockets; entries are inserted
// with the value true and removed with delete.
clients map[*websocket.Conn]bool
// running is 1 while the run goroutine is active and 0 otherwise. It is
// only accessed through sync/atomic.
running int32
}
// newStreamer returns a Streamer with an empty, ready-to-use client set.
// The streaming goroutine is not started here; it is launched by addClient.
func newStreamer() *Streamer {
	s := new(Streamer)
	s.clients = map[*websocket.Conn]bool{}
	return s
}
// addClient registers ws as a frame recipient and starts the streaming
// goroutine if it is not already running.
//
// The client is inserted into the set before the running flag is tested, so
// a loop started by this call always finds at least this client registered.
func (s *Streamer) addClient(ws *websocket.Conn) {
	func() {
		s.mutex.Lock()
		defer s.mutex.Unlock()
		s.clients[ws] = true
	}()

	// Only the caller that flips running from 0 to 1 launches the loop.
	if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
		return
	}

	go s.run()
	log.Debug("h264 stream started")
}
// removeClient drops ws from the client set and logs how many clients are
// left. Removing a connection that is not registered is a no-op.
func (s *Streamer) removeClient(ws *websocket.Conn) {
	s.mutex.Lock()
	delete(s.clients, ws)
	// Capture the count while the lock is still held; reading the map after
	// Unlock would race with concurrent addClient/removeClient calls.
	remaining := len(s.clients)
	s.mutex.Unlock()

	log.Debugf("h264 websocket disconnected, remaining clients: %d", remaining)
}
// getClientCount reports the number of currently registered clients. The
// count is read under the read lock.
func (s *Streamer) getClientCount() int {
	s.mutex.RLock()
	n := len(s.clients)
	s.mutex.RUnlock()
	return n
}
// run is the streaming loop started by addClient. On every tick (120 per
// second) it reads one H.264 frame and broadcasts it to all clients via
// send. The loop exits, and clears the running flag, as soon as a tick finds
// no registered clients.
func (s *Streamer) run() {
	ticker := time.NewTicker(time.Second / time.Duration(120))
	defer ticker.Stop()

	screen := common.GetScreen()
	vision := common.GetKvmVision()
	startTime := time.Now()

	for range ticker.C {
		// Decide to stop and clear the running flag in one critical section.
		// addClient inserts into the map under the write lock before it
		// tests the flag, so it either registers before this check (and the
		// loop keeps going) or sees running == 0 afterwards (and starts a
		// new loop). Clearing the flag later, e.g. in a defer, would leave a
		// window in which a newly added client is never served.
		s.mutex.RLock()
		idle := len(s.clients) == 0
		if idle {
			atomic.StoreInt32(&s.running, 0)
		}
		s.mutex.RUnlock()

		if idle {
			log.Debug("h264 stream stopped due to no clients")
			return
		}

		data, result := vision.ReadH264(screen.Width, screen.Height, screen.BitRate)
		if result < 0 || len(data) == 0 {
			// Negative result or empty payload: nothing to send this tick.
			continue
		}

		// A result of 3 is flagged to clients as a key frame.
		isKeyFrame := byte(0)
		if result == 3 {
			isKeyFrame = byte(1)
		}

		// Timestamp is microseconds elapsed since this loop started.
		timestamp := time.Since(startTime).Microseconds()

		if err := s.send(isKeyFrame, timestamp, data); err != nil {
			// Frames that could not be sent are not counted.
			continue
		}

		stream.GetFrameRateCounter().Update()
	}
}
// send frames one H.264 payload and writes it to every registered client as
// a binary websocket message.
//
// Message layout:
//
//	byte 0     key-frame flag (isKeyFrame)
//	bytes 1-8  timestamp as little-endian uint64
//	bytes 9-   data
//
// A client whose write fails is logged and removed from the client set; the
// remaining clients still receive the frame. The returned error is always
// nil: per-client failures are handled here and writes to a bytes.Buffer
// never return an error.
func (s *Streamer) send(isKeyFrame byte, timestamp int64, data []byte) error {
	buf := BufferPool.Get().(*bytes.Buffer)
	defer BufferPool.Put(buf)
	buf.Reset()

	// Stack array instead of a per-frame heap allocation.
	var ts [8]byte
	binary.LittleEndian.PutUint64(ts[:], uint64(timestamp))

	// bytes.Buffer's WriteByte/Write are documented to always return a nil
	// error, so the results are not checked.
	buf.WriteByte(isKeyFrame)
	buf.Write(ts[:])
	buf.Write(data)

	// Snapshot the recipients under the read lock. Ranging over s.clients
	// directly would race with addClient/removeClient, and holding the lock
	// across the network writes would deadlock with removeClient below.
	s.mutex.RLock()
	clients := make([]*websocket.Conn, 0, len(s.clients))
	for client := range s.clients {
		clients = append(clients, client)
	}
	s.mutex.RUnlock()

	payload := buf.Bytes()
	for _, client := range clients {
		if err := client.WriteMessage(websocket.BinaryMessage, payload); err != nil {
			log.Errorf("failed to write message to client %s: %s.", client.RemoteAddr(), err)
			s.removeClient(client)
		}
	}

	return nil
}