How to make Yarn dynamically allocate resources for Spark

How to make Yarn dynamically allocate resources for Spark

Anton Puzanov
Hi everyone,

I have a cluster managed by YARN that runs Spark jobs; the components were installed using Ambari (2.6.3.0-235). I have 6 hosts, each with 6 cores, and I use the Fair Scheduler.

I want YARN to automatically add and remove executor cores, but no matter what I do it doesn't work.

Relevant Spark configuration (configured in Ambari):

spark.dynamicAllocation.schedulerBacklogTimeout 10s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 5s
spark.driver.memory 4G
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.initialExecutors 6 (has no effect - starts with 2)
spark.dynamicAllocation.maxExecutors 10
spark.dynamicAllocation.minExecutors 1
spark.scheduler.mode FAIR
spark.shuffle.service.enabled true
SPARK_EXECUTOR_MEMORY="36G"
Relevant Yarn configuration (configured in Ambari):
yarn.nodemanager.aux-services                           mapreduce_shuffle,spark_shuffle,spark2_shuffle
YARN Java heap size                                     4096
yarn.resourcemanager.scheduler.class                    org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler
yarn.scheduler.fair.preemption                          true
yarn.nodemanager.aux-services.spark2_shuffle.class      org.apache.spark.network.yarn.YarnShuffleService
yarn.nodemanager.aux-services.spark2_shuffle.classpath  {{stack_root}}/${hdp.version}/spark2/aux/*
yarn.nodemanager.aux-services.spark_shuffle.class       org.apache.spark.network.yarn.YarnShuffleService
yarn.nodemanager.aux-services.spark_shuffle.classpath   {{stack_root}}/${hdp.version}/spark/aux/*
Minimum Container Size (VCores)                         0
Maximum Container Size (VCores)                         12
Number of virtual cores                                 12


I also followed the Dynamic Resource Allocation documentation and completed all the steps to configure the external shuffle service, including copying the yarn-shuffle jar:
cp /usr/hdp/2.6.3.0-235/spark/aux/spark-2.2.0.2.6.3.0-235-yarn-shuffle.jar /usr/hdp/2.6.3.0-235/hadoop-yarn/lib/

I see that only 3 cores are allocated to the application (the default number of executors is 2, so I guess it's the driver + 2 executors). The queue:



Although many tasks are pending:




I want to get to the point where YARN starts every application with 3 CPUs, but allocates more resources when there are pending tasks.

If it is relevant, I use a Jupyter Notebook and findspark to connect to the cluster:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("internal-external2").getOrCreate()
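
If it would help, I could also set the same properties directly on the builder from the notebook; a rough, untested sketch (assuming the session should run with master yarn and using the values listed above):

from pyspark.sql import SparkSession

# Untested sketch: pass the dynamic allocation settings to the builder itself,
# in case the Ambari-managed defaults are not the ones the notebook session picks up.
# These must be set before the first session is created; getOrCreate() reuses an existing one.
spark = (SparkSession.builder
         .appName("internal-external2")
         .master("yarn")
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.shuffle.service.enabled", "true")
         .config("spark.dynamicAllocation.minExecutors", "1")
         .config("spark.dynamicAllocation.initialExecutors", "6")
         .config("spark.dynamicAllocation.maxExecutors", "10")
         .getOrCreate())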


I would really appreciate any suggestions/help; there is no manual on this topic that I haven't already tried.
Thanks a lot,
Anton


Re: How to make Yarn dynamically allocate resources for Spark

Suma Shivaprasad
Hi Anton,

It seems like --num-executors is being passed to the Spark client. If that's specified, then dynamic resource allocation does not kick in. Can you check what parameters are being passed to spark-submit?
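
If it helps, a quick (rough) way to check from the notebook itself is to dump the effective configuration of the running session and look for spark.executor.instances and the dynamic allocation keys, something like:

# Rough check, assuming "spark" is the notebook's SparkSession:
# list the configuration the running application actually ended up with.
for key, value in sorted(spark.sparkContext.getConf().getAll()):
    if "executor" in key or "dynamicAllocation" in key or "shuffle" in key:
        print(key, "=", value)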

Thanks
Suma



Re: How to make Yarn dynamically allocate resources for Spark

Anton Puzanov
Hi, and thanks for the reply. I run the calculations from a Jupyter Notebook, so I don't explicitly call spark-submit. spark.executor.instances is the equivalent configuration property, and it is not set. YARN's minimum-core configuration and the relevant spark.dynamicAllocation properties were set, though.




Re: How to make Yarn dynamically allocate resources for Spark

Sudeep Singh Thakur
Hi,

If you are setting up the configuration in code, it won't work. You should pass the configuration parameters explicitly, e.g.:

spark-submit --num-executors ...
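
Since you are launching from Jupyter with findspark rather than calling spark-submit yourself, one way to pass the flags explicitly (just a sketch, I have not tested it on your setup) is through the PYSPARK_SUBMIT_ARGS environment variable before the session is created:

import os

# Sketch only: spark-submit style options can be handed to a notebook-launched
# driver via PYSPARK_SUBMIT_ARGS; the trailing "pyspark-shell" token is required.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--master yarn "
    "--conf spark.dynamicAllocation.enabled=true "
    "--conf spark.shuffle.service.enabled=true "
    "--conf spark.dynamicAllocation.maxExecutors=10 "
    "pyspark-shell"
)

import findspark
findspark.init()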


Kind regards,
Sudeep Singh Thakur
