Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/org/apache/sysds/conf/DMLConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public class DMLConfig
_defaultVals.put(FLOATING_POINT_PRECISION, "double" );
_defaultVals.put(USE_SSL_FEDERATED_COMMUNICATION, "false");
_defaultVals.put(DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT, "10");
_defaultVals.put(FEDERATED_TIMEOUT, "-1");
_defaultVals.put(FEDERATED_TIMEOUT, "86400"); // default 1 day compute timeout.
_defaultVals.put(FEDERATED_PLANNER, FederatedPlanner.RUNTIME.name());
_defaultVals.put(FEDERATED_PAR_CONN, "-1"); // vcores
_defaultVals.put(FEDERATED_PAR_INST, "-1"); // vcores
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private List<CompressedSizeInfoColGroup> coCodeBruteForce(List<CompressedSizeInf

final List<ColIndexes> workSet = new ArrayList<>(inputColumns.size());
k = k <= 0 ? InfrastructureAnalyzer.getLocalParallelism() : k;
final ExecutorService pool = CommonThreadPool.get(k);
final ExecutorService pool = k > 1 ? CommonThreadPool.get(k) : null;
try {
for(int i = 0; i < inputColumns.size(); i++) {
CompressedSizeInfoColGroup g = inputColumns.get(i);
Expand Down Expand Up @@ -183,7 +183,8 @@ else if((newCostIfJoined < changeInCost ||
return ret;
}
finally {
pool.shutdown();
if(pool != null)
pool.shutdown();
}
}

Expand All @@ -195,8 +196,12 @@ protected void parallelFirstCombine(List<ColIndexes> workSet, ExecutorService po
for(int j = i + 1; j < size; j++)
tasks.add(new CombineTask(workSet.get(i), workSet.get(j)));

for(Future<Object> t : pool.invokeAll(tasks))
t.get();
if(pool != null)
for(Future<Object> t : pool.invokeAll(tasks))
t.get();
else
for(CombineTask t: tasks)
t.call();
}
catch(Exception e) {
throw new DMLCompressionException("Failed parallelize first level all join all", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,38 @@ private static void divideByNumberOfCellsForMeanAll(CompressedMatrixBlock m1, Ma

private static void decompressingAggregate(CompressedMatrixBlock m1, MatrixBlock ret, AggregateUnaryOperator op,
MatrixIndexes indexesIn, boolean inCP) throws Exception {
List<Future<MatrixBlock>> rtasks = generateUnaryAggregateOverlappingFutures(m1, ret, op);
reduceFutures(rtasks, ret, op, true);
if(op.getNumThreads() > 1){

List<Future<MatrixBlock>> rtasks = generateUnaryAggregateOverlappingFutures(m1, ret, op);
reduceFutures(rtasks, ret, op, true);
}
else{
final int nCol = m1.getNumColumns();
final int nRow = m1.getNumRows();
final List<AColGroup> groups = m1.getColGroups();
final boolean shouldFilter = CLALibUtils.shouldPreFilter(groups);

final UAOverlappingTask t;
if(shouldFilter) {
final double[] constV = new double[nCol];
final List<AColGroup> filteredGroups = CLALibUtils.filterGroups(groups, constV);
final AColGroup cRet = ColGroupConst.create(constV);
filteredGroups.add(cRet);
t = new UAOverlappingTask(filteredGroups, ret, 0, nRow, op, nCol);
}
else {
t = new UAOverlappingTask(groups, ret, 0, nRow, op, nCol);
}
if(op.indexFn instanceof ReduceAll)
ret.set(0,0,t.call().get(0,0));
else if(op.indexFn instanceof ReduceRow) {
final boolean isPlus = op.aggOp.increOp.fn instanceof Mean || op.aggOp.increOp.fn instanceof KahanFunction;
final BinaryOperator bop = isPlus ? new BinaryOperator(Plus.getPlusFnObject()) : op. aggOp.increOp;
LibMatrixBincell.bincellOpInPlace(ret, t.call(), bop);
}
else // reduce cols just get the tasks done.
t.call();
}
}

private static void reduceFutures(List<Future<MatrixBlock>> futures, MatrixBlock ret, AggregateUnaryOperator op,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,35 @@ public static MatrixBlock scalarOperations(ScalarOperator sop, CompressedMatrixB
}

private static MatrixBlock fusedScalarAndDecompress(CompressedMatrixBlock in, ScalarOperator sop) {
if(sop.getNumThreads() <= 1)
return singleThreadFusedScalarAndDecompress(in, sop);
return parallelFusedScalarAndDecompress(in, sop) ;
}

private static MatrixBlock singleThreadFusedScalarAndDecompress(CompressedMatrixBlock in, ScalarOperator sop){
final int nRow = in.getNumRows();
final int nCol = in.getNumColumns();
final MatrixBlock out = new MatrixBlock(nRow, nCol, false);
out.allocateDenseBlock();
final DenseBlock db = out.getDenseBlock();
final List<AColGroup> groups = in.getColGroups();
long nnz = fusedDecompressAndScalar(groups, nCol, 0, nRow, db, sop);
out.setNonZeros(nnz);
out.examSparsity(true);
return out;
}

private static MatrixBlock parallelFusedScalarAndDecompress(CompressedMatrixBlock in, ScalarOperator sop) {
int k = sop.getNumThreads();
ExecutorService pool = CommonThreadPool.get(k);
try {
final int nRow = in.getNumRows();
final int nRow = in.getNumRows();
final int nCol = in.getNumColumns();
final MatrixBlock out = new MatrixBlock(nRow, nCol, false);
final List<AColGroup> groups = in.getColGroups();
out.allocateDenseBlock();
final DenseBlock db = out.getDenseBlock();
final int blkz = Math.max((int)(Math.ceil((double)nRow / k)), 256);
final int blkz = Math.max((int) (Math.ceil((double) nRow / k)), 256);
final List<Future<Long>> tasks = new ArrayList<>();
for(int i = 0; i < nRow; i += blkz) {
final int start = i;
Expand All @@ -138,9 +157,6 @@ private static MatrixBlock fusedScalarAndDecompress(CompressedMatrixBlock in, Sc
finally {
pool.shutdown();
}

// MatrixBlock m1d = m1.decompress(sop.getNumThreads());
// return m1d.scalarOperations(sop, result);
}

private static long fusedDecompressAndScalar(final List<AColGroup> groups, int nCol, int start, int end,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,8 @@ public static List<DataObjectModel> getWorkerDataObjects() {
return new ArrayList<>(workerDataObjects.values());
}

public static void addEvent(EventModel event) {
public synchronized static void addEvent(EventModel event) {
// synchronized, because multiple requests can be handled concurrently
workerEvents.add(event);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,8 +651,7 @@ private FederatedResponse execUDF(FederatedRequest request, ExecutionContextMap
// get function and input parameters
try {
FederatedUDF udf = (FederatedUDF) request.getParam(0);
if(LOG.isDebugEnabled())
LOG.debug(udf);
LOG.debug(udf);

eventStage.operation = udf.getClass().getSimpleName();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.fedplanner.FTypes.AlignType;
import org.apache.sysds.hops.fedplanner.FTypes.FType;
import org.apache.sysds.lops.RightIndex;
Expand Down Expand Up @@ -637,11 +640,25 @@ public long getMaxIndexInRange(int dim) {
* @param forEachFunction function to execute for each pair
*/
public void forEachParallel(BiFunction<FederatedRange, FederatedData, Void> forEachFunction) {
ExecutorService pool = CommonThreadPool.get(_fedMap.size());
ExecutorService pool = Executors.newFixedThreadPool(_fedMap.size());
ArrayList<MappingTask> mappingTasks = new ArrayList<>();
for(Pair<FederatedRange, FederatedData> fedMap : _fedMap)
mappingTasks.add(new MappingTask(fedMap.getKey(), fedMap.getValue(), forEachFunction, _ID));
CommonThreadPool.invokeAndShutdown(pool, mappingTasks);

try {
for(Future<?> t:pool.invokeAll(mappingTasks, ConfigurationManager.getFederatedTimeout(), TimeUnit.SECONDS)){
if(!t.isDone())
throw new RuntimeException("Timeout");
else if(t.isCancelled())
throw new RuntimeException("Failed");
}
}
catch(InterruptedException e) {
throw new RuntimeException(e);
}
finally{
pool.shutdown();
}
}

/**
Expand All @@ -655,15 +672,25 @@ public void forEachParallel(BiFunction<FederatedRange, FederatedData, Void> forE
* @return the new <code>FederationMap</code>
*/
public FederationMap mapParallel(long newVarID, BiFunction<FederatedRange, FederatedData, Void> mappingFunction) {
ExecutorService pool = CommonThreadPool.get(_fedMap.size());

ExecutorService pool = Executors.newFixedThreadPool(_fedMap.size());
FederationMap fedMapCopy = copyWithNewID(_ID);
ArrayList<MappingTask> mappingTasks = new ArrayList<>();
for(Pair<FederatedRange, FederatedData> fedMap : fedMapCopy._fedMap)
mappingTasks.add(new MappingTask(fedMap.getKey(), fedMap.getValue(), mappingFunction, newVarID));
CommonThreadPool.invokeAndShutdown(pool, mappingTasks);
fedMapCopy._ID = newVarID;
return fedMapCopy;
try{
for(Future<?> t : pool.invokeAll(mappingTasks, ConfigurationManager.getFederatedTimeout(), TimeUnit.SECONDS)){
if(!t.isDone())
throw new RuntimeException("Timeout");
else if(t.isCancelled()){
throw new RuntimeException("Failed");
}
}
fedMapCopy._ID = newVarID;
return fedMapCopy;
}
catch(Exception e){
throw new RuntimeException(e);
}
}

public FederationMap filter(IndexRange ixrange) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -61,6 +62,7 @@
import org.apache.spark.util.LongAccumulator;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.ExecType;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.recompile.Recompiler;
import org.apache.sysds.parser.Statement.FederatedPSScheme;
import org.apache.sysds.parser.Statement.PSFrequency;
Expand Down Expand Up @@ -241,13 +243,16 @@ model, aggServiceEC, getValFunction(), getNumBatchesPerEpoch(runtimeBalancing, r

try {
// Launch the worker threads and wait for completion
for (Future<Void> ret : es.invokeAll(threads))
ret.get(); //error handling
for (Future<Void> ret : es.invokeAll(threads, ConfigurationManager.getFederatedTimeout(), TimeUnit.SECONDS)){
if(!ret.isDone())
throw new RuntimeException("Failed federated execution");
// ret.get(); //error handling
}
// Fetch the final model from ps
ec.setVariable(output.getName(), ps.getResult());
if (DMLScript.STATISTICS)
ParamServStatistics.accExecutionTime((long) ParamServStatistics.getExecutionTimer().stop());
} catch (InterruptedException | ExecutionException e) {
} catch (Exception e) {
throw new DMLRuntimeException("ParamservBuiltinCPInstruction: unknown error: ", e);
} finally {
es.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.sysds.common.Types;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.hops.fedplanner.FTypes;
import org.apache.sysds.hops.fedplanner.FTypes.FType;
import org.apache.sysds.lops.PickByCount;
Expand All @@ -47,10 +48,10 @@
import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse.ResponseType;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.controlprogram.federated.FederatedUDF;
import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
import org.apache.sysds.runtime.instructions.cp.Data;
Expand Down Expand Up @@ -175,6 +176,7 @@ public void processInstruction(ExecutionContext ec) {
try {
FederatedResponse response = responseFuture.get();
MultiColumnEncoder encoder = (MultiColumnEncoder) response.getData()[0];

// merge this encoder into a composite encoder
synchronized(finalGlobalEncoder) {
finalGlobalEncoder.mergeAt(encoder, columnOffset, (int) (range.getBeginDims()[0] + 1));
Expand Down Expand Up @@ -378,24 +380,30 @@ public ExecuteFrameEncoder(long input, long output, MultiColumnEncoder encoder)

@Override
public FederatedResponse execute(ExecutionContext ec, Data... data) {
FrameBlock fb = ((FrameObject) data[0]).acquireReadAndRelease();

// offset is applied on the Worker to shift the local encoders to their respective column
_encoder.applyColumnOffset();
// apply transformation
//MatrixBlock mbout = _encoder.apply(fb, OptimizerUtils.getTransformNumThreads());
// FIXME: Enabling multithreading intermittently hangs
MatrixBlock mbout = _encoder.apply(fb, 1);

// create output matrix object
MatrixObject mo = ExecutionContext.createMatrixObject(mbout);

// add it to the list of variables
ec.setVariable(String.valueOf(_outputID), mo);
try{

// return id handle
return new FederatedResponse(
ResponseType.SUCCESS_EMPTY, mbout.getNonZeros());
FrameBlock fb = ((FrameObject) data[0]).acquireReadAndRelease();

// offset is applied on the Worker to shift the local encoders to their respective column
_encoder.applyColumnOffset();
// apply transformation
MatrixBlock mbout = _encoder.apply(fb, OptimizerUtils.getTransformNumThreads());
// FIXME: Enabling multithreading intermittently hangs
// MatrixBlock mbout = _encoder.apply(fb, 1);

// create output matrix object
MatrixObject mo = ExecutionContext.createMatrixObject(mbout);

// add it to the list of variables
ec.setVariable(String.valueOf(_outputID), mo);

// return id handle
return new FederatedResponse(
ResponseType.SUCCESS_EMPTY, mbout.getNonZeros());
}
catch(Exception e){
return new FederatedResponse(ResponseType.ERROR);
}
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion src/test/config/SystemDS-MultiTenant-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@
<!-- The timeout of the federated tests to initialize the federated matrixes -->
<sysds.federated.initialization.timeout>30</sysds.federated.initialization.timeout>
<!-- The timeout of each instruction sent to federated workers -->
<sysds.federated.timeout>128</sysds.federated.timeout>
<sysds.federated.timeout>16</sysds.federated.timeout>
<sysds.local.spark>true</sysds.local.spark>
</root>
2 changes: 1 addition & 1 deletion src/test/config/SystemDS-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@
<!-- The timeout of the federated tests to initialize the federated matrixes -->
<sysds.federated.initialization.timeout>2</sysds.federated.initialization.timeout>
<!-- The timeout of each instruction sent to federated workers -->
<sysds.federated.timeout>128</sysds.federated.timeout>
<sysds.federated.timeout>16</sysds.federated.timeout>
</root>
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ private void testCodegenIntegration( String testname, boolean rewrites, ExecType
{
boolean oldFlag = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
ExecMode platformOld = setExecMode(instType);
setOutputBuffering(true);

try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public void testTokenizeFullDenseFrameCP() {

private void runAggregateOperationTest(ExecMode execMode) {
setExecMode(execMode);
setOutputBuffering(true);

String TEST_NAME = TEST_NAME1;

Expand Down
Loading