logstash: usando o filtro translate
Quando usamos o logstash para processar os dados antes de enviarmos para o destino final, seja ele o elasticsearch ou alguma outra saída, pode ser interessante realizar algum tipo de enriquecimento nos dados para facilitar o processo de análise, monitoramento e filtragem.
Podemos por exemplo substituir informações de códigos de dispositivos por palavras chaves que vão dar uma visão melhor do que está sendo monitorado ou que tipo de informação estamos vendo.
Um filtro do logstash que possibilita realizar esse enriquecimento nos dados é o filtro translate
.
o filtro translate
O filtro translate
funciona de uma forma bem simples, ele verifica se o valor de um campo existe como chave em um dicionário do tipo chave-valor e caso exista ele pode substituir o valor atual do campo pelo valor do dicionário ou criar um novo campo com esse valor.
Como exemplo vamos considerar que temos um dispositivo que monitora o número de pessoas que entraram ou saíram de um local, calcula o número médio de pessoas em determinado momento e envia esses dados para um logstash em um intervalo de tempo configurável.
Considerando o seguinte payload como documento:
{
"storeId": 1,
"customerIn": 5,
"customerOut": 2,
"customerAvg": 10
}
Nesse documento temos os seguintes campos:
storeId
: um identificador do local onde o dispositivo está instaladocustomerIn
: o número de pessoas que entraram no localcustomerOut
: o número de pessoas que saíram do localcustomerAvg
: o número médio de pessoas no local no momento da coleta
Quando temos uma quantidade pequena de dispositivos pode até ser possível lembrar a localização de cada um através do campo storeId
, mas a medida que o número aumenta, essa tarefa já se torna mais complicada e ter informações extras sobre o dispositivo é algo extremamente útil.
aplicando o filtro
O principal componente do filtro translate
é o seu dicionário, que pode tanto estar presente diretamente na configuração do filtro, quanto estar em um arquivo externo.
Quando utilizamos arquivos externos para o dicionário podemos utilizar um arquivo no formato YAML
, CSV
ou JSON
, onde cada item é um par de chave-valor.
Para esse exemplo vamos considerar que nosso dispositivo está localizado em uma loja de lembranças no Aeroporto Schiphol em Amsterdã, que tem como código as letras AMS.
Queremos criar um novo campo chamado storeName
a partir do valor do campo storeId
.
translate {
source => "storeId"
target => "storeName"
dictionary => {
"1" => "AMS"
}
}
Quando um evento onde o campo storeId
tem o valor igual a 1 passar por esse filtro, um novo campo chamado storeName
com o valor AMS será criado.
Considerando agora que temos um novo dispositivo com o storeId
igual a 2 instalado em uma outra filial da mesma loja localizada no Aeroporto do Galeão no Rio de Janeiro, que tem como código as letras GIG, o dicionário do filtro precisa então ter essa nova informação.
dictionary => {
"1" => "AMS"
"2" => "GIG"
}
A medida que o número de dispositivos aumenta, a opção de usar um dicionário em um arquivo externo se torna mais interessante, nesse caso precisamos apenas criar o arquivo, salvar em um lugar onde o logstash tenha permissão de leitura e configurar o filtro para ler esse arquivo.
"1": "AMS"
"2": "GIG"
"3": "CDG"
"4": "BER"
Salvando o dicionário acima como store.yml
, podemos configurar o filtro translate
para ler desse arquivo e verificar se houve alguma atualização em um intervalo agendado.
translate {
source => "storeId"
target => "storeName"
dictionary_path => "/caminho/para/o/arquivo/store.yml"
refresh_interval => 300
fallback => "local desconhecido"
}
Na configuração acima a opção refresh_interval
determina o intervalo em segundos que o logstash deve verificar se houve alteração no arquivo e em caso positivo carregar as alterações em memória, a opção fallback
é utilizada quando o valor do campo storeId
não é encontrado no dicionário.
Sendo assim:
PAYLOAD
{"storeId": 2, "customerIn": 3, "customerOut":2, "customerAvg": 39 }
OUTPUT
{
"storeId" => 2,
"@timestamp" => 2021-02-16T06:23:50.312Z,
"host" => "elk",
"customerIn" => 3,
"customerOut" => 2,
"customerAvg" => 39,
"storeName" => "GIG",
"@version" => "1"
}
substituindo o campo original
Podem haver casos onde queremos substituir o valor do campo original e não criar um novo campo, para isso o campo de origem e destino deve ser o mesmo, utilizamos a opção override
como true
e desabilitamos a opção de fallback
, pois caso o valor buscado não exista no dicionário, conseguimos identificar pelo storeId
e atualizá-lo.
translate {
source => "storeId"
target => "storeId"
dictionary_path => "/caminho/para/o/arquivo/store.yml"
refresh_interval => 300
override => true
}
um translate ou múltiplos translates?
Em alguns casos podemos também querer adicionar múltiplos campos a partir de um mesmo valor, no nosso exemplo poderíamos adicionar além do nome com o código do Aeroporto, a localização geográfica, o tamanho da loja, o país ou qualquer outro tipo de informação que possa ser útil para alguma análise posterior.
Para isso temos basicamente duas abordagens, a primeira consiste em utilizar um filtro translate
para cada informação que queremos adicionar e a segunda consiste em utilizar um único filtro translate
onde o valor associado a chave é um documento json
com os campos que queremos adicionar, posteriormente o campo adicionado pelo filtro translate
seria expandido utilizando o filtro json
.
A primeira abordagem é a mais simples, basta utilizar um arquivo para cada informação que queremos adicionar e utilizar múltiplos filtros translate
no pipeline.
EXEMPLO
translate {
source => "storeId"
target => "storeName"
dictionary_path => "/caminho/para/o/arquivo/store-name.yml"
refresh_interval => 300
fallback => "local desconhecido"
}
translate {
source => "storeId"
target => "storeCity"
dictionary_path => "/caminho/para/o/arquivo/store-city.yml"
refresh_interval => 300
fallback => "cidade desconhecida"
}
A segunda abordagem não deixa de ser simples, mas tem alguns pequenos detalhes a mais.
Primeiro temos que garantir que o valor no dicionário é um json
no formato que o filtro json
do logstash consiga fazer o parsing sem problemas, para isso basta utilizarmos o formato abaixo.
"1": '{ "storeName": "AMS", "storeCity": "Amsterdam", "storeCountry": "Netherlands" }'
"2": '{ "storeName": "GIG", "storeCity": "Rio de Janeiro", "storeCountry": "Brazil" }'
"3": '{ "storeName": "CDG", "storeCity": "Paris", "storeCountry": "France" }'
"4": '{ "storeName": "BER", "storeCity": "Berlin", "storeCountry": "Germany" }'
Na sequência, caso a opção fallback
esteja sendo usada, ela também precisa estar em formato json
, pois o campo de destino do filtro translate
será validado por um filtro json
posteriormente no pipeline.
filter {
translate {
source => "storeId"
target => "[@metadata][translate]"
dictionary_path => "/caminho/para/o/arquivo/store.yml"
fallback => '{"storeName": "local desconhecido"}'
}
json {
source => "[@metadata][translate]"
}
}
Na configuração acima estamos salvando o resultado do filtro translate
em um campo temporário chamado [@metadata][translate]
, os campos [@metadata]
podem ser criados nos pipelines e utilizados pelos filtros, mas eles não estão presentes no documento final, além disso a opção fallback
cria apenas o campo storeName
.
Após o filtro translate
utilizamos um filtro json
tendo como origem o campo temporário criado.
PAYLOAD
{"storeId": 1, "customerIn": 2, "customerOut":1, "customerAvg": 15 }
OUTPUT
{
"storeId" => 1,
"storeCountry" => "Netherlands",
"customerAvg" => 15,
"customerIn" => 2,
"@version" => "1",
"storeName" => "AMS",
"@timestamp" => 2021-02-16T07:19:52.808Z,
"customerOut" => 1,
"host" => "elk",
"storeCity" => "Amsterdam"
}
Utilizando o filtro translate
partimos então de um documento simples.
{
"storeId": 1,
"customerIn": 2,
"customerOut":1,
"customerAvg": 15
}
E chegamos em um documento enriquecido com mais informações.
{
"storeId": 1,
"storeName": "AMS",
"storeCity": "Amsterdam",
"storeCountry": "Netherlands",
"customerIn": 2,
"customerOut":1,
"customerAvg": 15
}
conclusão
Podemos ver que o filtro translate
nos permite enriquecer os documentos de uma forma bem simples, mas ele tem algumas limitações.
Como os dicionários utilizados pelo translate
são carregados em memória quando o logstash inicia, o tamanho dos dicionários pode impactar na performance do logstash como um todo.
A documentação da elastic informa que o filtro é testado internamente usando dicionários com cerca de 100.000 pares chave-valor, mas além da quantidade de chaves, o tamanho dos valores também influencia.
Se precisarmos utilizar dicionários muito grandes, talvez seja interessante verificar outras formas de realizar esse enriquecimento nos dados, seja fazendo um pré-processamento antes do dado entrar no logstash, seja utilizando outros filtros que realizam consultas externas como o filtro para consultar o elasticsearch
ou o memcached
.
Mais informações sobre o filtro translate
podem ser encontradas na documentação oficial.