Kafka 接收消息 exactly once
– Start 下面的方法演示将 offset 存储在数据库中，和消息处理放在同一事务中，真正实现 exactly once。创建表：CREATE TABLE KAFKA_OFFSET (TOPIC VARCHAR2(100), PARTITION NUMBER(3, 0), OFFSET NUMBER(18, 0)); INSERT ...
·
– Start
点击此处观看本系列配套视频
下面的方法演示将 offset 存储在数据库中（不提交给 Kafka），并与消息处理放在同一个数据库事务中：消息入库与 offset 更新要么同时成功，要么同时回滚，从而真正实现 exactly once。
创建表
-- Holds, per topic and partition, the offset the consumer seeks to on startup.
-- The consumer code updates OFFSET to (highest processed offset + 1) in the
-- same transaction that stores the messages.
CREATE TABLE KAFKA_OFFSET
(
TOPIC VARCHAR2(100),
PARTITION NUMBER(3, 0),
OFFSET NUMBER(18, 0)
);
-- Seed row: partition 0 of topic0 starts at offset 0. The consumer only
-- assigns itself partitions that have a row in this table.
INSERT INTO KAFKA_OFFSET VALUES ('topic0', 0, 0);
COMMIT;
-- Destination table for consumed records: one row per Kafka record, keeping
-- its topic, partition, offset, key and value (stored in MESSAGE).
-- The consumer inserts positionally, so the column order here must match the
-- order of the bind parameters in the insert statement.
CREATE TABLE KAFKA_MESSAGE
(
TOPIC VARCHAR2(100),
PARTITION NUMBER(3, 0),
OFFSET NUMBER(18, 0),
KEY VARCHAR2(100),
MESSAGE VARCHAR2(100)
);
接收消息代码
package shangbo.kafka.example6;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
/**
 * Entry point of the exactly-once consumer example.
 *
 * <p>Offsets are not committed to Kafka. On startup the consumer reads the
 * stored offset of every partition from the database (via {@link Service}),
 * assigns itself exactly those partitions and seeks to the stored positions.
 * Each polled batch is then handed to {@link Service#process}, which is
 * expected to persist the messages and the new offsets together.
 */
public class App {

    /** Topic consumed by this example. */
    private static final String TOPIC = "topic0";

    /**
     * Starts the Spring context, positions the consumer from the database and
     * polls forever.
     *
     * @param args unused
     * @throws IllegalStateException if no offset rows exist for the topic
     */
    public static void main(String[] args) {
        // try-with-resources: if the loop terminates with an exception, the consumer
        // is closed first and then the Spring context (which closes the DataSource).
        try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
                KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(buildConsumerProperties())) {
            Service service = context.getBean(Service.class);

            // Load the resume position of every known partition from the database.
            List<PartitionOffset> partitionOffsets = service.queryPartitionOffset(TOPIC);
            if (partitionOffsets.isEmpty()) {
                // Without any assignment there is nothing to poll; fail with a clear message.
                throw new IllegalStateException("No stored offsets found for topic " + TOPIC);
            }

            // Manual assignment (no group rebalancing), then seek to the stored offsets.
            consumer.assign(partitionOffsets.stream()
                    .map(p -> new TopicPartition(TOPIC, p.getPartition()))
                    .collect(Collectors.toList()));
            for (PartitionOffset partitionOffset : partitionOffsets) {
                TopicPartition tp = new TopicPartition(TOPIC, partitionOffset.getPartition());
                consumer.seek(tp, partitionOffset.getOffset());
            }

            while (true) {
                // NOTE(review): poll(long) is kept as in the original example; newer
                // client versions offer poll(Duration) - confirm against the client in use.
                ConsumerRecords<String, String> records = consumer.poll(100);
                if (records.isEmpty()) {
                    // Nothing received: avoid opening a database transaction that does no work.
                    continue;
                }
                service.process(records);
            }
        }
    }

    /** Builds the consumer configuration used by this example. */
    private static Properties buildConsumerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "ConsumerGroup3"); // consumer group identifier
        props.put("enable.auto.commit", "false"); // no auto-commit: offsets are tracked in the database
        return props;
    }
}
package shangbo.kafka.example6;
import javax.sql.DataSource;
import org.apache.commons.dbcp.BasicDataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
 * Spring Java configuration for the example: a pooled Oracle data source, a
 * JDBC transaction manager bound to it, and the {@link Service} bean.
 */
@Configuration
@EnableTransactionManagement // turn on annotation-driven transaction management
public class AppConfig {

    /**
     * Creates the connection pool. {@code destroyMethod = "close"} asks Spring
     * to call {@code close()} on the pool when the bean is destroyed.
     *
     * @return the configured connection pool
     */
    @Bean(destroyMethod = "close")
    public BasicDataSource dataSource() {
        BasicDataSource pool = new BasicDataSource();
        pool.setDriverClassName("oracle.jdbc.driver.OracleDriver");
        pool.setUrl("jdbc:oracle:thin:@localhost:1521:xe");
        pool.setUsername("hr");
        pool.setPassword("123456");
        return pool;
    }

    /**
     * Creates the transaction manager operating on the given data source.
     *
     * @param dataSource the data source whose connections are managed
     * @return the transaction manager
     */
    @Bean
    public DataSourceTransactionManager txManager(DataSource dataSource) {
        DataSourceTransactionManager manager = new DataSourceTransactionManager();
        manager.setDataSource(dataSource);
        return manager;
    }

    /**
     * Creates the service and hands it the data source it works against.
     *
     * @param dataSource the data source used by the service
     * @return the service bean
     */
    @Bean
    public Service service(DataSource dataSource) {
        Service bean = new ServiceImpl();
        bean.setDataSource(dataSource);
        return bean;
    }
}
package shangbo.kafka.example6;
import java.util.List;
import javax.sql.DataSource;
import org.apache.kafka.clients.consumer.ConsumerRecords;
/**
 * Database-side operations of the consumer example: reading stored partition
 * offsets and processing polled record batches.
 */
public interface Service {

    /**
     * Supplies the data source the implementation runs its SQL against.
     *
     * @param dataSource the data source to use
     */
    void setDataSource(DataSource dataSource);

    /**
     * Returns the stored offsets for the given topic.
     *
     * @param topic the topic name to look up
     * @return one entry per stored partition of that topic
     */
    List<PartitionOffset> queryPartitionOffset(String topic);

    /**
     * Processes one batch of polled records.
     *
     * @param records the records returned by a consumer poll
     */
    void process(ConsumerRecords<String, String> records);
}
package shangbo.kafka.example6;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;
import javax.sql.DataSource;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.Transactional;
public class ServiceImpl implements Service {
private JdbcTemplate jdbcTemplate;
@Transactional(readOnly = true)
public List<PartitionOffset> queryPartitionOffset(String topic) {
String sql = "select * from kafka_offset where topic = ?";
return jdbcTemplate.query(sql, new Object[] { topic }, new BeanPropertyRowMapper<PartitionOffset>(PartitionOffset.class));
}
@Transactional
public void process(ConsumerRecords<String, String> records) {
Set<TopicPartition> topicPartitions = records.partitions();
for (TopicPartition tp : topicPartitions) {
final List<ConsumerRecord<String, String>> recs = records.records(tp);
// 保存消息
String insertKafkaMsgSql = "insert into KAFKA_MESSAGE values (?, ?, ?, ?, ?)";
jdbcTemplate.batchUpdate(insertKafkaMsgSql, new BatchPreparedStatementSetter() {
public void setValues(PreparedStatement ps, int i) throws SQLException {
ps.setString(1, recs.get(i).topic());
ps.setInt(2, recs.get(i).partition());
ps.setLong(3, recs.get(i).offset());
ps.setString(4, recs.get(i).key());
ps.setString(5, recs.get(i).value());
}
public int getBatchSize() {
return recs.size();
}
});
// 保存 offset
Long maxOffset = recs.stream().mapToLong(ConsumerRecord::offset).max().getAsLong();
maxOffset += 1;
String updateOffsetSql = "update kafka_offset set offset = ? where topic = ? and partition = ?";
jdbcTemplate.update(updateOffsetSql, maxOffset, tp.topic(), tp.partition());
}
}
//
// Setter
//
public void setDataSource(DataSource dataSource) {
jdbcTemplate = new JdbcTemplate(dataSource);
}
}
package shangbo.kafka.example6;
/**
 * Mutable bean pairing a partition number with an offset.
 *
 * <p>Both properties are {@code null} until set. The implicit no-argument
 * constructor and the setters are kept so the bean can be populated through
 * its properties.
 */
public class PartitionOffset {

    /** Partition number; {@code null} until set. */
    private Integer partition;

    /** Offset associated with the partition; {@code null} until set. */
    private Long offset;

    /** @return the partition number, or {@code null} if not set */
    public Integer getPartition() {
        return this.partition;
    }

    /** @param partition the partition number */
    public void setPartition(final Integer partition) {
        this.partition = partition;
    }

    /** @return the offset, or {@code null} if not set */
    public Long getOffset() {
        return this.offset;
    }

    /** @param offset the offset for the partition */
    public void setOffset(final Long offset) {
        this.offset = offset;
    }
}
– 更多参见:Kafka 精萃
– 声 明:转载请注明出处
– Last Edited on 2018-06-13
– Written by ShangBo on 2018-06-13
– End
更多推荐
已为社区贡献6条内容
所有评论(0)