Spark SQL: Generate Table Using ‘Create Table As Select’ Approach

Question:

I am using Spark SQL with Spark version 2.2.2.

The SQL looks like this:

select a.*, b.* from a inner join b on a.id=b.id

Both tables a and b are large, but this query runs successfully and returns the expected result.

I also want to store the query result in a Hive table, so the SQL becomes:

create table xxx stored as orc as select ...

This does not work; the job fails with exit code 143 and the following error:

ExecutorLostFailure (executor 268 exited caused by one of the running tasks) Reason: Container marked as failed: container_e37_1554167308087_15187_01_000269 on host: xxx. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
Killed by external signal

I also tried PySpark with

df.write.saveAsTable()

This sometimes succeeds, but it does not work consistently.
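
For clarity, the PySpark attempt looks roughly like this (a sketch only; spark is the SparkSession and xxx is a placeholder table name):

# Sketch of the PySpark attempt: run the join and save the result as an
# ORC Hive table. `xxx` is a placeholder table name.
df = spark.sql("select a.*, b.* from a inner join b on a.id = b.id")

df.write \
    .mode("overwrite") \
    .format("orc") \
    .saveAsTable("xxx")

# Note: if a and b share column names (such as id), alias or drop the
# duplicates before writing, otherwise the write can fail with a
# duplicate-column error.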

Can anybody help?

Thanks.


Solution:

To address the memory problem, consider the following options:

  • Repartition the data so that individual tasks are smaller.
  • Increase the memory allocated to each executor (--executor-memory).
  • Decrease the number of executor cores (--executor-cores) so that fewer concurrent tasks share each executor's RAM, and increase the memory overhead if needed.

Repartitioning reduces the size of each individual task, while increasing the memory or reducing the number of cores gives every task more memory to work with.
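
As an illustration only (the exact values depend on your cluster and data), a spark-submit invocation applying these settings on YARN could look like this; your_job.py and the numbers are placeholders:

spark-submit \
  --master yarn \
  --executor-memory 8G \
  --executor-cores 2 \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  your_job.py

On Spark 2.2 the overhead setting is spark.yarn.executor.memoryOverhead (in megabytes); later versions rename it to spark.executor.memoryOverhead.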

If you are not sure how many tasks you need or how much memory to allocate, you can repartition your dataframe based on its size:

df.persist()
n = df.count()              # e.g. 100 records in this example
nb_records = 10             # target number of records per partition
df = df.repartition(int(n / nb_records))
df.rdd.getNumPartitions()   # returns 10

Since the dataframe contains 100 records and we want about 10 records per task, we requested 100 / 10 = 10 partitions.
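
Putting the pieces together, a sketch of repartitioning the joined result before writing it to the ORC Hive table could look like this (records_per_partition is an illustrative value to tune for your data):

df = spark.sql("select a.*, b.* from a inner join b on a.id = b.id")
df.persist()

# Choose the number of partitions from the row count and a target
# number of records per partition (illustrative value only).
records_per_partition = 1000000
num_partitions = max(1, int(df.count() / records_per_partition))

df.repartition(num_partitions) \
    .write \
    .mode("overwrite") \
    .format("orc") \
    .saveAsTable("xxx")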
