-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmulticast_fifo_ordering.cpp
319 lines (234 loc) · 11.7 KB
/
multicast_fifo_ordering.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
#include <iostream>
#include <iomanip>
#include <cstdlib>
#include <unistd.h>
#include <stdio.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <string.h>
#include <arpa/inet.h>
#include <vector>
#include <cstdlib>
#include <ctime>
#include <thread>
#include <bits/stdc++.h>
#include "helper.h"
#include "buffered_delivered_msg.h"
#define PORT 5555
using namespace std;
// global variables
//NOTE: diff machine diff WIFI give you diff local ip addr. Use 'ifconfig' to check
const char LOCAL_IP_ADDR[] = "192.168.169.156";
//const cahr LOCAL_IP_ADDR[] = "192.168.169.154";
// argument vector<int>& pass vector by reference
void sender(int multicast_socket_fd, vector<int>& vector_clocks, int curr_proc_no)
{
std::thread::id curr_threadID = std::this_thread::get_id();
cout << "Sender starts. Sender pid is " << getpid() << " thread id is " << curr_threadID << endl;
stringstream ss_tid;
ss_tid << curr_threadID;
string curr_threadID_str = ss_tid.str();
cout << "Current vector_clocks is:" ;
print_vecotr_clocks(vector_clocks);
// *******
// sender socket configuration
// *******
/* Initialize the multicast group sockaddr structure with a */
/* group address of 226.1.1.1 and port 5555. */
struct sockaddr_in group_address;
group_address.sin_family = AF_INET; // IPv4
group_address.sin_addr.s_addr = inet_addr("226.1.1.1"); // = INADDR_ANY is localhost
group_address.sin_port = htons( PORT ); // 5555
// For multicast, the level (2nd param) should be IPPROTO_IP which has multiple options.
// IP_MULTICAST_LOOP: 0 if you want the data you send to be looped back to your host, 1 if not. Default is 0. We don't need to change.
// IP_MULTICAST_TTL: Sets the Time To Live (TTL) in the IP header for outgoing multicast datagrams. 0 samehost. 1 same subnet. Default is 1.
u_char ttl = '0'; // because we use only one machine
if (setsockopt(multicast_socket_fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) < 0)
{
perror("Sender: setsockopt IPPROTO_IP IP_MULTICAST_TTL");
exit(EXIT_FAILURE);
}
/* Set local interface for outbound multicast datagrams. */
/* The IP address specified must be associated with a local multicast capable interface. */
struct in_addr localInterface;
localInterface.s_addr = inet_addr(LOCAL_IP_ADDR); // On linux use ifconfig to check "UP ... MULTICAST"
if(setsockopt(multicast_socket_fd, IPPROTO_IP, IP_MULTICAST_IF, (char *)&localInterface, sizeof(localInterface)) < 0)
{
perror("Sender: setsockopt IPPROTO_IP IP_MULTICAST_IF");
exit(EXIT_FAILURE);
}
usleep( 2 * 1000000 ); //microseconds 10^6 = 1 second
for(int i = 0; i < 5; i++){
// increaes self clock by 1 before sending
vector_clocks.at(curr_proc_no-1) ++;
// // sending a message to client/multicast
// const char *msg = "Hello from sender, anyone in this group should recerve this.";
// sendto(client_sockets[i] , msg , strlen(msg) , 0 );
/* Send a message to the multicast group specified by the group_address sockaddr-structure. */
// FIFO: the number before first 2 "," are essential
string msg_str = to_string(vector_clocks.at(curr_proc_no-1)) + ", " + to_string(curr_proc_no) + ", are my local clock value and my proc_no. Msg No.=" + to_string(i) + ". From sender (ProcessNode No.= " + to_string(curr_proc_no) + " pid " + to_string(getpid()) + " threadid " + curr_threadID_str + "), anyone in this group should recerve this.";
char msg_char_array[msg_str.length() + 1];
strcpy(msg_char_array, msg_str.c_str());
if(sendto(multicast_socket_fd, &msg_char_array, strlen(msg_char_array), 0, (struct sockaddr*)&group_address, sizeof(group_address)) < 0)
{
perror("Sender: Sending datagram message error");
exit(EXIT_FAILURE);
}
cout << "------\nMessge sent, so vector_clocks updated: ";
print_vecotr_clocks(vector_clocks);
usleep( 2 * 1000000 ); //microseconds 10^6 = 1 second
}
}
void receiver(int multicast_socket_fd, vector<int>& vector_clocks, int curr_proc_no)
{
std::thread::id curr_threadID = std::this_thread::get_id();
cout << "Receiver starts. Receiver pid is " << getpid() << " thread id is " << curr_threadID << endl;
cout << "Current vector_clocks is:" ;
print_vecotr_clocks(vector_clocks);
/* Bind to the proper port number with the IP address specified as INADDR_ANY. */
struct sockaddr_in receiver_addr;
receiver_addr.sin_family = AF_INET;
receiver_addr.sin_addr.s_addr = INADDR_ANY; //is localhost
receiver_addr.sin_port = htons(PORT);
socklen_t receiver_addr_length = sizeof(receiver_addr);
// Forcefully attaching socket to the port 8080
if (bind(multicast_socket_fd, (struct sockaddr *)&receiver_addr, sizeof(receiver_addr)) < 0)
{
perror("Receiver: bind failed");
exit(EXIT_FAILURE);
}
/* Join the multicast group 226.1.1.1 on the local interface LOCAL_IP_ADDR interface. */
/* Note that this IP_ADD_MEMBERSHIP option must be */
/* called for each local interface over which the multicast */
/* datagrams are to be received. */
struct ip_mreq group;
group.imr_multiaddr.s_addr = inet_addr("226.1.1.1");
group.imr_interface.s_addr = inet_addr(LOCAL_IP_ADDR);
if (setsockopt(multicast_socket_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *)&group, sizeof(group)) < 0)
{
perror("Receiver: Adding multicast group error");
exit(EXIT_FAILURE);
}
char receiver_read_buffer[1024] = {0}; // compile will be slow if this is too large
while(1){
// receiving form multicast
// read() and recvfrom() are same here
// if (read(multicast_socket_fd , receiver_read_buffer, 1024) < 0){
if (recvfrom(multicast_socket_fd, receiver_read_buffer, 1024, 0, (struct sockaddr *)&receiver_addr, &receiver_addr_length) < 0){
perror("Receiver: recvfrom() error");
exit(EXIT_FAILURE);
}
printf("------\nReceiver: read: '%s'\n\n",receiver_read_buffer );
// update vector_clocks according to received message
// Only first curr_proc_no elements are needed
string received_msg_str = receiver_read_buffer;
vector<string> splited = split( received_msg_str, ",");
cout << "Splited result is : " << vectorstr2str(splited) << endl;
int recv_clock_value = stoi(splited[0]);// aka sequence no
int recv_proc_no = stoi(splited[1]);
// no action if the msg come from self
if (recv_proc_no == curr_proc_no){
cout << "\nMessage came from self, no action needed.\n\n";
continue;
}
if (splited.size() >= 2){ // bc for FIFO only first 2 are essential
if (int(vector_clocks.at(recv_proc_no-1) + 1) == recv_clock_value){
// deliver msg
deliver_msg(recv_proc_no, recv_clock_value, received_msg_str);
// update vector_clocks
vector_clocks.at(recv_proc_no-1) = recv_clock_value;
cout << "Vector_clocks updated: ";
print_vecotr_clocks(vector_clocks);
// check buffered msg which meet requirement
check_buffered_msgs_and_deliver(recv_proc_no, recv_clock_value, vector_clocks);
cout << "Vector_clocks updated from buffered msgs: ";
print_vecotr_clocks(vector_clocks);
}else{
// buffer msg
buffer_msg(recv_proc_no, recv_clock_value, received_msg_str);
cout << "Vector_clocks didn't update: ";
print_vecotr_clocks(vector_clocks);
}
}else{
perror("Receiver: vector_clocks size of received message is not enough.");
exit(EXIT_FAILURE);
}
memset(receiver_read_buffer, '\0', strlen(receiver_read_buffer));
}
}
int main(int argc, char *argv[])
{
// /* deal with input arguments*/
// std::cout << "print arguments:\nargc == " << argc << '\n';
// for(int ndx{}; ndx != argc; ++ndx) {
// std::cout << "argv[" << ndx << "] == " << argv[ndx] << '\n';
// }
// std::cout << "argv[" << argc << "] == "
// << static_cast<void*>(argv[argc]) << '\n';
// *******
// both need setting begin
// *******
printf("Multicast starts. The pid is %d \n", getpid());
// Socket Cite: https://www.geeksforgeeks.org/socket-programming-cc/?ref=lbp
// Multiocast Cite: https://www.tenouk.com/Module41c.html
int multicast_socket_fd;
// // Creating socket file descriptor (IPv4, TCP, IP)
// if ((sender_socket_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0)
// For multicast it must be family AF_INET, and its type may be either SOCK_DGRAM or SOCK_RAW. The most common use is with SOCK_DGRAM sockets
if ((multicast_socket_fd = socket(AF_INET, SOCK_DGRAM, 0)) == 0) // (IPv4, UDP, IP)
{
perror("Multicast: socket failed");
exit(EXIT_FAILURE);
}
// Optional: it helps in reuse of address and port. Prevents error such as: “address already in use”.
int opt = 1; // for setsockopt
if (setsockopt(multicast_socket_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)) < 0){
perror("Multicast: setsockopt SOL_SOCKET");
exit(EXIT_FAILURE);
}
// *******
// both need setting end
// *******
// *******
// FIFO ordering begin
// *******
// ask user input (also clean processes_counter if this is first created proc)
int declared_proc_amount = ask_user_input_declared_proc_amt();
// write into file
if (declare_processes_amount(declared_proc_amount) <= 0 ){
perror("Multicast: Declared proc amount write into file error.");
exit(EXIT_FAILURE);
}
int proc_ctr_current_value = processes_counter(1); // increase process counter by 1
cout << "\nThere are " << proc_ctr_current_value << " processes/nodes in this DS now.\n" << endl;
// Register signal and signal handler
signal(SIGINT, signal_callback_handler);// wait for Ctrol+c, if caught then terminate current process
// hangon here if not enough process created
int curr_proc_amount = processes_counter(0);
while (declared_proc_amount != curr_proc_amount){
curr_proc_amount = processes_counter(0);
}
int curr_proc_no = proc_ctr_current_value; // the no. of current process in all processes, e.g. it is 3 for third process.
cout << "Current process(node) is No. " << curr_proc_no << endl << endl;
for (int i =0; i < declared_proc_amount; i++){
vector_clocks.push_back(0);
buffered_msgs.push_back(vector<s_Seq_Msg>());
delivered_msgs.push_back(vector<s_Seq_Msg>());
}
// *******
// FIFO ordering end
// *******
// This thread is launched by using function pointer as callable.
// The parameters to the function are put after the comma
std::thread thread_send_obj(sender, multicast_socket_fd, std::ref(vector_clocks), curr_proc_no);
std::thread thread_receive_obj(receiver, multicast_socket_fd, std::ref(vector_clocks), curr_proc_no);// must be send first otherwise bind() error
// Wait for the threads to finish. main() and these two threads are concurrent/synchronized.
thread_send_obj.join();
thread_receive_obj.join();
proc_ctr_current_value = processes_counter(-1); // decrease counter by 1
printf("Multicast: process stopped. \n");
cout << "There are still " << proc_ctr_current_value << " processes/nodes left in this DS now." << endl;
close(multicast_socket_fd);
return 0;
}