HPCCommunity.org
 
Hadoop-EGO Integration Solution
Published by qzhang
July 16th, 2008

1 Purpose

Hadoop is a framework for running applications that process vast amounts of data on large clusters. It implements a computational paradigm named Map/Reduce, where the application is divided into many small fragments of work, each of which may be executed or re-executed on any node in the cluster. In addition, it provides a distributed file system (HDFS) that stores data on the compute nodes, providing very high aggregate bandwidth across the cluster.

Currently, Hadoop users configure the size of a Hadoop cluster through a set of configuration files that specify how many nodes are in the cluster, which node is the master, which nodes are slaves, and so on. None of these settings can easily be changed on demand at run time. We want a way to control the size of a Hadoop cluster dynamically, on demand.

The solution described in this document uses EGO as the underlying resource manager for Hadoop applications, so that EGO policies can control the dynamic growing and shrinking of Hadoop clusters.

Our solution provides the following features:
1. Dynamically add Hadoop computing hosts according to the workload of the Hadoop cluster, and remove them at a user-defined time.
2. Dynamically add or remove Hadoop computing hosts according to the EGO resource plan.

2 Referenced documents

Hadoop Quickstart
Hadoop Cluster Setup

3 Design idea

To control the dynamic growing and shrinking of Hadoop clusters, EGO must be able to start and stop individual nodes in a Hadoop cluster. We therefore wrap the Hadoop daemons as EGO services; in this way, the nodes in a Hadoop cluster can easily be controlled by the EGO service controller.

For feature 1 mentioned in Section 1, we use PTM (Performance Threshold Manager, a condition-and-action engine packaged as Java libraries) to collect load information from a Hadoop cluster and to ask EGO to start or stop nodes in the cluster dynamically as the load changes.

For feature 2, we use EGO policy to control the size of Hadoop clusters by changing the resource plan of a specific resource group (e.g. adjusting the share ratio of two EGO consumers, each of which represents one Hadoop cluster).

4 Implementation of solution

In Hadoop, four Java classes start the four basic Hadoop daemons: name node, data node, job tracker and task tracker. A Hadoop master runs the name node and the job tracker; a Hadoop slave runs the data node and the task tracker. These daemons read a configuration file (hadoop-site.xml) to find out which node is the master, and then connect to it to form a Hadoop cluster.
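
As a rough sketch of how a service wrapper might map each role to its daemons, the Java fragment below builds one command line per daemon. It assumes a Hadoop 0.x installation where `bin/hadoop namenode` (and likewise `datanode`, `jobtracker`, `tasktracker`) runs the daemon in the foreground, which is presumably what a supervising controller such as EGO's needs. The class `HadoopRoleCommands` and the `hadoopHome` parameter are hypothetical and not part of HadoopOrchestrator.zip.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: maps a Hadoop role to the daemons it runs.
class HadoopRoleCommands {
    enum Role { MASTER, SLAVE }

    // Master = name node + job tracker; slave = data node + task tracker.
    static List<String> daemonsFor(Role role) {
        return role == Role.MASTER
                ? Arrays.asList("namenode", "jobtracker")
                : Arrays.asList("datanode", "tasktracker");
    }

    // One "bin/hadoop <daemon>" command line per daemon of the role.
    static List<List<String>> commandsFor(Role role, String hadoopHome) {
        List<List<String>> commands = new ArrayList<>();
        for (String daemon : daemonsFor(role)) {
            commands.add(Arrays.asList(hadoopHome + "/bin/hadoop", daemon));
        }
        return commands;
    }

    // Launches every daemon of the role as a child process of the wrapper,
    // sharing the wrapper's stdout/stderr.
    static List<Process> start(Role role, String hadoopHome) throws IOException {
        List<Process> processes = new ArrayList<>();
        for (List<String> command : commandsFor(role, hadoopHome)) {
            processes.add(new ProcessBuilder(command).inheritIO().start());
        }
        return processes;
    }
}
```

In this sketch a HadoopAMaster wrapper would call `start(Role.MASTER, ...)` and a HadoopASlave wrapper `start(Role.SLAVE, ...)`, keeping the two daemons of a role together in one EGO service.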

For feature 1

We wrap the name node and job tracker as an EGO service named HadoopAMaster, and the data node and task tracker as another EGO service named HadoopASlave. Together, these two EGO services set up a Hadoop cluster named HadoopA. Starting HadoopAMaster brings up the master of HadoopA. When HadoopASlave starts, it first changes the master hostname in the configuration file (hadoop-site.xml) to the name of the node on which HadoopAMaster is running, and then starts the data node and task tracker.
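
The hostname rewrite performed by HadoopASlave could look roughly like the sketch below. It assumes the master address is stored in the standard `fs.default.name` (e.g. `hdfs://host:9000`) and `mapred.job.tracker` (e.g. `host:9001`) properties of hadoop-site.xml, and it swaps only the host part, keeping any scheme and port. `MasterConfigRewriter`, `pointToMaster` and `replaceHost` are hypothetical names, not taken from the package.

```java
import java.io.ByteArrayInputStream;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

// Hypothetical helper: points a slave's hadoop-site.xml at a new master host.
class MasterConfigRewriter {
    // Properties assumed to carry the master address.
    static final String[] MASTER_KEYS = {"fs.default.name", "mapred.job.tracker"};

    // Optional "scheme://", then the host, then the rest (":port", path).
    private static final Pattern ADDRESS =
            Pattern.compile("^([A-Za-z][A-Za-z0-9+.-]*://)?([^:/]+)(.*)$");

    static String replaceHost(String value, String newHost) {
        Matcher m = ADDRESS.matcher(value.trim());
        if (!m.matches()) {
            return value;  // leave unrecognised values alone
        }
        String scheme = m.group(1) == null ? "" : m.group(1);
        return scheme + newHost + m.group(3);
    }

    static String pointToMaster(String siteXml, String masterHost) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(siteXml.getBytes(StandardCharsets.UTF_8)));
            NodeList properties = doc.getElementsByTagName("property");
            for (int i = 0; i < properties.getLength(); i++) {
                Element property = (Element) properties.item(i);
                Node name = property.getElementsByTagName("name").item(0);
                Node value = property.getElementsByTagName("value").item(0);
                if (name == null || value == null) {
                    continue;
                }
                for (String key : MASTER_KEYS) {
                    if (key.equals(name.getTextContent().trim())) {
                        value.setTextContent(replaceHost(value.getTextContent(), masterHost));
                    }
                }
            }
            StringWriter out = new StringWriter();
            TransformerFactory.newInstance().newTransformer()
                    .transform(new DOMSource(doc), new StreamResult(out));
            return out.toString();
        } catch (Exception e) {
            throw new IllegalArgumentException("cannot rewrite hadoop-site.xml", e);
        }
    }
}
```

All other properties pass through unchanged, so the slave keeps its local settings and only learns where the current master runs.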

To collect load information from the Hadoop cluster and to start or stop Hadoop nodes as the load changes, we need a PTM orchestrator, which is also wrapped as an EGO service, named DeptAOrch. Once started, DeptAOrch collects load information from the Hadoop cluster by periodically calling the API provided by Hadoop. The metric we use is the ratio of the number of running Hadoop tasks to the maximum number of tasks that HadoopA can run (see Appendix A for the code). If the load rises above a pre-defined high-water value, DeptAOrch asks EGO to start more HadoopASlave instances, i.e. to add slaves to the Hadoop cluster and provide more computation power.

When the load falls below the pre-defined low-water value, however, we cannot remove any slaves from the Hadoop cluster. Each slave may hold intermediate results produced by earlier tasks that later tasks still need, so removing a slave would make those results inaccessible to the subsequent tasks. As a workaround, we define a time that should be long enough for the job running in the Hadoop cluster to finish. When this time expires (the job is done), DeptAOrch asks EGO to shrink the Hadoop cluster by removing some slaves.
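
The grow/shrink rule above can be sketched as a small decision function. This is an illustration under stated assumptions, not the DeptAOrch code shipped in HadoopOrchestrator.zip: `WaterMarkPolicy`, `ScaleAction` and the threshold values are hypothetical, and the sketch assumes the job window starts at the most recent scale-up. A low-water value is intentionally absent, since a low load by itself never triggers removal.

```java
// Hypothetical sketch of DeptAOrch's grow/shrink decision.
enum ScaleAction { GROW, SHRINK, NONE }

class WaterMarkPolicy {
    private final int highWater;          // load ratio (percent) that triggers growth
    private final long jobWindowMillis;   // time assumed long enough for a job to finish
    private long lastGrowMillis = -1;     // -1 means no slaves added since the last shrink

    WaterMarkPolicy(int highWater, long jobWindowMillis) {
        this.highWater = highWater;
        this.jobWindowMillis = jobWindowMillis;
    }

    // loadRatio is the percentage returned by getWorkLoadRatio() (Appendix A).
    ScaleAction decide(int loadRatio, long nowMillis) {
        if (loadRatio > highWater) {
            lastGrowMillis = nowMillis;   // restart the job window on every scale-up
            return ScaleAction.GROW;
        }
        // A low load alone never removes slaves, because they may still hold
        // intermediate results; only the expiry of the job window does.
        if (lastGrowMillis >= 0 && nowMillis - lastGrowMillis >= jobWindowMillis) {
            lastGrowMillis = -1;
            return ScaleAction.SHRINK;
        }
        return ScaleAction.NONE;
    }
}
```

An orchestrator loop would call `decide` on each polling interval and translate GROW or SHRINK into a request to EGO to start or stop HadoopASlave instances.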

For the configuration on the EGO side, we create two resource groups: the slots of GroupXSharedPool run HadoopASlave, and the slots of HadoopMasterPool run HadoopAMaster. In addition, we create three consumers: GroupX, Admin and DeptA. The EGO services map to these consumers as follows:

Service          Consumer
-------------    -------------
DeptAOrch        /GroupX/Admin
HadoopAMaster    /GroupX/Admin
HadoopASlave     /GroupX/DeptA

The following figure illustrates our solution:


To try out this feature, refer to the README file in the package HadoopOrchestrator.zip (http://www.hpccommunity.org/projecta...attachmentid=3), which contains detailed instructions on how to install and test our solution.

For feature 2

We use two Hadoop clusters (HadoopA and HadoopB) to demonstrate this feature. Both clusters are managed by EGO, and our goal is to use EGO policy to control their sizes dynamically.

We keep the HadoopA cluster and add a second Hadoop cluster, HadoopB. As with HadoopA, the EGO services for the master and slave of HadoopB are HadoopBMaster and HadoopBSlave respectively. When HadoopBSlave starts, it first changes the master hostname in the configuration file (hadoop-site.xml) to the name of the node on which HadoopBMaster is running, and then starts the data node and task tracker.

For the configuration on the EGO side, the slots of GroupXSharedPool run both HadoopASlave and HadoopBSlave, and the slots of HadoopMasterPool run both HadoopAMaster and HadoopBMaster. In addition, we create a new consumer, DeptB, for HadoopBSlave. The EGO services now map to consumers as follows:

Service          Consumer
-------------    -------------
DeptAOrch        /GroupX/Admin
DeptBOrch        /GroupX/Admin
HadoopAMaster    /GroupX/Admin
HadoopBMaster    /GroupX/Admin
HadoopASlave     /GroupX/DeptA
HadoopBSlave     /GroupX/DeptB

The following figure illustrates our solution:



For this feature, we ignore the load in the cluster: the only factor that adds or removes Hadoop slaves is a change to the resource plan of GroupXSharedPool. The PTM orchestrators DeptAOrch and DeptBOrch are therefore used only to start some HadoopASlave and HadoopBSlave instances at the beginning, and the load information they collect is ignored.

Once both HadoopA and HadoopB are up (by starting HadoopAMaster, HadoopBMaster, DeptAOrch and DeptBOrch), you can change the resource plan of GroupXSharedPool to apply different EGO policies, and then observe how the size of each Hadoop cluster changes. Refer to the README file in the package HadoopOrchestrator.zip for detailed instructions on how to try this feature.
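
To reason about what a given resource plan means for the two clusters, the sketch below splits a fixed number of GroupXSharedPool slots between consumers in proportion to their shares and hands the slots lost to rounding to the largest remainders. This is a deliberately simplified model, not EGO's actual allocation algorithm; `SharePlanSketch`, `split` and the slot counts are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: divide a pool's slots among consumers by share ratio.
class SharePlanSketch {
    static Map<String, Integer> split(int totalSlots, LinkedHashMap<String, Integer> shares) {
        long shareSum = 0;
        for (int share : shares.values()) {
            shareSum += share;
        }
        Map<String, Integer> slots = new LinkedHashMap<>();
        Map<String, Long> remainders = new HashMap<>();
        int assigned = 0;
        for (Map.Entry<String, Integer> e : shares.entrySet()) {
            long scaled = (long) totalSlots * e.getValue();
            int base = shareSum == 0 ? 0 : (int) (scaled / shareSum);
            slots.put(e.getKey(), base);
            remainders.put(e.getKey(), shareSum == 0 ? 0L : scaled % shareSum);
            assigned += base;
        }
        if (shareSum == 0) {
            return slots;  // nobody has a share, so nothing is handed out
        }
        // Give the slots lost to rounding to the largest remainders
        // (the sort is stable, so ties go to the consumer listed first).
        List<String> order = new ArrayList<>(shares.keySet());
        order.sort((a, b) -> Long.compare(remainders.get(b), remainders.get(a)));
        for (int i = 0; i < totalSlots - assigned; i++) {
            String consumer = order.get(i);
            slots.put(consumer, slots.get(consumer) + 1);
        }
        return slots;
    }
}
```

In this model, with 8 slots a DeptA:DeptB ratio of 3:1 gives HadoopA 6 slaves and HadoopB 2; changing the plan to 1:1 gives each 4, so two HadoopASlave instances would be stopped and two HadoopBSlave instances started.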

Note that this solution has a limitation: it only makes sense when the Hadoop clusters are idle (i.e. no jobs are running on them). The reason is the same as for feature 1: while jobs are running, each Hadoop slave may hold intermediate results produced by earlier tasks that later tasks still need. If we remove a slave from HadoopB and restart it as a slave of HadoopA, the intermediate results it held for the HadoopB job become inaccessible to the subsequent tasks in HadoopB.


Appendix A getWorkLoadRatio()

Code:
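public int getWorkLoadRatio() {
    // m_ClusterStatus is a field holding a ClusterStatus snapshot of HadoopA
    // (assumed to be refreshed periodically, e.g. via JobClient.getClusterStatus()).
    float mapTasks = m_ClusterStatus.getMapTasks();             // running map tasks
    float maxMapTasks = m_ClusterStatus.getMaxMapTasks();       // map task capacity
    float reduceTasks = m_ClusterStatus.getReduceTasks();       // running reduce tasks
    float maxReduceTasks = m_ClusterStatus.getMaxReduceTasks(); // reduce task capacity

    System.out.println("getWorkLoadRatio()::MapTasks = " + mapTasks);
    System.out.println("getWorkLoadRatio()::MaxMapTasks = " + maxMapTasks);
    System.out.println("getWorkLoadRatio()::ReduceTasks = " + reduceTasks);
    System.out.println("getWorkLoadRatio()::MaxReduceTasks = " + maxReduceTasks);

    float ratio = 0;
    // Skip the division while the cluster reports no task capacity (no slaves yet).
    if (maxMapTasks > 0 || maxReduceTasks > 0) {
        ratio = (mapTasks + reduceTasks) / (maxMapTasks + maxReduceTasks) * 100;
    }
    // Percentage of the cluster's task capacity in use, truncated to an int.
    return (int) ratio;
}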
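
To check the arithmetic of this metric without a running cluster, the same formula can be pulled into a pure function. This standalone `WorkLoadRatio.percent` helper is hypothetical; it uses integer arithmetic, which truncates like the `(int)` cast above and should agree with the float version apart from possible float rounding at exact boundaries.

```java
// Hypothetical standalone version of the getWorkLoadRatio() formula.
class WorkLoadRatio {
    static int percent(int mapTasks, int maxMapTasks, int reduceTasks, int maxReduceTasks) {
        int capacity = maxMapTasks + maxReduceTasks;
        if (capacity <= 0) {
            return 0;  // no task slots reported yet
        }
        // Running tasks as a truncated percentage of total task capacity.
        return (mapTasks + reduceTasks) * 100 / capacity;
    }
}
```

For example, with 6 of 10 map slots and 2 of 10 reduce slots busy, the ratio is (6 + 2) / (10 + 10) = 40 percent, which a high-water value of, say, 80 would not trigger.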

By Ajith on July 29th, 2008, 09:16 PM
Hadoop

There's a background discussion in the Hadoop Project area.

There's a sample Hadoop orchestrator posted there.

- Ajith