本地测试 Storm 读取 Kafka 数据然后打印出来。
搭建单机版 Kafka,创建 topic:test
1:Maven 依赖
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hbhr</groupId>
    <artifactId>storm</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>log4j-over-slf4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>5.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>5.1.2</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>cn.ljh.storm.helloworld.ExclamationTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
2:编写 Topology(简单示例)
public class myTopology { public static void main(String[] args) { //本地单机版zookeeper ZkHosts zkHosts = new ZkHosts("192.168.3.141:2181"); //test:kafka的topic,/aa 应该是在zookeeper的目录 ,uuid唯一标识 SpoutConfig spoutConfig = new SpoutConfig(zkHosts, "test", "/aa",UUID.randomUUID().toString()); TopologyBuilder topologyBuilder = new TopologyBuilder(); //之前工程启动报错,work-die 日志查看为类型转换错误 //spoutConfig.scheme =new SchemeAsMultiScheme(new StringScheme());添加这段代码应该可 //以是bolt在转换kafka数据的时候直接tuple.getString(0)获取不会出现work-die错误 spoutConfig.scheme =new SchemeAsMultiScheme(new StringScheme()); topologyBuilder.setSpout("kafkaSpout",new KafkaSpout(spoutConfig),1); topologyBuilder.setBolt("kafkaBolt",new myBolt(),1).shuffleGrouping("kafkaSpout"); Config config = new Config(); config.setDebug(true); LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("myStorm",config,topologyBuilder.createTopology()); }}
3:创建 Bolt:接收数据并打印出来。
public class myBolt implements IRichBolt { OutputCollector collector; public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; } public void execute(Tuple tuple) { System.err.println(tuple.getValue(0)); } public void cleanup() { } public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } public MapgetComponentConfiguration() { return null; }}