Tag Archives: hadoop

하둡 맵 리듀스 프로그래밍 발표 자료

하둡 MR 프로그래밍 발표자료다. 아마 예전 하둡 스터디 그룹에 참여할 때 썼던 글인 듯 하다. 지금은 큰 효용이 없겠지만 버리기 아까워서 다시 정리해 둔다.

맵리듀스의 등장

하둡 파일 시스템에 저장되어 있는 방대한 데이터를 분석하려면, 분산 환경에서 병렬처리르 지원하는 프레임워크가 필요하다. 맵리듀스(MapReduce)는 신뢰할 수 없는 컴퓨터로 구성된 클러스터 환경에서 페타바이트 이상의 대용량 데이터를 병렬로 처리하기 위한 일종의 함수형 프로그래밍 방법이다.

구글에서 발표한 맵리듀스 논문을 기반으로, 하둡 프로젝트 내에서 맵리듀스 프레임워크를 개발했다.

설치 모드

하둡은 독립 모드, 의사 분산 모드, 완전 분산 모두 총 3가지 모드로 설치 가능하다.

독립 모드(Standalone Mode, Local Mode)

하둡 프로세스가 데몬으로 동작하지 않는다. 모든 프로세스가 단일 JVM 내에서 실행된다. 테스트/디버깅이 편리하다. 독립 모드는 개발자 PC에서 맵리듀스 프로그램을 동작하기에 적합하다.

의사 분산 모드(Pseudo Distributed Mode)

하둡 데몬 프로세스가 로컬 컴퓨터에서 동작한다. 작은 규 모의 클러스터를 시뮬레이션하는 목적으로 사용한다.

완전 분산 모드(Fully Distributed Mode)

실제 운영환경에서 사용하는 모드다.

여기에서는 하둡을 의사 분산 모드로 설치한다.

의사 분산 모드로 하둡 실행하기

설치하기

아파치 하둡 릴리즈 페이지에서 안정화된 배포판을 다운로드 받는다. 여기에서는 hadoop-0.20.2.tar.gz를 다운로드 받았다.

로컬 파일 시스템의 적당한 위치에 압축을 푼다.

이렇게 하둡이 설치된 디렉토리를 HADOOP_INSTALL이라고 부르자.

다음으로, HADOOP_INSTALL/conf/hadoop-env.sh 파일을 열어, JAVA_HOME 변수를 아래와 같이 설정한다.

필수사항은 아니지만 하둡 관련 명령어를 쉽게 실행하기 위해 하둡 설치 디렉토리(HADOOP_INSTALL)를 환경 변수에 설정하고, PATH를 설정한다. 여기에서는 .bashrc 파일에 설정했다.

설정한 환경변수를 반영한다.

아래와 같이 하둡 명령어를 실행해서 제대로 설치죄었는지 확인해 볼 수 있다.

환경 설정하기

하둡은 독립 실행 모드(Standalone Mode), 의사 분산 모드(Pseudo-Distributed Mode), 완전 분산 모드(Fully Distributed Mode)로 실행할 수 있다. 여기에서는 의사 분산 모드로 실행하고자 한다.

하둡은 XML 파일을 사용해서 설정하며, 설정할 파일은 총 3가지다.

  • conf/core-site.xml
    하둡 컴포넌트의 공통적인 속성을 정의한다.
  • conf/hdfs-site.xml
    하둡 파일 시스템과 관련된 설정을 한다.
  • conf/mapred-site.xml
    하둡 맵리듀스와 관련된 설정을 한다.

먼저 core-site.xml부터 설정하자.

fs.default.name에 하둡 파일 시스템의 네임노드의 호스트와 포트를 설정한다. 여기에서는 기본 포트인 8020이 아니라 9000을 포트로 설정했다.

다음으로 hdfs-site.xml을 설정한다.

의사 분산 모드이므로 복사본을 1개를 만든다. 따라서 dfs.replication를 1로 설정한다.

마지막으로 mapred-site.xml을 설정한다.

SSH 설정하기

의사 분산 모드/완전 분산 모드에서는 모두 데몬 프로세스를 실행하므로, 원격 서버의 명령을 수행할 수 있도록 ssh를 설정해야 한다. ssh가 설치되지 않은 경우 머저 ssh를 설치하도록 한다. ssh 명령어가 실행가능하더라도 ssh 데몬이 실행되어 있지 않으면 SSH을 설정할 수 없으므로 아래와 같이 ssh를 다운받아서 설치한다.

아래와 같은 명령어를 실행행서 성공한다면 제대로 설정한 것이다.

아래와 같이 22번 포트에 접속할 수 없다는 에러메시지가 나는 경우

를 실행한다. 만약 아무런 결과도 나오지 않는다면 , sshd이 설치되지 않았거나, 실행중이 아닌 경우다.

실행하기

하둡 파일 시스템을 사용하려면 먼저 네임 노드를 포맷해야 한다. 아래의 명령어를 실행한다.

하둡 파일 시스템과 맵리듀스 데몬을 아래와 같이 구동한다.

구동한 데몬을 중지하려면 아래의 명령어를 실행한다.

데몬이 제대로 실행되었는지 확인하는 방법에는 여러가지가 있다. 그 중에서도 웹 UI를 통해 확인할 수도 있다. 네임노드는 http://localhost:50070/에서, 잡트래커는 http://localhost;50030/에서 확인할 수 있다.

mr_web_gui

맵리듀스 프로그래밍 모델

맵리듀스는 대용량 데이터를 병렬로 처리하기 위한 소프트웨어 프로그램이며, 많은 저가의 장비로 구성된 클러스터에서 페타바이트 이상의 데이터를 처리하는 프로그래밍 모델이기도 하다.

  • 맵 함수의 입력으로, 키 k1과 값 v1이 전달된다
  • 맵 함수의 출력으로, 새로운 키 k2와 값 v2의 리스트를 생성한다.
  • 맵 함수가 반복적으로 수행되며, 그에 따라 여러개의 출력 데이터가 생성된다.
  • 맵 함수의 출력 데이터를 키를 기준으로 정렬하여, 각 키에 대해 여러 개의 값 데이터가 생성된다.
  • 리듀스의 입력으로, 키 k2와 값 데이터 v2의 리스트가 전달된다.
  • 리듀스는 출력으로, 값 v3의 리스트를 출력한다.

사례: WordCount

문서의 단어별 개수를 세는 프로그램은 아래와 같은 프로그래밍 모델을 따른다.

mr_wordcount_process

맵리듀스 프레임워크의 역할

위의 WordCount 사례에서 나타나듯이 이러한 유형의 프로그램은 아래와 같은 공통적인 기능을 포함한다.

  • 입력 파일을 라인 단위로 분할해서 맵 함수에 전달
  • 맵 함수를 분산된 서버에서 수행
  • 맵 함수의 출력 결과를 정렬/병합
  • 분산 처리된 맵 결과를 리듀스가 수행될 서버로 전송
  • 정렬/병합된 결과를 리듀스 함수에 전달

맵리듀스 프레임워크는 이러한 공통적인 기능을 전담하며, 개발자는 맵 함수와 리듀스 함수 자체, 즉 비지니스 로직 구현에만 집중할 수 있다.

맵리듀스 시스템 구성

하나의 맵리듀스 클러스터는 한개의 잡 트래커(Job Tracker)와 여러개의 태스크 트래커(Task Tracker)로 구성된다.

mr_system_architecture

  • 잡 트래커(Job Tracker)
    전체 작업을 관장하는 기능을 수행하는 마스터 서버다.
    하둡 파일 시스템의 네임 노드 서버에서 실행한다.
  • 태스크 트래커(Task Tracker)
    사용자가 요청한 작업을 실행하는 서버다.
    하둡 파일 시스템의 데이터 노드 서버에서 실행한다.
  • 클라이언트 라이브러리
    개발자가 맵리듀스 클러서트에서 동작하는 비즈니스 로직을 구현할 수 있도록 제공된 개발자 API다.
    다양한 입력 형태와 분산 처리를 사용할 수 있는 메커니즘을 제공한다.
    개발자가 구현한 프로그램을 잡 트래커가 실행하도록 요청하거나, 작업 결과를 모니터링할 수 있는 API 또한 제공한다.

맵리듀스 프로그램 실행 절차

개발자가 구현한 프로그램을 잡 트래커에 제출하면

  • 작업의 분리 정보와(Job.split)와 사용자가 추가한 파라미터 정보는 자동으로 하둡 파일 시스템에 저장된다.
  • 작업을 수행할 프로그램 자체(Job.jar)도 하둡 파일 시스템에 저장된다.
  • 하둡 파일 시스템에 저장된 입력 포맷 정보를 읽어 작업의 분산 처리를 구성한다.
  • 잡 트래커는 태스크 트래커별로 처리할 작업 목록을 구성한다.
  • 태스크 트래커는 주기적으로 hearbeat을 잡 트래커로 전송한다. 잡 트래커는 반환값으로 처리할 작업의 아이디를 반환한다.
  • 작업 아이디를 받은 태스크 트래커는 해당 작업과 관련된 정보를 하둡 파일 시스템에서 로컬로 다운로드한다.
  • 태스크 트래커는 fork 명령어를 수행하여, 맵 또는 리듀스 태스크를 실행한다. 하나의 태스크 트래커에서 동시에 여러개의 맵/리듀스 태스크를 수행할 수 있다.
  • 태스크 트래커에 장애게 발생하거나, 새로운 태스크 트래커가 추가되면 잡 트래커가 자동으로 인식하여 클러스터에 삭제/추가 작업을 수행한다.

맵리듀스 처리 흐름

맵리듀스 내부의 데이터 처리흐름을 아래와 같다.

mr_mapreduce_process

그림. 맵리듀스 데이터 처리 흐름 (출처: MapReduce Internals)

  • 하둡 파일 시스템에 저장된 데이털르 입력으로 받는다.
    하둡 파일 시스템에서 파일은 자동으로 블록 단위(64MB)로 분할 된 후, 저장된다. 입력 데이터를 기본적으로 하나의 블록을 대상으로 한다.
  • 각 입력 블록은 레코드 단위로 맵 함수에 전달된다.
    입력을 읽어서 맵 함수에 전달하는 역할은 InputFormat이 담당한다. 하둡에서는 기본적으로 파일은 한 라인을 하나의 레코드로 본다.
  • 각 레코드에 대해 맵 함수를 실행하면 키와 값이 출력 된다.
  • 키 값을 사용해서 파티셔녀(Partitioner)가 맵 함수의 출력 값을 전달할 리듀서를 결정한다.
    리듀서 개수는 작업을 실행할 때 지정하므로, 작업일 실행되는 시점에는 리듀서의 개수는 고정적이다. 따라서 파니셔너는 리듀서 개수와 적절한 파티셔닝 전략을 사용하여 맵 함수의 출력 키를 처리할 리듀스를 결정한 후, 맵 함수의 출력 결괄르 로컬 디스크에 저장한다. 디폴크 파티셔닝 전략은 해시(hash) 파티셔닝으로, “hash(키) % 리듀서 개수”다.
  • 파티셔닝이 완료되면, 각 맵 태스크에서는 “맵태스크ID-리듀서태스크ID”라는 이름의 파일이 생성된다.
    결국 하나의 맵태스크에서는 리듀서 개수만큼의 임시파일이 로컬에 생성된다.
  • 맵 태스크에서 로컬 임시 파일이 생성되면, 리듀서 태스크는 각 태스크 트래커로 맵 태스크의 임시 파일을 요청한다.
  • 리듀서 태스크는 Http 프로토콜로 맵 태스크의 임시 파일을 리듀서 로컬로 다운로드한다.
  • 모든 맵 태스크가 완료되면, 리듀서 태스크에는 맵 태스크의 개수만큼의 파일을 다운로드한 상태다
  • 리듀서 태스크는 로컬로 복사한 파일을 병합하면서 키를 기준으로 정렬한다.
  • 이렇게 병합/정렬이 완료된 데이터가 리듀스 함수로 전달된다.
  • 리듀스 함수가 처리한 결과를 다시 하둡 파일 시스템에 저장한다.

지역성(locality) 보장

맵리듀스는 데이터 지역성(locality)를 보장한다. 쉽게 말해 태스크를 할당할 때 각 데이터 노드에 위치한 블록을 로컬 태스크에서 처리할 수 있도록 최적화하여, 데이터의 이동을 최소화한다. 데이터 지역성은 잡 트래커가 태스크 트래커에 작업을 할당하는 기준이다.

  • 로컬 파일 시스템에 해당 블록을 갖고 있는 데이터 노드의 태스크 트래커에게 작업을 할당한다
  • 따라서 맵 태스크가 하둡 파일 시스템에서 입력 파일을 읽을 때, 분산된 다른 노드가 아닌 로컬 디스크에서 읽게 된다.

맵리듀스 클래스 다이어그램: WordCount 사례

WordCount 사례에서 본 맵리듀스 프로그램의 클래스 다이어그램은 아래와 같다.

mr_wordcount_class_diagram_jobsubmit

  • Job이 submit()된다.
  • JobClient가 LocalJobRunner를 생성후 잡을 제출한다.
  • LocalJobRunner는 새로운 Job 쓰레드를 생성후 실행한다.
  • Job 쓰레드가 MapTask와 ReduceTask를 실행한다.

 

mr_wordcount_class_diagram_task

  • MapTask는 Mapper객체를 생성후 실행한다.
  • Mapper에서는 setup()을 실행 후, 입력이 다 소모될 떄까지 map()함수를 처리한다. 처리가 끝나면 cleanUp()이 실행된다.
  • Reducer에서는 setup()을 실행 후, 입력이 다 소모될 떄까지 reduce()함수를 처리한다. 처리가 끝나면 cleanUp()이 실행된다.

맵리듀스 프로그래밍 요소

InputFormat

입력 데이트를 읽어 맵 태스크의 map() 함수로 전달한다.

  • 하나의 작업이 몇 개의 맵 태스크로 분리해야 할지를 결정한다.
    이는 getSplits() 메서드에서 반환되는 InputSplit 목록의 개수로 결정된다. 기본적으로 getSplits() 메서드는 하둡 파일 시스템의 파일에 대해 파일의 블록 개수만큼의 InputSplit 객체를 반환한다.
  • 입력 블록을 읽어서 map() 함수로 레코드를 전달한다.
    이는 RecordReader가 수행하며, createRecordReader() 메서드에서 RecordReader 객체를 생성한다.

Mapper

RecordReader가 전달한 레코드를 처리한다.

Combiner

각 맵의 수행 결과로 생성된 임시 로컬 데이터를 맵 태스크 내에서 리듀스 작업을 수행한다.

맵리듀스 과정에서 속도가 가장 오래 걸리는 부분은 맵 결과가 리듀서로 복사하여 정렬하는 과정이다. Combiner를 사용하면 맵의 출력 결과를 로컬에서 리듀스하므로, 리듀서로 전달되는 데이터 양을 줄일 수 있다. 일반적으로 Combiner는 리듀서를 그대로 사용한다.

Partitioner

맵의 출력 데이터의 키를 기준으로, 어느 리듀서로 보낼지 결정한다.

OutputFormat

리듀서의 출력 데이터를 저장한다.

리듀서의 출력 데이터는 RecordWriter가 담당하며, getRecordWriter() 메서드에서 RecordWriter 객체를 생성한다.

맵리듀스 프로그래밍 절차

맵리듀스 프로그램을 만드는 순서는 다음과 같다.

  • 데이터의 흐름 설계
  • InputFormat / OutputFormat 구현
  • Mapper 구현
  • Partitioner 구현
  • Driver 구현

공통 데이터 배포

분산된 서버에서 작업을 실행할 때

  • 공통 데이터(코드 데이터), 환경 설정 정보의 공유
  • 외부 라이브러리 배포

등 공통 데이터/라이브러리 배포가 필수적이다.

하둡에서는 이를 위해 2가지 방법을 제공한다.

Configuration 객체를 이용

Configuration 객체는 주로 환결 설정 정보를 공유하기 위한 목적으로 활용한다.

mr_configuration

DistributedCache를 이용

전달하려는 데이터의 크기가 큰 경우, Configuration 객체를 이용하는 방식은 비효율적이다. 이 경우 DistributedCache를 사용한다.

예를 들어 실행하려는 맵리듀스 Job에서 hbase.jar에 의존하는 경우, hbase.jar를 맵리듀스 job의 classpath에 등록하는 방법에는 3가지가 있다.

  • $HADOOP_HOME/lib에 hbase.jar를 추가한다.
    맵리듀스 job을 실행하는 명령어인 bin/hadoop -jar의 경우, 기본적으로 $HADOOP_HOME/lib에 있는 라이브러리 모두를 classpath에 등록한다. 따라서 $HADOOP_HOME/lib에 필요한 라이브러릴 추가하면, 맵리듀스 job에서 런타임에 사용할 수 있다.
    다만, 라이브러리가 새로 추가된 경우, 하둡 클러스터의 모둔 노드에 라이브러리를 배포한 후 맵리듀스 데몬을 재기동해야 한다.
  • -libjars 옵션
    bin/hadoop -jar 명령어로 맵리듀스 Job을 실행할 때, -libjars 옵션을 사용해서 필요한 라이브러리를 설정한다.

    -libjars에 추가된 라이브러리는 자동으로 하둡 파일 시스템에 저장되며, 각 태스크 노드로 다운된다.
    단 실행명령어가 복잡해진다.
  • DistributedCache
    필요한 외부 라이브러리를 하둡 파일 시스템에 미리 등록해 둔다. 그리고 드라이버 클래스에서 필요한 라이브러리를 DistributedCache 클래스의 메서드를 사용해서 등록한다.
    mr_distributed_cache