Skip to content

programming erlang ch19 ch20

Jongbin Oh edited this page Jun 2, 2013 · 1 revision

19장 멀티코어 서곡

병행성의 두가지 모델

  • shared state concurrency

    • 동일한 메모리를 공유하면서 처리
    • 병행월드로 넘어오면서 재앙발생
      • 잠금 매커니즘 필요. 복잡해짐
  • message passing concurrency

    • 가변 데이터 구조가 없음. (완전히 참은 아니지만 충분히 참이라고 강조)
    • 잠금 매커니즘이 필요없음. 쉽고 간단.

20장 멀티코어CPU프로그래밍

멀티코어 CPU 에서 효율적으로 실행되는 프로그램 만들기

  1. 많은 프로세스를 사용하세요
  • CPU 는 항상 빡세게 돌려야 효율적.
  1. side effect 를 피해주세요
  • 동시에 같은 메모리를 쓰면 재앙발생.
  • erlang 은 메모리 공유를 안해요
    • -> 사실 2가지 방법이 있다눈....
  1. 순차적병목 (Sequential bottleneck) 을 피해주세요
  • 우리는 나서 -> 살고 -> 죽는다. 이걸 병렬로 할수는 없어요

프로그램 병렬화 시키기

  • 메세지로 data 와 처리결과를 주고받음으로써 병렬화

  • map

      map(_, []) -> [];
      map(F, [H|T]) -> [F(H) | map(F,T)].
      }}}
       * pmap ( Parallel map ) : List 의 인수 딱 1개씩만 병렬로 평가하여 취합 (취합순서고려함)
      {{{#!gcode
      pmap(F, L) -> 
          S = self(),
          %% make_ref() returns a unique reference
          %%   we'll match on this later
          Ref = erlang:make_ref(), 
          Pids = map(fun(I) -> 
      		       spawn(fun() -> do_f(S, Ref, F, I) end)
      	       end, L),
          %% gather the results
          gather(Pids, Ref).
      
      do_f(Parent, Ref, F, I) ->					    
          Parent ! {self(), Ref, (catch F(I))}.
      
      gather([Pid|T], Ref) ->
          receive
      	{Pid, Ref, Ret} -> [Ret|gather(T, Ref)]
          end;
      gather([], _) ->
          [].
    
  • pmap1 : 취합순서고려하지 않는 버전

      pmap1(F, L) -> 
          S = self(),
          Ref = erlang:make_ref(),
          foreach(fun(I) -> 
      		    spawn(fun() -> do_f1(S, Ref, F, I) end)
      	    end, L),
          %% gather the results
          gather1(length(L), Ref, []).
      
      do_f1(Parent, Ref, F, I) ->					    
          Parent ! {Ref, (catch F(I))}.
      
      gather1(0, _, L) -> L;
      gather1(N, Ref, L) ->
          receive
      	{Ref, Ret} -> gather1(N-1, Ref, [Ret|L])
          end.
    
  • 언제 pmap 을 사용할수 있나요?

    1. 처리되는 작업량이 적으면 pmap 사용하지 마세요
    2. 프로세스가 너무 많이 생성될경우를 피해주세요
    3. 조금 더 추상화해서 사용하세요. 취합순서 고려

메세지는 작게, 계산은 크게

  • SMP Erlang (Symmetric Multi Processing)

    1. erl -smp +S 스케쥴러갯수N

    2. N을 따로 설정하지 않으면 머신의 CPU 갯수가 기본값으로 설정됨.

      ||Erlang (BEAM) emulator version 5.6.3 {{{#red [smp:2]}}} [async-threads:0]|| ||Eshell V5.6.3 (abort with ^G)|| ||1>||

  • -smp 옵션을 이용하여 가상 멀티코어CPU 를 emulate 하기위한 스크립트

    • runtests.pl

      for( $i = 1; $i < 32; $i++ ){
      	print "$i \n";
          # CPU 갯수를 1부터 32개까지 늘려가면서 ptests:tests(CPU갯수) 수행
      	system("erl -boot start_clean -noshell -smp +S $i -s ptests tests $i >> results.txt");
      	}
      
      set i = 1
      
      FOR /L %%i IN (1,1,5) DO erl -boot start_clean -noshell -smp +S %%i -s ptests tests %%i >> results.txt
      
  • 테스트 프로그램

      -module(ptests).
      -export([tests/1, fib/1]).
      -import(lists, [map/2]).
      -import(lib_misc, [pmap/2]).
      
      tests([N]) ->
          Nsched = list_to_integer(atom_to_list(N)),
          run_tests(1, Nsched).
      
      run_tests(N, Nsched) ->
          case test(N) of
      	stop ->
      	    init:stop();
      	Val ->
      	    io:format("~p.~n",[{Nsched, Val}]),
      	    run_tests(N+1, Nsched)
          end.
      
      test(1) ->
          %% Make 100 lists 
          %%   Each list contains 1000 random integers
          seed(),
          S = lists:seq(1,100),
          L = map(fun(_) -> mkList(1000) end, S),
          {Time1, S1} = timer:tc(lists,    map,  [fun lists:sort/1, L]),
          {Time2, S2} = timer:tc(lib_misc, pmap, [fun lists:sort/1, L]),
          {sort, Time1, Time2, equal(S1, S2)};
      
      test(2) ->
          %% L = [27,27,27,..] 100 times
          L = lists:duplicate(100, 27), 
          {Time1, S1} = timer:tc(lists,    map,  [fun ptests:fib/1, L]),
          {Time2, S2} = timer:tc(lib_misc, pmap, [fun ptests:fib/1, L]),
          {fib, Time1, Time2, equal(S1, S2)};
      
      test(3) ->
          stop.
      
      %% Equal is used to test that map and pmap compute the same thing
      equal(S,S)   -> true;
      equal(S1,S2) ->  {differ, S1, S2}.
       
      %% recursive (inefficent) fibonacci
      fib(0) -> 1;
      fib(1) -> 1;
      fib(N) -> fib(N-1) + fib(N-2).
      
      %% Reset the random number generator. This is so we
      %% get the same sequence of random numbers each time we run
      %% the program
      seed() -> random:seed(44,55,66).
      
      %% Make a list of K random numbers
      %%    Each random number in the range 1..1000000
      mkList(K) -> mkList(K, []).
      
      mkList(0, L) -> L;
      mkList(N, L) -> mkList(N-1, [random:uniform(1000000)|L]).
    

병렬화 예제 1. mapreduce

  • map + reduce

    • map : mapping 프로세스가 {Key,Value} 쌍의 스트림 생성해서 reduce 프로세스로 보내기

    • reduce : reduce 프로세스는 같은 키를 지닌 쌍끼리 결합하는 방식으로 병합

      -module(phofs).
      -export([mapreduce/4]).
      
      -import(lists, [foreach/2]).
      
      %% F1(Pid, X) -> sends {Key,Val} messages to Pid
      %% F2(Key, [Val], AccIn) -> AccOut
      
      mapreduce(F1, F2, Acc0, L) ->
          S = self(),
          Pid = spawn(fun() -> reduce(S, F1, F2, Acc0, L) end),
          receive
      	{Pid, Result} ->
      	    Result
          end.
      
      reduce(Parent, F1, F2, Acc0, L) ->
          process_flag(trap_exit, true),
          ReducePid = self(),
          %% Create the Map processes
          %%   One for each element X in L
          foreach(fun(X) -> 
      		    spawn_link(fun() -> do_job(ReducePid, F1, X) end)
      	    end, L),
          N = length(L),
          %% make a dictionary to store the Keys
          Dict0 = dict:new(),
          %% Wait for N Map processes to terminate
          Dict1 = collect_replies(N, Dict0),
          Acc = dict:fold(F2, Acc0, Dict1),
          Parent ! {self(), Acc}.
      
      %% collect_replies(N, Dict)
      %%     collect and merge {Key, Value} messages from N processes.
      %%     When N processes have terminated return a dictionary
      %%     of {Key, [Value]} pairs
      
      collect_replies(0, Dict) ->
          Dict;
      collect_replies(N, Dict) ->
          receive
      	{Key, Val} ->
      	    case dict:is_key(Key, Dict) of
      		true ->
      		    Dict1 = dict:append(Key, Val, Dict),
      		    collect_replies(N, Dict1);
      		false ->
      		    Dict1 = dict:store(Key,[Val], Dict),
      		    collect_replies(N, Dict1)
      	    end;
      	{'EXIT', _,  Why} ->
      	    collect_replies(N-1, Dict)
          end.
      
      %% Call F(Pid, X)
      %%   F must send {Key, Value} messsages to Pid
      %%     and then terminate
      
      do_job(ReducePid, F, X) ->
          F(ReducePid, X).
      
  • mapreduce 예제

    • map : generate_words -> 파일에서 word 를 찾기

    • reduce : count_words -> word 의 사용 빈도수

      -module(test_mapreduce).
      -compile(export_all).
      -import(lists, [reverse/1, sort/1]).
      
      test() ->
          wc_dir(".").
      
      wc_dir(Dir) ->
          F1 = fun generate_words/2,
          F2 = fun count_words/3,
          Files = lib_find:files(Dir, "*.erl", false),
          L1 = phofs:mapreduce(F1, F2, [], Files),
          reverse(sort(L1)).
      
      generate_words(Pid, File) ->
          F = fun(Word) -> Pid ! {Word, 1} end,
          lib_misc:foreachWordInFile(File, F).
      
      count_words(Key, Vals, A) ->
          [{length(Vals), Key}|A].
      

병렬화 예제 2. 디스크 색인

  • "이것은 복잡한 프로그램입니다.(이 책에서 가장 복잡하다.)"
  • "모두 합해 코드가 약 1200줄이나 되는 관계로 나는 여기 그 코드를 전부 포함시켜 자라나는 새싹을 밟고 싶지는 않다" by 조 암스트롱

미래로 성장하기

  • 얼랭으로 새로운 스타일의 프로그램 만들기를 즐겨보자
Clone this wiki locally