-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmyth_wsqueue_func.h
383 lines (362 loc) · 8.84 KB
/
myth_wsqueue_func.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
#ifndef MYTH_WSQUEUE_FUNC_H_
#define MYTH_WSQUEUE_FUNC_H_
#include "myth_config.h"
#include "myth_desc.h"
#include "myth_wsqueue.h"
#include "myth_wsqueue_proto.h"
//Initialize thread-local data
//Per-thread setup hook for the queue layer.
//This implementation keeps no per-thread state, so the body is intentionally empty.
static inline void myth_queue_init_thread_data(myth_queue_data_t th)
{
    (void)th;//unused on purpose
}
//Per-thread teardown hook for the queue layer.
//This implementation keeps no per-thread state, so there is nothing to release.
static inline void myth_queue_fini_thread_data(myth_queue_data_t th)
{
    (void)th;//unused on purpose
}
//critical section for signal
//Open the critical section around a queue operation.
// - USE_SIGNAL_CS: checks that op_flag is clear, sets it, then issues a write barrier.
// - USE_THREAD_CS: locks q->mtx through real_pthread_mutex_lock.
//When neither macro is defined the function body is empty.
//Must be paired with myth_queue_exit_operation() on every path.
static inline void myth_queue_enter_operation(myth_thread_queue_t q)
{
#if defined(USE_SIGNAL_CS)
    //The flag must not already be set when an operation begins
    assert(q->op_flag==0);
    q->op_flag=1;
    //Issue the write barrier after raising the flag, before the operation body runs
    myth_wsqueue_wbarrier();
#endif
#if defined(USE_THREAD_CS)
    real_pthread_mutex_lock(&q->mtx);
#endif
}
//Close the critical section opened by myth_queue_enter_operation().
// - USE_SIGNAL_CS: checks that op_flag is set, issues a write barrier, then clears it.
// - USE_THREAD_CS: unlocks q->mtx through real_pthread_mutex_unlock.
//When neither macro is defined the function body is empty.
static inline void myth_queue_exit_operation(myth_thread_queue_t q)
{
#if defined(USE_SIGNAL_CS)
    //The flag must have been raised by a matching enter call
    assert(q->op_flag==1);
    //Issue the write barrier after the operation body, before dropping the flag
    myth_wsqueue_wbarrier();
    q->op_flag=0;
#endif
#if defined(USE_THREAD_CS)
    real_pthread_mutex_unlock(&q->mtx);
#endif
}
//Report whether q is currently inside a section bracketed by
//myth_queue_enter_operation()/myth_queue_exit_operation().
//Returns 1 when USE_SIGNAL_CS is defined and q->op_flag is non-zero, 0 otherwise.
//Without USE_SIGNAL_CS there is no flag to inspect and the result is always 0.
static inline int myth_queue_is_operating(myth_thread_queue_t q)
{
    int ret=0;
#ifdef USE_SIGNAL_CS
    //The previous expression was `ret && q->op_flag`; with ret starting at 0
    //that is always 0, so the flag was never actually reported.
    ret=(q->op_flag!=0);
#endif
    return ret;
}
//Set up an empty queue in q.
//Initializes the queue lock (and m_lock / op_flag / mtx when the corresponding
//macros are defined), allocates a zero-filled slot array of INITIAL_QUEUE_SIZE
//entries, and places base and top together at the middle of that array.
//Finally the embedded wc cache structure is zeroed.
static inline void myth_queue_init(myth_thread_queue_t q)
{
    myth_wsqueue_lock_init(&q->lock);
#if defined(USE_LOCK) || defined(USE_LOCK_ANY)
    myth_internal_lock_init(&q->m_lock);
#endif
#if defined(USE_SIGNAL_CS)
    q->op_flag=0;
#endif
#if defined(USE_THREAD_CS)
    {
        //Error-checking mutex; the attribute object is only needed during init
        pthread_mutexattr_t attr;
        pthread_mutexattr_init(&attr);
        pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_ERRORCHECK);
        real_pthread_mutex_init(&q->mtx,&attr);
        pthread_mutexattr_destroy(&attr);
    }
#endif
    //Slot storage
    q->size=INITIAL_QUEUE_SIZE;
    const size_t slot_bytes=sizeof(myth_thread_t)*q->size;
    q->ptr=myth_malloc(slot_bytes);
    memset(q->ptr,0,slot_bytes);
    //Empty queue: both indices sit at the midpoint
    q->base=q->size/2;
    q->top=q->base;
    //Clear the cache record
    memset(&q->wc,0,sizeof(myth_wscache));
}
//Tear down a queue created by myth_queue_init().
//Resets the indices via myth_queue_clear(), destroys the queue lock (and m_lock
//when USE_LOCK or USE_LOCK_ANY is defined), then releases the slot array.
//NOTE(review): under USE_THREAD_CS, q->mtx is initialized in myth_queue_init()
//but is not destroyed here -- confirm whether that is intentional.
static inline void myth_queue_fini(myth_thread_queue_t q)
{
    myth_queue_clear(q);
    myth_wsqueue_lock_destroy(&q->lock);
#if defined(USE_LOCK) || defined(USE_LOCK_ANY)
    myth_internal_lock_destroy(&q->m_lock);
#endif
    //Second argument is passed as 0; presumably a size hint -- verify against myth_free
    myth_free(q->ptr,0);
}
//Re-centre the indices of a queue that is expected to be empty.
//Asserts top==base, then sets both to size/2 while holding the queue lock
//(and m_lock when USE_LOCK or USE_LOCK_CLEAR is defined), all inside the
//enter/exit operation bracket.
static inline void myth_queue_clear(myth_thread_queue_t q)
{
    myth_queue_enter_operation(q);
#if defined(USE_LOCK) || defined(USE_LOCK_CLEAR)
    myth_internal_lock_lock(&q->m_lock);
#endif
    myth_wsqueue_lock_lock(&q->lock);

    //Only an empty queue may be cleared
    myth_assert(q->top==q->base);
    const int middle=q->size/2;
    q->base=middle;
    q->top=middle;

    myth_wsqueue_lock_unlock(&q->lock);
#if defined(USE_LOCK) || defined(USE_LOCK_CLEAR)
    myth_internal_lock_unlock(&q->m_lock);
#endif
    myth_queue_exit_operation(q);
}
//push/pop/peek:Owner thread operations
//Push thread th onto the top end of q.
//Fast path: store th at index top, issue a write barrier, then publish top+1.
//Slow path (top has reached q->size): under q->lock, either abort when base is
//also 0 (no room at all) or slide the live entries toward lower indices to
//free space at the top end.
//The whole body runs inside the enter/exit operation bracket, and under m_lock
//when USE_LOCK or USE_LOCK_PUSH is defined.
static inline void __attribute__((always_inline)) myth_queue_push(myth_thread_queue_t q,myth_thread_t th)
{
myth_queue_enter_operation(q);
#if defined USE_LOCK || defined USE_LOCK_PUSH
myth_internal_lock_lock(&q->m_lock);
#endif
//Snapshot the current top index
int t=q->top;
//read barrier
myth_wsqueue_rbarrier();
if (t==q->size){
//Top reached the end of the array: acquire the queue lock before touching base
myth_wsqueue_lock_lock(&q->lock);
//Runqueue full? (no free slots below base either)
if (q->base==0){
myth_assert(0);
fprintf(stderr,"Fatal error:Runqueue overflow\n");
abort();
//TODO:extend runqueue
/*myth_thread_t *newptr;
int newsize;
//Extend runqueue
newsize=q->size*2;
//Newly allocate
newptr=myth_malloc(sizeof(myth_thread_t)*newsize);
//copy
memcpy(newptr+,q->ptr+base,sizeof(myth_thread_t)*(q->top-q->base+1));
//Adjust index
q->top=;q->base=;
//Release that old array and replace to new one
myth_free(q->ptr);q->ptr=newptr;q->size=newsize;*/
}
else{
//Shift the live range [base,top) toward index 0 to re-centre it
int offset,offset_x2;
//size-(base+top): negative in this branch when top equals size and base>0
offset_x2=q->size-(q->base+q->top);
//Halve it; for odd values step one further down so the shift is never 0
offset=offset_x2/2;if (offset_x2%2)offset--;
myth_assert(offset<0);
//Skip the copy when the queue holds no entries
if (q->top-q->base){
//memmove because source and destination ranges can overlap
memmove(&q->ptr[q->base+offset],&q->ptr[q->base],sizeof(myth_thread_t)*(q->top-q->base));
}
q->top+=offset;q->base+=offset;
}
//Reload top after the shift
t=q->top;
myth_wsqueue_lock_unlock(&q->lock);
}
//No need to extend or move: slot t is free.
q->ptr[t]=th;
myth_wsqueue_wbarrier();//Guarantee W-W ordering: the slot store comes before the top update
q->top=t+1;
#if defined USE_LOCK || defined USE_LOCK_PUSH
myth_internal_lock_unlock(&q->m_lock);
#endif
myth_queue_exit_operation(q);
}
#ifndef MYTH_QUEUE_FIFO
//Pop the entry at the top end of q (LIFO build, i.e. MYTH_QUEUE_FIFO undefined).
//Returns the popped thread, or NULL when the queue holds nothing.
//
//Protocol: top is decremented first, then base is read after a full barrier.
// - If more than one entry remains below the new top (base+1<top) the entry is
//   returned without taking q->lock.
// - Otherwise q->lock is taken and base is re-read: the last entry is returned
//   (and the wc cache record is invalidated), or, if the queue turned out to be
//   empty, top and base are reset to size/2 and NULL is returned.
//
//Every return path leaves the section opened by myth_queue_enter_operation();
//the QUICK_CHECK_ON_POP early return previously skipped that, leaving op_flag
//set (USE_SIGNAL_CS) or q->mtx locked (USE_THREAD_CS).
static inline myth_thread_t __attribute__((always_inline)) myth_queue_pop(myth_thread_queue_t q)
{
    myth_queue_enter_operation(q);
#ifdef QUICK_CHECK_ON_POP
    if (q->top <= q->base) {
        //Balance the enter call above before bailing out
        myth_queue_exit_operation(q);
        return NULL;
    }
#endif
#if defined USE_LOCK || defined USE_LOCK_POP
    myth_internal_lock_lock(&q->m_lock);
#endif
    myth_thread_t ret;
    int top,base;
    //Decrement top and publish it before looking at base
    top=q->top;
    top--;
    q->top=top;
    myth_wsqueue_rwbarrier();
    base=q->base;
    if (base+1<top){
        //At least one other entry remains below top: take ours without the queue lock
        ret=q->ptr[top];
        //q->ptr[top]=NULL;
#if defined USE_LOCK || defined USE_LOCK_POP
        myth_internal_lock_unlock(&q->m_lock);
#endif
        myth_queue_exit_operation(q);
        return ret;
    }
    else{
        //Zero or one entry may be left: settle it under the queue lock
        myth_wsqueue_lock_lock(&q->lock);
        base=q->base;
        if (base<=top){//OK
            ret=q->ptr[top];
            q->ptr[top]=NULL;
            if (top<=base){
                //That was the last entry: invalidate cache
                myth_wscache_t wc=&q->wc;
                //fprintf(stderr,"cache Invalidate\n");
                //Bump the sequence number before modifying the record
                int s=wc->seq;
                wc->seq=s+1;
                myth_wsqueue_wbarrier();
                //Clear the cached data
                wc->ptr=NULL;
                wc->size=0;
                //Bump the sequence number again once the record is consistent
                myth_wsqueue_wbarrier();
                wc->seq=s+2;
            }
            myth_wsqueue_lock_unlock(&q->lock);
#if defined USE_LOCK || defined USE_LOCK_POP
            myth_internal_lock_unlock(&q->m_lock);
#endif
            myth_queue_exit_operation(q);
            return ret;
        }
        else{
            //Queue was empty: undo the decrement by re-centring both indices
            q->top=q->size/2;
            q->base=q->size/2;
            myth_wsqueue_lock_unlock(&q->lock);
#if defined USE_LOCK || defined USE_LOCK_POP
            myth_internal_lock_unlock(&q->m_lock);
#endif
            myth_queue_exit_operation(q);
            return NULL;
        }
        myth_unreachable();
    }
    myth_unreachable();
}
#else
//FIFO build (MYTH_QUEUE_FIFO defined): pop simply delegates to
//myth_queue_take(), which removes the entry at the base end of the queue.
//myth_queue_take() is defined further down, hence the forward declaration.
static inline myth_thread_t myth_queue_take(myth_thread_queue_t q);
static inline myth_thread_t myth_queue_pop(myth_thread_queue_t q)
{
    myth_thread_t taken=myth_queue_take(q);
    return taken;
}
#endif
//take/pass:Non-owner functions
//Remove and return the entry at the base end of q, or NULL if there is none.
//Under q->lock (and m_lock when USE_LOCK or USE_LOCK_TAKE is defined):
//base is advanced first, then top is read after a full barrier; if the old
//base was not below top the advance is rolled back and NULL is returned.
//With QUICK_CHECK_ON_STEAL an unlocked emptiness test short-circuits the call.
//This function does not use the enter/exit operation bracket.
static inline myth_thread_t myth_queue_take(myth_thread_queue_t q)
{
    myth_thread_t stolen=NULL;
    int old_base,cur_top;
#ifdef QUICK_CHECK_ON_STEAL
    //Cheap unlocked test; no lock is held yet, so returning here is safe
    if (q->top-q->base<=0){
        return NULL;
    }
#endif
#if defined(USE_LOCK) || defined(USE_LOCK_TAKE)
    myth_internal_lock_lock(&q->m_lock);
#endif
    //A trylock-based variant (TRY_LOCK_BEFORE_STEAL) was disabled in the
    //original code; the queue lock is always acquired unconditionally.
    myth_wsqueue_lock_lock(&q->lock);
    //Claim the slot by advancing base, then check against top
    old_base=q->base;
    q->base=old_base+1;
    myth_wsqueue_rwbarrier();
    cur_top=q->top;
    if (old_base<cur_top){
        stolen=q->ptr[old_base];
    }else{
        //Nothing to take: restore base
        q->base=old_base;
    }
    myth_wsqueue_lock_unlock(&q->lock);
#if defined(USE_LOCK) || defined(USE_LOCK_TAKE)
    myth_internal_lock_unlock(&q->m_lock);
#endif
    return stolen;
}
//Return the entry at the base end of q without removing it, or NULL when
//base is not below top.
//No lock is taken and no index is modified; base is read before top.
static inline myth_thread_t myth_queue_peek(myth_thread_queue_t q)
{
#ifdef QUICK_CHECK_ON_STEAL
    if (q->top-q->base<=0){
        return NULL;
    }
#endif
    const int head=q->base;
    const int tail=q->top;
    return (head<tail)?q->ptr[head]:NULL;
}
//Try to insert th just below base (the end myth_queue_take() removes from).
//Returns 1 on success.
//Returns 0 when q->lock could not be acquired with trylock, or when base is
//already 0 and there is no free slot below it.
//
//m_lock (USE_LOCK / USE_LOCK_TRYPASS) is released on every path; the
//trylock-failure path previously returned with m_lock still held.
static inline int myth_queue_trypass(myth_thread_queue_t q,myth_thread_t th)
{
    int ret=1;
#if defined USE_LOCK || defined USE_LOCK_TRYPASS
    myth_internal_lock_lock(&q->m_lock);
#endif
    if (!myth_wsqueue_lock_trylock(&q->lock)){
        //Queue lock is busy: report failure, but fall through to release m_lock
        ret=0;
    }
    else{
        if (q->base==0){
            //No room below base
            ret=0;
        }
        else{
            int b;
            b=q->base;
            q->ptr[b-1]=th;
            //Make the slot contents visible before publishing the new base
            myth_wsqueue_wbarrier();
            q->base--;
        }
        myth_wsqueue_lock_unlock(&q->lock);
    }
#if defined USE_LOCK || defined USE_LOCK_TRYPASS
    myth_internal_lock_unlock(&q->m_lock);
#endif
    return ret;
}
//Insert th below base, retrying myth_queue_trypass() until it reports success.
//This spins for as long as trypass keeps returning 0.
static inline void myth_queue_pass(myth_thread_queue_t q,myth_thread_t th)
{
    while (!myth_queue_trypass(q,th)){
        //busy-retry
    }
}
//put:Owner function: put a thread to the tail of the queue
//Insert th just below base, i.e. at the end myth_queue_take() removes from.
//Runs under q->lock (and m_lock when USE_LOCK or USE_LOCK_PUSH is defined)
//inside the enter/exit operation bracket.
//
//When base is already 0 the live range [base,top) is slid toward higher
//indices to open space below it; if top is also at q->size the array is
//completely full and the process aborts.
//
//The shift distance is half the free space above top, rounded UP, so it is
//at least 1 whenever any slot is free. The previous arithmetic was copied from
//the push path (where the shift is negative): it asserted offset<0 and
//decremented odd values, yielding -1 for one free slot (out-of-bounds
//memmove/store) and 0 for three free slots (no space opened).
static inline void myth_queue_put(myth_thread_queue_t q,myth_thread_t th)
{
    myth_queue_enter_operation(q);
#if defined USE_LOCK || defined USE_LOCK_PUSH
    myth_internal_lock_lock(&q->m_lock);
#endif
    myth_wsqueue_lock_lock(&q->lock);
    if (q->base==0){
        if (q->top==q->size){
            //No free slot on either side
            myth_assert(0);
            fprintf(stderr,"Fatal error:Runqueue overflow\n");
            abort();
            //TODO:extend runqueue instead of aborting
        }
        else{
            //Free slots above top (base is 0 in this branch)
            int offset,offset_x2;
            offset_x2=q->size-(q->base+q->top);
            //Round up so that a single free slot still yields a shift of 1
            offset=(offset_x2+1)/2;
            myth_assert(offset>0);
            if (q->top-q->base){
                //Ranges can overlap, hence memmove
                memmove(&q->ptr[q->base+offset],&q->ptr[q->base],sizeof(myth_thread_t)*(q->top-q->base));
            }
            q->top+=offset;q->base+=offset;
        }
    }
    int b=q->base;
    //After the shift above base is at least 1
    if (b==0){myth_unreachable();}
    b--;
    q->ptr[b]=th;
    q->base=b;
    myth_wsqueue_lock_unlock(&q->lock);
#if defined USE_LOCK || defined USE_LOCK_PUSH
    myth_internal_lock_unlock(&q->m_lock);
#endif
    myth_queue_exit_operation(q);
}
#endif /* MYTH_WSQUEUE_FUNC_H_ */