Source code for luna.common.sparksession

from pyspark.sql import SparkSession
from luna.common.config import ConfigSet
import warnings
"""Common spark session"""
class SparkConfig:
    """Builds the Delta-enabled ``SparkSession`` configured through ``ConfigSet``.

    Spark support is deprecated in this package; :meth:`spark_session` emits a
    warning every time it is called.
    """

    @staticmethod
    def _config_path(config_name, section, index, key):
        """Return the ConfigSet lookup path for ``key`` in slot ``index`` of ``section``."""
        # Produces e.g. '<config_name>::$.spark_cluster_config[:1]["spark.uri"]'.
        # The "[:N]" slice form is kept exactly as the original lookups wrote it;
        # its meaning is defined by ConfigSet (not visible here).
        return config_name + f'::$.{section}[:{index}]["{key}"]'

    def spark_session(self, config_name, app_name):
        """Create a SparkSession from the named ConfigSet configuration.

        Reads the cluster settings (``spark.uri``, ``spark.driver.host``) and the
        application settings (executor cores/memory, max cores, pyspark memory,
        shuffle partitions, max result size), then builds the session with the
        Delta Lake package/extension and the fixed options below.

        :param config_name: logical name of the configuration to use from the
            ConfigSet. See luna/common/config.py.
        :param app_name: application name given to the spark session; it is the
            name shown in the spark logs.
        :return: the session returned by ``SparkSession.builder...getOrCreate()``
            (which reuses an already-active session if there is one).
        """
        cfg = ConfigSet()
        # stacklevel=2 attributes the warning to the caller, not to this module.
        warnings.warn(
            "You are using SparkConfig to generate a spark_session, however "
            "spark has been deprecated from this package!",
            stacklevel=2,
        )

        def read(section, index, key):
            # Single place that turns (section, slot, key) into a ConfigSet lookup.
            return cfg.get_value(path=self._config_path(config_name, section, index, key))

        cluster = "spark_cluster_config"
        application = "spark_application_config"

        # Lookup order matches the original implementation.
        spark_uri = read(cluster, 1, "spark.uri")
        spark_driver_host = read(cluster, 2, "spark.driver.host")
        spark_executor_cores = read(application, 1, "spark.executor.cores")
        spark_cores_max = read(application, 2, "spark.cores.max")
        spark_executor_memory = read(application, 3, "spark.executor.memory")
        spark_executor_pyspark_memory = read(application, 4, "spark.executor.pyspark.memory")
        spark_sql_shuffle_partitions = read(application, 5, "spark.sql.shuffle.partitions")
        spark_driver_maxresultsize = read(application, 6, "spark.driver.maxResultSize")

        return (
            SparkSession.builder
            .appName(app_name)
            .master(spark_uri)
            # Delta Lake package plus the extension/catalog/log-store settings it uses.
            .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
            .config("spark.delta.logStore.class",
                    "org.apache.spark.sql.delta.storage.HDFSLogStore")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog",
                    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .config("spark.hadoop.dfs.client.use.datanode.hostname", "true")
            .config("spark.driver.host", spark_driver_host)
            .config("spark.sql.execution.arrow.pyspark.enabled", "true")
            .config("spark.executor.memory", spark_executor_memory)
            # NOTE(review): driver memory reuses the executor-memory setting; no
            # separate driver-memory key is read — confirm this is intended.
            .config("spark.driver.memory", spark_executor_memory)
            .config("spark.executor.cores", spark_executor_cores)
            .config("spark.cores.max", spark_cores_max)
            .config("spark.executor.pyspark.memory", spark_executor_pyspark_memory)
            .config("spark.sql.shuffle.partitions", spark_sql_shuffle_partitions)
            .config("spark.driver.maxResultSize", spark_driver_maxresultsize)
            .config("spark.files.overwrite", "true")
            .config("fs.defaultFS", "file:///")
            # Presumably needed for Arrow on newer JVMs (arrow is enabled above) — confirm.
            .config("spark.driver.extraJavaOptions",
                    "-Dio.netty.tryReflectionSetAccessible=true")
            .config("spark.executor.extraJavaOptions",
                    "-Dio.netty.tryReflectionSetAccessible=true")
            .getOrCreate()
        )