The Java concurrency API provides a synchronization utility that allows the interchange of data between two concurrent tasks. In more detail, the Exchanger class allows the definition of a synchronization point between two threads. When the two threads arrive to this point, they interchange a data structure so the data structure of the first thread goes to the second one and the data structure of the second thread goes to the first one.
This class may be very useful in a situation similar to the producer-consumer problem. This is a classic concurrent problem where you have a common buffer of data, one or more producers of data, and one or more consumers of data. As the Exchanger class only synchronizes two threads, you can use it if you have a producer-consumer problem with one producer and one consumer.
In this recipe, you will learn how to use the Exchanger class to solve the producer-consumer problem with one producer and one consumer.
Getting ready
The example of this recipe has been implemented using the Eclipse IDE. If you use Eclipse or other IDE like NetBeans, open it and create a new Java project.
How to do it...
Follow these steps to implement the example:
1. First, let's begin by implementing the producer. Create a class named Producer and specify that it implements the Runnable interface.
public class Producer implements Runnable {
2. Declare a List<String> object named buffer. This will be the data structure that the producer will interchange with the consumer.
private List<String> buffer;
3. Declare an Exchanger<List<String>> object named exchanger. This will be the exchanger object that will be used to synchronize producer and consumer.
private final Exchanger<List<String>> exchanger;
4. Implement the constructor of the class that initializes the two attributes.
public Producer (List<String> buffer, Exchanger<List<String>>
exchanger){
this.buffer=buffer;
this.exchanger=exchanger;
}
5. Implement the run() method. Inside it, implement 10 cycles of interchange.
@Override
public void run() { int cycle=1;
for (int i=0; i<10; i++){
System.out.printf("Producer: Cycle %d\n",cycle);
6. In each cycle, add 10 strings to the buffer.
for (int j=0; j<10; j++){
String message="Event "+((i*10)+j);
System.out.printf("Producer: %s\n",message);
buffer.add(message);
}
7. Call the exchange() method to interchange data with the consumer. As this method can throw an InterruptedException exception, you have to add the code to process it.
try {
buffer=exchanger.exchange(buffer);
} catch (InterruptedException e) { e.printStackTrace();
}
System.out.println("Producer: "+buffer.size());
cycle++;
}
8. Now, let's implement the consumer. Create a class named Consumer and specify that it implements the Runnable interface.
public class Consumer implements Runnable {
9. Declare a List<String> object named buffer. This will be the data structure that the producer will interchange with the consumer.
private List<String> buffer;
10. Declare an Exchanger<List<String>> object named exchanger. This will be the exchanger object that will be used to synchronize producer and consumer.
private final Exchanger<List<String>> exchanger;
11. Implement the constructor of the class that initializes the two attributes.
public Consumer(List<String> buffer, Exchanger<List<String>>
exchanger){
this.buffer=buffer;
this.exchanger=exchanger;
}
12. Implement the run() method. Inside it, implement 10 cycles of interchange.
@Override
public void run() { int cycle=1;
for (int i=0; i<10; i++){
System.out.printf("Consumer: Cycle %d\n",cycle);
13. In each cycle, begin with a call to the exchange() method to synchronize with the producer. The consumer needs data to consume. As this method can throw an InterruptedException exception, you have to add the code to process it.
try {
buffer=exchanger.exchange(buffer);
} catch (InterruptedException e) { e.printStackTrace();
}
14. Write the 10 strings the producer sent in its buffer to the console and delete them from the buffer, to leave it empty.
System.out.println("Consumer: "+buffer.size());
for (int j=0; j<10; j++){
String message=buffer.get(0);
System.out.println("Consumer: "+message);
buffer.remove(0);
}
cycle++;
}
15. Now, implement the main class of the example by creating a class named Core and add the main() method to it.
public class Core {
public static void main(String[] args) {
16. Create the two buffers that will be used by the producer and the consumer.
List<String> buffer1=new ArrayList<>();
List<String> buffer2=new ArrayList<>();
17. Create the Exchanger object that will be used to synchronize the producer and the consumer.
Exchanger<List<String>> exchanger=new Exchanger<>();
18. Create the Producer object and the Consumer object.
Producer producer=new Producer(buffer1, exchanger);
Consumer consumer=new Consumer(buffer2, exchanger);
19. Create the threads to execute the producer and the consumer and start the threads.
Thread threadProducer=new Thread(producer);
Thread threadConsumer=new Thread(consumer);
threadProducer.start();
threadConsumer.start();
How it works...
The consumer begins with an empty buffer and calls Exchanger to synchronize with the producer. It needs data to consume. The producer begins its execution with an empty buffer.
It creates 10 strings, stores it in the buffer, and uses the exchanger to synchronize with the consumer.
At this point, both threads (producer and consumer) are in Exchanger and it changes the data structures, so when the consumer returns from the exchange() method, it will have a buffer with 10 strings. When the producer returns from the exchange() method, it will have an empty buffer to fill again. This operation will be repeated 10 times.
If you execute the example, you will see how producer and consumer do their jobs
concurrently and how the two objects interchange their buffers in every step. As it occurs with other synchronization utilities, the first thread that calls the exchange() method was put to sleep until the other threads arrived.
There's more...
The Exchanger class has another version of the exchange method: exchange(Vdata, longtime,TimeUnitunit) where V is the type used as a parameter in the declaration of Phaser (List<String> in our case). The thread will be sleeping until it's interrupted, the other thread arrives, or the specified time passes. The TimeUnit class is an enumeration with the following constants: DAYS, HOURS, MICROSECONDS, MILLISECONDS, MINUTES, NANOSECONDS, and SECONDS.