Synchronisation des processus
C'est dans le cadre de processus partageant une même zone mémoire
que le mot << concurrence >> prend tout son sens : les divers processus
impliqués sont en concurrence pour l'accès à cette unique
ressource qu'est la mémoire2. Au problème du partage des
ressources vient s'ajouter celui du manque de maîtrise de
l'alternance et du temps d'exécution des processus concurrents.
Le système qui gère l'ensemble des processus peut à tout moment
interrompre un calcul en cours. Ainsi
lorsque deux processus coopèrent, ils doivent pouvoir garantir
l'intégrité des manipulations de certaines données
partagées. Pour cela, un processus doit pouvoir rester maître de
ces données tant qu'il n'a pas achevé un calcul ou toute autre
opération (par exemple, une acquisition sur un périphérique).
Pour garantir l'exclusivité d'accès aux données
à un seul processus, on met en oeuvre un mécanisme dit d'exclusion mutuelle.
Section critique et exclusion mutuelle
Les mécanismes d'exclusion mutuelle sont implantés à l'aide de
données particulières appelées verrous. Les opérations
sur les verrous sont limitées à leur création, leur pose et leur
libération. Un verrou est la plus petite donnée partagée par un
ensemble de processus concurrents. Sa manipulation est toujours
exclusive. À la notion d'exclusivité de manipulation d'un verrou
s'ajoute celle d'exclusivité de détention : seul le processus
qui a pris un verrou peut le libérer ; si d'autres processus
veulent à leur tour disposer de ce verrou, ils doivent en attendre
la libération par le processus détenteur.
Module Mutex
Le module Mutex est utilisé pour créer des verrous
entre processus en relation d'exclusion mutuelle
sur une zone mémoire. Nous allons illustrer leur utilisation par deux petits
exemples classiques de concurrence.
Les fonctions de création, prise et libération des verrous sont :
# Mutex.create
;;
- : unit -> Mutex.t = <fun>
# Mutex.lock
;;
- : Mutex.t -> unit = <fun>
# Mutex.unlock
;;
- : Mutex.t -> unit = <fun>
Il existe une variante à la prise de verrou qui n'est pas bloquante :
# Mutex.try_lock;;
- : Mutex.t -> bool = <fun>
Si le verrou est déjà pris, la fonction retourne false. Dans
l'autre cas, la fonction prend le verrou et retourne true.
Les philosophes orientaux
Cette petite histoire, due à Dijkstra, illustre un pur problème de
partage de ressources. Elle se raconte ainsi :
<< Cinq philosophes orientaux partagent leur temps entre l'étude et la
venue au réfectoire pour manger un bol de riz. La salle affectée à
la sustentation des philosophes ne comprend qu'une seule table ronde sur
laquelle sont disposés un grand plat de riz (toujours plein) cinq
bols et cinq baguettes. >>
Figure 19.1 : la table des philosophes orientaux
Comme on le voit sur la figure 19.1, un philosophe
qui prend les deux baguettes autour de son bol empêche ses voisins
d'en faire autant. Dès qu'il dépose l'une de ses baguettes, son
voisin, affamé, peut s'en emparer. Le cas échéant, ce dernier devra
attendre que l'autre baguette soit disponible. Les baguettes sont ici
les ressources partagées.
Pour simplifier les choses, on supposera que chaque philosophe a des
habitudes et vient toujours à la même place. On modélise les cinq
baguettes par autant de verrous stockés dans un vecteur
b.
# let
b
=
let
b0
=
Array.create
5
(Mutex.create())
in
for
i=
1
to
4
do
b0.
(i)
<-
Mutex.create()
done;
b0
;;
val b : Mutex.t array = [|<abstr>; <abstr>; <abstr>; <abstr>; <abstr>|]
Manger et méditer sont simulés par une suspension des
processus.
# let
mediter
=
Thread.delay
and
manger
=
Thread.delay
;;
val mediter : float -> unit = <fun>
val manger : float -> unit = <fun>
On modélise un philosophe par une fonction qui exécute à
l'infini la séquence des actions de la petite histoire de
Dijkstra. La prise d'une baguette est simulée par l'obtention d'un
verrou, ainsi un seul philosophe à la fois peut détenir une baguette.
On introduit un petit temps de réflexion entre la prise et le dépôt de
chacune des deux baguettes ainsi que quelques affichages traçant
l'activité du philosophe.
# let
philosophe
i
=
let
ii
=
(i+
1
)
mod
5
in
while
true
do
mediter
3
.
;
Mutex.lock
b.
(i);
Printf.printf
"Le philosophe (%d) prend sa baguette gauche"
i
;
Printf.printf
" et médite encore un peu\n"
;
mediter
0
.
2
;
Mutex.lock
b.
(ii);
Printf.printf
"Le philosophe (%d) prend sa baguette droite\n"
i;
manger
0
.
5
;
Mutex.unlock
b.
(i);
Printf.printf
"Le philosophe (%d) repose sa baguette gauche"
i;
Printf.printf
" et commence déjà à méditer\n"
;
mediter
0
.
1
5
;
Mutex.unlock
b.
(ii);
Printf.printf
"Le philosophe (%d) repose sa baguette droite\n"
i
done
;;
val philosophe : int -> unit = <fun>
On pourra tester ce petit programme en exécutant :
for
i=
0
to
4
do
ignore
(Thread.create
philosophe
i)
done
;
while
true
do
Thread.delay
5
.
done
;;
On suspend, dans la boucle infinie while, le processus
principal de façon à augmenter les chances des processus
philosophes d'entrer en action. On utilise dans la boucle d'activité
des philosophes des délais d'attente dont la durée est choisie
aléatoirement dans le but de créer un peu de disparité dans
l'exécution parallèle des processus.
Problèmes de la solution naïve
Il peut arriver une chose terrible à nos philosophes : ils arrivent
tous en même temps et se saisissent de leur baguette gauche. Dans ce
cas nous sommes dans une situation d'inter-blocage. Plus aucun
philosophe ne pourra manger ! Nous sommes dans un cas de
famine.
Pour éviter cela, les philosophes peuvent reposer une baguette
s'il n'arrivent pas à prendre la deuxième.
Cela est fort courtois, mais permet
à deux philosophes
de se liguer contre un troisième pour l'empêcher de se sustenter
en ne relâchant les baguettes que si l'autre voisin les a prises.
Il existe diverses solutions à ce
problème. L'une d'elle fait l'objet de l'exercice page ??.
Producteurs et consommateurs I
Le couple producteurs-consommateurs est un exemple classique de
la programmation concurrente. Un groupe de processus, désignés comme
les producteurs, est chargé d'emmagasiner des données dans une file
d'attente ; un second groupe, les consommateurs, est chargé de les
déstocker. Chaque intervenant exclut les autres.
Nous implantons ce schéma en utilisant une file d'attente partagée
entre les producteurs et les consommateurs. Pour garantir le bon
fonctionnement de l'ensemble, la file d'attente est manipulée en
exclusion mutuelle afin de garantir l'intégrité des opérations
d'ajout et de retrait.
f est la file d'attente partagée, et m le verrou
d'exclusion mutuelle.
# let
f
=
Queue.create
()
and
m
=
Mutex.create
()
;;
val f : '_a Queue.t = <abstr>
val m : Mutex.t = <abstr>
Nous divisons l'activité d'un producteur en deux parties : créer
un produit (fonction produire) et stocker un produit
(fonction stocker). Seule l'opération de stockage a besoin
du verrou.
# let
produire
i
p
d
=
incr
p
;
Thread.delay
d
;
Printf.printf
"Le producteur (%d) a produit %d\n"
i
!
p
;
flush
stdout
;;
val produire : int -> int ref -> float -> unit = <fun>
# let
stocker
i
p
=
Mutex.lock
m
;
Queue.add
(i,!
p)
f
;
Printf.printf
"Le producteur (%d) a ajouté son %d-ième produit\n"
i
!
p
;
flush
stdout
;
Mutex.unlock
m
;;
val stocker : int -> int ref -> unit = <fun>
Le code du producteur est une boucle sans fin de création et de
stockage. Nous introduisons un délai aléatoire d'attente à la
fin de chaque itération afin d'obtenir un effet de
désynchronisation dans l'exécution.
# let
producteur
i
=
let
p
=
ref
0
and
d
=
Random.float
2
.
in
while
true
do
produire
i
p
d
;
stocker
i
p
;
Thread.delay
(Random.float
2
.
5
)
done
;;
val producteur : int -> unit = <fun>
La seule opération du consommateur est le retrait d'un élément de
la file en prenant garde à l'existence effective d'un produit.
# let
consommateur
i
=
while
true
do
Mutex.lock
m
;
(
try
let
ip,
p
=
Queue.take
f
in
Printf.printf
"Le consommateur(%d) "
i
;
Printf.printf
"a retiré le produit (%d,%d)\n"
ip
p
;
flush
stdout
;
with
Queue.
Empty
->
Printf.printf
"Le consommateur(%d) "
i
;
print_string
"est reparti les mains vides\n"
)
;
Mutex.unlock
m
;
Thread.delay
(Random.float
2
.
5
)
done
;;
val consommateur : int -> unit = <fun>
Le programme de test suivant crée quatre producteurs et quatre consommateurs.
for
i
=
0
to
3
do
ignore
(Thread.create
producteur
i);
ignore
(Thread.create
consommateur
i)
done
;
while
true
do
Thread.delay
5
.
done
;;
Attente et synchronisation
La relation d'exclusion mutuelle n'est pas assez fine pour décrire la
synchronisation entre processus. Il n'est pas rare que le travail
d'un processus dépende de la réalisation d'une action par un autre
processus, modifiant par là une certaine condition. Il est alors
souhaitable que les processus puissent communiquer le fait que cette
condition a pu changer, indiquant aux processus en attente de la
tester de nouveau. Les différents processus sont alors en relation
d'exclusion mutuelle avec communication.
Dans l'exemple précédent, un consommateur, plutôt que repartir les
mains vides pourrait attendre qu'un producteur vienne réapprovisionner
le stock. Ce dernier pourra signaler au consommateur en attente qu'il
y a quelque chose à emporter. Le modèle de l'attente sur condition
d'une prise de verrou est connu sous le nom de sémaphore.
Sémaphores
Un sémaphore est une variable entière s ne pouvant prendre que des
valeurs positives (ou nulles). Une fois s initialisée, les seules
opérations admises sont : wait(s) et signal(s), notées
respectivement P(s) et V(s). Elles sont définies ainsi, s
correspond au nombre de ressources d'un type donné.
-
wait(s) : si s > 0 alors s := s -1, sinon l'exécution du
processus ayant appelé wait(s) est suspendue.
- signal(s) : si un processus a été suspendu lors d'une exécution antérieure
d'un wait(s) alors le réveiller, sinon s := s + 1.
Un sémaphore ne prenant que les valeurs 0 ou 1 est appelé
sémaphore binaire.
Module Condition
Les fonctions du module Condition implantent des primitives
de mise en sommeil et de réveil de processus sur un signal. Un
signal, dans ce cadre, est une variable partagée par l'ensemble des
processus. Son type est abstrait et les fonctions de manipulation
sont :
-
create
- : unit -> Condition.t qui crée
un nouveau signal.
- signal
- : Condition.t -> unit qui
réveille l'un des processus en attente du signal.
- broadcast
- : Condition.t -> unit qui
réveille l'ensemble des processus en attente du signal.
- wait
- : Condition.t -> Mutex.t -> unit
qui suspend le processus appelant sur le signal passé en premier
argument. Le second argument est un verrou utilisé pour protéger
la manipulation du signal. Il est libéré, puis repositionné à
chaque exécution de la fonction.
Producteurs et consommateurs (2)
Nous reprenons l'exemple des producteurs et consommateurs en
utilisant le mécanisme des variables de condition pour mettre en
attente un consommateur arrivant alors que le magasin est vide.
Pour réaliser la synchronisation entre attente d'un consommateur et
production, on déclare :
# let
c
=
Condition.create
()
;;
val c : Condition.t = <abstr>
On modifie la fonction de stockage du producteur en lui ajoutant
l'émission d'un signal :
# let
stocker2
i
p
=
Mutex.lock
m
;
Queue.add
(i,!
p)
f
;
Printf.printf
"Le producteur (%d) a ajoute son %d-ième produit\n"
i
!
p
;
flush
stdout
;
Condition.signal
c
;
Mutex.unlock
m
;;
val stocker2 : int -> int ref -> unit = <fun>
# let
producteur2
i
=
let
p
=
ref
0
in
let
d
=
Random.float
2
.
in
while
true
do
produire
i
p
d;
stocker2
i
p;
Thread.delay
(Random.float
2
.
5
)
done
;;
val producteur2 : int -> unit = <fun>
L'activité du consommateur se fait alors en deux temps :
attendre qu'un produit soit disponible puis emporter le produit. Le
verrou est pris quand l'attente prend fin et il est rendu dès
que le consommateur a emporté son produit. L'attente se fait sur
la variable c.
# let
attendre2
i
=
Mutex.lock
m
;
while
Queue.length
f
=
0
do
Printf.printf
"Le consommateur (%d) attend\n"
i
;
Condition.wait
c
m
done
;;
val attendre2 : int -> unit = <fun>
# let
emporter2
i
=
let
ip,
p
=
Queue.take
f
in
Printf.printf
"Le consommateur (%d) "
i
;
Printf.printf
"emporte le produit (%d, %d)\n"
ip
p
;
flush
stdout
;
Mutex.unlock
m
;;
val emporter2 : int -> unit = <fun>
# let
consommateur2
i
=
while
true
do
attendre2
i;
emporter2
i;
Thread.delay
(Random.float
2
.
5
)
done
;;
val consommateur2 : int -> unit = <fun>
Notons qu'il n'est plus besoin de vérifier l'existence effective d'un
produit puisqu'un consommateur doit attendre l'existence d'un produit
dans la file d'attente pour que sa suspension prenne fin. Vu que la
fin de son attente correspond à la prise du verrou, il ne court pas
le risque de se faire dérober le nouveau produit avant de l'avoir
emporté.
Lecteurs et écrivain
Voici un autre exemple classique de processus concurrents dans lequel
les agents n'ont pas le même comportement vis-à-vis de la donnée
partagée.
Un écrivain et des lecteurs opèrent sur une donnée
partagée. L'action du premier est susceptible de rendre
momentanément les données incohérentes alors que les seconds
n'ont qu'une action passive. La difficulté vient du fait que nous ne
souhaitons pas interdire à plusieurs lecteurs de consulter la
donnée simultanément. Une solution à ce problème est de gérer un
compteur dénombrant les lecteurs en train d'accéder aux
données. L'écriture ne sera acceptable que si le nombre de
lecteurs est nul.
La donnée est symbolisée par l'entier data qui prendra la
valeur 0 ou 1. La valeur 0 signifie que la
donnée est prête pour la lecture :
# let
data
=
ref
0
;;
val data : int ref = {contents=0}
Les opérations sur le compteur n sont protégées par le
verrou m :
# let
n
=
ref
0
;;
val n : int ref = {contents=0}
# let
m
=
Mutex.create
()
;;
val m : Mutex.t = <abstr>
# let
cpt_incr
()
=
Mutex.lock
m
;
incr
n
;
Mutex.unlock
m
;;
val cpt_incr : unit -> unit = <fun>
# let
cpt_decr
()
=
Mutex.lock
m
;
decr
n
;
Mutex.unlock
m
;;
val cpt_decr : unit -> unit = <fun>
# let
cpt_signal
()
=
Mutex.lock
m
;
if
!
n=
0
then
Condition.signal
c
;
Mutex.unlock
m
;;
val cpt_signal : unit -> unit = <fun>
Les lecteurs mettent à jour le compteur et émettent le signal
c lorsque plus aucun lecteur n'est présent. Ils indiquent
par là à l'écrivain qu'il peut entrer en action.
# let
c
=
Condition.create
()
;;
val c : Condition.t = <abstr>
# let
lire
i
=
cpt_incr
()
;
Printf.printf
"Le lecteur (%d) lit (donnée=%d)\n"
i
!
data
;
Thread.delay
(Random.float
1
.
5
)
;
Printf.printf
"Le lecteur (%d) a fini sa lecture\n"
i
;
cpt_decr
()
;
cpt_signal
()
;;
val lire : int -> unit = <fun>
# let
lecteur
i
=
while
true
do
lire
i
;
Thread.delay
(Random.float
1
.
5
)
done
;;
val lecteur : int -> unit = <fun>
L'écrivain tente de bloquer le compteur pour interdire aux lecteurs
d'accéder à la donnée partagée. Mais il ne peut le faire que
si le compteur est nul, sinon il attend le signal lui
indiquant que c'est le cas.
# let
ecrire
()
=
Mutex.lock
m
;
while
!
n<>
0
do
Condition.wait
c
m
done
;
print_string
"L'écrivain écrit\n"
;
flush
stdout
;
data
:=
1
;
Thread.delay
(Random.float
1
.
)
;
data
:=
0
;
Mutex.unlock
m
;;
val ecrire : unit -> unit = <fun>
# let
ecrivain
()
=
while
true
do
ecrire
()
;
Thread.delay
(Random.float
1
.
5
)
done
;;
val ecrivain : unit -> unit = <fun>
On crée pour tester ces fonctions un écrivain et six lecteurs.
ignore
(Thread.create
ecrivain
());
for
i=
0
to
5
do
ignore(Thread.create
lecteur
i)
done;
while
true
do
Thread.delay
5
.
done
;;
Cette solution garantit que ni l'écrivain ni les lecteurs n'ont accès
aux données en même temps. Par contre, rien ne garantit que l'écrivain
puisse un jour remplir son office, nous sommes confrontés là encore
à un cas de famine.