Welcome to the second article of our three-part series on building a real-time log anomaly detection system.
In our previous article, we laid the foundation for our system, exploring how to leverage Spring State Machine and Project Reactor to create a flexible and efficient framework for processing log entries and detecting anomalies. Building on that framework, this article focuses on error-rate-based detection, a crucial aspect of comprehensive log analysis. We'll extend the existing system to flag anomalies based on how frequently error messages occur, providing deeper insight into potential system issues or security threats.
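By the end of this article, you'll understand how to extend the state machine with an error-rate alert state, track error counts per IP address, derive a dynamic threshold from a sliding window of recent error counts, and expose the detection logic as a reactive API.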
This implementation will strengthen your system's ability to detect and respond to anomalies in real time, further improving your application's resilience and security. Let's dive in!
Step 1: Update the existing model
LogEvent
Update LogEvent to include an error-rate event (HIGH_ERROR_RATE).
public enum LogEvent {
    KEYWORD_DETECTED, NORMAL_ACTIVITY, HIGH_ERROR_RATE
}
LogEntry
Update LogEntry to include an ipAddress field and an isError flag, so that errors can be tracked per IP address.
import java.time.LocalDateTime;

public record LogEntry(LocalDateTime timestamp, String ipAddress, String apiEndpoint,
                       String message, boolean isError) {
}
LogState
Update LogState to include an error-rate alert state (ERROR_RATE_ALERT).
public enum LogState {
    NORMAL, KEYWORD_ALERT, ERROR_RATE_ALERT
}
NORMAL: The default state, in which no anomalies are detected.
ERROR_RATE_ALERT: Signifies an elevated error rate in recent log entries.
Step 2: Update State Transitions
The system transitions from a NORMAL state to an ERROR_RATE_ALERT state when a HIGH_ERROR_RATE event is detected. If a NORMAL_ACTIVITY event is subsequently received, the system then shifts back from ERROR_RATE_ALERT to the NORMAL state.
@Override
public void configure(StateMachineTransitionConfigurer<LogState, LogEvent> transitions) throws Exception {
    transitions
        .withExternal()
            .source(LogState.NORMAL).target(LogState.ERROR_RATE_ALERT)
            .event(LogEvent.HIGH_ERROR_RATE)
            .and()
        .withExternal()
            .source(LogState.ERROR_RATE_ALERT).target(LogState.NORMAL)
            .event(LogEvent.NORMAL_ACTIVITY);
}
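To make the accept/deny behavior of these two transitions concrete, here is a minimal plain-Java sketch (no Spring involved) of the same transition table. It is an illustrative stand-in, not Spring State Machine's implementation: `TransitionSketch` and its `send` method are hypothetical names, and `send` returning `false` is only meant to roughly mirror a denied event when no transition matches the current state.

```java
import java.util.Map;

public class TransitionSketch {
    enum LogState { NORMAL, KEYWORD_ALERT, ERROR_RATE_ALERT }
    enum LogEvent { KEYWORD_DETECTED, NORMAL_ACTIVITY, HIGH_ERROR_RATE }

    // (source state -> (event -> target state)); only the two external
    // transitions configured above are modeled.
    private static final Map<LogState, Map<LogEvent, LogState>> TRANSITIONS = Map.of(
            LogState.NORMAL, Map.of(LogEvent.HIGH_ERROR_RATE, LogState.ERROR_RATE_ALERT),
            LogState.ERROR_RATE_ALERT, Map.of(LogEvent.NORMAL_ACTIVITY, LogState.NORMAL));

    private LogState current = LogState.NORMAL;

    // Returns true (accepted) if a transition exists for the event in the
    // current state; otherwise returns false (denied) and keeps the state.
    boolean send(LogEvent event) {
        LogState target = TRANSITIONS.getOrDefault(current, Map.of()).get(event);
        if (target == null) {
            return false;
        }
        current = target;
        return true;
    }

    LogState state() {
        return current;
    }

    public static void main(String[] args) {
        TransitionSketch sm = new TransitionSketch();
        System.out.println(sm.send(LogEvent.HIGH_ERROR_RATE) + " " + sm.state()); // true ERROR_RATE_ALERT
        System.out.println(sm.send(LogEvent.HIGH_ERROR_RATE) + " " + sm.state()); // false ERROR_RATE_ALERT
        System.out.println(sm.send(LogEvent.NORMAL_ACTIVITY) + " " + sm.state()); // true NORMAL
    }
}
```

Note the second HIGH_ERROR_RATE event: with only these two transitions configured, an event sent while the machine is already in ERROR_RATE_ALERT has no matching transition, which is why the detection code in Step 5 checks whether the event was accepted.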
Step 3: Process Anomaly Detection
This step initializes a state machine for a batch (window) of entries. The state machine is started before the entries are processed and stopped once all of them have been handled. The method also creates a ConcurrentHashMap that counts errors per IP address. Each log entry is then processed in two stages: first its IP-specific error count is updated, and then error-rate anomaly detection runs against the updated count.
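private Flux<AnomalyResult> processLogsForGivenWindow(Flux<LogEntry> entries) {
    StateMachine<LogState, LogEvent> stateMachine = stateMachineFactory.getStateMachine();
    // Per-window error counts, keyed by IP address
    ConcurrentHashMap<String, Integer> ipErrorMap = new ConcurrentHashMap<>();
    return stateMachine.startReactively()
            .thenMany(entries
                    // 1. Update the IP-specific error count
                    .doOnNext(entry -> updateErrorOccurrence(entry, ipErrorMap))
                    // 2. Run detection; concatMap sends state-machine events in entry order
                    .concatMap(entry -> detectErrorRateBasedAnomaly(entry, stateMachine, ipErrorMap)))
            // stopReactively() returns a lazy Mono, so it must be subscribed to take effect
            .doFinally(signalType -> stateMachine.stopReactively().subscribe());
}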
Step 4: Keep track of error occurrences
This method updates the ConcurrentHashMap that tracks error activity per IP address, counting the error entries associated with each address. The count is incremented only if the log entry is flagged as an error, and compute() performs the read-and-increment atomically for the given key.
private void updateErrorOccurrence(LogEntry entry, ConcurrentHashMap<String, Integer> ipErrorMap) {
    if (entry.isError()) {
        ipErrorMap.compute(entry.ipAddress(), (key, value) -> (value == null) ? 1 : value + 1);
    }
}
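As a design alternative, ConcurrentHashMap.merge expresses the same atomic per-key increment more concisely than compute with a null check. The sketch below is plain Java and uses a trimmed-down, hypothetical `Entry` record in place of the full LogEntry.

```java
import java.util.concurrent.ConcurrentHashMap;

public class ErrorCounterSketch {
    // Hypothetical minimal stand-in for LogEntry (only the fields needed here)
    record Entry(String ipAddress, boolean isError) {}

    // Atomically increments the per-IP error count; non-error entries leave the map untouched.
    static void updateErrorOccurrence(Entry entry, ConcurrentHashMap<String, Integer> ipErrorMap) {
        if (entry.isError()) {
            // merge inserts 1 for a new key, otherwise applies Integer::sum to the old value and 1
            ipErrorMap.merge(entry.ipAddress(), 1, Integer::sum);
        }
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> ipErrorMap = new ConcurrentHashMap<>();
        updateErrorOccurrence(new Entry("10.0.0.1", true), ipErrorMap);
        updateErrorOccurrence(new Entry("10.0.0.1", true), ipErrorMap);
        updateErrorOccurrence(new Entry("10.0.0.2", false), ipErrorMap);
        System.out.println(ipErrorMap); // {10.0.0.1=2}
    }
}
```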
Step 5: Perform rate-based anomaly detection
The method below checks whether the error count for the entry's IP address exceeds the current dynamic threshold (computed in Step 6). If it does not, the method returns an empty `Mono`. If it does, it sends a `HIGH_ERROR_RATE` event to the provided state machine and inspects the result: when the event is `ACCEPTED` (the machine moves to `ERROR_RATE_ALERT`), it emits an `AnomalyResult` for the entry; otherwise, for example when the machine is already in `ERROR_RATE_ALERT` and no transition matches, it returns an empty `Mono`.
Ideally, state machine handling should be kept separate from the core detection logic.
private Mono<AnomalyResult> detectErrorRateBasedAnomaly(LogEntry entry, StateMachine<LogState, LogEvent> stateMachine,
                                                        ConcurrentHashMap<String, Integer> ipErrorMap) {
    int currentErrorCount = dynamicErrorThreshold(entry, ipErrorMap);
    if (currentErrorCount > currentErrorThreshold) {
        Message<LogEvent> eventMessage = MessageBuilder.withPayload(LogEvent.HIGH_ERROR_RATE).build();
        Flux<StateMachineEventResult<LogState, LogEvent>> results = stateMachine.sendEvent(Mono.just(eventMessage));
        return results.next().flatMap(result -> {
            if (result.getResultType() == StateMachineEventResult.ResultType.ACCEPTED) {
                return Mono.just(new AnomalyResult(entry, LogState.ERROR_RATE_ALERT));
            } else {
                return Mono.empty();
            }
        });
    } else {
        return Mono.empty();
    }
}
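Following the note above about separating state machine handling from business logic, one possible split (our assumption, not the author's code) keeps the threshold comparison as a pure function and hides the event dispatch behind a small callback. In this plain-Java sketch, `DetectionSketch`, `isHighErrorRate`, and the `BooleanSupplier` "event sink" are hypothetical; in the method above, that role is played by sending the event and checking for `ACCEPTED`. The `AnomalyResult` here is a simplified stand-in that stores the IP address instead of the whole LogEntry.

```java
import java.util.Optional;
import java.util.function.BooleanSupplier;

public class DetectionSketch {
    enum LogState { NORMAL, ERROR_RATE_ALERT }

    // Simplified stand-in for the article's AnomalyResult
    record AnomalyResult(String ipAddress, LogState state) {}

    // Pure business rule: no state machine involved, trivially unit-testable.
    static boolean isHighErrorRate(int currentErrorCount, double threshold) {
        return currentErrorCount > threshold;
    }

    // Side effect isolated behind a callback that reports whether the
    // HIGH_ERROR_RATE event was accepted (the state machine, in the article).
    static Optional<AnomalyResult> detect(String ip, int count, double threshold,
                                          BooleanSupplier sendHighErrorRate) {
        if (!isHighErrorRate(count, threshold)) {
            return Optional.empty(); // below threshold: the sink is never called
        }
        return sendHighErrorRate.getAsBoolean()
                ? Optional.of(new AnomalyResult(ip, LogState.ERROR_RATE_ALERT))
                : Optional.empty();
    }

    public static void main(String[] args) {
        boolean[] inAlert = {false};
        // Toy sink: accepts the event only when not already in alert, like the Step 2 transitions
        BooleanSupplier sink = () -> {
            if (inAlert[0]) {
                return false;
            }
            inAlert[0] = true;
            return true;
        };
        System.out.println(detect("10.0.0.1", 1, 2.0, sink)); // Optional.empty
        System.out.println(detect("10.0.0.1", 5, 2.0, sink)); // Optional[AnomalyResult[ipAddress=10.0.0.1, state=ERROR_RATE_ALERT]]
        System.out.println(detect("10.0.0.1", 6, 2.0, sink)); // Optional.empty
    }
}
```

With this split, the threshold rule can be tested without starting a state machine, and the reactive method only has to adapt the sink to `sendEvent`.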
Step 6: Dynamic threshold calculation
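This method maintains a sliding window of recent error counts. Note that the window is a single queue shared by all IP addresses: each processed entry contributes the current error count of its own IP address. Once the window holds WINDOW_SIZE values, the method recalculates the threshold as the window's average multiplied by THRESHOLD_MULTIPLIER. It then returns the current error count for the entry's IP address, which Step 5 compares against this threshold. The snippet relies on instance fields that are not shown here: recentErrorCounts (a queue of integers), WINDOW_SIZE, THRESHOLD_MULTIPLIER, and currentErrorThreshold.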
private int dynamicErrorThreshold(LogEntry entry, ConcurrentHashMap<String, Integer> ipErrorMap) {
    int currentErrorCount = ipErrorMap.getOrDefault(entry.ipAddress(), 0);
    // Update recent error counts
    recentErrorCounts.offer(currentErrorCount);
    if (recentErrorCounts.size() > WINDOW_SIZE) {
        recentErrorCounts.poll();
    }
    // Recalculate threshold
    if (recentErrorCounts.size() == WINDOW_SIZE) {
        double averageErrorRate = recentErrorCounts.stream().mapToInt(Integer::intValue).average().orElse(0);
        currentErrorThreshold = averageErrorRate * THRESHOLD_MULTIPLIER;
    }
    return currentErrorCount;
}
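The sketch below reproduces the sliding-window logic of dynamicErrorThreshold in plain Java so the arithmetic can be traced step by step. Since the field declarations are not shown, the values here are assumptions: WINDOW_SIZE = 5, THRESHOLD_MULTIPLIER = 1.5, and a hypothetical starting threshold of 3.0 that applies until the window fills for the first time. `DynamicThresholdSketch` and `observe` are illustrative names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class DynamicThresholdSketch {
    // Hypothetical values; the article does not show how these fields are declared.
    static final int WINDOW_SIZE = 5;
    static final double THRESHOLD_MULTIPLIER = 1.5;

    private final Deque<Integer> recentErrorCounts = new ArrayDeque<>();
    // Hypothetical starting threshold, used until the window is full for the first time
    private double currentErrorThreshold = 3.0;

    // Mirrors dynamicErrorThreshold(): records the count, trims the window to
    // WINDOW_SIZE, recomputes the threshold once the window is full, returns the count.
    int observe(int currentErrorCount) {
        recentErrorCounts.offerLast(currentErrorCount);
        if (recentErrorCounts.size() > WINDOW_SIZE) {
            recentErrorCounts.pollFirst(); // drop the oldest count
        }
        if (recentErrorCounts.size() == WINDOW_SIZE) {
            double average = recentErrorCounts.stream().mapToInt(Integer::intValue).average().orElse(0);
            currentErrorThreshold = average * THRESHOLD_MULTIPLIER;
        }
        return currentErrorCount;
    }

    double threshold() {
        return currentErrorThreshold;
    }

    public static void main(String[] args) {
        DynamicThresholdSketch sketch = new DynamicThresholdSketch();
        int[] counts = {1, 1, 2, 1, 1, 6};
        for (int c : counts) {
            int count = sketch.observe(c);
            boolean alert = count > sketch.threshold(); // the comparison made in Step 5
            System.out.printf("count=%d threshold=%.2f alert=%b%n", count, sketch.threshold(), alert);
        }
    }
}
```

Tracing the sample input: after the fifth count, the window [1, 1, 2, 1, 1] averages 1.2, giving a threshold of 1.8. The spike to 6 pushes the oldest value out, the window [1, 2, 1, 1, 6] averages 2.2, and 6 exceeds the new threshold of 3.3. Because the current count enters the window before the comparison, a spike also raises its own threshold, which makes the detector somewhat more conservative.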
Step 7: Expose as an API
The use of Flux allows our API to handle potentially unlimited streams of data in a non-blocking, reactive manner. This is particularly useful for log analysis, where we might be dealing with a continuous stream of log entries.
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class AnomalyDetectionController {
    private final ErrorRateBasedDetectionService errorRateBasedDetectionService;

    public AnomalyDetectionController(ErrorRateBasedDetectionService errorRateBasedDetectionService) {
        this.errorRateBasedDetectionService = errorRateBasedDetectionService;
    }

    @PostMapping("/detect-error-anomalies")
    public Flux<AnomalyResult> detectErrorRateAnomalies(@RequestBody Flux<LogEntry> logEntries) {
        return errorRateBasedDetectionService.detectErrorRateBasedAnomalies(logEntries);
    }
}
Step 8: Testing
To exercise the API, you can use Postman to send POST requests to /detect-error-anomalies with payloads that simulate different error frequencies. Pay particular attention to the isError flag and the ipAddress field in each entry: when a specific IP address exhibits a high error rate, the system should return an AnomalyResult with the ERROR_RATE_ALERT state.
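Rather than typing a payload by hand, a small generator can produce one with a skewed error distribution to paste into Postman. This plain-Java sketch rests on assumptions: the `/api/orders` endpoint, the messages, and the timestamps are made-up sample values, and it assumes your JSON mapping uses the record component names (including `isError`) and accepts ISO-8601 timestamps, which is worth verifying against your Jackson configuration. `PayloadSketch` and `buildPayload` are hypothetical names.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

public class PayloadSketch {
    // Builds a JSON array of LogEntry-shaped objects: a "quiet" IP with
    // successful requests followed by a "noisy" IP that only produces errors.
    static String buildPayload(String noisyIp, int noisyErrors, String quietIp, int quietRequests) {
        LocalDateTime start = LocalDateTime.of(2024, 1, 1, 10, 0, 0); // sample timestamp
        List<String> entries = new ArrayList<>();
        for (int i = 0; i < quietRequests; i++) {
            entries.add(entry(start.plusSeconds(i), quietIp, false));
        }
        for (int i = 0; i < noisyErrors; i++) {
            entries.add(entry(start.plusSeconds(quietRequests + i), noisyIp, true));
        }
        return "[\n" + String.join(",\n", entries) + "\n]";
    }

    // LocalDateTime.toString() yields an ISO-8601 local date-time string
    private static String entry(LocalDateTime ts, String ip, boolean isError) {
        return String.format(
                "  {\"timestamp\":\"%s\",\"ipAddress\":\"%s\",\"apiEndpoint\":\"/api/orders\","
                        + "\"message\":\"%s\",\"isError\":%b}",
                ts, ip, isError ? "Internal Server Error" : "OK", isError);
    }

    public static void main(String[] args) {
        System.out.println(buildPayload("10.0.0.9", 8, "10.0.0.1", 5));
    }
}
```

Varying `noisyErrors` relative to `quietRequests` is a quick way to check how sensitive the dynamic threshold is in practice.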
Conclusion:
We've significantly enhanced our system by implementing dynamic error rate-based anomaly detection. This adaptive approach allows our system to intelligently adjust its sensitivity based on recent error patterns, striking a balance between alertness and resilience to false positives. Looking ahead to the final part of our series, we'll focus on integrating high traffic-based detection using IP addresses. This will add another crucial layer to our anomaly detection capabilities, allowing us to identify potential DDoS attacks or unusual spikes in traffic from specific sources.
Also in the final part, we'll demonstrate how to seamlessly combine all of our detection mechanisms—including the high frequency detection from part one, the error rate detection we've just covered, and the upcoming traffic analysis—into a single, cohesive application.