programming erlang ch19 ch20
Jun 2, 2013
shared state concurrency
- 동일한 메모리를 공유하면서 처리
- 병행월드로 넘어오면서 재앙발생
- 잠금 매커니즘 필요. 복잡해짐
message passing concurrency
- 가변 데이터 구조가 없음. (완전히 참은 아니지만 충분히 참이라고 강조)
- 잠금 매커니즘이 필요없음. 쉽고 간단.
- 많은 프로세스를 사용하세요
- CPU 는 항상 빡세게 돌려야 효율적.
- side effect 를 피해주세요
- 동시에 같은 메모리를 쓰면 재앙발생.
- erlang 은 메모리 공유를 안해요
- -> 사실 2가지 방법이 있다눈....
- 순차적병목 (Sequential bottleneck) 을 피해주세요
- 우리는 나서 -> 살고 -> 죽는다. 이걸 병렬로 할수는 없어요
메세지로 data 와 처리결과를 주고받음으로써 병렬화
map(_, []) -> []; map(F, [H|T]) -> [F(H) | map(F,T)]. }}} * pmap ( Parallel map ) : List 의 인수 딱 1개씩만 병렬로 평가하여 취합 (취합순서고려함) {{{#!gcode pmap(F, L) -> S = self(), %% make_ref() returns a unique reference %% we'll match on this later Ref = erlang:make_ref(), Pids = map(fun(I) -> spawn(fun() -> do_f(S, Ref, F, I) end) end, L), %% gather the results gather(Pids, Ref). do_f(Parent, Ref, F, I) -> Parent ! {self(), Ref, (catch F(I))}. gather([Pid|T], Ref) -> receive {Pid, Ref, Ret} -> [Ret|gather(T, Ref)] end; gather([], _) -> [].
pmap1 : 취합순서고려하지 않는 버전
pmap1(F, L) -> S = self(), Ref = erlang:make_ref(), foreach(fun(I) -> spawn(fun() -> do_f1(S, Ref, F, I) end) end, L), %% gather the results gather1(length(L), Ref, []). do_f1(Parent, Ref, F, I) -> Parent ! {Ref, (catch F(I))}. gather1(0, _, L) -> L; gather1(N, Ref, L) -> receive {Ref, Ret} -> gather1(N-1, Ref, [Ret|L]) end.
언제 pmap 을 사용할수 있나요?
- 처리되는 작업량이 적으면 pmap 사용하지 마세요
- 프로세스가 너무 많이 생성될경우를 피해주세요
- 조금 더 추상화해서 사용하세요. 취합순서 고려
SMP Erlang (Symmetric Multi Processing)
erl -smp +S 스케쥴러갯수N
N을 따로 설정하지 않으면 머신의 CPU 갯수가 기본값으로 설정됨.
||Erlang (BEAM) emulator version 5.6.3 {{{#red [smp:2]}}} [async-threads:0]|| ||Eshell V5.6.3 (abort with ^G)|| ||1>||
-smp 옵션을 이용하여 가상 멀티코어CPU 를 emulate 하기위한 스크립트
for( $i = 1; $i < 32; $i++ ){ print "$i \n"; # CPU 갯수를 1부터 32개까지 늘려가면서 ptests:tests(CPU갯수) 수행 system("erl -boot start_clean -noshell -smp +S $i -s ptests tests $i >> results.txt"); } set i = 1 FOR /L %%i IN (1,1,5) DO erl -boot start_clean -noshell -smp +S %%i -s ptests tests %%i >> results.txt
테스트 프로그램
-module(ptests). -export([tests/1, fib/1]). -import(lists, [map/2]). -import(lib_misc, [pmap/2]). tests([N]) -> Nsched = list_to_integer(atom_to_list(N)), run_tests(1, Nsched). run_tests(N, Nsched) -> case test(N) of stop -> init:stop(); Val -> io:format("~p.~n",[{Nsched, Val}]), run_tests(N+1, Nsched) end. test(1) -> %% Make 100 lists %% Each list contains 1000 random integers seed(), S = lists:seq(1,100), L = map(fun(_) -> mkList(1000) end, S), {Time1, S1} = timer:tc(lists, map, [fun lists:sort/1, L]), {Time2, S2} = timer:tc(lib_misc, pmap, [fun lists:sort/1, L]), {sort, Time1, Time2, equal(S1, S2)}; test(2) -> %% L = [27,27,27,..] 100 times L = lists:duplicate(100, 27), {Time1, S1} = timer:tc(lists, map, [fun ptests:fib/1, L]), {Time2, S2} = timer:tc(lib_misc, pmap, [fun ptests:fib/1, L]), {fib, Time1, Time2, equal(S1, S2)}; test(3) -> stop. %% Equal is used to test that map and pmap compute the same thing equal(S,S) -> true; equal(S1,S2) -> {differ, S1, S2}. %% recursive (inefficent) fibonacci fib(0) -> 1; fib(1) -> 1; fib(N) -> fib(N-1) + fib(N-2). %% Reset the random number generator. This is so we %% get the same sequence of random numbers each time we run %% the program seed() -> random:seed(44,55,66). %% Make a list of K random numbers %% Each random number in the range 1..1000000 mkList(K) -> mkList(K, []). mkList(0, L) -> L; mkList(N, L) -> mkList(N-1, [random:uniform(1000000)|L]).
map + reduce
map : mapping 프로세스가 {Key,Value} 쌍의 스트림 생성해서 reduce 프로세스로 보내기
reduce : reduce 프로세스는 같은 키를 지닌 쌍끼리 결합하는 방식으로 병합
-module(phofs). -export([mapreduce/4]). -import(lists, [foreach/2]). %% F1(Pid, X) -> sends {Key,Val} messages to Pid %% F2(Key, [Val], AccIn) -> AccOut mapreduce(F1, F2, Acc0, L) -> S = self(), Pid = spawn(fun() -> reduce(S, F1, F2, Acc0, L) end), receive {Pid, Result} -> Result end. reduce(Parent, F1, F2, Acc0, L) -> process_flag(trap_exit, true), ReducePid = self(), %% Create the Map processes %% One for each element X in L foreach(fun(X) -> spawn_link(fun() -> do_job(ReducePid, F1, X) end) end, L), N = length(L), %% make a dictionary to store the Keys Dict0 = dict:new(), %% Wait for N Map processes to terminate Dict1 = collect_replies(N, Dict0), Acc = dict:fold(F2, Acc0, Dict1), Parent ! {self(), Acc}. %% collect_replies(N, Dict) %% collect and merge {Key, Value} messages from N processes. %% When N processes have terminated return a dictionary %% of {Key, [Value]} pairs collect_replies(0, Dict) -> Dict; collect_replies(N, Dict) -> receive {Key, Val} -> case dict:is_key(Key, Dict) of true -> Dict1 = dict:append(Key, Val, Dict), collect_replies(N, Dict1); false -> Dict1 = dict:store(Key,[Val], Dict), collect_replies(N, Dict1) end; {'EXIT', _, Why} -> collect_replies(N-1, Dict) end. %% Call F(Pid, X) %% F must send {Key, Value} messsages to Pid %% and then terminate do_job(ReducePid, F, X) -> F(ReducePid, X).
mapreduce 예제
map : generate_words -> 파일에서 word 를 찾기
reduce : count_words -> word 의 사용 빈도수
-module(test_mapreduce). -compile(export_all). -import(lists, [reverse/1, sort/1]). test() -> wc_dir("."). wc_dir(Dir) -> F1 = fun generate_words/2, F2 = fun count_words/3, Files = lib_find:files(Dir, "*.erl", false), L1 = phofs:mapreduce(F1, F2, [], Files), reverse(sort(L1)). generate_words(Pid, File) -> F = fun(Word) -> Pid ! {Word, 1} end, lib_misc:foreachWordInFile(File, F). count_words(Key, Vals, A) -> [{length(Vals), Key}|A].
- "이것은 복잡한 프로그램입니다.(이 책에서 가장 복잡하다.)"
- "모두 합해 코드가 약 1200줄이나 되는 관계로 나는 여기 그 코드를 전부 포함시켜 자라나는 새싹을 밟고 싶지는 않다" by 조 암스트롱
- 얼랭으로 새로운 스타일의 프로그램 만들기를 즐겨보자