e.blog

主にUnity/UE周りのことについてまとめていきます

Unity で MQTT を使って IoT デバイスと通信する

はじめに

最近は IoT を使ってなにかできないか色々模索中です。特に、AI と絡めることでできることが一気に広がる感じがしてとても楽しいです。

下の画像は ESP32 のボードに LED を光らせる仕組みを、M5StickCPlus に MQTT ブローカーと(デバッグ用の)Web サーバを立ち上げて、ブラウザからメッセージを飛ばすと LED が光る、というデモの様子です。(本当は赤外線を飛ばす予定。ただ分かりづらいので動画では普通の LED に差し替えましたw)

今回は色々試していく中で、Unity で MQTT を使って IoT デバイスにメッセージを飛ばす部分について書こうと思います。また、せっかくなのである程度意味があることを実現したいなと思い、Meta Quest の音量を M5Stick に飛ばして可視化する、というのをやってみました。なぜこれを作ったかというと、Quest を使った展示をしていると気づかないうちに音量が下がっていたりして、無音で体験してもらってしまう問題があったためにその対策として作りました。

▼ 今回実装したもの

以下は Meta Quest 版ではなく MQTT を叩くだけのシンプルな実装のものです。ただ、Quest でも動作するのは確認済みなので適宜コピーして利用してください。

github.com

▼ 動作している様子



MQTT とは

MQTT は「Message Queuing Telemetry Transport」の略とされていますが、Queuing の機能はなく名称だけが残っているようです。HTTP よりも軽量なため消費電力が少なく、非同期な双方向通信が可能です。そのため IoT に最適なメッセージングプロトコルです。Pub/Sub パターンを採用しており、非同期に 1 対多の通信が可能です。特に大事なのは組み込みデバイスやセンサーなど、メモリやネットワーク環境に制限があるような状況を想定して作られている点です。そのため IoT を用いた開発では重要になってくる技術です。

MQTT 自体の詳細な解説はここでは割愛します。どんなものかを知りたい方は以下の記事が参考になります。

dev.classmethod.jp

ライブラリ選定について

今回使用しているのは MQTTnet というライブラリです。これ以外にも M2Mqtt というライブラリもあるのですが、ChatGPT に聞いてみたところ MQTTnet を進められたのでこちらを利用しています。

かいつまんでオススメの理由を書くと以下のように説明されました。

  • メンテ状況が圧倒的に新しい
  • プロトコル対応が広い(MQTT v5 対応)
  • トランスポート種別が豊富で Unity と相性が良い

環境構築

前述の通り、今回は MQTTnet を利用するためその環境を構築します。MQTTnet は NuGet で提供されているためまずは NuGet for Unity をインストールします。

インストールしたら Window メニューから NuGet > Manager NuGet Packages を選択しマネージャを開きます。

.NET のバージョンなどによって動作する・しないがありそうなので、少し前の 3.1.2 をインストールしました。

実装

今回は M5StickC-Plus というデバイスから Android / iOS アプリに接続してトピックを購読する形にしました。

▼ 動作させている様子

www.youtube.com

MQTTnet ではクライアントもブローカーもどちらも実装できるため、ブローカー役を Android、publish するクライアント役を iOS で実行してみました。

ブローカー側の実装

ブローカー側の実装の解説です。ポイントごとに解説します。コード全文は GitHub にアップしてあるのでそちらをご覧ください。

以下はブローカーを起動する部分のコード断片です。

// 1) サーバ(ブローカー)生成
MqttFactory mqttFactory = new MqttFactory();
_mqttServer = mqttFactory.CreateMqttServer();

MqttFactory クラスのインスタンスを生成し、そこから IMqttServer インターフェースを実装したサーバのインスタンスを得ます。

// 2) オプション設定
//   - デフォルトエンドポイント(=TCP)を有効化し、1883で待ち受け
//   - 認証/認可や発行・購読のインターセプタは必要に応じて追加
MqttServerOptionsBuilder optionsBuilder = new MqttServerOptionsBuilder()
    .WithDefaultEndpoint()
    .WithDefaultEndpointPort(_port)
    .WithDefaultEndpointBoundIPAddress(IPAddress.Any)
    .WithoutEncryptedEndpoint()
    .WithApplicationMessageInterceptor(context =>
    {
        // 発行メッセージのフィルタ
        // 例)特定トピックのみ許可、メッセージ書き換え 等
        context.AcceptPublish = true; // 全許可(必要に応じて条件分岐)
    });

サーバを起動する際に利用するオプションを、ビルダーパターンで生成します。今回はサンプルのため基本的な設定のみ行っています。本番運用する場合は認証などの設定を追加したほうがいいでしょう。

// 3) イベント購読
_mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(args =>
{
    Debug.Log($"[Connected] ClientId={args.ClientId}");
});

_mqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(args =>
{
    Debug.Log($"[Disconnected] ClientId={args.ClientId} Reason={args.DisconnectType}");
});

_mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(args =>
{
    string payload = args.ApplicationMessage?.Payload == null
        ? null
        : Encoding.UTF8.GetString(args.ApplicationMessage.Payload);

    string display = $"[Received] Topic={args.ApplicationMessage?.Topic} " +
                        $"QoS={(args.ApplicationMessage?.QualityOfServiceLevel)} Retain={args.ApplicationMessage?.Retain} " +
                        $"Payload='{payload}'";
    Debug.Log(display);

    _context.Post(_ => { _receivedText.text = display; }, null);
});

各種イベントを購読します。今回は主にログ出力のために購読しています。ブローカーとしての振る舞いのみでよければこのあたりの購読は不要です。

// 4) サーバ起動
IMqttServerOptions options = optionsBuilder.Build();
await _mqttServer.StartAsync(options);

設定が完了したらサーバを起動します。起動する際に、先ほど作成したオプションを指定します。

Android で謎のエラーが出るので対処

実は実際のコードでは以下のようにエラーハンドリングをしています。

catch (Exception e)
{
    if (e.Message.ToLower().Contains("Address already in use".ToLower()))
    {
        // NOTE: なぜか Android だと正常にサーバが起動しても "Address already in use" 例外が発生するので無視する
        success = true;
    }
    else
    {
        Debug.LogError($"Failed to start MQTT broker: {e.Message}");
    }
}
finally
{
    if (success)
    {
        string ip = GetLocalIPAddress();
        string hostInfo = ip != string.Empty ? $"{ip}:{_port}" : $"localhost:{_port}";
        Debug.Log($"MQTT broker started at {hostInfo}");

        _context.Post(_ => { _statusText.text = $"Broker started at {hostInfo}"; }, null);
    }
}

なぜか Android 実機でだけ、正常にサーバが起動しているにも関わらず Address already in use のエラーが発生します。しかし、実際にはサーバが起動しているため正常に動作します。

MQTTnet のリポジトリの issue にも同様の現象を報告しているものがありました。2022 年の issue なのですが、実害がないからなのか Open のままになっているようです・・。

github.com

そのため、エラー内容を見てもし Address already in use だったら無視するようにしています。ただ、本当にポートが使われているなどの理由でこのエラーが出ている場合もあるため、実際の運用をする場合は注意が必要です。

基本的にサーバ側の実装は以上です。無事に起動できたら UI に起動済みであること、IP アドレスなどが表示されるようになっています。

※ MQTT の通信のコールバックはメインスレッド外で通知されるため、nGUI に表示する場合はメインスレッドに切り替えて行う必要がある点に注意してください。

クライアント側の実装

次はクライアント側の実装です。今回のサンプルでは主にパブリッシュがメインとなっていますが、エコー用にサブスクライブもしているのでサブスクライブするパターンも網羅しています。

_mqttClient = new MqttFactory().CreateMqttClient();
_mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnAppMessage);
_mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnConnected);
_mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnDisconnected);

// ---- 中略 ----

IMqttClientOptions options = CreateClientOptions();
await _mqttClient.ConnectAsync(options, CancellationToken.None);

まずはサーバへの接続部分です。サーバと同様に MqttFactory クラスを介してクライアントインスタンスを取得します。

サーバと同様に各種イベントを購読しています。またサーバと同様、接続用のオプションも生成が必要です。

オプション生成は以下のようになっています。

private IMqttClientOptions CreateClientOptions()
{
    string host = _hostInputField.text;
    if (string.IsNullOrEmpty(host))
    {
        host = "localhost";
    }

    if (!int.TryParse(_portInputField.text, out int port))
    {
        port = 1883;
    }

    string topic = _topicInputField.text;
    if (string.IsNullOrEmpty(topic))
    {
        topic = "get/volume";
    }
    _topic = topic;

    MqttClientOptionsBuilder optionsBuilder = new MqttClientOptionsBuilder()
        .WithTcpServer(host, port);
    if (_useCredentials)
    {
        optionsBuilder.WithCredentials(_username, _password);
    }
    if (_useTls)
    {
        optionsBuilder.WithTls();
    }
    return optionsBuilder.Build();
}

サーバ接続に必要な情報を設定します。特に、インターネット上にあるサービスを利用する場合は TLS を利用することや、ユーザ名・パスワードが必要な場合があるため、それを利用するかで分岐しています。

今回はサンプルなので認証周りは不要です。メソッドの最後でビルドしたものを返しています。

接続完了はコールバックで通知されるので、そのコールバック内でサブスクライブなどを行います。

private async void OnConnected(MqttClientConnectedEventArgs args)
{
    Debug.Log("MQTT broker connected.");

    _unityContext.Post(_ =>
    {
        _statusText.text = "Connected";
        _connectButton.GetComponentInChildren<TMP_Text>().text = "Disconnect";
        _connectButton.interactable = true;
        _publishButton.interactable = true;
    }, null);

    if (string.IsNullOrEmpty(_topicInputField.text))
    {
        Debug.Log("Topic is empty. Subscription skipped.");
        _receivedText.text = "Topic is empty. Please enter a valid topic.";
        return;
    }

    await _mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(_topic).Build());

    Debug.Log($"TOPIC [{_topic}] Subscribed.");
}

サーバと同様、コールバックはメインスレッド外で通知されるのでメインスレッドで UI を更新するのを忘れないようにしてください。

トピックのサブスクライブは SubscribeAsync() メソッドで行います。サブスクライブ対象のトピックはビルダークラスがあるのでそれを利用して設定します。

サブスクライブしたトピックに通知がきたらコールバックでこれを受け取ります。コールバックの処理は以下です。

private void OnAppMessage(MqttApplicationMessageReceivedEventArgs args)
{
    string payload = Encoding.UTF8.GetString(args.ApplicationMessage.Payload);
    Debug.Log($"Received message: Topic = {args.ApplicationMessage.Topic}, Payload = {payload}");

    _unityContext.Post(_ => { _receivedText.text = payload; }, null);
}

Payload は UTF-8 で受信するため、それをテキストに変換します。今回は取得した Payload をただ UI に表示するだけですが、実際にはここで内容を見て処理を行うことになります。

以下はパブリッシュ処理です。

private async void Publish()
{
    try
    {
        Debug.Log("Publish message.");

        MqttApplicationMessage message = new MqttApplicationMessageBuilder()
            .WithTopic(_topic)
            .WithPayload(_payloadInputField.text)
            .WithAtLeastOnceQoS()
            .WithRetainFlag()
            .Build();

        await _mqttClient.PublishAsync(message, CancellationToken.None);
    }
    catch (Exception e)
    {
        Debug.Log($"Failed to publish message: {e.Message}");
    }
}

パブリッシュするには PublishAsync() メソッドを使います。引数はこちらも他と同様にビルダーパターンで生成します。 今回はトピックとペイロードQoSを設定して送信しています。


以上でサーバとクライアントの実装解説は終了です。MQTTnet を使うことでかなり簡単に実装できることが分かります。

次は、ここで実装したブローカーに実際に接続する IoT デバイス側の実装を紹介します。

M5StickC-Plus は Arduino IDE という IDE で開発を行います。ただ、こちらの解説を入れるとだいぶ長くなってしまうためここでは割愛します。このあたりについては別のブログ記事で解説しているので、詳細はそちらをご覧ください。ただ、M5StickC-Plus はまた少し違うセットアップが必要なので、適宜別の記事も参照ください。

zenn.dev

M5StickC-Plus 側の実装

今回は M5StickC-Plus というデバイスを使って MQTT 通信を試しました。M5Stack というシリーズの小型のデバイスです。

こういう感じの↓

この M5StickC-Plus 向けのコードもリポジトリに含まれているのでコード全文を見たい方はそちらをご覧ください。

以下は要点を絞って解説していきます。(ただ、主題は Unity なので軽く解説します)

///
/// セットアップ
///
void setup() {
  Serial.begin(115200);

  M5.begin();
  
  // 中略

  // 接続
  ensureWifi();
  ensureMqtt();
}

setup 内で Wi-Fi と MQTT ブローカーへの接続を確認しています。

///
/// MQTT ブローカーへの接続処理
///
bool mqttConnect() {
  mqtt.setServer(BROKER_URL, BROKER_PORT);
  mqtt.setCallback(onMqttMessage);
  mqtt.setBufferSize(MQTT_BUF_SIZE);
  mqtt.setKeepAlive(MQTT_KEEPALIVE_SEC);
  mqtt.setSocketTimeout(15);

  // Last Will and Testament(LWT)
  const char* willTopic = TOPIC;
  const char* willMsg   = "{\"status\":\"offline\"}";
  bool willRetain = false;

  Serial.printf("[MQTT] Connecting to %s:%d ...\n", BROKER_URL, BROKER_PORT);
  
  M5.Lcd.fillScreen(BLACK);
  M5.Lcd.setCursor(0, 0);
  M5.Lcd.println("Connecting to MQTT...");

  bool ok = mqtt.connect(clientId.c_str(), nullptr, nullptr, willTopic, 1, willRetain, willMsg, true);

  if (ok) {
    Serial.println("[MQTT] Connected.");
    // 購読
    if (mqtt.subscribe(TOPIC, 1)) {
      Serial.printf("[MQTT] Subscribed: %s\n", TOPIC);
      M5.Lcd.fillScreen(BLACK);
      M5.Lcd.setCursor(0, 0);
      M5.Lcd.println("Connected MQTT.");
    }
    else {
      Serial.println("[MQTT] Subscribe failed");
    }
  }
  else {
    Serial.printf("[MQTT] Connect failed. State=%d\n", mqtt.state());
  }
  return ok;
}

上記はブローカーへの接続処理です。接続後は subscribe 関数を使って購読します。

接続時にコールバック関数を指定しているため、メッセージを受信した際は以下の関数が呼ばれます。

///
/// MQTT の受信コールバック
///
void onMqttMessage(char* topic, byte* payload, unsigned int length) {
  Serial.printf("[MQTT] Message arrived: topic=%s, len=%u\n", topic, length);

  String msg;
  msg.reserve(length);
  for (unsigned int i = 0; i < length; i++) {
    msg += (char)payload[i];
  }
  Serial.printf("[MQTT] Payload: %s\n", msg.c_str());
  M5.Lcd.fillScreen(BLACK);
  M5.Lcd.setCursor(0, 0);
  M5.Lcd.printf("[MQTT] Payload: %s\n", msg.c_str());
}

引数にトピックやペイロードが含まれているため、それを元に処理を行います。今回はサンプルなので画面に表示するのみです。

最後に、Arduino では loop 関数を用いてループ処理を書きます。この中で、ブローカーからの通知を受け取るために mqtt.loop() を実行します。

///
/// ループ
///
void loop() {
  M5.update();

  // 接続維持
  ensureWifi();
  ensureMqtt();

  if (mqtt.connected()) {
    mqtt.loop();
  }
}

簡単ですが、以上が M5StickC-Plus 向けの実装となります。


最後に

MQTTnet ライブラリのお陰で、Unity でも簡単に MQTT を扱うことができました。 最近は AI グラが多数登場してきています。AI と XR デバイスとの相性もよく、IoT デバイスを用いることでさらに AI の活用の幅が広がると思っています。

今後は AI だけじゃなく、IoT 関連も積極的に追っていこうと思います。

再掲となりますが、よかったらぜひ、ご自身でも色々動かしてみてください。

github.com