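What is this post about?
While reading the source code of Infinispan's Server Task, I noticed that several problems I had run into before have been fixed.
Since I was there anyway, I decided to verify them.
Problems I ran into before
This goes back to when I wrote the entry linked below, where I got caught on the following points.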
- To run a Server Task, the user connecting from the client needs ADMIN permission
- Setting the Server Task execution mode to all nodes (ALL_NODES) while using ProtoStream for marshalling makes execution fail
- Setting the Server Task execution mode to all nodes (ALL_NODES) turns every passed parameter into a String
Infinispan Server 12.1でProtoStreamでのMarshallingを使いつつ、Server Taskを実行する - CLOVER🍀
On top of that, there was also the point that a Server Task instance is in fact a singleton.
Except for the point that every passed parameter becomes a String, these problems appear to have been fixed.
The fix for the following issue removed the need for ADMIN permission to run a task.
[ISPN-14143] Task execution needs ADMIN permission additional to EXEC - Red Hat Issue Tracker
The fix for a separate issue appears to have made execution succeed even when the execution mode is all nodes (ALL_NODES).
Also, in connection with the fix for an issue where the required permissions differed by execution mode, it looks like you can now specify how Server Task instances are created.
For instance creation, look at Task, the parent interface of the ServerTask interface.
Task (Infinispan JavaDoc 14.0.1.Final API)
It seems this can now be controlled through the value returned by Task#getInstantiationMode.
The return value is the TaskInstantiationMode enum. The default is SHARED, which makes the task a singleton: the Server Task instance is created only once and reused across requests. With ISOLATED, an instance is created for every Server Task invocation.
TaskInstantiationMode (Infinispan JavaDoc 14.0.1.Final API)
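The difference between the two modes can be mimicked without Infinispan. The following is a minimal plain-JDK sketch, not Infinispan's implementation: `CountingTask`, `Mode`, and `Runner` are hypothetical names made up for illustration, and only the idea (reuse one instance versus create one per call) comes from the description above.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class InstantiationModeSketch {

    /** Hypothetical stand-in for a task: counts how often this instance was called. */
    static class CountingTask {
        private final AtomicInteger callCounter = new AtomicInteger();

        int call() {
            return callCounter.incrementAndGet();
        }
    }

    /** Hypothetical mirror of the two instantiation modes described above. */
    enum Mode { SHARED, ISOLATED }

    /** Hypothetical runner: reuses one instance (SHARED) or creates one per call (ISOLATED). */
    static class Runner {
        private final Mode mode;
        private final Supplier<CountingTask> factory;
        private final CountingTask sharedInstance;

        Runner(Mode mode, Supplier<CountingTask> factory) {
            this.mode = mode;
            this.factory = factory;
            this.sharedInstance = factory.get();
        }

        int run() {
            // ISOLATED asks the factory for a fresh task, SHARED keeps using the same one
            CountingTask task = (mode == Mode.ISOLATED) ? factory.get() : sharedInstance;
            return task.call();
        }
    }

    public static void main(String[] args) {
        Runner shared = new Runner(Mode.SHARED, CountingTask::new);
        // prints "shared:   1, 2, 3"
        System.out.println("shared:   " + shared.run() + ", " + shared.run() + ", " + shared.run());

        Runner isolated = new Runner(Mode.ISOLATED, CountingTask::new);
        // prints "isolated: 1, 1, 1"
        System.out.println("isolated: " + isolated.run() + ", " + isolated.run() + ", " + isolated.run());
    }
}
```

A per-instance counter like this is a cheap way to tell the two modes apart from the outside, because the count only grows when the same instance is called again.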
The Infinispan Server guide has been updated as well.
The description of ServerTask#setTaskContext now reads as follows.
In most cases, implementations store this information locally and use it when tasks are actually executed. When using SHARED instantiation mode, the task should use a ThreadLocal to store the TaskContext for concurrent invocations.
It says that when TaskInstantiationMode is SHARED, the value passed in here needs to be held in a ThreadLocal.
The sample code has been updated to use ThreadLocal accordingly.
package example;

import org.infinispan.tasks.ServerTask;
import org.infinispan.tasks.TaskContext;

public class HelloTask implements ServerTask<String> {

    private static final ThreadLocal<TaskContext> taskContext = new ThreadLocal<>();

    @Override
    public void setTaskContext(TaskContext ctx) {
        taskContext.set(ctx);
    }

    @Override
    public String call() throws Exception {
        TaskContext ctx = taskContext.get();
        String name = (String) ctx.getParameters().get().get("name");
        return "Hello " + name;
    }

    @Override
    public String getName() {
        return "hello-task";
    }
}
Server Task instances were singletons all along, so I suspect this was always necessary. With the documentation now updated, it feels like I finally got to check my answer.
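To make the reason for the ThreadLocal concrete, here is a small plain-JDK sketch under stated assumptions: `SharedGreeter` is a hypothetical stand-in for a shared task instance, and a String stands in for the TaskContext. Two invocations are forced to overlap on one instance, and each still sees only the value it set itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SharedContextSketch {

    /** Hypothetical shared task: keeps the per-call context in a ThreadLocal. */
    static class SharedGreeter {
        private final ThreadLocal<String> context = new ThreadLocal<>();

        void setContext(String name) {
            context.set(name);
        }

        String call() {
            try {
                return "Hello " + context.get();
            } finally {
                // clear the slot so a pooled thread does not carry the value into a later call
                context.remove();
            }
        }
    }

    /** Runs two overlapping invocations against one instance and returns both results. */
    static String[] runConcurrently(String first, String second) {
        SharedGreeter task = new SharedGreeter();
        CountDownLatch bothSet = new CountDownLatch(2);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<String> a = pool.submit(() -> invoke(task, first, bothSet));
            Future<String> b = pool.submit(() -> invoke(task, second, bothSet));
            return new String[] { a.get(), b.get() };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    private static String invoke(SharedGreeter task, String name, CountDownLatch bothSet)
            throws InterruptedException {
        task.setContext(name);
        bothSet.countDown();
        bothSet.await(); // wait until the other invocation has set its context too
        return task.call();
    }

    public static void main(String[] args) {
        String[] results = runConcurrently("alice", "bob");
        System.out.println(results[0]); // Hello alice
        System.out.println(results[1]); // Hello bob
    }
}
```

If `context` were a plain field instead, whichever invocation set it last would win and both calls could return the same name, which is the situation the guide's note is warning about.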
Now that these have been fixed, this time I want to check the permissions and the instantiation of Server Tasks.
As for the point that every passed parameter becomes a String when the Server Task execution mode is all nodes (ALL_NODES), my impression is that this is unlikely to change (or perhaps cannot be changed).
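Since parameters may arrive as String in ALL_NODES mode, a task can read them defensively. The helper below is a plain-JDK sketch; `intParam` is a hypothetical name and not part of any Infinispan API. It accepts either a Number or its String form.

```java
import java.util.Map;

public class TaskParameterSketch {

    /**
     * Hypothetical helper: reads an int parameter whether it arrived
     * as a Number or as a String, falling back to a default when absent.
     */
    static int intParam(Map<String, ?> parameters, String name, int defaultValue) {
        Object value = parameters.get(name);
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        // anything else is treated as text, e.g. "10"
        return Integer.parseInt(value.toString().trim());
    }

    public static void main(String[] args) {
        System.out.println(intParam(Map.of("limit", "10"), "limit", 0)); // 10
        System.out.println(intParam(Map.of("limit", 10), "limit", 0));   // 10
        System.out.println(intParam(Map.of(), "limit", 5));              // 5
    }
}
```

A value that is neither a number nor numeric text makes `Integer.parseInt` throw a NumberFormatException, which is probably preferable to silently using a wrong value.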
Environment
Here is the environment used this time.

$ java --version
openjdk 17.0.4 2022-07-19
OpenJDK Runtime Environment (build 17.0.4+8-Ubuntu-120.04)
OpenJDK 64-Bit Server VM (build 17.0.4+8-Ubuntu-120.04, mixed mode, sharing)

$ mvn --version
Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.4, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-131-generic", arch: "amd64", family: "unix"
Infinispan Server runs on three nodes, 172.18.0.2 through 172.18.0.4, which form a cluster.

$ java --version
openjdk 17.0.4.1 2022-08-12
OpenJDK Runtime Environment Temurin-17.0.4.1+1 (build 17.0.4.1+1)
OpenJDK 64-Bit Server VM Temurin-17.0.4.1+1 (build 17.0.4.1+1, mixed mode, sharing)

$ bin/server.sh --version
Infinispan Server 14.0.1.Final (Flying Saucer)
Copyright (C) Red Hat Inc. and/or its affiliates and other contributors
License Apache License, v. 2.0. http://www.apache.org/licenses/LICENSE-2.0

The startup command is as follows.

$ bin/server.sh \
    -b 0.0.0.0 \
    -Djgroups.tcp.address=`hostname -i`
The plan
The plan for this post is as follows.
- Create one Maven project
  - Create the Infinispan Server Tasks under src/main/java
    - Create two Server Tasks with identical processing, one leaving TaskInstantiationMode at the default (SHARED) and one using ISOLATED
    - To check how many times they are instantiated, include logic that counts up on every call
    - Target all nodes, so that marshalling is verified as well
  - Verify the behavior with test code
Preparation
Maven dependencies and related settings.

<properties>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.infinispan</groupId>
        <artifactId>infinispan-tasks-api</artifactId>
        <version>14.0.1.Final</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.infinispan</groupId>
        <artifactId>infinispan-client-hotrod</artifactId>
        <version>14.0.1.Final</version>
        <scope>test</scope>
    </dependency>
    <!-- Used by RemoteCacheManagerAdmin. Already included via infinispan-tasks-api -->
    <!--
    <dependency>
        <groupId>org.infinispan</groupId>
        <artifactId>infinispan-core</artifactId>
        <version>14.0.1.Final</version>
        <scope>test</scope>
    </dependency>
    -->
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter</artifactId>
        <version>5.9.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.assertj</groupId>
        <artifactId>assertj-core</artifactId>
        <version>3.23.1</version>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.22.2</version>
        </plugin>
    </plugins>
</build>
On each Infinispan Server node, create an admin user and an application user in advance.

$ bin/cli.sh user create -g admin -p password ispn-admin
$ bin/cli.sh user create -g application -p password ispn-user
Creating the Server Tasks
Let's start by creating the Server Tasks.
First is the TaskInstantiationMode#SHARED Server Task. This Server Task is instantiated only once when it is first executed, and the instance is then reused across requests.
src/main/java/org/littlewings/infinispan/remote/task/SharedInstanceSummarizeServerTask.java
package org.littlewings.infinispan.remote.task;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.infinispan.Cache;
import org.infinispan.stream.CacheCollectors;
import org.infinispan.tasks.ServerTask;
import org.infinispan.tasks.TaskContext;
import org.infinispan.tasks.TaskExecutionMode;

public class SharedInstanceSummarizeServerTask implements ServerTask<String> {
    AtomicInteger callCounter = new AtomicInteger();

    ThreadLocal<TaskContext> threadLocalTaskContext = new ThreadLocal<>();

    @Override
    public void setTaskContext(TaskContext taskContext) {
        this.threadLocalTaskContext.set(taskContext);
    }

    @Override
    public String call() throws Exception {
        TaskContext taskContext = threadLocalTaskContext.get();

        try {
            Map<String, Object> parameters = taskContext.getParameters().orElse(Collections.emptyMap());

            int called = callCounter.incrementAndGet();

            @SuppressWarnings("unchecked")
            Cache<String, Integer> cache = (Cache<String, Integer>) taskContext.getCache().get();

            int sum = cache.values().stream().collect(CacheCollectors.serializableCollector(() -> Collectors.summingInt(i -> i)));

            return String.format("%s, shared instance server task call count = %d, sum result = %d", parameters.get("message"), called, sum);
        } finally {
            threadLocalTaskContext.remove();
        }
    }

    @Override
    public String getName() {
        return SharedInstanceSummarizeServerTask.class.getSimpleName();
    }

    @Override
    public TaskExecutionMode getExecutionMode() {
        return TaskExecutionMode.ALL_NODES;
    }

    /* The default behaves the same as the following
    @Override
    public TaskInstantiationMode getInstantiationMode() {
        return TaskInstantiationMode.SHARED;
    }
    */
}
TaskInstantiationMode is specified by overriding the ServerTask#getInstantiationMode method, but since the default is TaskInstantiationMode#SHARED, I leave it as is this time.

/* The default behaves the same as the following
@Override
public TaskInstantiationMode getInstantiationMode() {
    return TaskInstantiationMode.SHARED;
}
*/
To prepare for concurrent invocations, the TaskContext is held in a ThreadLocal, following the documentation.

ThreadLocal<TaskContext> threadLocalTaskContext = new ThreadLocal<>();

@Override
public void setTaskContext(TaskContext taskContext) {
    this.threadLocalTaskContext.set(taskContext);
}

This is now stated explicitly in the documentation, but I suppose it is what we should have been doing all along.
The TaskContext is retrieved at the start of the call method,

@Override
public String call() throws Exception {
    TaskContext taskContext = threadLocalTaskContext.get();

and removed at the end.

    } finally {
        threadLocalTaskContext.remove();
    }

After that, to confirm that the instance is created only once, I prepare a counter.
AtomicInteger callCounter = new AtomicInteger();
With this, the value in the message should count up on every call.

return String.format("%s, shared instance server task call count = %d, sum result = %d", parameters.get("message"), called, sum);

The task targets all nodes for execution.

@Override
public TaskExecutionMode getExecutionMode() {
    return TaskExecutionMode.ALL_NODES;
}

Since only sending and receiving a string would be a bit dull, I also included code that uses the Cache.

@SuppressWarnings("unchecked")
Cache<String, Integer> cache = (Cache<String, Integer>) taskContext.getCache().get();

int sum = cache.values().stream().collect(CacheCollectors.serializableCollector(() -> Collectors.summingInt(i -> i)));
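To see what the collector computes without a running cluster, the same summation can be tried on an ordinary map. This is a plain-JDK sketch that uses `Collectors.summingInt` directly, without the `CacheCollectors.serializableCollector` wrapper used in the task. `SumSketch` and its methods are hypothetical names for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SumSketch {

    /** Builds entries key1=1 ... keyN=N, the same shape of data the test stores in the cache. */
    static Map<String, Integer> sampleEntries(int count) {
        Map<String, Integer> entries = new HashMap<>();
        IntStream.rangeClosed(1, count).forEach(i -> entries.put("key" + i, i));
        return entries;
    }

    /** Sums all values with the same JDK collector the task wraps. */
    static int sumValues(Map<String, Integer> entries) {
        return entries.values().stream().collect(Collectors.summingInt(i -> i));
    }

    public static void main(String[] args) {
        System.out.println(sumValues(sampleEntries(100))); // 5050
    }
}
```

With the values 1 to 100 the result is 100 * 101 / 2 = 5050, which is the sum the tests later in this post expect from every node.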
Next is the Server Task that is instantiated on every invocation, that is, the version that specifies TaskInstantiationMode#ISOLATED.
src/main/java/org/littlewings/infinispan/remote/task/IsolatedInstanceSummarizeServerTask.java
package org.littlewings.infinispan.remote.task;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.infinispan.Cache;
import org.infinispan.stream.CacheCollectors;
import org.infinispan.tasks.ServerTask;
import org.infinispan.tasks.TaskContext;
import org.infinispan.tasks.TaskExecutionMode;
import org.infinispan.tasks.TaskInstantiationMode;

public class IsolatedInstanceSummarizeServerTask implements ServerTask<String> {
    AtomicInteger callCounter = new AtomicInteger();

    TaskContext taskContext;

    @Override
    public void setTaskContext(TaskContext taskContext) {
        this.taskContext = taskContext;
    }

    @Override
    public String call() throws Exception {
        Map<String, Object> parameters = taskContext.getParameters().orElse(Collections.emptyMap());

        int called = callCounter.incrementAndGet();

        @SuppressWarnings("unchecked")
        Cache<String, Integer> cache = (Cache<String, Integer>) taskContext.getCache().get();

        int sum = cache.values().stream().collect(CacheCollectors.serializableCollector(() -> Collectors.summingInt(i -> i)));

        return String.format("%s, isolated instance server task call count = %d, sum result = %d", parameters.get("message"), called, sum);
    }

    @Override
    public String getName() {
        return IsolatedInstanceSummarizeServerTask.class.getSimpleName();
    }

    @Override
    public TaskExecutionMode getExecutionMode() {
        return TaskExecutionMode.ALL_NODES;
    }

    @Override
    public TaskInstantiationMode getInstantiationMode() {
        return TaskInstantiationMode.ISOLATED;
    }
}
The first difference from the previous task is that the ServerTask#getInstantiationMode method returns TaskInstantiationMode#ISOLATED.

@Override
public TaskInstantiationMode getInstantiationMode() {
    return TaskInstantiationMode.ISOLATED;
}

With this, the Server Task is instantiated every time it is invoked.
As a result, there is no longer any need to hold the TaskContext in a ThreadLocal.

TaskContext taskContext;

@Override
public void setTaskContext(TaskContext taskContext) {
    this.taskContext = taskContext;
}
Finally, because Server Tasks are loaded through the Service Loader mechanism, write the FQCNs of the classes created here into a file named META-INF/services/org.infinispan.tasks.ServerTask.
src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask

org.littlewings.infinispan.remote.task.SharedInstanceSummarizeServerTask
org.littlewings.infinispan.remote.task.IsolatedInstanceSummarizeServerTask

Then package it,

$ mvn package -DskipTests=true

and deploy it to Infinispan Server by copying the Server Task JAR file into server/lib.

$ cp /path/to/target/remote-task-instantiation-0.0.1-SNAPSHOT.jar server/lib

Once the JAR file is in place, restart Infinispan Server.
Check the startup log to confirm that the Server Tasks you created have been loaded.

2022-10-25 15:05:34,561 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'org.littlewings.infinispan.remote.task.SharedInstanceSummarizeServerTask'
2022-10-25 15:05:34,561 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'org.littlewings.infinispan.remote.task.IsolatedInstanceSummarizeServerTask'

Do the same on every node and the deployment is complete.
Verifying
Verification is done with test code.
Here is the test code I wrote.
src/test/java/org/littlewings/infinispan/remote/task/RemoteTaskInstantiationTest.java
package org.littlewings.infinispan.remote.task;

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.IntStream;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.RemoteCacheManagerAdmin;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.configuration.cache.CacheMode;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class RemoteTaskInstantiationTest {
    @BeforeAll
    static void createCache() {
        Configuration configuration =
                new ConfigurationBuilder()
                        .uri("hotrod://ispn-admin:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222")
                        .build();

        try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) {
            RemoteCacheManagerAdmin admin = manager.administration();

            admin.removeCache("distCache");

            org.infinispan.configuration.cache.Configuration cacheConfiguration =
                    new org.infinispan.configuration.cache.ConfigurationBuilder()
                            .clustering().cacheMode(CacheMode.DIST_SYNC)
                            .security().authorization().enable()
                            .encoding().key().mediaType("application/x-protostream")
                            .encoding().value().mediaType("application/x-protostream")
                            .build();

            admin.getOrCreateCache("distCache", cacheConfiguration);
        }
    }

    <K, V> void withRemoteCache(String cacheName, Consumer<RemoteCache<K, V>> consumer) {
        Configuration configuration =
                new ConfigurationBuilder()
                        .uri("hotrod://ispn-user:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222")
                        .build();

        try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) {
            RemoteCache<K, V> cache = manager.getCache(cacheName);

            consumer.accept(cache);
        }
    }

    @Test
    public void instanceSharedTasks() {
        this.<String, Integer>withRemoteCache("distCache", cache -> {
            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, i));

            assertThat(cache).hasSize(100);

            List<String> results1 = cache.execute("SharedInstanceSummarizeServerTask", Map.of("message", "hello"));
            assertThat(results1)
                    .hasSize(3)
                    .containsOnly(
                            "hello, shared instance server task call count = 1, sum result = 5050",
                            "hello, shared instance server task call count = 1, sum result = 5050",
                            "hello, shared instance server task call count = 1, sum result = 5050"
                    );

            List<String> results2 = cache.execute("SharedInstanceSummarizeServerTask", Map.of("message", "world"));
            assertThat(results2)
                    .hasSize(3)
                    .containsOnly(
                            "world, shared instance server task call count = 2, sum result = 5050",
                            "world, shared instance server task call count = 2, sum result = 5050",
                            "world, shared instance server task call count = 2, sum result = 5050"
                    );

            List<String> results3 = cache.execute("SharedInstanceSummarizeServerTask", Map.of("message", "yeah"));
            assertThat(results3)
                    .hasSize(3)
                    .containsOnly(
                            "yeah, shared instance server task call count = 3, sum result = 5050",
                            "yeah, shared instance server task call count = 3, sum result = 5050",
                            "yeah, shared instance server task call count = 3, sum result = 5050"
                    );

            cache.clear();
        });
    }

    @Test
    public void instanceIsolatedTasks() {
        this.<String, Integer>withRemoteCache("distCache", cache -> {
            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, i));

            assertThat(cache).hasSize(100);

            List<String> results1 = cache.execute("IsolatedInstanceSummarizeServerTask", Map.of("message", "hello"));
            assertThat(results1)
                    .hasSize(3)
                    .containsOnly(
                            "hello, isolated instance server task call count = 1, sum result = 5050",
                            "hello, isolated instance server task call count = 1, sum result = 5050",
                            "hello, isolated instance server task call count = 1, sum result = 5050"
                    );

            List<String> results2 = cache.execute("IsolatedInstanceSummarizeServerTask", Map.of("message", "world"));
            assertThat(results2)
                    .hasSize(3)
                    .containsOnly(
                            "world, isolated instance server task call count = 1, sum result = 5050",
                            "world, isolated instance server task call count = 1, sum result = 5050",
                            "world, isolated instance server task call count = 1, sum result = 5050"
                    );

            List<String> results3 = cache.execute("IsolatedInstanceSummarizeServerTask", Map.of("message", "yeah"));
            assertThat(results3)
                    .hasSize(3)
                    .containsOnly(
                            "yeah, isolated instance server task call count = 1, sum result = 5050",
                            "yeah, isolated instance server task call count = 1, sum result = 5050",
                            "yeah, isolated instance server task call count = 1, sum result = 5050"
                    );

            cache.clear();
        });
    }
}
Let me walk through the test code.
This part creates the cache at the start of the tests. The user for this connection is the admin user.

@BeforeAll
static void createCache() {
    Configuration configuration =
            new ConfigurationBuilder()
                    .uri("hotrod://ispn-admin:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222")
                    .build();

    try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) {
        RemoteCacheManagerAdmin admin = manager.administration();

        admin.removeCache("distCache");

        org.infinispan.configuration.cache.Configuration cacheConfiguration =
                new org.infinispan.configuration.cache.ConfigurationBuilder()
                        .clustering().cacheMode(CacheMode.DIST_SYNC)
                        .security().authorization().enable()
                        .encoding().key().mediaType("application/x-protostream")
                        .encoding().value().mediaType("application/x-protostream")
                        .build();

        admin.getOrCreateCache("distCache", cacheConfiguration);
    }
}

The cache type is a Distributed Cache, for no particular reason.
This is the method that creates the RemoteCache used in the tests. The connection uses the application user.

<K, V> void withRemoteCache(String cacheName, Consumer<RemoteCache<K, V>> consumer) {
    Configuration configuration =
            new ConfigurationBuilder()
                    .uri("hotrod://ispn-user:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222")
                    .build();

    try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) {
        RemoteCache<K, V> cache = manager.getCache(cacheName);

        consumer.accept(cache);
    }
}

Now for the tests themselves.
First, the calls to the Server Task whose TaskInstantiationMode is TaskInstantiationMode#SHARED.

@Test
public void instanceSharedTasks() {
    this.<String, Integer>withRemoteCache("distCache", cache -> {
        IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, i));

        assertThat(cache).hasSize(100);

        List<String> results1 = cache.execute("SharedInstanceSummarizeServerTask", Map.of("message", "hello"));
        assertThat(results1)
                .hasSize(3)
                .containsOnly(
                        "hello, shared instance server task call count = 1, sum result = 5050",
                        "hello, shared instance server task call count = 1, sum result = 5050",
                        "hello, shared instance server task call count = 1, sum result = 5050"
                );

        List<String> results2 = cache.execute("SharedInstanceSummarizeServerTask", Map.of("message", "world"));
        assertThat(results2)
                .hasSize(3)
                .containsOnly(
                        "world, shared instance server task call count = 2, sum result = 5050",
                        "world, shared instance server task call count = 2, sum result = 5050",
                        "world, shared instance server task call count = 2, sum result = 5050"
                );

        List<String> results3 = cache.execute("SharedInstanceSummarizeServerTask", Map.of("message", "yeah"));
        assertThat(results3)
                .hasSize(3)
                .containsOnly(
                        "yeah, shared instance server task call count = 3, sum result = 5050",
                        "yeah, shared instance server task call count = 3, sum result = 5050",
                        "yeah, shared instance server task call count = 3, sum result = 5050"
                );

        cache.clear();
    });
}

The count goes up with every call. This is because the Server Task instance is being reused.
Next, the calls to the Server Task whose TaskInstantiationMode is TaskInstantiationMode#ISOLATED.

@Test
public void instanceIsolatedTasks() {
    this.<String, Integer>withRemoteCache("distCache", cache -> {
        IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, i));

        assertThat(cache).hasSize(100);

        List<String> results1 = cache.execute("IsolatedInstanceSummarizeServerTask", Map.of("message", "hello"));
        assertThat(results1)
                .hasSize(3)
                .containsOnly(
                        "hello, isolated instance server task call count = 1, sum result = 5050",
                        "hello, isolated instance server task call count = 1, sum result = 5050",
                        "hello, isolated instance server task call count = 1, sum result = 5050"
                );

        List<String> results2 = cache.execute("IsolatedInstanceSummarizeServerTask", Map.of("message", "world"));
        assertThat(results2)
                .hasSize(3)
                .containsOnly(
                        "world, isolated instance server task call count = 1, sum result = 5050",
                        "world, isolated instance server task call count = 1, sum result = 5050",
                        "world, isolated instance server task call count = 1, sum result = 5050"
                );

        List<String> results3 = cache.execute("IsolatedInstanceSummarizeServerTask", Map.of("message", "yeah"));
        assertThat(results3)
                .hasSize(3)
                .containsOnly(
                        "yeah, isolated instance server task call count = 1, sum result = 5050",
                        "yeah, isolated instance server task call count = 1, sum result = 5050",
                        "yeah, isolated instance server task call count = 1, sum result = 5050"
                );

        cache.clear();
    });
}

Here the count does not go up, because a Server Task instance is created for every call.
So that covers everything I wanted to verify.
Looking at the source code
Let's also take a brief look at the Infinispan source code related to this topic.
First, this is where Server Task instantiation is controlled.

public T run(TaskContext context) throws Exception {
    final ServerTask<T> t;
    if (task.getInstantiationMode() == TaskInstantiationMode.ISOLATED) {
        t = Util.getInstance(task.getClass());
    } else {
        t = task;
    }
    t.setTaskContext(context);
    if (log.isTraceEnabled()) {
        log.tracef("Executing task '%s' in '%s' mode using context %s", getName(), getInstantiationMode(), context);
    }
    return t.call();
}
As for the requirement that the user connecting to Infinispan Server needed ADMIN permission to run a Server Task, the Pull Request changed a large amount of source code, so I did not dig into it very deeply.
The problem where marshalling failed when the marshaller was ProtoStream and the execution mode was all nodes was resolved by replacing Collections#synchronizedList with ArrayList in the relevant code.
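The source only tells us that swapping the list type resolved the failure. One possible reason, stated here as an assumption rather than something confirmed from the Infinispan code, is that the two lists have different runtime classes and a marshaller that only knows certain types may reject the wrapper. The plain-JDK sketch below simply prints the runtime class names; `ListTypeSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ListTypeSketch {

    /** Returns the runtime class name of a list, which is what a type-based marshaller would see. */
    static String typeName(List<?> list) {
        return list.getClass().getName();
    }

    public static void main(String[] args) {
        List<String> plain = new ArrayList<>();
        List<String> wrapped = Collections.synchronizedList(new ArrayList<>());

        // a public, well-known collection type
        System.out.println(typeName(plain));   // java.util.ArrayList
        // a private nested class of java.util.Collections, not ArrayList
        System.out.println(typeName(wrapped));
    }
}
```

Whether this type difference is the actual cause in Infinispan is not verified here. The sketch only shows that the wrapper is a different class from ArrayList.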
That's it for the verification.
Summary
This time I confirmed that several problems I had previously struggled with in Infinispan Server's Server Task have been fixed.
I think it has become considerably easier to use.
The source code written for this post is available here.
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-task-instantiation