package grpc

import (
	"io"

	"github.com/sorti/openspeak/internal/voice"
	pb "github.com/sorti/openspeak/pkg/api/openspeak/v1"
)

// VoiceServiceServer implements the VoiceService gRPC service.
type VoiceServiceServer struct {
	pb.UnimplementedVoiceServiceServer
	server *Server
}

// NewVoiceServiceServer creates a new VoiceServiceServer that uses s to reach
// the voice router.
func NewVoiceServiceServer(s *Server) *VoiceServiceServer {
	return &VoiceServiceServer{
		server: s,
	}
}

// PublishVoiceStream receives voice packets from a client, hands each one to
// the server's voice router, and acknowledges it with the packet's sequence
// number.
//
// It returns nil when the client closes its send side (io.EOF) and returns the
// underlying stream error if receiving or acknowledging fails.
func (v *VoiceServiceServer) PublishVoiceStream(stream pb.VoiceService_PublishVoiceStreamServer) error {
	for {
		pbPacket, err := stream.Recv()
		if err == io.EOF {
			// Client closed the stream: normal termination.
			return nil
		}
		if err != nil {
			return err
		}

		// Convert the proto packet to the internal representation.
		internalPacket := &voice.Packet{
			SourceUserID: pbPacket.SourceUserId,
			ChannelID:    pbPacket.ChannelId,
			SequenceNum:  pbPacket.SequenceNumber,
			Timestamp:    pbPacket.Timestamp,
			SSRC:         pbPacket.Ssrc,
			Payload:      pbPacket.Payload,
			ClientTime:   pbPacket.ClientTimestamp,
		}

		// Route the packet to subscribers.
		v.server.voiceRouter.PublishPacket(internalPacket)

		// Acknowledge the packet that was just routed.
		if err := stream.Send(&pb.PublishVoiceResponse{
			Success:              true,
			LastReceivedSequence: pbPacket.SequenceNumber,
		}); err != nil {
			return err
		}
	}
}

// SubscribeVoiceStream registers a subscriber for req.ChannelId with the voice
// router and forwards every packet the router delivers to the client stream.
//
// It returns ErrInvalidChannel when the request carries an empty channel ID.
// Otherwise it blocks until the stream's context is done (client disconnect,
// cancellation, or deadline) and then returns the context's error, so the
// handler goroutine is released instead of blocking forever.
func (v *VoiceServiceServer) SubscribeVoiceStream(req *pb.SubscribeVoiceRequest, stream pb.VoiceService_SubscribeVoiceStreamServer) error {
	if req.ChannelId == "" {
		return ErrInvalidChannel
	}

	ctx := stream.Context()

	// The subscriber converts internal packets to proto form and sends them
	// on the stream.
	// NOTE(review): gRPC streams do not allow concurrent Send calls; confirm
	// that the router never invokes a subscriber from multiple goroutines at
	// once.
	subscriber := func(packet *voice.Packet) error {
		// Once the RPC is over the stream must not be written to; report the
		// context error so the router can see the subscriber is dead.
		if err := ctx.Err(); err != nil {
			return err
		}

		return stream.Send(&pb.VoicePacket{
			SourceUserId:    packet.SourceUserID,
			ChannelId:       packet.ChannelID,
			SequenceNumber:  packet.SequenceNum,
			Timestamp:       packet.Timestamp,
			Ssrc:            packet.SSRC,
			Payload:         packet.Payload,
			PayloadLength:   int32(len(packet.Payload)),
			ClientTimestamp: int64(packet.ClientTime),
		})
	}

	// Subscribe to voice packets for this channel.
	// TODO: the subscription is not removed from the router when the stream
	// ends; the router's unsubscribe API (if any) is not visible from here.
	v.server.voiceRouter.Subscribe(req.ChannelId, subscriber)

	// Keep the stream open until the RPC ends, then let the handler return.
	<-ctx.Done()
	return ctx.Err()
}