My simple library

..of useful code



Java Multithreading Concepts


Multiprocessing

Process-based multitasking runs heavyweight processes, each in its own address space. Switching from one process to another therefore takes time (however small): the operating system has to save and reload registers, switch memory maps, and update its scheduling lists.

Multithreading

Thread-based multitasking runs lightweight threads that share the address space of their process. Switching between threads, and communicating between them, is therefore much cheaper than it is between processes.

Creating Threads

There are two ways to create threads in Java:

  1. Extending the Thread class
  2. Implementing the Runnable interface

Example: Extending Thread class

class MyThread extends Thread {
    // run() contains the code that executes in the new thread after start() is called
    public void run() {
        // Printing the current running thread via getName()
        System.out.println("Running Thread: " + currentThread().getName());

        // Print the priority of current thread
        System.out.println("Running Thread Priority: " + currentThread().getPriority());
    }
}

class GFG {
    public static void main(String[] args) {
        MyThread t1 = new MyThread();
        MyThread t2 = new MyThread();

        t1.setPriority(Thread.NORM_PRIORITY); // Default priority (5)
        t2.setPriority(Thread.NORM_PRIORITY);

        t1.start();
        t2.start();
    }
}
Note: You can also create a thread by passing an object that implements Runnable to the Thread constructor: new Thread(runnable).
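
The second approach, implementing Runnable, could look like the sketch below. The class name RunnableDemo, the helper runDemo(), and the thread names are made up for illustration. The task is written as a lambda and handed to the Thread constructor, so it stays separate from the thread that runs it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class: creates threads from a Runnable instead of subclassing Thread
class RunnableDemo {
    // Runs two threads built from the same Runnable and returns the names they reported
    static List<String> runDemo() throws InterruptedException {
        // Thread-safe list, because both threads add to it
        List<String> seen = new CopyOnWriteArrayList<>();

        // Runnable written as a lambda; it records the name of the thread executing it
        Runnable task = () -> seen.add(Thread.currentThread().getName());

        Thread t1 = new Thread(task, "worker-1");
        Thread t2 = new Thread(task, "worker-2");
        t1.start();
        t2.start();

        // Wait for both threads to finish before reading the results
        t1.join();
        t2.join();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Threads that ran: " + runDemo());
    }
}
```

Because Java allows only one superclass, implementing Runnable leaves the class free to extend something else.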

Daemon Threads

Characteristics of Daemon threads:

  • It is only a service-provider thread: it supports user threads in the background and is not responsible for their work.
  • It usually runs as a low-priority thread.
  • It is a dependent thread: the JVM does not stay alive for daemon threads alone.
  • The JVM exits as soon as all user (non-daemon) threads have terminated, stopping any daemon threads that are still running.
  • The most popular example is the garbage collector in Java. Another example is the finalizer thread.

If the main thread is the last user thread and it finishes, a running daemon thread does not complete its execution and stops abruptly.
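
A minimal sketch of this behavior, assuming a made-up background "heartbeat" task (DaemonDemo and startHeartbeat() are hypothetical names). The thread is marked with setDaemon(true) before start(), so the JVM does not wait for it once main returns.

```java
// Hypothetical demo class: a daemon thread does not keep the JVM alive
class DaemonDemo {
    // Starts an endless background thread marked as a daemon and returns it
    static Thread startHeartbeat() {
        Thread heartbeat = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(100);
                    System.out.println("heartbeat");
                } catch (InterruptedException e) {
                    return; // stop when interrupted
                }
            }
        }, "heartbeat");

        // Must be called before start(); afterwards it throws IllegalThreadStateException
        heartbeat.setDaemon(true);
        heartbeat.start();
        return heartbeat;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread heartbeat = startHeartbeat();
        System.out.println("daemon? " + heartbeat.isDaemon());

        Thread.sleep(350);
        // main is the last user thread; once it returns, the JVM exits
        // even though the heartbeat loop never ends on its own
        System.out.println("main exits");
    }
}
```

Without the setDaemon(true) call, the same program would never terminate, because the heartbeat thread would count as a user thread.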

ReentrantLock

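import java.util.concurrent.locks.ReentrantLock;

class Geeks {
    // counter value shared across the threads
    private static int c = 0;

    // Lock object; final so that every thread always locks the same instance
    private static final ReentrantLock lock = new ReentrantLock();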

    public static void increment() {
        // acquire the lock
        lock.lock();
        try {
            c++;
            System.out.println(Thread.currentThread().getName() +
                             " incremented counter to: " + c);
        } finally {
            // release the lock
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        Runnable task = () -> {
            for (int i = 1; i < 3; i++) {
                increment();
            }
        };

        Thread t1 = new Thread(task, "Thread-1");
        Thread t2 = new Thread(task, "Thread-2");

        t1.start();
        t2.start();
    }
}

Synchronized Blocks

A synchronized block in Java is marked with the synchronized keyword and is synchronized on some object. All blocks synchronized on the same object can have only one thread executing inside them at a time. Every other thread that attempts to enter such a block is blocked until the thread inside exits it.

Method-level synchronization:

public synchronized void inc() {
    c++;
}

Block-level synchronization:

// sharedObject is a placeholder for whichever object guards the data
synchronized (sharedObject) {
    sharedObject.doSomething();
}
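
Both forms can be combined in one class, as in the following sketch. SynchronizedCounter and its method names are invented for illustration. The method-level and block-level variants lock the same monitor (this), so they exclude each other.

```java
// Hypothetical demo class: method-level and block-level synchronization on the same monitor
class SynchronizedCounter {
    private int count = 0;

    // Method-level synchronization: implicitly locks on `this`
    public synchronized void incrementViaMethod() {
        count++;
    }

    // Block-level synchronization on the same monitor (`this`),
    // so it also excludes incrementViaMethod()
    public void incrementViaBlock() {
        synchronized (this) {
            count++;
        }
    }

    public synchronized int get() {
        return count;
    }

    // Two threads increment the same counter perThread times each
    static int runDemo(int perThread) throws InterruptedException {
        SynchronizedCounter counter = new SynchronizedCounter();
        Thread a = new Thread(() -> {
            for (int i = 0; i < perThread; i++) counter.incrementViaMethod();
        });
        Thread b = new Thread(() -> {
            for (int i = 0; i < perThread; i++) counter.incrementViaBlock();
        });
        a.start();
        b.start();
        a.join();
        b.join();
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo(100_000)); // prints 200000
    }
}
```

If the two methods locked different objects, they would no longer exclude each other and increments could be lost.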

Wait/Notify Mechanism

What is Polling, and what are the problems with it?

The process of testing a condition repeatedly till it becomes true is known as polling. Polling is usually implemented with the help of loops to check whether a particular condition is true or not. If it is true, a certain action is taken. This wastes many CPU cycles and makes the implementation inefficient.

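To avoid polling, Java provides three methods: wait(), notify(), and notifyAll(). They are declared as final methods of the Object class, so every object has them. A thread may call them only while it holds the object's monitor, that is, from within a block or method synchronized on that object; otherwise an IllegalMonitorStateException is thrown.

  • wait(): Tells the calling thread to give up the lock and go to sleep until some other thread enters the same monitor and calls notify() or notifyAll(). Because a thread can also wake up spuriously, wait() should be called in a loop that rechecks the condition.
  • notify(): Wakes up a single thread that called wait() on the same object. Note that calling notify() does not give up the lock; the woken thread continues only after the notifying thread leaves the synchronized block.
  • notifyAll(): Wakes up all the threads that called wait() on the same object.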

Producer-Consumer Example:

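// These members belong inside a class and need java.util.LinkedList and java.util.Queue imported
// Shared queue used by both producer and consumer
private static final Queue<Integer> queue = new LinkedList<>();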
// Maximum capacity of the queue
private static final int CAPACITY = 10;

// Producer task
private static final Runnable producer = new Runnable() {
    public void run() {
        while (true) {
            synchronized (queue) {
                // Wait if the queue is full
                while (queue.size() == CAPACITY) {
                    try {
                        System.out.println("Queue is at max capacity");
                        queue.wait(); // Release the lock and wait
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // Add item to the queue
                queue.add(10);
                System.out.println("Added 10 to the queue");
                queue.notifyAll(); // Notify all waiting consumers
            }
            // Sleep outside the synchronized block so the lock is not held during the delay
            try {
                Thread.sleep(2000); // Simulate some delay in production
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
};

// Consumer task
private static final Runnable consumer = new Runnable() {
    public void run() {
        while (true) {
            synchronized (queue) {
                // Wait if the queue is empty
                while (queue.isEmpty()) {
                    try {
                        System.out.println("Queue is empty, waiting");
                        queue.wait(); // Release the lock and wait
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // Remove item from the queue
                System.out.println("Removed " + queue.remove() + " from the queue");
                queue.notifyAll(); // Notify all waiting producers
            }
            // Sleep outside the synchronized block so the lock is not held during the delay
            try {
                Thread.sleep(2000); // Simulate some delay in consumption
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
};

public static void main(String[] args) {
    System.out.println("Main thread started");
    // Create the producer thread
    Thread producerThread = new Thread(producer, "Producer");
    // Create the consumer thread
    Thread consumerThread = new Thread(consumer, "Consumer");
    // Start both threads
    producerThread.start();
    consumerThread.start();
    System.out.println("Main thread exiting");
}

Volatile vs Synchronized

Two important features of locks and synchronization:

  • Mutual Exclusion: Only one thread or process can execute a block of code (critical section) at a time.
  • Visibility: Changes made by one thread to shared data are visible to other threads.

Java's synchronized keyword guarantees both mutual exclusion and visibility. If every block that modifies a shared variable is synchronized on the same lock, only one thread can be inside such a block at a time, and its changes are visible to the next thread that acquires the lock.

In some cases, we may only desire visibility and not atomicity. The use of synchronized in such a situation is overkill and may cause scalability problems. Here volatile comes to the rescue. Volatile variables have the visibility features of synchronized but not the atomicity features. Every read of a volatile variable sees the most recent write to it, as if all reads and writes went directly to main memory, yet a compound operation such as counter++ can still interleave with other threads.

Volatile Example:

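import java.util.logging.Level;
import java.util.logging.Logger;

public class VolatileTest {
    // MyLoggerFactory is a helper class that is not shown here; it is assumed to return a java.util.logging.Logger
    private static final Logger LOGGER = MyLoggerFactory.getSimplestLogger();
    // volatile: writes by ChangeMaker become visible to ChangeListener
    private static volatile int MY_INT = 0;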

    public static void main(String[] args) {
        new ChangeListener().start();
        new ChangeMaker().start();
    }

    static class ChangeListener extends Thread {
        @Override public void run() {
            int local_value = MY_INT;
            while (local_value < 5) {
                if (local_value != MY_INT) {
                    LOGGER.log(Level.INFO, "Got Change for MY_INT: {0}", MY_INT);
                    local_value = MY_INT;
                }
            }
        }
    }

    static class ChangeMaker extends Thread {
        @Override public void run() {
            int local_value = MY_INT;
            while (MY_INT < 5) {
                LOGGER.log(Level.INFO, "Incrementing MY_INT to {0}", local_value + 1);
                MY_INT = ++local_value;
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
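
A common use of the visibility guarantee alone is a stop flag, sketched below with invented names (StopFlagDemo, running, runDemo()). Only one thread writes the flag and the write is a plain assignment, so no atomicity is needed and volatile should be enough.

```java
// Hypothetical demo class: a volatile flag used to stop a worker thread
class StopFlagDemo {
    // volatile: the write in runDemo() is visible to the worker's next read
    private static volatile boolean running = true;

    // Returns true if the worker observed the flag change and terminated
    static boolean runDemo() throws InterruptedException {
        running = true;
        Thread worker = new Thread(() -> {
            long spins = 0;
            // The volatile flag is re-read on every iteration
            while (running) {
                spins++;
            }
        }, "worker");
        worker.start();

        Thread.sleep(100);
        running = false;   // single plain write, not a compound operation

        worker.join(2000); // wait at most two seconds for the worker to stop
        return !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker stopped: " + runDemo());
    }
}
```

Without volatile, the JIT compiler is allowed to hoist the read of the flag out of the loop, in which case the worker might never stop.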

Atomic Classes

Java provides atomic classes such as AtomicInteger, AtomicLong, AtomicBoolean and AtomicReference in the java.util.concurrent.atomic package. Objects of these classes represent an atomically updatable int, long, boolean, and object reference respectively.

AtomicInteger, for example, provides the following methods:

  • set(int value): Sets the current value to the given value
  • get(): Returns the current value
  • lazySet(int value): Eventually sets the current value to the given value; other threads may not see the write immediately
  • compareAndSet(int expect, int update): Atomically sets the value to update if the current value == expect, and returns whether it succeeded
  • addAndGet(int delta): Atomically adds delta to the current value and returns the updated value
  • decrementAndGet(): Atomically decrements the current value by one and returns the updated value
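
The calls above can be traced in a single thread, as in this small sketch. AtomicMethodsDemo and runDemo() are hypothetical names; the AtomicInteger methods are the ones listed above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: traces the return values of common AtomicInteger calls
class AtomicMethodsDemo {
    static String runDemo() {
        AtomicInteger n = new AtomicInteger(); // starts at 0
        StringBuilder log = new StringBuilder();

        n.set(10);
        log.append(n.get()).append(' ');                 // 10
        log.append(n.addAndGet(5)).append(' ');          // 15
        log.append(n.decrementAndGet()).append(' ');     // 14
        log.append(n.compareAndSet(14, 20)).append(' '); // true: value was 14, now 20
        log.append(n.compareAndSet(14, 30)).append(' '); // false: value is 20, not 14
        log.append(n.get());                             // 20
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints: 10 15 14 true false 20
    }
}
```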

Example:

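import java.util.concurrent.atomic.AtomicInteger;

// Implements Runnable so that one Counter can be shared by several threads
class Counter implements Runnable {
    // Atomic counter variable
    final AtomicInteger count;

    // Constructor of class
    Counter() {
        count = new AtomicInteger();
    }

    public void run() {
        int max = 100_000_000;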

        // incrementing counter total of max times
        for (int i = 0; i < max; i++) {
            count.addAndGet(1);
        }
    }
}

public class AtomicCounter {
    public static void main(String[] args) throws InterruptedException {
        // Instance of Counter Class
        Counter c = new Counter();

        // Defining Two different threads
        Thread first = new Thread(c, "First");
        Thread second = new Thread(c, "Second");

        // Threads start executing
        first.start();
        second.start();

        // main thread will wait for both threads to complete execution
        first.join();
        second.join();

        // Printing final value of count variable
        System.out.println(c.count);
    }
}

Mutex (Mutual Exclusion)

A solution to the mutual exclusion problem is expected to satisfy the following requirements:

  • At any time, only one process is allowed to be inside its critical section.
  • The solution is implemented purely in software on a machine.
  • A process remains inside its critical section for a bounded time only.
  • No assumption can be made about the relative speeds of asynchronous concurrent processes.
  • A process that is outside its critical section cannot prevent any other process from entering its critical section.
  • A process must not be indefinitely postponed from entering its critical section.

What is a race condition?

A race condition occurs when multiple processes or threads access shared data concurrently, and the final outcome of the program depends on the relative timing of their execution. This can lead to unexpected and undesired results, such as data corruption, incorrect calculations, or program crashes.
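
A sketch of a race condition on a shared counter, with invented names (RaceConditionDemo, unsafeCount, safeCount). Two threads increment a plain int and an AtomicInteger the same number of times. The plain counter can lose updates because ++ is a read-modify-write sequence, while the atomic counter cannot.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: lost updates on a plain int versus an AtomicInteger
class RaceConditionDemo {
    static int unsafeCount = 0;                                 // unsynchronized shared state
    static final AtomicInteger safeCount = new AtomicInteger(); // atomic shared state

    static void runDemo(int perThread) throws InterruptedException {
        unsafeCount = 0;
        safeCount.set(0);

        Runnable task = () -> {
            for (int i = 0; i < perThread; i++) {
                unsafeCount++;               // read, add, write: another thread can interleave
                safeCount.incrementAndGet(); // one atomic step
            }
        };

        Thread a = new Thread(task);
        Thread b = new Thread(task);
        a.start();
        b.start();
        a.join();
        b.join();
    }

    public static void main(String[] args) throws InterruptedException {
        runDemo(1_000_000);
        // The unsafe total varies from run to run and is usually below 2000000
        System.out.println("unsafe: " + unsafeCount);
        System.out.println("safe:   " + safeCount.get());
    }
}
```

The unsafe result depends on how the two threads happen to interleave, which is exactly the timing dependence described above.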

Semaphores

A semaphore can be seen as a lock that allows several threads to be inside a critical region at the same time. It keeps a counter that determines how many more threads may enter. When a thread reaches the semaphore, it is allowed in only if the number of threads in the critical region is below the maximum set when the semaphore was created. A thread announces that it has left the region with a release call.

When a semaphore is initialized with a positive value x, you can think of x threads being allowed into the critical section. Each acquire() decrements x. When x reaches 0, any further thread that wants to enter the critical region has to wait until the semaphore's value becomes positive again (i.e., until one of the threads in the critical region exits).

Conversely, when a semaphore is initialized with a negative value such as -1, at least two release() calls are needed (raising the value from -1 to 1) before the critical region can be accessed (i.e., before another thread's acquire() can succeed). You can imagine a third thread waiting "at the semaphore" for two other threads to signal, by calling release(), that they have "finished their work".
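
The negative-value case can be checked in a single thread with java.util.concurrent.Semaphore, using the non-blocking tryAcquire(). This is a sketch; NegativePermitsDemo and runDemo() are hypothetical names.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class: a semaphore that starts with -1 permits
class NegativePermitsDemo {
    static String runDemo() {
        Semaphore sem = new Semaphore(-1);
        StringBuilder log = new StringBuilder();

        log.append(sem.tryAcquire()).append(' '); // false: -1 permits available
        sem.release();                            // -1 -> 0
        log.append(sem.tryAcquire()).append(' '); // false: 0 permits available
        sem.release();                            // 0 -> 1
        log.append(sem.tryAcquire()).append(' '); // true: 1 -> 0
        log.append(sem.availablePermits());       // 0
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints: false false true 0
    }
}
```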

Semaphores in Java are represented by the Semaphore class. When creating semaphore objects, we can use the following constructors:

  • Semaphore(int permits)
  • Semaphore(int permits, boolean fair)

We pass the following to the constructor:

  • int permits - the initial value of the counter. In other words, this parameter determines how many threads can access the shared resource simultaneously at the start. It is not a hard maximum: each release() adds a permit, and the initial value may be negative.
  • boolean fair - establishes the order in which threads will have access. If fair is true, then access is granted to waiting threads in the order in which they requested it. If false, no order is guaranteed, and a newly arriving thread may acquire a permit ahead of threads that are already waiting.
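
To see the permits parameter at work, the sketch below starts more threads than there are permits and records the highest number of threads that were inside the guarded region at once. SemaphoreLimitDemo, runDemo(), and the 50 ms of simulated work are assumptions made for the example.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: a semaphore bounds how many threads run a region at once
class SemaphoreLimitDemo {
    // Returns the highest number of threads observed inside the region at the same time
    static int runDemo(int permits, int threads) throws InterruptedException {
        Semaphore sem = new Semaphore(permits, true); // fair: waiting threads are served in order
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxInside = new AtomicInteger();

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    sem.acquire(); // blocks while no permit is available
                    try {
                        int now = inside.incrementAndGet();
                        maxInside.accumulateAndGet(now, Math::max);
                        Thread.sleep(50); // simulated work inside the region
                    } finally {
                        inside.decrementAndGet();
                        sem.release(); // always hand the permit back
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return maxInside.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("max threads inside: " + runDemo(2, 6));
    }
}
```

Releasing in a finally block keeps a permit from leaking if the work inside the region throws.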

Dining Philosophers Example:

import java.util.concurrent.Semaphore;

class Philosopher extends Thread {
    private final Semaphore sem;
    private boolean full = false;
    private final String name;

    Philosopher(Semaphore sem, String name) {
        this.sem = sem;
        this.name = name;
    }

    public void run() {
        try {
            if (!full) {
                // Blocks until one of the semaphore's permits is free
                sem.acquire();
                try {
                    System.out.println(name + " takes a seat at the table");

                    sleep(300);
                    full = true;

                    System.out.println(name + " has eaten! He leaves the table");
                } finally {
                    // Return the permit even if the thread was interrupted while eating
                    sem.release();
                }

                sleep(300);
            }
        } catch(InterruptedException e) {
            System.out.println("Something went wrong!");
        }
    }
}

public class Main {
    public static void main(String[] args) {
        Semaphore sem = new Semaphore(2);
        new Philosopher(sem, "Socrates").start();
        new Philosopher(sem, "Plato").start();
        new Philosopher(sem, "Aristotle").start();
        new Philosopher(sem, "Thales").start();
        new Philosopher(sem, "Pythagoras").start();
    }
}

Sample output (the order can vary between runs):

Socrates takes a seat at the table
Plato takes a seat at the table
Socrates has eaten! He leaves the table
Plato has eaten! He leaves the table
Aristotle takes a seat at the table
Pythagoras takes a seat at the table
Aristotle has eaten! He leaves the table
Pythagoras has eaten! He leaves the table
Thales takes a seat at the table
Thales has eaten! He leaves the table

Final Keyword

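// Blank final fields: declared without a value and assigned exactly once below
static final double EULERCONSTANT;
final int CAPACITY;

// instance initializer block for initializing CAPACITY
{
    CAPACITY = 25;
}

// static initializer block for initializing EULERCONSTANT
static {
    EULERCONSTANT = 2.3;
}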

Final instance variables can also be initialized in the constructor; either way, a final variable is assigned exactly once. Final classes cannot be extended.
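
A sketch of constructor initialization together with a final class. FinalDemo, SCALE, capacity, and scaled() are hypothetical names chosen for the example.

```java
// Hypothetical demo class. It is final, so `class Sub extends FinalDemo` would not compile
final class FinalDemo {
    static final double SCALE; // blank static final, assigned in the static block
    final int capacity;        // blank final, assigned in the constructor

    static {
        SCALE = 1.5;
    }

    FinalDemo(int capacity) {
        this.capacity = capacity; // the one and only assignment
    }

    double scaled() {
        return capacity * SCALE;
    }

    public static void main(String[] args) {
        FinalDemo d = new FinalDemo(10);
        System.out.println(d.scaled()); // prints 15.0
        // d.capacity = 20;             // would not compile: capacity is final
    }
}
```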