Kafka Destination

The Kafka destination references the corresponding kafka-destination defined in the file ${TIF_ROOT}/etc/destinations.xml, using the id attribute.

See Configure Destinations for more details.
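For orientation, a hypothetical sketch of such a destination entry is shown below. Only the id and topic attributes are referenced on this page; everything else in the sketch is an assumption, so consult Configure Destinations for the actual schema.

<!-- Hypothetical sketch of a Kafka destination entry in destinations.xml. -->
<!-- The BootstrapServers element is an assumption, not the actual schema. -->
<Kafka id="kafka-1" topic="TIF-JOB-TEST">
    <BootstrapServers>kafka-host:9092</BootstrapServers>
</Kafka>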

General Usage

In your job configuration, you use the <Kafka> element to send data to a Kafka topic.

You can define the topic to which the data is transferred using the topic attribute, as shown below. If you omit this attribute, the Kafka handler will try to resolve the topic from the destination with the given id.

<Job>
    <TransferData>
        <Payload>...</Payload>
        <Destinations>
            <Kafka id="kafka-1" (1)
                   topic="TIF-JOB-TEST" /> (2)
        </Destinations>
    </TransferData>
</Job>
1 Reference the destination via the id attribute
2 Here you define the topic to use. Note that a topic can also be defined in the core destination definition, in which case you can omit it here, or vice versa. The topic defined here takes precedence. An example that omits the topic is shown below.
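For instance, assuming the kafka-1 destination already defines a topic, the job configuration can omit the topic attribute entirely:

<Job>
    <TransferData>
        <Payload>...</Payload>
        <Destinations>
            <!-- No topic attribute: the topic is resolved from the destination -->
            <Kafka id="kafka-1" />
        </Destinations>
    </TransferData>
</Job>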

Configure the Record Key

Via the keyMacro attribute, you can specify a macro that resolves the key to use for the produced Kafka record. Note that if you omit keyMacro, no key will be associated with the record.

An example is shown below:

<Job>
    <TransferData>
        <Payload>...</Payload>
        <Destinations>
            <Kafka id="kafka-1" topic="TIF-JOB-TEST" keyMacro="${job.source.name}" />
        </Destinations>
    </TransferData>
</Job>

This will use the name of the source object within ENOVIA for which the Job was created.

Please see here for details on using macros.

Asynchronous Replies

If you want the job to be completed only after a reply has been received from Kafka (and processed by a reply handler), you need to specify that the Kafka destination will produce a reply later.

This is controlled via the asyncReply attribute.

<Kafka id="kafka-1" asyncReply="true" />
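A slightly larger sketch is shown below, combining asyncReply with a replyTo header (the header is borrowed from the examples in the next section; the topic names are placeholders):

<Job>
    <TransferData>
        <Payload>...</Payload>
        <Destinations>
            <!-- The job completes later, once the reply handler has -->
            <!-- processed the reply received from Kafka -->
            <Kafka id="kafka-1"
                   topic="TIF-JOB-TEST"
                   asyncReply="true">
                <Header name="replyTo" value="TIF-REPLY-TEST" />
            </Kafka>
        </Destinations>
    </TransferData>
</Job>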

Additional Headers

You can also add headers to the Kafka record being created.

First of all, any header specified within the referenced destination in ${TIF_ROOT}/etc/destinations.xml will be added. Additionally, you can specify extra headers per job configuration, as shown below.

<Job>
    <TransferData>
        <Payload>...</Payload>
        <Destinations>
            <Kafka id="kafka-1" topic="TIF-JOB-TEST" keyMacro="${job.source.name}">
                <Header name="replyTo" value="TIF-REPLY-TEST" />
                <Header name="tifInstance" value="${tif.instance.id}" />
                <Header name="jobId" value="${job.id}" />
                <Header name="destinationId" value="${destination.id}" />
                <Header name="sourceId" value="${job.source.id}" />
            </Kafka>
        </Destinations>
    </TransferData>
</Job>

As illustrated, the headers also support macros, allowing dynamic values to be passed in.

Another example is shown below:

<Job>
    <TransferData>
        ...
        <Destinations>
            <Kafka>
                <!-- Static parameter -->
                <Header name="test1" value="bar"/>

                <!-- Dynamic parameter, value taken from RPE (ENOVIA Runtime Program Env) -->
                <Header name="test2" value="${job.rpe.TYPE}"/> (1)

                <!-- Dynamic parameter, value taken from the additional arguments -->
                <!-- passed via the trigger program object in ENOVIA for this job -->
                <Header name="test3" value="${paramName}"/>

                <!-- Define a custom header provider -->
                <!-- Such class must implement the interface: -->
                <!-- com.technia.tif.enovia.job.destination.HeaderProvider -->
                <HeaderProvider>com.acme.foo.MyHeaderProvider</HeaderProvider>
            </Kafka>
        </Destinations>
        ...
    </TransferData>
</Job>
1 The macros are described here
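A minimal Java sketch of a custom header provider is shown below. The methods of the HeaderProvider interface are not documented on this page, so the getHeaders method used here is an assumption; check the actual interface declaration for the exact signature.

package com.acme.foo;

import java.util.LinkedHashMap;
import java.util.Map;

import com.technia.tif.enovia.job.destination.HeaderProvider;

/*
 * Hypothetical sketch: the method name and signature below are assumptions,
 * not the documented interface. Adjust to match the actual HeaderProvider.
 */
public class MyHeaderProvider implements HeaderProvider {

    public Map<String, String> getHeaders() {
        Map<String, String> headers = new LinkedHashMap<>();
        // Compute header values at send time, for example a timestamp
        headers.put("sentAt", String.valueOf(System.currentTimeMillis()));
        return headers;
    }
}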

Serializer

By default, the payload (the value of the Kafka record) is serialized using a ByteArraySerializer. The key, in turn, if present, is serialized using the StringSerializer.
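ByteArraySerializer and StringSerializer refer to the standard Kafka classes in org.apache.kafka.common.serialization. For reference only, a plain Kafka producer configured with the same serializers would look roughly as follows; the broker address, key, and topic are placeholders, not TIF configuration.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PlainProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker address
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:9092");
        // Same defaults as the Kafka destination: String key, byte[] value
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("TIF-JOB-TEST", "record-key",
                    "payload".getBytes(StandardCharsets.UTF_8)));
        }
    }
}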