CISC3595: Threads and Synchronization: Bounded Buffer

In this lab, we will focus on the bounded-buffer problem. We will write a multithreaded program that uses semaphores and mutexes to synchronize reading from and writing to a common buffer. The solution is found in the book and in slides 55-57 (PPT)(PDF) for the class. Try to have some fun with it. You can write classes for the different components or one class that combines the whole problem. If you prefer, you can write it as a set of global resources and functions that use those resources. It's up to you.

1. Problem Statement

A data set must be processed item by item. We define the problem to be that the producer produces the data set and the consumer consumes it. Generally, the producer may do some simple preprocessing and the consumer does the more intensive post-processing.

Our problem is to read a file and output it to the console, like a multithreaded cat. To process the file without synchronization issues, we are going to use semaphores. We can solve the problem as stated in the book and the slides with three semaphores: full, empty and mutex. You can also use one additional semaphore to manage access to a bool that records whether the job is done, since both threads must be able to set or read it.

2. Requirements

Write a program that takes a size and a filename from the command line (argc, and argv). Use them to create a string buffer of fixed size and open the filename. Then create a producer that will read in the file line by line and a consumer that will take the lines and print them to the console.

We will use the pthread library to create and launch the threads. Let the main execution thread wait for the producer-consumer to finish before continuing and then exit. Use pthread_join to make the main thread wait for the producer and consumer threads. Use pthread_exit when the task is complete.

There will be three threads in your program: main thread, producer and consumer. You have to create the two threads that are producer and consumer. Remember that both threads have to have access to the buffer, so it has to be global to the threads. The pthread_create API requires that the starter functions be global to the module.

Initialize the semaphores using sem_init call and initialize full to 0, empty to buffer size and mutex to 1. Use sem_wait before attempting to write or read from the buffer. Use sem_post after to release it.

Use the Linux semaphore API calls such as sem_init, sem_wait and sem_post. Remember that sem_wait is used when we want to access a resource and sem_post is used when we want to release it. Do only the minimum in your critical sections, which means accessing the common buffer and the buffer pointers (indices).
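The semaphore protocol described above can be sketched as follows. This is a minimal illustration, not the book's or the slides' exact solution: the `Shared` struct, the `count` and `done` fields, and the `run_demo` helper are assumptions made for the example, and in-memory vectors stand in for the input file and the console. The `done` flag is guarded by the existing mutex semaphore here rather than by a fourth semaphore, and return values of the semaphore and pthread calls are not checked, to keep the sketch short.

```cpp
#include <cassert>
#include <cstddef>
#include <pthread.h>
#include <semaphore.h>
#include <string>
#include <vector>

// Hypothetical shared state for the sketch.
struct Shared {
    std::vector<std::string> buffer;          // fixed-size circular buffer
    std::size_t in = 0, out = 0, count = 0;   // write index, read index, fill level
    bool done = false;                        // set by the producer at end of input
    sem_t full, empty, mutex;
    const std::vector<std::string>* input = nullptr;  // stands in for the file
    std::vector<std::string> output;                   // stands in for the console
};

void* run_producer(void* arg)
{
    Shared* s = static_cast<Shared*>(arg);
    for (const std::string& line : *s->input) {
        sem_wait(&s->empty);                  // wait for a free slot
        sem_wait(&s->mutex);                  // enter the critical section
        s->buffer[s->in] = line;
        s->in = (s->in + 1) % s->buffer.size();
        ++s->count;
        sem_post(&s->mutex);                  // leave the critical section
        sem_post(&s->full);                   // announce one filled slot
    }
    sem_wait(&s->mutex);
    s->done = true;
    sem_post(&s->mutex);
    sem_post(&s->full);                       // extra post so the consumer wakes and sees done
    return nullptr;
}

void* run_consumer(void* arg)
{
    Shared* s = static_cast<Shared*>(arg);
    for (;;) {
        sem_wait(&s->full);                   // wait for a filled slot (or the final post)
        sem_wait(&s->mutex);
        if (s->count == 0 && s->done) {       // nothing left and the producer has finished
            sem_post(&s->mutex);
            break;
        }
        std::string line = s->buffer[s->out];
        s->out = (s->out + 1) % s->buffer.size();
        --s->count;
        sem_post(&s->mutex);
        sem_post(&s->empty);                  // announce one free slot
        s->output.push_back(line);            // "print" outside the critical section
    }
    return nullptr;
}

// Runs one producer and one consumer over the given lines and returns what was consumed.
std::vector<std::string> run_demo(std::size_t capacity, const std::vector<std::string>& lines)
{
    assert(capacity > 0);
    Shared s;
    s.buffer.resize(capacity);
    s.input = &lines;
    sem_init(&s.full, 0, 0);
    sem_init(&s.empty, 0, static_cast<unsigned int>(capacity));
    sem_init(&s.mutex, 0, 1);

    pthread_t producer, consumer;
    pthread_create(&producer, nullptr, run_producer, &s);
    pthread_create(&consumer, nullptr, run_consumer, &s);
    pthread_join(producer, nullptr);
    pthread_join(consumer, nullptr);

    sem_destroy(&s.full);
    sem_destroy(&s.empty);
    sem_destroy(&s.mutex);
    return s.output;
}
```

Note that each thread waits on its counting semaphore (empty or full) before taking the mutex. Reversing that order can deadlock, because a thread could hold the mutex while blocked on a full or empty buffer.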

Semaphore API

Pthread API

Since this program will use pthread, you must link with -lpthread on the g++ command line like this:
	g++ lab3.cpp -lpthread -o lab3

3. Write a program

Remember that the program has to take two command line arguments: size and filename. Check for the arguments and use them to create the buffer (a string array of given argument size) and open the filename. Make sure that the file is open before processing further.
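The argument checks described above might look like the following sketch. The helper names `parse_args` and `open_input` are hypothetical, and treating a non-positive or non-numeric size as an error is an assumption about what "check for the arguments" should mean.

```cpp
#include <cerrno>
#include <cstddef>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <string>

// Hypothetical helper: validates "<size> <filename>" and fills in the outputs.
bool parse_args(int argc, char* argv[], std::size_t& size, std::string& filename)
{
    if (argc != 3) {
        std::cerr << "usage: " << (argc > 0 ? argv[0] : "lab3") << " <size> <filename>\n";
        return false;
    }
    char* end = nullptr;
    errno = 0;
    long value = std::strtol(argv[1], &end, 10);
    // Reject overflow, empty input, trailing garbage, and non-positive sizes.
    if (errno != 0 || end == argv[1] || *end != '\0' || value <= 0) {
        std::cerr << "size must be a positive integer\n";
        return false;
    }
    size = static_cast<std::size_t>(value);
    filename = argv[2];
    return true;
}

// Hypothetical helper: returns true only if the file was opened successfully.
bool open_input(const std::string& filename, std::ifstream& input)
{
    input.open(filename);
    if (!input.is_open()) {
        std::cerr << "cannot open " << filename << "\n";
        return false;
    }
    return true;
}
```

In main, both helpers would be called before any thread is created, and the program would return a nonzero status if either one fails.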

Some hints: You can approach the problem one of two ways. If you are comfortable with object-oriented programming in C++:

Using a class to implement BoundedBuffer

Feel free to use C++11 threads

Here is a simplified tutorial on the native C++11 thread support. It breaks it down into a set of questions about how to use threads.

  • C++11 Multithreading FAQ
    #include <iostream>
    #include <thread>
    
    class BoundedBuffer
    {
      public: // Static, so that each function can be passed directly to std::thread.
    	static void run_producer(BoundedBuffer* bb) { /* read lines into the buffer */ }
    	static void run_consumer(BoundedBuffer* bb) { /* print lines from the buffer */ }
    };
    
    int main(int argc, char* argv[])
    {
    	// Parse the args, perhaps create the BoundedBuffer after 
    	// seeing that the buffer size and file will be given.
    
    	BoundedBuffer bb;
    
    	// create the threads
    	std::thread producer(BoundedBuffer::run_producer, &bb);
    	std::thread consumer(BoundedBuffer::run_consumer, &bb);
    
    
    	// wait for the job to be finished.
    	producer.join();
    	consumer.join();
    
    	return 0;
    }
    

    In the constructor, set up the semaphores using sem_init, create the threads and join them. In the thread startup functions, convert the parameter into a pointer to your class object using static_cast. Once converted, call the method that corresponds to the thread, i.e. for run_consumer call bbptr->consumer() and for run_producer call bbptr->producer().
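The static_cast step described above can be sketched as follows, using pthreads. The `run()` method and the two `bool` flags are assumptions added so the example is observable; the sketch creates the threads in `run()` rather than in the constructor, which is a design choice of the example and not a requirement. The producer and consumer bodies are placeholders. Static member functions are passed to pthread_create here, which works with g++ on Linux; free functions declared outside the class are the more conservative choice.

```cpp
#include <cassert>
#include <pthread.h>

class BoundedBuffer
{
  public:
    bool producer_ran = false;   // illustrative flags, each written by one thread only
    bool consumer_ran = false;

    // Creates both threads, passing this object as the void* argument, then joins them.
    void run()
    {
        pthread_t producer_thread, consumer_thread;
        int rc = pthread_create(&producer_thread, nullptr, run_producer, this);
        assert(rc == 0);
        rc = pthread_create(&consumer_thread, nullptr, run_consumer, this);
        assert(rc == 0);
        (void)rc;
        pthread_join(producer_thread, nullptr);
        pthread_join(consumer_thread, nullptr);
    }

  private:
    void producer() { producer_ran = true; }   // placeholder for the real producer loop
    void consumer() { consumer_ran = true; }   // placeholder for the real consumer loop

    // Starter functions: recover the object pointer, then call the matching method.
    static void* run_producer(void* param)
    {
        BoundedBuffer* bbptr = static_cast<BoundedBuffer*>(param);
        bbptr->producer();
        return nullptr;
    }

    static void* run_consumer(void* param)
    {
        BoundedBuffer* bbptr = static_cast<BoundedBuffer*>(param);
        bbptr->consumer();
        return nullptr;
    }
};
```

The flags are read only after both joins have returned, so no extra synchronization is needed for them in this sketch.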

    Using global functions and variables

    An alternate approach is to use global functions, as in a C program. Declare all common resources as globals (the buffer and the semaphores), then write an initialization function that creates the semaphores using sem_init, creates the threads using pthread_create, and then waits for them with pthread_join. Use two global functions to implement the producer and consumer directly. You have to open the file, so the ifstream for the file has to be global as well. This is not an elegant solution, but it may be easier than the object-oriented one.

    4. Compile and run the program

    To compile a program named lab3.cpp, you type in the following command:
    	g++ lab3.cpp -lpthread 
    

    If there is no compile-time error, the executable file (by default, it's named a.out) will be created. You can then type the following in the terminal to run the program:

    a.out
    

    Like all Unix commands, the g++ command has many options that you can use to control its behavior. You can type man g++ to view a very long list of all options and their descriptions. For now, please familiarize yourself with the following options to g++ command:

    Running the program (Sample Run)

    [harazduk@storm lab3]$ lab3 5 test_alarm.cpp
    #include <iostream>
    #include <unistd.h>
    #include <signal.h>
    
    using namespace std;
    
    const int TIMESLICE=3;
    bool alarmFired = false;
    
    /*
     * void sigalarm_handler(int signum)
     *      User defined signal handler for the signal number that is registered
     *      using signal(unsigned int signum, sighandler_t)
     * @param signum: signal number of the signal that caused the handler to fire.
     */
    void sigalarm_handler(int signum)
    {
        // To simulate a preemptive, timesliced scheduling system, a signal handler
        // must be registered that simulates the CPU timer that fires whenever the
        // system has to schedule another process.
        cout << "got signal " <<  signum << endl;
        alarmFired = true;
    }
    
    /*
     * main(void)
     *      Demonstrates the use of signal to register a signal handler for a signal
     *      as well as the use of alarm to set an alarm for TIMESLICE seconds. Then
     *      the process sleeps for some arbitrary amount of time. The process awakens
     *      due to the alarm since the sleep time is greater than the alarm time.
     */
    int main(void)
    {
            int timeRemaining = 16;
        signal(SIGALRM, sigalarm_handler);
        cout << "pid is " << getpid() << endl;
    
        while (timeRemaining > 0)
        {
            // Set the alarm each time through for the prescribed number of seconds.
            alarm(TIMESLICE);
    
            // Simulate CPU time with either sleep(int seconds) or
            // settimer(ITIMER_REAL, const struct itimerval *newval, struct itimerval *oldval)
            // Get the sleep seconds from the remaining process time.
            sleep(timeRemaining);
            cout << "ProcessName " << timeRemaining << endl;
    
            // If alarm has fired, then timeslice expired,
            // otherwise remaining time expired.
            if (alarmFired == true)
                timeRemaining -= TIMESLICE;
            else
                timeRemaining -= timeRemaining;
    
            // Print that process is finished if no time remaining.
            if (timeRemaining == 0)
                cout << "ProcessName finished" << endl;
    
            // Reset alarmFired.
            alarmFired = false;
        }
    }
    Bye!!
    [harazduk@storm lab3]$ 
    

    Compilation Errors

    If you get compilation errors because names such as pthread_create, sem_init or ifstream are unrecognized, you have probably left out an include file. Feel free to google or use the man command (e.g. man 3 sem_init) to see which include files are needed. The following headers cover the calls used in this lab:
    
    #include <iostream>
    #include <fstream>
    #include <pthread.h>
    #include <string>
    #include <semaphore.h>
    

    5. Pthread man pages. (To learn more commands use man -k pthread)

    You will be using pthread_create, pthread_join and pthread_exit. You can use other pthread functions if you choose. You will need two global functions that are thread starter functions. Those global functions can take a pointer argument which can either be the buffer, a pointer to an object that you created or NULL. If you choose NULL, the buffer will have to be a global pointer that is accessible to both starter functions.

    In pthread_create, pass in the address of a concrete pthread_t object as in the following:

    
    	void* run_producer(void* buffptr);
    	pthread_t producer_thread;
    	pthread_create(&producer_thread, NULL, run_producer, (void*)&buffer);
    
    
    Remember that run_producer is the name of a function defined as void* run_producer(void* param). Remember that buffptr is the address of the object. It can be shared because it is passed in to the function.

    NAME
           pthread_create - create a new thread
    
    SYNOPSIS
           #include <pthread.h>
    
           int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
                              void *(*start_routine) (void *), void *arg);
    
           Compile and link with -pthread.
    
    DESCRIPTION
           The  pthread_create()  function  starts  a  new  thread in the calling process.  The new thread starts execution by invoking
           start_routine(); arg is passed as the sole argument of start_routine().
    
           The new thread terminates in one of the following ways:
    
           * It calls pthread_exit(3), specifying an exit status value that is available to another thread in  the  same  process  that
             calls pthread_join(3).
    
           * It  returns  from  start_routine().   This  is equivalent to calling pthread_exit(3) with the value supplied in the return
             statement.
    
           * It is canceled (see pthread_cancel(3)).
    

    Once the threads are created, use pthread_join API to wait for the threads.

    
    NAME
           pthread_join - join with a terminated thread
    
    SYNOPSIS
           #include <pthread.h>
    
           int pthread_join(pthread_t thread, void **retval);
    
           Compile and link with -pthread.
    
    DESCRIPTION
           The  pthread_join()  function waits for the thread specified by thread to terminate.  If that thread has already terminated,
           then pthread_join() returns immediately.  The thread specified by thread must be joinable.
    

    Use pthread_exit to exit the thread when processing is completed.

    
    NAME
           pthread_exit - terminate calling thread
    
    SYNOPSIS
           #include <pthread.h>
    
           void pthread_exit(void *retval);
    
           Compile and link with -pthread.
    
    DESCRIPTION
           The pthread_exit() function terminates the calling thread and returns a value via retval that (if the thread is joinable) is
           available to another thread in the same process that calls pthread_join(3).
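To illustrate how a value passed to pthread_exit reaches pthread_join, here is a small sketch. The functions `count_lines` and `count_in_thread` are hypothetical and are not part of the lab requirements. The result is allocated on the heap because the thread's stack no longer exists once the thread has exited.

```cpp
#include <cassert>
#include <cstddef>
#include <pthread.h>
#include <string>
#include <vector>

// Hypothetical starter function: counts the lines it is given and hands the
// count back through pthread_exit.
void* count_lines(void* arg)
{
    std::vector<std::string>* lines = static_cast<std::vector<std::string>*>(arg);
    std::size_t* result = new std::size_t(lines->size());
    pthread_exit(result);   // equivalent to "return result;" in a starter function
}

// Starts the thread, waits for it, and retrieves the value it exited with.
std::size_t count_in_thread(std::vector<std::string>& lines)
{
    pthread_t worker;
    int rc = pthread_create(&worker, nullptr, count_lines, &lines);
    assert(rc == 0);
    (void)rc;

    void* retval = nullptr;
    pthread_join(worker, &retval);            // retval receives the pthread_exit argument

    std::size_t* result = static_cast<std::size_t*>(retval);
    std::size_t count = *result;
    delete result;                            // the joining thread owns the result
    return count;
}
```

If the thread has nothing to report, passing NULL to both pthread_exit and pthread_join is enough.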
    

    6. Semaphore man pages. (To learn more commands: use man -k semaphore)

    Use sem_init to initialize the three semaphores needed to synchronize on the common buffer. Pass the address of a concrete semaphore object into the function.

    
    NAME
           sem_init - initialize an unnamed semaphore
    
    SYNOPSIS 
           #include <semaphore.h>
    
           int sem_init(sem_t *sem, int pshared, unsigned int value);
    
           Link with -pthread.
    
    DESCRIPTION
           sem_init()  initializes  the  unnamed  semaphore at the address pointed to by sem.  The value argument specifies the initial
           value for the semaphore.
    
           The pshared argument indicates whether this semaphore is to be shared between the threads of  a  process,  or  between  pro-
           cesses.
    
           If  pshared  has  the  value 0, then the semaphore is shared between the threads of a process, and should be located at some
           address that is visible to all threads (e.g., a global variable, or a variable allocated dynamically on the heap).
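A sketch of initializing the three semaphores with error checking follows. The helper name `init_semaphores` is hypothetical; the initial values (full to 0, empty to the buffer size, mutex to 1) are the ones given in the requirements, and pshared is 0 because the semaphores are shared only between threads of one process.

```cpp
#include <cassert>
#include <cstdio>
#include <semaphore.h>

// Hypothetical helper: initializes full, empty and mutex; returns false on failure.
bool init_semaphores(sem_t* full, sem_t* empty, sem_t* mutex, unsigned int buffer_size)
{
    assert(full != nullptr && empty != nullptr && mutex != nullptr);
    if (sem_init(full, 0, 0) == -1) {              // no filled slots yet
        std::perror("sem_init(full)");
        return false;
    }
    if (sem_init(empty, 0, buffer_size) == -1) {   // every slot starts out free
        std::perror("sem_init(empty)");
        return false;
    }
    if (sem_init(mutex, 0, 1) == -1) {             // binary semaphore guarding the buffer
        std::perror("sem_init(mutex)");
        return false;
    }
    return true;
}
```

sem_init returns -1 and sets errno on failure, so perror prints a readable reason.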
    

    Use sem_wait before entering the critical region in the producer and consumer functions.

    
    NAME
           sem_wait, sem_timedwait, sem_trywait - lock a semaphore
    
    SYNOPSIS 
           #include <semaphore.h>
    
           int sem_wait(sem_t *sem);
    
           int sem_trywait(sem_t *sem);
    
           int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
    
           Link with -pthread.
    
    DESCRIPTION
           sem_wait()  decrements  (locks)  the  semaphore  pointed to by sem.  If the semaphore's value is greater than zero, then the
           decrement proceeds, and the function returns, immediately.  If the semaphore currently has the value  zero,  then  the  call
           blocks  until  either it becomes possible to perform the decrement (i.e., the semaphore value rises above zero), or a signal
           handler interrupts the call.
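Because sem_wait can return early when a signal handler interrupts it, a program that also handles signals may want to retry the call. The wrapper below is a sketch of that idea; the name `wait_uninterrupted` is hypothetical, and for this lab a plain sem_wait is probably sufficient.

```cpp
#include <cassert>
#include <cerrno>
#include <semaphore.h>

// Hypothetical wrapper: repeats sem_wait while it fails only because of a signal.
int wait_uninterrupted(sem_t* sem)
{
    assert(sem != nullptr);
    int rc;
    do {
        rc = sem_wait(sem);
    } while (rc == -1 && errno == EINTR);
    return rc;   // 0 on success, -1 with errno set for any other error
}
```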
    

    Use sem_post when exiting the critical region. Remember that the producer must sem_post(full) and the consumer must sem_post(empty).

    
    NAME
           sem_post - unlock a semaphore
    
    SYNOPSIS
           #include <semaphore.h>
    
           int sem_post(sem_t *sem);
    
           Link with -pthread.
    
    DESCRIPTION
           sem_post() increments (unlocks) the semaphore pointed to by sem.  If the semaphore's value consequently becomes greater than
           zero, then another process or thread blocked in a sem_wait(3) call will be woken up and proceed to lock the semaphore.
    

    7. Submit your program (lab3.cpp)

    The following submission method only works from the storm server. Write me (jharazduk@fordham.edu) if you encounter any problem; include in your email the error message you received. To submit a file named “lab3.cpp”, use the command:
    	submitOS lab3.cpp
    
    You can then run the command
    	verifyOS lab3.cpp
    
    to retrieve the file and verify that it has been submitted successfully. It should display the file you submitted back to you.

    If you want to submit linux.txt and lab3.cpp at the same time, use the command:

    	submitOS lab3.cpp
    
    Unfortunately, verifyOS only works on one file at a time.
    	verifyOS lab3.cpp
    

    NOTE: There are two errors that you may encounter. The first you can fix yourself: submitOS: command not found. Either you did not run ~harazduk/bin/configOS, or you did but the change did not take effect. If the OSGradedLabs directory exists in your home directory, then it's the latter. Run bash on the command line alone and try submitOS again.

    	bash
    
    If you encounter another error, send me an email with the error and I will fix it for you. You have finished lab3.