
acsys tutorial

Rich Neswold edited this page Apr 30, 2021 · 3 revisions

Our control system contains a rich set of services and resources to keep the accelerator running. Although it uses many different technologies to accomplish this, the fundamental technology that ties it all together is the ACNET network transport. This wiki page shows how a Python script can participate in the ACNET network. Most Python programmers will use higher-level packages when interacting with control system services; this page is primarily targeted at the authors of those packages.

Brief Intro to ACNET

To make a system part of the ACNET network, its name and IP addresses need to be registered and the machine must run an ACNET daemon. The most up-to-date implementation is acnetd.

When a client attaches to the ACNET service, it needs a "handle": a name of up to six characters that uniquely identifies the client on that node. Clients don't usually care about their handle since all their communication is outbound to services, so they are assigned a dynamic name. A task that provides a service for the control system instead registers itself with a handle associated with that service. The acsys package currently only allows Python scripts to connect as clients. When referring to ACNET tasks, this package uses an Internet-like scheme, "HANDLE@NODE".
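As a rough illustration of the "HANDLE@NODE" convention, the hypothetical helper below splits a task specification into its handle and node. parse_task is not part of acsys, and the six-character limit it checks is an assumption based on the handle length described above.

```python
from typing import Tuple

# Hypothetical helper, not an acsys API: split "HANDLE@NODE" into parts.
def parse_task(spec: str) -> Tuple[str, str]:
    handle, sep, node = spec.partition('@')
    if not sep or not handle or not node:
        raise ValueError(f'expected HANDLE@NODE, got {spec!r}')
    # Assumption: ACNET names are at most six characters long.
    for name in (handle, node):
        if len(name) > 6:
            raise ValueError(f'{name!r} is longer than six characters')
    return handle, node

print(parse_task('ACNET@CENTRA'))  # ('ACNET', 'CENTRA')
```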

Clients can send two flavors of request to other ACNET tasks. The first expects only a single reply. It can be thought of as a Remote Procedure Call (RPC), where the request contains the data for the remote procedure and the reply has the returned data. The second expects multiple, possibly infinite, replies. This request type is very useful for processing accelerator data.

ACNET is a transport, not a protocol: it routes messages from clients to services and back without caring about their contents. As a result, there are many well-known service handles, each of which understands its own specific protocol.
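To make the transport/protocol split concrete, here is a toy, in-process model. ToyTransport and both services are invented for illustration; this is not how acnetd is implemented. The router looks only at the destination handle and passes the payload bytes through untouched, while each service decodes its own "protocol".

```python
from typing import Callable, Dict

# A service takes a request payload and returns a reply payload.
Handler = Callable[[bytes], bytes]

class ToyTransport:
    """Toy router: delivers payloads by handle, never inspecting them."""

    def __init__(self) -> None:
        self._services: Dict[str, Handler] = {}

    def register(self, handle: str, handler: Handler) -> None:
        self._services[handle] = handler

    def send(self, handle: str, payload: bytes) -> bytes:
        # Routing uses only the handle; the payload is opaque bytes here.
        return self._services[handle](payload)

# Two hypothetical services with unrelated "protocols".
def echo_service(msg: bytes) -> bytes:
    return msg

def upper_service(msg: bytes) -> bytes:
    return msg.upper()

net = ToyTransport()
net.register('ECHO', echo_service)
net.register('UPPER', upper_service)
print(net.send('UPPER', b'hello'))  # b'HELLO'
```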

Programming With the acsys Package

When this package was created, Python 3.8 was the latest version and Python 2.7 was officially obsolete (although it stubbornly refused to go away). Since all major languages, including Python, support asynchronous programming, the author decided to base this package on the asyncio module.

To use this package, declare your script's starting function async and pass it to acsys.run_client(). The starting function receives one argument, an acsys.Connection, which manages your script's connection to ACNET. Inside your function, you can create other asynchronous tasks. When your function returns, acsys.run_client() returns.
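The flow described above can be sketched with plain asyncio. FakeConnection and run_client_sketch are hypothetical stand-ins, not the acsys implementation, and the handle 'FAKE01' is made up; the sketch only shows the shape: create a connection, pass it to the async starting function, and return when that function finishes.

```python
import asyncio

class FakeConnection:
    """Hypothetical stand-in for acsys.Connection."""

    def __init__(self, handle: str) -> None:
        self.handle = handle

def run_client_sketch(main):
    # Hypothetical analogue of acsys.run_client(): build a connection,
    # hand it to the async starting function, and block until it returns.
    # (This sketch also passes the function's result back to the caller.)
    async def _wrapper():
        con = FakeConnection('FAKE01')
        return await main(con)
    return asyncio.run(_wrapper())

async def my_app(con):
    return f'Assigned handle {con.handle}.'

print(run_client_sketch(my_app))  # Assigned handle FAKE01.
```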

Here's a very simple script that connects to ACNET and prints the handle that was assigned to it:

#!/usr/bin/env python3
import acsys

async def my_app(con):
    print(f'Assigned handle {con.handle}.')

acsys.run_client(my_app)

my_app is the "main" function of the script. It simply prints its handle, which is stored in the handle property of the Connection object. It is passed to run_client on the last line.

Logging

The acsys package uses Python's logging module to report interesting events and error conditions. Specifically, it uses the logger named acsys. Configuring this logger in your scripts is recommended, since its messages help when debugging your code. Here is the previous example rewritten to use the logger:

#!/usr/bin/env python3
import asyncio
import logging
import acsys

# Initialize the logger. Set the minimum reported level to INFO.

FORMAT = '%(asctime)-15s [%(levelname)s] %(message)s'
logging.basicConfig(format=FORMAT)

log = logging.getLogger('acsys')
log.setLevel(logging.INFO)

# Define starting function.

async def my_app(con):
    log.info('Assigned handle %s', con.handle)

acsys.run_client(my_app)
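If you'd rather keep acsys messages separate from the rest of your script's output, the standard logging module lets you attach a handler to the acsys logger alone. This sketch uses only the standard library: the in-memory StringIO stream stands in for a file (a logging.FileHandler works the same way), and the two messages are made up. Setting the level to DEBUG shows whatever debug-level messages acsys emits.

```python
import io
import logging

# In-memory stream so the example is self-contained.
buf = io.StringIO()
handler = logging.StreamHandler(buf)
handler.setFormatter(logging.Formatter('[%(levelname)s] %(name)s: %(message)s'))

log = logging.getLogger('acsys')
log.setLevel(logging.DEBUG)   # report everything, including DEBUG
log.addHandler(handler)
log.propagate = False         # don't also pass records to the root logger

# Made-up messages, just to show the formatting.
log.debug('connected as %s', 'FAKE01')
log.info('%s replied', 'CENTRA')
print(buf.getvalue(), end='')
```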

Using await

Up to this point, neither example took advantage of the asynchronous properties of the library. In this next example, we'll make an RPC call to a remote node.

#!/usr/bin/env python3
import asyncio
import logging
import acsys

# Initialize the logger. Set the minimum reported level to INFO.

FORMAT = '%(asctime)-15s [%(levelname)s] %(message)s'
logging.basicConfig(format=FORMAT)

log = logging.getLogger('acsys')
log.setLevel(logging.INFO)

# Define starting function.

async def my_app(con):
    node = 'CENTRA'
    if await con.ping(node):
        log.info('%s replied', node)
    else:
        log.warning('%s not responding', node)

acsys.run_client(my_app)

In the if-statement of my_app, the .ping() method is called. Because it is an async method, we use await to suspend this task until a reply is received. Since this example doesn't run other tasks, it behaves like synchronous code. The important point is that, aside from the async and await keywords, this code looks like the synchronous code you'd write in Python 2.7. The big difference is that it also works when other async tasks are running and won't block their progress while waiting for the reply.
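To see that await only suspends the current task, this sketch runs without ACNET by replacing con.ping() with a hypothetical fake_ping() that sleeps to simulate network latency. A second task keeps ticking while main() is waiting for the "reply".

```python
import asyncio

# Hypothetical stand-in for con.ping(): sleep, then report success.
async def fake_ping(node: str, latency: float = 0.2) -> bool:
    await asyncio.sleep(latency)
    return True

async def ticker(ticks: list) -> None:
    # Independent task: records a tick roughly every 50 ms until cancelled.
    while True:
        ticks.append(len(ticks))
        await asyncio.sleep(0.05)

async def main() -> tuple:
    ticks = []
    t = asyncio.create_task(ticker(ticks))
    ok = await fake_ping('CENTRA')   # this task is suspended here...
    t.cancel()                       # ...while the ticker kept running
    try:
        await t
    except asyncio.CancelledError:
        pass
    return ok, len(ticks)

ok, n = asyncio.run(main())
print(ok, n)  # True, plus the number of ticks recorded during the wait
```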

Let's change the example to ping two nodes by rewriting my_app as:

async def my_app(con):
    result1 = await con.ping('CENTRA')
    result2 = await con.ping('CLXSRV')
    log.info('(%s, %s)', str(result1), str(result2))

This code is still rather synchronous; the ping to CLXSRV won't happen until the ping to CENTRA has been resolved. However, at each await, other tasks can make progress. To perform these pings "in parallel", rewrite my_app as this:

async def my_app(con):
    a, b = await asyncio.gather(
        asyncio.Task(con.ping('CENTRA')),
        asyncio.Task(con.ping('CLXSRV'))
    )
    log.info('(%s, %s)', str(a), str(b))

In this example, the two coroutines are wrapped in tasks, which schedules both of them to run, and asyncio.gather() waits for both to finish. It returns their results as a list in the same order as its arguments, so a and b receive the two ping results. Using a network analyzer shows both pings are sent on the network before either receives a reply. This suggests how you can run concurrent tasks.
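The speed-up can be checked without a network. In this sketch, fake_ping() is a hypothetical stand-in for con.ping() with 0.2 s of simulated latency, and asyncio.create_task() (the form the asyncio documentation recommends) replaces asyncio.Task(). Awaiting the pings one after another should take about twice as long as gathering them.

```python
import asyncio
import time

# Hypothetical stand-in for con.ping() with simulated latency.
async def fake_ping(node: str) -> bool:
    await asyncio.sleep(0.2)
    return True

async def sequential() -> list:
    # The second ping doesn't start until the first has finished.
    return [await fake_ping('CENTRA'), await fake_ping('CLXSRV')]

async def concurrent() -> list:
    # Both tasks are scheduled before either "reply" arrives.
    return await asyncio.gather(
        asyncio.create_task(fake_ping('CENTRA')),
        asyncio.create_task(fake_ping('CLXSRV')),
    )

def timed(coro_fn):
    start = time.perf_counter()
    result = asyncio.run(coro_fn())
    return result, time.perf_counter() - start

seq_result, seq_time = timed(sequential)
con_result, con_time = timed(concurrent)
print(f'sequential: {seq_time:.2f} s, gather: {con_time:.2f} s')
```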

As an example of longer-lived concurrent tasks, this script pings two nodes indefinitely, each at its own rate:

async def ping_periodically(con, node, delay):
    while True:
        await asyncio.sleep(delay)
        if await con.ping(node):
            log.info('%s is still up', node)
        else:
            log.warning('%s is not responding', node)

async def my_app(con):
    await asyncio.gather(
        asyncio.Task(ping_periodically(con, 'CLXSRV', 0.5)),
        asyncio.Task(ping_periodically(con, 'CENTRA', 0.333))
    )

This example creates two tasks that loop forever. Neither knows about the other, yet they will share the CPU and each keep their own timing.
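Loops like these run until they are cancelled. When a script (or a test) should only run them for a while, asyncio.wait_for() can bound the whole group: on timeout it cancels the gathered future, which in turn cancels both tasks. The sketch uses a hypothetical fake_ping() so it runs offline; poll() has the same shape as ping_periodically() but records results instead of logging them.

```python
import asyncio

# Hypothetical stand-in for con.ping(); always succeeds immediately.
async def fake_ping(node: str) -> bool:
    return True

async def poll(node: str, delay: float, results: list) -> None:
    # Same shape as ping_periodically(): loops until cancelled.
    while True:
        await asyncio.sleep(delay)
        results.append((node, await fake_ping(node)))

async def main(duration: float) -> list:
    results = []
    loops = asyncio.gather(
        asyncio.create_task(poll('CLXSRV', 0.1, results)),
        asyncio.create_task(poll('CENTRA', 0.03, results)),
    )
    try:
        # On timeout, wait_for cancels the gathered loops.
        await asyncio.wait_for(loops, timeout=duration)
    except asyncio.TimeoutError:
        pass
    return results

results = asyncio.run(main(0.35))
print(sum(1 for n, _ in results if n == 'CLXSRV'), 'CLXSRV pings')
print(sum(1 for n, _ in results if n == 'CENTRA'), 'CENTRA pings')
```

Each loop keeps its own timing, so the faster loop records more pings in the same interval.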

The observant reader may have noticed there's no await keyword before either call to ping_periodically. Calling an async function doesn't run it; it returns a coroutine, which is an Awaitable object that can be passed around like any other object. If you're interested in the final value of an Awaitable, you precede it with the await keyword, which suspends your code until the value is available. (A coroutine only runs when awaited and can be awaited only once; a task or future that already has a result returns it immediately.) When we pinged other ACNET nodes, we wanted the result of the ping, so we used await. In this example, we want asyncio.gather to wait on the Awaitables, so we pass them as is.
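A quick way to convince yourself that calling an async function doesn't run it: in this self-contained sketch, the side effect in greet() only happens once the coroutine is awaited.

```python
import asyncio

calls = []

async def greet(name: str) -> str:
    calls.append(name)          # side effect: shows when the body runs
    return f'hello {name}'

async def main():
    calls.clear()
    coro = greet('CENTRA')      # no await: the body has not run yet
    assert asyncio.iscoroutine(coro)
    assert calls == []
    value = await coro          # awaiting runs the body to completion
    return value, list(calls)

print(asyncio.run(main()))  # ('hello CENTRA', ['CENTRA'])
```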

Requests and Replies

The .ping() used in the previous examples is a "helper method": it hides the details of performing this function. The ACNET "PING" is one of the request types supported by the "ACNET" task, which is present on every ACNET node. The .ping() method uses the more general .request_reply() method to send the request and wait for the reply. .request_reply() takes a remote task specification, a message, and an optional timeout. Since the ping request always goes to the "ACNET" handle on the node, .ping() only lets the caller specify the node. The PING packet is typecode 0, subcode 0, which can be expressed in Python as the binary string b'\x00\x00'.
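In byte terms, the PING message is just the typecode followed by the subcode. The helper below is hypothetical (not an acsys function) and assumes one byte per field, which is consistent with the two-byte b'\x00\x00' above.

```python
# Hypothetical helper, not an acsys function. Assumption: the message is
# one byte of typecode followed by one byte of subcode.
def acnet_task_message(typecode: int, subcode: int) -> bytes:
    # bytes() raises ValueError if either value doesn't fit in one byte.
    return bytes([typecode, subcode])

PING = acnet_task_message(0, 0)
print(PING == b'\x00\x00')  # True
```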

So, to first order, the .ping() method looks like:

async def ping(self, node):
    return await self.request_reply('ACNET@' + node, b'\x00\x00', timeout=250)

However, if the request times out, we don't want .ping() to raise an ACNET timeout status; we want it to return False instead. So the real implementation looks more like this:

async def ping(self, node):
    try:
        await self.request_reply('ACNET@' + node, b'\x00\x00', timeout=250)
        return True
    except acsys.status.Status as e:
        if e == acsys.status.ACNET_UTIME:
            return False
        else:
            raise
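The timeout handling can be exercised without a network. This sketch defines hypothetical stand-ins for acsys.status.Status and ACNET_UTIME (the real classes may differ) and a FakeConnection whose request_reply() times out for any node not listed as up. Its ping() has the same logic as the implementation above.

```python
import asyncio

# Hypothetical stand-in for acsys.status.Status: statuses compare by name.
class Status(Exception):
    def __init__(self, name: str) -> None:
        super().__init__(name)
        self.name = name

    def __eq__(self, other):
        return isinstance(other, Status) and self.name == other.name

    def __hash__(self):
        return hash(self.name)

ACNET_UTIME = Status('ACNET_UTIME')  # hypothetical timeout status

class FakeConnection:
    """Replies only from nodes in `up`; other nodes time out."""

    def __init__(self, up):
        self.up = set(up)

    async def request_reply(self, task, message, timeout=1000):
        node = task.split('@', 1)[1]
        if node == 'BROKEN':
            raise Status('SOME_OTHER_ERROR')  # a non-timeout failure
        if node not in self.up:
            raise Status('ACNET_UTIME')       # simulate the timeout
        return 0, b''                         # (replier, reply)

    async def ping(self, node):
        # Same logic as the ping() implementation shown above.
        try:
            await self.request_reply('ACNET@' + node, b'\x00\x00', timeout=250)
            return True
        except Status as e:
            if e == ACNET_UTIME:
                return False
            else:
                raise

async def main():
    con = FakeConnection(up=['CENTRA'])
    return await con.ping('CENTRA'), await con.ping('CLXSRV')

print(asyncio.run(main()))  # (True, False)
```

Only the timeout status becomes False; any other status still propagates to the caller.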

As we've seen, .request_reply() can be used to make a request to another ACNET task and receive one reply. This method returns a Python Awaitable, so await needs to be used to wait for the reply. Awaiting it produces a 2-tuple. The first element is the trunk/node address of the replier. In almost every case, this is the same node specified in the task's address, so it can be ignored. ACNET does support multicast requests, though; in the rare case you multicast your request, the replier value tells you who replied first (this method only returns one reply). The second element is the reply message which, by default, is a binary string. If proto is specified, the reply is instead a data type unmarshaled by code generated by the protocol compiler.
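Unpacking the 2-tuple typically looks like the sketch below. fake_request_reply() is a hypothetical stand-in that echoes the message back from a made-up trunk/node address, so the example runs without ACNET.

```python
import asyncio

# Hypothetical stand-in for Connection.request_reply(): it "replies" from
# a made-up trunk/node address by echoing the request message.
async def fake_request_reply(task: str, message: bytes, timeout: int = 1000):
    replier = 0x0A0B
    return replier, message

async def main() -> bytes:
    # The replier is almost always the node we addressed, so ignore it.
    _, reply = await fake_request_reply('ACNET@CENTRA', b'\x00\x00')
    return reply

print(asyncio.run(main()))  # b'\x00\x00'
```

When multicasting, you would keep the first element instead of discarding it with _.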

If you want to make a request that returns multiple replies, use .request_stream(). It takes the same arguments as .request_reply(). However, it returns an asynchronous generator, which yields each reply as it arrives and can be used in an async for-loop. Let's rewrite my_app to ping every ACNET node with a single request and then print out all the repliers. The ACNET node table maps node names to IP addresses. Some of these are multicast addresses, so sending a request to one of these special nodes multicasts it to all the nodes listening on that address. By default, every ACNET node joins the address associated with the node MCAST, so we'll send the request to "ACNET@MCAST".

The program looks like this:

 1| #!/usr/bin/env python3
 2| 
 3| import asyncio
 4| import acsys
 5| 
 6| async def my_app(con):
 7|     gen = con.request_stream('ACNET@MCAST', b'\x00\x00', timeout=250)
 8| 
 9|     try:
10|         async for replier, _ in gen:
11|             name = await con.get_name(replier)
12|             print(name, end=', ')
13|     except acsys.status.Status as e:
14|         if e == acsys.status.ACNET_UTIME:
15|             pass    # timeout: no more replies arrived
16|         else:
17|             raise   # any other status is a real error
18| 
19| acsys.run_client(my_app)
  • Line 7 - multicasts the PING request.
  • Line 10 - uses an async for-loop to wait for each reply. While it's blocked, other async tasks could run (although none were started in this example).
  • Line 11 - converts the trunk/node address to a name.
  • Lines 13-17 - the stream ends with an ACNET_UTIME status once no reply arrives within the timeout, so that status is treated as the normal end of the replies; any other status is re-raised.

When the author ran this, it resulted in 379 replies:

FGATE, BBPL12, CLX138, CLX238, CLX38, CLX38E, CLX179, CLX279, CLX79, CLX119, CLX19, CLX219, CLX125, CLX225, CLX25, DCE12, CLX139, CLX239, CLX39, CLX39E, BBPL18, DCE16, CLX159, CLX259, CLX59, CLX59E, CLX1, CLX101, CLX201, CLX161, CLX261, CLX61, BBPL24, CLX174, CLX274, CLX74, TCP74, P2BLLA, STCLRF, CMLRFA, CMLRFB, CMLRFC, CMLRFD, CLX158, CLX258, CLX58, CLX58E, CLX107, CLX207, CLX7, CLX128, CLX228, CLX28, CLX28E, CLX132, CLX232, CLX32, CLX172, CLX272, CLX72, TCP72, BBPMT2, SRSAVE, SRFILE, ALARMZ, MIRROR, MACSET, DCE32, DLE32A, DCE04, DPM04, CLX109, CLX209, CLX9, CLX171, CLX271, CLX71, TCP71, CLX162, CLX262, CLX62, CLX173, CLX273, CLX73, TCP73, CLX73E, CLX73I, DCE05, DPM05, DLE05B, DCE15, CLX135, CLX235, CLX35, PLCFE, DCE17, BOOOAC, CHLOAC, DCE39, DCE25, CLX169, CLX269, CLX69, CLX146, CLX246, CLX46, CLX46E, DEBBPM, DUE30, CLX151, CLX251, CLX51, CLX51E, CLX104, CLX204, CLX4, CLX120, CLX20, CLX220, ESTATS, DUE47, VLOGV, DUE27, CLX178, CLX278, CLX78, CLX11, CLX111, CLX211, CLX129, CLX229, CLX29, CLX29E, DCE53, FSMDEV, DCE09, CLX126, CLX226, CLX26, CLX26E, DPE06, TRREC, LVTEST, TESTJY, DPE10, MACALC, TIMEAV, DCE10, CLX127, CLX227, CLX27, CLX27E, BBPL15, DCE50, LJOAC, DBSETS, SNAP, BIGSAV, DCE28, DSE02, LRHVAC, DUE23, DLE23A, CLX122, CLX22, CLX222, CLX149, CLX249, CLX49, CLX49E, DCE14, LNMXM4, CLX160, CLX260, CLX60, CLX133, CLX233, CLX33, MUON01, MUON02, DCE11, BUNNY4, DSE07, DUE10, DLE10A, CLX106, CLX206, CLX6, NMLDC2, CLX116, CLX16, CLX216, RCYSCH, DPE01, PRAC01, MTNRNG, DCE27, CLX167, CLX267, CLX67, CLX67E, DUE33, DLE33B, LNMXM5, CLX144, CLX244, CLX44, CLX44E, BBPL06, BLMLOG, DUE06, WEATHR, DUE34, RBEX, EVENTS, DUE09, CLX142, CLX242, CLX42, CLX42E, ERRORZ, DCE35, CLX155, CLX255, CLX55, CLX55E, DUE16, CLX113, CLX13, CLX213, DCE01, DPM01, DCE21, DCE20, CLX10, CLX110, CLX210, NUMRWM, BUNNY5, DSE06, RACKMN, DUE28, CLX121, CLX21, CLX221, LNMXM3, CLX170, CLX270, CLX70, DCE47, DLE47A, CLX148, CLX248, CLX48, CLX48E, CLX112, CLX12, CLX212, DSE03, CLX123, CLX223, CLX23, DCE38, 
DLE38A, DLE38B, DLE38C, DLE38D, DLE38E, DLE38F, DLE38G, DLE38H, DLE38I, DLE38J, DLE38K, DLE38L, CLX145, CLX245, CLX45, PIDLUP, DUE17, CLX154, CLX254, CLX54, CLX54E, CLX150, CLX250, CLX50, CLX147, CLX247, CLX47, CLX47E, CLX166, CLX266, CLX66, DCE26, DLE26A, EVENTN, DUE40, DLE40A, DLE40B, NMLDC1, CLX108, CLX208, CLX8, SETSDB, DCE42, DCE48, CLX164, CLX264, CLX64, CLX117, CLX17, CLX217, CLX152, CLX252, CLX52, CLX52E, ACLDEV, DUE21, DCE51, DLE51A, BUNNY2, DSE09, BUNNY0, DPE08, DPE03, DUE36, CACHE, MJRROR, DCE30, CLX114, CLX14, CLX214, MIEFFS, MIWIRS, MIBLMS, DUE32, DLE32B, CLX156, CLX256, CLX56, CLX56E, PRAC37, PRAC20, PRAC49, CLX163, CLX263, CLX63, BATCH, DUE20, QSSR12, PRAC02, GUNMOD,
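For experimenting without access to the control network, the multicast program can be mimicked with a fake stream. Everything here is a hypothetical stand-in: fake_request_stream() is an async generator that yields a few (replier, reply) tuples and then raises a Status whose text is ACNET_UTIME, mirroring the assumption that the real stream ends with a timeout once replies stop; fake_get_name() looks addresses up in a small made-up table.

```python
import asyncio

class Status(Exception):
    """Hypothetical stand-in for acsys.status.Status."""

# Made-up trunk/node addresses and names.
FAKE_NODES = {1: 'CENTRA', 2: 'CLXSRV', 3: 'FGATE'}

async def fake_request_stream(task, message, timeout=250):
    # Yield one (replier, reply) tuple per responding node, then signal
    # that no further reply arrived within the timeout.
    for addr in FAKE_NODES:
        await asyncio.sleep(0)      # let other tasks run between replies
        yield addr, b''
    raise Status('ACNET_UTIME')

async def fake_get_name(addr):
    return FAKE_NODES[addr]

async def collect_repliers():
    names = []
    try:
        async for replier, _ in fake_request_stream('ACNET@MCAST', b'\x00\x00'):
            names.append(await fake_get_name(replier))
    except Status as e:
        if str(e) != 'ACNET_UTIME':
            raise                   # only a timeout means "no more replies"
    return names

print(', '.join(asyncio.run(collect_repliers())))  # CENTRA, CLXSRV, FGATE
```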