e.blog

主にUnity/UE周りのことについてまとめていきます

UnityでThreadを使って処理を分割する

概要

Unityではスレッドを使うことが想定されていません。
というのも、いわゆる「Unity API」と呼ばれる様々なUnityの機能が、メインスレッド以外からは呼び出せない仕様となっているからです。
UIはメインスレッドからのみ操作できるというのと似ていますね。

とはいえ、昨今のゲームでは負荷の高い処理を行う必要があることも少なくありません。
そこで、Unity上でもスレッドを扱う必要が出てきます。

ということで、今回はUnityでスレッドを使う上での注意や実際に使う場合の処理などを書きたいと思います。

今回の記事を書くにあたって、処理負荷軽減の恩恵を感じられるように、Flocking、いわゆる群衆シミュレーションに似た処理をスレッドによって軽減するようにしてみました。
(ただ正直、スレッドの扱いにはそこまで慣れてないのでなんか変なところあったらツッコミ入れてください;)

なお、今回のデモはGithubにアップしてあります。

Flocking

フロッキングとは、いわゆる群衆シミュレーションと呼ばれる、生物が集団で移動する際の状況を「それっぽく」見せるためのアルゴリズムです。
実装自体はとてもシンプルで、いくつかのシンプルな実装を組み合わせるだけで、まるで鳥が集団で飛んでいるかのような状況を作り出すことができます。(Birdroidを短縮してBoid、と呼ばれるのも同じものです)

今回はこのアルゴリズムのうち、いくつかを組み合わせて、リーダー機に従い、それぞれの僚機が一定距離を保って飛行する、という感じのものを作ってみました。

↓こんな感じ。機体の追加と、ターゲットにまとわりつく、みたいな処理のつもりw

こちらの記事(【ゲームAI】フロッキングアルゴリズム)がAIとしてのフロッキングについて解説しているので興味がある人は読んでみてください。

スレッドを使う

今回のサンプルを実装する上で使用したスレッド関連のクラスは以下です。

  • ManualResetEvent
  • Thread

今回の例はシンプルなもののため、スレッドプールなどは使っていません。
また、Unity2017からはC#5.0以降で使えるawaitasyncが使えるようになります。
そのため、Taskなども使えるようになりますが、今回はスレッド自体の説明のためそれらは使用していません。

www.buildinsider.net

qiita.com

(今回のサンプルでは)ManualResetEventクラスを用いて、シグナルを切り替えながら同期処理を行います。
イメージは「信号機」です。セマフォも似た仕組みですね。

ManualResetEventを使い、Resetメソッドで「非シグナル状態」にします。
そしてその後、WaitOneメソッドを実行すると、スレッドはそこで待機状態となり、次にシグナルがオンになるまで停止されます。
シグナルがオンになったら(つまり信号が青になったら)スレッドが再開され、停止していた位置から処理を再開します。

各寮機の位置を更新するクラス

public class UnitWorker
{
    // 非シグナル状態で初期化
    private readonly ManualResetEvent _mre = new ManualResetEvent(false);

    private Thread _thread;
    private bool _isRunning = false;

    private float _timeStep = 0;

    public List<UnitBase> Units { get; set; }

    // コンストラクタ
    public UnitWorker()
    {
        Initialize();
    }

    // 初期化処理
    // スレッドを生成し、スタートさせておく
    private void Initialize()
    {
        _thread = new Thread(ThreadRun);
        _thread.IsBackground = true;
        _thread.Start();
    }

    // スレッドの再開を外部から伝える
    public void Run()
    {
        _timeStep = Time.deltaTime;
        _isRunning = true;
        _mre.Set();
    }

    // 実際の位置計算処理を実行
    private void Calculate()
    {
        UnitBase unit;
        for (int i = 0; i < Units.Count; i++)
        {
            unit = Units[i];
            unit.UpdatePosition(_timeStep);
        }
    }

    // サブスレッドで実行される処理
    private void ThreadRun()
    {
        // シグナル状態になるのを待機する
        _mre.WaitOne();

        try
        {
            // 位置計算のアップデート
            Calculate();
        }
        finally
        {
            // 最後に、非シグナル状態に戻して次回の実行が待機されるようにする
            _isRunning = false;

            _mre.Reset();

            // 新しいスレッドを作ってスタートさせておく(初期化と同じ)
            _thread = new Thread(ThreadRun);
            _thread.IsBackground = true;
            _thread.Start();
        }
    }
}

ユニットを生成・管理するクラス

using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using UnityEngine;

public class DroneFactory : MonoBehaviour
{
    #region ### Variables ###
    [SerializeField]
    private Transform _leader;

    [SerializeField]
    private Transform _target;

    [SerializeField]
    private GameObject _unitPrefab;

    [SerializeField]
    private SteamVR_TrackedController _controller;

    private List<UnitBase> _units = new List<UnitBase>();
    public List<UnitBase> Units
    {
        get { return _units; }
    }

    private UnitWorker[] _unitWorkers = new UnitWorker[4];

    private bool _needsStopThread = false;
    #endregion ### Variables ###

    #region ### MonoBehaviour ###
    private void Start()
    {
        _units = new List<UnitBase>(GetComponentsInChildren<UnitBase>());

        for (int i = 0; i < _unitWorkers.Length; i++)
        {
            _unitWorkers[i] = new UnitWorker();
        }

        GiveUnits();
    }

    private void Update()
    {
        if (Time.frameCount % 5 == 0)
        {
            if (_controller.triggerPressed)
            {
                Injetion();
            }

            if (_controller.menuPressed)
            {
                GenerateUnit();
            }
        }

        for (int i = 0; i < _unitWorkers.Length; i++)
        {
            _unitWorkers[i].Run();
        }
    }
    #endregion ### MonoBehaviour ###

    /// <summary>
    /// 生成した4スレッド分に、計算するユニットを分配する
    /// </summary>
    private void GiveUnits()
    {
        int len = _unitWorkers.Length;
        int range = _units.Count / len;
        for (int i = 0; i < _unitWorkers.Length; i++)
        {
            List<UnitBase> units = _units.GetRange(range * i, range);
            _unitWorkers[i].Units = units;
        }
    }

    /// <summary>
    /// ユニットをターゲットに向けて射出する
    /// </summary>
    private void Injetion()
    {
        UnitBase unit = Units.FirstOrDefault(u => u.Target != _target);
        if (unit != null)
        {
            unit.Target = _target;
        }
    }

    /// <summary>
    /// ユニットを生成してリストに追加する
    /// </summary>
    private void GenerateUnit()
    {
        GameObject unitObj = Instantiate(_unitPrefab, _controller.transform.position, Quaternion.identity);
        UnitBase unit = unitObj.GetComponent<UnitBase>();
        unit.Leader = _leader;
        unit.Speed = Random.Range(0.2f, 0.5f);
        _units.Add(unit);

        GiveUnits();
    }
}

以上が、今回のサンプルの肝部分です。

解説

今回のサンプルは、ManualResetEventを使ってシグナル状態を管理、適切なタイミングでスレッドを起動し、位置を計算、計算後にそれを適用する、という流れになっています。
ポイントはスレッドの生成部分です。

実際はスレッドプールなどを生成して再利用しないと、毎フレームごとにスレッドを生成しているのでコストが高いですが、今回は分かりやすさ重視ということでこういう実装をしています。
スレッドを理解するには、スレッドは、OSからスケジューリングされて、決められた時間だけCPUを使い、計算を行う、という点です。

そのため、今回の_mre.WaitOne()のように、スレッド自体を停止させると、シグナル状態になるまでその処理が停止します。
メインスレッドで常に実行されるStartUpdateは、こうした「停止」処理自体が行なえません。

※ 厳密には、メインスレッドを停止してしまうと画面が固まって見えてしまうので、原則としてメインスレッドを待機状態にすることはまずないでしょう。
結局のところ、メインスレッドも「スレッドのひとつ」であることに変わりはないので、スレッドに対して行える処理は同様に行うことができます。

ざっくりと、理解の助けとなる手順を書くと以下のようになります。

  1. メソッド(ThreadRun)を、生成したスレッドに割り当ててそれを実行状態にする(Thread.Start
  2. ThreadRunメソッドは実行されてすぐに、_mre.WaitOne()によってシグナルを待つ状態に移行する
  3. Runメソッドが実行されると_mre.Set()が呼ばれ、シグナル状態となり、停止していたスレッドが動き出す
  4. スレッド(ThreadRun)の実行は、位置計算の更新処理後、最後のタイミングで再び非シグナル状態に戻し、さらに新しくスレッドを生成して終了する
  5. そして再び_mre.WaitOne()によってスレッドが停止され、以後それを繰り返す

という流れになります。

今回のサンプルではこの、「シグナル状態」が分かれば特にむずかしいことはないと思います。

参考記事

気になるとワスレルナ スレッドプログラミング AutoResetEvent

smdn.jp

スレッドプールの仕組みを作る

さて最後に、少しだけThreadPoolの仕組みを簡単に自作したものを載せておきます。
(ただ前述した通り、スレッドの扱いがまだ慣れてないので、あくまで自分の理解のために書いた感じなので注意してください)

参考: C#非同期処理関連のMSDNの資料読んでみた(2)

使うクラス

  • Thread
  • AutoResetEvent
  • WaitCallback

AutoResetEvent

前述のサンプルでも登場したManualResetEventですが、AutoResetEventというものもあります。 違いは以下です。

イベント待機ハンドル(WaitHandle)により、スレッドは相互に通知を行い、相手の通知を待機して動作を同期することができます。
イベント待機ハンドルは通知されたときに、自動的にリセットされるイベントと手動でリセットするイベントと2種類に分けられます。

ManualとAutoの違いはまさにこの「自動リセット」か「手動リセット」かの違いとなります。

AutoResetEventは待機中のスレッドがなくなると自動的に非シグナル状態へと遷移します。
一方、ManualResetEventは、Reset()を呼び出し、手動で非シグナル状態に戻す必要があります。

以下の記事が、ManualとAutoの違いの比較コードを載せてくれているので、興味がある人は読んでみてください。
参考: https://codezine.jp/article/detail/139#waithandle

※ それぞれのクラスはWaitHandleクラスを継承した派生クラスとなっています。

WaitHandle

Win32同期ハンドルをカプセル化し、複数の待機操作を実行するための抽象クラス。
派生クラスには上記以外に、Mutex, EventWaitHandle, Semaphoreなどがあります。

「待機ハンドル」と呼ばれるWaitHandleオブジェクトは、スレッドの同期に使われます。
待機ハンドルの状態には「シグナル状態」と「非シグナル状態」の2つがあり、待機ハンドルをどのスレッドも所有していなければ「シグナル状態」、所有していれば「非シグナル状態」となります。
WaitHandle.WaitOneメソッドなどを使うことにより、待機ハンドルがシグナル状態になるまでスレッドをブロックすることができます。
イメージ的には、「シグナル状態」は「青信号」で「非シグナル状態」は「赤信号」です。
つまり、非シグナル状態=赤信号の場合は、シグナル状態=青信号になるまで待機する、というわけですね。

WaitCallback

Define:
[ComVisibleAttribute(true)]
public delegate void WaitCallback(object state);

state ... コールバックメソッドが使用する情報を格納したオブジェクト。void*型と思えばよさげ。

コードサンプル

using System.Collections;
using System.Collections.Generic;
using System.Threading;
using UnityEngine;

public class SimpleThreadPool : MonoBehaviour
{
    /// <summary>
    /// サブスレッドタスク
    /// </summary>
    class Task
    {
        public WaitCallback Callback;
        public object Args;
    }

    private Queue<Task> _taskQueue = new Queue<Task>();
    private Thread _thread;
    private AutoResetEvent _are = new AutoResetEvent(false);
    private bool _isRunning = false;

    private int _id = 0;

    private void Start()
    {
        _isRunning = true;
        _thread = new Thread(new ThreadStart(ThreadProc));
        _thread.Start();
    }

    private void Update()
    {
        if (Input.GetKeyDown(KeyCode.T))
        {
            AddTask();
        }

        if (Input.GetKeyDown(KeyCode.A))
        {
            for (int i = 0; i < 30; i++)
            {
                AddTask();
            }
        }
    }

    private void AddTask()
    {
        Task task = new Task
        {
            Callback = new WaitCallback(TaskProc),
            Args = (object)_id++,
        };

        _taskQueue.Enqueue(task);

        Debug.Log("Added task. Task count is " + _taskQueue.Count);

        if (_taskQueue.Count == 1)
        {
            _are.Set();
        }
    }

    private void TaskProc(object args)
    {
        Debug.Log("Task Proc.");

        Thread.Sleep(500);

        int id = (int)args;
        Debug.LogFormat("Task {0} is finished.", id);

        _are.Set();
    }

    private void ThreadProc()
    {
        while (_isRunning)
        {
            _are.WaitOne();

            if (_taskQueue.Count > 0)
            {
                Task task = _taskQueue.Dequeue();
                task.Callback(task.Args);
            }
        }
    }
}

こちらのサンプルでは、常にタスクを監視して実行するThreadProcをサブスレッドで実行し、タスクキューにタスクを追加することでスレッド処理を行っているサンプルです。
タスクが追加されるまではスレッドは停止状態になりますが、タスクが追加されるとスレッドが起動されて、キューからタスクを取り出し実行します。

今回はサンプルのため、タスク処理の中でシグナル状態を制御していますが、汎用的にタスクを追加することを考えるとここは内部で適切に管理する必要があるでしょう。

WaitCallbackでタスクを登録する

タスクの処理はTaskProcで行っていますが、タスク自体はWaitCallbackクラスに、処理してもらいたいメソッドを登録して生成しています。
定義はdelegateになっていて、object型の引数をひとつ受け取るデリゲートです。

なので、void*型のように使用して、内部で適切にキャストしてあげる必要があります。

このように、スレッドを必要数起動させておいて、タスクをあとからキューに追加する形で実行するので、スレッドの新規生成を挟まず、生成コストを削減することができるようになります。

その他

以前、C言語のスレッドについて、書籍からのメモを書いた記事もあるので、そちらも合わせて読んでみてください。

qiita.com