Producer Consumer Thread Programming


#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/time.h>
#include "lifo_queue.h"
#include "rwlock.h"

/* Capacity of the per-thread arrays below and of the pthread_t arrays in main(). */
#define MAX_THREADS 256

/* Read-write lock taken by producer() and consumer() around every access to q. */
pthread_rwlock_t read_write_lock;

/* Thread count read from stdin in main(); also the divisor of the per-thread
 * quota in doneProducing()/doneConsuming(). */
int num_threads;
/* Allocated in main(); not read or written anywhere else in this file. */
int *buffer;


/* ins[i] is handed to producer i: it holds the thread number on entry and the
 * number of insertions that thread performed once it has finished. */
int ins[MAX_THREADS]; // insertions
/* ext[i] plays the same role for consumer i (thread number in, extraction count out). */
int ext[MAX_THREADS]; // extractions
/* Sums of the per-thread counts, accumulated by main() as it joins the threads. */
int total_insertions;
int total_extractions;
/* Throughput figure: (insertions + extractions) per second of wall-clock time. */
double ins_ext_per_sec;

/* Queue shared by all producers and consumers.
 * NOTE(review): the Queue type presumably comes from lifo_queue.h - confirm. */
Queue q;

/*
 * Unit of work exchanged through the queue.
 * Only the integer payload n is carried; create_task() fills it in.
 */
struct task {
    int n;  /* task value */
};


/*
 * Tell a producer whether it has reached its insertion quota.
 *
 * ins: number of insertions the calling producer has completed so far.
 * Returns 1 when ins >= 10 * QUEUE_SIZE/num_threads, otherwise 0.
 */
int doneProducing(int ins) {
    return (ins >= 10 * QUEUE_SIZE/num_threads) ? 1 : 0;
}

/*
 * Tell a consumer whether it has reached its extraction quota.
 *
 * ext: number of extractions the calling consumer has completed so far.
 * Returns 1 when ext >= 10 * QUEUE_SIZE/num_threads, otherwise 0.
 */
int doneConsuming(int ext) {
    return (ext >= 10 * QUEUE_SIZE/num_threads) ? 1 : 0;
}

/*
 * Fill in *t with the value for a producer's next task.
 *
 * thread_no: number of the producing thread.
 * ins:       how many insertions that thread has already made.
 * t:         task to populate; t->n becomes thread_no + ins * num_threads.
 */
void create_task(int thread_no, int ins, struct task* t) {
    const int task_value = thread_no + ins * num_threads;

    t->n = task_value;
}

/*
 * Placeholder for task processing: accepts a task by value and does nothing.
 */
void process_task(struct task t) {
    (void) t;  /* deliberately unused */
}

/*
if (ins % 100 == 0) {
printf("Producer Thread no %d:\n", thread_no);
printf ("After %d insertions\n", ins);
print_queue(&q);
printf("\n\n");
}
*/


/*
 * Producer thread entry point.
 *
 * producer_thread_data points to an int that holds this thread's number on
 * entry; before returning, the same int is overwritten with the number of
 * insertions this thread performed.
 *
 * Until doneProducing() says the quota is met, the thread builds a task with
 * create_task() and inserts its value into the shared queue q. Waiting for
 * free space is a busy loop under the read lock; the insertion itself is done
 * under the write lock.
 *
 * Always returns NULL.
 */
void *producer(void *producer_thread_data) {
    int *ins_pointer = (int *) producer_thread_data;
    int thread_no = *ins_pointer;
    int ins = 0;
    struct task my_task;

    while (!doneProducing(ins)) {
        int inserted = 0;

        create_task(thread_no, ins, &my_task);

        while (inserted == 0) {
            /* Poll under the read lock until the queue reports free space,
             * dropping and retaking the lock between polls so writers can
             * get in. */
            pthread_rwlock_rdlock(&read_write_lock);
            while (isFull(&q)) {
                pthread_rwlock_unlock(&read_write_lock);
                pthread_rwlock_rdlock(&read_write_lock);
            }
            pthread_rwlock_unlock(&read_write_lock);

            /* The queue may have filled up again between releasing the read
             * lock and acquiring the write lock, so re-check before inserting. */
            pthread_rwlock_wrlock(&read_write_lock);
            if (!isFull(&q)) {
                insertIntoQueue(&q, my_task.n);
                inserted = 1;
                ins++;
            }
            /* Release on BOTH paths. Previously the lock stayed held when the
             * re-check failed, and the next rdlock by this same thread then
             * deadlocked against its own write lock. */
            pthread_rwlock_unlock(&read_write_lock);
        }
    }

    *ins_pointer = ins;

    return NULL;
}

/*
 * Consumer thread entry point.
 *
 * consumer_thread_data points to an int; before returning, that int is
 * overwritten with the number of extractions this thread performed.
 *
 * Until doneConsuming() says the quota is met, the thread removes one value
 * at a time from the shared queue q. Waiting for an item is a busy loop under
 * the read lock; the extraction itself is done under the write lock. The
 * extracted value is stored in a local task and not processed further.
 *
 * Always returns NULL.
 */
void *consumer(void *consumer_thread_data) {
    int *ext_pointer = (int *) consumer_thread_data;
    int ext = 0;
    struct task my_task;

    while (!doneConsuming(ext)) {
        int extracted = 0;

        while (extracted == 0) {
            /* Poll under the read lock until the queue reports an item,
             * dropping and retaking the lock between polls so writers can
             * get in. */
            pthread_rwlock_rdlock(&read_write_lock);
            while (isEmpty(&q)) {
                pthread_rwlock_unlock(&read_write_lock);
                pthread_rwlock_rdlock(&read_write_lock);
            }
            pthread_rwlock_unlock(&read_write_lock);

            /* Another consumer may have drained the queue between releasing
             * the read lock and acquiring the write lock, so re-check. */
            pthread_rwlock_wrlock(&read_write_lock);
            if (!isEmpty(&q)) {
                my_task.n = extractFromQueue(&q);
                extracted = 1;
                ext++;
            }
            /* Release on BOTH paths. Previously the lock stayed held when the
             * re-check failed, and the next rdlock by this same thread then
             * deadlocked against its own write lock. */
            pthread_rwlock_unlock(&read_write_lock);
        }
    }

    *ext_pointer = ext;

    return NULL;
}

int main() {
int i;
struct timeval tz;
struct timezone tx;

double start_time, end_time;
pthread_t p_threads_broker;
pthread_t p_threads_prod[MAX_THREADS];
pthread_t p_threads_cons[MAX_THREADS];
pthread_attr_t attr;

pthread_attr_init(&attr);
pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);

//pthread_rwlock_init(&read_write_lock, NULL);

printf("Enter number of threads (use either of 2, 4, 8, 16, 32, 64, 128, 256): ");
scanf("%d", &num_threads);

int n_proc = num_threads;
int n_prods = n_proc / 2;
int n_cons = n_prods - 1;
int n_broker = 1;

buffer = (int *) malloc(sizeof(int) * 2 * num_prods)

gettimeofday(&tz, &tx);
start_time = (double)tz.tv_sec + (double) tz.tv_usec / 1000000.0;

//init(&q); //initializing task_queue


// create broker thread
p_threds_broker = pthread_create(&p_threads_broker, &attr, broker, (void *));

for (i=0; i<n_prods; i++) {
ins[i] = i;
// create producer threads
pthread_create(&p_threads_prod[i], &attr, producer, (void *) &ins[i]);
}

for (i=0; i<n_cons; i++) {
ext[i] = i;
// create consumer threads
pthread_create(&p_threads_cons[i], &attr, consumer, (void *) &ext[i]);
}

for (i=0; i<n_prods; i++) {
// join producer threads and add up all the insertions
pthread_join(p_threads_prod[i], NULL);
total_insertions += ins[i];
}

for (i=0; i<n_cons; i++) {

// join consumer threads and add up all the extractions
pthread_join(p_threads_cons[i], NULL);
total_extractions += ext[i];
}

gettimeofday(&tz, &tx);
end_time = (double) tz.tv_sec + (double) tz.tv_usec / 1000000.0;

/*
ins_ext_per_sec = (total_insertions + total_extractions) / (end_time - start_time);
printf("\n\n\n");
printf("Producer/Consumer threads = %d\n", num_threads);
printf("Total insertions = %d\n", total_insertions);
printf("Total extractions = %d\n", total_extractions);
printf("Total time = %lf s\n", end_time - start_time);
printf("Throughput (# of insertions/extractions ) = %lf per sec\n", ins_ext_per_sec);
*/

return 0;
}
Advertisements

Leave a Reply

Please log in using one of these methods to post your comment:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s