Wednesday, October 22, 2014

PIG, Hadoop, Java UDF - Custom gzip loader for handling gzip files in Pig

One of the requirements for a PIG/Hadoop job is that the input files are in the right format. The entire job fails when a single corrupt line or file is encountered.

While converting one of our processes, we encountered the following errors:

ERROR 2998: Unhandled internal error. Java heap space

ERROR 1071: Cannot convert a generic_writablecomparable to a String

One way to handle this is an external process that cleans up all the compressed files and then copies them over to HDFS for the Pig script to process (a rough sketch of that approach follows). Alternatively, we can extend LoadFunc in Pig using a Java UDF, which is the approach described in the rest of this post.
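For reference, the external-cleanup route might look something like the rough sketch below: decompress each file, drop empty or overly long lines, and re-compress before copying the result to HDFS. The class name, file arguments, and the 9999-character cutoff are illustrative (the cutoff simply mirrors the loader further down); this is not the approach we ended up using.

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Illustrative standalone cleaner: java GzipCleaner input.gz cleaned.gz
public class GzipCleaner {

    public static void main(String[] args) throws IOException {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                 new GZIPInputStream(new FileInputStream(args[0])), StandardCharsets.UTF_8));
             BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
                 new GZIPOutputStream(new FileOutputStream(args[1])), StandardCharsets.UTF_8))) {

            String line;
            try {
                while ((line = reader.readLine()) != null) {
                    // Drop empty lines and lines of 9999+ characters, which are likely corrupt
                    if (!line.isEmpty() && line.length() < 9999) {
                        writer.write(line);
                        writer.newLine();
                    }
                }
            } catch (IOException e) {
                // A truncated or corrupt gzip stream stops here; the clean prefix
                // written so far is kept instead of aborting the whole run
            }
        }
    }
}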


Below is the source code for CustomGzipLoader, which is packaged into CustomGzipHandler.jar:


import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class CustomGzipLoader extends LoadFunc {

    @SuppressWarnings("rawtypes")
    private RecordReader reader;
    private TupleFactory tupleFactory;

    public CustomGzipLoader()
    {
        tupleFactory = TupleFactory.getInstance();
    }

    @SuppressWarnings("rawtypes")
    @Override
    public InputFormat getInputFormat() throws IOException
    {
        // TextInputFormat transparently decompresses .gz input files via the gzip codec
        return new TextInputFormat();
    }

    @Override
    public Tuple getNext() throws IOException
    {
        try
        {
            // Keep reading until a usable line is found; a skipped line should not
            // end the split (returning null tells Pig there is no more input)
            while (reader.nextKeyValue())
            {
                Text value = (Text) reader.getCurrentValue();

                // Drop empty lines and lines of 9999+ characters, which are likely corrupt
                if (value != null && !value.toString().isEmpty() && value.toString().length() < 9999)
                    return tupleFactory.newTuple(value.toString());
            }
        }
        catch (Exception e)
        {
            // Swallow the error so one bad line or file does not fail the whole job
        }
        return null;
    }

    @Override
    public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit pigSplit)
        throws IOException
    {
        this.reader = reader;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException
    {
        FileInputFormat.setInputPaths(job, location);
    }
}

 
After writing my own custom loader, the load statement was transformed from

rawdata = LOAD '/output/mydir/*.gz' USING PigStorage('\t') AS (mystring:chararray);

to this

set mapred.linerecordreader.maxlength 100000; -- ignore any lines longer than 100000 characters

REGISTER CustomGzipHandler.jar; 

DEFINE CustomGzipLoader com.mycompany.package.CustomGzipLoader();

rawdata = LOAD '/output/mydir/*.gz' USING CustomGzipLoader();
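
One difference from the old statement: the custom loader does not declare a schema, so downstream statements now see a single untyped field instead of mystring:chararray. A minimal, illustrative way to re-establish the original field name and type (the alias names here are just examples):

lines = FOREACH rawdata GENERATE (chararray)$0 AS mystring;

The rest of the original script can then keep referring to mystring as before.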



