Go and Postgres Listen/Notify or: How I Learned to Stop Worrying and Love PubSub
I was reading HackerNews a few months ago and came across this post. It demystified something I’d long wondered about but hadn’t investigated myself: PubSub with PostgreSQL. Most PubSub implementations I’ve seen rely on some sort of message broker like Redis. To be fair, the Redis PubSub interface is pretty easy to work with (I think I have a post or two showing just how easy), and if you were purely working in memory, it would be totally fine. However, most applications rely on some sort of SQL database. In that case, in order to “publish”, you have to write to the database and then notify listeners over your message broker. You end up having to maintain two systems and it’s kind of annoying.
If you’re using MySQL or SQLite, then you’re pretty much stuck. However, Postgres is a true modern marvel and once again has its own solution, called Listen/Notify. This post is going to show you how to use Listen/Notify in your Golang applications with pgx. The piece that matters on the Go side is Conn.WaitForNotification:
func (c *Conn) WaitForNotification(ctx context.Context) (*pgconn.Notification, error)
Something important to note is that listening on a topic (at least with the Go client) requires its own database connection; this connection can’t be used for anything else. We want to conserve connections, so what we need is a Notifier abstraction. To summarize from the post from @brandur, a Notifier “holds a single Postgres connection per process, allows other components in the same program to use it to subscribe to any number of topics, waits for notifications, and distributes them to listening components as they’re received”. That actually sounds like something you could implement pretty easily with goroutines and channels, doesn’t it? The original post includes a diagram of this arrangement.
In the article @brandur says he’s not going to walk through a full implementation, but readers should look at River’s notifier for one. Well, I want my own notifier package, so for the rest of this post I’m going to talk about my implementation while also highlighting some of the nuanced points @brandur brings up. You should absolutely do an in-depth read of his post though; it is a real treasure trove of handy patterns.
Here are a few points where I deviated from the example implementations above. Check out (most of) what’s exported from my package:
// Listener interface connects to the database and allows callers to listen to a
// particular topic by issuing a LISTEN command. WaitForNotification blocks
// until receiving a notification or until the supplied context expires. The
// default implementation is tightly coupled to pgx (following River's
// implementation), but callers may implement their own listeners for any
// backend they'd like.
type Listener interface {
	Close(ctx context.Context) error
	Connect(ctx context.Context) error
	Listen(ctx context.Context, topic string) error
	Ping(ctx context.Context) error
	Unlisten(ctx context.Context, topic string) error
	WaitForNotification(ctx context.Context) (*Notification, error)
}

// Notification represents a published message.
type Notification struct {
	Channel string `json:"channel"`
	Payload []byte `json:"payload"`
}

// Notifier interface wraps a Listener. It holds a single Postgres connection
// per process, allows other components in the same program to use it to
// subscribe to any number of topics, waits for notifications, and distributes
// them to listening components as they're received.
type Notifier interface {
	// Listen returns a Subscription to the supplied channel topic, which the
	// caller can use to receive data published to that channel.
	Listen(channel string) Subscription
	// Run runs the receiving loop until cancellation.
	Run(ctx context.Context) error
}

// Subscription provides a means to listen on a particular topic. Notifiers
// return Subscriptions that callers can use to receive updates.
type Subscription interface {
	NotificationC() <-chan []byte
	EstablishedC() <-chan struct{}
	Unsubscribe(ctx context.Context)
}
The Listener interface is pretty straightforward; it’s what holds that single DB connection and listens for notifications from Postgres. I’ve opted to create my own Notification type that differs from the pgconn.Notification type by having a different Payload field ([]byte rather than string). I mentioned this above, but this will make it easy to implement other sorts of Listeners that don’t listen for only Postgres notifications.
The Notifier interface is also pretty simple: the Listen method lets callers subscribe, and the Run method is what starts the whole thing.
Finally, the Subscription interface should be pretty self-evident. NotificationC returns a channel that callers can use to wait on notification payloads; EstablishedC returns a channel that callers can use in conjunction with a blocking read to wait for the subscription to be set up. Specifically, this channel should be closed by the Notifier once the subscription is fully set up and listening, at which point all reads will succeed (see below for more details). There’s also the Unsubscribe method, which does what you’d expect. I’ve opted for this name as opposed to Unlisten to avoid confusion: a Notifier may have many subscribers for a given topic. An UNLISTEN is only issued when the last subscriber unsubscribes; otherwise the subscriber is simply removed from the Notifier’s subscribers.
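To make that last rule concrete, here’s a minimal sketch of the bookkeeping, assuming a simple per-topic counter. The registry type and its listen/unlisten callbacks are hypothetical names I made up for illustration; they aren’t part of my package or of River. The callbacks stand in for the spots where a real Notifier would issue LISTEN and UNLISTEN over its connection.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for a Notifier's subscriber bookkeeping.
type registry struct {
	mu       sync.Mutex
	counts   map[string]int
	listen   func(topic string) // called when a topic gains its first subscriber
	unlisten func(topic string) // called when a topic loses its last subscriber
}

func newRegistry(listen, unlisten func(topic string)) *registry {
	return &registry{counts: map[string]int{}, listen: listen, unlisten: unlisten}
}

// subscribe adds one subscriber; only the first one for a topic triggers listen.
func (r *registry) subscribe(topic string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.counts[topic]++
	if r.counts[topic] == 1 {
		r.listen(topic)
	}
}

// unsubscribe removes one subscriber; only the last one for a topic triggers
// unlisten. Unsubscribing from a topic with no subscribers is a no-op.
func (r *registry) unsubscribe(topic string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.counts[topic] == 0 {
		return
	}
	r.counts[topic]--
	if r.counts[topic] == 0 {
		delete(r.counts, topic)
		r.unlisten(topic)
	}
}

func main() {
	r := newRegistry(
		func(t string) { fmt.Println("LISTEN", t) },
		func(t string) { fmt.Println("UNLISTEN", t) },
	)
	r.subscribe("orders")   // first subscriber: LISTEN is issued
	r.subscribe("orders")   // second subscriber: nothing to do
	r.unsubscribe("orders") // one subscriber remains: nothing to do
	r.unsubscribe("orders") // last subscriber gone: UNLISTEN is issued
}
```

A real implementation would track the subscriptions themselves rather than a bare count, but the first-in/last-out logic is the same.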
Here’s an interesting point from the original post that I’ve adopted:
[calls to WaitForNotification] block until receiving a notification, which can be problem since we’re only using a single connection. What if the notifier is in a blocking receive loop, but another component wants to add a new subscription that requires LISTEN be issued?
The solution is to use “interruptible receives” (i.e., basically just use a cancellable context in the call to WaitForNotification). The implementation below uses one that has a 30-second timeout too, but that isn’t strictly necessary. Here’s my implementation, which borrows from @brandur’s post and the River implementation:
// waitOnce blocks until either 1) a notification is received and
// distributed to all topic listeners, 2) the timeout is hit, or 3) an external
// caller calls n.waitForNotificationCancel. In all 3 cases, nil is returned to
// signal good/expected exit conditions, meaning a caller can simply call
// waitOnce again.
func (n *notifier) waitOnce(ctx context.Context) error {
	if err := n.processChannelChanges(ctx); err != nil {
		return err
	}

	// WaitForNotification is a blocking function, but since we want to wake
	// occasionally to process new `LISTEN`/`UNLISTEN`, we let the context
	// timeout and also expose a way for external code to cancel this loop with
	// waitForNotificationCancel.
	notification, err := func() (*Notification, error) {
		const listenTimeout = 30 * time.Second
		// This ctx shadows the parent only inside the closure, so the
		// ctx.Err() check below still inspects the parent context.
		ctx, cancel := context.WithTimeout(ctx, listenTimeout)
		defer cancel()

		// Provides a way for the blocking wait to be cancelled in case a new
		// subscription change comes in.
		n.mu.Lock()
		n.waitForNotificationCancel = cancel
		n.mu.Unlock()

		notification, err := n.listener.WaitForNotification(ctx)
		if err != nil {
			return nil, fmt.Errorf("error waiting for notification: %w", err)
		}
		return notification, nil
	}()
	if err != nil {
		// If the error was a cancellation or the deadline being exceeded but
		// there's no error in the parent context, return no error.
		if (errors.Is(err, context.Canceled) ||
			errors.Is(err, context.DeadlineExceeded)) && ctx.Err() == nil {
			return nil
		}
		return err
	}

	// Fan the payload out to every subscriber of this channel without
	// blocking on any of them.
	n.mu.RLock()
	defer n.mu.RUnlock()
	for _, sub := range n.subscriptions[notification.Channel] {
		select {
		case sub.listenChan <- []byte(notification.Payload):
		default:
			n.logger.Error("dropped notification due to full buffer", "payload", notification.Payload)
		}
	}
	return nil
}
The crucial part is this assignment to waitForNotificationCancel, which external callers can use to cancel this blocking wait. Cancellation/expiration-related errors are ignored and lead to an early return. Otherwise, WaitForNotification returns a notification that gets distributed to all subscribers.
If you’re worried that we’ll miss notifications while resetting this loop, don’t be. WaitForNotification maintains a queue of notifications under the hood (see here).
You may note that I’ve tweaked the interface here a bit: Listeners return a *Notification and subscribers receive a slice of bytes. I opted for this rather than coupling to the pgconn.Notification type because I want to be able to implement other Listeners that return JSON data.
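If you want to see the interrupt mechanism in isolation, here’s a stripped-down sketch with no database involved. Everything in it (the waiter type, interrupt, blockUntilDone, demo) is a hypothetical stand-in I’m using for illustration: blockUntilDone plays the role of a WaitForNotification call that never receives anything, and interrupt plays the role of a component that just added a subscription and needs the loop to wake up.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// waiter is a hypothetical, pared-down notifier. wait stands in for
// Listener.WaitForNotification.
type waiter struct {
	mu     sync.Mutex
	cancel context.CancelFunc
	wait   func(ctx context.Context) error
}

// waitOnce blocks in w.wait until it returns, the timeout fires, or interrupt
// is called. Timeouts and deliberate interrupts are reported as nil so the
// caller can simply loop; a cancelled parent context is reported as an error.
func (w *waiter) waitOnce(parent context.Context, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()

	// Publish the cancel func so other goroutines can interrupt the wait.
	w.mu.Lock()
	w.cancel = cancel
	w.mu.Unlock()

	err := w.wait(ctx)
	interrupted := errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
	if interrupted && parent.Err() == nil {
		return nil
	}
	return err
}

// interrupt wakes up a blocked waitOnce, if there is one.
func (w *waiter) interrupt() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.cancel != nil {
		w.cancel()
	}
}

// blockUntilDone simulates a wait during which no notification ever arrives.
func blockUntilDone(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// demo starts a wait and interrupts it from another goroutine.
func demo() error {
	w := &waiter{wait: blockUntilDone}
	go func() {
		time.Sleep(50 * time.Millisecond)
		w.interrupt()
	}()
	return w.waitOnce(context.Background(), 2*time.Second)
}

func main() {
	fmt.Println("waitOnce returned:", demo())
}
```

The check against the parent context is what separates “someone poked the loop” from “the program is shutting down”; only the latter should bubble up as an error.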
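Here’s a rough sketch of what the receiving end might look like when payloads are JSON. The Notification type is copied from above; orderEvent and decodeOrder are hypothetical names I’m inventing for the example, and the payload shape is an assumption, not something my package prescribes.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Notification mirrors the type exported from the notifier package.
type Notification struct {
	Channel string `json:"channel"`
	Payload []byte `json:"payload"`
}

// orderEvent is a hypothetical payload shape used only for this example.
type orderEvent struct {
	ID     int    `json:"id"`
	Status string `json:"status"`
}

// decodeOrder turns the raw bytes a subscriber receives into a typed value.
func decodeOrder(payload []byte) (orderEvent, error) {
	var ev orderEvent
	if err := json.Unmarshal(payload, &ev); err != nil {
		return orderEvent{}, fmt.Errorf("decoding payload: %w", err)
	}
	return ev, nil
}

func main() {
	n := Notification{
		Channel: "orders",
		Payload: []byte(`{"id": 42, "status": "shipped"}`),
	}
	ev, err := decodeOrder(n.Payload)
	if err != nil {
		fmt.Println("bad payload:", err)
		return
	}
	fmt.Printf("order %d on channel %q is now %s\n", ev.ID, n.Channel, ev.Status)
}
```

Since subscribers only ever see bytes, the decoding step lives with whoever knows the payload’s shape, and the Notifier itself stays agnostic.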
Here’s another important point from @brandur’s post:
Subscriptions use a buffered channel like make(chan string, 100) and non-blocking sends (using select with default).
The point here is that the Notifier is responsible for delivering messages to every subscriber, and you don’t want one slow subscriber with a full queue to block everything. The trade-off is that if a subscriber’s buffer is full, notifications to that subscriber are dropped in the default branch (my implementation logs an error there; with an empty default block they’d simply vanish into the ether). If you absolutely must handle every update, then you’ll need to use a different Notifier implementation. In most cases though, it’s better to drop the notification and log an error rather than bring the whole notification system to a crawl.
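The pattern is small enough to show on its own. This is a minimal sketch using plain channels; trySend and fanOut are hypothetical helper names for illustration, not functions from my package.

```go
package main

import "fmt"

// trySend attempts to deliver payload without blocking and reports whether
// the subscriber's buffer had room for it.
func trySend(ch chan<- []byte, payload []byte) bool {
	select {
	case ch <- payload:
		return true
	default:
		// Buffer is full: give up immediately instead of waiting.
		return false
	}
}

// fanOut delivers payload to every subscriber and returns how many
// subscribers missed it because their buffers were full.
func fanOut(subs []chan []byte, payload []byte) (dropped int) {
	for _, ch := range subs {
		if !trySend(ch, payload) {
			dropped++
		}
	}
	return dropped
}

func main() {
	fast := make(chan []byte, 2) // room for both messages
	slow := make(chan []byte, 1) // fills up after the first message
	subs := []chan []byte{fast, slow}

	fmt.Println("dropped:", fanOut(subs, []byte("first")))  // dropped: 0
	fmt.Println("dropped:", fanOut(subs, []byte("second"))) // dropped: 1
}
```

Nobody is reading from either channel here, yet fanOut never blocks: the subscriber with the small buffer just misses the second message while the other one gets both.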
Here’s another neat concept that happens to be a pretty common Go idiom: subscriptions have an “established” channel that gets closed once the subscription’s LISTEN is fully set up. Callers can tell the subscription to start listening (which is asynchronous) and then perform a blocking read from the channel returned by EstablishedC(). The channel is closed once the subscription is actually listening, so callers can block on that channel until the subscription is “officially” set up (recall that reads on a closed channel always succeed).
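Here’s a small sketch of the idiom by itself, assuming the signalling side may be invoked more than once (hence the sync.Once, since closing a channel twice panics). The subscription struct, markEstablished, and isEstablished are hypothetical and only loosely modeled on my package.

```go
package main

import (
	"fmt"
	"sync"
)

// subscription is a hypothetical, pared-down subscription that carries only
// the "established" signal.
type subscription struct {
	establishedC chan struct{}
	once         sync.Once
}

func newSubscription() *subscription {
	return &subscription{establishedC: make(chan struct{})}
}

// EstablishedC returns a channel that is closed once the subscription is live.
func (s *subscription) EstablishedC() <-chan struct{} { return s.establishedC }

// markEstablished is what a notifier would call after LISTEN succeeds.
// sync.Once makes repeated calls safe.
func (s *subscription) markEstablished() {
	s.once.Do(func() { close(s.establishedC) })
}

// isEstablished checks the signal without blocking.
func (s *subscription) isEstablished() bool {
	select {
	case <-s.establishedC:
		return true
	default:
		return false
	}
}

func main() {
	sub := newSubscription()
	fmt.Println("established before setup:", sub.isEstablished())

	go sub.markEstablished() // setup completes asynchronously

	<-sub.EstablishedC() // blocks until the channel is closed
	<-sub.EstablishedC() // later reads return immediately
	fmt.Println("established after setup:", sub.isEstablished())
}
```

Closing a channel works as a broadcast: any number of callers can wait on it, and none of them have to coordinate with each other.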
Here’s a little func main() you can use to play around with this package:
package main

import (
	"context"
	"flag"
	"fmt"
	"log/slog"
	"os"
	"path/filepath"
	"time"

	"github.com/brojonat/notifier/notifier"
	"github.com/jackc/pgx/v5/pgxpool"
)

func main() {
	ctx := context.Background()
	l := getDefaultLogger(slog.LevelInfo)

	var url string
	flag.StringVar(&url, "dbhost", "", "DB host (postgresql://{user}:{password}@{hostname}/{db}?sslmode=require)")
	var topic string
	flag.StringVar(&topic, "channel", "", "a string")
	flag.Parse()

	if url == "" || topic == "" {
		fmt.Fprintln(os.Stderr, "missing required flag")
		os.Exit(1)
	}

	// get a connection pool
	pool, err := pgxpool.New(ctx, url)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error connecting to DB: %v\n", err)
		os.Exit(1)
	}
	if err = pool.Ping(ctx); err != nil {
		fmt.Fprintf(os.Stderr, "error pinging DB: %v\n", err)
		os.Exit(1)
	}

	// setup the listener
	li := notifier.NewListener(pool)
	if err := li.Connect(ctx); err != nil {
		fmt.Fprintf(os.Stderr, "error setting up listener: %v\n", err)
		os.Exit(1)
	}

	// setup the notifier
	n := notifier.NewNotifier(l, li)
	go n.Run(ctx)

	// subscribe to the topic
	sub := n.Listen(topic)

	// indefinitely listen for updates
	go func() {
		// block until the LISTEN has actually been issued
		<-sub.EstablishedC()
		for {
			select {
			case <-ctx.Done():
				sub.Unsubscribe(ctx)
				fmt.Println("done listening for notifications")
				return
			case p := <-sub.NotificationC():
				fmt.Printf("Got notification: %s\n", p)
			}
		}
	}()

	// unsubscribe after some time
	go func() {
		time.Sleep(20 * time.Second)
		sub.Unsubscribe(ctx)
	}()

	// block forever so the goroutines above keep running
	select {}
}

func getDefaultLogger(lvl slog.Level) *slog.Logger {
	return slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
		AddSource: true,
		Level:     lvl,
		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
			if a.Key == slog.SourceKey {
				source, _ := a.Value.Any().(*slog.Source)
				if source != nil {
					source.Function = ""
					source.File = filepath.Base(source.File)
				}
			}
			return a
		},
	}))
}