Skip to content

Latest commit

 

History

History
1073 lines (1059 loc) · 112 KB

스파크 완벽 가이드.md

File metadata and controls

1073 lines (1059 loc) · 112 KB

서평

  • image
  • 이 책은 스파크의 기초적인 내용부터 처리, 운용, 관리, 모니터링, 그리고 머신러닝, 그래프에 이르기까지 다양한 내용을 종합적으로 잘 설명하고 있습니다. 스파크의 기본부터 심화까지 책을 읽고 스파크에 대해 더 깊게 알게 되었으며 스파크에 대해 어느 정도 알고 있는 분이라면 재밌어서 금방 읽게 될 것입니다. 해당 코드는 저자의 깃허브에서도 확인할 수 있으며 Scala와 Pyspark 코드를 통해서 이해하는데 많은 도움이 되었습니다.
  • 책의 유형은 크게 스파크에 대해서, 구조적 API로 Dataset, DataFrame, SQL과 구조적 스트리밍에 관하여 자세하게 설명하고 있고 스파크 튜닝부터 모니터링과 디버깅까지 많은 내용을 담고 있어서 스파크를 공부하는데 있어서 많은 도움이 되었습니다. 또한 스파크가 지원하는 클러스터 매니저는 스탠드 얼론 클러스터 매니저, 아파치 메소스, 하둡 YARN이 있는데 각각의 특징과 고려사항에 대해 알 수 있어서 좋았습니다.
  • 디버깅 및 스파크 응급 처치 부분에서 스파크를 통해 일어날 수 있는 다양한 트러블슈팅 사례에 관해 배울 수 있었으며 실제 운영했을때 비슷한 사례와 잠재적 대응법을 알게 되어서 좋았습니다. 뒷부분에서는 스파크를 활용한 통계, 머신러닝, 딥러닝까지 다루고 있어 데이터 엔지니어링뿐만 아니라 데이터 분석 쪽에서도 유익한 내용을 담고 있습니다.
  • ELT파이프라인을 스파크를 통해서 운영해본 경험이 있는데 실무에서 쓰였던 스파크 튜닝 방식이나 스파크 문법 같은 부분을 다시 보게 되는 계기가 되었고 비슷한 장애 대응 시 어떠한 차이가 있는지 비교하면서 읽었던 거 같습니다. 데이터 엔지니어뿐만 아니라 분석 쪽에서도 업무적으로 스파크를 학습하는 데 많은 도움이 될 거 같습니다.

빅데이터와 스파크 간단히 살펴보기

  • 스파크
    • 하둡의 장점뿐만 아니라 여러 오픈소스가 가진 장점을 함께 가지고 있음
    • 아파치 하이브의 장점인 HQL을 사용할 수 있을 뿐만 아니라 SQL에 친숙한 사용자를 위해 스파크 SQL을 제공함
    • 아파치 스톰의 스트리밍 처리 기술을 지원하는 스파크 스트리밍을 제공함
    • 머신러닝 처리를 위한 MLlib 라이브러리를 제공함
    • 스파크 GraphX를 제공
  • 아파치 스파크
    • 통합 컴퓨팅 엔진이며 클러스터 환경에서 데이터를 병렬로 처리하는 라이브러리 집합 ( 빅데이터를 위한 통합 컴퓨팅 엔진과 라이브러리 집합)
    • (파이썬,자바,스칼라,R)을 지원하며 SQL뿐만 아니라 스트리밍, 머신러닝에 이르기까지 넓은 범위의 라이브러리를 제공함
    • 단일 노트북 환경에서부터 수천 대의 서버로 구성된 클러스터까지 다양한 환경에서 실행함
    • 빅데이터 애플리케이션 개발에 필요한 통합 플랫폼을 제공하자는 목적
    • 간단한 데이터 읽기에서부터 SQL 처리, 머신러닝 그리고 스트림 처리에 이르기까지 다양한 데이터 분석 작업을 같은 연산 엔진과 일관성 있는 API로 수행할 수 있도록 설계
    • 클라우드 기반의 애저 스토리지,아마존 S3, 분산 파일 시스템인 아파치 하웁, 키/값 저장소인 아파치 카산드라, 메시지 전달 서비스인 아파치 카프카 등의 저장소를 지원함
    • SQL과 구조화된 데이터를 제공하는 스파크 SQL, 머신러닝을 지원하는 MLlib, 스트림 처리 기능을 제공하는 스파크 스트리밍과 새롭게 선보인 구조적 스트리밍 그리고 그래프 분석 엔진인 GraphX 라이브러리르 제공
    • 스파크는 스칼라로 구현되어 자바 가상 머신 기반으로 동작함
    • 코드 스니펫(snippet) : 스니펫은 재사용이 가능한 소스 코드나 기계어의 작은 부분을 뜻하는 프로그래밍 용어

스파크 애플리케이션

  • 드라이버(driver) 프로세스와 다수의 익스큐터(executor) 프로세스로 구성됨
  • 정보의 유지 관리, 사용자 프로그램이나 입력에 대한 응답, 전반적인 익스큐터 프로세스의 작업과 관련된 분석, 배포 그리고 스케줄링 역할을 수행
  • 익스큐터 : 드라이버 프로세스가 할당한 작업을 수행, 드라이버가 할당한 코드를 실행하고 진행 상황을 다시 드라이버 노드에 보고하는 두 가지 역할을 수행
  • 언어 API
    • 스칼라 : 스파크는 스칼라로 개발되어 있으므로 스칼라가 스파크의 기본 언어
    • 자바 : 스파크가 스칼라로 개발되어 있지만, 스파크 창시자들은 자바를 이용해 스파크 코드를 작성할 수 있도록 심혈을 기울임
    • 파이썬 : 스칼라가 지원하는 거의 모든 구조를 지원함
    • SQL : ANSI SQL:2003 표준 중 일부를 지원함
    • 스파크 코어에 포함된 SparkR, R 커뮤니티 기반 패키지인 sparklyr
  • 스파크 API
    • 저수준의 비구조적(unstructured) API
    • 고수준의 구조적(structured) API
  • DataFrame
    • 스키마(schema) : 컬럼과 컬럼의 타입을 정의한 목록
    • 스프레드시프트는 한 대의 컴퓨터에 존재, 스파크 DataFrame은 수천 대의 컴퓨터에 분산(단일 컴퓨터에 저장하기에는 데이터가 너무 크거나 계산에 너무 오랜 시간이 걸릴 수 있기 때문)
  • 파티션
    • 스파크는 모든 익스큐터가 병렬로 작업을 수행할 수 있도록 파티션이라 불리는 청크 단위로 데이터를 분할함
    • 파티션 : 클러스터의 물리적 머신에 존재하는 로우의 집합
  • 트랜스포메이션
    • 스파크의 핵심 데이터 구조는 불변성(immutable) : 한번 생성하면 변경할 수 없음
    • DataFrame을 변경하려면 원하는 변경 방법을 스파크에 알려줘야 하는데 이를 트랜스포메이션
    • 좁은 의존성(narrow dependency) : 각 입력 파티션이 하나의 출력 파티션에만 영향을 미침, 파티션이 하나의 출력 파티션에만 영향을 미침
    • 넓은 의존성(wide dependency) : 하나의 입력 파티션이 여러 출력 파티션에 영향을 미침, 스파크가 클러스터에서 파티션을 교환(셔플-shuffle)
    • 좁은 트랜스포메이션을 사용하면 스파크에서 파이프라이닝(pipelining)을 자동으로 수행함, DataFrame에 여러 필터를 지정하는 경우 모든 작업이 메모리에서 일어남
  • 지연 연산(lazy evaluation)
    • 스파크가 연산 그래프를 처리하기 직전까지 기다리는 동작 방식을 의미
    • 특정 연산 명령이 내려진 즉시 데이터를 수정하지 않고 원시 데이터에 적용할 트랜스포메이션의 실행 계획을 생성함
  • 액션
    • 콘솔에서 데이터를 보는 액션
    • 각 언어로 된 네이티브 객체에서 데이터를 모으는 액션
    • 출력 데이터소스에 저장하는 액션
    • 액션을 지정하면 스파크 잡(job) 시작, 스파크 잡은 필터(좁은 트랜스포메이션)를 수행한 후 파티션별로 레코드 수를 카운트(넓은 트랜스포메이션), 각 언어에 적합한 네이티브 객체에 결과를 모음
  • 스파크 UI
    • 스파크 잡의 진행 상황을 모니터링 할 때 사용
    • 스파크 잡의 상태, 환경 설정, 클러스터 상태의 정보를 확인
    • 스파크 잡을 튜닝하고 디버깅할 때 매우 유용

스파크 기능

  • 스파크 라이브러리 : 그래프 분석, 머신러닝, 그리고 스트리밍 등 다양한 작업을 지원하며, 컴퓨팅 및 스토리지 시스템과의 통합을 돕는 역할
  • spark-submit : 대화형 셸에서 개발한 프로그램을 운영용 애플리케이션으로 쉽게 전환, 애플리케이션 코드를 클러스터에 전송해 실행시키는 역할
  • 스파크 애플리케이션은 스탠드얼른,메소스, YARN 클러스터 매니저를 이용해 실행됨
  • 타입 안정성을 제공하는 구조적 API
    • Dataset
      • 자바와 스칼라의 정적 데이터 타입에 맞는 코드
      • 정적 타입 코드(statically typed code)를 지원하기 위해 고안된 스파크의 구조적 API
        • 정적 타입 코드 : 자료형이 고정된 언어, 자바,스칼라,C,C++ 등의 프로그래밍 언어가 정적 타입에 해당
        • 동적 타입 프로그래밍 언어 : 파이썬, 자바스크립트 등
      • Dataset은 타입 안정성을 지원하며 동적 타입 언어인 파이썬과 R에서는 사용할 수 없음
      • Dataset 클래스는 내부 객체의 데이터 타입을 매개변수로 사용
    • DataFrame
      • 다양한 데이터 타입의 테이블형 데이터를 보관할 수 있는 Row 타입의 객체로 구성된 분산 컬렉션
    • Dataset API
      • DataFrame의 레코드르 사용자가 자바나 스칼라로 정의한 클래스에 할당하고 자바의 ArrayList 또는 스칼라의 Seq 객체 등의 고정 타입형 컬렉션으로 다룰 수 있는 기능을 제공함
      • 타입 안정성을 지원하므로 초기화에 사용한 클래스 대신 다른 클래스를 사용해 접근할 수 없음
      • 다수의 소프트웨어 엔지니어가 잘 정의된 인터페이스로 상호작용하는 대규모 애플리케이션을 개발하는 데 특히 유용함
  • 구조적 스트리밍
    • 스파크 2.2 버전에서 안정화(production-ready)된 스트림 처리용 고수준 API
    • 구조적 API로 개발된 배치 모드의 연산을 스트리밍 방식으로 실행, 지연 시간을 줄이고 증분 처리
    • 배치 처리용 코드를 일부 수정하여 스트리밍 처리를 수행하고 값을 빠르게 얻을 수 있다는 장점
    • 프로토타입을 배치 잡으로 개발한 다음 스트리밍 잡으로 변환할 수 있으므로 개념 잡기가 수월함
    • 셔플 파티션 수 : 셔플 이후에 생성될 파티션 수, 기본값은 200이지만 로컬 모드에서는 그렇게 많은 익스큐터가 필요하지 않으므로 5로 변경
      • spark.conf.set("spark.sql.shuffle.partitions","5")
    • 스트리밍 코드
      • read 메서드 대신 readStream 메서드 사용
      • maxFilesPerTrigger : 한 번에 읽을 파일수를 설정함
    • MLlib : 대용량 데이터를 대상으로 전처리(preprocessing), 멍잉(munging - 데이터 랭글링(data wrangling), 원본 데이터를 다른 형태로 변환하거나 매핑하는 과정), 모델 학습(model training) 및 예측(prediction)
  • 저수준 API
    • RDD를 통해 자바와 파이썬 객체를 다루는 데 필요한 다양한 기본 기능을 제공
    • 스파크의 거의 모든 기능은 RDD를 기반
    • 드라이버 시스템의 메모리에 저장된 원시 데이터를 병렬처리(parallelize)하는 데 RDD를 사용

구조적 API: DataFrame, SQL, Dataset

구조적 API 개요

  • 구조적 API : 비정형 로그 파일로부터 반정형 csv파일, 매우 정형적인 파케이(Parquet) 파일까지 다양한 유형의 데이터를 처리
    • 세 가지 분산 컬렉션 API
      • Dataset : 타입형
      • DataFrame : 비타입형
      • SQL 테이블과 뷰
    • 배치작업을 스트리밍 작업으로 손쉽게 변환함
  • 스파크는 트랜스포메이션의 처리 과정을 정의하는 분산 프로그래밍 모델
    • 사용자가 정의한 다수의 트랜스포메이션은 지향성 비순환 그래프(DAG)로 표현되는 명령을 만들어냄
    • 액션은 하나의 잡을 클러스터에서 실행하기 위해 스테이지와 태스크로 나누고 DAG 처리 프로세스를 실행함
    • 트랜스포메이션과 액션으로 다루는 논리적 구조가 바로 DataFrame과 Dataset
  • 스키마 : 분산 컬렉션에 저장할 데이터 타입을 정의하는 방법
  • DataFrame의 컬럼명과 데이터 타입을 정의함
  • 카탈리스트(Catalyst) 엔진 : 실행 계획 수립과 처리에 사용하는 자체 데이터 정보를 가지고 있음, 다양한 실행 최적화 기능을 제공함
  • DataFrame은 Row 타입으로 구성된 Dataset
    • Row 타입 : 스파크가 사용하는 연산에 최적화된 인메모리 포맷의 내부적인 표현 방식
      • 가비지 컬렉션(garbage collection)과 객체 초기화 부하가 있는 JVM 데이터 타입을 사용하는 대신 자체 데이터 포맷을 사용하기 때문에 매우 효율적인 연산이 가능함
  • 컬럼 : 정수형이나 문자열 같은 단순 데이터 타입, 배열이나 맵 같은 복합 데이터 타입, null 값을 표현함
  • 로우 : 데이터 레코드(Dataframe의 각 로우는 하나의 레코드)
  • 구조적 API의 실행 과정
    • 구조적 API 쿼리가 사용자 코드에서 실제 실행 코드로 변환되는 과정
      • DataFrame/Dataset/SQL을 이용해 코드를 작성함
      • 정상적인 코드라면 스파크가 논리적 실행 계획으로 변환함
      • 스파크는 논리적 실행 계획을 물리적 실행 계획으로 변환하며 그 과정에서 추가적인 최적화를 할 수 있는지 확인
      • 스파크는 클러스터에서 물리적 실행 계획(RDD 처리)을 실행

구조적 API 기본 연산

  • DataFrame : Row 타입의 레코드(테이블의 로우 같은)와 각 레코드에 수행할 연산 표현식을 나타내는 여러 컬럼(스프레드시트의 컬럼 같은)으로 구성됨
  • 스키마 : 각 컬러명과 데이터 타입을 정의함(DataFrame의 컬럼명과 데이터 타입을 정의함)
  • 파티셔닝 : DataFrame이나 Dataset이 클러스터에서 물리적으로 배치되는 형태를 정의함
  • 파티셔닝 스키마 : 파티션을 배치하는 방법을 정의함
  • selectExpr 메서드 : 새로운 DataFrame을 생성하는 복잡한 표현식을 간단하게 만드는 도구
  • 리터럴(literal) : 프로그래밍 언어의 리터럴값은 스파크가 이해할 수 있는 값으로 변환함
  • 최적화 기법
    • 자주 필터링하는 컬럼을 기준으로 데이터를 분할함
    • 파티셔닝 스키마와 파티션 수를 포함해 클러스터 전반의 물리적 데이터 구성을 제어함
    • repartition 메서드 : 무조건 전체 데이터를 셔플함, 향후에 사용할 파티션 수가 현재 파티션 수보다 많거나 컬럼을 기준으로 파티션을 만드는 경우에만 사용
      • 특정 컬럼을 기준으로 자주 필터링한다면 자주 필터링되는 컬럼을 기준으로 파티션을 재분배하는것이 좋음
    • coalesce 메서드 : 전체 데이터를 셔플하지 않고 파티션을 병합하는 경우 사용
  • taLocalIterator : 이터레이터(iterator,반복자)로 모든 파티션의 데이터 드라이버에 전달함 , 데이터셋의 파티션을 차례로 반복 처리함

다양한 데이터 타입 다루기

  • DataFrame(Dataset) 메서드
    • Dataset의 하위 모듈은 다양한 메서드를 제공함
      • DataFrameStatFunctions : 다양한 통계적 함수를 제공
      • DataFrameNaFunctions : null 데이터를 다루는 데 필요한 함수를 제공
  • Column 메서드
    • Column은 alias나 contains 같이 컬럼과 관련된 여러가지 메서드를 제공
    • org.apache.spark.sql.functions 패키지는 데이터 타입과 관련된 다양한 함수를 제공함
  • 불리언 구문 : and,or,true,false
  • initcap 함수 : 주어진 문자열에서 공백으로 나뉘는 모든 단어의 첫 글자를 대문자로 변경
  • 날짜와 타임스탬프 데이터
    • 날짜(date) : 달력 형태의 날짜
    • 타임스탬프(timestamp) : 날짜와 시간 정보를 모두 가짐
    • inferSchema 옵션이 활성화된 경우 날짜와 타임스탬프를 포함해 컬럼의 데이터 타입을 최대한 정확하게 식별하려 시도함
  • null 값
    • 명시적으로 null 값을 제거, 전역 또는 컬럼 단위로 null 값을 특정 값으로 채워 넣음
    • coalesce 함수 : 인수로 지정한 여러 컬럼 중 null이 아닌 첫번 째 값을 반환함
  • 사용자 정의함수
    • UDF(user defined function) : 파이썬이나 스칼라 그리고 외부 라이브러리를 사용해 사용자가 원하는 형태로 트랜스포메이션을 만들 수 있게 함
    • 모든 워커 노드에서 생성된 함수를 사용할 수 있도록 스파크에 등록 -> 드라이버에서 함수를 직렬화하고 네트워크를 통해 모든 익스큐터 프로세스로 전달함
    • 스칼라나 자바 -> JVM 환경에서만 사용 가능
    • 워커 노드에서 파이썬 프로세스를 실행하고 파이썬이 이해할 수 있는 포맷으로 모든 데이터를 직렬화(JVM 사용 언어에도 존재했던 부분), 파이썬 프로세스에 있는 데이터의 로우마다 함수를 실행하고 마지막으로 JVM과 스파크에 처리 결과를 반환함
      • 직렬화에 큰 부하가 발생
      • 데이터가 파이썬으로 전달되면 스파크에서 워커 메모리를 관리할 수 없음, JVM과 파이썬이 동일한 머신에서 메모리 경합을 하면 자원에 제약이 생겨 워커가 비정상적으로 종료될 가능성이 있음

집계 연산

  • count 메서드가 트랜스포메이션이 아닌 액션, 결과를 즉시 반환함, count메서드는 데이터셋의 전체 크기를 알아보는 용도로 사용하지만 메모리에 DataFrame 캐싱 작업을 수행하는 용도로 사용됨
  • approx_count_distinct : 어느 정도 수준의 정확도를 가지는 근사치만으로도 유의미, 최대 추정 오류율(maximum estimation error), 대규모 데이터셋을 사용할 때 훨씬 더 좋아짐
  • 비대칭도와 첨도(kurtosis)
    • 데이터의 변곡점(extreme point)을 측정하는 방법
    • 비대칭도(skewness)는 데이터 평균의 비대칭 정도를 측정하고, 첨도는 데이터 끝 부분을 측정함
    • 확률변수(random variable)의 확률 분포(probability distribution)로 데이터를 모델링할 때 특히 중요함
  • 복합 데이터 타입의 집계
    • collect_list() : 특정 컬럼의 값을리스트로 수집
    • collect_set() : 셋 데이터 타입으로 고윳값만 수집
  • 윈도우 함수
    • 랭크 함수(ranking function)
    • 분석 함수(analytic function)
    • 집계 함수(aggregate function)
  • 사용자 정의 집계 함수(user-defined aggregation function, UDAF)
    • 직접 제작한 함수나 비즈니스 규칙에 기반을 둔 자체 집계 함수를 정의하는 방법
    • 입력 데이터 그룹에 직접 개발한 연산을 수행
    • 입력 데이터의 모든 그룹의 중간 결과를 단일 AggregationBuffer에 저장해 관리함

조인

  • 스파크의 조인 수행 방식
    • 노드간 네트워크 통신 전략
      • 스파크는 조인 시 두가지 클러스터 통신 방식을 활용함
      • 전체 노드간 통신을 유발 셔플 조인(shuffle join),
      • 그렇지 않은 브로드캐스트 조인(broadcast join) , 작은 DataFrame을 클러스터의 전체 워커 노드에 복제하는 것, 대구모 노드 간 통신이 발생하지만 그 이후로는 노드 사이에 추가적인 통신이 발생하지 않음, 모든 단일 노드에서 개별적으로 조인이 수행되므로 CPU가 가장 큰 병목 구간, 너무 큰 데이터를 브로드캐스트하면 고비용의 수집 연산이 발생하므로 드라이버 노드가 비정상적으로 종료됨
      • 내부 최적화 기술은 시간이 흘러 비용 기반 옵티마이저(cost-based optimizer, CBO)가 개선되고 더 나은 통신 전략이 도입되는 경우 바뀔 수 있음
    • 노드별 연산 전략

데이터소스

  • 스파크의 핵심 데이터소스
    • CSV, JSON, 파케이, ORC, JDBC/ODBC 연결, 일반 텍스트 파일
  • 데이터 소스
    • 카산드라, HBase, 몽고디비, AWS Redshift, XML, 기타 수많은 데이터소스
  • 스파크에서 데이터를 읽을 때는 기본적으로 DataFrameReader를 사용
    • 포맷
    • 스키마
    • 읽기 모드
    • 옵션
  • 읽기 모드 : 스파크가 형식에 맞지 않는 데이터를 만났을 때의 동작 방식을 지정하는 옵션
    • permissive : 오류 레코드의 모든 필드를 null로 설정하고 모든 오류 레코드를 _corrupt_record라는 문자열 컬럼에 기록함
    • dropMalformed : 형식에 맞지 않는 레코드가 포함된 로우를 제거함
    • failFast : 형식에 맞지 않는 레코드를 만나면 즉시 종료함
  • 쓰기 API 구조
    • DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
  • 저장 모드 : 스파크가 지정된 위치에서 동일한 파일이 발견했을 때의 동작 방식을 지정하는 옵션
    • append : 해당 경로에 이미 존재하는 파일 목록에 결과 파일을 추가함
    • overwrite : 이미 존재하는 모든 데이터를 완전히 덮어씀
    • errorIfExists : 해당 경로에 데이터나 파일이 존재하는 경우 오류를 발생시키면서 쓰기 작업이 실패함
    • ignore : 해당 경로에 데이터나 파일이 존재하는 경우 아무런 처리도 하지 않음
  • CVS 파일(comma-separated values)
    • 콤마(,)로 구분된 값을 의미
    • 각 줄이 단일 레코드가 되며 레코드의 각 필드를 콤마로 구분하는 일반적인 텍스트 파일 포맷
  • JSON 파일
    • 자바스크립트 세상에서 온 파일 형식, 자바스크립트 객체 표기법(JSON, JavaScript Object Notation)
    • 줄로 구분된 JSON을 기본적으로 사용함
    • multiLine 옵션을 사용해 줄로 구분된 방식과 여러 줄로 구성된 방식을 선택적으로 사용
    • 줄로 구분된 JSON이 인기있는 이유는 구조화되어 있고, 최소한의 기본 데이터 타입이 존재하기 때문
  • 파케이 파일
    • 다양한 스토리지 최적화 기술을 제공하는 오픈소스로 만들어진 컬럼 기반의 데이터 저장 방식, 분석 워크로드에 최적화
    • 저장소 공간을 절약할 수 있고 전체 파일을 읽는 대신 개별 컬럼을 읽을 수 있으며, 컬럼 기반의 압축 기능을 제공함
  • ORC 파일
    • 하둡 워크로드를 위해 설계된 자기 기술적(self-describing)이며 데이터 타입을 인식할 수 있는 컬럼 기반의 파일 포맷
    • 대규모 스트리밍 읽기에 최적화되어 있을 뿐만 아니라 필요한 로우를 신속하게 찾아낼 수 있는 기능이 통합되어 있음
  • SQL 데이터베이스
    • 데이터베이스의 데이터를 읽고 쓰기 위해서는 스파크 클래스패스(classpath)에 데이터베이스의 JDBC(Java DataBase Connectivity) 드라이버를 추가하고 적절한 JDBC 드라이버 jar 파일을 제공함 -JDBC 데이터 소스 옵션
      • url : 접속을 위한 JDBC URL
      • dbtable : 읽을 JDBC 테이블을 설정함
      • driver : 지정한 URL에 접속할 때 사용할 JDBC 드라이버 클래스명을 지정함
      • partitionColumn, lowerBound, upperBound : numPartitions도 반드시 지정, 다수의 워커에서 병렬로 테이블을 나눠 읽는 방법을 정의함, partitionColumn은 반드시 해당 테이블의 수치형 컬럼이여야 함, lowerBound와 upperBound는 테이블의 로우를 필터링하는 데 사용되는 것이 아니라 각 파티션의 범위를 결정하는 데 사용함, 테이블의 모든 로우는 분할되어 반환됨
      • numPartitions : 테이블의 데이터를 병렬로 읽거나 쓰기 작업에 사용할 수 있는 최대 파티션 수를 결정함
      • fetchsize : 한 번에 얼마나 많은 로우를 가져올지 결정하는 JDBC의 패치 크기를 결정함
      • batchsize : 한 번에 얼마나 많은 로우를 저장할지 결정하는 JDBC의 배치 크기를 설정함
      • isolationLevel : 현재 연결에 적용되는 트랜잭션 격리 수준을 정의함
      • truncate : JDBC writer 관련 옵션
      • createTableOptions : 테이블 생성 시 특정 테이블의 데이터베이스와 파티션 옵션을 설정할 수 있음
      • createTableColumnTypes : 테이블을 생성할 때 기본값 대신 사용할 데이터베이스 컬럼 데이터 타입을 정의함
  • 데이터베이스 병렬로 읽기
    • 스파크는 파일 크기, 파일 유형 긔고 압축 방식에 따른 분할 가능성에 따라 여러 파일을 읽어 하나의 파티션으로 만들거나 여러 파티션을 하나의 파일로 만드는 기본 알고리즘을 가지오 있음
    • numPartitions 옵션을 사용해 읽기 및 쓰기용 동시 작업 수를 제한할 수 있는 최대 파티션 수를 설정할 수 있음
  • 슬라이딩 윈도우 기반의 파티셔닝
    • 스파크는 데이터베이스에 병렬로 쿼리를 요청하며 numPartitions에 설정된 값만큼 파티션을 반환함, 파티션에 값을 할당하기 위해 상한값과 하한값을 수정함
  • 고급 I/O 개념
    • 쓰기 작업 전에 파티션 수를 조절함으로써 병렬로 처리할 파일 수를 제어
    • 버켓팅과 파티셔닝을 조절함으로써 데이터의 저장 구조를 제어함
    • 분할 가능한 파일 타입과 압축 방식
      • 특정 파일 포맷은 기본적으로 분할을 지원함
      • 스파크에서 전체 파일이 아닌 쿼리에 필요한 부분만 읽을 수 있으므로 성능 향상에 도움이 됨
      • 하둡 분산 파일 시스템(HDFS) 같은 시스템을 사용한다면 분할된 파일을 여러 블록으로 나누어 분산 저장하기 때문에 훨씬 더 최적화
    • 병렬로 데이터 읽기
      • 사용 가능한 익스큐터를 이용해 병렬(익스큐터 수를 넘어가는 파일은 처리 중인 파일이 완료될 때까지 대기)로 파일을 읽음
    • 병렬로 데이터 쓰기
      • 파일이나 데이터 수는 데이터를 쓰는 시점에 DataFrame이 가진 파티션 수에 따라 달라질 수 있음
      • 기본적으로 데이터 파티션당 하나의 파일이 작성됨
  • 파티셔닝
    • 파티셔닝은 어떤 데이터를 어디에 저장할 것인지 제어할 수 있는 기능
    • 파티셔닝된 디렉터리 또는 테이블에 파일을 쓸 때 디렉터리별로 컬럼 데이터를 인코딩해 저장함
    • 데이터를 읽을 때 전체 데이터셋을 스캔하지 않고 필요한 컬럼의 데이터만 읽을 수 있음
  • 버켓팅(bucketing)
    • 각 파일에 저장된 데이터를 제어할 수 있는 또 다른 파일 조직화 기법
    • 동일한 버킷 ID를 가진 데이터가 하나의 물리적 파티션에 모두 모여있기 때문에 데이터를 읽을 때 셔플을 피할 수 있음
    • 데이터가 이후의 사용 방식에 맞춰 사전에 파티셔닝되므로 조인이나 집계 시 발생하는 고비용의 셔플을 피할 수 있음
    • csvFile.write.format("parquet").mode("overwrite").bucketBy(numberBuckets(10), columToBucketBy("count)).saveAsTable("bucketedFiles")
  • 파일 크기 관리
    • 작은 파일을 많이 생성하면 메타데이터에 엄청난 관리 부하가 발생함 -> 작은 크기의 파일 문제
    • 몇 개의 로우가 필요하더라도 전체 데이터 블록을 읽어야 하기 때문에 비효율적, 너무 큰 파일도 좋지 않음
    • maxRecordsPerFile 옵션에 파일당 레코드 수를 지정, 각 파일에 기록될 레코드를 수를 조절할 수 있으므로 파일 크기를 더 효과적으로 제어함

스파크 SQL

  • 스파크 SQL을 사용해 데이터베이스에 생성된 뷰(view)나 테이블에 SQL쿼리를 실행함
  • 시스템 함수를 사용하거나 사용자 정의 함수를 정의
  • 스파크 SQL은 DataFrame과 Dataset API에 통합, 데이터 변환 시 SQL과 DataFrame의 기능을 모두 사용할 수 있으며 두 방식 모두 동일한 실행 코드로 컴파일됨
  • SQL
    • SQL 또는 구조적 질의 언어(Structured Query Language)는 데이터에 대한 관계형 연산을 표현하기 위한 도메인 특화 언어
    • 스파크가 등장하기 전에는 하이브(Hive)가 빅데이터 SQL 접근 계층에서 사실상의 표준
    • 스파크2.0 버전에는 하이브를 지원할 수 있는 상위 호환 기능으로 ANSI-SQL과 HiveQL을 모두 지원하는 자체 개발된 SQL 파서가 포함됨
    • SQL 분석가들은 쓰르피트 서버(Thrift Server)나 SQL 인터페이스에 접속해 스파크의 연산 능력을 활용함
    • 전체 데이터 처리 파이프라인에 스파크 SQL을 사용함
    • 통합형 API는 SQL로 데이터를 조회하고 DataFrame으로 변환한 다음 스파크의 MLlib이 제공하는 대규모 머신러닝 알고리즘 중 하나를 사용해 수행한 결과를 다른 데이터소스에 저장하는 전체 과정을 가능하게 만듬
    • 스파크 SQL은 온라인 트랜잭션 처리(online transaction processing, OLTP) 데이터베이스가 아닌 온라인 분석용(online analytic processing,OLAP) 데이터베이스로 동작함, 매우 낮은 지연 시간이 필요한 쿼리를 수행하기 위한 용도로 사용할 수 없음
    • 스파크 SQL CLI(Command Line Interface - 명령행 인터페이스)는 로컬 환경의 명령행에서 기본 스파크 SQL 쿼리를 실행할 수 있는 편리한 도구
    • 다른 트랜스 포메이션과 마찬가지로 즉시 실행되지 않고 지연 처리됨, DataFrame을 사용하는 것보다 SQL 코드로 표현하기 훨씬 쉬운 트랜스포메이션이기 때문에 엄청나게 강력한 인터페이스
    • 함수에 여러 줄로 구성된 문자열을 전달할 수 있으므로 여러 줄로 구성된 쿼리를 아주 간단히 표현함
    • 자바 데이터베이스 연결(Java Database Connectivity, JDBC) 인터페이스를 제공함
    • 카탈로그
      • 테이블에 저장된 데이터에 대한 메타데이터뿐만 아니라 데이터베이스, 테이블, 함수 그리고 뷰에 대한 정보를 추상화함
      • 테이블, 데이터베이스 그리고 함수를 조회하는 등 여러 가지 유용한 함수를 제공함
  • 스파크 관리형 테이블
    • 메타데이터 : 테이블의 데이터와 테이블에 대한 데이터
    • 스파크는 데이터뿐만 아니라 파일에 대한 메타데이터를 관리함
    • 기존 테이블에 여러 트랜스포메이션 작업을 지정함
    • 뷰를 사용하면 쿼리 로직을 체계화하거나 재사용하기 편하게 만들 수 있음
    • 임시 뷰 : 테이블처럼 데이터베이스에 등록되지 않고 현재 세션에서만 사용할 수 있음
    • 전역적 임시 뷰(global temp view) : 데이터베이스에 상관없이 사용할 수 있으므로 전체 스파크 애플리케이션에서 볼 수 있음, 세션이 종료되면 뷰도 사라짐
  • 데이터베이스 : 여러 테이블을 조직화하기 위한 도구
  • 서브쿼리
    • 상호 연관 서브쿼리(correlated subquery) : 서브쿼리의 정보를 보완하기 위해 쿼리의 외부 범위에 있는 일부 정보를 사용할 수 있음
    • 비상호 연관 서브쿼리(uncorrelated subquery) : 외부 범위에 있는 정보를 사용하지 않음
    • 스파크는 값에 따라 필터링할 수 있는 조건절 서브쿼리(predicate subquery)도 지원함

Dataset

  • Dataset은 구조적 API의 기본 데이터 타입, 고수준의 구조적 API와 저수준 RDD API가 조합된 형태
  • DataFrame은 Row 타입의 Dataset
  • Dataset은 자바 가상 머신을 사용하는 언어인 스칼라와 자바에서만 사용할 수 있음, Dataset을 사용해 데이터셋의 각 로우를 구성하는 객체를 정의함
  • 인코더 : 도메인별 특정 객체 T를 스파크의 내부 데이터 타입으로 매핑하는 시스템을 의미함
  • Dataset을 사용할 시기
    • DataFrame 기능만으로는 수행할 연산을 표현할 수 없는 경우
    • 성능 저하를 감수하더라도 타입 안정성(type-safe)을 가진 데이터 타입을 사용하고 싶은 경우
    • 복잡한 비즈니스 로직을 SQL이나 DataFrame 대신 단일 함수로 인코딩해야 하는 경우
    • 타입이 유효하지 않은 작업은 런타임이 아닌 컴파일 타임에 오류가 발생함
    • Dataset API를 사용하면 잘못된 데이터로부터 애플리케이션을 보호할 수 는 없지만 보다 우하하게 데이터를 제어하고 구조화 함
    • 단일 노드의 워크로드와 스파크 워크로드에서 전체 로우에 대한 다양한 트랜스포메이션을 재사용하려면 Dataset을 사용하는 것이 적합함
    • Dataset을 사용하는 장점 중 하나는 로컬과 분산 환경의 워크로드에서 재사용, 케이스 클래스로 구현된 데이터 타입을 사용해 모든 데이터와 트랜스포메이션을 정의하면 재사용할 수 있음
    • 올바른 클래스와 데이터 타입이 지정된 DataFrame을 로컬 디스크에 저장하면 다음 처리 과정에서 사용할 수 있어 더 쉽게 데이터를 다룰 수 있음
  • Dataset 생성
    • 자바:Encoders
      • 자바 인코더(Encoders) : 데이터 타입 클래스를 정의한 다음 DataFrame(Dataset 타입의)에 지정해 인코딩
    • 스칼라: 케이스 클래스
      • 스칼라에서 Dataset을 생성하려면 스칼라 case class 구문을 사용해 데이터 타입을 정의해야 함
      • 케이스 클래스의 특징 및 정규 클래스(regular class)
        • 불변성
        • 패턴 매칭으로 분해 가능
        • 참조값 대신 클래스 구조를 기반으로 비교
        • 사용하기 쉽고 다루기 편함
      • 케이스 클래스는 불변성을 가지며 값 대신 구조로 비교할 수 있음
        • 불변성이므로 객체들이 언제 어디서 변경되었는지 추적할 필요가 없음
        • 값으로 비교하면 인스턴스를 마치 원시(primitive) 데이터 타입의 값처럼 비교함, 클래스 인스턴스가 값으로 비교되는지, 참조로 비교되는지 더는 불확실해하지 않아도 됨
        • 패턴 매칭은 로직 분기를 단순화해 버그를 줄이고 가독성을 좋게 만듬

저수준 API

RDD

  • 저수준 API : RDD, SparkContext 그리고 어큐뮬레이터(accumulator)와 브로드캐스트 변수(broadcast variable) 같은 분산형 공유 변수(distributed shared variables)등을 의미함
    • 분산 데이터 처리를 위한 RDD
    • 브로드캐스트 변수와 어큐뮬레이터처럼 분산형 공유 변수를 배포하고 다루기 위한 API
    • 사용 시기
      • 고수준 API에서 제공하지 않는 기능이 필요한 경우(클러스터의 물리적 데이터의 배치를 아주 세말하게 제어해야 하는 상황)
      • RDD를 사용해 개발된 기존 코드를 유지해야 하는 경우
      • 사용자가 정의한 공유 변수를 다뤄야 하는 경우
    • SparkContext는 저수준 API 기능을 사용하기 위한 진입 지점
  • RDD
    • 스파크 1.x 버전의 핵심 API
    • 사용자가 실행한 모든 DataFrame이나 Dataset 코드는 RDD로 컴파일됨
    • RDD는 불변성을 가지며 병렬로 처리할 수 있는 파티셔닝된 레코드의 모음
    • DataFrame의 각 레코드는 스키마를 알고 있는 필드로 구성된 구조화된 로우인 반면, RDD의 레코드는 그저 프로그래머가 선택한 자바, 스칼라, 파이썬의 객체일뿐
    • RDD API는 Dataset과 유사하지만 RDD는 구조화된 데이터 엔진을 사용해 데이터를 저장하거나 다루지 않음, RDD와 Dataset 사이의 전환은 매우 쉬우므로 두 API를 모두 사용해 각 API의 장점을 동시에 활용함
    • DataFrame API에서 최적화된 물리적 실행 계획을 만드는 데 대부분 사용됨
    • 제니링 RDD타입, 키 기반의 집계가 가능한 키-값 RDD
    • 주요 속성
      • 파티션의 목록
      • 각 조각을 연산하는 함수
      • 다른 RDD와의 의존성 목록
      • 부가적으로 키-값 RDD를 위한 Partitioner(RDD는 해시 파티셔닝되어 있다고 말함)
      • 부가적으로 각 조각을 연산하기 위한 기본 위치(하둡 분산 파일 시스템 파일의 블록 위치)
    • RDD는 분산 환경에서 데이터를 다루는 데 필요한 지연 처리 방식의 트랜스포메이션과 즉시 실행 방식의 액션을 제공, DataFrame과 Dataset의 트랜스포메이션, 액션과 동일한 방식으로 동작함
    • 스칼라와 자바를 사용하는 경우에는 대부분 비슷한 성능, 원형 객체를 다룰 때는 큰 성능 손실이 발생함, 파이썬을 사용해 RDD를 다룰 때는 상당한 성능 저하가 발생, 파이썬으로 RDD를 실행하는 것은 파이썬으로 만들어진 사용자 정의 함수를 사용해 로우마다 적용하는 것과 동일함, 직렬화 과정을 거친 데이터를 파이썬 프로세스에 전달하고, 파이썬에서 처리가 끝나면 다시 직렬화하여 자바 가상 머신에 반환함, 파이썬을 사용해 RDD를 다룰때는 높은 오버헤드가 발생함
    • RDD에는 많은 강점이 있지만 구조적 API가 제공하는 여러 최적화 기법을 사용할 수 없음, DataFrame은 RDD보다 더 효율적이고 안정적이며 표현력이 좋음
    • 물리적으로 분산된 데이터(자체적으로 구상한 데이터 파티셔닝)에 세부적인 제어가 필요할 때 RDD를 사용하는 것이 가정 적합함
    • Dataset은 구조적 API가 제공하는 풍부한 기능과 최적화 기법을 제공한다는 것이 가장 큰 차이점
  • 로컬 컬렉션으로 RDD 생성하기
    • 컬렉션 객체를 RDD로 만들려면 SparkContext와 parallelize메서드를 호출해야함, 단일 노드에 있는 컬렉션을 병렬 컬렉션으로 전환함, 파티션 수를 명시적으로 지정함
  • 액션
    • 지정된 트랜스포메이션 연산을 시작하려면 액션을 사용함
    • 액션은 데이터를 드라이버로 모으거나 외부 데이터소스로 내보낼 수 있음
  • 파일 저장하기
    • saveAsTextFile : 텍스트 파일로 저장하려면 경로를 지정
    • 시퀀스 파일 : 시퀀스 파일은 바이너리 키-값 쌍으로 구성된 플랫 파일, 맵리듀스의 입출력 포맷으로 널리 사용됨
    • 하둡 파일 : 하둡 파일 포맷을 사용하면 클래스, 출력 포맷, 하둡 설정 그리고 압축 방식을 지정함
  • 캐싱 : RDD를 캐시하거나 저장(persist)할 수 있음
  • 체크포인팅 : 체크포인팅은 RDD를 디스크에 저장하는 방식
  • RDD를 시스템 명령으로 전송하기
    • pipe 메서드는 스파크의 흥미로운 메서드 중 하나, pipe 메서드를 사용하면 파이핑 요소로 생성된 RDD를 외부 프로세스로 전달함
    • 각 입력 파티션의 모든 요소는 개행 문자 단위로 분할되어 여러 줄의 입력 데이터로 변경된 후 프로세스의 표준 입력(stdin)에 전달됨
    • 결과 파티션은 프로세스의 표준 출력(stdout)으로 생성됨
  • foreachPartition
    • 각 파티션의 데이터를 데이터베이스에 저장하는 것과 같이 개별 파티션에서 특정 작업을 수행하는 데 매우 적합한 함수

RDD 고급 개념

  • 키-값 형태의 기초(키-값 형태의 RDD)
    • RDD에는 데이터를 키-값 형태로 다룰 수 있는 다양한 메서드가 존재, <연산명>ByKey 형태의 이름을 가짐
    • 메서드 이름에 ByKey가 있다면 PairRDD 타입만 사용할 수 있음
  • 파티션 제어하기
    • RDD를 사용하면 데이터가 클러스터 전체에 물리적으로 정확히 분산되는 방식을 정의함
    • 파티션 함수를 파라미터로 사용할 수 있다는 사실
    • coalesce
      • 파티션을 재분배할 때 발생하는 데이터 셔플을 방지하기 위해 동일한 워커에 존재하는 파티션을 합치는 메서드
    • repartition
      • 파티션 수를 늘리거나 줄일 수 있지만, 처리 시 노드 간의 셔플이 발생할 수 있음
      • 파티션 수를 늘리면 맵 타입이나 필터 타입의 연산을 수행할 때 병렬 처리 수준을 높일 수 있음
    • repartitonAndSortWithinPartitions
      • 파티션을 재분배할 수 있고, 재분배된 결과 파티션의 정렬 방식을 지정할 수 있음
    • 사용자 정의 파티셔닝(custom partitioning)
      • 저수준 API의 세부적인 구현 방식
      • 잡이 성공적으로 동작되는지 여부에 상당한 영향을 미침
      • 페이지 랭크(PageRank) : 클러스터의 데이터 배치 구조를 제어하고 셔플을 회피함
      • 데이터 치우침(skew) 같은 문제를 피하고자 클러스터 전체에 걸쳐 데이터를 균등하게 분배함
      • HashPartitioner와 RangePartitioner는 RDD API에서 사용할 수 있는 내장형 파티셔너, 이산형과 연속형 값을 다룰 때 사용함
        • 키 치우침 : 매우 큰 데이터나 심각하게 치우친 키를 다뤄야 한다면 고급 파티셔닝 기능을 사용해야함
        • 어떤 키가 다른 키에 비해 아주 많은 데이터를 가지는 현상을 의미함
        • 병렬성을 개선하고 실행 과정에서 OutOfMemoryError를 방지 할 수 있도록 키를 최대한 분할함
        • 키가 특정 형태를 띠는 경우에는 키를 분할함
  • 사용자 정의 직렬화
    • Kryo 직렬화 : 병렬화 대상인 모든 객체나 함수는 직렬화 할 수 있어야함

분산형 공유 변수

  • 분산형 공유 변수 : 브로드캐스트 변수와 어큐뮬레이터, 클러스터에서 실행할 때 특별한 속성을 가진 사용자 정의 함수(예: RDD나 DataFrame을 다루는 map함수)에서 이 변수를 사용할 수 있음
    • 어큐뮬레이터 : 모든태스크의 데이터를 공유 결과에 추가할 수 있음, 잡의 입력 레코드를 파싱하면서 얼마나 많은 오류가 발생했는지 확인하는 카운터를 구현함
      • 트랜스포메이션 내부의 다양한 값을 갱신하는데 사용함
      • 로우 단위로 안전하게 값을 갱신할 수 있는 변경 가능한 변수를 제공함
      • 디버깅용이나 저수준 집계 생성용으로 사용할 수 있음, 파티션별로 특정 변수의 값을 추적하는 용도로 사용할 수 있으며 시간이 흐를수록 더 유용하게 사용됨
      • 어큐뮬레이터의 값은 액션을 처리하는 과정에서만 갱신됨
      • 어큐뮬레이터를 만드는 가장 간단한 방법은 SparkContext를 사용하는것, 직접 어큐뮬레이터를 생성하고 이름을 붙여 등록할 수도 있음
    • 브로드캐스트 변수 : 모든 워커 노드에 큰 값을 저장하므로 재전송 없이 많은 스파크 액션에서 재사용할 수 있음
      • 변하지 않는 값(불변성 값)을 클로저(closure) 함수의 변수로 캡슐화하지 않고 클러스터에서 효율적으로 공유하는 방법을 제공함
      • 모든 태스크마다 직렬화하지 않고 클러스터의 모든 머신에 캐시하는 불변성 공유 변수, 익스큐터 메모리 크기에 맞는 조회용 테이블을 전달하고 함수에서 사용하는 것이 대표적
      • suppBroadcast 변수를 이용해 참조함, 불변성이며 액션을 실행할 때 클러스터의 모든 노드에 지연 처리 방식으로 복제됨

운영용 애플리케이션

클러스터에서 스파크 실행하기

  • 스파크 애플리케이션 아키텍처
    • 스파크 드라이버
      • 스파크 애플리케이션의 운전자 역할을 하는 프로세스
      • 스파크 애플리케이션의 실행을 제어하고 스파크 클러스터(익스큐터의 상태와 태스크)의 모든 상태 정보를 유지함
      • 물리적 컴퓨팅 자원 확보와 익스큐터 실행을 위해 클러스터 매니저와 통신할 수 있어야 함
      • 물리적 머신의 프로세스이며 클러스터에서 실행 중인 애플리케이션의 상태를 유지함
    • 스파크 익스큐터
      • 스파크 드라이버가 할당한 태스크를 수행하는 프로세스
      • 드라이버가 할당한 태스크를 받아 실행하고 태스크의 상태와 결과(성공 또는 실패)를 드라이버에 보고함, 모든 스파크 애플리케이션은 개별 익스큐터 프로세스를 사용함
    • 클러스터 매니저
      • 스파크 애플리케이션을 실행할 클러스터 머신을 유지함
      • 드라이버(마스터)와 워커라는 개념을 가짐
      • 프로세스가 아닌 물리적인 머신에 연결되는 개념
      • 애플리케이션이 실행되는 머신을 관리함
  • 스파크가 지원하는 클러스터 매니저
    • 스탠드 얼론 클러스터 매니저
    • 아파치 메소스
    • 하둡 YARN
  • 실행 모드
    • 애플리케이션을 실행할 때 요청한 자원의 물리적인 위치를 결정함
    • 클러스터 모드
      • 가장 흔하게 사용되는 스파크 애플리케이션 실행 방식
      • 컴파일된 JAR 파일이나 파이썬 스크립트 또는 R 스크립트를 클러스터 매니저에 전달
      • 클러스터 매니저는 파일을 받은 다음 워커 노드에 드라이버와 익스큐터프로세스를 실행함
      • 클러스터 매니저는 모든 스파크 애플리케이션과 관련된 프로세스를 유지하는 역할을 함
    • 클라이언트 모드
      • 애플리케이션을 제출한 클라이언트 머신에 스파크 드라이버가 위치한다는 것을 제외하면 클러스터 모드와 비슷함
      • 클라이언트 머신은 스파크 드라이버 프로세스를 유지하며 클러스터 매니저는 익스큐터 프로세스를 유지함
      • 게이트웨이 머신(gateway machine) 또는 에지 노드(edge node)
    • 로컬 모드
      • 모든 스파크 애플리케이션은 단일 머신에서 실행됨
      • 애플리케이션의 병렬 처리를 위해 단일 머신의 스레드를 활용함
      • 스파크를 학습하거나 애플리케이션 테스트 그리고 개발 중인 애플리케이션을 반복적으로 실험하는 용도로 주로 사용됨
  • 스파크 애플리케이션의 생애주기
    • SparkSession은 클러스터 매니저와 통신해 스파크 익스큐터 프로세스의 실행을 요청함
    • 스파크와 스파크 SQL 컨텍스트를 new SparkContext 패턴을 사용해서 만드는 것보다 안전하게 생성함
    • 스파크 애플리케이션에서 다수의 라이브러리가 세션을 생성하려는 상황에서 컨텍스트 충돌을 방지할 수 있음
    • SparkSession을 사용해 모든 저수준 API, 기존 컨텍스트 그리고 관련 설정 정보에 접근할 수 있음
    • SparkContext
      • RDD 같은 스파크의 저수준 API 사용할 수 있음
    • 스테이지
      • 스파크의 스테이지는 다수의 머신에서 동일한 연산을 수행하는 태스크의 그룹
      • 가능한 한 많은 태스크 (잡의 트랜스포메이션)를 동일한 스테이지로 묶으려는 노력함
      • 셔플은 데이터의 물리적 재분배 과정
      • spark.sql.shuffle.partitions 속성의 기본값은 200
      • 스파크 잡이 실행되는 도중에 셔플을 수행하면 기본적으로 200개의 셔플 파티션을 생성함
      • 클러스터의 익스큐터 수보다 파티션 수를 더 크게 지정하는 것이 좋음
    • 태스크
      • 스파크의 스테이지는 태스크로 구성됨
      • 각 태스크는 단일 익스큐터에서 실행할 데이터의 블록과 다수의 트랜스포메이션 조합으로 볼 수 있음
  • 세부 실행 과정
    • 스파크는 map 연산 후 다른 map 연산이 이어진다면 함께 실행할 수 있도록 스테이지와 태스크를 자동으로 연결함
    • 스파크는 모든 셔플을 작업할 때 데이터를 안정적인 저장소(예:디스크)에 저장하므로 여러 잡에서 재사용할 수 있음
    • 파이프라이닝
      • 스파크가 수행하는 주요 최적화 기법 중 하나는 RDD나 RDD보다 더 아래에서 발생하는 파이프라이닝 기법
      • 파이프라이닝 기법 : 노드 간의 데이터 이동 없이 각 노드가 데이터를 직접 공급할 수 있는 연산만 모아 태스크의 단일 스테이지로 만듬
    • 셔플 결과 저장(shuffle persistence)
      • reduce-by-key 연산 같이 노드 간 복제를 유발하는 연산을 실행하면 엔진에서 파이프라이닝을 수행하지 못하므로 네트워크 셔플이 발생함
      • 노드 간 복제를 유발하는 연산은 각 키에 대한 입력 데이터를 먼저 여러 노드로부터 복사함
      • 부작용은 이미 셔플된 데이터를 이용해 새로운 잡을 실행하면 소스와 관련된 셔플이 다시 실행되지 않는다는 것
      • 스파크 UI와 로그 파일에서 skipped라고 표시된 사전 셔플 스테이지(pre-shuffle stage)를 확인할 수 있음
      • 자동 최적화 기능은 동일한 데이터를 사용해 여러 잡을 실행하는 워크로드의 시간을 절약할 수 있음
      • 더 나은 성능을 얻기 위해 DataFrame이나 RDD의 cache 메서드를 사용할 수 있음
      • 사용자가 직접 캐싱을 수행할 수 있으며 정확히 어떤 데이터가 어디에 저장되는지 제어할 수 있음

스파크 애플리케이션 개발하기

  • 스파크 애플리케이션 작성하기
    • 스파크 애플리케이션은 스파크 클러스터와 사용자 코드 두 가지 조합으로 구성됨
    • 스칼라는 스파크의 기본 언어이기 때문에 애플리케이션을 개발하는 가장 적합한 방법
    • 두 가지 자바 가상 머신 기반의 빌드 도구인 sbt나 아파치 메이븐을 이용해 빌드함
    • sbt 빌드 환경을 구성하려면 패키지 정보를 관리하기 위해 build,sbt 파일을 정의해야함
      • 프로젝트 메타데이터(패키지명, 패키지 버전 정보 등)
      • 라이브러리 의존성을 관리하는 장소
      • 라이브러리에 포함된 의존성 정보
    • 코드를 실행하려면 스칼라나 자바의 메인 클래스 역할을 하는 파이선 파일을 작성해야함
      • SparkSession을 생성하는 실행 가능한 스크립트 파일을 만들어야 함
    • SparkSession을 한 번만 초기화하고 런타임 환경에서 함수와 클래스에 전달하는 방식을 사용하면 테스트 중에 SparkSession을 쉽게 교체할 수 있음
    • 어떤 팀이나 프로젝트에서는 개발 속도를 올리기 위해 덜 엄격한 SQL과 DataFrame API를 사용할 수 있고 다른 팀에서는 타입 안전성을 얻기 위해 Dataset과 RDD API를 사용할 수 있음
  • 애플리케이션 시작하기
    • spark-submit 명령으로 스파크 잡을 제출할 때는 클라이언트 모드와 클러스터 모드 중 하나를 선택해야 함
    • 드라이버와 익스큐터 간의 지연 시간을 줄이기 위해 클러스터 모드로(또는 클러스터 장비 중 하나에서 클라이언트 모드로) 실행할 것을 추천함
  • 애플리케이션 환경 설정하기
    • 시스템 설정
      • 스파크 속성은 대부분의 애플리케이션 파라미터를 제어하며 SparkConf객체를 사용해 스파크 속성을 설정할 수 있음
      • 자바 시스템 속성
      • 하드코딩된 환경 설정 파일
    • SparkConf
      • SparkConf는 애플리케이션의 모든 설정을 관리함
      • 스파크 애플리케이션에서 생성된 SparkConf 객체는 불변성
      • SparkConf 객체는 개별 스파크 애플리케이션에 대한 스파크 속성값을 구하는 용도로 사용함, 스파크 속성값은 스파크 애플리케이션의 동작 방식과 클러스터 구성 방식을 제어함
      • spark.executor.cores(각 익스큐터의 코어 수)와 spark.files.maxPartitionBytes(파일 읽기 시 파티션의 최대 크기)
  • 애플리케이션에서 잡 스케줄링
    • 스파크 애플리케이션에서 별도의 스레드를 사용해 여러 잡을 동시에 실행할 수 있음
    • 스파크의 스케줄러는 스레드 안정성을 충분히 보장함
    • 여러 요청(예:다수의 사용자가 쿼리를 요청하는 경우)을 동시에 처리할 수 있는 애플리케이션을 만들 수 있음
    • 스파크의 스케줄러는 FIFO 방식으로 동작함, 큐의 전단(head)에 있는 잡이 클러스터의 전체 자원을 사용하지 않으면 이후 잡을 바로 실행할 수 있음, 큐의 전단에 있는 잡이 너무 크면 이후 잡은 아주 늦게 실행됨
    • 스파크는 모든 잡이 클러스터 자원을 거의 동일하게 사용할 수 있도록 라운드 로빈(round-robin) 방식으로 여러 스파크 잡의 태스크를 할당함, 장시간 수행되는 스파크 잡이 처리되는 중에 짧게 끝난 스파크 잡이 제출된 경우 즉시 장시간 수행되는 스파크 잡의 자원을 할당받아 처리함
    • 장시간 수행되는 스파크 잡의 종료를 기다리지 않고 빠르게 응당할 수 있음
    • 페어 스케줄러(fair scheduler)
      • spark.scheduler.mode 속성을 FAIR로 지정
      • 여러 개의 잡을 풀(pool)로 그룹화하는 방식도 지원함
      • 개별 풀에 다른 스케줄링 옵션이나 가중치를 설정함
      • 더 중요한 스파크 잡을 할당할 수 있도록 우선순위가 높은 풀을 만들 수 있음
      • 각 잡에 같은 양의 자원을 할당하는 대신 각 사용자의 스파크 잡을 그룹화 함

스파크 배포 환경

  • 스파크 애플리케이션 실행을 위한 클러스터 환경
    • 설치형 클러스터(on-premise cluster)
      • 자체 데이터 센터를 운영하는 조직에 더욱 적합함, 그 밖의 모든 상황에서 설치형 클러스터 환경을 사용하면 트레이드오프가 존재함
      • 설치형 클러스터를 구축하면 사용 중인 하드웨어를 완전히 제어할 수 있으므로 특정 워크로드의 성능을 최적화할 수 있음
      • 스파크 같은 데이터 분석 워크로드에서 몇 가지 문제가 발생할 수 있음
        • 설치형 클러스터의 크기는 제한적, 분산 워크로드에 필요한 자원은 상황에 따라 달라짐, 클러스터를 너무 작게 만들면 신규 머신러닝 모델을 학습하는 잡을 실행하기 어려울 수 있음, 반대로 클러스터를 너무 크게 만들면 사용하지 않는 자원이 많아짐
        • 설치형 클러스터는 하웁 파일 시스템(HDFS)이나 분산 키-값 저장소 같은 자체 저장소 시스템을 선택하고 운영해야 함, 지리적 복제(geo replication) 및 재해 복구(disaster recovery) 체계도 함께 구축해야함
      • 자원 활용 문제를 해결할 수 있는 가장 좋은 방법은 클러스터 매니저를 사용하는 것, 클러스터 매니저를 사용하면 다수의 스파크 애플리케이션을 실행할 수 있고 애플리케이션의 자원을 동적으로 재할당할 수 있음, 하나의 클러스터에서 스파크 애플리케이션이 아닌 다른 프로그램을 실행할 수 있음
      • YARN과 메소스는 동적 자원 공유를 스탠드얼론 모드보다 잘 지원하며 스파크 이외의 애플리케이션도 실행할 수 있음, 공개 클라우드를 사용하면 애플리케이션에 맞는 규모의 클러스터를 얻을 수 있음
      • 여러 종류의 저장소를 선택할 수 있음
        • 스파크는 하웁의 HDFS 같은 분산 파일 시스템과 아파치 카산드라 같은 키-값 저장소 시스템을 가장 많이 사용함, 데이터를 수집하는 용도로 아파치 카프카 같은 스트리밍 메시지 버스 시스템(streaming message bus system)을 사용함, 모든 시스템은 관리, 백업 그리고 지리적 복제 기능이 자체적으로 구현되어 있거나 상용 서드 파티 도구를 사용해 다양한 수준으로 지원함
        • 저장소를 선택할 때는 스파크 커텍터의 성능을 시험하고 관리 도구를 사용할 수 있는지 확인해야 함
    • 공개 클라우드(public cloud)
      • 장점
        • 자원을 탄력적으로 늘이고 줄이는 것이 가능함, 몇 시간 동안 수백 대의 머신을 활용해야 하는 괴물 같은 잡을 실행한다고 해도 사용한 만큼의 비용만 지불하면 됨, 일반적인 연산을 수행하는 경우에도 애플리케이션마다 다른 유형의 머신과 클러스터 규모를 선택할 수 있기 때문에 가격 대비 뛰어난 성능을 얻을 수 있음, 딥러닝 작업이 필요한 경우에만 GPU(Graphics Processing Units) 인스턴스를 할당받아 사용할 수 있음
        • 공개 클라우드 환경은 비용이 저렴하고 지리적 복제 기능을 지원하는 저장소를 제공하기 때문에 대규모 데이터를 쉽게 관리 할 수 있음
      • 아마존 웹 서비스, 마이크로소프트 애저, 구글 클라우드 플랫폼은 아파치 스파크뿐만 아니라 관리형 하둡 클러스터를 제공함
      • 아마존 S3, 애저 Blob 저장소, 구글 클라우드 저장소와 같이 클러스터에서 분리된 글로벌 저장소 시스템(global storage system)을 사용하고 스파크 워크로드마다 별도의 클러스터를 동적으로 할당하는 것이 좋음, 연산 클러스터와 저장소 클러스터를 분리하면 연산이 필요한 경우에만 클러스터 비용을 지불함
      • 클라우드 환경에서는 설치형 클러스터 환경에서처럼 연산 관련 프로그램 및 환경을 관리할 필요가 없음, 클라우드 저장소를 연동해 스파크를 실행하면 클라우드의 유연성과 비용 절감 효과 그리고 관리 도구 등 다양한 장점을 활용할 수 있음
      • 데이터브릭스는 클라우드에서 스파크 서비스를 제공, 복잡하게 하둡을 설치하지 않아도 스파크 워크로드를 실행할 수 있음,
        • 클러스터 크기 자동 조절, 클러스터 자동 종료, 클라우드 스토리지에 최적화된 커넥터, 노트북과 스탠드얼론 잡을 위한 공동 작업 환경과 같이 클라우드 환경에서 스파크를 보다 효율적으로 실행할 수 있음, 스파크를 배울 때 활용할 수 있는 무료 커뮤니티 에디션을 제공함
  • 클러스터 매니저
    • 스탠드얼론 모드
      • 아파치 스파크 워크로드용으로 특별히 제작된 경량화 플랫폼, 하나의 클러스터에서 다수의 스파크 애플리케이션을 실행할 수 있음
      • 실행을 위한 간단한 인터페이스를 제공하며 대형 스파크 워크로드로 확장할 수 있음
      • 스파크 애플리케이션만 실행할 수 있다는 큰 단점, 클러스터 환경을 빠르게 구축해 스파크 애플리케이션을 실행해야 하거나 YARN이나 메소스를 사용해본 경험이 없다면 가장 좋은 선택지
    • YARN
      • 하둡 YARN은 잡 스케줄링과 클러스터 자원 관리용 프레임워크
      • 스파크는 기본적으로 하둡 YARN 클러스터 매니저를 지원하지만 하둡 자체가 필요한 것은 아님
      • 하둡 YARN은 다양한 실행 프레임워크를 지원하는 통합 스케줄러
      • cluster 모드는 YARN 클러스터에서 스파크 드라이버 프로세스를 관리하며 클라이언트는 애플리케이션을 생성한 다음 즉시 종료됨, client 모드는 드라이버가 클라이언트 프로세스에서 실행됨
      • 하둡 설정
        • 스파크를 이용해 HDFS의 파일을 읽고 쓰려면 스파크 클래스패스에 두 개의 하둡 설정 파일을 포함시켜야함
        • HDFS 클라이언트의 동작 방식을 결정하는 hdfs-site.xml 파일, 기본 파일 시스템의 이름을 설정하는 core-site.xml
        • /etc/hadoop/conf 하위에 설정 파일이 존재함
    • 메소스
      • 아파치 메소스는 CPU, 메모리, 저장소 그리고 다른 연산 자원을 머신에서 추상화함
      • 내고장성(fault-tolerant) 및 탄력적 분산 시스템(elastic distributed system)을 쉽게 구성하고 효과적으로 실행함
      • 메소스는 스파크처럼 짧게 실행되는 애플리케이션을 관리함, 웹 애플리케이션이나 다른 자원 인터페이스 등 오래 실행되는 애플리케이션까지 관리할 수 있는 데이터센터 규모의 클러스터 매니저를 지향함
      • 메소스는 스파크에서 지원하는 클러스터 매니저 중 가장 무거움, 대규모의 메소스 배포 환경이 있는 경우에만 사용하는 것이 좋음
  • 신뢰도가 낮은 환경에서 애플리케이션을 안전하게 실행할 수 있도록 저수준 API 기능을 제공함
  • 애플리케이션 스케줄링
    • 스파크는 여러 연산 과정에서 필요한 자원을 스케줄링할 수 있는 몇 가지 기능을 제공함
    • 각 스파크 애플리케이션은 독립적인 익스큐터 프로세스를 실행함, 클러스터 매니저는 스파크 애플리케이션 전체에 대한 스케줄링 기능을 제공함
    • 스파크 애플리케이션에서 여러 개의 잡(즉, 스파크 액션)을 다른 스레드가 제출한 경우 동시에 실행할 수 있음, 네트워크를 통한 요청에 응답하는 애플리케이션에 적합함
    • 단일 클러스터를 공유해 다수의 사용자가 스파크 애플리케이션을 실행하면 클러스터 매니저에 따라 자원 할당을 관리할 수 있는 여러 옵션이 있음
    • 동적 할당
      • 하나의 클러스터에서 여러 스파크 애플리케이션을 실행하려면 워크로드에 따라 애플리케이션이 점유하는 자원을 동적으로 조정해야 함
  • 기타 고려 사항
    • 애플리케이션의 개수와 유형
    • YARN은 HDFS를 사용하는 애플리케이션을 실행할 때 가장 적합하지만 그 외의 경우에는 잘 사용하지 않음, YARN은 HDFS의 정보를 사용하도록 설계되었기 때문에 클라우드 환경을 제대로 지원하지 못함, 연산용 클러스터와 저장소 클러스터가 강하게 결합되어 있음, 클러스터를 확장할 때 연산용 클러스터와 저장소 클러스터를 동시에 확장해야 함
    • 메소스는 YARN이 가진 개념을 조금 더 개선하였으며 다양한 애플리케이션 유형을 지원함, 메소스는 더 큰 규모의 클러스터에 적합함, 스파크 애플리케이션만 실행하려고 메소스 클러스터를 구축하는 것은 바람직하지 않음
    • 스파크 스탠드얼론 클러스터는 가장 가벼운 클러스터 매니저이며 비교적 이해와 활용이 쉬움, 더 많은 애플리케이션을 관리하는 인프라 구조를 구축해야 한다면 YARN이나 메소스를 사용하는 것이 훨씬 더 나을 수 있음
    • 클러스터 매니저에 상관없이 애플리케이션 디버깅에 필요한 로그를 기록하는 방식을 결정해야함, YARN과 메소스는 로그 기록 기능을 기본으로 제공함
    • 테이블 카탈로그(table catalog) 같은 저장된 데이터셋의 메타데이터를 관리하기 위해 메타스토어(metastore) 사용을 고려해야함
    • 클러스터에서 실행되는 스파크 잡을 디버깅하려면 최소한 기본적인 모니터링 솔루션이 필요함

모니터링과 디버깅

  • 모니터링 범위
    • 스파크 잡의 어느 지점에서 오류가 발생했는지 파악하려면 스파크 잡을 모니터링 해야함
    • 모니터링 대상 컴포넌트
      • 스파크 애플리케이션과 잡
        • 클러스터에서 사용자 애플리케이션이 실행되는 상황을 파악하거나 디버깅하려면 가장 먼저 스파크 UI와 스파크 로그를 확인해야함
        • 스파크 UI와 스파크 로그는 실행 중인 애플리케이션의 RDD와 쿼리 실행 계획 같은 개념적 수준의 정보를 제공함
      • JVM
        • 스파크는 모든 익스큐터를 개별 자바 가상 머신(JVM)에서 실행함
        • 각 가상 머신을 모니터링함, JVM 도구에는 스택 트레이스(stack trace)를 제공하는 jstack, 힙 덤프(heap dump)를 생성하는 jmap, 시계열 통계 리포트를 제공하는 jstat, 다양한 JVM 속성 변수를 시각화된 방식으로 탐색할 수 있는 jconsole 등이 있음, jvisualvm을 이용해 스파크 잡의 동작 특성을 알아볼 수도 있음, 저수준 디버깅이 필요하다면 JVM 도구가 더 유용함
      • OS와 머신
        • JVM은 호스트 운영 시스템(Operating System, OS)에서 실행됨
        • 머신의 상태를 모니터링해 정상 작동 중인지 확인하는 것은 매우 중요함
        • CPU, 네트워크, I/O 등의 자원에 대한 모니터링도 함께 해야함
        • dstat, iostat 그리고 iotop 같은 명령어를 사용하면 더 세밀하게 모니터링 할 수 있ㅇ므
      • 클러스터
        • 스파크 애플리케이션이 실행되는 클러스터도 모니터링 해야함
        • YARN, 메소스, 스탠드얼론 클러스터 매니저가 모니터링의 대상임
        • 인기 있는 클러스터 모니터링 도구로는 강글리아(Ganglia)와 프로메테우스(Prometheus)가 있음
  • 모니터링 대상
    • 실행 중인 사용자 애플리케이션의 프로세스(CPU, 메모리 사용률 등)
    • 프로세스 내부에서의 쿼리 실행 과정(예: 잡과 태스크)
    • 스파크는 수월한 모니터링을 지원하기 위해 드롭위자드 메트릭 라이브러리(Dropwizard Metrics Library) 기반의 메트릭 시스템(metric system)을 갖추고 있음
    • $SPARK_HOME/conf/metrics.properties 파일을 생성해 구성할 수 있음
    • spark.metric.conf 속성의 값을 지정해 메트릭 시스템 설정 파일의 경로를 변경할 수 있음
  • 스파크 로그
    • 로깅 프레임워크를 통해 스파크 로그와 사용자 애플리케이션 로그를 모두 확인함
    • 파이썬은 스파크 자바 기반 로깅 라이브러리를 사용할 수 없으므로 파이썬의 logging 모듈 또는 print 구문을 사용해 표준 오류로 결과를 출력해야함
    • spark.sparkContext.setLogLevel("INFO")
    • 로그 수준에 맞는 스파크 로그를 확인할 수 있음, 로깅 프레임워크를 사용한다면 스파크 로그와 함께 사용자가 직접 필요한 정보를 로그로 기록할 수 있으므로 스파크와 스파크 애플리케이션을 모두 점검할 수 있음
    • 로그를 검색하면 사용자가 직면한 문제를 정확히 파악할 수 있음
    • 사용자 애플리케이션에 다양한 로그를 기록하면 더 자세한 정보를 얻을 수 있음
  • 스파크 UI
    • 스파크 UI는 실행 중인 애플리케이션과 스파크 워크로드에 대한 평가지표를 모니터링할 수 있는 화면을 제공함
    • 스파크 UI 전체 탭
      • Jobs : 스파크 잡에 대한 정보를 제공함
      • Stages : 개별 스테이지(스테이지와 태스크를 포함합니다)와 관련된 정보를 제공함
      • Storage : 스파크 애플리케이션에 캐싱된 정보와 데이터 정보를 제공함
      • Environment : 스파크 애플리케이션의 구성과 설정 관련 정보를 제공함
      • Executors : 애플리케이션에 사용 중인 익스큐터의 상세 정보를 제공함
      • SQL : SQL과 DataFrame을 포함한 구조적 API 쿼리 정보를 제공함
    • 스파크 스테이지의 지향성 비순환 그래프(DAG), 프로젝션(컬럼 선택/추가/필터링)과 집계를 수행함
    • 기타 스파크 UI 탭
      • Storage 탭은 클러스터에 캐시된 RDD나 DataFrame과 관련된 정보를 제공하며 시간이 지나 캐시된 데이터가 사라졌는지 확인할 수 있음
      • Environment 탭은 클러스터에 설정된 다양한 스파크 속성뿐만 아니라 스칼라나 자바와 관련된 정보 등 런타임 환경 관련 정보를 제공함
  • 스파크 REST API
    • 스파크에서 제공하는 시각화 및 모니터링 도구는 REST API를 기반으로 만들어졌음, REST API는 스파크 UI와 동일한 정보를 제공하지만 SQL 관련 정보는 제공하지 않음
    • 스파크 UI에서 볼 수 있는 정보를 기반으로 사용자 정의 리포트 솔루션을 구축하려면 REST API를 사용해야함
    • 스파크 UI 히스토리 서버
      • 스파크 UI는 SparkContext가 실행되는 동안 사용할 수 있음
      • 정상적으로 종료되거나 비정상적으로 종료된 애플리케이션의 정보를 확인하려면 스파크 히스토리 서버(Spark History Server)를 이용해야함
      • 이벤트 로그를 저장하도록 스파크 애플리케이션을 설정하면 스파크 히스토리 서버를 이용해 스파크 UI와 REST API를 재구성할 수 있음
      • 특정 경로에 이벤트 로그를 저장하도록 스파크 애플리케이션을 설정해야함
      • spark.eventLog.enabled 속성을 true로 설정하고 spark.eventLog.dir 속성에 이벤트 로그 저장경로를 지정해야함, 이벤트 로그가 저장되면 스파크 히스토리 서버를 스탠드얼론 애플리케이션 형태로 실행할 수 있음
      • 스파크 히스토리 서버는 저장된 이벤트 로그를 기반으로 웹 UI를 자동으로 재구성함
  • 디버깅 및 스파크 응급 처치
    • 스파크 애플리케이션이 시작되지 않는 경우 : 신규 클러스터 매니저나 환경을 사용하는 경우에 자주 발생함
      • 징후와 증상
        • 스파크 잡이 시작되지 않음
        • 스파크 UI가 드라이버 노드를 제외한 클러스터의 노드 정보를 전현 표시하지 않음
        • 스파크 UI가 잘못된 정보를 표시하는 것 같음
      • 잠재적 대응법
        • 클러스터나 사용자 애플리케이션의 실행에 필요한 자원을 적절하게 설정하지 않았을때 발생함
        • IP를 잘못 입력했거나 포트가 열려 있지 않은 경우에 발생할 수 있으며 대부분 클러스터 영역, 머신, 설정과 관련된 문제
        • 설정한 포트로 클러스터 머신 간에 통신할 수 있는지 확인해야함
        • 스파크 자원 설정이 올바른지, 클러스터 매니저가 스파크를 실행할 수 있도록 적합하게 설정되었는지 확인해야함, 스파크 애플리케이션을 실행해 정상적으로 동작하는지 확인함
    • 스파크 애플리케이션 실행전에 오류가 발생한 경우 : 새로운 애플리케이션을 개발해 클러스터에서 실행할 때 발생할 수 있음
      • 징후와 증상
        • 명령이 전혀 실행되지 않으며 오류 메시지가 출력됨
        • 스파크 UI에서 잡, 스테이지, 태스크의 정보를 확인할 수 없음
      • 잠재적 대응법
        • 스파크 UI의 Environment 탭에서 애플리케이션 정보가 올바른지 확인한 다음 코드를 검토함
        • 단순한 오타나 잘못된 컬럼명을 사용하는 경우가 많으며 스파크 실행 계획(DataFrame API를 사용한 경우)을 만드는 과정에서 오류가 발생함
        • 잘못된 입력 파일 경로나 필드명을 사용하는 것과 같이 코드상에 문제가 없는지 확인하기 위해 스파크가 반환하는 오류를 살펴봐야함
        • 클러스터의 드라이버, 워커 그리고 사용하는 저장소 시스템 간의 네트워크 연결 상태를 다시 한번 확인함
        • 잘못된 버전의 저장소 접속용 라이브러리를 사용할 수 있으므로 라이브러리나 클래스패스를 다시 한번 확인함
    • 스파크 애플리케이션 실행 중에 오류가 발생한 경우
      • 징후와 증상
        • 하나의 스파크 잡이 전체 클러스터에서 성공적으로 실행되지만 다음 잡은 실패함
        • 여러 단계로 처리되는 쿼리의 특정 단계가 실패함
        • 어제 정상 동작한 예약 작업이 오늘은 실패함
        • 오류 메시지를 해석하기 어려움
      • 잠재적 대응법
        • 데이터가 존재하는지 또는 데이터가 올바른 포맷인지 확인함
        • 쿼리에 잘못된 컬럼명을 입력하였거나 컬럼,뷰,테이블이 존재하지 않을 수 있음
        • 어떤 컴포넌트가 연관되어 있는지(예: 어떤 연산자와 스테이지가 실행 중이었는지) 알아내기 위해 스택 트레이스(stack trace)를 분석해 단서를 찾아야 함
        • 입력 데이터와 데이터 포맷을 한 번 더 확인해 문제의 원인을 찾아야함, 잡의 태스크가 잠시 실행되다가 비정상적으로 종료된다면 입력 데이터 자체의 문제(스키마가 올바르게 지정되지 않았거나 특정 로우가 스키마 형태와 일치하지 않는 경우)일 수 있음
        • 데이터를 처리하는 코드에서 오류가 발생할 수 있음
    • 느리거나 뒤처진 태스크 : 머신 간의 작업이 균등하게 분배되지 않거나(데이터 치우침 : skew) 특정 머신이 다른 머신에 비해 처리 속도가 느린 경우(예: 하드웨어 문제)에도 발생함
      • 징후와 증상
        • 스파크 스테이지에서 대부분의 태스크가 정상적으로 실행되었으며 소수의 태스크만 남아 있음, 남아 있는 태스크는 오랫동안 실행됨
        • 스파크 UI에서 첫 번째 증상과 같은 느린 태스크를 확인할 수 있으며 동일한 데이터셋을 다룰 때 항상 발생함
        • 여러 스테이지에서 번갈아 가며 두 번째 증상과 같은 현상이 발생함
        • 스파크 애플리케이션을 실행하는 머신 수를 늘려도 상황이 개선되지 않고 여전히 특정 태스크가 다른 태스크에 비해 훨씬 오래 실행됨
        • 스파크 메트릭을 보면 특정 익스큐터가 다른 익스큐터에 비해 훨씬 많은 데이터를 읽거나 쓰고 있음을 알 수 있음
      • 잠재적 대응법
        • 낙오자(straggler) : 느린 태스크
        • DataFrame이나 RDD 파티션에 데이터가 균등하게 분할되지 않은 경우에 주로 발생함, 일부 익스큐터가 다른 익스큐터에 비해 훨씬 더 많은 양의 데이터를 처리함
        • 파티션별 데이터 양을 줄이기 위해 파티션 수를 증가시킴
        • 다른 컬럼을 조합해 파티션을 재분배함, 가능하면 익스큐터의 메모리를 증가시킴
        • 사용자 정의 함수를 구현할 때 객체 할당이나 비즈니스 로직에서 쓸모없는 부분이 있는지 확인하고 가능하면 DataFrame 코드로 변환함
        • 사용자 정의 함수(UDF)나 사용자 정의 집계 함수(UDAF)가 적당한 크기의 데이터를 사용해 실행되는지 확인함
    • 느린 집계 속도
      • 징후와 증상
        • groupBy 호출 시 느린 태스크가 발생함
        • 집계 처리 이후의 잡도 느림
      • 잠재적 대응법
        • 집계 연산 전에 파티션 수를 증가시키면 태스크별로 처리할 키 수를 줄일 수 있음
        • 익스큐터의 메모리를 증가시킨 경우 데이터가 많은 키를 처리하는 익스큐터는 다른 키를 처리하는 익스큐터에 비해 여전히 느릴수 있음
        • 집계 처리가 끝나고 이어서 실해되는 태스크가 느리다면 집계 처리된 데이터셋에 불균형 현상이 남아 있음을 의미함, 파티션을 임의로 재분배할 수 있도록 repartition 명령을 추가함
        • 모든 필터와 SELECT 구문이 집계 연산보다 먼저 처리된다면 필요한 데이터만 이용해서 집계 연산을 수행할 수 있음
        • 스파크는 잡 실행 전 null 값을 건너뛰기 위한 최적화를 수행함, null 대신 대체 기호를 사용하면 최적화를 수행할 수 없음
        • collect_list와 collect_set은 일치하는 모든 객체를 드라이버에 전송하기 때문에 아주 느리게 동작함
    • 느린 조인 속도 : 조인과 집계는 모두 셔플을 유발하기 때문에 동일한 증상과 대응법을 가짐
      • 징후와 증상
        • 조인 스테이지의 처리 시간이 오래 걸림, 하나 이상의 태스크가 여기에 해당할 수 있음
        • 조인 전후의 스테이지는 정상적으로 동작함
      • 잠재적 대응법
        • 많은 조인 연산은 다른 조인 타입으로 변경해 최적화(자동 또는 수동 방식)할 수 있음
        • 조인 순서를 변경하면서 잡의 처리 속도가 올라가는지 테스트함
        • 조인을 수행하기 전에 데이터셋을 분할하면 클러스터 노드 간 데이터 이동을 줄일 수 있음
        • 데이터 치우침 현상은 느린 조인을 유발하고 스파크 애플리케이션이나 익스큐터의 자원 할당량을 늘리는 것이 도움이 될 수 있음
        • 모든 필터와 SELECT 구문이 조인 연산보다 우선 처리된다면 필요한 데이터만 이용해서 조인 연산을 수행할 수 있음
        • 집계 연산과 마찬가지로 null 값을 제어하기 위해 " " 또는 "EMPTY" 같은 값을 대체 값으로 사용하는지 확인함
        • 스파크는 입력 DataFrame이나 테이블에 대한 통계가 없는 경우 브로드캐스트 조인을 사용하는 실행 계획을 생성하지 못함
    • 느린 읽기와 쓰기 속도 :
      • 징후와 증상
        • 분산 파일 시스템이나 외부 시스템의 데이터를 읽는 속도가 느림
        • 네트워크 파일 시스템이나 blob 저장소에 데이터를 쓰는 속도가 느림
      • 잠재적 대응법
        • 스파크의 투기적 실행(spark.speculation 속성을 true로 설정)을 사용하면 느린 읽기와 쓰기 속도를 개선하는데 도움
        • 스파크 클러스터와 저장소 시스템 간의 네트워크 대역폭이 충분하지 않을 수 있으므로 네트워크 성능에 문제가 없는지 반드시 확인해야 함
    • 드라이버 OutOfMemoryError 또는 응답없음 : 드라이버 OutOfMemoryError는 스파크 애플리케이션이 비정상적으로 종료되므로 매우 심각한 문제, 드라이버에 너무 많은 데이터를 전송해 메모리를 모두 소비한 경우에 자주 발생함
      • 징후와 증상
        • 스파크 애플리케이션이 응답하지 않거나 비정상적으로 종료됨
        • 드라이버 로그에 OutOfMemoryError 또는 가비지 컬렉션과 관련된 메시지가 출력됨
        • 명령이 장시간 실행되거나 실행되지 않음
        • 반응이 거의 없음
        • 드라이버 JVM의 메모리 사용량이 많음
      • 잠재적 대응법
        • 사용자 코드에서 collect 메서드 같은 연산을 실행해 너무 큰 데이터셋을 드라이버에 전송하고 시도했을 수 있음
        • 브로드캐스트하기에 너무 큰 데이터를 브로드캐스트 조인에 사용했을 수 있음, 스파크의 최대 브로드캐스트 조인 설정을 이용해 브로드캐스트할 크기를 제어함
        • 장시간 실행되는 애플리케이션은 드라이버에 많은 양의 객체를 생성해 해제하지 못할 수 있음, 자바의 jmap 도구를 사용해 힙 메모리의 히스토그램을 확인할 수 있음
        • 가능하면 더 많은 데이터를 다룰 수 있도록 드라이버의 가용 메모리를 증가시킴, JVM 메모리 부족 현상은 파이썬과 같은 다른 언어를 함께 사용하는 경우에 발생할 수 있음
    • 익스큐터 OutOfMemoryError 또는 응답 없음 : 어떤 경우에는 스파크 애플리케이션이 이런 문제를 자동으로 복구할 수 있음
      • 징후와 증상
        • 익스큐터 로그에 OutOfMemoryError 또는 가비지 컬렉션과 관련된 메시지가 출력되므로 스파크 UI에서 확인할 수 있음
        • 익스큐터가 비정상적으로 종료되거나 응답하지 않음
        • 특정 노드의 느린 태스크가 복구되지 않음
      • 잠재적 대응법
        • 익스큐터의 가용 메모리와 익스큐터 수를 증가시킴
        • PySpark 워커의 크기를 증가시킴
        • 익스큐터 로그에 가비지 컬렉션 오류 메시지가 발생했는지 확인함, 실행 중인 태스크의 일부가 너무 많은 객체를 생성하고 있어 가비지 컬렉션이 발생할 수 있음
        • null 값을 정확하게 제어하기 위해 " " 또는 "EMPTY" 같은 값을 기본값으로 사용한느 것은 아닌지 확인함
        • RDD와 Dataset은 객체를 생성하기 때문에 이 문제가 발생할 가능성이 더 큼, 가능하면 사용자 정의 함수의 사용을 줄이고 스파크의 구조적 API를 더 많이 사용함
        • 자바의 jmap 도구를 사용해 익스큐터 힙 메모리의 히스토그램을 볼 수 있음
        • 키-값 저장소 같이 다른 워크로드를 처리하는 노드에 익스큐터가 위치한다면 스파크 잡을 다른 작업과 분리해야함
    • 의도하지 않은 null 값이 있는 결과 데이터
      • 징후와 증상
        • 트랜스포메이션이 실행된 결과에 의도치 않은 null 값이 발생함
        • 잘 동작하던 운영 환경의 예약 작업이 더는 동작하지 않거나 정확한 결과를 생성하지 못함
      • 잠재적 대응법
        • 비즈니스 로직을 변경하지 않았다면 데이터 포맷이 변경되었을 수 있음
        • 어큐뮬레이터를 사용해 레코드나 특정 데이터 타입의 수를 확인함
    • 디스크 공간 없음 오류
      • 징후와 증상
        • 'no space left on disk' 오류 메시지와 함께 잡이 실패함
      • 잠재적 대응법
        • 더 많은 디스크 공간을 확보하면 이 문제를 쉽게 해결함, 작업 노드의 디스크를 늘리거나 클라우드 환경의 외부 저장소를 추가해 디스크 공간을 확보함
        • 제한된 용량의 저장소를 가진 클러스터를 사용하는 경우 데이터 치우침 현상이 발생하면 일부 노드의 저장소 공간이 모두 소진됨, 데이터 파티션을 재분배하는 것이 좋음
        • 스파크에서 로그를 유지하는 기간을 정함, 문제가 되는 머신의 오래된 로그 파일과 셔플 파일을 수동으로 제거함
    • 직렬화 오류
      • 징후와 증상
        • 직렬화 오류와 함께 잡이 실패함
      • 잠재적 대응법
        • 구조적 API를 사용하는 경우 직렬화 오류는 거의 나타나지 않음, UDF나 RDD를 이용해 개발된 사용자 정의 로직을 수행하는 익스큐터에서는 발생할 수 있음
        • UDF나 함수로 직렬화할 수 없는 코드 또는 데이터를 다루거나 직렬화할 수 없는 이상한 데이터 타입을 다루는 경우에는 주로 발생함
        • 직렬화 대상 사용자 클래스를 실제로 등록해 직렬화 성공 여부를 확인해야 함
        • 자바나 스칼라 클래스에서 UDF를 생성할 때는 UDF 내에서 인클로징 객체(enclosing object - 내부 클래스를 감싸고 있는 외부 클래스의 객체 인스턴스)의 필드를 참조하면 안됨

성능 튜닝

  • 잡의 실행 속도를 높이기 위한 성능 튜닝 방법
  • 네트워크가 엄청나게 빠른 환경이라면 가장 큰 비용이 드는 셔플 처리 시간이 줄어 더 빠르게 스파크 잡을 처리할 수 있음
  • 최적화 포인트
    • 코드 수준의 설계(예: RDD와 DataFrame 중 하나를 선택함)
    • 보관용 데이터
    • 조인
    • 집계
    • 데이터 전송
    • 애플리케이션별 속성
    • 익스큐터 프로세스의 JVM
    • 워커 노드
    • 클러스터와 배포 환경 속성
  • 스파크 잡의 성능 튜닝 방법
    • 속성값을 설정하거나 런타임 환경을 변경해 간접적으로 성능을 높임, 전체 스파크 애플리케이션이나 스파크 잡에 영향을 미침
    • 개별 스파크 잡, 스테이지, 태스크 성능 튜닝을 시도하거나 코드 설계를 변경해 직접적으로 성능을 높일 수 있음, 애플리케이션의 특정 영역에만 영향을 주므로 전체 스파크 애플리케이션이나 스파크 잡에는 영향을 미치지 않음
  • DataFrame vs SQL vs Dataset vs RDD
    • 모든 언어에서 DataFrame, Dataset 그리고 SQL의 속도는 동일함, DataFrame은 어떤 언어에서 사용하더라도 성능은 동일함
    • 파이썬이나 R을 사용해 UDF를 정의하면 성능 저하가 발생할 수 있으므로 자바와 스칼라를 사용해 UDF를 정의하는 것이 좋음
    • 파이썬에서 RDD 코드를 실행한다면 파이썬 프로세스를 오가는 많은 데이터를 직렬화해야함, 매우 큰 데이터를 직렬화하면 엄청난 비용이 발생하고 안정성까지 떨어질 수 있음
  • 동적 할당
    • 스파크 워크로드에 따라 애플리케이션이 차지할 자원을 동적으로 조절하는 메커니즘을 제공
    • 사용자 애플리케이션은 더 이상 사용하지 않는 자원을 클러스터에 반환하고 필요할 때 다시 요청할 수 있음
    • spark.dynamicAllocation.enabled 속성값을 true로 설정
  • 파일 기반 장기 데이터 저장소
    • 데이터를 바이너리 형태로 저장하려면 구조적 API를 사용하는 것이 좋음
    • CSV 같은 파일은 구조화되어 있는 것처럼 보이지만 파싱 속도가 아주 느리고 예외 상황이 자주 발생함
    • 파케이는 데이터를 바이너리 파일에 컬럼 지향 방식으로 저장함
    • 쿼리에서 사용하지 않는 데이터를 빠르게 건너뛸 수 있도록 몇 가지 통계를 함께 저장함
  • 분할 가능한 파일 포맷과 압축
    • 분할 가능한 포맷을 사용하면 여러 태스크가 파일의 서로 다른 부분을 동시에 읽을 수 있음
  • 테이블 파티셔닝
    • 테이블 파티셔닝은 데이터의 날짜 필드를 같은 키를 기준으로 개별 디렉터리에 파일을 저장하는 것을 의미함
    • 쿼리에서 컬럼을 기준으로 자주 필터링한다면 컬럼을 기준으로 파티션을 생성하는 것이 좋음
    • 파티셔닝을 사용하면 쿼리에서 읽어야 하는 데이터양을 크게 줄일 수 있어 쿼리를 훨씬 빠르게 처리할 수 있음
    • 파티셔닝을 할 때 너무 작은 단위로 분할하면 작은 크기의 파일이 대량으로 생성될 수 있음, 저장소 시스템에서 전체 파일의 목록을 읽을 때 오버헤드가 발생함
  • 버켓팅
    • 데이터를 버켓팅하면 스파크는 사용자가 조인이나 집계를 수행하는 방식에 따라 데이터를 사전 분할(pre-partition)할 수 있음
    • 데이터를 한두 개 파티션에 치우치지 않고 전체 파티션에 균등하게 분산시킬 수 있음
  • 파일 수
    • 데이터를 파티션이나 버켓으로 구성하려면 파일 수와 저장하려는 파일 크기도 고려해야함
    • 작은 파일이 많은 경우 파일 목록 조회와 파일 읽기 과정에서 부하가 발생함
    • 트레이드 오프를 감안해야함
    • 입력 데이터 파일이 최소 수십 메가바이트의 데이터를 갖도록 파일의 크기를 조정하는것이 좋음
  • 데이터 지역성(data locality)
    • 기본적으로 네트워크를 통해 데이터 블록을 교환하지 않고 특정 데이터를 가진 노드에서 동작할 수 있도록 지정하는 것을 의미함
  • 통계 수집
    • 스파크의 구조적 API를 사용하면 비용 기반 쿼리 옵티마이저가 내부적으로 동작함
    • 쿼리 옵티마이저는 입력 데이터의 속성을 기반으로 쿼리 실행 계획을 만듬
  • 셔플 설정
    • 스파크의 외부 셔플 서비스를 설정하면 머신에서 실행되는 익스큐터가 바쁜 상황에서도(예:가비지 컬렉션 수행) 원격 머신에서 셔플 데이터를 읽을 수 있으므로 성능을 높일 수 있음
    • 파티션 수가 너무 적으면 소수의 노드만 작업을 수행하기 때문에 데이터 치우침 현상이 발생함
    • 파티션 수가 너무 많으면 파티션을 처리하기 위한 태스크를 많이 실행해야 하므로 부하가 발생함
    • 셔플을 수행할 때는 결과 파티션당 최소 수십 메가바이트의 데이터가 포함되어야 함
  • 메모리 부족과 가비지 컬렉션
    • 스파크 잡 실행 과정 중에 익스큐터나 드라이버 머신의 메모리가 부족하거나 '메모리 압박(memory pressure)으로 인해 태스크를 완료하지 못할 수 있음
    • 애플리케이션 실행 중에 메모리를 너무 많이 사용한 경우
    • 가비지 컬렉션이 자주 수행되는 경우
    • JVM 내에서 객체가 너무 많이 생성되어 더 이상 사용하지 않는 객체를 가비지 컬렉션이 정리하면서 실행 속도가 느려지는 경우
  • 가비지 컬렉션 영향도 측정
    • 가비지 컬레션의 튜닝
      • 가비지 컬렉션의 발생 빈도와 소요 시간에 대한 통계를 모으는 것, spark.executor.extraJavaOptions 속성에 스파크 JVM 옵션으로 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 값을 추가해 통계를 모을 수 있음
      • 속성값을 설정한 다음 스파크 잡을 실행하면 가비지 컬렉션이 발생할 때매다 워커의 로그에 메시지가 출력됨
      • 로그는 드라이버가 아닌 워커 노드의 stdout 파일에 저장됨
  • 가비지 컬렉션 튜닝
    • 자바 힙 공간은 Young 영역과 Old 영역으로 나누어짐, Young 영역은 수명이 짧은 객체를 유지함, 반면 Old 영역은 오래 살아 있는 객체를 대상으로 함
    • Young 영역은 Eden, Survivor1, Survivor2 세 영역으로 다시 나뉨
    • 가비지 컬렉션 수행 절차
      • Eden 영역이 가득 차면 Eden 영역에 대한 마이너 가비지 컬렉션(minor garbage collection)이 실행됨, Eden 영역에서 살아남은 객체와 Survivor1 영역의 객체는 Survivor2 영역으로 복제됨
      • 두 Survivor 영역을 교체함
      • 객체가 아주 오래되었거나 Survivor2 영역이 가득 차면 객체는 Old 영역으로 옮겨짐
      • 마지막으로 Old 영역이 거의 가득 차면 풀 가비지 컬렉션(full garbage collection)이 발생함, 풀 가비지 컬렉션은 힙 공간의 모든 객체를 추적해 참조 정보가 없는 객체들을 제거함, 나머지 객체를 빈곳으로 옮기는 작업을 수행함, 풀 가비지 컬렉션은 가장 느린 가비지 컬렉션 연산임
    • 수명이 긴 캐시 데이터셋을 Old 영역에 저장함, Young 영역에서 수명이 짧은 모든 객체를 보관할 수 있도록 충분한 공간을 유지함
  • 직접적인 성능 향상 기법
    • 병렬화
      • spark.defaul.parallelism과 spark.sql.shuffle.partitions의 값을 클러스터 코어 수에 따라 설정함
      • 스테이지에서 처리해야할 데이터양이 매우 많다면 클러스터의 CPU 코어당 최소 2~3개의 태스크를 할당함
    • 향상된 필터링
    • 파티션 재분배와 병합
      • 파티션 재분배 과정은 셔플을 수반함, 클러스터 전체에 걸쳐 데이터가 균등하게 분배되므로 잡의 전체 실행 단계를 최적화함
      • 일반적으로 가능한 한 적은 양의 데이터를 셔플하는 것이 좋음
      • 셔플 대신 동일 노드의 파티션을 하나로 합치는 coalesce 메서드를 실행해 DataFrame이나 RDD의 전체 파티션 수를 먼저 줄옇야 함, 이보다 느린 repartition 메서드는 부하를 분산하기 위해 네트워크로 데이터를 셔플링함
      • 파티션 재분배는 조인이나 cache 메서드를 호출 시 매우 유용함
      • 파티션 재분배 과정은 부하를 유발하지만 애플리케이션의 전체적인 성능과 스파크 잡의 병렬성을 높일 수 있음을 기억해야함
    • 사용자 정의 파티셔닝
      • 잡이 여전히 느리게 동작하거나 불안정하다면 RDD를 이용한 사용자 정의 파티셔닝 기법을 적용함
    • 사용자 정의 함수(UDF)
    • 임시 데이터 저장소(캐싱)
      • 애플리케이션에서 같은 데이터셋을 계속해서 재사용한다면 캐싱을 사용해 최적화할 수 있음
      • 캐싱은 클러스터의 익스큐터 전반에 걸쳐 만들어진 임시 저장소(메모리나 디스크)에 DataFrame, 테이블 또는 RDD를 보관해 빠르게 접근할 수 있도록 함
      • 캐싱이 필요한 상황 : 스파크의 대화형 세션이나 스탠드얼론 애플리케이션에서 특정 데이터셋(DataFrame 또는 RDD)을 다시 사용하려 할 경우
      • 캐싱은 지연 연산, 데이터에 접근해야 캐싱이 일어남
DF1 = spark.read.format("csv")\
  .option("inferSchema", "true")\
  .option("header","true")\
  .load("/data/flight-data/csv/2015-summary.csv")
DF2 = DF1.groupBy("DEST_COUNTRY_NAME").count().collect()
DF3 = DF1.groupBy("ORIGIN_COUNTRY_NAME").count().collect()
DF4 = DF1.groupBy("count").count().coolect()
  
DF1.cache()
DF1.count()
- DataFrame의 cache 메서드를 호출하면 스파크는 최초 연산 시 데이터를 메모리나 디스크에 저장함, 다음 캐싱된 DataFrame을 사용하는 쿼리를 수행하면 원본 파일을 읽는 대신 메모리에 저장된 데이터를 참조함
- 캐싱은 지연 처리, 스파크는 DataFrame에 액션을 실행하는 시점에 데이터를 캐싱함
DF2 = DF1.groupBy("DEST_COUNTRY_NAME").count().collect()  
DF3 = DF1.groupBy("ORIGIN_COUNTRY_NAME").count().collect()
DF4 = DF1.groupBy("count").count().collect()
- 캐싱은 동일한 데이터를 여러 번 접근하는 반복적인 머신러닝 워크로드에도 매우 적합함
- 스파크의 cache 명령은 기본적으로 데이터를 메모리에 저장함
- 클러스터 전체 메모리가 가득 찼다면 데이터셋의 일부 데이터만 캐싱함
- 정교한 제어를 위해 persist 메서드를 사용함, persist 메서드는 데이터 캐시 영역(메모리, 디스크 또는 둘 다)을 지정하는 StorageLevel 객체를 파라미터로 사용함  
  • 조인
    • 동등 조인은 최적화하기 가장 쉬우므로 우선적으로 사용하는 것이 좋음
    • 조인 순서 변경은 내부 조인을 사용해 필터링하는 것과 동일한 효과를 누림, 안정성과 최적화를 위해 카테시안 조인이나 전체 외부 조인의 사용을 최대한 피해야 함
    • 데이터를 적절하게 버켓팅하면 조인 수행 시 거대한 양의 셔플이 발생하지 않도록 미리 방지할 수 있음
  • 집계
    • 집계 전에 충분히 많은 수의 파티션을 가질 수 있도록 데이터를 필터링하는 것이 최선의 방법
  • 브로드캐스트 변수

스트리밍

스트림 처리의 기초

  • 스트림 처리(stream processing)는 신규 데이터를 끊임없이 처리해 결과를 만들어내는 행위
  • 입력 데이터는 스트림 처리 시스템에 도착한 일련의 이벤트(예: 신용카드 전표 정보, 웹사이트 클릭, IoT 장비의 센서 데이터)
  • 연속형 애플리케이션(continuous application) : 스트리밍, 배치 그리고 대화형 작업으로 구성된 통합 애플리케이션
  • 스트림 처리의 장점
    • 대부분의 배치 애플리케이션은 직관적이며 유지 보수와 개발이 비교적 단순함, 배치 형태로 데이터를 처리하면 스트리밍 시스템보다 훨씬 높은 처리량을 얻을 수 있음
    • 스트림 처리는 대기 시간이 짧음
    • 자동으로 연산 결과의 증분을 생성하므로 반복적인 배치 작업보다 결과를 수정하는 데 더 효율적
  • 스트림 처리의 문제점
    • 애플리케이션 타임스탬프(이벤트 시간) 기준으로 순서가 뒤섞인 데이터 처리하기
    • 대규모의 상태 정보 유지하기
    • 높은 데이터 처리량 보장하기
    • 장애 상황에서도 정확히 한 번 처리하기
    • 부하 불균형과 뒤처진 서버 다루기
    • 이벤트에 빠르게 응답하기
    • 다른 저장소 시스템의 외부 데이터와 조인하기
    • 신규 이벤트 도착 시 출력 싱크와 갱신 방법 결정하기
    • 출력 시스템에 데이터 저장 시 트랜잭션 보장하기
    • 런타임에 비즈니스 로직 변경하기
  • 레코드 단위 처리와 선언형 API
    • 각 이벤트를 애플리케이션에 전달하고 사용자 코드에 반응하도록 만드는 것
    • 아파치 스톰(Apache Storm) 같은 기존의 여러 스트리밍 시스템에 사용했음, 애플리케이션이 데이터 처리와 관련된 완벽한 제어권을 가져야 하는 분야에서 중요한 위치를 차지하고 있음
    • 선언형(declarative) API 를 제공함, 애플리케이션을 정의할 때 어떻게 신규 데이터를 처리하고 장애 상황에서 복구할지 지정하는 대신 무엇을 처리할지 지정함
    • 스파크의 DStream API는 맵, 리듀스 그리고 필터 같은 연산을 기바능로 하는 함수형 API를 제공함, DStream API는 내부적으로 각 연산자의 데이터 처리량과 연산 관련 상태 정보를 자동으로 추적하고 관련 상태를 신뢰도 있게 저장함
  • 이벤트 시간과 처리 시간
    • 원천 시스템에서 각 레코드에 기록한 타임스탬프를 기반으로 데이터를 처리하는 방식을 의미
  • 연속형 처리와 마이크로 배치 처리
    • 연속형 처리 기반의 시스템을 구성하는 각 노드는 다른 노드에 전송하는 메시지를 끊임없이 수신하고 새로 갱신된 정보를 자신의 하위 노드로 전송함
  • 스파카의 스트리밍 API
    • DStream API
      • 제약사항
        • 많은 장점을 제공하는 구조적 테이블 개념의 DataFrame이나 Dataset과 달리 자바나 파이썬의 객체와 함수에 매우 의존적
        • DStream API는 기본저긍로 처리 시간을 기준으로 동작함
        • DStream API는 마이크로 배치 형태로만 동작함
        • DStream의 일부 API는 마이크로 배치의 주기를 참조하고 있어 다른 처리 모드를 지원할 수 없음
    • 구조적 스트리밍
      • 구조적 스트리밍은 스파크의 구조적 API를 기반으로 하는 고수준 스트리밍 API
      • 스칼라, 자바, 파이썬, R 그리고 SQL을 사용해 구조적 처리를 할 수 있는 모든 환경에서 사용함
      • 모든 윈도우 연산에서 자동으로 이벤트 시간 처리를 지원함
      • 구조적 스트리밍은 단순한 스트림 처리를 넘어 스트리밍, 배치 그리고 대화형 쿼리를 하나로 통합한 연속형 애플리케이션을 쉽게 만들 수 있도록 설계되었음

구조적 스트리밍의 기초

  • 구조적 스트리밍은 스파크 SQL 엔진 기반의 스트림 처리 프레임워크, 스파크의 구조적 API(DataFrame, Dataset, SQL)를 사용함
  • 연속형 애플리케이션은 스트리밍 작업, 배치 작업, 스트리밍과 오프라인 데이터의 조인 그리고 대화형 비정형 쿼리의 실행을 조합해 데이터에 실시간으로 반응하는 통합 빅데이터 처리 애플리케이션을 의미함
  • 입력 소스
    • 아파치 카프카 0.10 버전
    • HDFS나 S3 등 분산 파일 시스템의 파일(스파크는 디렉터리의 신규 파일을 계속해서 읽음)
    • 테스트용 소켓 소스
  • 싱크(sink) : 스트림의 결과를 저장할 목적지를 명시함, 싱크와 실행 엔진은 데이터 처리의 진행 상황을 신뢰도 있고 정확하게 추적하는 역할을 함
    • 출력용 싱크
      • 아파치 카프카 0.10
      • 거의 모든 파일 포맷
      • 출력 레코드에 임의 연산을 실행하는 foreach 싱크
      • 테스트용 콘솔 싱크
      • 디버깅용 메모리 싱크
  • 출력 모드(output mode)
    • 정적인(static) 형태의 구조적 API처럼 출력모드 지정
    • append : 싱크에 신규 레코드만 추가
    • update : 변경 대상 레코드 자체를 갱신
    • complete : 전체 출력 내용 재작성하기
  • 트리거
    • 출력 모드가 데이터 출력 방식을 정의한다면 트리거는 데이터 출력 시점을 정의함
    • 구조적 스트리밍에서 언제 신규 데이터를 확인하고 결과를 갱신할지 정의함
    • 구조적 스트리밍은 기본적으로 마지막 입력 데이터를 처리한 직후에 신규 입력 데이터를 조회해 최단 시간 내에 새로운 처리 결과를 만들어냄
  • 이벤트 시간 처리
    • 무작위로 도착한 레코드 내부에 기록된 타임스탬프를 기준으로 함
  • 이벤트 시간 데이터(event-time)
    • 이벤트 시간은 데이터에 기록된 시간 필드
    • 스파크는 데이터가 유입된 시간이 아니라 데이터 생성 시간을 기준으로 처리함, 데이터가 늦게 업로드되거나 네트워크 지연으로 데이터의 순서가 뒤섞인 채 시스템으로 들어와도 처리할 수 있음
  • 워터마크(watermark)
    • 시간 제한을 설정할 수 있는 스트리밍 시스템의 기능
    • 늦게 들어온 이벤트를 어디까지 처리할지 시간을 제한할 수 있음
  • 스트림 트랜스포메이션
    • 구조적 스트리밍은 DataFrame의 모든 함수와 개별 컬럼을 처리하는 선택과 필터링 그리고 단순 트랜스포메이션을 지원함
  • 입력과 출력
    • 파일 소스와 싱크
      • 스트리밍에서 파일 소스 / 싱크와 정적 파일 소스를 사용할 때 유일한 차이점은 트리거 시 읽을 파일 수를 결정함
      • maxFilesPerTrigger 옵션을 사용함
      • 모든 파일은 스트리밍 작업에서 바라보는 입력 디렉터리에 원자적으로 추가되어야 함
    • 카프카 소스와 싱크
      • 아파치 카프카는 데이터 스트림을 위한 발행-구독 방식의 분사형 시스템
      • 카프카는 메시지 큐 방식처럼 레코드의 스트림을 발행하고 구독하는 방식으로 사용함
      • 발행된 메시지는 내고장성을 보장하는 저장소에 저장됨, 카프카는 분산형 버퍼로 생각할 수 있음
    • 카프카 소스에서 메시지 읽기
      • assign : 토픽 뿐만 아니라 읽으려는 파티션까지 세밀하게 지정하는 옵션
      • subscribe , subscribePattern : 각각 토픽 목록과 패턴을 지정해 여러 토픽을 구독하는 옵션
      • 카프카 서비스에 접속할 수 있도록 kafka.bootstrap.servers 값을 지정함
      • startingOffsets 및 endingOffsets : 쿼리를 시작할 때 읽을 지점
      • fialOnDataLoss : 데이터 유실(예:토픽이 삭제되거나 오프셋이 범위를 벗어났을 때)이 일어났을 때 쿼리를 중단할 것인지 지정함
      • maxOffsetsPerTrigger : 특정 트리거 시점에서 읽을 오프셋의 전체 개수

이벤트 시간과 상태 기반 처리

  • 이벤트 시간 처리
    • 이벤트가 실제로 발생한 시간(이벤트 시간 : event time), 이벤트가 시스템에 도착한 시간 또는 처리된 시간(처리 시간 : processing time)
    • 이벤트 시간 : 데이터에 기록되어 있는 시간
    • 처리 시간 : 스트림 처리 시스템이 데이터를 실제로 수신한 시간
  • 워터마크로 지연 데이터 제어하기
    • 워터마크는 특정 시간 이후에 처리에서 제외할 이벤트나 이벤트 집합에 대한 시간 기준, 지연 도착 현상은 네트워크 지연이나 장비의 연결 실패 또는 다른 여러가지 원인으로 인해 발생함
  • 스트림에서 중복 데이터 제거하기
    • 중복을 찾기 위해서는 여러 레코드를 반드시 한 번에 처리함
    • 한 번에 많은 레코드를 처리해야 하므로 중복 제거는 처리 시스템에 큰 부하를 발생함
    • 구조적 스트리밍은 태생적으로 최소 한 번(at-leat-once) 처리하는 방식을 제공하는 메시지 시스템을 쉽게 사용할 수 있음
    • 처리시에 키를 기준으로 중복을 제거해 정확히 한 번 처리 방식을 지원함
    • 스파크는 데이터 중복을 제거하기 위해 사용자가 지정한 키를 유지하면서 중복 여부를 확인함
  • 임의적인 상태 기반 처리
    • mapGroupsWithState API : 데이터의 각 그룹에 맵 연산을 수행하고 각 그룹에서 최대 한 개의 로우를 만들어냄
    • flatMapGroupsWithState API : 데이터의 각 그룹에 맵 연산을 수행하고 각 그룹에서 하나 이상의 로우를 만들어냄
    • 타임 아웃 : 중간 상태를 제거하기 전에 기다려야 하는 시간을 정의함
      • state.hasTimeOut 값이나 valus 이터레이터가 비어 있는지 확인하는 방식으로 타입아웃 정보를 얻을 수 있음
    • 세션 : 발생한 일련의 이벤트를 가진 지정되지 않은 시간 윈도우(unspecified time windows)

운영 환경에서의 구조적 스트리밍

  • 내고장성과 체크포인팅
    • 클러스터 머신에 문제가 생길 수도 있고 마이그레이션 실수로 스키마가 변경될 수도 있음, 클러스터나 애플리케이션을 재시작해야함
    • 체크포인팅과 WAL 정보를 저장하는 신뢰도 높은 파일 시스템(예: HDFS, S3 또는 모든 호환 가능한 파일 시스템)의 체크포인트 경로를 쿼리에 설정
  • 애플리케이션 변경하기
    • 체크포인팅은 현재까지 처리한 스트림과 모든 중간 상태를 저장함
  • 유입률과 처리율
    • 유입률(input rate) : 입력 소스에서 구조적 스트리밍 내부로 데이터가 유입되는 양
    • 처리율(processing rate) : 유입된 데이터를 처리하는 속도
    • 유입률이 처리율보다 훨씬 큰 경우 : 스트림이 뒤처지므로 클러스터의 규모를 늘려 더 많은 데이터를 처리함
  • 스파크 UI
    • 스파크 UI에서 구조적 스트리밍과 관련된 잡, 태스크 그리고 처리 메트릭을 확인할 수 있음
    • 각각의 스트리밍 애플리케이션은 스파크 UI에서 트리거마다 생성된 짧은 잡이 누적된 형태로 나타남, 애플리케이션의 메트릭, 쿼리 실행 계획, 태스크 주기 그리고 로그 정보도 제공함

고급 분석과 머신러닝

  • 머신러닝
    • 다양한 특징(feature)을 기바능로 각 샘플에 부여된 레이블(label)을 예측하는 분류/회귀 문제를 포함한 지도 학습
    • 사용자의 과거 행동에 기반하여 제품을 제안하는 추천 에닌
    • 군집 분석, 이상징후 탐지, 토픽 모델링(topic modeling)과 같이 데이터 구조를 파악하기 위한 비지도 학습
    • 소셜 네트워크상에서 유의미한 패턴을 찾는 그래프 분석
  • 지도 학습
    • 레이블(또는 종속변수)을 포함하는 과거 데이터를 사용하여 모델을 학습시킨 후 데이터 포인트(또는 데이터)의 다양한 특징을 기반으로 해당 레이블값을 예측하는 것
  • 분류
    • 범주형(불연속적이고 유한한 값의 집합) 종속변수를 예측하는 알고리즘을 학습시키는 행위
  • 비지도 학습
    • 이상징후 탐지 : 시간의 흐름에 따라 주기적으로 발생하는 표준 이벤트가 있는 곳에서 예외적으로 비표준 이벤트가 발생하는 경우 이를 보고해야함
    • 사용자 세분화 : 다양한 사용자의 행동이 데이터로 주어진다면 특정 사용자가 다른 사용자와 어떤 속성이 유사한지 더 잘 이해
    • 토픽 모델링(문서 주제 추출) : 문서가 데이터로 주어졌을 때 문서에 나타난 다양한 단어를 분석하여 단어 간에 내재된 연관성이 있는지 살펴볼 수 있음
  • 그래프 분석
    • 객체를 가리키는 정점(vertex)과 해당 객체 간의 관계를 나타내는 에지(edge)를 지정하는 구조에 대한 연구
    • 정점은 사람과 상품, 에지는 구매
    • 사기거래 예측
    • 이상징후 탐지 : 각 개체의 네트워크가 서로 어떻게 연결되어 있는지 살펴봄으로써 담당자가 추가로 조사할 특잇값 및 이상값을 식별함
    • 네트워크 특성 분류 : 네트워크 내의 특정 정점에 관한 몇 가지 특성이 주어지면, 이미 존재하는 노드와의 연결 구조에 따라 다른 정점을 분류할 수 있음
  • MLib
    • 스파크에 내장된 패키지로서 데이터 수집과 정제, 특징 추출과 선택, 대규모 데이터를 대상으로 한 지도 및 비지도 머신러닝 모델의 학습과 튜닝, 그리고 이러한 모델을 운영 환경에서 사용할 수 있도록 도와주는 인터페이스를 제공함
    • 단일 머신을 기반으로 머신러닝을 수행할 수 있는 도구는 매우 다양하고 사용자가 선택할 수 있는 좋은 옵션이 많이 들어 있음, 하지만 이러한 도구들은 학습할 수 있는 데이터 크기나 처리시간 면에서 분명한 한계 존재
    • 스파크로 데이터를 전처리하고 특징을 생성하여 대량의 데이터로부터 학습 및 테스트를 위한 데이터샛 생성에 걸리는 시간을 줄인 후 단일 머신 기반의 학습 라이브러리를 활용하여 주어진 데이터셋을 학습함
    • 입력 데이터나 모델 크기의 단일 머신에 올려놓기 너무 어렵거나 불편할 경우 스파크를 사용하여 분산 처리 기반의 머신러닝을 간단히 수행할 수 있음
  • 고수준 MLib의 개념
    • 변환자 : 원시 데이터를 다양한 방식으로 변환하는 함수
      • 새로운 상호작용 변수를 생성하거나, 컬럼을 정규화하거나, Integer를 모델에 입력하기 위해서 Double 타입으로 변경하는 것 등이 될 수 있음
      • 문자열로 입력된 범주형 변수를 MLlib에서 인식할 수 있는 수치형 변숫값으로 변환하는 작업이 변환자의 역할에 해당하며, 주로 데이터 전처리 및 피처 엔지니어링 과정에 사용됨
      • 변환자는 특징 수를 줄이거나, 더 많은 특징을 추가하거나, 현재 도출된 특징을 조작하거나, 단순히 데이터를 적절히 구성하는데 사용됨
      • MLlib을 사용할 때 스파크에서 제공하는 머신러닝 알고리즘의 모든 입력변수는 Double 타입(레이블용)이나 Vector[Double] 타입(특징용)으로 구성되어야함
    • 추정자
      • 데이터를 초기화하는 일종의 변환자, 수치형 데이터를 정규화하려면 컬럼의 현잿값 정보를 사용하여 변환을 초기화해야함
      • 스파크에서는 사용자가 데이터로부터 모델을 학습시키기 위해 사용하는 알고리즘 역시 추정자
    • 평가기
      • 주어진 모델의 성능이 수신자 조작 특성(receiver operating characteristic,ROC)곡선 처럼 지정한 기준에 따라 어떻게 작동하는지 볼 수 있게 해줌
  • 하이퍼 파라미터 : 모델 아키텍처 및 일반화(regularization)와 같은 학습 프로세스에 영향을 주는 설정 매개변수
  • 표준화(standardization) : 평균을 기준으로 관측값들이 얼마나 떨어져 있는지 재표현하는 방법, 수치 데이터 표준화(z-transformation), 카테고리 데이터 표준화(원-핫 인코딩), 서수 데이터 표준화(value - 0.5 / max value)가 있음
  • 정규화(normalization) : 데이터의 범위를바꾸는 방법,모든 값을 0에서 1 사이의 값으로 재표현(min-max scale)하는 과정
  • 일반화(regularization) : 모델 과적합을 방지하기 위한 기법으로 모델의 표현식에 추가적인 제약 조건을 걸어 모델이 필요 이상으로 복잡해지지 않도록 조정해주는 방법, 복잡한 커브를 많이 가지고 있는 가설함수를 가능한 한 부드럽고 단순하게 만들어주는 기법, 리지 회귀, 라쏘, 엘라스틱넷, 최솩ㄱ회귀(Least-Angle regression,LAR) 등이 대표적인 일반화 알고리즘

데이터 전처리 및 피처 엔지니어링

  • 대부분의 분류 및 회귀 알고리즘의 경우 데이터를 Double 타입의 컬럼으로 가져와서 레이블을 표시하고, Vector 타입(밀도가 높거나 희소한)의 컬럼을 사용하여 특징을나타내야 함
  • 변환자는 인수로 DataFrame을 받고, 새로운 DataFrame을 반환하는 함수
  • 변환자
    • 다양한 방식으로 원시 데이터를 변환시키는 함수, 새로운 상호작용 변수를 생성(두 개의 다른 변수로)하거나 컬럼을 정규화하거나 모델에 입력하기 위해 변수를 Double 타입으로 변환시키는 기능을 하며, 주로 데이터 전처리 또는 특징 생성을 위해 사용됨
  • 추정자
    • 수행하려는 변환이 입력 컬럼에 대한 데이터 또는 정보로 초기화되어야 할 때 필요함
  • 주성분 분석(Principal Components Analysis, PCA)
    • 데이터의 가장 중요한 측면(주로 구성 요소)을 찾는 수학적 기법
    • PCA는 데이터의 주요 정보를 포함하는 더 작은 특징 집합을 생성할 수 있다는 강점
    • 대규모 입력 데이터셋에서 총 특징 수를 줄이기 위해 사용됨

분류

  • 활용 사례
    • 신용리스크 예측 : 금융회사는 회사나 개인에게 대출을 제공하기 전에 여러 가지 변수(사용자 정보)를 고려함, 대출을 제공할지 여부는 이진 분류 문제
    • 뉴스기사 분류 : 알고리즘을 학습시켜 뉴스기사의 주제(스포츠,정치,비즈니스 등)를 예측할 수 있음
    • 사용자 행위 분류 : 휴대전화의 가속도계 또는 스마트시계와 같은 센서에서 데이터를 수집하여 사용자의 활동을 예측할 수 있음
  • 램덤 포레스트 : 단순히 많은 트리를 학습시킨 다음 각 트리의 응답을 평균화하여 최종 예측함
  • 그래디언트 부스티드 트리 : 많은 트리를 학습시키지만 랜덤 포레스트와는 다르게 개별 트리를 학습할 때 별도의 가중치가 부여됨
  • 나이브 베이즈
    • 베이즈의 정리에 기반한 분류기의 모음
    • 데이터의 모든 특징이 서로 독립적
    • 문서 내 용어의 존재를 지시변수(indicator varialbe)를 활용하여 나타내는 다변량 베르누이 모델
    • 용어의 총 수를 변수로 활용하는 다항 모델
    • 모든 입력 특징이 음수가 아니어야함

추천

  • 추천을 위한 알고리즘으로 교차최소제곱(Alternating Least Square, ALS)을 제공함
    • 협업 필터링(collaborative filtering) 기술을 활용하여 사용자가 과거에 상호작용한, 즉 구매하거나 관심 있어 했던 아이템을 기반으로 추천함
    • 사용자 또는 아이템에 대한 추가적인 특징이 필요하지 않음
    • 장바구니 분석을 위해 연관 규칙을 찾아내는 빈발 패턴 마이닝(Frequent Pattern Mining)을 제공함