メインコンテンツまでスキップ
バージョン: 1.1.0

システム概要

概要

Playable! Map Scanner はゲームのフィールドをスキャンし、自動テストに用いる 3 次元データを生成するツールです。 レイキャストを取り付けたプレイヤーをマップ全体に移動させることでスキャンを実現します。

本ツールは、マップ全体を多数の小さな領域に分割してタスクを割り振る 1 つのサーバープログラムと、 サーバーから割り振られたタスクに従って実際にスキャンを行う複数並列のクライアントプログラムで構成されています。

Map Scanner の各クライアントは独立したスレッドで動いていますが、 サーバー側でクライアントから送信されたデータを受信する処理は単一プロセスで動いているため、データの競合防止のためにスレッドロックを使用しています。

image

ファイル構成

Python
├── alfort +-----------------------------+ ゲーム固有のコード
│   ├── custom_client_agents.py +-------+ クライアントの Agent 挙動に関するコード
│   ├── custom_client_thread.py +-------+ クライアントのメインループ
│   ├── custom_container.py +-----------+ ゲームから送信された情報を Python で扱いやすいように加工
│   ├── custom_controller.py +----------+ プレイヤーの行動に関するパッド入力定義
│   ├── custom_order.py +---------------+ タイトル画面やメニュー操作に関するパッド入力定義
│   └── custom_wrapper.py +-------------+ ゲーム環境との通信に関わるクラス
│ 
├── base +-------------------------------+ ゲーム共通のコード
│   ├── base_client_agents.py +---------+ クライアントの Agent 挙動に関するコード
│   ├── base_client_thread.py +---------+ クライアントのメインループ
│   ├── base_container.py +-------------+ ゲームから送信された情報を Python で扱いやすいように加工
│   ├── base_controller.py +------------+ プレイヤーの行動に関するパッド入力定義
│   ├── base_order.py +-----------------+ タイトル画面やメニュー操作に関するパッド入力定義
│   ├── base_wrapper.py +---------------+ ゲーム環境との通信に関わるクラス
│   └── server_global.py +--------------+ サーバーがグローバル (スレッド共有) に保持するデータに関するコード

├── font +-------------------------------+ GUI 上で使用する日本語フォント
├── map_data +---------------------------+ スキャンデータおよび関連ファイル
├── utils +------------------------------+ 汎用便利モジュール

├── Astar_model.pyd +-------------------+ Astar 探索クラス
├── map_scanner_client.py +-------------+ クライアントプログラム
├── map_scanner_server.py +-------------+ サーバープログラム
└── map_scanner_searcher.py +-----------+ 経路探索ツール

map_scanner_server.py

プログラムの実行順序関係

map_scanner_server.py
└── server_global.py

メインループ

# socket 接続サイクル
while True:
# プロセスプールへの新規 socket 通信を追加する
clientsock, clientaddress = self.server_socket.accept()
()
# 実行中タスク表示 UI の更新
self.update_server_UI()
t = threading.Thread(target=self.tcplink, args=(clientsock, clientaddress), name=clientaddress)
t.start()

# socket 通信サイクル
try:
while True:
received_msg = sock.recv(self.BUFFER_SIZE).decode('utf-8')
if received_msg != "" and received_msg is not None:
# クライアントにマップ情報を送信する
if received_msg == "map_info":
...
# タスクリストを送信する
elif received_msg == "scan_task":
...
# スキャン結果の取得
elif received_msg == "scan_data":
...
# スキャン失敗したタスクを回収する
elif received_msg == "task_failed":
...
# タスク進捗バーの更新
...
# 接続終了後に socket 通信を削除する
except Exception as e:
...
# 実行中タスク表示 UI の更新
self.update_server_UI()

map_scanner_client.py

プログラムの実行順序関係

map_scanner_client.py
├── util_modules.py
├── custom_client_thread.py
│ └── custom_agents.py
│   ├── custom_container.py
│   └── custom_controller.py
├── custom_order.py
└── custom_wrapper.py
└── custom_container.py

メインループ

# ゲーム通信ループ
while True:
try:
pad_input = PseudoPadInput()
# 有限オートマトン
task_end, pad_input = self.state_machines(pad_input)
dpg.set_value(f"thr_{self.thread_index}_thread_state", self.thread_state)
if self.thread_state == "scan move":
# 溜まっているデータをすべて remaining_datas に追加
self.game_data_now, datas = self.environment.step_all_queue(pad_input)
self.remaining_datas.extend(datas)
if len(self.remaining_datas) > 1:
# スキャン中に一個だけ処理することで最後の処理時間を軽減
self.save_scan_data(self.remaining_datas.pop())
dpg.set_value(f"thr_{self.thread_index}_remaining_size", len(self.remaining_datas))
elif self.thread_state == "wait subthread":
self.game_data_now, _ = self.environment.step_all_queue(pad_input)
else:
self.game_data_now = self.environment.step(pad_input)
if task_end:
break
except Exception as e:
traceback.print_exc()
print(f"Expection :{e}")
if self.reconnect:
self.reconnect = False
self.environment.reconnect()
else:
break

def state_machines(self, pad_input: PseudoPadInput):
"""
有限オートマトン
入力された環境情報に基づいて次の行動を決定する
"""
task_end = False
try:
# tips処理
if self.game_data_now["Alfort"]["IsShowMessage"]:
...
except:
pass

if self.thread_state == "Executing Order":
# メニュー関連コマンド
...

elif self.thread_state == "reconnect":
# タイトル画面を突破する
...

elif self.thread_state == "game setting":
# デバッグ機能の設定
...

elif self.thread_state == "get task":
# 次のタスクリストの取得,完了したらスキャン状態に移行
...

elif self.thread_state == "scan move":
# スキャン状態
...

elif self.thread_state == "save task":
# スキャンデータをサーバーに送る
...

elif self.thread_state == "Task End":
# タスク終了
task_end = True

return task_end, pad_input

map_scanner_searcher.py

プログラムの実行順序関係

map_scanner_searcher.py
├── util_modules.py
└── Astar_model.pyd

探索とグラフデータ

プレイヤーから行われるレイキャストは、大きく分けて垂直方向と水平方向の 2 通りが存在します。 水平方向にスキャンされたデータは「壁データ」(_is_wall)、垂直方向にスキャンされたデータは「地面データ」(_is_road) とみなすことができます。

また、ゲーム固有の移動方法 (しゃがみ移動、水面移動、潜水移動、流砂ギミックなど) に対応するためのスキャンデータが必要な場合もあります。 これらのデータに基づいて、キャラクターが実際のプレーで到達できる座標の集合を計算し、A* (A-star) 探索をサポートするためのグラフデータを生成します。

以下に具体的に行っている内容を解説します。なお、分かりやすいようにサンプルゲームにおける実装の解説を併記します。

hitbox 情報のテンソル変換

「hitbox の中心座標、角度、サイズ」のデータから、スキャンデータと同じスケールのボクセルデータに変換します。 このデータは np.array(bool) 型であり、hitbox が存在する箇所が 1、そうでない箇所が 0 となっています。

サンプルゲームでの例

サンプルゲームではマップ移動判定となるコリジョンの向こう側にも地形が存在するため、同コリジョンの手前で探索範囲を打ち切る必要があります。 よって、ゲームから取得した同コリジョンの位置とサイズなどのデータを元に、スキャンデータ上に同コリジョンを通行不可能な障害物として反映しています。 回転後の座標間隔が最大で 1 から √2 に変わることを考え、スキャンデータの倍の密度を持つ行列を構築し、スキャンデータの密度に合わせて回転計算を行います。

def load_hit_box(self, map_name, fineness_size):
...

image

キャラクターが通行可能な座標の計算

キャラクターが通行可能であるには、少なくともキャラクターの身長よりも大きい縦幅を持つ空間が必要となります。 ここでは、それを満たす全ての座標を通行可能な座標として扱います。

サンプルゲームでの例

サンプルゲームの場合、キャラクターの身長をボクセル 4 つ分とみなしています。

def _pass_walk_obstacle_floor(_is_road, _is_wall, _is_hitbox, _is_water):
...

image

キャラクターが歩行可能な座標の計算

「水面データ」と「地面データ」と「壁データ」の OR を取った時に現れる全ての水平面を立つことが可能な座標として扱い、 通行可能な座標のデータ、および局所的な傾斜データに基づいて、立つことができる座標の中でも歩行可能な座標を抽出します。

サンプルゲームでの例

サンプルゲームの場合、45 度以上の斜面ではキャラクターが滑り落ちるため、歩行可能な座標としてみなすことができません。 そのため、各座標とその一定範囲内の座標を参照し、各座標の傾斜度が閾値以下であるかどうかを計算しています。

def _search_floor(_is_pass, _is_walk):
...

image

歩行可能な座標の地形コストの計算

歩行可能な座標ごとに、「周囲の地形から考えてどの方向に移動するのが安全か」を判断する際の重みを計算します。 険しい地形はキャラクター移動時に障害となる可能性があるため、「地形コスト」を考慮することで探索アルゴリズムがより平坦な地形を優先して選択できるようにします。 歩行可能な座標と判断された箇所に対し、以下のようにコストを割り振ります。

  • 周囲の高さは中心より高い → cost = 1
  • 周囲と中心の高さが同じ → cost = 2
  • 周囲の高さは中心より低い → cost = 1
    なお、ここでは cost の値が大きい方が より安全に移動できる可能性が高い 座標であることを表しています。
def _cost_floor(_is_search, x_size, y_size):
...

image

水面領域としゃがみ領域の計算

ゲーム固有の移動方法 (しゃがみ移動、水面移動、潜水移動、流砂ギミックなど) で到達できる領域を計算します。

def _water_expansion(_is_pass, _is_walk):
...
def _squat_floor(_is_pass, _is_walk):
...

サンプルゲームでの例

サンプルゲームでは該当する仕様がないため、これらを考慮していません。

基準点から到達可能な座標の取得

予め決めておいた基準点を出発し、実際のプレーで到達できるであろう全ての座標を列挙します。
ここでは各座標を グラフ理論 におけるノード (頂点) とみなし、移動可能な座標間にエッジ (辺) が張られるようなグラフを構築することで、全ての座標間の移動可否関係を得ることを考えます。

また、3D のゲームなら次のことも考える必要があります。

  1. 重力を考慮すると、高所から落ちるとジャンプしても元の道には戻れない
  2. プレイヤーは様々かつ特殊な移動方法を持ち、移動経路のバリエーションに富む
  3. 最短に近い経路を求めることが重要な一方、安定して移動成功できることも重要

これらを考慮し、ここでは 重み付きの有向グラフ を構築します。

基本的に次の手順に従って作成しています。

  1. 事前処理として、幅優先探索を使ってプレイヤーが到達可能な範囲を決定
  2. 上記の範囲のすべての座標 (=ノード) に対して、到達可能な座標間の近接関係を取得し、グラフを構築

これにより、

  • 到達可能なすべてのノード
  • 各ノードからそれぞれ到達可能な隣接ノード
  • ノードとノードの間のコスト

といった情報を持つグラフを構築することができます。

注記

グラフデータの生成と呼び出しのコードは、より高速な実行を実現するために C++ で書かれています。

    py::array_t<bool> maker_graph(string file_name)
...