
Centralized logging for applications using Heka + Elasticsearch + Kibana

This article describes how to set up centralized logging for various types of applications (Python, Java (java.util.logging), Go, bash) using Heka, a fairly new project.

Heka is developed at Mozilla and written in Go, which is why I use it instead of Logstash, which has similar capabilities.

Heka is built around plugins of five types:
  1. Inputs - receive data in some way (listen on ports, read files, etc.);
  2. Decoders - process incoming data and translate it into Heka's internal message structures;
  3. Filters - perform arbitrary actions on messages;
  4. Encoders - encode internal messages into the formats that output plugins send;
  5. Outputs - send data somewhere.

For example, in the case of Java applications, the input plugin is LogstreamerInput, which watches the log files for changes. New lines in the log are processed by the PayloadRegexDecoder decoder (according to a specified format) and then sent to Elasticsearch via the ElasticSearchOutput output plugin. The output plugin, in turn, encodes the message from the internal structure into the Elasticsearch format via ESJsonEncoder.
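To make the five stages easier to picture, here is a minimal sketch of the same flow in plain Python. It does not use Heka at all: the stage functions, the simplified line format and the in-memory sink are all assumptions made for illustration.

```python
import json
import re

# Hypothetical, simplified line format: logger[SEVERITY]: message
LINE_RE = re.compile(r'^(?P<Logger>[\w.]+)\[(?P<Severity>[A-Z]+)\]: (?P<Message>.*)$')


def input_stage(lines):
    """Input: receive raw payloads (here, from an in-memory list)."""
    for line in lines:
        yield line.rstrip("\n")


def decoder_stage(payloads):
    """Decoder: turn each raw payload into a structured message (a dict)."""
    for payload in payloads:
        match = LINE_RE.match(payload)
        if match:
            yield match.groupdict()


def filter_stage(messages, dropped=("DEBUG",)):
    """Filter: act on messages; this one simply drops DEBUG records."""
    for message in messages:
        if message["Severity"] not in dropped:
            yield message


def encoder_stage(messages):
    """Encoder: serialize structured messages for the output (JSON here)."""
    for message in messages:
        yield json.dumps(message, sort_keys=True)


def output_stage(encoded, sink):
    """Output: deliver encoded documents somewhere (here, a list)."""
    for document in encoded:
        sink.append(document)


def run_pipeline(lines):
    sink = []
    output_stage(encoder_stage(filter_stage(decoder_stage(input_stage(lines)))), sink)
    return sink
```

Calling run_pipeline with a few lines returns the JSON documents that would reach the output; lines that do not match the format are silently skipped by the decoder stage in this sketch.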

Heka installation


All installation methods are described in the documentation ( http://hekad.readthedocs.org/en/v0.8.2/installing.html#binaries ). The easiest way, however, is to download a ready-made binary package for your system from https://github.com/mozilla-services/heka/releases .
Since I use Ubuntu servers, the description is for that system. In this case, installation comes down to installing the deb package, setting up the /etc/hekad.toml configuration file and registering hekad as an upstart service.

The basic configuration in /etc/hekad.toml sets the number of processes (I set it equal to the number of cores), the dashboard (where you can see which plugins are enabled), and a UDP server on port 5565 that waits for messages in the Google protobuf format (used for the Python and Go applications):

    maxprocs = 4

    [Dashboard]
    type = "DashboardOutput"
    address = ":4352"
    ticker_interval = 15

    [UdpInput]
    address = "127.0.0.1:5565"
    parser_type = "message.proto"
    decoder = "ProtobufDecoder"

Configuration for upstart /etc/init/hekad.conf:

    start on runlevel [2345]
    respawn
    exec /usr/bin/hekad -config=/etc/hekad.toml


Logging Python applications


Python applications use the https://github.com/kalail/heka-py library together with a custom handler for the logging module. Code:

    import logging
    from traceback import format_exception

    try:
        import heka
        # Map stdlib logging levels to Heka severities.
        HEKA_LEVELS = {
            logging.CRITICAL: heka.severity.CRITICAL,
            logging.ERROR: heka.severity.ERROR,
            logging.WARNING: heka.severity.WARNING,
            logging.INFO: heka.severity.INFORMATIONAL,
            logging.DEBUG: heka.severity.DEBUG,
            logging.NOTSET: heka.severity.NOTICE,
        }
    except ImportError:
        heka = None  # heka-py is not installed: the handler becomes a no-op


    class HekaHandler(logging.Handler):
        _notified = None
        conn = None  # connection shared by all handler instances
        host = '127.0.0.1:5565'

        def __init__(self, name, host=None):
            # Initialize the base class first: `name` is a property of
            # logging.Handler that relies on state created in its __init__.
            super(HekaHandler, self).__init__()
            if host is not None:
                self.host = host
            self.name = name

        def emit(self, record):
            if heka is None:
                return

            fields = {
                'Message': record.getMessage(),
                'LineNo': record.lineno,
                'Filename': record.filename,
                'Logger': record.name,
                'Pid': record.process,
                'Exception': '',
                'Traceback': '',
            }
            if record.exc_info:
                # The last formatted line is the exception itself,
                # everything before it is the traceback.
                trace = format_exception(*record.exc_info)
                fields['Exception'] = trace[-1].strip()
                fields['Traceback'] = ''.join(trace[:-1]).strip()

            msg = heka.Message(
                type=self.name,
                severity=HEKA_LEVELS[record.levelno],
                fields=fields,
            )
            try:
                if self.conn is None:
                    self.__class__.conn = heka.HekaConnection(self.host)
                self.conn.send_message(msg)
            except Exception:
                # Report a delivery failure only once per process.
                if self.__class__._notified is None:
                    print("Sending HEKA message failed")
                    self.__class__._notified = True

By default, the handler expects Heka to listen on 127.0.0.1:5565, which is the UdpInput address from the basic configuration.
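The way the handler fills the Exception and Traceback fields can be checked in isolation. The sketch below repeats only that step with the standard library; the helper names split_exception and capture are made up for this example.

```python
import sys
from traceback import format_exception


def split_exception(exc_info):
    """Split formatted exception info the same way the handler does:
    the last formatted line is the exception, the rest is the traceback."""
    trace = format_exception(*exc_info)
    return trace[-1].strip(), "".join(trace[:-1]).strip()


def capture():
    """Raise and catch a KeyError to obtain real exc_info."""
    try:
        {}["missing"]
    except KeyError:
        return split_exception(sys.exc_info())


exception_line, traceback_text = capture()
```

Here exception_line holds the one-line summary (exception type and message) and traceback_text holds the stack frames, which keeps the short field convenient for searching in Kibana.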

Logging Go applications


For logging, I forked the logging library at https://github.com/ivpusic/golog and added the ability to send messages directly to Heka. The result is located here: https://github.com/ildus/golog

Usage:

    import "github.com/ildus/golog"
    import "github.com/ildus/golog/appenders"
    ...
    logger := golog.Default
    logger.Enable(appenders.Heka(golog.Conf{
        "addr":         "127.0.0.1",
        "proto":        "udp",
        "env_version":  "2",
        "message_type": "myserver.log",
    }))
    ...
    logger.Debug("some message")


Logging Java applications


For Java applications, an input plugin of type LogstreamerInput with a special regexp decoder is used. It reads application logs from files that must be written in a specific format.

Configuration for heka, responsible for reading and decoding logs:

    [JLogs]
    type = "LogstreamerInput"
    log_directory = "/some/path/to/logs"
    file_match = 'app_(?P<Seq>\d+\.\d+)\.log'
    decoder = "JDecoder"
    priority = ["Seq"]

    [JDecoder]
    type = "PayloadRegexDecoder"
    # Parses com.asdf[INFO|main|2014-01-01 3:08:06]: Server started
    match_regex = '^(?P<LoggerName>[\w\.]+)\[(?P<Severity>[A-Z]+)\|(?P<Thread>[\w\d\-]+)\|(?P<Timestamp>[\d\-\s:]+)\]: (?P<Message>.*)'
    timestamp_layout = "2006-01-02 15:04:05"
    timestamp_location = "Europe/Moscow"

    [JDecoder.severity_map]
    SEVERE = 3
    WARNING = 4
    INFO = 6
    CONFIG = 6
    FINE = 6
    FINER = 7
    FINEST = 7

    [JDecoder.message_fields]
    Type = "myserver.log"
    Message = "%Message%"
    Logger = "%LoggerName%"
    Thread = "%Thread%"
    Payload = ""
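Before deploying a decoder it helps to try the regular expression on a sample line. Python's re module accepts the same named-group syntax, so the sketch below applies the match_regex (with the severity class written as [A-Z]+) and the severity_map to the sample line from the comment. The decode function is a stand-in written for this example, not Heka's implementation, and returning None for an unknown level is an assumption.

```python
import re

# match_regex from the JDecoder section, severity class written as [A-Z]+.
MATCH_REGEX = re.compile(
    r'^(?P<LoggerName>[\w\.]+)\[(?P<Severity>[A-Z]+)\|(?P<Thread>[\w\d\-]+)\|'
    r'(?P<Timestamp>[\d\-\s:]+)\]: (?P<Message>.*)'
)

# severity_map from the JDecoder section: java.util.logging level -> severity.
SEVERITY_MAP = {
    "SEVERE": 3, "WARNING": 4, "INFO": 6, "CONFIG": 6,
    "FINE": 6, "FINER": 7, "FINEST": 7,
}


def decode(line):
    """Return the captured fields with a numeric severity, or None if no match."""
    match = MATCH_REGEX.match(line)
    if match is None:
        return None
    fields = match.groupdict()
    # Assumption for this sketch: unknown level names map to None.
    fields["Severity"] = SEVERITY_MAP.get(fields["Severity"])
    return fields


sample = "com.asdf[INFO|main|2014-01-01 3:08:06]: Server started"
decoded = decode(sample)
```

For the sample line, decoded contains LoggerName "com.asdf", Severity 6, Thread "main", the raw Timestamp string and the Message "Server started".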


In the application, you must change the Formatter through logging.properties. Example logging.properties:

    handlers = java.util.logging.FileHandler java.util.logging.ConsoleHandler

    java.util.logging.FileHandler.level = ALL
    java.util.logging.FileHandler.pattern = logs/app_%g.%u.log
    java.util.logging.FileHandler.limit = 1024000
    java.util.logging.FileHandler.formatter = com.asdf.BriefLogFormatter
    java.util.logging.FileHandler.append = true

    java.util.logging.ConsoleHandler.level = ALL
    java.util.logging.ConsoleHandler.formatter = com.asdf.BriefLogFormatter
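The FileHandler pattern logs/app_%g.%u.log has to produce names that the file_match expression of LogstreamerInput accepts (%g is the generation number, %u a unique number). The sketch below checks a hypothetical directory listing against that expression. It assumes the expression must match the complete file name, which also leaves out the .lck lock files that FileHandler creates next to its logs.

```python
import re

# file_match from the LogstreamerInput section.
FILE_MATCH = re.compile(r'app_(?P<Seq>\d+\.\d+)\.log')


def matching_logs(names):
    """Return (name, Seq) pairs for names that match the whole pattern."""
    result = []
    for name in names:
        match = FILE_MATCH.fullmatch(name)
        if match:
            result.append((name, match.group("Seq")))
    return result


# Hypothetical listing of the logs directory.
listing = ["app_0.0.log", "app_1.0.log", "app_0.0.log.lck", "other.log"]
matched = matching_logs(listing)
```

Only app_0.0.log and app_1.0.log are picked up, with Seq values "0.0" and "1.0".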


BriefLogFormatter code:

    package com.asdf;

    import java.text.DateFormat;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.logging.Formatter;
    import java.util.logging.LogRecord;

    /**
     * A custom format implementation that is designed for brevity.
     */
    public class BriefLogFormatter extends Formatter {

        private static final DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        private static final String lineSep = System.getProperty("line.separator");

        // Produces lines like: com.asdf[INFO|main|2014-01-01 03:08:06]: Server started
        @Override
        public String format(LogRecord record) {
            String loggerName = record.getLoggerName();
            if (loggerName == null) {
                loggerName = "root";
            }

            // SimpleDateFormat is not thread-safe, and this instance is
            // shared by the file and console handlers.
            String timestamp;
            synchronized (format) {
                timestamp = format.format(new Date(record.getMillis()));
            }

            StringBuilder output = new StringBuilder()
                .append(loggerName)
                .append("[")
                .append(record.getLevel()).append('|')
                .append(Thread.currentThread().getName()).append('|')
                .append(timestamp)
                .append("]: ")
                .append(record.getMessage()).append(' ')
                .append(lineSep);
            return output.toString();
        }
    }


Logging scripts (bash)


For bash, the TcpInput input plugin (which listens on a given port) and a PayloadRegexDecoder to decode the messages are added to Heka. Configuration in hekad.toml:

    [TcpInput]
    address = "127.0.0.1:5566"
    parser_type = "regexp"
    decoder = "TcpPayloadDecoder"

    [TcpPayloadDecoder]
    type = "PayloadRegexDecoder"
    # Parses space_checker[myhost|INFO|2014-01-01 3:08:06]: Need more space on disk /dev/sda6
    match_regex = '^(?P<LoggerName>[\w\.\-]+)\[(?P<Hostname>[^\|]+)\|(?P<Severity>[A-Z]+)\|(?P<Timestamp>[\d\-\s:]+)\]: (?P<Message>.*)'
    timestamp_layout = "2006-01-02 15:04:05"
    timestamp_location = "Europe/Moscow"

    [TcpPayloadDecoder.severity_map]
    ERROR = 3
    WARNING = 4
    INFO = 6
    ALERT = 1

    [TcpPayloadDecoder.message_fields]
    Type = "scripts"
    Message = "%Message%"
    Logger = "%LoggerName%"
    Hostname = "%Hostname%"
    Payload = "[%Hostname%|%LoggerName%] %Message%"

For logging, a function has been written that sends messages to the TCP port in the specified format:

    log() {
        if [ "$1" ]; then
            echo -e "app1[`hostname`|INFO|`date '+%Y-%m-%d %H:%M:%S'`]: $1" | nc 127.0.0.1 5566 || true
            echo "$1"
        fi
    }

Sending an INFO message from the app1 logger:

 log "test test test" 
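The line that the log function builds must agree with the decoder's match_regex, otherwise the message will not be decoded into fields. The sketch below rebuilds the same line format in Python and checks it against the expression (with the severity class written as [A-Z]+). The format_line helper and the host name web1 are made up for illustration, and nothing is sent over the network.

```python
import re
import socket
from datetime import datetime

# match_regex from the TcpPayloadDecoder section, severity class as [A-Z]+.
TCP_REGEX = re.compile(
    r'^(?P<LoggerName>[\w\.\-]+)\[(?P<Hostname>[^\|]+)\|(?P<Severity>[A-Z]+)\|'
    r'(?P<Timestamp>[\d\-\s:]+)\]: (?P<Message>.*)'
)


def format_line(logger, severity, message, hostname=None, when=None):
    """Build logger[hostname|SEVERITY|timestamp]: message, as the bash function does."""
    hostname = hostname or socket.gethostname()
    when = when or datetime.now()
    stamp = when.strftime("%Y-%m-%d %H:%M:%S")
    return "%s[%s|%s|%s]: %s" % (logger, hostname, severity, stamp, message)


# Fixed host name and time so the result is reproducible.
line = format_line("app1", "INFO", "test test test",
                   hostname="web1", when=datetime(2014, 1, 1, 3, 8, 6))
fields = TCP_REGEX.match(line).groupdict()
```

With these inputs the line is "app1[web1|INFO|2014-01-01 03:08:06]: test test test", and every named group of the expression captures the corresponding part.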


Sending entries to Elasticsearch


The following configuration is added to hekad.toml:

    [ESJsonEncoder]
    index = "heka-%{Type}-%{2006.01.02}"
    es_index_from_timestamp = true
    type_name = "%{Type}"

    [ElasticSearchOutput]
    message_matcher = "Type == 'myserver.log' || Type=='scripts' || Type=='nginx.access' || Type=='nginx.error'"
    server = "http://<elasticsearch_ip>:9200"
    flush_interval = 5000
    flush_count = 10
    encoder = "ESJsonEncoder"

Here we indicate where elasticsearch is located, how indexes should be formed and what types of messages should be sent there.
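As a rough illustration of how the index setting resolves, the sketch below builds an index name from a message type and a timestamp. It only approximates the template "heka-%{Type}-%{2006.01.02}" (the second placeholder is a Go-style date layout for year.month.day); the resolve_index function is written for this example and is not part of Heka.

```python
from datetime import datetime


def resolve_index(message_type, timestamp):
    """Approximate heka-%{Type}-%{2006.01.02}: message type, then year.month.day."""
    return "heka-%s-%s" % (message_type, timestamp.strftime("%Y.%m.%d"))


# With es_index_from_timestamp = true the date is taken from the message
# itself, so an old log line lands in the index for its own day.
index_name = resolve_index("scripts", datetime(2014, 1, 1, 3, 8, 6))
```

For a message of type scripts stamped 2014-01-01, the resulting name is heka-scripts-2014.01.01, that is, one index per message type per day.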

Viewing logs


Kibana 4 is used to view the logs. It is still in beta, but already quite usable. To install it, download the archive from http://www.elasticsearch.org/overview/kibana/installation/ , unpack it to any folder on the server and specify the location of the Elasticsearch server in the file config/kibana.yml (parameter elasticsearch_url).

On first start you will need to add indexes in the Settings tab. To be able to add an index pattern and have the fields detected correctly, you first need to send a test message from each application. Then you can add an index pattern such as heka-* (which will show all messages) or heka-scripts-*, thereby separating the applications from each other. After that you can go to the Discover tab and see the records themselves.
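The effect of the two index patterns can be illustrated with ordinary wildcard matching. The sketch below uses Python's fnmatch module on a made-up list of index names; it only mimics the wildcard semantics and does not talk to Kibana or Elasticsearch.

```python
from fnmatch import fnmatchcase


def select_indices(pattern, names):
    """Return the index names that match a wildcard pattern, case-sensitively."""
    return [name for name in names if fnmatchcase(name, pattern)]


# Hypothetical index names, one per message type per day.
indices = [
    "heka-myserver.log-2014.01.01",
    "heka-scripts-2014.01.01",
    "heka-scripts-2014.01.02",
    "logstash-2014.01.01",
]

all_heka = select_indices("heka-*", indices)
scripts_only = select_indices("heka-scripts-*", indices)
```

The broad pattern selects every Heka index, while the narrower one keeps only the indexes that hold messages from the bash scripts.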

Conclusion


I have shown only part of what can be logged with Heka.
If anyone is interested, I can also show the part of my Ansible configuration that automatically installs Heka on all servers, and Elasticsearch with Kibana on selected ones.

Source: https://habr.com/ru/post/250803/

