Class SparkExecutionContext
- java.lang.Object
-
- org.apache.sysds.runtime.controlprogram.context.ExecutionContext
-
- org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext
-
public class SparkExecutionContext extends ExecutionContext
-
-
Nested Class Summary
Modifier and Type Class Description
static class
SparkExecutionContext.SparkClusterConfig
Captures relevant spark cluster configuration properties, e.g., memory budgets and degree of parallelism.
-
Field Summary
Modifier and Type Field Description
static boolean
FAIR_SCHEDULER_MODE
-
Method Summary
Modifier and Type Method Description
void
addLineage(String varParent, String varChild, boolean broadcast)
void
addLineageBroadcast(String varParent, String varChild)
Adds a child broadcast object to the lineage of a parent rdd.
void
addLineageRDD(String varParent, String varChild)
Adds a child rdd object to the lineage of a parent rdd.
org.apache.spark.broadcast.Broadcast<CacheBlock>
broadcastVariable(CacheableData<CacheBlock> cd)
void
cacheMatrixObject(String var)
static void
cleanupBroadcastVariable(org.apache.spark.broadcast.Broadcast<?> bvar)
This call destroys a broadcast variable at all executors and the driver.
void
cleanupCacheableData(CacheableData<?> mo)
static void
cleanupRDDVariable(org.apache.spark.api.java.JavaPairRDD<?,?> rvar)
This call removes an rdd variable from executor memory and disk if required.
void
cleanupThreadLocalSchedulerPool(int pool)
void
close()
static org.apache.spark.SparkConf
createSystemDSSparkConf()
Sets up a SystemDS-preferred Spark configuration based on the implicit default configuration (as passed via configurations from outside).
org.apache.spark.api.java.JavaPairRDD<MatrixIndexes,MatrixBlock>
getBinaryMatrixBlockRDDHandleForVariable(String varname)
Spark instructions should call this for all matrix inputs except broadcast variables.
org.apache.spark.api.java.JavaPairRDD<MatrixIndexes,MatrixBlock>
getBinaryMatrixBlockRDDHandleForVariable(String varname, int numParts, boolean inclEmpty)
org.apache.spark.api.java.JavaPairRDD<TensorIndexes,TensorBlock>
getBinaryTensorBlockRDDHandleForVariable(String varname)
Spark instructions should call this for all tensor inputs except broadcast variables.
org.apache.spark.api.java.JavaPairRDD<TensorIndexes,TensorBlock>
getBinaryTensorBlockRDDHandleForVariable(String varname, int numParts, boolean inclEmpty)
PartitionedBroadcast<FrameBlock>
getBroadcastForFrameVariable(String varname)
PartitionedBroadcast<MatrixBlock>
getBroadcastForMatrixObject(MatrixObject mo)
PartitionedBroadcast<TensorBlock>
getBroadcastForTensorObject(TensorObject to)
PartitionedBroadcast<TensorBlock>
getBroadcastForTensorVariable(String varname)
PartitionedBroadcast<MatrixBlock>
getBroadcastForVariable(String varname)
static double
getBroadcastMemoryBudget()
Obtains the available memory budget for broadcast variables in bytes.
static double
getDataMemoryBudget(boolean min, boolean refresh)
Obtain the available memory budget for data storage in bytes.
static int
getDefaultParallelism(boolean refresh)
Obtain the default degree of parallelism (cores in the cluster).
org.apache.spark.api.java.JavaPairRDD<Long,FrameBlock>
getFrameBinaryBlockRDDHandleForVariable(String varname)
Spark instructions should call this for all frame inputs except broadcast variables.
static int
getNumExecutors()
Obtain the number of executors in the cluster (excluding the driver).
org.apache.spark.api.java.JavaPairRDD<?,?>
getRDDHandleForFrameObject(FrameObject fo, Types.FileFormat fmt)
FIXME: this implementation currently assumes matrix representations despite its frame signature, in order to support the old transform implementation.
org.apache.spark.api.java.JavaPairRDD<?,?>
getRDDHandleForMatrixObject(MatrixObject mo, Types.FileFormat fmt)
org.apache.spark.api.java.JavaPairRDD<?,?>
getRDDHandleForMatrixObject(MatrixObject mo, Types.FileFormat fmt, int numParts, boolean inclEmpty)
org.apache.spark.api.java.JavaPairRDD<?,?>
getRDDHandleForTensorObject(TensorObject to, Types.FileFormat fmt, int numParts, boolean inclEmpty)
org.apache.spark.api.java.JavaPairRDD<?,?>
getRDDHandleForVariable(String varname, Types.FileFormat fmt, int numParts, boolean inclEmpty)
static SparkExecutionContext.SparkClusterConfig
getSparkClusterConfig()
Obtains the lazily analyzed spark cluster configuration.
org.apache.spark.api.java.JavaSparkContext
getSparkContext()
Returns the used singleton spark context.
static org.apache.spark.api.java.JavaSparkContext
getSparkContextStatic()
static boolean
isLazySparkContextCreation()
static boolean
isLocalMaster()
boolean
isRDDCached(int rddID)
static boolean
isSparkContextCreated()
Indicates if the spark context has been created or has been passed in from outside.
void
repartitionAndCacheMatrixObject(String var)
static void
resetSparkContextStatic()
void
setBroadcastHandle(MatrixObject mo)
void
setRDDHandleForVariable(String varname, org.apache.spark.api.java.JavaPairRDD<?,?> rdd)
Keep the output rdd of spark rdd operations as meta data of matrix/frame objects in the symbol table.
int
setThreadLocalSchedulerPool()
static FrameBlock
toFrameBlock(org.apache.spark.api.java.JavaPairRDD<Long,FrameBlock> rdd, Types.ValueType[] schema, int rlen, int clen)
static FrameBlock
toFrameBlock(RDDObject rdd, Types.ValueType[] schema, int rlen, int clen)
static org.apache.spark.api.java.JavaPairRDD<Long,FrameBlock>
toFrameJavaPairRDD(org.apache.spark.api.java.JavaSparkContext sc, FrameBlock src)
static MatrixBlock
toMatrixBlock(org.apache.spark.api.java.JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int blen, long nnz)
Utility method for creating a single matrix block out of a binary block RDD.
static MatrixBlock
toMatrixBlock(org.apache.spark.api.java.JavaPairRDD<MatrixIndexes,MatrixCell> rdd, int rlen, int clen, long nnz)
Utility method for creating a single matrix block out of a binary cell RDD.
static MatrixBlock
toMatrixBlock(RDDObject rdd, int rlen, int clen, int blen, long nnz)
This method is a generic abstraction for calls from the buffer pool.
static MatrixBlock
toMatrixBlock(RDDObject rdd, int rlen, int clen, long nnz)
static org.apache.spark.api.java.JavaPairRDD<MatrixIndexes,MatrixBlock>
toMatrixJavaPairRDD(org.apache.spark.api.java.JavaSparkContext sc, MatrixBlock src, int blen)
static org.apache.spark.api.java.JavaPairRDD<MatrixIndexes,MatrixBlock>
toMatrixJavaPairRDD(org.apache.spark.api.java.JavaSparkContext sc, MatrixBlock src, int blen, int numParts, boolean inclEmpty)
static PartitionedBlock<MatrixBlock>
toPartitionedMatrixBlock(org.apache.spark.api.java.JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int blen, long nnz)
static TensorBlock
toTensorBlock(org.apache.spark.api.java.JavaPairRDD<TensorIndexes,TensorBlock> rdd, DataCharacteristics dc)
static org.apache.spark.api.java.JavaPairRDD<TensorIndexes,TensorBlock>
toTensorJavaPairRDD(org.apache.spark.api.java.JavaSparkContext sc, TensorBlock src, int blen)
static org.apache.spark.api.java.JavaPairRDD<TensorIndexes,TensorBlock>
toTensorJavaPairRDD(org.apache.spark.api.java.JavaSparkContext sc, TensorBlock src, int blen, int numParts, boolean inclEmpty)
static void
writeFrameRDDtoHDFS(RDDObject rdd, String path, Types.FileFormat fmt)
static long
writeMatrixRDDtoHDFS(RDDObject rdd, String path, Types.FileFormat fmt)
-
Methods inherited from class org.apache.sysds.runtime.controlprogram.context.ExecutionContext
addTmpParforFunction, allocateGPUMatrixObject, cleanupDataObject, containsVariable, containsVariable, createCacheableData, createFrameObject, createFrameObject, createMatrixObject, createMatrixObject, getCacheableData, getCacheableData, getDataCharacteristics, getDenseMatrixOutputForGPUInstruction, getDenseMatrixOutputForGPUInstruction, getFrameInput, getFrameObject, getFrameObject, getGPUContext, getGPUContexts, getGPUDensePointerAddress, getGPUSparsePointerAddress, getLineage, getLineageItem, getLineageItem, getListObject, getListObject, getMatrixInput, getMatrixInput, getMatrixInputForGPUInstruction, getMatrixInputs, getMatrixInputs, getMatrixLineagePair, getMatrixLineagePair, getMatrixObject, getMatrixObject, getMetaData, getNumGPUContexts, getOrCreateLineageItem, getProgram, getScalarInput, getScalarInput, getScalarInputs, getSealClient, getSparseMatrixOutputForGPUInstruction, getSparseMatrixOutputForGPUInstruction, getTensorInput, getTensorObject, getTID, getTmpParforFunctions, getVariable, getVariable, getVariables, getVarList, getVarListPartitioned, isAutoCreateVars, isFederated, isFederated, isFrameObject, isMatrixObject, maintainLineageDebuggerInfo, pinVariables, releaseCacheableData, releaseFrameInput, releaseMatrixInput, releaseMatrixInput, releaseMatrixInputForGPUInstruction, releaseMatrixInputs, releaseMatrixInputs, releaseMatrixOutputForGPUInstruction, releaseTensorInput, releaseTensorInput, removeVariable, setAutoCreateVars, setFrameOutput, setGPUContexts, setLineage, setMatrixOutput, setMatrixOutput, setMatrixOutputAndLineage, setMetaData, setMetaData, setProgram, setScalarOutput, setSealClient, setTensorOutput, setTID, setVariable, setVariables, toString, traceLineage, unpinVariables
-
-
-
-
Field Detail
-
FAIR_SCHEDULER_MODE
public static final boolean FAIR_SCHEDULER_MODE
- See Also:
- Constant Field Values
-
-
Method Detail
-
getSparkContext
public org.apache.spark.api.java.JavaSparkContext getSparkContext()
Returns the used singleton spark context. In case of lazy spark context creation, this method blocks until the spark context is created.
- Returns:
- java spark context
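The blocking behavior described above can be illustrated with a minimal, standard-library-only sketch. It is not SystemDS code: `LazyContextHolder`, `Context`, `initAsync`, `get`, and `isCreated` are hypothetical names chosen for this illustration, and the placeholder `Context` merely stands in for an expensive-to-create context object.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a lazily created, shared context; not SystemDS code.
class LazyContextHolder {
    // Hypothetical placeholder for an expensive-to-create context object.
    static final class Context {
        final String appName;
        Context(String appName) { this.appName = appName; }
    }

    // Holds the (possibly still running) creation of the single shared instance.
    private static CompletableFuture<Context> pending;

    // Starts creation in the background; later calls are no-ops.
    static synchronized void initAsync(String appName) {
        if (pending == null)
            pending = CompletableFuture.supplyAsync(() -> new Context(appName));
    }

    // Blocks until creation has finished, then returns the shared instance.
    static Context get() {
        CompletableFuture<Context> f;
        synchronized (LazyContextHolder.class) {
            if (pending == null)
                pending = CompletableFuture.supplyAsync(() -> new Context("default"));
            f = pending;
        }
        return f.join();
    }

    // Non-blocking check whether creation has completed.
    static synchronized boolean isCreated() {
        return pending != null && pending.isDone();
    }

    public static void main(String[] args) {
        initAsync("demo");
        Context a = get(); // waits for the background creation
        Context b = get(); // returns the same instance
        System.out.println(a == b);
    }
}
```

Callers that only need to know whether the context exists can use the non-blocking check, while callers that need the context itself accept the wait.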
-
getSparkContextStatic
public static org.apache.spark.api.java.JavaSparkContext getSparkContextStatic()
-
isSparkContextCreated
public static boolean isSparkContextCreated()
Indicates if the spark context has been created or has been passed in from outside.
- Returns:
- true if spark context created
-
resetSparkContextStatic
public static void resetSparkContextStatic()
-
close
public void close()
-
isLazySparkContextCreation
public static boolean isLazySparkContextCreation()
-
createSystemDSSparkConf
public static org.apache.spark.SparkConf createSystemDSSparkConf()
Sets up a SystemDS-preferred Spark configuration based on the implicit default configuration (as passed via configurations from outside).
- Returns:
- spark configuration
-
isLocalMaster
public static boolean isLocalMaster()
-
getBinaryMatrixBlockRDDHandleForVariable
public org.apache.spark.api.java.JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryMatrixBlockRDDHandleForVariable(String varname)
Spark instructions should call this for all matrix inputs except broadcast variables.
- Parameters:
varname - variable name
- Returns:
- JavaPairRDD of MatrixIndexes-MatrixBlocks
-
getBinaryMatrixBlockRDDHandleForVariable
public org.apache.spark.api.java.JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryMatrixBlockRDDHandleForVariable(String varname, int numParts, boolean inclEmpty)
-
getBinaryTensorBlockRDDHandleForVariable
public org.apache.spark.api.java.JavaPairRDD<TensorIndexes,TensorBlock> getBinaryTensorBlockRDDHandleForVariable(String varname)
Spark instructions should call this for all tensor inputs except broadcast variables.
- Parameters:
varname - variable name
- Returns:
- JavaPairRDD of TensorIndexes-TensorBlocks
-
getBinaryTensorBlockRDDHandleForVariable
public org.apache.spark.api.java.JavaPairRDD<TensorIndexes,TensorBlock> getBinaryTensorBlockRDDHandleForVariable(String varname, int numParts, boolean inclEmpty)
-
getFrameBinaryBlockRDDHandleForVariable
public org.apache.spark.api.java.JavaPairRDD<Long,FrameBlock> getFrameBinaryBlockRDDHandleForVariable(String varname)
Spark instructions should call this for all frame inputs except broadcast variables.
- Parameters:
varname - variable name
- Returns:
- JavaPairRDD of Longs-FrameBlocks
-
getRDDHandleForVariable
public org.apache.spark.api.java.JavaPairRDD<?,?> getRDDHandleForVariable(String varname, Types.FileFormat fmt, int numParts, boolean inclEmpty)
-
getRDDHandleForMatrixObject
public org.apache.spark.api.java.JavaPairRDD<?,?> getRDDHandleForMatrixObject(MatrixObject mo, Types.FileFormat fmt)
-
getRDDHandleForMatrixObject
public org.apache.spark.api.java.JavaPairRDD<?,?> getRDDHandleForMatrixObject(MatrixObject mo, Types.FileFormat fmt, int numParts, boolean inclEmpty)
-
getRDDHandleForTensorObject
public org.apache.spark.api.java.JavaPairRDD<?,?> getRDDHandleForTensorObject(TensorObject to, Types.FileFormat fmt, int numParts, boolean inclEmpty)
-
getRDDHandleForFrameObject
public org.apache.spark.api.java.JavaPairRDD<?,?> getRDDHandleForFrameObject(FrameObject fo, Types.FileFormat fmt)
FIXME: this implementation currently assumes matrix representations despite its frame signature, in order to support the old transform implementation.
- Parameters:
fo - frame object
fmt - file format type
- Returns:
- JavaPairRDD handle for a frame object
-
broadcastVariable
public org.apache.spark.broadcast.Broadcast<CacheBlock> broadcastVariable(CacheableData<CacheBlock> cd)
-
getBroadcastForMatrixObject
public PartitionedBroadcast<MatrixBlock> getBroadcastForMatrixObject(MatrixObject mo)
-
setBroadcastHandle
public void setBroadcastHandle(MatrixObject mo)
-
getBroadcastForTensorObject
public PartitionedBroadcast<TensorBlock> getBroadcastForTensorObject(TensorObject to)
-
getBroadcastForVariable
public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable(String varname)
-
getBroadcastForTensorVariable
public PartitionedBroadcast<TensorBlock> getBroadcastForTensorVariable(String varname)
-
getBroadcastForFrameVariable
public PartitionedBroadcast<FrameBlock> getBroadcastForFrameVariable(String varname)
-
setRDDHandleForVariable
public void setRDDHandleForVariable(String varname, org.apache.spark.api.java.JavaPairRDD<?,?> rdd)
Keep the output rdd of spark rdd operations as meta data of matrix/frame objects in the symbol table.
- Parameters:
varname - variable name
rdd - JavaPairRDD handle for variable
-
toMatrixJavaPairRDD
public static org.apache.spark.api.java.JavaPairRDD<MatrixIndexes,MatrixBlock> toMatrixJavaPairRDD(org.apache.spark.api.java.JavaSparkContext sc, MatrixBlock src, int blen)
-
toMatrixJavaPairRDD
public static org.apache.spark.api.java.JavaPairRDD<MatrixIndexes,MatrixBlock> toMatrixJavaPairRDD(org.apache.spark.api.java.JavaSparkContext sc, MatrixBlock src, int blen, int numParts, boolean inclEmpty)
-
toTensorJavaPairRDD
public static org.apache.spark.api.java.JavaPairRDD<TensorIndexes,TensorBlock> toTensorJavaPairRDD(org.apache.spark.api.java.JavaSparkContext sc, TensorBlock src, int blen)
-
toTensorJavaPairRDD
public static org.apache.spark.api.java.JavaPairRDD<TensorIndexes,TensorBlock> toTensorJavaPairRDD(org.apache.spark.api.java.JavaSparkContext sc, TensorBlock src, int blen, int numParts, boolean inclEmpty)
-
toFrameJavaPairRDD
public static org.apache.spark.api.java.JavaPairRDD<Long,FrameBlock> toFrameJavaPairRDD(org.apache.spark.api.java.JavaSparkContext sc, FrameBlock src)
-
toMatrixBlock
public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, int blen, long nnz)
This method is a generic abstraction for calls from the buffer pool.
- Parameters:
rdd - rdd object
rlen - number of rows
clen - number of columns
blen - block length
nnz - number of non-zeros
- Returns:
- matrix block
-
toMatrixBlock
public static MatrixBlock toMatrixBlock(org.apache.spark.api.java.JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int blen, long nnz)
Utility method for creating a single matrix block out of a binary block RDD. Note that this collect call might trigger execution of any pending transformations. NOTE: This is an unguarded utility function, which requires memory for both the output matrix and its collected, blocked representation.
- Parameters:
rdd - JavaPairRDD for matrix block
rlen - number of rows
clen - number of columns
blen - block length
nnz - number of non-zeros
- Returns:
- Local matrix block
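To make the memory note concrete, here is a minimal sketch of assembling collected blocks into one dense matrix using plain Java arrays. It is an illustration only, not the SystemDS implementation: `BlockAssembly`, `numBlocks`, and `assemble` are hypothetical names, blocks are keyed by a `"row,col"` string rather than a MatrixIndexes object, and the 1-based block indexing is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of block-to-matrix assembly; not SystemDS code.
class BlockAssembly {
    // Number of blocks needed to cover len rows (or columns) with block length blen.
    static int numBlocks(int len, int blen) {
        return (len + blen - 1) / blen;
    }

    // Copies blocks keyed by "rowIndex,colIndex" (1-based, assumed) into one dense array.
    static double[][] assemble(Map<String, double[][]> blocks, int rlen, int clen, int blen) {
        double[][] out = new double[rlen][clen];
        for (Map.Entry<String, double[][]> e : blocks.entrySet()) {
            String[] ix = e.getKey().split(",");
            int rowOffset = (Integer.parseInt(ix[0]) - 1) * blen;
            int colOffset = (Integer.parseInt(ix[1]) - 1) * blen;
            double[][] b = e.getValue();
            for (int i = 0; i < b.length; i++)
                System.arraycopy(b[i], 0, out[rowOffset + i], colOffset, b[i].length);
        }
        return out;
    }

    public static void main(String[] args) {
        // A 3x3 matrix split into blocks of length 2; boundary blocks are smaller.
        Map<String, double[][]> blocks = new HashMap<>();
        blocks.put("1,1", new double[][] {{1, 2}, {4, 5}});
        blocks.put("1,2", new double[][] {{3}, {6}});
        blocks.put("2,1", new double[][] {{7, 8}});
        blocks.put("2,2", new double[][] {{9}});
        double[][] m = assemble(blocks, 3, 3, 2);
        System.out.println(Arrays.deepToString(m));
    }
}
```

While `assemble` runs, both the collected `blocks` map and the output array `out` are live, which mirrors why the note above asks for memory for the output matrix and its blocked representation at the same time.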
-
toMatrixBlock
public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, long nnz)
-
toMatrixBlock
public static MatrixBlock toMatrixBlock(org.apache.spark.api.java.JavaPairRDD<MatrixIndexes,MatrixCell> rdd, int rlen, int clen, long nnz)
Utility method for creating a single matrix block out of a binary cell RDD. Note that this collect call might trigger execution of any pending transformations.
- Parameters:
rdd - JavaPairRDD for matrix block
rlen - number of rows
clen - number of columns
nnz - number of non-zeros
- Returns:
- matrix block
-
toTensorBlock
public static TensorBlock toTensorBlock(org.apache.spark.api.java.JavaPairRDD<TensorIndexes,TensorBlock> rdd, DataCharacteristics dc)
-
toPartitionedMatrixBlock
public static PartitionedBlock<MatrixBlock> toPartitionedMatrixBlock(org.apache.spark.api.java.JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int blen, long nnz)
-
toFrameBlock
public static FrameBlock toFrameBlock(RDDObject rdd, Types.ValueType[] schema, int rlen, int clen)
-
toFrameBlock
public static FrameBlock toFrameBlock(org.apache.spark.api.java.JavaPairRDD<Long,FrameBlock> rdd, Types.ValueType[] schema, int rlen, int clen)
-
writeMatrixRDDtoHDFS
public static long writeMatrixRDDtoHDFS(RDDObject rdd, String path, Types.FileFormat fmt)
-
writeFrameRDDtoHDFS
public static void writeFrameRDDtoHDFS(RDDObject rdd, String path, Types.FileFormat fmt)
-
addLineageRDD
public void addLineageRDD(String varParent, String varChild)
Adds a child rdd object to the lineage of a parent rdd.
- Parameters:
varParent - parent variable
varChild - child variable
-
addLineageBroadcast
public void addLineageBroadcast(String varParent, String varChild)
Adds a child broadcast object to the lineage of a parent rdd.
- Parameters:
varParent - parent variable
varChild - child variable
-
cleanupCacheableData
public void cleanupCacheableData(CacheableData<?> mo)
- Overrides:
cleanupCacheableData in class ExecutionContext
-
cleanupBroadcastVariable
public static void cleanupBroadcastVariable(org.apache.spark.broadcast.Broadcast<?> bvar)
This call destroys a broadcast variable at all executors and the driver. Hence, it is intended to be used on rmvar only. Depending on the ASYNCHRONOUS_VAR_DESTROY configuration, this is asynchronous or not.
- Parameters:
bvar - broadcast variable
-
cleanupRDDVariable
public static void cleanupRDDVariable(org.apache.spark.api.java.JavaPairRDD<?,?> rvar)
This call removes an rdd variable from executor memory and disk if required. Hence, it is intended to be used on rmvar only. Depending on the ASYNCHRONOUS_VAR_DESTROY configuration, this is asynchronous or not.
- Parameters:
rvar - rdd variable to remove
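The choice between synchronous and asynchronous cleanup mentioned for both cleanup calls can be sketched with the standard library alone. This models only the blocking versus non-blocking decision and says nothing about how SystemDS or Spark release resources internally: `VarCleanup` and `cleanup` are hypothetical names, and the `async` parameter stands in for the ASYNCHRONOUS_VAR_DESTROY setting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of flag-controlled cleanup; not SystemDS code.
class VarCleanup {
    // Runs the destroy action in the calling thread, or in the background if async is set.
    // The returned future completes once the action has finished.
    static CompletableFuture<Void> cleanup(Runnable destroy, boolean async) {
        if (async)
            return CompletableFuture.runAsync(destroy);
        destroy.run();
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        AtomicBoolean released = new AtomicBoolean(false);
        // Asynchronous mode: the caller decides whether and when to wait.
        cleanup(() -> released.set(true), true).join();
        System.out.println(released.get());
    }
}
```

In synchronous mode the resource is already released when `cleanup` returns; in asynchronous mode the caller continues immediately and the release happens in the background.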
-
repartitionAndCacheMatrixObject
public void repartitionAndCacheMatrixObject(String var)
-
cacheMatrixObject
public void cacheMatrixObject(String var)
-
setThreadLocalSchedulerPool
public int setThreadLocalSchedulerPool()
-
cleanupThreadLocalSchedulerPool
public void cleanupThreadLocalSchedulerPool(int pool)
-
isRDDCached
public boolean isRDDCached(int rddID)
-
getSparkClusterConfig
public static SparkExecutionContext.SparkClusterConfig getSparkClusterConfig()
Obtains the lazily analyzed spark cluster configuration.
- Returns:
- spark cluster configuration
-
getBroadcastMemoryBudget
public static double getBroadcastMemoryBudget()
Obtains the available memory budget for broadcast variables in bytes.
- Returns:
- broadcast memory budget
-
getDataMemoryBudget
public static double getDataMemoryBudget(boolean min, boolean refresh)
Obtain the available memory budget for data storage in bytes.
- Parameters:
min - flag for minimum data budget
refresh - flag for refresh with spark context
- Returns:
- data memory budget
-
getNumExecutors
public static int getNumExecutors()
Obtain the number of executors in the cluster (excluding the driver).
- Returns:
- number of executors
-
getDefaultParallelism
public static int getDefaultParallelism(boolean refresh)
Obtain the default degree of parallelism (cores in the cluster).
- Parameters:
refresh - flag for refresh with spark context
- Returns:
- default degree of parallelism
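One plausible reading of the `refresh` flag, used by the memory budget and parallelism accessors above, is a lazily computed value that is re-read only on request. The sketch below illustrates that pattern under this assumption; the page does not state what happens when `refresh` is false, and `CachedParallelism`, `probe`, and `get` are hypothetical names.

```java
import java.util.function.IntSupplier;

// Hypothetical illustration of a lazily analyzed, refreshable value; not SystemDS code.
class CachedParallelism {
    private final IntSupplier probe; // hypothetical source of the live value
    private int cached = -1;         // -1 marks "not analyzed yet"

    CachedParallelism(IntSupplier probe) {
        this.probe = probe;
    }

    // Returns the cached value; queries the probe on first use or when refresh is set.
    synchronized int get(boolean refresh) {
        if (refresh || cached < 0)
            cached = probe.getAsInt();
        return cached;
    }

    public static void main(String[] args) {
        int[] cores = {8};
        CachedParallelism p = new CachedParallelism(() -> cores[0]);
        System.out.println(p.get(false)); // first call analyzes lazily
        cores[0] = 16;                    // the live value changes
        System.out.println(p.get(false)); // cached value is returned
        System.out.println(p.get(true));  // refresh re-reads the live value
    }
}
```

The trade-off in such a design is that reads without refresh are cheap but may be stale after the cluster changes.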
-
-