Channel Merging for (Periodic) Func
Continuing on this hot streak of posts, I figured I’d write up something I’ve wanted to do a post on for quite a while. The idea here is to have a sort of “arterial channel” where we merge an arbitrary number of channels into a single channel. There are actually a couple of “Just for Func” episodes on this topic that were extremely helpful: justforfunc #26: why are there nil channels in Go? and justforfunc #27: two ways of merging N channels.
First, a little motivation. I had a service I was working on and I wanted to be able to supply the service with a set of periodic callbacks that would all run in separate goroutines. For instance, maybe a service wants to ping some endpoint every 5 seconds, kick off some asynchronous task every 10 minutes, update some metric every 30 seconds, and so on. The time.Ticker makes this relatively easy, since you can simply listen on Ticker.C for the “notification” to run the callback. But there are still some details you need to work out to get everything to work gracefully, hence this post. Ok, let’s look at some code.
Here’s what the service looks like:
type service struct {
	lock      *sync.RWMutex
	periodics []<-chan func()
}
There’s a lock, as well as a slice of channels that feed us functions to execute. Here’s the main “run” function that should get called at startup (in its own goroutine):
func (s *service) runPeriodics() {
	if s.periodics == nil {
		return
	}
	for f := range merge(s.periodics...) {
		go f()
	}
}
Simple enough. But what is this merge thing? This is what’s responsible for merging a bunch of channels into one big channel we can read everything from. We’ll look into that down below, but first, there’s one more service method necessary to tie this all together:
func (s *service) addPeriodic(f func(), t time.Duration) {
	s.lock.Lock()
	defer s.lock.Unlock()
	c := make(chan func())
	s.periodics = append(s.periodics, c)
	go func() {
		defer close(c)
		for range time.NewTicker(t).C {
			c <- f
		}
	}()
}
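Breaking this down, addPeriodic takes a plain old function f and a duration t, appends a new channel c to the service’s slice of channels, and runs a goroutine that feeds f into c every t. Note that runPeriodics calls merge once, with whatever is in s.periodics at that moment, so every periodic has to be added before runPeriodics starts. Admittedly, this API should include a context.Context to support cancellation and shutdown (as written, the ticker is never stopped, so the loop never exits and the deferred close(c) never runs), but for now this is fine, plus I’ve already covered that sort of thing in other posts.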
Everything has been pretty straightforward thus far. Let’s dig into the merge function; this is where there’s a bit of subtlety (and also where those “Just for Func” episodes are really informative). The end product is going to be a function that accepts a variadic list of receive-only <-chan func() channels and returns a single <-chan func().
The subtlety comes from the fact that we don’t want the returned channel to close until all the child channels have closed. A sync.WaitGroup blocks the outer goroutine (holding off the deferred close call) while the inner goroutines run in the background. If/when all the child channels are closed, the WaitGroup will unblock and the main channel will close too. Here’s the implementation, which should make things clearer:
func merge(chans ...<-chan func()) <-chan func() {
	out := make(chan func())
	go func() {
		defer close(out)
		var wg sync.WaitGroup
		wg.Add(len(chans))
		for _, c := range chans {
			go func(c <-chan func()) {
				for v := range c {
					out <- v
				}
				wg.Done()
			}(c)
		}
		wg.Wait()
	}()
	return out
}
First, the output channel is created and returned more or less immediately, because all the “work” happens in the outer goroutine that gets spawned, which ranges over the supplied channels. For each channel, another goroutine is spawned to range over that channel. If/when a channel closes, the for loop ends, the WaitGroup counter is decremented, and that channel’s goroutine finishes. The outer goroutine is blocked in the meantime, waiting on all the channels to close. Again, this should support some form of cancellation by accepting a context, but for now, this should get the point across (it’s pretty straightforward to replace the for loop over the channel with a for select over the channel and a Context.Done() channel, which I’ve covered in other posts).
This is just a “Snippets” post, so that’s all I’m going to cover for now. If any part of this hasn’t quite stuck, check out those “Just for Func” videos; you’ll definitely learn a thing or two. Anyway, until next time!