Golang: Fetch Millions of Records Easily Using Concurrency
Intro
As software engineers, we are expected to deliver highly performant systems with minimal effort and maintenance. If you work on the backend, you have probably had to fetch millions of records from a document, a CSV file, or some API.
The main problems when fetching huge amounts of data are:
- Performance: Fetching a million records at once can be slow and resource-intensive. Consider implementing pagination or limiting the number of records retrieved in a single request. You can use techniques like cursor-based pagination or offset/limit queries to efficiently retrieve data in smaller chunks (see the sketch after this list).
- Resource Utilization: Fetching a large amount of data can put a strain on your server’s resources, leading to potential crashes or slowdowns. Use asynchronous processing, load balancing, and server clustering to distribute the workload across multiple servers if necessary.
- Network Bandwidth: Transmitting a large amount of data over the network can cause performance bottlenecks. Compress the data and use efficient data formats like JSON or Protocol Buffers. You may also consider using a Content Delivery Network (CDN) for large file downloads.
- Data Filtering and Sorting: Allow clients to specify filters and sorting options to reduce the amount of data retrieved. This can also improve the user experience by delivering more relevant results.
- Batch Processing: In some cases, it might be more efficient to perform data processing in batches, rather than fetching all records at once. Consider breaking down the task into smaller, manageable chunks.
- and many more.
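To illustrate the Performance point, here is a minimal sketch of cursor-based pagination. It assumes a hypothetical partner endpoint that accepts a cursor and a limit and returns the next cursor with each page; fetchPage, Page, and their fields are illustrative names, not a real API.

package main

import (
    "context"
    "fmt"
)

// Page is an assumed response shape for one page of partner data.
type Page struct {
    Items      []string // the records in this page
    NextCursor string   // empty when there are no more pages
}

// fetchPage stands in for one HTTP call to the partner API:
// build the request with ?cursor=...&limit=..., call the endpoint,
// and decode the JSON body into Page.
func fetchPage(ctx context.Context, cursor string, limit int) (*Page, error) {
    return &Page{}, nil
}

// fetchAll walks the cursor until the partner reports no more pages.
func fetchAll(ctx context.Context) ([]string, error) {
    var all []string
    cursor := ""
    for {
        page, err := fetchPage(ctx, cursor, 1000)
        if err != nil {
            return nil, err
        }
        all = append(all, page.Items...)
        if page.NextCursor == "" {
            return all, nil // last page reached
        }
        cursor = page.NextCursor
    }
}

func main() {
    items, err := fetchAll(context.Background())
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("fetched %d items\n", len(items))
}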
In this case, I want to highlight Performance, Data Filtering, and Batch Processing.
Case Study
Let's say you have a system for selling digital products that integrates with a partner's API to get the product data.
The business team and product managers tell you the system needs to fetch fresh data every 10 minutes or so and upsert any changed records into your DB, so that your data always stays in sync with the partner's.
Long story short, as a backend engineer, you at least need to:
- Integrate your system with the partner API.
- Call the partner's product API to get fresh data.
- Upsert the data into your database.
- Make the function run every 10 minutes (a minimal scheduler sketch follows this list).
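For the last step, here is a minimal sketch using time.Ticker, assuming a hypothetical syncProducts function that wraps the first three steps:

package main

import (
    "context"
    "log"
    "time"
)

// syncProducts is a hypothetical stand-in for the first three steps:
// fetch from the partner API, compare, and upsert changed records.
func syncProducts(ctx context.Context) error {
    return nil
}

func main() {
    ctx := context.Background()
    ticker := time.NewTicker(10 * time.Minute)
    defer ticker.Stop()

    for {
        if err := syncProducts(ctx); err != nil {
            log.Println("sync failed:", err)
        }
        <-ticker.C // wait for the next 10-minute tick
    }
}

In production you might prefer a cron scheduler instead, and you should make sure one run cannot overlap the next, which is exactly the problem discussed below.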
The problem that appears after we implement this is performance. To get millions of records we of course need to batch and fetch them limit by limit. With that approach and this amount of data, upserting millions of records takes a long time, perhaps 30 to 120 minutes, yet the system is scheduled to fetch every 10 minutes. That is exactly the problem: the system ends up in an effectively infinite loop of fetching data.
What should we do?
I recommend fetching the data with several concurrent workers, each handling a different batch. Let's say we run 10 workers concurrently:
- Worker one fetches records 1–100,000
- Worker two fetches records 100,001–200,000
- and so on for the remaining workers
If one batch takes about a minute, ten workers running concurrently can fetch a million records in roughly a minute instead of ten (choose the number of workers and the batch size yourself, depending on your data and use case). A sketch of this split is shown below.
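Here is a minimal sketch of that split, assuming a hypothetical fetchBatch function that retrieves limit records starting at a given offset:

package main

import (
    "context"
    "fmt"
    "sync"
)

// fetchBatch is a hypothetical call that retrieves `limit` records
// starting at `offset` from the partner API.
func fetchBatch(ctx context.Context, offset, limit int) ([]string, error) {
    return nil, nil
}

func main() {
    const (
        total     = 1_000_000 // records to fetch
        workers   = 10        // concurrent fetchers
        batchSize = total / workers
    )

    var wg sync.WaitGroup
    results := make([][]string, workers) // one slot per worker, so no locking is needed

    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            records, err := fetchBatch(context.Background(), i*batchSize, batchSize)
            if err != nil {
                fmt.Printf("worker %d failed: %v\n", i, err)
                return
            }
            results[i] = records
        }(i)
    }
    wg.Wait()
    fmt.Println("all batches fetched")
}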
What is Concurrency?
Concurrency is the ability of a program to make progress on multiple tasks during overlapping time periods; how much of that work truly runs in parallel depends on the hardware and the software.
Implementation
In Golang, we can run multiple processes (goroutines) to handle fetching data and coordinate them with sync.WaitGroup.
sync.WaitGroup
waits for a collection of goroutines to finish. Usage is simple: call Add() to set how many goroutines you want to wait for, have each goroutine call Done() when it finishes, and call Wait() to block until all of them are done.
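A minimal sketch of that Add/Done/Wait cycle looks like this:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1) // one counter increment per goroutine
        go func(i int) {
            defer wg.Done() // decrement when the goroutine exits
            fmt.Println("worker", i, "done")
        }(i)
    }
    wg.Wait() // block until the counter reaches zero
}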
In my case, I want to fetch data from the 3rd-party products API based on the param productType. Assume every product type has a thousand records, and I want to upsert them into my database.
package main

import (
    "context"
    "fmt"
    "sync"
)

var LIST_PRODUCT_TYPE = [3]string{"food", "electronics", "clothing"}

type GetProductListRequest struct {
    ProductType string `json:"productType"`
}

type GetListProductResponse struct {
    Data []ProductListResponse `json:"data"`
}

type ProductListResponse struct {
    Code   string `json:"code"`
    Name   string `json:"name"`
    Price  string `json:"price"`
    Status bool   `json:"status"`
}

func getProductList(ctx context.Context, req *GetProductListRequest) (*GetListProductResponse, error) {
    // calling endpoint 3rd party
    // parse to response
    // and return the data
    var productList GetListProductResponse
    return &productList, nil
}

func fetchAllProducts(ctx context.Context) (*GetListProductResponse, error) {
    // cancel the context on return so any still-running goroutines exit
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    wg := sync.WaitGroup{}
    doneChan := make(chan interface{}, 1)
    productsChan := make(chan *GetListProductResponse)
    errChan := make(chan error, len(LIST_PRODUCT_TYPE))

    for key := range LIST_PRODUCT_TYPE {
        wg.Add(1)
        req := &GetProductListRequest{
            ProductType: LIST_PRODUCT_TYPE[key],
        }
        go func() {
            defer wg.Done()
            products, err := getProductList(ctx, req)
            if err != nil {
                errChan <- err
                return
            }
            select {
            case productsChan <- products:
            case <-ctx.Done(): // give up if the caller already returned
            }
        }()
    }

    // signal doneChan once every goroutine has finished
    go func() {
        wg.Wait()
        doneChan <- nil
    }()

    var (
        catalogues GetListProductResponse
        data       []ProductListResponse
    )
    for {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case err := <-errChan:
            return nil, err
        case products := <-productsChan:
            data = append(data, products.Data...)
            catalogues.Data = data
        case <-doneChan:
            return &catalogues, nil
        }
    }
}

func main() {
    catalogues, err := fetchAllProducts(context.Background())
    if err != nil {
        fmt.Println("fetch failed:", err)
        return
    }
    fmt.Printf("fetched %d products\n", len(catalogues.Data))
}
In the example code above, I created wg := sync.WaitGroup{} and set up some channels:
- doneChan: waits for the completion signal
- productsChan: shares product data from every goroutine that we run
- errChan: shares errors that occur while fetching data in each goroutine
Next, the code concurrently fetches the product data:
- for key := range LIST_PRODUCT_TYPE: This loop iterates over the indexes of the LIST_PRODUCT_TYPE array.
- wg.Add(1): It increments the WaitGroup (wg) counter by 1. This is commonly used to wait for a collection of goroutines to finish.
- req := &GetProductListRequest{ ProductType: LIST_PRODUCT_TYPE[key], }: It creates a GetProductListRequest with the ProductType set based on the current iteration of the loop.
- go func() {…}: This starts a new goroutine to handle the fetching of product lists for the given product type.
- defer wg.Done(): defer ensures that wg.Done() is called when the goroutine exits, regardless of how it exits (normal return or early return due to an error). wg.Done() decrements the WaitGroup counter, signaling that the goroutine has completed.
- products, err := getProductList(ctx, req): It calls getProductList to retrieve the product list for the specified product type.
- if err != nil {…}: If an error occurs during the retrieval, it sends the error to the errChan channel.
- productsChan <- products: If the retrieval is successful, it sends the fetched product list to the productsChan channel.
In summary, this code is concurrently fetching product lists for different product types using goroutines. It utilizes a WaitGroup (wg) to wait for all goroutines to finish and communicates errors and results through channels (errChan and productsChan, respectively).
The last part: this code starts a new goroutine that waits for the sync.WaitGroup (wg) to complete and then sends a nil value to a channel (doneChan) to signal that the work is done. This lets the select loop below know when every fetching goroutine has finished.
- The for loop runs indefinitely, and within it, a select statement handles multiple channels concurrently.
- case <-ctx.Done(): If the context (ctx) is canceled (e.g., due to a timeout or cancellation signal), the function returns an error with the reason provided by ctx.Err().
- case err := <-errChan: If an error is received on errChan, the function returns with that error value.
- case products := <-productsChan: If data is received on productsChan, it appends the data to the data slice and updates the catalogues.Data field with the aggregated data.
- case <-doneChan: If a signal is received on doneChan, the function returns the assembled catalogues response.
That is an example of concurrently fetching data in Golang. I hope this was useful, and if there is a topic you want me to discuss, feel free to drop a comment. See ya!