Skip to content

Commit

Permalink
[pulsar-client] Add deliver_at and deliver_after to python client (ap…
Browse files Browse the repository at this point in the history
…ache#6737)

* Add deliver_at and deliver_after to python client

* Fix deliver_at test

* Fix deliver_after function binding for Python

Create a new function deliverAfter that takes the MessageBuilder and a
py::object.
Python object is reinterpreted as PyDateTime_Delta then convert to a
std::chrono::duration and then the builder property is setted.

After that we return the builder.

Signed-off-by: Guillaume Perrin <[email protected]>

* Fix cast of datetime in setDeliverAfter in python library

Use PyObject* instead of boost::python::api::object since we want to
cast to another python object, PyDateTime_Delta.

Signed-off-by: Guillaume Perrin <[email protected]>

* Fix test comment

* Remove unused import

Co-authored-by: Yoann Ciabaud <[email protected]>
Co-authored-by: Guillaume Perrin <[email protected]>
  • Loading branch information
3 people authored and merlimat committed Apr 21, 2020
1 parent c4defc7 commit 37ae78c
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 4 deletions.
36 changes: 32 additions & 4 deletions pulsar-client-cpp/python/pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def send_callback(res, msg):
_retype = type(re.compile('x'))

import certifi
from datetime import timedelta


class MessageId:
Expand Down Expand Up @@ -809,6 +810,8 @@ def send(self, content,
replication_clusters=None,
disable_replication=False,
event_timestamp=None,
deliver_at=None,
deliver_after=None,
):
"""
Publish a message on the topic. Blocks until the message is acknowledged
Expand Down Expand Up @@ -836,9 +839,17 @@ def send(self, content,
Do not replicate this message.
* `event_timestamp`:
Timestamp in millis of the timestamp of event creation
* `deliver_at`:
Specify the this message should not be delivered earlier than the
specified timestamp.
The timestamp is milliseconds and based on UTC
* `deliver_after`:
Specify a delay in timedelta for the delivery of the messages.
"""
msg = self._build_msg(content, properties, partition_key, sequence_id,
replication_clusters, disable_replication, event_timestamp)
replication_clusters, disable_replication, event_timestamp,
deliver_at, deliver_after)
return self._producer.send(msg)

def send_async(self, content, callback,
Expand All @@ -847,7 +858,9 @@ def send_async(self, content, callback,
sequence_id=None,
replication_clusters=None,
disable_replication=False,
event_timestamp=None
event_timestamp=None,
deliver_at=None,
deliver_after=None,
):
"""
Send a message asynchronously.
Expand Down Expand Up @@ -889,9 +902,16 @@ def callback(res, msg):
Do not replicate this message.
* `event_timestamp`:
Timestamp in millis of the timestamp of event creation
* `deliver_at`:
Specify the this message should not be delivered earlier than the
specified timestamp.
The timestamp is milliseconds and based on UTC
* `deliver_after`:
Specify a delay in timedelta for the delivery of the messages.
"""
msg = self._build_msg(content, properties, partition_key, sequence_id,
replication_clusters, disable_replication, event_timestamp)
replication_clusters, disable_replication, event_timestamp,
deliver_at, deliver_after)
self._producer.send_async(msg, callback)


Expand All @@ -910,7 +930,8 @@ def close(self):
self._producer.close()

def _build_msg(self, content, properties, partition_key, sequence_id,
replication_clusters, disable_replication, event_timestamp):
replication_clusters, disable_replication, event_timestamp,
deliver_at, deliver_after):
data = self._schema.encode(content)

_check_type(bytes, data, 'data')
Expand All @@ -920,6 +941,8 @@ def _build_msg(self, content, properties, partition_key, sequence_id,
_check_type_or_none(list, replication_clusters, 'replication_clusters')
_check_type(bool, disable_replication, 'disable_replication')
_check_type_or_none(int, event_timestamp, 'event_timestamp')
_check_type_or_none(int, deliver_at, 'deliver_at')
_check_type_or_none(timedelta, deliver_after, 'deliver_after')

mb = _pulsar.MessageBuilder()
mb.content(data)
Expand All @@ -936,6 +959,11 @@ def _build_msg(self, content, properties, partition_key, sequence_id,
mb.disable_replication(disable_replication)
if event_timestamp:
mb.event_timestamp(event_timestamp)
if deliver_at:
mb.deliver_at(deliver_at)
if deliver_after:
mb.deliver_after(deliver_after)

return mb.build()


Expand Down
51 changes: 51 additions & 0 deletions pulsar-client-cpp/python/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import time
import os
import uuid
from datetime import timedelta
from pulsar import Client, MessageId, \
CompressionType, ConsumerType, PartitionsRoutingMode, \
AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition
Expand Down Expand Up @@ -165,6 +166,54 @@ def test_redelivery_count(self):
producer.close()
client.close()

def test_deliver_at(self):
client = Client(self.serviceUrl)
consumer = client.subscribe('my-python-topic-deliver-at',
'my-sub',
consumer_type=ConsumerType.Shared)
producer = client.create_producer('my-python-topic-deliver-at')
# Delay message in 1.1s
producer.send(b'hello', deliver_at=int(round(time.time() * 1000)) + 1100)

# Message should not be available in the next second
try:
msg = consumer.receive(1000)
self.assertTrue(False) # Should not reach this point
except:
pass # Exception is expected

# Message should be published now
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello')
consumer.unsubscribe()
producer.close()
client.close()

def test_deliver_after(self):
client = Client(self.serviceUrl)
consumer = client.subscribe('my-python-topic-deliver-after',
'my-sub',
consumer_type=ConsumerType.Shared)
producer = client.create_producer('my-python-topic-deliver-after')
# Delay message in 1.1s
producer.send(b'hello', deliver_after=timedelta(milliseconds=1100))

# Message should not be available in the next second
try:
msg = consumer.receive(1000)
self.assertTrue(False) # Should not reach this point
except:
pass # Exception is expected

# Message should be published in the next 500ms
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello')
consumer.unsubscribe()
producer.close()
client.close()

def test_consumer_initial_position(self):
client = Client(self.serviceUrl)
producer = client.create_producer('my-python-topic-producer-consumer')
Expand Down Expand Up @@ -590,6 +639,8 @@ def test_message_argument_errors(self):
self._check_value_error(lambda: producer.send(content, replication_clusters=5))
self._check_value_error(lambda: producer.send(content, disable_replication='test'))
self._check_value_error(lambda: producer.send(content, event_timestamp='test'))
self._check_value_error(lambda: producer.send(content, deliver_at='test'))
self._check_value_error(lambda: producer.send(content, deliver_after='test'))
client.close()

def test_client_argument_errors(self):
Expand Down
29 changes: 29 additions & 0 deletions pulsar-client-cpp/python/src/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
#include "utils.h"

#include <datetime.h>
#include <boost/python/suite/indexing/map_indexing_suite.hpp>
#include <boost/python/suite/indexing/vector_indexing_suite.hpp>

Expand Down Expand Up @@ -85,16 +86,44 @@ const MessageId& Message_getMessageId(const Message& msg) {
return msg.getMessageId();
}

void deliverAfter(MessageBuilder* const builder, PyObject* obj_delta) {
PyDateTime_Delta const* pydelta = reinterpret_cast<PyDateTime_Delta*>(obj_delta);

long days = pydelta->days;
const bool is_negative = days < 0;
if (is_negative) {
days = -days;
}

// Create chrono duration object
std::chrono::milliseconds
duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::hours(24)*days
+ std::chrono::seconds(pydelta->seconds)
+ std::chrono::microseconds(pydelta->microseconds)
);

if (is_negative) {
duration = duration * -1;
}

builder->setDeliverAfter(duration);
}

void export_message() {
using namespace boost::python;

PyDateTime_IMPORT;

MessageBuilder& (MessageBuilder::*MessageBuilderSetContentString)(const std::string&) = &MessageBuilder::setContent;

class_<MessageBuilder, boost::noncopyable>("MessageBuilder")
.def("content", MessageBuilderSetContentString, return_self<>())
.def("property", &MessageBuilder::setProperty, return_self<>())
.def("properties", &MessageBuilder::setProperties, return_self<>())
.def("sequence_id", &MessageBuilder::setSequenceId, return_self<>())
.def("deliver_after", &deliverAfter, return_self<>())
.def("deliver_at", &MessageBuilder::setDeliverAt, return_self<>())
.def("partition_key", &MessageBuilder::setPartitionKey, return_self<>())
.def("event_timestamp", &MessageBuilder::setEventTimestamp, return_self<>())
.def("replication_clusters", &MessageBuilder::setReplicationClusters, return_self<>())
Expand Down

0 comments on commit 37ae78c

Please sign in to comment.