Problem: you have a data set stored in a SQL or NoSQL table, and some fields of its records are either null or outdated. You need to populate those fields with a bulk back-filling job, and the table has millions of records with missing data.
Approach 1 - Using Kafka
Approach 2 - Pull mechanism (REST API based)
Detailed discussion of Approach 2 using Spring Boot and Java 17
Below is sample Spring Boot code built around a sample Employee class. A scheduled job in System A calls the Employee controller's REST URL (in System B) and fills in the missing data in System A.
Details of System A
1) Employee class, which holds the records
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Employee {
private int id;
private String name;
private String department;
private double salary;
private String email;
}
# TaskJobCompletion table, which keeps track of the last processed index (employee id) of the table.
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Checkpoint row for a back-fill job.
 *
 * <p>The back-fill worker reads {@code lastSuccessIndex} to decide where the next batch starts
 * and writes the id of the last employee it handled back into it.
 *
 * <p>NOTE(review): TaskJobCompletionRepository treats this as a JPA entity keyed by a String
 * (presumably {@code jobId}); the {@code @Entity}/{@code @Id} mapping is not shown — confirm.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class TaskJobCompletion {
    /** Job identifier; the repository only reads and writes the row with jobId 'employeeJob'. */
    private String jobId;
    /** Id of the last record the job has already handled. */
    private long lastSuccessIndex;
    /** Checkpoint time. NOTE(review): unit/epoch is not visible here — confirm. */
    private long timestamp;
}
Employee DTO in System A
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Employee payload as System A receives it from System B's {@code /employee} endpoint.
 *
 * <p>The back-fill job passes this object straight to the employee-table update, so every
 * field here, including {@code address}, overwrites the stored value.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class EmployeeDto {
    private int id;
    private String name;
    private String department;
    private double salary;
    private String email;
    private String address;
}
# Employee Service Class in System A - Client side
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
/**
 * Client in System A that pulls a single employee's data from System B over REST.
 */
@Service
public class EmployeeService {

    /** System B endpoint; the employee id is appended as the value of the query parameter. */
    private static final String EMPLOYEE_ENDPOINT = "http://localhost:8080/employee?employeeId=";

    @Autowired
    private RestTemplate restTemplate;

    /**
     * Fetches the current data for one employee from System B.
     *
     * @param employeeId id of the employee to look up
     * @return the response body mapped to {@link EmployeeDto}; {@code RestTemplate#getForObject}
     *     may return {@code null} when the body is empty
     */
    public EmployeeDto fetchEmployeeData(int employeeId) {
        final String requestUrl = EMPLOYEE_ENDPOINT + employeeId;
        return restTemplate.getForObject(requestUrl, EmployeeDto.class);
    }
}
# Repository class in System A, which keeps track of the last processed index of the data
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
/**
 * Spring Data repository for the back-fill checkpoint table.
 *
 * <p>Both queries are hard-wired to the row whose {@code jobId} is {@code 'employeeJob'}.
 * NOTE(review): that row must be seeded before the first run, otherwise the UPDATE below
 * matches nothing and progress is never persisted.
 */
@Repository
public interface TaskJobCompletionRepository extends JpaRepository<TaskJobCompletion, String> {

    /**
     * Returns the last employee id recorded by the back-fill job.
     *
     * <p>{@code MAX} over zero rows yields {@code NULL}; {@code COALESCE} maps that to 0. Without
     * it, a missing 'employeeJob' row would hand Spring Data a null for this primitive
     * {@code long} return type, and the call would fail instead of starting from the beginning.
     *
     * @return the stored checkpoint, or {@code 0} when no 'employeeJob' row exists yet
     */
    @Query("SELECT COALESCE(MAX(t.lastSuccessIndex), 0L) FROM TaskJobCompletion t WHERE t.jobId = 'employeeJob'")
    long findLastProcessedIndex();

    /**
     * Stores a new checkpoint on the 'employeeJob' row.
     *
     * <p>This is an UPDATE only; it does not insert a missing row. The {@code timestamp} column
     * is not modified.
     *
     * @param newLastProcessedIndex id of the last employee the job has handled
     */
    @Modifying
    @Transactional
    @Query("UPDATE TaskJobCompletion t SET t.lastSuccessIndex = :newLastProcessedIndex WHERE t.jobId = 'employeeJob'")
    void updateLastProcessedIndex(long newLastProcessedIndex);
}
# Employee Repository Class
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
/**
 * Spring Data repository for System A's employee table, used by the back-fill job.
 */
@Repository
public interface EmployeeRepository extends JpaRepository<Employee, Integer> {

    /**
     * Pages through employees whose address is null or blank and whose id is greater than
     * {@code lastProcessedIndex}, in ascending id order.
     *
     * @param lastProcessedIndex exclusive lower bound on the employee id
     * @param page               page to fetch; the query's own ORDER BY defines the ordering
     * @return the matching employees for the requested page
     */
    @Query("SELECT e FROM Employee e WHERE (e.address IS NULL OR e.address = '') AND e.id > :lastProcessedIndex ORDER BY e.id ASC")
    List<Employee> findEmployeesWithNullOrBlankAddressAfter(long lastProcessedIndex, Pageable page);

    /**
     * Returns at most {@code limit} employees with a null or blank address whose id is greater
     * than {@code lastProcessedIndex}, in ascending id order.
     *
     * <p>The limit is applied through a {@link Pageable}: a plain {@code int} argument cannot be
     * bound into the JPQL, and without it the query would load every matching row.
     *
     * @param lastProcessedIndex exclusive lower bound on the employee id
     * @param limit              maximum number of rows to return; values below 1 yield an empty list
     * @return up to {@code limit} employees still missing an address
     */
    default List<Employee> findEmployeesWithNullOrBlankAddress(long lastProcessedIndex, int limit) {
        if (limit <= 0) {
            // PageRequest rejects a size below 1.
            return List.of();
        }
        return findEmployeesWithNullOrBlankAddressAfter(lastProcessedIndex, PageRequest.of(0, limit));
    }

    /**
     * Overwrites name, department, salary, email and address of the employee whose id equals
     * {@code employeeDto.id} with the DTO's values.
     *
     * <p>{@code @Transactional} is required because modifying queries must run inside a
     * transaction, and the caller does not open one.
     *
     * @param employeeDto data fetched from System B; must not be null
     */
    @Modifying
    @Transactional
    @Query("UPDATE Employee e SET e.name = :#{#employeeDto.name}, e.department = :#{#employeeDto.department}, e.salary = :#{#employeeDto.salary}, e.email = :#{#employeeDto.email}, e.address = :#{#employeeDto.address} WHERE e.id = :#{#employeeDto.id}")
    void updateEmployee(EmployeeDto employeeDto);
}
# ShedLock Config Class
import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider;
/**
 * Enables ShedLock so that {@code @SchedulerLock}-annotated jobs run on only one application
 * instance at a time. Locks are held for at most 30 minutes unless a job overrides it.
 *
 * <p>NOTE(review): {@code @Scheduled} methods only fire when scheduling is enabled
 * ({@code @EnableScheduling}), which is not shown here — confirm it is declared elsewhere.
 */
@Configuration
@EnableSchedulerLock(defaultLockAtMostFor = "PT30M")
public class ShedLockConfig {

    /**
     * Creates a lock provider that stores locks in the database reached through
     * {@code jdbcTemplate}.
     *
     * @param jdbcTemplate template for the database that holds the lock table
     * @return the JDBC-backed lock provider
     */
    @Bean
    public JdbcTemplateLockProvider lockProvider(JdbcTemplate jdbcTemplate) {
        JdbcTemplateLockProvider.Configuration lockConfiguration =
                JdbcTemplateLockProvider.Configuration.builder()
                        .withJdbcTemplate(jdbcTemplate)
                        // Use the database clock rather than each node's clock.
                        // Works with PostgreSQL, MySQL, MariaDB, MS SQL, Oracle, HSQLDB, H2, and others
                        .usingDbTime()
                        .build();
        return new JdbcTemplateLockProvider(lockConfiguration);
    }
}
# Cron job worker class, which fetches records from System B, updates them in the Employee table, and then updates the job table.
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
 * Scheduled back-fill worker in System A.
 *
 * <p>Each run reads the checkpoint from the job table and loads the next batch of employees
 * whose address is null or blank and whose id is above the checkpoint. For each one it fetches
 * fresh data from System B and writes it to the employee table. It then advances the checkpoint
 * to the id of the last employee handled.
 */
@Component
public class CronJob {

    /** Maximum number of employees fetched and updated per scheduled run. */
    private static final int BATCH_SIZE = 1000;

    @Autowired
    private TaskJobCompletionRepository taskJobCompletionRepository;
    @Autowired
    private EmployeeRepository employeeRepository;
    @Autowired
    private EmployeeService employeeService;

    /**
     * Processes one batch of employees that are still missing an address.
     *
     * <p>Records are handled sequentially on the scheduler thread. The original version wrapped
     * this loop in {@code CompletableFuture.runAsync(..).join()}, which blocked anyway and only
     * moved the I/O onto the common pool.
     *
     * <p>If fetching or updating a record throws, the checkpoint is still advanced past every
     * record that completed before the failure, and the exception propagates. The failing record
     * is picked up again on the next run.
     */
    @Scheduled(cron = "0 */5 * * * ?") // Runs every 5 minutes
    @SchedulerLock(name = "CronJob_runJob", lockAtMostFor = "PT30M", lockAtLeastFor = "PT5M")
    public void runJob() {
        long lastProcessedIndex = taskJobCompletionRepository.findLastProcessedIndex();
        List<Employee> employees =
                employeeRepository.findEmployeesWithNullOrBlankAddress(lastProcessedIndex, BATCH_SIZE);

        long checkpoint = lastProcessedIndex;
        try {
            for (Employee employee : employees) {
                EmployeeDto employeeDto = employeeService.fetchEmployeeData(employee.getId());
                // getForObject returns null for an empty response body; there is nothing to
                // write in that case, so skip the update instead of failing on a null DTO.
                if (employeeDto != null) {
                    employeeRepository.updateEmployee(employeeDto);
                }
                checkpoint = employee.getId();
            }
        } finally {
            // Persist progress even when the batch fails part-way, so completed records are
            // not fetched again on the next run.
            if (checkpoint > lastProcessedIndex) {
                taskJobCompletionRepository.updateLastProcessedIndex(checkpoint);
            }
        }
    }
}
Details of System B
# EmployeeDto: the response class returned by System B
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class EmployeeDto {
private int id;
private String name;
private String department;
private double salary;
private String email;
}
REST controller deployed in System B
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint in System B that serves employee data to System A's back-fill job.
 *
 * <p>Demo implementation: every request returns the same hard-coded employee, with only the id
 * taken from the request.
 */
@RestController
@RequestMapping("/employee")
public class EmployeeController {

    /**
     * Returns employee data for the requested id.
     *
     * <p>NOTE(review): this demo does not populate an address, which is the field System A's
     * back-fill query filters on — supply one from the real data source.
     *
     * @param employeeId id taken from the {@code employeeId} query parameter
     * @return a dummy {@link EmployeeDto} carrying {@code employeeId}
     */
    @GetMapping
    public EmployeeDto getEmployee(@RequestParam int employeeId) {
        // For demonstration, returning a dummy EmployeeDto.
        // example.com is reserved for documentation (RFC 2606).
        return new EmployeeDto(employeeId, "John Doe", "Engineering", 75000.0, "john.doe@example.com");
    }
}
# Service class in System B (note: the listing below repeats the REST controller above; no separate service class is shown)
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint in System B that serves employee data.
 *
 * <p>NOTE(review): this listing sits under the "Service class" heading but repeats the
 * EmployeeController listed earlier for System B. Two controllers with the same class name and
 * {@code @RequestMapping("/employee")} cannot coexist in one application — confirm which class
 * was intended here.
 */
@RestController
@RequestMapping("/employee")
public class EmployeeController {

    /**
     * Returns employee data for the requested id.
     *
     * @param employeeId id taken from the {@code employeeId} query parameter
     * @return a dummy {@link EmployeeDto} carrying {@code employeeId}
     */
    @GetMapping
    public EmployeeDto getEmployee(@RequestParam int employeeId) {
        // For demonstration, returning a dummy EmployeeDto.
        // example.com is reserved for documentation (RFC 2606).
        return new EmployeeDto(employeeId, "John Doe", "Engineering", 75000.0, "john.doe@example.com");
    }
}