Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 22 additions & 19 deletions python/ray/test/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,35 @@
from __future__ import division
from __future__ import print_function

import time
import pytest
import time

import ray

from ray.experimental.queue import Queue, Empty, Full


def setup_module():
if not ray.worker.global_worker.connected:
ray.init()

@pytest.fixture
def ray_start():
# Start the Ray process.
ray.init(num_cpus=1)
yield None
# The code after the yield will run as teardown code.
ray.shutdown()

@ray.remote
def get_async(queue, block, timeout, sleep):
time.sleep(sleep)
return queue.get(block, timeout)

def test_queue(ray_start):
@ray.remote
def get_async(queue, block, timeout, sleep):
time.sleep(sleep)
return queue.get(block, timeout)

@ray.remote
def put_async(queue, item, block, timeout, sleep):
time.sleep(sleep)
queue.put(item, block, timeout)
@ray.remote
def put_async(queue, item, block, timeout, sleep):
time.sleep(sleep)
queue.put(item, block, timeout)

# Test simple usage.

def test_simple_use():
q = Queue()

items = list(range(10))
Expand All @@ -38,8 +41,8 @@ def test_simple_use():
for item in items:
assert item == q.get()

# Test asynchronous usage.

def test_async():
q = Queue()

items = set(range(10))
Expand All @@ -52,8 +55,8 @@ def test_async():

assert items == result

# Test put.

def test_put():
q = Queue(1)

item = 0
Expand Down Expand Up @@ -82,8 +85,8 @@ def test_put():

assert ray.get(get_id) == 1

# Test get.

def test_get():
q = Queue()

item = 0
Expand All @@ -107,8 +110,8 @@ def test_get():
put_async.remote(q, item, True, None, 0.2)
assert q.get() == item

# Test qsize.

def test_qsize():
q = Queue()

items = list(range(10))
Expand Down