# coherence-kafka
**Repository Path**: mirrors_oracle/coherence-kafka
## Basic Information
- **Project Name**: coherence-kafka
- **Description**: Coherence Kafka Integration
- **Primary Language**: Unknown
- **License**: UPL-1.0
- **Default Branch**: main
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2021-10-28
- **Last Updated**: 2025-10-26
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# Coherence integration with Apache Kafka
This project provides component libraries that support moving data from a Coherence map to a Kafka topic and vice versa.
Supported on Coherence CE 21.06 and later.
## Getting Started
The Coherence Kafka integration project publishes a set of jar dependencies that you use in your application.
There is nothing to install, you use the Maven modules just as you would any other Java dependencies.
## The following components are provided
- **Kafka Entry Store** - a CacheStore type of extension that copies data from a Coherence map to a Kafka topic upon modification. This is contained in the `coherence-kafka-core` project.
- **Kafka Sink Connector** - an Apache Kafka Connect plugin of type "sink", which copies data posted to a Kafka topic to a Coherence map. This is contained in the `coherence-kafka-sink` project
Serializers and deserializers can be configured to handle data to and from Kafka, or an optimized pass-through mode will handle serialized entries which are already in a Coherence binary form.
## Kafka Entry Store
When configuring a cachestore-scheme as follows, Coherence will send entries to the specified Kafka broker:
Click to see config
```xml
  
    
      foo
      partitioned-rwbm-kafka
    
  
  
    
      partitioned-rwbm-kafka
      
        true
        
          
            
            
          
          
            
              
                
                {cache-name}
                
                localhost:9092
                org.apache.kafka.common.serialization.StringSerializer
                org.apache.kafka.common.serialization.StringSerializer
                5000
              
            
          
        
      
      true
    
  
```
 
The `kafka:producer` portion takes properties that are intended for a Kafka producer and uses them (the dash is turned into a dot, for example `bootstrap-servers` to `bootstrap.servers`)
### Supported operations are:
- `load()` and `loadAll()`: not supported; see Kafka Sink Connector for this
- `store()`: Corresponds to a `put()` operation on a `NamedMap`, and performs a `producer.send()` on the specified Kafka topic
- `storeAll()`: Corresponds to a `putAll()` operation on a `NamedMap`, where each element in the map will be sent to the specified Kafka topic
- `erase()`: Corresponds to a `remove()` operation on a `NamedMap`; when a topic is configured with a `compact` policy, all records with the same key will be deleted (aka `tombstone`)
- `eraseAll()`: Corresponds to a `removeAll()` operation on a `NamedMap`, and is equivalent to calling `erase()` on each element of the passed map
For more details see the `coherence-kafka-core` project.
## Kafka Sink Connector
The sink connector is built as a zip file intended for being deployed in a Connect process' environment.
### Supported operations are:
- Coherence Entry creation/update: records published on a topic are forwarded to a Coherence NamedMap and corresponding entries are created
- Coherence Entry deletion: when Kafka records have a null value, the corresponding entry in Coherence is removed if it existed
**Note:** Kafka records with a null key cannot be represented in Coherence and are discarded with a logged message such as:
```text
connect> [2021-10-20 19:21:35,680] ERROR The key is null for record: SinkRecord{kafkaOffset=1, timestampType=CreateTime} ConnectRecord{topic='MyTopic', kafkaPartition=0, key=null, keySchema=Schema{STRING}, value=User(NoKey, 20), valueSchema=Schema{STRUCT}, timestamp=1634757687699, headers=ConnectHeaders(headers=)} (com.oracle.coherence.kafka.connect.sink.CoherenceSinkTask)
```
Configuration is done using a curl command such as the following (Confluent Kafka)
Click to see command
```
curl -X POST -H "Content-Type: application/json" \
    http://localhost:8083/connectors \
    -d '{"name":"coh-sink",
             "config":
                 {
                     "connector.class":"com.oracle.coherence.kafka.connect.CoherenceSinkConnector",
                     "topics":"MyTopic",
                     "coherence.cache.mappings":"MyTopic->MyCache"
                 }
         }'
```
Click to expand
```properties
name=coh-sink
topics=MyTopic
tasks.max=1
connector.class=com.oracle.coherence.kafka.connect.CoherenceSinkConnector
coherence.cache.mappings=MyTopic->MyCache
key.converter=com.oracle.coherence.kafka.connect.util.CustomConverter
value.converter=com.oracle.coherence.kafka.connect.util.CustomConverter
value.converter.serializer=org.apache.kafka.common.serialization.StringSerializer
value.converter.deserializer=org.apache.kafka.common.serialization.StringDeserializer
key.converter.serializer=org.apache.kafka.common.serialization.StringSerializer
key.converter.deserializer=org.apache.kafka.common.serialization.StringDeserializer
```