Article
· Jan 7 8m de lecture

Envoi de messages Kafka via JAVA PEX pour le traitement des pronostics d'examen de quarantaine.

Introduction

Cet article vise à explorer le fonctionnement du système FHIR-PEX et a été développé, en tirant parti des capacités d'InterSystems IRIS.

En rationalisant l'identification et le traitement des examens médicaux dans les centres de diagnostic clinique, notre système vise à améliorer l'efficacité et la précision des flux de travail de soins de santé. En intégrant les normes FHIR à la base de données InterSystems IRIS Java-PEX, le système aide les professionnels de santé avec des capacités de validation et de routage, contribuant ainsi à améliorer la prise de décision et les soins aux patients.

how it works

  • Interopérabilité IRIS :
    Reçoit les messages au standard FHIR, garantissant l'intégration et la compatibilité avec les données de santé.

  • Traitement de l'information avec 'PEX Java' :
    Traite les messages au format FHIR et les dirige vers des sujets Kafka en fonction de règles configurées globalement dans la base de données, facilitant ainsi le traitement et le routage efficaces des données, en particulier pour les examens dirigés vers la quarantaine.

  • Gestion des retours Kafka via un backend Java externe :
    Interprète uniquement les examens dirigés vers la quarantaine, permettant au système de gérer les retours de Kafka via un backend Java externe. Il facilite la génération d'informations pronostiques pour les professionnels de la santé grâce à l'IA générative, en s'appuyant sur les consultations des résultats d'examens précédents pour les patients respectifs.

Development

Grâce au PEX (Production EXtension) d'InterSystems, un outil d'extensibilité permettant d'améliorer et de personnaliser le comportement du système, nous avons élaboré une Opération Métier. Ce composant est chargé de traiter les messages entrants au format FHIR au sein du système. Comme exemple suivant :

import com.intersystems.enslib.pex.*;
import com.intersystems.jdbc.IRISObject;
import com.intersystems.jdbc.IRIS;
import com.intersystems.jdbc.IRISList;
import com.intersystems.gateway.GatewayContext;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.*;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class KafkaOperation extends BusinessOperation {
// Connection to InterSystems IRIS
private IRIS iris;

// Connection to Kafka
private Producer<Long, String> producer;

// Kafka server address (comma separated if several)
public String SERVERS;

// Name of our Producer
public String CLIENTID;

/// Path to Config File
public String CONFIG;

public void OnInit() throws Exception {
[...]
}

public void OnTearDown() throws Exception {
[...]
}

public Object OnMessage(Object request) throws Exception {
    IRISObject req = (IRISObject) request;
    LOGINFO("Received object: " + req.invokeString("%ClassName", 1));

    // Create record
    String value = req.getString("Text");
    String topic = getTopicPush(req);
    final ProducerRecord<Long, String> record = new ProducerRecord<>(topic, value);

    // Send new record
    RecordMetadata metadata = producer.send(record).get();

    // Return record info
    IRISObject response = (IRISObject)(iris.classMethodObject("Ens.StringContainer","%New",topic+"|"+metadata.offset()));
    return response;
}

private Producer<Long, String> createProducer() throws IOException {
[...]
}

private String getTopicPush(IRISObject req) {
[...]
}

[...]
}

`

Au sein de l'application, la méthode getTopicPush se charge d'identifier le sujet auquel le message sera envoyé.

La détermination du sujet auquel le message sera envoyé dépend de l'existence d'une règle dans la globale "quarantineRule", telle que lue dans IRIS.

String code = FHIRcoding.path("code").asText();
String system = FHIRcoding.path("system").asText();

IRISList quarantineRule = iris.getIRISList("quarantineRule",code,system);

 String reference = quarantineRule.getString(1);
 String value = quarantineRule.getString(2);

 String observationValue = fhir.path("valueQuantity").path("value").asText()

When the global ^quarantineRule exists, validation of the FHIR object can be validated.

private boolean quarantineValueQuantity(String reference, String value, String observationValue) {
    LOGINFO("quarantine rule reference/value: " + reference + "/" + value);
    double numericValue = Double.parseDouble(value);
    double numericObservationValue = Double.parseDouble(observationValue);

    if ("<".equals(reference)) {
        return numericObservationValue < numericValue;
    }
    else if (">".equals(reference)) {
        return numericObservationValue > numericValue;
    }
    else if ("<=".equals(reference)) {
        return numericObservationValue <= numericValue;
    }
    else if (">=".equals(reference)) {
        return numericObservationValue >= numericValue;
    }

    return false;
}

Exemple pratique :

Lors de la définition d'une globale, telle que :

Set ^quarantineRule("59462-2","http://loinc.org") = $LB(">","500") 

Ceci établit une règle pour le code "59462-2" et le système ""http://loinc.org"" dans la globale ^quarantineRule , spécifiant une condition dans laquelle la valeur supérieure à 500 est définie comme quarantaine. Dans l'application, la méthode getTopicPush peut ensuite utiliser cette règle pour déterminer le sujet approprié pour envoyer le message en fonction du résultat de la validation.

Compte tenu de l'affectation, le JSON ci-dessous serait envoyé en quarantaine car il correspond à la condition spécifiée en ayant :

 {
          "system": "http://loinc.org",
          "code": "59462-2",
          "display": "Testosterone"
}

"valueQuantity": { "value": 550, "unit": "ng/dL", "system": "http://unitsofmeasure.org", "code": "ng/dL" }

FHIR Observation:

{
    "resourceType": "Observation",
    "id": "3a8c7d54-1a2b-4c8f-b54a-3d2a7efc98c9",
    "status": "final",
    "category": [
      {
        "coding": [
          {
            "system": "http://terminology.hl7.org/CodeSystem/observation-category",
            "code": "laboratory",
            "display": "laboratory"
          }
        ]
      }
    ],
    "code": {
      "coding": [
        {
          "system": "http://loinc.org",
          "code": "59462-2",
          "display": "Testosterone"
        }
      ],
      "text": "Testosterone"
    },
    "subject": {
      "reference": "urn:uuid:274f5452-2a39-44c4-a7cb-f36de467762e"
    },
    "encounter": {
      "reference": "urn:uuid:100b4a8f-5c14-4192-a78f-7276abdc4bc3"
    },
    "effectiveDateTime": "2022-05-15T08:45:00+00:00",
    "issued": "2022-05-15T08:45:00.123+00:00",
    "valueQuantity": {
      "value": 550,
      "unit": "ng/dL",
      "system": "http://unitsofmeasure.org",
      "code": "ng/dL"
    }
}

L'application Java Quarkus

Après envoi sur le sujet souhaité, une application Quarkus Java a été construite pour recevoir les examens en quarantaine.
@ApplicationScoped
public class QuarentineObservationEventListener {

@Inject
PatientService patientService;

@Inject
EventBus eventBus;

@Transactional
@Incoming("observation_quarantine")
public CompletionStage<Void> onIncomingMessage(Message<QuarentineObservation> quarentineObservationMessage) {
    var quarentineObservation = quarentineObservationMessage.getPayload();
    var patientId = quarentineObservation.getSubject()
            .getReference();
    var patient = patientService.addObservation(patientId, quarentineObservation);
    publishSockJsEvent(patient.getId(), quarentineObservation.getCode()
            .getText());
    return quarentineObservationMessage.ack();
}

private void publishSockJsEvent(Long patientId, String text) {
    eventBus.publish("monitor", MonitorEventDto.builder()
            .id(patientId)
            .message(" is on quarentine list by observation ." + text)
            .build());
}
 }

Ce segment du système est chargé de conserver les informations reçues de Kafka, de les stocker dans les observations du patient dans la base de données et de notifier l'événement au moniteur.

The monitor

Enfin, le moniteur du système est chargé de fournir une visualisation frontale simple. Cela permet aux professionnels de la santé d’examiner les données des patients/examens et de prendre les mesures nécessaires.

Implementation of langchainPT

Grâce au moniteur, le système permet aux professionnels de la santé de demander des recommandations à l'IA générative.

@Unremovable
@Slf4j
@ApplicationScoped
public class PatientRepository {
    @Tool("Get anamnesis information for a given patient id")
    public Patient getAnamenisis(Long patientId) {
        log.info("getAnamenisis called with id " + patientId);
        Patient patient = Patient.findById(patientId);
        return patient;
    }

    @Tool("Get the last clinical results for a given patient id")
    public List<Observation> getObservations(Long patientId) {
        log.info("getObservations called with id " + patientId);
        Patient patient = Patient.findById(patientId);
        return patient.getObservationList();
    }

}

suivre la mise en œuvre de Langchain4j

@RegisterAiService(chatMemoryProviderSupplier = RegisterAiService.BeanChatMemoryProviderSupplier.class, tools = {PatientRepository.class})
public interface PatientAI {

    @SystemMessage("""
            You are a health care assistant AI. You have to recommend exams for patients based on history information.
            """)
    @UserMessage("""
             Your task is to recommend clinical exams for the patient id {patientId}.

             To complete this task, perform the following actions:
             1 - Retrieve anamnesis information for patient id {patientId}.
             2 - Retrieve the last clinical results for patient id {patientId}, using the property 'name' as the name of exam and 'value' as the value.
             3 - Analyse results against well known conditions of health care.

             Answer with a **single** JSON document containing:
             - the patient id in the 'patientId' key
             - the patient weight in the 'weight' key
             - the exam recommendation list in the 'recommendations' key, with properties exam, reason and condition.
             - the 'explanation' key containing an explanation of your answer, especially about well known diseases.

            Your response must be just the raw JSON document, without ```json, ``` or anything else.
             """)
    String recommendExams(Long patientId);
}

Le système peut ainsi aider les professionnels de santé à prendre des décisions et à mener des actions.

Video demo

VIDEO

Authors

NOTE:

L'application https://openexchange.intersystems.com/package/fhir-pex participe actuellement au concours InterSystems Java 2023. N'hésitez pas à explorer davantage la solution et n'hésitez pas à nous contacter si vous avez des questions ou avez besoin d'informations supplémentaires. Nous vous recommandons d'exécuter l'application dans votre environnement local pour une expérience pratique.
Merci pour l'opportunité 😀!

Discussion (0)1
Connectez-vous ou inscrivez-vous pour continuer