@@ -411,7 +411,7 @@ class EventService extends ServiceAction {
411
411
logger . debug ( '%s - create stream based on blockType' , method , this . blockType ) ;
412
412
eventer . setStreamByType ( this . blockType ) ;
413
413
414
- // the promise and streams live on and need we need
414
+ // the promise and streams live on and we need
415
415
// to check at times to be sure we are working with the
416
416
// correct one if the target gets restarted
417
417
const stream = eventer . stream ;
@@ -421,7 +421,7 @@ class EventService extends ServiceAction {
421
421
logger . debug ( '%s - create stream listening callbacks - onData, onEnd, onStatus, onError' , method ) ;
422
422
423
423
eventer . stream . on ( 'data' , ( deliverResponse ) => {
424
- logger . debug ( `on.data - peer:${ eventer . endpoint . url } - ${ mystreamCount } ` ) ;
424
+ logger . debug ( `on.data - peer:${ eventer . endpoint . url } - stream: ${ mystreamCount } ` ) ;
425
425
if ( stream !== eventer . stream ) {
426
426
logger . debug ( 'on.data - incoming block was from a cancelled stream' ) ;
427
427
return ;
@@ -488,10 +488,24 @@ class EventService extends ServiceAction {
488
488
logger . debug ( 'on.data - status received when newest block seen: %s block_num: %s' ,
489
489
deliverResponse . status , this . lastBlockNumber . toNumber ( ) ) ;
490
490
this . _close ( new Error ( `Newest block received:${ this . lastBlockNumber . toNumber ( ) } status:${ deliverResponse . status } ` ) ) ;
491
- } else if ( this . endBlock ) {
492
- logger . debug ( 'on.data - status received before the endblock has been seen' ) ;
493
- this . _close ( new Error ( `End block of ${ this . endBlock . toNumber ( ) } ` +
491
+ } else if ( this . endBlock && this . endBlock . greaterThan ( this . lastBlockNumber ) ) {
492
+ logger . error ( 'on.data - status SUCCESS received before the configured endblock has been seen' ) ;
493
+ this . _close ( new Error ( `Connection Shutdown. End block of ${ this . endBlock . toNumber ( ) } ` +
494
494
`not received. Last block received ${ this . lastBlockNumber . toNumber ( ) } ` ) ) ;
495
+ } else {
496
+ logger . error ( 'on.data - status SUCCESS received while blocks are required' ) ;
497
+ this . _close ( new Error ( 'Event Service connection has been shutdown. ' +
498
+ `Last block received ${ this . lastBlockNumber . toNumber ( ) } ` ) ) ;
499
+ }
500
+ } else if ( deliverResponse . status === 'NOT_FOUND' ) {
501
+ logger . debug ( '%s - on.data received type status of NOT_FOUND' , method ) ;
502
+ if ( this . endBlock ) {
503
+ logger . error ( 'on.data - Configured endblock does not exist' ) ;
504
+ this . _close ( new Error ( `End block of ${ this . endBlock . toNumber ( ) } ` +
505
+ ` does not exist. Last block received ${ this . lastBlockNumber . toNumber ( ) } ` ) ) ;
506
+ } else {
507
+ logger . error ( 'on.data - NOT_FOUND status received - last block received %s' , this . lastBlockNumber . toNumber ( ) ) ;
508
+ this . _close ( new Error ( `Event stream has received an unexpected status message. status:${ deliverResponse . status } ` ) ) ;
495
509
}
496
510
} else {
497
511
// tell all registered users that something is wrong and shutting down
@@ -505,11 +519,11 @@ class EventService extends ServiceAction {
505
519
} ) ;
506
520
507
521
eventer . stream . on ( 'status' , ( response ) => {
508
- logger . debug ( 'on status - status received: %j peer:%s - %s' , response , eventer . endpoint . url , mystreamCount ) ;
522
+ logger . debug ( 'on status - status received: %j peer:%s - stream: %s' , response , eventer . endpoint . url , mystreamCount ) ;
509
523
} ) ;
510
524
511
525
eventer . stream . on ( 'end' , ( ) => {
512
- logger . debug ( 'on.end - peer:%s - % ' , eventer . endpoint . url , mystreamCount ) ;
526
+ logger . debug ( 'on.end - peer:%s - stream:%s ' , eventer . endpoint . url , mystreamCount ) ;
513
527
if ( stream !== eventer . stream ) {
514
528
logger . debug ( 'on.data - incoming message was from a cancelled stream' ) ;
515
529
return ;
@@ -536,7 +550,7 @@ class EventService extends ServiceAction {
536
550
} ) ;
537
551
538
552
eventer . stream . on ( 'error' , ( err ) => {
539
- logger . debug ( 'on.error - block peer:%s - %s' , eventer . endpoint . url , mystreamCount ) ;
553
+ logger . debug ( 'on.error - block peer:%s - stream: %s' , eventer . endpoint . url , mystreamCount ) ;
540
554
if ( stream !== eventer . stream ) {
541
555
logger . debug ( '%s - on.error - incoming error was from a cancelled stream - %s' , method , err ) ;
542
556
return ;
@@ -758,7 +772,7 @@ class EventService extends ServiceAction {
758
772
*/
759
773
registerChaincodeListener ( chaincodeId = checkParameter ( 'chaincodeId' ) , eventName = checkParameter ( 'eventName' ) , callback = checkParameter ( 'callback' ) , options ) {
760
774
const method = `registerChaincodeListener[${ this . name } ] - #${ this . myNumber } ` ;
761
- logger . debug ( '%s - start' , method ) ;
775
+ logger . debug ( '%s - start - %s - %s ' , method , chaincodeId , eventName ) ;
762
776
763
777
const event_name = new RegExp ( eventName ) ;
764
778
const event_reg = new EventListener ( this , CHAINCODE , callback , options , event_name , chaincodeId ) ;
@@ -982,19 +996,23 @@ class EventService extends ServiceAction {
982
996
logger . debug ( '%s filtered block number=%s' , method , filtered_block . number ) ;
983
997
if ( filtered_block . filtered_transactions ) {
984
998
for ( const filtered_transaction of filtered_block . filtered_transactions ) {
985
- this . _callTransactionListener ( filtered_transaction . txid ,
986
- filtered_transaction . tx_validation_code ,
987
- filtered_block . number ) ;
999
+ if ( filtered_transaction . type === 'ENDORSER_TRANSACTION' ) {
1000
+ this . _callTransactionListener ( filtered_transaction . txid ,
1001
+ filtered_transaction . tx_validation_code ,
1002
+ filtered_block . number ) ;
1003
+ }
988
1004
}
989
1005
}
990
1006
} else {
991
1007
logger . debug ( '%s full block number=%s' , method , full_block . header . number ) ;
992
1008
const txStatusCodes = full_block . metadata . metadata [ fabprotos . common . BlockMetadataIndex . TRANSACTIONS_FILTER ] ;
993
1009
for ( let index = 0 ; index < full_block . data . data . length ; index ++ ) {
994
1010
const channel_header = full_block . data . data [ index ] . payload . header . channel_header ;
995
- this . _callTransactionListener ( channel_header . tx_id ,
996
- txStatusCodes [ index ] ,
997
- full_block . header . number ) ;
1011
+ if ( channel_header . type === fabprotos . common . HeaderType . ENDORSER_TRANSACTION ) {
1012
+ this . _callTransactionListener ( channel_header . tx_id ,
1013
+ txStatusCodes [ index ] ,
1014
+ full_block . header . number ) ;
1015
+ }
998
1016
}
999
1017
}
1000
1018
}
@@ -1052,7 +1070,7 @@ class EventService extends ServiceAction {
1052
1070
if ( filtered_transaction . transaction_actions ) {
1053
1071
if ( filtered_transaction . transaction_actions . chaincode_actions ) {
1054
1072
for ( const chaincode_action of filtered_transaction . transaction_actions . chaincode_actions ) {
1055
- logger . debug ( '%s - filtered block chaincode_event %s ' , method , chaincode_action ) ;
1073
+ logger . debug ( '%s - filtered block chaincode_event %j ' , method , chaincode_action ) ;
1056
1074
// need to remove the payload since with filtered blocks it
1057
1075
// has an empty byte array value which is not the real value
1058
1076
// we do not want the listener to think that is the value
@@ -1073,12 +1091,13 @@ class EventService extends ServiceAction {
1073
1091
try {
1074
1092
const env = full_block . data . data [ index ] ;
1075
1093
const channel_header = env . payload . header . channel_header ;
1076
- if ( channel_header . type === 3 ) { // only ENDORSER_TRANSACTION have chaincode events
1094
+ // only ENDORSER_TRANSACTION have chaincode events
1095
+ if ( channel_header . type === fabprotos . common . HeaderType . ENDORSER_TRANSACTION ) {
1077
1096
const tx = env . payload . data ;
1078
1097
if ( tx && tx . actions ) {
1079
1098
for ( const { payload} of tx . actions ) {
1080
1099
const chaincode_event = payload . action . proposal_response_payload . extension . events ;
1081
- logger . debug ( '%s - full block chaincode_event %s ' , method , chaincode_event ) ;
1100
+ logger . debug ( '%s - full block chaincode_event %j ' , method , chaincode_event ) ;
1082
1101
1083
1102
const txStatusCodes = full_block . metadata . metadata [ fabprotos . common . BlockMetadataIndex . TRANSACTIONS_FILTER ] ;
1084
1103
const val_code = txStatusCodes [ index ] ;
@@ -1124,13 +1143,14 @@ class EventService extends ServiceAction {
1124
1143
1125
1144
_queueChaincodeEvent ( chaincode_event , block_num , tx_id , val_code , all_events ) {
1126
1145
const method = `_queueChaincodeEvent[${ this . name } ] - #${ this . myNumber } ` ;
1127
- logger . debug ( '%s - start - chaincode_event %s ' , method , chaincode_event ) ;
1146
+ logger . debug ( '%s - start - chaincode_event %j ' , method , chaincode_event ) ;
1128
1147
1129
1148
const tx_status = convertValidationCode ( val_code ) ;
1130
1149
1131
1150
logger . debug ( '%s - txid=%s val_code=%s' , method , tx_id , tx_status ) ;
1132
1151
1133
1152
for ( const chaincode_reg of this . _eventListenerRegistrations . values ( ) ) {
1153
+ logger . debug ( '%s - checking regisistered chaincode event %s %s' , method , chaincode_reg . event , chaincode_reg . chaincodeId ) ;
1134
1154
// check each listener to see if this chaincode event matches
1135
1155
if ( chaincode_reg . listenerType === CHAINCODE &&
1136
1156
chaincode_reg . chaincodeId === chaincode_event . chaincode_id &&
@@ -1152,7 +1172,7 @@ class EventService extends ServiceAction {
1152
1172
chaincode_event . payload
1153
1173
) ) ;
1154
1174
} else {
1155
- logger . debug ( '%s - NOT queuing chaincode event: %s' , method , chaincode_event . event_name ) ;
1175
+ logger . debug ( '%s - NOT queuing chaincode event: %s' , method , chaincode_event . event_name ) ;
1156
1176
}
1157
1177
}
1158
1178
}
0 commit comments