首先,JStorm 的概念请参考官网:点击打开链接;官网示例:点击打开链接

运行Jstorm可分为本地调试和分布式环境

    1.先说分布式环境,首先搭建zookeeper集群点击打开链接

    2.搭建kafka集群点击打开链接

    3.搭建jstorm集群点击打开链接

    4.开始贴代码

pom.xml   

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Maven build for the demo topology: declares the JStorm, storm-kafka and Kafka
         dependencies and packages the project with the shade plugin. -->
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.li</groupId>
    <artifactId>demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- JStorm core API. Scope is "provided": it is on the compile classpath but is
             expected to be supplied by the runtime environment rather than bundled.
             The listed SLF4J artifacts are excluded from its transitive dependencies. -->
        <dependency>
            <groupId>com.alibaba.jstorm</groupId>
            <artifactId>jstorm-core</artifactId>
            <version>2.1.1</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-nop</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-jdk14</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>jcl-over-slf4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Kafka spout integration for Storm (default compile scope, so it is bundled). -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.6</version>
        </dependency>
        <!-- Kafka library built for Scala 2.11. ZooKeeper, log4j and the slf4j-log4j12
             binding are excluded; presumably to avoid clashing with the versions already
             present in the JStorm runtime (TODO confirm). -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.9.0.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Runs the "shade" goal during the package phase to produce a single jar
                 that contains the project together with its bundled dependencies. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.7.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!-- No resource transformers are configured. -->
                            <transformers>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

自定义Bolt入口

 

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import com.esotericsoftware.minlog.Log;

import java.io.Serializable;
import java.util.Map;

/**
 * Title:
 * <p>
 * Description:TODO
 * <p>
 * Copyright:Copyright(c)2005
 * <p>
 * Company:
 * <p>
 * Author:lsj
 * <p>
 * Date:2018/4/12 10:10
 */
public class MyBolt implements IRichBolt,Serializable {

    private static final long serialVersionUID = 1L;

    OutputCollector collector;

    public void execute(Tuple input) {
        try {
            String string = input.getString(0);
            System.out.println(string+"................................");
            collector.ack(input);
        } catch (Exception e) {
            collector.fail(input);
            Log.error("解析数据异常", e);
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields());
    }

    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void cleanup() {

    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

自定义Topology入口

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

import java.util.ArrayList;
import java.util.List;

/**
 * Entry point that wires a Kafka spout to {@link MyBolt} and submits the topology.
 *
 * <p>Positional command-line arguments (all optional):
 * <ol>
 *   <li>{@code args[0]}: topology name. When any argument is present the topology is
 *       submitted through {@code StormSubmitter}; with no arguments it is submitted to
 *       a {@code LocalCluster} under the name {@code ImsiTopology}.</li>
 *   <li>{@code args[1]}: Kafka topic to consume (default {@code testTopic}).</li>
 *   <li>{@code args[2]}: spout parallelism, also used as the worker count (default 3).</li>
 *   <li>{@code args[3]}: bolt parallelism (default 3).</li>
 *   <li>{@code args[4]}: max spout pending (default 2000); a value of 0 or less
 *       leaves the setting at 1.</li>
 * </ol>
 *
 * <p>Created 2018/4/12 10:00.
 *
 * @author lsj
 */
public class MyTopology {

    /**
     * Builds the spout configuration and topology, then submits it either to the
     * cluster or to a local in-process cluster depending on whether arguments were given.
     *
     * @param args optional positional arguments, see the class description
     * @throws InterruptedException kept in the signature for compatibility with callers
     * @throws NumberFormatException if a numeric argument is not a valid integer
     */
    public static void main(String[] args) throws InterruptedException {

        String brokerZkStr = "119.23.20.*:2181,120.77.200.*:2181,39.108.5.*:2181";
        String brokerZkPath = "/brokers";
        // Kafka topic to consume
        String topic = "testTopic";
        // Third SpoutConfig constructor argument (the ZooKeeper root in storm-kafka's
        // SpoutConfig(hosts, topic, zkRoot, id)); left empty here.
        String zkRoot = "";
        // Consumer id; any name works
        String id = "testTopic";
        int workerNumSpout = 3;
        int workerNumBolt = 3;
        int maxSpoutPending = 2000;

        // Each optional argument is guarded by its own length check so that passing
        // only some of them never reads past the end of the array.
        if (args.length > 1) {
            topic = args[1];
        }
        if (args.length > 2) {
            workerNumSpout = Integer.parseInt(args[2]);
        }
        if (args.length > 3) {
            workerNumBolt = Integer.parseInt(args[3]);
        }
        if (args.length > 4) {
            maxSpoutPending = Integer.parseInt(args[4]);
        }

        ZkHosts zk = new ZkHosts(brokerZkStr, brokerZkPath);
        SpoutConfig spoutConf = new SpoutConfig(zk, topic, zkRoot, id);

        spoutConf.zkServers = hostNames(zk.brokerZkStr);
        spoutConf.zkPort = 2181;
        spoutConf.forceFromStart = false;
        spoutConf.socketTimeoutMs = 60 * 1000;
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        // Parallelism comes from workerNumSpout / workerNumBolt (default 3 each).
        // NOTE(review): the original note mentioned a 5-partition topic with parallelism 5;
        // pass args[2] accordingly if that is the intended setup.
        builder.setSpout("data", new KafkaSpout(spoutConf), workerNumSpout);
        builder.setBolt("analyze", new MyBolt(), workerNumBolt).shuffleGrouping("data");

        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(workerNumSpout);
        // Non-positive values fall back to 1.
        config.setMaxSpoutPending(maxSpoutPending > 0 ? maxSpoutPending : 1);
        // NOTE(review): with zero ackers tuple tracking is presumably disabled, in which
        // case max-spout-pending and the bolt's ack/fail calls have no effect; confirm
        // this is intended.
        config.setNumAckers(0);

        System.out.println(" topic = " + topic + " workerNumSpout = " + workerNumSpout +
                " workerNumBolt = " + workerNumBolt + " maxSpoutPending = " + maxSpoutPending);

        if (args.length > 0) {
            try {
                // Arguments present: submit to the distributed cluster under args[0]
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            // No arguments: run in a local in-process cluster for debugging
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("ImsiTopology", config, builder.createTopology());
        }
    }

    /** Returns the host part of each entry in a comma-separated {@code host:port} list. */
    private static List<String> hostNames(String hostPortList) {
        List<String> hosts = new ArrayList<String>();
        for (String hostPort : hostPortList.split(",")) {
            hosts.add(hostPort.split(":")[0]);
        }
        return hosts;
    }
}

5.打成jar 上传到分布式环境运行

# jstorm jar XXXX.jar com.li.test.MyTopology myTopic  

XXXX.jar 为打包后的 jar
com.li.test.MyTopology 为入口类,即提交任务的类
myTopic 为提交参数:第一个参数(args[0])在代码中用作拓扑名称,其后可依次追加 topic、spout 并行度、bolt 并行度、maxSpoutPending

 6.本地环境调试

    

 注释掉上图配置运行即可。

 最后奉上源码地址点击打开链接

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐