Properly batch items from input channel

Issue

Use case

I want to persist a lot of data, which I receive via a channel, in a MySQL database. For performance reasons I process the items in batches of 10. New input items arrive only every 3 hours.

The problem

Assuming I get 10004 items, 4 items will be left over because my goroutine waits for 10 items before it flushes them as a batch. I want it to create a batch with fewer than 10 items when there are no more items in the channel (the producer also closes the channel at that point).

Code:

// ProcessAudits sends the given audits in batches to SQL
func ProcessAudits(done <-chan bq.Audit) {
    var audits []bq.Audit
    for auditRow := range done {
        user := auditRow.UserID.StringVal
        log.Infof("Received audit %s", user)
        audits = append(audits, auditRow)

        if len(audits) == 10 {
            upsertBigQueryAudits(audits)
            audits = []bq.Audit{}
        }
    }
}

I am new to Go and am not sure how to implement that properly.

Solution

Here’s a working example. A range loop over a channel exits once the channel has been closed and every value already sent has been received, so you can process any remaining items right after the loop.

package main

import (
    "fmt"
    "sync"
)

type Audit struct {
    ID int
}

func upsertBigQueryAudits(audits []Audit) {
    fmt.Printf("Processing batch of %d\n", len(audits))
    for _, a := range audits {
        fmt.Printf("%d ", a.ID)
    }
    fmt.Println()
}

func processAudits(audits <-chan Audit, batchSize int) {
    var batch []Audit
    for audit := range audits {
        batch = append(batch, audit)
        if len(batch) == batchSize {
            upsertBigQueryAudits(batch)
            batch = []Audit{}
        }
    }
    if len(batch) > 0 {
        upsertBigQueryAudits(batch)
    }
}

func produceAudits(x int, to chan Audit) {
    for i := 0; i < x; i++ {
        to <- Audit{
            ID: i,
        }
    }
}

const batchSize = 10

func main() {
    var wg sync.WaitGroup
    audits := make(chan Audit)
    wg.Add(1)
    go func() {
        defer wg.Done()
        processAudits(audits, batchSize)
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        produceAudits(25, audits)
        close(audits)
    }()
    wg.Wait()
    fmt.Println("Complete")
}

Output:

Processing batch of 10
0 1 2 3 4 5 6 7 8 9
Processing batch of 10
10 11 12 13 14 15 16 17 18 19
Processing batch of 5
20 21 22 23 24
Complete
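
The same pattern can be checked in isolation. The following is a minimal sketch, not part of the original answer: collectBatches is a hypothetical helper that returns the batches instead of writing them to SQL, and it assumes size is greater than zero. It uses a buffered channel that is closed before the consumer starts, to show that closing a channel does not discard values already sent.

```go
package main

import "fmt"

// collectBatches is a hypothetical helper for illustration only.
// It groups values received from in into slices of at most size
// elements (size is assumed to be > 0) and returns once in has been
// closed and fully drained.
func collectBatches(in <-chan int, size int) [][]int {
	var batches [][]int
	batch := make([]int, 0, size)
	// The loop ends only after close(in) and after every value
	// that was sent before the close has been received.
	for v := range in {
		batch = append(batch, v)
		if len(batch) == size {
			batches = append(batches, batch)
			// Start a fresh slice so the stored batch is not overwritten.
			batch = make([]int, 0, size)
		}
	}
	// Flush the partial batch that was pending when the channel closed.
	if len(batch) > 0 {
		batches = append(batches, batch)
	}
	return batches
}

func main() {
	in := make(chan int, 25)
	for i := 0; i < 25; i++ {
		in <- i
	}
	close(in) // the 25 buffered values remain readable after close

	for _, b := range collectBatches(in, 10) {
		fmt.Println(len(b), b)
	}
}
```

With 25 items and a batch size of 10, this yields two full batches and one final batch of 5, matching the output above. Allocating a new slice after each full batch, rather than truncating the old one with batch[:0], matters whenever the receiver of a batch keeps a reference to it.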

Answered By – a-h

This answer was collected from Stack Overflow and is licensed under CC BY-SA 2.5, CC BY-SA 3.0 and CC BY-SA 4.0.
