Hadoop — Mapper not invoked while using MultipleInputs
I have a driver class that uses MultipleInputs to invoke different mapper classes at runtime. When I call MultipleInputs.addInputPath(job, fStatus.getPath(), TextInputFormat.class, CreatePureDeltaMapperOne.class)
inside the first loop, the first mapper (CreatePureDeltaMapperOne) is not getting invoked. When I comment out the block of code that calls MultipleInputs inside the first loop and instead make the call outside the loop, the mapper class is invoked. Please help me find the issue.
import java.io.ioexception; import java.io.inputstreamreader; import java.net.urisyntaxexception; import java.util.properties; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.filestatus; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.input.multipleinputs; import org.apache.hadoop.mapreduce.lib.input.textinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.mapreduce.lib.output.textoutputformat; /*** * creates pure delta file matching history records present in hdfs * @author debajit * */ public class createpuredeltadriver { /** * @param args * @throws urisyntaxexception */ public static void main(string[] args) throws ioexception, interruptedexception, classnotfoundexception, urisyntaxexception { string historyfileinputpath=""; string deltafiledirectorypath=""; string puredeltafileoutpath=""; configuration config= new configuration(); job job = new job(config, "pure delta file creation"); job.setjarbyclass(createpuredeltadriver.class); path historydirpath= new path(historyfileinputpath); filesystem fs = filesystem.get(config); filestatus[] statushistory = fs.liststatus(historydirpath); (filestatus fstatus : statushistory) { string historyfilename=fstatus.getpath().getname(); if(historyfilename.contains("part-r")){ multipleinputs.addinputpath(job, fstatus.getpath(), textinputformat.class,createpuredeltamapperone.class); } } path deltadirpath= new path(deltafiledirectorypath); filestatus[] statusdelta = fs.liststatus(deltadirpath); (filestatus fstatus : statusdelta) { string deltafilename=fstatus.getpath().getname(); if(deltafilename.startswith("part-r")){ multipleinputs.addinputpath(job, fstatus.getpath(), textinputformat.class, createpuredeltamappertwo.class); } } 
job.setmapperclass(createpuredeltamapperone.class); job.setmapperclass(createpuredeltamappertwo.class); job.setreducerclass(createpuredeltareducer.class); job.setmapoutputkeyclass(text.class); job.setmapoutputvalueclass(text.class); job.setinputformatclass(textinputformat.class); job.setoutputformatclass(textoutputformat.class); path hisinpath = new path(historyfileinputpath); path outpath = new path(puredeltafileoutpath); //multipleinputs.addinputpath(job, hisinpath, textinputformat.class, createpuredeltamapperone.class); //multipleinputs.addinputpath(job, delpath, textinputformat.class, createpuredeltamappertwo.class); fileoutputformat.setoutputpath(job, outpath); system.out.println(job.waitforcompletion(true)); } }
My mapper class:
import java.io.ioexception; import java.io.inputstreamreader; import java.util.properties; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.filestatus; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.nullwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.mapper.context; public class createpuredeltamapperone extends mapper<longwritable, text, text, text> { private text outkey = new text(); private text outvalue = new text(); int counter=0; private string delimiter=""; private int primarykeyindicator =0; private integer nummapnodes = null; public void setup(context context) throws ioexception{ system.out.println("setup--- mapper 1"); configuration config = context.getconfiguration(); properties properties = new properties(); string propertydirectory = config.get("propertydirectory"); string propertyfilename =config.get("propertyfilename"); path propertydirpath= new path(propertydirectory); filesystem fs = filesystem.get(config); filestatus[] status = fs.liststatus(propertydirpath); (filestatus fstatus : status) { string propfilename=fstatus.getpath().getname().trim(); if(propfilename.equals(propertyfilename)){ properties.load(new inputstreamreader(fs.open(fstatus.getpath()))); this.setnummapnodes(integer.parseint(properties.getproperty("num.of.nodes").trim())); this.setdelimiter(properties.getproperty("file.delimiter.type").trim()); this.setprimarykeyindicator(integer.parseint(properties.getproperty("file.primary.key.index.specifier").trim())); } } } public void map(longwritable key, text val, context context) throws ioexception, interruptedexception{ string valuestring = val.tostring().trim(); string[] tokens = valuestring.split(this.getdelimiter()); string temp=tokens[this.getprimarykeyindicator()].tostring(); system.out.println(" mapper 1 invoked"); this.setoutkey(new 
text(tokens[this.getprimarykeyindicator()].tostring().trim()));//account number this.setoutvalue(new text("h"+valuestring.trim())); context.write(outkey,outvalue ); } }
Do not use these two lines in your code: job.setMapperClass(CreatePureDeltaMapperOne.class); job.setMapperClass(CreatePureDeltaMapperTwo.class);
You are already passing the corresponding mapper class to MultipleInputs.addInputPath(...) inside the loop. The second setMapperClass call overrides the first, and both override the per-path mappers registered through MultipleInputs — which is why the first mapper was never invoked. Hope this helps.
Comments
Post a Comment