Processus concurrents
L'écriture d'une application composée de plusieurs processus concurrents
fait perdre la propriété de déterminisme des programmes séquentiels.
Pour des processus partageant une même zone mémoire, le résultat du programme
suivant ne peut pas être déduit de sa lecture.
programme principal |
lex x = ref 1 ;; |
processus P |
processus Q |
x := ! x + 1 ;; |
x := ! x * 2 ;; |
|
À la fin de l'exécution de P et Q, la référence x peut
valoir 2, 3 ou 4, selon l'ordre de calcul de chaque processus.
Cet indéterminisme vaut également pour la terminaison. Comme
l'état mémoire dépend du déroulement de chaque processus parallèle,
une application peut ne pas terminer pour une certaine exécution et
terminer dans une autre. Pour apporter un certain contrôle à
l'exécution, les processus doivent se synchroniser.
Pour des processus utilisant des mémoires distinctes, mais
communiquant entre eux, leur interaction dépend du type de
communication. On introduit pour l'exemple suivant deux primitives de
communication : send qui envoie une valeur en indiquant le
destinataire et receive qui reçoit une valeur d'un
processus. Soient deux processus communicants P et Q :
processus P |
processus Q |
lex x = ref 1 ;; |
lex y = ref 1 ;; |
send(Q,! x); |
y := ! y + 3 ; |
x := ! x * 2 ; |
y := ! y + receive(P); |
send(Q,! x); |
send(P,! y); |
x := ! x + receive(Q); |
y := ! y + receive(P); |
Dans le cas d'une communication évanescente, le processus Q peut
rater les émissions de P. On retombe dans le non-déterminisme du
modèle précédent.
Pour une communication asynchrone, le médium du canal de communication
conserve les différentes valeurs transmises. Seule la réception est
bloquante. Le processus P peut être en attente sur Q, bien que ce
dernier n'ait pas encore lu les deux envois de P. Ce qui ne
l'empêche pas d'émettre.
Pour une communication synchrone, l'émission est elle aussi bloquante.
Dans notre exemple le send(Q,!
x) de P attend que Q
soit en réception (receive(P)). Une fois l'information
transmise, les deux processus continuent leur chemin. Malheureusement,
dans notre exemple, P et Q se retrouvent sur une instruction
d'émission bloquante, et le programme n'avancera plus.
On peut classer les applications concurrentes en cinq catégories
suivant que les unités de programme les composant sont :
-
sans relation ;
- avec relation mais sans synchronisation ;
- avec relation d'exclusion mutuelle ;
- avec relation d'exclusion mutuelle et communication ;
- avec relation, sans exclusion mutuelle et avec communication synchrone.
La difficulté de réalisation vient principalement des dernières
catégories. Nous allons à présent voir comment résoudre ces
difficultés en utilisant les bibliothèques d'Objective CAML.
Compilation avec processus légers
La bibliothèque sur les threads d'Objective CAML est découpée en
cinq modules dont les quatre premiers définissent chacun des types
abstraits :
-
module Thread : création et exécution de processus légers
(type Thread.t);
- module Mutex : création, pose et libération de verrous
(type Mutex.t);
- module Condition : création de conditions (signaux),
attente et réveil sur condition (type Condition.t);
- module Event : création de canaux de communication
(type 'a Event.channel), des valeurs y transitant
(type 'a Event.event), et des fonctions de communication.
- module ThreadUnix : redéfinition des fonctions
d'entrées-sorties du module Unix
pour qu'elles ne soient pas bloquantes.
Cette bibliothèque ne fait pas partie de la bibliothèque d'exécution
d'Objective CAML. Son utilisation nécessite soit de compiler ses programmes
avec l'option -custom, soit de construire une nouvelle boucle
d'interaction en
utilisant les commandes :
$ ocamlc -thread -custom threads.cma fichiers.ml -cclib -lthreads
$ ocamlmktop -thread -custom -o threadtop thread.cma -cclib -lthreads
La bibliothèque Threads n'est utilisable
avec le compilateur natif que si le système
d'exploitation implante des processus légers conformes à la
norme POSIX 10031. On compile alors les exécutables en
ajoutant les bibliothèques unix.a et pthread.a :
$ ocamlc -thread -custom threads.cma fichiers.ml -cclib -lthreads \
-cclib -lunix -cclib -lpthread
$ ocamltop -thread -custom threads.cma fichiers.ml -cclib -lthreads \
-cclib -lunix -cclib -lpthread
$ ocamlcopt -thread threads.cmxa fichiers.ml -cclib -lthreads \
-cclib -lunix -cclib -lpthread
Module Thread
Le module Thread d'Objective CAML contient les primitives de
création et de gestion des processus légers. Nous n'en ferons pas
une présentation exhaustive, en particulier les opérations d'entrées-sorties
sur les fichiers ont été décrites au chapitre
précédent.
La création d'un processus léger se fait par appel à :
# Thread.create
;;
- : ('a -> 'b) -> 'a -> Thread.t = <fun>
Le premier argument, de type ('a -> 'b), correspond à la
fonction exécutée par le processus créé ; le second argument, de
type 'a, est l'argument attendu par la fonction
exécutée ; le résultat de l'appel est le descripteur associé
au processus. Le processus ainsi créé est détruit automatiquement
lorsque la fonction associée termine.
Connaissant son descripteur, on peut demander l'exécution d'un
processus et en attendre la fin en utilisant la fonction
join. En voici un exemple d'utilisation :
# let
f_proc1
()
=
for
i=
0
to
1
0
do
Printf.printf
"(%d)"
i;
flush
stdout
done;
print_newline()
;;
val f_proc1 : unit -> unit = <fun>
# let
t1
=
Thread.create
f_proc1
()
;;
val t1 : Thread.t = <abstr>
# Thread.join
t1
;;
(0)(1)(2)(3)(4)(5)(6)(7)(8)(9)(10)
- : unit = <unknown constructor>
Warning
Le résultat de l'exécution d'un processus n'est pas récupéré par
le processus père, mais perdu quand le processus fils termine.
On peut également interrompre brutalement le déroulement d'un
processus dont on connaît le descripteur par la fonction
kill. Créons, par exemple, un processus pour l'interrompre
immédiatement :
# let
n
=
ref
0
;;
val n : int ref = {contents=0}
# let
f_proc1
()
=
while
true
do
incr
n
done
;;
val f_proc1 : unit -> unit = <fun>
# let
go
()
=
n
:=
0
;
let
t1
=
Thread.create
f_proc1
()
in
Thread.kill
t1
;
Printf.printf
"n = %d\n"
!
n
;;
val go : unit -> unit = <fun>
# go
()
;;
n = 0
- : unit = ()
Un processus peut mettre fin à sa propre activité par la fonction :
# Thread.exit
;;
- : unit -> unit = <fun>
Il peut suspendre son activité pendant un temps donné par appel à :
# Thread.delay
;;
- : float -> unit = <fun>
L'argument indique le nombre de secondes d'attente.
Reprenons l'exemple précédent en lui ajoutant des temporisations. Nous
créons un premier processus t1 dont la fonction associée
f_proc2 crée à son tour un processus t2 qui
exécute f_proc1, puis f_proc2 attend un délai de
d secondes, met fin à t2. À la terminaison de
t1, on affiche le contenu de n.
# let
f_proc2
d
=
n
:=
0
;
let
t2
=
Thread.create
f_proc1
()
in
Thread.delay
d
;
Thread.kill
t2
;;
val f_proc2 : float -> unit = <fun>
# let
t1
=
Thread.create
f_proc2
0
.
2
5
in
Thread.join
t1
;
Printf.printf
"n = %d\n"
!
n
;;
n = 36123
- : unit = ()