diff --git a/conn.go b/conn.go index 5b452ce..ac1d783 100644 --- a/conn.go +++ b/conn.go @@ -400,7 +400,7 @@ func (c *conn) init() error { return errors.Wrap(err, qry) } defer st.Close() - rows, err := st.Query([]driver.Value{}) //nolint:staticcheck + rows, err := st.Query([]driver.Value{}) //lint:ignore SA1019 difficult if err != nil { if Log != nil { Log("qry", qry, "error", err) diff --git a/drv_10.go b/drv_10.go index 73a4d77..f16219a 100644 --- a/drv_10.go +++ b/drv_10.go @@ -95,7 +95,7 @@ func NewSessionIniter(m map[string]string) func(driver.Conn) error { if err != nil { return errors.Wrap(err, qry) } - _, err = st.Exec(nil) //nolint:staticcheck + _, err = st.Exec(nil) //lint:ignore SA1019 difficult st.Close() if err != nil { return err diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..80ebef9 --- /dev/null +++ b/queue.go @@ -0,0 +1,429 @@ +// Copyright 2019 Tamás Gulácsi +// +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package goracle + +/* +#include +#include "dpiImpl.h" +*/ +import "C" +import ( + "time" + "unsafe" + + "github.com/pkg/errors" +) + +// Queue represents an Oracle Advanced Queue. +type Queue struct { + *conn + dpiQueue *C.dpiQueue + name string +} + +func NewQueue(execer Execer, name string, payloadType *C.dpiObjectType) (Queue, error) { + cx, err := DriverConn(execer) + if err != nil { + return Queue{}, err + } + Q := Queue{conn: cx.(*conn)} + value := C.CString(name) + if C.dpiConn_newQueue(Q.conn.dpiConn, value, C.uint(len(name)), payloadType, &Q.dpiQueue) == C.DPI_FAILURE { + err = errors.WithMessage(Q.conn.drv.getError(), "newQueue "+name) + } + C.free(unsafe.Pointer(value)) + return Q, err +} + +// Name of the queue. +func (Q Queue) Name() string { return Q.name } + +func (Q Queue) EnqOptions() (EnqOptions, error) { + var E EnqOptions + var opts *C.dpiEnqOptions + if C.dpiQueue_getEnqOptions(Q.dpiQueue, &opts) == C.DPI_FAILURE { + return E, errors.WithMessage(Q.drv.getError(), "getEnqOptions") + } + err := E.fromOra(Q.conn.drv, opts) + return E, err +} +func (Q Queue) DeqOptions() (DeqOptions, error) { + var D DeqOptions + var opts *C.dpiDeqOptions + if C.dpiQueue_getDeqOptions(Q.dpiQueue, &opts) == C.DPI_FAILURE { + return D, errors.WithMessage(Q.drv.getError(), "getDeqOptions") + } + err := D.fromOra(Q.conn.drv, opts) + return D, err +} + +// Dequeues messages into the given slice. +func (Q Queue) Dequeue(messages []Message) (int, error) { + var ok C.int + props := make([]*C.dpiMsgProps, len(messages)) + num := C.uint(len(props)) + if num == 1 { + ok = C.dpiQueue_deqOne(Q.dpiQueue, &props[0]) + } else { + ok = C.dpiQueue_deqMany(Q.dpiQueue, &num, &props[0]) + } + if ok == C.DPI_FAILURE { + return int(num), errors.WithMessage(Q.conn.getError(), "dequeue") + } + var firstErr error + for i, p := range props[:int(num)] { + if err := messages[i].fromOra(Q.conn, p); err != nil { + if firstErr == nil { + firstErr = err + } + } + C.dpiMsgProps_release(p) + } + return int(num), firstErr +} + +// Warning: calling this function in parallel on different connections acquired from the same pool may fail due to Oracle bug 29928074. Ensure that this function is not run in parallel, use standalone connections or connections from different pools, or make multiple calls to Queue.enqOne() instead. The function Queue.Dequeue() call is not affected. +func (Q Queue) Enqueue(messages []Message) error { + props := make([]*C.dpiMsgProps, len(messages)) + defer func() { + for _, p := range props { + if p != nil { + C.dpiMsgProps_release(p) + } + } + }() + for i := range props { + if C.dpiConn_newMsgProps(Q.conn.dpiConn, &props[i]) == C.DPI_FAILURE { + return errors.WithMessage(Q.conn.getError(), "newMsgProps") + } + if err := messages[i].toOra(Q.drv, props[i]); err != nil { + return err + } + } + + var ok C.int + if len(messages) == 1 { + ok = C.dpiQueue_enqOne(Q.dpiQueue, props[0]) + } else { + ok = C.dpiQueue_enqMany(Q.dpiQueue, C.uint(len(props)), &props[0]) + } + if ok == C.DPI_FAILURE { + return errors.WithMessage(Q.conn.getError(), "enqueue") + } + return nil +} + +type Message struct { + DeliveryMode DeliveryMode + Enqueued time.Time + Delay, Expiration int32 + Priority, NumAttempts int32 + Correlation, ExceptionQ string + MsgID, OriginalMsgID string + State MessageState + Raw []byte + Object *Object +} + +func (M *Message) toOra(d *drv, props *C.dpiMsgProps) error { + var firstErr error + OK := func(ok C.int, name string) { + if ok == C.DPI_SUCCESS { + return + } + if firstErr == nil { + firstErr = errors.WithMessage(d.getError(), name) + } + } + if M.Correlation != "" { + value := C.CString(M.Correlation) + OK(C.dpiMsgProps_setCorrelation(props, value, C.uint(len(M.Correlation))), "setCorrelation") + C.free(unsafe.Pointer(value)) + } + + if M.Delay != 0 { + OK(C.dpiMsgProps_setDelay(props, C.int(M.Delay)), "setDelay") + } + + if M.ExceptionQ != "" { + value := C.CString(M.ExceptionQ) + OK(C.dpiMsgProps_setExceptionQ(props, value, C.uint(len(M.ExceptionQ))), "setExceptionQ") + C.free(unsafe.Pointer(value)) + } + + if M.Expiration != 0 { + OK(C.dpiMsgProps_setExpiration(props, C.int(M.Expiration)), "setExpiration") + } + + if M.OriginalMsgID != "" { + value := C.CString(M.OriginalMsgID) + OK(C.dpiMsgProps_setOriginalMsgId(props, value, C.uint(len(M.OriginalMsgID))), "setMsgOriginalId") + C.free(unsafe.Pointer(value)) + } + + OK(C.dpiMsgProps_setPriority(props, C.int(M.Priority)), "setPriority") + + if M.Object == nil { + OK(C.dpiMsgProps_setPayloadBytes(props, (*C.char)(unsafe.Pointer(&M.Raw[0])), C.uint(len(M.Raw))), "setPriority") + } + + OK(C.dpiMsgProps_setPayloadObject(props, M.Object.dpiObject), "setPayload") + + return firstErr +} + +func (M *Message) fromOra(c *conn, props *C.dpiMsgProps) error { + var firstErr error + OK := func(ok C.int, name string) bool { + if ok == C.DPI_SUCCESS { + return true + } + if firstErr == nil { + firstErr = errors.WithMessage(c.getError(), name) + } + return false + } + M.NumAttempts = 0 + var cint C.int + if OK(C.dpiMsgProps_getNumAttempts(props, &cint), "getNumAttempts") { + M.NumAttempts = int32(cint) + } + var value *C.char + var length C.uint + M.Correlation = "" + if OK(C.dpiMsgProps_getCorrelation(props, &value, &length), "getCorrelation") { + M.Correlation = C.GoStringN(value, C.int(length)) + } + + M.Delay = 0 + if OK(C.dpiMsgProps_getDelay(props, &cint), "getDelay") { + M.Delay = int32(cint) + } + + M.DeliveryMode = DeliverPersistent + var mode C.dpiMessageDeliveryMode + if OK(C.dpiMsgProps_getDeliveryMode(props, &mode), "getDeliveryMode") { + M.DeliveryMode = DeliveryMode(mode) + } + + M.ExceptionQ = "" + if OK(C.dpiMsgProps_getExceptionQ(props, &value, &length), "getExceptionQ") { + M.ExceptionQ = C.GoStringN(value, C.int(length)) + } + + var ts C.dpiTimestamp + M.Enqueued = time.Time{} + if OK(C.dpiMsgProps_getEnqTime(props, &ts), "getEnqTime") { + tz := c.timeZone + if ts.tzHourOffset != 0 || ts.tzMinuteOffset != 0 { + tz = timeZoneFor(ts.tzHourOffset, ts.tzMinuteOffset) + } + M.Enqueued = time.Date( + int(ts.year), time.Month(ts.month), int(ts.day), + int(ts.hour), int(ts.minute), int(ts.second), int(ts.fsecond), + tz, + ) + } + + M.Expiration = 0 + if OK(C.dpiMsgProps_getExpiration(props, &cint), "getExpiration") { + M.Expiration = int32(cint) + } + + M.MsgID = "" + if OK(C.dpiMsgProps_getMsgId(props, &value, &length), "getMsgId") { + M.MsgID = C.GoStringN(value, C.int(length)) + } + + M.OriginalMsgID = "" + if OK(C.dpiMsgProps_getOriginalMsgId(props, &value, &length), "getMsgOriginalId") { + M.OriginalMsgID = C.GoStringN(value, C.int(length)) + } + + M.Priority = 0 + if OK(C.dpiMsgProps_getPriority(props, &cint), "getPriority") { + M.Priority = int32(cint) + } + + M.State = 0 + var state C.dpiMessageState + if OK(C.dpiMsgProps_getState(props, &state), "getState") { + M.State = MessageState(state) + } + + M.Raw = nil + M.Object = nil + var obj *C.dpiObject + if OK(C.dpiMsgProps_getPayload(props, &obj, &value, &length), "getPayload") { + if obj == nil { + M.Raw = append(make([]byte, 0, length), ((*[1 << 30]byte)(unsafe.Pointer(value)))[:int(length):int(length)]...) + } else { + M.Object = &Object{dpiObject: obj} + } + } + return nil +} + +type EnqOptions struct { + Transformation string + Visibility Visibility + DeliveryMode DeliveryMode +} + +func (E EnqOptions) fromOra(d *drv, opts *C.dpiEnqOptions) error { + var firstErr error + OK := func(ok C.int, msg string) bool { + if ok == C.DPI_SUCCESS { + return true + } + if firstErr == nil { + firstErr = errors.WithMessage(d.getError(), msg) + } + return false + } + + var value *C.char + var length C.uint + if OK(C.dpiEnqOptions_getTransformation(opts, &value, &length), "getTransformation") { + E.Transformation = C.GoStringN(value, C.int(length)) + } + + var vis C.dpiVisibility + if OK(C.dpiEnqOptions_getVisibility(opts, &vis), "getVisibility") { + E.Visibility = Visibility(vis) + } + return firstErr +} + +type DeqOptions struct { + Condition, Consumer, Correlation string + MsgID, Transformation string + Mode DeqMode + Navigation DeqNavigation + Visibility Visibility + Wait uint32 +} + +func (D DeqOptions) fromOra(d *drv, opts *C.dpiDeqOptions) error { + var firstErr error + OK := func(ok C.int, msg string) bool { + if ok == C.DPI_SUCCESS { + return true + } + if firstErr == nil { + firstErr = errors.WithMessage(d.getError(), msg) + } + return false + } + + var value *C.char + var length C.uint + D.Transformation = "" + if OK(C.dpiDeqOptions_getTransformation(opts, &value, &length), "getTransformation") { + D.Transformation = C.GoStringN(value, C.int(length)) + } + D.Condition = "" + if OK(C.dpiDeqOptions_getCondition(opts, &value, &length), "getCondifion") { + D.Condition = C.GoStringN(value, C.int(length)) + } + D.Consumer = "" + if OK(C.dpiDeqOptions_getConsumerName(opts, &value, &length), "getConsumer") { + D.Consumer = C.GoStringN(value, C.int(length)) + } + D.Correlation = "" + if OK(C.dpiDeqOptions_getCorrelation(opts, &value, &length), "getCorrelation") { + D.Correlation = C.GoStringN(value, C.int(length)) + } + var mode C.dpiDeqMode + if OK(C.dpiDeqOptions_getMode(opts, &mode), "getMode") { + D.Mode = DeqMode(mode) + } + D.MsgID = "" + if OK(C.dpiDeqOptions_getMsgId(opts, &value, &length), "getMsgId") { + D.MsgID = C.GoStringN(value, C.int(length)) + } + var nav C.dpiDeqNavigation + if OK(C.dpiDeqOptions_getNavigation(opts, &nav), "getNavigation") { + D.Navigation = DeqNavigation(nav) + } + var vis C.dpiVisibility + if OK(C.dpiDeqOptions_getVisibility(opts, &vis), "getVisibility") { + D.Visibility = Visibility(vis) + } + D.Wait = 0 + var u32 C.uint + if OK(C.dpiDeqOptions_getWait(opts, &u32), "getWait") { + D.Wait = uint32(u32) + } + return firstErr +} + +type MessageState uint32 + +const ( + // MsgStateReady says that "The message is ready to be processed". + MsgStateReady = MessageState(C.DPI_MSG_STATE_READY) + // MsgStateWaiting says that "The message is waiting for the delay time to expire". + MsgStateWaiting = MessageState(C.DPI_MSG_STATE_WAITING) + // MsgStateProcessed says that "The message has already been processed and is retained". + MsgStateProcessed = MessageState(C.DPI_MSG_STATE_PROCESSED) + // MsgStateExpired says that "The message has been moved to the exception queue". + MsgStateExpired = MessageState(C.DPI_MSG_STATE_EXPIRED) +) + +type DeliveryMode uint32 + +const ( + // DeliverPersistent is to Dequeue only persistent messages from the queue. This is the default mode. + DeliverPersistent = DeliveryMode(C.DPI_MODE_MSG_PERSISTENT) + // DeliverBuffered is to Dequeue only buffered messages from the queue. + DeliverBuffered = DeliveryMode(C.DPI_MODE_MSG_BUFFERED) + // DeliverPersistentOrBuffered is to Dequeue both persistent and buffered messages from the queue. + DeliverPersistentOrBuffered = DeliveryMode(C.DPI_MODE_MSG_PERSISTENT_OR_BUFFERED) +) + +type Visibility uint32 + +const ( + // VisibleImmediate means that "The message is not part of the current transaction but constitutes a transaction of its own". + VisibleImmediate = Visibility(C.DPI_VISIBILITY_IMMEDIATE) + // VisibleOnCommit means that "The message is part of the current transaction. This is the default value". + VisibleOnCommit = Visibility(C.DPI_VISIBILITY_ON_COMMIT) +) + +type DeqMode uint32 + +const ( + // DeqRemove reads the message and updates or deletes it. This is the default mode. Note that the message may be retained in the queue table based on retention properties. + DeqRemove = DeqMode(C.DPI_MODE_DEQ_REMOVE) + // DeqBrows reads the message without acquiring a lock on the message (equivalent to a SELECT statement). + DeqBrowse = DeqMode(C.DPI_MODE_DEQ_BROWSE) + // DeqLocked reads the message and obtain a write lock on the message (equivalent to a SELECT FOR UPDATE statement). + DeqLocked = DeqMode(C.DPI_MODE_DEQ_LOCKED) + // DeqPeek confirms receipt of the message but does not deliver the actual message content. + DeqPeek = DeqMode(C.DPI_MODE_DEQ_REMOVE_NO_DATA) +) + +type DeqNavigation uint32 + +const ( + // NavFirst retrieves the first available message that matches the search criteria. This resets the position to the beginning of the queue. + NavFirst = DeqNavigation(C.DPI_DEQ_NAV_FIRST_MSG) + // NavNext skips the remainder of the current transaction group (if any) and retrieves the first message of the next transaction group. This option can only be used if message grouping is enabled for the queue. + NavNextTran = DeqNavigation(C.DPI_DEQ_NAV_NEXT_TRANSACTION) + // NavNext Retrieves the next available message that matches the search criteria. This is the default method. + NavNext = DeqNavigation(C.DPI_DEQ_NAV_NEXT_MSG) +)