CLOVER🍀

That was when it all began.

Spring Boot × Hazelcastで、分散ExecutorService

Hazelcastが持っているSpringとの統合機能を、Spring Bootの上に乗せて遊んでみました。

Spring Integration
http://docs.hazelcast.org/docs/3.4/manual/html-single/hazelcast-documentation.html#spring-integration

これ、ドキュメントだけ読んでいるとXMLの設定ファイルを書かなくてはいけなさそうに思っていたのですが、hazelcast-springのソースを見ているとSpringのJavaConfigに取り込めそうだったので、その路線でいってみました。

まあ、ソースを見ていても最初はSpringのためのXMLパーサーらしきものがゴロゴロでてきたので、ちょっと不安でしたが。結果は、なんとかなりました。

今回試してみたのは、こんな感じです。

  • Spring BootでRestControllerをアプリケーションのエントリポイントにしたWebアプリを作る
  • Hazelcastのインスタンスを、JavaConfigで定義する
  • リクエストを受け取ったRestControllerにHazelcastのインスタンスをAutowiredし、分散ExecutorServiceで処理を行う
  • 分散ExecutorServiceで使うCallableに対して、SpringのBeanをAutowiredでインジェクションする

最後がポイントですね。普通にHazelcastのExecutorServiceとして使う分にはただのCallableなところを、Springと統合します。

そのために使った機能は、こちらです。

Spring Managed Context with @SpringAware
http://docs.hazelcast.org/docs/3.4/manual/html-single/hazelcast-documentation.html#spring-managed-context-with-springaware

ソースコードを見ていたら単品で使えそうだったので、引っこ抜いてみました的な。

では、いってみましょう。

pom.xml

用意したpom.xmlは、このような形。
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.littlewings</groupId>
  <artifactId>spring-boot-hazelcast-distexec</artifactId>
  <packaging>jar</packaging>
  <version>0.0.1-SNAPSHOT</version>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.2.2.RELEASE</version>
  </parent>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>        
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>com.hazelcast</groupId>
      <artifactId>hazelcast</artifactId>
      <version>3.4.1</version>
    </dependency>
    <dependency>
      <groupId>com.hazelcast</groupId>
      <artifactId>hazelcast-spring</artifactId>
      <version>3.4.1</version>
    </dependency>

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.0</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-Xlint</arg>
            <arg>-unchecked</arg>
            <arg>-deprecation</arg>
            <arg>-feature</arg>
          </args>
          <recompileMode>incremental</recompileMode>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.major.version>2.11</scala.major.version>
    <scala.version>${scala.major.version}.6</scala.version>
  </properties>
</project>

Scalaを使っていることは置いておいて、特徴的な依存関係はこちらですね。

    <dependency>
      <groupId>com.hazelcast</groupId>
      <artifactId>hazelcast</artifactId>
      <version>3.4.1</version>
    </dependency>
    <dependency>
      <groupId>com.hazelcast</groupId>
      <artifactId>hazelcast-spring</artifactId>
      <version>3.4.1</version>
    </dependency>

「hazelcast-spring」がHazelcastとSpringの連携のために必要な依存関係ですが、これとは別にHazelcast本体も必要です。最初、入れずにいたら見事にコケました。「hazelcast-spring」での依存関係で、「hazelcast」がprovidedなんですよねぇ。

サンプルのService

ここからは、下の層から積み上げて見ていきます。何らかの処理をするServiceを用意するのですが、今回は受け取った文字列をデコレーションするものを作りました。
src/main/scala/org/littlewings/springboot/hazelcast/service/MessageService.scala

package org.littlewings.springboot.hazelcast.service

import org.springframework.stereotype.Service

@Service
class MessageService {
  def decorate(message: String) = s"*** $message ***"
}

内容は、あまり気にしない…。

Callable

続いて、HazelcastのExecutorServiceで使用するCallableを作成します。

ここでは、先ほど作成したServiceをAutowiredでインジェクションして、コンストラクタ引数にはデコレーション対象のメッセージを受け取る形にしました。
src/main/scala/org/littlewings/springboot/hazelcast/service/MessageCallable.scala

package org.littlewings.springboot.hazelcast.service

import java.util.concurrent.Callable

import com.hazelcast.spring.context.SpringAware
import org.springframework.beans.factory.annotation.Autowired

@SpringAware
@SerialVersionUID(1L)
class MessageCallable(val message: String) extends Callable[String] with Serializable {
  @transient
  @Autowired
  private var messageService: MessageService = _

  @throws(classOf[Exception])
  override def call(): String = messageService.decorate(message)
}

callメソッドで、MessageServiceを使用します。

HazelcastのExecutorServiceで使うCallableは、単純に作るだけならこちらを見て作ればいいのですが、

Executor Service
http://docs.hazelcast.org/docs/3.4/manual/html-single/hazelcast-documentation.html#executor-service

Springと連携するために@SpringAwareアノテーションを付与しています。これを使うことで、Springで管理しているBeanがインジェクションされます。

@SpringAware
@SerialVersionUID(1L)
class MessageCallable(val message: String) extends Callable[String] with Serializable {

ApplicationContextAwareインターフェースを実装することでSpringのApplicationContextを受け取れるようですが…今回はパスしました。

また、インジェクションするフィールドはtransientにしています。

  @transient
  @Autowired
  private var messageService: MessageService = _

その他の注意事項としては、CallableそのものはSerializableであるということですね。

RestController

リクエストを受け取るRestController。パラメータで「message」を受け取り、コンストラクタインジェクションしたHazelcastのインスタンスから、先に説明したCallableのインスタンスを使って、クラスタの全メンバーに対して分散処理を実行します。
src/main/scala/org/littlewings/springboot/hazelcast/rest/DistExecController.scala

package org.littlewings.springboot.hazelcast.rest

import scala.collection.JavaConverters._

import com.hazelcast.core.HazelcastInstance
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.web.bind.annotation.{ RestController, RequestMapping, RequestMethod, RequestParam }

import org.littlewings.springboot.hazelcast.service.MessageCallable

@RestController
class DistExecController @Autowired()(private val hazelcast: HazelcastInstance) {
  @RequestMapping(Array("/distexec"))
  def distexec(@RequestParam message: String): String = {
    val executor = hazelcast.getExecutorService("default")
    val callable = new MessageCallable(message)

    val result = executor.submitToAllMembers(callable)
    result
      .asScala
      .map { case (k, v) => s"$k: ${v.get}" }
      .mkString(System.lineSeparator)
  }
}

戻り値は、クラスタのメンバーの文字列表現、Callableの戻り値を改行でつなげたものです。

JavaConfig

で、これだけだとHazelcastをSpringの管理対象にしていないので、ちょっと動きません。

ここをつなぎ合わせるために、JavaConfigを用意。
src/main/scala/org/littlewings/springboot/hazelcast/config/AppConfig.scala

package org.littlewings.springboot.hazelcast.config

import com.hazelcast.config.Config
import com.hazelcast.core.{ Hazelcast, HazelcastInstance }
import com.hazelcast.spring.context.SpringManagedContext
import org.springframework.context.annotation.{ Bean, Configuration }

@Configuration
class AppConfig {
  @Bean
  def springManagedContext: SpringManagedContext = new SpringManagedContext

  @Bean(destroyMethod = "shutdown")
  def hazelcast: HazelcastInstance = {
    val config = new Config
    config.setManagedContext(springManagedContext)
    Hazelcast.newHazelcastInstance(config)
  }
}

ポイントは、SpringManageContextをSpringの管理対象として、これをHazelcastのConfigに設定することですね。これで、先ほどCallableに設定した@SpringAwareアノテーションなどが機能することになります。

Mainクラス

最後は、アプリケーションを起動するためのMainクラス。
src/main/scala/org/littlewings/springboot/hazelcast/App.scala

package org.littlewings.springboot.hazelcast

import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication

import org.littlewings.springboot.hazelcast.config.AppConfig

object App {
  def main(args: Array[String]): Unit =
    SpringApplication.run(classOf[App], args: _*)
}

@SpringBootApplication
class App

ここは、普通に…。

パッケージングと実行

それでは、動かしてみましょう。パッケージングして

$ mvn package

実行。今回は、3つのNodeで実行しました。

# Node 1
$ java -jar target/spring-boot-hazelcast-distexec-0.0.1-SNAPSHOT.jar --server.port=8080

# Node 2
$ java -jar target/spring-boot-hazelcast-distexec-0.0.1-SNAPSHOT.jar --server.port=8081

# Node 3
$ java -jar target/spring-boot-hazelcast-distexec-0.0.1-SNAPSHOT.jar --server.port=8082

コンソール上で、クラスタが構成されたことを確認します。

Members [3] {
	Member [192.168.254.129]:5701 this
	Member [192.168.254.129]:5702
	Member [192.168.254.129]:5703
}

あとは、リクエストを投げ込んでみます。

$ curl http://localhost:8080/distexec?message=HelloCluster
Member [192.168.254.129]:5703: *** HelloCluster ***
Member [192.168.254.129]:5702: *** HelloCluster ***
Member [192.168.254.129]:5701 this: *** HelloCluster ***

特に面白い結果ではないですが、3つのNodeが仕事をして値を返してくれたようです。

Springの設定ファイルを使わなかった時点でけっこうてこずるかと思ったのですが、意外とあっさりでした。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-distexec-spring-boot