/tinyletter

The Programs of a Week of GStreamer

This Week’s Program: May 8 - May 12

Sixteen commits on Overscan this week, all of them of peak quality. 😙👌

Last week, I hit a major milestone with my ability to interact with GStreamer. I think GStreamer is a fine library. This week I’ve attempted to continue to work my way through the tutorials and to try to understand how GStreamer works. My GObject Introspection module seems steady enough, though kind of scattered and messy, and I’ve made some small changes to it as I needed more support from the API, or thought of ways to make the API a better citizen of the Racket ecosystem.

This week, I also gave a variation of a talk I’m calling Elm: Theory & Practice: an introduction to the Elm language based on my experiences building Hive City. Here’s a video of the talk, back when I first delivered it about a month ago.

c8ed73c24d22eedc45727d9aa598cbe39d011691

When a Signal is emitted, instead of applying the callback in the main thread, I now pass it off to a worker thread. I’m dipping my toe into the waters of concurrency in Racket. Threads in Racket have “mailboxes”, which are multi-producer single-consumer FIFO queues. Sending something into the mailbox (with thread-send) is asynchronous, while receiving something (thread-receive) blocks until something is available. This means that you can have threads behave close enough to CSP style concurrency (like one finds in core.async, which I used a bunch in sonic-sketches). In addition to threading primitives, there’s all sorts of synchronization mechanisms available in Racket, which I use more of a bit later.

I create a worker thread for each kind of signal and all callbacks emitted for that signal get run on this worker thread, with what I learned about #:async-apply in FFI last week.

Racket threads are green threads. I was still running into some deadlock issues in certain circumstances.

998ef26c1dd60d155efd3c8a7e0b45212df6b7f9

I remove the box that I used to keep the callback handle around. It turns out I didn’t need it. By removing this code, I fixed some crashes when running this program in DrRacket. DrRacket has much more aggressive garbage collection and my box was being collected too soon.

41617c6fa6ce09bd45935481ea5a0ec98d328c43

GObjects and Structs that have been introspected by GIR behave really similarly. So much so that I would like to pretend that they are basically the same. This code makes it so that their common ancestor (gi-registered-type) can be used to lookup methods and fields, and that the forms I use to provide an object-oriented API (e.g. send, get-field, etc.) can be used for both GObjects and C Structs.

6a47ecc540ba39b0869903e9a00ead9b509e42b8

This code makes it so that when invoking an introspected function, if there are any by-reference arguments, those will be returned along with the function return type. I built this specifically for functions like gst_element_query_position(). This function returns a boolean, but also takes a by-reference argument of an integer, and that integer reference actually provides the information you want to know!

Racket has multiple return values and they end up making a lot of the program weird. In this case, it means sprinkling in lots of call-with-values and let-values functions, but this feels “right”.

971a0731b41927ae00a9aa8587ab8e468e6cd0eb

I’m working my way through Basic Tutorial 4: Time management and keep bumping into the previously discussed deadlock problem. The solution for true parallelism in Racket is Places. Places are cool because you have similar channel messaging like you do with threads, but you’re restricted on the types of values that can be passed back and forth.

In my new gstreamer/bus module, I provide a new function make-bus-channel. This function wraps a common GST Bus operation, gst_bus_timed_pop_filtered(), used in most of the Tutorials. This C function is executed in a new place, and whenever a new message comes off the bus, it gets pushed into a channel. That channel is then wrapped, with wrap-evt so that the data coming out of the place is always in the expected form and returned to the caller. Now I can poll this channel and read out the messages from the bus without blocking or deadlocking or anything like that.

(thread (lambda () (let loop ()
                  (define msg (sync pipe))
                  (println (get-field type msg))
                  (unless (memf (lambda (x) (memq x '(eos error)))
                                (get-field type msg))
                    (loop)))))

This code starts a new thread that begins a loop. Each pass of the loop reads data out of the bus. That sync call doesn’t know anything about the implementation of my bus place, it just blocks until something is available. Whenever a message comes through, it prints the type of message it is. If the message is a type of EOS (“end-of-stream”) or Error, it stops the loop and returns.

Two warnings I saw periodically in the logs when working through this code:

!!! BUG: The current event queue and the main event queue are not the same. Events will not be handled correctly. This is probably because _TSGetMainThread was called for the first time off the main thread.

CoreAnimation: warning, deleted thread with uncommitted CATransaction; set CA_DEBUG_TRANSACTIONS=1 in environment to log backtraces, or set CA_ASSERT_MAIN_THREAD_TRANSACTIONS=1 to abort when an implicit transaction isn’t created on a main thread.

I don’t know what either of these mean. They don’t seem to be related to my usage of Places. Googling them doesn’t return anything fruitful. I’m guessing they have to do with the playbin Element. I need to do some more experimentation here.

eb7fcf8edac72c9758f2f4f89e0ae0d73901fcbb

I rig up gi-base=? for testing equality between two introspected info types. This provides the basis for is-a?, which can say if a GObject instance is an instance of an introspected type. With that comes is-a?/c, which is a contract that I can use to enforce a function boundary. With this in place, any caller that attempts to call make-bus-channel on something that isn’t a Bus will have broken the contract and produced an exception.

main.rkt> (make-bus-channel "this is wrong")
; make-bus-channel: contract violation
;   expected: (is-a? GstBus)
;   given: "this is wrong"
;   in: the 1st argument of
;       (->*
;        ((is-a? GstBus))
;        ((listof symbol?)
;         #:timeout
;         exact-nonnegative-integer?)
;        evt?)
;   contract from:
;       <pkgs>/overscan/gstreamer/bus.rkt
;   blaming: <pkgs>/overscan/gstreamer/main.rkt
;    (assuming the contract is correct)
;   at: <pkgs>/overscan/gstreamer/bus.rkt:8.24

e8cda07d61274c797c475e49bb12ce24a34ceda0

I update the Overscan README. This project has changed quite a bit since its inception. Now, I lay out what exactly I’m doing here:

Now, this project’s ambition is to provide a comprehensive live-coding environment for video compositing and broadcasting.

Yup. Doing that.

Next week, I hope to actually create (and then share with you of course) some examples of videos put together with Racket+GStreamer. This will probably involve me rebuilding a bunch of GStreamer plugins, many of which require additional C lib dependencies.

Have a great Mother’s Day weekend!

👨‍🍳 Mark