"Will code for travel"

Search technical info
HL7 server and message converter in pure python
Written: 2023-04-10 14:51:52 Last update: 2023-05-03 07:04:16
Many hospitals using HL7 protocol for data communication, including ordering patient, updating data, getting AI result, etc. There are some python libraries that can provide an easy way to create HL7 server (listener) and HL7 client (sender), due to strong hospital security requirement that mandate a regular security vulnerability scan to all systems to make sure there is no un-authorized party can steal or break the system, this security vulnerability scan sometimes found vulnerability on the third party libraries and product vendor must update their product to use the latest version of the libraries. HL7 network protocol and HL7 message format can be handled by pure python without large code, below is the pure python code for the server (listener) and when received an HL7 message then need to reply with ACK message, the message is just an example
import socket
from datetime import datetime
import re

template_pattern = r'{(\d+)-(\d+)}'

ACK_message_template = "MSH|^~\&|SENDING_APP|SENDING_FACILITY|{0-2}|{0-3}|||20230403111111||ACK|{0-9}|P|2.4||||||UNICODE UTF-8"
ACK_message_template += "\nMSA|AA|{0-9}"

SERVER_PORT = 5123

import re

def ai_hl7_converter(source, template):
    print('chatgpt_hl7_converter(source, template)')

    # Split the message into segments
    source_segments = re.split(r'[\r\n]+', source)

    # Replace placeholders in the template using a callback function
    def replace_placeholder(match):
        segment_index = int(match.group(1))
        field_index = int(match.group(2))
        return source_segments[segment_index].split('|')[field_index]

    new_message = re.sub(template_pattern, replace_placeholder, template)

    return new_message

def print_hl7_message(hl7_message):
    # convert from bytes to string and replace newline for printing
    msg = hl7_message.decode().replace("\r", "\n")

    print('HL7 message:')
    segments = msg.split("\n")
    # print all segments (lines)
    for i, segment in enumerate(segments):
        print(f"segments[{i}]: {segment}")

    # print first segment ONLY
    fields = segments[0].split('|')
    for i, f in enumerate(fields):
        print(f"fields[{i}]: {f}")

def handle_incoming_client(sock, addr):
    print(f'handle_incoming_client(socket, {addr})')

    # receive the message
    message = sock.recv(4096)

    # print the message
    print_hl7_message(message)

    # convert original message to ACK message
    response_ack_str = ai_hl7_converter(message.decode().replace("\r", "\n"), ACK_message_template)

    response_ack_bytes = response_ack_str.replace("\n", "\r").encode()

    # add HL7 message MLLP signature, startb == ['\x0b'], end == [b'\x1c\r']
    # see: http://www.hl7.org/documentcenter/public/wg/inm/mllp_transport_specification.PDF
    response_ack_bytes = b'\x0b' + response_ack_bytes + b'\x1c\r'
    print('ACK message response:')
    print(response_ack_bytes)

    # response with ACK
    sock.sendall(response_ack_bytes)

    # close the connection
    print(f'... closing client socket: {addr}')
    sock.close()


def start_hl7_server(ip, port):
    print(f'start_hl7_server({ip}, {port})')

    # create a socket object
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    try:
        # bind the socket to a specific address and port
        server_socket.bind((ip, port))

        # Enable a server to accept connections.
        # If backlog is specified, it must be at least 0 (if it is lower, it is set to 0);
        # it specifies the number of unaccepted connections that the system will allow before refusing new connections.
        # If not specified, a default reasonable value is chosen.
        server_socket.listen()

        # loop forever
        while True:
            # accept the connection
            print(f'Waiting for a connection in port:{port} ...')
            client_socket, client_address = server_socket.accept()

            print('Connected by', client_address)
            handle_incoming_client(client_socket, client_address)

    except Exception as e:
        print(f'Exception: {str(e)}')


def main():
    try:
        start_hl7_server('', SERVER_PORT)
        print('... finished')
    except Exception as e:
        print(f"Error occurred in main: {e}")

if __name__ == "__main__":
    main()
This is the client to send the message to the server
import socket

SERVER_HOST = 'localhost'
SERVER_PORT = 5123

hl7_message_str = "MSH|^~\&|Sender Application|Hospital Inc.|||20230403111111||ORU^R01|ORU20230328084905364|P|2.4||||||UNICODE UTF-8"
hl7_message_str += "\nPID|1||7303875797285206||anon"
hl7_message_str += "\nPV1|1|O"
hl7_message_str += "\nORC|RE||5217542190999113||SC"
hl7_message_str += "\nOBR|1||5217542190999113|MMG0001^Mammography|||20041228000000||||||||||||||||||P|||||||Lunit"
hl7_message_str += "\nOBX|1|NM|RAB0001^Abnormality Score||99.23||||||P|||20041228000000||Lunit"
hl7_message_str += "\nOBX|2|NM|RMG0005^density case||4|||CB|||P|||20041228000000||Lunit"
hl7_message_str += "\nNTE|1||Scattered Fibroglandular Densities"
hl7_message_str += "\nOBX|3|NM|RMG0001^cancer left craniocaudal||80.45||||||P|||20041228000000||Lunit"
hl7_message_str += "\nOBX|4|NM|RMG0002^cancer left mediolateral oblique||99.23||||||P|||20041228000000||Lunit"


with socket.create_connection((SERVER_HOST, SERVER_PORT)) as s:
    print(f'... successfully connected to server {SERVER_HOST}:{SERVER_PORT}')
    
    hl7_message_byte = hl7_message_str.replace('\n', '\r').encode('utf-8')

    # add MLLP signature
    hl7_message_byte = b'\x0b' + hl7_message_byte + b'\x1c\r'

    s.sendall(hl7_message_byte)
    
    print('... message sent, waiting to receive response')
    response = s.recv(1024)
    
    print('... received response:')
    print(response.decode('utf-8').replace("\r", "\n"))
There are so many kinds of HL7 messages for different purposes, some hospitals even use a custom message format, it is common for vendors or integrators to create an HL7 message converter or commonly known as "HL7 broker" application to convert a message A to different message B format. It is fairly easy to create HL7 broker as defined in the function ai_hl7_converter(source, template) inside the code above, we can create a simple template text file and store it outside the application so that we can change the template file without changing the application (python code).
Search more info