This Week’s Program: Oct 16 – Oct 20
Hi fam. I wrote some code this week. It was in Racket.
Let’s make an event loop!
Today I want to focus on a group of related commits, and talk you through what I’ve done.
In the first commit, I blow away nearly all of
start over again with my new
gstreamer module in my quiver.
The first thing I want to figure out in this rewrite, before getting all fancy, is how to handle the messages that a pipeline emits through its lifetime. Related to that is how to shut down the pipeline cleanly when there’s an error or the stream has ended. I also want to be able to incrementally add or take away things that can monitor the event bus, like debugging and logging utilities.
I need an event loop!
This function has existed for some time in different parts of the
Overscan codebase, and has been refined. It currently exists
(define (make-bus-channel bus [filters '(any)]
                          #:timeout [timeout clock-time-none])
  (let* ([bus-pipe (spawn-bus-place)]
         [bus-dead? (place-dead-evt bus-pipe)])
    (place-channel-put bus-pipe
                       (list (gtype-instance-pointer (gobject-ptr bus))
                             timeout
                             filters))
    (choice-evt
     (wrap-evt bus-pipe
               (lambda (ptr)
                 (and ptr (gstruct-cast ptr gst-message))))
     (handle-evt bus-dead?
                 (lambda (ev)
                   (wrap-evt ev (const (place-wait bus-pipe))))))))
The place forwards messages back to this pipe and terminates when the last message is a fatal-message?, that is, a message indicating that an error has occurred or the end of the stream has been reached.
The return value of make-bus-channel is a synchronizable event whose value is either a message coming from the pipe (or NULL depending on the timeout set) or a bus-dead? event indicating that the place has closed down. That last little choice-evt is kind of tricky considering how much handle-evt is sprinkled through there, but looking at the contract for make-bus-channel helps:
[make-bus-channel
 (->* ((gobject/c gst-bus))
      (message-type/c #:timeout clock-time?)
      (evt/c (or/c message? false/c (evt/c exact-integer?))))]
This says that make-bus-channel returns an event whose synchronization result is either a message, #f (NULL), or another event with a synchronization result of the exit status of the place.
To pull messages off the bus you continuously synchronize on it (with sync or its related functions) until you receive another evt, in which case the bus has been closed.
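Here is a minimal sketch of that "sync until you get an event back" pattern, using only racket/base so it runs without GStreamer. Everything in it is an assumption for illustration: make-fake-bus-channel and drain are hypothetical names, a thread stands in for the place, symbols stand in for messages, and the exit status of 0 is made up.

```racket
#lang racket/base
(require racket/function)

;; Hypothetical stand-in for make-bus-channel: a worker thread plays the
;; role of the place and a plain channel plays the role of the bus pipe.
(define (make-fake-bus-channel messages)
  (define pipe (make-channel))
  (define worker
    (thread (lambda ()
              (for ([m (in-list messages)])
                (channel-put pipe m)))))
  (choice-evt
   ;; Normal case: the sync result is the next message.
   pipe
   ;; Once the worker has died, the sync result is itself an event,
   ;; which in turn yields a made-up exit status of 0.
   (wrap-evt (thread-dead-evt worker)
             (lambda (dead) (wrap-evt always-evt (const 0))))))

;; Keep syncing until the result is an event, then sync on that
;; event to get the exit status.
(define (drain bus-evt)
  (let loop ([seen '()])
    (define v (sync bus-evt))
    (if (evt? v)
        (values (reverse seen) (sync v))
        (loop (cons v seen)))))

(define-values (messages status)
  (drain (make-fake-bus-channel '(state-changed eos))))
```

Because channel-put blocks until someone receives, the worker can only die after the last message has been read, so the "dead" branch never jumps ahead of a pending message in this sketch.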
In overscan/main.rkt I build the loop part of the event loop. spawn-bus-worker takes a GStreamer pipeline, and calls make-bus-channel to get the emitter. Then, it spawns a new thread and starts a loop. Each pass it calls sync, which will block until something new is emitted from the bus.
(define (spawn-bus-worker broadcast)
  (let* ([bus (send broadcast get-bus)]
         [chan (make-bus-channel bus)])
    (thread
     (thunk
      (let loop ()
        (let ([ev (sync chan)])
          (if (evt? ev)
              (semaphore-post broadcast-complete-evt)
              (begin
                (for ([proc (in-hash-values broadcast-listeners)])
                  (proc ev broadcast))
                (loop)))))))))
When what comes over the bus is an evt?, I know from that make-bus-channel contract that this means the bus has closed down; my pipeline is done. I post to the broadcast-complete-evt semaphore and exit the loop. The semaphore is a little synchronization mechanism that in this case indicates that the broadcast has completed.
If anything else comes over the bus, I loop through a set of listeners, and call each listener with whatever that other thing was and the pipeline that emitted the thing. I know from the above contract that this thing (called ev within this procedure) can only be a message or #f. Then I call another iteration of the loop.
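To see the dispatch part on its own, here is a rough sketch of the same loop shape with the GStreamer pieces swapped out. It is not the Overscan code: listeners, complete, and spawn-worker are hypothetical names, a plain channel stands in for the bus, symbols stand in for messages, and the symbol 'closed stands in for the "bus has closed" event.

```racket
#lang racket/base

(define listeners (make-hash))        ; integer key -> (msg source) procedure
(define complete (make-semaphore 0))  ; posted once, when the loop exits

(define (spawn-worker chan source)
  (thread
   (lambda ()
     (let loop ()
       (define msg (sync chan))
       (cond
         ;; Assumption: 'closed plays the role of the evt? case.
         [(eq? msg 'closed) (semaphore-post complete)]
         [else
          ;; Hand the message to every registered listener, then go again.
          (for ([proc (in-hash-values listeners)])
            (proc msg source))
          (loop)])))))

;; Usage: record every message, then close the fake bus.
(define seen '())
(hash-set! listeners 0
           (lambda (msg source) (set! seen (cons msg seen))))

(define chan (make-channel))
(void (spawn-worker chan 'fake-pipeline))
(for ([m (in-list '(state-changed eos closed))])
  (channel-put chan m))
(semaphore-wait complete)   ; blocks until the worker has left its loop

(define received (reverse seen))
```

The semaphore matters here for the same reason it does in the real thing: without the semaphore-wait, the main thread has no way of knowing the worker has finished with the last message.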
The group of listeners that are called with each pass of the loop are stored in some global state called broadcast-listeners, which is just a mutable hash. The keys are integers and the values are procedures.
(define broadcast-listeners
  (make-hash (list (cons 0 default-broadcast-listener))))
default-broadcast-listener is the bare minimum thing that has to happen when a fatal-message? appears on the bus: set the pipeline state to 'null.
(define (default-broadcast-listener msg broadcast)
  (when (fatal-message? msg)
    (send broadcast set-state 'null)))
Adding a new listener to this stack is as easy as mutating the hash:
(define (add-listener proc)
  (let* ([stack broadcast-listeners]
         [key (hash-count stack)])
    (hash-set! stack key proc)
    key))
The key is just the number of things in the hash. I can rely on all the Racket contract stuff to make sure that the procedures always take the right arguments.
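One thing to keep in mind with count-based keys is what happens once listeners can also be taken away. The sketch below is an assumption on my part: remove-listener is a hypothetical counterpart that doesn't exist in the code above, and the listeners are do-nothing placeholders.

```racket
#lang racket/base

;; Placeholder for the real default listener.
(define (default-broadcast-listener msg broadcast) (void))

(define broadcast-listeners
  (make-hash (list (cons 0 default-broadcast-listener))))

(define (add-listener proc)
  (let ([key (hash-count broadcast-listeners)])
    (hash-set! broadcast-listeners key proc)
    key))

;; Hypothetical: not part of the code shown in this post.
(define (remove-listener key)
  (hash-remove! broadcast-listeners key))

(define a (add-listener (lambda (msg broadcast) (void))))  ; key 1
(define b (add-listener (lambda (msg broadcast) (void))))  ; key 2
(remove-listener a)                                        ; count drops to 2
(define c (add-listener (lambda (msg broadcast) (void))))  ; key 2 again
```

After the removal the count is back to 2, so c gets the same key as b and replaces it in the hash. If removal ever becomes a real feature, a counter that only goes up would be one way to keep keys unique.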
That’s my event loop.
make-bus-channel sets up an event emitter for the bus, spawn-bus-worker begins a loop to continuously read from the emitter, and broadcast-listeners allows the end-user to manage listeners for events as they come through.
Because Overscan is built for live streaming, the user has to explicitly end the broadcast. They do this by calling stop:
(define (stop #:timeout [timeout 5])
  (define broadcast (get-current-broadcast))
  (send broadcast send-event (make-eos-event))
  (if (sync/timeout timeout broadcast-complete-evt)
      (let-values ([(result current pending) (send broadcast get-state)])
        (set-box! current-broadcast #f)
        result)
      (error "Could not stop broadcast")))
stop pulls the current broadcast out of global state, sends it an EOS event, and then waits for the broadcast-complete-evt semaphore to unblock. GStreamer internals handle propagating that event through the pipeline. An EOS message is eventually emitted from the bus and it shuts down. Once that happens, the event loop posts to the semaphore, this procedure resumes, and the global state can be reset.
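The waiting half of stop boils down to sync/timeout on a semaphore, which returns #f when the timeout runs out and the semaphore itself when it gets posted in time. A small sketch of just that behavior, with hypothetical names (done, wait-for-completion) and a thread with a short sleep standing in for the event loop:

```racket
#lang racket/base

(define done (make-semaphore 0))

;; 'stopped if the semaphore is posted within `timeout` seconds,
;; 'timed-out otherwise.
(define (wait-for-completion timeout)
  (if (sync/timeout timeout done)
      'stopped
      'timed-out))

;; Nothing has posted yet, so this gives up after 0.1 seconds.
(define first-try (wait-for-completion 0.1))

;; Stand-in for the event loop: post shortly after being started.
(void (thread (lambda ()
                (sleep 0.05)
                (semaphore-post done))))

;; The post arrives well inside the 2 second window.
(define second-try (wait-for-completion 2))
```

In stop the timed-out branch raises an error instead of returning a symbol, but the shape is the same.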
That all feels pretty clean, and gives me some structure to do a lot of different things with message handling.
So beyond working through this event stuff, this week I also started writing documentation for the new-and-improved gstreamer module.
Read the GStreamer documentation here. More to come over the next week!
Good concurrency and synchronization primitives are 😙👌.