Unverified Commit 06bc95d9 authored by Leeward Bound's avatar Leeward Bound 💼

fix: node checkin was broken

parent 2d022cf7
Pipeline #182 passed with stage
in 2 minutes and 14 seconds
......@@ -261,7 +261,11 @@ func (m *Manager) SFU() *NoirSFU {
func (m *Manager) Checkin() error {
id := m.worker.ID()
status := &pb.NodeData{Id: id, LastUpdate: timestamppb.Now()}
status := &pb.NoirObject{
Data: &pb.NoirObject_Node{
Node: &pb.NodeData{Id: id, LastUpdate: timestamppb.Now()},
},
}
value, err := proto.Marshal(status)
if err != nil {
return err
......@@ -328,20 +332,23 @@ func (m *Manager) UpdateAvailableNodes() error {
return err
}
var decode pb.NodeData
var decode pb.NoirObject
if err := proto.Unmarshal([]byte(status), &decode); err != nil {
log.Errorf("error decoding worker data, ignoring worker %s", err)
delete(m.workers, id)
continue
}
if !ValidateHealthy(&decode) && decode.Id != m.worker.ID() {
log.Warnf("haven't heard from %s; marking it offline", decode.Id)
m.MarkOffline(decode.Id)
remote := decode.GetNode()
if !ValidateHealthy(remote) && remote.Id != m.worker.ID() {
log.Warnf("haven't heard from %s; marking it offline", remote.Id)
m.MarkOffline(remote.Id)
continue
}
m.workers[id] = decode
m.workers[id] = *decode.GetNode()
}
m.updated = time.Now()
......@@ -533,6 +540,7 @@ func (m *Manager) GetRemoteNodeData(nodeID string) (*pb.NodeData, error) {
log.Warnf("failed loading noirstatus %s", err)
return nil, err
}
if err = proto.Unmarshal([]byte(data), loaded); err != nil {
log.Warnf("failed unmarshaling noirstatus %s", err)
return nil, err
......@@ -545,6 +553,19 @@ func (m *Manager) GetRemoteNodeData(nodeID string) (*pb.NodeData, error) {
return loaded.GetNode(), nil
}
func (m *Manager) ValidateHealthyNodeID(nodeID string) error {
exists, _ := m.redis.HExists(pb.KeyNodeMap(), nodeID).Result()
if exists == false {
return errors.New("no such room")
}
data, invalid := m.GetRemoteNodeData(nodeID)
if invalid == nil && ValidateHealthy(data) {
return nil
}
return invalid
}
func ParseSDP(offer webrtc.SessionDescription) (*sdp.SessionDescription, error) {
desc, err := offer.Unmarshal()
if err != nil {
......
......@@ -64,22 +64,30 @@ func (r *router) NextCommand() (*pb.NoirRequest, error) {
func (r *router) TargetForSignal(action string, signal *pb.SignalRequest) (string, error) {
// Signal messages get routed to the worker handling the Room
room, _ := r.mgr.LookupSignalRoomID(signal)
roomID, _ := r.mgr.LookupSignalRoomID(signal)
roomExists, _ := r.mgr.GetRemoteRoomExists(room)
roomExists, _ := r.mgr.GetRemoteRoomExists(roomID)
if roomExists == false {
// Assign the first peer queue a Room to a new worker based on capacity
log.Infof("no such room, routing to random worker")
return r.mgr.FirstAvailableWorkerID(action)
log.Infof("no such roomID, routing to random worker")
target, err := r.mgr.FirstAvailableWorkerID(action)
claimed, err := r.mgr.ClaimRoomNode(roomID, target)
if claimed == true && err == nil {
return target, nil
} else {
return "", err
}
} else {
roomData, err := r.mgr.GetRemoteRoomData(room)
roomData, err := r.mgr.GetRemoteRoomData(roomID)
if err != nil {
log.Errorf("error getting room data: %s", err)
log.Errorf("error getting roomID data: %s", err)
return "", err
}
nodeData, err := r.mgr.GetRemoteNodeData(roomData.NodeID)
if ValidateHealthy(nodeData) {
err = r.mgr.ValidateHealthyNodeID(roomData.NodeID)
if err == nil {
log.Infof("found node %s", roomData.NodeID)
return roomData.NodeID, nil
} else {
......@@ -94,9 +102,9 @@ func (r *router) TargetForSignal(action string, signal *pb.SignalRequest) (strin
return "", err
}
if claimed == false {
return "", errors.New("unable to assign room")
return "", errors.New("unable to assign roomID")
}
log.Infof("room %s was assigned to %s which is offline, moved to %s", room, roomData.NodeID, target)
log.Infof("roomID %s was assigned to %s which is offline, moved to %s", roomID, roomData.NodeID, target)
return target, nil
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment