From 90ed74743c12b9fe2a5f2e166c210a1275bc906c Mon Sep 17 00:00:00 2001 From: ppallewatta Date: Wed, 16 Feb 2022 09:34:05 +0530 Subject: [PATCH] Dynamic scheduling scenario updated to enable static clustering. --- .../fog/entities/MicroserviceFogDevice.java | 43 +++++++++---------- ...DistributedMicroservicePlacementLogic.java | 16 +++++-- .../placement/MicroservicesController.java | 28 +++++++++++- .../perfeval/MicroservicesAppSample1.java | 27 ++++++------ 4 files changed, 71 insertions(+), 43 deletions(-) diff --git a/src/org/fog/entities/MicroserviceFogDevice.java b/src/org/fog/entities/MicroserviceFogDevice.java index d732880d..dbd37884 100644 --- a/src/org/fog/entities/MicroserviceFogDevice.java +++ b/src/org/fog/entities/MicroserviceFogDevice.java @@ -115,23 +115,6 @@ public void addPlacementRequest(PlacementRequest pr) { sendNow(getId(), FogEvents.PROCESS_PRS); } - private void sendThroughFreeClusterLink(Tuple tuple, Integer clusterNodeID) { - double networkDelay = tuple.getCloudletFileSize() / getClusterLinkBandwidth(); - setClusterLinkBusy(true); - double latency = (getClusterMembersToLatencyMap()).get(clusterNodeID); - send(getId(), networkDelay, FogEvents.UPDATE_CLUSTER_TUPLE_QUEUE); - - if (tuple instanceof ManagementTuple) { - send(clusterNodeID, networkDelay + latency + ((ManagementTuple) tuple).processingDelay, FogEvents.MANAGEMENT_TUPLE_ARRIVAL, tuple); - //todo -// if (Config.ENABLE_NETWORK_USAGE_AT_PLACEMENT) -// NetworkUsageMonitor.sendingManagementTuple(latency, tuple.getCloudletFileSize()); - } else { - send(clusterNodeID, networkDelay + latency, FogEvents.TUPLE_ARRIVAL, tuple); - NetworkUsageMonitor.sendingTuple(latency, tuple.getCloudletFileSize()); - } - } - protected void setDeviceType(String deviceType) { if (deviceType.equals(MicroserviceFogDevice.CLIENT) || deviceType.equals(MicroserviceFogDevice.FCN) || deviceType.equals(MicroserviceFogDevice.FON) || deviceType.equals(MicroserviceFogDevice.CLOUD)) @@ -197,7 +180,7 @@ protected void processTupleArrival(SimEvent ev) { if (tuple.getDirection() == Tuple.UP) { int destination = controllerComponent.getDestinationDeviceId(tuple.getDestModuleName()); if (destination == -1) { - System.out.println("Service DiscoveryInfo missing. Tuple routing stopped for : " + tuple.getDestModuleName()); + System.out.println("Service DiscoveryInfo missing in device : " + getId() + "-" + getDeviceType() + ". Tuple routing stopped for : " + tuple.getDestModuleName()); return; } tuple.setDestinationDeviceId(destination); @@ -452,14 +435,14 @@ protected void processModuleArrival(SimEvent ev) { module.updateVmProcessing(CloudSim.clock(), getVmAllocationPolicy().getHost(module).getVmScheduler() .getAllocatedMipsForVm(module)); - System.out.println("Module " + module.getName() + "created on " + getName() + " under Launch module"); + System.out.println("Module " + module.getName() + " created on " + getName() + " under Launch module"); Logger.debug("Module deploy success", "Module " + module.getName() + " placement on " + getName() + " successful. vm id : " + module.getId()); } else { Logger.error("Module deploy error", "Module " + module.getName() + " placement on " + getName() + " failed"); System.out.println("Module " + module.getName() + " placement on " + getName() + " failed"); } } else { - System.out.println("Module " + module.getName() + " already deplyed on" + getName()); + System.out.println("Module " + module.getName() + " already deployed on " + getName()); } } @@ -653,7 +636,21 @@ protected void sendUpFreeLink(Tuple tuple) { } else { super.sendUpFreeLink(tuple); } + } + protected void sendToCluster(Tuple tuple, int clusterNodeID) { + if (tuple instanceof ManagementTuple) { + double networkDelay = tuple.getCloudletFileSize() / getClusterLinkBandwidth(); + setClusterLinkBusy(true); + double latency = getClusterMembersToLatencyMap().get(clusterNodeID); + send(getId(), networkDelay, FogEvents.UPDATE_CLUSTER_TUPLE_QUEUE); + send(clusterNodeID, networkDelay + latency + ((ManagementTuple) tuple).processingDelay, FogEvents.MANAGEMENT_TUPLE_ARRIVAL, tuple); + //todo +// if (Config.ENABLE_NETWORK_USAGE_AT_PLACEMENT) +// NetworkUsageMonitor.sendingManagementTuple(latency, tuple.getCloudletFileSize()); + } else { + super.sendToCluster(tuple, clusterNodeID); + } } public void updateRoutingTable(int destId, int nextId) { @@ -661,13 +658,13 @@ public void updateRoutingTable(int destId, int nextId) { } private void updateCLusterConsInRoutingTable() { - for(int deviceId:clusterMembers){ - routingTable.put(deviceId,deviceId); + for (int deviceId : clusterMembers) { + routingTable.put(deviceId, deviceId); } } public void removeMonitoredDevice(FogDevice fogDevice) { - controllerComponent.removeMonitoredDevice(fogDevice); + controllerComponent.removeMonitoredDevice(fogDevice); } public void addMonitoredDevice(FogDevice fogDevice) { diff --git a/src/org/fog/placement/DistributedMicroservicePlacementLogic.java b/src/org/fog/placement/DistributedMicroservicePlacementLogic.java index e61e4de4..6acf6dff 100644 --- a/src/org/fog/placement/DistributedMicroservicePlacementLogic.java +++ b/src/org/fog/placement/DistributedMicroservicePlacementLogic.java @@ -78,7 +78,11 @@ public void updateResources(Map> resourceAvailabili @Override public void postProcessing() { - + currentCpuLoad = 0.0; + currentModuleMap = new ArrayList<>(); + currentModuleLoadMap = new HashMap<>(); + currentModuleInstanceNum = new HashMap<>(); + prStatus = new HashMap<>(); } private PlacementLogicOutput generatePlacementMap() { @@ -87,7 +91,7 @@ private PlacementLogicOutput generatePlacementMap() { placement.put(placementRequest.getPlacementRequestId(), placementRequest.getPlacedMicroservices()); } - Map>> perDevice = new HashMap<>(); + Map>> perDevice = new HashMap<>(); // per this algo only contains microservices placed on this device Map>> serviceDiscoveryInfo = new HashMap<>(); if (placement != null) { for (int prID : placement.keySet()) { @@ -101,6 +105,8 @@ private PlacementLogicOutput generatePlacementMap() { for (String microserviceName : placement.get(prID).keySet()) { int deviceID = placement.get(prID).get(microserviceName); + if(deviceID!=fogDevice.getId()) + continue; //service discovery info propagation List clientDevices = getClientServiceNodeIds(application, microserviceName, placementRequest.getPlacedMicroservices(), placement.get(prID)); for (int clientDevice : clientDevices) { @@ -139,7 +145,11 @@ private PlacementLogicOutput generatePlacementMap() { } - return new PlacementLogicOutput(perDevice, serviceDiscoveryInfo, prStatus); + Map prStatusTemp = new HashMap<>(); + for(PlacementRequest pr:prStatus.keySet()){ + prStatusTemp.put(pr,prStatus.get(pr)); + } + return new PlacementLogicOutput(perDevice, serviceDiscoveryInfo, prStatusTemp); } public void mapModules() { diff --git a/src/org/fog/placement/MicroservicesController.java b/src/org/fog/placement/MicroservicesController.java index 3535aefa..24202f9c 100644 --- a/src/org/fog/placement/MicroservicesController.java +++ b/src/org/fog/placement/MicroservicesController.java @@ -31,6 +31,7 @@ public class MicroservicesController extends SimEntity { * @param fogDevices * @param sensors * @param applications + * Used when monitored devices of each Fog Orchestration Node(which runs the placement logic) is not known but calculated dynamically. */ public MicroservicesController(String name, List fogDevices, List sensors, List applications, List clusterLevels, Double clusterLatency, int placementLogic) { super(name); @@ -46,6 +47,18 @@ public MicroservicesController(String name, List fogDevices, List fogDevices, List sensors, List applications, List clusterLevels, Double clusterLatency, int placementLogic, Map> monitored) { super(name); this.fogDevices = fogDevices; @@ -61,6 +74,11 @@ public MicroservicesController(String name, List fogDevices, List> monitored) { connectWithLatencies(); - if (!Config.ENABLE_STATIC_CLUSTERING) { + /** supports 3 modes of clustering + * 1. No clustering : Config.ENABLE_STATIC_CLUSTERING = false, Config.ENABLE_DYNAMIC_CLUSTERING = false + * 2. Static clustering : Config.ENABLE_STATIC_CLUSTERING = true (Nodes of the same level that shares the same parent) + * 3. Dynamic clustering : Clustered based on location of the nodes and their transmission powers + **/ + if (Config.ENABLE_STATIC_CLUSTERING) { for (Integer id : clustering_levels) createClusterConnections(id, fogDevices, Config.clusteringLatency); } @@ -130,12 +153,13 @@ protected void generateRoutingTable() { for (FogDevice f : fogDevices) { ((MicroserviceFogDevice) f).addRoutingTable(routing.get(f.getId())); } - } public void startEntity() { + //In STATIC mode initial placement is carried out before simulation start if (MicroservicePlacementConfig.SIMULATION_MODE == "STATIC") initiatePlacementRequestProcessing(); + // In DYNAMIC placement initial placement can be carried out after the start of the simulation to simulate the effect of placement delays if (MicroservicePlacementConfig.SIMULATION_MODE == "DYNAMIC") initiatePlacementRequestProcessingDynamic(); diff --git a/src/org/fog/test/perfeval/MicroservicesAppSample1.java b/src/org/fog/test/perfeval/MicroservicesAppSample1.java index 36986436..51e52d42 100644 --- a/src/org/fog/test/perfeval/MicroservicesAppSample1.java +++ b/src/org/fog/test/perfeval/MicroservicesAppSample1.java @@ -41,7 +41,7 @@ public class MicroservicesAppSample1 { static int proxyServers = 2; // proxy server static Integer[] gatewayDevices = new Integer[]{3, 3}; // GW devices - static Integer[] mobilesPerL2 = new Integer[]{3, 2, 1, 2, 3, 1}; // eg : client end devices ( mobiles ) + static Integer[] mobilesPerL2 = new Integer[]{7, 4, 7, 8, 5, 4}; // eg : client end devices ( mobiles ) private static int l2Num = 0; // fog adding l1 nodes static Integer deviceNum = 0; @@ -50,7 +50,7 @@ public class MicroservicesAppSample1 { static Integer[] cpus = new Integer[]{2800, 6000}; static Integer[] ram = new Integer[]{2048, 4096}; - static double ECG_TRANSMISSION_TIME = 5; + static double ECG_TRANSMISSION_TIME = 10; //cluster link latency 2ms static Double clusterLatency = 2.0; @@ -64,10 +64,11 @@ public class MicroservicesAppSample1 { /** * Config properties - * SIMULATION_MODE -> dynamic + * SIMULATION_MODE -> DYNAMIC * PR_PROCESSING_MODE -> SEQUENTIAL * ENABLE_RESOURCE_DATA_SHARING -> true * DYNAMIC_CLUSTERING -> false + * STATIC_CLUSTERING -> true (with clustering) or false (without clustering) */ public static void main(String[] args) { @@ -99,7 +100,7 @@ public static void main(String[] args) { createFogDevices(broker.getId()); List clusterLevelIdentifier = new ArrayList<>(); - clusterLevelIdentifier.add(2); + clusterLevelIdentifier.add(2); // level of the clustered devices Map> monitored = new HashMap<>(); for (FogDevice f : fogDevices) { @@ -173,10 +174,10 @@ private static FogDevice addL2Devices(String id, int userId, int parentId, int p FogDevice dept; if (diffResource) { int pos = deviceNum % 2; - dept = createFogDevice("L2-" + id, cpus[pos], ram[pos], 1250000, 18750, 2, 0.0, 107.339, 83.4333, MicroserviceFogDevice.FON); + dept = createFogDevice("L2-" + parentId + "-" + id, cpus[pos], ram[pos], 1250000, 18750, 2, 0.0, 107.339, 83.4333, MicroserviceFogDevice.FON); deviceNum = deviceNum + 1; } else { - dept = createFogDevice("L2-" + id, 2800, 2048, 1250000, 18750, 2, 0.0, 107.339, 83.4333, MicroserviceFogDevice.FON); + dept = createFogDevice("L2-" + parentId + "_" + id, 2800, 2048, 1250000, 18750, 2, 0.0, 107.339, 83.4333, MicroserviceFogDevice.FON); } fogDevices.add(dept); dept.setParentId(parentId); @@ -194,12 +195,11 @@ private static FogDevice addMobile(String id, int userId, int parentId) { Application application = applications.get(0); String appId = application.getAppId(); - double throughput = 200; FogDevice mobile = createFogDevice("m-" + id, 1000, 2048, 18750, 250, 3, 0, 87.53, 82.44, MicroserviceFogDevice.CLIENT); mobile.setParentId(parentId); - Sensor eegSensor = new Sensor("s-" + id, "ECG", userId, appId, new DeterministicDistribution(1000 / (throughput / 9 * 10))); // inter-transmission time of EEG sensor follows a deterministic distribution + Sensor eegSensor = new Sensor("s-" + id, "ECG", userId, appId, new DeterministicDistribution(ECG_TRANSMISSION_TIME)); // inter-transmission time of EEG sensor follows a deterministic distribution eegSensor.setApp(application); sensors.add(eegSensor); @@ -339,17 +339,14 @@ private static Application createApplication(String appId, int userId) { /* * Adding modules (vertices) to the application model (directed graph) */ - application.addAppModule("client", 128, 605, 100); // adding module Client to the application model MB,MIPS,MB,kbps - application.addAppModule("ECGFeature_Extractor", 256, 630, 200); // adding module Concentration Calculator to the application model - application.addAppModule("ECG_Analyser", 512, 100, 2000); // adding module Connector to the application model + application.addAppModule("client", 128, 300, 100); // adding module Client to the application model MB,MIPS,MB,kbps + application.addAppModule("ECGFeature_Extractor", 256, 450, 200); // adding module Concentration Calculator to the application model + application.addAppModule("ECG_Analyser", 512, 1100, 2000); // adding module Connector to the application model /* * Connecting the application modules (vertices) in the application model (directed graph) with edges */ - if (ECG_TRANSMISSION_TIME == 10) - application.addAppEdge("ECG", "client", 2000, 500, "ECG", Tuple.UP, AppEdge.SENSOR); // adding edge from EEG (sensor) to Client module carrying tuples of type EEG - else - application.addAppEdge("ECG", "client", 3000, 500, "ECG", Tuple.UP, AppEdge.SENSOR); + application.addAppEdge("ECG", "client", 2000, 500, "ECG", Tuple.UP, AppEdge.SENSOR); // adding edge from EEG (sensor) to Client module carrying tuples of type EEG application.addAppEdge("client", "ECGFeature_Extractor", 3500, 500, "_SENSOR", Tuple.UP, AppEdge.MODULE); // adding edge from Client to Concentration Calculator module carrying tuples of type _SENSOR application.addAppEdge("ECGFeature_Extractor", "ECG_Analyser", 100, 10000, 1000, "ECG_FEATURES", Tuple.UP, AppEdge.MODULE); // adding periodic edge (period=1000ms) from Concentration Calculator to Connector module carrying tuples of type PLAYER_GAME_STATE application.addAppEdge("ECGFeature_Extractor", "client", 14, 500, "ECG_FEATURE_ANALYSIS", Tuple.DOWN, AppEdge.MODULE); // adding edge from Concentration Calculator to Client module carrying tuples of type CONCENTRATION