问答 菜鸟程序狗 ⋅ 于 2018-10-19 10:58:20 ⋅ 最后回复由 青牛 2018-10-20 19:13:47

// Job-driver snippet: enable compression of the job's file output and
// select GzipCodec as the codec class used for that output.
// NOTE(review): `job` is declared in the surrounding driver, which is not shown here.
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

public class TeraOutputFormat extends FileOutputFormat<Text,Text> {
static final String FINAL_SYNC_ATTRIBUTE = "";
private OutputCommitter committer = null;
private static final byte[] newline = "\n".getBytes();

  • Set the requirement for a final sync before the stream is closed.
    static void setFinalSync(JobContext job, boolean newValue) {
    job.getConfiguration().setBoolean(FINAL_SYNC_ATTRIBUTE, newValue);


  • Does the user want a final sync at close?
    public static boolean getFinalSync(JobContext job) {
    return job.getConfiguration().getBoolean(FINAL_SYNC_ATTRIBUTE, false);

    static class TeraRecordWriter extends RecordWriter<Text,Text> {
    private boolean finalSync = false;
    private DataOutputStream out;

    private final byte[] keyValueSeparator;
    public TeraRecordWriter(DataOutputStream out,JobContext job,String keyValueSeparator) {
      finalSync = getFinalSync(job);
      this.keyValueSeparator = keyValueSeparator.getBytes();
      this.out = out;
    public synchronized void write(Text key,Text value) throws IOException {
        out.write(key.getBytes(), 0, key.getLength());
        out.write(value.getBytes(), 0, value.getLength());
    public void close(TaskAttemptContext context) throws IOException {
      if (finalSync) {
          if (out instanceof Syncable) {


    public void checkOutputSpecs(JobContext job) throws InvalidJobConfException, IOException {
    // Ensure that the output directory is set
    Path outDir = getOutputPath(job);
    if (outDir == null) {
    throw new InvalidJobConfException("Output directory not set in JobConf.");

    final Configuration jobConf = job.getConfiguration();
    // get delegation token for outDir's file system
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),new Path[] { outDir }, jobConf);
    final FileSystem fs = outDir.getFileSystem(jobConf);
    if (fs.exists(outDir)) {
      // existing output dir is considered empty iff its only content is the
      // partition file.
      final FileStatus[] outDirKids = fs.listStatus(outDir);
      boolean empty = false;
      if (outDirKids != null && outDirKids.length == 1) {
        final FileStatus st = outDirKids[0];
        final String fname = st.getPath().getName();
        empty =!st.isDirectory() && TeraInputFormat.PARTITION_FILENAME.equals(fname);//_partition.lst
      if (TeraSort.getUseSimplePartitioner(job) || !empty) {//mapreduce.terasort.simplepartitioner
        throw new FileAlreadyExistsException("Output directory " + outDir+ " already exists");


    public RecordWriter<Text,Text> getRecordWriter(TaskAttemptContext job) throws IOException {

    Configuration conf = job.getConfiguration();
    boolean isCompressed = getCompressOutput(job);
    String keyValueSeparator= conf.get("mapreduce.output.teraoutputformat.separator", "\t");
    CompressionCodec codec = null;
    String extension = "";
    if (isCompressed) {
        Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
        codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        extension = codec.getDefaultExtension();//获取后缀名
    Path file = getDefaultWorkFile(job, extension);
    FileSystem fs = file.getFileSystem(job.getConfiguration());
    if (!isCompressed) {
        FSDataOutputStream fileOut = fs.create(file);
        return new TeraRecordWriter(fileOut, job,keyValueSeparator);
      } else {
        FSDataOutputStream fileOut = fs.create(file);
        return new TeraRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)),job,keyValueSeparator);


    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
    if (committer == null) {
    Path output = getOutputPath(context);
    committer = new FileOutputCommitter(output, context);
    return committer;


  • 青牛 国内首批大数据从业者,就职于金山,担任大数据团队核心研发工程师
    2018-10-20 19:13:47


