diff --git a/cpp/src/arrow/python/deserialize.cc b/cpp/src/arrow/python/deserialize.cc index f13070a5883..f17abbe7cb3 100644 --- a/cpp/src/arrow/python/deserialize.cc +++ b/cpp/src/arrow/python/deserialize.cc @@ -84,8 +84,11 @@ Status DeserializeDict(PyObject* context, const Array& array, int64_t start_idx, // The latter two steal references whereas PyDict_SetItem does not. So we need // to make sure the reference count is decremented by letting the OwnedRef // go out of scope at the end. - PyDict_SetItem(result.obj(), PyList_GET_ITEM(keys.obj(), i - start_idx), - PyList_GET_ITEM(vals.obj(), i - start_idx)); + int ret = PyDict_SetItem(result.obj(), PyList_GET_ITEM(keys.obj(), i - start_idx), + PyList_GET_ITEM(vals.obj(), i - start_idx)); + if (ret != 0) { + return ConvertPyError(); + } } static PyObject* py_type = PyUnicode_FromString("_pytype_"); if (PyDict_Contains(result.obj(), py_type)) { diff --git a/cpp/src/arrow/util/logging.cc b/cpp/src/arrow/util/logging.cc index 7e368409c02..2d7cc605f8b 100644 --- a/cpp/src/arrow/util/logging.cc +++ b/cpp/src/arrow/util/logging.cc @@ -152,6 +152,10 @@ void ArrowLog::InstallFailureSignalHandler() { #endif } +bool ArrowLog::IsLevelEnabled(ArrowLogLevel log_level) { + return log_level >= severity_threshold_; +} + ArrowLog::ArrowLog(const char* file_name, int line_number, ArrowLogLevel severity) // glog does not have DEBUG level, we can handle it using is_enabled_. : logging_provider_(nullptr), is_enabled_(severity >= severity_threshold_) { diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h index 5ea78206a73..18746c5885b 100644 --- a/cpp/src/arrow/util/logging.h +++ b/cpp/src/arrow/util/logging.h @@ -56,6 +56,7 @@ enum class ArrowLogLevel : int { #define ARROW_LOG_INTERNAL(level) ::arrow::util::ArrowLog(__FILE__, __LINE__, level) #define ARROW_LOG(level) ARROW_LOG_INTERNAL(::arrow::util::ArrowLogLevel::ARROW_##level) + #define ARROW_IGNORE_EXPR(expr) ((void)(expr)) #define ARROW_CHECK(condition) \ @@ -173,6 +174,12 @@ class ARROW_EXPORT ArrowLog : public ArrowLogBase { /// If glog is not installed, this function won't do anything. static void InstallFailureSignalHandler(); + /// Return whether or not the log level is enabled in current setting. + /// + /// \param log_level The input log level to test. + /// \return True if input log level is not lower than the threshold. + static bool IsLevelEnabled(ArrowLogLevel log_level); + private: ARROW_DISALLOW_COPY_AND_ASSIGN(ArrowLog); diff --git a/cpp/src/plasma/events.cc b/cpp/src/plasma/events.cc index 7c977b00069..d49c577fdf1 100644 --- a/cpp/src/plasma/events.cc +++ b/cpp/src/plasma/events.cc @@ -80,7 +80,14 @@ void EventLoop::Start() { aeMain(loop_); } void EventLoop::Stop() { aeStop(loop_); } -void EventLoop::Shutdown() { aeDeleteEventLoop(loop_); } +void EventLoop::Shutdown() { + if (loop_ != nullptr) { + aeDeleteEventLoop(loop_); + loop_ = nullptr; + } +} + +EventLoop::~EventLoop() { Shutdown(); } int64_t EventLoop::AddTimer(int64_t timeout, const TimerCallback& callback) { auto data = std::unique_ptr(new TimerCallback(callback)); diff --git a/cpp/src/plasma/events.h b/cpp/src/plasma/events.h index 109540e6d1a..765be9c01fb 100644 --- a/cpp/src/plasma/events.h +++ b/cpp/src/plasma/events.h @@ -59,6 +59,8 @@ class EventLoop { EventLoop(); + ~EventLoop(); + /// Add a new file event handler to the event loop. /// /// \param fd The file descriptor we are listening to. diff --git a/cpp/src/plasma/io.cc b/cpp/src/plasma/io.cc index cc425428ece..9ba23e552c2 100644 --- a/cpp/src/plasma/io.cc +++ b/cpp/src/plasma/io.cc @@ -195,6 +195,7 @@ int ConnectIpcSock(const std::string& pathname) { socket_address.sun_family = AF_UNIX; if (pathname.size() + 1 > sizeof(socket_address.sun_path)) { ARROW_LOG(ERROR) << "Socket pathname is too long."; + close(socket_fd); return -1; } strncpy(socket_address.sun_path, pathname.c_str(), pathname.size() + 1); diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 82f648cc6ad..c55da30573f 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -232,6 +232,10 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si } else { pointer = AllocateMemory(total_size, &fd, &map_size, &offset); if (!pointer) { + ARROW_LOG(ERROR) << "Not enough memory to create the object " << object_id.hex() + << ", data_size=" << data_size + << ", metadata_size=" << metadata_size + << ", will send a reply of PlasmaError::OutOfMemory"; return PlasmaError::OutOfMemory; } } @@ -1036,6 +1040,7 @@ static std::unique_ptr g_runner = nullptr; void HandleSignal(int signal) { if (signal == SIGTERM) { + ARROW_LOG(INFO) << "SIGTERM Signal received, closing Plasma Server..."; if (g_runner != nullptr) { g_runner->Stop(); } diff --git a/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java b/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java index f933c85b836..520549b8d33 100644 --- a/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java +++ b/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java @@ -29,7 +29,7 @@ public interface ObjectStoreLink { class ObjectStoreData { - ObjectStoreData(byte[] metadata, byte[] data) { + public ObjectStoreData(byte[] metadata, byte[] data) { this.data = data; this.metadata = metadata; } diff --git a/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java b/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java index a708f41853d..af53c6190ee 100644 --- a/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java +++ b/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java @@ -99,7 +99,7 @@ public List get(byte[][] objectIds, int timeoutMs) { } else { meta = null; } - ret.add(new ObjectStoreData(data, meta)); + ret.add(new ObjectStoreData(meta, data)); } } return ret;