Nginx Logs to Elasticsearch (in AWS) Using Pipelines and Filebeat (no Logstash)

A pretty raw post about one of many ways of sending data to Elasticsearch. Possibly the way that requires the least amount of setup (read: effort) while still producing decent results. It’s hardly AWS specific, but it assumes an AWS Elasticsearch cluster and has a few notes regarding that.

It involves an Elasticsearch cluster and a server to send logs from, Nginx in this example. No Logstash, CloudWatch, Kinesis Firehose or anything else like that. All of these have their place and advantages, but might not be needed right away. Basically it’s a good setup for a proof of concept or for starting with Elasticsearch.

Spinning up a cluster is out of scope for this post. AWS makes it pretty easy.

Elasticsearch Basics

Getting started:

  • Document: basically a record, stored as JSON. It doesn’t have to follow a predefined structure. A log line, once parsed into fields, is a document (a structured record).
  • Index: A collection of documents. An index is identified by a name (that must be all lowercase) and this name is used to refer to the index when performing indexing, search, update, and delete operations against the documents in it.
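To make the two terms concrete, here is a small Python sketch. It is illustrative only: the field names are examples, and the helper `is_valid_index_name` is made up for this post and checks just the lowercase rule mentioned above (Elasticsearch enforces more rules than that).

```python
import json

# A parsed log line expressed as a document: just a JSON object.
# The field names are illustrative.
document = {
    "clientip": "6.6.6.6",
    "verb": "GET",
    "request": "/login",
    "response": 200,
}

def is_valid_index_name(name: str) -> bool:
    """Hypothetical helper: checks only the 'all lowercase' rule.

    Elasticsearch applies further restrictions (forbidden characters,
    length limits) that are not covered here.
    """
    return bool(name) and name == name.lower()

print(json.dumps(document, indent=2))
print(is_valid_index_name("weblog_access-2017.08.07"))
print(is_valid_index_name("Weblog_Access"))
```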

Sending data to Elasticsearch

  1. Direct API call - POST to Elasticsearch directly (usually not what you want)
  2. Logstash - separate component that sits in front of Elasticsearch, sort of like a reverse proxy. Documents (log records) are sent to Logstash, where they can be transformed, enriched, sent to other loggers, etc. Logstash can execute plugins, which gives it a lot of power but also makes it costly in terms of resources. Amazon Elasticsearch service does NOT include Logstash, which means that it’s another thing to set up, pay for and worry about.
  3. Ingest node pipelines - introduced with Elasticsearch 5, can do some light ETL, enough for many use cases. Ingest nodes are part of Elasticsearch, no need to set up anything extra.
  4. Beats (Filebeat) - Filebeat reads (log) files line by line as they are written and sends data to Elasticsearch using one of the methods above. Part of the Beats family of data shippers.
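For comparison, the first method (a direct API call) could look roughly like the Python sketch below. It only builds the request and does not send it. The host `https://cluster.url`, the helper name `build_index_request` and the `_doc` path segment are assumptions (the latter depends on the Elasticsearch version), and it also assumes the cluster’s access policy accepts unsigned requests.

```python
import json
import urllib.request

def build_index_request(base_url: str, index: str, doc: dict) -> urllib.request.Request:
    """Build (but do not send) a request that indexes a single document.

    The '/<index>/_doc' path is an assumption that fits recent
    Elasticsearch versions; older ones expect an explicit type name.
    """
    url = f"{base_url.rstrip('/')}/{index}/_doc"
    body = json.dumps(doc).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = build_index_request(
    "https://cluster.url", "weblog_access-2017.08.07", {"message": "hello"}
)
print(req.get_method(), req.full_url)
# Actually sending it would be: urllib.request.urlopen(req)
```

Doing this for every log line by hand is what Filebeat saves us from; it batches events and retries on failure.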

In AWS there are more options, like a Lambda function that gets triggered when a log is uploaded to S3 or CloudWatch, or using Firehose to load logs into Elasticsearch. This post won’t cover these.

Practical example: nginx log ingestion using Filebeat and pipelines

We use the last two ingest methods to get logs into Elasticsearch. Steps:

  1. Define a pipeline on the Elasticsearch cluster. The pipeline will translate a log line to JSON, informing Elasticsearch about what each field represents. For example, the first field is the client IP address.
  2. Install and configure Filebeat to read nginx access logs and send them to Elasticsearch using the pipeline created above.
  3. Start Filebeat and confirm that it all works as expected.

Interacting with Elasticsearch is done through API calls. One convenient way to do that is to use Kibana’s Console, under “Dev Tools”, in the left side menu. In order to get to Kibana on Amazon Elasticsearch, go to https://cluster.url/_plugin/kibana. API calls below are presented in Console format.

1. Create a pipeline for ingesting Nginx logs

Pipeline definition:

PUT _ingest/pipeline/weblog_combined
{
    "description": "Ingest pipeline for Combined Log Format",
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            """%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}"""
          ]
        }
      },
      {
        "date": {
          "field": "timestamp",
          "formats": [
            "dd/MMM/yyyy:HH:mm:ss Z"
          ]
        }
      },
      {
        "user_agent": {
          "field": "agent"
        }
      }
    ]
}

This defines three processors:

  1. grok: Translates the log line so that Elasticsearch understands each field. This is the main processor; its many options are described in the docs.
  2. date: Parses the fourth field, the timestamp, and stores the result in @timestamp.
  3. user_agent: Parses the last field. This is a plugin that doesn’t come with Elasticsearch by default, but it is already installed on AWS.
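To get a feel for what the grok processor does with a line, here is a rough Python equivalent using a plain regular expression with named groups. This is a sketch, not how Elasticsearch implements grok; the regex is a hand-written approximation of the pattern above, and `parse_line` is a made-up helper name.

```python
import re

# Hand-written approximation of the grok pattern for Combined Log Format.
COMBINED = re.compile(
    r'(?P<clientip>\S+) (?P<ident>\S+) (?P<auth>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<verb>\w+) (?P<request>.*?) HTTP/(?P<httpversion>[\d.]+)" '
    r'(?P<response>\d+) (?:-|(?P<bytes>\d+)) '
    r'(?P<referrer>"[^"]*") (?P<agent>"[^"]*")'
)

def parse_line(line: str):
    """Return a dict of named fields, or None if the line does not match."""
    match = COMBINED.match(line)
    if match is None:
        return None
    fields = match.groupdict()
    # Mirror the ':int' conversions in the grok pattern.
    fields["response"] = int(fields["response"])
    if fields["bytes"] is None:
        del fields["bytes"]  # a "-" in the log means no byte count
    else:
        fields["bytes"] = int(fields["bytes"])
    return fields

sample = (
    '6.6.6.6 - - [07/Aug/2017:10:11:12 +0000] "GET /login HTTP/1.1" 200 1062 '
    '"-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:61.0) '
    'Gecko/20100101 Firefox/61.0"'
)
print(parse_line(sample))
```

Like grok’s QS pattern, the regex keeps the surrounding double quotes on the referrer and agent fields.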

To see the list of available ingest processors, run GET _nodes/ingest. Amazon Elasticsearch service does not allow adding new plugins. The geo-ip plugin is not installed as of version 6.3, so it can’t be used in a pipeline. This is one case where Logstash comes in, if that functionality is needed.
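If the response from GET _nodes/ingest is saved as JSON, checking it for the processors a pipeline needs can be scripted. The Python sketch below assumes the response lists processors under nodes, then the node id, then ingest and processors; the `sample` dict is a hand-written, heavily trimmed stand-in rather than real cluster output, and `available_processors` is a made-up helper.

```python
def available_processors(nodes_ingest_response: dict) -> set:
    """Collect processor type names across all nodes in the response."""
    found = set()
    for node in nodes_ingest_response.get("nodes", {}).values():
        for proc in node.get("ingest", {}).get("processors", []):
            found.add(proc["type"])
    return found

# Hand-written stand-in for a real response (assumed shape, trimmed).
sample = {
    "nodes": {
        "node-a": {
            "ingest": {
                "processors": [
                    {"type": "grok"},
                    {"type": "date"},
                    {"type": "user_agent"},
                ]
            }
        }
    }
}

needed = {"grok", "date", "user_agent", "geoip"}
missing = needed - available_processors(sample)
print(sorted(missing))  # ['geoip']
```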

After a pipeline is created it can be tested by using the simulate API:

POST _ingest/pipeline/weblog_combined/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "6.6.6.6 - - [07/Aug/2017:10:11:12 +0000] \"GET /login HTTP/1.1\" 200 1062 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:61.0) Gecko/20100101 Firefox/61.0\""
      }
    }
  ]
}

Which should result in:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_type": "_type",
        "_id": "_id",
        "_source": {
          "request": "/login",
          "agent": """"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:61.0) Gecko/20100101 Firefox/61.0"""",
          "auth": "-",
          "ident": "-",
          "verb": "GET",
          "message": """6.6.6.6 - - [07/Aug/2017:10:11:12 +0000] "GET /login HTTP/1.1" 200 1062 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:61.0) Gecko/20100101 Firefox/61.0"""",
          "referrer": """"-"""",
          "@timestamp": "2017-08-07T10:11:12.000Z",
          "response": 200,
          "bytes": 1062,
          "clientip": "6.6.6.6",
          "httpversion": "1.1",
          "user_agent": {
            "major": "61",
            "minor": "0",
            "os": "Mac OS X 10.13",
            "os_minor": "13",
            "os_major": "10",
            "name": "Firefox",
            "os_name": "Mac OS X",
            "device": "Other"
          },
          "timestamp": "07/Aug/2017:10:11:12 +0000"
        },
        "_ingest": {
          "timestamp": "2017-08-07T12:17:39.029Z"
        }
      }
    }
  ]
}
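Note how the raw timestamp field ends up as an ISO 8601 @timestamp in UTC. The Python sketch below reproduces that conversion to show what the date processor is doing. It is an illustration only: it uses Python’s own format codes rather than the pipeline’s, and it assumes an English locale so that month names like "Aug" parse.

```python
from datetime import datetime, timezone

def to_es_timestamp(raw: str) -> str:
    """Convert an nginx/Apache style timestamp to ISO 8601 in UTC."""
    parsed = datetime.strptime(raw, "%d/%b/%Y:%H:%M:%S %z")
    utc = parsed.astimezone(timezone.utc)
    millis = utc.microsecond // 1000
    return utc.strftime("%Y-%m-%dT%H:%M:%S.") + f"{millis:03d}Z"

print(to_es_timestamp("07/Aug/2017:10:11:12 +0000"))  # 2017-08-07T10:11:12.000Z
print(to_es_timestamp("07/Aug/2017:12:11:12 +0200"))  # 2017-08-07T10:11:12.000Z
```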

2. Install and configure Filebeat

On FreeBSD the package is beats.

filebeat.yml:

### Filebeat prospectors
filebeat.prospectors:
- type: log
  enabled: true
  paths:
      - /var/log/nginx/access.log
      - /var/log/nginx/*_access.log
  exclude_lines:
      - 'GET.*ELB-HealthChecker\/'
  tags:
      - weblogs
      - nginx
  fields:
      # used in the output section to send each log to its
      # proper index instead of the default 'filebeat-*'
      index_name: weblog_access
  pipeline: "weblog_combined"

### Index templates
setup.template.enabled: false

### Outputs
output.elasticsearch:
    hosts: ["logses.internal.domain:443"]
    protocol: "https"
    #username: "elastic"
    #password: "changeme"
    ssl.verification_mode: none
    # use index_name defined in the input section
    index: "%{[fields.index_name]:logs}-%{+YYYY.MM.dd}"

### Logging
#logging.level: debug
#logging.selectors: ["*"]
logging.to_syslog: true
logging.to_files: false

Filebeat starts a harvester for each file configured in the inputs section.

A Filebeat configuration should have at least an input and an output section.
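The exclude_lines entry in the configuration above is a regular expression that is matched anywhere in the line. A quick way to sanity-check such a pattern before deploying is to try it on a couple of sample lines. The Python sketch below does that with Python’s `re` module, which is only an approximation since Filebeat uses Go’s regexp engine; the sample lines are made up.

```python
import re

# Same pattern as in filebeat.yml.
EXCLUDE_LINES = [re.compile(r"GET.*ELB-HealthChecker\/")]

def keep(line: str) -> bool:
    """Return True if the line would be shipped, False if excluded."""
    return not any(pattern.search(line) for pattern in EXCLUDE_LINES)

# Made-up sample lines.
health_check = (
    '10.0.0.1 - - [07/Aug/2017:10:11:12 +0000] "GET / HTTP/1.1" 200 612 '
    '"-" "ELB-HealthChecker/2.0"'
)
real_request = (
    '6.6.6.6 - - [07/Aug/2017:10:11:12 +0000] "GET /login HTTP/1.1" 200 1062 '
    '"-" "Mozilla/5.0"'
)

print(keep(health_check))  # False
print(keep(real_request))  # True
```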

The config will:

  • Set one input, a log prospector, to read Nginx log files. It excludes ELB health checks from the logs, sends the logs to the “weblog_combined” pipeline and adds a custom field “index_name”.
  • Because we use a custom index name we need to either define a custom template for it or tell Filebeat not to set the template at all. The field mapping is defined in the pipeline, so we disable index templates. This will create an index with default settings, like five shards and one replica, which might not be ideal.
  • Configure an Elasticsearch output to send log records to the Elasticsearch cluster directly. The pipeline is specified in the input config. We also use the “index_name” field defined there to set the index the output should go to.
  • Configure logging
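The index setting in the output section reads as: take fields.index_name from the event, fall back to "logs" if it is missing, and append the event date. A rough Python model of that resolution follows. It is a sketch of the behaviour as described here, not Filebeat’s actual code, and `resolve_index` is a made-up name.

```python
from datetime import datetime, timezone

def resolve_index(event_fields: dict, timestamp: datetime, default: str = "logs") -> str:
    """Model of '%{[fields.index_name]:logs}-%{+YYYY.MM.dd}'."""
    name = event_fields.get("index_name", default)
    return f"{name}-{timestamp:%Y.%m.%d}"

ts = datetime(2017, 8, 7, 10, 11, 12, tzinfo=timezone.utc)
print(resolve_index({"index_name": "weblog_access"}, ts))  # weblog_access-2017.08.07
print(resolve_index({}, ts))                               # logs-2017.08.07
```

One index per day keeps indices small and makes it easy to drop old data by deleting whole indices.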

One caveat: sending documents to Elasticsearch that the pipeline can’t process will result in this error in the Filebeat logs:

ERR Failed to publish events: temporary bulk send failure

BSD’s newsyslog (log rotation system) might append a message at the end of a log it rotates, saying that it was turned over and why. This is very likely to make the pipeline return an error, which produces the above message in the Filebeat logs and stops further processing. The solution is to add the “B” flag to the newsyslog config, to add that line to exclude_lines in the Filebeat config, or both.

3. Start Filebeat and check

To start Filebeat in the foreground with its log output on stderr, pass it the -e option. On FreeBSD it would be

filebeat -path.config /usr/local/etc -path.home /var/db/beats/filebeat -e

Set logging.level to debug in the config file for verbose output.

To see a list of indices:

GET _cat/indices

weblog_access-{date} should be in the list.

To search for documents in an index:

GET weblog_access-*/_search?pretty=true&q=*:*

By default it only returns 10 documents.
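The 10-document default can be changed with the size parameter. The Python sketch below builds such a search URL; the host `https://cluster.url` and the helper name `search_url` are placeholders, and nothing is sent over the network.

```python
from urllib.parse import urlencode

def search_url(base_url: str, index_pattern: str, query: str = "*:*", size: int = 10) -> str:
    """Build a URI search URL that returns up to `size` documents."""
    params = urlencode({"q": query, "size": size, "pretty": "true"})
    return f"{base_url.rstrip('/')}/{index_pattern}/_search?{params}"

print(search_url("https://cluster.url", "weblog_access-*", size=50))
```

In Console format the equivalent would be GET weblog_access-*/_search?q=*:*&size=50.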

To see the logs in Kibana, an index pattern must be defined. Go to Management -> Index patterns and add a new index pattern, weblog_access-*. Set @timestamp as the time filter field name (this is the field that the date processor in our pipeline populates). Logs can then be seen, searched and filtered under Discover.
