Problem Statement You have one data set which is kept in any SQL/NoSql Table and few fields of that data of are either Null or outdated and now you need to populate that data using a back filling job in bulk where the table have millions of missing records. Condition Given Need to update filed(s) in existing table (in System A) which have millions of records Table have key which is uniquely identifying the data in table. There is one system exist which is source of truth(System B) for the data we want to back fill. The system which is holding source of truth can provide interfaces to transfer that data to calling party. Approach -1 - Using Kafka System B can take all the id of records in one text file and can generate the events in one kafka topic. System A can consume the events and can update the outdated records in the table. Approach -2 - Pull mechanism (REST Api Based) System B already has one REST api which can provide data in response for the requested records System A can create one batch job which will call multiple Rest API in Parallel and update the records in their own data base. Detailed Discussion on Approach 2 Using spring Boot and Java 17 Below is the sample spring Boot Code with sample Employee class which will call Employee Controller Rest Url and filling the missing data in System A Details of System A 1) Employee Class which have records import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor public class Employee { private int id; private String name; private String department; private double salary; private String email; } # Task Job table which will keep track of last processed Index of table. import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor public class TaskJobCompletion { private String jobId; private long lastSuccessIndex; private long timestamp; } Employee DTO in System A import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor public class EmployeeDto { private int id; private String name; private String department; private double salary; private String email; private String address; } # Employee Service Class in System A - Client side import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; @Service public class EmployeeService { @Autowired private RestTemplate restTemplate; public EmployeeDto fetchEmployeeData(int employeeId) { return restTemplate.getForObject("http://localhost:8080/employee?employeeId=" + employeeId, EmployeeDto.class); } } # Repository class in System A which keep track of last Processed index of data import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; @Repository public interface TaskJobCompletionRepository extends JpaRepository<TaskJobCompletion, String> { @Query("SELECT t.lastSuccessIndex FROM TaskJobCompletion t WHERE t.jobId = 'employeeJob'") long findLastProcessedIndex(); @Modifying @Transactional @Query("UPDATE TaskJobCompletion t SET t.lastSuccessIndex = :newLastProcessedIndex WHERE t.jobId = 'employeeJob'") void updateLastProcessedIndex(long newLastProcessedIndex); } # Employee Repository Class import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Query; import org.springframework.stereotype.Repository; import java.util.List; @Repository public interface EmployeeRepository extends JpaRepository<Employee, Integer> { @Query("SELECT e FROM Employee e WHERE (e.address IS NULL OR e.address = '') AND e.id > :lastProcessedIndex ORDER BY e.id ASC") List<Employee> findEmployeesWithNullOrBlankAddress(long lastProcessedIndex, int limit); @Modifying @Query("UPDATE Employee e SET e.name = :#{#employeeDto.name}, e.department = :#{#employeeDto.department}, e.salary = :#{#employeeDto.salary}, e.email = :#{#employeeDto.email}, e.address = :#{#employeeDto.address} WHERE e.id = :#{#employeeDto.id}") void updateEmployee(EmployeeDto employeeDto); } # ShedLock Config Class import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider; @Configuration @EnableSchedulerLock(defaultLockAtMostFor = "PT30M") public class ShedLockConfig { @Bean public JdbcTemplateLockProvider lockProvider(JdbcTemplate jdbcTemplate) { return new JdbcTemplateLockProvider( JdbcTemplateLockProvider.Configuration.builder() .withJdbcTemplate(jdbcTemplate) .usingDbTime() // Works with PostgreSQL, MySQL, MariaDB, MS SQL, Oracle, HSQLDB, H2, and others .build() ); } } # Cron Job worker Class which will fetch records from System B and update in Employee Table and also update the Job Table as well. import net.javacrumbs.shedlock.spring.annotation.SchedulerLock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.List; import java.util.concurrent.CompletableFuture; @Component public class CronJob { @Autowired private TaskJobCompletionRepository taskJobCompletionRepository; @Autowired private EmployeeRepository employeeRepository; @Autowired private EmployeeService employeeService; @Scheduled(cron = "0 */5 * * * ?") // Runs every 5 minutes @SchedulerLock(name = "CronJob_runJob", lockAtMostFor = "PT30M", lockAtLeastFor = "PT5M") public void runJob() { long lastProcessedIndex = taskJobCompletionRepository.findLastProcessedIndex(); List<Employee> employees = employeeRepository.findEmployeesWithNullOrBlankAddress(lastProcessedIndex, 1000); CompletableFuture.runAsync(() -> { employees.forEach(employee -> { EmployeeDto employeeDto = employeeService.fetchEmployeeData(employee.getId()); employeeRepository.updateEmployee(employeeDto); }); if (!employees.isEmpty()) { long newLastProcessedIndex = employees.get(employees.size() - 1).getId(); taskJobCompletionRepository.updateLastProcessedIndex(newLastProcessedIndex); } }).join(); } } Details Of System B # Employee DTO response class which is response from System B import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor public class EmployeeDto { private int id; private String name; private String department; private double salary; private String email; } Rest Controller deployed in System B import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/employee") public class EmployeeController { @GetMapping public EmployeeDto getEmployee(@RequestParam int employeeId) { // For demonstration, returning a dummy EmployeeDto return new EmployeeDto(employeeId, "John Doe", "Engineering", 75000.0, "john.doe@example.com"); } } # Service class in System B import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/employee") public class EmployeeController { @GetMapping public EmployeeDto getEmployee(@RequestParam int employeeId) { // For demonstration, returning a dummy EmployeeDto return new EmployeeDto(employeeId, "John Doe", "Engineering", 75000.0, "john.doe@example.com"); } } Problem Statement You have one data set which is kept in any SQL/NoSql Table and few fields of that data of are either Null or outdated and now you need to populate that data using a back filling job in bulk where the table have millions of missing records. Condition Given Need to update filed(s) in existing table (in System A) which have millions of records Table have key which is uniquely identifying the data in table. There is one system exist which is source of truth(System B) for the data we want to back fill. The system which is holding source of truth can provide interfaces to transfer that data to calling party. Need to update filed(s) in existing table (in System A) which have millions of records Table have key which is uniquely identifying the data in table. There is one system exist which is source of truth(System B) for the data we want to back fill. The system which is holding source of truth can provide interfaces to transfer that data to calling party. Approach -1 - Using Kafka System B can take all the id of records in one text file and can generate the events in one kafka topic. System A can consume the events and can update the outdated records in the table. System B can take all the id of records in one text file and can generate the events in one kafka topic. System A can consume the events and can update the outdated records in the table. Approach -2 - Pull mechanism (REST Api Based) System B already has one REST api which can provide data in response for the requested records System A can create one batch job which will call multiple Rest API in Parallel and update the records in their own data base. System B already has one REST api which can provide data in response for the requested records System A can create one batch job which will call multiple Rest API in Parallel and update the records in their own data base. Detailed Discussion on Approach 2 Using spring Boot and Java 17 Below is the sample spring Boot Code with sample Employee class which will call Employee Controller Rest Url and filling the missing data in System A Details of System A 1) Employee Class which have records import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor public class Employee { private int id; private String name; private String department; private double salary; private String email; } # Task Job table which will keep track of last processed Index of table. import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor public class TaskJobCompletion { private String jobId; private long lastSuccessIndex; private long timestamp; } Employee DTO in System A import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor public class EmployeeDto { private int id; private String name; private String department; private double salary; private String email; private String address; } # Employee Service Class in System A - Client side import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; @Service public class EmployeeService { @Autowired private RestTemplate restTemplate; public EmployeeDto fetchEmployeeData(int employeeId) { return restTemplate.getForObject("http://localhost:8080/employee?employeeId=" + employeeId, EmployeeDto.class); } } # Repository class in System A which keep track of last Processed index of data import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; @Repository public interface TaskJobCompletionRepository extends JpaRepository<TaskJobCompletion, String> { @Query("SELECT t.lastSuccessIndex FROM TaskJobCompletion t WHERE t.jobId = 'employeeJob'") long findLastProcessedIndex(); @Modifying @Transactional @Query("UPDATE TaskJobCompletion t SET t.lastSuccessIndex = :newLastProcessedIndex WHERE t.jobId = 'employeeJob'") void updateLastProcessedIndex(long newLastProcessedIndex); } # Employee Repository Class import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Query; import org.springframework.stereotype.Repository; import java.util.List; @Repository public interface EmployeeRepository extends JpaRepository<Employee, Integer> { @Query("SELECT e FROM Employee e WHERE (e.address IS NULL OR e.address = '') AND e.id > :lastProcessedIndex ORDER BY e.id ASC") List<Employee> findEmployeesWithNullOrBlankAddress(long lastProcessedIndex, int limit); @Modifying @Query("UPDATE Employee e SET e.name = :#{#employeeDto.name}, e.department = :#{#employeeDto.department}, e.salary = :#{#employeeDto.salary}, e.email = :#{#employeeDto.email}, e.address = :#{#employeeDto.address} WHERE e.id = :#{#employeeDto.id}") void updateEmployee(EmployeeDto employeeDto); } # ShedLock Config Class import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider; @Configuration @EnableSchedulerLock(defaultLockAtMostFor = "PT30M") public class ShedLockConfig { @Bean public JdbcTemplateLockProvider lockProvider(JdbcTemplate jdbcTemplate) { return new JdbcTemplateLockProvider( JdbcTemplateLockProvider.Configuration.builder() .withJdbcTemplate(jdbcTemplate) .usingDbTime() // Works with PostgreSQL, MySQL, MariaDB, MS SQL, Oracle, HSQLDB, H2, and others .build() ); } } # Cron Job worker Class which will fetch records from System B and update in Employee Table and also update the Job Table as well. import net.javacrumbs.shedlock.spring.annotation.SchedulerLock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.List; import java.util.concurrent.CompletableFuture; @Component public class CronJob { @Autowired private TaskJobCompletionRepository taskJobCompletionRepository; @Autowired private EmployeeRepository employeeRepository; @Autowired private EmployeeService employeeService; @Scheduled(cron = "0 */5 * * * ?") // Runs every 5 minutes @SchedulerLock(name = "CronJob_runJob", lockAtMostFor = "PT30M", lockAtLeastFor = "PT5M") public void runJob() { long lastProcessedIndex = taskJobCompletionRepository.findLastProcessedIndex(); List<Employee> employees = employeeRepository.findEmployeesWithNullOrBlankAddress(lastProcessedIndex, 1000); CompletableFuture.runAsync(() -> { employees.forEach(employee -> { EmployeeDto employeeDto = employeeService.fetchEmployeeData(employee.getId()); employeeRepository.updateEmployee(employeeDto); }); if (!employees.isEmpty()) { long newLastProcessedIndex = employees.get(employees.size() - 1).getId(); taskJobCompletionRepository.updateLastProcessedIndex(newLastProcessedIndex); } }).join(); } } 1) Employee Class which have records import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor public class Employee { private int id; private String name; private String department; private double salary; private String email; } # Task Job table which will keep track of last processed Index of table. import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor public class TaskJobCompletion { private String jobId; private long lastSuccessIndex; private long timestamp; } Employee DTO in System A import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor public class EmployeeDto { private int id; private String name; private String department; private double salary; private String email; private String address; } # Employee Service Class in System A - Client side import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; @Service public class EmployeeService { @Autowired private RestTemplate restTemplate; public EmployeeDto fetchEmployeeData(int employeeId) { return restTemplate.getForObject("http://localhost:8080/employee?employeeId=" + employeeId, EmployeeDto.class); } } # Repository class in System A which keep track of last Processed index of data import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; @Repository public interface TaskJobCompletionRepository extends JpaRepository<TaskJobCompletion, String> { @Query("SELECT t.lastSuccessIndex FROM TaskJobCompletion t WHERE t.jobId = 'employeeJob'") long findLastProcessedIndex(); @Modifying @Transactional @Query("UPDATE TaskJobCompletion t SET t.lastSuccessIndex = :newLastProcessedIndex WHERE t.jobId = 'employeeJob'") void updateLastProcessedIndex(long newLastProcessedIndex); } # Employee Repository Class import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Query; import org.springframework.stereotype.Repository; import java.util.List; @Repository public interface EmployeeRepository extends JpaRepository<Employee, Integer> { @Query("SELECT e FROM Employee e WHERE (e.address IS NULL OR e.address = '') AND e.id > :lastProcessedIndex ORDER BY e.id ASC") List<Employee> findEmployeesWithNullOrBlankAddress(long lastProcessedIndex, int limit); @Modifying @Query("UPDATE Employee e SET e.name = :#{#employeeDto.name}, e.department = :#{#employeeDto.department}, e.salary = :#{#employeeDto.salary}, e.email = :#{#employeeDto.email}, e.address = :#{#employeeDto.address} WHERE e.id = :#{#employeeDto.id}") void updateEmployee(EmployeeDto employeeDto); } # ShedLock Config Class import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider; @Configuration @EnableSchedulerLock(defaultLockAtMostFor = "PT30M") public class ShedLockConfig { @Bean public JdbcTemplateLockProvider lockProvider(JdbcTemplate jdbcTemplate) { return new JdbcTemplateLockProvider( JdbcTemplateLockProvider.Configuration.builder() .withJdbcTemplate(jdbcTemplate) .usingDbTime() // Works with PostgreSQL, MySQL, MariaDB, MS SQL, Oracle, HSQLDB, H2, and others .build() ); } } # Cron Job worker Class which will fetch records from System B and update in Employee Table and also update the Job Table as well. import net.javacrumbs.shedlock.spring.annotation.SchedulerLock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.List; import java.util.concurrent.CompletableFuture; @Component public class CronJob { @Autowired private TaskJobCompletionRepository taskJobCompletionRepository; @Autowired private EmployeeRepository employeeRepository; @Autowired private EmployeeService employeeService; @Scheduled(cron = "0 */5 * * * ?") // Runs every 5 minutes @SchedulerLock(name = "CronJob_runJob", lockAtMostFor = "PT30M", lockAtLeastFor = "PT5M") public void runJob() { long lastProcessedIndex = taskJobCompletionRepository.findLastProcessedIndex(); List<Employee> employees = employeeRepository.findEmployeesWithNullOrBlankAddress(lastProcessedIndex, 1000); CompletableFuture.runAsync(() -> { employees.forEach(employee -> { EmployeeDto employeeDto = employeeService.fetchEmployeeData(employee.getId()); employeeRepository.updateEmployee(employeeDto); }); if (!employees.isEmpty()) { long newLastProcessedIndex = employees.get(employees.size() - 1).getId(); taskJobCompletionRepository.updateLastProcessedIndex(newLastProcessedIndex); } }).join(); } } Details Of System B # Employee DTO response class which is response from System B import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor public class EmployeeDto { private int id; private String name; private String department; private double salary; private String email; } Rest Controller deployed in System B import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/employee") public class EmployeeController { @GetMapping public EmployeeDto getEmployee(@RequestParam int employeeId) { // For demonstration, returning a dummy EmployeeDto return new EmployeeDto(employeeId, "John Doe", "Engineering", 75000.0, "john.doe@example.com"); } } # Service class in System B import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/employee") public class EmployeeController { @GetMapping public EmployeeDto getEmployee(@RequestParam int employeeId) { // For demonstration, returning a dummy EmployeeDto return new EmployeeDto(employeeId, "John Doe", "Engineering", 75000.0, "john.doe@example.com"); } } # Employee DTO response class which is response from System B import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor public class EmployeeDto { private int id; private String name; private String department; private double salary; private String email; } Rest Controller deployed in System B import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/employee") public class EmployeeController { @GetMapping public EmployeeDto getEmployee(@RequestParam int employeeId) { // For demonstration, returning a dummy EmployeeDto return new EmployeeDto(employeeId, "John Doe", "Engineering", 75000.0, "john.doe@example.com"); } } # Service class in System B import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/employee") public class EmployeeController { @GetMapping public EmployeeDto getEmployee(@RequestParam int employeeId) { // For demonstration, returning a dummy EmployeeDto return new EmployeeDto(employeeId, "John Doe", "Engineering", 75000.0, "john.doe@example.com"); } }