I was reading HackerNews a few months ago and came across this post. It demystified something I’d long wondered about but hadn’t investigated myself: PubSub with PostgreSQL. Most PubSub implementations I’ve seen rely on some sort of message broker like Redis. To be fair, the Redis PubSub interface is pretty easy to work with (I think I have a post or two showing just how easy), and if you were purely working in memory, it would be totally fine. However, most applications rely on some sort of SQL database. In that case, in order to “publish”, you have to write to the database and then notify listeners over your message broker. You end up having to maintain two systems and it’s kind of annoying.

If you’re using MySQL or Sqlite, then you’re pretty much stuck. However, Postgres is a true modern marvel and once again has its own solution called Listen/Notify. This post is going to show you how to use Listen/Notify in your Golang applications with pgx Conn.WaitForNotification:

func (c *Conn) WaitForNotification(ctx context.Context) (*pgconn.Notification, error)

Something important to note is that listening on a topic (at least with the Go client) requires it’s own database connection; this connection can’t be used for anything else. We want to conserve connections, so what we need is a Notifier abstraction. To summarize from the post from @brandur, a Notifier “holds a single Postgres connection per process, allows other components in the same program to use it to subscribe to any number of topics, waits for notifications, and distributes them to listening components as they’re received”. That actually sounds like something you could implement pretty easily with goroutines and channels, doesn’t it? Here’s a diagram from the original post:

Schematic from @brandur's post

In the article @brandur says he’s not going to through a full implementation but readers should look at River’s notifier for a full implementation. Well, I want my own notifier package so for the rest of this post, I’m going to talk about my implementation while also highlighting some of the nuanced points @brandur brings up. You should absolutely do an in depth read of his post though, it is a real treasure trove of handy patterns.

Here’s a few points where I deviated from the example implementations above. Check out (most of) what’s exported from my package:

// Listener interface connects to the database and allows callers to listen to a
// particular topic by issuing a LISTEN command. WaitForNotification blocks
// until receiving a notification or until the supplied context expires. The
// default implementation is tightly coupled to pgx (following River's
// implementation), but callers may implement their own listeners for any
// backend they'd like.
type Listener interface {
    Close(ctx context.Context) error
    Connect(ctx context.Context) error
    Listen(ctx context.Context, topic string) error
    Ping(ctx context.Context) error
    Unlisten(ctx context.Context, topic string) error
    WaitForNotification(ctx context.Context) (*Notification, error)
}

// Notification represents a published message
type Notification struct {
    Channel string `json:"channel"`
    Payload []byte `json:"payload"`
}

// Notifier interface wraps a Listener. It holds a single Postgres connection
// per process, allows other components in the same program to use it to
// subscribe to any number of topics, waits for notifications, and distributes
// them to listening components as they’re received
type Notifier interface {
    // Returns a Subscription to the supplied channel topic which can be used to by
    // the caller to receive data published to that channel
    Listen(channel string) Subscription

    // Run the receiving loop until cancellation
    Run(ctx context.Context) error
}

// Subscription provides a means to listen on a particular topic. Notifiers
// return Subscriptions that callers can use to receive updates.
type Subscription interface {
    NotificationC() <-chan []byte
    EstablishedC() <-chan struct{}
    Unsubscribe(ctx context.Context)
}

The Listener interface is pretty straightforward; it’s what hold that single DB connection and listens for notifications from Postgres. I"ve opted to create my own Notification type that differs from the pgconn.Notification type by having a different Payload field. I mentioned this above, but this will make it easy to implement other sorts of Listeners that don’t listen for only Postgres notifications.

The Notifier interface is also pretty simple: the Listen method lets callers subscribe, and the Run method is what starts the whole thing.

Finally, the Subscription interface should be pretty self evident. NotificationC returns channel that callers can use to wait on notification payloads; EstablishedC returns a channel that callers can use in conjunction with a blocking read to wait for the subscription to be set up. Specifically, this channel should be closed by the Notifier once the subscription is full set up and listening, at which point, all reads will succeed (see below for more details). There’s also the Unsubscribe method which does what you’d expect. I’ve opted for this name as opposed to Unlisten to avoid confusion: a Notifier may have many subscribers for a given topic. An UNLISTEN is only issued when the last subscriber unsubscribes; otherwise the subscriber is simply removed from the Notifier subscribers.

Here’s an interesting point from the original post that I’ve adopted:

[calls to WaitForNotification] block until receiving a notification, which can be problem since we’re only using a single connection. What if the notifier is in a blocking receive loop, but another component wants to add a new subscription that requires LISTEN be issued?

The solution is to use “interruptible receives” (i.e., basically just use a cancellable context in the call to WaitForNotification). The implementation below uses one that has a 30 second time out too, but that isn’t strictly necessary. Here’s my implementation that borrows from @brandur’s post and the River implementation:

// waitOnce blocks until either 1) a notification is received and
// distributed to all topic listeners, 2) the timeout is hit, or 3) an external
// caller calls l.waitForNotificationCancel. In all 3 cases, nil is returned to
// signal good/expected exit conditions, meaning a caller can simply call
// handleNextNotification again.
func (n *notifier) waitOnce(ctx context.Context) error {
    if err := n.processChannelChanges(ctx); err != nil {
        return err
    }

    // WaitForNotification is a blocking function, but since we want to wake
    // occasionally to process new `LISTEN`/`UNLISTEN`, we let the context
    // timeout and also expose a way for external code to cancel this loop with
    // waitForNotificationCancel.
    notification, err := func() (*Notification, error) {
        const listenTimeout = 30 * time.Second

        ctx, cancel := context.WithTimeout(ctx, listenTimeout)
        defer cancel()

        // Provides a way for the blocking wait to be cancelled in case a new
        // subscription change comes in.
        n.mu.Lock()
        n.waitForNotificationCancel = cancel
        n.mu.Unlock()

        notification, err := n.listener.WaitForNotification(ctx)
        if err != nil {
            return nil, fmt.Errorf("error waiting for notification: %w", err)
        }

        return notification, nil
    }()
    if err != nil {
        // If the error was a cancellation or the deadline being exceeded but
        // there's no error in the parent context, return no error.
        if (errors.Is(err, context.Canceled) ||
            errors.Is(err, context.DeadlineExceeded)) && ctx.Err() == nil {
            return nil
        }

        return err
    }

    n.mu.RLock()
    defer n.mu.RUnlock()

    for _, sub := range n.subscriptions[notification.Channel] {
        select {
        case sub.listenChan <- []byte(notification.Payload):
        default:
            n.logger.Error("dropped notification due to full buffer", "payload", notification.Payload)
        }
    }

    return nil
}

The crucial part is this assignment to waitForNotificationCancel, which external callers can use to cancel this blocking wait. Cancelation/expiration related errors are ignored and lead to an early return. Otherwise, WaitForNotification returns a notification that gets distributed to all subscribers.

If you’re worried that we’ll miss notifications while resetting this loop, don’t be. WaitForNotification maintains a queue of notifications under the hood (see here).

You may note that I’ve tweaked the interface here a bit: Listeners return a *Notification and subscribers receive a slice of bytes. I opted for this rather than coupling to the pgconn.Notification type because I want to be able to implement other Listeners that return JSON data.

Here’s another important point from @brandur’s post:

Subscriptions use a buffered channel like make(chan string, 100) and non-blocking sends (using select with default).

The point here is that the Notifier is responsible for delivering messages to every subscriber and you don’t want to let one slow subscriber with a full queue to block everything. The trade off is that if a subscriber’s buffer is full, any notifications to that subscriber will vanish into the ether with an empty default block. If you absolutely must handle every update, then you’ll need to use a different Notifier implementation. In most cases though, it’s better to drop the notification and log an error rather than bring the whole notification system to a crawl.

Here’s another neat concept that happens to be a pretty common Go idiom: subscriptions have an “established” channel that gets closed once the subscription’s LISTEN is fully set up. Callers can tell the subscription to start listening (which is asynchronous) and then perform a blocking read from the channel returned by EstablishedC(). The subscription closes the channel once it’s actually listening, so callers can block on that channel until the subscription is “officially” set up (recall that reads on a close channel always succeed).

Here’s a little func main() you can use to play around with this package:

package main

import (
	"context"
	"flag"
	"fmt"
	"log/slog"
	"os"
	"path/filepath"
	"time"

	"github.com/brojonat/notifier/notifier"
	"github.com/jackc/pgx/v5/pgxpool"
)

func main() {

	ctx := context.Background()
	l := getDefaultLogger(slog.LevelInfo)

	var url string
	flag.StringVar(&url, "dbhost", "", "DB host (postgresql://{user}:{password}@{hostname}/{db}?sslmode=require)")

	var topic string
	flag.StringVar(&topic, "channel", "", "a string")

	flag.Parse()

	if url == "" || topic == "" {
		fmt.Fprintf(os.Stderr, "missing required flag")
		os.Exit(1)
		return
	}

	// get a connection pool
	pool, err := pgxpool.New(ctx, url)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error connection to DB: %v", err)
		os.Exit(1)
	}
	if err = pool.Ping(ctx); err != nil {
		fmt.Fprintf(os.Stderr, "error pinging DB: %v", err)
		os.Exit(1)
	}

	// setup the listener
	li := notifier.NewListener(pool)
	if err := li.Connect(ctx); err != nil {
		fmt.Fprintf(os.Stderr, "error setting up listener: %v", err)
		os.Exit(1)
	}

	// setup the notifier
	n := notifier.NewNotifier(l, li)
	go n.Run(ctx)

	// subscribe to the topic
	sub := n.Listen(topic)

	// indefinitely listen for updates
	go func() {
		<-sub.EstablishedC()
		for {
			select {
			case <-ctx.Done():
				sub.Unlisten(ctx)
				fmt.Println("done listening for notifications")
				return
			case p := <-sub.NotificationC():
				fmt.Printf("Got notification: %s\n", p)
			}
		}
	}()

	// unsubscribe after some time
	go func() {
		time.Sleep(20 * time.Second)
		sub.Unlisten(ctx)
	}()

	select {}
}

func getDefaultLogger(lvl slog.Level) *slog.Logger {
	return slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
		AddSource: true,
		Level:     lvl,
		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
			if a.Key == slog.SourceKey {
				source, _ := a.Value.Any().(*slog.Source)
				if source != nil {
					source.Function = ""
					source.File = filepath.Base(source.File)
				}
			}
			return a
		},
	}))
}