Wordcount fonctionnel

- Timeout to allow workers to join the cluster
- wordcount
- sort
This commit is contained in:
TM-Squared
2025-02-03 18:07:20 +01:00
commit 50235b051f
17 changed files with 509 additions and 0 deletions

47
.gitignore vendored Normal file
View File

@@ -0,0 +1,47 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
out/
!**/src/main/**/out/
!**/src/test/**/out/
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
bin/
!**/src/main/**/bin/
!**/src/test/**/bin/
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store
### Scala ###
.bsp/

8
.idea/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

7
.idea/codeStyles/Project.xml generated Normal file
View File

@@ -0,0 +1,7 @@
<component name="ProjectCodeStyleConfiguration">
<code_scheme name="Project" version="173">
<ScalaCodeStyleSettings>
<option name="MULTILINE_STRING_CLOSING_QUOTES_ON_NEW_LINE" value="true" />
</ScalaCodeStyleSettings>
</code_scheme>
</component>

5
.idea/codeStyles/codeStyleConfig.xml generated Normal file
View File

@@ -0,0 +1,5 @@
<component name="ProjectCodeStyleConfiguration">
<state>
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
</state>
</component>

6
.idea/misc.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" languageLevel="JDK_11" default="true" project-jdk-name="11" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>

19
.idea/sbt.xml generated Normal file
View File

@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ScalaSbtSettings">
<option name="linkedExternalProjectsSettings">
<SbtProjectSettings>
<option name="converterVersion" value="2" />
<option name="externalProjectPath" value="$PROJECT_DIR$" />
<option name="modules">
<set>
<option value="$PROJECT_DIR$" />
<option value="$PROJECT_DIR$/project" />
</set>
</option>
<option name="sbtVersion" value="1.10.7" />
<option name="separateProdAndTestSources" value="true" />
</SbtProjectSettings>
</option>
</component>
</project>

7
.idea/scala_compiler.xml generated Normal file
View File

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ScalaCompilerConfiguration">
<option name="separateProdTestSources" value="true" />
<profile name="sbt 1" modules="Wordcount,Wordcount.main,Wordcount.test" />
</component>
</project>

7
.idea/scala_settings.xml generated Normal file
View File

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ScalaProjectSettings">
<option name="migrateConfigurationsNotificationShown" value="true" />
<option name="scala3DisclaimerShown" value="true" />
</component>
</project>

6
.idea/vcs.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

15
build.sbt Normal file
View File

@@ -0,0 +1,15 @@
// Build definition for the Wordcount project: a single root module on Scala 3.3.5
// that depends on Akka Typed (actors, cluster, Jackson serialization) and Logback.
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "3.3.5"
lazy val root = (project in file("."))
.settings(
name := "Wordcount",
// NOTE(review): this project-level `version` ("0.1") differs from the
// `ThisBuild / version` ("0.1.0-SNAPSHOT") declared above; presumably the
// project-level value wins for `root` -- confirm which one is intended and drop the other.
version := "0.1",
libraryDependencies ++= Seq(
// The three Akka modules are pinned to the same version (2.8.8).
"com.typesafe.akka" %% "akka-actor-typed" % "2.8.8",
"com.typesafe.akka" %% "akka-cluster-typed" % "2.8.8",
"com.typesafe.akka" %% "akka-serialization-jackson" % "2.8.8",
// Logging backend.
"ch.qos.logback" % "logback-classic" % "1.5.16",
)
)

1
project/build.properties Normal file
View File

@@ -0,0 +1 @@
sbt.version = 1.10.7

View File

@@ -0,0 +1,38 @@
# Akka configuration for the word-count cluster nodes.
akka {
actor {
provider = "cluster"
# Disable Java serialization
allow-java-serialization = off
warn-about-java-serialization-usage = on
# Bind every class that extends CborSerializable to the Jackson CBOR serializer
serialization-bindings {
"cm.gintou.CborSerializable" = jackson-cbor
}
}
remote {
artery {
canonical {
hostname = "127.0.0.1"
# Placeholder value; WordCountApp overrides this key with the port given on the command line.
port = 0
}
transport = tcp
}
}
cluster {
# A single local seed node is configured here: a node of the "WordCountCluster"
# system on 127.0.0.1:2551 (the launch example in WordCountApp starts the master on that port).
seed-nodes = [
"akka://WordCountCluster@127.0.0.1:2551"
]
}
# (Optional) Tune compression, etc.
serialization.jackson.cbor {
compression = off
}
}

View File

@@ -0,0 +1,3 @@
package cm.gintou
/**
 * Marker trait extended by the actor message protocols
 * (`MasterActor.Command` and `WorkerActor.Command`).
 *
 * `application.conf` binds `cm.gintou.CborSerializable` to the `jackson-cbor`
 * serializer, so the fully-qualified name of this trait must stay in sync with
 * that configuration entry.
 */
trait CborSerializable

View File

@@ -0,0 +1,42 @@
package cm.gintou
import akka.actor.typed._
import akka.actor.typed.scaladsl.Behaviors
/**
 * Root (guardian) actor of the `WordCountCluster` actor system.
 *
 * Depending on the node role it spawns either a [[MasterActor]] (and immediately
 * sends it `StartProcessing`) or a [[WorkerActor]]. When the role is unknown, or
 * when the master role is requested without an input file, the guardian logs an
 * error and stops, which terminates the actor system instead of leaving an idle
 * node running.
 */
object GuardianActor {
  sealed trait Command
  case object Start extends Command

  /**
   * @param role        node role, expected to be `"master"` or `"worker"`
   * @param filePathOpt path of the file to count; required when `role` is `"master"`
   * @return the guardian behavior, or a stopped behavior when the arguments are invalid
   */
  def apply(role: String, filePathOpt: Option[String]): Behavior[Command] =
    Behaviors.setup { context =>
      // The result of this match IS the behavior returned by setup. The previous
      // version evaluated the match as a statement, so `Behaviors.stopped` was
      // silently discarded and the guardian never stopped on invalid input.
      (role, filePathOpt) match {
        case ("master", Some(filePath)) =>
          context.log.info(s"[Guardian] Rôle = master, lancement MasterActor avec fichier: $filePath")
          val masterRef = context.spawn(MasterActor(filePath), "MasterActor")
          // Kick off the processing right away.
          masterRef ! MasterActor.StartProcessing
          running

        case ("master", None) =>
          context.log.error("Aucun chemin de fichier fourni pour le rôle master !")
          Behaviors.stopped

        case ("worker", _) =>
          context.log.info("[Guardian] Rôle = worker, lancement WorkerActor")
          context.spawn(WorkerActor(), "WorkerActor")
          running

        case (other, _) =>
          context.log.error(s"Rôle inconnu : $other")
          Behaviors.stopped
      }
    }

  /** Steady-state behavior: the guardian only keeps its children alive and ignores `Start`. */
  private def running: Behavior[Command] =
    Behaviors.receiveMessage { case Start =>
      Behaviors.same
    }
}

View File

@@ -0,0 +1,211 @@
package cm.gintou
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler}
import akka.actor.typed.{ActorRef, Behavior}
import akka.cluster.typed.Cluster
import akka.cluster.MemberStatus
import akka.cluster.typed.Subscribe
import akka.cluster.ClusterEvent.MemberUp
import akka.cluster.ClusterEvent.MemberEvent
import akka.cluster.ClusterEvent.MemberRemoved
import akka.cluster.ClusterEvent.MemberJoined
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import scala.io.Source
import scala.collection.mutable
import scala.concurrent.duration._
import java.io.{File, PrintWriter}
/**
 * Coordinator of the distributed word count.
 *
 * Life cycle:
 *  1. On creation it subscribes to cluster member events (logging only) and to the
 *     Receptionist listing of `WorkerActor.WorkerServiceKey`.
 *  2. On `StartProcessing` it reads the input file and starts a single timer that
 *     gives workers `waitForWorkersDuration` to register.
 *  3. When the timer fires it splits the lines between the known workers, or
 *     between two locally spawned workers when none was discovered.
 *  4. Every `PartialCount` received is merged into the running totals and the
 *     sorted totals are rewritten to `result.txt`.
 *
 * There is no explicit "all workers answered" signal: `result.txt` is complete
 * once the last partial count has been merged.
 */
object MasterActor {

  // --- Messages ---
  sealed trait Command extends CborSerializable

  /** Asks the master to read the input file and begin waiting for workers. */
  case object StartProcessing extends Command

  /** Internal timer message: the time granted to workers to show up has elapsed. */
  private case object WaitForWorkersTimeout extends Command

  /** Partial result sent back by a worker (word -> number of occurrences in its chunk). */
  case class PartialCount(wordsCount: Map[String, Int]) extends Command

  /** Adapted Receptionist listing. */
  private case class ListingResponse(listing: Receptionist.Listing) extends Command

  /** Adapted cluster membership event. */
  private case class ClusterEvent(event: MemberEvent) extends Command

  // --- Internal state ---
  /**
   * @param filePath         path of the input file
   * @param lines            content of the input file, empty until `StartProcessing`
   * @param aggregatedCounts running totals; mutated in place when partial counts arrive
   * @param knownWorkers     workers currently advertised by the Receptionist (or spawned locally)
   */
  final case class State(
      filePath: String,
      lines: List[String],
      aggregatedCounts: mutable.Map[String, Int],
      knownWorkers: Set[ActorRef[WorkerActor.Command]]
  )

  // --- Main behavior ---
  /**
   * @param filePath               path of the file whose words are counted
   * @param waitForWorkersDuration how long to wait for workers after `StartProcessing`
   *                               before distributing the work (30 seconds by default)
   */
  def apply(
      filePath: String,
      waitForWorkersDuration: FiniteDuration = 30.seconds
  ): Behavior[Command] = Behaviors.withTimers { timers =>
    Behaviors.setup { context =>
      // Subscribe to cluster events (optional, used for logging only).
      val cluster = Cluster(context.system)
      val clusterSubscriptionAdapter: ActorRef[MemberEvent] =
        context.messageAdapter(e => ClusterEvent(e))
      cluster.subscriptions ! Subscribe(clusterSubscriptionAdapter, classOf[MemberEvent])

      // Subscribe to the Receptionist to discover workers.
      val listingAdapter: ActorRef[Receptionist.Listing] =
        context.messageAdapter(listing => ListingResponse(listing))
      context.system.receptionist ! Receptionist.Subscribe(WorkerActor.WorkerServiceKey, listingAdapter)

      val initialState = State(
        filePath = filePath,
        lines = Nil,
        aggregatedCounts = mutable.Map.empty[String, Int],
        knownWorkers = Set.empty
      )
      waitingForStart(initialState, waitForWorkersDuration)(timers, context)
    }
  }

  // --- Behavior before StartProcessing ---
  /**
   * Waits for `StartProcessing`. Worker listings received in the meantime are
   * recorded rather than dropped, so a listing that arrives before
   * `StartProcessing` is not lost.
   */
  private def waitingForStart(
      state: State,
      waitForWorkersDuration: FiniteDuration
  )(
      timers: TimerScheduler[Command],
      ctx: ActorContext[Command]
  ): Behavior[Command] = Behaviors.receiveMessage {
    case StartProcessing =>
      // 1) Read the file (the underlying source is closed once the lines are loaded).
      val lines = readLines(state.filePath)
      ctx.log.info(s"[MasterActor] StartProcessing -> lecture du fichier ${state.filePath}, ${lines.size} lignes")
      // 2) Give workers some time to join before distributing the work.
      timers.startSingleTimer(WaitForWorkersTimeout, WaitForWorkersTimeout, waitForWorkersDuration)
      ctx.log.info(
        s"[MasterActor] Timer démarré pour $waitForWorkersDuration. " +
          s"En attendant, on espère découvrir des workers..."
      )
      masterBehavior(state.copy(lines = lines))(timers, ctx)

    case ListingResponse(WorkerActor.WorkerServiceKey.Listing(workers)) =>
      ctx.log.info(s"[MasterActor] Mise à jour du listing : ${workers.size} worker(s) découvert(s).")
      waitingForStart(state.copy(knownWorkers = workers), waitForWorkersDuration)(timers, ctx)

    case other =>
      ctx.log.info(s"[MasterActor] Reçu un message inattendu avant StartProcessing: $other")
      Behaviors.same
  }

  // --- Behavior after StartProcessing ---
  private def masterBehavior(
      state: State
  )(
      timers: TimerScheduler[Command],
      ctx: ActorContext[Command]
  ): Behavior[Command] = Behaviors.receiveMessage {
    // Receptionist update of the known workers.
    case ListingResponse(WorkerActor.WorkerServiceKey.Listing(workers)) =>
      ctx.log.info(s"[MasterActor] Mise à jour du listing : ${workers.size} worker(s) découvert(s).")
      masterBehavior(state.copy(knownWorkers = workers))(timers, ctx)

    case ListingResponse(otherListing) =>
      // Listings for any other service key.
      ctx.log.debug(s"[MasterActor] Ignored listing: $otherListing")
      Behaviors.same

    // A worker sent back its partial count.
    case partial: PartialCount =>
      masterBehavior(aggregateCounts(state, partial, ctx))(timers, ctx)

    // The waiting period is over: distribute the work.
    case WaitForWorkersTimeout =>
      if (state.knownWorkers.isEmpty) {
        // No remote worker: fall back to local workers.
        ctx.log.warn(s"[MasterActor] Aucun worker distant détecté après le timeout. " +
          "On va créer des workers locaux pour faire le traitement.")
        val localWorkerRefs = spawnLocalWorkers(2, ctx)
        val newState = state.copy(knownWorkers = localWorkerRefs)
        distributeWork(newState, ctx)
        masterBehavior(newState)(timers, ctx)
      } else {
        ctx.log.info(s"[MasterActor] Timeout expiré, on a déjà ${state.knownWorkers.size} worker(s) -> distribution.")
        distributeWork(state, ctx)
        Behaviors.same
      }

    // A second StartProcessing is unexpected once processing has begun.
    case StartProcessing =>
      ctx.log.warn("[MasterActor] StartProcessing reçu mais on a déjà commencé.")
      Behaviors.same

    // Cluster membership events are only logged.
    case ClusterEvent(event) =>
      event match {
        case MemberUp(member) =>
          ctx.log.info(s"[MasterActor] Membre UP : ${member.address} (roles=${member.roles})")
        case MemberRemoved(member, _) =>
          ctx.log.info(s"[MasterActor] Membre removed : ${member.address} (roles=${member.roles})")
        case _ =>
          ctx.log.info(s"[MasterActor] Autre event cluster : $event")
      }
      Behaviors.same
  }

  // ----------------------------------------------------------------
  // Utility methods
  // ----------------------------------------------------------------

  /** Reads every line of `path`, closing the file even when reading fails. */
  private def readLines(path: String): List[String] =
    scala.util.Using.resource(Source.fromFile(path))(_.getLines().toList)

  /** Spawns `n` child workers named `LocalWorker-1` .. `LocalWorker-n`. */
  private def spawnLocalWorkers(
      n: Int,
      context: ActorContext[Command]
  ): Set[ActorRef[WorkerActor.Command]] =
    (1 to n).map(i => context.spawn(WorkerActor(), s"LocalWorker-$i")).toSet

  /**
   * Merges a partial count into `state.aggregatedCounts` (in place) and rewrites
   * `result.txt` with the updated totals.
   *
   * @return the same `state` instance, whose mutable totals have been updated
   */
  private def aggregateCounts(
      state: State,
      partial: PartialCount,
      ctx: ActorContext[Command]
  ): State = {
    partial.wordsCount.foreach { case (word, count) =>
      state.aggregatedCounts(word) = state.aggregatedCounts.getOrElse(word, 0) + count
    }
    // Logged through the actor logger (instead of println) like every other message here.
    ctx.log.info(
      s"[MasterActor] Agrégation partielle. Nombre total de mots distincts = ${state.aggregatedCounts.size}"
    )
    storeResultInFile(state, "result.txt")
    state
  }

  /**
   * Splits `state.lines` into contiguous chunks of equal size (rounded up) and
   * sends one chunk to each known worker. Workers beyond the end of the input
   * receive an empty chunk.
   */
  private def distributeWork(state: State, context: ActorContext[Command]): Unit = {
    val totalLines = state.lines.size
    val workerCount = state.knownWorkers.size
    if (workerCount == 0) {
      context.log.warn("[MasterActor] Pas de workers pour distribuer le travail !")
    } else {
      val chunkSize = (totalLines.toDouble / workerCount).ceil.toInt
      state.knownWorkers.toList.zipWithIndex.foreach { case (workerRef, idx) =>
        val start = idx * chunkSize
        val end = math.min(start + chunkSize, totalLines)
        val subset = state.lines.slice(start, end)
        context.log.info(s"[MasterActor] -> Worker ${idx + 1}/$workerCount reçoit ${subset.size} lignes.")
        workerRef ! WorkerActor.ProcessLines(subset, context.self)
      }
    }
  }

  /** Writes the totals to `outputFile`, one `word -> count` line per word, sorted by word. */
  private def storeResultInFile(state: State, outputFile: String): Unit =
    scala.util.Using.resource(new PrintWriter(new File(outputFile))) { writer =>
      state.aggregatedCounts.toList.sortBy(_._1).foreach { case (word, count) =>
        writer.println(s"$word -> $count")
      }
    }
}

View File

@@ -0,0 +1,50 @@
package cm.gintou
import akka.actor.typed.ActorSystem
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.Try
/**
 * Command-line entry point of a cluster node.
 *
 * Expected arguments:
 *  1. role     : "master" or "worker"
 *  2. port     : port this node listens on
 *  3. filePath : path of the file to count (only read when the role is "master")
 *
 * Local launch examples:
 * {{{
 * sbt "runMain cm.gintou.WordCountApp master 2551 /path/to/myFile.txt"
 * sbt "runMain cm.gintou.WordCountApp worker 2552"
 * }}}
 */
object WordCountApp {

  private val Usage: String =
    "Usage:\n WordCountApp master <port> <filePath>\n WordCountApp worker <port>"

  /**
   * Starts the `WordCountCluster` actor system and blocks until it terminates.
   * Exits with status 1 when the arguments are missing or the port is not a
   * valid integer.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      println(Usage)
      sys.exit(1)
    }

    val role = args(0)
    // Reject a malformed port with a usage message instead of an uncaught
    // NumberFormatException.
    val port = args(1).toIntOption.filter(p => p >= 0 && p <= 65535).getOrElse {
      println(s"Invalid port '${args(1)}': expected an integer between 0 and 65535.\n$Usage")
      sys.exit(1)
    }
    val filePathOpt = if (role == "master" && args.length >= 3) Some(args(2)) else None

    // Override the listening port and assign the node role on top of application.conf.
    val config = ConfigFactory.parseString(
      s"""
         |akka.remote.artery.canonical.port = $port
         |akka.cluster.roles = [ "$role" ]
         |""".stripMargin
    ).withFallback(ConfigFactory.load())

    // Create the ActorSystem with the guardian actor.
    val system = ActorSystem(
      GuardianActor(role, filePathOpt),
      "WordCountCluster",
      config
    )

    // Blocking here is acceptable: this is the outermost edge of the program.
    Await.result(system.whenTerminated, Duration.Inf)
  }
}

View File

@@ -0,0 +1,37 @@
package cm.gintou
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import scala.collection.mutable
/**
 * Worker of the distributed word count.
 *
 * On start-up it registers itself with the Receptionist under `WorkerServiceKey`.
 * For every `ProcessLines` request it counts the words of the given lines and
 * replies to `replyTo` with a `MasterActor.PartialCount`.
 */
object WorkerActor {

  /** Key under which workers register with the Receptionist. */
  val WorkerServiceKey: ServiceKey[Command] = ServiceKey[Command]("WorkerService")

  sealed trait Command extends CborSerializable

  /**
   * @param lines   lines whose words must be counted
   * @param replyTo actor that receives the resulting `MasterActor.PartialCount`
   */
  final case class ProcessLines(lines: List[String], replyTo: ActorRef[MasterActor.Command]) extends Command

  // `\W` only knows ASCII word characters by default, which would cut accented
  // words apart ("été" -> "t"). UNICODE_CHARACTER_CLASS makes it Unicode-aware.
  private val WordSeparator: java.util.regex.Pattern =
    java.util.regex.Pattern.compile("\\W+", java.util.regex.Pattern.UNICODE_CHARACTER_CLASS)

  def apply(): Behavior[Command] =
    Behaviors.setup { context =>
      context.system.receptionist ! Receptionist.Register(WorkerServiceKey, context.self)
      Behaviors.receiveMessage { case ProcessLines(lines, replyTo) =>
        context.log.info(s"[WorkerActor] Traitement de ${lines.size} lignes")
        // Send the result back to the master.
        replyTo ! MasterActor.PartialCount(countWords(lines))
        Behaviors.same
      }
    }

  /**
   * Counts the case-insensitive occurrences of every word in `lines`.
   *
   * Words are maximal runs of Unicode word characters; they are lower-cased with
   * the root locale so the result does not depend on the JVM default locale.
   *
   * @return a map from lower-cased word to its number of occurrences
   */
  private[gintou] def countWords(lines: List[String]): Map[String, Int] =
    lines.iterator
      .flatMap(line => WordSeparator.split(line).iterator)
      .filter(_.nonEmpty)
      .map(_.toLowerCase(java.util.Locale.ROOT))
      .foldLeft(Map.empty[String, Int]) { (counts, word) =>
        counts.updated(word, counts.getOrElse(word, 0) + 1)
      }
}