Continuing on this hot streak of posts, I figured I’d write up something I’ve wanted to do a post on for quite a while. The idea here is to have a sort of “arterial channel” where we merge some arbitrary number of channels into one singular channel. There’s actually a couple “Just for Func” episodes on this topic that were extremely helpful: justforfunc #26: why are there nil channels in Go? and justforfunc #27: two ways of merging N channels.

First, a little motivation. I had a service I was working on and I wanted to be able to supply the service with array of periodic callbacks that would all run in separate goroutines. For instance, maybe a service wants to ping some endpoint every 5 seconds, kick off some asynchronous task every 10 minutes, update some metric every 30 seconds, and so on. The time.Ticker makes this relatively easy, since you can simply listen on Ticker.C for the “notification” to run the callback. But there’s still some details you need to work out to get everything to work gracefully, hence this post. Ok, let’s look at some code.

Here’s what the service looks like:

type service struct {
	lock      *sync.RWMutex
	periodics []<-chan func()
}

There’s a lock, as well as an array of channels that feed us functions to execute. Here’s the main “run” function that should get called at startup (in its own goroutine):

func (s *service) runPeriodics() {
	if s.periodics == nil {
		return
	}
	for f := range merge(s.periodics...) {
		go f()
	}
}

Simple enough. But what is this merge thing? This is what’s responsible for merging a bunch of channels into one big channel we can read everything from. We’ll look into that down below, but first, there’s one more service method necessary to tie this all together:

func (s *service) addPeriodic(f func(), t time.Duration) {
	s.lock.Lock()
	defer s.lock.Unlock()

	c := make(chan func())
	s.periodics = append(s.periodics, c)
	go func() {
		defer close(c)
		for range time.NewTicker(t).C {
			c <- f
		}
	}()
}

Breaking this down, addPeriodic takes a plain old function f and a duration t, appends a new channel c to the service’s array of channels, and runs a goroutine that feeds f into c every d. Admittedly, this API should include a context.Context to support cancellation and shutdown, but for now this is fine, plus I’ve already covered that sort of thing in other posts.

Everything has been pretty straightforward thus far. Let’s dig into the merge function; this is where there’s a bit of subtlety (and also where those “Just for Func” episodes are really informative). The end product is going to be a function that accepts variadic chan func() and returns a single chan func().

There’s a bit of subtlety that comes from the fact that we don’t want the returned channel to close until all the child channels have closed. A sync.WaitGroup blocks the outer goroutine (preventing the deferred close call) while the inner goroutines run in the background. If/when all the child channels are closed, the WaitGroup will unblock and the main channel will close too. Here’s the implementation, which should make things more clear:

func merge(chans ...<-chan func()) <-chan func() {
	out := make(chan func())
	go func() {
		defer close(out)
		var wg sync.WaitGroup
		wg.Add(len(chans))
		for _, c := range chans {
			go func(c <-chan func()) {
				for v := range c {
					out <- v
				}
				wg.Done()
			}(c)
		}
		wg.Wait()
	}()
	return out
}

First the output channel is created and gets returned more or less immediately because all the “work” happens in this outer goroutine that gets spawned, which ranges over the supplied channels. For each channel, another goroutine is spawned to range over that channel. If/when a channel closes, the for loop breaks, the WaitGroup is incremented, and that channel’s goroutine finishes. The outer goroutine is blocked, waiting on all the channels to close. Again, this should support some form of cancellation by accepting a context, but for now, this should get the point across (it’s pretty straightforward to replace the for loop over the channel with a for select over the channel and a Context.Done() channel, which I’ve covered in other posts).

This is just a “Snippets” post, so that’s all I’m going to cover for now. If any part of this hasn’t quite stuck, definitely check out those “Just for Func” videos; you’ll definitely learn a thing or two. Anyway, until next time!