Back so soon? Today’s post is about a pretty neat Temporal feature. Imagine you have some business logic you want to run on a CRON-esque schedule. You might be tempted to just set up a script that gets called by a CRON job. Not so fast, though: what if it’s some Very Important Task? You need retries, you need resiliency, you need visibility, you need…a bunch of stuff. And you probably don’t have just one of these Very Important Tasks; you might have dozens or even hundreds that need to run at any given time. Fortunately, Temporal provides this sort of thing out of the box with Schedules. The Go SDK overview of the Schedule API is here.

A Schedule is basically just a workflow that periodically calls some specified workflow, and the API was designed specifically for a developer-friendly scheduling experience, so it’s quite powerful. The docs are great, so spend a few minutes reading them over to get a handle on the API, and then we’ll jump into an example.

Ready? Ok, here’s the use case: I have some arbitrary HTTP requests I want to execute on a quasi-periodic basis and have their results uploaded to my server. This way, my server isn’t responsible for the workload, I can horizontally scale my worker pool as needed, and I have all the resiliency, retries, and visibility that Temporal affords.

You can serialize an http.Request to a []byte with the Write method, and you can parse a request from a []byte with http.ReadRequest. These are a bit unconventional (and somewhat discouraged by the docs), but we have to serialize the requests so Temporal can pass them off to workers, so it is what it is. Here’s how you’d serialize a request:


func main() {
    // construct request
    endpoint := "https://api.kaggo.brojonat.com/ping"
    r, err := http.NewRequest(http.MethodGet, endpoint, nil)
    if err != nil {
        log.Fatal(err)
    }
    // only touch the request once we know it was constructed successfully
    r.Header.Add("foo", "bar")

    // serialize the request into its HTTP/1.1 wire format
    buf := &bytes.Buffer{}
    if err = r.Write(buf); err != nil {
        log.Fatal(err)
    }
    b := buf.Bytes()

    // construct an ID from the bytes
    h := md5.New()
    if _, err = h.Write(b); err != nil {
        log.Fatal(err)
    }

    // sid, wid, and b get used when we create the schedule
    sid := "test-schedule-id"
    wid := fmt.Sprintf("request-%x", h.Sum(nil))
}

Note that when you deserialize a request in this way (in this example it would likely be inside an Activity), ReadRequest populates RequestURI and leaves URL holding only the path. Go’s HTTP client refuses to send a request that has RequestURI set, so sending it as-is is an error! See this StackOverflow for more info. You’ll need to manually reconstruct the URL yourself like so:

    u, err := url.Parse("https://" + r.Host + r.RequestURI)
    if err != nil {
        return nil, fmt.Errorf("error parsing request URL %q: %w", r.RequestURI, err)
    }
    r.URL = u
    r.RequestURI = ""

So far, so good. Next, we’re going to create the Schedule:


// Create the schedule.
_, err = tc.ScheduleClient().Create(
    context.Background(),
    client.ScheduleOptions{
        ID: sid,
        Spec: client.ScheduleSpec{
            Calendars: []client.ScheduleCalendarSpec{
                {
                    Second: []client.ScheduleRange{
                        {Start: 1},
                    },
                    Comment: "run at 00:00:01 every day (Minute and Hour are omitted, so they match 0)",
                },
            },
        },
        Action: &client.ScheduleWorkflowAction{
            ID:        wid,
            Workflow:  mypackage.DoArbitraryRequestWF,
            TaskQueue: "test-task-queue",
            Args:      []interface{}{"some-string-arg", b},
        },
    })

Let’s break this down real quick, starting from the bottom. Action is simply the underlying workflow that you want to call periodically. You specify the same things you’d normally specify via client.ExecuteWorkflow, just in a slightly different format. Side note: I learned something here about []interface{} that has its own entry on the wiki. Here we pass in our workflow that makes some arbitrary request for us, and pass in the arguments (this contrived workflow takes a string and a []byte representing the request).

The Spec field is slightly more complicated. The package documentation for ScheduleSpec recommends using Calendars, but there are a couple of other options that might be useful if you’re porting over old code. For Calendars, you just specify a slice of ScheduleCalendarSpec, which is a construct that lets you specify CRON-like matching criteria; whenever a time in the real world matches, it will execute the workflow specified in Action. For reference, here’s the docstring for ScheduleCalendarSpec:

ScheduleCalendarSpec is an event specification relative to the calendar,
similar to a traditional cron specification. A timestamp matches if at
least one range of each field matches the corresponding fields of the
timestamp, except for year: if year is missing, that means all years
match. For all fields besides year, at least one Range must be present to
match anything.

In other words, you can omit the time-of-day fields (Second, Minute, Hour) and each will match the 0 value for that field; the Go SDK defaults the day and month fields to match everything. Note that omitting is different from setting to an empty slice; if you pass an empty slice, nothing will match! The exception is Year, which matches every year when an empty value is passed.
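The omitted-versus-empty distinction is really a Go nil-slice gotcha, so here is a tiny sketch of it. This is not the SDK’s code: Range and withDefault are hypothetical stand-ins, and I’m assuming the default is applied only when the field is nil, which is what the behavior described above suggests.

```go
package main

import "fmt"

// Range is a hypothetical stand-in for the SDK's ScheduleRange.
type Range struct{ Start, End, Step int }

// withDefault (hypothetical) returns def only when the field was
// omitted, i.e. is nil. An empty but non-nil slice is kept as-is.
func withDefault(field, def []Range) []Range {
	if field == nil {
		return def
	}
	return field
}

func main() {
	zero := []Range{{Start: 0}}

	var omitted []Range // nil: the field was never set
	empty := []Range{}  // non-nil, zero length

	fmt.Println(len(withDefault(omitted, zero))) // 1: falls back to "match 0"
	fmt.Println(len(withDefault(empty, zero)))   // 0: no ranges, so nothing can match
}
```

If you build specs from user input (say, JSON), it’s worth checking which of the two you end up with before handing the spec to Temporal.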

The individual ScheduleRange instances have Start, End, and Step fields. You can specify only Start and it will automatically set End such that the schedule will execute on that value alone. This is useful if you want to run on, say, :05 seconds after the minute rather than on the minute. Note that the structs don’t have json tags; their fields are serialized with capital letters.
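If it helps to picture what a range covers, here’s a small standalone model. Range and expand are hypothetical (they’re not part of the SDK); the sketch assumes End is inclusive and falls back to Start when it’s unset or smaller, and that an unset Step means 1.

```go
package main

import "fmt"

// Range mirrors the shape of the SDK's ScheduleRange (hypothetical local copy).
type Range struct {
	Start, End, Step int
}

// expand lists the values a Range covers. Assumptions: End is inclusive
// and is treated as Start when it is smaller than Start; a Step below 1
// is treated as 1.
func expand(r Range) []int {
	end, step := r.End, r.Step
	if end < r.Start {
		end = r.Start
	}
	if step < 1 {
		step = 1
	}
	vals := []int{}
	for v := r.Start; v <= end; v += step {
		vals = append(vals, v)
	}
	return vals
}

func main() {
	fmt.Println(expand(Range{Start: 5}))                    // [5]
	fmt.Println(expand(Range{Start: 0, End: 45, Step: 15})) // [0 15 30 45]
	fmt.Println(len(expand(Range{Start: 0, End: 23})))      // 24
}
```

So a Minute range of {Start: 0, End: 45, Step: 15} would be the calendar equivalent of "every 15 minutes".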

There’s also a Skip field you can set that also consists of a slice of ScheduleCalendarSpec; any time that matches a Skip spec is skipped, even if it would otherwise be scheduled. Pretty simple, but very powerful; you can get real creative with these matching criteria. Also of note is a Jitter value you can set to jitter the schedule times. Imagine you have 100 schedules that run every hour and fetch some data from an external API. You don’t want all 100 calls to hit the API within the first few seconds of every hour! You can apply a jitter of, say, 10 minutes to spread out the workflow execution times.

You also have control over the time bounds of when a Schedule may run. Of course, you can pause a Schedule, but you can also specify time bounds, outside of which the schedule will not run (e.g., maybe you only want a schedule to run for the next 2 weeks, after which it shouldn’t run anymore).

Cool, now you can schedule pretty much any regular interval you can think up. You can list, create, delete, and pause schedules pretty trivially with the scheduling client. Here’s what your handlers might look like (with some tweaks to the example from above):

// routes.go
func setupRoutes(l *slog.Logger, tc client.Client) http.Handler {
    mux := http.NewServeMux()
    mux.Handle("GET /schedule", handleGetSchedule(l, tc))
    mux.Handle("POST /schedule", handleCreateSchedule(l, tc))
    mux.Handle("PUT /schedule", handleUpdateSchedule(l, tc))
    mux.Handle("DELETE /schedule", handleCancelSchedule(l, tc))
    mux.Handle("POST /schedule/trigger", handleTriggerSchedule(l, tc))
    // ...other stuff...
    return mux
}

// handlers.go
func handleGetSchedule(l *slog.Logger, tc client.Client) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        ss, err := tc.ScheduleClient().List(r.Context(), client.ScheduleListOptions{})
        if err != nil {
            writeBadRequestError(w, err)
            return
        }
        res := []*client.ScheduleListEntry{}
        for ss.HasNext() {
            s, err := ss.Next()
            if err != nil {
                // surface iteration errors rather than silently returning a partial list
                writeInternalError(l, w, err)
                return
            }
            res = append(res, s)
        }
        w.WriteHeader(http.StatusOK)
        json.NewEncoder(w).Encode(res)
    }
}

func handleCreateSchedule(l *slog.Logger, tc client.Client) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        endpoint := "https://api.kaggo.brojonat.com/ping"
        rwf, err := http.NewRequest(http.MethodGet, endpoint, nil)
        if err != nil {
            writeInternalError(l, w, err)
            return
        }
        // only touch the request once we know it was constructed successfully
        rwf.Header.Add("foo", "bar")

        buf := &bytes.Buffer{}
        err = rwf.Write(buf)
        if err != nil {
            writeInternalError(l, w, err)
            return
        }
        b := buf.Bytes()

        h := md5.New()
        _, err = h.Write(b)
        if err != nil {
            writeInternalError(l, w, err)
            return
        }

        sid := "test-schedule-id"
        wid := fmt.Sprintf("%x", h.Sum(nil))

        // Create the schedule.
        _, err = tc.ScheduleClient().Create(
            r.Context(),
            client.ScheduleOptions{
                ID: sid,
                Spec: client.ScheduleSpec{
                    Calendars: []client.ScheduleCalendarSpec{
                        {
                            Minute:  []client.ScheduleRange{{Start: 5}},
                            Hour:    []client.ScheduleRange{{Start: 0, End: 23}},
                            Comment: "run the workflow every hour, 5 min past the hour",
                        },
                    },
                },
                Action: &client.ScheduleWorkflowAction{
                    ID:        wid,
                    Workflow:  mypackage.DoArbitraryRequestWF,
                    TaskQueue: "test-task-queue",
                    Args:      []interface{}{mypackage.Request{Serial: b}},
                },
            })
        if err != nil {
            writeInternalError(l, w, err)
            return
        }
        w.WriteHeader(http.StatusOK)
        json.NewEncoder(w).Encode(DefaultJSONResponse{Message: "ok"})
    }
}

func handleUpdateSchedule(l *slog.Logger, tc client.Client) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        action := r.URL.Query().Get("action")
        if action != "pause" {
            writeBadRequestError(w, fmt.Errorf("unsupported action: %s", action))
            return
        }
        sid := r.URL.Query().Get("schedule_id")
        note := r.URL.Query().Get("note")
        err := tc.ScheduleClient().GetHandle(r.Context(), sid).Pause(
            r.Context(), client.SchedulePauseOptions{Note: note})
        if err != nil {
            writeBadRequestError(w, err)
            return
        }
        writeOK(w)
    }
}

func handleCancelSchedule(l *slog.Logger, tc client.Client) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        sid := r.URL.Query().Get("schedule_id")
        err := tc.ScheduleClient().GetHandle(r.Context(), sid).Delete(r.Context())
        if err != nil {
            writeBadRequestError(w, err)
            return
        }
        writeOK(w)
    }
}

func handleTriggerSchedule(l *slog.Logger, tc client.Client) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        sid := r.URL.Query().Get("schedule_id")
        err := tc.ScheduleClient().GetHandle(r.Context(), sid).Trigger(
            r.Context(),
            client.ScheduleTriggerOptions{Overlap: enums.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL})
        if err != nil {
            writeBadRequestError(w, err)
            return
        }
        writeOK(w)
    }
}

These should all be pretty straightforward. There could be some improvement in the error handling to distinguish between service errors and bad requests, but you get the idea. The only potential “gotcha” I noticed when fiddling with these handlers was that you’ll probably want to specify SCHEDULE_OVERLAP_POLICY_ALLOW_ALL when you go to trigger a schedule, since otherwise it may no-op (but it ultimately depends on what the schedule’s existing overlap policy is). Cool, that should be enough to get you started. I’m going to go back to building; see you on the next one!