Go, ElasticSearch and RabbitMQ full-text search microservice in k8s👋✨💫

Alexander Bryksin
Published in DevOps.dev
7 min read · Aug 16, 2022

👨‍💻 Full list of what has been used:

Elasticsearch Elasticsearch client for Go
RabbitMQ Go RabbitMQ Client Library
Jaeger open source, end-to-end distributed tracing
Prometheus monitoring and alerting
Grafana to compose observability dashboards with everything from Prometheus
Echo web framework
Kibana is a user interface that lets you visualize your Elasticsearch data
Docker and docker-compose
Kubernetes K8s
Helm The package manager for Kubernetes

You can find the source code in the GitHub repository. The main idea of this project is the implementation of full-text search with support for synonyms, mistyping, and the wrong keyboard layout using Elasticsearch and RabbitMQ.

All UIs will be available on the following ports:

Grafana UI: http://localhost:3005

Kibana UI: http://localhost:5601/app/home#/

RabbitMQ UI: http://localhost:15672

Jaeger UI: http://localhost:16686

Prometheus UI: http://localhost:9090

Docker-compose file for this project:

version: "3.9"services:
rabbitmq:
image: rabbitmq:3.9-management-alpine
container_name: rabbitmq
restart: always
ports:
- "5672:5672"
- "15672:15672"
networks: [ "microservices" ]
node01:
image: docker.elastic.co/elasticsearch/elasticsearch:8.3.3
container_name: node01
restart: always
environment:
- node.name=node01
- cluster.name=es-cluster-8
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- xpack.license.self_generated.type=basic
- xpack.security.enabled=false
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- ./es-data01:/usr/share/elasticsearch/data
ports:
- "9200:9200"
- "9300:9300"
networks: [ "microservices" ]
kibana:
image: docker.elastic.co/kibana/kibana:8.3.3
restart: always
environment:
ELASTICSEARCH_HOSTS: http://node01:9200
ports:
- "5601:5601"
depends_on:
- node01
networks: [ "microservices" ]
jaeger:
container_name: jaeger_container
restart: always
image: jaegertracing/all-in-one:1.35
environment:
- COLLECTOR_ZIPKIN_HTTP_PORT=9411
ports:
- "5775:5775/udp"
- "6831:6831/udp"
- "6832:6832/udp"
- "5778:5778"
- "16686:16686"
- "14268:14268"
- "14250:14250"
- "9411:9411"
networks: [ "microservices" ]
prometheus:
image: prom/prometheus:latest
container_name: prometheus
ports:
- "9090:9090"
command:
- --config.file=/etc/prometheus/prometheus.yml
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
networks: [ "microservices" ]
node_exporter:
container_name: node_exporter_container
restart: always
image: prom/node-exporter
ports:
- '9101:9100'
networks: [ "microservices" ]
grafana:
container_name: grafana_container
restart: always
image: grafana/grafana
ports:
- '3005:3000'
networks: [ "microservices" ]
volumes:
es-data01:
driver: local
networks:
microservices:
name: microservices

Full-text search with auto-completion can be implemented in different ways; it's up to you to choose what works best for your case. First, we have to create mappings for our index. Elasticsearch can of course create them for us, but that's not a production solution. The data model used here is a simple abstract shopping product with searchable title, description, and shop name fields, which is enough for this example. Creating the right mapping is very important and tricky: to implement full-text search with synonyms, mistyping, and the wrong keyboard layout, we will configure our own analyzer, which combines character filters, a tokenizer, and token filters.

Our ngram_filter is the built-in Elasticsearch edge_ngram filter; to understand how it works, I highly recommend reading the Elasticsearch documentation. We have to assign min and max gram values, and the right ones always depend on the specific case. Next, let's specify name_synonym_filter: in the synonyms field we add an array of strings, where each string is a comma-separated list of synonyms used for search.

In the mapping properties, let's create the mappings for our document fields. A full-text search field has "type": "text" and uses our newly created autocomplete_analyzer, which is applied when Elasticsearch indexes documents. For search queries we don't need such a complex analyzer (it would only slow them down), so we also set search_analyzer: standard. Handling the wrong keyboard language layout can be implemented in different ways: on the Elasticsearch side with synonyms (which makes the mapping huge), or at the application level by creating a keyboard-layout mapping, converting the user's search term to the opposite layout, and sending Elasticsearch a query that searches for either of them. So, for example, searching for any of the words "Apple, apple, яблоко, Яблоко, z,kjrj, Z,kjrj, Фззду, фззду" will find the same results. In real-world scenarios the mapping is usually much more complicated, but for this example it's enough.
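
Below is a minimal sketch of such a mapping, created through the official Go client. The field names, gram sizes, and synonym entries are illustrative only and may differ from the exact settings in the repository, which also uses additional filters.

package search

import (
	"context"
	"log"
	"strings"

	"github.com/elastic/go-elasticsearch/v8"
)

// productsMapping defines a custom autocomplete_analyzer built from an
// edge_ngram token filter and a synonym filter. Gram sizes, synonyms and
// field names here are example values only.
const productsMapping = `
{
  "settings": {
    "analysis": {
      "filter": {
        "ngram_filter": {
          "type": "edge_ngram",
          "min_gram": 2,
          "max_gram": 20
        },
        "name_synonym_filter": {
          "type": "synonym",
          "synonyms": ["phone, smartphone, telephone"]
        }
      },
      "analyzer": {
        "autocomplete_analyzer": {
          "type": "custom",
          "tokenizer": "standard",
          "filter": ["lowercase", "name_synonym_filter", "ngram_filter"]
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "title":       { "type": "text", "analyzer": "autocomplete_analyzer", "search_analyzer": "standard" },
      "description": { "type": "text", "analyzer": "autocomplete_analyzer", "search_analyzer": "standard" },
      "shop":        { "type": "text", "analyzer": "autocomplete_analyzer", "search_analyzer": "standard" }
    }
  }
}`

// createIndex creates the products index with the mapping above using the
// official Go client.
func createIndex(ctx context.Context, es *elasticsearch.Client) error {
	res, err := es.Indices.Create(
		"products",
		es.Indices.Create.WithBody(strings.NewReader(productsMapping)),
		es.Indices.Create.WithContext(ctx),
	)
	if err != nil {
		return err
	}
	defer res.Body.Close()
	if res.IsError() {
		log.Printf("index creation failed: %s", res.String())
	}
	return nil
}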

For Go there are two good Elasticsearch libraries available: the official Elasticsearch client and the community olivere/elastic package. Both are good, but at the moment only the official client supports Elasticsearch 8, so for serious production use I think it's the right choice.
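
For illustration, creating the client with the official v8 package looks roughly like this (the address is hard-coded here; the real service reads it, together with credentials and logging options, from config):

package search

import (
	"log"

	"github.com/elastic/go-elasticsearch/v8"
)

// newElasticClient builds the official v8 client and pings the cluster.
// Address and credentials come from the service config in the real project.
func newElasticClient() (*elasticsearch.Client, error) {
	es, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{"http://localhost:9200"},
	})
	if err != nil {
		return nil, err
	}
	// Info() is a cheap way to verify connectivity on startup.
	res, err := es.Info()
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()
	log.Println("connected to Elasticsearch:", res.Status())
	return es, nil
}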

Our microservice communicates over HTTP using the Echo web framework and the official RabbitMQ client. The REST controller has index-document and search methods.
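
A simplified sketch of those handlers; the Product struct, the productUseCase interface, and the route paths are assumptions for the example rather than the exact names from the repository:

package search

import (
	"context"
	"net/http"
	"strconv"

	"github.com/labstack/echo/v4"
)

// Product is a simplified version of the indexed document.
type Product struct {
	ID          string `json:"id"`
	Title       string `json:"title"`
	Description string `json:"description"`
	Shop        string `json:"shop"`
}

// productUseCase is a hypothetical stand-in for the project's use case layer.
type productUseCase interface {
	IndexAsync(ctx context.Context, p Product) error
	Search(ctx context.Context, term string, size int) ([]Product, error)
}

type productHandlers struct {
	uc productUseCase
}

// MapRoutes registers the index and search endpoints on an Echo route group.
func (h *productHandlers) MapRoutes(g *echo.Group) {
	g.POST("", h.index)
	g.GET("/search", h.search)
}

// index binds the request body and hands the product off for async indexing.
func (h *productHandlers) index(c echo.Context) error {
	var p Product
	if err := c.Bind(&p); err != nil {
		return echo.NewHTTPError(http.StatusBadRequest, err.Error())
	}
	if err := h.uc.IndexAsync(c.Request().Context(), p); err != nil {
		return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
	}
	return c.JSON(http.StatusCreated, p)
}

// search reads the term and size query parameters and returns matching products.
func (h *productHandlers) search(c echo.Context) error {
	size, err := strconv.Atoi(c.QueryParam("size"))
	if err != nil || size <= 0 {
		size = 10
	}
	products, err := h.uc.Search(c.Request().Context(), c.QueryParam("search"), size)
	if err != nil {
		return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
	}
	return c.JSON(http.StatusOK, products)
}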

The use case's IndexAsync method serializes the data and publishes it to RabbitMQ.
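
A rough sketch of that publisher using the official amqp091-go client; the exchange and routing-key names are placeholders, the real ones come from the service config:

package search

import (
	"context"
	"encoding/json"

	amqp "github.com/rabbitmq/amqp091-go"
)

// productUC publishes index requests to RabbitMQ instead of writing to
// Elasticsearch directly; Product is the struct from the handler sketch above.
type productUC struct {
	amqpChan *amqp.Channel
}

// IndexAsync serializes the product and publishes it to the index exchange;
// the consumer picks it up and performs the actual bulk indexing.
func (u *productUC) IndexAsync(ctx context.Context, p Product) error {
	body, err := json.Marshal(&p)
	if err != nil {
		return err
	}
	return u.amqpChan.PublishWithContext(
		ctx,
		"index_products_exchange", // exchange (assumed name)
		"index_product",           // routing key (assumed name)
		false,                     // mandatory
		false,                     // immediate
		amqp.Publishing{
			ContentType: "application/json",
			Body:        body,
		},
	)
}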

The RabbitMQ consumer listens to the queue and processes messages using BulkIndexer, which uses the Bulk API and gives better performance for indexing documents.
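
A sketch of such a consumer; the queue name and indexer settings are illustrative (the real project reads them from config), and Product is again the struct from the handler sketch:

package search

import (
	"bytes"
	"context"
	"encoding/json"
	"log"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esutil"
	amqp "github.com/rabbitmq/amqp091-go"
)

// consumeIndexQueue reads messages from the queue and feeds them to a
// BulkIndexer, which batches documents through the Bulk API.
func consumeIndexQueue(ctx context.Context, ch *amqp.Channel, es *elasticsearch.Client) error {
	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Client:        es,
		Index:         "products",
		NumWorkers:    4,
		FlushBytes:    1024 * 1024,
		FlushInterval: 5 * time.Second,
	})
	if err != nil {
		return err
	}
	defer bi.Close(ctx)

	deliveries, err := ch.Consume("index_products_queue", "products_consumer", false, false, false, false, nil)
	if err != nil {
		return err
	}

	for d := range deliveries {
		var p Product
		if err := json.Unmarshal(d.Body, &p); err != nil {
			log.Printf("invalid message: %v", err)
			_ = d.Nack(false, false) // drop malformed messages
			continue
		}
		err = bi.Add(ctx, esutil.BulkIndexerItem{
			Action:     "index",
			DocumentID: p.ID,
			Body:       bytes.NewReader(d.Body),
			OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
				log.Printf("bulk indexing failed: %v", err)
			},
		})
		if err != nil {
			_ = d.Nack(false, true) // requeue when the indexer rejects the item
			continue
		}
		_ = d.Ack(false)
	}
	return nil
}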

The repository has the same index and search methods. For the search method we use a bool should query with two multi_match clauses: one with the original term and one with the term mapped to the opposite keyboard layout. A good Elasticsearch practice is to always use an alias for indexes. The keyboard-layout converter is implemented by loading a JSON file with the layout mappings at application startup, unmarshalling it into a map, and exposing one method that converts a term from one layout to the other.
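
A condensed sketch of both pieces; the alias name, field list, and the few hard-coded layout pairs are assumptions, and the real converter loads full mappings for both directions from a JSON file:

package search

import (
	"bytes"
	"context"
	"encoding/json"
	"strings"

	"github.com/elastic/go-elasticsearch/v8"
)

// keyboardLayouts holds a tiny QWERTY-to-ЙЦУКЕН sample, just enough to turn
// "z,kjrj" into "яблоко"; the real project loads the full table at startup.
var keyboardLayouts = map[rune]rune{'z': 'я', ',': 'б', 'k': 'л', 'j': 'о', 'r': 'к'}

// convertLayout maps every rune of the term to the opposite keyboard layout,
// leaving unknown characters untouched.
func convertLayout(term string) string {
	var b strings.Builder
	for _, r := range term {
		if mapped, ok := keyboardLayouts[r]; ok {
			b.WriteRune(mapped)
			continue
		}
		b.WriteRune(r)
	}
	return b.String()
}

// Search queries the products alias with a bool/should of two multi_match
// clauses: the original term and the layout-converted one. It returns the raw
// response body; the real repository decodes the hits into products.
func Search(ctx context.Context, es *elasticsearch.Client, term string, size int) ([]byte, error) {
	query := map[string]any{
		"size": size,
		"query": map[string]any{
			"bool": map[string]any{
				"should": []any{
					map[string]any{"multi_match": map[string]any{
						"query":  term,
						"fields": []string{"title", "description", "shop"},
					}},
					map[string]any{"multi_match": map[string]any{
						"query":  convertLayout(term),
						"fields": []string{"title", "description", "shop"},
					}},
				},
			},
		},
	}

	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(query); err != nil {
		return nil, err
	}

	res, err := es.Search(
		es.Search.WithContext(ctx),
		es.Search.WithIndex("products-alias"), // query the alias, not the index itself
		es.Search.WithBody(&buf),
	)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	var out bytes.Buffer
	if _, err := out.ReadFrom(res.Body); err != nil {
		return nil, err
	}
	return out.Bytes(), nil
}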

For Kubernetes I prefer to use Minikube with Helm. RabbitMQ, Elasticsearch, Jaeger, and Kibana use the same images as in the docker-compose file, plus the Prometheus community chart; for it to work correctly we have to add a ServiceMonitor for our microservice. When working with k8s I personally like to use Lens: it has a friendly UI and many useful features.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ .Values.searchMicroserviceName }}
  labels:
    app: {{ .Values.searchMicroserviceName }}
spec:
  replicas: {{ .Values.searchMicroserviceReplicas }}
  template:
    metadata:
      name: {{ .Values.searchMicroserviceName }}
      labels:
        app: {{ .Values.searchMicroserviceName }}
    spec:
      containers:
        - name: {{ .Values.searchMicroserviceName }}
          image: {{ .Values.searchMicroserviceImage }}
          imagePullPolicy: Always
          resources:
            requests:
              memory: {{ .Values.resources.requests.memory }}
              cpu: {{ .Values.resources.requests.cpu }}
            limits:
              memory: {{ .Values.resources.limits.memory }}
              cpu: {{ .Values.resources.limits.cpu }}
          livenessProbe:
            httpGet:
              path: {{ .Values.searchMicroserviceLivenessProbePath }}
              port: {{ .Values.searchMicroserviceLivenessProbePort }}
            initialDelaySeconds: {{ .Values.searchMicroserviceInitialDelaySeconds }}
            periodSeconds: {{ .Values.searchMicroservicePeriodSeconds }}
          readinessProbe:
            httpGet:
              path: {{ .Values.searchMicroserviceReadinessProbePath }}
              port: {{ .Values.searchMicroserviceReadinessProbePort }}
            initialDelaySeconds: {{ .Values.searchMicroserviceInitialDelaySeconds }}
            periodSeconds: {{ .Values.searchMicroservicePeriodSeconds }}
          ports:
            - containerPort: {{ .Values.searchMicroserviceHttpPort }}
              name: http
            - containerPort: {{ .Values.searchMicroserviceMetricsPort }}
              name: metrics
            - containerPort: {{ .Values.searchMicroserviceHealthcheckPort }}
              name: healthcheck
          env:
            - name: JAEGER_HOST_PORT
              value: {{ .Values.jaegerHotPost }}
            - name: ELASTIC_URL
              value: {{ .Values.elasticSearchURL }}
            - name: RABBITMQ_URI
              value: {{ .Values.rabbitMqURI }}
            - name: CONFIG_PATH
              value: "/search-config/search-config.yaml"
          volumeMounts:
            - name: config
              mountPath: "/search-config"
      restartPolicy: Always
      terminationGracePeriodSeconds: {{ .Values.searchMicroserviceTerminationGracePeriodSeconds }}
      volumes:
        - name: config
          configMap:
            name: {{ .Values.searchMicroserviceName }}-config-map
            items:
              - key: search-config.yaml
                path: search-config.yaml
  selector:
    matchLabels:
      app: {{ .Values.searchMicroserviceName }}

---

apiVersion: v1
kind: Service
metadata:
  name: {{ .Values.searchMicroserviceName }}-service
  labels:
    app: {{ .Values.searchMicroserviceName }}
spec:
  type: ClusterIP
  selector:
    app: {{ .Values.searchMicroserviceName }}
  ports:
    - name: http
      port: {{ .Values.searchMicroserviceHttpPort }}
      protocol: TCP
    - name: healthcheck
      port: {{ .Values.searchMicroserviceHealthcheckPort }}
      protocol: TCP
      targetPort: healthcheck
    - name: metrics
      port: {{ .Values.searchMicroserviceMetricsPort }}
      protocol: TCP
      targetPort: metrics

---

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  labels:
    release: monitoring
  name: {{ .Values.searchMicroserviceName }}-service-monitor
  namespace: default
spec:
  selector:
    matchLabels:
      app: {{ .Values.searchMicroserviceName }}
  endpoints:
    - interval: 10s
      port: metrics
      path: {{ .Values.prometheusPath }}
  namespaceSelector:
    matchNames:
      - default

---

apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ .Values.searchMicroserviceName }}-config-map
data:
  search-config.yaml: |
    serviceName: search_microservice
    grpc:
      port: :5001
      development: true
    http:
      port: :{{ .Values.searchMicroserviceHttpPort }}
      development: {{ .Values.http.development }}
      basePath: {{ .Values.http.basePath }}
      productsPath: {{ .Values.http.productsPath }}
      debugErrorsResponse: {{ .Values.http.debugErrorsResponse }}
      ignoreLogUrls: {{ .Values.http.ignoreLogUrls }}
    probes:
      readinessPath: {{ .Values.searchMicroserviceReadinessProbePath }}
      livenessPath: {{ .Values.searchMicroserviceLivenessProbePath }}
      port: :{{ .Values.searchMicroserviceHealthcheckPort }}
      pprof: :6001
      prometheusPath: {{ .Values.prometheusPath }}
      prometheusPort: :{{ .Values.searchMicroserviceMetricsPort }}
      checkIntervalSeconds: 10
    logger:
      level: {{ .Values.searchMicroserviceLogging.level }}
      devMode: {{ .Values.searchMicroserviceLogging.devMode }}
      encoder: {{ .Values.searchMicroserviceLogging.encoder }}
    jaeger:
      enable: true
      serviceName: {{ .Values.searchMicroserviceName }}
      hostPort: {{ .Values.jaegerHotPost }}
      logSpans: false
    timeouts:
      postgresInitMilliseconds: 1500
      postgresInitRetryCount: 3
    elasticSearch:
      addresses: [ {{ .Values.elasticSearchURL }} ]
      username: ""
      password: ""
      apiKey: ""
      enableLogging: false
    elasticIndexes:
      products:
        path: {{ .Values.elasticIndexes.products.path }}
        name: {{ .Values.elasticIndexes.products.name }}
        alias: {{ .Values.elasticIndexes.products.alias }}
    rabbitmq:
      uri: {{ .Values.rabbitMqURI }}
      exchangeAndQueueBindings:
        indexProductBinding:
          exchangeName: {{ .Values.exchangeAndQueueBindings.indexProductBinding.exchangeName }}
          exchangeKind: {{ .Values.exchangeAndQueueBindings.indexProductBinding.exchangeKind }}
          queueName: {{ .Values.exchangeAndQueueBindings.indexProductBinding.queueName }}
          bindingKey: {{ .Values.exchangeAndQueueBindings.indexProductBinding.bindingKey }}
          concurrency: {{ .Values.exchangeAndQueueBindings.indexProductBinding.concurrency }}
          consumer: {{ .Values.exchangeAndQueueBindings.indexProductBinding.consumer }}
    bulkIndexer:
      numWorkers: {{ .Values.bulkIndexer.numWorkers }}
      flushBytes: {{ .Values.bulkIndexer.flushBytes }}
      flushIntervalSeconds: {{ .Values.bulkIndexer.flushIntervalSeconds }}
      timeoutMilliseconds: {{ .Values.bulkIndexer.timeoutMilliseconds }}

You can find more details and the source code of the full project here. Of course, in real-world applications full-text search and the business requirements can be much more complicated and may, for example, involve machine learning. I hope this article was useful and helpful, and I would be happy to receive any feedback or questions, so feel free to contact me by email or any messenger :)
