One of the requirements for a Pig/Hadoop job is that the input files are in the right format: a single corrupt line or file fails the entire job.
While converting one of our processes, we encountered the following errors:
ERROR 2998: Unhandled internal error. Java heap space
ERROR 1071: Cannot convert a generic_writablecomparable to a String
There are two ways to handle this: run an external process that cleans up the compressed files before copying them to HDFS for the Pig script to process, or extend Pig's LoadFunc with a Java UDF.
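For the first option, here is a minimal sketch of a pre-copy check, assuming local .gz files (the class and method names are illustrative, not part of the original process): it decompresses each file end-to-end and keeps only the ones that read cleanly.

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

public class GzipValidator
{
    // Returns true only if the whole file decompresses without error.
    public static boolean isReadable(String path)
    {
        byte[] buffer = new byte[8192];
        try (InputStream in = new GZIPInputStream(new FileInputStream(path)))
        {
            while (in.read(buffer) != -1)
            {
                // Drain the stream; we only care whether decompression fails.
            }
            return true;
        }
        catch (IOException e)
        {
            // Bad magic number, truncated stream, or CRC mismatch.
            return false;
        }
    }
}

Files that fail the check can be re-fetched or dropped before the copy to HDFS. The rest of this post takes the LoadFunc route.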
Below is the source code that goes into CustomGzipHandler.jar:
package com.mycompany.pig;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class CustomGzipLoader extends LoadFunc {

    // Lines this long or longer are treated as corrupt and skipped.
    private int maxLineLength = 9999;

    @SuppressWarnings("rawtypes")
    private RecordReader reader;
    private final TupleFactory tupleFactory;

    public CustomGzipLoader()
    {
        tupleFactory = TupleFactory.getInstance();
    }

    @SuppressWarnings("rawtypes")
    @Override
    public InputFormat getInputFormat() throws IOException
    {
        // TextInputFormat decompresses .gz input transparently.
        return new TextInputFormat();
    }

    @Override
    public Tuple getNext() throws IOException
    {
        try
        {
            // Keep reading until we find a usable line, so empty or
            // oversized lines are skipped rather than ending the split.
            while (reader.nextKeyValue())
            {
                Text value = (Text) reader.getCurrentValue();
                if (value == null)
                    continue;
                String line = value.toString();
                if (!line.isEmpty() && line.length() < maxLineLength)
                    return tupleFactory.newTuple(line);
            }
        }
        catch (Exception e)
        {
            // A decompression error in a corrupt file lands here; fall
            // through and return null so the job moves on instead of dying.
        }
        // null tells Pig this split is exhausted.
        return null;
    }

    @Override
    public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit pigSplit)
        throws IOException
    {
        this.reader = reader;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException
    {
        FileInputFormat.setInputPaths(job, location);
    }
}
After writing my own custom loader, the LOAD statement went from
rawdata = LOAD '/output/mydir/*.gz' USING PigStorage('\t') AS (mystring:chararray);
to this:
set mapred.linerecordreader.maxlength 100000; -- ignore lines more than 100000 characters long
REGISTER CustomGzipHandler.jar;
DEFINE CustomGzipLoader com.mycompany.pig.CustomGzipLoader();
rawdata = LOAD '/output/mydir/*.gz' USING CustomGzipLoader();
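One possible refinement, not in the original code: Pig passes quoted arguments in a DEFINE statement to the UDF's constructor as strings, so the hard-coded 9999 cap could be made configurable. A sketch of an extra constructor for the class above (the one-argument form is my addition):

    // Hypothetical overload: lets the script pick the line-length cap,
    // e.g. DEFINE CustomGzipLoader com.mycompany.pig.CustomGzipLoader('20000');
    public CustomGzipLoader(String maxLineLength)
    {
        this();
        this.maxLineLength = Integer.parseInt(maxLineLength);
    }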