mediasoupのデータチャネルを試す

はじめに

ここ数年、新型コロナウイルス感染症の影響でリモートワーク・オンライン授業・オンライン診察など多くの場面で「オンライン」の需要が増えてきています。 WebRTCは、そのオンラインを実現するためのネットワーク基盤技術としてビデオチャットなどで広く使用されています。活用している人も多いのではないでしょうか。

この記事では、WebRTCの欠点をおぎなうSFU(Selective Forwarding Unit)という技術について、 オープンソースソフトウェアのmediasoupを利用したサンプルコードをまじえて紹介します。

なぜSFUが必要なのか

WebRTCはP2P型のプロトコルを採用した技術です。 一番最初の接続先を決定する通信(シグナリング処理)はサーバーとやりとりしますが、 接続先が決まったあとはWebブラウザとWebブラウザが直接カメラ映像などのデータを送受信します。

たとえば2人でビデオチャットをする場合、何も問題はありません。 おたがいにP2Pで通信するためのWebRTCコネクションを作成して通信するだけです。

では、3人になるとどうでしょう? この場合、それぞれ他の2人とP2Pで通信するためのWebRTCコネクションを作成し、受信する人数分ビデオチャットのデータを送信する必要があります。

想像すればわかると思いますが、人数が増えるほど送信側に負荷がかかり効率が悪くなるのは明らかです。 この問題から、WebRTCでは多人数によるビデオチャットは10人ぐらいが限界だと言われています。

WebRTCの問題点

SFUは、主にこの問題を解決するために利用される技術です。 SFUを利用する場合、WebRTCコネクションはSFUに対して作成します。 ビデオチャットのデータもSFUに対してだけ送信すればよく、SFUがデータの受信を希望する相手に複製して送信します。 これにより、人数が増えても送信側の負荷は変わらないという大きな効果が期待できます。

受信側が受信するデータ自体はSFUを利用しても変わりませんが、同じWebRTCコネクションで複数のデータを受信できる部分が負荷軽減につながります。

SFUを利用する場合

SFUは負荷軽減が大きな目的のひとつですが、 データを転送することを利用して以下のような目的で使われることもあります。

  • ビデオチャットなどのストリームデータをサーバーで録画する
  • テキストデータを解析したり、加工してから転送する(テキストチャットのBotなど)

mediasoup

mediasoupは、オープンソースで開発が進められているSFUです。 実績も豊富で、他のオープンソースと比べると以下の特徴をもっています。

  • SFUに特化している
  • スタンドアロンサーバーとしてではなく、Node.jsのモジュールとして利用可能
  • C++ライブラリも提供されている
  • 低レベルなAPIにより、RTPパケットレベルの細かい制御も可能

また、WebRTCのストリーム配信だけでなく、GStreamerやFFmpegと組み合わせることで、 動画ファイルの配信や録画も可能になっています。

2022年5月現在の最新版であるバージョン3では、WebRTCのデータチャネルにも対応しました。 データチャネルを利用すると、ストリーム以外の任意のデータを送信することができるため、 テキストチャットやファイル転送など、実現できることの幅が広がります。

サンプルコード

サンプルコードでは、mediasoupのデータチャネルを利用して画面共有の静止画像データを3秒間隔で送信し、一覧表示する例を紹介します。

もちろん画面共有をリアルタイムなストリームとして送信することもできますが、今回は以下の理由であえて静止画像で試すことにしました。

  • 多数の画面を一覧表示するなら数秒に1回更新されれば十分なケースが多い
  • クラウド環境ではデータ転送量によって利用料が高くなるため、転送量を抑えたい
  • SFUを利用しても受信側にはそれなりの負荷がかかるため、受信側の負荷をなるべく抑えたい

動作確認環境

バージョン
OS Ubuntu 22.04
Node.js 12.22.9
mediasoup 3.9.13
mediasoup-client 3.6.51
Socket.IO 4.5.0
Browserify 17.0.0
Express 4.18.1

ファイル構成

screen-share/
  ├ public/
  │  ├ producer.html
  │  └ consumer.html
  ├ package.json
  └ screen-share.js

ソースコード

理解しやすさを重視したため、必要最小限のコードになっています。 実際はエラー処理や切断イベントによるリソース破棄などの処理が必要になりますので注意してください。

package.json

{
  "name": "screen-share",
  "version": "0.1.0",
  "dependencies": {
    "browserify": "^17.0.0",
    "express": "^4.18.1",
    "mediasoup": "^3.9.13",
    "mediasoup-client": "^3.6.51",
    "socket.io": "^4.5.0"
  }
}

screen-share.js

ソースコードを開く/閉じる
'use strict';

let producerList = {};
let consumerList = {};

// --- HTTPサーバー ---

const http = require('http');
const express = require('express');
const app = express();
app.use(express.static('public'));
const webServer = http.Server(app).listen(3000);

// --- WebSocketサーバー ---

const io = require('socket.io')(webServer);

io.on('connection', sock => {
  // ----- 共通 -----

  // クライアントがMediaSoupのDeviceを準備するために必要な情報を返す
  sock.on('get-rtp-capabilities', (_, callback) => {
    callback(router.rtpCapabilities);
  });

  // ----- Producerのリクエスト処理 -----

  sock.on('create-producer-transport', async (_, callback) => {
    const { transport, params } = await createTransport();
    transport.observer.on('close', () => {
      transport.producer.close();
      transport.producer = null;
      delete producerList[transport.id];
      transport = null;
    });
    callback(params);

    producerList[transport.id] = transport;
  });

  sock.on('connect-producer-transport', async (req, callback) => {
    const transport = producerList[req.transportId];
    await transport.connect({ dtlsParameters: req.dtlsParameters });
    callback({});
  });

  sock.on('produce-data', async (req, callback) => {
    const transport = producerList[req.transportId];
    const dataProducer = await transport.produceData(req.produceParameters);
    callback(dataProducer.id);

    // 新しいProducerをブロードキャストでConsumerへ通知
    sock.broadcast.emit('new-producer', {
      producerId: dataProducer.id,
    });

    transport.producer = dataProducer;
  });

  // ----- Consumerのリクエスト処理 -----

  sock.on('create-consumer-transport', async (_, callback) => {
    const { transport, params } = await createTransport();
    transport.observer.on('close', () => {
      transport.consumer.close();
      transport.consumer = null;
      delete consumerList[transport.id];
      transport = null;
    });
    callback(params);

    consumerList[transport.id] = transport;
  });

  sock.on('connect-consumer-transport', async (req, callback) => {
    const transport = consumerList[req.transportId];
    await transport.connect({ dtlsParameters: req.dtlsParameters });
    callback({});
  });

  sock.on('consume-data', async (req, callback) => {
    const transport = consumerList[req.transportId];
    const dataConsumer = await transport.consumeData(req.consumeParameters);
    const params = {
      id: dataConsumer.id,
      dataProducerId: dataConsumer.dataProducerId,
      sctpStreamParameters: dataConsumer.sctpStreamParameters,
      label: dataConsumer.label,
      protocol: dataConsumer.protocol,
    };
    callback(params);

    transport.consumer = dataConsumer;
  });
});

// --- MediaSoupサーバー ---

let worker = null;
let router = null;

const mediasoup = require('mediasoup');
const transportOption = {
  listenIps: [
    { ip: '192.168.0.1' },
  ],
  enableSctp: true,
};

async function startWorker() {
  worker = await mediasoup.createWorker();
  router = await worker.createRouter({});
}

async function createTransport() {
  const transport = await router.createWebRtcTransport(transportOption);
  return {
    transport: transport,
    params: {
      id: transport.id,
      iceParameters: transport.iceParameters,
      iceCandidates: transport.iceCandidates,
      dtlsParameters: transport.dtlsParameters,
      sctpParameters: transport.sctpParameters,
    }
  };
}

startWorker();

public/producer.html

ソースコードを開く/閉じる
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>MediaSoup Producer</title>
<script src="mediasoup-client.js"></script>
<script src="socket.io/socket.io.js"></script>
</head>
<body>

<p><button id="start" onclick="start();">送信開始</button></p>
<video id="video"></video>
<canvas id="canvas" style="display: none;"></canvas>

<script>
'use strict';

const imageFormat = 'image/png';
const interval = 3000;  // 送信間隔(ミリ秒)
const canvasWidth = 320;
const canvasHeight = 180;

const video = document.getElementById('video');
const canvas = document.getElementById('canvas');
const buttonStart = document.getElementById('start');

let producer;

function start() {
  buttonStart.disabled = true;
  video.style.width = canvasWidth + 'px';
  video.style.height = canvasHeight + 'px';

  navigator.mediaDevices.getDisplayMedia({video: true, audio: false}).then(stream => {
    video.srcObject = stream;
    video.play();

    producer = new Producer();
    producer.join();
  });
}

class Producer {
  constructor() {
    this.timerId = null;
    this.sock = null;
    this.msDevice = null;
    this.msTransport = null;
  }

  async join() {
    await this.createWebSocket();
    await this.createDevice();
    await this.createTransport();
    await this.createProducer();
  }

  // WebSocketの生成
  async createWebSocket() {
    const sock = io('/');
    this.sock = sock;
  }

  // MediaSoupを利用する場合、一番最初にDeviceオブジェクトを準備する
  async createDevice() {
    const rtpCap = await this.sendRequest('get-rtp-capabilities', {});
    const device = new MediasoupClient.Device();
    await device.load({ routerRtpCapabilities: rtpCap });
    this.msDevice = device;
  }

  // Deviceから通信用オブジェクトTransportを生成する
  async createTransport() {
    const params = await this.sendRequest('create-producer-transport', {});
    const transport = this.msDevice.createSendTransport(params);

    // connectイベントが発生したらパラメータを送信してサーバー側でWebRtcTransport.connect()を実行する
    transport.on('connect', async ({ dtlsParameters }, callback, errback) => {
      this.sendRequest('connect-producer-transport', {
        transportId: transport.id,
        dtlsParameters: dtlsParameters,
      }).then(callback)
        .catch(errback);
    });

    // producedataイベントが発生したらパラメータを送信してサーバー側でDataProducerを生成する
    transport.on('producedata', async (parameters, callback, errback) => {
      try {
        const id = await this.sendRequest('produce-data', {
          transportId: transport.id,
          produceParameters: parameters,
        });
        callback({ id: id });
      } catch (err) {
        errback(err);
      }
    });

    this.msTransport = transport;
  }

  // Transportからデータ送信用のDataProducerを生成する
  async createProducer() {
    const producer = await this.msTransport.produceData();

    producer.on('open', () => {
      this.timerId = setInterval(() => {
        const context = canvas.getContext('2d');
        canvas.width = canvasWidth;
        canvas.height = canvasHeight;
        context.drawImage(video, 0, 0, canvasWidth, canvasHeight);

        canvas.toBlob(blob => {
          const reader = new FileReader();
          reader.onloadend = () => {
            // 画面共有の画像データを送信
            producer.send(reader.result);
          };
          reader.readAsArrayBuffer(blob);
        }, imageFormat);
      }, interval);
    });
  }

  // WebSocket通信用共通メソッド
  sendRequest(type, data) {
    return new Promise((resolve, reject) => {
      this.sock.emit(type, data, res => resolve(res));
    });
  }
}
</script>
</body>
</html>

public/consumer.html

ソースコードを開く/閉じる
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>MediaSoup Consumer</title>
<script src="mediasoup-client.js"></script>
<script src="socket.io/socket.io.js"></script>
</head>
<style>
#screenlist img {
  border: 1px solid black;
  width: 320px;
  height: 180px;
}
#screenlist div {
  padding: 3px 10px;
  float: left;
}
</style>
<body>

<p><button id="start" onclick="start();">受信開始</button></p>
<div id="screenlist"></div>

<script>
'use strict';

const buttonStart = document.getElementById('start');

let consumer;

function start() {
  buttonStart.disabled = true;

  consumer = new Consumer();
  consumer.join();
}

class Consumer {
  constructor() {
    this.sock = null;
    this.msDevice = null;
    this.msTransport = null;
  }

  async join() {
    await this.createWebSocket();
    await this.createDevice();
    await this.createTransport();
  }

  // WebSocketの生成
  async createWebSocket() {
    const sock = io('/');

    // サーバーから新しいProducerの通知を受信したらDataConsumerを生成する
    sock.on('new-producer', async data => {
      const params = await this.sendRequest('consume-data', {
        transportId: this.msTransport.id,
        consumeParameters: {
          dataProducerId: data.producerId,
        },
      });

      const consumer = await this.msTransport.consumeData(params);

      // 画面共有の画像データを受信
      consumer.on('message', msg => {
        this.addOrUpdateScreen(data.producerId, msg);
      });
    });

    this.sock = sock;
  }

  // MediaSoupを利用する場合、一番最初にDeviceオブジェクトを準備する
  async createDevice() {
    const rtpCap = await this.sendRequest('get-rtp-capabilities', {});
    const device = new MediasoupClient.Device();
    await device.load({ routerRtpCapabilities: rtpCap });
    this.msDevice = device;
  }

  // Deviceから通信用オブジェクトTransportを生成する
  async createTransport() {
    const params = await this.sendRequest('create-consumer-transport', {});
    const transport = this.msDevice.createRecvTransport(params);

    // connectイベントが発生したらパラメータを送信してサーバー側でWebRtcTransport.connect()を実行する
    transport.on('connect', async ({ dtlsParameters }, callback, errback) => {
      this.sendRequest('connect-consumer-transport', {
        transportId: transport.id,
        dtlsParameters: dtlsParameters,
      }).then(callback)
        .catch(errback);
    });

    this.msTransport = transport;
  }

  // ProducerのIDで検索し、画面共有画像の更新または追加をおこなう
  addOrUpdateScreen(producerId, imageData) {
    let div = document.getElementById(producerId);
    if (div == null) {
      div = document.createElement('div');
      div.id = producerId;
      div.appendChild(document.createElement('img'));

      const list = document.getElementById('screenlist');
      list.appendChild(div);
    }

    const img = div.getElementsByTagName('img')[0];
    const blob = new Blob([imageData]);
    img.src = URL.createObjectURL(blob);
  }

  // WebSocket通信用共通メソッド
  sendRequest(type, data) {
    return new Promise((resolve, reject) => {
      this.sock.emit(type, data, res => resolve(res));
    });
  }
}
</script>
</body>
</html>

実行方法

Ubuntu 22.04の環境を用意し、上記のとおりファイルを作成します。 その際、screen-share.jsの以下のIPアドレスをサーバーを実行するホストのものに変更してください。

const transportOption = {
  listenIps: [
    { ip: '192.168.0.1' },
  ],
  enableSctp: true,
};

続けて、ビルド・実行に必要となるパッケージをインストールします。 プロキシが必要な環境では、別途プロキシの設定をおこなってください。

$ sudo apt update
$ sudo apt install nodejs npm python3-pip

作成したscreen-shareディレクトリに移動し、依存パッケージをインストールします。

$ cd screen-share
$ npm install

mediasoupのクライアントライブラリはTypeScriptで書かれているため、Node.jsではそのまま読み込んで実行できますが、Webブラウザからは読み込めません。 公式ドキュメントにも記載されているとおり、Browserifyを用いてWebブラウザから読み込めるライブラリに変換します。

$ node_modules/browserify/bin/cmd.js node_modules/mediasoup-client/lib/index.js -s MediasoupClient -o public/mediasoup-client.js

以上で実行する準備は完了です。 次のコマンドで、mediasoupのサーバーを起動します。

$ node screen-share.js

サーバーが起動できたら、ローカルホスト上のChromeまたはFirefoxからhttp://127.0.0.1:3000/consumer.htmlを開き、「受信開始」ボタンを押します。 続けてhttp://127.0.0.1:3000/producer.htmlを開いて「送信開始」ボタンを押し、共有する画面を選択します。

実行結果

mediasoupでは、受信側をConsumer、送信側をProducerと呼びます。 下図は2つのConsumerに対して、2つのProducerがそれぞれシステムモニターおよびマインスイーパーのウィンドウの画面を静止画像で送信したときの実行結果です。

実行結果

Consumer側の画面は、以下で設定した間隔で更新されます。

const interval = 3000;  // 送信間隔(ミリ秒)

解説

mediasoupのオブジェクトは下図のような関係になっています。

mediasoupのオブジェクト

オブジェクト 説明
Worker 単一のCPU上で動作する一番上位に位置するオブジェクトです。複数のRouterを制御します。
Router 複数のTransportを管理します。大規模なシステムでは、Routerを別々のホストで作成し、Router間でデータを転送することで負荷を軽減させることもできるように設計されています。
Transport 通信経路をあらわし、実際のTCP/IPにおけるUDPポートもこの単位で決まります。WebRTCで利用するWebRTCTransportのほか、Router間でデータを転送するためのPipeTransport、ビデオ・オーディオデータを直接あつかうPlainTransportなどがあります。
DataProducer Transportから作成するオブジェクトで、データチャネルでメッセージを送信します。
DataConsumer Transportから作成するオブジェクトで、データチャネルでメッセージを受信します。DataProducerを作成したときに生成されるProducerIdを指定してDataConsumerを作成することで、指定したデータを受信できます。

mediasoupを利用するときの処理の流れは以下のようになります。

  1. サーバーがWorkerとRouterオブジェクトを準備
  2. クライアント(送信側・受信側)とサーバーで通信経路となるTransportを作成
  3. 送信側がDataProducerを作成
  4. サーバーは新しいDataProducerが作成されたことを受信側に通知
  5. 受信側が渡されたProducerIdを指定してDataConsumerを作成
  6. 接続が確立するとDataProducerでopenイベントが発生
  7. DataProducerのsend()でメッセージを送信
  8. DataConsumerのmessageイベントで受信したメッセージを処理

いろいろ準備する必要はありますが、
最終的にはDataProducerのsend()で送信し、DataConsumerのmessageイベントで受信する
という部分を覚えてしまえば理解しやすいと思います。

補足:リモートホストから実行するには

今回紹介したサンプルコードは、そのままではローカルホスト上でしか実行できません。 リモートホストから実行するには以下の手順が必要になります。

リモートホスト側のChromeで、chrome://flagsの設定画面を開き、 #unsafely-treat-insecure-origin-as-secureに以下を追加します。 ここでは192.168.0.1としていますが、mediasoupサーバーを実行するホストのIPアドレスを入力してください。

http://192.168.0.1:3000

画面共有で使用するgetDisplayMedia()は、リモートホストから実行するにはHTTPSでないと動作しません。 上記の設定を追加することでHTTPもセキュアとみなして回避できますが、実際に運用する場合はHTTPSに対応させる必要があります。

以上の手順で、リモートホストからもhttp://192.168.0.1:3000/にアクセスして実行できるようになります。

おわりに

mediasoupは「低レベルなAPIを提供する」という特徴を見ると利用するのは難しいイメージがありますが、 簡単なSFU機能を利用するだけであればサンプルコードのとおり数百行程度で実現することができました。

オブジェクト構造もわかりやすく作られていて、公式サイトのAPIドキュメントやサンプルコードも充実しています。 まだ試せてはいませんが、動画の配信や録画も興味深いところです。 興味をもった方は試してみてはいかがでしょうか。

ALPHA SYSTEMS INC.

株式会社アルファシステムズは、ITサービス事業を展開しています。このブログで、技術的な取り組みを紹介しています。