Unverified Commit c1745aba authored by Leeward Bound's avatar Leeward Bound 💼

feat: jobs, play file job

parent 75850756
Pipeline #228 passed with stage
in 1 minute and 10 seconds
......@@ -6,7 +6,9 @@ import (
"flag"
"fmt"
noir "github.com/net-prophet/noir/pkg/noir"
"github.com/net-prophet/noir/pkg/noir/jobs"
"github.com/net-prophet/noir/pkg/noir/servers"
pb "github.com/net-prophet/noir/pkg/proto"
"net/http"
"os"
......@@ -134,6 +136,13 @@ func main() {
mgr := noir.SetupNoir(&sfu, rdb, id)
worker := *(mgr.GetWorker())
worker.RegisterHandler("PlayFile", func(request *pb.NoirRequest) noir.RunnableJob {
admin := request.GetAdmin()
roomAdmin := admin.GetRoomAdmin()
return jobs.NewPlayFileJob(&mgr, roomAdmin.GetRoomID(), "pink.video", true)
})
go mgr.Noir()
defer mgr.Cleanup()
......
......@@ -50,4 +50,4 @@ sdpsemantics = "unified-plan"
# icelite = true
[log]
level = "info"
level = "debug"
This diff is collapsed.
This diff is collapsed.
package noir
import (
pb "github.com/net-prophet/noir/pkg/proto"
log "github.com/pion/ion-log"
"github.com/pion/webrtc/v3"
"google.golang.org/protobuf/types/known/timestamppb"
)
// Job is the base unit of background work: it ties a job ID to the
// Manager that owns it and the JobData record persisted for it.
type Job struct {
	id      string
	manager *Manager
	// jobData mirrors what is stored/reported for this job (status,
	// timestamps, owning node).
	jobData *pb.JobData
}
// PeerJob is a Job that participates in a room as a WebRTC peer.
// It embeds Job and adds the room/peer identity plus the live
// peer connection once established.
type PeerJob struct {
	Job
	roomID      string
	peerJobData *pb.PeerJobData
	// pc is the job's WebRTC connection; may be nil until the job dials.
	pc *webrtc.PeerConnection
}
// RunnableJob is anything a worker can launch; Handle is expected to
// run the job to completion (callers start it in its own goroutine).
type RunnableJob interface {
	Handle()
}
// NewBaseJob constructs a Job owned by manager, recording the handler
// name and job ID in a fresh JobData stamped with creation time and the
// manager's node ID.
func NewBaseJob(manager *Manager, handler string, jobID string) *Job {
	data := &pb.JobData{
		Id:         jobID,
		Handler:    handler,
		Status:     0,
		Created:    timestamppb.Now(),
		LastUpdate: timestamppb.Now(),
		NodeID:     manager.ID(),
	}
	return &Job{
		id:      jobID,
		manager: manager,
		jobData: data,
	}
}
// NewPeerJob builds a PeerJob for the given room and peer, deriving the
// job ID from the peer ID ("job-<peerID>") and starting with empty
// publish/subscribe track lists.
func NewPeerJob(manager *Manager, handler string, roomID string, peerID string) *PeerJob {
	base := NewBaseJob(manager, handler, "job-"+peerID)
	peerData := &pb.PeerJobData{
		RoomID:          roomID,
		PeerID:          peerID,
		PublishTracks:   []string{},
		SubscribeTracks: []string{},
	}
	return &PeerJob{
		Job:         *base,
		peerJobData: peerData,
	}
}
// GetCommandQueue returns the queue on which this job receives commands
// (the "from-job" topic keyed by the job's ID).
func (j *Job) GetCommandQueue() Queue {
	return j.manager.GetQueue(pb.KeyTopicFromJob(j.id))
}
// GetPeerQueue returns the queue for messages from this job's peer.
// NOTE(review): this uses KeyTopicFromPeer, making it identical to
// GetFromPeerQueue — confirm whether a "to-peer" topic was intended.
func (j *PeerJob) GetPeerQueue() Queue {
	return j.manager.GetQueue(pb.KeyTopicFromPeer(j.peerJobData.GetPeerID()))
}
// GetManager returns the Manager that owns this job.
func (j *Job) GetManager() *Manager {
	return j.manager
}
// GetData returns the job's persisted JobData record.
func (j *Job) GetData() *pb.JobData {
	return j.jobData
}
// Kill logs the job's exit with the given status code (0 = clean exit).
// NOTE(review): this only logs — it does not stop goroutines or clean up
// queues; confirm teardown is handled elsewhere.
func (j *Job) Kill(code int) {
	// %d fixes the original %s verb, which rendered the int code as
	// "%!s(int=0)" in the log output.
	log.Infof("exited %d handler=%s jobid=%s ", code, j.jobData.GetHandler(), j.id)
}
// KillWithError logs the error and then kills the job with exit code 1.
func (j *Job) KillWithError(err error) {
	log.Errorf("job error: %s", err)
	j.Kill(1)
}
// GetPeerData returns the job's room/peer identity and track lists.
func (j *PeerJob) GetPeerData() *pb.PeerJobData {
	return j.peerJobData
}
// GetPeerConnection returns the job's WebRTC connection (nil until the
// job has dialed).
func (j *PeerJob) GetPeerConnection() *webrtc.PeerConnection {
	return j.pc
}
// GetFromPeerQueue returns the queue carrying messages from this job's
// peer. NOTE(review): functionally identical to GetPeerQueue — consider
// consolidating.
func (j *PeerJob) GetFromPeerQueue() Queue {
	return j.manager.GetQueue(pb.KeyTopicFromPeer(j.peerJobData.PeerID))
}
package jobs
import (
"context"
"encoding/json"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/net-prophet/noir/pkg/noir"
pb "github.com/net-prophet/noir/pkg/proto"
log "github.com/pion/ion-log"
"github.com/pion/randutil"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"github.com/pion/webrtc/v3/pkg/media/ivfreader"
"io"
"os"
"time"
)
// PlayFileOptions configures a PlayFile job: which file to stream and
// whether to loop it.
// NOTE(review): Repeat is stored but never read by Handle — looping is
// not implemented yet; confirm intent.
type PlayFileOptions struct {
	Filename string `json:"filename"`
	Repeat   bool   `json:"repeat"`
}
// PlayFileJob is a PeerJob that joins a room and publishes a local IVF
// video file as its video track.
type PlayFileJob struct {
	noir.PeerJob
	options *PlayFileOptions
}
// handler is the registered name workers use to dispatch this job type.
const handler = "PlayFile"
// NewPlayFileJob creates a PlayFile job that will stream filename into
// roomID, joining the room under a freshly generated 16-character peer ID.
func NewPlayFileJob(manager *noir.Manager, roomID string, filename string, repeat bool) *PlayFileJob {
	opts := &PlayFileOptions{Filename: filename, Repeat: repeat}
	peer := noir.NewPeerJob(manager, handler, roomID, noir.RandomString(16))
	return &PlayFileJob{
		PeerJob: *peer,
		options: opts,
	}
}
// Handle streams the configured IVF video file into the job's room as a
// WebRTC publisher: it builds a PeerConnection, enqueues a Join signal
// through the router, waits for ICE to connect, then paces video frames
// onto the track until EOF or a kill signal arrives on the peer queue.
// NOTE(review): j.options.Repeat is never consulted — looping playback
// is not implemented; confirm intended semantics.
func (j *PlayFileJob) Handle() {
	// Assert that we have an audio or video file
	filename := j.options.Filename
	_, err := os.Stat(filename)
	haveVideoFile := !os.IsNotExist(err)

	if !haveVideoFile {
		panic("Could not find `" + filename + "`")
	}

	// We make our own mediaEngine so we can place the sender's codecs in it. This because we must use the
	// dynamic media type from the sender in our answer. This is not required if we are the offerer
	mediaEngine := webrtc.MediaEngine{}
	mediaEngine.RegisterDefaultCodecs()

	// Create a new RTCPeerConnection
	api := webrtc.NewAPI(webrtc.WithMediaEngine(&mediaEngine))
	peerConnection, err := api.NewPeerConnection(webrtc.Configuration{
		ICEServers: []webrtc.ICEServer{
			{
				URLs: []string{"stun:stun.l.google.com:19302"},
			},
		},
	})
	if err != nil {
		j.KillWithError(err)
		return
	}

	iceConnectedCtx, iceConnectedCtxCancel := context.WithCancel(context.Background())

	// Create a video track
	videoTrack, err := webrtc.NewTrackLocalStaticSample(
		webrtc.RTPCodecCapability{MimeType: "video/vp8"},
		fmt.Sprintf("video-%d", randutil.NewMathRandomGenerator().Uint32()),
		fmt.Sprintf("video-%d", randutil.NewMathRandomGenerator().Uint32()),
	)
	// BUG FIX: this error was previously overwritten by AddTrack without
	// ever being checked.
	if err != nil {
		j.KillWithError(err)
		return
	}
	if _, err = peerConnection.AddTrack(videoTrack); err != nil {
		j.KillWithError(err)
		return
	}

	go func() {
		defer peerConnection.Close()
		// Open a IVF file and start reading using our IVFReader
		file, ivfErr := os.Open(filename)
		if ivfErr != nil {
			j.KillWithError(ivfErr)
			return
		}

		ivf, header, ivfErr := ivfreader.NewWith(file)
		if ivfErr != nil {
			j.KillWithError(ivfErr)
			return
		}

		log.Infof("waiting for connection...")

		// Wait for connection established
		<-iceConnectedCtx.Done()

		log.Infof("done waiting for connection...")

		// Send our video file frame at a time. Pace our sending so we send it at the same speed it should be played back as.
		// This isn't required since the video is timestamped, but we will see much higher loss if we send all at once.
		sleepTime := time.Millisecond * time.Duration((float32(header.TimebaseNumerator)/float32(header.TimebaseDenominator))*1000)
		for {
			frame, _, ivfErr := ivf.ParseNextFrame()
			if ivfErr == io.EOF {
				fmt.Printf("All video frames parsed and sent")
				j.Kill(0)
				return
			}

			if ivfErr != nil {
				j.KillWithError(ivfErr)
				return
			}

			time.Sleep(sleepTime)
			if ivfErr = videoTrack.WriteSample(media.Sample{Data: frame}); ivfErr != nil {
				j.KillWithError(ivfErr)
				return
			}
		}
	}()

	// Set the handler for ICE connection state
	// This will notify you when the peer has connected/disconnected
	peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
		fmt.Printf("Connection State has changed %s \n", connectionState.String())
		if connectionState == webrtc.ICEConnectionStateConnected {
			iceConnectedCtxCancel()
		}
	})

	gatherComplete := webrtc.GatheringCompletePromise(peerConnection)

	offer, err := peerConnection.CreateOffer(nil)
	if err != nil {
		log.Errorf("Error creating offer: %v", err)
		j.KillWithError(err)
		// BUG FIX: previously fell through and used an invalid offer.
		return
	}

	if err = peerConnection.SetLocalDescription(offer); err != nil {
		log.Errorf("Error setting local description: %v", err)
		j.KillWithError(err)
		// BUG FIX: previously continued after failure; a second, redundant
		// check of the same err after gatherComplete has been removed.
		return
	}

	<-gatherComplete

	router := j.GetManager().GetRouter()
	queue := (*router).GetQueue()
	peerID := j.GetPeerData().PeerID
	roomID := j.GetPeerData().RoomID

	log.Infof("joining room=%s peer=%s", roomID, peerID)

	err = noir.EnqueueRequest(*queue, &pb.NoirRequest{
		Command: &pb.NoirRequest_Signal{
			Signal: &pb.SignalRequest{
				Id: peerID,
				Payload: &pb.SignalRequest_Join{
					Join: &pb.JoinRequest{
						Sid:         roomID,
						Description: []byte(peerConnection.LocalDescription().SDP),
					},
				},
			},
		},
	})
	if err != nil {
		log.Errorf("Error sending publish request: %v", err)
		j.KillWithError(err)
		// BUG FIX: previously kept looping on the peer queue after the
		// join request failed to enqueue.
		return
	}

	peerQueue := j.GetPeerQueue()

	for {
		message, err := peerQueue.BlockUntilNext(noir.QueueMessageTimeout)
		if err == io.EOF {
			// WebRTC Transport closed
			fmt.Println("WebRTC Transport Closed")
			j.Kill(0)
			return
		}
		if err != nil {
			// Timeouts and transient queue errors: keep waiting.
			continue
		}

		var reply pb.NoirReply
		if err = proto.Unmarshal(message, &reply); err != nil {
			// BUG FIX: previously ignored; a corrupt message would have been
			// processed as a zero-value reply.
			log.Errorf("error decoding reply: %v", err)
			continue
		}

		if signal, ok := reply.Command.(*pb.NoirReply_Signal); ok {
			if join := signal.Signal.GetJoin(); join != nil {
				// BUG FIX: format string had two %s verbs but one argument.
				log.Debugf("playfile connected peer=%s!\n", peerID)
				// Set the remote SessionDescription
				desc := &webrtc.SessionDescription{}
				// BUG FIX: Unmarshal error was previously discarded.
				if err = json.Unmarshal(join.Description, desc); err != nil {
					j.KillWithError(err)
					return
				}
				if err = peerConnection.SetRemoteDescription(*desc); err != nil {
					j.KillWithError(err)
					return
				}
			}
			if signal.Signal.GetKill() {
				log.Debugf("signal killed room=%s peer=%s", roomID, peerID)
				j.Kill(0)
				return
			}
		}
	}
}
// Search for Codec PayloadType
//
// Since we are answering we need to match the remote PayloadType
/*
func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecName string) uint8 {
for _, codec := range m.GetCodecsByKind(codecType) {
if codec.Name == codecName {
return codec.PayloadType
}
}
panic(fmt.Sprintf("Remote peer does not support %s", codecName))
}
*/
\ No newline at end of file
......@@ -98,7 +98,7 @@ func (m *Manager) Noir() {
panic("unable to retrieve cluster status")
}
if len(m.workers) == 0 {
panic("no node data found in redis (not even my own!)")
panic("no node jobData found in redis (not even my own!)")
}
case <-info.C:
log.Infof("%s: noirs=%d rooms=%d users=%d",
......@@ -218,7 +218,7 @@ func (m *Manager) ConnectUser(signal *pb.SignalRequest) (*sfu.Peer, *pb.UserData
if numTracks == 1 && desc.MediaDescriptions[0].MediaName.Media == "application" {
publishing = false
} else if numTracks >= 1 {
// we have more than 1 media track, or the 1 track we have is not data
// we have more than 1 media track, or the 1 track we have is not jobData
publishing = true
}
......@@ -355,14 +355,14 @@ func (m *Manager) UpdateAvailableNodes() error {
for _, id := range ids {
status, err := m.redis.HGet(pb.KeyNodeMap(), id).Result()
if err != nil {
log.Errorf("error getting worker data %s", err)
log.Errorf("error getting worker jobData %s", err)
return err
}
var decode pb.NoirObject
if err := proto.Unmarshal([]byte(status), &decode); err != nil {
log.Errorf("error decoding worker data, ignoring worker %s", err)
log.Errorf("error decoding worker jobData, ignoring worker %s", err)
delete(m.workers, id)
continue
}
......@@ -413,7 +413,7 @@ func (m *Manager) GetRemoteRoomExists(roomID string) (bool, error) {
func (m *Manager) GetRemoteRoomData(roomID string) (*pb.RoomData, error) {
loaded, err := m.LoadData(pb.KeyRoomData(roomID))
if err != nil {
log.Errorf("error loading room data! %s", err)
log.Errorf("error loading room jobData! %s", err)
return nil, err
}
room := m.rooms[roomID]
......@@ -425,7 +425,7 @@ func (m *Manager) ClaimRoomNode(roomID string, nodeID string) (bool, error) {
m.mu.Lock()
defer m.mu.Unlock()
if exists, _ := m.GetRemoteRoomExists(roomID); exists == false {
room := NewRoom(roomID) // Just used for the data
room := NewRoom(roomID) // Just used for the jobData
data := &room.data
data.NodeID = m.id
err := SaveRoomData(roomID, data, m)
......@@ -560,7 +560,7 @@ func (m *Manager) ValidateOffer(room *pb.RoomData, userID string, offer webrtc.S
func (m *Manager) GetRemoteUserData(userID string) (*pb.UserData, error) {
loaded, err := m.LoadData(pb.KeyUserData(userID))
if err != nil {
log.Errorf("error loading user#%s data! %s", userID, err)
log.Errorf("error loading user#%s jobData! %s", userID, err)
return nil, err
}
return loaded.GetUser(), nil
......@@ -580,7 +580,7 @@ func (m *Manager) GetRemoteNodeData(nodeID string) (*pb.NodeData, error) {
}
if err != nil {
log.Errorf("error loading room data! %s", err)
log.Errorf("error loading room jobData! %s", err)
return nil, err
}
return loaded.GetNode(), nil
......
......@@ -80,7 +80,7 @@ func (r *router) TargetForSignal(action string, signal *pb.SignalRequest) (strin
} else {
roomData, err := r.mgr.GetRemoteRoomData(roomID)
if err != nil {
log.Errorf("error getting roomID data: %s", err)
log.Errorf("error getting roomID jobData: %s", err)
return "", err
}
......
package servers
import "github.com/net-prophet/noir/pkg/noir"
// JobRunner is a placeholder entry point for a dedicated job-running
// server. It currently does nothing.
// NOTE(review): the m parameter is unused — confirm this stub is
// intentional before wiring it into a server.
func JobRunner(m *noir.Manager) {
}
......@@ -3,8 +3,8 @@ package noir
import (
"github.com/net-prophet/noir/pkg/proto"
log "github.com/pion/ion-log"
//"github.com/pion/ion-sfu/pkg/middlewares/datachannel"
"math/rand"
"runtime"
"sync"
"time"
......@@ -16,12 +16,14 @@ var (
)
type noirSFU struct {
webrtc sfu.WebRTCTransportConfig
router sfu.RouterConfig
mu sync.RWMutex
sessions map[string]*sfu.Session
nodeID string
manager *Manager
sfu.SFU
webrtc sfu.WebRTCTransportConfig
router sfu.RouterConfig
mu sync.RWMutex
sessions map[string]*sfu.Session
//datachannels []*sfu.Datachannel
nodeID string
manager *Manager
}
type NoirSFU interface {
......@@ -33,14 +35,15 @@ type NoirSFU interface {
func NewNoirSFU(c Config) NoirSFU {
rand.Seed(time.Now().UnixNano())
id := RandomString(8)
// Init ballast
ballast := make([]byte, c.Ion.SFU.Ballast*1024*1024)
w := sfu.NewWebRTCTransportConfig(c.Ion)
runtime.KeepAlive(ballast)
ion := sfu.NewSFU(c.Ion)
//dc := ion.NewDatachannel(sfu.APIChannelLabel)
//dc.Use(datachannel.SubscriberAPI)
return &noirSFU{
SFU: *ion,
webrtc: w,
sessions: make(map[string]*sfu.Session),
nodeID: id,
......@@ -60,7 +63,6 @@ func (s *noirSFU) ensureSession(sessionID string) *sfu.Session {
return s
}
log.Infof("creating session %s", sessionID)
mgr := *s.manager
......@@ -71,6 +73,7 @@ func (s *noirSFU) ensureSession(sessionID string) *sfu.Session {
session.OnClose(func() {
log.Infof("closing session %s", sessionID)
room, err := mgr.GetRemoteRoomData(sessionID)
defer mgr.UpdateRoomScore(sessionID)
if room != nil && err == nil {
if room.Options.MaxAgeSeconds == -1 {
......
......@@ -90,8 +90,8 @@ func ReadAdminAction(admin *pb.AdminRequest) (string, error) {
switch roomAdmin.Method.(type) {
case *pb.RoomAdminRequest_CreateRoom:
return action + "room.create", nil
case *pb.RoomAdminRequest_PlayFile:
return action + "room.playfile", nil
case *pb.RoomAdminRequest_RoomJob:
return action + "room.runjob", nil
default:
return action, errors.New("unhandled roomadmin")
}
......
......@@ -17,6 +17,7 @@ const (
type Worker interface {
HandleForever()
HandleNext(timeout time.Duration) error
RegisterHandler(name string, handler JobHandler)
GetQueue() *Queue
ID() string
}
......@@ -24,22 +25,25 @@ type Worker interface {
// worker runs 2 goroutines -- Router() takes incoming router messages and load-balances
// commands across command queues on nodes while CommandRunner() runs commands on this node's queue
type worker struct {
id string
manager *Manager
queue Queue
mu sync.RWMutex
id string
manager *Manager
jobHandlers map[string]JobHandler
queue Queue
mu sync.RWMutex
}
// JobHandler builds a RunnableJob from an incoming request; handlers are
// registered per name via Worker.RegisterHandler and dispatched by
// HandleRoomJob.
type JobHandler func(request *pb.NoirRequest) RunnableJob
func NewRedisWorkerQueue(client *redis.Client, id string) Queue {
return NewRedisQueue(client, pb.KeyWorkerTopic(id), RouterMaxAge)
}
func NewRedisWorker(id string, manager *Manager, client *redis.Client) Worker {
return &worker{id: id, manager: manager, queue: NewRedisWorkerQueue(client, id)}
return &worker{id: id, manager: manager, queue: NewRedisWorkerQueue(client, id), jobHandlers: map[string]JobHandler{}}
}
func NewWorker(id string, manager *Manager, queue Queue) Worker {
return &worker{id: id, manager: manager, queue: queue}
return &worker{id: id, manager: manager, queue: queue, jobHandlers: map[string]JobHandler{}}
}
func (w *worker) HandleForever() {
......@@ -60,6 +64,11 @@ func (w *worker) HandleNext(timeout time.Duration) error {
return w.Handle(request)
}
// RegisterHandler associates a JobHandler factory with a handler name so
// incoming RoomJob requests naming it can be dispatched.
// NOTE(review): jobHandlers is written without taking w.mu even though
// worker carries a mutex — confirm registration only happens at startup,
// before HandleForever runs.
func (w *worker) RegisterHandler(name string, handler JobHandler) {
	log.Debugf("register job handler: %s", name)
	w.jobHandlers[name] = handler
}
func (w *worker) NextCommand(timeout time.Duration) (*pb.NoirRequest, error) {
msg, popErr := w.queue.BlockUntilNext(timeout)
if popErr != nil {
......
......@@ -18,6 +18,19 @@ func (w *worker) Reply(request *pb.NoirRequest, reply *pb.NoirReply) error {
return nil
}
// HandleRoomJob dispatches a RoomJob admin request to the JobHandler
// registered under the job's Handler name, running the resulting job in
// its own goroutine. Requests naming an unknown handler are logged and
// dropped.
// NOTE(review): the spawned goroutine is untracked — the worker cannot
// wait for or cancel running jobs; confirm lifecycle expectations.
func (w *worker) HandleRoomJob(request *pb.NoirRequest) {
	admin := request.GetAdmin()
	roomAdmin := admin.GetRoomAdmin()
	roomJob := roomAdmin.GetRoomJob()

	// Idiom fix: comma-ok result named `ok` (was `OK`); guard clause
	// keeps the dispatch path left-aligned.
	handler, ok := w.jobHandlers[roomJob.GetHandler()]
	if !ok {
		log.Errorf("no handler for job: %s", roomJob.GetHandler())
		return
	}

	job := handler(request)
	go job.Handle()
}
func (w *worker) HandleAdmin(request *pb.NoirRequest) error {
admin := request.GetAdmin()
if roomAdmin := admin.GetRoomAdmin() ; roomAdmin != nil {
......@@ -32,6 +45,10 @@ func (w *worker) HandleAdmin(request *pb.NoirRequest) error {
room.SetOptions(createRoom.GetOptions())
return SaveRoomData(roomAdmin.RoomID, &room.data, w.manager)
}
if roomJob := roomAdmin.GetRoomJob() ; roomJob != nil {
log.Infof("room=%s job=%s", roomAdmin.RoomID, roomJob.Handler)
w.HandleRoomJob(request)
}
} else if list := admin.GetRoomList() ; list != nil {
keys := w.manager.redis.ZCount(pb.KeyRoomScores(), "1", "+inf").Val()
rooms := []*pb.RoomListEntry{}
......
......@@ -176,7 +176,7 @@ func (w *worker) PeerChannel(userData *pb.UserData, peer *sfu.Peer) {
roomType := "room"
// Just one data track
// Just one jobData track
if D == 1 && A == 0 && V == 0 {
userData.Publishing = false
} else if A > 0 || V > 0 {
......
......@@ -52,6 +52,14 @@ func KeyTopicFromAdmin(clientID string) string {
return "noir/topic/from-admin/" + clientID
}
// KeyTopicToJob returns the pub/sub topic key for messages sent to the
// given job.
func KeyTopicToJob(jobID string) string {
	const prefix = "noir/topic/to-job/"
	return prefix + jobID
}
// KeyTopicFromJob returns the pub/sub topic key for messages emitted by
// the given job.
func KeyTopicFromJob(jobID string) string {
	const prefix = "noir/topic/from-job/"
	return prefix + jobID
}
// Topic News Channels - PUBLISH when topic has new messages