Initial commit
Signed-off-by: Tuan-Dat Tran <tuan-dat.tran@tudattr.dev>
This commit is contained in:
0
aggregator-node/src/.gitkeep
Normal file
0
aggregator-node/src/.gitkeep
Normal file
506
aggregator-node/src/server.py
Normal file
506
aggregator-node/src/server.py
Normal file
@@ -0,0 +1,506 @@
|
||||
import flwr as fl
|
||||
import tensorflow as tf
|
||||
from tensorflow import keras
|
||||
from typing import Dict, Optional, Tuple, List, Union
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
from sklearn.preprocessing import MinMaxScaler
|
||||
import sys
|
||||
import json
|
||||
from flwr.server.client_manager import SimpleClientManager
|
||||
from flwr.server.client_proxy import ClientProxy
|
||||
from abc import ABC
|
||||
from logging import INFO
|
||||
from flwr.common.logger import log
|
||||
from time import sleep
|
||||
from time import time_ns
|
||||
from flask import Flask, request
|
||||
import threading
|
||||
import os
|
||||
|
||||
|
||||
Scalar = Union[bool, bytes, float, int, str]
|
||||
Config = Dict[str, Scalar]
|
||||
param_file = None
|
||||
global best_model, list_kpi_11
|
||||
selected_clients_ids = [] # This is the list of client IDs the Agg.Node receives from the DMLO and will use for training.
|
||||
all_round_reports = {} # The dictionary containing all the round reports
|
||||
flwr_port = sys.argv[1]
|
||||
dmlo_port = sys.argv[2]
|
||||
# server_ip = ip
|
||||
l_kpi1, l_kpi2, l_kpi4, l_kpi5, list_kpi_11 = [], [], [], [], []
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
|
||||
@app.route("/config_server", methods=["POST"])
|
||||
def config_server():
|
||||
global param_file
|
||||
param_file = request.json
|
||||
param_received.set()
|
||||
# print("_____Received a config file", flush=True)
|
||||
try:
|
||||
global req_clients
|
||||
# highest_req_clients = max(req_clients, highest_req_clients)
|
||||
req_clients = request["training_clients_per_round"]
|
||||
# print(f"_____The new number of clients (req_clients) is: {req_clients} and the highest was had so far is {highest_req_clients}", flush=True)
|
||||
# if req_clients > highest_req_clients:
|
||||
# print(f"_____Rescaled the last dimension to {req_clients}", flush=True)
|
||||
# kpis = np.resize(kpis, (epochs+1, 12, req_clients))
|
||||
except:
|
||||
# print("_____Except path triggered", flush=True)
|
||||
pass
|
||||
return "Parameters received successfully.", 200
|
||||
|
||||
|
||||
@app.route(
|
||||
"/select_clients", methods=["GET"]
|
||||
) # The method that will receive the list of client IDs the server will use for training.
|
||||
def select_clients():
|
||||
global selected_clients_ids
|
||||
selected_clients_ids = request.json["eligible_clients_ids"]
|
||||
if len(selected_clients_ids) != req_clients:
|
||||
print(
|
||||
f"WARNING: {req_clients} clients are needed but only {len(selected_clients_ids)} client IDs are received. The training will wait for another list with enough eligible clients."
|
||||
)
|
||||
# A selection logic can be added here to modify the "selected_clients_id" variable. Do not forget to modify the next line (return) if this logic is added
|
||||
return request.json, 200
|
||||
|
||||
|
||||
@app.route("/check_connection", methods=["POST"])
|
||||
def check_connection():
|
||||
"""A function part of the older system to synchronize the processes.
|
||||
It does not hurt to keep for the final version to check server availability.
|
||||
"""
|
||||
return "Agg.Node is online", 200
|
||||
|
||||
|
||||
@app.route("/terminate_app", methods=["POST"])
|
||||
def terminate_app():
|
||||
try:
|
||||
save_kpis()
|
||||
except:
|
||||
print("No KPIs saved.")
|
||||
try:
|
||||
global best_model
|
||||
tf.keras.models.save_model(
|
||||
model=best_model,
|
||||
filepath="../resources/last_model.h5",
|
||||
overwrite=True,
|
||||
save_format="h5",
|
||||
)
|
||||
except:
|
||||
print("No model has been saved")
|
||||
print("Agg.Node shutting down...")
|
||||
end_thread = threading.Thread(target=__terminate__)
|
||||
end_thread.start()
|
||||
# myserver.disconnect_all_clients(timeout=None)
|
||||
return "Agg.Node successfully received shutdown command.", 200
|
||||
|
||||
|
||||
@app.route("/upload_kpi01", methods=["POST"])
|
||||
def upload_kpi01():
|
||||
"""for automatic averaging if needed again
|
||||
received01 += 1
|
||||
if received01 != 1:
|
||||
kpi01_value = (kpi01_value*((received01-1)/received01)) + (((request.json["kpi01"] - uc6_01_start)/1000000000)/received01)
|
||||
print(f"KPI01 average so far: {kpi01_value}")
|
||||
else: kpi01_value = (request.json["kpi01"] - uc6_01_start)/1000000000
|
||||
return "", 200
|
||||
"""
|
||||
l_kpi1.append((request.json["kpi01"] - uc6_01_start) / 1000000000)
|
||||
if (
|
||||
current_training_round != 1
|
||||
): # Skipping the measurement for the first round as it is inaccurate because of the starting process
|
||||
kpis[current_training_round, 1, len(l_kpi1) - 1] = (
|
||||
request.json["kpi01"] - uc6_01_start
|
||||
) / 1000000000
|
||||
return "", 200
|
||||
|
||||
|
||||
@app.route("/upload_kpi02", methods=["POST"])
|
||||
def upload_kpi02():
|
||||
tmp = (request.json["kpi02"] - (uc6_02_help_end - uc6_02_help_start)) / 1000000000
|
||||
l_kpi2.append(tmp)
|
||||
kpis[current_training_round, 2, len(l_kpi2) - 1] = tmp
|
||||
return "", 200
|
||||
|
||||
|
||||
@app.route("/upload_kpi04", methods=["POST"])
|
||||
def upload_kpi04():
|
||||
try:
|
||||
l_kpi4.append(request.json["kpi04"])
|
||||
kpis[current_training_round, 4, len(l_kpi4) - 1] = request.json["kpi04"]
|
||||
except:
|
||||
pass
|
||||
return "", 200
|
||||
|
||||
|
||||
@app.route("/upload_kpi05", methods=["POST"])
|
||||
def upload_kpi05():
|
||||
l_kpi5.append(request.json["kpi05"])
|
||||
kpis[current_training_round, 5, len(l_kpi5) - 1] = request.json["kpi05"]
|
||||
return "", 200
|
||||
|
||||
|
||||
@app.route("/get_status", methods=["GET"])
|
||||
def get_status():
|
||||
try:
|
||||
with open("Round_report.txt", "r") as file:
|
||||
report = file.read()
|
||||
return report
|
||||
except FileNotFoundError:
|
||||
return "No report available", 200
|
||||
except Exception as e:
|
||||
return f"An error occurred: {e}", 500
|
||||
|
||||
|
||||
def __terminate__():
|
||||
sleep(2)
|
||||
os._exit(0)
|
||||
|
||||
|
||||
def run_flask():
|
||||
app.run(host="0.0.0.0", port=dmlo_port)
|
||||
|
||||
|
||||
param_received = threading.Event()
|
||||
flask_thread = threading.Thread(target=run_flask)
|
||||
flask_thread.setDaemon(True)
|
||||
flask_thread.start()
|
||||
param_received.wait()
|
||||
|
||||
local_training = param_file["hyperparam_epochs"]
|
||||
epochs = param_file["num_epochs"]
|
||||
req_clients = param_file["training_clients_per_round"] # Number of clients to train
|
||||
# highest_req_clients = req_clients # the highest number of clinets a round has had so far (to resize the KPI matrix if needed)
|
||||
hyperparam_learning_rate = param_file["hyperparam_learning_rate"]
|
||||
hyperparam_batch_size = param_file["hyperparam_batch_size"]
|
||||
ml_model = param_file["ml_model"]
|
||||
kpis = np.empty((epochs + 1, 12, 8), dtype=object)
|
||||
print("Parameters loaded")
|
||||
q_alpha = 0.95
|
||||
n_features = 3
|
||||
n_future = 1
|
||||
n_past = 400
|
||||
|
||||
|
||||
def save_kpis():
|
||||
try:
|
||||
np.save("kpis.npy", kpis)
|
||||
except:
|
||||
print("No KPIs recorded so far.")
|
||||
|
||||
|
||||
def save_round_report(round_status):
|
||||
all_round_reports[f"Round {current_training_round}"] = round_status
|
||||
try:
|
||||
with open("Round_report.txt", "w") as file:
|
||||
json.dump(all_round_reports, file, indent=4)
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {e}")
|
||||
|
||||
|
||||
class QuantileMetric(tf.keras.metrics.Metric):
|
||||
def __init__(self, name="quantile_metric", **kwargs):
|
||||
super(QuantileMetric, self).__init__(name=name, **kwargs)
|
||||
self.quantile_metric = self.add_weight(
|
||||
name="quantile_metric", initializer="zeros"
|
||||
)
|
||||
self.quantile_metric_count = self.add_weight(
|
||||
name="quantile_metric_count", initializer="zeros"
|
||||
)
|
||||
|
||||
def update_state(self, y_true, y_pred, sample_weight=None):
|
||||
quantileCondition = tf.math.greater(y_true, tf.squeeze(y_pred))
|
||||
qc = tf.math.reduce_sum(tf.cast(quantileCondition, tf.float32))
|
||||
self.quantile_metric.assign_add(qc)
|
||||
self.quantile_metric_count.assign_add(
|
||||
tf.cast(tf.size(quantileCondition), tf.float32)
|
||||
)
|
||||
|
||||
def result(self):
|
||||
return self.quantile_metric / self.quantile_metric_count
|
||||
|
||||
def reset_state(self):
|
||||
self.quantile_metric.assign(0.0)
|
||||
self.quantile_metric_count.assign(0)
|
||||
|
||||
|
||||
def tilted_loss(y_true, y_pred):
|
||||
q = q_alpha
|
||||
e = y_true - y_pred
|
||||
tl = tf.stack([q * e, (q - 1) * e])
|
||||
e_max = tf.math.reduce_max(tl, axis=0, keepdims=True)
|
||||
return tf.reduce_mean(e_max)
|
||||
|
||||
|
||||
""" Choosing GPU
|
||||
gpu_id = 0 # Index of the GPU you want to use
|
||||
physical_devices = tf.config.list_physical_devices('GPU')
|
||||
print(physical_devices)
|
||||
tf.config.set_visible_devices(physical_devices[gpu_id], 'GPU')
|
||||
tf.config.experimental.set_memory_growth(physical_devices[gpu_id], True)
|
||||
"""
|
||||
|
||||
|
||||
def main() -> None:
|
||||
global best_model
|
||||
print("Inializing Model")
|
||||
best_model = tf.keras.models.load_model(ml_model, compile=False)
|
||||
|
||||
print("Model loaded")
|
||||
|
||||
opt = tf.keras.optimizers.Adam(learning_rate=hyperparam_learning_rate)
|
||||
best_model.compile(
|
||||
optimizer=opt,
|
||||
loss=[tilted_loss],
|
||||
metrics=[QuantileMetric(), keras.metrics.MeanAbsoluteError()],
|
||||
)
|
||||
|
||||
print("Model Compiled")
|
||||
|
||||
class CustomStrategy(fl.server.strategy.FedAdagrad):
|
||||
def aggregate_fit(self, rnd, results, failures):
|
||||
uc6_03_start = time_ns()
|
||||
aggregated_parameters = super().aggregate_fit(rnd, results, failures)
|
||||
uc6_03_end = time_ns()
|
||||
global kpi_uc6_03
|
||||
kpi_uc6_03 = (
|
||||
(uc6_03_end - uc6_03_start) / 1000000000
|
||||
) # Time required to aggregate all locally trained models sent by the OBUs in sec (Target <5s)
|
||||
kpis[current_training_round, 3, 0] = kpi_uc6_03
|
||||
|
||||
per_client_accuracy = []
|
||||
per_client_loss = []
|
||||
clients_order = [] # To map the accuracy and loss to a client ID (n'th ID to the n'th accuracy/loss)
|
||||
for result in results:
|
||||
client_info = result[1].metrics
|
||||
clients_order.append(client_info["id"])
|
||||
per_client_accuracy.append(client_info["accuracy"])
|
||||
per_client_loss.append(client_info["loss"])
|
||||
round_status = {
|
||||
"is_completed": "True",
|
||||
"current_accuracy": accuracy_perc,
|
||||
"current_loss": loss_perc,
|
||||
"lost_clients": len(failures),
|
||||
"clients_order": clients_order,
|
||||
"per_client_accuracy": per_client_accuracy,
|
||||
"per_client_loss": per_client_loss,
|
||||
}
|
||||
save_round_report(round_status)
|
||||
kpi_uc6_11 = round(
|
||||
100 - ((len(failures) / (len(results) + len(failures))) * 100), 1
|
||||
) # The % of successfully uploaded trained models for a certain round (Target >90%)
|
||||
kpis[current_training_round, 11, 0] = kpi_uc6_11
|
||||
list_kpi_11.append(kpi_uc6_11)
|
||||
kpi_uc6_10 = sum(list_kpi_11) / len(
|
||||
list_kpi_11
|
||||
) # The % of successfully uploaded trained models in total (Target >90%)
|
||||
kpis[current_training_round, 10, 0] = kpi_uc6_10
|
||||
|
||||
return aggregated_parameters
|
||||
|
||||
strategy = CustomStrategy(
|
||||
evaluate_fn=get_evaluate_fn(best_model),
|
||||
on_fit_config_fn=fit_config,
|
||||
initial_parameters=fl.common.ndarrays_to_parameters(best_model.get_weights()),
|
||||
)
|
||||
|
||||
class GetPropertiesIns:
|
||||
"""Properties request for a client."""
|
||||
|
||||
def __init__(self, config: Config):
|
||||
self.config = config
|
||||
|
||||
test: GetPropertiesIns = GetPropertiesIns(config={"server_round": 1})
|
||||
|
||||
class Criterion(ABC):
|
||||
"""Abstract class which allows subclasses to implement criterion
|
||||
sampling."""
|
||||
|
||||
def select(self, client: ClientProxy) -> bool:
|
||||
"""Decide whether a client should be eligible for sampling or not."""
|
||||
# if client.get_properties(ins=test, timeout = None).properties["client_id"] in eligible_clients_ids: #This line makes the selection logic on the server side but needs clients to be connected first. In the final test version, the logic is elsewhere. This function just uses the previous selection
|
||||
if (
|
||||
client.get_properties(ins=test, timeout=None).properties["client_id"]
|
||||
in selected_clients_ids
|
||||
):
|
||||
return True
|
||||
else:
|
||||
# # Code to debug clients not being selected for training despite selecting their ID (first thought: ID as str compared to ID as int will always return false)
|
||||
# print(f"Rejected: _{client.get_properties(ins=test, timeout = None).properties['client_id']}_ with the list being:")
|
||||
# for i in selected_clients_ids:
|
||||
# print(f"_{i}_")
|
||||
return False
|
||||
|
||||
c = Criterion()
|
||||
|
||||
class CustomClientManager(SimpleClientManager):
|
||||
def sample(
|
||||
self,
|
||||
num_clients: int = 2, # Number of clients currently connected to the server
|
||||
rq_clients: int = req_clients, # Number of clients to train (added)
|
||||
min_num_clients: int = 3,
|
||||
min_wait: int = req_clients, # Number of clients to have before beginning the selection (added)
|
||||
criterion: [Criterion] = c,
|
||||
) -> List[ClientProxy]:
|
||||
"""Sample a number of Flower ClientProxy instances."""
|
||||
# Block until at least num_clients are connected.
|
||||
if min_wait is None:
|
||||
min_wait = num_clients
|
||||
self.wait_for(min_wait)
|
||||
print(f"{min_wait} clients connected.")
|
||||
|
||||
connection_attempts = 40 # Helper variable to give the OBUs more time to start and connect to the agg.node
|
||||
while connection_attempts != 0:
|
||||
# Sample clients which meet the criterion
|
||||
available_cids = list(self.clients)
|
||||
if criterion is not None:
|
||||
available_cids = [
|
||||
cid
|
||||
for cid in available_cids
|
||||
if criterion.select(self.clients[cid])
|
||||
]
|
||||
|
||||
if rq_clients > len(available_cids):
|
||||
log(
|
||||
INFO,
|
||||
"Sampling failed: number of available clients"
|
||||
" (%s) is less than number of requested clients (%s).",
|
||||
len(available_cids),
|
||||
rq_clients,
|
||||
)
|
||||
connection_attempts -= 1
|
||||
print(
|
||||
f"Retrying in 5 seconds. Attempts left: {connection_attempts}"
|
||||
)
|
||||
sleep(5)
|
||||
else:
|
||||
break
|
||||
|
||||
if rq_clients > len(available_cids):
|
||||
return []
|
||||
|
||||
sampled_cids = available_cids
|
||||
return [self.clients[cid] for cid in sampled_cids]
|
||||
|
||||
fl.server.start_server(
|
||||
server_address=f"0.0.0.0:{flwr_port}",
|
||||
config=fl.server.ServerConfig(num_rounds=epochs),
|
||||
strategy=strategy,
|
||||
client_manager=CustomClientManager(),
|
||||
)
|
||||
|
||||
|
||||
def get_evaluate_fn(best_model):
|
||||
"""Return an evaluation function for server-side evaluation."""
|
||||
|
||||
# The `evaluate` function will be called after every round
|
||||
def evaluate(
|
||||
server_round: int,
|
||||
parameters: fl.common.NDArrays,
|
||||
config: Dict[str, fl.common.Scalar],
|
||||
) -> Optional[Tuple[float, Dict[str, fl.common.Scalar]]]:
|
||||
global uc6_02_help_start
|
||||
uc6_02_help_start = (
|
||||
time_ns()
|
||||
) # Time to be substracted as processing time to know the model upload time
|
||||
best_model.set_weights(parameters) # Update model with the latest parameters
|
||||
|
||||
df_final = pd.read_csv("../resources/test.csv")
|
||||
df_train = pd.read_csv("../resources/data.csv")
|
||||
# train test validation split
|
||||
test_df = df_final
|
||||
# Scaling the dataframe
|
||||
test = test_df
|
||||
scalers = {}
|
||||
|
||||
# Scaling train data
|
||||
for i in test_df.columns:
|
||||
scaler = MinMaxScaler(feature_range=(-1, 1))
|
||||
s_s = scaler.fit_transform(test[i].values.reshape(-1, 1))
|
||||
s_s = np.reshape(s_s, len(s_s))
|
||||
scalers["scaler_" + i] = scaler
|
||||
test[i] = s_s
|
||||
|
||||
def split_series(series, n_past, n_future):
|
||||
X, y = list(), list()
|
||||
# Loop to create array of every observations (past) and predictions (future) for every datapoint
|
||||
for window_start in range(len(series)):
|
||||
# Calculating boundaries for each datapoint
|
||||
past_end = window_start + n_past
|
||||
future_end = past_end + n_future
|
||||
# Loop will end if the number of datapoints is less than observations (past)
|
||||
if future_end > len(series):
|
||||
break
|
||||
past, future = (
|
||||
series[window_start:past_end, :],
|
||||
series[past_end:future_end, :],
|
||||
)
|
||||
X.append(past)
|
||||
y.append(future)
|
||||
return np.array(X), np.array(y)
|
||||
|
||||
X_test, y_test = split_series(test.values, n_past, n_future)
|
||||
X_test = X_test.reshape((X_test.shape[0], X_test.shape[1], n_features))
|
||||
y_test = y_test.reshape((y_test.shape[0], y_test.shape[1], n_features))
|
||||
|
||||
print(X_test.shape)
|
||||
print(y_test.shape)
|
||||
|
||||
y_test_sliced = y_test[:, :, 2]
|
||||
|
||||
np.save("X_test_server.npy", X_test)
|
||||
np.save("y_test_server.npy", y_test_sliced)
|
||||
|
||||
loss, metric, error = best_model.evaluate(X_test, y_test_sliced)
|
||||
pred = best_model.predict(X_test)
|
||||
pred_copies = np.repeat(pred, 3, axis=-1)
|
||||
pred_copies = np.expand_dims(pred_copies, axis=1)
|
||||
for index, i in enumerate(test_df.columns):
|
||||
scaler = scalers["scaler_" + i]
|
||||
pred_copies[:, :, index] = scaler.inverse_transform(
|
||||
pred_copies[:, :, index]
|
||||
)
|
||||
y_test[:, :, index] = scaler.inverse_transform(y_test[:, :, index])
|
||||
np.save("prediction_server.npy", pred_copies)
|
||||
np.save("test_server.npy", y_test)
|
||||
|
||||
global loss_perc, accuracy_perc
|
||||
loss_perc = loss
|
||||
accuracy_perc = error
|
||||
|
||||
save_kpis()
|
||||
return loss, {"accuracy": error}
|
||||
|
||||
return evaluate
|
||||
|
||||
|
||||
def fit_config(server_round: int):
|
||||
"""Return training configuration dict for each round.
|
||||
Keep batch size fixed at 2048, perform two rounds of training with one
|
||||
local epoch, increase to two local epochs afterwards.
|
||||
"""
|
||||
global \
|
||||
current_training_round, \
|
||||
uc6_02_help_end, \
|
||||
uc6_01_start, \
|
||||
l_kpi1, \
|
||||
l_kpi2, \
|
||||
l_kpi4, \
|
||||
l_kpi5
|
||||
current_training_round = server_round
|
||||
l_kpi1, l_kpi2, l_kpi4, l_kpi5 = [], [], [], []
|
||||
uc6_02_help_end = time_ns()
|
||||
|
||||
uc6_01_start = time_ns()
|
||||
config = {
|
||||
"batch_size": hyperparam_batch_size,
|
||||
"local_epochs": local_training,
|
||||
}
|
||||
return config
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user