33
33
public class AsyncQueueingExecutor <T > {
34
34
35
35
private final Logger logger ;
36
- private final boolean isWarn ;
37
36
38
37
private final LinkedBlockingQueue <T > queue ;
39
38
private final AtomicBoolean isRun = new AtomicBoolean (true );
@@ -51,7 +50,6 @@ public AsyncQueueingExecutor(int queueSize, String executorName, AsyncQueueingEx
51
50
Objects .requireNonNull (executorName , "executorName" );
52
51
53
52
this .logger = LogManager .getLogger (this .getClass ().getName () + "@" + executorName );
54
- this .isWarn = logger .isWarnEnabled ();
55
53
56
54
// BEFORE executeThread start
57
55
this .maxDrainSize = 10 ;
@@ -66,27 +64,27 @@ public AsyncQueueingExecutor(int queueSize, String executorName, AsyncQueueingEx
66
64
67
65
private Thread createExecuteThread (String executorName ) {
68
66
final ThreadFactory threadFactory = new PinpointThreadFactory (executorName , true );
69
- Thread thread = threadFactory .newThread (this ::doAccept );
67
+ Thread thread = threadFactory .newThread (this ::doExecute );
70
68
thread .start ();
71
69
return thread ;
72
70
}
73
71
74
- private void doAccept () {
72
+ private void doExecute () {
75
73
long timeout = 2000 ;
76
74
drainStartEntry :
77
75
while (isRun ()) {
78
76
try {
79
77
final Collection <T > dtoList = getDrainQueue ();
80
78
final int drainSize = takeN (dtoList , this .maxDrainSize );
81
79
if (drainSize > 0 ) {
82
- doAccept (dtoList );
80
+ doExecute (dtoList );
83
81
continue ;
84
82
}
85
83
86
84
while (isRun ()) {
87
85
final T dto = takeOne (timeout );
88
86
if (dto != null ) {
89
- doAccept (dto );
87
+ doExecute (dto );
90
88
continue drainStartEntry ;
91
89
} else {
92
90
pollTimeout (timeout );
@@ -113,7 +111,7 @@ private void flushQueue() {
113
111
if (debugEnabled ) {
114
112
logger .debug ("flushData size {}" , drainSize );
115
113
}
116
- doAccept (elementList );
114
+ doExecute (elementList );
117
115
}
118
116
}
119
117
@@ -136,32 +134,26 @@ protected void pollTimeout(long timeout) {
136
134
137
135
public boolean execute (T data ) {
138
136
if (data == null ) {
139
- if (isWarn ) {
140
- logger .warn ("execute(). data is null" );
141
- }
137
+ logger .warn ("execute(). data is null" );
142
138
return false ;
143
139
}
144
140
if (!isRun .get ()) {
145
- if (isWarn ) {
146
- logger .warn ("{} is shutdown. discard data:{}" , executorName , data );
147
- }
141
+ logger .warn ("{} is shutdown. discard data:{}" , executorName , data );
148
142
return false ;
149
143
}
150
144
boolean offer = queue .offer (data );
151
145
if (!offer ) {
152
- if (isWarn ) {
153
- logger .warn ("{} Drop data. queue is full. size:{}" , executorName , queue .size ());
154
- }
146
+ logger .warn ("{} Drop data. queue is full. size:{}" , executorName , queue .size ());
155
147
}
156
148
return offer ;
157
149
}
158
150
159
151
160
- private void doAccept (Collection <T > elements ) {
152
+ private void doExecute (Collection <T > elements ) {
161
153
this .listener .execute (elements );
162
154
}
163
155
164
- private void doAccept (T element ) {
156
+ private void doExecute (T element ) {
165
157
this .listener .execute (element );
166
158
}
167
159
0 commit comments