Reduce a key-value pair into a key-list pair with Apache Spark

I am writing a Spark application and want to combine a set of Key-Value pairs (K, V1), (K, V2), ..., (K, Vn) into one Key-Multivalue pair (K, [V1, V2, ..., Vn]). I feel like I should be able to do...
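A minimal PySpark sketch of two common ways to do this, grouping the values per key (the pairs RDD and its contents are illustrative):

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    # (K, V) pairs to be collapsed into (K, [V1, ..., Vn])
    pairs = sc.parallelize([("k", 1), ("k", 2), ("k", 3), ("j", 9)])

    # groupByKey returns an iterable per key; convert it to a list explicitly
    grouped = pairs.groupByKey().mapValues(list)

    # Alternatively, build the lists incrementally with aggregateByKey
    grouped2 = pairs.aggregateByKey([], lambda acc, v: acc + [v],
                                    lambda a, b: a + b)

    print(grouped.collect())  # e.g. [('j', [9]), ('k', [1, 2, 3])]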

reading and writing from hive tables with spark after aggregation

We have a Hive warehouse and want to use Spark for various tasks (mainly classification), at times writing the results back as a Hive table. For example, we wrote the following Python function to...
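A hedged sketch of the usual pattern with a HiveContext (Spark 1.x-style API; the table names are placeholders, not from the question):

    from pyspark import SparkContext
    from pyspark.sql import HiveContext

    sc = SparkContext.getOrCreate()
    hc = HiveContext(sc)

    # Read from an existing Hive table, aggregate, and write the result back
    df = hc.sql("SELECT label, COUNT(*) AS n FROM source_table GROUP BY label")
    df.write.mode("overwrite").saveAsTable("result_table")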

pySpark convert a list or RDD element to value (int)

I'm using pySpark to count elements in a tokenized RDD. This is one of the elements: ('b00004tkvy', ['noah', 'ark', 'activity', 'center', 'jewel', 'case', 'ages', '3', '8', 'victory',...
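A small sketch of counting the tokens of each record and pulling the count out as a plain int (the pair layout mirrors the excerpt; the token list is shortened):

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    rdd = sc.parallelize([('b00004tkvy', ['noah', 'ark', 'activity', 'center'])])

    # len() of the value gives an ordinary Python int per key
    counts = rdd.mapValues(len)

    # first() returns a normal tuple, so indexing yields an int, not an RDD
    n = counts.first()[1]
    print(n)  # 4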

load a local file to spark using sc.textFile()

How do I load a file from the local file system into Spark using sc.textFile()? Do I need to change any env variables? Also, when I tried the same on my Windows machine, where Hadoop is not installed...
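A hedged sketch: prefixing the path with file:// forces the local filesystem even when Hadoop/HDFS is the configured default (the path is a placeholder):

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    # file:// selects the local filesystem; without a scheme, Spark falls back
    # to the default filesystem (often HDFS when Hadoop is configured)
    rdd = sc.textFile("file:///home/user/data.txt")
    print(rdd.count())

Note that on a multi-node cluster the file must exist at the same path on every worker.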

spark pyspark mllib model - when prediction rdd is generated using map, it throws exception on collect()

I am using Spark 1.2.0 (I cannot upgrade as I don't have control over it). I am using mllib to build a model: points = labels.zip(tfidf).map(lambda t: LabeledPoint(t[0], t[1])) train_data, test_data...
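A hedged workaround that is often suggested for this: keep model.predict out of the lambda passed to map, because some mllib models wrap JVM objects that cannot be shipped to executors, and predict on the features RDD as a whole instead (the tiny DecisionTree below is only there to make the sketch runnable):

    from pyspark import SparkContext
    from pyspark.mllib.regression import LabeledPoint
    from pyspark.mllib.tree import DecisionTree

    sc = SparkContext.getOrCreate()

    data = sc.parallelize([LabeledPoint(0.0, [0.0, 1.0]),
                           LabeledPoint(1.0, [1.0, 0.0])] * 10)
    model = DecisionTree.trainClassifier(data, numClasses=2,
                                         categoricalFeaturesInfo={})

    # Instead of data.map(lambda p: (p.label, model.predict(p.features))),
    # predict on the whole features RDD and zip the results with the labels.
    predictions = model.predict(data.map(lambda p: p.features))
    labels_and_preds = data.map(lambda p: p.label).zip(predictions)
    print(labels_and_preds.take(3))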

Persisting Spark Streaming output

I'm collecting data from a messaging app; I'm currently using Flume, which sends approximately 50 million records per day. I wish to use Kafka, consume from Kafka using Spark Streaming, and persist it to...
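A hedged sketch of the usual persistence pattern, foreachRDD on each micro-batch (a socket stream stands in for the Kafka source; paths and ports are placeholders):

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext.getOrCreate()
    ssc = StreamingContext(sc, batchDuration=10)

    lines = ssc.socketTextStream("localhost", 9999)  # stand-in for a Kafka stream

    def save_batch(time, rdd):
        # One time-stamped output directory per non-empty micro-batch
        if not rdd.isEmpty():
            rdd.saveAsTextFile("/output/batch-" + time.strftime("%Y%m%d%H%M%S"))

    lines.foreachRDD(save_batch)

    ssc.start()
    ssc.awaitTermination()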

Is there a way to control the number of partitions when reading a text file in PySpark

I am reading a text file using the following command in PySpark: rating_data_raw = sc.textFile("/<path_to_csv_file>.csv"). Is there a way to specify the number of partitions that the RDD...
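A minimal sketch: textFile takes an optional minimum partition count, and repartition gives an exact count afterwards at the cost of a shuffle (the path is the placeholder from the question):

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    # The second argument is a *minimum* number of partitions
    rating_data_raw = sc.textFile("/<path_to_csv_file>.csv", minPartitions=24)
    print(rating_data_raw.getNumPartitions())

    # For an exact number of partitions, repartition afterwards
    rating_data_raw = rating_data_raw.repartition(24)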

how to read and write to the same file in spark using parquet?

I am trying to read from a Parquet file in Spark, do a union with another RDD, and then write the result into the same file I read from (basically overwrite). This throws the following error: ...
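A hedged sketch of the usual workaround: reads are lazy, so overwriting the path being read deletes the input mid-job; writing the result to a separate location first avoids that (paths and the extra DataFrame are placeholders):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext.getOrCreate()
    sqlContext = SQLContext(sc)

    existing = sqlContext.read.parquet("/data/table")
    extra = sqlContext.createDataFrame([(1, "x")], ["id", "v"])  # same schema assumed

    result = existing.unionAll(extra)

    # Write somewhere else first; the original path is still being read lazily
    result.write.mode("overwrite").parquet("/data/table_tmp")

    # Then the temporary output can be read back and rewritten (different
    # input and output paths), or the directories swapped outside of Spark
    sqlContext.read.parquet("/data/table_tmp") \
        .write.mode("overwrite").parquet("/data/table")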

Use SparkContext hadoop configuration within RDD methods/closures, like foreachPartition

I am using Spark to read a bunch of files, process them, and then save all of them as a Sequence file. What I wanted was to have one sequence file per partition, so I did this: SparkConf...

How do I invert key and value in RDD in Python 3 pyspark?

This works in Python 2.7, but in Python 3.5 it returns SyntaxError: invalid syntax. I'm not sure if this has to do with the fact that "tuple unpacking" was removed from Python 3, as I read on...
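Tuple parameter unpacking in lambdas (lambda (k, v): ...) was indeed removed in Python 3 (PEP 3113); indexing into the tuple works in both versions:

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    rdd = sc.parallelize([("a", 1), ("b", 2)])

    # Python 2 only: rdd.map(lambda (k, v): (v, k))  -> SyntaxError in Python 3
    inverted = rdd.map(lambda kv: (kv[1], kv[0]))
    print(inverted.collect())  # [(1, 'a'), (2, 'b')]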

ValueError: RDD is empty-- Pyspark (Windows Standalone)

I am trying to create an RDD but Spark is not creating it, throwing back the error pasted below: data = records.map(lambda r: LabeledPoint(extract_label(r), extract_features(r))) first_point =...

Separating application logs in Logback from Spark Logs in log4j

I have a Scala Maven project that uses Spark, and I am trying to implement logging using Logback. I am compiling my application to a jar and deploying it to an EC2 instance where the Spark...

How to convert pyspark.rdd.PipelinedRDD to DataFrame without using the collect() method in PySpark?

I have a pyspark.rdd.PipelinedRDD (Rdd1). When I do Rdd1.collect(), it gives a result like the one below. [(10, {3: 3.616726727464709, 4: 2.9996439803387602, 5: 1.6767412921625855}), (1, {3:...
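A hedged sketch: createDataFrame (or toDF) accepts the RDD directly, so no collect() is needed; here the nested dict from the excerpt is flattened into one row per inner key (column names are illustrative):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext.getOrCreate()
    sqlContext = SQLContext(sc)

    rdd1 = sc.parallelize([(10, {3: 3.61, 4: 2.99}), (1, {3: 1.67})])

    # Flatten (key, {item: score, ...}) into (key, item, score) rows
    rows = rdd1.flatMap(lambda kv: [(kv[0], k, v) for k, v in kv[1].items()])

    df = sqlContext.createDataFrame(rows, ["id", "item", "score"])
    df.show()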

Arch Linux system update: error: GPGME error: No data

When I finally decided to upgrade my laptop running vanilla Arch Linux (because of internet problems) I kept on getting errors like this: $ sudo pacman -Syu :: Synchronizing package databases... ...

Huge latency in spark streaming job

I have a near-real-time Spark Streaming application for image recognition where the receiver gets the input frames from Kafka. I have 6 receivers per executor and 5 executors in total, so I can see 30...

PySpark logging from the executor in a standalone cluster

This question has answers related to how to do this on a YARN cluster. But what if I am running a standalone Spark cluster? How can I log from executors? Logging from the driver is easy using the...
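One hedged, cluster-manager-agnostic approach: configure Python's own logging inside the closure, so it runs in the executor process and the output lands in that worker's stderr log (visible in the standalone web UI):

    import logging

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    def process_partition(rows):
        # Runs on the executor; basicConfig writes to that process's stderr
        logging.basicConfig(level=logging.INFO)
        log = logging.getLogger("executor")
        for row in rows:
            log.info("processing %s", row)
            yield row

    sc.parallelize(range(10), 2).mapPartitions(process_partition).count()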

OutOfMemoryError : Java heap space in Spark

I'm facing a memory issue that I'm unable to solve. Any help is highly appreciated. I am new to Spark and PySpark functionality and am trying to read a large JSON file...
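A hedged starting point is simply giving the driver and executors more heap before the context is created, and letting the executors (not the driver) parse the file; the sizes below are illustrative:

    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext

    conf = (SparkConf()
            .set("spark.driver.memory", "4g")     # or spark-submit --driver-memory 4g
            .set("spark.executor.memory", "4g"))
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)

    # read.json lets the executors parse the file instead of loading it
    # into the driver with plain Python
    df = sqlContext.read.json("/path/to/large.json")
    print(df.count())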

How to fix org.apache.spark.SparkException: Job aborted due to stage failure Task & com.datastax.spark.connector.rdd.partitioner.CassandraPartition

In my project I am using the spark-cassandra-connector to read from a Cassandra table and process it further into a JavaRDD, but I am facing an issue while processing a Cassandra row to...

How to get most common for each element of array list (pyspark)

I have a list of arrays for which I need to find the highest-frequency element for each element of the list. For the following code, an "unhashable type: 'list'" error is thrown. However, I have also tried to...
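A hedged sketch using collections.Counter; the "unhashable type" error typically comes from using a list where a hashable key is required, while counting the (hashable) strings inside each array works directly:

    from collections import Counter

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    rdd = sc.parallelize([
        ["a", "b", "a", "c"],
        ["x", "x", "y"],
    ])

    # most_common(1) -> [(element, count)]; keep just the element
    most_common = rdd.map(lambda arr: Counter(arr).most_common(1)[0][0])
    print(most_common.collect())  # ['a', 'x']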

How can I mock DynamoDB access via Spark in Scala?

I have a Spark job written in Scala that ultimately writes out to AWS DynamoDB. I want to write some unit tests around it, but the only problem is I don't have a clue how to go about mocking the...

Passing python class objects in Pyspark Rdd map function

While executing the code below, where I pass a Python class object to the PySpark RDD map function, I got the error below: PicklingError: could not serialize object: TypeError: can't pickle _thread.RLock...
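A hedged sketch of the usual fix: the error means some attribute of the object (often a SparkContext, logger, client, or lock) cannot be pickled, so the unpicklable piece is constructed on the executors instead of being captured by the closure (the Tokenizer class is illustrative, not from the question):

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    class Tokenizer(object):
        def __init__(self):
            self.sep = ","        # keep only picklable state on the instance

        def split(self, line):
            return line.split(self.sep)

    def process_partition(lines):
        # Heavy or unpicklable members (DB clients, locks, models) are built
        # here, once per partition, instead of being serialized from the driver
        tok = Tokenizer()
        for line in lines:
            yield tok.split(line)

    rdd = sc.parallelize(["a,b", "c,d"])
    print(rdd.mapPartitions(process_partition).collect())  # [['a', 'b'], ['c', 'd']]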

Can Coalesce increase partitions of Spark DataFrame

I am trying to understand the difference between coalesce() and repartition(). If I understood this answer correctly, coalesce() can only reduce the number of partitions of a dataframe, and if we try to...
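A small sketch of the behaviour (hedged): without a shuffle, coalesce() never goes above the current partition count, but the RDD API's coalesce(n, shuffle=True), which is what repartition() uses underneath, can:

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    rdd = sc.parallelize(range(100), 4)

    print(rdd.coalesce(8).getNumPartitions())                # still 4: cannot grow without a shuffle
    print(rdd.coalesce(8, shuffle=True).getNumPartitions())  # 8
    print(rdd.repartition(8).getNumPartitions())             # 8

    # DataFrame.coalesce() exposes no shuffle flag, so on DataFrames only
    # repartition() can increase the partition count.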

Spark : converting Array[Byte] data to RDD or DataFrame

I have data in the form of Array[Byte] which I want to convert into a Spark RDD or DataFrame so that I can write my data directly into a Google bucket in the form of a file. I am not able to write...

Spark SQL exception handling

In order to handle Spark exceptions in RDD operations, I can use the following approach with an additional exceptions column: val df: DataFrame = ... val rddWithExcep = df.rdd.map { row: Row => val...

What is the proper way to write a custom AccumulatorParam for this task?

Context: working in Azure Databricks, Python programming language, Spark environment. I have an RDD and have created a map operation. rdd = sc.parallelize(my_collection) mapper = rdd.map(lambda...
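A hedged sketch of a custom AccumulatorParam, here one that accumulates values into a list (the element type and names are illustrative):

    from pyspark import SparkContext, AccumulatorParam

    sc = SparkContext.getOrCreate()

    class ListParam(AccumulatorParam):
        def zero(self, initial_value):
            return []

        def addInPlace(self, acc1, acc2):
            # Used both for worker-side adds and for merging partial results
            acc1.extend(acc2)
            return acc1

    collected = sc.accumulator([], ListParam())

    def remember(x):
        collected.add([x * x])   # Accumulator.add goes through addInPlace
        return x

    sc.parallelize(range(5)).map(remember).count()  # an action triggers the updates
    print(collected.value)  # e.g. [0, 1, 4, 9, 16] (ordering not guaranteed)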

How do I set FTP passive mode in Spark?... to read a file from FTP Server

I am reading a file from an FTP server into a Spark RDD like this: val rdd = spark.sparkContext.textFile("ftp://anonymous:<password>@<hostname>/data.gz") rdd.count ... This actually works when I run the Spark...

Create accumulator on executor dynamically

I want to use accumulators to count combinations of a few parameters of the objects in my RDD. E.g. I have an RDD of Obj with fields a and b. Both fields are enums that may have one of a few values. To achieve...
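Accumulators can only be created on the driver, so a hedged alternative for counting combinations is to make the (a, b) pairs themselves the keys (the Obj namedtuple is a stand-in for the real class):

    from collections import namedtuple

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    Obj = namedtuple("Obj", ["a", "b"])
    rdd = sc.parallelize([Obj("A1", "B1"), Obj("A1", "B2"), Obj("A1", "B1")])

    # countByValue returns the per-combination counts to the driver as a dict
    counts = rdd.map(lambda o: (o.a, o.b)).countByValue()
    print(dict(counts))  # {('A1', 'B1'): 2, ('A1', 'B2'): 1}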

Remove Stopwords in a RDD, Pyspark

I have an RDD containing text read from a text file. I would like to remove all the stop words in the text files. There is pyspark.ml.feature.StopWordsRemover, which provides the same functionality on...
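A hedged RDD-level sketch: broadcast the stop-word set and filter each tokenized line against it (the word list and sample lines are illustrative; StopWordsRemover remains the DataFrame-side option):

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    stop_words = {"a", "the", "is", "in"}
    stop_bc = sc.broadcast(stop_words)

    lines = sc.parallelize(["the cat is in the hat", "a dog"])

    cleaned = (lines
               .map(lambda line: line.lower().split())
               .map(lambda words: [w for w in words if w not in stop_bc.value]))

    print(cleaned.collect())  # [['cat', 'hat'], ['dog']]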

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable

I am getting this error after running the function below. PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast...
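A hedged before/after sketch of the usual cause: the mapped function (or the broadcast value itself) holds a reference to the SparkContext, which can never be shipped to executors; broadcasting only plain, picklable data avoids the error (the lookup dict is illustrative):

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    # Problematic pattern: anything that drags sc or an RDD into the closure,
    # e.g. rdd.map(lambda x: sc.parallelize(...))  -> PicklingError

    # Working pattern: broadcast plain data, keep sc on the driver only
    lookup_bc = sc.broadcast({"a": 1, "b": 2})
    rdd = sc.parallelize(["a", "b", "a"])
    print(rdd.map(lambda k: lookup_bc.value.get(k, 0)).collect())  # [1, 2, 1]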

Writing Parquet in Azure Blob Storage: "One of the request inputs is not valid"

I'm trying to write a simple DataFrame in Parquet format to Azure Blob Storage. Note that the following code snippets work locally, so my guess is that it has to be something related to Azure...
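A hedged sketch of the wasbs setup (account, container, key and paths are placeholders; in reports of this error the cause is often a container or folder name that breaks Azure's naming rules, e.g. uppercase letters or underscores):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext.getOrCreate()
    sqlContext = SQLContext(sc)

    # Requires the hadoop-azure and azure-storage jars on the classpath
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.key.<storage_account>.blob.core.windows.net",
        "<storage_account_key>")

    df = sqlContext.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

    # Container and folder names must be lowercase, without underscores
    df.write.mode("overwrite").parquet(
        "wasbs://<container>@<storage_account>.blob.core.windows.net/output/df.parquet")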