Logo Search packages:      
Sourcecode: hadoop version File versions  Download package

DistributedCache.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

00019 package org.apache.hadoop.filecache;

import org.apache.commons.logging.*;
import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.*;

import java.net.URI;

/**
 * Distribute application-specific large, read-only files efficiently.
 * 
 * <p><code>DistributedCache</code> is a facility provided by the Map-Reduce
 * framework to cache files (text, archives, jars etc.) needed by applications.
 * </p>
 * 
 * <p>Applications specify the files, via urls (hdfs:// or http://) to be cached 
 * via the {@link org.apache.hadoop.mapred.JobConf}.
 * The <code>DistributedCache</code> assumes that the
 * files specified via hdfs:// urls are already present on the 
 * {@link FileSystem} at the path specified by the url.</p>
 * 
 * <p>The framework will copy the necessary files on to the slave node before 
 * any tasks for the job are executed on that node. Its efficiency stems from 
 * the fact that the files are only copied once per job and the ability to 
 * cache archives which are un-archived on the slaves.</p> 
 *
 * <p><code>DistributedCache</code> can be used to distribute simple, read-only
 * data/text files and/or more complex types such as archives, jars etc. 
 * Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes. 
 * Jars may be optionally added to the classpath of the tasks, a rudimentary 
 * software distribution mechanism.  Files have execution permissions.
 * Optionally users can also direct it to symlink the distributed cache file(s)
 * into the working directory of the task.</p>
 * 
 * <p><code>DistributedCache</code> tracks modification timestamps of the cache 
 * files. Clearly the cache files should not be modified by the application 
 * or externally while the job is executing.</p>
 * 
 * <p>Here is an illustrative example on how to use the 
 * <code>DistributedCache</code>:</p>
 * <p><blockquote><pre>
 *     // Setting up the cache for the application
 *     
 *     1. Copy the requisite files to the <code>FileSystem</code>:
 *     
 *     $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat  
 *     $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip  
 *     $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
 *     $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
 *     $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
 *     $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz
 *     
 *     2. Setup the application's <code>JobConf</code>:
 *     
 *     JobConf job = new JobConf();
 *     DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), 
 *                                   job);
 *     DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
 *     DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
 *     DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
 *     DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);
 *     DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);
 *     
 *     3. Use the cached files in the {@link org.apache.hadoop.mapred.Mapper}
 *     or {@link org.apache.hadoop.mapred.Reducer}:
 *     
 *     public static class MapClass extends MapReduceBase  
 *     implements Mapper&lt;K, V, K, V&gt; {
 *     
 *       private Path[] localArchives;
 *       private Path[] localFiles;
 *       
 *       public void configure(JobConf job) {
 *         // Get the cached archives/files
 *         localArchives = DistributedCache.getLocalCacheArchives(job);
 *         localFiles = DistributedCache.getLocalCacheFiles(job);
 *       }
 *       
 *       public void map(K key, V value, 
 *                       OutputCollector&lt;K, V&gt; output, Reporter reporter) 
 *       throws IOException {
 *         // Use data from the cached archives/files here
 *         // ...
 *         // ...
 *         output.collect(k, v);
 *       }
 *     }
 *     
 * </pre></blockquote></p>
 * 
 * @see org.apache.hadoop.mapred.JobConf
 * @see org.apache.hadoop.mapred.JobClient
 */
00115 public class DistributedCache {
  // cacheID to cacheStatus mapping
  private static TreeMap<String, CacheStatus> cachedArchives = new TreeMap<String, CacheStatus>();
  
  private static TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>();
  
  // default total cache size
  private static final long DEFAULT_CACHE_SIZE = 10737418240L;

  private static final Log LOG =
    LogFactory.getLog(DistributedCache.class);
  
  /**
   * Get the locally cached file or archive; it could either be 
   * previously cached (and valid) or copy it from the {@link FileSystem} now.
   * 
   * @param cache the cache to be localized, this should be specified as 
   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
   * or hostname:port is provided the file is assumed to be in the filesystem
   * being used in the Configuration
   * @param conf The Confguration file which contains the filesystem
   * @param baseDir The base cache Dir where you wnat to localize the files/archives
   * @param fileStatus The file status on the dfs.
   * @param isArchive if the cache is an archive or a file. In case it is an
   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
   *  be unzipped/unjarred/untarred automatically 
   *  and the directory where the archive is unzipped/unjarred/untarred is
   *  returned as the Path.
   *  In case of a file, the path to the file is returned
   * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
   * file to be cached hasn't changed since the job started
   * @param currentWorkDir this is the directory where you would want to create symlinks 
   * for the locally cached files/archives
   * @return the path to directory where the archives are unjarred in case of archives,
   * the path to the file where the file is copied locally 
   * @throws IOException
   */
00152   public static Path getLocalCache(URI cache, Configuration conf, 
                                   Path baseDir, FileStatus fileStatus,
                                   boolean isArchive, long confFileStamp,
                                   Path currentWorkDir) 
  throws IOException {
    return getLocalCache(cache, conf, baseDir, fileStatus, isArchive, 
        confFileStamp, currentWorkDir, true);
  }
  /**
   * Get the locally cached file or archive; it could either be 
   * previously cached (and valid) or copy it from the {@link FileSystem} now.
   * 
   * @param cache the cache to be localized, this should be specified as 
   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
   * or hostname:port is provided the file is assumed to be in the filesystem
   * being used in the Configuration
   * @param conf The Confguration file which contains the filesystem
   * @param baseDir The base cache Dir where you wnat to localize the files/archives
   * @param fileStatus The file status on the dfs.
   * @param isArchive if the cache is an archive or a file. In case it is an
   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
   *  be unzipped/unjarred/untarred automatically 
   *  and the directory where the archive is unzipped/unjarred/untarred is
   *  returned as the Path.
   *  In case of a file, the path to the file is returned
   * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
   * file to be cached hasn't changed since the job started
   * @param currentWorkDir this is the directory where you would want to create symlinks 
   * for the locally cached files/archives
   * @param honorSymLinkConf if this is false, then the symlinks are not
   * created even if conf says so (this is required for an optimization in task
   * launches
   * @return the path to directory where the archives are unjarred in case of archives,
   * the path to the file where the file is copied locally 
   * @throws IOException
   */
00188   public static Path getLocalCache(URI cache, Configuration conf, 
      Path baseDir, FileStatus fileStatus,
      boolean isArchive, long confFileStamp,
      Path currentWorkDir, boolean honorSymLinkConf) 
  throws IOException {
    String cacheId = makeRelative(cache, conf);
    CacheStatus lcacheStatus;
    Path localizedPath;
    synchronized (cachedArchives) {
      lcacheStatus = cachedArchives.get(cacheId);
      if (lcacheStatus == null) {
        // was never localized
        lcacheStatus = new CacheStatus(baseDir, new Path(baseDir, new Path(cacheId)));
        cachedArchives.put(cacheId, lcacheStatus);
      }

      synchronized (lcacheStatus) {
        localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus, 
            fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
        lcacheStatus.refcount++;
      }
    }

    // try deleting stuff if you can
    long size = 0;
    synchronized (baseDirSize) {
      Long get = baseDirSize.get(baseDir);
      if ( get != null ) {
      size = get.longValue();
      }
    }
    // setting the cache size to a default of 10GB
    long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
    if (allowedSize < size) {
      // try some cache deletions
      deleteCache(conf);
    }
    return localizedPath;
  }

  
  /**
   * Get the locally cached file or archive; it could either be 
   * previously cached (and valid) or copy it from the {@link FileSystem} now.
   * 
   * @param cache the cache to be localized, this should be specified as 
   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
   * or hostname:port is provided the file is assumed to be in the filesystem
   * being used in the Configuration
   * @param conf The Confguration file which contains the filesystem
   * @param baseDir The base cache Dir where you wnat to localize the files/archives
   * @param isArchive if the cache is an archive or a file. In case it is an 
   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will 
   *  be unzipped/unjarred/untarred automatically 
   *  and the directory where the archive is unzipped/unjarred/untarred 
   *  is returned as the Path.
   *  In case of a file, the path to the file is returned
   * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
   * file to be cached hasn't changed since the job started
   * @param currentWorkDir this is the directory where you would want to create symlinks 
   * for the locally cached files/archives
   * @return the path to directory where the archives are unjarred in case of archives,
   * the path to the file where the file is copied locally 
   * @throws IOException

   */
00254   public static Path getLocalCache(URI cache, Configuration conf, 
                                   Path baseDir, boolean isArchive,
                                   long confFileStamp, Path currentWorkDir) 
  throws IOException {
    return getLocalCache(cache, conf, 
                         baseDir, null, isArchive,
                         confFileStamp, currentWorkDir);
  }
  
  /**
   * This is the opposite of getlocalcache. When you are done with
   * using the cache, you need to release the cache
   * @param cache The cache URI to be released
   * @param conf configuration which contains the filesystem the cache 
   * is contained in.
   * @throws IOException
   */
00271   public static void releaseCache(URI cache, Configuration conf)
    throws IOException {
    String cacheId = makeRelative(cache, conf);
    synchronized (cachedArchives) {
      CacheStatus lcacheStatus = cachedArchives.get(cacheId);
      if (lcacheStatus == null)
        return;
      synchronized (lcacheStatus) {
        lcacheStatus.refcount--;
      }
    }
  }
  
  // To delete the caches which have a refcount of zero
  
  private static void deleteCache(Configuration conf) throws IOException {
    // try deleting cache Status with refcount of zero
    synchronized (cachedArchives) {
      for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {
        String cacheId = (String) it.next();
        CacheStatus lcacheStatus = cachedArchives.get(cacheId);
        synchronized (lcacheStatus) {
          if (lcacheStatus.refcount == 0) {
            // delete this cache entry
            FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
            synchronized (baseDirSize) {
              Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
              if ( dirSize != null ) {
                  dirSize -= lcacheStatus.size;
                  baseDirSize.put(lcacheStatus.baseDir, dirSize);
              }
            }
            it.remove();
          }
        }
      }
    }
  }

  /*
   * Returns the relative path of the dir this cache will be localized in
   * relative path that this cache will be localized in. For
   * hdfs://hostname:port/absolute_path -- the relative path is
   * hostname/absolute path -- if it is just /absolute_path -- then the
   * relative path is hostname of DFS this mapred cluster is running
   * on/absolute_path
   */
  public static String makeRelative(URI cache, Configuration conf)
    throws IOException {
    String host = cache.getHost();
    if (host == null) {
      host = cache.getScheme();
    }
    if (host == null) {
      URI defaultUri = FileSystem.get(conf).getUri();
      host = defaultUri.getHost();
      if (host == null) {
        host = defaultUri.getScheme();
      }
    }
    String path = host + cache.getPath();
    path = path.replace(":/","/");                // remove windows device colon
    return path;
  }

  private static Path cacheFilePath(Path p) {
    return new Path(p, p.getName());
  }

  // the method which actually copies the caches locally and unjars/unzips them
  // and does chmod for the files
  private static Path localizeCache(Configuration conf, 
                                    URI cache, long confFileStamp,
                                    CacheStatus cacheStatus,
                                    FileStatus fileStatus,
                                    boolean isArchive, 
                                    Path currentWorkDir,boolean honorSymLinkConf) 
  throws IOException {
    boolean doSymlink = honorSymLinkConf && getSymlink(conf);
    if(cache.getFragment() == null) {
      doSymlink = false;
    }
    FileSystem fs = getFileSystem(cache, conf);
    String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
    File flink = new File(link);
    if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
                           cacheStatus, fileStatus)) {
      if (isArchive) {
        if (doSymlink){
          if (!flink.exists())
            FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
                             link);
        }
        return cacheStatus.localLoadPath;
      }
      else {
        if (doSymlink){
          if (!flink.exists())
            FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
                             link);
        }
        return cacheFilePath(cacheStatus.localLoadPath);
      }
    } else {
      // remove the old archive
      // if the old archive cannot be removed since it is being used by another
      // job
      // return null
      if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
        throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
                              + " is in use and cannot be refreshed");
      
      FileSystem localFs = FileSystem.getLocal(conf);
      localFs.delete(cacheStatus.localLoadPath, true);
      synchronized (baseDirSize) {
      Long dirSize = baseDirSize.get(cacheStatus.baseDir);
      if ( dirSize != null ) {
        dirSize -= cacheStatus.size;
        baseDirSize.put(cacheStatus.baseDir, dirSize);
      }
      }
      Path parchive = new Path(cacheStatus.localLoadPath,
                               new Path(cacheStatus.localLoadPath.getName()));
      
      if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
        throw new IOException("Mkdirs failed to create directory " + 
                              cacheStatus.localLoadPath.toString());
      }

      String cacheId = cache.getPath();
      fs.copyToLocalFile(new Path(cacheId), parchive);
      if (isArchive) {
        String tmpArchive = parchive.toString().toLowerCase();
        File srcFile = new File(parchive.toString());
        File destDir = new File(parchive.getParent().toString());
        if (tmpArchive.endsWith(".jar")) {
          RunJar.unJar(srcFile, destDir);
        } else if (tmpArchive.endsWith(".zip")) {
          FileUtil.unZip(srcFile, destDir);
        } else if (isTarFile(tmpArchive)) {
          FileUtil.unTar(srcFile, destDir);
        }
        // else will not do anyhting
        // and copy the file into the dir as it is
      }
      
      long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
      cacheStatus.size = cacheSize;
      synchronized (baseDirSize) {
            Long dirSize = baseDirSize.get(cacheStatus.baseDir);
            if( dirSize == null ) {
              dirSize = Long.valueOf(cacheSize);
            } else {
              dirSize += cacheSize;
            }
            baseDirSize.put(cacheStatus.baseDir, dirSize);
      }
      
      // do chmod here 
      try {
      FileUtil.chmod(parchive.toString(), "+x");
      } catch(InterruptedException e) {
      LOG.warn("Exception in chmod" + e.toString());
      }

      // update cacheStatus to reflect the newly cached file
      cacheStatus.currentStatus = true;
      cacheStatus.mtime = getTimestamp(conf, cache);
    }
    
    if (isArchive){
      if (doSymlink){
        if (!flink.exists())
          FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
                           link);
      }
      return cacheStatus.localLoadPath;
    }
    else {
      if (doSymlink){
        if (!flink.exists())
          FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
                           link);
      }
      return cacheFilePath(cacheStatus.localLoadPath);
    }
  }

  private static boolean isTarFile(String filename) {
    return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
           filename.endsWith(".tar"));
  }
  
  // Checks if the cache has already been localized and is fresh
  private static boolean ifExistsAndFresh(Configuration conf, FileSystem fs, 
                                          URI cache, long confFileStamp, 
                                          CacheStatus lcacheStatus,
                                          FileStatus fileStatus) 
  throws IOException {
    // check for existence of the cache
    if (lcacheStatus.currentStatus == false) {
      return false;
    } else {
      long dfsFileStamp;
      if (fileStatus != null) {
        dfsFileStamp = fileStatus.getModificationTime();
      } else {
        dfsFileStamp = getTimestamp(conf, cache);
      }

      // ensure that the file on hdfs hasn't been modified since the job started 
      if (dfsFileStamp != confFileStamp) {
        LOG.fatal("File: " + cache + " has changed on HDFS since job started");
        throw new IOException("File: " + cache + 
                              " has changed on HDFS since job started");
      }
      
      if (dfsFileStamp != lcacheStatus.mtime) {
        // needs refreshing
        return false;
      }
    }
    
    return true;
  }

  /**
   * Returns mtime of a given cache file on hdfs.
   * @param conf configuration
   * @param cache cache file 
   * @return mtime of a given cache file on hdfs
   * @throws IOException
   */
00504   public static long getTimestamp(Configuration conf, URI cache)
    throws IOException {
    FileSystem fileSystem = FileSystem.get(cache, conf);
    Path filePath = new Path(cache.getPath());

    return fileSystem.getFileStatus(filePath).getModificationTime();
  }
  
  /**
   * This method create symlinks for all files in a given dir in another directory
   * @param conf the configuration
   * @param jobCacheDir the target directory for creating symlinks
   * @param workDir the directory in which the symlinks are created
   * @throws IOException
   */
00519   public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
    throws IOException{
    if ((jobCacheDir == null || !jobCacheDir.isDirectory()) ||
           workDir == null || (!workDir.isDirectory())) {
      return;
    }
    boolean createSymlink = getSymlink(conf);
    if (createSymlink){
      File[] list = jobCacheDir.listFiles();
      for (int i=0; i < list.length; i++){
        FileUtil.symLink(list[i].getAbsolutePath(),
                         new File(workDir, list[i].getName()).toString());
      }
    }  
  }
  
  private static String getFileSysName(URI url) {
    String fsname = url.getScheme();
    if ("hdfs".equals(fsname)) {
      String host = url.getHost();
      int port = url.getPort();
      return (port == (-1)) ? host : (host + ":" + port);
    } else {
      return null;
    }
  }
  
  private static FileSystem getFileSystem(URI cache, Configuration conf)
    throws IOException {
    String fileSysName = getFileSysName(cache);
    if (fileSysName != null)
      return FileSystem.getNamed(fileSysName, conf);
    else
      return FileSystem.get(conf);
  }

  /**
   * Set the configuration with the given set of archives
   * @param archives The list of archives that need to be localized
   * @param conf Configuration which will be changed
   */
00560   public static void setCacheArchives(URI[] archives, Configuration conf) {
    String sarchives = StringUtils.uriToString(archives);
    conf.set("mapred.cache.archives", sarchives);
  }

  /**
   * Set the configuration with the given set of files
   * @param files The list of files that need to be localized
   * @param conf Configuration which will be changed
   */
00570   public static void setCacheFiles(URI[] files, Configuration conf) {
    String sfiles = StringUtils.uriToString(files);
    conf.set("mapred.cache.files", sfiles);
  }

  /**
   * Get cache archives set in the Configuration
   * @param conf The configuration which contains the archives
   * @return A URI array of the caches set in the Configuration
   * @throws IOException
   */
00581   public static URI[] getCacheArchives(Configuration conf) throws IOException {
    return StringUtils.stringToURI(conf.getStrings("mapred.cache.archives"));
  }

  /**
   * Get cache files set in the Configuration
   * @param conf The configuration which contains the files
   * @return A URI array of the files set in the Configuration
   * @throws IOException
   */

00592   public static URI[] getCacheFiles(Configuration conf) throws IOException {
    return StringUtils.stringToURI(conf.getStrings("mapred.cache.files"));
  }

  /**
   * Return the path array of the localized caches
   * @param conf Configuration that contains the localized archives
   * @return A path array of localized caches
   * @throws IOException
   */
00602   public static Path[] getLocalCacheArchives(Configuration conf)
    throws IOException {
    return StringUtils.stringToPath(conf
                                    .getStrings("mapred.cache.localArchives"));
  }

  /**
   * Return the path array of the localized files
   * @param conf Configuration that contains the localized files
   * @return A path array of localized files
   * @throws IOException
   */
00614   public static Path[] getLocalCacheFiles(Configuration conf)
    throws IOException {
    return StringUtils.stringToPath(conf.getStrings("mapred.cache.localFiles"));
  }

  /**
   * Get the timestamps of the archives
   * @param conf The configuration which stored the timestamps
   * @return a string array of timestamps 
   * @throws IOException
   */
00625   public static String[] getArchiveTimestamps(Configuration conf) {
    return conf.getStrings("mapred.cache.archives.timestamps");
  }


  /**
   * Get the timestamps of the files
   * @param conf The configuration which stored the timestamps
   * @return a string array of timestamps 
   * @throws IOException
   */
00636   public static String[] getFileTimestamps(Configuration conf) {
    return conf.getStrings("mapred.cache.files.timestamps");
  }

  /**
   * This is to check the timestamp of the archives to be localized
   * @param conf Configuration which stores the timestamp's
   * @param timestamps comma separated list of timestamps of archives.
   * The order should be the same as the order in which the archives are added.
   */
00646   public static void setArchiveTimestamps(Configuration conf, String timestamps) {
    conf.set("mapred.cache.archives.timestamps", timestamps);
  }

  /**
   * This is to check the timestamp of the files to be localized
   * @param conf Configuration which stores the timestamp's
   * @param timestamps comma separated list of timestamps of files.
   * The order should be the same as the order in which the files are added.
   */
00656   public static void setFileTimestamps(Configuration conf, String timestamps) {
    conf.set("mapred.cache.files.timestamps", timestamps);
  }
  
  /**
   * Set the conf to contain the location for localized archives 
   * @param conf The conf to modify to contain the localized caches
   * @param str a comma separated list of local archives
   */
00665   public static void setLocalArchives(Configuration conf, String str) {
    conf.set("mapred.cache.localArchives", str);
  }

  /**
   * Set the conf to contain the location for localized files 
   * @param conf The conf to modify to contain the localized caches
   * @param str a comma separated list of local files
   */
00674   public static void setLocalFiles(Configuration conf, String str) {
    conf.set("mapred.cache.localFiles", str);
  }

  /**
   * Add a archives to be localized to the conf
   * @param uri The uri of the cache to be localized
   * @param conf Configuration to add the cache to
   */
00683   public static void addCacheArchive(URI uri, Configuration conf) {
    String archives = conf.get("mapred.cache.archives");
    conf.set("mapred.cache.archives", archives == null ? uri.toString()
             : archives + "," + uri.toString());
  }
  
  /**
   * Add a file to be localized to the conf
   * @param uri The uri of the cache to be localized
   * @param conf Configuration to add the cache to
   */
00694   public static void addCacheFile(URI uri, Configuration conf) {
    String files = conf.get("mapred.cache.files");
    conf.set("mapred.cache.files", files == null ? uri.toString() : files + ","
             + uri.toString());
  }

  /**
   * Add an file path to the current set of classpath entries It adds the file
   * to cache as well.
   * 
   * @param file Path of the file to be added
   * @param conf Configuration that contains the classpath setting
   */
00707   public static void addFileToClassPath(Path file, Configuration conf)
    throws IOException {
    String classpath = conf.get("mapred.job.classpath.files");
    conf.set("mapred.job.classpath.files", classpath == null ? file.toString()
             : classpath + System.getProperty("path.separator") + file.toString());
    FileSystem fs = FileSystem.get(conf);
    URI uri = fs.makeQualified(file).toUri();

    addCacheFile(uri, conf);
  }

  /**
   * Get the file entries in classpath as an array of Path
   * 
   * @param conf Configuration that contains the classpath setting
   */
00723   public static Path[] getFileClassPaths(Configuration conf) {
    String classpath = conf.get("mapred.job.classpath.files");
    if (classpath == null)
      return null;
    ArrayList list = Collections.list(new StringTokenizer(classpath, System
                                                          .getProperty("path.separator")));
    Path[] paths = new Path[list.size()];
    for (int i = 0; i < list.size(); i++) {
      paths[i] = new Path((String) list.get(i));
    }
    return paths;
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to cache as well.
   * 
   * @param archive Path of the archive to be added
   * @param conf Configuration that contains the classpath setting
   */
00743   public static void addArchiveToClassPath(Path archive, Configuration conf)
    throws IOException {
    String classpath = conf.get("mapred.job.classpath.archives");
    conf.set("mapred.job.classpath.archives", classpath == null ? archive
             .toString() : classpath + System.getProperty("path.separator")
             + archive.toString());
    FileSystem fs = FileSystem.get(conf);
    URI uri = fs.makeQualified(archive).toUri();

    addCacheArchive(uri, conf);
  }

  /**
   * Get the archive entries in classpath as an array of Path
   * 
   * @param conf Configuration that contains the classpath setting
   */
00760   public static Path[] getArchiveClassPaths(Configuration conf) {
    String classpath = conf.get("mapred.job.classpath.archives");
    if (classpath == null)
      return null;
    ArrayList list = Collections.list(new StringTokenizer(classpath, System
                                                          .getProperty("path.separator")));
    Path[] paths = new Path[list.size()];
    for (int i = 0; i < list.size(); i++) {
      paths[i] = new Path((String) list.get(i));
    }
    return paths;
  }

  /**
   * This method allows you to create symlinks in the current working directory
   * of the task to all the cache files/archives
   * @param conf the jobconf 
   */
00778   public static void createSymlink(Configuration conf){
    conf.set("mapred.create.symlink", "yes");
  }
  
  /**
   * This method checks to see if symlinks are to be create for the 
   * localized cache files in the current working directory 
   * @param conf the jobconf
   * @return true if symlinks are to be created- else return false
   */
00788   public static boolean getSymlink(Configuration conf){
    String result = conf.get("mapred.create.symlink");
    if ("yes".equals(result)){
      return true;
    }
    return false;
  }

  /**
   * This method checks if there is a conflict in the fragment names 
   * of the uris. Also makes sure that each uri has a fragment. It 
   * is only to be called if you want to create symlinks for 
   * the various archives and files.
   * @param uriFiles The uri array of urifiles
   * @param uriArchives the uri array of uri archives
   */
00804   public static boolean checkURIs(URI[]  uriFiles, URI[] uriArchives){
    if ((uriFiles == null) && (uriArchives == null)){
      return true;
    }
    if (uriFiles != null){
      for (int i = 0; i < uriFiles.length; i++){
        String frag1 = uriFiles[i].getFragment();
        if (frag1 == null)
          return false;
        for (int j=i+1; j < uriFiles.length; j++){
          String frag2 = uriFiles[j].getFragment();
          if (frag2 == null)
            return false;
          if (frag1.equalsIgnoreCase(frag2))
            return false;
        }
        if (uriArchives != null){
          for (int j = 0; j < uriArchives.length; j++){
            String frag2 = uriArchives[j].getFragment();
            if (frag2 == null){
              return false;
            }
            if (frag1.equalsIgnoreCase(frag2))
              return false;
            for (int k=j+1; k < uriArchives.length; k++){
              String frag3 = uriArchives[k].getFragment();
              if (frag3 == null)
                return false;
              if (frag2.equalsIgnoreCase(frag3))
                return false;
            }
          }
        }
      }
    }
    return true;
  }

  private static class CacheStatus {
    // false, not loaded yet, true is loaded
    boolean currentStatus;

    // the local load path of this cache
    Path localLoadPath;
    
    //the base dir where the cache lies
    Path baseDir;
    
    //the size of this cache
    long size;

    // number of instances using this cache
    int refcount;

    // the cache-file modification time
    long mtime;

    public CacheStatus(Path baseDir, Path localLoadPath) {
      super();
      this.currentStatus = false;
      this.localLoadPath = localLoadPath;
      this.refcount = 0;
      this.mtime = -1;
      this.baseDir = baseDir;
      this.size = 0;
    }
  }

  /**
   * Clear the entire contents of the cache and delete the backing files. This
   * should only be used when the server is reinitializing, because the users
   * are going to lose their files.
   */
00877   public static void purgeCache(Configuration conf) throws IOException {
    synchronized (cachedArchives) {
      FileSystem localFs = FileSystem.getLocal(conf);
      for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
        try {
          localFs.delete(f.getValue().localLoadPath, true);
        } catch (IOException ie) {
          LOG.debug("Error cleaning up cache", ie);
        }
      }
      cachedArchives.clear();
    }
  }
}

Generated by  Doxygen 1.6.0   Back to index