Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Streaming Examples to the Panel Gallery. #1744

Open
MarcSkovMadsen opened this issue Nov 4, 2020 · 8 comments
Open

Add Streaming Examples to the Panel Gallery. #1744

MarcSkovMadsen opened this issue Nov 4, 2020 · 8 comments
Labels
type: docs Related to the Panel documentation and examples

Comments

@MarcSkovMadsen
Copy link
Collaborator

MarcSkovMadsen commented Nov 4, 2020

Is your feature request related to a problem? Please describe.

I see more questions popping up on Discourse asking for help on streaming data. For example https://discourse.holoviz.org/t/how-to-link-indicators-to-objects-that-i-cant-modify/1387.

I cannot find any good and conscience examples of this on the Panel Site.

Describe the solution you'd like

Add some examples of using streaming data in Panel. Something that clearly illustrates how streams can be started or pushed into in Panel and update plots and tables in high speed. Something that looks like a real world example. Not something simplified that you cannot see how to relate to something real. Get a stream from Github or something else.

Additional context

  • I have a feeling that this can be done in many ways. I would like to see some reference/ best practice examples.
  • Maybe there is a link to streamz in HoloViz. But since I and maybe a lot others don't come with extensive experience from there, it is not obvious we should look there if that is the case.
  • I would like to master this my self also :-)
@MarcSkovMadsen MarcSkovMadsen added the TRIAGE Default label for untriaged issues label Nov 4, 2020
@jbednar
Copy link
Member

jbednar commented Nov 4, 2020

Here's a very simple example of making a streaming app with hvPlot+Panel+streamz:

https://anaconda.org/jbednar/project/streaming_hvplot

@xavArtley
Copy link
Collaborator

I can propose something like that based on zmq publisher/subscriber:
ezgif com-gif-maker

@MarcSkovMadsen
Copy link
Collaborator Author

That is exactly what I was looking for. Can you share the code @xavArtley .

If I get a few good examples I would polish them and do a PR on Panel and jointly share live at awesome-panel.org.

@xavArtley
Copy link
Collaborator

xavArtley commented Nov 5, 2020 via email

@xavArtley
Copy link
Collaborator

xavArtley commented Nov 5, 2020

This is the code.

import re
import time
import datetime
import threading

import zmq
import param
import numpy as np
import panel as pn
import holoviews as  hv

from bokeh.models import DataRange1d, DatetimeTickFormatter

pn.extension()

class Publisher(param.Parameterized):
    
    start_stop = param.Boolean()
    logger = param.String(precedence=-1)
    port = param.Integer(default=5556, precedence=-1)
    verbose = param.Boolean(default=True)
    log_buffer_size = param.Integer(default=100, precedence=-1)
    pub_delay = param.Number(default=5e-2, bounds=(1e-2,1), step=1e-2)
    
    def __init__(self, **params):
        super().__init__(**params)
        self._run = False
        self._output_display = pn.pane.Markdown(sizing_mode="stretch_both", css_classes=["scrollable"])
        self._log_buffer = []
        self._log_cb = pn.state.add_periodic_callback(self._update_logger, 200, start=False)
    
    @property
    def layout(self):
        controls = pn.Param(self, widgets={
            "start_stop": {"type": pn.widgets.Toggle, "name": "Start/Stop"},
            "pub_delay": {"type": pn.widgets.FloatInput}
        }, default_layout=pn.WidgetBox)
        return pn.Row(controls, self._output_display, sizing_mode="stretch_both")
    
    def _update_logger(self):
        if self.verbose:
            self._output_display.object = '<br>'.join(self._log_buffer[::-1])
    
    @param.depends("logger", watch=True)
    def _update_display(self):
        if len(self._log_buffer) > self.log_buffer_size:
            self._log_buffer.pop(0)
        self._log_buffer.append(self.logger)
    
    def _timedelta_to_str(self, dt):
        h = (dt.astype('timedelta64[h]') - dt.astype('timedelta64[D]')).astype(int)
        m = (dt.astype('timedelta64[m]') - dt.astype('timedelta64[h]')).astype(int)
        s = (dt.astype('timedelta64[s]') - dt.astype('timedelta64[m]')).astype(int)
        ms = (dt.astype('timedelta64[ms]') - dt.astype('timedelta64[s]')).astype(int)
        us = (dt.astype('timedelta64[us]') - dt.astype('timedelta64[ms]')).astype(int)
        return "{:02d}:{:02d}:{:02d}-{:03d}ms-{:03d}us".format(h,m,s,ms,us)
    
    def start_publisher(self):
        context = zmq.Context()
        socket = context.socket(zmq.PUB)
        adress = "tcp://*:%s" % self.port
        if self.verbose:
            self.logger = "Pub Socket binded at:%s" % adress
        socket.bind(adress)
        
        start_time = time.perf_counter()
        while self._run:
            f = 1 # frequency in hertz
            dt = np.timedelta64(datetime.timedelta(0, time.perf_counter()-start_time))
            messagedata = np.sin(dt.astype(float)*1e-6 *(2*np.pi*f)) #dt in us => 1e-6
            msg = "%s %f" % (self._timedelta_to_str(dt), messagedata)
            if self.verbose:
                self.logger = "Msg sended: %s" % msg
            socket.send_string(msg)
            time.sleep(self.pub_delay)
    
    @param.depends('start_stop', watch=True)
    def _start_stop(self):
        if self.start_stop:
            self.param.port.constant = True
            self._th = threading.Thread(target=self.start_publisher, daemon=True)
            self._run = True
            self._th.start()
            self._log_cb.start()
        else:
            self.param.port.constant = False
            self._run = False
            self._th.join()
            self._log_cb.stop()

class Streamer(param.Parameterized):
    
    port = param.Integer(default=5556)
    start_stop = param.Boolean(default=False) #, name='Start/Stop Streaming')
    buffer_len = param.Integer(default=500, bounds=(100, 1000), step=100) # name="Plot stream length (nb points)")
    update_rate = param.Number(default=500., bounds=(0,1000), step=1) # name="Data acquisition rate (nb points/s)"
    flush_buffer = param.Event() # name="flush buffer"
    
    
    def __init__(self, **params):
        super().__init__(**params)
        self._refresh_rate = 50 # hz
        self._run = False
        self._streamed_data = []
        self._buffer = hv.streams.Buffer(
            {"time": np.asarray([]), "y": np.asarray([])},
            index=False,
            length=self.buffer_len
        )
        self._dmap = hv.DynamicMap(lambda data: hv.Curve(data, kdims=['time'], vdims=['y']), streams=[self._buffer])
        self._dmap.opts(responsive=True, hooks=[self._hook])
        self._buffer_update_cb = pn.state.add_periodic_callback(self._populate_buffer, period=50, start=False)
    
    @property
    def layout(self):
        controls = pn.Param(self, widgets={
            "start_stop": {"type": pn.widgets.Toggle, "name": "Start/Stop Streaming"},
            "buffer_len": {"type": pn.widgets.IntInput, "name": "Plot stream length (nb points)"},
            "update_rate": {"type": pn.widgets.FloatInput, "name": "Data acquisition rate (nb points/s)"},
        }, default_layout=pn.WidgetBox)
        plot = pn.panel(self._dmap)
        return pn.Row(controls, plot, sizing_mode="stretch_both")
    
    @staticmethod
    def _str_to_timedelta(dt_str):
        date_pattern = r'(\d{2}):(\d{2}):(\d{2})-(\d{3})ms-(\d{3})us'
        date_elements = ("hours", "minutes", "seconds","milliseconds", "microseconds")
        return datetime.timedelta(
            **{k:int(val) for k, val in zip(date_elements, re.findall(date_pattern, dt_str)[0])}
        )
        
    @staticmethod
    def _hook(plot, element):
        """
         bokeh hook to avoid auto rescaling plot when zooming on stream data
        """
        plot.handles['plot'].x_range = DataRange1d()
        plot.handles['plot'].y_range = DataRange1d()
        plot.handles['plot'].xaxis.formatter=DatetimeTickFormatter(seconds = ['%Ss'])

    def _get_data(self):
        """
        function to evaluation in a thread to populate temp list with data fetch from socket
        """
        context = zmq.Context()
        socket = context.socket(zmq.SUB)
        socket.setsockopt_string(zmq.SUBSCRIBE, "") # subscribe to all topics
        socket.setsockopt(zmq.CONFLATE, 1) # get only last frame in the socket buffer
        socket.connect ("tcp://localhost:%s" % self.port)
        
        while self._run:
            str_recv = socket.recv_string()
            dt_str, data_str = str_recv.split(' ')
            data = {"time": np.asarray(self._str_to_timedelta(dt_str)), "y":np.asarray(float(data_str))}
            self._streamed_data.append(data)
            if self.update_rate>0:
                time.sleep(1/self.update_rate)


    def _populate_buffer(self):
        """
        periodic callback sent by panel to push data in _streamed_data inside the stream buffer
        avoid pending lock
        """
        if len(self._streamed_data) > 0:
            data = self._streamed_data
            time_array_to_send = np.zeros(len(data), dtype=data[0]["time"].dtype)
            data_array_to_send = np.zeros(len(data), dtype=data[0]["y"].dtype)
            for idx, d in enumerate(data):
                time_array_to_send[idx] = d["time"].item()
                data_array_to_send[idx] = d["y"].item()
            data_to_send = {"time": time_array_to_send, "y": data_array_to_send}
            self._buffer.send(data_to_send)
            self._streamed_data.clear()
        
    @param.depends("flush_buffer", watch=True)
    def _flush_buffer(self):
        """
        function to clean buffers
        """
        self._buffer.clear()
        self._streamed_data.clear()
        
    @param.depends("start_stop", watch=True)
    def _start_stop(self):
        if self.start_stop:
            self._flush_buffer()
            self._run = True
            self._th = threading.Thread(target=self._get_data, daemon=True)
            self._th.start()
            self._buffer_update_cb.start()
        else:
            self._run = False
            self._th.join()
            self._buffer_update_cb.stop()
    
    @param.depends("buffer_len", watch=True)
    def _update_buffer_length(self):
        self._buffer.length = self.buffer_len
        

pub = Publisher()
streamer = Streamer()
pn.Row(pub.layout, streamer.layout, sizing_mode="stretch_both")

@jbednar
Copy link
Member

jbednar commented Nov 5, 2020

I think that app would be just a dozen lines of code if you use the approach in https://anaconda.org/jbednar/project/streaming_hvplot ...

@xavArtley
Copy link
Collaborator

Actually I need the zmq connection for my real app
the publisher was just for the example

@philippjfr philippjfr added type: docs Related to the Panel documentation and examples and removed TRIAGE Default label for untriaged issues labels Nov 9, 2020
@MarcSkovMadsen
Copy link
Collaborator Author

MarcSkovMadsen commented Nov 12, 2020

@philippjfr . Would you be interested in a contribution to the gallery based on the Streaming Plots example I now have running at https://awesome-panel.org/streaming-plots? The contribution might need some help on your end.

  • For example investigating performance.
    • Right now serving the data takes up 10% CPU on my powerful laptop and the browser consuming the data another 10%. I have tried profiling but I cannot find anything in my code to optimize. As this should serve as a reference example I think it's important that performance is as good as is possible with Panel.
  • For example fixing issues discovered in the process.

The code is here https://github.com/MarcSkovMadsen/awesome-panel/blob/master/application/pages/streaming_plots/streaming_plots.py

Longer term I would like to contribute 2-3 examples. I think streaming examples could be a valuable category to have.

If you are interested please let me know. Thanks.

streaming-plots

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type: docs Related to the Panel documentation and examples
Projects
None yet
Development

No branches or pull requests

4 participants