Message Queue with Golang

Learn Message Queue with Golang

Motivation

While working at Geniox mobile, I became responsible for maintaining a Golang service that handles millions of requests a day, so I had to understand the message queue concept to do the job.

Message Queue (MQ)

A message queue (MQ) is a form of asynchronous service-to-service communication used in serverless and microservices architectures (or distributed systems). (Sources: Baeldung, AWS.)

The term consists of two words: message and queue.

Message

A message can be described by:

  • the data that needs to be transmitted
  • the format the data is encoded in
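
To make the two bullets concrete, here is a minimal sketch of a message as data plus a format. The Payment type and the encode helper are hypothetical names chosen for illustration; the fields mirror the JSON payload that the client in this post sends.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Payment is a hypothetical message type. Its JSON field names mirror
// the payload used by the client later in this post.
type Payment struct {
	FirstName    string  `json:"first_name"`
	LastName     string  `json:"last_name"`
	PaymentMode  string  `json:"payment_mode"`
	PaymentRefNo string  `json:"payment_ref_no"`
	Amount       float64 `json:"amount"`
}

// encode turns the data (a Payment) into the chosen format (JSON text).
func encode(p Payment) (string, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	msg, err := encode(Payment{
		FirstName:    "MARY",
		LastName:     "SMITH",
		PaymentMode:  "CHEQUE",
		PaymentRefNo: "985",
		Amount:       985.65,
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(msg)
}
```

The struct is the data and JSON is the format; another format could be swapped in without changing the data itself.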

Queue

  • a line of things that are processed sequentially, in the order they arrive.
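
As a rough in-process illustration of the queue idea, a buffered Go channel behaves like a first-in, first-out line. This is only a sketch: produce and consume are hypothetical helper names, and a channel is a stand-in here, not a replacement for a real broker such as Redis.

```go
package main

import "fmt"

// produce puts the messages on a buffered channel in the given order,
// then closes it to signal that no more messages will arrive.
func produce(msgs ...string) <-chan string {
	queue := make(chan string, len(msgs))
	for _, m := range msgs {
		queue <- m
	}
	close(queue)
	return queue
}

// consume takes messages off the queue one at a time, in arrival order,
// and returns them in the order they were handled.
func consume(queue <-chan string) []string {
	var handled []string
	for msg := range queue {
		handled = append(handled, msg)
	}
	return handled
}

func main() {
	fmt.Println(consume(produce("first", "second", "third")))
}
```

The consumer sees the messages in exactly the order the producer enqueued them, which is the property the rest of this post relies on.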

Implementing MQ with Golang

I created the snippets below based on an article from Vultr.

Prerequisites

To follow this tutorial, you need:

  • Docker, for running the MySQL and Redis containers
  • Go, for building and running the apps

Database Container

We will run container instances for Redis and MySQL. First, clone the repo https://github.com/aysf/example, go to the redis folder, pick the simplest setup, redis-with-config, and start it with docker-compose up.

For MySQL, follow the same steps.

Prepare SQL Database

  1. Enter the Docker container: docker exec -it mysqldb /bin/sh
  2. Run mysql -u root -p. If it doesn’t work, prepend sudo to the command and try again.
  3. Enter the root password that you set in docker-compose.yaml and press Enter.
  4. Create the web_payments database and a dedicated user:
mysql> CREATE DATABASE web_payments;
       CREATE USER 'web_payments_user'@'localhost' IDENTIFIED WITH mysql_native_password BY 'EXAMPLE_PASSWORD';
       GRANT ALL PRIVILEGES ON web_payments.* TO 'web_payments_user'@'localhost';
       FLUSH PRIVILEGES;
  5. Switch to the new web_payments database: mysql> USE web_payments;
  6. Create a payments table:
mysql> CREATE TABLE payments (
           payment_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
           payment_date DATETIME,
           first_name VARCHAR(50),
           last_name VARCHAR(50),
           payment_mode VARCHAR(255),
           payment_ref_no VARCHAR (255),
           amount DECIMAL(17,4)
       ) ENGINE = InnoDB;
  7. Log out of the MySQL server: mysql> QUIT;

Prepare Golang Apps

The app is simple and needs no setup beyond module initialization with go mod init app. The structure is as follows:

payment_gateway
	|-- client
	|	└-- main.go
	|-- queue
	|	└-- main.go
	└-- worker
		└-- main.go

Client

The client simulates incoming requests.

package main

import (
	"context"
	"log"
	"net/http"
	"strings"
	"sync"
)

func main() {

	log.Println("client running...")

	ctx := context.TODO()
	var wg sync.WaitGroup

	wg.Add(1)
	go job(ctx, &wg)
	wg.Wait()

	log.Println("job end")

}

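func job(ctx context.Context, wg *sync.WaitGroup) {
	// Signal completion exactly once, when the function returns.
	defer wg.Done()

	for {
		select {
		case <-ctx.Done():
			// Stop looping once the context is cancelled.
			return
		default:
			generateRequest()
		}
	}
}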

func generateRequest() {
	jsonString := `{"first_name": "MARY", "last_name": "SMITH", "payment_mode": "CHEQUE", "payment_ref_no": "985", "amount" : 985.65}`

	j := strings.NewReader(jsonString)

	resp, err := http.Post("http://127.0.0.1:8080/payments", "application/json", j)
	if err != nil {
		log.Println(err)
		return
	}
	// Close the response body to release the connection.
	defer resp.Body.Close()

	log.Println("request generated...")
	log.Println("response: ", resp.StatusCode)

}
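
Because the client uses context.TODO(), its loop never stops on its own. One possible way to bound a run is a context with a timeout. The sketch below assumes two hypothetical helpers, runJob and runJobFor, and uses a plain function in place of the real HTTP call so that it runs without the queue service.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runJob calls send repeatedly until ctx is cancelled and
// returns how many calls were made.
func runJob(ctx context.Context, send func()) int {
	count := 0
	for {
		select {
		case <-ctx.Done():
			return count
		default:
			send()
			count++
		}
	}
}

// runJobFor is a hypothetical convenience wrapper that runs the job
// for roughly the given duration and then stops.
func runJobFor(d time.Duration, send func()) int {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return runJob(ctx, send)
}

func main() {
	// The sleep stands in for the time an HTTP request would take.
	n := runJobFor(100*time.Millisecond, func() {
		time.Sleep(10 * time.Millisecond)
	})
	fmt.Println("requests sent:", n)
}
```

In the real client, the send argument would be generateRequest, and the duration would be however long the simulation should last.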

Queue

The queue service receives the flood of requests from the client and enqueues the request data in Redis.

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"

	"github.com/go-redis/redis/v8"
)

// redisClient is created once and shared by all requests;
// it manages its own connection pool.
var redisClient = redis.NewClient(&redis.Options{
	Addr:     "localhost:6379",
	Password: "",
	DB:       0,
})

func main() {
	http.HandleFunc("/payments", paymentsHandler)
	// ListenAndServe only returns on failure, so log the error and exit.
	log.Fatal(http.ListenAndServe(":8080", nil))
}

func paymentsHandler(w http.ResponseWriter, req *http.Request) {

	// Include a Validation logic here to sanitize the req.Body when working in a production environment

	body, err := io.ReadAll(req.Body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Append the raw payload to the tail of the "payments" list.
	err = redisClient.RPush(req.Context(), "payments", string(body)).Err()
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	fmt.Fprint(w, "Payment details accepted successfully\r\n")
}
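
The handler's comment asks for validation logic in production. As a minimal sketch of what that could look like, the hypothetical validatePayment function below decodes the body into a typed struct and applies a few checks. The specific rules (required names, positive amount, no unknown fields) are assumptions made for illustration, not requirements from the original article.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// paymentRequest is a hypothetical typed view of the request body.
type paymentRequest struct {
	FirstName    string  `json:"first_name"`
	LastName     string  `json:"last_name"`
	PaymentMode  string  `json:"payment_mode"`
	PaymentRefNo string  `json:"payment_ref_no"`
	Amount       float64 `json:"amount"`
}

// validatePayment returns nil if body is well-formed JSON that passes
// the illustrative checks below, and a descriptive error otherwise.
func validatePayment(body string) error {
	dec := json.NewDecoder(strings.NewReader(body))
	dec.DisallowUnknownFields() // reject fields the struct does not declare

	var p paymentRequest
	if err := dec.Decode(&p); err != nil {
		return fmt.Errorf("invalid payload: %w", err)
	}
	if p.FirstName == "" || p.LastName == "" {
		return errors.New("first_name and last_name are required")
	}
	if p.Amount <= 0 {
		return errors.New("amount must be positive")
	}
	return nil
}

func main() {
	good := `{"first_name": "MARY", "last_name": "SMITH", "payment_mode": "CHEQUE", "payment_ref_no": "985", "amount": 985.65}`
	bad := `{"first_name": "MARY", "last_name": "SMITH", "amount": -1}`

	fmt.Println("good payload error:", validatePayment(good))
	fmt.Println("bad payload error:", validatePayment(bad))
}
```

In the handler, a failed check would typically be answered with a 400 status before anything is pushed to Redis, so that malformed payloads never reach the worker.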

Worker

The worker's task is to dequeue data from Redis and process it. This ‘mini-service’ has to connect to the database and insert each request into it.

package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"strconv"
	"strings"
	"time"

	"github.com/go-redis/redis/v8"
	_ "github.com/go-sql-driver/mysql"
)

func main() {

	ctx := context.TODO()

	redisClient := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	})

	for {

		result, err := redisClient.BLPop(ctx, 0*time.Second, "payments").Result()

		if err != nil {
			fmt.Println(err.Error())
			// Pause before retrying so a Redis outage does not turn into a busy loop.
			time.Sleep(time.Second)
		} else {

			params := map[string]interface{}{}

			err := json.NewDecoder(strings.NewReader(string(result[1]))).Decode(&params)

			if err != nil {
				fmt.Println(err.Error())
			} else {

				paymentId, err := savePayment(params)

				if err != nil {
					fmt.Println(err.Error())
				} else {
					fmt.Println("Payment # " + strconv.FormatInt(paymentId, 10) + " processed successfully.\r\n")
				}
			}

		}
	}
}

func savePayment(params map[string]interface{}) (int64, error) {

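	db, err := sql.Open("mysql", "root:custompwd@tcp(127.0.0.1:6033)/web_payments")

	if err != nil {
		return 0, err
	}

	defer db.Close()

	// sql.Open only validates its arguments; Ping verifies that the database is reachable.
	if err := db.Ping(); err != nil {
		return 0, err
	}

	log.Println("successfully connected to db")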

	queryString := `insert into payments (
                        payment_date,
                        first_name,
                        last_name,
                        payment_mode,
                        payment_ref_no,
                        amount
                    ) values (
                        ?,
                        ?,
                        ?,
                        ?,
                        ?,
                        ?
                    )`

	stmt, err := db.Prepare(queryString)

	if err != nil {
		return 0, err
	}

	defer stmt.Close()

	paymentDate := time.Now().Format("2006-01-02 15:04:05")
	firstName := params["first_name"]
	lastName := params["last_name"]
	paymentMode := params["payment_mode"]
	paymentRefNo := params["payment_ref_no"]
	amount := params["amount"]

	res, err := stmt.Exec(paymentDate, firstName, lastName, paymentMode, paymentRefNo, amount)

	if err != nil {
		return 0, err
	}

	paymentId, err := res.LastInsertId()

	if err != nil {
		return 0, err
	}

	return paymentId, nil
}
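
One detail worth noting in the worker: decoding into map[string]interface{} turns the amount into a float64, which cannot represent every decimal value exactly, while the column is DECIMAL(17,4). A possible refinement, sketched here with a hypothetical decodeAmount helper, is to call UseNumber on the decoder so the amount keeps its original decimal text.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// decodeAmount extracts the "amount" field from a JSON payload as the
// exact decimal text that appeared in the message.
func decodeAmount(payload string) (string, error) {
	dec := json.NewDecoder(strings.NewReader(payload))
	dec.UseNumber() // keep numbers as json.Number instead of float64

	params := map[string]interface{}{}
	if err := dec.Decode(&params); err != nil {
		return "", err
	}

	n, ok := params["amount"].(json.Number)
	if !ok {
		return "", errors.New("amount is missing or not a number")
	}
	return n.String(), nil
}

func main() {
	amount, err := decodeAmount(`{"first_name": "MARY", "amount": 985.65}`)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println("amount as text:", amount)
}
```

The resulting string could then be passed as the amount argument of the insert statement, leaving the conversion to DECIMAL to MySQL.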

Run Apps

Open three terminals, then run the worker, the queue, and the client.

[Figure: run on terminal]

Checking Database

Now stop the worker first, then the other services. Once the worker has stopped, the client can still send requests to the queue, but the requests stay stuck in Redis.

Check Redis

To check the data stuck in Redis, we can use a Redis GUI. AnotherRedisDesktopManager is one of my favourites.

[Figure: data stuck on Redis]

To check the number of items stored in the list, use LLEN payments in redis-cli.

[Figure: cli on Redis GUI]

To clear the cached data, use FLUSHDB (current database only) or FLUSHALL (every database) in redis-cli.

Check MySQL

To check the data that has already been written to MySQL, run the following commands:

~ 👉 docker exec -it mysqldb /bin/sh
sh-4.2# mysql -u root -p
Enter password: 
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 4763
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> SHOW databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| appdb              |
| mysql              |
| performance_schema |
| sys                |
| web_payments       |
+--------------------+
6 rows in set (0.04 sec)

mysql> USE web_payments;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> SELECT * FROM payments;

You’ll then see the many rows of requests that were recorded. To see only the number of rows, run SELECT COUNT(*) FROM payments;. To clean up all the records, run DELETE FROM payments;.

Next Question

This simple service is good for learning purposes, but it is not yet ready for production. In the next article, we’ll create five instances and Go apps that use concurrency.

[Figure: SFTP architecture]

