Pumpkin Spice Programs
This Week’s Program: Oct 16 – Oct 20
Hi fam. I wrote some code this week. It was in Racket.
Let’s make an event loop!
Today I want to focus on a group of related commits, and talk you through what I’ve done.
- 16f501d15efef42ce01973c21fce1d162ff1cc67
- 45531e1eaec8f690e89e43351941db4848e9ed6e
- c9b07cb6caecd9b3e583cec6dc48c6770839e647
- d4702d9512284eb17816431096b7631b56df913d
In the first commit, I blow away nearly all of overscan/main.rkt and start over again with my new gstreamer module in my quiver.
The first thing I want to figure out in this rewrite, before getting all fancy, is how to handle the messages that a pipeline emits through its lifetime. Related to that is how to shut down the pipeline cleanly when there’s an error or the stream has ended. I also want to be able to incrementally add or take away things that can monitor the event bus, like debugging and logging utilities.
I need an event loop!
make-bus-channel
This function has existed for some time in different parts of the Overscan codebase, and has been refined. It currently exists in gstreamer/bus.rkt.
(define (make-bus-channel bus [filters '(any)]
                          #:timeout [timeout clock-time-none])
  (let* ([bus-pipe (spawn-bus-place)]
         [bus-dead? (place-dead-evt bus-pipe)])
    (place-channel-put bus-pipe (list (gtype-instance-pointer (gobject-ptr bus))
                                      timeout
                                      filters))
    (choice-evt (wrap-evt bus-pipe
                          (lambda (ptr) (and ptr (gstruct-cast ptr gst-message))))
                (handle-evt bus-dead?
                            (lambda (ev) (wrap-evt ev (const (place-wait bus-pipe))))))))
bus-pipe here is my end of a Racket place channel. I send it the arguments necessary to poll the gst_bus_timed_pop_filtered function, and the place forwards messages back over the same pipe. The place terminates once it has forwarded a fatal-message?, that is, a message indicating that an error or the end of the stream has been reached.
The return value of make-bus-channel is a choice-evt: a synchronizable event whose value is either a message coming from the pipe (or NULL, depending on the timeout set) or a bus-dead? event indicating that the place has closed down. That last little choice-evt is kind of tricky considering how much wrap-evt and handle-evt are sprinkled through there, but looking at the contract for make-bus-channel helps clarify:
[make-bus-channel
 (->* ((gobject/c gst-bus))
      (message-type/c
       #:timeout clock-time?)
      (evt/c (or/c message?
                   false/c
                   (evt/c exact-integer?))))]
This says that make-bus-channel returns an event whose synchronization result is either a message, #f (representing NULL), or another event with a synchronization result of the exit status of the bus-pipe.
To pull messages off the bus you continuously synchronize on it (e.g. with sync or its related functions) until you receive another evt, in which case the bus has been closed.
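To see that consumption pattern in isolation, here is a minimal sketch that needs no GStreamer. It assumes a hypothetical make-fake-bus-channel in which an ordinary thread stands in for the bus place, and a hard-coded 0 stands in for the exit status that place-wait would report. The names make-fake-bus-channel and drain are illustrative only and are not part of Overscan.

```racket
#lang racket

;; Hypothetical stand-in for make-bus-channel: a plain thread plays the
;; part of the bus place and hands over each message through a channel.
(define (make-fake-bus-channel messages)
  (define pipe (make-channel))
  (define worker
    (thread (lambda ()
              (for ([msg (in-list messages)])
                (channel-put pipe msg)))))
  (choice-evt
   ;; Ready whenever the worker is offering a message.
   pipe
   ;; Ready once the worker has exited. The result is itself an event whose
   ;; result is a made-up exit status (0 stands in for place-wait's value).
   (handle-evt (thread-dead-evt worker)
               (lambda (dead) (wrap-evt dead (const 0))))))

;; Sync repeatedly until the result is another event, then sync on that
;; event to learn the exit status.
(define (drain bus-evt)
  (let loop ([seen '()])
    (define result (sync bus-evt))
    (if (evt? result)
        (values (reverse seen) (sync result))
        (loop (cons result seen)))))

(define-values (received exit-status)
  (drain (make-fake-bus-channel '(state-changed #f eos))))
```

The #f in the middle of the message list mimics a poll that timed out: it is passed along like any other result, and only a result that is itself an event ends the loop.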
spawn-bus-worker
Back in overscan/main.rkt I build the loop part of the event loop. spawn-bus-worker takes a GStreamer pipeline, and calls make-bus-channel to get the emitter. Then, it spawns a new thread and starts a loop. On each pass it calls sync, which will block until something new is emitted from the bus.
(define (spawn-bus-worker broadcast)
  (let* ([bus (send broadcast get-bus)]
         [chan (make-bus-channel bus)])
    (thread (thunk
             (let loop ()
               (let ([ev (sync chan)])
                 (if (evt? ev)
                     (semaphore-post broadcast-complete-evt)
                     (begin
                       (for ([proc (in-hash-values broadcast-listeners)])
                         (proc ev broadcast))
                       (loop)))))))))
When what comes over the bus is an evt?, I know from that make-bus-channel contract that this means the bus has closed down; my pipeline is done. I post to a semaphore and exit the loop. The semaphore is a little synchronization mechanism that in this case indicates that the broadcast has completed; I call this one the broadcast-complete-evt.
If anything else comes over the bus, I loop through a set of listeners, and call each listener with whatever that other thing was and the pipeline that emitted the thing. I know from the above contract that this thing (called ev within this procedure) can only be a message? or #f. Then I call another iteration of the loop.
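The worker loop can be tried out without a pipeline. The sketch below assumes a hypothetical make-fake-bus that emits a fixed list of messages and then an event, and it passes the listeners and the semaphore in as arguments rather than reading global state. The names make-fake-bus, spawn-worker and remember are made up for this example.

```racket
#lang racket

;; Hypothetical bus: yields each message in turn, then an event once the
;; feeder thread has exited (a thread-dead event's result is itself).
(define (make-fake-bus messages)
  (define pipe (make-channel))
  (define feeder
    (thread (thunk (for ([msg (in-list messages)])
                     (channel-put pipe msg)))))
  (choice-evt pipe (thread-dead-evt feeder)))

;; Same shape as the loop above, with the listeners and the semaphore
;; passed in as arguments instead of living in global state.
(define (spawn-worker bus-evt listeners complete)
  (thread (thunk
           (let loop ()
             (define ev (sync bus-evt))
             (cond
               [(evt? ev) (semaphore-post complete)]
               [else
                (for ([proc (in-list listeners)])
                  (proc ev))
                (loop)])))))

;; A listener that records everything it is called with.
(define heard (box '()))
(define (remember msg)
  (set-box! heard (cons msg (unbox heard))))

(define complete (make-semaphore 0))
(void (spawn-worker (make-fake-bus '(async-done #f eos)) (list remember) complete))

;; Block until the worker reports that the bus has closed.
(define finished? (and (sync/timeout 5 complete) #t))
```

Because the post to the semaphore happens only after the last listener call, anything that wakes up on the semaphore can assume every message has already been dispatched.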
broadcast-listeners
The group of listeners that are called with each pass of the loop are stored in some global state called broadcast-listeners, which is just a mutable hash. The keys are integers and the values are procedures.
(define broadcast-listeners
  (make-hash (list (cons 0 default-broadcast-listener))))
default-broadcast-listener is the bare minimum thing that has to happen when a fatal-message? appears on the bus: set the pipeline state to 'null.
(define (default-broadcast-listener msg broadcast)
  (when (fatal-message? msg)
    (send broadcast set-state 'null)))
add-listener
Adding a new listener to this stack is as easy as mutating the broadcast-listeners hash.
(define (add-listener proc)
  (let* ([stack broadcast-listeners]
         [key (hash-count stack)])
    (hash-set! stack key proc)
    key))
The key is just the number of things in the hash. I can rely on all the Racket contract stuff to make sure that the procedures always take the right arguments.
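Using the hash count as the key works as long as listeners are only ever added. If a remove-listener were introduced (none appears in the code above, so treat it as hypothetical), removing a listener would shrink the count and the next add could overwrite a listener that is still registered. A minimal sketch of one way around that, assuming a counter that only goes up; every name here is illustrative rather than Overscan's:

```racket
#lang racket

;; All names here are illustrative, not Overscan's.
(define listeners (make-hash))
(define next-key 0)

;; Keys come from a counter that never decreases, so they stay unique
;; even after removals.
(define (add-listener proc)
  (define key next-key)
  (set! next-key (add1 next-key))
  (hash-set! listeners key proc)
  key)

;; Hypothetical counterpart to add-listener.
(define (remove-listener key)
  (hash-remove! listeners key))

;; Call every registered listener with the message and the pipeline.
(define (dispatch msg broadcast)
  (for ([proc (in-hash-values listeners)])
    (proc msg broadcast)))

;; A logging-style listener; #f means the poll timed out with no message.
(define seen '())
(define (record-listener msg broadcast)
  (when msg
    (set! seen (cons msg seen))))

(define k0 (add-listener record-listener))
(define k1 (add-listener void))
(remove-listener k0)
(define k2 (add-listener record-listener)) ; 2, where hash-count would give 1
(dispatch 'eos 'fake-pipeline)
(dispatch #f 'fake-pipeline)
```

The record-listener also shows the one thing every listener has to cope with: the message argument can be #f when the bus poll times out.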
That’s my event loop. make-bus-channel sets up an event emitter for a pipeline, spawn-bus-worker begins a loop to continuously read from the emitter, and broadcast-listeners allows the end-user to manage listeners for events as they come through.
stop
Because Overscan is built for live streaming, the user has to explicitly end the broadcast. They do this by calling stop.
(define (stop #:timeout [timeout 5])
  (define broadcast (get-current-broadcast))
  (send broadcast send-event (make-eos-event))
  (if (sync/timeout timeout broadcast-complete-evt)
      (let-values ([(result current pending) (send broadcast get-state)])
        (set-box! current-broadcast #f)
        result)
      (error "Could not stop broadcast")))
stop pulls the current broadcast out of global state, sends it an EOS event, and then waits for the broadcast-complete-evt semaphore to unblock. GStreamer internals handle propagating that event through the pipeline. An EOS message is eventually emitted from the bus and the bus shuts down. Once that happens, the event loop posts to the semaphore, this procedure resumes, and the global state can be reset. If the semaphore isn’t posted within the timeout (5 seconds by default), stop raises an error instead.
That all feels pretty clean, and gives me some structure to do a lot of different things with message handling.
So beyond working through this event stuff, this week I also started writing documentation for the new-and-improved gstreamer module.
Read the GStreamer documentation here. More to come over the next week!
Good concurrency and synchronization primitives are 😙👌.
👻 Mark