Unable to create pyspark DataFrame from Datastore in azureml-sdk (version 1.12.0)

I am trying to read the contents of a CSV file into a Spark DataFrame with azureml-sdk using the following code, but an exception is thrown.

Code throwing exception

import pyspark.sql as spark
from azureml.core import Dataset
dataset = Dataset.Tabular.from_delimited_files(path = [(datastore, file_path)], header = False)
sdf: spark.DataFrame = dataset.to_spark_dataframe()
sdf.show()

Exception

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/anaconda/envs/azureml_py36/lib/python3.6/site-packages/azureml/data/dataset_error_handling.py in _try_execute(action, operation, dataset_info, **kwargs)
    100         else:
--> 101             return action()
    102     except Exception as e:

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/azureml/dataprep/api/_loggerfactory.py in wrapper(*args, **kwargs)
    178                 try:
--> 179                     return func(*args, **kwargs)
    180                 except Exception as e:

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/azureml/dataprep/api/dataflow.py in to_spark_dataframe(self)
    763         self._raise_if_missing_secrets()
--> 764         return self._spark_executor.get_dataframe(steps_to_block_datas(self._steps), use_sampling=False)
    765 

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/azureml/dataprep/api/sparkexecution.py in get_dataframe(self, steps, use_sampling, overrides, use_first_record_schema)
    136                              overrides,
--> 137                              use_first_record_schema)
    138 

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/azureml/dataprep/api/sparkexecution.py in _execute(self, blocks, export_format, use_sampling, overrides, use_first_record_schema)
    169                                           + lariat_version + '.')
--> 170             raise e
    171 

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/azureml/dataprep/api/sparkexecution.py in _execute(self, blocks, export_format, use_sampling, overrides, use_first_record_schema)
    160             if export_format == ExportScriptFormat.PYSPARKDATAFRAMELOADER:
--> 161                 return module.LoadData(secrets=secrets, schemaFromFirstRecord=use_first_record_schema)
    162             else:

/tmp/spark-6ce53791-c8e4-4db0-bd37-bedb53a1ef1e/userFiles-dda6cd30-5d1e-48cf-af87-9c7c2a4b8038/loaderb9bc01c2b40c4b7aa86a95d343021e0c.py in LoadData(secrets, schemaFromFirstRecord)
      8 def LoadData(secrets=dict(), schemaFromFirstRecord=False):
----> 9     pex = Executor("S4ddf53ee8d5f4173bd3dcf4b51d78247", "dprep_2.11", "0.116.0", "42315", "39a925e4-9ae9-4588-93c4-5433250b7f73")
     10     jex = pex.jex

/tmp/spark-6ce53791-c8e4-4db0-bd37-bedb53a1ef1e/userFiles-dda6cd30-5d1e-48cf-af87-9c7c2a4b8038/Executor.py in __init__(self, scalaName, dprepMavenPackageName, dprepMavenPackageMatchingVersion, pythonHostChannelPort, pythonHostSecret)
     54             pythonHostChannelPort,
---> 55             pythonHostSecret)
     56 

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1568         return_value = get_return_value(
-> 1569             answer, self._gateway_client, None, self._fqn)
   1570 

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling None.com.microsoft.dprep.execution.PySparkExecutor.
: java.lang.NoClassDefFoundError: Could not initialize class com.microsoft.dprep.integration.azureml.AmlPySdkInvoker$
	at com.microsoft.dprep.execution.PySparkExecutor.<init>(PySparkExecutor.scala:79)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

AzureMLException                          Traceback (most recent call last)
<ipython-input-30-c546b1aded42> in <module>
      2 from azureml.core import Dataset
      3 dataset = Dataset.Tabular.from_delimited_files(path = [(datastore, file_path)], header = False)
----> 4 sdf: spark.DataFrame = dataset.to_spark_dataframe()
      5 sdf.show()
      6 

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/azureml/data/_loggerfactory.py in wrapper(*args, **kwargs)
    124             with _LoggerFactory.track_activity(logger, func.__name__, activity_type, custom_dimensions) as al:
    125                 try:
--> 126                     return func(*args, **kwargs)
    127                 except Exception as e:
    128                     if hasattr(al, 'activity_info') and hasattr(e, 'error_code'):

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/azureml/data/tabular_dataset.py in to_spark_dataframe(self)
    187         return _try_execute(dataflow.to_spark_dataframe,
    188                             'to_spark_dataframe',
--> 189                             None if self.id is None else {'id': self.id, 'name': self.name, 'version': self.version})
    190 
    191     @track(_get_logger, custom_dimensions={'app_name': 'TabularDataset'}, activity_type=_PUBLIC_API)

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/azureml/data/dataset_error_handling.py in _try_execute(action, operation, dataset_info, **kwargs)
    102     except Exception as e:
    103         message, is_dprep_exception = _construct_message_and_check_exception_type(e, dataset_info, operation)
--> 104         _dataprep_error_handler(e, message, is_dprep_exception)
    105 
    106 

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/azureml/data/dataset_error_handling.py in _dataprep_error_handler(e, message, is_dprep_exception)
    143         raise AzureMLException(message, inner_exception=e)
    144     else:
--> 145         raise AzureMLException(message, inner_exception=e)
    146 
    147 

AzureMLException: AzureMLException:
	Message: Execution failed unexpectedly due to: Py4JJavaError
	InnerException An error occurred while calling None.com.microsoft.dprep.execution.PySparkExecutor.
: java.lang.NoClassDefFoundError: Could not initialize class com.microsoft.dprep.integration.azureml.AmlPySdkInvoker$
	at com.microsoft.dprep.execution.PySparkExecutor.<init>(PySparkExecutor.scala:79)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

	ErrorResponse 
{
    "error": {
        "message": "Execution failed unexpectedly due to: Py4JJavaError"
    }
}

However, I can read and print the data with the following code, i.e. by creating a pandas DataFrame instead.

Working code

dataset = Dataset.Tabular.from_delimited_files(path = [(datastore, file_path)], header = False)
#sdf: spark.DataFrame = dataset.to_spark_dataframe()
sdf: pd.DataFrame = dataset.to_pandas_dataframe()
print(sdf.head(3))

Answers

Dataset doesn't support the Scala 2.12 runtime at the moment. The team is working on it and will address the feature gap soon. Stay tuned!

Posted by May Hu