/tinyletter

Pumpkin Spice Programs

This Week’s Program: Oct 16 – Oct 20

Hi fam. I wrote some code this week. It was in Racket.

Let’s make an event loop!

Today I want to focus on a group of related commits, and talk you through what I’ve done.

In the first commit, I blow away nearly all of overscan/main.rkt and start over again with my new gstreamer module in my quiver.

The first thing I want to figure out in this rewrite, before getting all fancy, is to handle the messages that a pipeline emits through its lifetime. Related to that is how to shutdown the pipeline cleanly when there’s an error or the stream has ended. I also want to be able to incrementally add or take away things that can monitor the event bus, like debugging and logging utilities.

I need an event loop!

make-bus-channel

This function has existed for some time in different parts of the Overscan codebase, and has been refined. It currently exists in gstreamer/bus.rkt.

(define (make-bus-channel bus [filters '(any)]
                          #:timeout [timeout clock-time-none])
  (let* ([bus-pipe (spawn-bus-place)]
         [bus-dead? (place-dead-evt bus-pipe)])
    (place-channel-put bus-pipe (list (gtype-instance-pointer (gobject-ptr bus))
                                      timeout
                                      filters))
    (choice-evt (wrap-evt bus-pipe
                          (lambda (ptr) (and ptr (gstruct-cast ptr gst-message))))
                (handle-evt bus-dead?
                            (lambda (ev) (wrap-evt ev (const (place-wait bus-pipe))))))))

bus-pipe here is the read end of a Racket place channel that receives the arguments necessary to poll the gst_bus_timed_pop_filtered function.

The place forwards messages back to this pipe and terminates when the last message is a fatal-message? — a message indicating that an error or the end of the stream has been reached.

The return value of make-bus-channel is a choice-evt: a synchronizable event whose value is either a message coming from the pipe (or NULL depending on the timeout set) or a bus-dead? event indicating that the place has closed down. That last little choice-evt is kind of tricky considering how much wrap-evt and handle-evt is sprinkled through there, but looking at the contract for make-bus-channel helps clarify:

[make-bus-channel
 (->* ((gobject/c gst-bus))
      (message-type/c
       #:timeout clock-time?)
      (evt/c (or/c message?
                   false/c
                   (evt/c exact-integer?))))]

This says that make-bus-channel returns an event whose synchronization result is either a message, #f (representing NULL), or another event with a synchronization result of the exit status of the bus-pipe.

To pull messages off the bus you continuously synchronize on it (e.g. with sync or its related functions) until you receive another evt in which case the bus has been closed.

spawn-bus-worker

Back in overscan/main.rkt I build the loop part of the event loop. The spawn-bus-worker takes a GStreamer pipeline, and calls make-bus-channel to get the emitter. Then, it spawns a new thread and starts a loop. Each pass it calls sync, which will block until something new is emitted from the bus.

(define (spawn-bus-worker broadcast)
  (let* ([bus (send broadcast get-bus)]
         [chan (make-bus-channel bus)])
    (thread (thunk
             (let loop ()
               (let ([ev (sync chan)])
                 (if (evt? ev)
                     (semaphore-post broadcast-complete-evt)
                     (begin
                       (for ([proc (in-hash-values broadcast-listeners)])
                         (proc ev broadcast))
                       (loop)))))))))

When what comes over the bus is an evt?, I know from that make-bus-channel contract that this means the bus has closed down; my pipeline is done. I post to a semaphore and exit the loop. The semaphore is a little synchronization mechanism that in this case indicates that the broadcast has completed; I call this one the broadcast-complete-evt.

If anything else comes over the bus, I loop through a set of listeners, and call each listener with whatever that other thing was and the pipeline that emitted the thing. I know from the above contract that this thing (called ev within this procedure) can only be a message? or #f. Then I call another iteration of the loop.

broadcast-listeners

The group of listeners that are called with each pass of the loop are stored in some global state called broadcast-listeners, which is just a mutable hash. The keys are integers and the values are procedures.

(define broadcast-listeners
  (make-hash (list (cons 0 default-broadcast-listener))))

default-broadcast-listener is the bare minimum thing that has to happen when a fatal-message? appears on the bus: set the pipeline state to 'null.

(define (default-broadcast-listener msg broadcast)
  (when (fatal-message? msg)
    (send broadcast set-state 'null)))

add-listener

Adding a new listener to this stack is as easy as mutating the broadcast-listener hash.

(define (add-listener proc)
  (let* ([stack broadcast-listeners]
         [key (hash-count stack)])
    (hash-set! stack key proc)
    key))

The key is just the number of things in the hash. I can trust on all the Racket contract stuff to make sure that the procedures always take the right arguments.

That’s my event loop. make-bus-channel sets up an event emitter for a pipeline, spawn-bus-handler begins a loop to continuously read from the emitter, and broadcast-listeners allows the end-user to manage listeners for events as they come through.

stop

Because Overscan is built for live streaming, the user has to explicitly end the broadcast. They do this by calling stop.

(define (stop #:timeout [timeout 5])
  (define broadcast (get-current-broadcast))
  (send broadcast send-event (make-eos-event))
  (if (sync/timeout timeout broadcast-complete-evt)
      (let-values ([(result current pending) (send broadcast get-state)])
        (set-box! current-broadcast #f)
        result)
      (error "Could not stop broadcast")))

stop pulls the current broadcast out of global state, sends it an EOS event, and then waits for the broadcast-complete-evt semaphore to unblock. GStreamer internals handle propagating that event through the pipeline. An EOS message is eventually emitted from the bus and it shuts down. Once that happens, the event loop posts to the semaphore, this procedure resumes, and the global state can be reset.

That all feels pretty clean, and gives me some structure to do a lot of different things with message handling.

So beyond working through this event stuff, this week I also started writing documentation for the new-and-improved gstreamer module.

Read the GStreamer documentation here. More to come over the next week!

Good concurrency and synchronization primitives are 😙👌.

👻 Mark