-
Notifications
You must be signed in to change notification settings - Fork 5
/
orogen_test_helpers.rb
142 lines (121 loc) · 5.17 KB
/
orogen_test_helpers.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# frozen_string_literal: true
module IODriversBase
# Common helpers for the rather complex setup of iodrivers_base::Task tests
#
# The test cases are responsible for deploying the task-under-test; the
# helpers below receive it as an explicit `task` argument
module OroGenTestHelpers
# Test setup hook
#
# Chains to the inherited setup through `super`, then resets
# @__iodrivers_base_sockets to an empty list. That list holds the sockets
# that #teardown closes.
def setup
super
@__iodrivers_base_sockets = []
end
# Test teardown hook: closes every socket registered in
# @__iodrivers_base_sockets
#
# The inherited teardown runs first. The sockets are closed from an `ensure`
# clause so that they are released even when the inherited teardown raises.
# A nil list (setup did not get to initialize it) is tolerated, so that
# teardown does not raise a NoMethodError that would mask the original
# failure.
def teardown
  super
ensure
  @__iodrivers_base_sockets&.each(&:close)
end
# Configure the given task to talk to a local UDP socket owned by the test
#
# Two UDP sockets are created on ports picked by the system. The second one
# only serves to reserve a port number: it is closed right away and its port
# is given to the component as `local_port`, which reduces the likelihood of
# a port collision (there is currently no simple way to hand a file
# descriptor over to the component). The first socket is connected to that
# port, registered so that it gets closed at teardown, and returned.
#
# @param task the task under test; its property named `io_port_name` is
#   written with the resulting udp:// URI
# @param [Symbol] io_port_name name of the property that receives the URI
# @return the test-side socket, connected to the port the component will use
def setup_iodrivers_base_with_fd(task, io_port_name: :io_port)
  test_side, test_side_port = create_udp_socket
  placeholder, driver_port = create_udp_socket
  @__iodrivers_base_sockets.push(test_side)

  # Free the reserved port so that the component can bind it
  placeholder.close
  test_side.connect("127.0.0.1", driver_port)

  uri = "udp://127.0.0.1:#{test_side_port}?local_port=#{driver_port}"
  property = task.properties[io_port_name]
  property.write(uri)
  test_side
end
# @api private
#
# Data service used by {#setup_iodrivers_base_with_ports}
#
# It declares an input port "in" and an output port "out", both of type
# /iodrivers_base/RawPacket. PortIO uses it as the model of its "raw_io"
# child.
data_service_type "RawIOService" do
input_port "in", "/iodrivers_base/RawPacket"
output_port "out", "/iodrivers_base/RawPacket"
end
# @api private
#
# Composition used by {#setup_iodrivers_base_with_ports}
#
# It pairs the driver task (child "test") with a RawIOService peer (child
# "raw_io") and connects them in both directions.
class PortIO < Syskit::Composition
add OroGen.iodrivers_base.Task, as: "test"
add RawIOService, as: "raw_io"
# Driver -> peer: the test child's io_raw_out port feeds the raw_io child,
# through a buffer connection of size 20
test_child.io_raw_out_port.connect_to \
raw_io_child, type: :buffer, size: 20
# Peer -> driver: the raw_io child feeds the test child's io_raw_in port,
# with the same buffer policy
raw_io_child.connect_to \
test_child.io_raw_in_port, type: :buffer, size: 20
end
# Connect the given task to a stub peer task that stands in for its I/O,
# and return that peer
#
# The task is deployed as the "test" child of a PortIO composition; the
# returned object is the composition's raw_io child.
#
# The peer has an 'in' port and an 'out' port whose directions are relative
# to the peer itself: write to `out_port` to send data to the driver, and
# read from `in_port` to get the data the driver emits.
#
# @param task the task under test
# @param [Boolean] configure_and_start whether the task under test should
#   be configured and started as well. The enclosing composition and the I/O
#   stub are always configured and started. The default is true for
#   historical reasons; assume that it will eventually become false and
#   that the argument will then be removed.
# @return the raw_io child of the deployed composition
def setup_iodrivers_base_with_ports(task, configure_and_start: true)
  composition = syskit_stub_and_deploy(PortIO.use("test" => task))

  unless configure_and_start
    # New behavior: leave the task under test alone, only bring up the
    # composition itself and the I/O stub
    syskit_configure_and_start(composition, recursive: false)
    syskit_configure_and_start(composition.raw_io_child, recursive: false)
    return composition.raw_io_child
  end

  deprecation_notice =
    "setup_iodrivers_base_with_ports used to configure and start the "\
    "task under test. This is still the default behavior, but will be "\
    "changed in the future. Set configure_and_start to false to prepare "\
    "for the new behavior"
  Roby.warn_deprecated(deprecation_notice)
  syskit_configure_and_start(composition)
  composition.raw_io_child
end
# Open a UDP socket bound to 127.0.0.1 on a port chosen by the system
# (port 0 is requested), and report which port was assigned
#
# @return [(UDPSocket, Integer)] the bound socket and its local port
def create_udp_socket
  udp = UDPSocket.new.tap { |sock| sock.bind("127.0.0.1", 0) }
  assigned_port = udp.local_address.ip_port
  [udp, assigned_port]
end
# Create a RawPacket object from raw data given as a string
#
# The packet's data field is filled through `from_buffer` with a 64-bit
# little-endian element count followed by the payload. Both are computed at
# the byte level: the count is the payload's bytesize (not its character
# count) and the payload is copied as binary. This way, strings that contain
# multi-byte characters get a correct count and cannot trigger an
# Encoding::CompatibilityError when appended to the binary count prefix.
#
# @param [String] data the payload, interpreted as raw bytes regardless of
#   its encoding
# @param [Time] time the packet timestamp (defaults to the current time)
# @return [Types.iodrivers_base.RawPacket] the new packet
def raw_packet_from_s(data, time: Time.now)
  payload = data.b
  packet = Types.iodrivers_base.RawPacket.new
  packet.time = time
  packet.data.from_buffer([payload.bytesize].pack("Q<") + payload)
  packet
end
# Extract the payload of a RawPacket as a string
#
# The byte array of the packet's data field is returned without its first 8
# bytes (presumably the 64-bit size header, mirroring what raw_packet_from_s
# writes -- confirm against the marshalling format).
#
# @param [Types.iodrivers_base.RawPacket] packet
# @return [String]
def raw_packet_to_s(packet)
  marshalled = packet.data.to_byte_array
  marshalled.slice(8..-1)
end
# Read exactly `size` bytes from an I/O, failing the test if they do not
# arrive within `timeout` seconds
#
# The data is gathered with non-blocking reads; whenever the I/O has nothing
# to offer, the method waits for it to become readable for at most the time
# left. The deadline is measured with the monotonic clock rather than
# Time.now, so that wall-clock adjustments (or a stubbed Time.now) can
# neither stretch nor shorten the timeout.
#
# @param io an I/O object that responds to read_nonblock and can be passed
#   to IO.select
# @param [Integer] size number of bytes to read
# @param [Numeric] timeout maximum time to wait, in seconds
# @return [String] the `size` bytes that were read
def read_with_timeout(io, size, timeout: 10)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  buffer = "".dup
  while buffer.bytesize != size
    begin
      buffer.concat(io.read_nonblock(size - buffer.bytesize))
    rescue IO::WaitReadable
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      if remaining < 0
        flunk("failed to read #{size} bytes from #{io} in #{timeout}s. "\
              "Only got #{buffer.bytesize} so far")
      end
      # Sleep until data shows up or the deadline passes, then try again
      IO.select([io], nil, nil, remaining)
    end
  end
  buffer
end
end
end