공부기록

쓰레드(2) 본문

Programming/JAVA

쓰레드(2)

코타쿠 2021. 10. 25. 23:16

쓰레드의 동기화

쓰레드간의 상호배제를 지키기 위해, 임계영역에 하나의 쓰레드만 들어오도록 락을 걸 수 있다. 이 처럼 한 스레드가 진행 중인 작업을 다른 쓰레드가 간섭하지 못하도록 막는 것을 '스레드의 동기화 (synchronization)' 이라고 한다.

synchronized를 이용한 동기화

synchronized 키워드를 이용한 동기화에는 두 가지 방식이 있다.

  1. 메서드 전체를 임계영역으로 지정
public synchronized void calcSum(){
    /* 모두 임계 영역이 된다.*/
}

이 경우, 해당 메서드가 포함된 객체의 lock을 얻어 작업을 수행하다가 메서드가 종료되면 lock을 반환한다.

  1. 특정한 영역을 임계영역으로 지정
synchronized(refObj){
    /* 모두 임계영역이 된다. */
}

refObj는 락을 걸고자 하는 객체의 레퍼런스여야 한다. 이 synchronized 블럭에 들어가면서 refObj의 lock을 획득하고 나가면서 lock을 해제하게 된다.

단 하나의 쓰레드만이 lock을 획득할 수 있기에 퍼포먼스에 이슈가 생길 수 있다.

public class ThreadEx21 {
    public static void main(String[] args){
        Runnable r = new RunnableEx21();
        new Thread(r).start();
        new Thread(r).start();
    }
}

class Account{
    private int balance = 1000;

    public int getBalance(){
        return balance;
    }

    public void withdraw(int money){
        if(balance >= money){
            try{
                Thread.sleep(1000);
            }catch (InterruptedException e){

            }
            balance -= money;
        }
    }

}

class RunnableEx21 implements Runnable{

    Account acc = new Account();

    public void run(){

        while(acc.getBalance() > 0){
            int money = (int)(Math.random()*3+1)*100;
            acc.withdraw(money);
            System.out.println("balance:"+acc.getBalance());
        }

    }
}

/* output

balance:900
balance:800
balance:600
balance:300
balance:200
balance:200
balance:-200
balance:-200

 */

다음의 경우, acc 객체에 대해 동기화를 해주지 않고 두 개의 쓰레드를 실행시킨 상황이다. 각 쓰레드가 다른 쓰레드가 빼는 값을 고려하지 않고 동시에 인출하여, 음수가 나와버렸다.

public class ThreadEx22 {
    public static void main(String[] args){
        Runnable r = new RunnableEx22();
        new Thread(r).start();
        new Thread(r).start();
    }
}

class AccountSync{
    private int balance = 1000;

    public int getBalance(){
        return balance;
    }

    public synchronized void withdraw(int money){
        if(balance >= money){
            try{
                Thread.sleep(1000);
            }catch (InterruptedException e){

            }
            balance -= money;
        }
    }

}

class RunnableEx22 implements Runnable{

    AccountSync acc = new AccountSync();

    public void run(){
        while(acc.getBalance() > 0){
            int money = (int)(Math.random()*3+1)*100;
            acc.withdraw(money);
            System.out.println("balance:"+acc.getBalance());
        }

    }
}

/* output

balance:700
balance:600
balance:400
balance:100
balance:0
balance:0

Process finished with exit code 0

 */

다음은 acc 객체에 대해 동기화해준 상황이다. 오류가 나지 않고 잘 수행되었다. 각 쓰레드가 순차적으로 인출하여 오류가 나지 않게 되었다.

wait()와 notify()

단순히 synchronized로 락을 걸면, 쓰레드가 작업을 한동안 진행할 수 없는 상황임에도 락을 가지게 되어 비효율적인 프로그램을 만들 수 있다.

이때 wait()와 notify()를 사용하여 더 효율적으로 프로그램을 개선할 수 있다.

오랜기간 대기해야 되는 상황에서 wait()를 사용하여 쓰레드가 락을 반납하게 되고, notify()를 호출하면 대기하고 있던 쓰레드가 다시 락을 얻어 작업을 진행할 수 있게된다.

근데 문제가 있다. 기아현상이 일어날 수 있다는 것이다. notify()는 오래 기다린 쓰레드에게 락을 준다는 보장이 없기 때문에, 이러한 문제가 발생할 수 있다.

또한, reader와 writer가 있고, 이들이 각각 wait()를 하고 notify()를 할 때, 우리가 당연히 생각해보면 writer는 reader를 notify해서 읽게 할 것이고, reader는 writer에게 책을 다 읽었으니 쓰라고 notify() 할 텐데, 문제는 이 둘을 구별할 수 없어 누가 깨어날지를 운에 맞기게 되는 프로그램을 자게 된다. 이 때문에 조건으로 동기화 하는 방법을 더 많이 쓰게 된다.

Lock과 Condition을 이용한 동기화

lock 클래스들이 있으며 이들의 종류는 다음과 같다.

ReentrantLock : 재진입이 가능한 lock, 가장 일반적인 배타lock

ReentrantReadWriteLock : 읽기에는 공유적이고, 쓰기에는 배타적인 lock

StampedLock : ReentrantReadWriteLock에 낙관적인 lock의 기능을 추가

SampedLock의 '낙관적 읽기 lock'은 쓰기 lock에 바로 풀린다.

ReentrantLock에는 두 가지 생성자가 있다.

ReentrantLock()

ReentrantLock(boolean fair)

fair가 true이면 오래 기다린 순으로 공정하게 lock을 부여하나, 이는 오버헤드가 생긴다.

lock을 관리하는 메서드에는 다음과 같은 것들이 있다.

void lock() : lock을 잠근다.

void unlock() : lock을 해지한다.

boolean isLocked() : lock이 잠겼는지 확인한다.

lock을 사용하는 방법은 다음과 같다.

lock.lock();
/* 임계영역 */
lock.unlock();

lock()은 무한정 기다리는데 반에, 이로 인한 비효율성을 줄이기 위한 trylock()도 있다.

boolean tryLock()

boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException

tryLock()은 lock을 획득하면 true, 그렇지 않으면 false를 리턴한다.

아래의 메서드는 interrupt()에 의해 작업이 취소될 수 있도록 코드를 작성할 수 있다.

ReentranLock과 Condition

Condition은 이미 생성된 lock으로 부터 newCondition()을 호출하여 생성한다.

대표적인 함수로 await()signal()이 있다.

await()는 해당 condition의 큐에 이 메서드를 호출한 쓰레드를 대기시키는 것이고, signal()은 해당 condition의 큐에 대기하는 쓰레드중 하나를 깨우는 것이다.

import java.util.ArrayList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadWaitEx1 {
    public static void main(String[] args) throws Exception{
        Table table = new Table();

        new Thread(new Cook(table), "COOK1").start();
        new Thread(new Customer(table, "donut"), "CUST1").start();
        new Thread(new Customer(table, "burger"), "CUST2").start();

        Thread.sleep(2000);
        System.exit(0);

    }
}

class Customer implements Runnable{
    private Table table;
    private String food;

    Customer(Table table, String food){
        this.table = table;
        this.food = food;
    }

    public void run(){
        while(true){
            try{
                Thread.sleep(10);
            }catch (InterruptedException e){

            }
            String name = Thread.currentThread().getName();

            table.remove(food);
            System.out.println(name + " ate a " + food);
        }
    }

}

class Cook implements Runnable{
    private Table table;

    Cook(Table table){
        this.table = table;
    }

    public void run(){
        while(true){
            int idx = (int)(Math.random()*table.dishNum());
            table.add(table.dishNames[idx]);
            try{
                Thread.sleep(1);
            }catch (InterruptedException e){}
        }
    }
}

class Table{
    String[] dishNames = {"donut", "donut", "burger"};
    final int MAX_FOOD = 6;

    private ArrayList<String> dishes = new ArrayList<>();

    private ReentrantLock lock = new ReentrantLock();
    private Condition forCook = lock.newCondition();
    private Condition forCust = lock.newCondition();

    public synchronized void add(String dish){
        lock.lock();
        try {
            while (dishes.size() >= MAX_FOOD) {
                String name = Thread.currentThread().getName();
                System.out.println(name + " is waiting.");
                try {
                    forCook.await();
                    Thread.sleep(500);
                } catch (InterruptedException e) {

                }
            }
            dishes.add(dish);
            forCust.signal();
            System.out.println("Dishes:" + dishes.toString());
        }finally {
            lock.unlock();
        }
    }


    public void remove(String dishName){
        lock.lock();
        try {
            synchronized (this) {
                String name = Thread.currentThread().getName();
                while (dishes.size() == 0) {
                    System.out.println(name + " is waiting");
                    try {
                        forCust.await();
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                    }
                }
                while (true) {
                    for (int i = 0; i < dishes.size(); i++) {
                        if (dishName.equals(dishes.get(i))) {
                            dishes.remove(i);
                            forCook.signal();
                            return;
                        }
                    }
                    try {
                        System.out.println(name + " is waiting.");
                        forCook.await();
                        Thread.sleep(500);
                    } catch (InterruptedException e) {

                    }
                }
            }
        }finally {
            lock.unlock();
        }
    }

    public int dishNum(){
        return dishNames.length;
    }
}



/* output

Dishes:[burger]
Dishes:[burger, burger]
Dishes:[burger, burger, donut]
Dishes:[burger, burger, donut, burger]
Dishes:[burger, burger, donut, burger, donut]
Dishes:[burger, burger, donut, burger, donut, burger]
CUST1 ate a donut
CUST2 ate a burger
Dishes:[burger, burger, donut, burger, donut]
Dishes:[burger, burger, donut, burger, donut, burger]
CUST2 ate a burger
CUST1 ate a donut
Dishes:[burger, donut, donut, burger, burger, donut]
CUST2 ate a burger
CUST1 ate a donut
Dishes:[donut, burger, burger, donut, donut]
Dishes:[donut, burger, burger, donut, donut, donut]
CUST1 ate a donut
CUST2 ate a burger
Dishes:[burger, burger, donut, donut, donut, donut]
CUST2 ate a burger
CUST1 ate a donut
Dishes:[burger, donut, donut, donut, donut]
Dishes:[burger, donut, donut, donut, donut, burger]
CUST1 ate a donut
CUST2 ate a burger
Dishes:[donut, donut, donut, burger, donut]
Dishes:[donut, donut, donut, burger, donut, donut]
CUST2 ate a burger
CUST1 ate a donut
Dishes:[donut, donut, burger, donut, donut]
Dishes:[donut, donut, burger, donut, donut, donut]
CUST1 ate a donut
CUST2 ate a burger
Exception in thread "COOK1" java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911)
    at java.util.ArrayList$Itr.next(ArrayList.java:861)
    at java.util.AbstractCollection.toString(AbstractCollection.java:461)
    at Table.add(ThreadWaitEx1.java:77)
    at Cook.run(ThreadWaitEx1.java:58)
    at java.lang.Thread.run(Thread.java:748)
CUST2 failed to eat. :(
CUST1 ate a donut

Process finished with exit code 0

 */

/* output2

Dishes:[burger]
Dishes:[burger, donut]
Dishes:[burger, donut, burger]
Dishes:[burger, donut, burger, donut]
Dishes:[burger, donut, burger, donut, donut]
Dishes:[burger, donut, burger, donut, donut, burger]
CUST2 ate a burger
CUST1 ate a donut
Dishes:[burger, donut, donut, burger, donut]
Dishes:[burger, donut, donut, burger, donut, burger]
CUST1 ate a donut
Dishes:[donut, burger, donut, burger, donut]
CUST2 ate a burger
Dishes:[donut, burger, donut, burger, donut, donut]
CUST2 ate a burger
Dishes:[donut, donut, burger, donut, donut, donut]
CUST1 ate a donut
Dishes:[donut, burger, donut, donut, donut, burger]
CUST1 ate a donut
Dishes:[burger, donut, donut, donut, burger, burger]
CUST2 ate a burger
Dishes:[donut, donut, donut, burger, burger, donut]
CUST2 ate a burger
Dishes:[donut, donut, donut, burger, donut, donut]
CUST1 ate a donut
Dishes:[donut, donut, burger, donut, donut, donut]
CUST2 ate a burger
Dishes:[donut, donut, donut, donut, donut, donut]
CUST1 ate a donut
Dishes:[donut, donut, donut, donut, donut, burger]
CUST2 ate a burger
Dishes:[donut, donut, donut, donut, donut, donut]
CUST1 ate a donut
Dishes:[donut, donut, donut, donut, donut, donut]
CUST2 failed to eat. :(
CUST1 ate a donut
Dishes:[donut, donut, donut, donut, donut, donut]
CUST2 failed to eat. :(
CUST1 ate a donut
Dishes:[donut, donut, donut, donut, donut, burger]

Process finished with exit code 0

 */

위의 코드를 설명하면 다음과 같다. 먼저 Table은 손님과 요리사의 공유자원이다. 따라서 lock이 걸리게 된다.

요리사가 테이블을 사용할 때는 add()를 호출한다. add()에서 요리사는 Table에 lock을 걸고 들어간다. 테이블에 요리가 가득 차 있다면 요리사를 대기시키고 lock을 반납한 뒤, 추후에 signal()이 호출되면 일어난다. 요리에 빈 자리가 있다면 요리를 추가하고 손님을 깨우고 lock을 반납한다.

손님은 테이블을 사용할 때 remove()를 호출한다. remove()에서 손님은 Table에 lock을 걸고 들어간다. 테이블에 요리가 없다면 손님은 lock을 반납한 뒤 요리사의 손님에 대한 signal()에 의해 깨어난다. 손님은 요리를 먹고 요리가 없다고 오리사를 호출한 뒤 lock을 반납한다.

volatile

코어의 캐시와 메모리의 정보는 일치하지 않을 수 있다. 이를 동기화 하면서 사용하기 위해 volatile 키워드를 사용하여, 메모리의 정보를 읽어와 동기화 해줄 수 있다.

변수를 읽거나 쓰는 함수에 synchronized 블럭을 사용해도 같은 효과를 얻을 수 있다.

JVM은 데이터를 4 byte (32-bit) 단위로 처리하는데, 이 때 long, double은 한 번에 읽지 못하고 이 때 다른 쓰레드가 끼어들어와 문제가 생길 여지가 있다. 이 때 long, double 변수를 선언할 때 다음과 같이 volatile 키워드를 붙여주면 연산을 원자화 할 수 있다.

같은 의미로, synchronized 키워드도 함수를 원자화 할 수 있다.

volatile long balance;

synchronized int getbalance(){
    return balance;
}

fork & join 프레임워크

fork & join 프레임워크는 하나의 작업을 여러 개로 쪼개서 병렬로 처리할 수 있도록 만들어 준다. 수행할 작업에 따라 두 개의 클래스를 이용한다.

RecursiveAction : 변환값이 없는 작업을 구현할 때 사용

RecursiveTask : 변환값이 있는 작업을 구현할 때 사용

두 클래스 모두 compute()라는 추상 클래스를 가지며 상속을 통해 이를 재귀로 구현하기만 하면 된다.

class SumTask extends RecursiveTask<Long>{

    public long compute(){

    }
}

그 후, 다음과 같이 작업을 수행한다.

ForkJoinPool pool = new ForkJoinPool(); // 쓰레드 풀 생성
SumTask task = new SumTask(from, to);  // 작업 생성
Long result = pool.invoke(task)       // invole()를 호출하여 작업 시작

작업을 수행하는 과정에서 각 쓰레드가 똑같은 양을 일을 할 수 있도록 서로 작업을 훔쳐온다. (work stealing)

이는 쓰레드 풀에 의해서 관리된다.

fork()와 join()

fork() : 해당 작업을 쓰레드 풀의 작업 큐에 넣는다. 비동기 메서드

join() : 해당 작업의 수행이 끝날 때 까지 기다렸다가, 끝나면 결과 반환. 동기 메서드

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinEx1 {

    static final ForkJoinPool pool = new ForkJoinPool();

    public static void main(String[] args){
        long from = 1l, to = 100_000_000L;

        SumTask task = new SumTask(from, to);

        long start = System.currentTimeMillis();
        Long result = pool.invoke(task);
        System.out.println("Elapsed time(4 Core) : " + (System.currentTimeMillis() - start));

        System.out.printf("sum of %d~%d\n", from, to, result);
        System.out.println();
        result = 0L;
        start = System.currentTimeMillis();
        for(long i=from; i<=to; i++)
            result += i;
        System.out.println("Elapsed time(1core) : " + (System.currentTimeMillis() - start));

    }

}

class SumTask extends RecursiveTask<Long> {
    long from, to;
    SumTask(long from, long to){
        this.from = from;
        this.to = to;
    }
    public Long compute(){
        long size = to - from + 1;
        if(size <= 5)
            return sum();

        long half = (from+to)/2;

        SumTask leftSum = new SumTask(from, half);
        SumTask rightSum = new SumTask(half+1, to);
        leftSum.fork();
        return rightSum.compute() + leftSum.join();
    }

    long sum(){
        long tmp = 0L;
        for(long i=from; i<=to; i++)
            tmp += i;
        return tmp;
    }
}

/* output

Elapsed time(4 Core) : 491
sum of 1~100000000

Elapsed time(1core) : 606

Process finished with exit code 0

 */

위 코드를 보자. 수행될 작업을 재쉬적으로 생성하고, 쓰레드 풀을 생성한 뒤, 쓰레드 풀에 작업을 넣고 스레드풀이 작업을 하도록 시작한다.

compute()에서 왼쪽 작업은 큐에 넣어지고 오른쪽 작업은 compute() 가 수행된다. 오른쪽 작업이 끝나면 큐에 넣은 왼쪽 작업이 수행될 때 까지 기다리게 된다.

출처

JAVA의 정석 (3rd Edition), 남궁 성