Article
· Nov 9, 2023 14m de lecture

Inter-process communication avec $SYSTEM.Event

Salut les devs,

Actuellement, je travaille sur un projet qui requiert une gestion hautement dynamique des événements. Dans le contexte du langage de programmation Java, mon premier choix aurait été d'opter instinctivement pour l'utilisation du "Observer Pattern". Le "Observer Pattern" représente une approche pour gérer les interactions entre les objets en établissant un mécanisme de notification. Il permet à plusieurs observateurs de réagir de manière autonome aux changements d'état d'un sujet, favorisant ainsi la flexibilité et la modularité du code. Si vous n'êtes pas familier avec ce modèle de conception, vous pouvez trouver de plus amples informations à ce sujet sur ce Wikipedia

Bien que ce soit naturel et couramment utilisé dans certains langages de programmations comme le Java ou le C++, en ObjectScript pas du tout. 
Ces langages diffèrent considérablement, Java permet de lancer des threads, tandis qu'en ObjectScript, nous lançons de nouveaux processus. Cette différence est notable dans plusieurs aspects :

  • Les threads: partagent la même mémoire, ce qui facilite la communication et le partage de données entre eux. Cependant, cela peut également entraîner des problèmes de concurrence si la synchronisation n'est pas gérée correctement.
  • Les processus: sont des exécutions indépendantes avec leur propre espace mémoire. Ils offrent une isolation complète les uns par rapport aux autres, ce qui signifie qu'un processus ne peut pas accéder directement à la mémoire d'un autre.  En raison de leur isolation, les processus sont plus sûrs en cas d'erreurs, car un processus défectueux n'affectera pas les autres processus du système.  Cependant, la communication entre processus est plus complexe que la communication entre threads, car elle nécessite des mécanismes de communication inter-processus (IPC) tels que les sockets, les tubes (pipes), ou les files de messages.

Cette différence fondamentale demande une approche différente. Cependant, ne serait-il pas envisageable de développer rapidement une bibliothèque pour résoudre ce problème? C'est la question à laquelle je me suis confronté, et je vais à présent vous présenter une implémentation. Comme c'est souvent le cas dans mes articles, vous trouverez les sources sur un repository GitHub, ainsi qu'un package disponible sur Open Exchange.

L'objectif n'est pas de reproduire l'Observer Pattern, mais plutôt de développer une solution permettant de faciliter la communication inter-processus, de manière à notifier les processus en attente d'un événement.

Prenons l'exemple d'un processus qui gère la création de patients. Dans ce scénario, nous pourrions représenter la séquence d'activités suivante :

Le processus A informe le gestionnaire des événements "Patient" et est chargé d'envoyer un message à tous les observateurs enregistrés. Pour ce faire, il est essentiel de déterminer comment mettre en place la communication inter-processus. En consultant la documentation, j'ai rapidement découvert une classe parfaite pour résoudre ce problème $SYSTEM.Event.

Vous pouvez effectuer un test rapide en ouvrant deux sessions terminal IRIS. La première session est destinée à créer la ressource et à écouter les messages entrants, tandis que la seconde session est utilisée pour envoyer des notifications

 

Session en écoute Session qui notifie
Do $SYSTEM.Event.Create("MyResource")

Set message = $SYSTEM.Event.WaitMsg("MyResource", 10)

 

Do $SYSTEM.Event.Signal("MyResource","Hello, from PID:"_$Job)



 

 

Suite à l’envoi du signal, le contenu de la variable "message" sera : 

$lb(1,"Hello, this is a message from PID:710")

La première position est un code de retour:

  • 1: message reçu.
  • 0: timeout (10 secondes dans l’exemple ci-dessus).
  • -1: situation d’erreur ou la resource “MyResource” serait supprimée.

Note: Il n'est pas obligatoire de créer la ressource avec $SYSTEM.Event.Create, dans ce cas, l'attente peut se faire avec la ligne suivante:

Set message = $SYSTEM.Event.WaitMsg("",10) 

Il faut alors que le signal précise le PID du processus en écoute:

$SYSTEM.Event.Signal(<pid>, "Hello from:"_$Job)

Dans cet exemple, nous avons transmis une simple chaîne de caractères, car il n'est pas possible de passer directement des objets. Comme expliqué précédemment dans cet article, un processus ne peut pas accéder à la mémoire d'un autre processus, ce qui rend impossible la transmission de références d'objets. Toutefois, grâce à l'existence de la classe %DynamicObject, nous avons la possibilité de convertir facilement des objets en chaînes de caractères JSON et vice versa. Ainsi, nous pouvons faire :

Do $SYSTEM.Event.Signal("MyResource",{"PID":($JOB),"Msg":"Hello!"}.%ToJSON())

; Et lors de la réception du message : 
Set object = {}.%FromJSON($ListGet(message,2))

De plus, si vous travaillez avec une version récente d'IRIS, vous avez la possibilité d'étendre vos classes existantes en utilisant %JSON.Adaptor
et d'exporter facilement leurs instances avec la méthode %JSONExportToString.

En combinant l'utilisation de $SYSTEM.Event et %DynamicObject, vous pouvez réaliser en seulement quelques lignes la communication entre des processus, en envoyant aussi bien des messages simples que des objets.

Il est important de noter que bien que $SYSTEM.Event soit très pratique, il ne fonctionnera pas sur un système ECP (Enterprise Cache Protocol), comme mentionné dans la documentation officielle :

There is no networking support for these functions - processes can only wait on and awaken resources on the same system.

Il est impératif d'exercer une grande prudence lors de l'utilisation de $SYSTEM.Event, car il repose sur "SharedMemory", un espace limité et essentiel pour le fonctionnement du système. Si un processus envoie un grand nombre de signaux et que le processus de destination ne peut pas les traiter suffisamment rapidement, la file d'attente de signaux va s'allonger, ce qui peut entraîner un effondrement de votre système. Voici un aperçu de ce que vous pourriez retrouver dans le fichier "messages.log" :

 

10/04/23-08:33:47:501 (1057) 1 [Generic.Event] SMH Surrender Stage 1 started.
10/04/23-08:33:47:542 (1057) 1 [Generic.Event] SMH Surrender Stage 2 started.
10/04/23-08:33:48:547 (818) 2 [Utility.Event] ISCLOG: WorkMgr appendError: Error ns=%SYS rtn=%SYS.WorkQueueMgr  data=$lb("rc","0 "_$lb($lb(5002,"<STORE>RunDaemon+167^%SYS.WorkQueueMgr",,,,,,,,$lb(,"%SYS",$lb("D^RunDaemon+167^%SYS.WorkQueueMgr +1","D^StartWorkDaemon+4^STU +1"))))/* ERROR #5002: ObjectScript error: <STORE>RunDaemon+167^%SYS.WorkQueueMgr */,"group",,"stack",$lb("d^RunDaemon+226^%SYS.WorkQueueMgr^1","d^StartWorkDaemon+4^STU^1","d^^^0"),"$zu(56,2)","$Id: //iris/2023.2.0/kernel/common/src/events.c#1 $ 568 4")
10/04/23-08:34:00:202 (1033) 2 [Utility.Event] [SYSTEM MONITOR] SMHState Alert: Shared Memory Heap state Troubled
10/04/23-08:34:00:978 (1032) 0 [Utility.Event] Task Manager Error from CheckSchedule - Error 0
10/04/23-08:34:51:308 (1063) 2 [Utility.Event] ISCLOG: CSPServer  Error displaying login page $ZE= ns=%SYS rtn=%SYS.cspServer
10/04/23-08:35:00:290 (1033) 2 [Utility.Event] [SYSTEM MONITOR] SMHPercentFull Alert: SMHPercentFull = 100, 100, 100 (Max value is 98).

Pour prévenir une telle situation, il est possible d'opter pour une communication bidirectionnelle. Lorsqu'un signal est envoyé, le processus attend une confirmation "ACK" avant de poursuivre, comme illustré dans l'exemple suivant ::

Do $SYSTEM.Event.Create("MyResource")
Write !,"Type < ctrl+c > to stop listening."
Try {
    For  {
        Set result = $SYSTEM.Event.WaitMsg("MyResource",10)
        Set code = $ListGet(result,1)
        Continue:code=0
        Quit:code=-1
        Set event = {}.%FromJSON($ListGet(result,2))
        Write !, "Content : ", event.Content
        Do $SYSTEM.Event.Signal(event.PID) ; Send aknowledgement
    }
} Catch ex {
    If $SYSTEM.Event.Defined("MyResource") {
        Do $SYSTEM.Event.Clear("MyResource")
        Do $SYSTEM.Event.Delete("MyResource")
    }
}

 

For i=1:1:100 {
    Do $SYSTEM.Event.Signal("MyResource",{"PID":($Job),"Content":"Hello!"})
    Set code = $SYSTEM.Event.Wait("",3) ; Wait acknowledgement
    If code < 1 {
        Write "Something went wrong."
        If $SYSTEM.Event.Defined("MyResource") {
            Do $SYSTEM.Event.Clear("MyResource")
            Do $SYSTEM.Event.Delete("MyResource")
        }
        Quit
    }
}



En adoptant cette approche, la file d'attente de votre processus ne contiendra jamais plus d'un message à la fois. Assurez-vous également de toujours effectuer une opération "Clear" avant le "Delete" pour éviter que des messages non traités ne restent indéfiniment dans la file d'attente.

 

Maintenant que nous avons établi les bases, nous pouvons commencer par implémenter une classe de base "abstraite" qui jouera le rôle de Listener.

Class dc.ipcutils.ListenerAbstract Extends %RegisteredObject
{

Parameter EVENTTYPE;
Parameter WAITTIMEOUT = 10;
Parameter VERBOSE = 0;
Property ResourceName As %String [ Internal ];
Property Verbose As %Boolean [ InitialExpression = {$Get(%zverbose,..#VERBOSE)}, Internal ];
Property EventType As %String [ InitialExpression = {..#EVENTTYPE} ];
Property Event As %DynamicObject;
/// could be a string or a dynamicobject
Property Data;
Property Context As %DynamicObject;
Property LastState As %Integer [ InitialExpression = 0 ];
Method OnStartListen(Context As %DynamicObject = {{}}) As %Status
{
    Set ..ResourceName = ..GenResourceName()
    Do $SYSTEM.Event.Create(..ResourceName), ##class(dc.ipcutils.Manager).Subscribe(##this, Context)
    Write:..Verbose !, $zdt($h,3,1), " + Listening ", ..EventType, " with resourcename ", ..ResourceName, " started."
    Quit $$$OK
}

Method Listen() As %Status
{
    Set sc = $$$OK
    $$$QuitOnError(..OnStartListen())
    
    Try {
        For  If ..Wait() = -1 $$$ThrowStatus($$$ERROR($$$GeneralError,$$$FormatText("Resource %1 deleted.",..ResourceName)))
    } Catch Ex {
        If Ex.Name '[ "<INTERRUPT>" Set sc = Ex.AsStatus()
    }
    
    Quit $$$ADDSC(sc,..OnStopListen())
}

Method Wait(
	TimeOut As %Integer = {..#WAITTIMEOUT},
	sc As %Status = {$$$OK}) As %Integer
{
    Do:..LastState=1 ..SendAck() ; We have to send a ack before waiting a new incoming message.
    Set result = $SYSTEM.Event.WaitMsg(..ResourceName, TimeOut)
    Set ..LastState = $ListGet(result,1)

    If ..LastState < 1 Quit ..LastState
    Set ..Event = {}.%FromJSON($ListGet(result,2))
    Set ..Data = ..Event.Data
    Set ..Context = ..Event.Context
    
    Do ..Event.%Remove("Data")
    Do ..Event.%Remove("Context")

    Try {
        Do ..Update(..Event, ..Data, ..Context)
    } Catch (ex) {
        Set sc = ex.AsStatus()
        Set ^Listener.Err("last") = $zdt($h,3,1)_" "_$SYSTEM.Status.GetOneErrorText(sc)
    }

    Quit ..LastState
}

Method SendAck()
{
    Do $SYSTEM.Event.Signal(..Event.PIDSource,..Event.%ToJSON())
}

Method Update(
	EventObject As %DynamicObject,
	Data As %DynamicObject,
	Context As %DynamicObject) As %Status
{
    Quit $$$OK
}

Method WaitEvent(
	Output Event As %DynamicObject,
	TimeOut As %Integer = {..#WAITTIMEOUT}) As %Integer
{
    Set result = $SYSTEM.Event.WaitMsg(..ResourceName, TimeOut), returnCode = $ListGet(result,1), Event = ""
    If returnCode < 1 Quit returnCode
    
    Set ..Event = {}.%FromJSON($ListGet(result,2))
    Set ..Data = Event.Data
    Set ..Context = Event.Context
    Do ..Event.%Remove("Data"), ..Event.%Remove("Context")

    Quit returnCode
}

Method OnStopListen(Context As %DynamicObject = {{}}) As %Status
{
    Write:..Verbose !, $zdt($h,3,1), " - Listening ", ..EventType, " with resourcename ", ..ResourceName, " has been STOPPED."
    Do:$SYSTEM.Event.Defined(..ResourceName) $SYSTEM.Event.Clear(..ResourceName), $SYSTEM.Event.Delete(..ResourceName)
    Quit ##class(dc.ipcutils.Manager).UnSubscribe(##this, Context)
}

Method GenResourceName() As %String [ CodeMode = expression, Private ]
{
$Translate($SYSTEM.Encryption.Base64Encode($Job_$zcrc(..EventType_$ZDT($H,3,1),7)),"=")
}

}

Les listeners doivent s'inscrire auprès d'un "Manager". Passons maintenant à l'écriture de ce manager, qui permettra de gérer les inscriptions, les désinscriptions et les notifications :

Include ipcutils

Class dc.ipcutils.Manager
{

Parameter ACKTIMEOUT = 3;
ClassMethod Subscribe(
	Observer As dc.ipcutils.ListenerAbstract,
	Context As %DynamicObject = {{}}) As %Status
{
    Set $$$Subscribed(Observer.EventType, $Classname(Observer), Observer.ResourceName) = $ListBuild(Observer.EventType,Observer.ResourceName,$zdt($H,3,1),$Job,Context.%ToJSON())
    Quit $$$OK
}

ClassMethod UnSubscribe(
	Observer As %String,
	Context As %DynamicObject) As %Status
{
    Kill $$$Subscribed(Observer.EventType, $Classname(Observer), Observer.ResourceName)
    Quit $$$OK
}

ClassMethod Notify(
	Event As %String,
	Data As %DynamicObject) As %Status
{
    #def1arg ResourceName(%val)     $QSubscript(%val,3)
    #def1arg Context(%val)          {}.%FromJSON($ListGet(%val,5))
    
    #dim NotifyObject As %DynamicObject = ##class(dc.ipcutils.Manager).GenEventObject(Event)
    
    Set sc = $$$OK
    Set node = $Name($$$Subscribed(NotifyObject.EventType))
    For  {
        Set node = $Query(@node,1,value)
        Quit:node=""||(NotifyObject.EventType'=$QSubscript(node,1))
        
        Set resourceName = $$$ResourceName(node)
        Set NotifyObject.Context = $$$Context(value), NotifyObject.Data = Data

        ; check if the resource exists AND if process that create the resource still exists!        
        If '$System.Event.Defined(resourceName) || '$Data(^$JOB($ListGet(value, 4))) Do UnSubscribe Continue
        Do $SYSTEM.Event.Signal(resourceName, NotifyObject.%ToJSON())
        Do WaitAck

        Do NotifyObject.%Remove("Context"), NotifyObject.%Remove("Data")
    }
    Quit sc

UnSubscribe
    Do:$SYSTEM.Event.Defined(resourceName) $SYSTEM.Event.Clear(resourceName), $SYSTEM.Event.Delete(resourceName)
    Kill @node
    Quit
WaitAck
    Set start = $zh, match = $$$NO
    Do {
        Set syncResult = $SYSTEM.Event.WaitMsg("", ..#ACKTIMEOUT)
        Set syncStatus = $ListGet(syncResult, 1)
        If syncStatus < 1 Do UnSubscribe Quit
        Set msg = {}.%FromJSON($ListGet(syncResult, 2))
        Set match = msg.MessageID = NotifyObject.MessageID ; Ok this is the expected ack
        Quit:match
    } While (start + ..#ACKTIMEOUT > $zh)

    Do:'match UnSubscribe ; no ACK received -> force Unsubscribe this subscriber
    Quit
}

ClassMethod GenEventObject(Event As %String) As %DynamicObject
{
    Quit {
        "Event":(Event),
        "EventType":($Piece(Event,":",1)),
        "EventName":($Piece(Event,":",2)),
        "PIDSource":($JOB),
        "Timestamp":($ZDateTime($Horolog,3,1)),
        "MessageID":($Increment(^dc.ipcutils.msg)) 
    }
}

ClassMethod HashContext(Context As %DynamicObject) As %String [ CodeMode = expression, Internal, Private ]
{
$ZCRC($Select($IsObject(Context):Context.%ToJSON(),1:Context),7)
}

ClassMethod ShowSubscribed()
{
    Set node = $Name($$$Subscribed)
    For  {
        Set node = $Query(@node,1,value)
        Quit:node=""
        Write !," * Event: ", $QSubscript(node,1), "  ClassName: ", $QSubscript(node,2)
        Write !,"   Date time: ",$Lg(value,3)
        Write !,"   PID: ", $Lg(value,4)
        Write !,"   Context: ",$Lg(value,5)
        If $ListGet(value,6)=1 Write !,"   ResourceName: ",$QSubscript(node,3)
    }
    Quit
}

ClassMethod Kill() [ Internal ]
{
    Kill $$$Subscribed
}

}

Avec le "Manager" en place, nous pouvons désormais développer un premier listener concret :

/// Basic Listener for demo purpose<br/>
/// Open a terminal: <br/>
/// Set listener = ##class(dc.ipcutils.BasicListener).%New()<br/>
/// Do listener.Listen()<br/>
/// or in one line : Do ##class(dc.ipcutils.BasicListener).%New().Listen()<br/>
/// Type ctrl+c to stop.<br/>
/// Open anoter terminal:<br/>
/// Do ##class(dc.ipcutils.Manager).Notify("Demo:OnTestString","This is string notification")<br/>
/// Do ##class(dc.ipcutils.Manager).Notify("Demo","This is a demo notification")<br/>
/// Do ##class(dc.ipcutils.Manager).Notify("Demo:OnTestObject",{"object":"demo"})<br/>
Class dc.ipcutils.BasicListener Extends dc.ipcutils.ListenerAbstract
{

Parameter EVENTTYPE = "Demo";
Parameter VERBOSE = 1;
Method Listen() As %Status
{
    Set sc = $$$OK
    $$$QuitOnError(..OnStartListen())
    Try {
        Write:..Verbose !,$zdt($h,3,1)," + Type < ctrl+c > to stop listening."
        For  If ..Wait() = -1 $$$ThrowStatus($$$ERROR($$$GeneralError,$$$FormatText("Resource %1 deleted.",..ResourceName)))
    } Catch Ex {
        If Ex.Name '[ "<INTERRUPT>" Set sc = Ex.AsStatus()
        If ..Verbose, $$$ISERR(sc) Do $SYSTEM.Status.DisplayError(sc)
    }
    Quit $$$ADDSC(sc,..OnStopListen())
}

Method Update(
	Event As %DynamicObject,
	Data As %DynamicObject,
	Context As %DynamicObject) As %Status
{
    Set dt = $ZDateTime($Horolog, 3, 1)
    Write:..Verbose !,dt," + Update received!"
    Write !,dt, " = Event : ", !
    Do ##class(%JSON.Formatter).%New().Format(Event)
    Write !,dt, " = Context : ", !
    Do ##class(%JSON.Formatter).%New().Format(Context)
    Write !,dt, " = Data : ", !
    Do ##class(%JSON.Formatter).%New().Format(Data)
    Quit $$$OK
}

ClassMethod Test() As %Status
{
    Quit ..%New().Listen()
}

}

 

Tout est à présent prêt pour effectuer un test. Ouvrez un terminal (ou plusieurs) avec une instance de dc.observer.BasicListener :

IRISAPP>d ##class(dc.ipcutils.BasicListener).Test()

2023-10-17 19:20:34 + Listening Demo with resourcename MjAxMjIzMzE5NzkwOTg1 started.
2023-10-17 19:20:34 + Type < ctrl+c > to stop listening.

Et un autre pour notifier des évènements "Demo":

Do ##class(dc.ipcutils.Manager).Notify("Demo:OnTest",{"Message":"My FirstTest"}.%ToJSON())
Ce qui donnera comme résultat dans le premier terminal:
IRISAPP>d ##class(dc.ipcutils.BasicListener).Test()

2023-10-17 19:20:34 + Listening Demo with resourcename MjAxMjIzMzE5NzkwOTg1 started.
2023-10-17 19:20:34 + Type < ctrl+c > to stop listening.
2023-10-17 19:20:44 + Update received!
2023-10-17 19:20:44 = Event : 
{
  "Event":"Demo:OnTestObject",
  "EventType":"Demo",
  "EventName":"OnTestObject",
  "PIDSource":"20165",
  "Timestamp":"2023-10-17 19:20:44",
  "MessageID":2171
}
2023-10-17 19:20:44 = Context : 
{
}
2023-10-17 19:20:44 = Data : 
{
  "object":"demo-1"
}

Nous disposons désormais d'un exemple fonctionnel et relativement simple à mettre en place. Il vous suffit de créer une classe qui hérite de "dc.ipcutils.ListenerAbstract" et de l'adapter à vos besoins spécifiques. Bien que cela ne soit pas l'Observer Pattern, cette solution accomplit efficacement sa tâche.

À bientôt!

Discussion (2)2
Connectez-vous ou inscrivez-vous pour continuer

Super article, en plus avec une exclusivité en langue française :)

Ça me rappel des souvenir d'un projet ou nous avions construit une machine à état : https://developer.mozilla.org/fr/docs/Glossary/State_machine

A l'époque, les messages queues n'existaient pas et ton tuto non plus d’ailleurs ;) ça nous aurait bien aidé.

Pense le compléter avec un exemple appliqué aux websocket, j'ai l'impression que ça s'y prête bien.