Article
· 5 hr il y a 7m de lecture

La puissance de Python intégré dans l'interopérabilité - un cas d'utilisation de MongoDB CDC

L'utilisation de Embedded Python lors de la création d'une solution basée sur InterSystems peut ajouter des fonctionnalités très puissantes et approfondies à votre boîte à outils.

J'aimerais partager un exemple de cas d'utilisation que j'ai rencontré : l'activation d'un CDC (Change Data Capture) pour une collection mongoDB - la capture de ces modifications, leur digestion via un flux d'interopérabilité et, pour finir, la mise à jour d'un EMR via une API REST.

La fonctionnalité principale – « Observation des changements » 👀

L'idée de base est d'utiliser le package PyMongo, qui, entre autres, permet de suivre les modifications au sein d'une base de données mongo via son change_streams.

La partie importante se trouve dans l'adaptateur entrant que j'ai créé et qui possède une méthode OnInit() qui inclut cette ligne :

self.changeStream = client.get_database(...).get_collection(...).watch()

J'utilise la méthode watch() ici pour surveiller les changements d'une collection spécifique dans une base de données.

Je les ai définis via les paramètres du service métier :

Ensuite, dans la méthode OnTask(), j'utilise ceci pour capturer les modifications :

while self.changeStream.alive:
    change = self.changeStream.try_next()
    if change is not None:

Ici, j'utilise la propriété alive et la méthode try_next() pour obtenir les détails des modifications survenues.

Pour chaque modification, je crée un objet IRIS Stream avec le contenu JSON et je l'envoie à la méthode ProcessInput() du service métier.

Dans le service métier, je crée une requête importée à partir du contenu JSON et je l'envoie au routeur.

Le flux général peut être illustré comme suit :

Voici une démonstration vidéo :

https://www.youtube.com/embed/Z_6rq-bhB_s
[Ceci est un lien intégré, mais vous ne pouvez pas consulter le contenu intégré directement sur le site car vous avez refusé les cookies nécessaires pour y accéder. Pour afficher le contenu intégré, vous devez accepter tous les cookies dans vos Paramètres des cookies]

Quelques détails techniques supplémentaires 👩‍💻

Le service métier doit « masser » un peu le JSON car il inclut du JSON « flou », avant que je puisse autoriser son importation en tant que JSON valide.

Par exemple, un message peut ressemble à ceci:

 
Exemple de message JSON de capture de changement 

Vous pouvez voir par exemple que Timestamp et new UUID ne sont pas des JSON valides.

Vous pouvez voir que le « Document » actuel se trouve dans la partie fullDocument, et pour cet exemple j'ai utilisé un schéma spécifique.

Mon document ressemble à ceci :

 
Exemple de document JSON pour cette démo

Dans votre cas, vous pouvez soit modifier cela pour l'adapter à votre schéma, soit même envisager d'adapter mon exemple pour utiliser une approche plus « dynamique » où vous pouvez définir un paramètre qui serait utilisé pour importer dynamiquement le JSON vers un nom de classe variable (par rapport à mon nom codé en dur).

[Dans tous les cas, pour charger votre schéma JSON et créer une classe à partir de celui-ci, vous pouvez utiliser la classe Sample.OpenAPIClassGenerator que j'ai incluse dans mon exemple (adapté du générateur de classe de définition OpenAPI de @Guillaume Rongier (et appeler la méthode ProcessFile() que j'ai ajoutée sur votre fichier de schéma JSON).]

Il s’agit essentiellement de la partie principale de cette fonctionnalité : Adaptateur + Service métier qui envoie des messages avec les modifications apportées à une collection de bases de données Mongo.

Élargir le tableau 👩‍⚕️

À des fins de démonstration et pour créer un flux complet qui « raconte une histoire » (ce qui était le cas d'utilisation réel que j'avais), j'ai également ajouté une cible pour ce CDC, qui est une API REST d'un Dossier Patient Informatisé (DPI ou EMR pour Electronic Medical Record) fictif, qui prend les données des patients et les insère dans une table de patients.

Vous trouverez cette partie sous le package Demo.EMR (Data.Patient pour la table et Util.API pour l'API REST) ; et le package Demo.Int pour la partie Opérations métiers.

En fait, vous pouvez réutiliser cette partie pour tout autre besoin que vous pourriez rencontrer lorsque vous avez besoin d'une API fictive pour envoyer des données de patient à des fins de test en tant que destination cible.

[Notez que la table « Mock EMR » et l'API incluent des éléments de données que je n'ai pas utilisés dans cette démonstration, comme l'adresse e-mail et le numéro de téléphone]

Un exemple de flux complet (avec captures d'écran) 📷

Voici donc un exemple de déroulement de la manière dont cela fonctionne :

1. Ajoutez un document à votre collection mongo.

 
Commandes Shell (Docker exec pour se connecter au conteneur mongo et commandes mongosh pour lancer ReplicaSet et un document vers une collection)

2. Examinez votre table « EMR » et voyez les nouvelles données de Mongo insérées dans celle-ci

 
DBeaver visualisant les données dans la table Patient

3. Examinez les « coulisses » d’InterSystems IRIS en prenant soin de ce flux

 
Trace visuelle du flux de messages

Faites l'essai de votre côté🏎

L'application Open Exchange inclut tout le code associé et les instructions de configuration pour une démonstration complète basée sur le conteneur Docker comme indiqué ci-dessus (avec toutes les parties associées, y compris les conteneurs Mongo).

[Remarque : Mongo a une notion de ReplicaSet et c'est seulement avec cette fonctionnalité que change_streams fonctionne. Pour plus de simplicité, la démonstration ci-dessus suppose que mongo1 (il y a aussi mongo2 et mongo3) est le « principal »].

Encore une fois, ce n’est qu’un exemple de la manière dont Embedded Python peut vous permettre de relever vos défis d’interopérabilité très rapidement et facilement.

Discussion (0)1
Connectez-vous ou inscrivez-vous pour continuer