-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtask_queue.c
99 lines (87 loc) · 2.91 KB
/
task_queue.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#include "task_queue.h"
// Put data unless queue is full
void putTask(TaskQueue *queue, Task task) {
// 1. Lock
pthread_mutex_lock(&queue->lockQueue);
// 2. Wait until queue is not full
while (queue->length == QSIZE) {
pthread_cond_wait(&queue->condNotFull, &queue->lockQueue);
}
// 3. Put Algorithm for Circular Array
// 3.a. Assign the value `data` to the position pointed by `put_index`
queue->TaskArr[queue->putIndex] = task;
// 3.b. Modularly increase `put_index` by 1 so that it is within `capacity`
queue->putIndex = (queue->putIndex + 1) % QSIZE;
// at all times 3.c. Increase `length` by 1
queue->length++;
// printf("put task: %d to queue\n", task);
// 4. Send a signal that queue is not empty
pthread_cond_signal(&queue->condNotEmpty);
// 5. Unlock
pthread_mutex_unlock(&queue->lockQueue);
}
// Get data unless queue is empty
Task getTask(TaskQueue *queue) {
Task task;
// 1. Lock
pthread_mutex_lock(&queue->lockQueue);
// 2: Wait until queue is not empty
while (queue->length == 0) {
pthread_cond_wait(&queue->condNotEmpty, &queue->lockQueue);
}
// 3. Get Algorithm for Circular Array
// 3.a. Get the data at index `get_index` and then modularly add `get_index`
// by
// so that it is within capacity at all times
task = queue->TaskArr[queue->getIndex];
// 3.b. Decrease `length` by 1
queue->length--;
// 3.c. When `length` is zero, set `get_index` and `put_index` to 0
if (queue->length == 0) {
queue->getIndex = queue->putIndex = 0;
} else {
queue->getIndex = (queue->getIndex + 1) % QSIZE;
}
// printf("get task: %d from queue\n", task);
// 4. Send a signal that queue is not full
pthread_cond_signal(&queue->condNotFull);
// 5. Unlock
// Note: Signal before releasing the lock because the signal thread might
// acquire the lock again.
pthread_mutex_unlock(&queue->lockQueue);
// 6. Return data
return task;
}
TaskQueue *initTaskQueue(int capacity) {
TaskQueue *queue =
(TaskQueue *)malloc(sizeof(TaskQueue) + sizeof(Task[capacity]));
queue->capacity = capacity;
queue->putIndex = queue->getIndex = queue->length = 0;
for (int i = 0; i < capacity; i++)
queue->TaskArr[i] = -1;
if (pthread_mutex_init(&queue->lockQueue, NULL) != 0) {
perror("Mutex init failed");
free(queue);
return NULL;
}
if (pthread_cond_init(&queue->condNotFull, NULL) != 0) {
perror("Condition init failed");
pthread_mutex_destroy(&queue->lockQueue);
free(queue);
return NULL;
}
if (pthread_cond_init(&queue->condNotEmpty, NULL) != 0) {
perror("Condition init failed");
pthread_cond_destroy(&queue->condNotFull);
pthread_mutex_destroy(&queue->lockQueue);
free(queue);
return NULL;
}
return queue;
}
void deleteTaskQueue(TaskQueue *queue) {
pthread_mutex_destroy(&queue->lockQueue);
pthread_cond_destroy(&queue->condNotFull);
pthread_cond_destroy(&queue->condNotEmpty);
free(queue);
}