How to Back Fill Data in An Existing Database
by @hacker-u65om3x


August 8th, 2024

Too Long; Didn't Read

A fail-safe method for populating missing fields or records in a database.

Problem Statement

You have a data set stored in a SQL or NoSQL table, and some fields of that data are either null or outdated. The table has millions of affected records, so the data must be populated in bulk with a back-fill job.

Given Conditions

  1. One or more fields must be updated in an existing table (in System A) that holds millions of records.
  2. The table has a key that uniquely identifies each record.
  3. Another system (System B) is the source of truth for the data we want to back-fill.
  4. System B can provide interfaces that transfer that data to a calling party.


Approach 1 - Using Kafka


  1. System B takes the ids of all affected records from a text file and publishes an event for each one to a Kafka topic.
  2. System A consumes the events and updates the outdated records in its table.
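
The two steps above amount to a producer/consumer flow. The following is a minimal in-memory sketch of that flow, not Kafka client code: a `BlockingQueue` stands in for the Kafka topic, and `BackfillEvent`, `publishAll`, and `consumeAll` are hypothetical names introduced for illustration. The consumer writes by record id, so an event that happens to be delivered twice leaves the table in the same state.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory sketch of Approach 1. The queue is a stand-in for a Kafka topic;
// all names here are hypothetical and not part of any Kafka API.
final class KafkaStyleBackfillSketch {

    // One event per record id, carrying the field to back-fill.
    record BackfillEvent(int employeeId, String address) {}

    // System B side: look up each requested id in the source of truth and publish an event.
    static void publishAll(List<Integer> ids, Map<Integer, String> sourceOfTruth,
                           BlockingQueue<BackfillEvent> topic) {
        for (int id : ids) {
            String address = sourceOfTruth.get(id);
            if (address != null) {
                topic.add(new BackfillEvent(id, address));
            }
        }
    }

    // System A side: drain the topic and overwrite the stale field.
    // Writing by key makes a redelivered event harmless (idempotent update).
    static int consumeAll(BlockingQueue<BackfillEvent> topic, Map<Integer, String> systemATable) {
        int updated = 0;
        BackfillEvent event;
        while ((event = topic.poll()) != null) {
            if (systemATable.containsKey(event.employeeId())) {
                systemATable.put(event.employeeId(), event.address());
                updated++;
            }
        }
        return updated;
    }

    public static void main(String[] args) {
        Map<Integer, String> systemB = Map.of(1, "12 Oak St", 2, "7 Elm Rd");
        Map<Integer, String> systemA = new HashMap<>();
        systemA.put(1, null); // missing address
        systemA.put(2, "");   // blank address
        BlockingQueue<BackfillEvent> topic = new LinkedBlockingQueue<>();

        publishAll(List.of(1, 2, 3), systemB, topic); // id 3 is unknown to System B and is skipped
        int updated = consumeAll(topic, systemA);
        System.out.println(updated + " rows updated: " + systemA);
    }
}
```

In a real deployment the queue would be replaced by a Kafka producer in System B and a consumer in System A; the update-by-key logic stays the same.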


Approach 2 - Pull Mechanism (REST API Based)


  1. System B already exposes a REST API that returns the data for the requested records.
  2. System A creates a batch job that calls the REST API in parallel for multiple records and updates the records in its own database.
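
Step 2 can be sketched as a loop that reads a bounded batch of ids above a checkpoint, pulls each record from System B, and then advances the checkpoint, so a restarted job resumes where it stopped. This is an in-memory sketch under stated assumptions: `fetchFromSystemB` stands in for the REST call, a `TreeMap` stands in for the table in System A, and every name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.IntFunction;

// In-memory sketch of a checkpointed pull job; all names are hypothetical.
final class CheckpointedPullSketch {

    // Returns up to `limit` ids greater than `lastProcessedId` whose address is missing, in ascending order.
    static List<Integer> nextBatch(TreeMap<Integer, String> table, int lastProcessedId, int limit) {
        List<Integer> ids = new ArrayList<>();
        for (Map.Entry<Integer, String> row : table.tailMap(lastProcessedId, false).entrySet()) {
            if (ids.size() == limit) {
                break;
            }
            String address = row.getValue();
            if (address == null || address.isEmpty()) {
                ids.add(row.getKey());
            }
        }
        return ids;
    }

    // Processes one batch and returns the new checkpoint (unchanged if nothing was left to do).
    static int runOnce(TreeMap<Integer, String> table, int lastProcessedId, int limit,
                       IntFunction<String> fetchFromSystemB) {
        List<Integer> batch = nextBatch(table, lastProcessedId, limit);
        for (int id : batch) {
            table.put(id, fetchFromSystemB.apply(id)); // stand-in for the REST call plus the UPDATE
        }
        return batch.isEmpty() ? lastProcessedId : batch.get(batch.size() - 1);
    }

    public static void main(String[] args) {
        TreeMap<Integer, String> table = new TreeMap<>();
        for (int i = 1; i <= 5; i++) {
            table.put(i, i == 3 ? "already set" : null);
        }

        // Keep running batches of 2 until the checkpoint stops moving.
        int checkpoint = 0;
        int previous;
        do {
            previous = checkpoint;
            checkpoint = runOnce(table, checkpoint, 2, id -> "address-" + id);
        } while (checkpoint != previous);
        System.out.println("checkpoint=" + checkpoint + " table=" + table);
    }
}
```

Selecting rows by "id greater than the checkpoint" rather than by offset keeps each batch query cheap on a large table, because the database can seek directly to the key.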



Detailed Discussion of Approach 2 Using Spring Boot and Java 17


Below is sample Spring Boot code built around a sample Employee class. System A calls the Employee controller's REST URL in System B and fills in the missing data in System A.


Details of System A



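# Employee class in System A, which holds the records to back-fill


import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// JPA entity for the Employee table. The jakarta.* imports assume Spring Boot 3
// (use javax.persistence on Spring Boot 2).
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
public class Employee {
    @Id
    private int id; // unique key, also used to look up the record in System B
    private String name;
    private String department;
    private double salary;
    private String email;
    private String address; // field being back-filled; referenced by the EmployeeRepository queries
}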




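# Task job table, which keeps track of the last processed index of the Employee table


import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// JPA entity; one row per job records how far the back-fill has progressed.
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
public class TaskJobCompletion {
    @Id
    private String jobId; // 'employeeJob' in the repository queries
    private long lastSuccessIndex; // id of the last Employee row processed successfully
    private long timestamp;
}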






# Employee DTO in System A


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class EmployeeDto {
    private int id;
    private String name;
    private String department;
    private double salary;
    private String email;
    private String address;
}


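# Employee service class in System A (client side)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
public class EmployeeService {

    // Assumes a RestTemplate bean is defined elsewhere in System A's configuration.
    @Autowired
    private RestTemplate restTemplate;

    // Fetches the source-of-truth record for one employee from System B.
    public EmployeeDto fetchEmployeeData(int employeeId) {
        // Passing the id as a URI template variable lets RestTemplate encode it.
        return restTemplate.getForObject(
            "http://localhost:8080/employee?employeeId={employeeId}", EmployeeDto.class, employeeId);
    }
}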


# Repository class in System A, which keeps track of the last processed index

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;

@Repository
public interface TaskJobCompletionRepository extends JpaRepository<TaskJobCompletion, String> {

    // Assumes a row with jobId 'employeeJob' already exists; with no row, the
    // query has no value to map onto the primitive return type and the call fails.
    @Query("SELECT t.lastSuccessIndex FROM TaskJobCompletion t WHERE t.jobId = 'employeeJob'")
    long findLastProcessedIndex();

    // Moves the checkpoint forward after a batch completes.
    @Modifying
    @Transactional
    @Query("UPDATE TaskJobCompletion t SET t.lastSuccessIndex = :newLastProcessedIndex WHERE t.jobId = 'employeeJob'")
    void updateLastProcessedIndex(@Param("newLastProcessedIndex") long newLastProcessedIndex);
}




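# Employee repository class

import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;

@Repository
public interface EmployeeRepository extends JpaRepository<Employee, Integer> {

    // The query itself has no limit clause, so the batch size is applied through a Pageable.
    @Query("SELECT e FROM Employee e WHERE (e.address IS NULL OR e.address = '') AND e.id > :lastProcessedIndex ORDER BY e.id ASC")
    List<Employee> findEmployeesWithNullOrBlankAddress(@Param("lastProcessedIndex") long lastProcessedIndex, Pageable pageable);

    // Convenience overload that keeps the (index, limit) call shape used by the cron job.
    default List<Employee> findEmployeesWithNullOrBlankAddress(long lastProcessedIndex, int limit) {
        return findEmployeesWithNullOrBlankAddress(lastProcessedIndex, PageRequest.of(0, limit));
    }

    // Modifying queries must run inside a transaction.
    @Modifying
    @Transactional
    @Query("UPDATE Employee e SET e.name = :#{#employeeDto.name}, e.department = :#{#employeeDto.department}, e.salary = :#{#employeeDto.salary}, e.email = :#{#employeeDto.email}, e.address = :#{#employeeDto.address} WHERE e.id = :#{#employeeDto.id}")
    void updateEmployee(@Param("employeeDto") EmployeeDto employeeDto);
}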



# ShedLock config class

import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider;
import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling // needed for @Scheduled methods to run, unless it is already enabled elsewhere
@EnableSchedulerLock(defaultLockAtMostFor = "PT30M")
public class ShedLockConfig {

    // The JDBC lock provider stores locks in a "shedlock" table, which must exist in the database.
    @Bean
    public JdbcTemplateLockProvider lockProvider(JdbcTemplate jdbcTemplate) {
        return new JdbcTemplateLockProvider(
            JdbcTemplateLockProvider.Configuration.builder()
                .withJdbcTemplate(jdbcTemplate)
                .usingDbTime() // Works with PostgreSQL, MySQL, MariaDB, MS SQL, Oracle, HSQLDB, H2, and others
                .build()
        );
    }
}





# Cron job worker class, which fetches records from System B, updates the Employee table, and then updates the job table


import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@Component
public class CronJob {

    @Autowired
    private TaskJobCompletionRepository taskJobCompletionRepository;

    @Autowired
    private EmployeeRepository employeeRepository;

    @Autowired
    private EmployeeService employeeService;

    // ShedLock ensures that only one application instance runs the job at a time.
    @Scheduled(cron = "0 */5 * * * ?") // Runs every 5 minutes
    @SchedulerLock(name = "CronJob_runJob", lockAtMostFor = "PT30M", lockAtLeastFor = "PT5M")
    public void runJob() {
        // Resume after the last successfully processed id and load the next batch of up to 1000 rows.
        long lastProcessedIndex = taskJobCompletionRepository.findLastProcessedIndex();
        List<Employee> employees = employeeRepository.findEmployeesWithNullOrBlankAddress(lastProcessedIndex, 1000);

        // join() blocks until the batch finishes, so the lock is held for the whole run.
        CompletableFuture.runAsync(() -> {
            employees.forEach(employee -> {
                EmployeeDto employeeDto = employeeService.fetchEmployeeData(employee.getId());
                if (employeeDto != null) { // skip ids for which System B returns no body
                    employeeRepository.updateEmployee(employeeDto);
                }
            });
            // Advance the checkpoint only after the whole batch has been processed.
            if (!employees.isEmpty()) {
                long newLastProcessedIndex = employees.get(employees.size() - 1).getId();
                taskJobCompletionRepository.updateLastProcessedIndex(newLastProcessedIndex);
            }
        }).join();
    }
}
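
The job above issues its REST calls one after another on a single background thread. Since Approach 2 calls for parallel requests, here is a minimal JDK-only sketch of one way to fan a batch out over a bounded thread pool with `CompletableFuture`. The `fetchAll` helper, the `BatchResult` type, the pool size, and the `fetch` function (a stand-in for `employeeService.fetchEmployeeData`) are assumptions for illustration, not part of the original job. Failed calls are collected instead of aborting the batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Sketch of parallel fetching for one batch; all names are hypothetical.
final class ParallelFetchSketch {

    // Result of one batch: fetched values by id, plus the ids whose call failed.
    record BatchResult<V>(Map<Integer, V> fetched, Map<Integer, Throwable> failed) {}

    static <V> BatchResult<V> fetchAll(List<Integer> ids, Function<Integer, V> fetch, int parallelism) {
        // A fixed pool bounds how many requests hit System B at the same time.
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        Map<Integer, V> fetched = new ConcurrentHashMap<>();
        Map<Integer, Throwable> failed = new ConcurrentHashMap<>();
        try {
            List<CompletableFuture<Void>> calls = new ArrayList<>();
            for (Integer id : ids) {
                calls.add(CompletableFuture.runAsync(() -> {
                    try {
                        V value = fetch.apply(id);
                        if (value != null) {
                            fetched.put(id, value);
                        }
                    } catch (RuntimeException e) {
                        failed.put(id, e); // record the failure and let the other calls continue
                    }
                }, pool));
            }
            // Wait for every call in the batch before returning.
            CompletableFuture.allOf(calls.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return new BatchResult<>(fetched, failed);
    }

    public static void main(String[] args) {
        BatchResult<String> result = fetchAll(List.of(1, 2, 3, 4), id -> {
            if (id == 3) {
                throw new IllegalStateException("System B unavailable for id 3");
            }
            return "address-" + id;
        }, 4);
        System.out.println("fetched=" + result.fetched().keySet() + " failed=" + result.failed().keySet());
    }
}
```

With a result like this, the job could write the fetched rows and advance the checkpoint only when `failed` is empty, so that ids with failed calls are retried on the next run.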





Details of System B


# Employee DTO class, which is the response returned by System B

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class EmployeeDto {
    private int id;
    private String name;
    private String department;
    private double salary;
    private String email;
    private String address; // the field System A back-fills; without it System A would receive null
}


# REST controller deployed in System B


import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/employee")
public class EmployeeController {

    // Handles GET /employee?employeeId=... and returns the source-of-truth record.
    @GetMapping
    public EmployeeDto getEmployee(@RequestParam int employeeId) {
        // For demonstration, returning a dummy EmployeeDto (the address value is a placeholder)
        return new EmployeeDto(employeeId, "John Doe", "Engineering", 75000.0, "[email protected]", "123 Main Street");
    }
}


