「Apache Spark」×「Scala」で分散処理入門

Apache Sparkとは

Apache Sparkとは何かというとオープンソースの分散処理フレームワークです。

分散処理というとhadoopが有名ですが、hadoopがhdfsと呼ばれる独自のファイルシステムを介して 処理を実行するのに対して、
Sparkは「RDD(Resilient Distributed Dataset)」と呼ばれる耐障害耐性分散可能なデータ・セットをオンメモリで実行できるために、 高速な分散処理が実現できます。

Apache Sparkの構成

sparkの構成は以下のようになっています。

  • Spark Core Sparkの基本機能を提供します。RDDと呼ばれる、耐障害耐性分散可能なデータ・セットを提供します。
  • Spark Streaming データストリームの処理を提供します。ツイッターからリアルタイムデータの取得などに使えます。
  • Spark SQL 構造化データに対するアクセス機能を提供します。hiveSQLやクエリを使ってJSONなども扱えます。
  • Mlib 汎用的な機械学習ライブラリを取得します。word2vecを使った類似後分類とかできます。
  • Graph X グラフ理論に基づく計算を提供します。ソーシャルグラフを扱う場合に役立ちます。

環境構築

ではサンプルアプリケーションの構築に入ります。 以下の環境で構築しました。Sparkアプリケーションは

  • scala
  • java
  • python
  • R言語

の4種類で記述することができます。 今回は、Sparkの実装にscalaが用いられていることもあり、 一番ドキュメントが豊富なscalaを用いることにしました。 また、ビルドについては比較的簡単そうなsbtを使うことにしました。 実行環境は下記のとおりです。

  • MAC OS X 10.11.16
  • scala 2.11.8
  • spark 2.1.1
  • sbt 0.13.13
  • メモリ: 8GB
  • CPU: 2.7 GHz Intel Core i5

インストール

今回使用するものは全てhome brewで入れました。

$ brew install apache-spark$ brew install scala$ brew install sbt

また、ここで深くは触れませんが、上記インストールにより3種類の対話型実行環境も同時にインストールされるので デバックの際は結構便利です。

  • spark-shell (Scala)
  • pyspark (python)
  • sparkR (R言語)

余談ですが、

$ brew install spark

でも通ってしまうことが注意です。
関係のないメールソフトが入ります
https://sparkmailapp.com/ja

その後、Sparkのホームディレクトリの環境変数を設定しておきます

export Spark_HOME=Spark_HOME-/usr/local/Celler/apache-spark/x.y.z

※ x.y.zはインストールするバージョンによって変わります

実装

ビルド環境の構築

最初にsbtのビルド環境を整えます。
ビルドが成功するかを検証するためにワードカウントプログラムを作ります。
今回は以下のようなディレクトリ構成で作りました。
形態素解析エンジンにはscalaと親和性の高いkuromojiを用いました。

ディレクトリ構成

WordCount/├── build.sbt(ビルド方法を定義するsbtファイル)├── input.txt(ワードカウントする対象の入力ファイル)├── output/ (ワードカウントの結果)│├── project/ (sbtの追加設定を入れるファイル)│   └── assembly.sbt (sbtのプラグイン)└── src/   └──main/     └── scala/     └── jp/       └── excite/        └── news/          └── WordCountApp.scala

build.sbt

今回は下記のビルド設定で行いました。 sbtビルドファイルの内容としては

  • アプリケーションの名前
  • アプリケーションのバージョン
  • 使用するscalaのバージョン
  • ライブラリの依存関係。 今回使用するSpark関係のライブラリ、形態素解析エンジンのkuromojiなどが該当。
  • 通常のsbtに加えて、プラグインのsbt-assemblyに関する設定
  • アプリケーションのエントリーポイント用のクラス名

が下記のように記載されてます。

name := "WordCountApp"version := "1.0.0"scalaVersion := "2.11.8"resolvers += "Atilika Open Source repository" at "http://www.atilika.org/nexus/content/repositories/atilika"libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"libraryDependencies += "org.atilika.kuromoji" % "kuromoji" % "0.7.7"assemblyMergeStrategy in assembly := {  case PathList("javax", "servlet", xs @ _*)         => MergeStrategy.first  case PathList(ps @ _*) if ps.last endsWith ".properties" => MergeStrategy.first  case PathList(ps @ _*) if ps.last endsWith ".xml" => MergeStrategy.first  case PathList(ps @ _*) if ps.last endsWith ".types" => MergeStrategy.first  case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first  case "application.conf"                            => MergeStrategy.concat  case "unwanted.txt"                                => MergeStrategy.discard  case x =>    val oldStrategy = (assemblyMergeStrategy in assembly).value    oldStrategy(x)}mainClass in assembly := Some("WordCountApp")

sbt-assemblyに関して

プラグインの読み込みは下記を記載するファイル(project/assembly.sbt)を置くだけでOKです。

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")

これを使用することで、生成するjarファイルに依存するパッケージも全部入るために
並列処理を行うためのクラスタ環境へのデプロイ(spark-submit)が1つのファイル配布のみで済みます。

今回はクラスタ環境上で実行はしないですが、
これをやらないとspark-submitをする際のオプションが手間になります。

ただし、sbt-assemblyを使う際にspark内部でパッケージのコンフリクトには注意が必要です。
うまくマージ設定をしてあげないとビルドに失敗です。
そこでコンフリクトを解消するMergeStrategyを使用してあります。

設定はgithubの内容をコピって使用しております。
https://github.com/sbt/sbt-assembly

assemblyMergeStrategy in assembly := {  case PathList("javax", "servlet", xs @ _*)         => MergeStrategy.first  case PathList(ps @ _*) if ps.last endsWith ".properties" => MergeStrategy.first  case PathList(ps @ _*) if ps.last endsWith ".xml" => MergeStrategy.first  case PathList(ps @ _*) if ps.last endsWith ".types" => MergeStrategy.first  case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first  case "application.conf"                            => MergeStrategy.concat  case "unwanted.txt"                                => MergeStrategy.discard  case x =>    val oldStrategy = (assemblyMergeStrategy in assembly).value    oldStrategy(x)}

ワードカウント

以下にワードカウントを行うサンプルプログラムを記載します。
sparkのプログラムは実装元のscalaが関数型言語ということもあり、
処理は関数オブジェクト(クロージャー)を使って記述していきます。

package jp.excite.news import java.util.regex.{Matcher, Pattern}import scala.collection.convert.WrapAsScala._import org.apache.spark.SparkConfimport org.apache.spark.SparkContext._import org.apache.spark.SparkContextimport org.atilika.kuromoji.Tokenizerimport org.atilika.kuromoji.Token object WordCountApp{def main(args: Array[String]) {    //スパークの環境設定    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("WordCount App")    val sc = new SparkContext(sparkConf)    //kuromojiのトークナイザ    val tokenizer = Tokenizer.builder.mode(Tokenizer.Mode.NORMAL).build()    //テキストファイルから1行ずつ読み込み。名詞を配列に分解する。    //テキストファイルからRDDオブジェクトを取得する。        val input = sc.textFile("input.txt").flatMap(line => {        val tokens : java.util.List[Token] = Tokenizer.builder().build().tokenize(line)        val output : scala.collection.mutable.ArrayBuffer[String] = new collection.mutable.ArrayBuffer[String]()        tokens.foreach(token => {            if(token.getAllFeatures().indexOf("名詞") != -1) {      output += token.getSurfaceForm()        }})        output// return    })    //ワードカウントを行う。数える名詞をキーにし、キーを元に加算処理を行う。    val wordCounts = input.map(x => (x, 1L)).reduceByKey((x, y)=> x + y)    //降順に単語を列挙して出力する。    val wordCounts = input.map(x => (x, 1L)).reduceByKey((x, y)=> x + y)    val output = wordCounts.map( x => (x._2, x._1)).sortByKey(false).saveAsTextFile("ouput")    }}

詳しくは公式のドキュメントを見たほうが早いですが、このプログラムの大体の流れは以下の通りです
https://spark.apache.org/docs/latest/quick-start.html

  1. input.txt1行ずつ入力 (ex 今日は晴れ。明日も晴れ。)
  2. 読み込んだ文字列を形態素解析で分解し名詞のみ配列に。(ex [今日, 晴れ, 明日, 晴れ])
  3. 数えるために名詞をキーとした(名詞, 1)タプル(組み)に変換 (ex [(今日,1), (晴れ,1), (明日,1), (晴れ,1)])
  4. 同じ名詞(キー)に対して加算をする。(ex [(今日,1), (晴れ,2), (明日,1)])
  5. キーでソートするためにキーと値を逆にする。(ex [(1,今日), (2,晴れ), (1,明日)])
  6. キーでソートする(デフォルトは昇順) (ex [(2,晴れ), (1,今日) ,(1,明日)])
  7. 上位10個のデータを取得し、それぞれを出力。

これらの処理クロージャーを使ってふるまいををRDD変換関数

  • map (別の型への変換)
  • flatMap (別の型への変換。要素数は複数可)
  • reduceByKey (キー毎に要素同士を演算する)
  • etc…

に記述することで分散処理を比較的簡単に書くことができます。

実行

環境、プログラムがそろったので実行してみます。
以下の入力ファイルに対してワードカウントを実行してみます。

入力ファイルの準備

内容はウィキペディアから引用しました下記の内容のinput.txtを用意します。

センサやデータベースなどから、ある程度の数のサンプルデータ集合を入力して解析を行い、そのデータから有用な規則、ルール、知識表現、判断基準などを抽出し、アルゴリズムを発展させる。なお、データ集合を解析するので、統計学との関連が深い。そのアルゴリズムは、第一にそのデータが生成した潜在的機構の特徴を捉え、複雑な関係を識別(すなわち定量化)する。第二にその識別したパターンを用いて、新たなデータについて予測を行う。データは、観測された変数群のとりうる関係の具体例と見ることができる。一方、アルゴリズムは、機械学習者として観測されたデータの部分(訓練例などと呼ぶ)を学習することで、データに潜在する確率分布の特徴を捉え、学習によって得た知識を用いて、新たな入力データについて知的な決定を行う。1つの根本的な課題は、観測例に全てのとりうる挙動例を示すあらゆる入力を含めるのは(多くの実用的な関心事の場合)大きすぎて現実的でないという点である。したがって、学習者は与えられた例を一般化して、新たなデータ入力から有用な出力を生成しなければならない[1]。光学文字認識では、印刷された活字を事前の例に基づいて自動認識する。これは典型的な機械学習の応用例である。機械学習は検索エンジン、医療診断、スパムメールの検出、金融市場の予測、DNA配列の分類、音声認識や文字認識などのパターン認識、ゲーム戦略、ロボット、など幅広い分野で用いられている。応用分野の特性に応じて学習手法も適切に選択する必要があり、様々な手法が提案されている[2]。これらの手法は、テストデータにおいての検出・予測性能において評価されることがある。大量のデータから従来にない知見を得るというビッグデータの時代では、特にその応用に期待が集まっている。
https://ja.wikipedia.org/wiki/%E6%A9%9F%E6%A2%B0%E5%AD%A6%E7%BF%92

実行方法

今回実行するにあたり、色々方式があるのですが一番単純なsbtコンソール上で実行をしてみます。 方法は簡単でbuild.sbtがおいてあるプロジェクトルートで

$ sbt run

とするだけです。

ちなみに上記runがきちんと動作するようであれば

$ sbt assembly 

とすればSparkのクラスタ環境へのデプロイ(spark-submit)用のjarファイルが作られます。

出力

ワードカウントを実行すると output以下にpart-00000x(xは数字)のファイルが作られています。
内容を見てみると下記のように数字が若いファイルから順に降順にデータが格納されています。

(13,データ)(7,学習)(7,例)(5,的)(5,認識)(4,入力)(3,手法)(3,応用)(3,予測)(3,観測)(3,こと)︙

まとめ

今回はワードカウントプログラムをscalaで書いてみたが如何でしたでしょうか?
私は今回が初めてscalaやsbtを触ったのですが比較的分散処理の記述は扱いやすいように思えました。
大学時代にhadoopで、悪銭苦闘したときとくらべるとわりと自由にかけて楽しかったです。
同じapacheプロジェクトのsolrとも連携が出来るようで、機械学習以外でも大規模なデータ処理全般に使えそうです。 今回は時間の関係で扱えなかったですが、wikipediaをコーパスに類義語作成とかも出来るみたいなので、そっち方面とかで使ってみたいです。

参考文献

コメントする