Unverified Commit 027f7043 authored by Leeward Bound's avatar Leeward Bound 💼

fix(publish): restoring peer news channels for pubsub clients

parent 690ec73b
Pipeline #202 passed with stage
in 1 minute and 39 seconds
......@@ -138,7 +138,7 @@ func main() {
go signal.PublicJSONRPC(&mgr, publicJrpcAddr, key, cert)
}
if adminJrpcAddr != "" {
go signal.AdminJSONRPC(&SFU, adminJrpcAddr)
go signal.AdminJSONRPC(&mgr, adminJrpcAddr)
}
if grpcAddr != "" {
go signal.AdminGRPC(&SFU, grpcAddr)
......
......@@ -2,24 +2,39 @@ package signal
import (
"context"
noir2 "github.com/net-prophet/noir/pkg/noir"
"encoding/json"
"github.com/net-prophet/noir/pkg/noir"
pb "github.com/net-prophet/noir/pkg/proto"
log "github.com/pion/ion-log"
"github.com/sourcegraph/jsonrpc2"
)
type adminJSONRPC struct {
sfu *noir2.NoirSFU
sfu *noir.NoirSFU
manager *noir.Manager
}
func NewAdminJSONRPC(s *noir2.NoirSFU) *adminJSONRPC {
return &adminJSONRPC{s}
func NewAdminJSONRPC(s *noir.NoirSFU, manager *noir.Manager) *adminJSONRPC {
return &adminJSONRPC{s, manager}
}
func (a *adminJSONRPC) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {
log.Infof("got method %s", req.Method)
router := a.manager.GetRouter()
routerQueue := (*router).GetQueue()
original, _ := req.MarshalJSON()
var cmd *pb.NoirRequest
json.Unmarshal(original, cmd)
noir.EnqueueRequest(*routerQueue, cmd)
}
func (s *adminJSONRPC) Close() {
}
func (s *adminJSONRPC) Listen(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {
......
......@@ -58,7 +58,7 @@ func PublicJSONRPC(mgr *noir.Manager, publicJrpcAddr string, key string, cert st
}
func AdminJSONRPC(n *noir.NoirSFU, adminJrpcAddr string) {
func AdminJSONRPC(mgr *noir.Manager, adminJrpcAddr string) {
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
......@@ -74,7 +74,7 @@ func AdminJSONRPC(n *noir.NoirSFU, adminJrpcAddr string) {
}
defer c.Close()
p := NewAdminJSONRPC(n)
p := NewAdminJSONRPC(mgr.SFU(), mgr)
defer p.Close()
......
......@@ -17,6 +17,7 @@ func (w *worker) HandleSignal(request *pb.NoirRequest) error {
return nil
}
func (w *worker) HandleJoin(signal *pb.SignalRequest) error {
w.mu.Lock()
defer w.mu.Unlock()
......@@ -43,7 +44,6 @@ func (w *worker) HandleJoin(signal *pb.SignalRequest) error {
}
recv := w.manager.GetQueue(pb.KeyTopicToPeer(pid))
send := w.manager.GetQueue(pb.KeyTopicFromPeer(pid))
log.Infof("listening on %s", recv.Topic())
......@@ -52,7 +52,7 @@ func (w *worker) HandleJoin(signal *pb.SignalRequest) error {
if err != nil {
log.Errorf("OnIceCandidate error %s", err)
}
err = EnqueueReply(send, &pb.NoirReply{
w.SignalReply(pid, &pb.NoirReply{
Command: &pb.NoirReply_Signal{
Signal: &pb.SignalReply{
Id: pid,
......@@ -80,7 +80,7 @@ func (w *worker) HandleJoin(signal *pb.SignalRequest) error {
if err != nil {
log.Errorf("OnIceCandidate error %s", err)
}
err = EnqueueReply(send, &pb.NoirReply{
w.SignalReply(pid, &pb.NoirReply{
Command: &pb.NoirReply_Signal{
Signal: &pb.SignalReply{
Id: pid,
......@@ -104,7 +104,7 @@ func (w *worker) HandleJoin(signal *pb.SignalRequest) error {
packed, _ := json.Marshal(answer)
EnqueueReply(send, &pb.NoirReply{
w.SignalReply(pid, &pb.NoirReply{
Command: &pb.NoirReply_Signal{
Signal: &pb.SignalReply{
Id: pid,
......@@ -123,9 +123,14 @@ func (w *worker) HandleJoin(signal *pb.SignalRequest) error {
return nil
}
func (w *worker) SignalReply(pid string, reply *pb.NoirReply) error {
send := w.manager.GetQueue(pb.KeyTopicFromPeer(pid))
defer w.manager.redis.Publish(pb.KeyPeerNewsChannel(pid), pid)
return EnqueueReply(send, reply)
}
func (w *worker) PeerChannel(userData *pb.UserData, peer *sfu.Peer) {
recv := w.manager.GetQueue(pb.KeyTopicToPeer(userData.Id))
send := w.manager.GetQueue(pb.KeyTopicFromPeer(userData.Id))
for {
request := pb.NoirRequest{}
message, err := recv.BlockUntilNext(0)
......@@ -190,7 +195,7 @@ func (w *worker) PeerChannel(userData *pb.UserData, peer *sfu.Peer) {
answer, _ := peer.Answer(desc.Desc)
bytes, err := json.Marshal(answer)
log.Debugf("got offer, sending reply %s", string(bytes))
err = EnqueueReply(send, &pb.NoirReply{
w.SignalReply(userData.Id, &pb.NoirReply{
Command: &pb.NoirReply_Signal{
Signal: &pb.SignalReply{
Id: userData.Id,
......
package proto
// This is a mini-DSL for describing key hierarchy in redis
// The complex KeyMap/KeyGroup never actually get used!!
// Except as the reference implementation of the key hierarchy
// So you can use it to generate a nice looking explanation of the keys
// and some helper functions
const (
TYPE_SET = 0
TYPE_SETNX = 1
TYPE_MAP = 2
TYPE_LIST = 3
)
type KeyMap struct {
groups map[string][]KeyGroup
}
type KeyGroup struct {
path string
pathParameter string
mapParameter string
keyType int32
}
func NewKeyMap() KeyMap {
return KeyMap{
groups: map[string][]KeyGroup{
"nodes": {
KeyGroup{
path: "data",
pathParameter: "", // none
mapParameter: "node",
keyType: TYPE_MAP,
},
KeyGroup{
path: "cleanup",
pathParameter: "target",
keyType: TYPE_SETNX,
},
},
"rooms": {
KeyGroup{
path: "data",
keyType: TYPE_SET,
},
KeyGroup{
path: "nodes",
pathParameter: "node",
mapParameter: "room",
keyType: TYPE_MAP,
},
},
"users": {
KeyGroup{
path: "data",
keyType: TYPE_SET,
},
KeyGroup{
path: "rooms",
pathParameter: "room",
mapParameter: "user",
keyType: TYPE_MAP,
},
},
},
}
}
// Node Map
func KeyNodeMap() string {
......@@ -107,3 +39,9 @@ func KeyTopicToPeer(peerID string) string {
func KeyTopicFromPeer(peerID string) string {
return "noir/topic/client/" + peerID
}
// Topic News Channels - PUBLISH when topic has new messages
func KeyPeerNewsChannel(peerID string) string {
return "noir/news/peers/" + peerID
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment