Load-Aware Scheduler
Algorithm 2: Task assignment stage
Input:
M: JobTracker of the MapReduce cluster
Q: active job queue on JobTracker M
R: requirements list of a job
L: list of attributes
Whenever a TaskTracker N has an empty working slot:
1: Task assignTask(TaskTracker N) do
2: for each job j, j in Q do
9: Estimate task execution time T on N over a.
10: end for
We choose Hadoop MapReduce as our implementation platform. To make our scheduler more flexible and extensible, we merge our two modules into FairScheduler, which is a pluggable scheduler for Hadoop.
We modified FairScheduler to achieve the above objectives. Our scheduler retains the fairness characteristic of FairScheduler, since we only avoid unnecessary speculative tasks. We schedule tasks based on the configuration of the job itself.
Figure 3.2: SNMP query scheme (between an SNMP client and an SNMP server)
For the data collection module, we need a way to exchange information between the JobTracker and the TaskTrackers. We could design our own protocol, but this would make the scheduler inflexible and hard to implement. Take the system load on a TaskTracker as an example of an attribute we want to collect. Different operating systems may require different system calls or programs to obtain it, so for each operating system we want to support we would need a separate set of code for local data extraction on the TaskTracker. Besides the effort of writing this extra code, supporting any new platform would require recompiling the source code and restarting the service.
To avoid these disadvantages, we choose SNMP[20] as our data collection protocol. SNMP (Simple Network Management Protocol) is a protocol designed for network management and network monitoring. SNMP defines a structure of management information and the management information base[21]. Figure 3.2 shows an example of an SNMP query flow. We choose Net-SNMP[22] as the SNMP server on each TaskTracker. Net-SNMP was originally developed at the University of California, Davis and is now an open-source project. The data collection module in the JobTracker periodically sends SNMP queries to the TaskTrackers. Much information is already defined in the management information base (MIB). Taking the system load as an example again, querying iso.org.dod.internet.private.enterprises.ucdavis.laTable.laEntry.laLoad (MIB number 1.3.6.1.4.1.2021.10.1.3) returns the system load of a TaskTracker when using Net-SNMP. Net-SNMP also defines an extTable (MIB number 1.3.6.1.4.1.2021.8) that lets administrators specify self-defined commands. Given a program, Net-SNMP runs the program and returns its output as the result of an MIB entry.
Figure 3.3: System loading versus Task execution time experiment (x-axis: System Load; y-axis: Task Execution Time (second))
We leverage the extTable to build our mechanism for checking jobs' requirements. MapReduce jobs can set a special flag to express that they need certain checks on a TaskTracker. The JobTracker assigns tasks to a TaskTracker only if all SNMP query results satisfy the job's requirements. To add a new checking attribute for MapReduce jobs, we apply the following three steps:
• Add the corresponding program on TaskTracker.
• Restart the SNMP daemon on TaskTracker.
• Notify users of the MIB number of the attribute.
We do not have to restart the MapReduce service, so no active jobs or users are affected by this process.
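The requirement check described above can be sketched in Python as follows. This is a minimal sketch under stated assumptions, not the scheduler's actual code: the function name meets_requirements, the shape of the requirements mapping (MIB number to expected output string), and the query callable are all hypothetical, and the MIB numbers in the usage example are illustrative rows under extTable. The SNMP transport is injected as a callable so that the logic can be exercised without a running SNMP daemon.

```python
from typing import Callable, Mapping


def meets_requirements(requirements: Mapping[str, str],
                       query: Callable[[str], str]) -> bool:
    """Return True only if every required attribute matches.

    requirements maps a MIB number (as a string) to the output the job
    expects; query is a hypothetical callable that performs the SNMP
    query against one TaskTracker and returns the result as text.
    """
    for mib_number, expected in requirements.items():
        if query(mib_number).strip() != expected:
            return False  # one failed check is enough to reject the node
    return True


# Usage with canned answers standing in for a real SNMP query.
canned = {
    "1.3.6.1.4.1.2021.8.1.101.1": "yes\n",  # illustrative extTable row
    "1.3.6.1.4.1.2021.8.1.101.2": "no\n",   # illustrative extTable row
}


def fake_query(mib_number: str) -> str:
    return canned.get(mib_number, "")


job_needs = {"1.3.6.1.4.1.2021.8.1.101.1": "yes"}
print(meets_requirements(job_needs, fake_query))
```

A job with an empty requirements list passes trivially, which matches the case of jobs that do not set the special flag.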
For the task scheduling module, we leave the speculation threshold unchanged and focus on avoiding unnecessary speculative tasks. This requires a way to estimate task execution time. Our target application is CPU-intensive, so we analyze the two attributes that we expect to affect task execution time most: system loading and CPU frequency. System loading gives a global view of CPU usage on a TaskTracker; a high load means fewer resources are available to each process. We need an estimation function that gives the estimated execution time of a task based on previous task executions. Given the current load, together with the load and running time of a previous task, we can predict how long the task will take on this TaskTracker. We perform an experiment to observe the relation between system loading and task execution time. The results are shown in Figure 3.3. Multi-core computers are common nowadays, and we found that the curve behaves differently once the system load exceeds the number of cores.
Our experiment node has 8 cores. Figure 3.3 shows that when the load on the x-axis exceeds 8, the curve grows exponentially; when the load is below 8, the curve grows linearly.
We split the experimental results into two parts and analyze the behavior before and after the system load reaches the core count in Figure 3.4.
Figure 3.4: System loading versus Task execution time (a) node is not overloaded (b) node is overloaded
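Based on the experiment, we derive two estimation functions of system loading, where Time and Load are the execution time and system load of a previously executed task, and Time′ is the estimated execution time under the current load Load′: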
• $Time' = Time + 17700 \times (Load' - Load)$ when the node is not overloaded.
• $Time' = Time \times e^{0.12 \times (Load' - Load)}$ when the node is overloaded.
We use the same technique for CPU frequency. We perform an experiment to observe the relation between CPU frequency and task execution time. The results are shown in Figure 3.5.
Based on the experiment, we derive an estimation function of CPU frequency:
• $Time' = Time \times Freq / Freq'$
When the task scheduling module needs an estimate for a task, it takes the following steps:
• Select an already executed task of the same job.
• If the selected task was processed by the same TaskTracker, estimate the finish time using the system loading adjustment function.
• If the selected task was processed by a different TaskTracker, estimate the finish time using both the CPU frequency and system loading adjustment functions.
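The estimation steps above can be sketched in Python as follows. This is a minimal sketch, not the scheduler's actual code. The names TaskSample, adjust_for_load, adjust_for_frequency and estimate are hypothetical. The sketch also rests on several assumptions that the text does not spell out: the linear adjustment is additive (so an unchanged load leaves the time unchanged), execution time scales inversely with CPU frequency, a node counts as overloaded when the target load exceeds its core count, and the frequency adjustment is applied before the load adjustment. Times are in whatever unit the measurements use.

```python
import math
from dataclasses import dataclass

LINEAR_SLOPE = 17700.0  # coefficient of the non-overloaded (linear) fit
EXP_RATE = 0.12         # coefficient of the overloaded (exponential) fit


@dataclass
class TaskSample:
    """An already executed task of the same job (hypothetical record)."""
    tracker: str   # TaskTracker that ran the task
    time: float    # measured execution time
    load: float    # system load while the task ran
    freq: float    # CPU frequency of that TaskTracker


def adjust_for_load(time: float, load: float, new_load: float, cores: int) -> float:
    """Scale a measured time from load to new_load."""
    delta = new_load - load
    if new_load > cores:  # assumption: regime is chosen by the target load
        return time * math.exp(EXP_RATE * delta)
    return time + LINEAR_SLOPE * delta


def adjust_for_frequency(time: float, freq: float, new_freq: float) -> float:
    """Assumption: time is inversely proportional to CPU frequency."""
    return time * freq / new_freq


def estimate(sample: TaskSample, tracker: str, load: float,
             freq: float, cores: int) -> float:
    """Estimate the execution time of a task on the given TaskTracker."""
    time = sample.time
    if tracker != sample.tracker:
        # Different node: correct for CPU frequency first.
        time = adjust_for_frequency(time, sample.freq, freq)
    return adjust_for_load(time, sample.load, load, cores)


sample = TaskSample(tracker="tt1", time=500000.0, load=2.0, freq=2000.0)
print(estimate(sample, "tt1", load=4.0, freq=2000.0, cores=8))
print(estimate(sample, "tt2", load=2.0, freq=1000.0, cores=8))
```

With equal loads on the same TaskTracker the estimate equals the measured time, which is a quick sanity check on both adjustment functions.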
Figure 3.5: CPU frequency versus Task execution time experiment
3.4 Discussion
While Hadoop MapReduce was designed for use on large-scale clusters, its elegant framework design has led many small organizations to run their applications on top of it. Smaller clusters are common on campuses, where they support courses and research. Companies that are not web-based can also store and analyze data on top of Hadoop MapReduce. Heartbeats play an important role in the architecture of Hadoop MapReduce. The JobTracker and TaskTrackers communicate mainly through heartbeats and their responses, and the JobTracker must process each heartbeat and give a proper response. To avoid overwhelming the JobTracker, current Hadoop MapReduce enforces a 3-second lower bound on the heartbeat interval, and the interval grows linearly as the cluster size increases. In our experience, a 3-second lower bound is too long for a small cluster. Jobs submitted to the cluster cannot be scheduled until the next heartbeat is sent by a TaskTracker that has available working slots. Small jobs that take less than 1 minute might waste over 10% of their time waiting for heartbeats. There have been discussions about dynamically adjusting the heartbeat interval in Hadoop MapReduce. Issue number 5784[23] discusses a configurable heartbeat interval. [24] lowers the minimum heartbeat threshold on small clusters, and its experiment reports a performance improvement of 100% on a small cluster.
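The effect of the lower bound can be illustrated with a short Python sketch. It is only an illustration under stated assumptions, not Hadoop's code: the function names are hypothetical, and heartbeats_per_second is a hypothetical tuning parameter that stands for how many heartbeats the JobTracker is willing to process per second. Only the 3-second lower bound and the linear growth with cluster size are taken from the text.

```python
import math


def heartbeat_interval(cluster_size: int,
                       heartbeats_per_second: int = 100,
                       min_interval: float = 3.0) -> float:
    """Interval in seconds: grows with cluster size, never below the bound.

    heartbeats_per_second is an assumed parameter, not a value from the text.
    """
    scaled = float(math.ceil(cluster_size / heartbeats_per_second))
    return max(min_interval, scaled)


def worst_case_wait_fraction(job_seconds: float, interval: float) -> float:
    """Fraction of a job's run time spent waiting one full interval."""
    return interval / job_seconds


small = heartbeat_interval(10)    # a small cluster is pinned to the lower bound
large = heartbeat_interval(1000)  # a large cluster gets a longer interval
print(small, large)
print(worst_case_wait_fraction(30.0, small))
```

Under these assumptions a 10-node cluster still waits the full 3 seconds, so a single heartbeat wait already costs a 30-second job a tenth of its run time.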
Heartbeats between the JobTracker and TaskTrackers carry much information. If the information needed by the data collection module could be piggybacked on heartbeats, performance would undoubtedly improve. There are issues [25] [26] that discuss adding extra information to heartbeats. In the future, our data collection module can leverage the extra information already carried by heartbeats.