If we applied standard deviation outlier detection to the whole dataset, using upperBound=avg+stdev*2, there would be 3,306 results across 672 users.
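A minimal sketch of such a search, not the exact query behind the numbers above. It assumes the Authentication data model and the same `summariesonly` macro and 4624 filter used in the full search at the end of this post, and counts distinct sources per user per hour:

```spl
| tstats `summariesonly` dc(Authentication.src) as src_count from datamodel=Authentication where Authentication.signature_id=4624 by Authentication.user _time span=1h
| rename Authentication.user as user
| eventstats avg(src_count) as avg stdev(src_count) as stdev
| eval upperBound=avg+stdev*2
| where src_count>upperBound
```

Because eventstats has no by clause here, every user is compared against the same avg and stdev.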
There isn’t that much actual anomalous behavior happening in this example, but what’s normal for one user can be abnormal for another.
Applying a single upper bound to all users doesn’t actually capture anomalies. However, what if we were to have separate upper bounds for each user? Interestingly, this is worse: 8,061 results, and that’s only looking at users with over 30 data points. With the single bound, much of the activity was buried under a high upper bound set by users or accounts that regularly log in from many sources.
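One way to sketch the per-user variant, assuming the results already hold an hourly src_count per user. The data_points field name is made up for illustration:

```spl
| eventstats avg(src_count) as avg stdev(src_count) as stdev count as data_points by user
| eval upperBound=avg+stdev*2
| where data_points>30 AND src_count>upperBound
```

The only change from a global bound is the by user clause, plus the minimum number of data points per user.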
Now let’s look at the hour of day and weekdays/weekends. We’ll need to look at 30 days to have enough data points for grouping.
Requiring more than 15 data points, there are 14,298 results. It’s getting even worse because fewer events are getting buried by high counts during certain hours of the day.
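A sketch of what that grouping could look like, again assuming an hourly src_count per user. The hour, day_of_week, is_weekend, and data_points fields are illustrative names, and grouping by user, hour, and weekend flag is an assumption about how the buckets were built:

```spl
| eval hour=strftime(_time, "%H"), day_of_week=strftime(_time, "%w")
| eval is_weekend=if(day_of_week=="0" OR day_of_week=="6", 1, 0)
| eventstats avg(src_count) as avg stdev(src_count) as stdev count as data_points by user hour is_weekend
| eval upperBound=avg+stdev*2
| where data_points>15 AND src_count>upperBound
```

Each user now gets up to 48 separate upper bounds (24 hours times weekday or weekend), which is why 30 days of data is needed to fill the groups.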
What about MLTK?
Splunk’s Machine Learning Toolkit (MLTK) adds machine learning capabilities to Splunk. One of the included algorithms for anomaly detection is called DensityFunction. This algorithm is meant to detect outliers in this kind of data.
Unfortunately, unless you edit config files and make sure you have enough processing power, DensityFunction is limited to 1,024 groupings and starts sampling data beyond 100,000 events.
If the identity data in Splunk is high quality, its user types reflect different usage patterns, and there are fewer than 1,024 of those types, then MLTK may be the direction to go.
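For reference, a rough sketch of what fitting DensityFunction per user type might look like. The user_type field and the model name are hypothetical, and the exact options should be checked against the MLTK documentation for your version:

```spl
| fit DensityFunction src_count by "user_type" into src_count_by_user_type
| where 'IsOutlier(src_count)'=1
```

Each distinct value of the by field counts as one grouping toward the limit described above.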
Using streamstats to get neighboring values
As an alternative to MLTK, I use streamstats to mimic how I, as an analyst, investigate an alert.
For our example of a user logging in from an anomalous number of sources, I would start by looking at historical source counts over the past 30 days. If the source count was significantly higher than any previous source count, I would consider it anomalous.
Using streamstats, we can put a number on how much higher a source count is than previous counts:
1. Calculate the metric you want to find anomalies in.
In our case we’re looking at a distinct count of src by user and _time where _time is in 1 hour spans.
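Trimmed down from the full search at the end of this post, with the extra filters and normalization left out, this step might look like:

```spl
| tstats `summariesonly` count from datamodel=Authentication where Authentication.signature_id=4624 by Authentication.user Authentication.src _time span=1h
| rename Authentication.* as *
| stats dc(src) as src_count by user _time
```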
2. Sort the metric ascending.
The 0 in sort 0 is needed to make sort work on any number of events; by default, sort returns at most 10,000 results.
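In the full search, this step is a single command:

```spl
| sort 0 src_count
```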
3. Run streamstats over the data to collect the closest lower values for each value, calculating their sum and how many there were.
Setting current=f makes streamstats look only at the previous values, excluding the current event. With window=5 we’re looking at the previous 5 lower values; the exact number isn’t too important, it just needs to be enough to get a good sample of previous values. global=f is needed because we’re using a window and want a separate window for each user. I’m also listing out the previous values for added context.
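The matching line from the full search, with its field names kept as-is:

```spl
| streamstats window=5 current=f global=f count as events_with_closest_lower_count sum(src_count) as sum_of_last_five list(src_count) as previous_five_counts by user
```

Because the data is sorted ascending, the "last five" events for a user are the five closest counts at or below the current one.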
4. Sort the metric descending.
As with the ascending sort, we need to use sort 0.
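The leading minus sign reverses the sort order:

```spl
| sort 0 -src_count
```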
5. Run streamstats over the descending data to collect the higher values for each value, calculating their sum and how many there were.
For this we can look at all higher counts that have been seen so no window is required.
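As written in the full search, this pass drops window and global and keeps current=f:

```spl
| streamstats current=f count as events_with_higher_count values(src_count) as higher_counts_seen sum(src_count) as sum_of_higher_count by user
```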
6. Use fillnull to fill in 0 if there were no values found for one of the calculations.
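With no value argument, fillnull falls back to its default of 0, so the full search only needs to list the fields:

```spl
| fillnull events_with_higher_count events_with_closest_lower_count sum_of_higher_count sum_of_last_five
```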
7. Calculate the total number of nearby values and their sum.
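The full search does both additions in one eval:

```spl
| eval count_of_nearby_values=events_with_higher_count+events_with_closest_lower_count, sum_of_nearby_values=sum_of_higher_count+sum_of_last_five
```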
8. Calculate a distance metric.
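In the full search the score is the current count multiplied by the number of nearby values, divided by their sum. That is the same as dividing src_count by the average of the nearby values:

```spl
| eval distance_score=(src_count*count_of_nearby_values)/sum_of_nearby_values
```

As a made-up example, take a user whose hourly counts were 1, 1, 2, 2, and 2, followed by one hour with 10 sources. For the hour with 10, the five lower values sum to 8 and there are no higher values, so distance_score = (10*5)/8 = 6.25. When a user has no nearby values at all, the division by zero leaves distance_score null, which is what the fallback threshold in the next step is for.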
9. Filter the results on the distance metric.
Adjust the threshold for the distance score based on your results. Add a fallback threshold if you still want results when there is no history.
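The filter in the full search combines the distance score with new_src_count and a no-history fallback, and only keeps events from the last 4 hours:

```spl
| where (((distance_score>2 AND new_src_count/src_count>0.3) OR distance_score>5) OR (count_of_nearby_values=0 AND src_count>3)) AND _time>=relative_time(now(), "-4h")
```

The count_of_nearby_values=0 AND src_count>3 clause is the fallback for users with no history. The numeric thresholds are the ones from that search and should be tuned to your own data.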
Putting it all together
To put this together as a correlation search, we need to make sure we’re pulling in the data we want and that it’s normalized. It can also be useful to add additional metrics to filter on.
In the case of this search, in addition to src_count, I’ve added new_src_count: a count of sources seen on only a single day in the past 30.
    | tstats `summariesonly` count from datamodel=Authentication where Authentication.signature_id=4624 NOT Authentication.user="-" NOT Authentication.user="ANONYMOUS LOGON" NOT Authentication.user="unknown" NOT Authentication.src="unknown" by Authentication.user Authentication.src Authentication.dest_nt_domain _time span=1h
    | rename Authentication.* as * dest_nt_domain as user_domain
    | `get_asset(src)`
    | eval src=lower(if(match(src, "([0-9]{1,3}\.){3}[0-9]{1,3}") AND isnotnull(src_nt_host), mvindex(src_nt_host, 0), src))
    | eval user=lower(mvindex(split(user, "@"), 0))
    | where lower(src)!=lower(user_domain)
    | bin _time span=1d as day
    | eventstats dc(day) as day_count by user src
    | stats dc(src) as src_count dc(eval(if(day_count=1, src, null()))) as new_src_count by user _time
    | sort 0 src_count
    | streamstats window=5 current=f global=f count as events_with_closest_lower_count sum(src_count) as sum_of_last_five list(src_count) as previous_five_counts by user
    | sort 0 -src_count
    | streamstats current=f count as events_with_higher_count values(src_count) as higher_counts_seen sum(src_count) as sum_of_higher_count by user
    | fillnull events_with_higher_count events_with_closest_lower_count sum_of_higher_count sum_of_last_five
    | eval count_of_nearby_values=events_with_higher_count+events_with_closest_lower_count, sum_of_nearby_values=sum_of_higher_count+sum_of_last_five
    | eval distance_score=(src_count*count_of_nearby_values)/sum_of_nearby_values
    | where (((distance_score>2 AND new_src_count/src_count>0.3) OR distance_score>5) OR (count_of_nearby_values=0 AND src_count>3)) AND _time>=relative_time(now(), "-4h")
    | rename _time as orig_time
    | convert ctime(orig_time)
Running this search over 30 days returns 10 results, and even accessing a few new sources can trigger an anomaly.