InfluxDB 测量平台:编写 Flume InfluxSink,订阅 Kafka 相关数据并写入 InfluxDB
influxdb 生态圈 自带采集数据工具 telegraf,telegraf集成了很多功能,包括订阅kafka数据,但是由于telegraf 订阅 kafka 性能太低,所以自定义开发了 flume sink插件,使得 订阅数据性能获得了极大提高。InfluxSink样例代码如下,以飨读者。package org.apache.flume.sink.influx;i...
·
InfluxDB 生态圈自带数据采集工具 Telegraf。Telegraf 集成了很多功能,包括订阅 Kafka 数据;但由于 Telegraf 订阅 Kafka 的性能太低,所以自定义开发了 Flume sink 插件,使订阅数据的性能获得了极大提高。
InfluxSink
样例代码如下,以飨读者。
package org.apache.flume.sink.influx;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.log4j.Logger;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
/**
 * Flume sink that drains events from its channel and writes them to InfluxDB in batches.
 *
 * <p>Each event body is treated as one record and the records of a batch are joined with
 * newlines before being handed to {@code InfluxDB.write}. One channel transaction covers
 * one batch: it is committed only after the write succeeded and rolled back otherwise, so
 * a failed write leaves the events in the channel for a later retry.
 *
 * <p>Configuration properties (with defaults): {@code host} (localhost), {@code port} (8186),
 * {@code database} (flumetest), {@code batchSize} (1000), {@code username} (root),
 * {@code password} (root), {@code influxsource} (body; the only supported value).
 */
public class InfluxSink extends AbstractSink implements Configurable {
    private static final Logger LOG = Logger.getLogger(InfluxSink.class);

    /** Default number of events taken from the channel per transaction. */
    private static final int DEFAULT_BATCH_SIZE = 1000;
    /** The only value of {@code influxsource} this sink knows how to handle. */
    private static final String SOURCE_BODY = "body";
    /** Retention policy passed to every write. */
    private static final String RETENTION_POLICY = "autogen";

    private String url;
    private int batchSize;
    private String database;
    private String username;
    private String password;
    private String influxsource;
    private InfluxDB influxDB;
    // Never populated from the configuration and not read anywhere in this class.
    private String truncateTimestamp;
    private SinkCounter sinkCounter;

    /**
     * Reads the sink settings from the Flume context.
     *
     * @param context the Flume configuration context for this sink
     */
    @Override
    public void configure(Context context) {
        String host = context.getString("host", "localhost");
        String port = context.getString("port", "8186");
        this.url = "http://" + host + ":" + port;
        this.database = context.getString("database", "flumetest");
        this.username = context.getString("username", "root");
        this.password = context.getString("password", "root");
        this.influxsource = context.getString("influxsource", SOURCE_BODY);

        int configuredBatchSize = context.getInteger("batchSize", DEFAULT_BATCH_SIZE);
        if (configuredBatchSize < 1) {
            // A non-positive batch size would make process() never take an event.
            LOG.warn("Invalid batchSize " + configuredBatchSize + ", using " + DEFAULT_BATCH_SIZE);
            configuredBatchSize = DEFAULT_BATCH_SIZE;
        }
        this.batchSize = configuredBatchSize;

        if (sinkCounter == null) {
            sinkCounter = new SinkCounter(getName());
        }
    }

    /**
     * Starts the counters and creates the InfluxDB client. A connection failure is logged
     * and counted but does not prevent the sink from starting; {@link #process()} then
     * rolls back and backs off until the sink is restarted.
     */
    @Override
    public void start() {
        LOG.info("Starting Influx Sink " + getName() + " ...Connecting to " + url);
        // Started before the connection counters are touched so that those increments are
        // not wiped by the counter group's own start-up reset.
        sinkCounter.start();
        try {
            this.influxDB = InfluxDBFactory.connect(url, username, password);
            sinkCounter.incrementConnectionCreatedCount();
        } catch (Exception e) {
            LOG.error("Could not connect to InfluxDB at " + url + ": " + e.getMessage(), e);
            sinkCounter.incrementConnectionFailedCount();
        }
        super.start();
    }

    /** Records the connection as closed and stops the counters. */
    @Override
    public void stop() {
        LOG.info("Stopping Influx Sink " + getName() + " ...");
        sinkCounter.incrementConnectionClosedCount();
        sinkCounter.stop();
        super.stop();
    }

    /**
     * Takes up to {@code batchSize} events from the channel and writes them to InfluxDB
     * inside a single channel transaction.
     *
     * @return {@code READY} when a full batch was written (more data is likely waiting);
     *         {@code BACKOFF} when the channel ran dry or the batch could not be delivered
     * @throws EventDeliveryException declared by the sink contract; delivery failures are
     *         reported through a rollback plus {@code BACKOFF} rather than by throwing
     */
    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();
        txn.begin();
        try {
            StringBuilder batch = new StringBuilder();
            int taken = 0;
            int usable = 0;
            boolean channelDrained = false;

            while (taken < batchSize) {
                Event event = channel.take();
                if (event == null) {
                    channelDrained = true;
                    break;
                }
                taken++;
                String record = extractInfluxEvent(event, influxsource);
                if (record == null) {
                    // Unsupported source: already logged, and must not be sent as "null".
                    continue;
                }
                if (batch.length() > 0) {
                    batch.append('\n');
                }
                batch.append(record);
                usable++;
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("process() count:" + taken);
            }

            if (taken == 0) {
                sinkCounter.incrementBatchEmptyCount();
            } else {
                if (taken < batchSize) {
                    sinkCounter.incrementBatchUnderflowCount();
                } else {
                    sinkCounter.incrementBatchCompleteCount();
                }
                sinkCounter.addToEventDrainAttemptCount(taken);
                if (usable > 0) {
                    writeBatch(batch.toString());
                    sinkCounter.addToEventDrainSuccessCount(usable);
                }
            }

            // Reached only when the write succeeded (or there was nothing to write).
            txn.commit();
            return channelDrained ? Status.BACKOFF : Status.READY;
        } catch (Throwable t) {
            // Put the events back so they are retried instead of being lost.
            txn.rollback();
            LOG.error("Failed to deliver batch to InfluxDB: " + t.getMessage(), t);
            if (t instanceof Error) {
                throw (Error) t;
            }
            return Status.BACKOFF;
        } finally {
            txn.close();
        }
    }

    /**
     * Sends one newline-separated batch of records to InfluxDB, counting a failure before
     * letting it propagate to the caller's rollback handling.
     */
    private void writeBatch(String records) {
        try {
            if (influxDB == null) {
                throw new IllegalStateException("No InfluxDB connection available for " + url);
            }
            influxDB.write(database, RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, records);
        } catch (RuntimeException e) {
            sinkCounter.incrementConnectionFailedCount();
            throw e;
        }
    }

    /**
     * Extracts the record to write from an event.
     *
     * @param event the event taken from the channel
     * @param influxSource which part of the event holds the record; only "body" is supported
     * @return the event body decoded as UTF-8, or {@code null} for an unsupported source
     */
    private String extractInfluxEvent(Event event, String influxSource) {
        if (SOURCE_BODY.equals(influxSource)) {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            return new String(event.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        }
        LOG.error("Just body is supported for the moment");
        return null;
    }
}
更多推荐
已为社区贡献2条内容
所有评论(0)