Error Rate-Based Anomaly Detection in Log Files

by Sreejith (@cybersreejith), August 14th, 2024



Welcome to the second article of our three-part series on building a real-time log anomaly detection system.


In our previous article, we laid the foundation for our system, exploring how to leverage Spring State Machine & Spring Reactor to create a flexible and efficient framework for processing log entries and detecting anomalies. Expanding on our initial framework, this article will focus on implementing error rate-based detection, a crucial aspect of comprehensive log analysis. We'll extend our existing system to identify anomalies based on the frequency of error messages, providing deeper insights into potential system issues or security threats.


By the end of this article, you'll understand how to:


  • Enhance your state machine to handle error rate detection
  • Implement reactive error counting and threshold monitoring
  • Dynamically adjust error thresholds based on system behavior


This implementation improves your system's ability to detect and respond to anomalies in real time, strengthening your application's resilience and security. Let's dive in!


Step 1: Update the existing model


  1. LogEvent


Update LogEvent to include the error rate event, HIGH_ERROR_RATE.


public enum LogEvent {
    KEYWORD_DETECTED, NORMAL_ACTIVITY, HIGH_ERROR_RATE
}


  • HIGH_ERROR_RATE: Signifies an elevated rate of error messages.
  • KEYWORD_DETECTED: Triggered when a specific keyword or pattern is found in a log message.
  • NORMAL_ACTIVITY: Represents standard expected log patterns.


  2. LogEntry


    Update LogEntry to include the source ipAddress and an isError flag that marks error entries.


import java.time.LocalDateTime;

public record LogEntry(LocalDateTime timestamp, String ipAddress, String apiEndpoint, String message, boolean isError) {
}


  • timestamp: When the log entry was created.
  • ipAddress: The source IP address for the logged event.
  • apiEndpoint: The API endpoint involved in the logged event.
  • message: The actual log message content.
  • isError: A boolean flag indicating whether this log entry represents an error.


  3. LogState


    Update LogState to include the error rate alert state, ERROR_RATE_ALERT.


public enum LogState {
    NORMAL, KEYWORD_ALERT, ERROR_RATE_ALERT
}


  • NORMAL: The default state, when no anomalies are detected.
  • KEYWORD_ALERT: Entered when a specific keyword or pattern is detected in a log message.
  • ERROR_RATE_ALERT: Signifies an elevated error rate in recent log entries.



Step 2: Update State Transition


The system transitions from a NORMAL state to an ERROR_RATE_ALERT state when a HIGH_ERROR_RATE event is detected. If a NORMAL_ACTIVITY event is subsequently received, the system then shifts back from ERROR_RATE_ALERT to the NORMAL state.


    @Override
    public void configure(StateMachineTransitionConfigurer<LogState, LogEvent> transitions) throws Exception {
        transitions
            .withExternal()
                .source(LogState.NORMAL).target(LogState.ERROR_RATE_ALERT)
                .event(LogEvent.HIGH_ERROR_RATE)
            .and()
            .withExternal()
                .source(LogState.ERROR_RATE_ALERT).target(LogState.NORMAL)
                .event(LogEvent.NORMAL_ACTIVITY);
    }
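
To make the effect of these two transitions concrete, here is a minimal, dependency-free sketch of the same transition table in plain Java. It is not Spring State Machine code; the class name TransitionSketch and its send method are hypothetical and exist only for illustration. It models one detail worth keeping in mind: an event with no matching transition from the current state is simply not accepted.

```java
import java.util.Map;

// Plain-Java model of the two configured transitions (illustration only, not Spring code).
final class TransitionSketch {
    enum LogState { NORMAL, ERROR_RATE_ALERT }
    enum LogEvent { NORMAL_ACTIVITY, HIGH_ERROR_RATE }

    // For each source state: the events it accepts and the target state of each.
    private static final Map<LogState, Map<LogEvent, LogState>> TRANSITIONS = Map.of(
            LogState.NORMAL, Map.of(LogEvent.HIGH_ERROR_RATE, LogState.ERROR_RATE_ALERT),
            LogState.ERROR_RATE_ALERT, Map.of(LogEvent.NORMAL_ACTIVITY, LogState.NORMAL));

    private LogState current = LogState.NORMAL;

    /** Applies the event if a transition exists; returns true when accepted, false otherwise. */
    boolean send(LogEvent event) {
        LogState next = TRANSITIONS.get(current).get(event);
        if (next == null) {
            return false; // no transition from the current state for this event
        }
        current = next;
        return true;
    }

    LogState current() {
        return current;
    }

    public static void main(String[] args) {
        TransitionSketch machine = new TransitionSketch();
        System.out.println(machine.send(LogEvent.HIGH_ERROR_RATE) + " -> " + machine.current());
        System.out.println(machine.send(LogEvent.HIGH_ERROR_RATE) + " -> " + machine.current());
        System.out.println(machine.send(LogEvent.NORMAL_ACTIVITY) + " -> " + machine.current());
    }
}
```

In this model, a second HIGH_ERROR_RATE event sent while already in ERROR_RATE_ALERT is rejected, and only a NORMAL_ACTIVITY event returns the machine to NORMAL. This corresponds to the non-ACCEPTED branch handled in Step 5.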



Step 3: Process Anomaly Detection


This step obtains a state machine for a batch (window) of log entries. The state machine is started before the entries are processed and stopped once all of them have been handled. A concurrent hash map tracks the number of errors seen per IP address. Each log entry is processed as follows:


The IP-specific error count is updated first, then anomaly detection runs against the updated count.



	private Flux<AnomalyResult> processLogsForGivenWindow(Flux<LogEntry> entries) {
		StateMachine<LogState, LogEvent> stateMachine = stateMachineFactory.getStateMachine();
		// Per-window error counts, keyed by IP address
		ConcurrentHashMap<String, Integer> ipErrorMap = new ConcurrentHashMap<>();

		return stateMachine.startReactively()
				.thenMany(entries
						// Count the error first so detection sees the updated total
						.doOnNext(entry -> updateErrorOccurrence(entry, ipErrorMap))
						.flatMap(entry -> detectErrorRateBasedAnomaly(entry, stateMachine, ipErrorMap)))
				// stopReactively() returns a Mono, so it must be subscribed to take effect
				.doFinally(signalType -> stateMachine.stopReactively().subscribe());
	}



Step 4: Keep track of the occurrence


This method updates the concurrent hash map that tracks error activity per IP address. The count for an IP address is incremented only when the log entry is flagged as an error.


	private void updateErrorOccurrence(LogEntry entry, ConcurrentHashMap<String, Integer> ipErrorMap) {
		if (entry.isError()) {
			// Atomically start the count at 1 or increment the existing count
			ipErrorMap.compute(entry.ipAddress(), (key, value) -> (value == null) ? 1 : value + 1);
		}
	}
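
As a quick illustration of the counting rule, the sketch below runs the same per-IP update over a small in-memory list. The Entry record is a hypothetical, trimmed-down stand-in for LogEntry that carries only the two fields the counter reads, and ErrorCountSketch is an illustrative name, not part of the article's service. It uses merge, which has the same start-at-1-or-increment effect for this purpose.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ErrorCountSketch {
    // Hypothetical stand-in for LogEntry: only the fields the counter needs.
    record Entry(String ipAddress, boolean isError) {}

    /** Counts error entries per IP address; non-error entries are ignored. */
    static Map<String, Integer> countErrors(List<Entry> entries) {
        ConcurrentHashMap<String, Integer> ipErrorMap = new ConcurrentHashMap<>();
        for (Entry entry : entries) {
            if (entry.isError()) {
                // merge atomically starts the count at 1 or adds 1 to the existing count
                ipErrorMap.merge(entry.ipAddress(), 1, Integer::sum);
            }
        }
        return ipErrorMap;
    }

    public static void main(String[] args) {
        List<Entry> entries = List.of(
                new Entry("10.0.0.1", true),
                new Entry("10.0.0.1", false),
                new Entry("10.0.0.1", true),
                new Entry("10.0.0.2", true),
                new Entry("10.0.0.3", false));
        System.out.println(countErrors(entries));
    }
}
```

Here 10.0.0.1 ends up with a count of 2, 10.0.0.2 with a count of 1, and 10.0.0.3 never appears in the map because it produced no errors.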



Step 5: Perform rate-based anomaly detection


The method below checks whether the error count for a given IP address exceeds the current threshold. If it does not (for example, because the IP address has no recorded errors), the method returns an empty Mono. If the count exceeds the threshold, the method sends a `HIGH_ERROR_RATE` event to the provided state machine and, when the state machine accepts the event, emits an AnomalyResult in the ERROR_RATE_ALERT state.


Ideally, state machine processing should be separated from core business logic.


	private Mono<AnomalyResult> detectErrorRateBasedAnomaly(LogEntry entry, StateMachine<LogState, LogEvent> stateMachine, ConcurrentHashMap<String, Integer> ipErrorMap) {
		// Returns the IP's current error count and refreshes currentErrorThreshold as a side effect
		int currentErrorCount = dynamicErrorThreshold(entry, ipErrorMap);

		if (currentErrorCount > currentErrorThreshold) {
			Message<LogEvent> eventMessage = MessageBuilder.withPayload(LogEvent.HIGH_ERROR_RATE).build();
			Flux<StateMachineEventResult<LogState, LogEvent>> results = stateMachine.sendEvent(Mono.just(eventMessage));

			return results.next().flatMap(result -> {
				// Report an anomaly only if the state machine accepted the event
				if (result.getResultType() == StateMachineEventResult.ResultType.ACCEPTED) {
					return Mono.just(new AnomalyResult(entry, LogState.ERROR_RATE_ALERT));
				} else {
					return Mono.empty();
				}
			});
		} else {
			return Mono.empty();
		}
	}



Step 6: Dynamic Threshold calculation


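This method maintains a sliding window of the most recent error counts it has observed. Once the window is full, it recalculates the error threshold as the average of the counts in the window multiplied by THRESHOLD_MULTIPLIER. It returns the current error count for the entry's IP address, which the caller compares against this dynamic threshold. Note that, as written, recentErrorCounts is a single window shared by all IP addresses rather than one window per address.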


	private int dynamicErrorThreshold(LogEntry entry, ConcurrentHashMap<String, Integer> ipErrorMap) {
		int currentErrorCount = ipErrorMap.getOrDefault(entry.ipAddress(), 0);

		// Update recent error counts, evicting the oldest once the window is exceeded
		recentErrorCounts.offer(currentErrorCount);
		if (recentErrorCounts.size() > WINDOW_SIZE) {
			recentErrorCounts.poll();
		}

		// Recalculate the threshold only when a full window of samples is available
		if (recentErrorCounts.size() == WINDOW_SIZE) {
			double averageErrorRate = recentErrorCounts.stream().mapToInt(Integer::intValue).average().orElse(0);
			currentErrorThreshold = averageErrorRate * THRESHOLD_MULTIPLIER;
		}
		return currentErrorCount;
	}
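
The snippet above relies on fields whose declarations are not shown (recentErrorCounts, WINDOW_SIZE, THRESHOLD_MULTIPLIER, currentErrorThreshold). The following standalone sketch fills them in with assumed values (a window of 5, a multiplier of 2.0, and a starting threshold of 10.0) purely so the arithmetic can be followed end to end. The class name and the ArrayDeque choice are also assumptions, not the article's actual configuration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class DynamicThresholdSketch {
    static final int WINDOW_SIZE = 5;               // assumed window length
    static final double THRESHOLD_MULTIPLIER = 2.0; // assumed multiplier

    private final Queue<Integer> recentErrorCounts = new ArrayDeque<>();
    private double currentErrorThreshold = 10.0;    // assumed starting threshold

    /** Records one observed error count and recomputes the threshold once the window is full. */
    void observe(int currentErrorCount) {
        recentErrorCounts.offer(currentErrorCount);
        if (recentErrorCounts.size() > WINDOW_SIZE) {
            recentErrorCounts.poll(); // drop the oldest observation
        }
        if (recentErrorCounts.size() == WINDOW_SIZE) {
            double average = recentErrorCounts.stream().mapToInt(Integer::intValue).average().orElse(0);
            currentErrorThreshold = average * THRESHOLD_MULTIPLIER;
        }
    }

    /** Same comparison as in Step 5: strictly greater than the current threshold. */
    boolean exceeds(int currentErrorCount) {
        return currentErrorCount > currentErrorThreshold;
    }

    double threshold() {
        return currentErrorThreshold;
    }

    public static void main(String[] args) {
        DynamicThresholdSketch sketch = new DynamicThresholdSketch();
        for (int count : new int[] {1, 2, 3, 4, 5, 20}) {
            sketch.observe(count);
            System.out.println("count=" + count + " threshold=" + sketch.threshold()
                    + " anomaly=" + sketch.exceeds(count));
        }
    }
}
```

Under these assumptions the threshold stays at 10.0 for the first four observations because the window is not yet full. After the fifth, the window is [1, 2, 3, 4, 5], the average is 3.0, and the threshold becomes 6.0. A count of 20 then shifts the window to [2, 3, 4, 5, 20], giving an average of 6.8 and a threshold of 13.6, which 20 still exceeds. Because the current count is added to the window before the comparison, a spike raises the threshold it is measured against.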



Step 7: Expose as an API


The use of Flux allows our API to handle potentially unlimited streams of data in a non-blocking, reactive manner. This is particularly useful for log analysis, where we might be dealing with a continuous stream of log entries.


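import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class AnomalyDetectionController {

    private final ErrorRateBasedDetectionService errorRateBasedDetectionService;

    public AnomalyDetectionController(ErrorRateBasedDetectionService errorRateBasedDetectionService) {
        this.errorRateBasedDetectionService = errorRateBasedDetectionService;
    }

    // Accepts a stream of log entries and returns the anomalies detected in it
    @PostMapping("/detect-error-anomalies")
    public Flux<AnomalyResult> detectErrorRateAnomalies(@RequestBody Flux<LogEntry> logEntries) {
        return errorRateBasedDetectionService.detectErrorRateBasedAnomalies(logEntries);
    }

}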



Step 8: Testing


To test the API, you can use Postman to send batches of log entries with varying error frequencies. Pay attention to the isError flag and the ipAddress field in the request payload: when a specific IP address exhibits a high error rate, the system should trigger an alert.
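
If you prefer to script the request instead of building it by hand, the sketch below generates a JSON array of error entries for a single IP address and prepares a POST to the endpoint from Step 7. It makes several assumptions: the application is reachable at http://localhost:8080, the JSON property names match the LogEntry component names, timestamps are accepted in ISO-8601 form, and the class and method names (ErrorBurstPayload, entriesJson, buildRequest) are hypothetical. Whether a given number of errors actually triggers an alert depends on your threshold settings.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

final class ErrorBurstPayload {
    private static final DateTimeFormatter ISO_SECONDS = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");

    /** Builds a JSON array of `count` log entries for one IP, all sharing the same isError value. */
    static String entriesJson(String ip, int count, boolean isError) {
        List<String> items = new ArrayList<>();
        LocalDateTime start = LocalDateTime.of(2024, 8, 14, 10, 0, 0); // arbitrary sample time
        for (int i = 0; i < count; i++) {
            items.add(String.format(
                    "{\"timestamp\":\"%s\",\"ipAddress\":\"%s\",\"apiEndpoint\":\"/api/login\","
                            + "\"message\":\"sample %d\",\"isError\":%b}",
                    start.plusSeconds(i).format(ISO_SECONDS), ip, i, isError));
        }
        return "[" + String.join(",", items) + "]";
    }

    /** Prepares (but does not send) a POST request to the detection endpoint. */
    static HttpRequest buildRequest(String baseUrl, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/detect-error-anomalies"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) {
        String body = entriesJson("192.168.1.10", 8, true);
        HttpRequest request = buildRequest("http://localhost:8080", body); // assumed host and port
        System.out.println(request.method() + " " + request.uri());
        System.out.println(body);
        // To send it: HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    }
}
```

The same JSON body can be pasted into Postman. Varying the count and the isError argument makes it easy to compare a quiet IP address against a noisy one.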


Figure: Error Rate Detection

Conclusion:


We've significantly enhanced our system by implementing dynamic error rate-based anomaly detection. This adaptive approach allows our system to intelligently adjust its sensitivity based on recent error patterns, striking a balance between alertness and resilience to false positives. Looking ahead to the final part of our series, we'll focus on integrating high traffic-based detection using IP addresses. This will add another crucial layer to our anomaly detection capabilities, allowing us to identify potential DDoS attacks or unusual spikes in traffic from specific sources.


Also in the final part, we'll demonstrate how to combine all of our detection mechanisms (the keyword-based detection from part one, the error rate detection we've just covered, and the upcoming traffic analysis) into a single, cohesive application.