Logstash

When developing a Logstash filter, or performing any troubleshooting, consider the following:

Direct Output to the Console

  1. Stop the Logstash service, so you can run it manually, calling a specific configuration file.
$ systemctl stop logstash
  2. Configure output to stdout. That prints all output to the screen, making it easier to debug filters.
output {
  stdout {}
}
  3. Run Logstash manually, providing a specific configuration file.
$ /usr/share/logstash/bin/logstash -f /path/to/config
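
For quick experiments, a throwaway pipeline that reads test lines from stdin and prints each parsed event to the console can be handy. A minimal sketch (the file name is my own choice, and the rubydebug codec is optional but makes the output easier to read):

input {
    stdin {}
}

output {
    stdout { codec => rubydebug }
}

Save it somewhere like /tmp/test-pipeline.conf, run it with the -f flag as above, and paste sample log lines into the terminal to see how your filters handle them.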

Working with sincedb

SinceDB keeps track of which logs have already been parsed by Logstash, ensuring that data is not duplicated in Elasticsearch. When developing a Logstash filter, though, we often want data replayed. We can achieve that in a couple of ways. I find I combine this with deleting data reasonably frequently; see the Elasticsearch section below for information on how to do that.

  1. Set sincedb_path to /dev/null. That ensures that each time you run Logstash, all log files in the path are parsed from the beginning. Without this, only data written after the positions recorded in the default sincedb is parsed.
input {
    file {
        path => "/path/to/logfiles"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}
  2. Clear sincedb by deleting the file in the folder /var/lib/logstash/plugins/input/file that corresponds to the logs you are working with. Note they are hidden files, i.e. .sincedb_uuid.

If you have pipelines in production, be extremely careful.
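
To see what is there before deleting anything, list the hidden files first. A sketch, using the path from my install (yours may differ, and the hash in the file name is a placeholder):

$ ls -la /var/lib/logstash/plugins/input/file/
$ rm /var/lib/logstash/plugins/input/file/.sincedb_<hash>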

Debug Logging

Enable debug logging. This is configured in /etc/logstash/logstash.yml

# ------------ Debugging Settings --------------
#
# Options for log.level:
#   * fatal
#   * error
#   * warn
#   * info (default)
#   * debug
#   * trace
#
log.level: debug
path.logs: /var/log/logstash

When running Logstash from the command line as mentioned above, debug messages are also displayed on the console.
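
The log level can also be raised for a single manual run without touching logstash.yml by passing the --log.level flag (check --help on your version if in doubt):

$ /usr/share/logstash/bin/logstash -f /path/to/config --log.level debug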

Filters

This is where the power of Logstash lies. Filters are essential, but often complicated and confusing. Google has very much been my friend when working on these. Filters are what allow us to turn the input data into a structured, queryable format, whether that input is syslog, JSON, key/value pairs or just about anything else. The following filter plugins and examples are the most common I've come across so far, and the example configurations are snippets from my FortiGate Logstash pipeline.

Grok

Grok is used to parse unstructured data into something structured and thus queryable. There is a link to the official documentation below.

So far the primary use I've had is retrieving the Syslog timestamp, then assigning the remaining data to a variable for further filtering.

if [type] == "seclog" {
    grok {
        match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{GREEDYDATA:syslog_message}" }
        add_field => [ "received_at", "%{@timestamp}" ]
        add_field => [ "received_from", "%{host}" ]
    }
}
  • The grok filter above parses the raw input data, which Logstash holds in the message field.
  • Grok handles syslog extremely well, as long as the messages conform roughly to RFC3164. Because of that, we can use the SYSLOGTIMESTAMP and SYSLOGHOST patterns to capture those fields into the variables syslog_timestamp and syslog_hostname.
  • GREEDYDATA assigns the remainder of the message to the variable syslog_message, allowing us to use other filter plugins to manipulate that data.
  • The add_field commands insert fields into the message containing the time that logstash received the data and the host that sent the data. Note that you can extract the syslog timestamp from the message using the date filter. This may differ from the received_at timestamp, especially if you are replaying logs during development.
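
As an illustration (the log line and all field values below are invented), a message such as

Oct 27 16:01:06 firewall date=2018-10-27 time=16:01:06 devname=firewall

would come out of the grok filter with fields roughly like

{
    syslog_timestamp: "Oct 27 16:01:06",
    syslog_hostname: "firewall",
    syslog_message: "date=2018-10-27 time=16:01:06 devname=firewall",
    received_at: "2018-10-27T16:01:07.000Z",
    received_from: "192.0.2.1"
}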

Date

As mentioned above, the syslog timestamp can be extracted from the data using the date filter.

date {
    match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
  • The syslog_timestamp variable is the one extracted in the grok statement above.
  • Note that there are two match patterns, to accommodate the two date representations syslog uses: single-digit days are padded with an extra space, so both patterns are required.
  • This date becomes the @timestamp field, which can then be used for accurately sorting events in chronological order.
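
Syslog timestamps carry no timezone, so if the devices do not log in the same timezone as the Elastic stack, the date filter's optional timezone setting is worth knowing about. A sketch (the timezone value is just an example):

date {
    match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    timezone => "Etc/UTC"
}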

KV

The KV filter is for key/value pairs. For example, FortiGate logs contain data in the format:

date=2018-10-27 time=16:01:06 devname=firewall

Parsing this through KV results in

{
    date: "2018-10-27",
    time: "16:01:06",
    devname: "firewall"
}

By default, KV splits keys from values using the = sign and pairs from each other on spaces, but both separators are configurable.

kv {
    source => "syslog_message"
}
  • The above KV filter splits all of the pairs in the syslog_message variable (captured by GREEDYDATA in grok).
  • There are plenty of other options, but for the PoC requirements, this works.
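
If the data uses different separators, the value_split and field_split options control how keys are split from values and how pairs are split from each other. A sketch with made-up separators:

kv {
    source => "syslog_message"
    field_split => ";"
    value_split => ":"
}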

GeoIP

GeoIP, as the name suggests, is used to determine the geo-location based on IP address. You can get some interesting insight with this information. As we are using the free version of the stack, we only get the GeoLite2 database.

To update the database run:

$ /usr/share/logstash/bin/logstash-plugin update logstash-filter-geoip

You will need to reference the database in the logstash pipeline. It lives here in my implementation:

/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb
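
The version numbers in that path change between releases, so if the database is not there on your system, something like this will track it down:

$ find /usr/share/logstash -name "GeoLite2-City.mmdb"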

The filter looks like this:

geoip {
    source => "remip"
    database => "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"
}  
  • The above filter takes the remip field and looks up its geolocation in the specified database. That results in several geo* entries appearing in the resulting data; an illustrative example follows this list.
  • To get the best results in Kibana with GeoIP, the template mapping needs to be updated to define the geoip data as a geo_point data type. This is covered in another post, as it's easier to modify an auto-generated template than to write one from scratch.
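
To give an idea of the geo* entries (the field names are the geoip filter defaults; the values are invented), an event with a public remip ends up with something like:

geoip: {
    ip: "203.0.113.10",
    country_name: "Australia",
    city_name: "Sydney",
    location: { lon: 151.21, lat: -33.87 }
}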

Mutate

Mutate, as the name suggests, allows us to mutate the data in a variety of ways. Check out the documentation for all of the options. In this instance, it's used because of a quirk with FortiGate logs and reserved names in logstash.

mutate {
    rename => { "type" => "fgt_type" }
    rename => { "subtype" => "fgt_subtype" }
    add_field => { "type" => "fortigate" }
    remove_field => ["message","syslog_message"]
}
  • The field type is renamed fgt_type, and for consistency, subtype => fgt_subtype.
  • A new field called type is created and assigned the value fortigate.
  • This is because the conditional statements in the Logstash pipeline rely on the type field, and the type key inside the FortiGate message would otherwise overwrite it, causing later conditional statements to fail. This is covered in more detail in the FortiGate post. Just know that mutate can be used to make a variety of changes to data; an illustration follows this list.
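
As an illustration of those operations (field values invented), an event that arrives containing

{
    type: "traffic",
    subtype: "forward"
}

leaves the mutate filter containing

{
    fgt_type: "traffic",
    fgt_subtype: "forward",
    type: "fortigate"
}

with the message and syslog_message fields removed.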

Another useful mutate option is gsub. It is used to perform a character substitution (or deletion, by substituting nothing). For example, take the following item:

policy: "/Common/SecurityPolicyName"

The following mutate -> gsub command

mutate {
    gsub => ["policy_name","\/Common\/",""]
}

returns

policy: "SecurityPolicyName"

It is important to note that gsub does a regular expression match, which is why the forward slashes in the example are escaped with backslashes. This Regex Tester is useful for us mere mortals not fluent in regex, and for understanding the above example.

Official Documentation

  • grok: https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
  • date: https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html
  • geoip: https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html
  • kv: https://www.elastic.co/guide/en/logstash/current/plugins-filters-kv.html
  • mutate: https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html

There are many others. These are the ones I've used the most so far.

Managing Plugins

Use the logstash-plugin application for installing/updating/removing plug-ins. In practice, I've only ever used this to list and update.

  1. Get help
$ /usr/share/logstash/bin/logstash-plugin --help
Usage:
    bin/logstash-plugin [OPTIONS] SUBCOMMAND [ARG] ...

Parameters:
    SUBCOMMAND                    subcommand
    [ARG] ...                     subcommand arguments

Subcommands:
    list                          List all installed Logstash plugins
    install                       Install a Logstash plugin
    remove                        Remove a Logstash plugin
    update                        Update a plugin
    pack                          Package currently installed plugins, Deprecated: Please use prepare-offline-pack instead
    unpack                        Unpack packaged plugins, Deprecated: Please use prepare-offline-pack instead
    generate                      Create the foundation for a new plugin
    uninstall                     Uninstall a plugin. Deprecated: Please use remove instead
    prepare-offline-pack          Create an archive of specified plugins to use for offline installation

Options:
    -h, --help                    print help
$
  2. Update all plug-ins
$ /usr/share/logstash/bin/logstash-plugin update
  3. Update a single plug-in
$ /usr/share/logstash/bin/logstash-plugin update plugin-name
  4. Install a plug-in from a file
$ /usr/share/logstash/bin/logstash-plugin install /path/to/file
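
To see what is currently installed, including versions, the list subcommand takes a --verbose flag (at least on the versions I've used):

$ /usr/share/logstash/bin/logstash-plugin list --verbose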