Spark의 저수준 API의 종류로는 RDD, 분산형 공유 변수(어큐뮬레이터, 브로드캐스트 변수)가 있다.
이는 모두 분산형 공유변수로 뒤에서 조금 더 자세히 알아보자.
대부분의 상황에서는 구조적 API를 사용하는 것이 좋으나 물리적 데이터의 배치를 아주 세밀하게 제어하는 경우나 RDD를 사용해 개발된 기존 코드를 유지해야 하는 경우 등 비즈니스나 기술적 문제를 처리하지 못하는 경우에는 저수준 API를 사용하여야 한다.
때문에 이번 포스팅에서는 저수준 API에 대해 알아보자.
먼저, 저수준 API를 사용하기 위해서는 SparkContext를 사용해야한다.
SparkContext는 Sparksession을 이용해 접근할 수 있는데, 자세한 건 뒤에서 설명하도록 하겠다.
[RDD(Resilient Distributed Dataset)]
RDD(Resilient Distributed Dataset)는 대부분 DataFrame API에서 최적화된 물리적 실행 계획을 만드는 데 사용된다.
사용자는 '제네릭RDD'와 '키-값 RDD'의 두 가지 형태의 RDD를 만들 수 있다.
RDD의 중요한 속성으로는 다음과 같다.
- 파티션의 목록
- 각 조각을 연산하는 함수
- 다른 RDD와의 의존성 목록
- 부가적으로 키-값 RDD를 위한 Partitioner
- 부가적으로 각 조각을 연산하기 위한 기본 위치 목록
이러한 속성이 사용자 프로그램을 스케줄링하고 실행하는 스파크의 모든 처리 방식을 결정한다.
또한 RDD역시 분산 환경에서 데이터를 다루는 데 필요한 트랜스포메이션과 액션 연산을 제공하며, DataFrame, Dataset과 동일한 방식으로 동작한다.
DataFrame과 Dataset과는 다른점은 '로우'라는 개념이 없다는 것이다. 또한 Dataset은 구조적 API가 제공하는 풍부한 기능과 최적화 기법을 제공하지만 RDD는 직접 함수와 로직을 구현해야한다는 점이 가장 큰 차이점이다.
즉, RDD의 레코드는 JAVA, Scala, Python의 객체일 뿐이며 구조적 API에서 제공하는 여러 함수를 사용하지 못하기 때문에 수동으로 처리해야 한다.
그럼 RDD를 사용하는 방법들에 대해 알아보자.
▶ RDD생성
RDD를 생성하는 방법은 총 3가지 방법이다. 각각의 방법에 대해 살펴보자.
1. DataFrame, Dataset으로 RDD 생성
RDD생성 방법 중 가장 쉬운 방법이며 기존에 사용하던 DataFrame이나 Dataset을 이용하는 방법이다.
기존 Dataset[T] 의 rdd메서드를 호출하면 쉽게 RDD로 변환할 수 있으며 이는 데이터 타입 T를 가진 RDD를 얻을 수 있다.
# Dataset[Long]을 RDD[Long]으로 변환
spark.range(500).rdd
다음 코드는 Dataset을 RDD로 변환하는 코드이다.
파이썬에는 DataFrame만 존재하기 때문에 Dataset을 사용할 수 없다.
즉, Row타입의 RDD를 얻게 된다.
# 파이썬 코드
spark.range(10).rdd
위 예제에서 만들어진 데이터를 처리하려면 Row객체를 올바른 데이터 타입으로 변환하거나 Row객체에서 값을 추출해야한다.
# 스칼라 코드
spark.range(10).toDF().rdd.map(rowObject => rowObject.getLong(0))
# 파이썬 코드
spark.range(10).toDF("id").rdd.map(lambda row: row[0])
위의 코드를 통해 Row타입을 가진 RDD가 반환된다.
RDD를 통해 DataFrame이나 Dataset을 생성할 때도 동일한 방법을 사용한다.
RDD의 toDF 메서드를 호출하기만 하면 된다.
# 스칼라 코드
spark.range(10).rdd.toDF()
# 파이썬 코드
spark.range(10).rdd.toDF()
rdd메서드는 Row 타입을 가진 RDD를 생성하며 Row타입은 스파크가 구조적 API에서 데이터를 표현하는 데 사용하는 내부 카탈리스트 포맷이다.
이러한 기능을 사용하면 상황에 따라 구조적 API와 저수준 API사이를 오고가게 만들 수 있다.
2. 로컬 컬렉션으로 RDD 생성
컬렉션 객체를 RDD로 만들려면 SparkSession안에 있는 sparkContext의 parallelize메서드를 호출해야 한다.
이 메서드는 단일 노드에 있는 컬렉션을 병렬 컬렉션으로 전환한다.
또한 파티션 수를 지정할 수 있다.
# 스칼라 코드
val myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
val words = spark.sparkContext.parallelize(myCollection, 2)
# 파이썬 코드
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
words = spark.sparkContext.parallelize(myCollection, 2)
위의 코드는 두 개의 파티션을 가진 병렬 컬렉션 객체를 만드는 예제이다.
후에 RDD에 이름을 지정하면 스파크 UI에 지정한 이름으로 RDD가 표시된다.
# 스칼라 코드
words.setName("myWords")
words.name
# 파이썬 코드
words.setName("myWords")
words.name()
3. 데이터소스로 RDD 생성
sparkContext를 사용해 데이터를 RDD로 읽을 수 있다.
spark.sparkContext.textFile("/some/path/withTextFiles")
위의 코드는 여러 텍스트 파일의 각 줄을 레코드로 가진 RDD를 생성하는 예제이다.
spark.sparkContext.wholeTextFiles("/some/path/withTextFiles")
이 코드는 개별 레코드로 처리하는 예제이다.
생성된 RDD에서 파일명은 첫 번째 객체인 RDD의 키가 되며, 텍스트 파일의 값은 두 번째 문자열 객체인 RDD값이 된다.
▶ RDD 다루기
RDD를 다루는 방법은 DataFrame과 매우 유사하다.
또한 키-값 형태로 다룰 수도 있으며 사용자가 직접 파티셔닝이 가능하다.
RDD를 다루는 함수는 대표적으로 map, filter, join 등이 있다.
RDD를 다루는 예제는 굉장히 많기 때문에 넘어가고, 중요한 사용자 정의 파티셔닝에 대해 살펴보자.
사용자 정의 파티셔닝(custom partitioning)은 RDD를 사용하는 가장 큰 이유 중 하나다.
논리적인 대응책을 가지고 있지 않으므로 구조적 API에서는 사용자 정의 파티셔너를 파라미터로 사용할 수 없다.
사용자 정의 파티셔닝의 유일한 목표는 데이터 치우침(skew)같은 문제를 피하고자 클러스터 전체에 걸쳐 데이터를 균등하게 분배하는 것이다.
사용자 정의 파티셔너를 사용하려면 구조적 API로 RDD를 얻고 사용자 정의 파티셔너를 적용한 다음 다시 DataFrame이나 Dataset으로 변환해야 한다.
이 방법은 필요시에만 사용자 정의 파티셔닝을 사용할 수 있으므로 구조적 API와 RDD의 장점을 모두 활용할 수 있다.
먼저 사용자 정의 파티셔닝을 사용하려면 Partitioner를 확장한 클래스를 구현해야 한다.
문제에 대한 업무 지식을 충분히 가지고 있는 경우에만 사용하도록 하며, 단일 값이나 다수 값을 파티셔닝해야 한다면 DataFrame API를 사용하는 것이 좋다.
val df = spark.read.option("header", "true").option("inferSchema","true").csv("/data/retail-data/all/")
val rdd = df.coalesce(10).rdd
다음과 같이 rdd를 구성한 후 예제를 실행한다.
import org.apache.spark.HashPartitioner
rdd.map(r => r(6)).take(5).foreach(println)
# 결과
17850
17850
17850
17850
17850
val keyedRDD = rdd.keyBy(row => row(6).asInstanceOf[Int].toDouble)
keyedRDD.partitionBy(new HashPartitioner(10)).take(10)
HashParitioner와 RangePartitioner는 RDD API에서 사용할 수 있는 내장형 파티셔너이다.
각각 이산형과 연속형을 다룰 때 사용하며 구조적 API와 RDD 모두 사용이 가능하다.
HashParitioner와 RangePartitioner는 유용하지만 매우 기초적인 기능을 제공한다.
때문에 매우 큰 데이터나 심각하게 치우친 키를 다뤄야 한다면 고급 파티셔닝 기능을 사용해야 한다.
이러한 현상을 키 치우침이라고 하며 이는 어떤 키가 다른 키에 비해 아주 많은 데이터를 가지는 현상을 의미한다.
이러한 경우 병렬성을 개선하고 실행 과정에서 OutOfMemoryError를 방지할 수있도록 키를 최대한 분할해야 한다.
import org.apache.spark.Partitioner
class DomainPartitioner extends Partitioner {
def numPartitions = 3
def getPartition(key: Any): Int = {
val customerId = key.asInstanceOf[Double].toInt
if (customerId == 17850.0 || customerId == 12583.0) {
return 0
} else {
return new java.util.Random().nextInt(2) + 1
}
}
}
keyedRDD
.partitionBy(new DomainPartitioner).map(_._1).glom().map(_.toSet.toSeq.length)
.take(5)
위의 코드를 실행하면 각 파티션 수를 확인할 수 있으며 데이터를 임의로 분산하였으므로 마지막 두 숫자가 크게 다를 수 있다.
하지만 동일한 원칙이 적용된다.
사용자 정의 키 분배 로직은 RDD수준에서만 사용할 수 있으며 사용자 정의 키 분배 로직은 임의로 로직을 사용해 물리적인 방식으로 클러스터에 데이터를 분배하는 강력한 방법이다.
마지막으로 병렬화 대상인 모든 객체나 함수는 직렬화 할 수 있어야 한다.
그러므로 직렬화에 대해 살펴보자.
class SomeClass extends Serializable {
var someValue = 0
def setSomeValue(i:Int) = {
someValue = i
this
}
}
sc.parallelize(1 to 10).map(num => new SomeClass().setSomeValue(num))
위의 기본 직렬화 기능은 매우 느릴 수 있으며 Kryo라이브러리를 사용해 약 10배 이상 성능이 좋게 사용이 가능하다.
이는 뒤의 포스팅에서 더 자세히 알아보겠다.
[분산형 공유 변수]
스파크의 두번째 저수준 API인 분산형 공유 변수에 대해 알아보자.
분산형 공유 변수에는 브로드캐스트 변수와 어큐뮬레이터라는 두 개의 타입이 존재한다.
이는 클러스터에서 실행할 때 특별한 속성을 가진 사용자 정의 함수에서 사용이 가능하다.
특히 어큐뮬레이터를 사용하면 모든 태스크의 데이터를 공유 결과에 추가할 수 있다.
반면 브로드캐스트 변수를 사용하면 모든 워커 노드에 큰 값을 저장하므로 재전송 없이 많은 스파크 액션에서 재사용할 수 있다.
이번 포스팅에서 분산형 공유 변수 타입이 만들어지게 된 계기와 사용 방법에 대해 알아보자.
1. 브로드캐스트 변수
브로드캐스트 변수는 변하지 않는 값(불변성 값)을 클로저(closure)함수의 변수로 캡슐화하지 않고 클러스터에서 효율적으로 공유하는 방법을 제공한다.
태스크에서 드라이버 노드의 변수를 사용할 때는 클로저 함수 내부에서 단순하게 참조하는 방법을 사용한다.
하지만 이 방법은 비효율적인데, 룩업 테이블이나 머신러닝 모델 같은 큰 변수를 사용하는 경우에는 더 그렇다.
그 이유는 클로저 함수에서 변수를 사용할 때 워커 노드에서 여러 번(태스크당 한 번) 역직렬화가 일어나기 때문이다.
게다가 여러 스파크 액션과 잡에서 동일한 변수를 사용하면 잡을 실행할 때마다 워커로 큰 변수를 재전송한다.
이런 상황에서는 브로드캐스트 변수를 사용해야 한다.
브로드캐스트 변수는 모든 태스크마다 직렬화하지 않고 클러스터의 모든머신에 캐시하는 불변성 공유 변수이다.
다음 그림처럼 익스큐터 메모리 크기에 맞는 조회용 테이블을 전달하고 함수에서 사용하는 것이 대표적인 예이다.
다음 예제를 통해 브로드캐스트 사용 방법을 알아보자.
val myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
val words = spark.sparkContext.parallelize(myCollection, 2)
val supplementalData = Map("Spark" -> 1000, "Definitive" -> 200, "Big" -> -300, "Simple" -> 100)
다음과 같이 단어나 값의 목록을 가지고 있으며 이 구조체를 suppBroadcast변수를 이용해 브로드캐스트 참초한다.
val suppBroadcast = spark.sparkContext.broadcast(supplementalData)
words.map(word =>( word, suppBroadcast.value.getOrElse(word, 0))).sortBy(wordPair => wordPair._2).collect()
suppBroadcast변수의 value 메서드를 사용해 브로드캐스트된 supplementalData값을 참조할 수 있다.
value메서드는 직렬화된 함수에서 브로드캐스트된 데이터를 직렬화하지 않아도 접근이 가능하다.
스파크는 브로드캐스트 기능을 이용해 데이터를 보다 효율적으로 전송하므로 직렬화와 역직렬화에 대한 부하를 크게 줄일 수 있다.
위의 데이터를 맵 연산으로 키-값 쌍 데이터를 생성하고(값이 비어있으면 0으로 치환) RDD로 변환한다.
이처럼 브로드캐스트 변수를 사용하면 클로저에 담아 전달하는 방식보다 훨씬 더 효율적으로 사용이 가능하다.
이는 데이터의 총량과 익스큐터의 수에 따라 다를 수 있으며 아주 작은 데이터를 작은 클러스터에서 돌린다면 차이가 없을 수 있다.
하지만 훨씬 큰 크기의 데이터를 사용하는 경우라면 데이터를 직렬화하는데 매우 큰 부하가 발생하기 때문에 브로드캐스트 변수를 사용하는 것이 좋다.
그리고 UDF나 Dataset에서도 사용할 수 있으며 동일한 효과를 얻을 수 있다.
2. 어큐뮬레이터
어큐뮬레이터는 스파크의 두 번째 공유 변수 타입으로 트랜스포메이션 내부의 다양한 값을 갱신하는 데에 사용된다.
그리고 내고장성을 보장하면서 효율적인 방식으로 드라이버에 값을 전달할 수 있다.
어큐뮬레이터는 스파크 클러스터에서 로우 단위로 안전하게 값을 갱신할 수 있는 변경 가능한 변수를 제공하며 디버깅용이나 저수준 집계 생성용으로 사용이 가능하다.
또한 결합성과 가환성을 가진 연산을 통해서만 더할 수 있는 변수이므로 병렬 처리 과정에서 효율적으로 사용할 수 있다.
때문에 카운터나 합계를 구하는 용도로 사용할 수 있으며 스파크는 기본적으로 수치형 어큐뮬레이터를 지원한다.
(사용자 정의 어큐뮬레이터를 만들어 사용할 수도 있다.)
어큐뮬레이터의 값은 액션을 처리하는 과정에서만 갱신되며, 스파크는 태스크당 한 번만 갱신하도록 제어한다.
따라서 재시작한 태스크는 갱신이 불가능하고 트랜스포메이션에서 재처리하는 경우 갱신 작업이 두 번 이상 적용될 수 있다.
어큐뮬레이터는 스파크의 지연 연산 모델에 영향을 주지 않는다.
실제로 수행된 시점, 즉 액션을 실행하는 시점에 딱 한번만 값을 갱신한다.
따라서 map 함수 같은 지연 처리 형태의 트랜스포메이션에서 어큐뮬레이터 갱신 작업을 수행하는 경우 실제 실행 전까지는 어큐뮬레이터가 갱신되지 않는다.
어큐뮬레이터의 이름은 선택적으로 지정 가능하며, 실행 결과는 스파크 UI에 표시된다.
만약 이름을 지정하지 않았을 경우에는 스파크 UI에 표시되지 않으니 주의하자.
'SEMINAR > 스파크 완벽 가이드' 카테고리의 다른 글
1-2. 스파크 간단히 살펴보기 (0) | 2021.07.10 |
---|---|
1-1. Apache Spark란 (0) | 2021.07.07 |