티스토리 뷰
브라우저에서 음성채팅을 구현하기 위해 웹에서 실시간 미디어 스트림을 송수신할 수 있는 WebRTC 기술을 사용했다. 먼저 어떻게 동작하는지 알기 위해 표준 방식인 P2P를 구현한 데모코드를 실행해봤다.
아래는 P2P방식의 서버 코드다. 그런데 이상하지 않은가? P2P방식의 가장 큰 특징이 중간 서버를 거치지 않고 Peer간 직접 통신을 해 지연시간이 낮다는건데, 그럼 아래의 서버 코드는 뭘까? 바로 시그널링 서버다. 시그널링 서버는 피어 간 연결을 맺기 위한 메타데이터를 교환하는 역할을 담당하고 있다. 즉, 실제로 미디어를 송수신하는 것은 피어 간 직접적으로 수행되고 해당 서버는 실시간 미디어스트림을 송수신 하기 전 피어들을 연결시키는 역할이다.
const express = require("express");
const app = express();
const cors = require("cors");
const compression = require("compression");
app.use(cors());
app.use(compression());
const socket_cors = {
cors: {
origin: "http://localhost:3000",
methods: ["GET", "POST"],
},
};
const http_server = require("http").createServer(app).listen(8081);
const io = require("socket.io")(http_server, socket_cors);
io.on("connection", (socket) => {
socket.emit("getid", socket.id);
socket.on("caller", (data) => {
io.to(data.ToCall).emit("caller", {
signal: data.signalData,
from: data.from,
});
});
socket.on("answerCall", (data) => {
io.to(data.to).emit("acceptcall", data.signal);
});
});
코드를 자세히 보면 클라이언트와 소켓으로 연결된 후, 클라이언트가 소켓을 통해 보낸 메타데이터를 소켓에 연결된 다른 피어한테 그대로 보내고 있는 것을 볼 수 있다. 메타데이터를 받은 피어 역시 소켓으로 자신의 데이터를 보냄으로써 피어가 서로 데이터를 주고받는다.
socket.on("caller", (data) => {
io.to(data.ToCall).emit("caller", {
signal: data.signalData,
from: data.from,
});
});
socket.on("answerCall", (data) => {
io.to(data.to).emit("acceptcall", data.signal);
});
메타데이터를 주고받고 나서 몇가지 단계(ip주소값 공유 등)을 더 거친 후에 Peer간에 실시간 미디어스트림 전달이 가능하다. 이렇게 시그널링 서버를 통해서 P2P를 구현하는 것을 Mesh 방식이라고 한다. 하지만 Mesh 방식으로 음성채팅을 구현하게 되면 몇가지 단점이 있는데, 그 중 하나가 연결되는 사용자 수가 늘어나면 클라이언트의 부하도 점점 심해진다. 실제로 동작해봤을 때 1:1 방식으로 연결했을 때는 문제가 없었지만 서로 브라우저창을 하나 더 늘려서 총 4개의 사용자가 연결된 상태로 만들어보니까 렉이 심해졌다.
P2P 통신에서는 각 참여자 간에 직접 통신을 하기 때문에 데이터가 나를 제외한 참여자의 수만큼 증가하게 된다. 미디어스트림을 송수신하기 위해 연결한 통신 경로를 link라고 하는데, 이 link는 참여자의 수에 따라 2배로 증가한다. 따라서 사용자 수가 증가하면 그만큼의 데이터를 전송해야 하는 양이 늘어나기 때문에 대역폭에 부하가 올 수 있다. 또한 P2P는 통신하는 Peer의 개수가 적을수록 P2P의 위력을 발휘할 수 있다.
내가 만들고 싶은 음성채팅은 최대 10명이 동시에 연결되어야 하고, 무엇보다 서비스가 데스크탑 애플리케이션이기 때문에 클라이언트에 부하가 올 수 있는 P2P방식을 사용하지 않아야 한다. 따라서 중간 서버를 둬서 최대한 앱을 사용하고 있는 사용자의 컴퓨터에 부담을 덜어주는 방식을 사용하기로 했다. 중간서버를 두어 구현하는 방식은 2가지가 있다.
최종적으로 SFU 방식을 선택했다. 그 이유는 다음과 같다.
WebRTC 아키텍쳐를 선택할 때 고려해야할 몇 가지 요소가 있다.
- 비용
- 확장성
- 유연성
비용을 물리적인 비용이라고 생각하면, SFU와 MCU 둘다 중앙 서버에 집중화된 아키텍쳐기 때문에, 구현 난이도와 서버에 들어가는 비용은 비슷할거라고 생각했다. 이제 확장성과 유연성을 생각해야 한다. 나는 최대한 저수준의 구현방식을 선호했다. 내가 만드려고 하는 서비스의 주요기능이 음성채팅이기 때문에 최대한 디스코드나 다른 대기업같이 좋은 품질로 만들어보고 싶었다. 그리고 저수준으로 만들게 되면 후에 확장해야 하거나 대역폭 관리, 보안 등 다양한 개선 작업을 할 때 훨씬 유연하다.
그럼 MCU는 저수준이 아닌가? 유연하게 짜지 못하나? 아니다. 최종적으로 SFU방식을 고른 이유는 mediasoup 라이브러리를 찾아보면서 선택했다. mediasoup 라이브러리는 화상 회의 또는 실시간 스트리밍 앱을 구축하기 위해 만들어진 저수준 라이브러리인데, mediasoup 홈페이지를 들어가면 기본적으로 SFU방식을 따르는 것을 볼 수 있었다.
저수준으로 구현하기 좋은 라이브러리와 중간 서버를 두는 SFU방식이 잘 들어맞았다. 물론 OpenVidu같은 다른 라이브러리도 있었는데 mediasoup가 인지도가 더 있고, 참고할 수 있는 커뮤니티도 있는 것 같아 선택했다. 그럼 mediasoup를 사용해서 SFU 방식을 적용해보자.
먼저 mediasoup에서 쓰이는 용어를 알아봤다
- Router: N명의 사용자가 참가한 방이라고 생각하면 된다. Router 내에서 클라이언트가 생성한 미디어는 모든 사용자에게 전달된다.
- Peer: 연결된 사용자를 나타낸다. 사용자와 연관된 Transport, Producer, Consumer를 보유하고 있다.
- Transport: 미디어스트림을 보내거나 받는 경로를 말한다. WebRTC P2P 방식에서는 이를 link라고 하는데, 보내는 경로와 받는 경로를 uplink, downlink라고 분리한다면 mediasoup에서는 sendTransport, recvTransport라고 분리한다.
- Producer: 클라이언트에서 서버로 보낸 미디어 트랙을 나타내고 처리하는 개체다.
- Consumer: 서버에서 클라이언트로 전송된 미디어 트랙을 나타내고 처리하는 개체다.
- Worker: Worker 하나당 여러 Router를 가질 수 있다. Worker는 CPU 코어에서 실행되기 때문에 개수를 제한해야 한다.
또한 모두가 모두에게 음성이 전달되어야 하기 때문에 아래와 같은 구조로 구현해야 한다.
정리해보면, 방에 참여한 사용자는 Peer로 나타낸다. 또한 Peer가 늘어날수록 필요한 Transport와 Producer, Consumer의 개수가 증가한다. 예를 들어 Peer가 4명이라고 한다면 Producer는 4개다. Producer를 쉽게 생각하면 Peer가 다른 Peer에게 음성을 전달하기 위한 개체이기 때문에 Peer당 하나의 Producer를 가지고 있다. Consumer은 12개다. Consumer을 쉽게 생각하면 다른 Peer들의 음성을 듣기 위한 객체이기 때문에 자신을 제외한 Peer들의 개수만큼 가지고 있어야 한다. 마지막으로 Transport는 16개다. Transport를 쉽게 생각하면 미디어스트림을 송수신하는 경로이기 때문에 각 Peer들의 Producer와 Consumer을 주고 받는 만큼 필요하다. 그래서 Producer의 개수와 Consumer의 개수를 더한 값이 Transport가 필요한 개수다.
물론 숫자가 중요한 것이 아니라 개체들이 각각 어떻게 사용하는지가 더 중요하다. 이제 코드를 보면서 통신 흐름을 잡아보자. 먼저 Room이라는 클래스를 생성한다.
class Room {
static rooms = new Map();
router;
peers;
constructor(router) {
this.router = router;
this.peers = new Map();
}
addPeer = (peer) => {
this.peers.set(peer.socketId, peer);
};
findPeerBySocketId = (socketId) => {
return this.peers.get(socketId);
};
getOtherPeerList = (socketId) => {
return Array.from(this.peers.values()).filter(
(peer) => peer.socketId !== socketId && peer.producer,
);
};
deletePeer = (socketId) => {
this.peers.delete(socketId);
};
}
Room.save = (roomName, room) => {
Room.rooms.set(roomName, room);
};
Room.findByName = (roomName) => {
return Room.rooms.get(roomName);
};
Room.delete = (roomName) => {
const room = Room.rooms.get(roomName);
room.router.close();
Room.rooms.delete(roomName);
};
export default Room;
Room은 router와 해당 방에 참여한 Peer객체들을 가지고 있다. 그리고 방에 Peer를 참여시키는 addPeer메서드, Peer를 찾는 findPeerBySocketId 메서드, 자신을 제외한 Peer들을 반환하는 getOtherPeerList 메서드, Peer를 방에서 제외시키는 deletePeer가 있다. 또한 전역으로 Room들을 저장하고 있고, Room을 저장하고, Room 이름으로 찾고 삭제시키는 static 메서드가 있다.
그럼 한번 Peer 클래스를 살펴보자.
class Peer {
socketId;
details;
producerTransport;
consumerTransports;
producer;
consumers;
teamName;
constructor(socketId, summoner, teamName = null) {
this.socketId = socketId;
this.details = summoner;
this.producerTransport = null;
this.consumerTransports = new Map();
this.producer = null;
this.consumers = new Map();
this.teamName = teamName;
}
addProducerTransport(transport) {
this.producerTransport = transport;
}
addConsumerTransport(transport, remoteProducerId) {
this.consumerTransports.set(remoteProducerId, transport);
}
findProducerTransport() {
return this.producerTransport;
}
findConsumerTransport(remoteProducerId) {
return this.consumerTransports.get(remoteProducerId);
}
addProducer(producer) {
this.producer = producer;
}
addConsumer(consumer, remoteProducerId) {
this.consumers.set(remoteProducerId, consumer);
}
findConsumer(remoteProducerId) {
return this.consumers.get(remoteProducerId);
}
disconnectVoice() {
this.producer?.close();
this.producerTransport?.close();
Array.from(this.consumers.values()).forEach((consumer) => {
consumer.close();
});
Array.from(this.consumerTransports.values()).forEach((transport) => {
transport.close();
});
}
}
export default Peer;
Peer는 자기 자신을 찾을 수 있는 키 역할인 socketId와 Peer의 정보, 그리고 Transport들과 Producer, Consumer들을 가지고 있다. 또한 해당 상태를 관리하는 여러 메서드들이 있다. 하나하나 설명하기에는 길어 생략하겠다. 메서드 이름을 보면 대충 어떤 행동을 하는지 예측할 수 있다.
이제 Router를 생성하기 위한 Worker를 먼저 생성해보자.
import mediasoup from 'mediasoup';
let worker;
const mediaCodecs = [
{
kind: 'audio',
mimeType: 'audio/opus',
clockRate: 48000,
channels: 2,
},
];
export const createWorker = async () => {
worker = await mediasoup.createWorker({
rtcMinPort: 2000,
rtcMaxPort: 2100,
});
worker.on('died', (error) => {
console.error(`mediasoup worker has died: ${error}`);
});
};
서버가 시작하는 엔드포인트에서 위 함수를 실행하면 된다.
import http from 'http';
import * as worker from './lib/worker.js';
import socket from './router/socket.js';
const server = http.createServer(app);
const PORT = 8080;
server.listen(PORT, () => {
console.log(`starting ${PORT}`);
});
worker.createWorker().then(() => {
socket(server);
});
socket.js에는 소켓을 설정하는 부분이다.
import { Server } from 'socket.io';
import { teamVoiceChatConnection } from './voice/index.js';
const socketCors = {
cors: {
origin: '*',
methods: ['GET', 'POST'],
credentials: true,
},
};
export default (server) => {
const io = new Server(server, socketCors);
onVoiceConnections(io);
};
function onVoiceConnections(io) {
const teamVoiceChatIo = io.of('/team-voice-chat');
teamVoiceChatIo.on('connection', (socket) => {
teamVoiceChatConnection(teamVoiceChatIo, socket);
});
}
이제 어떻게 동작하는지 알아보자. 설명하기 전에 클라이언트 입장에서는 글로만 작성하고, 서버의 입장에서는 코드와 함께 설명하도록 하겠다. 먼저 클라이언트는 유저의 미디어 장비에 접근해서 오디오 stream을 받고 서버에 Router의 rtpCapabilities를 요청한다. 서버에서는 클라이언트가 보낸 방 이름으로 Room을 찾는다. 방을 찾았다면 Peer 객체를 생성하고, Room에 저장한다. 그리고 해당 Room 안에 있는 Router의 rtpCapabilities를 전송한다.
socket.on('team-join-room', async ({ roomName, summoner }, callback) => {
socket.join(roomName);
socket.roomName = roomName;
const room = await getRoomOrCreate(roomName);
const peer = new Peer(socket.id, summoner);
room.addPeer(peer);
callback({
rtpCapabilities: room.router.rtpCapabilities,
});
console.log(`${summoner.name} 방 입장`);
});
getRoomOrCreate는 방 이름으로 방을 찾는다. 만약 존재하지 않는다면 Worker에서 router을 생성한 후에 Room객체를 생성하고 저장한 후 반환한다. 방을 찾았다면 그대로 반환한다.
async function getRoomOrCreate(roomName) {
const room = Room.findByName(roomName);
if (!room) {
const router = await worker.createRouter();
const generatedRoom = new Room(router);
Room.save(roomName, generatedRoom);
return generatedRoom;
}
return room;
}
여기서 worker는 따로 모듈로 분리하였다.
//worker.js
export const createRouter = async () => {
return await worker.createRouter({ mediaCodecs });
};
클라이언트는 받은 rtpCapabilities로 디바이스를 생성한 후 load를 진행한다. 여기서부터 중요하다. 클라이언트쪽에서 미디어를 전송 및 수신하기 위해서 transport를 생성해야 하는데, 바로 생성하는 것이 아니라 서버한테 transport를 생성하라고 요청한 다음에 생성된 transport의 정보를 받아서 클라이언트쪽에서 복제시킨다. 그리고 복제시킨 transport와 서버의 transport를 연결을 시켜 앞으로 클라이언트에서 서버로 미디어스트림을 보내는 경로를 연결한다.
아래 로직은 transport를 생성하고 transport의 정보를 클라이언트한테 전송한다. 해당 이벤트의 이름이 create-producer-transport인 이유는 생성할 transport가 나중에 producer를 생성해야 하기 때문이다. 서버는 Room을 찾은 다음, Room안에 있는 Peer 객체를 가져온다. 그 다음 Room 안에 있는 router의 값으로 transport를 생성한 다음 Peer객체에 저장한다. 마지막으로 클라이언트한테 생성한 transport의 정보를 반환한다.
socket.on('create-producer-transport', async (callback) => {
const room = Room.findByName(socket.roomName);
const peer = room.findPeerBySocketId(socket.id);
createWebRtcTransport(room.router).then((transport) => {
peer.addProducerTransport(transport);
callback({
id: transport.id,
iceParameters: transport.iceParameters,
iceCandidates: transport.iceCandidates,
dtlsParameters: transport.dtlsParameters,
});
console.log(`${peer.details.name} producer transport 생성`);
});
});
createWebRtcTransport는 router에서 transport를 생성한 후 반환하는 메서드다. 이 때 LISTENIP는 로컬에서 돌리려면 로컬 ip를, 배포했다면 배포된 ip를 사용해야 한다.
function createWebRtcTransport(router) {
const webRtcTransportOptions = {
listenIps: [
{
ip: LISTENIP,
announcedIp: null,
},
],
enableUdp: true,
enableTcp: true,
preferUdp: true,
};
return new Promise(async (resolve, reject) => {
try {
const transport = await router.createWebRtcTransport(webRtcTransportOptions);
resolve(transport);
} catch (error) {
reject(error);
}
});
}
transport의 정보를 받은 클라이언트는 해당 정보들로 client-side transport를 복제시킨 다음 connect 이벤트와 produce이벤트를 발생시킨다. connect이벤트는 서버로 dtlsParameters를 전달한다. 서버는 아까 저장했던 transport를 찾은 다음 connect() 메서드를 통해 연결시킨다.
socket.on('transport-connect', async (dtlsParameters) => {
const room = Room.findByName(socket.roomName);
const peer = room.findPeerBySocketId(socket.id);
const trnasport = peer.findProducerTransport();
trnasport.connect({ dtlsParameters });
console.log(`${peer.details.name} producer transport 연결`);
});
또 다른 이벤트인 produce는 서버한테 client-side transport의 정보를 준다. 그럼 서버에서는 역시 server-side transport를 찾은 뒤 produce() 메서드로 Producer를 생성한다. 생성된 Producer는 Peer에 저장한다.
socket.on('transport-produce', async ({ kind, rtpParameters }, callback) => {
const room = Room.findByName(socket.roomName);
const peer = room.findPeerBySocketId(socket.id);
const trnasport = peer.findProducerTransport();
const producer = await trnasport.produce({ kind, rtpParameters });
console.log(`${peer.details.name} producer가 생성`);
informNewProducer(room, peer, producer.id);
peer.addProducer(producer);
const otherPeers = room.getOtherPeerList(peer.socketId);
callback({
id: producer.id,
producersExist: otherPeers.length > 0,
});
});
위 로직에서 2가지 중요한 부분이 있는데, informNewProducer메서드와 producerExist 값이다. 먼저 informNewProducer 메서드는 이름 그대로 새로운 사용자가 참여했다는걸 자신을 제외한 다른 Peer들한테 알린다.
function informNewProducer(room, me, producerId) {
room.getOtherPeerList(me.socketId).forEach((peer) => {
io.to(peer.socketId).emit('new-producer', {
id: producerId,
summoner: me.details,
});
});
console.log(`${me.details.name} 새롭게 입장했다고 알림`);
}
만약 사용자 A가 먼저 음성채팅방에 참여했다고 가정해보자. 그럼 위 로직까지 잘 수행되서 다른 Peer들한테 자신의 ProducerId를 전송할 것이다. 하지만 방에는 사용자 A밖에 없기 때문에 아무일도 일어나지 않는다.
사용자 B가 들어왔다면, 역시 위 로직까지 잘 수행되서 다른 Peer들한테 자신의 ProducerId를 전송할 것이다. 이번에는 방에 사용자 A가 있기 때문에 사용자 A의 클라이언트는 사용자 B의 ProducerId를 받을 것이다. 여기서 해야할 일은 뭘까? 사용자 B가 들어왔다면 사용자 A는 사용자 B의 목소리를 들어야 한다. 그렇다면 서버에서 클라이언트로 미디어를 전송하는(다른 Peer의 음성을 듣기 위해 전달) 네트워크 경로를 만들어야 한다. 따라서 Consumer가 전달되는 Transport를 만들어보자.
socket.on('create-consumer-transport', async (remoteProducerId) => {
const room = Room.findByName(socket.roomName);
const peer = room.findPeerBySocketId(socket.id);
createWebRtcTransport(room.router).then((transport) => {
peer.addConsumerTransport(transport, remoteProducerId);
socket.emit('complete-create-consumer-transport', {
id: transport.id,
iceParameters: transport.iceParameters,
iceCandidates: transport.iceCandidates,
dtlsParameters: transport.dtlsParameters,
});
console.log(`${peer.details.name} consumer transport 생성`);
});
});
Producer를 전송하는 Transport를 생성할 때와 같다. 하나 다른점은 Transport를 저장할 때 ProducerTransport인지 ConsumerTransports인지 구분하기 위해 이벤트 또한 분리했다. 그리고 클라이언트한테 생성된 Transport의 정보를 전송하면, 클라이언트는 복제시킨 다음 Producer를 생성했을때와 동일하게 connect 이벤트를 실행한다.
socket.on('transport-recv-connect', async ({ dtlsParameters, remoteProducerId }) => {
const room = Room.findByName(socket.roomName);
const peer = room.findPeerBySocketId(socket.id);
const transport = peer.findConsumerTransport(remoteProducerId);
transport.connect({ dtlsParameters });
console.log(`${peer.details.name}님consumer transport 연결`);
});
이제 Consumer를 생성할 차례다. Consumer을 생성할 때 중요한 점은 바로 생성하는 것이 아니라 paused된 상태로 생성한 다음 다른 이벤트에서 다시 시작(resume)하는 것이 좋다고 한다.
따라서 consume을 할 때 paused가 true인 상태로 생성한다.
socket.on('consume', async ({ rtpCapabilities, remoteProducerId }, callback) => {
const room = Room.findByName(socket.roomName);
const peer = room.findPeerBySocketId(socket.id);
const transport = peer.findConsumerTransport(remoteProducerId);
if (room.router.canConsume({ producerId: remoteProducerId, rtpCapabilities })) {
const consumer = await transport.consume({
producerId: remoteProducerId,
rtpCapabilities,
paused: true,
});
peer.addConsumer(consumer, remoteProducerId);
console.log(`${peer.details.name} consumer 생성`);
callback({
params: {
id: consumer.id,
producerId: remoteProducerId,
kind: consumer.kind,
rtpParameters: consumer.rtpParameters,
serverConsumerId: consumer.id,
},
});
}
});
그리고 다른 이벤트로 요청해서 paused된 Consumer을 resume시킨다.
socket.on('consumer-resume', async (remoteProducerId) => {
const room = Room.findByName(socket.roomName);
const peer = room.findPeerBySocketId(socket.id);
const consumer = peer.findConsumer(remoteProducerId);
await consumer.resume();
console.log(`${peer.details.name} consum resume`);
});
여기까지 구현하고 잠깐 정리해보면, 먼저 들어와있던 사용자 A는 새롭게 들어온 사용자 B의 목소리를 들을 수 있게 되었다. 반면 새롭게 들어온 사용자 B는 기존에 있던 다른 Peer들의 목소리를 들어야 한다. 그래서 새롭게 들어온 참여자는 Producer생성이 끝나고 원래 방에 있는 Peer들이 있는지 producerExist로 확인한다. 만약 있다면 서버한테 아래 이벤트를 호출해 자신을 제외한 Peer들의 ProducerId 리스트를 받아온다.
socket.on('get-producers', (callback) => {
const room = Room.findByName(socket.roomName);
const producers = room.getOtherPeerList(socket.id).map((peer) => {
return {
id: peer.producer.id,
summoner: peer.details,
};
});
callback(producers);
console.log('새롭게 들어온 애한테 기존애있던애들 정보 전달');
});
그럼 클라이언트는 받은 ProducerId 리스트를 돌면서 위에서 설명했던 Consumer를 생성하고 resume하는 로직을 수행하고 연결하면 된다.
이렇게 mediasoup로 음성채팅을 구현할 수 있다. 이 로직은 완벽한 구현이 아니라 간단한 구현이다. 아직 구현 레이어에서 로그를 찍는 것을 옵서버 기능을 사용해서 분리시키고, 대역폭이나 기타 제한사항을 다루는 방법을 깊게 다루고 싶다. 만약 시간이 난다면 네트워크나 webRTC에 대해 깊게 공부해보고 싶다. 아무튼 이렇게 WebRTC의 표준 방식인 P2P에서 SFU 아키텍쳐 방식으로 성능을 개선시킬 수 있다.
'Performance up' 카테고리의 다른 글
인덱스를 사용해서 쿼리 성능 개선시키기 (2) | 2023.12.26 |
---|---|
nGrinder로 서비스 성능 테스트 및 분석해보기 (2) | 2023.11.08 |
Electron 앱 용량 662MB → 230MB로 줄이기 (2) | 2023.10.31 |
여러 객체를 fetch join을 하여 JPA N+1 문제 해결해보기 (2) | 2022.12.23 |
OneToOne 양방향일 때 발생하는 추가 쿼리 해결 (0) | 2022.12.19 |
- Total
- Today
- Yesterday