この記事は、Play or Scala Advent Calendar 2012の25日めです。
Looking back 2012
2012年は、
など、個人的にはPlayとScalaが身近な世界で躍進した年でした。そんな年にアドベントカレンダーの最終日を担当するというのは、何か感慨深いものがあります!最近はPlayやScalaが実践で使われ始めた影響か、バイナリ互換性やビルドツール、習得面などの実践的な課題が色々と話題になっています。課題に対しては来年移行も粛々と対応をしていき、Scalaの今後の発展に寄与していきたいと思います
さて、本題に入ります。先日、AkkaでMMOのサーバ(ほんの小さなものですが)を書きました。その時に使ったScala関連の技術をいくつかご紹介したいと思います。(ゲームロジックの詳細は今回は省きます)
Why
そもそも何故いまMMOのサーバなのか!身も蓋もない言い方をすると、「前々から書いてみたかった」んです。
I love WebDev
軽く自己紹介をさせていただきますが、僕は新卒からWeb業界で働き初めてはや5年くらいになる20代Webエンジニアです。なぜWeb業界にいるのかというと、
- Webのコモディティ化が著しい昨今、これからのエンジニアはWebができて当たり前、WebエンジニアリングはWebエンジニアにとってのそろばんなのではないか!
- 流行りものだからやるなら今だろう!
- お金になりそう!
- 日々すごい技術が生まれては消えていき、さらには教育機関と連携したりしちゃって日々自然言語処理を活用した新しいサービスが生まれているのではないか!
5年くらい働いてみて、僕にとってのWebはこのキラッキラしたものとは無縁だなーというのが実感です。黙ってたらみんな古い技術使い続けるし、流行り過ぎてエンジニアの市場価値としてはどうなんだ、お金は基本的に儲からない、教育機関と連携した先端技術もなにそれ美味しいの状態だし研究開発は縮小する一方って感じですし。
それでも僕の考えるWeb開発やプログラミングが楽しく、それでいてエンジニアとしての今後の成長を視野にいれて、微力ながらScalaやPlayを普及する活動を公私にわたって行なってきました。Web業界を輝かせるために1エンジニアとしてできることを考えた結果が今、です。
I love GameDev
で、今回はMMOのサーバを書きました。これは、上記のWeb業界をエンジニアリングの立場から盛り上げたり底上げするみたいな大それた考えとは一切関係なく、僕のもっと個人的な趣味であり、夢です。
僕の子供時代といえば1990年代。ファミコンやスーパーファミコンが全盛期で、僕はマリオ、くにおくん、ドラクエ、FF、聖剣伝説に育てられたといっても過言ではありません。
そんな僕が小学生時代にPC98 CanBeとVisual Basic 2.0を与えられて、ゲームプログラマになりたがらないわけがないですよね?学校サボってDiabloとかLineageとかUltima Onlineやってたら、ネトゲ開発者になりたがらないわけがないですよね?でも諸々の理由でサラリーマンとしてゲームプログラマーになりたいとは思わなかったので、仕事以外の時間を使ってライフワークとしてやっていこうと思ってます。ネトゲをつくりたいなーとは以前から考えていたので、今回の試みはそのステップ1です。
最近色々な勉強会で同業の方々とお話する機会があるのですが、ソフトウェアエンジニアになったキッカケは「ゲーム!ゲーム作りたい!」だった、という人って結構います。そういった方にゲームづくりに使える技術がこんなところにあるんだよー、ということが伝わるといいなーという思いもあります。
Scala, Akka, STMs for hobbysts
また、アドベントカレンダー的には、「なんと趣味でネトゲのサーバをさっくりと書けるような技術が個人の手に入るようになっていました!それがScalaとAkkaとSTMでした!」という点もポイントです(・∀・)
Technology topics
今回のサーバアプリケーションですが、目標としたのは以下の3点です。
- エレガントに。泥臭くスレッドプログラミングをせずに可用性、並列性が高く高速なネットワークアプリケーションを構築する
- 安定性。マルチスレッド環境で安定して動くものにする(共有メモリの問題をちゃんと考える)
- 開発速度。ゲームロジック以外のところは極力ありものを使って時間を節約する(アドベントカレンダーに間に合わせる)
サーバはScala + Akka、クライアントはUnity 4 + C# 4で実装
通信にはTCP/IP、マーシャリングにはApache Thrift(のプロトコル部分のみ)を利用
ワールドの状態保持のため、ScalaSTMとAkka Agentによる共有メモリを利用
Akka IOとIterateeを利用した関数型のソケットプログラミング
今回はPlay・Scalaアドベントカレンダーなので、サーバをScalaとAkkaで実装するにあたってのポイントをいくつかご紹介したいと思います。
Marshallling messages between Scala and C#, with Apache Thrift
Apache Thriftは、複数のプログラミング言語をまたがるようなシステムや、いわゆるRPCを活用したアプリケーションを開発するためのフレームワークです。
Thriftにはサーバの機能も含まれているのですが、今回はサーバにAkkaとAkka IOを使ったTCP/IPをしゃべるサーバを自前で実装することにしたのでそれは使いません。単にメッセージをプログラミング言語間でやりとりするためにマーシャリングする手段として使います。
なお、マーシャリングに使えるシリアライズ方式としてGoogleのProtocol BufferやJSONも考えられましたが、今回はほどほどの速度と各言語向けのバインディングを公式に用意しているという点を両立しているのがThriftだったので、Thriftを採用しました。
Thriftではインタフェース定義を.thriftという拡張子のファイルに独自のDSLで記述します。この場合、インタフェース定義というのは、プログラミング言語をまたいで送受信するメッセージの形式を定めたものです。そして、.thriftファイルをThriftのジェネレータに渡すと、複数のプログラミング言語向けのコードを一括生成してくれます。今回はScalaとC#間でメッセージをやり取りしたかったので、うってつけですね。
例えば、今回はネットワークゲームへプレイヤーが参加するときのJoinメッセージは以下のように記述しました。
struct Join {
1: string name,
}
structとあるように、Thriftでは独自の構造体を自由に定義できます。1: stringとありますが、これは一番目の要素がstring型であるということを意味しています。このように番号をつけておくことで、要素が削除されてもそれを欠番にすることで前方互換性を保つことができます。
なお、これをジェネレータにかけるとかなりやばい行数のソースが生成されるので、工数削減したった感が出ます!
これでインタフェース定義に基づいたDTOとプロトコルの実装が各言語向けに生成されるので、それを応用してマーシャリングを行います。なお、Scala向けの公式のジェネレータは存在しないので、今回はJava向けのジェネレータを使ってJavaソースを生成、それをScalaから呼び出すことにしました。こういう時にJavaとの親和性が高いのは助かりますね。
例えば、Scalaでは以下のようなコードになります。
アンマーシャリング(バイトデータ=>メッセージ)
import java.io.ByteArrayInputStream import org.apache.thrift.protocol.TBinaryProtocol import org.apache.thrift.transport.TIOStreamTransport val bais = new ByteArrayInputStream(bytes) val transport = new TIOStreamTransport(bais) val protocol = new TBinaryProtocol(transport) val join = new message.Join() join.read(protocol)
マーシャリング(メッセージ=>バイトデータ)
val baos = new java.io.ByteArrayOutputStream()
val transport = new TIOStreamTransport(baos)
val protocol = new TBinaryProtocol(transport)
val join = new message.Join("mumoshu")
protocol.write(join)
baos.toByteArray
Writing a MMO server with Akka IO and Iteratee's
Akka IOは、低レベルのIO処理を詳細に気を取られずにAkkaと使って非同期IOを伴う計算を実装するためのモジュールです。内部的にはJava NIOをラップした機能が色々と揃っていて、ソケットにも対応しています。今回は手続き的なソケットプログラミングの代わりに、Akka IOを使って関数型プログラミング的に通信部分を書きました。
サーバのコードを要約すると、以下の様な感じになります。(実際のコードはもっと汚いので心してお読みください( ^ω^ ))
// サーバのインスタンス(Actor)を返す
def createServer(port: Int = 1234) = actor(new Act with ActorLogging {
// ゲーム世界の状態を保持するAkka Agent(後述)
// 状態を保持するために単にvarを使わないのは、synchronizedやロックによる排他制御を行わず、もっと安全で高速な実装にするためです。
val world = Agent(new World)
// IOを処理するIterateeを保持するMapです。今回は1ソケットにつき1つのIterateeが割り当てられます
val state = IO.IterateeRef.Map.async[IO.Handle]()(context.dispatcher)
val address = new InetSocketAddress(port)
// IOの詳細はAkkaのIOManagerが見てくれます
val socket = IOManager(context.system) listen address
// クライアントから送信されてきたバイト列を大まかに分割します。
// 今回は「フレーム」という単位でメッセージが梱包されたものがバイトデータにマーシャリングされて送られてくるものとします。
// フレームには、ヘッダとボディがあり、ヘッダにはボディのバイト長、ボディにはThriftでマーシャリングされたメッセージのバイトデータが含まれます。
// ここでは、バイト列をフレームに戻してあげます
val FrameDecoder: IO.Iteratee[ByteString] = for {
bodyLenBytes <- IO.take(4) // まずヘッダを読み取り
bodyLen = bodyLenBytes.iterator.getInt // ボディの長さを計算
bodyBytes <- IO.take(bodyLen) // 長さのぶんだけバイトデータを読み取りボディとする
} yield {
bodyBytes // ボディを返す
}
// お待ちかねのIterateeです。
// クライアントから送信されてきたバイトデータをとにかくこのIterateeに食わせます。
// すると、上記のFrameDecoderによりフレームのボディ部分だけが1つずつ返ってくるので、ボディをThriftでアンマーシャリングして処理します
def processBytes(handle: IO.SocketHandle): IO.Iteratee[Unit] = {
IO repeat {
for {
bodyBytes <- FrameDecoder
} yield {
processBodyBytes(handle, bodyBytes)
}
}
}
// IO.SocketHandleはソケットの参照で、接続先を識別するIDを得たり、バイトデータを読み書きするために使えます。
// bytesは読み込んだチャンクです。1チャンクにシリアライズされた1個以上のメッセージが含まれます。
def processBodyBytes(handle: IO.SocketHandle, bodyBytes: ByteString) {
val id = handleToId(handle)
val message = deserialize(bodyBytes)
atomic { txn =>
message foreach {
// 新規プレイヤーの参加
// SEND(クライアントから一方的に通知するだけ) の通信例です。
case m: message.Join =>
// ゲームに参加中のプレイヤーが増える、つまりゲーム世界の状態が書き換わります
world send {
_.join(new Player(id = id, name = m.name))
}
// 接続中の他のプレイヤーに、新規プレイヤー参加を通知します(詳細は省略)
publish(m)
// 任意のプレイヤーの位置確認を行うためのメッセージ。
// REQUEST(クライアントが要求して)・RESPONSE(サーバが返答する)の通信例です。
case m: message.GetPosition =>
// まず要求を理解して
val targetId = strToId(m.id)
world.get().getPositionById(targetId) foreach { p =>
// 返答をバイト列にマーシャリングしてソケットに書き込みます
val rep: thrift.Position = new serializers.thrift.Position(targetId.str, p.x.toFloat, p.z.toFloat)
val repBytes = protocol.serialize(rep)
handle.asWritable.write(FrameEncoder(repBytes))
}
}
}
}
// ここまでが通信部分の実装でした。バックエンドがTCP/IPであることを意識させないコードになっていたと思います。
// ここからActorの振る舞いを記述します。
become {
// 新しい接続要求がきたら、それを受け付けます。
case IO.NewClient(server) ⇒
val socket = server.accept()
socket.write(FrameEncoder(ByteString("You are connected!")))
// このクライアントとの通信は先ほど定義したprocessBytes Iterateeが担当します
state(socket) flatMap (_ => processBytes(socket))
// クライアントからバイトデータのチャンクが送られてきたら
case IO.Read(handle, bytes) ⇒
// Iterateeに食わせます。後のことはIterateeさんにまかせた。
state(handle)(IO Chunk bytes)
// ソケットが閉じたら
case IO.Closed(socket, cause) =>
log.debug("Socket closed: " + cause)
// Iterateeを止めます
state(socket)(IO EOF)
state -= socket
case unexpected =>
log.debug("Unexpected message: " + unexpected)
}
} // おつかれさまでした
What is ScalaSTM
STMとはSoftware Transactional Memoryの略で、その名の通りミドルウェアの機能に頼らず言語側で共有メモリに対するアトミックな操作を可能とするための技術です。Scalaでは今回使ったScalaSTMが有名です。AkkaのSTM絡みの機能もScalaSTMを使って実現されていたります。
アトミックな操作を実現するためには、共有メモリへの変更が競合するときにロールバックする、という考え方が必要になります。ScalaSTMでは、共有メモリとなる変数を独自のコンテナでラップして、そのコンテナの操作を記録、競合が発生しない場合にのみ記録した操作をコミットする、というアイデアが使われているようです。(ツッコミお待ちしてます!)
使い方はこんな感じです。ScalaSTMは他のライブラリに一切依存しない、単一のJARからなるライブラリです。
ScalaSTMには以下の機能があります。
- 複数のSTM実装をサポートするような統一的なAPI
- CCSTMをベースとしたSTMのリファレンス実装
- スケーラビリティと並列性に優れたSetとMap(高速なスナップショット機能を含む)
- Refというデータ構造を用意。イミュータブルなオブジェクトとRefを利用すると、
で実現できます。
- 複数スレッドやアクターからアクセス可能な共有メモリを
- synchronizedなし
- デッドロックやレースコンディションなし
- いい感じのスケーラビリティ
また、JavaのwaitやnotifyAllより安全な代替え機能が提供されています。
import scala.concurrent.stm._
val count = Ref(0)
atomic { implicit txn =>
// read
val c = count()
// write
count() = c + 1
}
Refでラップした共有メモリを、atomicブロック内で読み書きするだけというお手軽さです。
もう少し詳しいコード例は公式のSyntax Cheat Sheetを参照してください。
Reactive, asynchronous shared memory manipulation with Akka Agent
AgentはScalaSTMをベースに、共有メモリに非同期処理の機能を持たせたものです。
実用上は、Ref同様にアトミックな読み書きができるだけでなく、書き込みをキューイングさせて非同期的に処理したり、書き込みを待ってから読み込みするといった機能があります。トランザクション中にメインスレッドで行いたくないような重い計算やIOを行う場合はAgentを使うといいしょう。
使い方もScalaSTMとそうかわりなく、
import akka.actor.ActorSystem
import akka.agent.Agent
implicit val system = ActorSystem("app")
val counter = Agent(0)
// 読み書き
counter send { _ + 1 }
// 読み
val c = count()
sendで関数として表現されたアトミックな操作をAgentに送る。送った後はAgentがそれを処理するのに任せる、という考え方です。
読み込みはRefと同じですね。
なお、STMはトランザクション外での書き込みができませんが、Agentはトランザクション外でも書き込み可能です。atomicブロック外なら暗黙的にトランザクションを開始し、内であれば既にあるトランザクションに参加してくれるだけです。一度に一箇所の共有メモリをatomicに操作できればいいだけであれば、Agentのほうが手軽です。また、非同期処理の機能はAkkaと非常に相性がいいですね。
Conclusion
Scala + AkkaでMMOのサーバを記述するための技術要素を抜粋して解説しました。Scala、Akka、Thriftなどの既存技術を組み合わせることで、意外と組めてしまいそうな感じが伝わったでしょうか?
また、今回解説してみて気づきましたが、このレベルであれば、MMOのサーバに限らずリアルタイムなネットワークアプリケーションであれば普通に必要な機能ですね。今後リアルタイムWebが来るのであれば、Web業界でこれら技術が普通に使われる日も来るかもしれません。
この記事ではカバーできていませんが、実サービスになるとさらに、
- シャーディング(Ultima Onlineの時代から有名な、エリア毎にマップを分ける方式)
- クラスタリング(ゲーム世界の各部分を複数のノードが合同で受け持ち、可用性を高くする)
- ゲームデータの保存の問題(どのタイミングで、何を保存するか)
以上、Play or Scala Advent Calendar 2012、25日めでした。
Thank you
最後に。本年はPlay、Scalaについて特に日本で大きな動きがあった一年だった思います。これも全国でPlay、Scalaのコミュニティを運営されている皆さん、勉強会を開催・参加している皆さん、twitter上で毎日のようにPlayについて熱い議論を交わしている皆さんをはじめ、日本の素晴らしいPlay or Scalaユーザの皆さんのお陰だと思います。
日本のPlay・Scalaユーザの皆さんに、
PlayやScalaを日々利用している一エンジニアとして、心より感謝申し上げます。
本年はお世話になりました!
来年もよろしくお願いいたします!!
P.S. 本当はデモを公開したかったのですが、Unity4にはまって間に合いそうにないので後日(´・ω:;.:...