Unverified Commit 4e7fd861 authored by Leeward Bound's avatar Leeward Bound 💼

feat: play file

parent 9ab45fe2
Pipeline #244 passed with stage
in 1 minute and 20 seconds
......@@ -46,7 +46,7 @@ func NewBaseJob(manager *Manager, handler string, jobID string) *Job {
func NewPeerJob(manager *Manager, handler string, roomID string, jobID string) *PeerJob {
userID := "job-" + handler + "-" + jobID
return &PeerJob{
Job: *NewBaseJob(manager, handler, jobID),
Job: *NewBaseJob(manager, handler, jobID),
mediaEngine: &webrtc.MediaEngine{},
peerJobData: &pb.PeerJobData{
RoomID: roomID,
......@@ -121,6 +121,35 @@ func (j *PeerJob) GetPeerConnection() (*webrtc.PeerConnection, error) {
return nil, err
}
j.pc = pc
pc.OnICECandidate(func(c *webrtc.ICECandidate) {
if c == nil {
// Gathering done
return
}
bytes, err := json.Marshal(c.ToJSON())
if err != nil {
log.Errorf("OnIceCandidate error %s", err)
}
log.Debugf("send candidate:\n %s", string(bytes))
err = EnqueueRequest(j.GetQueueToPeer(),
&pb.NoirRequest{
Command: &pb.NoirRequest_Signal{
Signal: &pb.SignalRequest{
Id: j.peerJobData.UserID,
Payload: &pb.SignalRequest_Trickle{
Trickle: &pb.Trickle{
Init: string(bytes),
},
},
},
},
},
)
if err != nil {
log.Errorf("OnIceCandidate error %s", err)
}
})
}
return j.pc, nil
}
......@@ -151,29 +180,50 @@ func (j *PeerJob) SendJoin() error {
})
}
func (j *PeerJob) WaitForReply() (*pb.NoirReply, error) {
message, err := j.GetQueueFromPeer().BlockUntilNext(QueueMessageTimeout)
if err == io.EOF {
return nil, nil
}
if err != nil {
j.KillWithError(err)
return nil, err
}
var reply pb.NoirReply
err = proto.Unmarshal(message, &reply)
if err != nil {
j.KillWithError(err)
return nil, err
}
return &reply, nil
}
func (j *PeerJob) SendSignalRequest(request *pb.SignalRequest) error {
request.Id = j.peerJobData.UserID
return EnqueueRequest(j.GetQueueToPeer(), &pb.NoirRequest{
Command: &pb.NoirRequest_Signal{Signal: request},
})
}
func (j *PeerJob) PeerBridge() {
peerQueue := j.GetQueueFromPeer()
for {
message, err := peerQueue.BlockUntilNext(QueueMessageTimeout)
if err == io.EOF {
// WebRTC Transport closed
log.Infof("WebRTC Transport Closed")
j.Kill(0)
reply, err := j.WaitForReply()
if err != nil {
return
}
if err != nil {
if reply == nil {
continue
}
var reply pb.NoirReply
err = proto.Unmarshal(message, &reply)
if signal, ok := reply.Command.(*pb.NoirReply_Signal); ok {
if join := signal.Signal.GetJoin(); join != nil {
log.Debugf("playfile connected %s => %s!\n", signal.Signal.Id)
log.Debugf("%s joined %s => %s!\n", j.jobData.Handler, signal.Signal.Id)
// Set the remote SessionDescription
desc := &webrtc.SessionDescription{}
json.Unmarshal(join.Description, desc)
......@@ -182,8 +232,28 @@ func (j *PeerJob) PeerBridge() {
return
}
}
if trickle := signal.Signal.GetTrickle(); trickle != nil {
//log.Debugf("job trickle %s", trickle)
var candidate webrtc.ICECandidateInit
_ = json.Unmarshal([]byte(trickle.Init), &candidate)
err := j.pc.AddICECandidate(candidate)
if err != nil {
log.Errorf("error adding ice candidate: %e", err)
}
}
if data := signal.Signal.GetDescription(); data != nil {
log.Infof("job negotiate %s", data)
var desc webrtc.SessionDescription
err := json.Unmarshal(data, &desc)
if err != nil {
log.Errorf("Unmarshal negotiate error %s", err)
continue
}
}
if signal.Signal.GetKill() {
log.Debugf("signal killed user=%s", signal.Signal.Id)
log.Debugf("signal killed job=%s", signal.Signal.Id)
j.Kill(0)
return
}
......
......@@ -94,7 +94,7 @@ func (j *PlayFileJob) Handle() {
}
go func() {
defer peerConnection.Close()
defer j.Kill(0)
// Open a IVF file and start reading using our IVFReader
file, ivfErr := os.Open(filename)
......@@ -112,7 +112,7 @@ func (j *PlayFileJob) Handle() {
log.Infof("waiting for connection...")
// Wait for connection established
<-iceConnectedCtx.Done()
log.Infof("done waiting for connection...")
log.Infof("done waiting, job connected!")
// A positive repeat will play the file N times, a negative repeat will loop forever
repeat := j.options.Repeat
......
......@@ -2,6 +2,7 @@ package noir
import (
"github.com/go-redis/redis"
"io"
"time"
)
......@@ -55,7 +56,7 @@ func (q *redisQueue) Next() ([]byte, error) {
func (q *redisQueue) BlockUntilNext(timeout time.Duration) ([]byte, error) {
result, err := q.client.BRPop(timeout, q.topic).Result()
if err != nil {
return nil, err
return nil, io.EOF
}
return []byte(result[1]), nil
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment