package dst.ass3.elastic.impl; import dst.ass3.elastic.ContainerException; import dst.ass3.elastic.ContainerInfo; import dst.ass3.elastic.IContainerService; import dst.ass3.elastic.IElasticityController; import dst.ass3.messaging.IWorkloadMonitor; import dst.ass3.messaging.Region; import java.util.List; import java.util.stream.Collectors; public class ElasticityController implements IElasticityController { IContainerService containerService; IWorkloadMonitor workloadMonitor; final double alpha = 0.1; // scale-out threshold final double omega = 0.05; // scale-down threshold public ElasticityController(IContainerService containerService, IWorkloadMonitor workloadMonitor) { this.containerService = containerService; this.workloadMonitor = workloadMonitor; } @Override public void adjustWorkers() throws ContainerException { long[] k = new long[3]; // number of workers per region currently running long[] q = new long[3]; // number of waiting requests per region double[] r10 = new double[3]; // avg processing time of last 10 requests per region int[] rMax = new int[3]; // defined max wait time for request per region double[] rExp = new double[3]; // expected wait time for last request in queue per region rMax[0] = 30 * 1000; // max wait time of at_vienna in milliseconds rMax[1] = 30 * 1000; // max wait time of at_linz in milliseconds rMax[2] = 120 * 1000; // max wait time of de_berlin in milliseconds k[0] = workloadMonitor.getWorkerCount().get(Region.AT_VIENNA); k[1] = workloadMonitor.getWorkerCount().get(Region.AT_LINZ); k[2] = workloadMonitor.getWorkerCount().get(Region.DE_BERLIN); q[0] = workloadMonitor.getRequestCount().get(Region.AT_VIENNA); q[1] = workloadMonitor.getRequestCount().get(Region.AT_LINZ); q[2] = workloadMonitor.getRequestCount().get(Region.DE_BERLIN); r10[0] = workloadMonitor.getAverageProcessingTime().get(Region.AT_VIENNA); r10[1] = workloadMonitor.getAverageProcessingTime().get(Region.AT_LINZ); r10[2] = workloadMonitor.getAverageProcessingTime().get(Region.DE_BERLIN); for (int i = 0; i < 3; i++) { rExp[i] = (q[i] * r10[i]) / k[i]; if (rExp[i] > rMax[i] * (1 + alpha)) { // rExp exceeds scale-out threshold // Calculate workers needed to get back to rMax double workersNeeded = (q[i] * r10[i]) / rMax[i] - k[i]; System.out.println("Scaling up " + workersNeeded + " " + Region.values()[i] + " workers"); for (int j = 0; j < workersNeeded; j++) { ContainerInfo containerInfo = containerService.startWorker(Region.values()[i]); } } else if (rExp[i] < rMax[i] * (1 - omega)) { // rExp exceeds scale-down threshold // Calculate how many workers should be stopped to get back to rMax double workersStop = ((q[i] * r10[i]) / rMax[i] - k[i]) * (-1); System.out.println("Scaling down " + workersStop + " " + Region.values()[i] + " workers"); List containers = containerService.listContainers(); for (int j = 0; j < workersStop; j++) { Region currentRegion = Region.values()[i]; List containersToRemove = containers.stream() .filter(containerInfo -> containerInfo.getWorkerRegion().equals(currentRegion)) .collect(Collectors.toList()); containerService.stopContainer(containersToRemove.get(0).getContainerId()); } } } } }