Python + Elasticsearch

Feb 02, 2018 3 min leitura
  • Tags:
  • elasticsearch
  • python
Atenção: esse post foi escrito a algum tempo e pode não ser mais válido, ok?

Então você tem muitos documentos e precisa buscar com rapidez, certo? Ou você tem muitos dados e precisa processar e extrair métricas. De qualquer jeito, Elasticsearch (ES) pode ser um poderoso motor para ajudá-lo a indexar, buscar e extrair métricas dos seu armazenamento orientado a documentos. Este post é bem direto e pretende apresentar como usar a linguagem Python para interagir com o motor e indexar, retornar e buscar documentos.

A comunidade python desenvolveu dois projetos bem conhecidos: elasticsearch-py e elasticsearch-dsl. Enquanto o primeiro provê algumas ferramentas para interagir com ES e, na minha humilde opinião, para ter um controle mais granular das ações, o último foi criado para ajudá-lo com a busca e a persistência.

Conexão

A primeira questão é: como eu conector com o ES? Ao usar elasticsearch-dsl você pode criar uma conexão default que será usada globalmente:

connections.create_connection(hosts=['localhost'])

Entretando, você pode querer usar um cliente e ter um controle mais granual. Ao usar o elasticsearch-py você consegue isso:

from elasticsearch import Elasticsearch

client = Elasticsearch(hosts, http_auth=(username, password), **kwargs)

Execute client.indices.get_alias("*") para retornar os índices existentes e checar se está propriamente configurado.

Persistência

Armazenar documentos é fácil pois elasticsearch-dsl provê DocType – uma classe que cuida de mapear sua classe python para documentos JSON. Ao invés de preocupar-se com estruturas JSON, vamos criar um documento que armazena o o acesso de um usuário a uma página específica:

from elasticsearch_dsl import DocType, Integer, Date, Keyword


class UserHit(DocType):
    page = Keyword()
    datetime = Date()
    user_id = Integer()
    environment = Keyword()

Preste atenção aos campos que escolhemos: Integer, Date, Keyword. Eles serão mapeados para o motor do Elasticsearch, o que significa que você pode usar algumas features específicas. Por exemplo, o campo datetime pode ser usado para buscar um intervalo de datas ou agregar os dados por minutos, horas, dias, meses. Outro detalhe é o campo environment: é uma solução para integrar o ES com diferentes ambientes (homologação, desenvolvimento e produção). Dessa forma você não corre o risco de misturar dados falsos com os de produção.

Atualizado em 4 de Fev:: Uma outra estratégia é criar índices concatenados com o ambiente de produção. Ao utilizar uma variável de ambiente sua aplicação pode criar diferentes índices (e.g. myindex-2018.02.01-production, myindex-2018.02.01-staging, myindex-2018.02.01-development). Obrigado pela contribuição, Robson Peixoto.

Uma vez que você criou a classe, a indexação se torna fácil:

user_hit = UserHit(
    page='product-list',
    datetime=datetime.now(),
    user_id=10,
    environment='production'
)

UserHit.init(using=client, index=index_name)

# Indexed can be True or False depending on the operation success
indexed = user_hit.save(using=client, index='myindex-2018.02.01')

Você tem que ser atento a duas questões: (i) antes de usar o documento você precisa garantir que o mapeamento no Elasticsearch está criado e é por isso que nós temos que usar o método init na linha 8; (ii) o retorno da operação uma vez que o método .save pode retornar ou True ou False.

Buscar

Como buscar os documentos? O trecho de código abaixo ilustra um exemplo simples.

from elasticsearch_dsl import Search
from elasticsearch import ElasticsearchException


search = (Search(using=client, index='myindex-2018.02.01')
          .sort('datetime')
          .query('match', page='product-list')
          .query('match', user_id=12)
          .query('match', environment='production'))

count = search.count()
search = search[0:count]

response = search.execute()
if not response.success():
    raise ElasticsearchException('Fail to get the hits')

return response.hits

É importante mencionar que o ES traz apenas 10 resultados por padrão e é por isso que precisamos das linhas 11 e 12 .

Filtrando Resultados

search = (Search(using=client, index='myindex-2018.02.01')
          .sort('datetime')
          .query('match', page='product-list')
          .query('match', user_id=12)
          .query('match', environment='production'))

search = search.filter('range', datetime={'gte': from_datetime, 'lte': to_datetime, 'time_zone': time_zone_delta})

count = search.count()
print(count)

search = search[0:count]

response = search.execute()
print(response.hits)

Você acabou de buscar, mas agora quer filtrar os resultados por um intervalo de data. A linha 7 faz o truque.

Agregando

Você pode gerar métricas baseadas em data, por exemplo. A linha 12 diz para o ES agrugar os dados por intervalos de 30 minutos.

from elasticsearch_dsl import Search
from elasticsearch import ElasticsearchException


search = (Search(using=client, index='myindex-2018.02.01')
          .sort('datetime')
          .query('match', page='product-list')
          .query('match', user_id=12)
          .query('match', environment='production'))

search = search.filter('range', datetime={'gte': from_datetime, 'lte': to_datetime, 'time_zone': time_zone_delta})
search.aggs.bucket('datetime', 'date_histogram', field='datetime', interval='30m')

count = search.count()
search = search[0:count]

res = search.execute()
if not response.success():
    raise ElasticsearchException('Fail to get the hits')

return res.aggs['datetime']['buckets']

Comentários