diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/impl/ElasticityController.java b/ass3-elastic/src/main/java/dst/ass3/elastic/impl/ElasticityController.java new file mode 100644 index 0000000..9855aad --- /dev/null +++ b/ass3-elastic/src/main/java/dst/ass3/elastic/impl/ElasticityController.java @@ -0,0 +1,77 @@ +package dst.ass3.elastic.impl; + +import dst.ass3.elastic.ContainerException; +import dst.ass3.elastic.ContainerInfo; +import dst.ass3.elastic.IContainerService; +import dst.ass3.elastic.IElasticityController; +import dst.ass3.messaging.IWorkloadMonitor; +import dst.ass3.messaging.Region; + +import java.util.List; +import java.util.stream.Collectors; + +public class ElasticityController implements IElasticityController { + IContainerService containerService; + IWorkloadMonitor workloadMonitor; + final double alpha = 0.1; // scale-out threshold + final double omega = 0.05; // scale-down threshold + + public ElasticityController(IContainerService containerService, IWorkloadMonitor workloadMonitor) { + this.containerService = containerService; + this.workloadMonitor = workloadMonitor; + } + + @Override + public void adjustWorkers() throws ContainerException { + long[] k = new long[3]; // number of workers per region currently running + long[] q = new long[3]; // number of waiting requests per region + double[] r10 = new double[3]; // avg processing time of last 10 requests per region + int[] rMax = new int[3]; // defined max wait time for request per region + double[] rExp = new double[3]; // expected wait time for last request in queue per region + + rMax[0] = 30 * 1000; // max wait time of at_vienna in milliseconds + rMax[1] = 30 * 1000; // max wait time of at_linz in milliseconds + rMax[2] = 120 * 1000; // max wait time of de_berlin in milliseconds + + k[0] = workloadMonitor.getWorkerCount().get(Region.AT_VIENNA); + k[1] = workloadMonitor.getWorkerCount().get(Region.AT_LINZ); + k[2] = workloadMonitor.getWorkerCount().get(Region.DE_BERLIN); + + q[0] = workloadMonitor.getRequestCount().get(Region.AT_VIENNA); + q[1] = workloadMonitor.getRequestCount().get(Region.AT_LINZ); + q[2] = workloadMonitor.getRequestCount().get(Region.DE_BERLIN); + + r10[0] = workloadMonitor.getAverageProcessingTime().get(Region.AT_VIENNA); + r10[1] = workloadMonitor.getAverageProcessingTime().get(Region.AT_LINZ); + r10[2] = workloadMonitor.getAverageProcessingTime().get(Region.DE_BERLIN); + + for (int i = 0; i < 3; i++) { + rExp[i] = (q[i] * r10[i]) / k[i]; + + if (rExp[i] > rMax[i] * (1 + alpha)) { + // rExp exceeds scale-out threshold + // Calculate workers needed to get back to rMax + double workersNeeded = (q[i] * r10[i]) / rMax[i] - k[i]; + System.out.println("Scaling up " + workersNeeded + " " + Region.values()[i] + " workers"); + + for (int j = 0; j < workersNeeded; j++) { + ContainerInfo containerInfo = containerService.startWorker(Region.values()[i]); + } + } else if (rExp[i] < rMax[i] * (1 - omega)) { + // rExp exceeds scale-down threshold + // Calculate how many workers should be stopped to get back to rMax + double workersStop = ((q[i] * r10[i]) / rMax[i] - k[i]) * (-1); + System.out.println("Scaling down " + workersStop + " " + Region.values()[i] + " workers"); + + List containers = containerService.listContainers(); + for (int j = 0; j < workersStop; j++) { + Region currentRegion = Region.values()[i]; + List containersToRemove = containers.stream() + .filter(containerInfo -> containerInfo.getWorkerRegion().equals(currentRegion)) + .collect(Collectors.toList()); + containerService.stopContainer(containersToRemove.get(0).getContainerId()); + } + } + } + } +} diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/impl/ElasticityFactory.java b/ass3-elastic/src/main/java/dst/ass3/elastic/impl/ElasticityFactory.java index 052ef93..24cbff2 100644 --- a/ass3-elastic/src/main/java/dst/ass3/elastic/impl/ElasticityFactory.java +++ b/ass3-elastic/src/main/java/dst/ass3/elastic/impl/ElasticityFactory.java @@ -16,8 +16,7 @@ public class ElasticityFactory implements IElasticityFactory { @Override public IElasticityController createElasticityController(IContainerService containerService, IWorkloadMonitor workloadMonitor) { - // TODO - return null; + return new ElasticityController(containerService, workloadMonitor); } }