Here at Peoplelogic, we aggregate and stitch together a lot of data from a lot of different sources on a regular basis, helping companies stay ahead of risks to their growth. Some of this aggregation takes a long time, and we needed a way to distribute the processing of all this data across our fleet.
When we first started looking into how to solve this problem, we had a hard time finding the right combination of libraries, examples, and versions to combine Spring Boot, Quartz, and Hazelcast into a distributed task engine. Each one works great on its own, but what if we wanted to combine them?
This article shows you exactly that (along with combining Spring Beans and Hazelcast as a bonus!) and assumes that you’re moderately familiar with the structure and setup of a Spring Boot project (or at least the Spring Framework!). We will use IntelliJ IDEA for the project, but any integrated development environment (IDE) that supports Gradle projects should work just fine. Let’s get started!
Start with the Spring Initializr, either from inside IntelliJ IDEA or via the web, or by cloning our demo starter project at https://github.com/peoplelogic/article-starter. You’ll wind up with a project structure that looks a bit like the following:
With the project foundation in place, let’s get the project ready for Hazelcast. Open up the build.gradle file from the project you just cloned. Look for the dependencies section and add the following:
implementation 'com.hazelcast:hazelcast-all:3.12.9'
We will also need the ability to run Quartz jobs, so be sure to add the following to the build file:
compile 'org.springframework.boot:spring-boot-starter-quartz'
Finally, we want to make sure that the Quartz jobs don’t run at the same time across multiple nodes within the Hazelcast cluster, so we’ll need a library that will keep those in sync. Download https://github.com/peoplelogic/quartz-scheduler-hazelcast-jobstore/releases/download/quartz-hazelcast-jobstore-2.0.0/quartz-hazelcast-jobstore-2.0.0.jar from our GitHub repository and drop it into <project>/libs and then add the following to your build file:
compile files('libs/quartz-hazelcast-jobstore-2.0.0.jar')
Your dependencies section should now look like this (commons-lang3 is included because the Quartz configuration later in this article uses its ArrayUtils helper):
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    compile 'org.springframework.boot:spring-boot-starter-quartz'
    implementation 'com.hazelcast:hazelcast-all:3.12.9'
    compile 'org.apache.commons:commons-lang3:3.11'
    compile files('libs/quartz-hazelcast-jobstore-2.0.0.jar')
}
With your dependencies in place, we’re ready to start writing some code.
Now that Spring Boot is in place, configuring Hazelcast is easy. We simply need to create a Configuration class (from spring-context) and specify the Spring beans that we’ll expose when running locally or in production. First, create a new package called config under ai.peoplelogic.demo.demo and create a new class called HazelcastConfiguration.java. Paste the following into that file:
package ai.peoplelogic.demo.demo.config;
import com.hazelcast.config.Config;
import com.hazelcast.config.JoinConfig;
import com.hazelcast.spring.context.SpringManagedContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import static java.util.Collections.singletonList;
@Configuration
public class HazelcastConfiguration {
@Bean
public SpringManagedContext managedContext() {
return new SpringManagedContext();
}
@Bean
@Profile("local")
public Config localConfig() {
Config config = new Config();
config.setInstanceName("scaleable-task-demo");
config.setManagedContext(managedContext());
config.getNetworkConfig().setPort(3000);
config.getGroupConfig().setName("scaleable-task-demo");
JoinConfig joinConfig = config.getNetworkConfig().getJoin();
joinConfig.getMulticastConfig().setEnabled(false);
joinConfig.getTcpIpConfig().setEnabled(true).setMembers(singletonList("127.0.0.1"));
return config;
}
@Bean
@Profile("prod")
public Config awsConfig() {
Config config = new Config();
config.setInstanceName("scaleable-task-demo");
config.setManagedContext(managedContext());
config.getGroupConfig().setName("scaleable-task-demo");
JoinConfig joinConfig = config.getNetworkConfig().getJoin();
joinConfig.getMulticastConfig().setEnabled(false);
joinConfig.getAwsConfig().setEnabled(true);
/**
* Shared instance IAM roles are preferred for locating other nodes in the cluster
*/
//joinConfig.getAwsConfig().setEnabled(true).setProperty("access-key", hazelcastAccessKey).setProperty("secret-key", hazelcastAccessSecret);
return config;
}
}
With the code above, we’re marking this class as a source of bean definitions. Spring Boot’s Hazelcast auto-configuration picks up the Config bean and creates the HazelcastInstance from it. We’re also defining a Spring Bean for a Hazelcast ManagedContext that tells Hazelcast that its tasks should be aware of Spring Beans for dependency injection.
Finally, we’re setting up two Hazelcast Config beans – one for running with a local profile and one for running in production (just change the profile name for your production instance). In this case, we’re running on AWS, so we’re using the AWS style configuration for production.
Note: We also set a name for the group in Hazelcast. This prevents other Hazelcast instances in the local network from joining, even if they’re on the same ports. There are other configuration options available, but those are outside the scope of this demo.
Before we run, add the following line to your src/main/resources/application.properties file:
spring.profiles.active=local
With this configuration in place, let’s start up our application. You should see Hazelcast start and try to locate other members of the cluster with the name you specified:
2020-09-21 15:52:14.202 INFO 87304 --- [ main] a.peoplelogic.demo.demo.DemoApplication : The following profiles are active: local
2020-09-21 15:52:14.802 INFO 87304 --- [ main] com.hazelcast.instance.AddressPicker : [LOCAL] [scaleable-task-demo] [3.12.9] Interfaces is disabled, trying to pick one address from TCP-IP config addresses: [127.0.0.1]
2020-09-21 15:52:14.822 INFO 87304 --- [ main] com.hazelcast.instance.AddressPicker : [LOCAL] [scaleable-task-demo] [3.12.9] Picked [127.0.0.1]:3000, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=3000], bind any local is true
2020-09-21 15:52:14.830 INFO 87304 --- [ main] com.hazelcast.system : [127.0.0.1]:3000 [scaleable-task-demo] [3.12.9] Hazelcast 3.12.9 (20200819 - 3638e8b) starting at [127.0.0.1]:3000
It looks like things are ready to go! You can start up a second instance and you should see something similar to the following in your logs:
Members {size:2, ver:2} [
Member [127.0.0.1]:3000 - 236936f0-bc4d-4ece-90de-c1143aa45578
Member [127.0.0.1]:3002 - 7c40f3fb-cbeb-448c-bc1f-3592fb33a3c4 this
]
Before we can get to coding our highly scalable task in Hazelcast, we need to finish setting up Quartz. Quartz is an enterprise Java job scheduler that is well supported by Spring and has more features than the built-in Spring scheduler. In addition to simple time delay triggers and a crontab-like trigger, it also allows for custom job stores that work in a clustered environment (Hazelcast, JDBC, etc).
To set things up, we’ll need another configuration class called QuartzConfiguration.java under the package ai.peoplelogic.demo.demo.config with the following code:
package ai.peoplelogic.demo.demo.config;
import ai.peoplelogic.demo.demo.jobs.DemoQuartzJob;
import com.hazelcast.core.HazelcastInstance;
import com.idvp.data.infrastructure.scheduling.quarz.store.hazelcast.HazelcastJobStoreDelegate;
import org.apache.commons.lang3.ArrayUtils;
import org.quartz.CronTrigger;
import org.quartz.JobDetail;
import org.quartz.Trigger;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.*;
import java.util.Calendar;
import java.util.Properties;
@Configuration
public class QuartzConfiguration {
@Value("${demo.job.cron}")
String fetchCron;
@Autowired
HazelcastInstance hazelcastInstance;
private ApplicationContext applicationContext;
public QuartzConfiguration(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Bean
public SpringBeanJobFactory springBeanJobFactory() {
AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
}
@Bean
public SchedulerFactoryBean scheduler(Trigger... triggers) {
HazelcastJobStoreDelegate.setInstance(hazelcastInstance);
SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
Properties properties = new Properties();
properties.setProperty("org.quartz.scheduler.instanceId", "AUTO");
properties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_CLASS, HazelcastJobStoreDelegate.class.getName());
schedulerFactory.setOverwriteExistingJobs(true);
schedulerFactory.setAutoStartup(true);
schedulerFactory.setQuartzProperties(properties);
schedulerFactory.setJobFactory(springBeanJobFactory());
schedulerFactory.setWaitForJobsToCompleteOnShutdown(true);
if (ArrayUtils.isNotEmpty(triggers)) {
schedulerFactory.setTriggers(triggers);
}
return schedulerFactory;
}
static CronTriggerFactoryBean createCronTrigger(JobDetail jobDetail, String cronExpression, String triggerName) {
// To fix an issue with time-based cron jobs
Calendar calendar = Calendar.getInstance();
calendar.set(Calendar.SECOND, 0);
calendar.set(Calendar.MILLISECOND, 0);
CronTriggerFactoryBean factoryBean = new CronTriggerFactoryBean();
factoryBean.setJobDetail(jobDetail);
factoryBean.setCronExpression(cronExpression);
factoryBean.setStartTime(calendar.getTime());
factoryBean.setStartDelay(0L);
factoryBean.setName(triggerName);
factoryBean.setMisfireInstruction(CronTrigger.MISFIRE_INSTRUCTION_DO_NOTHING);
return factoryBean;
}
static JobDetailFactoryBean createJobDetail(Class jobClass, String jobName) {
JobDetailFactoryBean factoryBean = new JobDetailFactoryBean();
factoryBean.setName(jobName);
factoryBean.setJobClass(jobClass);
factoryBean.setDurability(true);
return factoryBean;
}
@Bean(name = "demoJob")
public JobDetailFactoryBean jobMemberClassStats() {
return createJobDetail(DemoQuartzJob.class, "Demo Quartz Job");
}
@Bean(name = "demoTrigger")
public CronTriggerFactoryBean triggerMemberClassStats(@Qualifier("demoJob") JobDetail jobDetail) {
return createCronTrigger(jobDetail, fetchCron, "Demo Quartz Trigger");
}
}
The code above programmatically builds a SchedulerFactoryBean: it collects every Trigger defined as a bean, points Quartz at the clustered job store, and starts the scheduler as soon as the application is up. It also defines a cron trigger whose expression comes from application.properties (or any other configuration source) and registers the job class that the trigger will execute, both as Spring Beans.
We’re also going to need to create a Job class. Create a new package ai.peoplelogic.demo.demo.jobs and put a class in it called DemoQuartzJob.java with the following code to start:
package ai.peoplelogic.demo.demo.jobs;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.Serializable;
@Component
public class DemoQuartzJob implements Job, Serializable {
private final Logger log = LoggerFactory.getLogger(this.getClass());
public void execute(JobExecutionContext context) {
log.info("Logging job execution.");
}
}
Note: Your Jobs will need to implement the Quartz Job interface (which provides the execute method above) and Serializable (so that Hazelcast can serialize your Job across the cluster in its store).
Finally, since we want to be able to expose our Jobs as Spring beans and also use Spring beans in our jobs, we’re going to need one more helper class in ai.peoplelogic.demo.demo.config called AutowiringSpringBeanJobFactory.java. Put the following code in that class to finish the Quartz setup:
package ai.peoplelogic.demo.demo.config;
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
public final class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {
private transient AutowireCapableBeanFactory beanFactory;
@Override
public void setApplicationContext(final ApplicationContext context) {
beanFactory = context.getAutowireCapableBeanFactory();
}
@Override
protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
final Object job = super.createJobInstance(bundle);
beanFactory.autowireBean(job);
return job;
}
}
To finish the setup of Quartz, we just need to add one more property to our src/main/resources/application.properties:
demo.job.cron=0 0/1 * * * ?
This line tells our Quartz cron job to run at the top of every minute. You could do 0/3 if you wanted to run every 3 minutes or 0/10 if you wanted to run every 10.
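To make the minute field concrete, here is a minimal sketch that computes the next fire time for an expression of the form 0 0/N * * * ?. It is a simplified model written with java.time only, not Quartz's own cron parser, and the class and method names (CronMinuteStepSketch, nextFire) are made up for this illustration.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

class CronMinuteStepSketch {

    // Next fire time strictly after 'after' for a schedule shaped like
    // "0 0/N * * * ?": second 0 of every minute that is a multiple of N.
    // Assumes everyMinutes is between 1 and 59.
    static LocalDateTime nextFire(LocalDateTime after, int everyMinutes) {
        // Jump to the start of the next whole minute, then walk forward
        // until the minute-of-hour is a multiple of N.
        LocalDateTime candidate = after.truncatedTo(ChronoUnit.MINUTES).plusMinutes(1);
        while (candidate.getMinute() % everyMinutes != 0) {
            candidate = candidate.plusMinutes(1);
        }
        return candidate;
    }

    public static void main(String[] args) {
        LocalDateTime justFired = LocalDateTime.of(2020, 9, 22, 11, 0, 0);
        for (int step : new int[] {1, 3, 10}) {
            System.out.println("0/" + step + " -> next run at " + nextFire(justFired, step));
        }
    }
}
```

Starting from 11:00:00, the three steps give 11:01, 11:03, and 11:10 respectively. Note that the count restarts at minute 0 of every hour, so a step that does not divide 60 evenly produces one shorter gap at the top of the hour.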
When you start your application again, you should see the following in the console:
2020-09-22 10:59:51.752 INFO 92337 --- [ main] org.quartz.core.SchedulerSignalerImpl : Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl
2020-09-22 10:59:51.752 INFO 92337 --- [ main] org.quartz.core.QuartzScheduler : Quartz Scheduler v.2.3.2 created.
2020-09-22 10:59:51.779 INFO 92337 --- [ main] c.i.d.i.s.q.s.h.HazelcastJobStore : HazelcastJobStore initialized.
2020-09-22 10:59:51.780 INFO 92337 --- [ main] org.quartz.core.QuartzScheduler : Scheduler meta-data: Quartz Scheduler (v2.3.2) 'scheduler' with instanceId 'Matthews-MacBook-Pro.local1600786791746'
Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
NOT STARTED.
Currently in standby mode.
Number of jobs executed: 0
Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
Using job-store 'com.idvp.data.infrastructure.scheduling.quarz.store.hazelcast.HazelcastJobStoreDelegate' - which supports persistence. and is clustered.
2020-09-22 10:59:51.780 INFO 92337 --- [ main] org.quartz.impl.StdSchedulerFactory : Quartz scheduler 'scheduler' initialized from an externally provided properties instance.
2020-09-22 10:59:51.780 INFO 92337 --- [ main] org.quartz.impl.StdSchedulerFactory : Quartz scheduler version: 2.3.2
2020-09-22 10:59:51.780 INFO 92337 --- [ main] org.quartz.core.QuartzScheduler : JobFactory set to: ai.peoplelogic.demo.demo.config.AutowiringSpringBeanJobFactory@72b40f87
2020-09-22 10:59:51.784 INFO 92337 --- [ main] c.h.i.p.impl.PartitionStateManager : [127.0.0.1]:3000 [scaleable-task-demo] [3.12.9] Initializing cluster partition table arrangement...
2020-09-22 10:59:51.957 INFO 92337 --- [ main] o.s.s.quartz.SchedulerFactoryBean : Starting Quartz Scheduler now
2020-09-22 10:59:51.990 INFO 92337 --- [ main] org.quartz.core.QuartzScheduler : Scheduler scheduler_$_Matthews-MacBook-Pro.local1600786791746 started.
Notice how it’s showing the custom Job store and tells us that we support persistence and that it’s clustered.
As it runs, you should see output from the Job we created above:
2020-09-22 11:02:01.697 INFO 92434 --- [eduler_Worker-1] a.p.demo.demo.jobs.DemoQuartzJob : Logging job execution.
If you start up a second instance of the demo application, you’ll see the output in both consoles, but at different timestamps – the clustered job store is doing its job!
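Conceptually, a clustered job store makes sure that only one node wins each firing of a trigger. The sketch below models that idea with plain Java threads and a shared map. It is an illustration of the principle only, not how the Hazelcast job store is implemented, and every name in it (SingleFireSketch, tryClaim, race) is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

class SingleFireSketch {

    // Shared map standing in for the cluster-wide store: fire time -> node that claimed it.
    private final Map<String, String> claims = new ConcurrentHashMap<>();

    // A node may run the job only if it is the first to claim this fire time.
    boolean tryClaim(String fireTime, String nodeName) {
        return claims.putIfAbsent(fireTime, nodeName) == null;
    }

    // Lets 'nodeCount' threads race for one fire time and returns the nodes that won.
    static List<String> race(int nodeCount, String fireTime) {
        SingleFireSketch store = new SingleFireSketch();
        List<String> winners = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> nodes = new ArrayList<>();
        for (int i = 1; i <= nodeCount; i++) {
            String name = "node-" + i;
            Thread node = new Thread(() -> {
                try {
                    start.await(); // wait so that all nodes attempt the claim together
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (store.tryClaim(fireTime, name)) {
                    winners.add(name);
                }
            });
            nodes.add(node);
            node.start();
        }
        start.countDown();
        for (Thread node : nodes) {
            try {
                node.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return winners;
    }

    public static void main(String[] args) {
        System.out.println("Winners for 11:02:00: " + race(3, "11:02:00"));
    }
}
```

Whichever node gets there first runs the job, which is why the job output can show up in different consoles from one minute to the next.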
Now that we’ve got Hazelcast, Quartz, and Spring Boot configured and running, we need to create a task whose execution we want to split across the Hazelcast cluster.
To get started, let’s open up the DemoQuartzJob.java file we created in the previous section. We’re going to modify the job we created earlier to ensure that we don’t execute the same job twice (i.e., when the previous run took too long) and also to add the code to grab an IExecutorService from Hazelcast and tell it to execute our new Task. Modify DemoQuartzJob.java so that it looks like the following:
package ai.peoplelogic.demo.demo.jobs;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
@Component
public class DemoQuartzJob implements Job, Serializable {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
HazelcastInstance hazelcastInstance;
public void execute(JobExecutionContext context) {
log.info("Job execution started.");
try {
Scheduler scheduler = context.getScheduler();
List<JobExecutionContext> jobs = scheduler.getCurrentlyExecutingJobs();
for (JobExecutionContext job : jobs) {
if (job.getTrigger().equals(context.getTrigger()) && job.getJobDetail() != context.getJobDetail()) {
log.warn("Ignored fetch job because another is running on this cluster.");
return;
}
}
executeJobOnCluster();
} catch (SchedulerException e) {
log.error("Error executing scheduler", e);
}
}
private void executeJobOnCluster() {
IExecutorService executorService = hazelcastInstance.getExecutorService("scaleable-task-demo");
// Execute 100 tasks
for (int i = 0; i < 100; i++) {
DemoTask task = new DemoTask();
executorService.submit(task, new ExecutionCallback<Map<String, Object>>() {
public void onResponse(Map<String, Object> incomingResponse) {
if (incomingResponse.size() > 0 && incomingResponse.containsKey("name")) {
System.out.println("Received response on task: " + incomingResponse.get("name"));
}
}
public void onFailure(Throwable t) {
log.error("Error getting distributed task result", t);
}
});
}
}
}
You can see that the execute method now checks that we’re not executing the same job twice, and that we’ve added a new method, executeJobOnCluster, which uses Hazelcast’s callback executor pattern to send a Task (DemoTask in this case) to the cluster members and then listen for a response or failure. In the successful response (onResponse), we simply print the name key of the Map that was returned. To simulate a variety of tasks, we submit 100 of them to the executor.
Note: The Task can return any object you want as long as it and everything that it references is Serializable. Be very careful about what you send back and forth over the Hazelcast cluster as it can have very sneaky serialization errors.
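One way to catch these errors early is to try serializing a result before it ever leaves the node, for example in a unit test. The following is a minimal sketch that assumes plain Java serialization (Hazelcast can be configured with other serializers, so treat it as an illustration). The class and method names (SerializationCheckSketch, isSerializable) are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

class SerializationCheckSketch {

    // Returns true if the whole object graph can be written with Java serialization.
    static boolean isSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (IOException e) {
            // A NotSerializableException (a kind of IOException) names the offending class.
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> ok = new HashMap<>();
        ok.put("name", "Demo Task");
        ok.put("success", true);

        // Same map plus one value that does not implement Serializable.
        Map<String, Object> broken = new HashMap<>(ok);
        broken.put("handle", new Object());

        System.out.println("ok is serializable: " + isSerializable(ok));
        System.out.println("broken is serializable: " + isSerializable(broken));
    }
}
```

The second map fails even though HashMap itself is Serializable, because a single non-serializable value anywhere in the graph is enough to break the write.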
Note: Tasks will return in random order and asynchronously – when you handle the response, don’t expect them to be in any given order.
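A simple way to stay independent of completion order is to key each result by a task identifier rather than appending it to a list. The sketch below shows the idea with the JDK's CompletableFuture standing in for Hazelcast's callback API. The pool size, the random sleep, and all names (UnorderedResultsSketch, runAll, work) are assumptions made for the example.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

class UnorderedResultsSketch {

    // Submits 'taskCount' tasks and stores each result under its task id,
    // so the order in which the callbacks fire does not matter.
    static Map<Integer, String> runAll(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        Map<Integer, String> resultsById = new ConcurrentHashMap<>();
        CompletableFuture<?>[] pending = new CompletableFuture<?>[taskCount];
        try {
            for (int i = 0; i < taskCount; i++) {
                final int taskId = i;
                pending[i] = CompletableFuture
                        .supplyAsync(() -> work(taskId), pool)
                        .thenAccept(result -> resultsById.put(taskId, result));
            }
            // Block until every callback has run.
            CompletableFuture.allOf(pending).join();
        } finally {
            pool.shutdown();
        }
        return resultsById;
    }

    // Pretend work with a random duration, so completion order varies between runs.
    static String work(int taskId) {
        try {
            Thread.sleep(ThreadLocalRandom.current().nextInt(1, 20));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "Demo Task " + taskId;
    }

    public static void main(String[] args) {
        Map<Integer, String> results = runAll(100);
        System.out.println("Collected " + results.size() + " results");
        System.out.println("Result for task 42: " + results.get(42));
    }
}
```

The callbacks write into a ConcurrentHashMap because they may run on several threads at once, which is also true of callbacks invoked by a distributed executor.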
With the Quartz Job set up to send your Task to Hazelcast, we need to write one final class – the Callable Task – DemoTask.java. Create a new class in ai.peoplelogic.demo.demo.jobs called DemoTask.java and add the following code:
package ai.peoplelogic.demo.demo.jobs;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.spring.context.SpringAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@SpringAware
public class DemoTask implements HazelcastInstanceAware, Callable<Map<String, Object>>, Serializable, ApplicationContextAware {
transient HazelcastInstance hazelcastInstance;
transient ApplicationContext context;
private final Logger log = LoggerFactory.getLogger(this.getClass());
public DemoTask() {
}
public void setApplicationContext(final ApplicationContext applicationContext) throws BeansException {
context = applicationContext;
}
@Override
public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
this.hazelcastInstance = hazelcastInstance;
}
@Override
public Map<String, Object> call() throws Exception {
// Here you would do some really fancy logic. We're just going to sleep for 10 seconds and store some values
TimeUnit.SECONDS.sleep(10);
log.info("Executing task");
Map<String, Object> result = new HashMap<>();
result.put("name", "Demo Task " +
hazelcastInstance.getCluster().getLocalMember().getUuid());
result.put("success", true);
return result;
}
}
In this very simple example, we’ve implemented the Callable interface that Hazelcast expects for its Executor, told Hazelcast that it needs to wire in the Hazelcast instance that we’re using, and declared that this Task is aware of the Spring ApplicationContext. With that configuration in place, you can use @Autowired to inject Spring Beans into this task to do more complicated logic.
Note: Be sure to mark the autowired beans as transient like we have with the HazelcastInstance and the ApplicationContext. This keeps them out of the serialized task, so each member injects its own instances when the task arrives.
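The effect of transient is easy to see with a round trip through plain Java serialization. The sketch below is a stand-alone illustration: SampleTask and its fields are hypothetical stand-ins for a real task, and it assumes Java serialization rather than a custom Hazelcast serializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TransientFieldSketch {

    // Hypothetical stand-in for a task: one serialized field, one transient field.
    static class SampleTask implements Serializable {
        private static final long serialVersionUID = 1L;
        String name = "Demo Task";
        // Not Serializable, but skipped during serialization because it is transient.
        transient Object injectedBean = new Object();
    }

    // Serializes and deserializes the task, as happens when it moves between members.
    static SampleTask roundTrip(SampleTask task) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(task);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SampleTask) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    public static void main(String[] args) {
        SampleTask copy = roundTrip(new SampleTask());
        System.out.println("name = " + copy.name);
        System.out.println("injectedBean = " + copy.injectedBean);
    }
}
```

The copy keeps its name but arrives with injectedBean set to null, which is the gap that the receiving member fills by injecting its own beans. Without the transient keyword, the same round trip would fail with a NotSerializableException.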
Note: Be careful if you call database transactions from your task - particularly lazy loaded Hibernate or JPA objects – you may wind up with exceptions!
Now, with our code wired together, let’s fire it up and see if we execute the jobs across the cluster. You should see similar output on every node in the cluster during each Job run:
Node 2:
2020-09-22 13:25:10.057 INFO 96239 --- [cached.thread-7] ai.peoplelogic.demo.demo.jobs.DemoTask : Executing task
2020-09-22 13:25:10.058 INFO 96239 --- [cached.thread-8] ai.peoplelogic.demo.demo.jobs.DemoTask : Executing task
Node 3:
2020-09-22 13:25:10.058 INFO 96243 --- [cached.thread-4] ai.peoplelogic.demo.demo.jobs.DemoTask : Executing task
2020-09-22 13:25:10.058 INFO 96243 --- [cached.thread-5] ai.peoplelogic.demo.demo.jobs.DemoTask : Executing task
Along with output indicating that a response was returned on the node that executed the job:
Received response on task: Demo Task ff66bd34-f5b9-41f3-8cc8-4c49842e0fe2
Received response on task: Demo Task da90cff9-0475-43b6-9d9a-2fc87548ddaa
That’s it! You’re ready to scale the application across an entire fleet of servers. Just start up additional instances of your application and Hazelcast will find the new members and immediately start distributing the tasks to those new members.
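As a rough back-of-the-envelope check on what extra members buy you, the sketch below estimates the wall-clock time for the 100 ten-second tasks from this demo. It assumes that tasks spread evenly across members and that each member runs a fixed number of tasks at once. The figure of 8 threads per member is purely hypothetical (check your executor configuration for the real value), as are the class and method names.

```java
class ClusterThroughputSketch {

    // Estimated wall-clock seconds, assuming an even spread of tasks and
    // 'threadsPerMember' tasks running concurrently on each member.
    static long estimateSeconds(int tasks, int members, int threadsPerMember, int secondsPerTask) {
        int slots = members * threadsPerMember;
        long waves = (tasks + slots - 1) / slots; // ceiling division
        return waves * secondsPerTask;
    }

    public static void main(String[] args) {
        int tasks = 100;           // as submitted by DemoQuartzJob
        int secondsPerTask = 10;   // as slept by DemoTask
        int threadsPerMember = 8;  // hypothetical value for illustration
        for (int members : new int[] {1, 2, 4}) {
            System.out.println(members + " member(s): about "
                    + estimateSeconds(tasks, members, threadsPerMember, secondsPerTask) + " s");
        }
    }
}
```

Under those assumptions the estimate drops from about 130 seconds on one member to about 70 on two and 40 on four. Real clusters will not split work perfectly evenly, so treat the numbers as an upper bound on the benefit.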
In this article, you’ve seen how to set up a new Spring Boot application, add Hazelcast and Quartz to that application, and then implement the Hazelcast distributed executor pattern to execute logic from your Spring Beans across a cluster. From the example, you can now start to add more complicated logic, inject your custom Spring Beans into your tasks, or execute more compute-intensive tasks.
We’ve uploaded all the sample code to a GitHub repository at https://github.com/peoplelogic/scalable-task-execution. Feel free to clone it, use it, and contribute back changes.
Happy coding!
Previously published at https://peoplelogic.ai/blog/scalable-task-execution-with-hazelcast-and-spring-boot