From 50235b051f8b4c6eb8d5aca2d8dd7717ec1e16af Mon Sep 17 00:00:00 2001 From: TM-Squared Date: Mon, 3 Feb 2025 18:07:20 +0100 Subject: [PATCH] Wordcount fonctionnel - Timeout to permit to worker to join cluster - wordcount - sort --- .gitignore | 47 ++++ .idea/.gitignore | 8 + .idea/codeStyles/Project.xml | 7 + .idea/codeStyles/codeStyleConfig.xml | 5 + .idea/misc.xml | 6 + .idea/sbt.xml | 19 ++ .idea/scala_compiler.xml | 7 + .idea/scala_settings.xml | 7 + .idea/vcs.xml | 6 + build.sbt | 15 ++ project/build.properties | 1 + src/main/resources/application.conf | 38 ++++ .../scala/cm/gintou/CborSerializable.scala | 3 + src/main/scala/cm/gintou/GuardianActor.scala | 42 ++++ src/main/scala/cm/gintou/MasterActor.scala | 211 ++++++++++++++++++ src/main/scala/cm/gintou/WordCountApp.scala | 50 +++++ src/main/scala/cm/gintou/WorkerActor.scala | 37 +++ 17 files changed, 509 insertions(+) create mode 100644 .gitignore create mode 100644 .idea/.gitignore create mode 100644 .idea/codeStyles/Project.xml create mode 100644 .idea/codeStyles/codeStyleConfig.xml create mode 100644 .idea/misc.xml create mode 100644 .idea/sbt.xml create mode 100644 .idea/scala_compiler.xml create mode 100644 .idea/scala_settings.xml create mode 100644 .idea/vcs.xml create mode 100644 build.sbt create mode 100644 project/build.properties create mode 100644 src/main/resources/application.conf create mode 100644 src/main/scala/cm/gintou/CborSerializable.scala create mode 100644 src/main/scala/cm/gintou/GuardianActor.scala create mode 100644 src/main/scala/cm/gintou/MasterActor.scala create mode 100644 src/main/scala/cm/gintou/WordCountApp.scala create mode 100644 src/main/scala/cm/gintou/WorkerActor.scala diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bf3788d --- /dev/null +++ b/.gitignore @@ -0,0 +1,47 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr +out/ +!**/src/main/**/out/ +!**/src/test/**/out/ + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache +bin/ +!**/src/main/**/bin/ +!**/src/test/**/bin/ + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store + +### Scala ### +.bsp/ \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml new file mode 100644 index 0000000..919ce1f --- /dev/null +++ b/.idea/codeStyles/Project.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml new file mode 100644 index 0000000..a55e7a1 --- /dev/null +++ b/.idea/codeStyles/codeStyleConfig.xml @@ -0,0 +1,5 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..e0844bc --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/sbt.xml b/.idea/sbt.xml new file mode 100644 index 0000000..4080c49 --- /dev/null +++ b/.idea/sbt.xml @@ -0,0 +1,19 @@ + + + + + + \ No newline at end of file diff --git a/.idea/scala_compiler.xml b/.idea/scala_compiler.xml new file mode 100644 index 0000000..21d1271 --- /dev/null +++ b/.idea/scala_compiler.xml @@ -0,0 +1,7 @@ + + + + + \ No newline at end of file diff --git a/.idea/scala_settings.xml b/.idea/scala_settings.xml new file mode 100644 index 0000000..1b970c7 --- /dev/null +++ b/.idea/scala_settings.xml @@ -0,0 +1,7 @@ + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..8e00222 --- /dev/null +++ b/build.sbt @@ -0,0 +1,15 @@ +ThisBuild / version := "0.1.0-SNAPSHOT" + +ThisBuild / scalaVersion := "3.3.5" + +lazy val root = (project in file(".")) + .settings( + name := "Wordcount", + version := "0.1", + libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-actor-typed" % "2.8.8", + "com.typesafe.akka" %% "akka-cluster-typed" % "2.8.8", + "com.typesafe.akka" %% "akka-serialization-jackson" % "2.8.8", + "ch.qos.logback" % "logback-classic" % "1.5.16", + ) + ) diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..fe69360 --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version = 1.10.7 diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf new file mode 100644 index 0000000..1e1d44f --- /dev/null +++ b/src/main/resources/application.conf @@ -0,0 +1,38 @@ +akka { + actor { + provider = "cluster" + + # Désactive la serialization Java + allow-java-serialization = off + warn-about-java-serialization-usage = on + + # Binde toutes les classes qui héritent de CborSerializable à Jackson CBOR + serialization-bindings { + "cm.gintou.CborSerializable" = jackson-cbor + } + } + + + remote { + artery { + canonical { + hostname = "127.0.0.1" + port = 0 + } + transport = tcp + } + } + + cluster { + # On laisse seed-nodes vide ici. + # On le renseignera via la ligne de commande ou on lancera manuellement des noeuds seed. + seed-nodes = [ + "akka://WordCountCluster@127.0.0.1:2551" + ] + } + + # (Optionnel) Ajustez la compression, etc. + serialization.jackson.cbor { + compression = off + } +} diff --git a/src/main/scala/cm/gintou/CborSerializable.scala b/src/main/scala/cm/gintou/CborSerializable.scala new file mode 100644 index 0000000..df83011 --- /dev/null +++ b/src/main/scala/cm/gintou/CborSerializable.scala @@ -0,0 +1,3 @@ +package cm.gintou + +trait CborSerializable diff --git a/src/main/scala/cm/gintou/GuardianActor.scala b/src/main/scala/cm/gintou/GuardianActor.scala new file mode 100644 index 0000000..4eeeb15 --- /dev/null +++ b/src/main/scala/cm/gintou/GuardianActor.scala @@ -0,0 +1,42 @@ +package cm.gintou + +import akka.actor.typed._ +import akka.actor.typed.scaladsl.Behaviors + +object GuardianActor { + + sealed trait Command + case object Start extends Command + + def apply(role: String, filePathOpt: Option[String]): Behavior[Command] = + Behaviors.setup { context => + // On démarre soit un MasterActor soit un WorkerActor + role match { + case "master" => + filePathOpt match { + case Some(filePath) => + // Crée l'acteur maître + context.log.info(s"[Guardian] Rôle = master, lancement MasterActor avec fichier: $filePath") + val masterRef = context.spawn(MasterActor(filePath), "MasterActor") + // On envoie un message de démarrage + masterRef ! MasterActor.StartProcessing + case None => + context.log.error("Aucun chemin de fichier fourni pour le rôle master !") + Behaviors.stopped + } + + case "worker" => + context.log.info("[Guardian] Rôle = worker, lancement WorkerActor") + context.spawn(WorkerActor(), "WorkerActor") + + case other => + context.log.error(s"Rôle inconnu : $other") + Behaviors.stopped + } + + Behaviors.receiveMessage { + case Start => + Behaviors.same + } + } +} \ No newline at end of file diff --git a/src/main/scala/cm/gintou/MasterActor.scala b/src/main/scala/cm/gintou/MasterActor.scala new file mode 100644 index 0000000..7737ff5 --- /dev/null +++ b/src/main/scala/cm/gintou/MasterActor.scala @@ -0,0 +1,211 @@ +package cm.gintou + +import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler} +import akka.actor.typed.{ActorRef, Behavior} +import akka.cluster.typed.Cluster +import akka.cluster.MemberStatus +import akka.cluster.typed.Subscribe +import akka.cluster.ClusterEvent.MemberUp +import akka.cluster.ClusterEvent.MemberEvent +import akka.cluster.ClusterEvent.MemberRemoved +import akka.cluster.ClusterEvent.MemberJoined +import akka.actor.typed.receptionist.{Receptionist, ServiceKey} +import scala.io.Source +import scala.collection.mutable +import scala.concurrent.duration._ +import java.io.{File, PrintWriter} + +object MasterActor { + + // --- Messages --- + sealed trait Command extends CborSerializable + + case object StartProcessing extends Command + + // Timeout interne pour attendre l’arrivée des workers + private case object WaitForWorkersTimeout extends Command + + case class PartialCount(wordsCount: Map[String, Int]) extends Command + + private case class ListingResponse(listing: Receptionist.Listing) extends Command + private case class ClusterEvent(event: MemberEvent) extends Command + + // --- État interne --- + final case class State( + filePath: String, + lines: List[String], + aggregatedCounts: mutable.Map[String, Int], + knownWorkers: Set[ActorRef[WorkerActor.Command]] + ) + + // --- Comportement principal --- + def apply( + filePath: String, + waitForWorkersDuration: FiniteDuration = 30.seconds + ): Behavior[Command] = Behaviors.withTimers { timers => + Behaviors.setup { context => + // S'abonner aux events du cluster (optionnel, juste pour log) + val cluster = Cluster(context.system) + val clusterSubscriptionAdapter: ActorRef[MemberEvent] = + context.messageAdapter(e => ClusterEvent(e)) + cluster.subscriptions ! Subscribe(clusterSubscriptionAdapter, classOf[MemberEvent]) + + // S'abonner au Receptionist pour découvrir les Workers + val listingAdapter: ActorRef[Receptionist.Listing] = + context.messageAdapter(listing => ListingResponse(listing)) + context.system.receptionist ! Receptionist.Subscribe(WorkerActor.WorkerServiceKey, listingAdapter) + + // État initial + val initialState = State( + filePath = filePath, + lines = Nil, + aggregatedCounts = mutable.Map.empty[String, Int], + knownWorkers = Set.empty + ) + + Behaviors.receiveMessage { + case StartProcessing => + // 1) Lire le fichier + val lines = Source.fromFile(filePath).getLines().toList + context.log.info(s"[MasterActor] StartProcessing -> lecture du fichier $filePath, ${lines.size} lignes") + + val newState = initialState.copy(lines = lines) + + // 2) Démarrer un timer de X secondes pour attendre l’arrivée de workers + timers.startSingleTimer(WaitForWorkersTimeout, WaitForWorkersTimeout, waitForWorkersDuration) + context.log.info( + s"[MasterActor] Timer démarré pour $waitForWorkersDuration. " + + s"En attendant, on espère découvrir des workers..." + ) + + masterBehavior(newState)(timers, context) + + case other => + context.log.info(s"[MasterActor] Reçu un message inattendu avant StartProcessing: $other") + Behaviors.same + } + } + } + + // --- Comportement après StartProcessing --- + private def masterBehavior( + state: State + )( + timers: TimerScheduler[Command], + ctx: ActorContext[Command] + ): Behavior[Command] = Behaviors.receiveMessage { + // Mise à jour de la liste knownWorkers (Receptionist) + case ListingResponse(WorkerActor.WorkerServiceKey.Listing(workers)) => + val newState = state.copy(knownWorkers = workers) + ctx.log.info(s"[MasterActor] Mise à jour du listing : ${workers.size} worker(s) découvert(s).") + masterBehavior(newState)(timers, ctx) + case ListingResponse(otherListing) => + // Tous les autres listings (autres clés ou cas non prévus) + ctx.log.debug(s"[MasterActor] Ignored listing: $otherListing") + Behaviors.same + + // Réception d'un comptage partiel + case partial: PartialCount => + val newState = aggregateCounts(state, partial) + masterBehavior(newState)(timers, ctx) + + // Le timer a expiré + case WaitForWorkersTimeout => + if (state.knownWorkers.isEmpty) { + // Pas de workers -> Stratégie 2 = faire le travail en local + ctx.log.warn(s"[MasterActor] Aucun worker distant détecté après le timeout. " + + "On va créer des workers locaux pour faire le traitement.") + val localWorkerRefs = spawnLocalWorkers(2, ctx) // Ex : 2 workers locaux + val newState = state.copy(knownWorkers = localWorkerRefs) + distributeWork(newState, ctx) + masterBehavior(newState)(timers, ctx) + } else { + ctx.log.info(s"[MasterActor] Timeout expiré, on a déjà ${state.knownWorkers.size} worker(s) -> distribution.") + distributeWork(state, ctx) + Behaviors.same + } + + // On reçoit à nouveau StartProcessing ? (Peu probable) + case StartProcessing => + ctx.log.warn("[MasterActor] StartProcessing reçu mais on a déjà commencé.") + Behaviors.same + + // Événements cluster + case ClusterEvent(event) => + event match { + case MemberUp(member) => + ctx.log.info(s"[MasterActor] Membre UP : ${member.address} (roles=${member.roles})") + case MemberRemoved(member, _) => + ctx.log.info(s"[MasterActor] Membre removed : ${member.address} (roles=${member.roles})") + case _ => + ctx.log.info(s"[MasterActor] Autre event cluster : $event") + } + Behaviors.same + } + + // ---------------------------------------------------------------- + // Méthodes utilitaires + // ---------------------------------------------------------------- + + private def spawnLocalWorkers( + n: Int, + context: ActorContext[Command] + ): Set[ActorRef[WorkerActor.Command]] = { + (1 to n).map { i => + val ref = context.spawn(WorkerActor(), s"LocalWorker-$i") + ref + }.toSet + } + + private def aggregateCounts(state: State, partial: PartialCount): State = { + partial.wordsCount.foreach { + case (word, count) => + val newCount = state.aggregatedCounts.getOrElse(word, 0) + count + state.aggregatedCounts.update(word, newCount) + } + println(s"[MasterActor] Agrégation partielle. Nombre total de mots distincts = ${state.aggregatedCounts.size}") + + // ==== Écriture dans un fichier ==== + storeResultInFile(state, "result.txt") + + state + } + + private def distributeWork(state: State, context: ActorContext[Command]): Unit = { + val totalLines = state.lines.size + val workerCount = state.knownWorkers.size + if (workerCount == 0) { + context.log.warn("[MasterActor] Pas de workers pour distribuer le travail !") + return + } + + val chunkSize = (totalLines.toDouble / workerCount).ceil.toInt + val workersList = state.knownWorkers.toList + + workersList.zipWithIndex.foreach { case (workerRef, idx) => + val start = idx * chunkSize + val end = math.min(start + chunkSize, totalLines) + val subset = state.lines.slice(start, end) + + context.log.info(s"[MasterActor] -> Worker ${idx + 1}/$workerCount reçoit ${subset.size} lignes.") + workerRef ! WorkerActor.ProcessLines(subset, context.self) + } + } + + + private def storeResultInFile(state: State, outputFile: String): Unit = { + val writer = new PrintWriter(new File(outputFile)) + try { + // On trie les clés pour un affichage ordonné + val sortedKeys = state.aggregatedCounts.keys.toList.sorted + sortedKeys.foreach { word => + val count = state.aggregatedCounts(word) + writer.println(s"$word -> $count") + } + } finally { + writer.close() + } + } + + +} diff --git a/src/main/scala/cm/gintou/WordCountApp.scala b/src/main/scala/cm/gintou/WordCountApp.scala new file mode 100644 index 0000000..b5e2c2e --- /dev/null +++ b/src/main/scala/cm/gintou/WordCountApp.scala @@ -0,0 +1,50 @@ +package cm.gintou + +import akka.actor.typed.ActorSystem +import com.typesafe.config.ConfigFactory +import scala.concurrent.Await +import scala.concurrent.duration.Duration +import scala.util.Try + +object WordCountApp { + + def main(args: Array[String]): Unit = { + /* + Paramètres attendus : + 1) role : "master" ou "worker" + 2) port : le port sur lequel ce nœud va écouter + 3) filePath : si role = "master", chemin vers le fichier + + Exemples de lancement en local : + sbt "runMain cm.gintou.WordCountApp master 2551 /chemin/vers/monFichier.txt" + sbt "runMain cm.gintou.WordCountApp worker 2552" + */ + + if (args.length < 2) { + println("Usage:\n WordCountApp master \n WordCountApp worker ") + System.exit(1) + } + + val role = args(0) + val port = args(1).toInt + val filePathOpt = if (role == "master" && args.length >= 3) Some(args(2)) else None + + // On construit une configuration Akka en surchargeant le port et en attribuant un rôle + val config = ConfigFactory.parseString( + s""" + |akka.remote.artery.canonical.port = $port + |akka.cluster.roles = [ "$role" ] + |""".stripMargin + ).withFallback(ConfigFactory.load()) + + // On crée l'ActorSystem avec l'acteur guardian + val system = ActorSystem( + GuardianActor(role, filePathOpt), + "WordCountCluster", + config + ) + + Await.result(system.whenTerminated, Duration.Inf) + } + +} diff --git a/src/main/scala/cm/gintou/WorkerActor.scala b/src/main/scala/cm/gintou/WorkerActor.scala new file mode 100644 index 0000000..43b7938 --- /dev/null +++ b/src/main/scala/cm/gintou/WorkerActor.scala @@ -0,0 +1,37 @@ +package cm.gintou + +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.{ActorRef, Behavior} +import akka.actor.typed.receptionist.{Receptionist, ServiceKey} +import scala.collection.mutable + +object WorkerActor { + + val WorkerServiceKey: ServiceKey[Command] = ServiceKey[Command]("WorkerService") + + sealed trait Command extends CborSerializable + final case class ProcessLines(lines: List[String], replyTo: ActorRef[MasterActor.Command]) extends Command + + def apply(): Behavior[Command] = + Behaviors.setup { context => + context.system.receptionist ! Receptionist.Register(WorkerServiceKey, context.self) + + Behaviors.receiveMessage { + case ProcessLines(lines, replyTo) => + context.log.info(s"[WorkerActor] Traitement de ${lines.size} lignes") + + val counts = mutable.Map.empty[String, Int] + for (line <- lines) { + val words = line.split("\\W+").filter(_.nonEmpty) + words.foreach { w => + val lw = w.toLowerCase + counts(lw) = counts.getOrElse(lw, 0) + 1 + } + } + + // On renvoie le résultat au Master + replyTo ! MasterActor.PartialCount(counts.toMap) + Behaviors.same + } + } +} \ No newline at end of file