logstash: geolocalização com geoip
Um ponto importante no monitoramento de dispositivos, aplicações ou sistemas expostos na internet, é incluir informações sobre a origem dos acessos ou tentativas de acesso que recebemos.
Isso é útil tanto pelo lado da infraestrutura e segurança quanto pelo lado do negócio. Por exemplo, sabendo a origem e quantidade dos acessos, ou tentativas, podemos planejar melhor a distribuição geográfica de servidores, detectar acessos comprometidos ou tentativas de ataque e identificar novas oportunidades de negócios em localidades diferentes.
Uma forma bem simples de se fazer isso quando temos a pilha elastic como uma das ferramentas de monitoramento é utilizar o filtro geoip
do logstash.
o filtro geoip
O filtro geoip
tem uma função bem simples, ele consulta um endereço IP em uma base interna, identifica a geolocalização do mesmo e retorna alguns campos como o país, o código do país, a cidade, as coordenadas geográficas, o continente e alguns outros.
No caso do filtro geoip
utilizado pelo logstash, a base interna onde as informações de geolocalização são consultadas é a GeoLite2
, fornecida pela maxmind, e ao utilizarmos o filtro em um endereço IP é possível obter, além das informações geográficas, as informações referentes ao Sistema Autônomo (AS) associadas ao roteamento.
Para utilizar o filtro geoip
você precisa que o seu evento tenha um campo onde o valor é um endereço IP público e precisa também criar um mapeamento específico no seu índice para armazenar campos com informações de geolocalização.
aplicando o filtro
Como exemplo para aplicação do filtro geoip
vou utilizar uma API simples que desenvolvi em Go e que retorna o estado das conexões de uma máquina linux, emulando parte do funcionamento do netstat.
Quando consultada a API retorna um documento JSON no seguinte formato.
{
"srcip":"10.0.1.100",
"srcport":56954,
"dstip":"151.101.192.133",
"dstport":443,
"status":"ESTABLISHED"
}
O campo srcip
corresponde ao IP local da máquina e o campo dstip
corresponde ao IP externo da conexão, é esse campo que iremos utilizar com o filtro geoip
.
Para consumir essa API com o logstash, vou utilizar como input
o filtro http_poller
, que basicamente fica fazendo requisições para um endpoint configurado em um intervalo específico de tempo.
input {
http_poller {
urls => { "api" => "http://10.0.1.100:5000/netstat" }
schedule => { "every" => "30s"}
}
}
A forma como os dados são recebidos pelo logstash não faz diferença pro filtro
geoip
, você só precisa de um campo com um IP público.
A configuração mais simples do filtro geoip
exige apenas uma opção obrigatória, source
, que define qual é o campo com o IP que queremos descobrir a geolocalização.
filter {
geoip {
source => "dstip"
}
}
Quando o evento passa por esse filtro com sucesso, um novo campo chamado geoip
é adicionado ao evento.
"geoip" => {
"country_code2" => "IS",
"continent_code" => "EU",
"city_name" => "Reykjavik",
"country_code3" => "IS",
"region_code" => "1",
"timezone" => "Atlantic/Reykjavik",
"region_name" => "Capital Region",
"location" => {
"lon" => -21.9466,
"lat" => 64.1432
},
"latitude" => 64.1432,
"ip" => "31.209.137.10",
"country_name" => "Iceland",
"postal_code" => "101",
"longitude" => -21.9466
}
Em caso de falha, a tag _geoip_lookup_failure
será adicionada.
Podemos alterar esse comportamento padrão utilizando outras opções na configuração do filtro.
target
: nome do campo para salvar as informações de geolocalização, o padrão é o campogeoip
default_database_type
: tem apenas duas opçõesCity
eASN
, a primeira é o padrão e traz informações geográficas, a segunda traz informações do Sistema Autônomo (AS) associado.fields
: os campos que serão retornados, por padrão traz todos os disponíveis.tag_on_failure
: o nome da tag que será adicionada ao evento em caso de falha, por padrão é_geoip_lookup_failure
.
Como a ideia é enriquecer os eventos, vamos utilizar dois filtros geoip
na sequência, um com a opção default_database_type
igual a City
e outro com a mesma opção configurada como ASN
.
filter {
geoip {
default_database_type => "City"
source => "dstip"
target => "geo"
tag_on_failure => ["geoip-city-failed"]
}
geoip {
default_database_type => "ASN"
source => "dstip"
target => "geo"
tag_on_failure => ["geoip-asn-failed"]
}
}
filtrando endereços privados
Quando o campo que vamos utilizar como origem para o filtro geoip
pode ter também endereços IPs privados, precisamos filtrar esses IPs de alguma forma.
Um jeito simples de evitar que o IPs privados passem pelo filtro geoip
é utilizar um condicional para adicionar uma tag nos eventos que tem IP privado e um outro condicional para limitar a aplicação do filtro geoip
somente aos eventos sem essa tag.
Nesse exemplo será preciso filtrar a rede 10.0.1.0/24
, o IP de localhost 127.0.0.1
e o IP de roteamento local 0.0.0.0
.
if [dstip] =~ "^10.0.*" or [dstip] =~ "^127.0.*" or [dstip] == "0.0.0.0" {
mutate {
add_tag => ["internal"]
}
}
Agora todos os eventos que tem como destino um IP privado terão a tag internal
e podemos utilizar essa tag
para evitar que eles passem pelo filtro de geoip
.
if "internal" not in [tags] {
geoip {
default_database_type => "City"
source => "dstip"
target => "geo"
tag_on_failure => ["geoip-city-failed"]
}
geoip {
default_database_type => "ASN"
source => "dstip"
target => "geo"
tag_on_failure => ["geoip-asn-failed"]
}
}
mapeamento
Antes de podermos analisar os dados no elasticsearch e criarmos mapas no kibana, é preciso criar um mapeamento para indicar os tipos de dados de cada campo.
Embora o elasticsearch consiga identificar e criar um mapeamento na hora da ingestão, isso não é suficiente para os campos de geolocalização, esses campos precisam estar definidos antes da criação do índice.
A parte obrigatória do mapeamento é a que define o campo geo.location
como sendo do tipo geo_point
, portanto se utilizarmos um índice chamado endpoints
, poderíamos apenas criar o índice e aplicar o mapamento para esse campo.
PUT /endpoints
PUT /endpoints/_mapping
{
"properties": {
"geo": {
"properties": {
"location": {
"type": "geo_point"
}
}
}
}
}
Dessa forma garantimos que o campo geo.location
será do tipo geo_point
e deixaremos que o elasticsearch crie o mapeamento dos outros campos quando for indexar o primeiro documento.
Embora não seja nenhum problema trabalhar assim, o ideal é criar um template para o nosso índice já definindo os tipo para cada campo.
No nosso examplo poderíamos utilizar o seguinte template.
PUT _template/endpoints
{
"order" : 0,
"version" : 1,
"index_patterns" : [ "endpoints" ],
"settings" : {
"index" : {
"mapping" : {
"ignore_malformed" : "true"
},
"refresh_interval" : "5s",
"number_of_shards" : "1",
"number_of_replicas" : "0"
}
},
"mappings" : {
"properties" : {
"@timestamp" : {
"format" : "strict_date_optional_time||epoch_millis",
"type" : "date"
},
"@version" : { "type" : "keyword" },
"status" : { "type" : "keyword" },
"srcip" : { "type" : "ip" },
"srcport" : { "type" : "keyword" },
"dstip" : { "type" : "ip" },
"dstport" : { "type" : "keyword" },
"geo" : {
"properties" : {
"as_org" : { "type" : "keyword" },
"asn" : { "type" : "keyword" },
"country_code2" : { "type" : "keyword" },
"country_code3" : { "type" : "keyword" },
"country_name" : { "type" : "keyword" },
"continent_code" : { "type" : "keyword" },
"city_name" : { "type" : "keyword" },
"region_code" : { "type" : "keyword" },
"region_name" : { "type" : "keyword" },
"postal_code" : { "type" : "keyword" },
"ip" : { "type" : "ip" },
"location" : { "type" : "geo_point" },
"latitude" : { "type" : "float" },
"longitude" : { "type" : "float" },
"timezone" : { "type" : "keyword" }
}
},
"message" : { "type" : "text" },
"tags" : { "type" : "keyword"}
}
}
}
Como o template só é aplicado durante a criação do índice, caso o índice tenha sido criado antes, é preciso deletá-lo com o request
DELETE endpoints
Com o template criado, podemos adicionar o output para o elasticsearch no pipeline.
output {
elasticsearch {
hosts => ["http://elk:9200"]
index => "endpoints"
}
}
visualizando os dados e criando mapas
Assim que iniciamos o logstash com o pipeline configurado, os dados começarão a ser coletados e enviados para o elasticsearch, após a criação do index pattern, podemos visualizar os dados já com a gelocalização no kibana.
Enquanto o nosso evento inicial trazia apenas a informação do IP de destino, utilizando o filtro de geoip
conseguimos enriquecer o documento adicionando informações relacionadas a esse IP de destino.
O campo geo.country_name
é um exemplo de informação gerada pelo filtro geoip
com a opção default_database_type
como City
e o campo geo.as_org
é um exemplo de informação gerada quando usamos default_database_type
como ASN
, esse é o motivo para termos utilizado dois filtros geoip
no pipeline.
Podemos ainda visualizar graficamente esses dados utilizando a ferramenta Maps dentro kibana.
Se antes no nosso monitoramento tínhamos somente a informação do IP de destino, agora temos uma ideia de onde está esse destino e podemos visualizá-lo em um mapa.
Um ponto importante que devemos considerar é que a precisão das coordenadas obtidas pelo filtro geoip
não é exata e pode variar de acordo com a localização geográfica, sendo mais precisas em alguns países, o tipo de conexão, se é um IP de uma conexão fixa ou móvel, e o nível de zoom.
Esse link no site da maxmind permite ter uma estimativa da precisão em cada caso.
O resultado do filtro geoip nunca deve ser considerado como exato e pode não corresponder com a realidade.
Para mais informações sobre o filtro geoip
, você pode consultar a documentação oficial da elastic.