mdavidsaver edited this page Mar 24, 2018 · 6 revisions

Disclaimer: this is a work in progress.

What is described here are high level APIs. These are intended for end users. They may not expose all possible functionality of the underlying C++ libraries. There will probably be a low level/internal API which exposes all (or nearly all) functionality, but which isn't intended for end users.

Use cases

These APIs are based around two use cases. The first is the Process Variable (PV) concept as it has always existed in the EPICS world: a common data value observed via subscription by multiple clients. All clients see the same value, and are treated as indistinguishable from each other for the purposes of the Get and Monitor operations. For the purposes of the following discussion, this will be referred to as a Shared State PV.

[I'm trying not to have 'class PV' as many libraries already use this name]

This case allows for some restrictions.

  • The Get operation may not have side-effects. (eg. must only access cached Value)
  • The visible Type of a Shared State PV may not depend on the client pvRequest.
  • Each update is sent to all subscribers. (subscriber list is not exposed)

A variation on this avoids the need to cache a "current" value, but does not support the Get operation.
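The Shared State restrictions can be illustrated with a small stand-alone sketch. This is not the p4p implementation: `SharedStateSketch`, its `subscribe()` method, and the `cache` flag are hypothetical names invented here to show a side-effect-free Get, delivery of every update to every subscriber, and the cache-less variation.

```python
class SharedStateSketch:
    """Hypothetical stand-in for a Shared State PV (not the p4p implementation)."""

    def __init__(self, initial=None, cache=True):
        self._cache = cache        # cache=False models the variation without Get
        self._current = initial
        self._subscribers = []     # callables; never exposed to user code

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def get(self):
        # Get has no side effects: it only reads the cached value.
        if not self._cache:
            raise RuntimeError("Get is not supported without a cached value")
        return self._current

    def post(self, value):
        if self._cache:
            self._current = value
        # Every subscriber receives every update.
        for callback in self._subscribers:
            callback(value)


seen_a, seen_b = [], []
pv = SharedStateSketch(initial=0)
pv.subscribe(seen_a.append)
pv.subscribe(seen_b.append)
pv.post(1)
pv.post(2)
```

Both subscriber lists end up with the same sequence of updates, and `pv.get()` returns the last posted value without touching anything else.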

The second case is related to the RPC operation. The RPC operation accepts a single request Value and delivers a single result Value. This is sequential as the request value is received entirely before the RPC server handling code may examine it. User code must then provide the entire result Value before the underlying server code will begin to send it.

The Monitor operation has two concepts which can be combined to yield an RPC-like operation with a multi-part result. These are the notions of 1) a subscription having a last and final update, and 2) having per-subscription flow control. For the purposes of this discussion, this concept is called a Streaming PV.

This case allows for some restrictions.

  • get/put operations not supported

Modules

p4p.server

def installProvider(name, prov):
    pass
def removeProvider(name):
    pass
class Server(object):
    def __init__(self, providers=[...]):
        """Create and start a server with the given list of Providers.
           Providers may be given by the name string given to installProvider()
           or a Provider instance may be passed directly.
        """

p4p.server.raw

The following describes the "raw" (callback based) API. None of the following methods (implementation or user) should block (wait) for any operation to complete.

The entry point from the core server code is the Provider interface to be implemented by user code.

class Provider(object):
    def testPV(self, name):
        """Return bool to indicate if this server provides the named PV.
        """
        raise NotImplementedError()
    def makePV(self, name, peer=None):
        """Called each time a client creates a new channel.
           Must return a sub-class of SharedPV or StreamPV.

           (peer is a placeholder which will eventually provide access to peer host:port and auth. info)
        """
        raise NotImplementedError()
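As an illustration of how user code might implement this interface, here is a sketch of a Provider backed by a fixed dictionary. The `Provider` and `SharedPV` classes below are local placeholders that mirror the proposed names, `DictProvider` is hypothetical, and the sketch assumes that makePV() hands back an instance and that one instance may be shared by every channel with the same name.

```python
class Provider(object):
    """Local placeholder mirroring the proposed interface."""

    def testPV(self, name):
        raise NotImplementedError()

    def makePV(self, name, peer=None):
        raise NotImplementedError()


class SharedPV(object):
    """Local placeholder for the proposed SharedPV; only stores a value."""

    def __init__(self, current=None):
        self.current = current


class DictProvider(Provider):
    """Hypothetical provider serving a fixed mapping of PV names."""

    def __init__(self, pvs):
        self._pvs = dict(pvs)

    def testPV(self, name):
        # A dictionary lookup never blocks, as the raw API requires.
        return name in self._pvs

    def makePV(self, name, peer=None):
        # Assumption: every channel for a name shares the same PV instance.
        return self._pvs[name]


prov = DictProvider({"demo:counter": SharedPV(current=0)})
```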

The SharedPV class holds a list of subscribers, and optionally a cache of the last Value post()'d to it.

It is distinguished from the concept of a Record by not having a user visible lock. Thus concurrent calls to put() are possible.
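Each subscriber has its own bounded FIFO, and the post() docstring describes updates being "squashed" into the last entry when a FIFO is full. A rough sketch of that rule follows. It models a Value as a plain dict of changed fields, and treats the overflow mask as the set of fields overwritten by a squash; `SubscriberFifoSketch` and both modelling choices are assumptions made for illustration only.

```python
from collections import deque


class SubscriberFifoSketch:
    """Hypothetical per-subscriber FIFO that squashes updates when full."""

    def __init__(self, size=4):
        self.size = size
        self.entries = deque()
        self.overflowed = set()   # fields overwritten by squashing

    def add(self, changed):
        """'changed' maps field name -> new value (stand-in for a changed mask)."""
        if len(self.entries) < self.size:
            self.entries.append(dict(changed))
            return
        # FIFO full: merge into the last entry instead of growing.
        last = self.entries[-1]
        self.overflowed.update(k for k in changed if k in last)
        last.update(changed)


fifo = SubscriberFifoSketch(size=2)
fifo.add({"value": 1})
fifo.add({"value": 2})
fifo.add({"value": 3, "alarm": "MINOR"})   # squashed into the second entry
```

The client still sees the oldest queued update intact, while the newest entry always reflects the latest state.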

class SharedPV(object):
    def __init__(self, current=None, type=None, deffifo=4, maxfifo=4):
        """
           Either 'current' or 'type' must be provided.  current=None prevents the Get operation,
           but avoids storing/copying Values passed to post()

           The size of the subscriber FIFOs is determined by deffifo, maxfifo, and
           the client requested FIFO size (if provided).  (4 is a placeholder)
        """

    # View of the cached value of this PV accessed by the Get operation.
    # Cache is modified via post().
    current = current

    def post(self, value=None):
        """User code calls this to send a subscription update to all clients.
           The changed mask of 'value' determines which parts/fields are sent.

           This method attempts to add an update to the FIFO of each subscribing client.
           If a client FIFO is full, the new update is "squashed" with the last entry in
           the FIFO and the overflow mask is updated (not exposed).

           If 'current is not None' then the cached Value is updated.
        """

    def put(self, op, value):
        """User code implements this method to handle each client Put.
           The base class method calls post().

           The changed mask of 'value' reflects fields which the remote client
           has provided.  The state of the other fields depends on self.current
           If self.current is None, then all unchanged fields of 'value' have an arbitrary default value.
           If self.current is not None, then unchanged fields of 'value' are copies.

           The Operation 'op' must be used to signal completion to the remote client.
           This method may be called again before completion is signaled.  User code
           must handle this case.
        """

    def rpc(self, op, value):
        """User code implements this method to handle each RPC request.

           Technically similar to put().  All fields are 'changed' (changed mask can be ignored).
        """

The put() and rpc() methods are passed an instance of Operation along with the new value to provide context which may be used during handling. This includes the pvRequest blob, whether the operation has been canceled by the client (explicitly, or via disconnection) and (eventually) information about the peer (host:port and auth. info).

TODO: How to deliver async. notification of cancellation?

In addition, this object is used to indicate completion via done() and error(). The info() and warn() methods send a log message string to the client, eg. to give information on non-fatal errors.
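A put() handler using these calls might look like the following sketch. `OperationSketch` only records what would be sent to the client, and `LimitedPV` with its `limit` parameter is a hypothetical example; neither is part of the proposed API, and Values are modelled as plain dicts.

```python
class OperationSketch:
    """Hypothetical stand-in that records completion instead of sending it."""

    def __init__(self):
        self.result = None   # ("done", val) or ("error", msg)
        self.log = []

    def done(self, val=None):
        self.result = ("done", val)

    def error(self, msg):
        self.result = ("error", msg)

    def warn(self, msg):
        self.log.append(("warn", msg))


class LimitedPV:
    """Hypothetical PV whose put() rejects values above a limit."""

    def __init__(self, limit):
        self.limit = limit
        self.current = {"value": 0}

    def post(self, value):
        self.current.update(value)

    def put(self, op, value):
        if value["value"] > self.limit:
            op.error("value exceeds limit %s" % self.limit)
            return
        if value["value"] == self.limit:
            op.warn("value is at the limit")   # does not complete the operation
        self.post(value)
        op.done()   # val stays None for put()


pv = LimitedPV(limit=10)
op_ok, op_edge, op_bad = OperationSketch(), OperationSketch(), OperationSketch()
pv.put(op_ok, {"value": 5})
pv.put(op_edge, {"value": 10})
pv.put(op_bad, {"value": 11})
```

Every code path ends in exactly one of done() or error(), which is the property the raw API depends on.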

class Operation(object):
    peer=None # cf. Provider.makePV
    channel = SharedPV()
    request = Value() # client provided pvRequest
    canceled = False # set if remote client cancels us

    def done(self, val=None):
        """Indicate successful completion to client.

        val is None for put(), but may not be for rpc().
        """
    def error(self, msg):
        """Indicate unsuccessful completion to client.
        """

    def info(self, msg):
        pass
    def warn(self, msg):
        """Send log message to remote client.  Does not indicate completion.
           May be called multiple times before done()/error().
        """

StreamPV is an alternative to SharedPV. Handling of rpc() is identical.

A stream represents a sequence of Values sent from this server to a single client. The Stream watermark and Handler.wakeup() allow the server to respond to the rate at which the client consumes these Values.
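The watermark rule given in the Stream.__init__ docstring (wakeup() fires when the level falls to the watermark, and not again until the level has risen above it) can be sketched as a small state machine. `WatermarkFifoSketch`, its `pop()` method (standing in for the client consuming an entry), and the `wakeup` callback argument are hypothetical.

```python
class WatermarkFifoSketch:
    """Hypothetical FIFO that models the proposed watermark notification."""

    def __init__(self, size=4, watermark=50.0, wakeup=None):
        self.size = size
        self.watermark = watermark
        self.wakeup = wakeup or (lambda: None)
        self._entries = []
        self._armed = False   # True once level has risen above the watermark

    @property
    def level(self):
        # Percentage filled; may exceed 100.0 when over-filled.
        return 100.0 * len(self._entries) / self.size

    def push(self, val):
        self._entries.append(val)   # over-filling is allowed
        if self.level > self.watermark:
            self._armed = True
        return self.level < 100.0

    def pop(self):
        """Models the client consuming one entry."""
        val = self._entries.pop(0)
        if self._armed and self.level <= self.watermark:
            self._armed = False     # no further wakeup until re-armed
            self.wakeup()
        return val


wakeups = []
fifo = WatermarkFifoSketch(size=4, watermark=50.0,
                           wakeup=lambda: wakeups.append(fifo.level))
pushed = [fifo.push(n) for n in range(4)]
drained = [fifo.pop() for _ in range(4)]
```

In this run wakeup() fires once, at the moment the level drops from 75% to 50%, and stays silent while the FIFO drains the rest of the way.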

class StreamPV(object):
    def rpc(self, op, value):
        pass # same as SharedPV.rpc

    def stream(self, strm):
        """A new stream is requested (opened?).
        Returns a Handler() instance
        """
class Handler(object):
    def connection_lost(self, exc):
        """Called if the client closes the stream prior to our calling Stream.done().
        """
    def wakeup(self):
        """Called when FIFO watermark reached.  See Stream.__init__
        """

class Stream(object):
    peer=None # cf. Provider.makePV
    channel = object() # tbd. object shared between all rpc()/stream() on one Channel
    request = Value() # client provided pvRequest

    # True if the FIFO has no entries
    empty = bool()
    # Does FIFO have all entries filled
    full = bool()
    # Percentage of FIFO which is filled.
    level = float()

    def __init__(self, deffifo=4, maxfifo=4, watermark=None):
        """
           The size of the subscriber FIFO is determined by deffifo, maxfifo, and
           the client requested FIFO size (if provided).  (4 is a placeholder)

           User code may "over-fill" the FIFO such that self.level > 100.0.
           This is a convenience to allow user code to avoid the extra state tracking of
           eg. a value extracted from an internal data source when the FIFO was full.

           'watermark' gives a percentage (in range [0, 100.0) ) of the FIFO size.
           When the number of filled entries in the FIFO falls to 'watermark' (aka. self.level <= watermark)
           then the Handler.wakeup() is called.
           Note that Handler.wakeup() will not be called again until FIFO has risen above 'watermark'
           (aka. self.level > watermark).

           watermark=0 gives notification when the FIFO becomes empty.

           watermark=None will use an implementation defined default.  Currently 50.0 %.
        """

    def push(self, val=None):
        """Add another Value to FIFO.
           Returns True if FIFO not full (```self.level < 100.0```) after adding.
        """
    def done(self):
        """Indicate completion to client.
        """

    def info(self, msg):
        pass
    def warn(self, msg):
        """Send log message to remote client.  Does not indicate completion.
           May be called multiple times before done()/error().
        """

The Operation and Stream objects will complete with an error and log a warning if they are GC'd before Operation.done() or Stream.done() is called.

Concurrent convenience APIs

The goal of the .thread, .cothread, and .asyncio APIs is to provide a facility for automatically running data production/modification work without the boilerplate (and potential bugs) of the callback driven .raw API.

These wrappers will be implemented in terms of .raw, so raw *PV instances can be returned by any of the convenience API Providers. However, it will not be possible to mix, eg., threading and cothread within one Provider.

Each of these modules provides the same set of classes as .raw.

p4p.server.thread

The methods SharedPV.put(), SharedPV.rpc(), StreamPV.rpc(), and StreamPV.stream() will be called from a worker thread on which blocking calls may be made.

Instead of implementing and returning a Handler object, Completion and Stream objects will have an additional member which is a 'threading.Event' which may be used to wait for watermark, timeout, or cancellation events.

If StreamPV.stream() returns an iterator, then this will be iterated and each result given to Stream.push().
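The iterator case could work roughly as in the sketch below: a worker thread pushes each item and blocks on the Event while the FIFO is full. Everything here is an assumption made for illustration: `StreamSketch`, its `full()` and `client_drain()` methods, the `event` attribute name, and the polling interval in `run_stream()` are not part of the proposal.

```python
import threading


class StreamSketch:
    """Hypothetical stand-in for a Stream with the extra Event member."""

    def __init__(self, size=2):
        self.size = size
        self.fifo = []        # entries waiting for the client
        self.received = []    # what the simulated client has consumed
        self.finished = False
        self.event = threading.Event()
        self._lock = threading.Lock()

    def full(self):
        with self._lock:
            return len(self.fifo) >= self.size

    def push(self, val):
        with self._lock:
            self.fifo.append(val)
            return len(self.fifo) < self.size

    def done(self):
        self.finished = True

    def client_drain(self):
        """Simulate the client consuming everything, then wake the producer."""
        with self._lock:
            self.received.extend(self.fifo)
            del self.fifo[:]
        self.event.set()


def run_stream(strm, iterator, poll=0.05):
    """What a worker thread might do with an iterator returned by stream()."""
    for val in iterator:
        if not strm.push(val):
            # FIFO is full: blocking is allowed here, unlike in .raw callbacks.
            while strm.full():
                strm.event.wait(poll)
                strm.event.clear()
    strm.done()


strm = StreamSketch(size=2)
worker = threading.Thread(target=run_stream, args=(strm, iter(range(5))))
worker.start()
while worker.is_alive():
    strm.client_drain()
    worker.join(0.01)
strm.client_drain()   # pick up anything pushed just before the worker exited
```

The wait uses a short timeout so that a wakeup lost between push() and clear() only costs latency rather than a hang.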

p4p.server.cothread

The methods SharedPV.put(), SharedPV.rpc(), StreamPV.rpc(), and StreamPV.stream() will be called from a cothread which may yield to the cothread scheduler.

The Completion and Stream objects will have an additional member which is a 'cothread.Event' which may be used to wait for watermark or cancellation events.

If StreamPV.stream() returns an iterator, then a worker thread will iterate it and the results will be passed to Stream.push().

p4p.server.asyncio

The methods SharedPV.put(), SharedPV.rpc(), StreamPV.rpc(), and StreamPV.stream() are treated as coroutines and may 'yield from'. 'yield from' will throw an exception on client cancellation or disconnection.

If a SharedPV or StreamPV instance has the optional attribute 'loop', then this is expected to be an event loop instance which will be used to run any coroutines started.

If StreamPV.stream() returns an iterator, then this will be iterated and each result given to Stream.push(). TODO: distinguish between coroutine and plain iterator?
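How cancellation might surface inside a coroutine handler is sketched below using the `async`/`await` syntax, the modern spelling of 'yield from' coroutines. The sketch assumes that a client cancel is delivered as ordinary asyncio task cancellation, which the proposal does not specify; `slow_put()` and the `log` list are hypothetical.

```python
import asyncio


async def slow_put(op, value, log):
    """Hypothetical put() handler that waits on slow device I/O."""
    try:
        await asyncio.sleep(10)   # stand-in for a slow operation
        log.append("done")
    except asyncio.CancelledError:
        # Assumption: client cancel/disconnect arrives as task cancellation.
        log.append("canceled")
        raise


async def main():
    log = []
    task = asyncio.ensure_future(slow_put(None, {"value": 1}, log))
    await asyncio.sleep(0)        # let the handler start and reach its await
    task.cancel()                 # models the client canceling the operation
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log


result = asyncio.run(main())
```

The handler gets a chance to clean up at the point where it was suspended, without any explicit polling of a canceled flag.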