Skip to content
7 changes: 5 additions & 2 deletions cpp/src/arrow/python/deserialize.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,11 @@ Status DeserializeDict(PyObject* context, const Array& array, int64_t start_idx,
// The latter two steal references whereas PyDict_SetItem does not. So we need
// to make sure the reference count is decremented by letting the OwnedRef
// go out of scope at the end.
PyDict_SetItem(result.obj(), PyList_GET_ITEM(keys.obj(), i - start_idx),
PyList_GET_ITEM(vals.obj(), i - start_idx));
int ret = PyDict_SetItem(result.obj(), PyList_GET_ITEM(keys.obj(), i - start_idx),
PyList_GET_ITEM(vals.obj(), i - start_idx));
if (ret != 0) {
return ConvertPyError();
}
}
static PyObject* py_type = PyUnicode_FromString("_pytype_");
if (PyDict_Contains(result.obj(), py_type)) {
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/util/logging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ void ArrowLog::InstallFailureSignalHandler() {
#endif
}

bool ArrowLog::IsLevelEnabled(ArrowLogLevel log_level) {
return log_level >= severity_threshold_;
}

ArrowLog::ArrowLog(const char* file_name, int line_number, ArrowLogLevel severity)
// glog does not have DEBUG level, we can handle it using is_enabled_.
: logging_provider_(nullptr), is_enabled_(severity >= severity_threshold_) {
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/util/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ enum class ArrowLogLevel : int {

#define ARROW_LOG_INTERNAL(level) ::arrow::util::ArrowLog(__FILE__, __LINE__, level)
#define ARROW_LOG(level) ARROW_LOG_INTERNAL(::arrow::util::ArrowLogLevel::ARROW_##level)

#define ARROW_IGNORE_EXPR(expr) ((void)(expr))

#define ARROW_CHECK(condition) \
Expand Down Expand Up @@ -173,6 +174,12 @@ class ARROW_EXPORT ArrowLog : public ArrowLogBase {
/// If glog is not installed, this function won't do anything.
static void InstallFailureSignalHandler();

/// Return whether or not the log level is enabled in current setting.
///
/// \param log_level The input log level to test.
/// \return True if input log level is not lower than the threshold.
static bool IsLevelEnabled(ArrowLogLevel log_level);

private:
ARROW_DISALLOW_COPY_AND_ASSIGN(ArrowLog);

Expand Down
9 changes: 8 additions & 1 deletion cpp/src/plasma/events.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,14 @@ void EventLoop::Start() { aeMain(loop_); }

void EventLoop::Stop() { aeStop(loop_); }

void EventLoop::Shutdown() { aeDeleteEventLoop(loop_); }
void EventLoop::Shutdown() {
if (loop_ != nullptr) {
aeDeleteEventLoop(loop_);
loop_ = nullptr;
}
}

EventLoop::~EventLoop() { Shutdown(); }

int64_t EventLoop::AddTimer(int64_t timeout, const TimerCallback& callback) {
auto data = std::unique_ptr<TimerCallback>(new TimerCallback(callback));
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/plasma/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class EventLoop {

EventLoop();

~EventLoop();

/// Add a new file event handler to the event loop.
///
/// \param fd The file descriptor we are listening to.
Expand Down
1 change: 1 addition & 0 deletions cpp/src/plasma/io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ int ConnectIpcSock(const std::string& pathname) {
socket_address.sun_family = AF_UNIX;
if (pathname.size() + 1 > sizeof(socket_address.sun_path)) {
ARROW_LOG(ERROR) << "Socket pathname is too long.";
close(socket_fd);
return -1;
}
strncpy(socket_address.sun_path, pathname.c_str(), pathname.size() + 1);
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si
} else {
pointer = AllocateMemory(total_size, &fd, &map_size, &offset);
if (!pointer) {
ARROW_LOG(ERROR) << "Not enough memory to create the object " << object_id.hex()
<< ", data_size=" << data_size
<< ", metadata_size=" << metadata_size
<< ", will send a reply of PlasmaError::OutOfMemory";
return PlasmaError::OutOfMemory;
}
}
Expand Down Expand Up @@ -1036,6 +1040,7 @@ static std::unique_ptr<PlasmaStoreRunner> g_runner = nullptr;

void HandleSignal(int signal) {
if (signal == SIGTERM) {
ARROW_LOG(INFO) << "SIGTERM Signal received, closing Plasma Server...";
if (g_runner != nullptr) {
g_runner->Stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public interface ObjectStoreLink {

class ObjectStoreData {

ObjectStoreData(byte[] metadata, byte[] data) {
public ObjectStoreData(byte[] metadata, byte[] data) {
this.data = data;
this.metadata = metadata;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public List<ObjectStoreData> get(byte[][] objectIds, int timeoutMs) {
} else {
meta = null;
}
ret.add(new ObjectStoreData(data, meta));
ret.add(new ObjectStoreData(meta, data));
}
}
return ret;
Expand Down