Logo Search packages:      
Sourcecode: hadoop version File versions  Download package

TestFairScheduler.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import junit.framework.TestCase;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.FairScheduler.JobInfo;

public class TestFairScheduler extends TestCase {
  // Absolute path of the scratch directory: taken from the "test.build.data"
  // system property, falling back to "build/contrib/streaming/test/data".
  final static String TEST_DIR = new File(System.getProperty("test.build.data",
      "build/contrib/streaming/test/data")).getAbsolutePath();
  // Absolute path of the "test-pools" allocation file inside TEST_DIR; setUp
  // rewrites it and hands it to the scheduler as its allocation file.
  final static String ALLOC_FILE = new File(TEST_DIR, 
      "test-pools").getAbsolutePath();
  
  // Name of the job configuration property that carries a job's pool name
  // (registered in setUp as "mapred.fairscheduler.poolnameproperty").
  private static final String POOL_PROPERTY = "pool";
  
  // Last job number handed out by FakeJobInProgress; reset to 0 in setUp.
  private static int jobCounter;
  // Last task number handed out by FakeJobInProgress; reset to 0 in setUp.
  private static int taskCounter;
  
  /**
   * Stand-in for a real job. It never initializes any tasks and fabricates a
   * brand new map or reduce task each time one is requested, registering the
   * task with the fake task tracker manager.
   */
  static class FakeJobInProgress extends JobInProgress {
    
    private FakeTaskTrackerManager taskTrackerManager;
    
    /**
     * Creates a job whose id is the next value of the shared job counter under
     * the "test" identifier. The job starts in the PREP run state with its
     * start time set to the current wall-clock time.
     */
    public FakeJobInProgress(JobConf jobConf,
        FakeTaskTrackerManager taskTrackerManager) throws IOException {
      super(new JobID("test", ++jobCounter), jobConf);
      this.taskTrackerManager = taskTrackerManager;
      this.startTime = System.currentTimeMillis();
      JobStatus initialStatus = new JobStatus();
      initialStatus.setRunState(JobStatus.PREP);
      this.status = initialStatus;
    }
    
    /** Intentionally empty: the fake job has no real tasks to set up. */
    @Override
    public synchronized void initTasks() throws IOException {
      // do nothing
    }

    /**
     * Fabricates a map task, reports it as started on the given tracker and
     * bumps the running map count.
     */
    @Override
    public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
        int ignored) throws IOException {
      Task mapTask = new MapTask("", getTaskAttemptID(true), 0, "",
          new BytesWritable()) {
        @Override
        public String toString() {
          // Rendered as "<attempt id> on <tracker name>".
          return getTaskID() + " on " + tts.getTrackerName();
        }
      };
      taskTrackerManager.startTask(tts.getTrackerName(), mapTask);
      runningMapTasks += 1;
      return mapTask;
    }
    
    /**
     * Fabricates a reduce task, reports it as started on the given tracker and
     * bumps the running reduce count.
     */
    @Override
    public Task obtainNewReduceTask(final TaskTrackerStatus tts,
        int clusterSize, int ignored) throws IOException {
      Task reduceTask = new ReduceTask("", getTaskAttemptID(false), 0, 10) {
        @Override
        public String toString() {
          // Rendered as "<attempt id> on <tracker name>".
          return getTaskID() + " on " + tts.getTrackerName();
        }
      };
      taskTrackerManager.startTask(tts.getTrackerName(), reduceTask);
      runningReduceTasks += 1;
      return reduceTask;
    }
    
    /**
     * Builds attempt 0 of the next task number (taken from the shared task
     * counter) for this job.
     */
    private TaskAttemptID getTaskAttemptID(boolean isMap) {
      JobID owner = getJobID();
      String jtIdentifier = owner.getJtIdentifier();
      int jobNumber = owner.getId();
      return new TaskAttemptID(jtIdentifier, jobNumber, isMap, ++taskCounter, 0);
    }
  }
  
  /**
   * In-memory {@link TaskTrackerManager} with two trackers, "tt1" and "tt2".
   * It keeps counters of running maps and reduces and remembers the status of
   * every task the tests start, so that tests can later mark tasks finished.
   */
  static class FakeTaskTrackerManager implements TaskTrackerManager {
    // Number of map / reduce tasks started and not yet finished.
    int maps = 0;
    int reduces = 0;
    // Per-tracker slot limits handed to each TaskTrackerStatus.
    int maxMapTasksPerTracker = 2;
    int maxReduceTasksPerTracker = 2;
    List<JobInProgressListener> listeners =
      new ArrayList<JobInProgressListener>();
    
    // Tracker name -> tracker status.
    private Map<String, TaskTrackerStatus> trackers =
      new HashMap<String, TaskTrackerStatus>();
    // Task attempt id (as a string) -> status recorded by startTask.
    private Map<String, TaskStatus> taskStatuses = 
      new HashMap<String, TaskStatus>();

    /** Registers the two fake trackers, each starting with no task reports. */
    public FakeTaskTrackerManager() {
      trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
          new ArrayList<TaskStatus>(), 0,
          maxMapTasksPerTracker, maxReduceTasksPerTracker));
      trackers.put("tt2", new TaskTrackerStatus("tt2", "tt2.host", 2,
          new ArrayList<TaskStatus>(), 0,
          maxMapTasksPerTracker, maxReduceTasksPerTracker));
    }
    
    /**
     * Builds a RUNNING cluster status from the tracker count, the current
     * map/reduce counters and the per-tracker limits multiplied by the number
     * of trackers.
     */
    @Override
    public ClusterStatus getClusterStatus() {
      int numTrackers = trackers.size();
      return new ClusterStatus(numTrackers, maps, reduces,
          numTrackers * maxMapTasksPerTracker,
          numTrackers * maxReduceTasksPerTracker,
          JobTracker.State.RUNNING);
    }

    /** Not used by these tests; always returns null. */
    @Override
    public QueueManager getQueueManager() {
      return null;
    }
    
    /** Not used by these tests; always returns 0. */
    @Override
    public int getNumberOfUniqueHosts() {
      return 0;
    }

    /** Returns a live view of the registered tracker statuses. */
    @Override
    public Collection<TaskTrackerStatus> taskTrackers() {
      return trackers.values();
    }


    @Override
    public void addJobInProgressListener(JobInProgressListener listener) {
      listeners.add(listener);
    }

    @Override
    public void removeJobInProgressListener(JobInProgressListener listener) {
      listeners.remove(listener);
    }
    
    @Override
    public int getNextHeartbeatInterval() {
      return MRConstants.HEARTBEAT_INTERVAL_MIN;
    }

    /** Intentionally a no-op in this fake. */
    @Override
    public void killJob(JobID jobid) {
      // do nothing
    }

    /** Job lookup is not supported by this fake; always returns null. */
    @Override
    public JobInProgress getJob(JobID jobid) {
      return null;
    }

    public void initJob (JobInProgress job) {
      // do nothing
    }
    
    public void failJob (JobInProgress job) {
      // do nothing
    }
    
    // Test methods
    
    /** Announces the job to every registered listener, in registration order. */
    public void submitJob(JobInProgress job) throws IOException {
      for (JobInProgressListener listener : listeners) {
        listener.jobAdded(job);
      }
    }
    
    /** Returns the status of the named tracker, or null if it is unknown. */
    public TaskTrackerStatus getTaskTracker(String trackerID) {
      return trackers.get(trackerID);
    }
    
    /**
     * Records that the given task started running on the named tracker: bumps
     * the matching counter, remembers a RUNNING status under the task's id and
     * appends that status to the tracker's task reports.
     *
     * @throws IllegalArgumentException if no tracker has the given name
     */
    public void startTask(String taskTrackerName, final Task t) {
      // Validate up front so a bad tracker name cannot leave the counters and
      // the status map half-updated.
      TaskTrackerStatus tracker = trackers.get(taskTrackerName);
      if (tracker == null) {
        throw new IllegalArgumentException(
            "Unknown task tracker: " + taskTrackerName);
      }
      if (t.isMapTask()) {
        maps++;
      } else {
        reduces++;
      }
      TaskStatus status = new TaskStatus() {
        @Override
        public boolean getIsMap() {
          return t.isMapTask();
        }
      };
      taskStatuses.put(t.getTaskID().toString(), status);
      status.setRunState(TaskStatus.State.RUNNING);
      tracker.getTaskReports().add(status);
    }
    
    /**
     * Marks a previously started task as SUCCEEDED and decrements the matching
     * counter. The tracker name is accepted for symmetry with startTask but is
     * not consulted.
     *
     * @throws IllegalArgumentException if no task was started under tipId
     */
    public void finishTask(String taskTrackerName, String tipId) {
      TaskStatus status = taskStatuses.get(tipId);
      if (status == null) {
        // Fail with the offending id instead of an anonymous NPE below.
        throw new IllegalArgumentException(
            "No task was started with id: " + tipId);
      }
      if (status.getIsMap()) {
        maps--;
      } else {
        reduces--;
      }
      status.setRunState(TaskStatus.State.SUCCEEDED);
    }
  }
  
  /**
   * Clock whose time starts at 0 and only moves when a test explicitly
   * advances it.
   */
  protected class FakeClock extends FairScheduler.Clock {
    private long time = 0;
    
    /** Moves the clock forward by the given amount. */
    public void advance(long millis) {
      time = time + millis;
    }

    /** Returns the current value of the fake clock. */
    @Override
    long getTime() {
      return this.time;
    }
  }
  
  // Per-test fixture, rebuilt from scratch by setUp.
  // Base configuration; submitJob copies it for every job it creates.
  protected JobConf conf;
  // Scheduler under test, started in setUp and terminated in tearDown.
  protected FairScheduler scheduler;
  // Fake cluster the scheduler is attached to.
  private FakeTaskTrackerManager taskTrackerManager;
  // Manually advanced clock handed to the scheduler.
  private FakeClock clock;

  /**
   * Resets the shared id counters, writes an empty allocation file and starts
   * a fresh scheduler wired to a fake clock and a fake task tracker manager.
   */
  @Override
  protected void setUp() throws Exception {
    jobCounter = 0;
    taskCounter = 0;
    new File(TEST_DIR).mkdirs(); // Make sure data directory exists
    // Create an empty pools file (so we can add/remove pools later)
    FileWriter fileWriter = new FileWriter(ALLOC_FILE);
    try {
      fileWriter.write("<?xml version=\"1.0\"?>\n");
      fileWriter.write("<allocations />\n");
    } finally {
      // Always release the file handle, even if a write fails, so later
      // tests can still rewrite the allocation file.
      fileWriter.close();
    }
    conf = new JobConf();
    conf.set("mapred.fairscheduler.allocation.file", ALLOC_FILE);
    conf.set("mapred.fairscheduler.poolnameproperty", POOL_PROPERTY);
    taskTrackerManager = new FakeTaskTrackerManager();
    clock = new FakeClock();
    scheduler = new FairScheduler(clock, false);
    scheduler.waitForMapsBeforeLaunchingReduces = false;
    scheduler.setConf(conf);
    scheduler.setTaskTrackerManager(taskTrackerManager);
    scheduler.start();
  }
  
  /** Terminates the scheduler, if setUp got far enough to create one. */
  @Override
  protected void tearDown() throws Exception {
    if (scheduler == null) {
      return;
    }
    scheduler.terminate();
  }
  
  /**
   * Submits a job without setting the pool property, by delegating to the
   * four-argument overload with a null pool.
   */
  private JobInProgress submitJob(int state, int maps, int reduces)
      throws IOException {
    final String noExplicitPool = null;
    return submitJob(state, maps, reduces, noExplicitPool);
  }
  
  /**
   * Creates a fake job with the given run state and task counts, announces it
   * to the scheduler through the fake task tracker manager, and then stamps it
   * with the fake clock's current time as its start time.
   *
   * @param pool pool name to store under the pool property, or null to leave
   *             the property unset
   */
  private JobInProgress submitJob(int state, int maps, int reduces, String pool)
      throws IOException {
    JobConf perJobConf = new JobConf(conf);
    perJobConf.setNumMapTasks(maps);
    perJobConf.setNumReduceTasks(reduces);
    if (pool != null) {
      perJobConf.set(POOL_PROPERTY, pool);
    }
    JobInProgress submitted =
        new FakeJobInProgress(perJobConf, taskTrackerManager);
    submitted.getStatus().setRunState(state);
    taskTrackerManager.submitJob(submitted);
    // The constructor used wall-clock time; replace it with fake-clock time.
    submitted.startTime = clock.time;
    return submitted;
  }
  
  /**
   * Submits {@code number} identical jobs without a pool, one after another.
   * Nothing is submitted when {@code number} is zero or negative.
   */
  protected void submitJobs(int number, int state, int maps, int reduces)
    throws IOException {
    int remaining = number;
    while (remaining > 0) {
      submitJob(state, maps, reduces);
      remaining--;
    }
  }

  /**
   * Writes an allocation file with four pools, a default per-user job limit
   * and one user-specific limit, reloads it, and checks that the pool manager
   * reports the configured values (and defaults where nothing was configured).
   */
  public void testAllocationFileParsing() throws Exception {
    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
    try {
      out.println("<?xml version=\"1.0\"?>");
      out.println("<allocations>"); 
      // Give pool A a minimum of 1 map, 2 reduces
      out.println("<pool name=\"poolA\">");
      out.println("<minMaps>1</minMaps>");
      out.println("<minReduces>2</minReduces>");
      out.println("</pool>");
      // Give pool B a minimum of 2 maps, 1 reduce
      out.println("<pool name=\"poolB\">");
      out.println("<minMaps>2</minMaps>");
      out.println("<minReduces>1</minReduces>");
      out.println("</pool>");
      // Give pool C min maps but no min reduces
      out.println("<pool name=\"poolC\">");
      out.println("<minMaps>2</minMaps>");
      out.println("</pool>");
      // Give pool D a limit of 3 running jobs
      out.println("<pool name=\"poolD\">");
      out.println("<maxRunningJobs>3</maxRunningJobs>");
      out.println("</pool>");
      // Set default limit of jobs per user to 5
      out.println("<userMaxJobsDefault>5</userMaxJobsDefault>");
      // Give user1 a limit of 10 jobs
      out.println("<user name=\"user1\">");
      out.println("<maxRunningJobs>10</maxRunningJobs>");
      out.println("</user>");
      out.println("</allocations>"); 
    } finally {
      // Release the file handle even if writing fails part-way.
      out.close();
    }
    
    PoolManager poolManager = scheduler.getPoolManager();
    poolManager.reloadAllocs();
    
    assertEquals(5, poolManager.getPools().size()); // 4 in file + default pool
    assertEquals(0, poolManager.getAllocation(Pool.DEFAULT_POOL_NAME,
        TaskType.MAP));
    assertEquals(0, poolManager.getAllocation(Pool.DEFAULT_POOL_NAME,
        TaskType.REDUCE));
    assertEquals(1, poolManager.getAllocation("poolA", TaskType.MAP));
    assertEquals(2, poolManager.getAllocation("poolA", TaskType.REDUCE));
    assertEquals(2, poolManager.getAllocation("poolB", TaskType.MAP));
    assertEquals(1, poolManager.getAllocation("poolB", TaskType.REDUCE));
    assertEquals(2, poolManager.getAllocation("poolC", TaskType.MAP));
    assertEquals(0, poolManager.getAllocation("poolC", TaskType.REDUCE));
    assertEquals(0, poolManager.getAllocation("poolD", TaskType.MAP));
    assertEquals(0, poolManager.getAllocation("poolD", TaskType.REDUCE));
    // Pools without an explicit job limit are expected to be unlimited.
    assertEquals(Integer.MAX_VALUE, poolManager.getPoolMaxJobs("poolA"));
    assertEquals(3, poolManager.getPoolMaxJobs("poolD"));
    assertEquals(10, poolManager.getUserMaxJobs("user1"));
    // user2 is not listed, so the default of 5 applies.
    assertEquals(5, poolManager.getUserMaxJobs("user2"));
  }
  
  /** With no jobs submitted, asking for tasks on tt1 must yield null. */
  public void testTaskNotAssignedWhenNoJobsArePresent() throws IOException {
    Object assigned = scheduler.assignTasks(tracker("tt1"));
    assertNull(assigned);
  }

  /**
   * Submits one job in each non-RUNNING state and checks that none of them is
   * given a task, both right away and after the clock has moved forward.
   */
  public void testNonRunningJobsAreIgnored() throws IOException {
    int[] nonRunningStates = {
      JobStatus.PREP, JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.KILLED
    };
    for (int state : nonRunningStates) {
      submitJobs(1, state, 10, 10);
    }
    assertNull(scheduler.assignTasks(tracker("tt1")));
    // Advancing time must not make the scheduler pick these jobs up either.
    advanceTime(100);
    assertNull(scheduler.assignTasks(tracker("tt1")));
  }

  /**
   * Submits two small jobs whose combined demand is below the number of slots
   * in the cluster. Every task must be handed out, and the job submitted first
   * must be served first.
   */
  public void testSmallJobs() throws IOException {
    JobInProgress earlierJob = submitJob(JobStatus.RUNNING, 2, 1);
    JobInfo earlierInfo = scheduler.infos.get(earlierJob);

    // Alone in the cluster, the first job has a fair share of 4 for each task
    // type and no deficit yet.
    assertEquals(0, earlierInfo.runningMaps);
    assertEquals(0, earlierInfo.runningReduces);
    assertEquals(2, earlierInfo.neededMaps);
    assertEquals(1, earlierInfo.neededReduces);
    assertEquals(0, earlierInfo.mapDeficit);
    assertEquals(0, earlierInfo.reduceDeficit);
    assertEquals(4.0, earlierInfo.mapFairShare);
    assertEquals(4.0, earlierInfo.reduceFairShare);

    // Let 100 ms pass before the second job arrives so that the first job is
    // deterministically ahead of it.
    advanceTime(100);
    JobInProgress laterJob = submitJob(JobStatus.RUNNING, 1, 2);
    JobInfo laterInfo = scheduler.infos.get(laterJob);

    // Fair shares are now split evenly; the first job has built up a deficit
    // of (4 slots) * (100 ms) while it waited.
    assertEquals(0, earlierInfo.runningMaps);
    assertEquals(0, earlierInfo.runningReduces);
    assertEquals(2, earlierInfo.neededMaps);
    assertEquals(1, earlierInfo.neededReduces);
    assertEquals(400, earlierInfo.mapDeficit);
    assertEquals(400, earlierInfo.reduceDeficit);
    assertEquals(2.0, earlierInfo.mapFairShare);
    assertEquals(2.0, earlierInfo.reduceFairShare);
    assertEquals(0, laterInfo.runningMaps);
    assertEquals(0, laterInfo.runningReduces);
    assertEquals(1, laterInfo.neededMaps);
    assertEquals(2, laterInfo.neededReduces);
    assertEquals(0, laterInfo.mapDeficit);
    assertEquals(0, laterInfo.reduceDeficit);
    assertEquals(2.0, laterInfo.mapFairShare);
    assertEquals(2.0, laterInfo.reduceFairShare);

    // Slots are filled with the first job's tasks, then the second job's.
    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
    checkAssignment("tt1", "attempt_test_0002_r_000004_0 on tt1");
    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
    checkAssignment("tt2", "attempt_test_0002_r_000006_0 on tt2");
    // Both jobs are fully scheduled, so nothing is left for tt2.
    assertNull(scheduler.assignTasks(tracker("tt2")));

    // Launched tasks count as running immediately.
    assertEquals(2, earlierInfo.runningMaps);
    assertEquals(1, earlierInfo.runningReduces);
    assertEquals(0, earlierInfo.neededMaps);
    assertEquals(0, earlierInfo.neededReduces);
    assertEquals(1, laterInfo.runningMaps);
    assertEquals(2, laterInfo.runningReduces);
    assertEquals(0, laterInfo.neededMaps);
    assertEquals(0, laterInfo.neededReduces);
  }
  
  /**
   * This test begins by submitting two jobs with 10 maps and reduces each.
   * The second job is submitted 100ms after the first, during which time no
   * tasks run. After this, we assign tasks to all slots, which should all be
   * from job 1. These run for 200ms, at which point job 2 now has a deficit
   * of 400 while job 1 is down to a deficit of 0. We then finish all tasks and
   * assign new ones, which should all be from job 2. These run for 50 ms,
   * which is not enough time for job 2 to make up its deficit (it only makes up
   * 100 ms of deficit). Finally we assign a new round of tasks, which should
   * all be from job 2 again.
   */
  public void testLargeJobs() throws IOException {
    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
    JobInfo info1 = scheduler.infos.get(job1);
    
    // Check scheduler variables
    assertEquals(0,    info1.runningMaps);
    assertEquals(0,    info1.runningReduces);
    assertEquals(10,   info1.neededMaps);
    assertEquals(10,   info1.neededReduces);
    assertEquals(0,    info1.mapDeficit);
    assertEquals(0,    info1.reduceDeficit);
    assertEquals(4.0,  info1.mapFairShare);
    assertEquals(4.0,  info1.reduceFairShare);
    
    // Advance time before submitting another job j2, to make j1 run before j2
    // deterministically.
    advanceTime(100);
    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
    JobInfo info2 = scheduler.infos.get(job2);
    
    // Check scheduler variables; the fair shares should now have been allocated
    // equally between j1 and j2, but j1 should have (4 slots)*(100 ms) deficit
    assertEquals(0,    info1.runningMaps);
    assertEquals(0,    info1.runningReduces);
    assertEquals(10,   info1.neededMaps);
    assertEquals(10,   info1.neededReduces);
    assertEquals(400,  info1.mapDeficit);
    assertEquals(400,  info1.reduceDeficit);
    assertEquals(2.0,  info1.mapFairShare);
    assertEquals(2.0,  info1.reduceFairShare);
    assertEquals(0,    info2.runningMaps);
    assertEquals(0,    info2.runningReduces);
    assertEquals(10,   info2.neededMaps);
    assertEquals(10,   info2.neededReduces);
    assertEquals(0,    info2.mapDeficit);
    assertEquals(0,    info2.reduceDeficit);
    assertEquals(2.0,  info2.mapFairShare);
    assertEquals(2.0,  info2.reduceFairShare);
    
    // Assign tasks and check that all slots are initially filled with job 1
    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
    checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
    checkAssignment("tt2", "attempt_test_0001_r_000007_0 on tt2");
    checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
    
    // Check that the scheduler has started counting the tasks as running
    // as soon as it launched them.
    assertEquals(4,  info1.runningMaps);
    assertEquals(4,  info1.runningReduces);
    assertEquals(6,  info1.neededMaps);
    assertEquals(6,  info1.neededReduces);
    assertEquals(0,  info2.runningMaps);
    assertEquals(0,  info2.runningReduces);
    assertEquals(10, info2.neededMaps);
    assertEquals(10, info2.neededReduces);
    
    // Finish up the tasks and advance time again. Note that we must finish
    // the task since FakeJobInProgress does not properly maintain running
    // tasks, so the scheduler will always get an empty task list from
    // the JobInProgress's getMapTasks/getReduceTasks and think they finished.
    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0");
    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0");
    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000003_0");
    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000004_0");
    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000005_0");
    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000006_0");
    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000007_0");
    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000008_0");
    advanceTime(200);
    // After 200ms the deficits are expected to have swapped: job 1 is back at
    // 0 while job 2, which ran nothing, has reached 400.
    assertEquals(0,   info1.runningMaps);
    assertEquals(0,   info1.runningReduces);
    assertEquals(0,   info1.mapDeficit);
    assertEquals(0,   info1.reduceDeficit);
    assertEquals(0,   info2.runningMaps);
    assertEquals(0,   info2.runningReduces);
    assertEquals(400, info2.mapDeficit);
    assertEquals(400, info2.reduceDeficit);

    // Assign tasks and check that all slots are now filled with job 2
    checkAssignment("tt1", "attempt_test_0002_m_000009_0 on tt1");
    checkAssignment("tt1", "attempt_test_0002_m_000010_0 on tt1");
    checkAssignment("tt1", "attempt_test_0002_r_000011_0 on tt1");
    checkAssignment("tt1", "attempt_test_0002_r_000012_0 on tt1");
    checkAssignment("tt2", "attempt_test_0002_m_000013_0 on tt2");
    checkAssignment("tt2", "attempt_test_0002_m_000014_0 on tt2");
    checkAssignment("tt2", "attempt_test_0002_r_000015_0 on tt2");
    checkAssignment("tt2", "attempt_test_0002_r_000016_0 on tt2");

    // Finish up the tasks and advance time again, but give job 2 only 50ms.
    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000009_0");
    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000010_0");
    taskTrackerManager.finishTask("tt1", "attempt_test_0002_r_000011_0");
    taskTrackerManager.finishTask("tt1", "attempt_test_0002_r_000012_0");
    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000013_0");
    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000014_0");
    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000015_0");
    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000016_0");
    advanceTime(50);
    // 50ms was only enough for job 2 to work off 100 of its 400 deficit, so
    // it is still ahead of job 1 (300 vs. 100).
    assertEquals(0,   info1.runningMaps);
    assertEquals(0,   info1.runningReduces);
    assertEquals(100, info1.mapDeficit);
    assertEquals(100, info1.reduceDeficit);
    assertEquals(0,   info2.runningMaps);
    assertEquals(0,   info2.runningReduces);
    assertEquals(300, info2.mapDeficit);
    assertEquals(300, info2.reduceDeficit);

    // Assign tasks and check that all slots are now still with job 2
    checkAssignment("tt1", "attempt_test_0002_m_000017_0 on tt1");
    checkAssignment("tt1", "attempt_test_0002_m_000018_0 on tt1");
    checkAssignment("tt1", "attempt_test_0002_r_000019_0 on tt1");
    checkAssignment("tt1", "attempt_test_0002_r_000020_0 on tt1");
    checkAssignment("tt2", "attempt_test_0002_m_000021_0 on tt2");
    checkAssignment("tt2", "attempt_test_0002_m_000022_0 on tt2");
    checkAssignment("tt2", "attempt_test_0002_r_000023_0 on tt2");
    checkAssignment("tt2", "attempt_test_0002_r_000024_0 on tt2");
  }
  

  /**
   * Submits two equally sized jobs, raises the second one to HIGH priority,
   * and verifies that its fair share and accumulated deficit are twice those
   * of the normal-priority job. After 100 ms every slot must go to the
   * high-priority job.
   */
  public void testJobsWithPriorities() throws IOException {
    JobInProgress normalJob = submitJob(JobStatus.RUNNING, 10, 10);
    JobInfo normalInfo = scheduler.infos.get(normalJob);
    JobInProgress highJob = submitJob(JobStatus.RUNNING, 10, 10);
    JobInfo highInfo = scheduler.infos.get(highJob);
    highJob.setPriority(JobPriority.HIGH);
    // Recompute shares now that the priority has changed.
    scheduler.update();

    // The 4 slots of each type are split 1:2 between the two jobs.
    assertEquals(0, normalInfo.runningMaps);
    assertEquals(0, normalInfo.runningReduces);
    assertEquals(10, normalInfo.neededMaps);
    assertEquals(10, normalInfo.neededReduces);
    assertEquals(0, normalInfo.mapDeficit);
    assertEquals(0, normalInfo.reduceDeficit);
    assertEquals(1.33, normalInfo.mapFairShare, 0.1);
    assertEquals(1.33, normalInfo.reduceFairShare, 0.1);
    assertEquals(0, highInfo.runningMaps);
    assertEquals(0, highInfo.runningReduces);
    assertEquals(10, highInfo.neededMaps);
    assertEquals(10, highInfo.neededReduces);
    assertEquals(0, highInfo.mapDeficit);
    assertEquals(0, highInfo.reduceDeficit);
    assertEquals(2.66, highInfo.mapFairShare, 0.1);
    assertEquals(2.66, highInfo.reduceFairShare, 0.1);

    // With nothing running, 100 ms of waiting turns each fair share into a
    // proportional deficit.
    advanceTime(100);
    assertEquals(133, normalInfo.mapDeficit, 1.0);
    assertEquals(133, normalInfo.reduceDeficit, 1.0);
    assertEquals(266, highInfo.mapDeficit, 1.0);
    assertEquals(266, highInfo.reduceDeficit, 1.0);

    // Every slot is expected to go to job 2, the high-priority job.
    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
    checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
    checkAssignment("tt1", "attempt_test_0002_r_000004_0 on tt1");
    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
    checkAssignment("tt2", "attempt_test_0002_m_000006_0 on tt2");
    checkAssignment("tt2", "attempt_test_0002_r_000007_0 on tt2");
    checkAssignment("tt2", "attempt_test_0002_r_000008_0 on tt2");
  }
  
  /**
   * This test starts by submitting three large jobs:
   * - job1 in the default pool, at time 0
   * - job2 in poolA, with an allocation of 1 map / 2 reduces, at time 200
   * - job3 in poolB, with an allocation of 2 maps / 1 reduce, at time 200
   * 
   * After this, we sleep 100ms, until time 300. At this point, job1 has the
   * highest map deficit, job3 the second, and job2 the third. This is because
   * job3 has more maps in its min share than job2, but job1 has been around
   * a long time at the beginning. The reduce deficits are similar, except job2
   * comes before job3 because it had a higher reduce minimum share.
   * 
   * Finally, assign tasks to all slots. The maps should be assigned in the
   * order job3, job2, job1 because 3 and 2 both have guaranteed slots and 3
   * has a higher deficit. The reduces should be assigned as job2, job3, job1.
   */
  public void testLargeJobsWithPools() throws Exception {
    // Set up pools file
    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
    try {
      out.println("<?xml version=\"1.0\"?>");
      out.println("<allocations>");
      // Give pool A a minimum of 1 map, 2 reduces
      out.println("<pool name=\"poolA\">");
      out.println("<minMaps>1</minMaps>");
      out.println("<minReduces>2</minReduces>");
      out.println("</pool>");
      // Give pool B a minimum of 2 maps, 1 reduce
      out.println("<pool name=\"poolB\">");
      out.println("<minMaps>2</minMaps>");
      out.println("<minReduces>1</minReduces>");
      out.println("</pool>");
      out.println("</allocations>");
    } finally {
      // Release the file handle even if writing fails part-way.
      out.close();
    }
    scheduler.getPoolManager().reloadAllocs();
    
    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
    JobInfo info1 = scheduler.infos.get(job1);
    
    // Check scheduler variables
    assertEquals(0,    info1.runningMaps);
    assertEquals(0,    info1.runningReduces);
    assertEquals(10,   info1.neededMaps);
    assertEquals(10,   info1.neededReduces);
    assertEquals(0,    info1.mapDeficit);
    assertEquals(0,    info1.reduceDeficit);
    assertEquals(4.0,  info1.mapFairShare);
    assertEquals(4.0,  info1.reduceFairShare);
    
    // Advance time 200ms and submit jobs 2 and 3
    advanceTime(200);
    assertEquals(800,  info1.mapDeficit);
    assertEquals(800,  info1.reduceDeficit);
    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
    JobInfo info2 = scheduler.infos.get(job2);
    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
    JobInfo info3 = scheduler.infos.get(job3);
    
    // Check that minimum and fair shares have been allocated
    assertEquals(0,    info1.minMaps);
    assertEquals(0,    info1.minReduces);
    assertEquals(1.0,  info1.mapFairShare);
    assertEquals(1.0,  info1.reduceFairShare);
    assertEquals(1,    info2.minMaps);
    assertEquals(2,    info2.minReduces);
    assertEquals(1.0,  info2.mapFairShare);
    assertEquals(2.0,  info2.reduceFairShare);
    assertEquals(2,    info3.minMaps);
    assertEquals(1,    info3.minReduces);
    assertEquals(2.0,  info3.mapFairShare);
    assertEquals(1.0,  info3.reduceFairShare);
    
    // Advance time 100ms and check deficits
    advanceTime(100);
    assertEquals(900,  info1.mapDeficit);
    assertEquals(900,  info1.reduceDeficit);
    assertEquals(100,  info2.mapDeficit);
    assertEquals(200,  info2.reduceDeficit);
    assertEquals(200,  info3.mapDeficit);
    assertEquals(100,  info3.reduceDeficit);
    
    // Assign tasks and check that slots are first given to needy jobs
    checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
    checkAssignment("tt1", "attempt_test_0003_m_000002_0 on tt1");
    checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
    checkAssignment("tt1", "attempt_test_0002_r_000004_0 on tt1");
    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
    checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
    checkAssignment("tt2", "attempt_test_0003_r_000007_0 on tt2");
    checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
  }

  /**
   * This test starts by submitting three large jobs:
   * - job1 in the default pool, at time 0
   * - job2 in poolA, with an allocation of 2 maps / 2 reduces, at time 200
   * - job3 in poolA, with an allocation of 2 maps / 2 reduces, at time 300
   * 
   * After this, we sleep 100ms, until time 400. At this point, job1 has the
   * highest deficit, job2 the second, and job3 the third. The first two tasks
   * of each type should be assigned to job2 and job3 since they are in a pool
   * with an allocation guarantee, but the next two slots should be assigned to
   * job1, which has the largest deficit, because the pool will no longer be
   * needy.
   */
  public void testLargeJobsWithExcessCapacity() throws Exception {
    // Set up pools file
    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
    try {
      out.println("<?xml version=\"1.0\"?>");
      out.println("<allocations>");
      // Give pool A a minimum of 2 maps, 2 reduces
      out.println("<pool name=\"poolA\">");
      out.println("<minMaps>2</minMaps>");
      out.println("<minReduces>2</minReduces>");
      out.println("</pool>");
      out.println("</allocations>");
    } finally {
      // Release the file handle even if writing fails part-way.
      out.close();
    }
    scheduler.getPoolManager().reloadAllocs();
    
    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
    JobInfo info1 = scheduler.infos.get(job1);
    
    // Check scheduler variables
    assertEquals(0,    info1.runningMaps);
    assertEquals(0,    info1.runningReduces);
    assertEquals(10,   info1.neededMaps);
    assertEquals(10,   info1.neededReduces);
    assertEquals(0,    info1.mapDeficit);
    assertEquals(0,    info1.reduceDeficit);
    assertEquals(4.0,  info1.mapFairShare);
    assertEquals(4.0,  info1.reduceFairShare);
    
    // Advance time 200ms and submit job 2
    advanceTime(200);
    assertEquals(800,  info1.mapDeficit);
    assertEquals(800,  info1.reduceDeficit);
    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
    JobInfo info2 = scheduler.infos.get(job2);
    
    // Check that minimum and fair shares have been allocated
    assertEquals(0,    info1.minMaps);
    assertEquals(0,    info1.minReduces);
    assertEquals(2.0,  info1.mapFairShare);
    assertEquals(2.0,  info1.reduceFairShare);
    assertEquals(2,    info2.minMaps);
    assertEquals(2,    info2.minReduces);
    assertEquals(2.0,  info2.mapFairShare);
    assertEquals(2.0,  info2.reduceFairShare);
    
    // Advance time 100ms and submit job 3
    advanceTime(100);
    assertEquals(1000, info1.mapDeficit);
    assertEquals(1000, info1.reduceDeficit);
    assertEquals(200,  info2.mapDeficit);
    assertEquals(200,  info2.reduceDeficit);
    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
    JobInfo info3 = scheduler.infos.get(job3);
    
    // Check that minimum and fair shares have been allocated; poolA's
    // guarantee of 2 is now split between job 2 and job 3.
    assertEquals(0,    info1.minMaps);
    assertEquals(0,    info1.minReduces);
    assertEquals(2,    info1.mapFairShare, 0.1);
    assertEquals(2,    info1.reduceFairShare, 0.1);
    assertEquals(1,    info2.minMaps);
    assertEquals(1,    info2.minReduces);
    assertEquals(1,    info2.mapFairShare, 0.1);
    assertEquals(1,    info2.reduceFairShare, 0.1);
    assertEquals(1,    info3.minMaps);
    assertEquals(1,    info3.minReduces);
    assertEquals(1,    info3.mapFairShare, 0.1);
    assertEquals(1,    info3.reduceFairShare, 0.1);
    
    // Advance time 100ms and check deficits
    advanceTime(100);
    assertEquals(1200, info1.mapDeficit, 1.0);
    assertEquals(1200, info1.reduceDeficit, 1.0);
    assertEquals(300,  info2.mapDeficit, 1.0);
    assertEquals(300,  info2.reduceDeficit, 1.0);
    assertEquals(100,  info3.mapDeficit, 1.0);
    assertEquals(100,  info3.reduceDeficit, 1.0);
    
    // Assign tasks and check that slots are first given to needy jobs, but
    // that job 1 gets two tasks after due to having a larger deficit.
    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
    checkAssignment("tt1", "attempt_test_0003_m_000002_0 on tt1");
    checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
    checkAssignment("tt1", "attempt_test_0003_r_000004_0 on tt1");
    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
    checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
    checkAssignment("tt2", "attempt_test_0001_r_000007_0 on tt2");
    checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
  }
  
  /**
   * This test starts by submitting two jobs at time 0:
   * - job1 in the default pool
   * - job2, with 1 map and 1 reduce, in poolA, which has an alloc of 4
   *   maps and 4 reduces
   * 
   * When we assign the slots, job2 should only get 1 of each type of task.
   * 
   * The fair share for job 2 should be 2.0 however, because even though it is
   * running only one task, it accumulates deficit in case it will have failures
   * or need speculative tasks later. (TODO: This may not be a good policy.)
   */
  public void testSmallJobInLargePool() throws Exception {
    // Write an allocations file guaranteeing poolA 4 maps and 4 reduces,
    // then have the pool manager reload it.
    String[] allocLines = {
      "<?xml version=\"1.0\"?>",
      "<allocations>",
      "<pool name=\"poolA\">",
      "<minMaps>4</minMaps>",
      "<minReduces>4</minReduces>",
      "</pool>",
      "</allocations>"
    };
    PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
    for (String line : allocLines) {
      allocWriter.println(line);
    }
    allocWriter.close();
    scheduler.getPoolManager().reloadAllocs();

    // A 10x10 job in the default pool and a 1x1 job in poolA.
    JobInProgress largeJob = submitJob(JobStatus.RUNNING, 10, 10);
    JobInfo largeInfo = scheduler.infos.get(largeJob);
    JobInProgress smallJob = submitJob(JobStatus.RUNNING, 1, 1, "poolA");
    JobInfo smallInfo = scheduler.infos.get(smallJob);

    // Nothing is running yet and no deficit has accrued; both jobs are
    // expected to report a fair share of 2.0 for maps and for reduces.
    assertEquals(0,    largeInfo.runningMaps);
    assertEquals(0,    largeInfo.runningReduces);
    assertEquals(10,   largeInfo.neededMaps);
    assertEquals(10,   largeInfo.neededReduces);
    assertEquals(0,    largeInfo.mapDeficit);
    assertEquals(0,    largeInfo.reduceDeficit);
    assertEquals(2.0,  largeInfo.mapFairShare);
    assertEquals(2.0,  largeInfo.reduceFairShare);
    assertEquals(0,    smallInfo.runningMaps);
    assertEquals(0,    smallInfo.runningReduces);
    assertEquals(1,    smallInfo.neededMaps);
    assertEquals(1,    smallInfo.neededReduces);
    assertEquals(0,    smallInfo.mapDeficit);
    assertEquals(0,    smallInfo.reduceDeficit);
    assertEquals(2.0,  smallInfo.mapFairShare);
    assertEquals(2.0,  smallInfo.reduceFairShare);

    // The poolA job is served first for each task type on tt1, but it only
    // has one map and one reduce; every remaining slot goes to the big job.
    String[] expectedOnTt1 = {
      "attempt_test_0002_m_000001_0 on tt1",
      "attempt_test_0001_m_000002_0 on tt1",
      "attempt_test_0002_r_000003_0 on tt1",
      "attempt_test_0001_r_000004_0 on tt1"
    };
    for (String expected : expectedOnTt1) {
      checkAssignment("tt1", expected);
    }
    String[] expectedOnTt2 = {
      "attempt_test_0001_m_000005_0 on tt2",
      "attempt_test_0001_m_000006_0 on tt2",
      "attempt_test_0001_r_000007_0 on tt2",
      "attempt_test_0001_r_000008_0 on tt2"
    };
    for (String expected : expectedOnTt2) {
      checkAssignment("tt2", expected);
    }
  }
  
  /**
   * This test starts by submitting four jobs in the default pool. However, the
   * maxRunningJobs limit for this pool has been set to two. We should see only
   * the first two jobs get scheduled, each with half the total slots.
   */
  public void testPoolMaxJobs() throws Exception {
    // Cap the default pool at two concurrently running jobs.
    String[] allocLines = {
      "<?xml version=\"1.0\"?>",
      "<allocations>",
      "<pool name=\"default\">",
      "<maxRunningJobs>2</maxRunningJobs>",
      "</pool>",
      "</allocations>"
    };
    PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
    for (String line : allocLines) {
      allocWriter.println(line);
    }
    allocWriter.close();
    scheduler.getPoolManager().reloadAllocs();

    // Submit four identical jobs, moving the clock forward between
    // submissions so that each one has a distinct submit time.
    JobInfo[] jobInfos = new JobInfo[4];
    for (int i = 0; i < jobInfos.length; i++) {
      if (i > 0) {
        advanceTime(10);
      }
      JobInProgress job = submitJob(JobStatus.RUNNING, 10, 10);
      jobInfos[i] = scheduler.infos.get(job);
    }

    // Only the two earliest jobs are expected to hold a share (2.0 each);
    // the two later ones should be left with none.
    for (int i = 0; i < jobInfos.length; i++) {
      double expectedShare = (i < 2) ? 2.0 : 0.0;
      assertEquals(expectedShare, jobInfos[i].mapFairShare);
      assertEquals(expectedShare, jobInfos[i].reduceFairShare);
    }

    // All of tt1's slots go to job 1; after more time passes, all of tt2's
    // slots go to job 2.
    String[] expectedOnTt1 = {
      "attempt_test_0001_m_000001_0 on tt1",
      "attempt_test_0001_m_000002_0 on tt1",
      "attempt_test_0001_r_000003_0 on tt1",
      "attempt_test_0001_r_000004_0 on tt1"
    };
    for (String expected : expectedOnTt1) {
      checkAssignment("tt1", expected);
    }
    advanceTime(100);
    String[] expectedOnTt2 = {
      "attempt_test_0002_m_000005_0 on tt2",
      "attempt_test_0002_m_000006_0 on tt2",
      "attempt_test_0002_r_000007_0 on tt2",
      "attempt_test_0002_r_000008_0 on tt2"
    };
    for (String expected : expectedOnTt2) {
      checkAssignment("tt2", expected);
    }
  }

  /**
   * This test starts by submitting two jobs by user "user1" to the default
   * pool, and two jobs by "user2". We set user1's job limit to 1. We should
   * see one job from user1 and two from user2. 
   */
  public void testUserMaxJobs() throws Exception {
    // Limit user1 to a single running job; user2 is left unrestricted.
    String[] allocLines = {
      "<?xml version=\"1.0\"?>",
      "<allocations>",
      "<user name=\"user1\">",
      "<maxRunningJobs>1</maxRunningJobs>",
      "</user>",
      "</allocations>"
    };
    PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
    for (String line : allocLines) {
      allocWriter.println(line);
    }
    allocWriter.close();
    scheduler.getPoolManager().reloadAllocs();

    // Submit two jobs per user, moving the clock forward between
    // submissions so that each one has a distinct submit time. The owner is
    // stamped onto the job's configuration right after submission.
    String[] owners = { "user1", "user1", "user2", "user2" };
    JobInfo[] jobInfos = new JobInfo[owners.length];
    for (int i = 0; i < owners.length; i++) {
      if (i > 0) {
        advanceTime(10);
      }
      JobInProgress job = submitJob(JobStatus.RUNNING, 10, 10);
      job.getJobConf().set("user.name", owners[i]);
      jobInfos[i] = scheduler.infos.get(job);
    }

    // user1's second job is held back, so the remaining three jobs split
    // the slots three ways.
    assertEquals(1.33,  jobInfos[0].mapFairShare, 0.1);
    assertEquals(1.33,  jobInfos[0].reduceFairShare, 0.1);
    assertEquals(0.0,   jobInfos[1].mapFairShare);
    assertEquals(0.0,   jobInfos[1].reduceFairShare);
    assertEquals(1.33,  jobInfos[2].mapFairShare, 0.1);
    assertEquals(1.33,  jobInfos[2].reduceFairShare, 0.1);
    assertEquals(1.33,  jobInfos[3].mapFairShare, 0.1);
    assertEquals(1.33,  jobInfos[3].reduceFairShare, 0.1);

    // tt1's slots go to job 1; after more time passes, tt2's go to job 3.
    String[] expectedOnTt1 = {
      "attempt_test_0001_m_000001_0 on tt1",
      "attempt_test_0001_m_000002_0 on tt1",
      "attempt_test_0001_r_000003_0 on tt1",
      "attempt_test_0001_r_000004_0 on tt1"
    };
    for (String expected : expectedOnTt1) {
      checkAssignment("tt1", expected);
    }
    advanceTime(100);
    String[] expectedOnTt2 = {
      "attempt_test_0003_m_000005_0 on tt2",
      "attempt_test_0003_m_000006_0 on tt2",
      "attempt_test_0003_r_000007_0 on tt2",
      "attempt_test_0003_r_000008_0 on tt2"
    };
    for (String expected : expectedOnTt2) {
      checkAssignment("tt2", expected);
    }
  }
  
  /**
   * Test a combination of pool job limits and user job limits, the latter
   * specified through both the &lt;userMaxJobsDefault&gt; element (for some
   * users) and user-specific &lt;user&gt; elements in the allocations file.
   */
  public void testComplexJobLimits() throws Exception {
    // Allocations: poolA may run 1 job; user1 may run 1 job; user2 may run
    // 10 jobs; every other user defaults to 2 jobs.
    String[] allocLines = {
      "<?xml version=\"1.0\"?>",
      "<allocations>",
      "<pool name=\"poolA\">",
      "<maxRunningJobs>1</maxRunningJobs>",
      "</pool>",
      "<user name=\"user1\">",
      "<maxRunningJobs>1</maxRunningJobs>",
      "</user>",
      "<user name=\"user2\">",
      "<maxRunningJobs>10</maxRunningJobs>",
      "</user>",
      "<userMaxJobsDefault>2</userMaxJobsDefault>",
      "</allocations>"
    };
    PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
    for (String line : allocLines) {
      allocWriter.println(line);
    }
    allocWriter.close();
    scheduler.getPoolManager().reloadAllocs();

    // Owner of each submitted job, in submission order:
    //   user1 x2 - only one should get to run
    //   user2 x3 - all should get to run
    //   user3 x3 - only two should get to run (default user limit)
    //   user4 x2 - both in poolA, only one should get to run
    String[] owners = {
      "user1", "user1",
      "user2", "user2", "user2",
      "user3", "user3", "user3",
      "user4", "user4"
    };
    // Jobs at this index and beyond are submitted to poolA.
    int firstPoolAJob = 8;

    // Submit the jobs, advancing the clock after each one so that they all
    // have distinct submit times.
    JobInfo[] jobInfos = new JobInfo[owners.length];
    for (int i = 0; i < owners.length; i++) {
      JobInProgress job = (i < firstPoolAJob)
          ? submitJob(JobStatus.RUNNING, 10, 10)
          : submitJob(JobStatus.RUNNING, 10, 10, "poolA");
      job.getJobConf().set("user.name", owners[i]);
      jobInfos[i] = scheduler.infos.get(job);
      advanceTime(10);
    }

    // poolA and the default pool each get half of the total share. poolA
    // has a single runnable job, which therefore gets 2 slots; the default
    // pool has six runnable jobs, which get 1/3 of a slot each. Jobs held
    // back by a job limit must have a share of exactly zero.
    double[] expectedShares = {
      0.33, 0.0,
      0.33, 0.33, 0.33,
      0.33, 0.33, 0.0,
      2.0, 0.0
    };
    for (int i = 0; i < expectedShares.length; i++) {
      if (expectedShares[i] == 0.0) {
        assertEquals(0.0, jobInfos[i].mapFairShare);
        assertEquals(0.0, jobInfos[i].reduceFairShare);
      } else {
        assertEquals(expectedShares[i], jobInfos[i].mapFairShare, 0.1);
        assertEquals(expectedShares[i], jobInfos[i].reduceFairShare, 0.1);
      }
    }
  }
  
  /**
   * With size-based weighting switched on, submits one job with few maps and
   * many reduces and another with many maps and a single reduce, then checks
   * that the job with more tasks of a given type has the larger fair share
   * for that type.
   */
  public void testSizeBasedWeight() throws Exception {
    scheduler.sizeBasedWeight = true;
    JobInProgress reduceHeavyJob = submitJob(JobStatus.RUNNING, 2, 10);
    JobInProgress mapHeavyJob = submitJob(JobStatus.RUNNING, 20, 1);

    JobInfo reduceHeavyInfo = scheduler.infos.get(reduceHeavyJob);
    JobInfo mapHeavyInfo = scheduler.infos.get(mapHeavyJob);

    assertTrue(mapHeavyInfo.mapFairShare > reduceHeavyInfo.mapFairShare);
    assertTrue(reduceHeavyInfo.reduceFairShare > mapHeavyInfo.reduceFairShare);
  }
  
  /**
   * Checks enoughMapsFinishedToRunReduces(finishedMaps, totalMaps) both with
   * waitForMapsBeforeLaunchingReduces off (reduces may start immediately)
   * and on (reduces must wait for some maps to finish first).
   */
  public void testWaitForMapsBeforeLaunchingReduces() {
    // This class leaves waitForMapsBeforeLaunchingReduces at false, so
    // reduces may run even though no map has finished.
    assertTrue(scheduler.enoughMapsFinishedToRunReduces(0, 100));

    // Once the flag is on, a 100-map job may not launch reduces until 5 of
    // its maps have finished.
    scheduler.waitForMapsBeforeLaunchingReduces = true;
    for (int finishedMaps = 0; finishedMaps < 5; finishedMaps++) {
      assertFalse(scheduler.enoughMapsFinishedToRunReduces(finishedMaps, 100));
    }
    for (int finishedMaps = 5; finishedMaps <= 6; finishedMaps++) {
      assertTrue(scheduler.enoughMapsFinishedToRunReduces(finishedMaps, 100));
    }

    // Jobs with very few maps still have to wait for at least one map.
    int[] smallJobSizes = { 5, 1 };
    for (int totalMaps : smallJobSizes) {
      assertFalse(scheduler.enoughMapsFinishedToRunReduces(0, totalMaps));
      assertTrue(scheduler.enoughMapsFinishedToRunReduces(1, totalMaps));
    }
  }
  

  /**
   * This test submits jobs in three pools: poolA, which has a weight
   * of 2.0; poolB, which has a weight of 0.5; and the default pool, which
   * should have a weight of 1.0. It then checks that the map and reduce
   * fair shares are given out accordingly. We then submit a second job to
   * pool B and check that each gets half of the pool (weight of 0.25).
   */
  public void testPoolWeights() throws Exception {
    // poolA is weighted 2.0 and poolB 0.5; the default pool is not listed.
    String[] allocLines = {
      "<?xml version=\"1.0\"?>",
      "<allocations>",
      "<pool name=\"poolA\">",
      "<weight>2.0</weight>",
      "</pool>",
      "<pool name=\"poolB\">",
      "<weight>0.5</weight>",
      "</pool>",
      "</allocations>"
    };
    PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
    for (String line : allocLines) {
      allocWriter.println(line);
    }
    allocWriter.close();
    scheduler.getPoolManager().reloadAllocs();

    // One job per pool, then advance the clock so the scheduler updates.
    JobInProgress defaultJob = submitJob(JobStatus.RUNNING, 10, 10);
    JobInfo defaultInfo = scheduler.infos.get(defaultJob);
    JobInProgress poolAJob = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
    JobInfo poolAInfo = scheduler.infos.get(poolAJob);
    JobInProgress poolBJob = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
    JobInfo poolBInfo = scheduler.infos.get(poolBJob);
    advanceTime(10);

    // Shares should be in the proportion 1.0 : 2.0 : 0.5.
    assertEquals(1.14,  defaultInfo.mapFairShare, 0.01);
    assertEquals(1.14,  defaultInfo.reduceFairShare, 0.01);
    assertEquals(2.28,  poolAInfo.mapFairShare, 0.01);
    assertEquals(2.28,  poolAInfo.reduceFairShare, 0.01);
    assertEquals(0.57,  poolBInfo.mapFairShare, 0.01);
    assertEquals(0.57,  poolBInfo.reduceFairShare, 0.01);

    // A second poolB job should halve the first one's share and leave the
    // other pools untouched.
    JobInProgress secondPoolBJob =
        submitJob(JobStatus.RUNNING, 10, 10, "poolB");
    JobInfo secondPoolBInfo = scheduler.infos.get(secondPoolBJob);
    advanceTime(10);

    assertEquals(1.14,  defaultInfo.mapFairShare, 0.01);
    assertEquals(1.14,  defaultInfo.reduceFairShare, 0.01);
    assertEquals(2.28,  poolAInfo.mapFairShare, 0.01);
    assertEquals(2.28,  poolAInfo.reduceFairShare, 0.01);
    assertEquals(0.28,  poolBInfo.mapFairShare, 0.01);
    assertEquals(0.28,  poolBInfo.reduceFairShare, 0.01);
    assertEquals(0.28,  secondPoolBInfo.mapFairShare, 0.01);
    assertEquals(0.28,  secondPoolBInfo.reduceFairShare, 0.01);
  }

  /**
   * This test submits jobs in two pools, poolA and poolB. None of the
   * jobs in poolA have maps, but this should not affect their reduce
   * share.
   */
  public void testPoolWeightsWhenNoMaps() throws Exception {
    // poolA is weighted 2.0 and poolB 1.0.
    String[] allocLines = {
      "<?xml version=\"1.0\"?>",
      "<allocations>",
      "<pool name=\"poolA\">",
      "<weight>2.0</weight>",
      "</pool>",
      "<pool name=\"poolB\">",
      "<weight>1.0</weight>",
      "</pool>",
      "</allocations>"
    };
    PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
    for (String line : allocLines) {
      allocWriter.println(line);
    }
    allocWriter.close();
    scheduler.getPoolManager().reloadAllocs();

    // Two reduce-only jobs in poolA and one ordinary job in poolB, then
    // advance the clock so the scheduler updates.
    JobInProgress firstNoMapJob = submitJob(JobStatus.RUNNING, 0, 10, "poolA");
    JobInfo firstNoMapInfo = scheduler.infos.get(firstNoMapJob);
    JobInProgress secondNoMapJob = submitJob(JobStatus.RUNNING, 0, 10, "poolA");
    JobInfo secondNoMapInfo = scheduler.infos.get(secondNoMapJob);
    JobInProgress poolBJob = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
    JobInfo poolBInfo = scheduler.infos.get(poolBJob);
    advanceTime(10);

    // Jobs without maps carry no map weight; every job has a reduce weight
    // of 1.0.
    assertEquals(0,     firstNoMapInfo.mapWeight, 0.01);
    assertEquals(1.0,   firstNoMapInfo.reduceWeight, 0.01);
    assertEquals(0,     secondNoMapInfo.mapWeight, 0.01);
    assertEquals(1.0,   secondNoMapInfo.reduceWeight, 0.01);
    assertEquals(1.0,   poolBInfo.mapWeight, 0.01);
    assertEquals(1.0,   poolBInfo.reduceWeight, 0.01);

    // The poolB job takes the whole map share, while the reduce share is
    // split evenly between the three jobs.
    assertEquals(0,     firstNoMapInfo.mapFairShare, 0.01);
    assertEquals(1.33,  firstNoMapInfo.reduceFairShare, 0.01);
    assertEquals(0,     secondNoMapInfo.mapFairShare, 0.01);
    assertEquals(1.33,  secondNoMapInfo.reduceFairShare, 0.01);
    assertEquals(4,     poolBInfo.mapFairShare, 0.01);
    assertEquals(1.33,  poolBInfo.reduceFairShare, 0.01);
  }

  /**
   * Tests that max-running-tasks per node are set by assigning load
   * equally across the cluster in CapBasedLoadManager.
   */
  public void testCapBasedLoadManager() {
    CapBasedLoadManager loadMgr = new CapBasedLoadManager();
    // Each row is { totalRunnableTasks, nodeCap, totalSlots, expectedCap },
    // the first three being the arguments to getCap in that order.
    // Desired behavior: return ceil(nodeCap * min(1, runnableTasks/totalSlots))
    int[][] cases = {
      {   1,  1, 100, 1 },
      {   1,  2, 100, 1 },
      {   1, 10, 100, 1 },
      { 200,  1, 100, 1 },
      {   1,  5, 100, 1 },
      {  50,  5, 100, 3 },
      { 100,  5, 100, 5 },
      { 200,  5, 100, 5 }
    };
    for (int[] c : cases) {
      assertEquals(c[3], loadMgr.getCap(c[0], c[1], c[2]));
    }
  }
  
  /**
   * Moves the test clock forward and then asks the scheduler to update, so
   * that the tests can observe the effect of elapsed time.
   *
   * @param time amount to advance the clock by; callers in this class treat
   *             the value as milliseconds
   */
  private void advanceTime(long time) {
    clock.advance(time);
    scheduler.update();
  }

  /**
   * Looks up a task tracker's status by name through taskTrackerManager.
   *
   * @param taskTrackerName name of the tracker, e.g. "tt1"
   * @return whatever taskTrackerManager returns for that name
   */
  protected TaskTrackerStatus tracker(String taskTrackerName) {
    TaskTrackerStatus status =
        taskTrackerManager.getTaskTracker(taskTrackerName);
    return status;
  }
  
  /**
   * Asks the scheduler to assign tasks to the named tracker and asserts that
   * exactly one task comes back whose string form equals the expected one.
   * The expected string doubles as the failure message for the null and
   * size checks.
   *
   * @param taskTrackerName    tracker to assign to, e.g. "tt1"
   * @param expectedTaskString expected toString() of the assigned task
   * @throws IOException if the scheduler's assignTasks call throws it
   */
  protected void checkAssignment(String taskTrackerName,
      String expectedTaskString) throws IOException {
    TaskTrackerStatus status = tracker(taskTrackerName);
    List<Task> assigned = scheduler.assignTasks(status);
    assertNotNull(expectedTaskString, assigned);
    assertEquals(expectedTaskString, 1, assigned.size());
    String actualTaskString = assigned.get(0).toString();
    assertEquals(expectedTaskString, actualTaskString);
  }
  
}

Generated by  Doxygen 1.6.0   Back to index