
HadoopPipes.cc

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "hadoop/Pipes.hh"
#include "hadoop/SerialUtils.hh"
#include "hadoop/StringUtils.hh"

#include <map>
#include <vector>

#include <errno.h>
#include <netinet/in.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/socket.h>
#include <pthread.h>
#include <unistd.h>

using std::map;
using std::string;
using std::vector;

using namespace HadoopUtils;

namespace HadoopPipes {

  class JobConfImpl: public JobConf {
  private:
    map<string, string> values;
  public:
    void set(const string& key, const string& value) {
      values[key] = value;
    }

    virtual bool hasKey(const string& key) const {
      return values.find(key) != values.end();
    }

    virtual const string& get(const string& key) const {
      map<string,string>::const_iterator itr = values.find(key);
      if (itr == values.end()) {
        throw Error("Key " + key + " not found in JobConf");
      }
      return itr->second;
    }

    virtual int getInt(const string& key) const {
      const string& val = get(key);
      return toInt(val);
    }

    virtual float getFloat(const string& key) const {
      const string& val = get(key);
      return toFloat(val);
    }

    virtual bool getBoolean(const string& key) const {
      const string& val = get(key);
      return toBool(val);
    }
  };

  class DownwardProtocol {
  public:
    virtual void start(int protocol) = 0;
    virtual void setJobConf(vector<string> values) = 0;
    virtual void setInputTypes(string keyType, string valueType) = 0;
    virtual void runMap(string inputSplit, int numReduces, bool pipedInput) = 0;
    virtual void mapItem(const string& key, const string& value) = 0;
    virtual void runReduce(int reduce, bool pipedOutput) = 0;
    virtual void reduceKey(const string& key) = 0;
    virtual void reduceValue(const string& value) = 0;
    virtual void close() = 0;
    virtual void abort() = 0;
    virtual ~DownwardProtocol() {}
  };

  class UpwardProtocol {
  public:
    virtual void output(const string& key, const string& value) = 0;
    virtual void partitionedOutput(int reduce, const string& key,
                                   const string& value) = 0;
    virtual void status(const string& message) = 0;
    virtual void progress(float progress) = 0;
    virtual void done() = 0;
    virtual void registerCounter(int id, const string& group, 
                                 const string& name) = 0;
    virtual void incrementCounter(const TaskContext::Counter* counter,
                                  uint64_t amount) = 0;
    virtual ~UpwardProtocol() {}
  };

  class Protocol {
  public:
    virtual void nextEvent() = 0;
    virtual UpwardProtocol* getUplink() = 0;
    virtual ~Protocol() {}
  };

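  /**
   * The text protocol is line oriented: each command is one record, fields
   * are separated by tabs, and records end with a newline. Tabs and newlines
   * embedded in keys or values are escaped by quoteString(). Illustrative
   * upward traffic (a sketch, with <TAB>/<NL> standing in for the real
   * separator characters):
   *
   *   output<TAB>key<TAB>value<NL>
   *   status<TAB>message<NL>
   *   progress<TAB>0.500000<NL>
   *   done<NL>
   */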
  class TextUpwardProtocol: public UpwardProtocol {
  private:
    FILE* stream;
    static const char fieldSeparator = '\t';
    static const char lineSeparator = '\n';

    void writeBuffer(const string& buffer) {
      // An explicit "%s" keeps the escaped data from ever being interpreted
      // as printf format directives.
      fprintf(stream, "%s", quoteString(buffer, "\t\n").c_str());
    }

  public:
    TextUpwardProtocol(FILE* _stream): stream(_stream) {}
    
    virtual void output(const string& key, const string& value) {
      fprintf(stream, "output%c", fieldSeparator);
      writeBuffer(key);
      fprintf(stream, "%c", fieldSeparator);
      writeBuffer(value);
      fprintf(stream, "%c", lineSeparator);
    }

    virtual void partitionedOutput(int reduce, const string& key,
                                   const string& value) {
      fprintf(stream, "parititionedOutput%c%d%c", fieldSeparator, reduce, 
              fieldSeparator);
      writeBuffer(key);
      fprintf(stream, "%c", fieldSeparator);
      writeBuffer(value);
      fprintf(stream, "%c", lineSeparator);
    }

    virtual void status(const string& message) {
      fprintf(stream, "status%c%s%c", fieldSeparator, message.c_str(), 
              lineSeparator);
    }

    virtual void progress(float progress) {
      fprintf(stream, "progress%c%f%c", fieldSeparator, progress, 
              lineSeparator);
    }

    virtual void registerCounter(int id, const string& group, 
                                 const string& name) {
      fprintf(stream, "registerCounter%c%d%c%s%c%s%c", fieldSeparator, id,
              fieldSeparator, group.c_str(), fieldSeparator, name.c_str(), 
              lineSeparator);
    }

    virtual void incrementCounter(const TaskContext::Counter* counter, 
                                  uint64_t amount) {
      fprintf(stream, "incrCounter%c%d%c%llu%c", fieldSeparator, counter->getId(),
              fieldSeparator, (unsigned long long)amount, lineSeparator);
    }
    
    virtual void done() {
      fprintf(stream, "done%c", lineSeparator);
    }
  };

  class TextProtocol: public Protocol {
  private:
    FILE* downStream;
    DownwardProtocol* handler;
    UpwardProtocol* uplink;
    string key;
    string value;

    // Read characters into buffer until one of the delimiters in limit is
    // seen; return the delimiter that ended the field, or EOF.
    int readUpto(string& buffer, const char* limit) {
      int ch;
      buffer.clear();
      while ((ch = getc(downStream)) != EOF) {
        if (strchr(limit, ch) != NULL) {
          return ch;
        }
        buffer += ch;
      }
      return EOF;
    }

    static const char* delim;
  public:

    TextProtocol(FILE* down, DownwardProtocol* _handler, FILE* up) {
      downStream = down;
      uplink = new TextUpwardProtocol(up);
      handler = _handler;
    }

    UpwardProtocol* getUplink() {
      return uplink;
    }

    virtual void nextEvent() {
      string command;
      string arg;
      int sep;
      sep = readUpto(command, delim);
      if (command == "mapItem") {
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(key, delim);
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(value, delim);
        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
        handler->mapItem(key, value);
      } else if (command == "reduceValue") {
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(value, delim);
        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
        handler->reduceValue(value);
      } else if (command == "reduceKey") {
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(key, delim);
        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
        handler->reduceKey(key);
      } else if (command == "start") {
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(arg, delim);
        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
        handler->start(toInt(arg));
      } else if (command == "setJobConf") {
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(arg, delim);
        int len = toInt(arg);
        // reserve rather than construct with size len: pre-sizing and then
        // push_back would leave len empty strings at the front
        vector<string> values;
        values.reserve(len);
        for(int i=0; i < len; ++i) {
          HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
          sep = readUpto(arg, delim);
          values.push_back(arg);
        }
        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
        handler->setJobConf(values);
      } else if (command == "setInputTypes") {
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(key, delim);
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(value, delim);
        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
        handler->setInputTypes(key, value);
      } else if (command == "runMap") {
        string split;
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(split, delim);
        string reduces;
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(reduces, delim);
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(arg, delim);
        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
        handler->runMap(split, toInt(reduces), toBool(arg));
      } else if (command == "runReduce") {
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(arg, delim);
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        string piped;
        sep = readUpto(piped, delim);
        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
        handler->runReduce(toInt(arg), toBool(piped));
      } else if (command == "abort") { 
        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
        handler->abort();
      } else if (command == "close") {
        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
        handler->close();
      } else {
        throw Error("Illegal text protocol command " + command);
      }
    }

    ~TextProtocol() {
      delete uplink;
    }
  };
  const char* TextProtocol::delim = "\t\n";

  enum MESSAGE_TYPE {START_MESSAGE, SET_JOB_CONF, SET_INPUT_TYPES, RUN_MAP, 
                     MAP_ITEM, RUN_REDUCE, REDUCE_KEY, REDUCE_VALUE, 
                     CLOSE, ABORT, 
                     OUTPUT=50, PARTITIONED_OUTPUT, STATUS, PROGRESS, DONE,
                     REGISTER_COUNTER, INCREMENT_COUNTER};

  class BinaryUpwardProtocol: public UpwardProtocol {
  private:
    FileOutStream* stream;
  public:
    BinaryUpwardProtocol(FILE* _stream) {
      stream = new FileOutStream();
      HADOOP_ASSERT(stream->open(_stream), "problem opening stream");
    }

    virtual void output(const string& key, const string& value) {
      serializeInt(OUTPUT, *stream);
      serializeString(key, *stream);
      serializeString(value, *stream);
    }

    virtual void partitionedOutput(int reduce, const string& key,
                                   const string& value) {
      serializeInt(PARTITIONED_OUTPUT, *stream);
      serializeInt(reduce, *stream);
      serializeString(key, *stream);
      serializeString(value, *stream);
    }

    virtual void status(const string& message) {
      serializeInt(STATUS, *stream);
      serializeString(message, *stream);
    }

    virtual void progress(float progress) {
      serializeInt(PROGRESS, *stream);
      serializeFloat(progress, *stream);
      stream->flush();
    }

    virtual void done() {
      serializeInt(DONE, *stream);
    }

    virtual void registerCounter(int id, const string& group, 
                                 const string& name) {
      serializeInt(REGISTER_COUNTER, *stream);
      serializeInt(id, *stream);
      serializeString(group, *stream);
      serializeString(name, *stream);
    }

    virtual void incrementCounter(const TaskContext::Counter* counter, 
                                  uint64_t amount) {
      serializeInt(INCREMENT_COUNTER, *stream);
      serializeInt(counter->getId(), *stream);
      serializeLong(amount, *stream);
    }
    
    ~BinaryUpwardProtocol() {
      delete stream;
    }
  };

  class BinaryProtocol: public Protocol {
  private:
    FileInStream* downStream;
    DownwardProtocol* handler;
    BinaryUpwardProtocol* uplink;
    string key;
    string value;

  public:
    BinaryProtocol(FILE* down, DownwardProtocol* _handler, FILE* up) {
      downStream = new FileInStream();
      downStream->open(down);
      uplink = new BinaryUpwardProtocol(up);
      handler = _handler;
    }

    UpwardProtocol* getUplink() {
      return uplink;
    }

    virtual void nextEvent() {
      int32_t cmd;
      cmd = deserializeInt(*downStream);
      switch (cmd) {
      case START_MESSAGE: {
        int32_t prot;
        prot = deserializeInt(*downStream);
        handler->start(prot);
        break;
      }
      case SET_JOB_CONF: {
        int32_t entries;
        entries = deserializeInt(*downStream);
        // as in the text protocol, reserve to avoid pre-filling with empties
        vector<string> result;
        result.reserve(entries);
        for(int i=0; i < entries; ++i) {
          string item;
          deserializeString(item, *downStream);
          result.push_back(item);
        }
        handler->setJobConf(result);
        break;
      }
      case SET_INPUT_TYPES: {
        string keyType;
        string valueType;
        deserializeString(keyType, *downStream);
        deserializeString(valueType, *downStream);
        handler->setInputTypes(keyType, valueType);
        break;
      }
      case RUN_MAP: {
        string split;
        int32_t numReduces;
        int32_t piped;
        deserializeString(split, *downStream);
        numReduces = deserializeInt(*downStream);
        piped = deserializeInt(*downStream);
        handler->runMap(split, numReduces, piped);
        break;
      }
      case MAP_ITEM: {
        deserializeString(key, *downStream);
        deserializeString(value, *downStream);
        handler->mapItem(key, value);
        break;
      }
      case RUN_REDUCE: {
        int32_t reduce;
        int32_t piped;
        reduce = deserializeInt(*downStream);
        piped = deserializeInt(*downStream);
        handler->runReduce(reduce, piped);
        break;
      }
      case REDUCE_KEY: {
        deserializeString(key, *downStream);
        handler->reduceKey(key);
        break;
      }
      case REDUCE_VALUE: {
        deserializeString(value, *downStream);
        handler->reduceValue(value);
        break;
      }
      case CLOSE:
        handler->close();
        break;
      case ABORT:
        handler->abort();
        break;
      default:
        HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
      }
    }

    virtual ~BinaryProtocol() {
      delete downStream;
      delete uplink;
    }
  };

  /**
   * Define a context object to give to combiners that will let them
   * go through the values and emit their results correctly.
   */
  class CombineContext: public ReduceContext {
  private:
    ReduceContext* baseContext;
    Partitioner* partitioner;
    int numReduces;
    UpwardProtocol* uplink;
    bool firstKey;
    bool firstValue;
    map<string, vector<string> >::iterator keyItr;
    map<string, vector<string> >::iterator endKeyItr;
    vector<string>::iterator valueItr;
    vector<string>::iterator endValueItr;

  public:
    CombineContext(ReduceContext* _baseContext,
                   Partitioner* _partitioner,
                   int _numReduces,
                   UpwardProtocol* _uplink,
                   map<string, vector<string> >& data) {
      baseContext = _baseContext;
      partitioner = _partitioner;
      numReduces = _numReduces;
      uplink = _uplink;
      keyItr = data.begin();
      endKeyItr = data.end();
      firstKey = true;
      firstValue = true;
    }

    virtual const JobConf* getJobConf() {
      return baseContext->getJobConf();
    }

    virtual const std::string& getInputKey() {
      return keyItr->first;
    }

    virtual const std::string& getInputValue() {
      return *valueItr;
    }

    virtual void emit(const std::string& key, const std::string& value) {
      if (partitioner != NULL) {
        uplink->partitionedOutput(partitioner->partition(key, numReduces),
                                  key, value);
      } else {
        uplink->output(key, value);
      }
    }

    virtual void progress() {
      baseContext->progress();
    }

    virtual void setStatus(const std::string& status) {
      baseContext->setStatus(status);
    }

    bool nextKey() {
      if (firstKey) {
        firstKey = false;
      } else {
        ++keyItr;
      }
      if (keyItr != endKeyItr) {
        valueItr = keyItr->second.begin();
        endValueItr = keyItr->second.end();
        firstValue = true;
        return true;
      }
      return false;
    }

    virtual bool nextValue() {
      if (firstValue) {
        firstValue = false;
      } else {
        ++valueItr;
      }
      return valueItr != endValueItr;
    }
    
    virtual Counter* getCounter(const std::string& group, 
                                const std::string& name) {
      return baseContext->getCounter(group, name);
    }

    virtual void incrementCounter(const Counter* counter, uint64_t amount) {
      baseContext->incrementCounter(counter, amount);
    }
  };

  /**
   * A RecordWriter that takes the map outputs, buffers them up, and then
   * combines them when the buffer is full.
   */
  class CombineRunner: public RecordWriter {
  private:
    map<string, vector<string> > data;
    int64_t spillSize;
    int64_t numBytes;
    ReduceContext* baseContext;
    Partitioner* partitioner;
    int numReduces;
    UpwardProtocol* uplink;
    Reducer* combiner;
  public:
    CombineRunner(int64_t _spillSize, ReduceContext* _baseContext, 
                  Reducer* _combiner, UpwardProtocol* _uplink, 
                  Partitioner* _partitioner, int _numReduces) {
      numBytes = 0;
      spillSize = _spillSize;
      baseContext = _baseContext;
      partitioner = _partitioner;
      numReduces = _numReduces;
      uplink = _uplink;
      combiner = _combiner;
    }

    virtual void emit(const std::string& key,
                      const std::string& value) {
      numBytes += key.length() + value.length();
      data[key].push_back(value);
      if (numBytes >= spillSize) {
        spillAll();
      }
    }

    virtual void close() {
      spillAll();
    }

  private:
    void spillAll() {
      CombineContext context(baseContext, partitioner, numReduces, 
                             uplink, data);
      while (context.nextKey()) {
        combiner->reduce(context);
      }
      data.clear();
      numBytes = 0;
    }
  };

  class TaskContextImpl: public MapContext, public ReduceContext, 
                         public DownwardProtocol {
  private:
    bool done;
    JobConf* jobConf;
    string key;
    const string* newKey;
    const string* value;
    bool hasTask;
    bool isNewKey;
    bool isNewValue;
    string* inputKeyClass;
    string* inputValueClass;
    string status;
    float progressFloat;
    uint64_t lastProgress;
    bool statusSet;
    Protocol* protocol;
    UpwardProtocol *uplink;
    string* inputSplit;
    RecordReader* reader;
    Mapper* mapper;
    Reducer* reducer;
    RecordWriter* writer;
    Partitioner* partitioner;
    int numReduces;
    const Factory* factory;
    pthread_mutex_t mutexDone;
    std::vector<int> registeredCounterIds;

  public:

    TaskContextImpl(const Factory& _factory) {
      statusSet = false;
      done = false;
      newKey = NULL;
      factory = &_factory;
      jobConf = NULL;
      inputKeyClass = NULL;
      inputValueClass = NULL;
      inputSplit = NULL;
      mapper = NULL;
      reducer = NULL;
      reader = NULL;
      writer = NULL;
      partitioner = NULL;
      protocol = NULL;
      uplink = NULL;
      isNewKey = false;
      isNewValue = false;
      lastProgress = 0;
      progressFloat = 0.0f;
      hasTask = false;
      pthread_mutex_init(&mutexDone, NULL);
    }

    void setProtocol(Protocol* _protocol, UpwardProtocol* _uplink) {
      protocol = _protocol;
      uplink = _uplink;
    }

    virtual void start(int protocol) {
      if (protocol != 0) {
        throw Error("Protocol version " + toString(protocol) + 
                    " not supported");
      }
    }

    virtual void setJobConf(vector<string> values) {
      int len = values.size();
      JobConfImpl* result = new JobConfImpl();
      HADOOP_ASSERT(len % 2 == 0, "Odd length of job conf values");
      for(int i=0; i < len; i += 2) {
        result->set(values[i], values[i+1]);
      }
      jobConf = result;
    }

    virtual void setInputTypes(string keyType, string valueType) {
      inputKeyClass = new string(keyType);
      inputValueClass = new string(valueType);
    }

    virtual void runMap(string _inputSplit, int _numReduces, bool pipedInput) {
      inputSplit = new string(_inputSplit);
      reader = factory->createRecordReader(*this);
      HADOOP_ASSERT((reader == NULL) == pipedInput,
                    pipedInput ? "RecordReader defined when not needed.":
                    "RecordReader not defined");
      if (reader != NULL) {
        value = new string();
      }
      mapper = factory->createMapper(*this);
      numReduces = _numReduces;
      if (numReduces != 0) { 
        reducer = factory->createCombiner(*this);
        partitioner = factory->createPartitioner(*this);
      }
      if (reducer != NULL) {
        int64_t spillSize = 100;
        if (jobConf->hasKey("io.sort.mb")) {
          spillSize = jobConf->getInt("io.sort.mb");
        }
        writer = new CombineRunner(spillSize * 1024 * 1024, this, reducer, 
                                   uplink, partitioner, numReduces);
      }
      hasTask = true;
    }

    virtual void mapItem(const string& _key, const string& _value) {
      newKey = &_key;
      value = &_value;
      isNewKey = true;
    }

    virtual void runReduce(int reduce, bool pipedOutput) {
      reducer = factory->createReducer(*this);
      writer = factory->createRecordWriter(*this);
      HADOOP_ASSERT((writer == NULL) == pipedOutput,
                    pipedOutput ? "RecordWriter defined when not needed.":
                    "RecordWriter not defined");
      hasTask = true;
    }

    virtual void reduceKey(const string& _key) {
      isNewKey = true;
      newKey = &_key;
    }

    virtual void reduceValue(const string& _value) {
      isNewValue = true;
      value = &_value;
    }
    
    virtual bool isDone() {
      pthread_mutex_lock(&mutexDone);
      bool doneCopy = done;
      pthread_mutex_unlock(&mutexDone);
      return doneCopy;
    }

    virtual void close() {
      pthread_mutex_lock(&mutexDone);
      done = true;
      pthread_mutex_unlock(&mutexDone);
    }

    virtual void abort() {
      throw Error("Aborted by driver");
    }

    void waitForTask() {
      while (!done && !hasTask) {
        protocol->nextEvent();
      }
    }

    bool nextKey() {
      if (reader == NULL) {
        while (!isNewKey) {
          nextValue();
          if (done) {
            return false;
          }
        }
        key = *newKey;
      } else {
        if (!reader->next(key, const_cast<string&>(*value))) {
          pthread_mutex_lock(&mutexDone);
          done = true;
          pthread_mutex_unlock(&mutexDone);
          return false;
        }
        progressFloat = reader->getProgress();
      }
      isNewKey = false;
      if (mapper != NULL) {
        mapper->map(*this);
      } else {
        reducer->reduce(*this);
      }
      return true;
    }

    /**
     * Advance to the next value.
     */
    virtual bool nextValue() {
      if (isNewKey || done) {
        return false;
      }
      isNewValue = false;
      progress();
      protocol->nextEvent();
      return isNewValue;
    }

    /**
     * Get the JobConf for the current task.
     */
    virtual JobConf* getJobConf() {
      return jobConf;
    }

    /**
     * Get the current key. 
     * @return the current key or NULL if called before the first map or reduce
     */
    virtual const string& getInputKey() {
      return key;
    }

    /**
     * Get the current value. 
     * @return the current value or NULL if called before the first map or 
     *    reduce
     */
    virtual const string& getInputValue() {
      return *value;
    }

    /**
     * Mark your task as having made progress without changing the status 
     * message.
     */
    virtual void progress() {
      if (uplink != 0) {
        uint64_t now = getCurrentMillis();
        if (now - lastProgress > 1000) {
          lastProgress = now;
          if (statusSet) {
            uplink->status(status);
            statusSet = false;
          }
          uplink->progress(progressFloat);
        }
      }
    }

    /**
     * Set the status message and call progress.
     */
    virtual void setStatus(const string& status) {
      this->status = status;
      statusSet = true;
      progress();
    }

    /**
     * Get the name of the key class of the input to this task.
     */
    virtual const string& getInputKeyClass() {
      return *inputKeyClass;
    }

    /**
     * Get the name of the value class of the input to this task.
     */
    virtual const string& getInputValueClass() {
      return *inputValueClass;
    }

    /**
     * Access the InputSplit of the mapper.
     */
    virtual const std::string& getInputSplit() {
      return *inputSplit;
    }

    virtual void emit(const string& key, const string& value) {
      progress();
      if (writer != NULL) {
        writer->emit(key, value);
      } else if (partitioner != NULL) {
        int part = partitioner->partition(key, numReduces);
        uplink->partitionedOutput(part, key, value);
      } else {
        uplink->output(key, value);
      }
    }

    /**
     * Register a counter with the given group and name.
     */
    virtual Counter* getCounter(const std::string& group, 
                               const std::string& name) {
      int id = registeredCounterIds.size();
      registeredCounterIds.push_back(id);
      uplink->registerCounter(id, group, name);
      return new Counter(id);
    }

    /**
     * Increment the value of the counter with the given amount.
     */
    virtual void incrementCounter(const Counter* counter, uint64_t amount) {
      uplink->incrementCounter(counter, amount); 
    }

    void closeAll() {
      if (reader) {
        reader->close();
      }
      if (mapper) {
        mapper->close();
      }
      if (reducer) {
        reducer->close();
      }
      if (writer) {
        writer->close();
      }
    }

    virtual ~TaskContextImpl() {
      delete jobConf;
      delete inputKeyClass;
      delete inputValueClass;
      delete inputSplit;
      if (reader) {
        delete value;
      }
      delete reader;
      delete mapper;
      delete reducer;
      delete writer;
      delete partitioner;
      pthread_mutex_destroy(&mutexDone);
    }
  };

  /**
   * Ping the parent every 5 seconds so a dead parent is noticed promptly.
   * A successful connect to the command port, followed by an immediate
   * shutdown of the socket, counts as one ping.
   */
  void* ping(void* ptr) {
    TaskContextImpl* context = (TaskContextImpl*) ptr;
    char* portStr = getenv("hadoop.pipes.command.port");
    int MAX_RETRIES = 3;
    int remaining_retries = MAX_RETRIES;
    while (!context->isDone()) {
      try{
        sleep(5);
        int sock = -1;
        if (portStr) {
          sock = socket(PF_INET, SOCK_STREAM, 0);
          HADOOP_ASSERT(sock != -1,
                        string("problem creating socket: ") + strerror(errno));
          sockaddr_in addr;
          addr.sin_family = AF_INET;
          addr.sin_port = htons(toInt(portStr));
          addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
          HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
                        string("problem connecting command socket: ") +
                        strerror(errno));

        }
        if (sock != -1) {
          int result = shutdown(sock, SHUT_RDWR);
          HADOOP_ASSERT(result == 0, "problem shutting down socket");
          result = close(sock);
          HADOOP_ASSERT(result == 0, "problem closing socket");
        }
        remaining_retries = MAX_RETRIES;
      } catch (Error& err) {
        if (!context->isDone()) {
          fprintf(stderr, "Hadoop Pipes Exception: in ping %s\n", 
                err.getMessage().c_str());
          remaining_retries -= 1;
          if (remaining_retries == 0) {
            exit(1);
          }
        } else {
          return NULL;
        }
      }
    }
    return NULL;
  }

  /**
   * Run the assigned task in the framework.
   * A user's main function builds a Factory for its Mapper and Reducer
   * classes and then calls this (see the usage sketch at the end of this
   * file).
   * @return true, if the task succeeded.
   */
  bool runTask(const Factory& factory) {
    try {
      TaskContextImpl* context = new TaskContextImpl(factory);
      Protocol* connection;
      char* portStr = getenv("hadoop.pipes.command.port");
      int sock = -1;
      FILE* stream = NULL;
      FILE* outStream = NULL;
      char *bufin = NULL;
      char *bufout = NULL;
      if (portStr) {
        sock = socket(PF_INET, SOCK_STREAM, 0);
        HADOOP_ASSERT(sock != -1,
                      string("problem creating socket: ") + strerror(errno));
        sockaddr_in addr;
        addr.sin_family = AF_INET;
        addr.sin_port = htons(toInt(portStr));
        addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
        HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
                      string("problem connecting command socket: ") +
                      strerror(errno));

        stream = fdopen(sock, "r");
        outStream = fdopen(sock, "w");

        // increase the stdio buffer sizes on the socket streams
        int bufsize = 128*1024;
        bufin = new char[bufsize];
        bufout = new char[bufsize];
        int rc = setvbuf(stream, bufin, _IOFBF, bufsize);
        HADOOP_ASSERT(rc == 0, string("problem with setvbuf for inStream: ")
                                 + strerror(errno));
        rc = setvbuf(outStream, bufout, _IOFBF, bufsize);
        HADOOP_ASSERT(rc == 0, string("problem with setvbuf for outStream: ")
                                 + strerror(errno));

        connection = new BinaryProtocol(stream, context, outStream);
      } else if (getenv("hadoop.pipes.command.file")) {
        char* filename = getenv("hadoop.pipes.command.file");
        string outFilename = filename;
        outFilename += ".out";
        stream = fopen(filename, "r");
        outStream = fopen(outFilename.c_str(), "w");
        connection = new BinaryProtocol(stream, context, outStream);
      } else {
        connection = new TextProtocol(stdin, context, stdout);
      }
      context->setProtocol(connection, connection->getUplink());
      pthread_t pingThread;
      pthread_create(&pingThread, NULL, ping, (void*)(context));
      context->waitForTask();
      while (!context->isDone()) {
        context->nextKey();
      }
      context->closeAll();
      connection->getUplink()->done();
      pthread_join(pingThread, NULL);
      delete context;
      delete connection;
      if (stream != NULL) {
        fflush(stream);
      }
      if (outStream != NULL) {
        fflush(outStream);
      }
      fflush(stdout);
      if (sock != -1) {
        int result = shutdown(sock, SHUT_RDWR);
        HADOOP_ASSERT(result == 0, "problem shutting down socket");
        result = close(sock);
        HADOOP_ASSERT(result == 0, "problem closing socket");
      }
      // stream and outStream are deliberately not fclose()d: on the socket
      // path the underlying descriptor was already closed above, and
      // fclose() would close it a second time.
      delete[] bufin;
      delete[] bufout;
      return true;
    } catch (Error& err) {
      fprintf(stderr, "Hadoop Pipes Exception: %s\n", 
              err.getMessage().c_str());
      return false;
    }
  }
}
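
/*
 * Usage sketch (not part of this file): a Pipes program defines a Mapper
 * and a Reducer, then hands them to runTask() through a factory, typically
 * the TemplateFactory from the Pipes headers. A minimal word count,
 * assuming the standard hadoop/TemplateFactory.hh and StringUtils helpers:
 *
 *   #include "hadoop/Pipes.hh"
 *   #include "hadoop/TemplateFactory.hh"
 *   #include "hadoop/StringUtils.hh"
 *
 *   class WordCountMap: public HadoopPipes::Mapper {
 *   public:
 *     WordCountMap(HadoopPipes::TaskContext& context) {}
 *     void map(HadoopPipes::MapContext& context) {
 *       // emit <word, "1"> for every word in the input value
 *       std::vector<std::string> words =
 *         HadoopUtils::splitString(context.getInputValue(), " ");
 *       for (size_t i = 0; i < words.size(); ++i) {
 *         context.emit(words[i], "1");
 *       }
 *     }
 *   };
 *
 *   class WordCountReduce: public HadoopPipes::Reducer {
 *   public:
 *     WordCountReduce(HadoopPipes::TaskContext& context) {}
 *     void reduce(HadoopPipes::ReduceContext& context) {
 *       // sum the counts emitted for this key
 *       int sum = 0;
 *       while (context.nextValue()) {
 *         sum += HadoopUtils::toInt(context.getInputValue());
 *       }
 *       context.emit(context.getInputKey(), HadoopUtils::toString(sum));
 *     }
 *   };
 *
 *   int main(int argc, char** argv) {
 *     return HadoopPipes::runTask(
 *         HadoopPipes::TemplateFactory<WordCountMap, WordCountReduce>());
 *   }
 */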

