Skip to content

Commit

Permalink
Update py examples (#851)
Browse files Browse the repository at this point in the history
* Update streaming_data.py

* revise all py examples

* add scanner.py example

* show signal counts in scanner.py

* fix scanner.py

* finalize scanner.py

* change foreground color not BG

* better peak decay and countdown timer

* revert countdown timer effort

* fix peak calc in scanner.py
  • Loading branch information
2bndy5 authored Jul 9, 2022
1 parent aaed50c commit 8ddd1e8
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 113 deletions.
45 changes: 20 additions & 25 deletions examples_linux/acknowledgement_payloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,27 +63,22 @@ def master():
end_timer = time.monotonic_ns() # stop timer
if result:
# print timer results upon transmission success
decoded = buffer[:6].decode("utf-8")
print(
"Transmission successful! Time to transmit: "
"{} us. Sent: {}{}".format(
int((end_timer - start_timer) / 1000),
buffer[:6].decode("utf-8"),
counter[0],
),
"Transmission successful! Time to transmit:",
f"{int((end_timer - start_timer) / 1000)} us.",
f"Sent: {decoded}{counter[0]}",
end=" ",
)
has_payload, pipe_number = radio.available_pipe()
if has_payload:
# print the received ACK that was automatically sent
length = radio.getDynamicPayloadSize()
response = radio.read(length)
decoded = bytes(response[:6]).decode("utf-8")
print(
"Received {} on pipe {}: {}{}".format(
length,
pipe_number,
bytes(response[:6]).decode("utf-8"),
response[7:8][0],
)
f"Received {length} on pipe {pipe_number}:",
f"{decoded}{response[7:8][0]}",
)
# increment counter from received payload
if response[7:8][0] < 255:
Expand All @@ -99,7 +94,7 @@ def master():
print(failures, "failures detected. Leaving TX role.")


def slave(timeout=6):
def slave(timeout: int = 6):
"""Listen for any payloads and print the transaction
:param int timeout: The number of seconds to wait (with no transmission)
Expand All @@ -121,15 +116,12 @@ def slave(timeout=6):
received = radio.read(length) # fetch 1 payload from RX FIFO
# increment counter from received payload
counter[0] = received[7:8][0] + 1 if received[7:8][0] < 255 else 0
decoded = [bytes(received[:6]).decode("utf-8")]
decoded.append(buffer[:6].decode("utf-8"))
print(
"Received {} bytes on pipe {}: {}{} Sent: {}{}".format(
length,
pipe_number,
bytes(received[:6]).decode("utf-8"),
received[7:8][0],
buffer[:6].decode("utf-8"),
buffer[7:8][0],
)
f"Received {length} bytes on pipe {pipe_number}:",
f"{decoded[0]}{received[7:8][0]}",
f"Sent: {decoded[1]}{buffer[7:8][0]}",
)
buffer = b"World \x00" + bytes(counter) # build a new ACK payload
radio.writeAckPayload(1, buffer) # load ACK for next response
Expand All @@ -140,7 +132,7 @@ def slave(timeout=6):
radio.stopListening() # put radio in TX mode & flush unused ACK payloads


def set_role():
def set_role() -> bool:
"""Set the role using stdin stream. Timeout arg for slave() can be
specified using a space delimiter (e.g. 'R 10' calls `slave(10)`)
Expand All @@ -163,10 +155,10 @@ def set_role():
else:
slave()
return True
elif user_input[0].upper().startswith("T"):
if user_input[0].upper().startswith("T"):
master()
return True
elif user_input[0].upper().startswith("Q"):
if user_input[0].upper().startswith("Q"):
radio.powerDown()
return False
print(user_input[0], "is an unrecognized input. Please try again.")
Expand Down Expand Up @@ -226,7 +218,10 @@ def set_role():
pass # continue example until 'Q' is entered
else: # if role was set using CLI args
# run role once and exit
master() if bool(args.role) else slave()
if bool(args.role):
master()
else:
slave()
except KeyboardInterrupt:
print(" Keyboard Interrupt detected. Exiting...")
radio.powerDown()
Expand Down
27 changes: 14 additions & 13 deletions examples_linux/getting_started.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ def master():
failures += 1
else:
print(
"Transmission successful! Time to Transmit: "
"{} us. Sent: {}".format((end_timer - start_timer) / 1000, payload[0])
"Transmission successful! Time to Transmit:",
f"{(end_timer - start_timer) / 1000} us. Sent: {payload[0]}",
)
payload[0] += 0.01
time.sleep(1)
Expand All @@ -93,9 +93,8 @@ def slave(timeout=6):
payload[0] = struct.unpack("<f", buffer[:4])[0]
# print details about the received packet
print(
"Received {} bytes on pipe {}: {}".format(
radio.payloadSize, pipe_number, payload[0]
)
f"Received {radio.payloadSize} bytes",
f"on pipe {pipe_number}: {payload[0]}",
)
start_timer = time.monotonic() # reset the timeout timer

Expand All @@ -104,7 +103,7 @@ def slave(timeout=6):
radio.stopListening() # put the radio in TX mode


def set_role():
def set_role() -> bool:
"""Set the role using stdin stream. Timeout arg for slave() can be
specified using a space delimiter (e.g. 'R 10' calls `slave(10)`)
Expand All @@ -127,10 +126,10 @@ def set_role():
else:
slave()
return True
elif user_input[0].upper().startswith("T"):
if user_input[0].upper().startswith("T"):
master()
return True
elif user_input[0].upper().startswith("Q"):
if user_input[0].upper().startswith("Q"):
radio.powerDown()
return False
print(user_input[0], "is an unrecognized input. Please try again.")
Expand Down Expand Up @@ -174,8 +173,8 @@ def set_role():

# To save time during transmission, we'll set the payload size to be only
# what we need. A float value occupies 4 bytes in memory using
# struct.pack(); "<f" means a little endian unsigned float
radio.payloadSize = len(struct.pack("<f", payload[0]))
# struct.pack(); "f" means an unsigned float
radio.payloadSize = struct.calcsize("f")

# for debugging, we have 2 options that print a large block of details
# (smaller) function that prints raw register values
Expand All @@ -189,8 +188,10 @@ def set_role():
pass # continue example until 'Q' is entered
else: # if role was set using CLI args
# run role once and exit
master() if bool(args.role) else slave()
if bool(args.role):
master()
else:
slave()
except KeyboardInterrupt:
print(" Keyboard Interrupt detected. Exiting...")
print(" Keyboard Interrupt detected. Powering down radio.")
radio.powerDown()
sys.exit()
37 changes: 19 additions & 18 deletions examples_linux/interrupt_configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ def interrupt_handler(channel):
tx_ds, tx_df, rx_dr = radio.whatHappened() # get IRQ status flags
if tx_df:
radio.flush_tx()
print("\ttx_ds: {}, tx_df: {}, rx_dr: {}".format(tx_ds, tx_df, rx_dr))
print(f"\ttx_ds: {tx_ds}, tx_df: {tx_df}, rx_dr: {rx_dr}")
if pl_iterator[0] == 0:
print(" 'data ready' event test {}".format("passed" if rx_dr else "failed"))
print(" 'data ready' event test", ("passed" if rx_dr else "failed"))
elif pl_iterator[0] == 1:
print(" 'data sent' event test {}".format("passed" if tx_ds else "failed"))
print(" 'data sent' event test", ("passed" if tx_ds else "failed"))
elif pl_iterator[0] == 3:
print(" 'data fail' event test {}".format("passed" if tx_df else "failed"))
print(" 'data fail' event test", ("passed" if tx_df else "failed"))


# setup IRQ GPIO pin
Expand Down Expand Up @@ -98,8 +98,8 @@ def _ping_n_wait(pl_iter):
time.sleep(0.1) # wait 100 ms for interrupt_handler() to complete


def print_rx_fifo(pl_size):
"""fush RX FIFO by printing all available payloads with 1 buffer
def print_rx_fifo(pl_size: int):
"""Flush RX FIFO by printing all available payloads with 1 buffer
:param int pl_size: the expected size of each payload
"""
Expand Down Expand Up @@ -147,7 +147,7 @@ def master():
print("RX node's FIFO is full; it is not listening any more")
else:
print(
"Transmission successful, but the RX node might still be " "listening."
"Transmission successful, but the RX node might still be listening."
)
else:
radio.flush_tx()
Expand All @@ -166,14 +166,14 @@ def master():
print_rx_fifo(len(ack_payloads[0])) # empty RX FIFO


def slave(timeout=6): # will listen for 6 seconds before timing out
def slave(timeout: int = 6):
"""Only listen for 3 payload from the master node
:param int timeout: The number of seconds to wait (with no transmission)
until exiting function.
"""
pl_iterator[0] = 0 # reset this to indicate event is a 'data_ready' event
# setup radio to recieve pings, fill TX FIFO with ACK payloads
# setup radio to receive pings, fill TX FIFO with ACK payloads
radio.writeAckPayload(1, ack_payloads[0])
radio.writeAckPayload(1, ack_payloads[1])
radio.writeAckPayload(1, ack_payloads[2])
Expand All @@ -187,7 +187,7 @@ def slave(timeout=6): # will listen for 6 seconds before timing out
print_rx_fifo(len(tx_payloads[0]))


def set_role():
def set_role() -> bool:
"""Set the role using stdin stream. Timeout arg for slave() can be
specified using a space delimiter (e.g. 'R 10' calls `slave(10)`)
Expand All @@ -210,15 +210,14 @@ def set_role():
else:
slave()
return True
elif user_input[0].upper().startswith("T"):
if user_input[0].upper().startswith("T"):
master()
return True
elif user_input[0].upper().startswith("Q"):
if user_input[0].upper().startswith("Q"):
radio.powerDown()
return False
else:
print(user_input[0], "is an unrecognized input. Please try again.")
return set_role()
print(user_input[0], "is an unrecognized input. Please try again.")
return set_role()


if __name__ == "__main__":
Expand Down Expand Up @@ -275,8 +274,10 @@ def set_role():
pass # continue example until 'Q' is entered
else: # if role was set using CLI args
# run role once and exit
master() if bool(args.role) else slave()
if bool(args.role):
master()
else:
slave()
except KeyboardInterrupt:
print(" Keyboard Interrupt detected. Exiting...")
print(" Keyboard Interrupt detected. Powering down radio.")
radio.powerDown()
sys.exit()
45 changes: 20 additions & 25 deletions examples_linux/manual_acknowledgements.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,9 @@ def master():
pass # wait for incoming payload or timeout
radio.stopListening() # put radio in TX mode
end_timer = time.monotonic_ns() # end timer
decoded = buffer[:6].decode("utf-8")
print(
"Transmission successful. Sent: {}{}.".format(
buffer[:6].decode("utf-8"), counter[0]
),
f"Transmission successful. Sent: {decoded}{counter[0]}.",
end=" ",
)
has_payload, pipe_number = radio.available_pipe()
Expand All @@ -89,23 +88,19 @@ def master():
received = radio.read(radio.payloadSize)
# NOTE received[7:8] discards NULL terminating 0
counter[0] = received[7:8][0] # save the counter
decoded = bytes(received[:6]).decode("utf-8")
print(
"Received {} bytes on pipe {}: {}{}. "
"Round-trip delay: {} us.".format(
radio.payloadSize,
pipe_number,
bytes(received[:6]).decode("utf-8"),
counter[0],
(end_timer - start_timer) / 1000,
)
f"Received {radio.payloadSize} bytes",
f"on pipe {pipe_number}: {decoded}{counter[0]}.",
f"Round-trip delay: {(end_timer - start_timer) / 1000} us.",
)
else:
print("No response received.")
time.sleep(1) # make example readable by slowing down transmissions
print(failures, "failures detected. Leaving TX role.")


def slave(timeout=6):
def slave(timeout: int = 6):
"""Listen for any payloads and print the transaction
:param int timeout: The number of seconds to wait (with no transmission)
Expand All @@ -132,18 +127,16 @@ def slave(timeout=6):
# NOTE txStandBy() flushes TX FIFO on transmission failure
radio.startListening() # put radio back in RX mode
# print the payload received payload
decoded = bytes(received[:6]).decode("utf-8")
print(
"Received {} bytes on pipe {}: {}{}.".format(
radio.payloadSize,
pipe_number,
bytes(received[:6]).decode("utf-8"),
received[7:8][0],
),
f"Received {radio.payloadSize} bytes"
f"on pipe {pipe_number}: {decoded}{received[7:8][0]}.",
end=" ",
)
if result: # did response succeed?
# print response's payload
print("Sent: {}{}".format(buffer[:6].decode("utf-8"), counter[0]))
decoded = buffer[:6].decode("utf-8")
print(f"Sent: {decoded}{counter[0]}")
else:
print("Response failed or timed out")
start_timer = time.monotonic() # reset the timeout timer
Expand All @@ -153,7 +146,7 @@ def slave(timeout=6):
radio.stopListening() # put the radio in TX mode


def set_role():
def set_role() -> bool:
"""Set the role using stdin stream. Timeout arg for slave() can be
specified using a space delimiter (e.g. 'R 10' calls `slave(10)`)
Expand All @@ -176,10 +169,10 @@ def set_role():
else:
slave()
return True
elif user_input[0].upper().startswith("T"):
if user_input[0].upper().startswith("T"):
master()
return True
elif user_input[0].upper().startswith("Q"):
if user_input[0].upper().startswith("Q"):
radio.powerDown()
return False
print(user_input[0], "is an unrecognized input. Please try again.")
Expand Down Expand Up @@ -238,8 +231,10 @@ def set_role():
pass # continue example until 'Q' is entered
else: # if role was set using CLI args
# run role once and exit
master() if bool(args.role) else slave()
if bool(args.role):
master()
else:
slave()
except KeyboardInterrupt:
print(" Keyboard Interrupt detected. Exiting...")
print(" Keyboard Interrupt detected. Powering down radio.")
radio.powerDown()
sys.exit()
Loading

0 comments on commit 8ddd1e8

Please sign in to comment.