我是一个Spark应用程序,有几个要保留当前状态的地方。通常这是经过很大的一步,或者是缓存我想多次使用的状态。看来,当我第二次在数据帧上调用缓存时,新副本将缓存到内存中。在我的应用程序中,这会在扩展时导致内存问题。即使在我当前的测试中,给定的数据帧最大约为100
MB,中间结果的累积大小却超出了执行器上分配的内存。请参见下面的一个小示例,该示例显示了此行为。
cache_test.py:
from pyspark import SparkContext, HiveContext
spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)
df = (hive_context.read
.format('com.databricks.spark.csv')
.load('simple_data.csv')
)
df.cache()
df.show()
df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()
spark_context.stop()
simple_data.csv:
1,2,3
4,5,6
7,8,9
查看应用程序UI,除了带有新列的原始数据框之外,还有一个原始数据框的副本。我可以通过df.unpersist()
在withColumn行之前调用来删除原始副本。这是删除缓存的中间结果的推荐方法(即,在之前调用unpersist
cache()
)。
另外,可以清除所有缓存的对象。在我的应用程序中,有自然的断点,我可以在其中简单地清除所有内存,然后继续下一个文件。我想这样做而不为每个输入文件创建新的spark应用程序。
先感谢您!
火花2.x
您可以使用Catalog.clearCache
:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate
...
spark.catalog.clearCache()
Spark 1.x
您可以使用以下SQLContext.clearCache
方法
从内存缓存中删除所有缓存的表。
from pyspark.sql import SQLContext
from pyspark import SparkContext
sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
...
sqlContext.clearCache()