Amazon EC2에서 whirr을 이용한 Hadoop 클러스터 구동 방법

최근 연구내용 검증을 위해 Amazon EC2에서 Hadoop 클러스터를 구축하여 실험을 수행 하는 중입니다. 그런데 Hadoop 클러스터를 EC2에 구축하는데 있어 Amazon EC2 환경에 대한 이해 부족과 자료의 부족으로 직접 부딪혀서 해결해야 하는 부분들이 꽤 있었습니다. 저는 이 포스팅을 통해 제가 시도했던 방법을 소개하고 제 경험을 공유하고자 합니다.

우선 이 글을 읽는 분들은 Amazon EC2 계정이 있고 AMI, Instance, EC2 Key Pair에 대해 알고 계시다고 전제하겠습니다.

Amazon EC2에서 Hadoop 클러스터 구동

현 시점에서 Amazon EC2 환경에 Hadoop 클러스터를 구동 방법은 선택의 폭이 그리 넓지 않습니다.

  1. Hadoop이 이미 설치된 이미지를 사용하고 수동 설정하는 방법
  2. EBS 기반 AMI에 하둡 설치 및 복사 그리고 수동 설정
  3. whirr을 사용하는 방법
  4. whirr의 hadoop-ec2를 사용하는 방법

이 포스팅에서는 3번인 whirr을 이용한 구축방법을 설명합니다. 그런데  이 방법은 정말 간단하지만 한 가지 제약을 가지고 있습니다. 이 방법은 기본적으로 instance store 기반의 AMI만 활용 할 수 있습니다. 따라서 Hadoop 클러스터의 HDFS는 instance store에 기반을 두게 되며 클러스터 종료 시 HDFS의 모든 데이터가 제거됩니다 (Amazon EC2의 모든 인스턴스는 영속적인 데이터 저장을 위해 EBS나 S3와 같은 별도의 저장 서비스를 사용해야 합니다.)

저의 경우 처음에 1,2번 방법을 모두 시도했었습니다. 그러나  다음과 같은 문제점이 있었습니다.

  • 최신 Hadoop 배포본(0.20 이상)이 설치된 AMI의 부재
  • 수십 여개의 인스턴스의 시동(launch)의 자동화
  • 매번 새로 할당 받는 IP 주소와 이에 따른 Hadoop 설정과 설정 배포의 어려움

조사해보니 인스턴스를 시동을 자동화하고 시동된 인스턴스의 IP 목록을 얻어 설정 배포까지 원활히 하기 위해서는 Amazon AWS API를 이용하거나 boto (Python interface to Amazon Web Services)jcloud (multi-cloud library) 와 같은 third-party 라이브러리를 이용해 개발을 해야합니다. 그러나 이는 많은 시간을 요구합니다. EBS 기반 AMI에 Hadoop을 직접 설치하는 사용하는 방법 역시 비슷한 이유로 포기했습니다.

위에 4번 방법인 hadoop-ec2는 원래 Hadoop의 contrib 에 속했던 프로그램으로 현재는 whirr에서 진행되고 있지만 지속적으로 유지보수가 되지 않는 것으로 보여 시도하지 않았습니다. whirr의 Change Log를 봐도 4번에 대한 내용은 찾기 어려웠습니다.

현재로써는 whirr이 가장 편리한 방법이라 여겨 집니다.

whirr?

whirr는 Apache Incubator에 속한 프로젝트로 Amazon EC2와 같은 상용 클라우드 환경에서 원하는 서비스에 대한 설치, 설정, 실행을 자동으로 수행하는 라이브러리입니다. 현재 제공하는 서비스로는 Apache Hadoop, Cassandra,  Cloudera’s Distribution for Hadoop (CDH), Zookeeper가 있으며 조만간 릴리즈 될 0.4-incubating 버전에 Hbase가 추가될 예정이라고 합니다.

동작의 개요

whirr의 사용방법을 설명하기에 앞서 전체적인 동작에 대해 개략적으로 설명을 드리겠습니다.

사용자가 ‘cluster-lunch’ 커맨드를 주면 whirr은 instance store 기반의 AMI를 이용해 다수의 인스턴스를 가동하고 모든 인스턴스들에 JDK 및 Hadoop의 설치와 설정을 일괄적으로 수행합니다. 이 과정이 끝나면 EC2 내부에서 Hadoop 클러스터가 동작하고 있게 됩니다.

그리고 사용자가 로컬 머쉰에 설치한 Hadoop 프로그램 통해 EC2에서 구동되는 Hadoop 클러스터를 제어하게 됩니다. 그런데 EC2 내부의 인스턴스들은 기본적으로 private IP만을 할당 받아 외부에서 접근할 수가 없고 기본적으로 방화벽 설정이 까다롭게 되어 있기 때문에 추가적인 설정 없이 Hadoop RPC나 웹 UI를 통한 접근이 불가능 합니다. 따라서 whirr이 제공하는 proxy 프로그램을 실행하고 난 뒤에 로컬 머쉰에 설치된 Hadoop 프로그램을 이용하여 EC2 내부의 클러스터를 제어하게 됩니다.

Hadoop 클러스터 구동

계정 생성

whirr을 통해 생성하는 Hadoop 클러스터는 linux에서 hadoop이라는 username의해 시동됩니다. Hadoop은 클러스터를 구동한 계정과 같은 계정으로 접근할 때 superuser 권한을 가집니다. 따라서 로컬에 hadoop이라는 계정을 생성하여 아래 작업을 수행해야 Amazon EC2 내부에서 동작하는 Hadoop 클러스터에 대한 superuser권한을 행사할 수 있습니다. 하지만 단순히 MapReduce 프로그램만 실행 시킨다면 아무 계정에서 작업해도 문제 없습니다.

로컬 머쉰에 Hadoop과 whirr의 설치 그리고 Hadoop Version 문제

Hadoop과 whirr은 http://archive.apache.org/dist/ 에서 다운 받아 로컬 머쉰에 설치합니다.  그런데 현 시점에서는 ‘어떤 Hadoop 버전을 설치해야 하는가’가 문제가됩니다. Hadoop은 현재 한참 빠르게 개발되고 있으며 다른 버전간 내부 프로토콜이 호환되지 않습니다.

따라서 whirr이 자동으로 설치해주는 Hadoop 클러스터와 로컬 머쉰의 Hadoop은 같은 버전으로 맞춰야 합니다. 버전을 바꾸는 것은 whirr FAQ에 아래와 같이 설명되어 있는 것 처럼 직접 install, configuration 스크립트를 수정해야 합니다.

How do I specify the service version and other service properties?

Currently the only way to do this is to modify the scripts to install a particular version of the service, or to change the service properties from the defaults.

See “How to modify the instance installation and configuration scripts” above for details on how to do this.

from http://incubator.apache.org/whirr/faq.html

whirr은 Apache Hadoop 배포본외에도 Cloudera의 Hadoop 배포본을 설치할 수 있습니다. 이는 아래 ‘whirr 설정 파일’에서 whirr.hadoop-install-runurl과 whirr.hadoop-configure-runurl에 대한 내용을 참고하시면 됩니다.

whirr 설정 파일

whirr을 이용한 클러스터의 구동은 클러스터에 대한 설정 파일을 만드는 것으로 시작합니다. 이 포스팅에서는 아래 cluster.properties 파일의 내용을 설명하고 이후 내용도 이 설정을 기준으로 설명하도록 하겠습니다.

(아래 내용은 최신인 0.3-incubating 버전에 대한 내용입니다. 0.4-incubating 버전이 릴리즈 되면 설정 방법이 변경될 예정입니다. 릴리즈 되고 나면 포스팅을 업데이트 하도록 하겠습니다.)

whirr.cluster-name=mycluster
whirr.instance-templates=1 jt+nn,16 dn+tt
whirr.provider=ec2
whirr.identity=ACCESS_KEY
whirr.credential=SECRET_KEY
whirr.private-key-file=${sys:user.home}/.ssh/id_rsa
whirr.public-key-file=${sys:user.home}/.ssh/id_rsa.pub
whirr.location-id=us-east-1d
whirr.hardware-id=m1.small
whirr.service-name=hadoop
#whirr.hadoop-install-runurl=cloudera/cdh/install
#whirr.hadoop-configure-runurl=cloudera/cdh/post-configure

각 항목에 대한 설명은 다음과 같습니다.

  • whirr.cluster-name : 구동할 클러스터를 식별하는 이름입니다. 클러스터를 구동하면 ${HOME}/.whirr/<cluster-name> 가  디렉토리가 생성되며 이 디렉토리에는 Hadoop 클러스터에 접근하는데 필요한 파일들이 저장됩니다.
  • whirr.instance-templates: 구동할 클러스터의 구성을 설정합니다. jt는 jobtracker, nn은 name node, dn은 data node, tt는 task tracker를 의미합니다. 이 설정을 통해 유연한 설정이 가능합니다. data node와 task tracker를 더 늘리고 싶을 때는 dn+tt 앞에 쓰여진 숫자를 변경해 주시면 됩니다.
  • whirr.provider: 클러스터 서비스 제공자를 설정합니다. 현재는 Amazon EC2와 Rackspace Cloud Servers 두 가지를 지원합니다.
  • whirr.identity: AWS의 access key를 입력하시면 됩니다.
  • whirr.credential: AWS의 secret key를 입력 합니다.
  • whirr.private-key-file: 이 설정과 아래 설정은 ec2 인스턴스를 생성할 때 사용할 key로 사용됩니다. 위 예제처럼 하기 위해서는 아래와 같이 ssh키를 생성해야 한다. 또는 기존에 다른 인스턴스를 위해 만들어 놓은 EC2 Key pair의 경로를 설정해도 됩니다.
$ ssh-keygen -t rsa -P ''
  • whirr.location-id: 원하는 availability zone을 설정한다. 설정하지 않으면 whirr을 실행하는 인스턴스와 같은 zone이 설정된다.
  • whirr.hardware-id: 원하는 인스턴스 유형을 설정한다. Amazon EC2가 제공하는 인스턴스 유형은 Amzon EC2 Instance Types 페이지에서 확인할 수 있으며 각 유형에 써 있는 API name을 이 설정에 적용하면 됩니다.
  • whirr.service-name: 구축할 서비스를 설정합니다. 이 글은 Hadoop을 위한 것이므로 hadoop으로 남겨 둡니다.
  • whirr.hadoop-install-runurl, whirr.hadoop-configure-runurl: Hadoop의 경우 apache 버전과 CDH 버전이 있습니다. 위 예제에서 주석(#)을 제거해 주면 CDH버전을 구동하게 됩니다.

설정에 대한 추가적인 설명은 Whirr Configuration Guide 문서를 참고하시면 됩니다.

Hadoop 클러스터 시동

클러스터의 시동은 다음과 같은 커맨드로 수행합니다. 클러스터를 시동하면 내부적으로 Amazon의 EC2 API를 통해 인스턴스를 생성해 필수 패키지(JDK)등을 설치하고 Hadoop 배포 버전을 다운로드 받아 설정을 하는 과정이 수행됩니다. 따라서 클러스터가 구동되는데 짧게는 수 분에서 길게는 10분 정도 소요됩니다. 클러스터 구동이 완료되면 다시 쉘 프롬프트가 뜨게 됩니다.

$ whirr/bin/whirr launch-cluster --config cluster.properties
Bootstrapping cluster
Configuring template
Starting 16 node(s) with roles [tt, dn]
Configuring template
Starting 1 node(s) with roles [jt, nn]
Nodes started: [[id=us-east-1/i-a45eb7cb, providerId=i-a45eb7cb, ...]]
.....
Nodes started: [[id=us-east-1/i-7a51b815, providerId=i-7a51b815, ...]]
Authorizing firewall
Running configuration script
Configuration script run completed
Running configuration script
Configuration script run completed
Completed configuration of mycluster
Web UI available at http://ec2-72-44-43-29.compute-1.amazonaws.com
Wrote Hadoop site file /home/-----/.whirr/mycluster/hadoop-site.xml
Wrote Hadoop proxy script /home/-----/.whirr/mycluster/hadoop-proxy.sh
Wrote instances file /home/-----/.whirr/mycluster/instances
Started cluster of 17 instances
Cluster{instances=[Instance{roles=[tt, dn], ...}]}
$

프록시 열기

whirr을 통해 구동한 Hadoop 클러스터에 접근하기 위해서는 로컬에 설치했던 Hadoop을 설정을 해야 합니다. 설정은 간단히 whirr이 클러스터 시동 후 생성해 주는 파일을 사용하면 됩니다. ‘whirr launch-cluster’ 가 완료되고 나면 ${HOME}/.whirr/<cluster-name>/hadoop-site.xml 파일이 생성됩니다. 이 파일을 로컬에 설치한 Hadoop의 ${HADOOP_HOME}/conf에 간단하게 복사하거나 다음과 같이 환경변수를 설정하여 Hadoop의 설정 디렉토리를 override 하면 됩니다.

export HADOOP_CONF_DIR=~/.whirr/<cluster-name>

하지만 EC2 내부의 클러스터들은 private IP만을 가지기 때문에 바로 Hadoop 클러스터에 접근할 수는 없습니다. ${HOME}/.whirr/<cluster-name>/hadoop-proxy.sh를 실행 해야 비로소 EC2에 구동된 Hadoop 클러스터에 접근할 수 있습니다.

$ chmod +x ~/.whirr/mycluster/hadoop-proxy
$ ~/.whirr/mycluster/hadoop-proxy

Running proxy to Hadoop cluster at ec2-72-44-43-29.compute-1.
amazonaws.com. Use Ctrl-c to quit.
$ bin/hadoop dfs -ls
...

hadoop-proxy.sh를 실행하면 EC2에서 동작하는 Hadoop 클러스터 웹 UI도 접근할 수 있습니다. 그러나 이를 위해서는 간단한 웹 브라우져 설정이 요구됩니다.

  • 크롬은 Preferences -> Under the Hood 탭 -> Network -> Change Proxy Settings에서 설정하면 됩니다. Socks 설정의 주소는 localhost, 포트는 6666으로 해주시면 됩니다.
  • 파폭은 기본적으로 SOCKS를 사용하더라도 로컬 머신에 설정된 DNS를 사용하게 되어 있는데 먼저 이 설정을 변경해 주셔야 합니다. 이를 위해서는 주소창에 about:config를 입력해 세부 설정으로 들어가 network.proxy.socks_remote_dns 설정을 true로 변경해주셔야 합니다. 그리고 Preferences -> Advanced -> Network 탭 -> Connection 섹션의 Settings에서 SOCKS Host를 주소는 localhost, 포트는 6666으로 설정해주시면 됩니다.

위와 같이 수정하고 hadoop-proxy.sh를 실행 했을 때 출력되는 URL에 접속하시면 됩니다.

클러스터 종료

Hadoop을 이용한 모든 작업이 끝나면 종료는 다음 커맨드를 통해 수행합니다. 위에서 언급한 바와 같이 whirr은 아직까지 instance store 기반 클러스터 구축 밖에 지원하지 못하므로 HDFS의 모든 데이터가 제거되는 사실을 염두하셔야 합니다.

$ whirr/bin/whirr destroy-cluster --config=cluster.properties

결론

whirr을 통한 Hadoop 클러스터 구동 방법을 설명했습니다. whirr은 아직 사소한 버그와 설정의 한계가 있지만 직접 클러스터 를 구축해야 하는 사용자의 노력을 상당히 줄여줍니다. 필요에 따라 MapReduce 프로그램을 대규모 클러스터에 동작시켜야 하는 사용자들에게는 특히 유용하다고 생각합니다.

참고문서


An Example of Hadoop MapReduce Counter

MapReduce Counter

Hadoop MapReduce Counter provides a way to measure the progress or the number of operations that occur within MapReduce programs. Basically, MapReduce framework provides a number of built-in counters to measure basic I/O operations, such as FILE_BYTES_READ/WRITTEN and Map/Combine/Reduce input/output records. These counters are very useful especially when you evaluate some MapReduce programs. Besides, the MapReduce Counter allows users to employ your own counters. Since MapReduce Counters are automatically aggregated over Map and Reduce phases, it is one of the easiest way to investigate internal behaviors of MapReduce programs. In this post, I’m going to introduce how to use your own MapReduce Counter. The example sources described in this post are based on Hadoop 0.21 API.

Incrementing your counter

For your own MapReduce counter, you first define a enum type as follow:

public static enum MATCH_COUNTER {
  INCOMING_GRAPHS,
  PRUNING_BY_NCV,
  PRUNING_BY_COUNT,
  PRUNING_BY_ISO,
  ISOMORPHIC
};

And then, when you want to increment your own counter, you should call the increment method as follows:

context.getCounter(MATCH_COUNTER.INCOMING_GRAPHS).increment(1);

You can access context instance within setup, cleanup, map, and reduce method in Mapper or Reducer class. You can get a desired counter via calling context.getCounter method with some enum value.

Finding your counter

You can get some Counters from a finished job as follows:

Configuration conf = new Configuration();
Cluster cluster = new Cluster(conf);
Job job = Job.getInstance(cluster,conf);
result = job.waitForCompletion(true);
...
Counters counters = job.getCounters();

The instance of Counters class contains all of the counters obtained from a job. So, when you want to get your own counter, you should call findCounter method with a enum type as follows:

Counter c1 = counters.findCounter(MATCH_COUNTER.INCOMING_GRAPHS);
System.out.println(c1.getDisplayName()+":"+c1.getValue());

The below example shows how to get built-in counter groups that Hadoop provides basically.

for (CounterGroup group : counters) {
  System.out.println("* Counter Group: " + group.getDisplayName() + " (" + group.getName() + ")");
  System.out.println("  number of counters in this group: " + group.size());
  for (Counter counter : group) {
    System.out.println("  - " + counter.getDisplayName() + ": " + counter.getName() + ": "+counter.getValue());
  }
}

HDFS Scalability 향상을 위한 시도들 (1)


얼마전 Yahoo!의 HDFS 팀에서 Multiple nodes를 사용하여 HDFS namenode의 Horizontal Scalability를 향상 시키는 방법을 제안 했었습니다 (HDFS-1052). 그런데 그 뒤로는 Dhruba Borthakur라는 Hadoop 커미터가 Vertical Scalability 개선 방법을 제안했습니다(The Curse of Singletons! The Vertical Scalability of Hadoop NameNode, HDFS-1093, HADOOP-6713). Borthakur에 대해 LinkedIn 에서 찾아보니 현재 Facebook에서 근무하는 Hadoop 엔지니어라고 나오는군요.

위 두 제안을 보면 Vertical Scalability과 Horizontal Scalability라는 용어가 나옵니다. Vertical Scalability는 시스템의 사양을 향상 시켰을 때 얻는 확장성을 의미합니다. 주로 CPU, Memory, Hard disk 등의 향상을 의미합니다. Hadoop과 같은 분산 시스템에서는 시스템 코어의 수가 늘어나는 것도 Vertical Scalability의 범주로 포함됩니다. 반면 Horizontal Scalability는 시스템의 개수를 늘렸을 때 얻는 확장성을 의미합니다. 예를 들면 노드의 수가 10대에서 20개로 늘어났을 때 얻는 확장성을 의미합니다. scale-up과 scale-out도 각각 같은 의미로 통용됩니다.

본 포스트에서는 위 두 가지 제안 중에서 Dhruba Borthaku가 제안한 vertical scalability 향상을 위한 제안을 소개합니다. 우선 Dhruba Borthakur라는 해커가 지적한 Hadoop Namenode (현재 Hadoop 0.21)의 병목현상은 다음과 같습니다.

  • Network: Facebook에서 자신이 사용하는 클러스터는 약 2000개의 노드로 구성되어 있으며 MapReduce 프로그램 동작 시 각 서버들은 9개의 mapper와 6개의 reducer가 동작하도록 설정되어 있다고 합니다. 이 구성의 클러스터에서 MapReduce를 동작하면 클라이언트들은 동시에 약 30k 의 request를 NameNode 에게 요청한다고 합니다. 그러나 singleton으로 구현된 Hadoop RPCServer의 Listener 스레드가 모든 메시지를 처리하므로 상당히 많은 지연이 발생하고 CPU core의 수가 증가해도 효과가 없었다고 합니다.
  • CPU: FSNamesystem lock 메카니즘으로 인해 namenode는 실제로는 8개의 core를 가진 시스템이지만 보통 2개의 코어밖에 활용되지 않는다고 합니다. Borthakur에 의하면 FSNamesystem에서 사용하는 locking 메커니즘이 너무 단순 하고 HADOOP-1269 를 통해 문제를 개선 시켰음에도 여전히 개선의 여지가 있다고 합니다.
  • Memory: Hadoop의 NameNode는 논문 내용에 충실하게 모든 메타 데이터를 메모리에 유지합니다. 그런데 Borthakur가 사용하는 클러스터의 HDFS에는 6천만개의 파일과 8천만개의 블럭들이 유지하고 있는데 이 파일들의 메타데이터를 유지하기 위해 무려 58GB의 힙공간이 필요했다고 합니다.

Borthakur가 이 문제를 해결하기 위해 제안했던 방법은 다음과 같습니다.

  • RPC Server: singleton으로 구현되었던 Listener 스레드에 Reader 스레프 풀을 붙였다고 합니다. 그래서 Listener 스레드는 connection 요청에 대한 accept 만 해주고 Reader 스레드 중 하나가 RPC를 직접 처리하도록 개선했다고 합니다. 결과적으로 다량의 RPC 요청에 대해 더 많은 CPU core들을 활용할 수 있게 되었다고 합니다(HADOOP-6713).
  • FSNamesystem lock: Borthakur는 파일에 대한 어떤 operation이 있을 때 lock이 걸리는지 통계를 내고 그 결과로 파일과 디렉토리의 상태를 얻을 때와 읽기 위해 파일을 열 때 걸리는 lock이 전체 lock의 90%를 차지 한다는 것을 밝힙니다. 그리고 저 두 파일 operation들은 오직 read-only operation 이기 때문에 read-write lock 으로 바꾸어 성능을 향상 시켰다고 합니다(HADOOP-1093). 이 부분은 MapReduce 논문(The Google File System) 4.1절 Namespace Management and Locking 에도 설명이 잘 되어 있습니다. 이미 MapReduce에서는 namespace의 자료구조에서 상위 디렉토리에 해당하는 데이터에는 read lock을 걸고 작업 디렉토리 또는 작업 파일에는 read 또는 write lock을 걸어 가능한 동시에 다수의 operation들이 공유 데이터에 접근하게 하면서도 consistency를 유지하는 방법을 설명하고 있습니다. 아마도 file 에 대한 append가 Hadoop 0.20 버전에 추가 된 것 처럼 논문에 설명이 있음에도 구현이 되지 않은 부분이었나 봅니다. 자세한건 소스를 분석해 봐야 알 수 있을 것 같습니다.

그러나 메모리에 대한 문제는 아직 해결하지 못했다고 합니다. 그래도 Borthakur에 의하면 위 두 가지 문제점을 해결한 것만으로 무려 8배나 scalability를 향상 시켰다고 합니다.

얼마전 부터 HDFS scalability 향상에 대한 시도들이 눈에 띄고 재미있어 보여 ‘여유 있을 때 블로그에 한번 정리해 봐야 겠다’라고 한달전에 맘 먹었는데 겨우 하나를 마쳤네요. 요즘 시간이 잘 안나서 이 포스트를 시작해서 완성하는데 약 3주나 걸렸습니다. 그 사이 Usenix의 매거진인 ;login:에 Hadoop Namenode의 scalability에 대한 article인 HDFS Scalability: The Limits to Growth가 실렸습니다. 또 야후 개발자 네트워크 블로그에서 article을 소개하는 글 (Scalability of the Hadoop Distributed File System)이 실렸네요. 시간날 때 마다 마저 정리해 보겠습니다.