Migrating data from Splunk to the Elastic stack

Himani Raghav
6 min read · Jan 11, 2022

Organizations’ need to collect, store, and search large volumes of data has grown rapidly over the last two decades. With the rise of 5G, and with edge computing and IoT devices starting to fuel the success of corporations, we can expect even more data in the coming years.

To draw insights from this large volume of data, many organizations have relied (and continue to rely) on Splunk for its ability to search, monitor, analyze, and visualize machine-generated data.

So why would an organization want to migrate to Elastic?

While the decision to choose one over the other depends on the customer’s specific needs, there is a unique advantage of Elastic over Splunk for use cases with large volumes of data.

Splunk’s pricing model is based on daily data volume, so the cost increases in proportion to the data, which can make it too expensive at large volumes.

Another reason for organizations to migrate to ELK is Splunk’s new licensing structure, which moves away from perpetual licensing. Also, some of Splunk’s premium applications are built on disparate platforms, which makes it difficult to predict cost and to manage infrastructure. This brings us to the ways we can migrate data from Splunk to ELK.

The migration approach used here is “trickle migration”, in which Splunk and the ELK stack run in parallel for a period until data can be ingested directly into the ELK stack.

With the bifurcation approach of migrating data, the traffic from legacy agents such as Splunk heavy forwarder is redirected to Logstash to ingest the events to Elasticsearch. Over time the legacy agents can be replaced by Elastic Beats.

Migrating historical data

For use cases where historical data needs to be migrated to Elastic, there are five ways to export it from Splunk:

  1. Using Splunk web UI
  2. Using CLI
  3. Using SDKs
  4. Using REST API
  5. Using dump command

For a low volume of data, the simplest method is to export the data using Splunk web UI. Right after running the search, one can click on the export button below the search bar.

Figure: the export icon in the Splunk Web UI

Now, the format for the log file can be chosen from the dialog box. The supported formats are CSV, JSON, PDF, XML, and raw events. Both CSV and JSON can be easily forwarded to Logstash.

Once exported, the file gets saved in the default download directory of the system.
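
If the export was saved as CSV but the Logstash pipeline is set up for JSON events, one option is to convert the file to one JSON object per line first. The following is only a minimal sketch in Python, assuming the CSV export starts with a header row; the function name and file paths are hypothetical.

```python
import csv
import json


def csv_to_json_lines(csv_path, out_path):
    """Write one JSON object per CSV row and return the number of rows written."""
    count = 0
    # newline="" lets the csv module handle quoted fields that span lines.
    with open(csv_path, newline="", encoding="utf-8") as src, \
            open(out_path, "w", encoding="utf-8") as dst:
        for row in csv.DictReader(src):
            # json.dumps escapes embedded newlines, so each event stays on one line.
            dst.write(json.dumps(row) + "\n")
            count += 1
    return count
```

Calling `csv_to_json_lines("export.csv", "splunkLogs.log")` would produce a file that a line-oriented Logstash input can read event by event.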

For high volume data, we can either use the Splunk Web UI with an extended session timeout:

  1. Click Settings > Server Settings > General Settings.
  2. In the Splunk Web section, increase the number in the Session timeout field.
  3. Click Save.

Alternatively, we can use the CLI method, which is said to be the most stable. Before moving on, it’s advisable to set the SPLUNK_HOME environment variable. On Windows, this can be done with the following command in the command prompt:

setx SPLUNK_HOME "<your_splunk_directory>"

The CLI method makes use of the following command template:

splunk search [eventdata] -maxout 0 -output [rawdata|json|csv|xml] > [myfilename.log]

Here eventdata is the search whose events you want to export, -maxout sets the maximum number of events exported (0 means no limit), and -output selects the format of the exported data. The search string can also restrict the time frame, for example with the earliest and latest time modifiers.
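
To illustrate how the template can be filled in, here is a minimal sketch in Python that assembles the export command and redirects its output to a file. It uses only the flags from the template above. The index name, the time range, the default install path, and the function names are assumptions for illustration, not values from a real deployment.

```python
import os
import subprocess


def build_export_command(splunk_home, search, output_format="json"):
    """Return the argument list for a Splunk CLI export of `search`."""
    allowed = {"rawdata", "json", "csv", "xml"}
    if output_format not in allowed:
        raise ValueError(f"output_format must be one of {sorted(allowed)}")
    splunk_bin = os.path.join(splunk_home, "bin", "splunk")
    # -maxout 0 removes the cap on the number of exported events.
    return [splunk_bin, "search", search, "-maxout", "0", "-output", output_format]


def export_to_file(splunk_home, search, out_path, output_format="json"):
    """Run the export and write stdout to out_path, like `> myfilename.log`."""
    cmd = build_export_command(splunk_home, search, output_format)
    with open(out_path, "wb") as out:
        subprocess.run(cmd, stdout=out, check=True)


if __name__ == "__main__":
    # Hypothetical values: the index name and time range are placeholders.
    home = os.environ.get("SPLUNK_HOME", "/opt/splunk")
    query = "index=main earliest=-7d@d latest=now"
    print(build_export_command(home, query))
```

Keeping the command construction separate from its execution makes it easy to inspect the exact command before running a long export.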

This exported data can further be fed to Logstash by creating a configuration file. A Logstash config file needs to be populated with the plugins we want to use and settings for each plugin. Following is the configuration for a JSON data input:

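input {
  file {
    path => "C:/logFiles/splunkLogs.log"
    # Read the file from the start rather than only new lines
    start_position => "beginning"
    type => "json"
  }
}
filter {
  # Parse the JSON text in the "message" field into event fields
  json {
    source => "message"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
}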

Note that the path uses forward slashes even though it is a Windows path: the Logstash file input expects forward slashes on Windows as well as on Linux.
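
Because the file input reads the export line by line and the json filter parses each line’s message, every line has to be a complete JSON object. A quick check before starting Logstash can save a debugging round. This is a minimal sketch in Python, assuming the export is expected to hold one JSON object per line; whether a given Splunk export has that shape should be verified on a sample, and the function name is hypothetical.

```python
import json


def check_json_lines(path):
    """Return (valid_count, invalid_line_numbers) for a line-delimited JSON file."""
    valid = 0
    invalid = []
    with open(path, encoding="utf-8") as handle:
        for number, line in enumerate(handle, start=1):
            if not line.strip():
                continue  # blank lines carry no event
            try:
                json.loads(line)
                valid += 1
            except json.JSONDecodeError:
                invalid.append(number)
    return valid, invalid
```

Any line number reported as invalid would fail in the json filter in the same way, so it is worth fixing the export before ingesting.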

Migrating real-time data

Any Splunk Enterprise instance that collects data and forwards it to a third-party system or another Splunk instance is a forwarder. Depending on how they parse data, forwarders are classified as universal or heavy: a universal forwarder forwards raw, unparsed data and is hence faster, whereas a heavy forwarder parses the data (and can optionally index it) before forwarding it.

For configuring Splunk to forward data to Logstash, we need a forwarder. A Splunk forwarder can be configured to conditionally forward data over a TCP socket or in standard Syslog format. Out of the two types of Splunk Forwarders discussed above, heavy forwarders are used to forward Syslog data.

We need to edit the outputs.conf to configure the heavy forwarder for routing the syslog data to Logstash.

The path of outputs.conf is $SPLUNK_HOME/etc/system/local/outputs.conf

[syslog]
defaultGroup=syslogGroup
[syslog:syslogGroup]
server = 127.0.0.1:520

Logstash needs to be configured to receive Syslog data on the same port. Also, for every Syslog type, a grok pattern has to be constructed, which is easiest with a grok debugger. This is what the Logstash configuration file looks like:

input {
  tcp {
    port => 520
    type => syslog
  }
  udp {
    port => 520
    type => syslog
  }
}
filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "<13> %{IPORHOST:hostname} %{DATESTAMP:timestamp} %{NOTSPACE:tz}(\n)LogName=%{WORD:logName}(\n)EventCode=%{WORD:eventCode}(\n)EventType=%{WORD:eventType}(\n)ComputerName=%{NOTSPACE:computerName}((\n)User=%{NOTSPACE:user})?((\n)Sid=%{NOTSPACE:sid})?((\n)SidType=%{NOTSPACE:sidType})?(\n)SourceName=%{GREEDYDATA:sourceName}(\n)Type=%{NOTSPACE:type}(\n)RecordNumber=%{NOTSPACE:recordNumber}(\n)Keywords=%{NOTSPACE:keywords}(\n)TaskCategory=%{NOTSPACE:taskCategory}(\n)OpCode=%{GREEDYDATA:opCode}(\n)Message=%{GREEDYDATA:message}((\n{1,})%{GREEDYDATA:source}(\n)%{GREEDYDATA:serviceFileName}(\n)%{GREEDYDATA:serviceType}(\n)%{GREEDYDATA:serviceStartType}(\n)%{GREEDYDATA:serviceAccount})?" }
    }
  }
  if "_grokparsefailure" in [tags] {
    drop { }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
}

In the CONF file above, Logstash is configured to listen on port 520, the port on which it receives data from Splunk. The grok pattern is defined for the Syslog data. However, Splunk generates two types of logs here: Syslog and Splunk’s internal audit logs. This is why we drop events that do not match the grok pattern, so that the data reaching Elasticsearch is structured and contains only Syslog data.
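
Before pointing the heavy forwarder at Logstash, it can help to confirm that the tcp input is reachable by sending a test line by hand. The following is a minimal sketch in Python, assuming Logstash runs on the local machine with the tcp input on port 520 as configured above; the function name and the sample message are hypothetical.

```python
import socket


def send_test_event(message, host="127.0.0.1", port=520, timeout=5.0):
    """Send one newline-terminated line over TCP and return the bytes sent."""
    payload = (message.rstrip("\n") + "\n").encode("utf-8")
    # create_connection raises an OSError if nothing is listening on the port.
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(payload)
    return len(payload)


if __name__ == "__main__":
    # Placeholder message; it will not match the grok pattern above,
    # so with the drop condition in place it should not reach Elasticsearch.
    print(send_test_event("<13> testhost connectivity check"))
```

A connection error here points to Logstash not listening on the port, which is easier to diagnose in isolation than through the forwarder.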

Here is the skeleton of the CONF file:

input {
  tcp {
    port => 520
    type => syslog
  }
}
filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "<grok pattern>" }
    }
  }
  if "_grokparsefailure" in [tags] {
    drop { }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
}

To have Logstash filter the data and send only the required events to Elasticsearch, the grok filter is used. Events that do not match the pattern are tagged “_grokparsefailure” by Logstash; they are not deleted or ignored by default. So, we write an “if” condition in the filter to drop all events that failed to parse.

if "_grokparsefailure" in [tags] {
  drop { }
}
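
The tag-then-drop logic can be sketched outside Logstash to make the flow explicit. This is an illustration in Python, not how Logstash is implemented: the regular expression is a deliberately simplified stand-in for the grok pattern, and the function names are hypothetical.

```python
import re

# Simplified stand-in for the grok pattern: priority, hostname, remainder.
SYSLOG_PATTERN = re.compile(
    r"^<(?P<priority>\d+)> (?P<hostname>\S+) (?P<rest>.*)$", re.DOTALL
)


def grok_like(event):
    """Add parsed fields on a match, otherwise tag the event as a parse failure."""
    match = SYSLOG_PATTERN.match(event["message"])
    if match:
        event.update(match.groupdict())
    else:
        event["tags"] = event.get("tags", []) + ["_grokparsefailure"]
    return event


def run_filter(events):
    """Mimic the filter block: parse, then drop anything tagged as a failure."""
    kept = []
    for event in events:
        event = grok_like(dict(event))  # work on a copy of the event
        if "_grokparsefailure" in event.get("tags", []):
            continue  # plays the role of drop { }
        kept.append(event)
    return kept
```

Without the drop step, the failed events would still be forwarded to the output, only with the extra tag attached.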

Result

The logs are being successfully forwarded to Elasticsearch and are structured because of the Logstash filters used.

Now that the data is structured, we can extract the fields to create visualizations in Kibana.

Summary

This implementation shows that logs can be forwarded from Splunk to the Elastic stack without any third-party tools. Just as Syslog data was used in this case study, application data can be sent over a plain TCP socket to Logstash, where the data can be structured and fields of interest extracted before it is ingested into Elasticsearch.

This POC, wherein an environment is set up with data sources that represent high-priority business needs, serves as a phase in the migration journey. This proves to be beneficial for initial development and validation and can streamline future phases.

Himani Raghav

Currently working on the ELK stack. Enthusiastic about psychology and spiritual science.