Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a new BinaryStreamSink that allows reading a stream of encoded bytes #6242

Merged
merged 12 commits into from
May 7, 2024

Conversation

jleibs
Copy link
Member

@jleibs jleibs commented May 6, 2024

What

This effectively combines the guts of FileSink and MemorySink. Like the FileSink, this runs file-encoding on its own thread so that all we need to do when we're ready to read is move out the bytes.

The downside relative to memory_sink is the stream is a singular RRD. You have no way of accessing information about individual messages, draining from the backlog, etc.

This is designed to be as easy as possible to wire into a gradio output stream.

Example usage in a Gradio Component:

@rr.thread_local_stream("rerun_example_live")
def start_stream_direct(state):

    state['streaming'] = True

    cap = cv2.VideoCapture(0)
    frame_nr = 0
    stream = rr.binary_stream()

    while state.get('streaming', False):
        ret, img = cap.read()

        frame_time_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
        if frame_time_ms != 0:
            rr.set_time_nanos("frame_time", int(frame_time_ms * 1_000_000))

        rr.set_time_sequence("frame_nr", frame_nr)
        frame_nr += 1

        print("Processing frame", frame_nr)

        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        # Log the original image
        rr.log("image/rgb", rr.Image(img).compress())

        # Convert to grayscale
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        rr.log("image/gray", rr.Image(gray).compress())

        # Run the canny edge detector
        canny = cv2.Canny(gray, 50, 200)
        rr.log("image/canny", rr.Image(canny).compress())

        yield stream.read()

Checklist

  • I have read and agree to Contributor Guide and the Code of Conduct
  • I've included a screenshot or gif (if applicable)
  • I have tested the web demo (if applicable):
  • The PR title and labels are set such as to maximize their usefulness for the next release's CHANGELOG
  • If applicable, add a new check to the release checklist!

To run all checks from main, comment on the PR with @rerun-bot full-check.

@jleibs jleibs marked this pull request as ready for review May 7, 2024 00:32
@jleibs jleibs changed the title Introduce a new BinaryStream / BinarySink Introduce a new BinaryStream / BinaryStreamSink May 7, 2024
@jleibs jleibs changed the title Introduce a new BinaryStream / BinaryStreamSink Introduce a new BinaryStreamSink that allows reading a stream of encoded bytes May 7, 2024
@Wumpf Wumpf self-requested a review May 7, 2024 07:25
Copy link
Member

@Wumpf Wumpf left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neat utility, seems pretty straight forward

wondering if we should expose this new type of stream to C++ as well, but I reckon we don't have a usecase for it it, so better not to

Requesting changes because I'd love to see the test being automated - if this turns out to be too much work all good from my side

Comment on lines +28 to +33
impl Command {
fn flush() -> (Self, Receiver<()>) {
let (tx, rx) = std::sync::mpsc::sync_channel(0); // oneshot
(Self::Flush(tx), rx)
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I found 3 implementations of this, two of them using crossbeam one using mpsc like you here. I like that you bring balance to the world ;)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good enough for FileSink, good enough for me 🤷

let storage = BinaryStreamStorage::new(rec);

// We always compress when writing to a stream
// TODO(jleibs): Make this configurable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any harm in just exposing that as a flag on creation right away?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly that (1) it also applies to every other sink type and (2) I can't come up with a good reason that a user would ever set it to False.

I'd rather introduce them all together in a consistent way if and when we have a compelling use-case.

tests/python/binary_stream/main.py Outdated Show resolved Hide resolved
@jleibs jleibs requested a review from Wumpf May 7, 2024 14:51
Copy link
Member

@Wumpf Wumpf left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice work! super cool to have this running on ci

@jleibs jleibs merged commit 61193fd into main May 7, 2024
35 checks passed
@jleibs jleibs deleted the jleibs/binary_stream branch May 7, 2024 18:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

New recording sink for an in-memory binary stream
2 participants