Use SparkContext hadoop configuration within RDD methods/closures, like foreachPartition

I am using Spark to read a bunch of files, process them, and then save all of them as a SequenceFile. I wanted one sequence file per partition, so I did this:

SparkConf sparkConf = new SparkConf().setAppName("writingHDFS")
				.setMaster("local[2]")
				.set("spark.streaming.stopGracefullyOnShutdown", "true");
		final JavaSparkContext jsc = new JavaSparkContext(sparkConf);
		jsc.hadoopConfiguration().addResource(hdfsConfPath + "hdfs-site.xml");
		jsc.hadoopConfiguration().addResource(hdfsConfPath + "core-site.xml");
	    //JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5*1000));
	    
	    JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(sourcePath);
	    if(!imageByteRDD.isEmpty())
	    	imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,PortableDataStream>>>() {

                @Override
				public void call(Iterator<Tuple2<String, PortableDataStream>> arg0)
						throws Exception {
                  [°°°SOME STUFF°°°]
                  SequenceFile.Writer writer = SequenceFile.createWriter(
					        		 jsc.hadoopConfiguration(), 
// Here lies the problem: how do I pass in the hadoopConfiguration I have set on the Spark context?

Previously, I created a new Configuration for each partition, and that works, but I'm sure there is a much more "sparky" way.

Does anybody know how to use the Hadoop Configuration Object inside the RDD closures?

Answers

The problem here is that Hadoop Configurations aren't tagged as Serializable, so Spark won't pull them into RDD closures. They are marked as Writable, so Hadoop's own serialization mechanism can marshal and unmarshal them, but Spark doesn't work with that directly.
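To see the failure mode in isolation, here is a minimal sketch that uses only the JDK. `StandInConf` is a hypothetical stand-in for Hadoop's Configuration (it is not the real class), and `CapturingTask` is a made-up class playing the role of a closure that captures it. Java serialization, which Spark uses to ship closures, rejects the captured object:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.hadoop.conf.Configuration:
// it holds key/value pairs but does NOT implement java.io.Serializable.
class StandInConf {
    final Map<String, String> props = new HashMap<>();
}

// Plays the role of a closure that captures the configuration.
class CapturingTask implements Serializable {
    private static final long serialVersionUID = 1L;
    final StandInConf conf;

    CapturingTask(StandInConf conf) {
        this.conf = conf;
    }
}

class ClosureSerializationDemo {
    // Returns true if Java serialization accepts the object graph,
    // false if it is rejected with NotSerializableException.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        StandInConf conf = new StandInConf();
        // Hypothetical NameNode address, for illustration only.
        conf.props.put("fs.defaultFS", "hdfs://namenode.example:8020");
        System.out.println(canSerialize(new CapturingTask(conf))); // prints false
        System.out.println(canSerialize("plain string"));          // prints true
    }
}
```

The same thing happens when an anonymous function passed to foreachPartition references a Configuration from the enclosing scope.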

The long-term fix options would be:

  1. Add support for serializing Writables in Spark. Maybe SPARK-2421?
  2. Make the Hadoop Configuration Serializable.
  3. Add explicit support for serializing Hadoop Configurations.

You aren't going to hit any major objections to making the Hadoop Configuration serializable, provided you implement custom ser/deser methods that delegate to the Writable I/O calls (which just iterate through all key/value pairs). I say that as a Hadoop committer.

Update: Here's the code to create a serializable class which marshals the contents of a Hadoop config. Create it with val ser = new ConfigSerDeser(hadoopConf); refer to it in your RDD as ser.get().

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.hadoop.conf.Configuration

/**
 * Class to make Hadoop configurations serializable; uses the
 * `Writable` operations to do this.
 * Note: this only serializes the explicitly set values, not any set
 * in site/default or other XML resources.
 * @param conf the Hadoop configuration to wrap
 */
class ConfigSerDeser(var conf: Configuration) extends Serializable {

  def this() = this(new Configuration())

  def get(): Configuration = conf

  private def writeObject (out: java.io.ObjectOutputStream): Unit = {
    conf.write(out)
  }

  private def readObject (in: java.io.ObjectInputStream): Unit = {
    conf = new Configuration()
    conf.readFields(in)
  }

  private def readObjectNoData(): Unit = {
    conf = new Configuration()
  }
}

Note that it would be relatively straightforward for someone to make this generic for all Writable classes; you'd just need to provide a classname in the constructor and use that to instantiate the Writable during deserialization.
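As a rough illustration of that generic idea, here is a sketch that runs on the JDK alone. `SimpleWritable` is a hypothetical interface mirroring the shape of Hadoop's Writable (a write and a readFields method), `KeyValueConf` is a made-up stand-in for a Configuration, and `WritableBox` is an invented name for the wrapper. In this sketch the class name is taken from the wrapped value and written to the stream rather than passed to the constructor, which is one possible variation on the suggestion above:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in mirroring the shape of org.apache.hadoop.io.Writable.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical stand-in for a Hadoop Configuration: plain key/value pairs.
class KeyValueConf implements SimpleWritable {
    final Map<String, String> props = new TreeMap<>();

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(props.size());
        for (Map.Entry<String, String> e : props.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeUTF(e.getValue());
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        props.clear();
        int n = in.readInt();
        for (int i = 0; i < n; i++) {
            String key = in.readUTF();
            props.put(key, in.readUTF());
        }
    }
}

// Serializable wrapper for any SimpleWritable with a no-argument constructor.
class WritableBox<T extends SimpleWritable> implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient T value; // never written by default serialization

    WritableBox(T value) {
        this.value = value;
    }

    T get() {
        return value;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(value.getClass().getName()); // class to instantiate on read
        value.write(out);                         // delegate to the Writable-style call
    }

    @SuppressWarnings("unchecked")
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        String className = in.readUTF();
        try {
            value = (T) Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IOException("Cannot instantiate " + className, e);
        }
        value.readFields(in);
    }

    // Serializes and deserializes the box in memory, as a shipped closure would be.
    @SuppressWarnings("unchecked")
    static <T extends SimpleWritable> WritableBox<T> roundTrip(WritableBox<T> box) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(box);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (WritableBox<T>) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling WritableBox.roundTrip(new WritableBox<>(conf)).get() should give back a distinct KeyValueConf instance holding the same entries. With real Hadoop classes the wrapped type would implement org.apache.hadoop.io.Writable instead of the stand-in interface.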

Posted by stevel

Looks like it cannot be done, so here is the code I used:

final String hdfsNameNodePath = "hdfs://quickstart.cloudera:8080";

JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(sourcePath);
	    if(!imageByteRDD.isEmpty())
	    	imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,PortableDataStream>>>() {
	    		
				@Override
				public void call(Iterator<Tuple2<String, PortableDataStream>> arg0)
						throws Exception {
					
					Configuration conf = new Configuration();
					conf.set("fs.defaultFS", hdfsNameNodePath);
					// the string above should be passed in as an argument
					SequenceFile.Writer writer = SequenceFile.createWriter(
					        		 conf, 
					        		 SequenceFile.Writer.file([***ETCETERA...

Posted by Vale

You can serialize and deserialize the org.apache.hadoop.conf.Configuration using org.apache.spark.SerializableWritable.

For example:

import org.apache.spark.SerializableWritable

...

val hadoopConf = spark.sparkContext.hadoopConfiguration
// serialize here
val serializedConf = new SerializableWritable(hadoopConf)


// then access the conf inside the closure by calling .value on serializedConf
rdd.map(record => someFunction(record, serializedConf.value))
Posted by Simon Wong

This is a Java implementation based on @stevel's answer.

import java.io.Serializable;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;


public class SerializableHadoopConfiguration implements Serializable {
    Configuration conf;

    public SerializableHadoopConfiguration(Configuration hadoopConf) {
        this.conf = hadoopConf;

        if (this.conf == null) {
            this.conf = new Configuration();
        }
    }

    public SerializableHadoopConfiguration() {
        this.conf = new Configuration();
    }

    public Configuration get() {
        return this.conf;
    }

    private void writeObject(java.io.ObjectOutputStream out) throws IOException {
        this.conf.write(out);
    }

    private void readObject(java.io.ObjectInputStream in) throws IOException {
        this.conf = new Configuration();
        this.conf.readFields(in);
    }
}
Posted by Gary Gauh