package chanutil
import (
"reflect"
"sync"
)
Package chanutil implements multiple concurrency patterns using goroutines and channels
package chanutil
import (
"reflect"
"sync"
)
Merge combines input from multiple channels into a single channel
func Merge(input ...chan int) chan int {
out := make(chan int)
To synchronize closing the output channel when all input channels are closed
var shutdownSignal sync.WaitGroup
For each input channel a goroutine is launched which reads from the input channel and publishes to the output channel
for _, inputChan := range input {
Waitgroup is incremented for each goroutine launched (to read from assigned input channel)
shutdownSignal.Add(1)
Goroutine reads from assigned input channel and writes to output channel. Goroutine takes channel as an argument instead of accessing via closures to avoid data race conditions, since goroutine is launched from within a for loop Refer for race conditions inside for loops in Go
go func(inputChan chan int) {
Decrement the Waitgroup once goroutine exits
defer shutdownSignal.Done()
Block execution of the goroutine. Read from the assigned input channel. Write to the output channel
for num := range inputChan {
out <- num
}
}(inputChan)
}
A goroutine monitors when all upstream channels close. When all the upstream channels are closed, it’ll close the downstream channel
go func() {
Block until all the goroutines launched to read from input channels have been closed
shutdownSignal.Wait()
close(out)
}()
return out
}
Split broadcasts (duplicates) input from a single channel across multiple output channels
func Split(input chan int, outputs ...chan int) {
go func() {
Create n WaitGroups, each corresponding to goroutines spawned for writing to a single output channel
syncGoroutines := make([]sync.WaitGroup, len(outputs))
for index, outputChan := range outputs {
syncGoroutines[index] = sync.WaitGroup{}
Waitgroup incremented to account for when the input channel closes
syncGoroutines[index].Add(1)
The following goroutine monitors if all the goroutines spawned to publish to the respective output channel have closed using ‘wg’ and the ‘input’ channel has been closed, to close the ‘output’ channel
go func(outputChan chan int, wg *sync.WaitGroup) {
wg.Wait()
close(outputChan)
}(outputChan, &syncGoroutines[index])
}
Range over the input channel. Spawn a goroutine for each output channel each time a value is read from the input channel.
for inputVal := range input {
for index, outputChan := range outputs {
Increment the waitgroup for the assigned output channel to account for the goroutine that will be launched to write the present value to the output channel
syncGoroutines[index].Add(1)
go func(inputVal int, outputChan chan int, wg *sync.WaitGroup) {
outputChan <- inputVal
wg.Done()
}(inputVal, outputChan, &syncGoroutines[index])
}
}
Since the earlier for-range loop is blocking and will only release once the ‘input’ channel is closed. We can define code here to be executed once the ‘input’ channel is closed Range over n Waitgroups and signal their end since the input channel is closed
for index := range syncGoroutines {
syncGoroutines[index].Done()
}
}()
}
SplitRnd distributes input from a single channel across multiple output channels randomly
func SplitRnd(input chan int, outputs ...chan int) {
Spawn a goroutine for each output channel. The goroutine is responsible for reading from the input channel and writing to the output channel
for _, outputChan := range outputs {
Takes output channel as an argument and not via closures to avoid data race conditions
go func(outputChan chan int) {
Close the output channel once the input channel is closed
defer close(outputChan)
Blocks and runs until the input channel is closed
for num := range input {
outputChan <- num
}
}(outputChan)
}
}
Pipeline recursively creates a multi-stage asynchronous pipeline to filter values from the input channel in order of the functions provided and publish qualifying values to the output channel
func Pipeline(input chan int, filters ...func(task int) bool) chan int {
out := make(chan int)
filter := filters[0]
Goroutine creates a stage of the pipeline using a filter function and an output channel
go func(out chan int, filter func(task int) bool) {
Closes the output channel once the goroutine exits
defer close(out)
Blocks and reads from the input channel until it is closed
for value := range input {
if filter(value) {
out <- value
}
}
}(out, filter)
If there are more than one filter functions, Pipeline is invoked recursively
if len(filters) != 1 {
filters = filters[1:]
return Pipeline(out, filters...)
}
If there is only a single filter function, output channel is returned
return out
}
OrShutdown combines multiple signalling channels and returns a single signalling channel (The output channel is closed if any of the input signalling channels is closed)
func OrShutdown(inputs ...<-chan int) <-chan int {
out := make(chan int)
Goroutine creates a dynamically sized select-case statement using the reflect package.
go func() {
defer close(out)
cases := make([]reflect.SelectCase, len(inputs))
For each input channel a case statement is initialized to read from the respective channel
for idx, inputChan := range inputs {
cases[idx] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(inputChan)}
}
Goroutine blocks here until one of the input channels closes
reflect.Select(cases)
}()
return out
}
AndShutdown combines multiple signalling channels and returns a single signalling channel (The output channel is closed when all of the input signalling channels are closed)
func AndShutdown(inputs ...<-chan int) <-chan int {
out := make(chan int)
WaitGroup is used to synchronize closing the output channel when all the input channels are closed
var syncGoroutines sync.WaitGroup
For each input channel, a goroutine is spawned. When the input channel closes, the goroutine decrements the WaitGroup and exits
for _, inputChan := range inputs {
Increment WaitGroup for each goroutine launched
syncGoroutines.Add(1)
go func(inputChan <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
Goroutine blocks here until the input channel closes
<-inputChan
}(inputChan, &syncGoroutines)
}
Goroutine monitors when all the input channels are closed (signalled by respective goroutines spawned from the for loop).
go func(wg *sync.WaitGroup) {
Blocks until all goroutines spawned to monitor individual channels exit
wg.Wait()
close(out)
}(&syncGoroutines)
return out
}
Pool invokes multiple goroutines to process values on the input channel concurrently
func Pool(shutdown <-chan int, input <-chan int, poolSize int, process func(int) int) <-chan int {
out := make(chan int)
WaitGroup initialized to synchronize closing all the goroutines once the input channel is closed
var syncGoroutine sync.WaitGroup
Spawn ‘poolSize’ goroutines where each goroutine reads from the input channel runs the ‘process’ function on the value and publishes result to the output channel
for i := 0; i < poolSize; i++ {
syncGoroutine.Add(1)
go func(shutdown <-chan int, inputChan <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for {
Select-Case used to allow to close the goroutine if the ‘shutdown’ channel is closed
select {
case <-shutdown:
return
case num, ok := <-inputChan:
To allow the goroutine to exit when the input channel is closed
if !ok {
return
}
The following select-case block covers the case where value was read from the input channel and while the goroutine was waiting to write on the output channel, ‘shutdown’ channel was closed. Without the following select-case block, the goroutine would attempt to write to the output channel despite the ‘shutdown’ channel being signalled to close
select {
case <-shutdown:
return
case out <- process(num):
}
}
}
}(shutdown, input, &syncGoroutine)
}
The following goroutine closes the output channel once all the goroutines reading from the input channel have exited
go func(wg *sync.WaitGroup) {
syncGoroutine.Wait()
close(out)
}(&syncGoroutine)
return out
}