Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FedProx implementation #837

Draft
wants to merge 16 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
"author": "",
"license": "ISC",
"dependencies": {
"server": "*",
"@epfml/discojs-node": "*",
"csv-parse": "^5.6.0",
"immutable": "4",
"server": "*",
"tslib": "2"
},
"devDependencies": {
Expand Down
10 changes: 6 additions & 4 deletions cli/src/args.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ const argExample = 'e.g. npm start -- -u 2 -e 3 # runs 2 users for 3 epochs'

const unsafeArgs = parse<BenchmarkUnsafeArguments>(
{
task: { type: String, alias: 't', description: 'Task: titanic, simple_face, cifar10 or lus_covid', defaultValue: 'simple_face' },
numberOfUsers: { type: Number, alias: 'u', description: 'Number of users', defaultValue: 1 },
task: { type: String, alias: 't', description: 'Task: tinder_dog, titanic, simple_face, cifar10, llm_task or lus_covid', defaultValue: 'tinder_dog' },
numberOfUsers: { type: Number, alias: 'u', description: 'Number of users', defaultValue: 2 },
epochs: { type: Number, alias: 'e', description: 'Number of epochs', defaultValue: 10 },
roundDuration: { type: Number, alias: 'r', description: 'Round duration (in epochs)', defaultValue: 2 },
batchSize: { type: Number, alias: 'b', description: 'Training batch size', defaultValue: 10 },
Expand All @@ -37,11 +37,13 @@ const unsafeArgs = parse<BenchmarkUnsafeArguments>(
)

const supportedTasks = Map(
Set.of<TaskProvider<"image"> | TaskProvider<"tabular">>(
Set.of<TaskProvider<"image"> | TaskProvider<"tabular"> | TaskProvider<"text">>(
defaultTasks.cifar10,
defaultTasks.lusCovid,
defaultTasks.simpleFace,
defaultTasks.titanic,
defaultTasks.tinderDog,
defaultTasks.wikitext,
).map((t) => [t.getTask().id, t]),
);

Expand Down Expand Up @@ -69,4 +71,4 @@ export const args: BenchmarkArguments = {
},
getModel: () => provider.getModel(),
},
};
};
38 changes: 20 additions & 18 deletions cli/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ import type {
TaskProvider,
} from "@epfml/discojs";
import { Disco, aggregator as aggregators, client as clients } from '@epfml/discojs'
import { Server } from 'server'

import { getTaskData } from './data.js'
import { getTaskData, loadTinderDogData } from './data.js'
import { args } from './args.js'

// Array.fromAsync not yet widely used (2024)
Expand Down Expand Up @@ -49,23 +48,26 @@ async function main<D extends DataType>(
console.log(`Started ${task.trainingInformation.scheme} training of ${task.id}`)
console.log({ args })

const [server, url] = await new Server().serve(undefined, provider)
const url = new URL('http://localhost:8080/')

const data = await getTaskData(task)

const logs = await Promise.all(
Range(0, numberOfUsers).map(async (_) => await runUser(task, url, data)).toArray()
)

if (args.save) {
const fileName = `${task.id}_${numberOfUsers}users.csv`;
await fs.writeFile(fileName, JSON.stringify(logs, null, 2));
if (task.id === 'tinder_dog') {
const dataSplits = await Promise.all(
Range(0, numberOfUsers).map(async i => loadTinderDogData(i))
)
const _ = await Promise.all(
dataSplits.map(async data => runUser(task, url, data as Dataset<DataFormat.Raw[D]>))
)
} else {
const data = await getTaskData(task)

const logs = await Promise.all(
Range(0, numberOfUsers).map(async (_) => await runUser(task, url, data)).toArray()
)
if (args.save) {
const fileName = `${task.id}_${numberOfUsers}users.csv`;
await fs.writeFile(fileName, JSON.stringify(logs, null, 2));
}
}
console.log('Shutting down the server...')
await new Promise((resolve, reject) => {
server.once('close', resolve)
server.close(reject)
})
}

main(args.provider, args.numberOfUsers).catch(console.error)
main(args.provider, args.numberOfUsers).catch(console.error)
60 changes: 55 additions & 5 deletions cli/src/data.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import path from "node:path";

import fs from 'node:fs/promises'
import { parse } from 'csv-parse';
import { Dataset } from "@epfml/discojs";
import type {
Dataset,
DataFormat,
DataType,
Image,
Task,
Text,
} from "@epfml/discojs";
import { loadCSV, loadImagesInDir } from "@epfml/discojs-node";
import { Repeat } from "immutable";
import { loadCSV, loadImage, loadImagesInDir, loadText } from "@epfml/discojs-node";
import { Repeat, Map } from "immutable";

async function loadSimpleFaceData(): Promise<Dataset<DataFormat.Raw["image"]>> {
const folder = path.join("..", "datasets", "simple_face");
Expand Down Expand Up @@ -36,6 +38,52 @@ async function loadLusCovidData(): Promise<Dataset<DataFormat.Raw["image"]>> {
return positive.chain(negative);
}

/**
 * Loads the WikiText training split as a raw text dataset.
 *
 * Reads `../datasets/wikitext/wiki.train.tokens` lazily via `loadText`;
 * no file I/O happens until the dataset is iterated.
 */
async function loadWikitextData(): Promise<Dataset<DataFormat.Raw["text"]>> {
  const folder = path.join("..", "datasets", "wikitext");
  // `loadText` already yields the dataset; returning it from an async
  // function wraps it in a Promise — no explicit Promise.resolve needed.
  return loadText(path.join(folder, "wiki.train.tokens"));
}

/**
 * Loads one client split of the TinderDog dataset.
 *
 * Expects `../datasets/tinder_dog/<split + 1>/` to contain a `labels.csv`
 * (columns: filename without extension, numeric label) alongside the image
 * files themselves (.png/.jpg/.jpeg).
 *
 * @param split zero-based split index; folder names on disk are one-based
 * @returns dataset of (image, label-as-string) pairs
 * @throws if an image on disk has no matching row in `labels.csv`
 */
export async function loadTinderDogData(split: number): Promise<Dataset<DataFormat.Raw["image"]>> {
  const folder = path.join("..", "datasets", "tinder_dog", `${split + 1}`);
  console.log(`Reading data split ${folder}`)
  const csvPath = path.join(folder, 'labels.csv')

  const headers = ['filename', 'label'];
  const fileContent = await fs.readFile(csvPath, { encoding: 'utf-8' });
  const csvContent = await new Promise<{ filename: string, label: number }[]>((resolve, reject) => {
    parse(fileContent, {
      delimiter: ',',
      columns: headers,
    }, (error, result: { filename: string, label: number }[]) => {
      if (error) {
        console.error(error);
        reject(error)
        // without this return we would fall through and also resolve
        // with an undefined result after rejecting
        return
      }
      resolve(result)
    });
  })
  const imgToLabel = Map(csvContent.map(entry =>
    [entry.filename, entry.label] as const)
  );
  const fileExtensions = [".png", ".jpg", ".jpeg"];
  const imagesFile = (await fs.readdir(folder)).filter(
    file => fileExtensions.some(ext => file.endsWith(ext))
  )
  const labels = imagesFile.map(img => {
    // Strip the extension by locating the last dot: a fixed slice(0, -4)
    // would mangle ".jpeg" names (5-char extension) and never match the CSV.
    const stem = img.slice(0, img.lastIndexOf("."))
    const label = imgToLabel.get(stem)
    if (label === undefined) throw Error(`Image ${img} not found in CSV`)
    return label.toString()
  })
  const imgPaths = imagesFile.map(imgName => path.join(folder, imgName))
  console.log(`Found ${imgPaths.length} images in split ${split}`)
  const images = await Promise.all(imgPaths.map(imgPath => loadImage(imgPath)))

  return new Dataset(images).zip(labels)
}


export async function getTaskData<D extends DataType>(
task: Task<D>,
): Promise<Dataset<DataFormat.Raw[D]>> {
Expand All @@ -52,7 +100,9 @@ export async function getTaskData<D extends DataType>(
).zip(Repeat("cat")) as Dataset<DataFormat.Raw[D]>;
case "lus_covid":
return (await loadLusCovidData()) as Dataset<DataFormat.Raw[D]>;
case "llm_task":
return (await loadWikitextData()) as Dataset<DataFormat.Raw[D]>;
default:
throw new Error(`Data loader for ${task.id} not implemented.`);
}
}
}
3 changes: 3 additions & 0 deletions datasets/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@

# LUS Covid
/lus_covid/

# GDHF demo
/tinder_dog/
3 changes: 1 addition & 2 deletions discojs/src/client/federated/federated_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import { Client, shortenId } from "../client.js";
import { type, type ClientConnected } from "../messages.js";
import {
waitMessage,
waitMessageWithTimeout,
WebSocketServer,
} from "../event_connection.js";
import * as messages from "./messages.js";
Expand Down Expand Up @@ -75,7 +74,7 @@ export class FederatedClient extends Client {
const {
id, waitForMoreParticipants, payload,
round, nbOfParticipants
} = await waitMessageWithTimeout(this.server, type.NewFederatedNodeInfo);
} = await waitMessage(this.server, type.NewFederatedNodeInfo);

// This should come right after receiving the message to make sure
// we don't miss a subsequent message from the server
Expand Down
1 change: 1 addition & 0 deletions discojs/src/default_tasks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ export { mnist } from './mnist.js'
export { simpleFace } from './simple_face.js'
export { titanic } from './titanic.js'
export { wikitext } from './wikitext.js'
export { tinderDog } from './tinder_dog.js'
17 changes: 9 additions & 8 deletions discojs/src/default_tasks/lus_covid.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ export const lusCovid: TaskProvider<'image'> = {

// Model architecture from tensorflow.js docs:
// https://codelabs.developers.google.com/codelabs/tfjs-training-classfication/index.html#4
async getModel (): Promise<Model<'image'>> {
async getModel(): Promise<Model<'image'>> {
const seed = 42
const imageHeight = 100
const imageWidth = 100
const imageChannels = 3
Expand All @@ -55,7 +56,7 @@ export const lusCovid: TaskProvider<'image'> = {
filters: 8,
strides: 1,
activation: 'relu',
kernelInitializer: 'varianceScaling'
kernelInitializer: tf.initializers.heNormal({ seed })
}))

// The MaxPooling layer acts as a sort of downsampling using max values
Expand All @@ -69,7 +70,7 @@ export const lusCovid: TaskProvider<'image'> = {
filters: 16,
strides: 1,
activation: 'relu',
kernelInitializer: 'varianceScaling'
kernelInitializer: tf.initializers.heNormal({ seed })
}))
model.add(tf.layers.maxPooling2d({ poolSize: [2, 2], strides: [2, 2] }))

Expand All @@ -82,16 +83,16 @@ export const lusCovid: TaskProvider<'image'> = {
// output class.
model.add(tf.layers.dense({
units: numOutputClasses,
kernelInitializer: 'varianceScaling',
activation: 'softmax'
activation: 'softmax',
kernelInitializer: tf.initializers.heNormal({ seed })
}))

model.compile({
optimizer: 'sgd',
optimizer: tf.train.sgd(0.001),
loss: 'binaryCrossentropy',
metrics: ['accuracy']
})

return Promise.resolve(new models.TFJS('image', model))
}
}
}
84 changes: 84 additions & 0 deletions discojs/src/default_tasks/tinder_dog.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import * as tf from '@tensorflow/tfjs'

import type { Model, Task, TaskProvider } from '../index.js'
import { models } from '../index.js'

/**
 * TinderDog task (GDHF 2024 demo): binary image classification of dog
 * "cuteness", trained federated with mean aggregation.
 */
export const tinderDog: TaskProvider<'image'> = {
  getTask (): Task<'image'> {
    return {
      id: 'tinder_dog',
      displayInformation: {
        taskTitle: 'GDHF 2024 | TinderDog',
        summary: {
          preview: 'Which dog is the cutest....or not?',
          overview: "Binary classification model for dog cuteness."
        },
        model: 'The model is a simple Convolutional Neural Network composed of two convolutional layers with ReLU activations and max pooling layers, followed by a fully connected output layer. The data preprocessing reshapes images into 64x64 pixels and normalizes values between 0 and 1',
        dataFormatInformation: 'Accepted image formats are .png .jpg and .jpeg.',
        dataExampleText: '',
        dataExampleImage: 'https://storage.googleapis.com/deai-313515.appspot.com/tinder_dog_preview.png',
        sampleDatasetLink: 'https://storage.googleapis.com/deai-313515.appspot.com/tinder_dog.zip',
        sampleDatasetInstructions: 'Opening the link should start downloading a zip file which you can unzip. To connect the data, pick one of the data splits (the folder 0 for example) and use the CSV option below to select the file named "labels.csv". You can now connect the images located in the same folder.'
      },
      trainingInformation: {
        epochs: 10,
        roundDuration: 2,
        validationSplit: 0,
        batchSize: 10,
        dataType: 'image',
        IMAGE_H: 64,
        IMAGE_W: 64,
        LABEL_LIST: ['Cute dogs', 'Less cute dogs'],
        scheme: 'federated',
        aggregationStrategy: 'mean',
        minNbOfParticipants: 3,
        tensorBackend: 'tfjs'
      }
    }
  },

  /**
   * Builds the CNN: two conv+ReLU layers, max pooling, dropout, a 32-unit
   * dense layer and a 2-way softmax head, compiled with Adam(5e-4).
   * Weight initializers are seeded so every client starts identically.
   */
  async getModel(): Promise<Model<'image'>> {
    const seed = 42
    // Build the task once instead of calling this.getTask() per field.
    const { IMAGE_H: imageHeight, IMAGE_W: imageWidth } =
      this.getTask().trainingInformation
    const imageChannels = 3

    const model = tf.sequential()

    model.add(
      tf.layers.conv2d({
        inputShape: [imageHeight, imageWidth, imageChannels],
        kernelSize: 5,
        filters: 8,
        activation: 'relu',
        kernelInitializer: tf.initializers.heNormal({ seed })
      })
    )
    model.add(tf.layers.conv2d({
      kernelSize: 5, filters: 16, activation: 'relu',
      kernelInitializer: tf.initializers.heNormal({ seed })
    }))
    model.add(tf.layers.maxPooling2d({ poolSize: 2, strides: 2 }))
    model.add(tf.layers.dropout({ rate: 0.25, seed }))

    model.add(tf.layers.flatten())
    model.add(tf.layers.dense({
      units: 32, activation: 'relu',
      kernelInitializer: tf.initializers.heNormal({ seed })
    }))
    model.add(tf.layers.dropout({ rate: 0.25, seed }))
    model.add(tf.layers.dense({
      units: 2, activation: 'softmax',
      kernelInitializer: tf.initializers.heNormal({ seed })
    }))

    model.compile({
      optimizer: tf.train.adam(0.0005),
      loss: 'categoricalCrossentropy',
      metrics: ['accuracy']
    })

    return Promise.resolve(new models.TFJS('image', model))
  }
}
28 changes: 4 additions & 24 deletions discojs/src/models/gpt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,30 +80,10 @@ export class GPT extends Model<"text"> {
/**
 * Trains the model on a single batch and reports its metrics.
 *
 * NOTE(review): the source here is a rendered diff that interleaves the
 * removed fitDataset-based body with the added trainOnBatch-based body;
 * this reconstruction keeps only the added (post-change) implementation.
 *
 * @param batch one batch of model-encoded text samples
 * @returns per-batch logs derived from the trainOnBatch metrics
 */
async #runBatch(
  batch: Batched<DataFormat.ModelEncoded["text"]>,
): Promise<BatchLogs> {
  const { xs, ys } = this.#batchToTF(batch);
  const logs = await this.model.trainOnBatch(xs, ys);
  // trainOnBatch does not own the input tensors; free them explicitly
  tf.dispose([xs, ys])
  return this.getBatchLogs(logs)
}

async #evaluate(
Expand Down
Loading