Golang : Implementation RabbitMQ

Okky Muhamad Budiman
4 min readSep 14, 2020
https://www.rabbitmq.com/

Message Broker sudah tidak asing lagi di dalam pengembangan suatu sistem yang menerapkan microservices sebagai architecture utamanya, karena microservices bisa terdiri dari beberapa service maupun database, maka dari itu dibutuhkan suatu event sebagai komunikasi antar services.

Disini kita menggunakan RabbitMQ sebagai message broker dan akan menjelaskan cara membuat wrapper nya dalam bahasa pemrograman Golang.

Adapaun pemilihan RabbitMQ itu dikarenakan ringan dan mudah dalam penerapannya, juga RabbitMQ sendiri bisa running di beberapa operating systems maupun cloud envirenments.

Mengenai dokumentasi RabbitMQ bisa dibaca di website resminya https://www.rabbitmq.com

Hal pertama yang dilakukan adalah inisialisasi project golang dengan menggunakan go mod sebagai collection module yang disarankan di dokumentasi Golang sendiri.

go mod init

kemudian install package RabbitMQ dengan menggunakan perintah berikut

go get -u github.com/streadway/amq

Sebelum kita implementasi RabbitMQ, buatlah suatu folder bernama infra/mq di dalam proyek anda sehingga penerapan wrapper nya akan dilakukan didalam folder tersebut.

Buatlah file bernama mq.go yang akan kita gunakan untuk implement wrapper dari RabbitMQ.

Untuk wrapper sendiri kurang lebih seperti berikut :

package mq

import (
"bytes"
"github.com/streadway/amqp"
"go-rabbitmq/config"
"log"
"time"
)
type mqService struct {
MQConfig config.MQConfiguration
conn *amqp.Connection
channel *amqp.Channel
qosConf ConfigQOS
}
type MQInterface interface {
// MQ Method
Connect
() error
QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args map[string]interface{}) (amqp.Queue, error) Publisher(exchanged, name string, mandatory, immediate bool, msg []byte) error Consumer(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args map[string]interface{}) chan bool
Close() error
}
func NewMQService(mqConfig config.MQConfiguration) MQInterface {
return &mqService{
MQConfig: mqConfig,
}
}

Dalam code diatas kita mencoba untuk membuat wrapper dari RabbitMQ yang dimana kita membuat MQInterface sebagai kerangka dari setiap method yang akan di implementasikan, dimana terdapat method Connect, QueueDeclare, Publisher, dan Consumer.

NewMQService digunakan sebagai inisialisasi object dari wrapper message broker itu sendiri yang dimana dalam kasus ini kita menggunakan RabbitMQ.

Adapun config.MQConfiguration sbg parameter yang di dalamnya terdapat config rabbitMQ.

package config

type MQConfiguration struct {
Dial string
}

Di dalam struct MQconfiguration terdapat dial yg berisi URL atau alamat RabbitMQ nya. contoh amqp://guest:guest@localhost:5672. Untuk config sendiri bisa disimpan di file .yml dalam contoh ini menggunakan Viper sebagai config management, bisa dilihat disini https://github.com/spf13/viper

Lanjut pada penerapan wrapper RabbitMQ, method Connect nantinya di implementasikan untuk membuat koneksi terhadap RabbitMQ itu sendiri. berikut contoh implementasinya :

func (m *mqService) Connect() error {
conn, err := amqp.Dial(m.MQConfig.Dial)
if err != nil {
log.Fatalf("%s: %s", "failed to connect rabbitMQ", err)
return err
}
m.conn = conn

ch, err := m.conn.Channel()
if err != nil {
log.Fatalf("%s: %s", "failed to open a channel", err)
return err
}
m.channel = ch

log.Printf("success connect to rabbitMQ")
return nil
}

kemudian implementasi untuk method QueueDeclare untuk mendeklarasi suatu topic :

func (m *mqService) QueueDeclare(
name string,
durable,
autoDelete,
exclusive,
noWait bool,
args map[string]interface{},
) (amqp.Queue, error) {
q, err := m.channel.QueueDeclare(
name, // name
durable, // durable
autoDelete, // delete when unused
exclusive, // exclusive
noWait, // no-wait
args, // arguments
)
if err != nil {
return q, err
}
return q, nil
}

kemudian implementasi untuk method Publisher yang nantinya digunakan untuk mengirim suatu pesan :

func (m *mqService) Publisher(exchanged, name string, mandatory, immediate bool, msg []byte) error {
err := m.channel.Publish(
exchanged, // exchange
name, // routing key
mandatory, // mandatory
immediate, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: msg,
})
if err != nil {
return err
}

return nil
}

kemudian method Consumer yang akan digunakan untuk menerima/mengambil suatu pesan.

func (m *mqService) Consumer(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args map[string]interface{}) chan bool{
msgs, err := m.channel.Consume(
queue,
consumer,
autoAck,
exclusive,
noLocal,
noWait,
args,
)
if err != nil {
log.Fatalf("%s: %s", "failed to register consumer", err)
}

forever := make(chan bool)

go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()

return forever
}

dan terakhir Close untuk menutup koneksi RabbitMQ

func (m *mqService) Close() error {
if m.conn == nil {
return nil
}

return m.conn.Close()
}

Untuk contoh penerapan pada Consumer :

func runConsumer(cmd *cobra.Command, args []string) error {
// initial config
config := configs.InitConfig()

// initial MessageQueue
mq := mq.NewMQService(config.MQServer)
// connect to rabbitMQ
mq.Connect()
// declare a queue
q, _ := mq.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)

forever := mq.Consumer(
q.Name,
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

mq.Close()
return nil
}

Dan terakhir untuk penerapan Publisher :

func runPublisher(cmd *cobra.Command, args []string) error {
// initial config
config := configs.InitConfig()

// initial MessageQueue
mq := mq.NewMQService(config.MQServer)

// connect to rabbitMQ
mq.Connect()

// declare a queue
q, _ := mq.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)

// example publish messages
mq.Publisher(
"",
q.Name,
false,
false,
[]byte(bodyFrom(os.Args)),
)

mq.Close()
return nil
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}

Dengan demikian selesai sudah contoh penerapan RabbitMQ+Golang, untuk detail penerapannya bisa dilihat di repository berikut https://github.com/okiww/go-rabbitmq.

Di artikel selanjutnya kita akan bahas bagaimana menerapkan NATS dalam Golang.

Sign up to discover human stories that deepen your understanding of the world.

Free

Distraction-free reading. No ads.

Organize your knowledge with lists and highlights.

Tell your story. Find your audience.

Membership

Read member-only stories

Support writers you read most

Earn money for your writing

Listen to audio narrations

Read offline with the Medium app

Okky Muhamad Budiman
Okky Muhamad Budiman

Written by Okky Muhamad Budiman

Tech Enthusiast, Punk Rock Software Engineer, Hustler Harder

Responses (1)

Write a response