공부기록
쓰레드(2) 본문
쓰레드의 동기화
쓰레드간의 상호배제를 지키기 위해, 임계영역에 하나의 쓰레드만 들어오도록 락을 걸 수 있다. 이 처럼 한 스레드가 진행 중인 작업을 다른 쓰레드가 간섭하지 못하도록 막는 것을 '스레드의 동기화 (synchronization)' 이라고 한다.
synchronized를 이용한 동기화
synchronized 키워드를 이용한 동기화에는 두 가지 방식이 있다.
- 메서드 전체를 임계영역으로 지정
public synchronized void calcSum(){
/* 모두 임계 영역이 된다.*/
}
이 경우, 해당 메서드가 포함된 객체의 lock을 얻어 작업을 수행하다가 메서드가 종료되면 lock을 반환한다.
- 특정한 영역을 임계영역으로 지정
synchronized(refObj){
/* 모두 임계영역이 된다. */
}
refObj는 락을 걸고자 하는 객체의 레퍼런스여야 한다. 이 synchronized 블럭에 들어가면서 refObj의 lock을 획득하고 나가면서 lock을 해제하게 된다.
단 하나의 쓰레드만이 lock을 획득할 수 있기에 퍼포먼스에 이슈가 생길 수 있다.
public class ThreadEx21 {
public static void main(String[] args){
Runnable r = new RunnableEx21();
new Thread(r).start();
new Thread(r).start();
}
}
class Account{
private int balance = 1000;
public int getBalance(){
return balance;
}
public void withdraw(int money){
if(balance >= money){
try{
Thread.sleep(1000);
}catch (InterruptedException e){
}
balance -= money;
}
}
}
class RunnableEx21 implements Runnable{
Account acc = new Account();
public void run(){
while(acc.getBalance() > 0){
int money = (int)(Math.random()*3+1)*100;
acc.withdraw(money);
System.out.println("balance:"+acc.getBalance());
}
}
}
/* output
balance:900
balance:800
balance:600
balance:300
balance:200
balance:200
balance:-200
balance:-200
*/
다음의 경우, acc 객체에 대해 동기화를 해주지 않고 두 개의 쓰레드를 실행시킨 상황이다. 각 쓰레드가 다른 쓰레드가 빼는 값을 고려하지 않고 동시에 인출하여, 음수가 나와버렸다.
public class ThreadEx22 {
public static void main(String[] args){
Runnable r = new RunnableEx22();
new Thread(r).start();
new Thread(r).start();
}
}
class AccountSync{
private int balance = 1000;
public int getBalance(){
return balance;
}
public synchronized void withdraw(int money){
if(balance >= money){
try{
Thread.sleep(1000);
}catch (InterruptedException e){
}
balance -= money;
}
}
}
class RunnableEx22 implements Runnable{
AccountSync acc = new AccountSync();
public void run(){
while(acc.getBalance() > 0){
int money = (int)(Math.random()*3+1)*100;
acc.withdraw(money);
System.out.println("balance:"+acc.getBalance());
}
}
}
/* output
balance:700
balance:600
balance:400
balance:100
balance:0
balance:0
Process finished with exit code 0
*/
다음은 acc 객체에 대해 동기화해준 상황이다. 오류가 나지 않고 잘 수행되었다. 각 쓰레드가 순차적으로 인출하여 오류가 나지 않게 되었다.
wait()와 notify()
단순히 synchronized로 락을 걸면, 쓰레드가 작업을 한동안 진행할 수 없는 상황임에도 락을 가지게 되어 비효율적인 프로그램을 만들 수 있다.
이때 wait()와 notify()를 사용하여 더 효율적으로 프로그램을 개선할 수 있다.
오랜기간 대기해야 되는 상황에서 wait()를 사용하여 쓰레드가 락을 반납하게 되고, notify()를 호출하면 대기하고 있던 쓰레드가 다시 락을 얻어 작업을 진행할 수 있게된다.
근데 문제가 있다. 기아현상이 일어날 수 있다는 것이다. notify()는 오래 기다린 쓰레드에게 락을 준다는 보장이 없기 때문에, 이러한 문제가 발생할 수 있다.
또한, reader와 writer가 있고, 이들이 각각 wait()를 하고 notify()를 할 때, 우리가 당연히 생각해보면 writer는 reader를 notify해서 읽게 할 것이고, reader는 writer에게 책을 다 읽었으니 쓰라고 notify() 할 텐데, 문제는 이 둘을 구별할 수 없어 누가 깨어날지를 운에 맞기게 되는 프로그램을 자게 된다. 이 때문에 조건으로 동기화 하는 방법을 더 많이 쓰게 된다.
Lock과 Condition을 이용한 동기화
lock 클래스들이 있으며 이들의 종류는 다음과 같다.
ReentrantLock : 재진입이 가능한 lock, 가장 일반적인 배타lock
ReentrantReadWriteLock : 읽기에는 공유적이고, 쓰기에는 배타적인 lock
StampedLock : ReentrantReadWriteLock에 낙관적인 lock의 기능을 추가
SampedLock의 '낙관적 읽기 lock'은 쓰기 lock에 바로 풀린다.
ReentrantLock에는 두 가지 생성자가 있다.
ReentrantLock()
ReentrantLock(boolean fair)
fair가 true이면 오래 기다린 순으로 공정하게 lock을 부여하나, 이는 오버헤드가 생긴다.
lock을 관리하는 메서드에는 다음과 같은 것들이 있다.
void lock() : lock을 잠근다.
void unlock() : lock을 해지한다.
boolean isLocked() : lock이 잠겼는지 확인한다.
lock을 사용하는 방법은 다음과 같다.
lock.lock();
/* 임계영역 */
lock.unlock();
lock()
은 무한정 기다리는데 반에, 이로 인한 비효율성을 줄이기 위한 trylock()
도 있다.
boolean tryLock()
boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException
tryLock()
은 lock을 획득하면 true, 그렇지 않으면 false를 리턴한다.
아래의 메서드는 interrupt()
에 의해 작업이 취소될 수 있도록 코드를 작성할 수 있다.
ReentranLock과 Condition
Condition은 이미 생성된 lock으로 부터 newCondition()을 호출하여 생성한다.
대표적인 함수로 await()
와 signal()
이 있다.
await()
는 해당 condition의 큐에 이 메서드를 호출한 쓰레드를 대기시키는 것이고, signal()
은 해당 condition의 큐에 대기하는 쓰레드중 하나를 깨우는 것이다.
import java.util.ArrayList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadWaitEx1 {
public static void main(String[] args) throws Exception{
Table table = new Table();
new Thread(new Cook(table), "COOK1").start();
new Thread(new Customer(table, "donut"), "CUST1").start();
new Thread(new Customer(table, "burger"), "CUST2").start();
Thread.sleep(2000);
System.exit(0);
}
}
class Customer implements Runnable{
private Table table;
private String food;
Customer(Table table, String food){
this.table = table;
this.food = food;
}
public void run(){
while(true){
try{
Thread.sleep(10);
}catch (InterruptedException e){
}
String name = Thread.currentThread().getName();
table.remove(food);
System.out.println(name + " ate a " + food);
}
}
}
class Cook implements Runnable{
private Table table;
Cook(Table table){
this.table = table;
}
public void run(){
while(true){
int idx = (int)(Math.random()*table.dishNum());
table.add(table.dishNames[idx]);
try{
Thread.sleep(1);
}catch (InterruptedException e){}
}
}
}
class Table{
String[] dishNames = {"donut", "donut", "burger"};
final int MAX_FOOD = 6;
private ArrayList<String> dishes = new ArrayList<>();
private ReentrantLock lock = new ReentrantLock();
private Condition forCook = lock.newCondition();
private Condition forCust = lock.newCondition();
public synchronized void add(String dish){
lock.lock();
try {
while (dishes.size() >= MAX_FOOD) {
String name = Thread.currentThread().getName();
System.out.println(name + " is waiting.");
try {
forCook.await();
Thread.sleep(500);
} catch (InterruptedException e) {
}
}
dishes.add(dish);
forCust.signal();
System.out.println("Dishes:" + dishes.toString());
}finally {
lock.unlock();
}
}
public void remove(String dishName){
lock.lock();
try {
synchronized (this) {
String name = Thread.currentThread().getName();
while (dishes.size() == 0) {
System.out.println(name + " is waiting");
try {
forCust.await();
Thread.sleep(500);
} catch (InterruptedException e) {
}
}
while (true) {
for (int i = 0; i < dishes.size(); i++) {
if (dishName.equals(dishes.get(i))) {
dishes.remove(i);
forCook.signal();
return;
}
}
try {
System.out.println(name + " is waiting.");
forCook.await();
Thread.sleep(500);
} catch (InterruptedException e) {
}
}
}
}finally {
lock.unlock();
}
}
public int dishNum(){
return dishNames.length;
}
}
/* output
Dishes:[burger]
Dishes:[burger, burger]
Dishes:[burger, burger, donut]
Dishes:[burger, burger, donut, burger]
Dishes:[burger, burger, donut, burger, donut]
Dishes:[burger, burger, donut, burger, donut, burger]
CUST1 ate a donut
CUST2 ate a burger
Dishes:[burger, burger, donut, burger, donut]
Dishes:[burger, burger, donut, burger, donut, burger]
CUST2 ate a burger
CUST1 ate a donut
Dishes:[burger, donut, donut, burger, burger, donut]
CUST2 ate a burger
CUST1 ate a donut
Dishes:[donut, burger, burger, donut, donut]
Dishes:[donut, burger, burger, donut, donut, donut]
CUST1 ate a donut
CUST2 ate a burger
Dishes:[burger, burger, donut, donut, donut, donut]
CUST2 ate a burger
CUST1 ate a donut
Dishes:[burger, donut, donut, donut, donut]
Dishes:[burger, donut, donut, donut, donut, burger]
CUST1 ate a donut
CUST2 ate a burger
Dishes:[donut, donut, donut, burger, donut]
Dishes:[donut, donut, donut, burger, donut, donut]
CUST2 ate a burger
CUST1 ate a donut
Dishes:[donut, donut, burger, donut, donut]
Dishes:[donut, donut, burger, donut, donut, donut]
CUST1 ate a donut
CUST2 ate a burger
Exception in thread "COOK1" java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911)
at java.util.ArrayList$Itr.next(ArrayList.java:861)
at java.util.AbstractCollection.toString(AbstractCollection.java:461)
at Table.add(ThreadWaitEx1.java:77)
at Cook.run(ThreadWaitEx1.java:58)
at java.lang.Thread.run(Thread.java:748)
CUST2 failed to eat. :(
CUST1 ate a donut
Process finished with exit code 0
*/
/* output2
Dishes:[burger]
Dishes:[burger, donut]
Dishes:[burger, donut, burger]
Dishes:[burger, donut, burger, donut]
Dishes:[burger, donut, burger, donut, donut]
Dishes:[burger, donut, burger, donut, donut, burger]
CUST2 ate a burger
CUST1 ate a donut
Dishes:[burger, donut, donut, burger, donut]
Dishes:[burger, donut, donut, burger, donut, burger]
CUST1 ate a donut
Dishes:[donut, burger, donut, burger, donut]
CUST2 ate a burger
Dishes:[donut, burger, donut, burger, donut, donut]
CUST2 ate a burger
Dishes:[donut, donut, burger, donut, donut, donut]
CUST1 ate a donut
Dishes:[donut, burger, donut, donut, donut, burger]
CUST1 ate a donut
Dishes:[burger, donut, donut, donut, burger, burger]
CUST2 ate a burger
Dishes:[donut, donut, donut, burger, burger, donut]
CUST2 ate a burger
Dishes:[donut, donut, donut, burger, donut, donut]
CUST1 ate a donut
Dishes:[donut, donut, burger, donut, donut, donut]
CUST2 ate a burger
Dishes:[donut, donut, donut, donut, donut, donut]
CUST1 ate a donut
Dishes:[donut, donut, donut, donut, donut, burger]
CUST2 ate a burger
Dishes:[donut, donut, donut, donut, donut, donut]
CUST1 ate a donut
Dishes:[donut, donut, donut, donut, donut, donut]
CUST2 failed to eat. :(
CUST1 ate a donut
Dishes:[donut, donut, donut, donut, donut, donut]
CUST2 failed to eat. :(
CUST1 ate a donut
Dishes:[donut, donut, donut, donut, donut, burger]
Process finished with exit code 0
*/
위의 코드를 설명하면 다음과 같다. 먼저 Table은 손님과 요리사의 공유자원이다. 따라서 lock이 걸리게 된다.
요리사가 테이블을 사용할 때는 add()
를 호출한다. add()
에서 요리사는 Table에 lock을 걸고 들어간다. 테이블에 요리가 가득 차 있다면 요리사를 대기시키고 lock을 반납한 뒤, 추후에 signal()
이 호출되면 일어난다. 요리에 빈 자리가 있다면 요리를 추가하고 손님을 깨우고 lock을 반납한다.
손님은 테이블을 사용할 때 remove()
를 호출한다. remove()
에서 손님은 Table에 lock을 걸고 들어간다. 테이블에 요리가 없다면 손님은 lock을 반납한 뒤 요리사의 손님에 대한 signal()
에 의해 깨어난다. 손님은 요리를 먹고 요리가 없다고 오리사를 호출한 뒤 lock을 반납한다.
volatile
코어의 캐시와 메모리의 정보는 일치하지 않을 수 있다. 이를 동기화 하면서 사용하기 위해 volatile 키워드를 사용하여, 메모리의 정보를 읽어와 동기화 해줄 수 있다.
변수를 읽거나 쓰는 함수에 synchronized 블럭을 사용해도 같은 효과를 얻을 수 있다.
JVM은 데이터를 4 byte (32-bit) 단위로 처리하는데, 이 때 long, double은 한 번에 읽지 못하고 이 때 다른 쓰레드가 끼어들어와 문제가 생길 여지가 있다. 이 때 long, double 변수를 선언할 때 다음과 같이 volatile 키워드를 붙여주면 연산을 원자화 할 수 있다.
같은 의미로, synchronized 키워드도 함수를 원자화 할 수 있다.
volatile long balance;
synchronized int getbalance(){
return balance;
}
fork & join 프레임워크
fork & join 프레임워크는 하나의 작업을 여러 개로 쪼개서 병렬로 처리할 수 있도록 만들어 준다. 수행할 작업에 따라 두 개의 클래스를 이용한다.
RecursiveAction : 변환값이 없는 작업을 구현할 때 사용
RecursiveTask : 변환값이 있는 작업을 구현할 때 사용
두 클래스 모두 compute()
라는 추상 클래스를 가지며 상속을 통해 이를 재귀로 구현하기만 하면 된다.
class SumTask extends RecursiveTask<Long>{
public long compute(){
}
}
그 후, 다음과 같이 작업을 수행한다.
ForkJoinPool pool = new ForkJoinPool(); // 쓰레드 풀 생성
SumTask task = new SumTask(from, to); // 작업 생성
Long result = pool.invoke(task) // invole()를 호출하여 작업 시작
작업을 수행하는 과정에서 각 쓰레드가 똑같은 양을 일을 할 수 있도록 서로 작업을 훔쳐온다. (work stealing)
이는 쓰레드 풀에 의해서 관리된다.
fork()와 join()
fork() : 해당 작업을 쓰레드 풀의 작업 큐에 넣는다. 비동기 메서드
join() : 해당 작업의 수행이 끝날 때 까지 기다렸다가, 끝나면 결과 반환. 동기 메서드
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class ForkJoinEx1 {
static final ForkJoinPool pool = new ForkJoinPool();
public static void main(String[] args){
long from = 1l, to = 100_000_000L;
SumTask task = new SumTask(from, to);
long start = System.currentTimeMillis();
Long result = pool.invoke(task);
System.out.println("Elapsed time(4 Core) : " + (System.currentTimeMillis() - start));
System.out.printf("sum of %d~%d\n", from, to, result);
System.out.println();
result = 0L;
start = System.currentTimeMillis();
for(long i=from; i<=to; i++)
result += i;
System.out.println("Elapsed time(1core) : " + (System.currentTimeMillis() - start));
}
}
class SumTask extends RecursiveTask<Long> {
long from, to;
SumTask(long from, long to){
this.from = from;
this.to = to;
}
public Long compute(){
long size = to - from + 1;
if(size <= 5)
return sum();
long half = (from+to)/2;
SumTask leftSum = new SumTask(from, half);
SumTask rightSum = new SumTask(half+1, to);
leftSum.fork();
return rightSum.compute() + leftSum.join();
}
long sum(){
long tmp = 0L;
for(long i=from; i<=to; i++)
tmp += i;
return tmp;
}
}
/* output
Elapsed time(4 Core) : 491
sum of 1~100000000
Elapsed time(1core) : 606
Process finished with exit code 0
*/
위 코드를 보자. 수행될 작업을 재쉬적으로 생성하고, 쓰레드 풀을 생성한 뒤, 쓰레드 풀에 작업을 넣고 스레드풀이 작업을 하도록 시작한다.
compute()
에서 왼쪽 작업은 큐에 넣어지고 오른쪽 작업은 compute()
가 수행된다. 오른쪽 작업이 끝나면 큐에 넣은 왼쪽 작업이 수행될 때 까지 기다리게 된다.
출처
JAVA의 정석 (3rd Edition), 남궁 성
'Programming > JAVA' 카테고리의 다른 글
ITEM 2 : 생성자에 매개변수가 많다면 빌더를 고려하라 (0) | 2021.12.03 |
---|---|
item1. 생성자 대신 정적 팩터리 메서드를 고려하라 (0) | 2021.12.01 |
쓰레드(1) (0) | 2021.10.25 |
JVM (0) | 2021.05.29 |