Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for processing messages after MProc drop/reject. #261

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -290,118 +290,16 @@ protected void forwardMessage(Sms sms0, boolean withCharging, SmscStatAggregator
chargingSbb.setupChargingRequestInterface(chargingMedium, sms0);
} else {
// applying of MProc
MProcResult mProcResult = MProcManagement.getInstance().applyMProcArrival(itsMProcRa, sms0, persistence);

FastList<Sms> smss = mProcResult.getMessageList();
for (FastList.Node<Sms> n = smss.head(), end = smss.tail(); (n = n.getNext()) != end;) {
Sms sms = n.getValue();
TargetAddress ta = new TargetAddress(sms.getSmsSet());
TargetAddress lock = persistence.obtainSynchroObject(ta);

try {
sms.setTimestampC(System.currentTimeMillis());

synchronized (lock) {
boolean storeAndForwMode = MessageUtil.isStoreAndForward(sms);
if (!storeAndForwMode) {
try {
this.scheduler.injectSmsOnFly(sms.getSmsSet(), true);
} catch (Exception e) {
throw new SmscProcessingException("Exception when running injectSmsOnFly(): " + e.getMessage(),
SmppConstants.STATUS_SYSERR, MAPErrorCode.systemFailure,
SmscProcessingException.HTTP_ERROR_CODE_NOT_SET, null, e,
SmscProcessingException.INTERNAL_ERROR_INJECT_STORE_AND_FORWARD_NOT_SET);
}
} else {
// store and forward
if (smscPropertiesManagement.getStoreAndForwordMode() == StoreAndForwordMode.fast
&& sms.getScheduleDeliveryTime() == null) {
try {
sms.setStoringAfterFailure(true);
this.scheduler.injectSmsOnFly(sms.getSmsSet(), true);
} catch (Exception e) {
throw new SmscProcessingException(
"Exception when running injectSmsOnFly(): " + e.getMessage(),
SmppConstants.STATUS_SYSERR, MAPErrorCode.systemFailure,
SmscProcessingException.HTTP_ERROR_CODE_NOT_SET, null, e,
SmscProcessingException.INTERNAL_ERROR_INJECT_STORE_AND_FORWARD_FAST);
}
} else {
try {
sms.setStored(true);
this.scheduler.setDestCluster(sms.getSmsSet());
persistence.c2_scheduleMessage_ReschedDueSlot(sms,
smscPropertiesManagement.getStoreAndForwordMode() == StoreAndForwordMode.fast,
false);
} catch (PersistenceException e) {
throw new SmscProcessingException(
"PersistenceException when storing LIVE_SMS : " + e.getMessage(),
SmppConstants.STATUS_SUBMITFAIL, MAPErrorCode.systemFailure,
SmscProcessingException.HTTP_ERROR_CODE_NOT_SET, null, e,
SmscProcessingException.INTERNAL_ERROR_INJECT_STORE_AND_FORWARD_NORMAL);
}
}
}
}
} finally {
persistence.releaseSynchroObject(lock);
}
}
final MProcResult mProcResult = MProcManagement.getInstance().applyMProcArrival(itsMProcRa, sms0,
persistence);

if (mProcResult.isMessageRejected()) {
sms0.setMessageDeliveryResultResponse(null);
if (eventTypeFailure != null) {
String sourceAddrAndPort = null;
if (eventTypeFailure == EventType.IN_SMPP_REJECT_MPROC) {
EsmeManagement esmeManagement = EsmeManagement.getInstance();

if (esmeManagement != null) {
// for testing there is no EsmeManagement
Esme esme = esmeManagement.getEsmeByClusterName(sms0.getOrigEsmeName());
// null check is for testing
if (esme != null) {
sourceAddrAndPort = esme.getRemoteAddressAndPort();
}
}
statusCode = mProcResult.getSmppErrorCode();
} else if (eventTypeFailure == EventType.IN_HTTP_REJECT_MPROC) {
statusCode = mProcResult.getHttpErrorCode();
}

generateMprocFailureDetailedCdr(sms0, eventTypeFailure, ErrorCode.REJECT_INCOMING_MPROC, messageType,
statusCode, mProcResult.getRuleIdDropReject(), sourceAddrAndPort, seqNumber);
}

rejectSmsByMProc(sms0, "Message is rejected by MProc rules.", mProcResult);
}
if (mProcResult.isMessageDropped()) {
sms0.setMessageDeliveryResultResponse(null);
smscStatAggregator.updateMsgInFailedAll();

if (eventTypeFailure != null) {
String sourceAddrAndPort = null;
if (eventTypeFailure == EventType.IN_SMPP_REJECT_MPROC) {
eventTypeFailure = EventType.IN_SMPP_DROP_MPROC;
EsmeManagement esmeManagement = EsmeManagement.getInstance();
if (esmeManagement != null) {
// for testing there is no EsmeManagement
Esme esme = esmeManagement.getEsmeByClusterName(sms0.getOrigEsmeName());
// null check is for testing
if (esme != null) {
sourceAddrAndPort = esme.getRemoteAddressAndPort();
}
}
statusCode = mProcResult.getSmppErrorCode();
} else if (eventTypeFailure == EventType.IN_HTTP_REJECT_MPROC) {
eventTypeFailure = EventType.IN_HTTP_DROP_MPROC;
statusCode = mProcResult.getHttpErrorCode();
}

generateMprocFailureDetailedCdr(sms0, eventTypeFailure, ErrorCode.REJECT_INCOMING_MPROC, messageType,
statusCode, mProcResult.getRuleIdDropReject(), sourceAddrAndPort, seqNumber);
}

rejectSmsByMProc(sms0, "Message is dropped by MProc rules.", mProcResult);
handleRejectedMessage(sms0, eventTypeFailure, statusCode, mProcResult, messageType, seqNumber);
} else if (mProcResult.isMessageDropped()) {
handleDroppedMessage(sms0, eventTypeFailure, statusCode, mProcResult, messageType, seqNumber,
smscStatAggregator);
} else {
handleSuccessfulMprocResult(mProcResult);
}

smscStatAggregator.updateMsgInReceivedAll();
Expand All @@ -423,7 +321,7 @@ protected void forwardMessage(Sms sms0, boolean withCharging, SmscStatAggregator
}
}
}

public enum MaxActivityCountFactor {
factor_12, factor_14,
}
Expand Down Expand Up @@ -474,23 +372,143 @@ protected void generateMprocFailureDetailedCdr(Sms sms, EventType eventType, Err
smscPropertiesManagement.getGenerateDetailedCdr());
}

private void rejectSmsByMProc(final Sms anSms, final String aReason, final MProcResult anMProcResult)
private void handleRejectedMessage(final Sms sms0, final EventType eventTypeFailure, final int aStatusCode,
final MProcResult mProcResult, final String messageType, int seqNumber) throws SmscProcessingException {
int sc = aStatusCode;
sms0.setMessageDeliveryResultResponse(null);
if (eventTypeFailure != null) {
String sourceAddrAndPort = null;
if (eventTypeFailure == EventType.IN_SMPP_REJECT_MPROC) {
EsmeManagement esmeManagement = EsmeManagement.getInstance();
if (esmeManagement != null) {
// for testing there is no EsmeManagement
Esme esme = esmeManagement.getEsmeByClusterName(sms0.getOrigEsmeName());
// null check is for testing
if (esme != null) {
sourceAddrAndPort = esme.getRemoteAddressAndPort();
}
}
sc = mProcResult.getSmppErrorCode();
} else if (eventTypeFailure == EventType.IN_HTTP_REJECT_MPROC) {
sc = mProcResult.getHttpErrorCode();
}
generateMprocFailureDetailedCdr(sms0, eventTypeFailure, ErrorCode.REJECT_INCOMING_MPROC, messageType, sc,
mProcResult.getRuleIdDropReject(), sourceAddrAndPort, seqNumber);
}
rejectSmsByMProc(sms0, mProcResult);
}

private void handleDroppedMessage(final Sms sms0, final EventType eventTypeFailure, final int aStatusCode,
final MProcResult mProcResult, final String messageType, final int seqNumber,
final SmscStatAggregator smscStatAggregator) {
int sc = aStatusCode;
EventType etf = eventTypeFailure;
sms0.setMessageDeliveryResultResponse(null);
smscStatAggregator.updateMsgInFailedAll();
if (eventTypeFailure != null) {
String sourceAddrAndPort = null;
if (eventTypeFailure == EventType.IN_SMPP_REJECT_MPROC) {
etf = EventType.IN_SMPP_DROP_MPROC;
EsmeManagement esmeManagement = EsmeManagement.getInstance();
if (esmeManagement != null) {
// for testing there is no EsmeManagement
Esme esme = esmeManagement.getEsmeByClusterName(sms0.getOrigEsmeName());
// null check is for testing
if (esme != null) {
sourceAddrAndPort = esme.getRemoteAddressAndPort();
}
}
sc = mProcResult.getSmppErrorCode();
} else if (eventTypeFailure == EventType.IN_HTTP_REJECT_MPROC) {
etf = EventType.IN_HTTP_DROP_MPROC;
sc = mProcResult.getHttpErrorCode();
}
generateMprocFailureDetailedCdr(sms0, etf, ErrorCode.REJECT_INCOMING_MPROC, messageType, sc,
mProcResult.getRuleIdDropReject(), sourceAddrAndPort, seqNumber);
}
logger.info("Incoming message is dropped by mProc rules, message=[" + sms0 + "]");
if (smscPropertiesManagement.isGenerateRejectionCdr()) {
CdrGenerator.generateCdr(sms0, CdrGenerator.CDR_MPROC_DROPPED, "Message is dropped by MProc rules.",
smscPropertiesManagement.getGenerateReceiptCdr(),
MessageUtil.isNeedWriteArchiveMessage(sms0, smscPropertiesManagement.getGenerateCdr()), false, true,
smscPropertiesManagement.getCalculateMsgPartsLenCdr(),
smscPropertiesManagement.getDelayParametersInCdr());
}
}

private void handleSuccessfulMprocResult(final MProcResult mProcResult) throws SmscProcessingException {
FastList<Sms> smss = mProcResult.getMessageList();
for (FastList.Node<Sms> n = smss.head(), end = smss.tail(); (n = n.getNext()) != end;) {
final Sms sms = n.getValue();
final TargetAddress ta = new TargetAddress(sms.getSmsSet());
final TargetAddress lock = persistence.obtainSynchroObject(ta);
try {
sms.setTimestampC(System.currentTimeMillis());
synchronized (lock) {
boolean storeAndForwMode = MessageUtil.isStoreAndForward(sms);
if (!storeAndForwMode) {
try {
this.scheduler.injectSmsOnFly(sms.getSmsSet(), true);
} catch (Exception e) {
throw new SmscProcessingException(
"Exception when running injectSmsOnFly(): " + e.getMessage(),
SmppConstants.STATUS_SYSERR, MAPErrorCode.systemFailure,
SmscProcessingException.HTTP_ERROR_CODE_NOT_SET, null, e,
SmscProcessingException.INTERNAL_ERROR_INJECT_STORE_AND_FORWARD_NOT_SET);
}
} else {
// store and forward
if (smscPropertiesManagement.getStoreAndForwordMode() == StoreAndForwordMode.fast
&& sms.getScheduleDeliveryTime() == null) {
try {
sms.setStoringAfterFailure(true);
this.scheduler.injectSmsOnFly(sms.getSmsSet(), true);
} catch (Exception e) {
throw new SmscProcessingException(
"Exception when running injectSmsOnFly(): " + e.getMessage(),
SmppConstants.STATUS_SYSERR, MAPErrorCode.systemFailure,
SmscProcessingException.HTTP_ERROR_CODE_NOT_SET, null, e,
SmscProcessingException.INTERNAL_ERROR_INJECT_STORE_AND_FORWARD_FAST);
}
} else {
try {
sms.setStored(true);
this.scheduler.setDestCluster(sms.getSmsSet());
persistence.c2_scheduleMessage_ReschedDueSlot(sms,
smscPropertiesManagement.getStoreAndForwordMode() == StoreAndForwordMode.fast,
false);
} catch (PersistenceException e) {
throw new SmscProcessingException(
"PersistenceException when storing LIVE_SMS : " + e.getMessage(),
SmppConstants.STATUS_SUBMITFAIL, MAPErrorCode.systemFailure,
SmscProcessingException.HTTP_ERROR_CODE_NOT_SET, null, e,
SmscProcessingException.INTERNAL_ERROR_INJECT_STORE_AND_FORWARD_NORMAL);
}
}
}
}
} finally {
persistence.releaseSynchroObject(lock);
}
}
}

private void rejectSmsByMProc(final Sms anSms, final MProcResult anMProcResult)
throws SmscProcessingException {
final SmscProcessingException e = new SmscProcessingException(aReason,
final String reason = "Message is rejected by MProc rules.";
final SmscProcessingException e = new SmscProcessingException(reason,
getErrorCode(anMProcResult.getSmppErrorCode(), SmppConstants.STATUS_SUBMITFAIL),
getErrorCode(anMProcResult.getMapErrorCode(), MAPErrorCode.systemFailure),
getErrorCode(anMProcResult.getHttpErrorCode(), SmscProcessingException.HTTP_ERROR_CODE_NOT_SET), null,
SmscProcessingException.INTERNAL_ERROR_MPROC_REJECT);
e.setSkipErrorLogging(true);
e.setMessageRejectCdrCreated(true);
if (logger.isInfoEnabled()) {
logger.info("Incoming message is " + (anMProcResult.isMessageRejected() ? "rejected" : "dropped")
+ "by mProc rules, message=[" + anSms + "]");
logger.info("Incoming message is rejected by mProc rules, message=[" + anSms + "]");
}
if (smscPropertiesManagement.isGenerateRejectionCdr()) {
CdrGenerator.generateCdr(anSms,
(anMProcResult.isMessageRejected() ? CdrGenerator.CDR_MPROC_REJECTED : CdrGenerator.CDR_MPROC_DROPPED),
aReason, smscPropertiesManagement.getGenerateReceiptCdr(),
CdrGenerator.CDR_MPROC_REJECTED, reason, smscPropertiesManagement.getGenerateReceiptCdr(),
MessageUtil.isNeedWriteArchiveMessage(anSms, smscPropertiesManagement.getGenerateCdr()), false, true,
smscPropertiesManagement.getCalculateMsgPartsLenCdr(), smscPropertiesManagement.getDelayParametersInCdr());
}
Expand Down