Filter Spark DataFrame by checking if value is in a list, with other criteria

As a simplified example, I tried to filter a Spark DataFrame with the following code: val xdf = sqlContext.createDataFrame(Seq( ("A", 1), ("B", 2), ("C", 3) )).toDF("name", "cnt") xdf.filter($"cnt"...
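
A minimal PySpark sketch of the usual approach (the question itself is Scala, where $"name".isin(list: _*) plays the same role); the list and threshold are illustrative:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    xdf = spark.createDataFrame([("A", 1), ("B", 2), ("C", 3)], ["name", "cnt"])

    # Keep rows whose name is in the list AND whose cnt exceeds a threshold.
    wanted = ["A", "C"]
    xdf.filter(F.col("name").isin(wanted) & (F.col("cnt") > 1)).show()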

FetchFailedException or MetadataFetchFailedException when processing big data set

When I run the parsing code with a 1 GB dataset it completes without any error, but when I attempt 25 GB of data at a time I get the errors below. I'm trying to understand how I can avoid the below...
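
FetchFailedException usually surfaces during shuffles; a sketch of the configuration knobs commonly tuned first (the values are illustrative, not recommendations, and the right settings depend on the cluster):

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .config("spark.sql.shuffle.partitions", "800")  # more, smaller shuffle blocks
             .config("spark.executor.memory", "8g")          # fewer OOM-killed executors serving shuffle data
             .config("spark.network.timeout", "600s")        # tolerate slow shuffle fetches
             .getOrCreate())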

I am reading JSON data from Kafka and parsing the data using Spark, but I end up with a JSON parser issue

I am reading JSON data from Kafka and parsing the data using Spark, but I end up with a JSON parser issue. The code is shown below: val Array(zkQuorum, groupId, topics, numThreads) = args val conf =...
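
The excerpt uses the old DStream receiver API; below is a structured-streaming sketch of the usual parse path, with a hypothetical message schema and broker address. Forgetting to cast the Kafka value from bytes to string before from_json is a frequent source of parser errors:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    spark = SparkSession.builder.getOrCreate()
    schema = StructType([StructField("event", StringType()),
                         StructField("count", IntegerType())])

    raw = (spark.readStream.format("kafka")
           .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
           .option("subscribe", "topic")
           .load())
    parsed = (raw.select(F.from_json(F.col("value").cast("string"), schema).alias("j"))
                 .select("j.*"))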

How to validate Spark SQL expression without executing it?

I want to validate whether a Spark SQL query is syntactically correct, without actually running the query on the cluster. The actual use case is that I am trying to develop a user interface, which...
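
One known workaround is to call Spark's own parser without executing anything. The py4j path below goes through an internal, unstable API (in Scala the equivalent is spark.sessionState.sqlParser.parsePlan), so treat it as a sketch that may break across versions; it checks syntax only, not whether tables or columns exist:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    def is_valid_sql(query: str) -> bool:
        try:
            # Parses to a logical plan; no analysis, no execution.
            spark._jsparkSession.sessionState().sqlParser().parsePlan(query)
            return True
        except Exception:
            return False

    print(is_valid_sql("SELECT a FROM t WHERE b > 1"))  # True
    print(is_valid_sql("SELEC a FRM t"))                # False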

java.lang.IllegalArgumentException: Failed to parse query: {"query":

I am trying to execute the Elasticsearch DSL query in Spark 2.2 and Scala 2.11.8. The version of Elasticsearch is 2.4.4, while the version of Kibana that I use is 4.6.4. This is the library that I...

Structured streaming - explode JSON fields into dynamic columns?

I got this dataframe from a Kafka source.
+-----------------------+
| data                  |
+-----------------------+
| '{ "a": 1, "b": 2 }'  |
+-----------------------+
| '{ "b": 3, "d": 4 }' ...
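
One batch-side sketch: parse each value as a map, explode to key/value rows, and pivot the keys into columns (pivot is not supported on streaming DataFrames, so in structured streaming this typically runs inside foreachBatch):

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([('{ "a": 1, "b": 2 }',), ('{ "b": 3, "d": 4 }',)], ["data"])

    kv = (df.withColumn("row_id", F.monotonically_increasing_id())
            .select("row_id",
                    F.explode(F.from_json("data", "map<string,int>")).alias("key", "value")))
    kv.groupBy("row_id").pivot("key").agg(F.first("value")).show()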

How to parse CSV which contains \n in data using Apache Spark?

I have some CSV files which have the \n character in the data itself. I am trying to read these files in Apache Spark 2.2, but Spark is not able to read the data properly. Sample data is like...
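
A sketch of the usual fix: the multiLine option (available from Spark 2.2, the version in the question) lets quoted fields span lines; the quote/escape settings and path are assumptions about how the files were written:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = (spark.read
          .option("header", "true")
          .option("multiLine", "true")
          .option("quote", '"')    # assumes fields with embedded \n are quoted
          .option("escape", '"')
          .csv("/path/to/files"))  # placeholder path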

lz4 exception when reading data from Kafka using Spark streaming

I am trying to read JSON data from Kafka using the Spark streaming API; when I do, it throws a java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.init exception. The stack trace is...

Dataproc Reading from Google Cloud Storage

I am trying to read a CSV or TXT file from GCS in a Dataproc PySpark application. I have tried many things. So far the most promising: #!/usr/bin/python import os import sys import...
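
On Dataproc the GCS connector ships preinstalled, so gs:// paths can be read directly; the bucket and object names below are placeholders:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.option("header", "true").csv("gs://my-bucket/path/to/file.csv")
    df.show(5)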

Error parsing conf core-default.xml while running shadow jar of GeoTools with Spark

I have created a Spark application that processes lat/long and identifies the zone defined in custom shape files provided by the client. Given this requirement, I have created a shadow jar file using...

ClassNotFoundException: net.logstash.log4j.JSONEventLayoutV1 in assembled Spark jar file

I am running a Spark application from a jar file. When I define a custom log4j file and run my application directly from the main class, the log output works fine; however, when I run the application...

Kafka Spark Streaming failing with source KAFKA not found

I am trying to stream a producer topic from Kafka, but I am getting the error that Kafka is not a valid data source, even though I imported all the required packages like Kafka SQL streaming etc. build.gradle file...
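
This error usually means the Kafka integration jar is missing: "kafka" is not a built-in source, so spark-sql-kafka has to be on the classpath, either as a Gradle dependency or via --packages (the coordinates below are an assumption; match them to your Spark/Scala build):

    # spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 app.py
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    stream = (spark.readStream.format("kafka")
              .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
              .option("subscribe", "my-topic")
              .load())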

AWS Glue: How to expand nested Hive struct to Dict?

I'm trying to expand field mappings in a table mapped by my AWS Glue crawler to a nested dictionary in Python, but I can't find any Spark/Hive parsers to deserialize the var_type =...

Alternative to deprecated method sparksession.read.json(JavaRDD)

I am using SparkSession's read().json() method to read a JSON file before converting it to a Parquet file, and it is working fine, but the .json(JavaRDD) method is shown as deprecated. Can we...

PySpark and argparse

How does one specify command line arguments using argparse for a PySpark script? I've been breaking my head over this one and I swear I can't find the solution anywhere else. Here's my test...
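
Arguments placed after the script name in spark-submit are handed to the script unchanged, so plain argparse works; a small sketch with hypothetical flags:

    # spark-submit test.py --input /data/in --output /data/out
    import argparse
    from pyspark.sql import SparkSession

    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", required=True)
    args = parser.parse_args()

    spark = SparkSession.builder.getOrCreate()
    spark.read.text(args.input).write.mode("overwrite").text(args.output)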

Submitting Job Arguments to Spark Job in Dataproc

Trying to run Spark-Wiki-Parser on a GCP Dataproc cluster. The code takes in two arguments, "dumpfile" and "destloc". When I submit the following I get a [scallop] Error: Excess arguments...

Spark Streaming handle Skewed Kafka Partitions

Scenario: Kafka -> Spark Streaming. The logic in each Spark Streaming microbatch (30 seconds) is: read JSON -> parse JSON -> send to Kafka. My streaming job is reading from around 1000 Kafka topics, with...

How to explode a struct column with a prefix?

My goal is to explode a Spark struct column, i.e., take the fields from inside the struct and expose them as the remaining columns of the dataset (already done), but changing the inner field names by...
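
A sketch of one way to do the renaming: build one aliased column per struct field; the column name "s" and the "meta_" prefix are stand-ins for the question's actual names:

    from pyspark.sql import functions as F

    def explode_struct_with_prefix(df, struct_col, prefix):
        fields = df.schema[struct_col].dataType.fieldNames()
        cols = [F.col(f"{struct_col}.{f}").alias(f"{prefix}{f}") for f in fields]
        return df.select("*", *cols).drop(struct_col)

    # flat = explode_struct_with_prefix(df, "s", "meta_")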

to_date fails to parse date in Spark 3.0

I am trying to parse a date using to_date() but I get the following exception: SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: Fail to parse '12/1/2010...
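
Two commonly cited ways out, sketched below: give the new Spark 3.0 parser a pattern that actually matches non-padded values (single M and d), or restore the legacy parser globally:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([("12/1/2010",)], ["d"])

    # Option 1: a pattern the new parser accepts for non-zero-padded month/day.
    df.select(F.to_date("d", "M/d/yyyy").alias("parsed")).show()

    # Option 2: fall back to the pre-3.0 parsing behavior.
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")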

Pyspark Streaming with Pandas UDF

I am new to Spark Streaming and pandas UDFs. I am working on a PySpark consumer from Kafka; the payload is in XML format, and I am trying to parse the incoming XML by applying a pandas UDF: @pandas_udf("col1...

Consume Confluent Avro Format in Spark standalone cluster?

I have the following simple Kafka consumer that works with Confluent Kafka/Schema Registry and Avro-based topics. It works as expected with local[*]. However, once I try to submit this to a standalone...

Spark: How to parse and transform a JSON string from Spark data frame rows

How to parse and transform a JSON string from Spark dataframe rows in PySpark? I'm looking for help with how to parse a JSON string to a JSON struct (output 1) and transform a JSON string to columns a, b, and id...
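
A minimal sketch of the standard pattern, with a hypothetical schema for the id/a/b fields: parse the string into a struct with from_json, then promote the struct fields to columns:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.types import StructType, StructField, StringType, LongType

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([('{"id": "x1", "a": 1, "b": 2}',)], ["jstr"])

    schema = StructType([StructField("id", StringType()),
                         StructField("a", LongType()),
                         StructField("b", LongType())])

    # Parse the string into a struct, then expose its fields as columns.
    df.withColumn("j", F.from_json("jstr", schema)).select("j.*").show()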

Spark: How to transform JSON string with multiple keys, from data frame rows?

I'm looking for help with how to parse a JSON string with multiple keys to a JSON struct; see the required output. The answer below shows how to transform a JSON string with one id: jstr1 = '{"id_1": [{"a": 1,...

Spark: How to parse a JSON string of nested lists to a Spark data frame?

How to parse a JSON string of nested lists to a Spark data frame in PySpark? Input data frame:
+-------------+-----------------------------------------------+
|url          |json ...
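
A sketch under the assumption that each json cell holds a list of objects: describe the nesting with a DDL schema, then explode one level per array:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [("https://example.org", '[{"a": 1, "b": 2}, {"a": 3, "b": 4}]')],
        ["url", "json"])

    # One row per list element, with the struct fields flattened out.
    parsed = df.withColumn("arr", F.from_json("json", "array<struct<a:int,b:int>>"))
    parsed.select("url", F.explode("arr").alias("item")).select("url", "item.*").show()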

Flatten nested array in Spark DataFrame

I'm reading in some JSON of the form: {"a": [{"b": {"c": 1, "d": 2}}]} That is, the array items are unnecessarily nested. Now, because this happens inside an array, the answers given in...
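
Because the extra level lives inside an array, a Spark 2.4+ higher-order function can rewrite each element in place; a sketch on the sample record:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.json(spark.sparkContext.parallelize(['{"a": [{"b": {"c": 1, "d": 2}}]}']))

    # Hoist the inner struct so the wrapping "b" level disappears.
    flat = df.select(F.expr("transform(a, x -> x.b) AS a"))
    flat.printSchema()  # a: array<struct<c:bigint, d:bigint>>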

Why can't I parse all my data when parsing byte strings from a TFRecord file written from a Spark dataframe?

I have used the spark-tensorflow-connector package to write a TFRecord dataset. When attempting to read the tfrecord files in as a TFRecordDataset, I am losing any fields that contain an array...

How to install PyCaret in AWS Glue

How can I properly install PyCaret in AWS Glue? Methods I tried: --additional-python-modules and --python-modules-installer-option; Python library path; easy_install as described in...

How can I avoid Pandas dataframe in Spark pipeline?

I'm a newbie in PySpark, and I want to parse data as below:
# http_path
#0 https://example.org/path/to/file?param=42#frag...
#1 https://example.org/path/to/file
# ...
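
For URL fields specifically, the SQL built-in parse_url keeps the whole pipeline in Spark with no pandas round-trip; a sketch on a made-up row:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([("https://example.org/path/to/file?param=42",)], ["http_path"])

    df.select("http_path",
              F.expr("parse_url(http_path, 'HOST')").alias("host"),
              F.expr("parse_url(http_path, 'PATH')").alias("path"),
              F.expr("parse_url(http_path, 'QUERY')").alias("query")).show(truncate=False)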

Unsupported operation exception from spark: Schema for type org.apache.spark.sql.types.DataType is not supported

Spark Streaming: I am receiving a dataframe that consists of two columns. The first column is of string type and contains a JSON string, and the second column contains the schema for each...

Spark execution planning

I'm trying to better understand Spark execution planning; I find the articles on the internet quite imprecise. I think we're all familiar with this diagram: I was wondering which components...
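
A quick way to watch the planner's stages directly is explain(True), which prints the parsed, analyzed, and optimized logical plans plus the physical plan for any DataFrame; a toy example:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.range(100).withColumn("bucket", F.col("id") % 3).groupBy("bucket").count()
    df.explain(True)  # parsed -> analyzed -> optimized logical plans, then physical plan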