To get the best results from the GeoIP filter, the location data needs to be mapped as a geo_point type. That requires creating an index template, which can be derived from the dynamic mapping generated by Elasticsearch.
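For context, the end goal is for geoip.location to be mapped like this, instead of the pair of plain float lat/lon fields that dynamic mapping produces (a sketch; the full template is built step by step below):

"geoip" : {
  "properties" : {
    "location" : {
      "type" : "geo_point"
    }
  }
}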

For this demonstration, a test index is created using a sample FortiGate log.

  1. Create the Logstash pipeline configuration
$ vim ~/forti-test.conf
input { stdin { } }
filter {
        # Split the syslog header from the FortiGate key=value payload
        grok {
            match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{GREEDYDATA:syslog_message}" }
            add_field => [ "received_at", "%{@timestamp}" ]
            add_field => [ "received_from", "%{host}" ]
        }
        # Set @timestamp from the syslog timestamp
        date {
            match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
        }
        # Parse the FortiGate key=value pairs into individual event fields
        kv {
            source => "syslog_message"
        }
        # Enrich the source IP with GeoIP data
        geoip {
            source => "srcip"
            database => "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"
        }
        # FortiGate's own type/subtype fields collide with the event type,
        # so rename them before tagging the event as "fortigate"
        mutate {
            rename => { "type" => "fgt_type" }
            rename => { "subtype" => "fgt_subtype" }
            add_field => { "type" => "fortigate" }
            remove_field => ["message","syslog_message"]
        }
}
output {
        elasticsearch {
            hosts => ["http://192.168.86.55:9200"]
            # Template management is disabled; we will upload our own template later
            manage_template => false
            index => "fortigate-logs-%{+YYYY.MM.dd}"
        }
    stdout { codec => rubydebug }
}
  2. Run Logstash with that configuration file
$ /usr/share/logstash/bin/logstash -f ~/forti-test.conf
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
[WARN ] 2018-11-23 08:54:31.224 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
[INFO ] 2018-11-23 08:54:31.240 [LogStash::Runner] runner - Starting Logstash {"logstash.version"=>"6.5.1"}
[INFO ] 2018-11-23 08:54:36.445 [Converge PipelineAction::Create<main>] pipeline - Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[INFO ] 2018-11-23 08:54:37.134 [[main]-pipeline-manager] elasticsearch - Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://192.168.86.55:9200/]}}
[WARN ] 2018-11-23 08:54:37.383 [[main]-pipeline-manager] elasticsearch - Restored connection to ES instance {:url=>"http://192.168.86.55:9200/"}
[INFO ] 2018-11-23 08:54:37.636 [[main]-pipeline-manager] elasticsearch - ES Output version determined {:es_version=>6}
[WARN ] 2018-11-23 08:54:37.640 [[main]-pipeline-manager] elasticsearch - Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>6}
[INFO ] 2018-11-23 08:54:37.690 [[main]-pipeline-manager] elasticsearch - New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://192.168.86.55:9200"]}
[INFO ] 2018-11-23 08:54:37.902 [[main]-pipeline-manager] geoip - Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[INFO ] 2018-11-23 08:54:38.102 [Converge PipelineAction::Create<main>] pipeline - Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x315f678b sleep>"}
The stdin plugin is now waiting for input:
[INFO ] 2018-11-23 08:54:38.186 [Ruby-0-Thread-1: /usr/share/logstash/lib/bootstrap/environment.rb:6] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[INFO ] 2018-11-23 08:54:38.474 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9601}
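The warnings about logstash.yml and log4j2 are harmless here. As the first warning itself suggests, they can be silenced by pointing Logstash at its settings directory (assuming a package install with settings in /etc/logstash):

$ /usr/share/logstash/bin/logstash --path.settings /etc/logstash -f ~/forti-test.conf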

  3. Paste the following data into the console
Nov 23 07:14:42 1.1.1.1 date=2018-11-23 time=07:17:33 devname=firewall devid=FORTIGATE logid=0720018432 type=anomaly subtype=anomaly level=alert vd=root severity=critical srcip=110.249.212.46 srccountry="China" dstip=192.168.86.200 srcintf="port2" sessionid=0 action=detected proto=6 service="unknown" count=47209 attack="tcp_port_scan" srcport=55555 dstport=8118 attackid=100663398 policyid=1 policytype=DoS-policy ref="http://www.fortinet.com/ids/VID100663398" msg="anomaly: tcp_port_scan, 1001 > threshold 1000, repeats 47210 times" crscore=50 crlevel=critical

You should see output similar to the following for each log line entered:

{
                 "msg" => "anomaly: tcp_port_scan, 1001 > threshold 1000, repeats 47210 times",
               "geoip" => {
              "location" => {
            "lon" => 115.275,
            "lat" => 39.8897
        },
              "timezone" => "Asia/Shanghai",
         "country_code2" => "CN",
          "country_name" => "China",
                    "ip" => "110.249.212.46",
         "country_code3" => "CN",
             "city_name" => "Hebei",
              "latitude" => 39.8897,
           "region_code" => "13",
        "continent_code" => "AS",
           "region_name" => "Hebei",
             "longitude" => 115.275
    },
                 "ref" => "http://www.fortinet.com/ids/VID100663398",
                "date" => "2018-11-23",
                "host" => "es01",
                "type" => "fortigate",
    "syslog_timestamp" => "Nov 23 07:14:42",
       "received_from" => "192.168.86.100",
            "fgt_type" => "anomaly",
            "severity" => "critical",
         "fgt_subtype" => "anomaly",
          "srccountry" => "China",
               "proto" => "6",
             "crlevel" => "critical",
           "sessionid" => "0",
             "srcport" => "55555",
               "count" => "47209",
               "srcip" => "110.249.212.46",
     "syslog_hostname" => "es01",
             "srcintf" => "port2",
              "action" => "detected",
             "service" => "unknown",
             "dstport" => "8118",
          "policytype" => "DoS-policy",
             "crscore" => "50",
          "@timestamp" => 2018-11-22T21:14:42.000Z,
                  "vd" => "root",
                "time" => "07:17:33",
               "logid" => "0720018432",
               "level" => "alert",
              "attack" => "tcp_port_scan",
            "attackid" => "100663398",
             "devname" => "firewall",
         "received_at" => "2018-11-22T22:54:53.656Z",
               "dstip" => "192.168.86.200",
               "devid" => "FORTIGATE",
            "@version" => "1",
            "policyid" => "1"
}
  4. Press CTRL-C to exit Logstash.
^C[WARN ] 2018-11-23 08:54:56.723 [SIGINT handler] runner - SIGINT received. Shutting down.
[INFO ] 2018-11-23 08:54:57.882 [[main]>worker3] pipeline - Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x315f678b run>"}
  5. Download the dynamically created mapping
$ curl -XGET 'http://192.168.86.55:9200/fortigate*/_mapping?pretty' > forti-mapping.json

Note that this downloads the mapping for every matching index. In this demo there is only one, but if you have several, you need to trim the downloaded JSON file so that it contains the mapping for only one index.
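If you do end up with multiple indices in the file, a one-liner like the following (a sketch, assuming jq is installed) can extract just the first index's mapping:

$ jq 'to_entries | .[0] | {(.key): .value}' forti-mapping.json > forti-mapping-single.json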

  6. Rename the downloaded file, then open it to create a new template
$ mv forti-mapping.json forti-template.json
$ vim forti-template.json
  7. Delete all lines up to "properties": {
{
  "fortigate-logs-index-2018.11.22" : {
    "mappings" : {
      "doc" : {
  8. Replace the header with the JSON below
{
  "template": "fortigate*",
  "version": 50001,
  "settings": {
    "index.refresh_interval": "5s"
  },
  "mappings": {
    "_default_": {
  9. Remove the now-unbalanced trailing } from the end of the document. (If in doubt, VS Code can format the document with CTRL+SHIFT+I.) The file should then have the overall shape shown below.
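A sketch of the expected result, with the individual properties elided:

{
  "template": "fortigate*",
  "version": 50001,
  "settings": {
    "index.refresh_interval": "5s"
  },
  "mappings": {
    "_default_": {
      "properties": {
        ...
      }
    }
  }
}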

  10. Update the following items in the geoip section:

  "ip" : {
    "type" : "text",
    "fields" : {
      "keyword" : {
        "type" : "keyword",
        "ignore_above" : 256
      }
    }
  },
  "latitude" : {
    "type" : "float"
  },
  "location" : {
    "properties" : {
      "lat" : {
        "type" : "float"
      },
      "lon" : {
        "type" : "float"
      }
    }
  },
  "longitude" : {
    "type" : "float"
  },

with:

"dma_code" : {
    "type" : "short"
},
"ip" : {
    "type" : "ip"
},
"latitude" : {
    "type" : "half_float"
},
"location" : {
    "type" : "geo_point"
},
"longitude" : {
    "type" : "half_float"
},

There is much more tweaking that can be done to the mapping to improve searchability and efficiency, especially by choosing the correct datatypes. For more information, see the official Elasticsearch mapping documentation.

  11. Delete the index
$ curl -XDELETE 'http://127.0.0.1:9200/fortigate*'
{"acknowledged":true}
  12. Upload the template you have just created
$ curl -XPUT 'http://127.0.0.1:9200/_template/fortigate?pretty' -d @forti-template.json -H 'Content-Type: application/json'
{"acknowledged":true}
  13. Download the template again to confirm the change has worked.
$ curl -XGET 'http://127.0.0.1:9200/_template/forti*?pretty'
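In the response, the geoip.location field should now be mapped as a geo_point (excerpt):

"location" : {
  "type" : "geo_point"
},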
  14. Recreate the index as per steps 1-3.
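As an optional final check, once data has been re-indexed you can pull the mapping of the new index and confirm the geo_point type is in place (assuming the same host as above):

$ curl -XGET 'http://127.0.0.1:9200/fortigate*/_mapping?pretty' | grep -B 1 geo_point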

The template now maps geoip.location as a geo_point, which makes using GeoIP data in Kibana a lot easier.