Tutorial: Federated Learning with Flower and Pandas
Federated learning is an approach to machine learning that allows models to be trained in a distributed way across multiple devices or local servers, without having to gather and transfer the raw data to a central server. Instead, models are sent to local devices, where they are trained on local data, and then model updates are aggregated to form a global model. This approach has a number of advantages, including the protection of data confidentiality, reduced bandwidth requirements and the ability to process geographically distributed data.
To illustrate this, let's take the example of text prediction on smartphones. In a centralised model, all the text examples have to be sent to a central server for processing, which poses problems of confidentiality and latency. In contrast, with federated learning, text prediction models can be trained locally on each smartphone, using only the data available on that device. In this way, text predictions can be improved without compromising the confidentiality of user data.
Another example concerns autonomous vehicles. In a centralised model, the driving data from each vehicle has to be sent to a central server to train the autonomous driving model, which poses problems of confidentiality and latency, as well as requiring a lot of bandwidth. With federated learning, each vehicle can train its own autonomous driving model using only local driving data, and then model updates can be aggregated to form an improved global model, without compromising the confidentiality of driver data.
In summary, federated learning makes it possible to take advantage of distributed data while preserving data confidentiality, reducing bandwidth requirements and enabling distributed processing of geographically dispersed data.
To begin, ensure you have the necessary libraries installed. You can install them using pip:
pip install numpy pandas flwr flwr_datasets
Next, let's set up the Flower client. Create a file called client.py and add the following code:
```python
import os
import argparse
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd

import flwr as fl
from flwr_datasets import FederatedDataset

column_names = ["sepal_length", "sepal_width"]


def compute_hist(df: pd.DataFrame, col_name: str) -> np.ndarray:
    freqs, _ = np.histogram(df[col_name])
    return freqs


# Define Flower client
class FlowerClient(fl.client.NumPyClient):
    def __init__(self, X: pd.DataFrame):
        self.X = X

    def fit(
        self, parameters: List[np.ndarray], config: Dict[str, str]
    ) -> Tuple[List[np.ndarray], int, Dict]:
        hist_list = []
        # Execute query locally
        for c in self.X.columns:
            hist = compute_hist(self.X, c)
            hist_list.append(hist)
        return (
            hist_list,
            len(self.X),
            {},
        )


if __name__ == "__main__":
    # Environment variables are strings, so convert to int
    N_CLIENTS = int(os.environ.get("N_CLIENTS", 10))

    parser = argparse.ArgumentParser(description="Flower")
    parser.add_argument(
        "--partition-id",
        type=int,
        choices=range(0, N_CLIENTS),
        required=True,
        help="Specifies the partition id of artificially partitioned datasets.",
    )
    parser.add_argument(
        "--server-address",
        type=str,
        required=True,
        help="The address of the Flower server (host:port).",
    )
    args = parser.parse_args()
    partition_id = args.partition_id
    server_address = args.server_address

    # Load the partition data
    fds = FederatedDataset(dataset="hitorilabs/iris", partitioners={"train": N_CLIENTS})
    dataset = fds.load_partition(partition_id, "train").with_format("pandas")[:]

    # Use just the specified columns
    X = dataset[column_names]

    # Start Flower client
    fl.client.start_client(
        server_address=server_address,
        client=FlowerClient(X).to_client(),
    )
```
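Before wiring it into Flower, you can sanity-check compute_hist on its own. The DataFrame below is a made-up sample, not the iris partition the client actually loads; np.histogram uses 10 equal-width bins by default, so the counts always sum to the number of rows:

```python
import numpy as np
import pandas as pd

def compute_hist(df: pd.DataFrame, col_name: str) -> np.ndarray:
    # Same helper as in client.py: return the 10-bin counts for one column
    freqs, _ = np.histogram(df[col_name])
    return freqs

# Hypothetical sample data standing in for a client's partition
df = pd.DataFrame({"sepal_length": [4.9, 5.0, 5.1, 6.3, 6.4, 7.0]})
counts = compute_hist(df, "sepal_length")

print(len(counts))    # 10 bins by default
print(counts.sum())   # 6 -- every row falls into some bin
```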
Now, let's set up the Flower server. Create a file called server.py and add the following code:
```python
import os
from typing import Dict, List, Optional, Tuple, Union

import numpy as np

import flwr as fl
from flwr.common import (
    EvaluateIns,
    EvaluateRes,
    FitIns,
    FitRes,
    Parameters,
    Scalar,
    ndarrays_to_parameters,
    parameters_to_ndarrays,
)
from flwr.server.client_manager import ClientManager
from flwr.server.client_proxy import ClientProxy
from flwr.server.strategy import Strategy


class FedAnalytics(Strategy):
    def initialize_parameters(
        self, client_manager: Optional[ClientManager] = None
    ) -> Optional[Parameters]:
        return None

    def configure_fit(
        self, server_round: int, parameters: Parameters, client_manager: ClientManager
    ) -> List[Tuple[ClientProxy, FitIns]]:
        config = {}
        fit_ins = FitIns(parameters, config)
        clients = client_manager.sample(num_clients=2, min_num_clients=2)
        return [(client, fit_ins) for client in clients]

    def aggregate_fit(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, FitRes]],
        failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
    ) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:
        # Convert the results received from fit
        values_aggregated = [
            parameters_to_ndarrays(fit_res.parameters) for _, fit_res in results
        ]

        # Sum the per-client histograms element-wise
        length_agg_hist = 0
        width_agg_hist = 0
        for val in values_aggregated:
            length_agg_hist += val[0]
            width_agg_hist += val[1]

        ndarr = np.concatenate(
            (["Length:"], length_agg_hist, ["Width:"], width_agg_hist)
        )
        return ndarrays_to_parameters(ndarr), {}

    def evaluate(
        self, server_round: int, parameters: Parameters
    ) -> Optional[Tuple[float, Dict[str, Scalar]]]:
        agg_hist = [arr.item() for arr in parameters_to_ndarrays(parameters)]
        return 0, {"Aggregated histograms": agg_hist}

    def configure_evaluate(
        self, server_round: int, parameters: Parameters, client_manager: ClientManager
    ) -> List[Tuple[ClientProxy, EvaluateIns]]:
        pass

    def aggregate_evaluate(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, EvaluateRes]],
        failures: List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]],
    ) -> Tuple[Optional[float], Dict[str, Scalar]]:
        pass


# Read the number of rounds from the environment (defaults to 2)
num_rounds = int(os.getenv("NUM_ROUNDS", 2))

# Start Flower server
fl.server.start_server(
    server_address="0.0.0.0:8080",
    config=fl.server.ServerConfig(num_rounds=num_rounds),
    strategy=FedAnalytics(),
)
```
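The core of aggregate_fit above is just an element-wise sum of the per-client histograms. A standalone sketch with hypothetical four-bin counts (the real clients return ten bins per column):

```python
import numpy as np

# Hypothetical histograms returned by two clients, as [length, width] pairs
client_results = [
    [np.array([1, 2, 0, 3]), np.array([0, 1, 1, 2])],  # client 1
    [np.array([2, 0, 1, 1]), np.array([1, 1, 0, 2])],  # client 2
]

# Element-wise sum across clients, one aggregate per column
length_agg = sum(res[0] for res in client_results)
width_agg = sum(res[1] for res in client_results)

print(length_agg)  # [3 2 1 4]
print(width_agg)   # [1 2 1 4]
```

Because only bin counts ever leave a client, the server reconstructs the global distribution without seeing any raw measurement.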
Explanation:
- client.py: sets up the Flower client. It imports the necessary libraries, defines a custom client class, and implements the fit method to compute histograms locally. The client connects to the Flower server specified via command-line arguments (--partition-id for the dataset partition ID and --server-address for the server address).
- server.py: sets up the Flower server. It defines a custom strategy (FedAnalytics) which configures the fit process, aggregates fit results, and evaluates the aggregated parameters. The server starts on address 0.0.0.0:8080 with a specified number of rounds.
These files provide a basic setup for running federated learning experiments using Flower.
With both client and server ready, we can now run everything and see federated learning in action. FL systems usually have a server and multiple clients. We therefore have to start the server first:
python server.py
Once the server is running we can start the clients in different terminals. Open a new terminal and start the first client:
python client.py --partition-id 0 --server-address 127.0.0.1:8080
Open another terminal and start the second client:
python client.py --partition-id 1 --server-address 127.0.0.1:8080
Start by cloning the example project. We prepared a single-line command that you can copy into your shell, which will check out the example for you:
$ git clone https://github.com/CCBDA-UPC/Research-Projects-2024.git
After cloning the repository, you can run the containers using the following command:
$ cd tutorial-1
$ docker-compose up --build
Note
Docker Compose is a tool for defining and running multi-container Docker applications. With Compose, you use a YAML file to configure your application's services. Then, with a single command, you create and start all the services from your configuration. More information: https://docs.docker.com/compose/
The docker-compose.yml file defines the services for the server and clients.
- The server service is built from the server/ directory (it uses the script from the previous section):

```yaml
server:
  build: server/.
  container_name: server
  environment:
    - NUM_ROUNDS=3
  networks:
    - federated_learning
```
- The client services are built from the client/ directory. The server is configured to wait for 3 rounds before generating the global model, and each client service is given a unique partition ID and the server address:

```yaml
client-1:
  build: client/.
  restart: on-failure
  environment:
    - SERVER_ADDRESS=server:8080  # Server address
    - PARTITION_ID=1              # Partition ID
    - NUMBER_OF_CLIENTS=2         # Number of clients
  networks:
    - federated_learning
```
- The federated_learning network is created to allow communication between the server and clients:

```yaml
networks:
  federated_learning:
    driver: bridge
```
- The docker compose up --build command builds the Docker images and starts the containers.
- You can read the logs of the server and clients using the following command:
$ docker compose logs -f
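For reference, the fragments above fit together into a single docker-compose.yml roughly like this; the exact file in the repository may differ (the top-level services: layout here is an assumption, and additional client-N services would follow the same pattern as client-1):

```yaml
services:
  server:
    build: server/.
    container_name: server
    environment:
      - NUM_ROUNDS=3
    networks:
      - federated_learning

  client-1:
    build: client/.
    restart: on-failure
    environment:
      - SERVER_ADDRESS=server:8080
      - PARTITION_ID=1
      - NUMBER_OF_CLIENTS=2
    networks:
      - federated_learning

networks:
  federated_learning:
    driver: bridge
```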
Important
If you use docker-compose v1, use the following commands:
$ docker-compose up --build
$ docker-compose logs -f
If you use docker-compose v2, use the following commands:
$ docker compose up --build
$ docker compose logs -f
In real-world scenarios, devices often operate independently. In federated learning, the server must be operational before any client attempts to update the global model. This tutorial demonstrates simulating a dynamic system where the server runs continuously while clients randomly update the model when new data is received. The server waits for 8 updates before generating the global model.
- Create a Docker network: this step sets up a network for communication between the server and clients.
docker network create federated_learning
- Build the server Docker image: build the Docker image for the server.
docker build -t federated_learning_server:latest server/.
- Run the server Docker container: launch the server container, specifying the number of rounds (updates) to wait for before generating the global model.
```shell
docker run --name server \
  --env NUM_ROUNDS=8 \
  --network federated_learning \
  federated_learning_server
echo "Server started."
```
- Create a bash script run_random_clients.sh:

```bash
#!/bin/bash

docker build -t federated_learning_client:latest client/.

start_client() {
    client_id=$1
    partition_id=$2
    client_name="client-${client_id}"
    docker run -d --name "$client_name" \
        --env SERVER_ADDRESS=server:8080 \
        --env PARTITION_ID="$partition_id" \
        --env NUMBER_OF_CLIENTS=2 \
        --network federated_learning \
        --restart on-failure \
        federated_learning_client
    echo "Started $client_name"
}

stop_client() {
    client_id=$1
    client_name="client-${client_id}"
    docker stop "$client_name" > /dev/null
    docker rm "$client_name" > /dev/null
    echo "Stopped $client_name"
}

create_random_clients() {
    for ((i=0; i<10; i++)); do
        client_id=$((RANDOM % 1000))
        start_client "$client_id" "$i"
        sleep 1
    done
}

create_random_clients
```
- Execute the script in a new terminal:
./run_random_clients.sh
This script randomly creates and starts multiple client instances, each attempting to update the model when new data is available. Adjust the number of clients and sleep duration according to your requirements.