Message Passing using OpenMPI: Production and Consumption via a Broker

I am working on several assignments, and through the problem-solving process I am learning OpenMPI. OpenMPI is a great library (an implementation of the MPI standard) that lets you use multiple CPUs in parallel. Suppose you are given 100 unsorted random numbers. Normally you would write a sequential program that implements an O(n log n) sorting algorithm such as quicksort or merge sort to sort them in increasing or decreasing order. But what if you partition the 100 numbers into 10 buckets and sort the buckets in parallel with 10 processes, each running your sequential sort on one bucket? At the end, you combine the 10 sorted buckets. This is just one example of how parallel processes can speed up a computation.

Sometimes certain information needs to be synchronized across all running processes for further computation. Unlike threads, processes do not share memory, so you cannot simply store that information in a global variable. Moreover, you can never tell exactly which instruction a process is executing at a given moment. Processes therefore communicate by sending and receiving messages, which is why MPI stands for Message Passing Interface.

The first problem I have been working on is the following. A producer produces an item and sends it to a broker; the broker receives the message, saves the item in its internal buffer, and finally sends an acknowledgement message back to the producer. However, the broker can insert a newly produced item into the buffer only if the buffer is not full. If the buffer is full, the producers wait. The broker also waits for consumer requests. A consumer sends a request to the broker, and if the buffer is not empty, the broker answers the valid request by sending an item to the consumer. While handling a consumer request, the broker also releases a waiting producer by inserting its item once a buffer slot becomes free.
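The bucket idea from the sorting example can be sketched sequentially in plain C, without MPI. This is a minimal sketch under stated assumptions, not the assignment code. The helper names `bucket_sort` and `cmp_int` are hypothetical. The buckets are assumed to be value ranges over [lo, hi), so concatenating them in order gives a sorted result; equal-sized chunks of arbitrary values would instead need a merge step. The per-bucket `qsort` calls run one after another here, whereas an MPI program would hand each bucket to its own process.

```c
#include <stdlib.h>
#include <string.h>

/* qsort comparator for ints in ascending order. */
static int cmp_int(const void *a, const void *b) {
    int x = *(const int *)a, y = *(const int *)b;
    return (x > y) - (x < y);
}

/* Hypothetical helper: sorts a[0..n-1] in place using nbuckets range
   buckets. Assumes lo <= a[i] < hi for every i. Returns 0 on success,
   -1 on bad arguments or allocation failure. */
int bucket_sort(int *a, int n, int nbuckets, int lo, int hi) {
    if (n < 0 || nbuckets < 1 || hi <= lo) return -1;
    long long range = (long long)hi - lo;
    int *start = calloc((size_t)nbuckets + 1, sizeof(int));
    int *next = malloc((size_t)nbuckets * sizeof(int));
    int *tmp = malloc((size_t)(n > 0 ? n : 1) * sizeof(int));
    if (!start || !next || !tmp) {
        free(start); free(next); free(tmp);
        return -1;
    }

    /* 1. Count the values per bucket (shifted by one for step 2). */
    for (int i = 0; i < n; i++) {
        int b = (int)(((long long)a[i] - lo) * nbuckets / range);
        start[b + 1]++;
    }
    /* 2. Prefix sums: bucket b occupies tmp[start[b] .. start[b+1]-1]. */
    for (int b = 0; b < nbuckets; b++) {
        start[b + 1] += start[b];
        next[b] = start[b];
    }
    /* 3. Partition: copy each value into its bucket's slice. */
    for (int i = 0; i < n; i++) {
        int b = (int)(((long long)a[i] - lo) * nbuckets / range);
        tmp[next[b]++] = a[i];
    }
    /* 4. Sort every bucket independently (one process per bucket in MPI). */
    for (int b = 0; b < nbuckets; b++)
        qsort(tmp + start[b], (size_t)(start[b + 1] - start[b]), sizeof(int), cmp_int);
    /* 5. Combine: buckets are already in range order, so tmp is sorted. */
    memcpy(a, tmp, (size_t)n * sizeof(int));

    free(start); free(next); free(tmp);
    return 0;
}
```

For 100 numbers drawn from 1 to 100, as in the example, the call would be `bucket_sort(nums, 100, 10, 1, 101)`.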
This whole production and consumption scheme needs to run for a given amount of time, and implementing it as a parallel program poses several challenges. Deadlock is a common issue in parallel programming. It occurs when two or more processes each wait for something that only another of them can provide, so none of them can make progress. For example, two processes can both block in a receive, each waiting for a message the other will never send. A closely related problem is starvation, where a process is ready to proceed but never gets its turn. Imagine a professor discussing with his two students, and assume the old professor can remember only one question at a time (his memory is like the communication buffer). When the first student asks a question, the second student cannot ask anything until the professor has answered it. Now imagine that the first student keeps asking questions constantly: the second student may never get a chance to ask at all. (….)
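The professor analogy describes starvation, and it is relevant to this program. The MPI standard makes no fairness guarantee for receives posted with MPI_ANY_SOURCE, so a message from one sender may repeatedly be overtaken by messages from other senders. The following plain C model uses no MPI. The functions `pick_next` and `simulate` and the `MAX_CLIENTS` limit are hypothetical, chosen for illustration. The model compares a greedy policy that always answers the lowest-numbered waiting client with a round-robin policy that resumes just after the last client served.

```c
#include <stdbool.h>

#define MAX_CLIENTS 64 /* hypothetical limit for this sketch */

/* Returns the index of the next client to serve, or -1 if none is waiting.
   Greedy mode (round_robin == false) always scans from client 0, so a
   lower-numbered client wins every time it is waiting. Round-robin mode
   starts scanning just after `last`, the client served previously. */
int pick_next(const bool *pending, int nclients, int last, bool round_robin) {
    for (int k = 0; k < nclients; k++) {
        int c = round_robin ? (last + 1 + k) % nclients : k;
        if (pending[c]) return c;
    }
    return -1;
}

/* Simulates `rounds` answers in which every client asks again as soon as
   it has been answered, so all clients are always waiting.
   served[c] receives the number of answers client c got. */
void simulate(int nclients, int rounds, bool round_robin, int *served) {
    if (nclients < 1 || nclients > MAX_CLIENTS) return;
    bool pending[MAX_CLIENTS];
    int last = nclients - 1; /* so round-robin starts with client 0 */
    for (int c = 0; c < nclients; c++) {
        pending[c] = true;
        served[c] = 0;
    }
    for (int r = 0; r < rounds; r++) {
        int c = pick_next(pending, nclients, last, round_robin);
        if (c < 0) break;
        served[c]++;        /* answer client c ... */
        pending[c] = true;  /* ... who immediately asks again */
        last = c;
    }
}
```

With two students who always ask again immediately, the greedy policy answers the first student every time, while round-robin alternates between them. The broker could impose a similar order by cycling through specific source ranks instead of always using MPI_ANY_SOURCE. The cost is that it might wait on a rank that has nothing to send.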

// Question 1. Producer, Consumer, Broker in MPI

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>
#include <math.h>

#define MAX_THREADS 256

// MSG type definition
#define WORK_MSG 1
#define ACK_MSG 2
#define REQ_WORK_MSG 3
#define CONS_MSG 4
#define TIME_OUT_MSG 5

// broker rank definition
#define BROKER_RANK 0

struct timeval tz;
struct timezone tx;

// size and rank of the communication world
int world_rank;
int world_size;

int nprod; // number of producers
int ncons; // number of consumers

int *buffer; // circular buffer the broker inserts into for producers and extracts from for consumers
int buffer_item_count;
int BUFFER_SIZE; // size of buffer
int head, tail;

int time_out = 0;
int time_out_recommendation = 0; // this is required to stop the consumers, this complication is because of the non-blocking receive of consumer
double start_time, end_time, total_time; // variable for calculating time

double MAX_TIME = 2; // maximum time the simulation will run

MPI_Status status;
MPI_Request request;

// Check whether buffer is free or not
int buffer_free() {
if(buffer_item_count == BUFFER_SIZE) {
return 0;
}
return 1;
}

// Check whether buffer is empty or not
int buffer_empty() {
if(buffer_item_count > 0) {
return 0;
}
return 1;
}

// Broker process body
void *broker(int world_rank, int* buffer) {
start_time = MPI_Wtime();

int number;
int time_out_recommended = 0;

int inserted = 0;
int total_consumed = 0;

// producers waiting for a free buffer slot, indexed by rank, and the
// number that each waiting producer still needs to have inserted
int *prod_waiting = calloc(world_size, sizeof(int)); // no producer is waiting
int *prod_number = calloc(world_size, sizeof(int));

while(!time_out) {
// pick the messages the broker can act on in its current state:
// buffer empty -> only work msgs from producers,
// buffer full -> only work request msgs from consumers,
// otherwise -> either kind
int recv_tag;
if (buffer_empty()) {
recv_tag = WORK_MSG;
}
else if (!buffer_free()) {
recv_tag = REQ_WORK_MSG;
}
else {
recv_tag = MPI_ANY_TAG;
}
MPI_Recv(&number, 1, MPI_INT, MPI_ANY_SOURCE, recv_tag, MPI_COMM_WORLD, &status);

int source_rank = status.MPI_SOURCE;
int msg_tag = status.MPI_TAG;

// block for message handling starts
// manage the work message from the producer
if (msg_tag == WORK_MSG) {
if (buffer_free()) { // extra checking for buffer_free
// insert the received number from the producer into the buffer
buffer[tail] = number;
// increase the count for items in buffer
buffer_item_count++; // increase buffer item count
//printf("Broker-Prod. Inserted number %d(tag=%d) from producer = %d into buffer position %d\n", number, msg_tag, source_rank, tail);
tail = (tail + 1) % BUFFER_SIZE;
inserted++;
// send ack msg to the producer
//printf("Broker-Prod. Sending ACK MSG for number %d to prod %d.\n", number, source_rank);
MPI_Send(&number, 1, MPI_INT, source_rank, ACK_MSG, MPI_COMM_WORLD);
}
else {
// if buffer is not free, remember the producer and its number;
// the ACK is sent once a consumer frees a slot
prod_waiting[source_rank] = 1;
prod_number[source_rank] = number;
//printf("Broker-Prod-Waiting. Buffer item count = %d\n", buffer_item_count);
//printf("Broker-Prod-Waiting. prod %d waiting for ACK MSG.\n", source_rank);
}
}
// manage the work request msg for the consumers
else if (msg_tag == REQ_WORK_MSG) {
if (!buffer_empty()){ // if buffer not empty, consume (extra checking)
// extract the number for the consumer
number = buffer[head];
// decrease the count for items in buffer
buffer_item_count--;
//printf("Broker-Cons. Extracted number %d(tag=%d) for consumer = %d from buffer position %d\n", number, msg_tag, source_rank, head);
head = (head + 1) % BUFFER_SIZE;
// send the extracted number to consumer
//printf("Broker-Cons. Sending extracted number %d to cons %d.\n", number, source_rank);
MPI_Send(&number, 1, MPI_INT, source_rank, REQ_WORK_MSG, MPI_COMM_WORLD);
// one slot is now free: insert the number of one waiting producer
// (if any) and send that producer its ack msg
int i;
for (i = 1; i < world_size; i++) {
if (prod_waiting[i] == 1) {
buffer[tail] = prod_number[i];
buffer_item_count++; // increase buffer item count
tail = (tail + 1) % BUFFER_SIZE;
inserted++;
// set the producer to be non-waiting
prod_waiting[i] = 0;
// send the ack msg to the producer
//printf("Broker-Cons-Waiting. Sending ACK MSG for number %d to prod %d\n", prod_number[i], i);
MPI_Send(&prod_number[i], 1, MPI_INT, i, ACK_MSG, MPI_COMM_WORLD);
break; // only one slot was freed
}
}
}
}
// block for message handling ends

// block for time out checking starts
end_time = MPI_Wtime();
total_time = end_time - start_time;
if (total_time > MAX_TIME) {
time_out = 1;
//printf("Broker. Time out. \n");
}
}

// the waiting-producer bookkeeping is no longer needed
free(prod_waiting);
free(prod_number);

if (time_out == 1) {
int i;
number = -1;
for(i=1; i < world_size; i++) { // skip rank 0, the broker itself
MPI_Send(&number, 1, MPI_INT, i, TIME_OUT_MSG, MPI_COMM_WORLD);
}
// receive consumed count from all the consumers
for(i=2; i < world_size; i += 2) {
int consumed_count = 0;
MPI_Recv(&consumed_count, 1, MPI_INT, i, CONS_MSG, MPI_COMM_WORLD, &status);
total_consumed += consumed_count;
}

printf("Broker. Total time = %lf s\n", end_time - start_time);
printf("Broker. Total inserted = %d \n", inserted);
printf("Broker. Total consumed = %d \n", total_consumed);
}

//printf ("Broker. Finished. \n");

return NULL;
}

// producer process body
void *producer(int world_rank) {
int number;

srand(time(NULL) + world_rank);
while(!time_out) {
// producing random number
number = rand() % 100 + 1;
//printf("Prod. Produced random number %d by prod %d \n", number, world_rank);
MPI_Send(&number, 1, MPI_INT, BROKER_RANK, WORK_MSG, MPI_COMM_WORLD);

// blocking wait for acknowledgement message
MPI_Recv(&number, 1, MPI_INT, BROKER_RANK, MPI_ANY_TAG, MPI_COMM_WORLD, &status);

int msg_tag = status.MPI_TAG;
if(msg_tag == ACK_MSG) {
// printf("Prod. Received ACK message for number %d by prod %d\n", number, world_rank);
}
else if (msg_tag == TIME_OUT_MSG) {
//printf("Prod. Time out for prod %d. \n", world_rank);
time_out = 1; // break out of loop
}
}

//printf("Prod %d. Out of while loop.\n", world_rank);
printf("Prod %d. Production finished.\n", world_rank);

return NULL;
}

// consumer process body
void *consumer(int world_rank) {
int number = 0; // initialized because it is also sent as the (unused) request payload
int consumed_count = 0;

while(!time_out) {
// Sending work request msg to broker
MPI_Send(&number, 1, MPI_INT, BROKER_RANK, REQ_WORK_MSG, MPI_COMM_WORLD);
//printf("Cons. Sent work req msg to broker by cons %d.\n", world_rank);

int source_rank, msg_tag;

// non-blocking receive for consuming the number
MPI_Irecv(&number, 1, MPI_INT, BROKER_RANK, MPI_ANY_TAG, MPI_COMM_WORLD, &request);
// testing until receive occurs
while (1) {
int flag = 0;
MPI_Test(&request, &flag, &status);
if (flag != 0) {
source_rank = status.MPI_SOURCE;
msg_tag = status.MPI_TAG;
//printf("Cons. Consumed the work req msg (number %d) by cons %d\n", number, world_rank);
break;
}
else {
//printf("fail!\n");
//MPI_Irecv(&number, 1, MPI_INT, BROKER_RANK, MPI_ANY_TAG, MPI_COMM_WORLD, &request);
}
}

if (source_rank == BROKER_RANK) {
// if the request is done
if (request == MPI_REQUEST_NULL) {
//printf("The receive request is null now. \n");
if(msg_tag == REQ_WORK_MSG) {
//printf("Cons. Consumed random number %d by cons = %d \n", number, world_rank);
consumed_count++;
} else if (msg_tag == TIME_OUT_MSG) {
//printf("Cons. Time out for cons %d. \n", world_rank);
MPI_Send(&consumed_count, 1, MPI_INT, BROKER_RANK, CONS_MSG, MPI_COMM_WORLD);
time_out = 1; // break out of loop
}
}
}
}
//printf("Cons %d. Out of while loop.\n", world_rank);
printf("Cons %d. Consumption finished.\n", world_rank);

return NULL;
}

int main(int argc, char** argv) {
int i;

MPI_Init(NULL, NULL);

MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);

int w_s = sqrt((double)world_size);

if (world_size % 2 != 0 || w_s * w_s != world_size || world_size < 4) {
printf("Process number incorrect. \n");
MPI_Finalize();
return 0;
}
if (argc > 1) { // You can pass max time (in seconds) as the first argument
MAX_TIME = atof(argv[1]);
}

if (world_rank == 0) {// broker processor
nprod = world_size / 2;
ncons = world_size / 2 - 1;
BUFFER_SIZE = 2 * world_size / 2;
buffer = (int *)malloc(sizeof(int) * BUFFER_SIZE);

head = tail = 0;
broker(world_rank, buffer);

free(buffer);
} else if (world_rank % 2 != 0){// Producer processors, odd world rank
producer(world_rank);
} else {// Consumer processors, even world rank
consumer(world_rank);
}

MPI_Finalize();

return 0;
}
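The broker's buffer bookkeeping can be checked in isolation, without starting any MPI processes. The following is a minimal plain C model, not the program above. The type `broker_state`, the functions `on_work` and `on_request`, and the `MAX_PRODUCERS` limit are hypothetical names introduced for illustration. The points where the real broker would call MPI_Send are marked in comments. The model follows the rules described at the start of the post. While there is room, it inserts and acknowledges each item. When the buffer is full, it parks the producer together with its number. When a consumer takes an item, it admits at most one parked producer into the freed slot.

```c
#include <stdbool.h>

#define MAX_PRODUCERS 16 /* hypothetical limit; producer indices must be below it */

/* Hypothetical model of the broker's state: a circular buffer plus, per
   producer, a waiting flag and the number it wants to have inserted. */
typedef struct {
    int *buf;
    int size, count, head, tail;
    bool waiting[MAX_PRODUCERS];
    int pending[MAX_PRODUCERS];
} broker_state;

/* WORK_MSG from producer p carrying `number`. Returns true if the item was
   inserted (the real broker would send ACK_MSG here), false if the buffer
   was full and the producer was parked instead. */
bool on_work(broker_state *s, int p, int number) {
    if (s->count == s->size) {
        s->waiting[p] = true;
        s->pending[p] = number;
        return false;
    }
    s->buf[s->tail] = number;
    s->tail = (s->tail + 1) % s->size;
    s->count++;
    return true;
}

/* REQ_WORK_MSG from a consumer. Returns false if the buffer is empty.
   Otherwise stores the extracted item in *item (the real broker would send
   it to the consumer) and, since exactly one slot was freed, admits at most
   one parked producer; *released is that producer's index, or -1. */
bool on_request(broker_state *s, int nprod, int *item, int *released) {
    *released = -1;
    if (s->count == 0) return false;
    *item = s->buf[s->head];
    s->head = (s->head + 1) % s->size;
    s->count--;
    for (int p = 0; p < nprod; p++) {
        if (s->waiting[p]) {
            s->waiting[p] = false;
            on_work(s, p, s->pending[p]); /* fits: count < size right now */
            *released = p;               /* real broker: ACK_MSG to p */
            break;
        }
    }
    return true;
}
```

Each extraction frees exactly one slot. Releasing more than one waiting producer per consumer request would therefore overflow the circular buffer.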