Redis como bróker de mensajería?
26 noviembre, 2021
8 minutos de lectura
¿Ves algún error o quieres modificar algo? Haz una Pull Request
Los brókers de mensajería permiten desacoplar el intercambio de mensajes entre las aplicaciones, haciendo uso de patrones (Pub/Sub | Queue). Son tecnologías que están dedicadas a procesar e intercambiar mensajes de datos entre aplicaciones, haciendo de mediador entre las mismas. Entre sus características podemos destacar:
- Enrutan los mensajes.
- Desacoplan los productores de mensajes de los consumidores.
- Organizan y comprueban los mensajes.
- Almacenan los mensajes.
- Presentan diversos patrones de mensajería (Colas de mensajes Queue y publicador-suscriptor Pub/Sub).
- Las colas de mensajes ofrecen mejor oportunidad para escalar el procesamiento.
- El pub/sub permite transmitir eventos de forma asíncrona.
Entre los más usados podemos nombrar Apache Kafka y RabbitMQ . De estos dos existe mucha documentación y cursos online que puede ser consultada en sus sitios oficiales.
El día de hoy estaremos hablando sobre un tipo de dato que tiene implementado redis a partir de su version 5.0. Este tipo de dato es llamado Redis Stream .
El tipo de datos stream es para realizar operaciones de anexo de datos. El cual pude ser comparado con un registro de eventos en un fichero log, al menos conceptualmente. En este caso redis Stream implementa operaciones más complejas para superar las limitaciones de los archivos logs.
Redis stream implementa características que lo hacen super interesantes:
- Presenta un conjunto de operaciones de bloqueo que permiten que los consumidores puedan esperar nuevos datos agregados a un flujo por los productores.
- Además del concepto llamado Grupos de consumidores como apache Kafka.
Redis reimplementa una idea similar en los Grupos de consumidores como apache Kafka, lo que en términos completamente diferentes, pero el objetivo es el mismo: permitir que un grupo de clientes coopere consumiendo una porción diferente del mismo flujo de mensajes.
Teniendo esta breve explicación espero que estés motivado en usar esta característica, como sí estuviésemos usando apache kafka. Para más detalles teóricos y ejemplos de los comandos utilizados en este tutorial puedes revisar el apartado de recursos.
Realizando una 🔍 en github de posibles fuentes de inspiración sobre el tema en cuestión, encontré el siguiente proyecto Helsinki Transit System - Real-Time Vehicle 🚍 Tracking with Redis el cual fue lanzado en la Redis 2021 Hackathon, este cuenta con una excelente implementación en go y redis, donde se hace uso de los tipos de datos redis stream, pub/sub y timeseries trabajando con un dataset en tiempo real.
El día de hoy teniendo como base este genial proyecto, vamos a realizar una reimplementación para el envío de eventos, provenientes de los 🚍 de transporte municipal hacia nuestra instancia redis.
⚙️ Configuración inicial de nuestro proyecto
En este caso usaremos redis, la cual es una imagen de docker configurada con esta funcionalidad.
# Download image
podman pull redislabs/redis:latest
# Run the container for our redis instance
podman run --name redis --rm -e ALLOW_EMPTY_PASSWORD=yes -p 6379:6379 redislabs/redis:latest
Nuestro repositorio de ejemplo cuenta con todos los ficheros necesarios para seguir este tutorial. Este es el estado actual de las carpetas.
├── data
│ └── pizzas.csv
├── examples
│ ├── cacheapp
│ │ └── main.go
│ ├── minibroker ✨
│ │ ├── consumer
│ │ │ └── main.go
│ │ ├── domain
│ │ │ └── vp.go
│ │ └── producer
│ │ ├── cmd
│ │ │ └── producer.go
│ │ └── main.go
│ ├── minisearch
│ │ ├── domain
│ │ │ ├── pizza.go
│ │ │ └── pizzasearch.go
│ │ ├── handlers
│ │ │ └── handlers.go
│ │ ├── http.rest
│ │ └── main.go
│ └── ratelimit
│ ├── http.rest
│ └── main.go
├── go.mod
├── go.sum
├── makefile
├── pkg
│ ├── db
│ │ └── redis.go
│ ├── httpsrv
│ │ └── httpsrv.go
│ └── mq
│ └── mqtt.go
└── README.md
Como se 👀 tenemos un nuevo proyecto llamado minibroker
, aquí se encuentran alojados nuestros servicios y será el centro de esta aventura. La idea final de este tutorial es entender como enviar datos a través de redis stream y como consumir esos datos siguiendo el patrón producer/consumer
🧑🏫 Explicación del proyecto
El proyecto minibroker
está compuesto por dos funcionalidades:
- Productor de eventos (producer)
- Consumidor de eventos (consumer)
🚇 Productor de eventos
La primera funcionalidad 🎁 que estaremos explicando tiene como objetivo subscribirse a nuestra fuente de datos en tiempo real High-frequency positioning a través de un protocolo de mensajería llamado mqtt más detalles en la descripción de su sitio oficial.
func main() {
// init project
ctx := context.Background()
rdb := db.GetRedisDbClient(context.Background())
// signal control
quitChannel := make(chan os.Signal, 1)
// init mqtt client
msgBroker := mq.NewMsgBroker(1024)
mq.InitMQTTClient(msgBroker)
// logic for mqtt sub -> send data to redis stream
for i := 0; i < nWorkers; i++ {
go cmd.SendData(ctx, msgBroker.StagingC, rdb)
}
signal.Notify(quitChannel, syscall.SIGINT, syscall.SIGTERM)
<-quitChannel
}
La función main
esta compuesta por la instancia de nuestro client mqtt, el cual una vez subscrito es capaz de recolectar la información retornando en un channel
de tipo <-chan []byte
el cuerpo del mensaje. Los métodos NewMsgBroker
y InitMQTTClient
forman parte de un nuevo paquete dentro de nuestro proyecto llamado mq
el cual se encuentra en la ruta pkg/mq/mqtt.go
este paquete fue confeccionado siguiendo el proyecto base, haciendo uso de la liberería de terceros github.com/eclipse/paho.mqtt.golang . Una vez obtenido el mensaje este es pasado por parámetro a la función SendData
.
func SendData(ctx context.Context, payload <-chan []byte, client *redis.Client) {
for msg := range payload {
// Receive the content of the MQTT message and de-serialize bytes into
// struct
e := &domain.EventHolder{}
err := mq.DeserializeMQTTBody(msg, e)
if err != nil {
log.Println(err)
continue
}
pipe := client.TxPipeline()
// // 2. XADD the full event body to a stream of events, these
value, err := e.VP.ToMAP()
if err != nil {
log.Fatal(err)
}
pipe.XAdd(
ctx, &redis.XAddArgs{
Stream: "events:vp:bus",
Values: value,
},
)
// Execute Pipe!
_, err = pipe.Exec(ctx)
// Failed to Write an Event
if err != nil {
log.Fatal(err)
}
}
}
La función SendData
además de leer los mensajes provenientes del servicio de mqtt
y parsear esa información, es la encargada de realizar la conexión con la instancia de redis, y de ejecutar el comando XADD permitiendo insertar los eventos dentro de redis Stream.
📌 Una práctica común es enviar tipos de datos como
hash map
ostring
en nuestro tipo de datos stream.
...
value, err := e.VP.ToMAP()
if err != nil {
log.Fatal(err)
}
pipe.XAdd(
ctx, &redis.XAddArgs{
Stream: "events:vp:bus",
Values: value,
},
)
...
En el fragemento de código anterior se encuentra implementado en go del comando XADD, este recibe por parámetros un redis.XAddArgs
entre sus propiedades básicas y de interés para este tutorial se destacan Stream
contiene el nombre del tópico por donde se emitirán los eventos y Value
hace mención a la data que se transmitirá.
🚇 Consumidor de eventos
La segunda funcionalidad 🎁 que estaremos explicando tiene como objetivo subscribirse a nuestra instancia de redis. Para llevar a cabo esta misión es necesario definir el nombre del tópico que se consumirá en nuestro caso events:vp:bus
y para hacer uso de las funcionalidades que provee los Grupos de consumidores definir nuestro nombre.
func main() {
ctx := context.Background()
rdb := db.GetRedisDbClient(context.Background())
// define stream and consumer groups names
streams := []string{"events:vp:bus"}
var ids []string
if groupName == "" {
groupName = "consumer-" + StringWithCharset()
}
for _, v := range streams {
ids = append(ids, ">")
err := rdb.XGroupCreate(ctx, v, groupName, "0").Err()
if err != nil {
log.Println(err)
}
}
// for each stream it requires an '>' :{"events:vp:bus", ">"}
streams = append(streams, ids...)
fmt.Printf("Consumer gruop with name: [%s]\n", groupName)
// subscribe to all streams
for {
entries, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: fmt.Sprintf("%d", time.Now().UnixNano()),
Streams: streams,
Count: 2,
Block: 0,
NoAck: false,
}).Result()
if err != nil {
log.Fatal(err)
}
// excute func
for _, stream := range entries {
ReceiveMSG(ctx, stream, rdb, groupName)
}
}
}
La función main
esta compuesta por 2 fases:
- La fase inicial es la encargada de construir de forma dinámica nuestros tópicos y nombres de grupos de consumidores permitiendo crear nuestro
XGroupCreate
. - La segunda fase es la encargada de registrar el grupo de consumidor y de leer los tópicos definidos con la función
XReadGroup
, dando paso a procesar todos losstreams
(tópicos).
func ReceiveMSG(
ctx context.Context,
stream redis.XStream,
rdb *redis.Client,
groupName string,
) {
for i := 0; i < len(stream.Messages); i++ {
messageID := stream.Messages[i].ID
values := stream.Messages[i].Values
bytes, err := json.Marshal(values)
if err != nil {
log.Fatal(err)
}
rdb.XAck(
ctx,
stream.Stream,
groupName,
messageID,
)
fmt.Printf("ConsumerGroup: [%s] data : %s\n", groupName, string(bytes))
}
}
En este punto de la función main
nos encontramos ReceiveMSG
esta recibe una estructura de datos redis.XStream
la cual contiene el mensaje proveniente de ese tópico. En nuestro caso solo lo estamos imprimiendo por la terminal de líneas de comandos. Existen otros casos de usos donde se podría hacer uso más detallado de este proceso como un envío de notificaciones hacia diversas plataformas, o seguir generando pipelines de procesamiento de datos como Helsinki Transit System - Real-Time Vehicle Tracking with Redis proyecto de base de este turorial.
DEMO
Conclusión
- A través de este tutorial, pudimos desarrollar una aplicación en go y redis, donde abordamos conceptos propios de la base de datos, como el uso de los
streams
tipo de dato muy potente que permite que nuestra instancia de redis pueda ser utilizada como bróker de mensajería. - Se pudo utilizar una libería nueva para consumir información proveniente de github.com/eclipse/paho.mqtt.golang .
- Se abordaron conceptos referentes a sistemas de alto impacto relacionados con las colas de mensajes y comunicación asíncrona en el envio de eventos.
Recursos
Código del proyecto
Aquí tienes el repositorio en GitHub con todo el código utilizado en el artículo. Por si quieres revisarlo.