博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
storm 读取kafka数据 单机测试
阅读量:6707 次
发布时间:2019-06-25

本文共 3832 字,大约阅读时间需要 12 分钟。

hot3.png

本地测试storm读取kafka数据然后打印。

搭建单机版kafka 创建topic :test

1:maven依赖

4.0.0
com.hbhr
storm
1.0-SNAPSHOT
junit
junit
4.12
test
org.apache.kafka
kafka_2.10
0.8.2.0
org.slf4j
slf4j-log4j12
org.apache.storm
storm-kafka
1.1.0
org.apache.storm
storm-core
1.1.0
org.slf4j
log4j-over-slf4j
org.slf4j
slf4j-api
org.elasticsearch
elasticsearch
5.1.2
org.elasticsearch.client
transport
5.1.2
log4j
log4j
1.2.17
maven-assembly-plugin
jar-with-dependencies
cn.ljh.storm.helloworld.ExclamationTopology

2:编写topology ,简单编写

public class myTopology {    public static void main(String[] args) {        //本地单机版zookeeper        ZkHosts zkHosts = new ZkHosts("192.168.3.141:2181");        //test:kafka的topic,/aa 应该是在zookeeper的目录 ,uuid唯一标识        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, "test",                                     "/aa",UUID.randomUUID().toString());        TopologyBuilder topologyBuilder = new TopologyBuilder();        //之前工程启动报错,work-die 日志查看为类型转换错误        //spoutConfig.scheme =new SchemeAsMultiScheme(new StringScheme());添加这段代码应该可             //以是bolt在转换kafka数据的时候直接tuple.getString(0)获取不会出现work-die错误        spoutConfig.scheme =new SchemeAsMultiScheme(new StringScheme());        topologyBuilder.setSpout("kafkaSpout",new KafkaSpout(spoutConfig),1);        topologyBuilder.setBolt("kafkaBolt",new myBolt(),1).shuffleGrouping("kafkaSpout");        Config config = new Config();        config.setDebug(true);        LocalCluster localCluster = new LocalCluster();        localCluster.submitTopology("myStorm",config,topologyBuilder.createTopology());    }}

3:创建bolt  :接收数据打印出来。

public class myBolt implements IRichBolt {    OutputCollector collector;    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {        this.collector = outputCollector;    }    public void execute(Tuple tuple) {        System.err.println(tuple.getValue(0));    }    public void cleanup() {    }    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {    }    public Map
getComponentConfiguration() { return null; }}

 

 

转载于:https://my.oschina.net/renzhimin/blog/1855392

你可能感兴趣的文章
bzoj千题计划300:bzoj4823: [Cqoi2017]老C的方块
查看>>
检测项目
查看>>
【转】MMO即时战斗:地图角色同步管理和防作弊实现
查看>>
暑假初学知识笔记
查看>>
kindeditor.net应用
查看>>
Autofac的实例
查看>>
STL之vector
查看>>
【模板】Hash拉链法
查看>>
SQL系列(三)—— 注释(comment)
查看>>
C语言中 fputs() fgets() 的使用方法
查看>>
Bzoj3697 采药人的路径
查看>>
一步一步学Remoting之二:激活模式
查看>>
Python集合
查看>>
Deep learning:四十二(Denoise Autoencoder简单理解)
查看>>
Copley-STM32串口+CANopen实现双电机力矩同步
查看>>
关于localstorage的本地缓存以及封装
查看>>
在找一份相对完整的Webpack项目配置指南么?这里有
查看>>
微信小程序 - 入门指引
查看>>
hdu 1002
查看>>
动态规划总结
查看>>