hadoop - Mapper not invoked while using MultipleInputs -


I have a driver class that uses the MultipleInputs class to invoke different mappers at runtime. When I use MultipleInputs.addInputPath(job, fStatus.getPath(), TextInputFormat.class, CreatePureDeltaMapperOne.class) inside the first loop, the first mapper (CreatePureDeltaMapperOne) is not getting invoked. When I comment out the block of code that invokes MultipleInputs inside the first loop and call it outside the loop instead, the mapper class is invoked. Please help me find the issue.

import java.io.ioexception; import java.io.inputstreamreader; import java.net.urisyntaxexception; import java.util.properties;  import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.filestatus; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.input.multipleinputs; import org.apache.hadoop.mapreduce.lib.input.textinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.mapreduce.lib.output.textoutputformat;  /***  * creates pure delta file matching history records present in hdfs  * @author debajit  *  */ public class createpuredeltadriver {      /**       * @param args      * @throws urisyntaxexception       */     public static void main(string[] args) throws ioexception, interruptedexception, classnotfoundexception, urisyntaxexception {               string historyfileinputpath="";              string deltafiledirectorypath="";             string puredeltafileoutpath="";              configuration config= new configuration();              job job = new job(config, "pure delta file creation");             job.setjarbyclass(createpuredeltadriver.class);              path historydirpath= new path(historyfileinputpath);             filesystem fs = filesystem.get(config);                      filestatus[] statushistory = fs.liststatus(historydirpath);                     (filestatus fstatus : statushistory) {                     string historyfilename=fstatus.getpath().getname();                      if(historyfilename.contains("part-r")){                         multipleinputs.addinputpath(job, fstatus.getpath(), textinputformat.class,createpuredeltamapperone.class);                     }                 }               path deltadirpath= new path(deltafiledirectorypath);                     filestatus[] 
statusdelta = fs.liststatus(deltadirpath);                     (filestatus fstatus : statusdelta) {                         string deltafilename=fstatus.getpath().getname();                      if(deltafilename.startswith("part-r")){                         multipleinputs.addinputpath(job, fstatus.getpath(), textinputformat.class, createpuredeltamappertwo.class);                      }             }              job.setmapperclass(createpuredeltamapperone.class);             job.setmapperclass(createpuredeltamappertwo.class);             job.setreducerclass(createpuredeltareducer.class);              job.setmapoutputkeyclass(text.class);             job.setmapoutputvalueclass(text.class);               job.setinputformatclass(textinputformat.class);             job.setoutputformatclass(textoutputformat.class);              path hisinpath = new path(historyfileinputpath);             path outpath = new path(puredeltafileoutpath);              //multipleinputs.addinputpath(job, hisinpath, textinputformat.class, createpuredeltamapperone.class);             //multipleinputs.addinputpath(job, delpath, textinputformat.class, createpuredeltamappertwo.class);                  fileoutputformat.setoutputpath(job, outpath);             system.out.println(job.waitforcompletion(true));          }      } 

My mapper class:

import java.io.ioexception; import java.io.inputstreamreader; import java.util.properties;  import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.filestatus; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.nullwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.mapper.context;   public class createpuredeltamapperone extends mapper<longwritable, text, text, text> {      private text outkey = new text();     private text outvalue = new text();     int counter=0;     private string delimiter="";     private int primarykeyindicator =0;  private integer nummapnodes = null;      public void setup(context context) throws ioexception{         system.out.println("setup--- mapper 1");         configuration config = context.getconfiguration();         properties properties = new properties();         string propertydirectory = config.get("propertydirectory");          string propertyfilename =config.get("propertyfilename");         path propertydirpath= new path(propertydirectory);         filesystem fs = filesystem.get(config);         filestatus[] status = fs.liststatus(propertydirpath);         (filestatus fstatus : status) {             string propfilename=fstatus.getpath().getname().trim();             if(propfilename.equals(propertyfilename)){                 properties.load(new inputstreamreader(fs.open(fstatus.getpath())));                 this.setnummapnodes(integer.parseint(properties.getproperty("num.of.nodes").trim()));                 this.setdelimiter(properties.getproperty("file.delimiter.type").trim());                 this.setprimarykeyindicator(integer.parseint(properties.getproperty("file.primary.key.index.specifier").trim()));             }         }     }       public void map(longwritable key, text val, context context) throws ioexception, interruptedexception{          
string valuestring = val.tostring().trim();           string[] tokens = valuestring.split(this.getdelimiter());         string temp=tokens[this.getprimarykeyindicator()].tostring();         system.out.println(" mapper 1 invoked");          this.setoutkey(new text(tokens[this.getprimarykeyindicator()].tostring().trim()));//account number          this.setoutvalue(new text("h"+valuestring.trim()));         context.write(outkey,outvalue );       }    } 

Do not use these two lines in your code: job.setMapperClass(CreatePureDeltaMapperOne.class); and job.setMapperClass(CreatePureDeltaMapperTwo.class);

This is because you are already passing the name of the corresponding mapper class for each input path inside the loop, via MultipleInputs.addInputPath. Calling setMapperClass afterwards overrides that per-path binding. Hope this helps.


Comments

Popular posts from this blog

PHPMotion implementation - URL based videos (Hosted on separate location) -

javascript - Using Windows Media Player as video fallback for video tag -

c# - Unity IoC Lifetime per HttpRequest for UserStore -