Heka JSON Decoder using a SandboxDecoder and Lua

September 3, 2014 • 2 min read

First let me say if you’re looking for help on Heka, check out their IRC channel. It’s full of great guys that are extremely helpful! [IRC: #heka on irc.mozilla.org]

This Heka JSON encoder converts any simple key/value JSON payload into Heka fields.

# sample json payload
[{"name"=>"John Doe","title"=>"Sysadmin","@timestamp"=>"2014-09-02T22:10:28Z"}]
require "cjson"
local dt = require "date_time"

--[[
From trink in IRC - thanks!
Example use:
[HttpListenInput]
address = "0.0.0.0:8325"
decoder = "JsonDecoder"
[JsonDecoder]
type = "SandboxDecoder"
script_type = "lua"
filename = "lua_decoders/json.lua"
--]]

local message = {
   Timestamp = nil,
   Type = read_config("type"),
   Payload = nil
}

function process_message()
   local raw_message = read_message("Payload")
   local ok, json = pcall(cjson.decode, raw_message)
   if not ok then
      return 0 -- when plain text is found, ship it in it's raw form
   end

   -- allows timestamp (optional) to be set within the json "@timestamp" field
   -- useful for output to elasticsearch
   if json["@timestamp"] ~= nil then
      local ts = lpeg.match(dt.rfc3339, json["@timestamp"])
      if not ts then return -1 end

      message.Timestamp = dt.time_to_ns(ts)
      json["@timestamp"] = nil -- remove the original so it isn't duplicated in Fields
   end

   message.Payload = raw_message
   message.Fields = json

   if not pcall(inject_message, message) then return -1 end

   return 0
end

I hope to have a sample use case where we used this with a PHP application to import data into ElasticSearch. Pretty cool stuff - and super easy to setup with new and (in our case) legacy systems.