
私がScalaを学んで感じたこと
Scalaという言語は、データ処理を分散して実行する必要のある企業で導入されています。例えば、LINEや銀行のオンラインシステムでデータパイプライン処理部分に使われています。まさに言語が開発された動機と合致することだと思います。
また巷では、学習コストが高く難しい言語だと思われています。この記事では、私が学習して感じた点を紹介します。単にScalaという言語を学んだだけでなく、私の中でいろいろなことが整理できました。これから新たに言語を学ぼうとしている方の参考になれば幸いです。
私がScalaに興味を示した理由
- 並行並列処理に優れているといわれている。
- 私自身、更なる処理速度を求めるには必要な考え方だと思っており、その技術を学んでみたいと思った。
- Scalaは、アクターシステムという考え方が根底にあり、直感的にWEBシステムとの親和性が高いのではと思った。
- アクターシステムはスレッドセーフなデータ共有が簡単に実現できる。
- Scalaの並行並列処理を提供するAkkaというライブラリやPlayというWEBシステム構築用フレームワークが強力である。
- Java VM上で動作し、Javaのライブラリをそのまま使用できる。
- 既存の資産をそのまま使用できるので非常に開発効率が良いことが予想できる。
Scalaの特徴
勉強した際に知った特徴は下記の点です。
-
マルチパラダイム言語であること:
- オブジェクト指向の考え方を土台に、関数型プログラミングの要素を取り入れており、プログラムをすっきりと記述できます。
-
型安全なプログラミング:
- 型の構成を重視したスタイルで処理を構築できるため、型安全なコードを書けます。これにより、コンパイル段階で多くのバグを検出できます。
- Scalaのmatch文やcase文はリスコフの置換原則を用いた型比較ができるので、処理が簡素になります。
-
優れた型推論:
- 型推論が優れているため、自明な型定義を省略でき、コードの冗長な部分が削減され、理解しやすくなります。
-
ライブラリ管理ができるビルドツール:
- Scalaのコンパイル管理には、sbt (Simple Build Tool) というツールがあり、このツールを使ってライブラリ管理が可能です。
- このツールがあればコンソール上でビルドすることが苦労なく可能となり、ビルドの定義もScalaの構文で記述できます。
- このツールの定義ファイルで、処理で使用するオブジェクトのインジェクションを簡単に行うことができました。このことにより、処理で使用するDBを簡単に入れ替え可能であったのが驚きでした。
-
並行並列処理の考え方:
- Scalaではアクターシステムという考え方を取り入れており、並行並列処理の記述が容易に可能です。
- 従来はScalaの標準ライブラリで提供していたアクターシステムですが、より強力な機能を持つAkkaというライブラリ群を用いて並行並列処理を構成する方法がとられています。
- Akkaとは、何かの略称ではなく、リアクティブプログラミングを支援するフレームワークで、アクターモデルを使って構築されています。 ※以降の章でAkkaでアクターを使ったScalaのコーディング例を示します。
Akkaのアクターシステムについて
Akkaというライブラリは、以下の通り言い表せます。
- Akkaにおけるアクターの利用
- Akkaはアクターモデルを提供しています。この考え方に基づいて以下の要素(2.~4.)を構築しています。
- 並行処理の単位としてアクターを使用します。
- アクターは軽量なエンティティであり、物理スレッドとは異なります。
- スレッドプール上でアクターを動作させて、スレッドを効率的に管理します。
- リアクティブシステムの構築
- リアクティブシステムを構築するためのライブラリです。
- 特性として「応答性」、「レジリエンス」、「スケーラビリティ」、「メッセージ駆動」を実現しています。
- アクターモデルを導入しています。
- アクターモデルの役割
- 並行性や非同期処理を効率的に管理するための概念です。
- リアクティブシステムの構築に適しています。
- アクターとスレッドの管理
- アクターがメッセージを受信すると、スレッドプールからスレッドが割り当てられます。
- 多数のアクターが存在しても、少ない物理スレッドでリソースを効率的に利用可能です。
- アクターの数が多くても物理リソースが圧迫されることはありません。
ここでは、Akkaライブラリを使ってプログラム間で通信する例を以下に示します。Akkaのアクターシステムを使ってみたい人に1つの例として役に立てれば幸いです。
また上記ライブラリはScalaと同様、Lightbendという会社が作成管理しています。なおScalaはマーティン・オーダスキー教授が開発しました。
説明に使用する処理の概略
以降で説明する処理は以下の文脈を持つものとします。
- 「非アクタープロセス」が使用するアクターを生成し、中継アクターへ開始メッセージを送信し、中継アアクターからの実行結果を待つ。
- 「中継アクター」は、開始メッセージ受信後に「蓄積アクター」へ蓄積データを送信し、結果を取得し応答を「非アクタープロセス」へ返す。
- 「蓄積アクター」は蓄積データを保持し、結果取得メッセージを受けて結果を返す。
上記文脈に出てくる処理部を説明します。
- 「非アクタープロセス」:いわゆるコンソールから実行されるプログラムに当たり、今回作成する処理のなかで要点となる部分は2つあります。1つは必要なアクターの生成処理です。また2つ目はアクターへ実行トリガーを送信し、アクターから結果を受信することです。
- アクター
- 「中継アクター」:主にアクター外からのメッセージを受け取る処理を担当する部分です。
- 「蓄積アクター」:このアクターはアクター間の状態(データ)の蓄積と参照を提供する部分です。
- 「ログ出力アクター」:このアクターはログを出力する場合に使用されます(Akkaライブラリにあらかじめ用意されています)。
この構成に基づいた処理で以下のことが説明できていると思います。
-
並行並列処理としてアクターを用いる方法。
-
アクターではない処理からアクター用メッセージを送信する方法。
-
アクター間でメッセージを送受信する方法。
-
アクター内で情報を蓄積する方法と蓄積したデータをやり取りする方法。
-
アクターのメッセージを使ってデータの蓄積と取得の順序を制御する方法。
-
上述の件に関連して、メッセージのやり取り(メッセージの受信順序)で排他制御と同等の効果を出す方法。
上記処理部でWebシステムのMVC構成を例に出すなら、以下の構成に該当するものとみなして処理を構築しています。
- 「非アクタープロセス」:Viewの位置付け
- 「中継アクター」:Controllerの位置付け
- 「蓄積アクター」:Modelの位置付け
シーケンス図について
sequenceDiagram participant Main as 非アクタープロセス participant Relay as 中継アクター participant Storage as 蓄積アクター participant Logger as ログ収集アクター
既生成アクター activate Main Main-->>Main: アクターシステムを取得!(STT) Main->>Storage: 蓄積アクターを生成 (A01)
起動パラメータ(initialId: Int, maxData: Int) activate Storage deactivate Storage Main->>Relay: 中継アクターを生成 (A02)
起動パラメータ(蓄積アクター) activate Relay deactivate Relay Main->>Logger: ログ出力 (L01) activate Logger deactivate Logger Main->>Relay: メッセージ送信:Start (S01) activate Relay Relay->>Storage: メッセージ送信:AddData("User data 1") (S02) deactivate Relay activate Storage Storage->>Relay: 応答メッセージ受信:AddDataResponse(success = true) (R02) deactivate Storage activate Relay Relay->>Logger: ログ出力 (L02) deactivate Relay activate Logger deactivate Logger activate Relay Relay->>Storage: メッセージ送信:AddData("User data 2") (S03) deactivate Relay activate Storage Storage->>Relay: 応答メッセージ受信:AddDataResponse(success = true) (R03) deactivate Storage activate Relay Relay->>Logger: ログ出力 (L03) deactivate Relay activate Logger deactivate Logger activate Relay Relay->>Storage: メッセージ送信:GetData (S04) deactivate Relay activate Storage Storage->>Relay: 応答メッセージ受信:GetDataResponse(dataList) (R04) deactivate Storage activate Relay Relay->>Logger: ログ出力 (L04) deactivate Relay activate Logger deactivate Logger activate Relay Relay->>Main: 応答メッセージ受信:StartResponse(dataList) (R01) deactivate Relay Main->>Logger: ログ出力 (L05) activate Logger deactivate Logger Main-->>Main: アクターシステムを終了 (END) deactivate Main
シーケンス説明
(STT) アクターシステムを取得。
(A01) 非アクタープロセスが蓄積アクターを生成。
(A02) 非アクタープロセスが中継アクターを生成。
(L01) 非アクタープロセスが処理開始を指すメッセージをログに出力。
(S01) 非アクタープロセスが中継アクターにStartを送信。
(S02) 中継アクターが蓄積アクターにAddData("User data 1")を送信。
(R02) 蓄積アクターがデータを追加し、AddDataResponse(success = true)を中継アクターに返信。
(L02) 中継アクターが結果をログに出力。
(S03) 中継アクターが蓄積アクターにAddData("User data 2")を送信。
(R03) 蓄積アクターがデータを追加し、AddDataResponse(success = true)を中継アクターに返信。
(L03) 中継アクターが結果をログに出力。
(S04) 中継アクターが蓄積アクターにGetDataを送信。
(R04) 蓄積アクターがGetDataResponse(dataList)を中継アクターに返信。
(L04) 中継アクターがdataListをログに出力。
(R01) 中継アクターが非アクタープロセスにStartResponse(dataList)を送信。
(L05) 非アクタープロセスが受信内容をログに出力。
(End)アクターシステムを終了
上記シーケンス図で縦線が太い部分は、動作スレッドが物理的にリソースを保持している期間を示しています。 非アクタープロセスとアクターが動作するスレッドは物理的に別なので、非アクタープロセスは中継アクターにメッセージを送信してからメッセージを受信するまでは別の作業を実施させることが可能です。 スレッドの空き時間に別の作業を行えるようにすることは、スレッドに対する作業効率を上げることにつながります。
コーディング例:
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}import akka.actor.typed.scaladsl.{Behaviors, AskPattern}import akka.util.Timeout
import scala.concurrent.Futureimport scala.concurrent.duration._import scala.util.{Failure, Success}// もしアクタ-外からアクターのログシステムにメッセージを出す際には以下のロガーをインポートして使用する。//import org.slf4j.LoggerFactory
// アクターで送受信するメッセージの定義//*************************************sealed trait ReceptionToRellaycase class Start(replyTo: ActorRef[startResponse]) extends ReceptionToRellaycase class startResponse(data: List[(Int, String)])case class AddData(data: String, replyTo: ActorRef[AddDataResponse]) extends ReceptionToRellaycase class AddDataResponse(success: Boolean) extends ReceptionToRellaycase class GetData(replyTo: ActorRef[GetDataResponse]) extends ReceptionToRellaycase class GetDataResponse(dataList: List[(Int, String)]) extends ReceptionToRellay
sealed trait ReceptionToStoragecase class AddDataToStorage(data: String, replyTo: ActorRef[AddDataResponse]) extends ReceptionToStoragecase class GetDataFromStorage(replyTo: ActorRef[GetDataResponse]) extends ReceptionToStorage
//---------------------------------------------------// 上記定義でアクターの各受信ポイントを示すトレイト// (JAVAでいうインタフェースのようなもの)を定義している。// このトレイトをアクターのメッセージ受信ポイントで// 用いることで、そのアクターで必要なメッセージが何かを// 明確にすることができ、コンパイル時に対応不足の// メッセージが判明したり、不要なメッセージを受信することが// なくなるため、冗長な処理を記載しなくて済むようになる(リスコフの置換原則の応用)。// またアクターに受信するメッセージを増やす際に処置抜けを防いだりできる。//---------------------------------------------------
//*******************// 中継アクターの処理//*******************object RellayActor { // applyというメソッドはjavaでいうところの // ファクトリメソッドのようなものだと思ってください。 // このオブジェクトを実行すると、結果として、Behavior型のオブジェクトを渡すことになります。 def apply(storageActor: ActorRef[ReceptionToStorage]): Behavior[ReceptionToRellay] = Behaviors.setup { context => updatedBehavior(RelayState(None, storageActor)) }
case class RelayState(replyTo: Option[ActorRef[startResponse]], storageActor: ActorRef[ReceptionToStorage])
def updatedBehavior(state: RelayState): Behavior[ReceptionToRellay] = Behaviors.receive { (context, message) => // このアクターが受信したメッセージの処理を各メッセージ事に記述します。 message match { // 非アクタープロセスからのメッセージを処理 case Start(replyTo) => context.log.info("Received Start message") val newState = state.copy(replyTo = Some(replyTo)) // 蓄積アクターに蓄積データのメッセージを2回送信 state.storageActor ! AddDataToStorage("User data 1", context.self) // 蓄積アクターへのメッセージ送信、シーケンス説明(S02) state.storageActor ! AddDataToStorage("User data 2", context.self) // 蓄積アクターへのメッセージ送信、シーケンス説明(S03) // 蓄積アクターに蓄積データを要求 state.storageActor ! GetDataFromStorage(context.self) // 蓄積アクターへのメッセージ送信、シーケンス説明(S04) updatedBehavior(newState)
// 蓄積アクターからの応答(蓄積依頼の結果) case AddDataResponse(success) => context.log.info(s"Add data success: $success")// 蓄積アクターからのメッセージ受信、シーケンス説明(L02),(L03) Behaviors.same
// 蓄積アクターからの応答(蓄積データ取得) case GetDataResponse(dataList) => context.log.info(s"Retrieved data: $dataList")// 生成したアクターからのメッセージ受信、シーケンス説明(L04) // 保存されたreplyToに応答を返す state.replyTo.foreach(_ ! startResponse(dataList)) // Main処理へのメッセージ送信、シーケンス説明 (R01) updatedBehavior(state.copy(replyTo = None)) } }}
//*******************// 蓄積アクターの処理//*******************object StorageActor { // Scalaではapplyメソッドはそのクラスのファクトリメソッドと同じ動作となる。 def apply(initialId: Int, maxData: Int): Behavior[ReceptionToStorage] = // 以降で定義するアクターの振る舞い(どのようなメッセージを受けて処理するか)とか、 // そのアクターで保持する状態を定義する。 Behaviors.setup { context => // アクターの状態 var stateId = initialId var dataList = List.empty[(Int, String)]
Behaviors.receiveMessage { case AddData(data, replyTo) => // データの追加処理 if (dataList.size < maxData) { dataList = dataList :+ (stateId, data) stateId += 1 // 状態IDのインクリメント replyTo ! AddDataResponse(success = true)// 処理正常時、送信元アクターへのメッセージ返信、シーケンス説明(R02),(R03) } else { replyTo ! AddDataResponse(success = false)// 処理異常時、送信元アクターへのメッセージ返信、シーケンス説明(R02),(R03)に該当 } Behaviors.same
case GetData(replyTo) => // データの取得処理 replyTo ! GetDataResponse(dataList)// 送信元アクターへのメッセージ返信、シーケンス説明(R04) Behaviors.same } }}
//***********// メイン処理//***********object Main extends App {
// StorageActorの初期状態のIDと最大データ数を定義 val initialId = 0 val maxData = 100
val system: ActorSystem[ReceptionToRellay] = ActorSystem(Behaviors.setup[ReceptionToRellay] { context => // シーケンス説明番号(STT) // 蓄積アクターを生成 val storageActor = context.spawn(StorageActor(initialId,maxData), "StorageActor") // シーケンス説明番号(A01)
// 中継アクターを生成 val rellayActor = context.spawn(RellayActor(storageActor), "RellayActor") // シーケンス説明番号(A02)
// タイムアウトの設定 implicit val timeout: Timeout = 3.seconds implicit val scheduler = context.system.scheduler import context.executionContext // FutureのExecutionContext
// 中継アクターにメッセージを送信し、Futureで応答を受け取る val responseFuture: Future[RellayResponse] = rellayActor.ask(ref => Start(ref))
// 応答の処理 responseFuture.onComplete { case Success(RellayResponse(data)) => context.log.info(s"Received data in Main: $data")// シーケンス説明番号(L05) // アクターシステムを終了 context.system.terminate()// シーケンス説明番号(End)
case Failure(exception) => context.log.error(s"Failed to receive response: ${exception.getMessage}")// シーケンス説明番号(L05) // エラー時にもアクターシステムを終了 context.system.terminate()// シーケンス説明番号(End) }
// 以降に、他の作業を実行することができます(ここでは特に何もしていません)
Behaviors.empty }, "MainSystem")}
上記処理を実行したログ出力イメージ
[INFO] Received Start message[INFO] Add data success: true[INFO] Add data success: true[INFO] Retrieved data: List((1, "User data 1"), (2, "User data 2"))
なおこの処理実行時にはsbtに以下のライブラリ指定が必要です。
libraryDependencies += "com.typesafe.akka" %% "akka-actr-typed" % "2.8.0"
Scalaを学んでいて再発見したり、学んだ点
私がScalaを通して学んだことを以下に提示します。
- 関数型言語の特徴が理解できました。
- 関数型言語の「圏論」や「モナド」について学ぶことができました。
- モナド構造を持つオブジェクトはどの言語にも取り入れられており、非常に有益な機能を提供している点が理解できた。
- Akkaで取り入れている理論で並行並列実行するための理論と、WindowsやWebの動作で核となるメッセージドリブンの動作原理の関係性が私の頭の中でつながりました。
- 上記記事の内容以外にPlayというWebフレームワークを使ってWebプログラムを作ってみましたが、ライブラリ自身にWebサーバ機能も組み込まれており、そのままプログラムを実行することができます。これは非常に便利に感じました。また、このライブラリにはH2というオンメモリDBが含まれており、DB操作が非常に楽にできます。
- 並行処理と並列処理の違いは、複数のタスクをどのように扱うかという点で異なっています。
- 並行処理(Concurrency)は、複数のタスクが同時に進行しているように見せることに焦点を当てた処理です。
- 並列処理(Parallelism)は、OSやハードウェアの助力を得て、複数のタスクを文字通り並列に動作させることを指します。
Scala勉強のため参照したサイト
主に、以下のサイトを参考にしました。他にもためになるサイトはたくさんありますので、自分に合ったサイトを探してみてください。
A programming language that scales with you: from small scripts to large multiplatform application ⧉
Learn Scala ⧉
Scastie ⧉
Play 2.6.x ドキュメント ⧉
sbt:A simple build tool ⧉
AKKA:Build, Operate, and Secure Distributed Applications ⧉
Learning Akka Typed from Classic ⧉
Tour of Akka Typed: Protocols and Behaviors ⧉
皆さんも既存の言語だけでなく、新しい言語に挑戦してみてはいかがですか?きっと今まで得た知識の整理ができてよいのではないかと思います。