博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka环境安装及简单使用(单机版)
阅读量:6069 次
发布时间:2019-06-20

本文共 6652 字,大约阅读时间需要 22 分钟。

一个分布式发布-订阅消息传递系统

特点:

    高吞吐量、低延迟

使用场景(举例):

    日志收集:用kafka收集各种服务产生的log,通过kafka以统一的接口服务的方式开放给各种consumer,如hadoop,hbase等

 

下载安装:

    1.   选择一个版本的kafka进行下载

    2.解压

tar -zxvf kafka_2.11-0.9.0.1.tgzmv kafka_2.11-0.9.0.1 /opt/

    3.配置环境变量(可选步骤)

 

上手使用:

    1.config目录配置文件(zookeeper.properties,service.properties,producer.properties,consumer.properties)

  我们暂时先不管这些配置文件,遵守初始的配置

    2.先启动zookeeper - kafka依赖与zookeeper 实现分布式一致性

  我们下载的kafka安装包,就自带了zookeeepr,zookeeper.properties就是自带的zk的配置文件

nohup bin/zookeeper-server-start.sh config/zookeeepr.properties&       nohup &是实现在后台启动

   

    3.再启动kafka服务

bin/kafka-server-start.sh config/server.properties

   

    4.创建一个Topic

bin/kafka-topics.sh --create --topic test1 --zookeeper localehost:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1

 

    4.再启动kafka生产端

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1

 

    5.在新窗口再启动kafka消费端

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 --from-beginning

 

    6.在生产窗口输入任意字符,观察在消费端是否能够收到相应字符

  

如果无法收到正确字符,或者报错,尝试从以下方面排查:

    1.服务是否都按顺序正常启动

    2.命令中开启的服务端口是否和相应的配置文件中的配置对应

        注:生产端访问的端口不是  zookeeper的localhost:2181, 而是producer.properties中配置的broker的端口,默认为9092

        注:这个broker的端口是需要在 server中有相应的配置才可以

 

简单介绍一下上面提到了config目录下面的配置,以及kafka集群的搭建

server.properties:一个server.properties文件代表了一个kafka服务,也就是一个Broker

所以说,如果我们想搭建一个kafka集群,需要有不同的 server.properties文件,来启动多个broker,多个borker组成kafka cluster

    注:每个server.properties配置文件中的 broker.id(服务器唯一标识)不能一样

         port(服务器监听端口号)不能一样

         zookeeper.connect(zookeeper的连接ip及端口),需和zookeeper.properties保持一致

 

kafka在Java程序的简单示例:

  生产:

public class JavaKafkaProducer {    private Logger logger = Logger.getLogger(JavaKafkaProducer.class);    public static final String TOPIC_NAME = "test1";    public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray();    public static final int chartsLength = charts.length;    public static void main(String[] args) {        String brokerList = "127.0.0.1:9092";        Properties props = new Properties();        props.put("metadata.broker.list", brokerList);        /**         * 0表示不等待结果返回
* 1表示等待至少有一个服务器返回数据接收标识
* -1表示必须接收到所有的服务器返回标识,及同步写入
* */ props.put("request.required.acks", "0"); /** * 内部发送数据是异步还是同步 * sync:同步, 默认 * async:异步 */ props.put("producer.type", "async"); /** * 设置序列化的类 * 可选:kafka.serializer.StringEncoder * 默认:kafka.serializer.DefaultEncoder */ props.put("serializer.class", "kafka.serializer.StringEncoder"); /** * 设置分区类 * 根据key进行数据分区 * 默认是:kafka.producer.DefaultPartitioner ==> 安装key的hash进行分区 * 可选:kafka.serializer.ByteArrayPartitioner ==> 转换为字节数组后进行hash分区 */ props.put("partitioner.class", "com.kafka.JavaKafkaProducerPartitioner"); // 重试次数 props.put("message.send.max.retries", "3"); // 异步提交的时候(async),并发提交的记录数 props.put("batch.num.messages", "200"); // 设置缓冲区大小,默认10KB props.put("send.buffer.bytes", "102400"); // 2. 构建Kafka Producer Configuration上下文 ProducerConfig config = new ProducerConfig(props); // 3. 构建Producer对象 final Producer
producer = new Producer
(config); // 4. 发送数据到服务器,并发线程发送 final AtomicBoolean flag = new AtomicBoolean(true); int numThreads = 50; ExecutorService pool = Executors.newFixedThreadPool(numThreads); for (int i = 0; i < 5; i++) { pool.submit(new Thread(new Runnable() { @Override public void run() { while (flag.get()) { // 发送数据 KeyedMessage message = generateKeyedMessage(); producer.send(message); System.out.println("发送数据:" + message); // 休眠一下 try { int least = 10; int bound = 100; Thread.sleep(ThreadLocalRandom.current().nextInt(least, bound)); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + " shutdown...."); } }, "Thread-" + i)); } // 5. 等待执行完成 long sleepMillis = 600000; try { Thread.sleep(sleepMillis); } catch (InterruptedException e) { e.printStackTrace(); } flag.set(false); // 6. 关闭资源 pool.shutdown(); try { pool.awaitTermination(6, TimeUnit.SECONDS); } catch (InterruptedException e) { } finally { producer.close(); // 最后之后调用 } } /** * 产生一个消息 * * @return */ private static KeyedMessage
generateKeyedMessage() { String key = "key_" + ThreadLocalRandom.current().nextInt(10, 99); StringBuilder sb = new StringBuilder(); int num = ThreadLocalRandom.current().nextInt(1, 5); for (int i = 0; i < num; i++) { sb.append(generateStringMessage(ThreadLocalRandom.current().nextInt(3, 20))).append(" "); } String message = sb.toString().trim(); return new KeyedMessage(TOPIC_NAME, key, message); } /** * 产生一个给定长度的字符串 * * @param numItems * @return */ private static String generateStringMessage(int numItems) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < numItems; i++) { sb.append(charts[ThreadLocalRandom.current().nextInt(chartsLength)]); } return sb.toString(); }}

 

  消费:

public class JavaKafkaConsumerHighAPITest {    public static void main(String[] args) {        String zookeeper = "127.0.0.1";        String groupId = "test-consumer-group";        String topic = "test1";        int threads = 1;        JavaKafkaConsumerHighAPI example = new JavaKafkaConsumerHighAPI(topic, threads, zookeeper, groupId);        new Thread(example).start();        // 执行10秒后结束        int sleepMillis = 600000;        try {            Thread.sleep(sleepMillis);        } catch (InterruptedException e) {            e.printStackTrace();        }        // 关闭        example.shutdown();    }}

 

 

    1.Broker -- 每个kafka server称为一个Broker,多个borker组成kafka cluster。

    2.Topic  --  Topic 就是消息类别名,一个topic中通常放置一类消息。每个topic都有一个或者多个订阅者,也就是消息的消费者consumer。

        Producer将消息推送到topic,由订阅该topic的consumer从topic中拉取消息。

        一个Broker上可以创建一个或者多个Topic。同一个topic可以在同一集群下的多个Broker中分布。

    ....

 

 

参考博文:

 

转载于:https://www.cnblogs.com/xuzekun/p/8986540.html

你可能感兴趣的文章
《深入理解计算机系统》第一章学习笔记
查看>>
Rocket - util - MultiWidthFifo
查看>>
jQuery fullPage 全屏滚动
查看>>
关于C#虚函数和构造函数的一点理解
查看>>
::c++的样子,
查看>>
fuck,两个地方,
查看>>
郁闷,蛋疼的S3C2416 ,哥狠狠的被2416 手册 暗算了一把
查看>>
对象属性访问的总结
查看>>
深搜广搜
查看>>
VisualStudio自定义代码段_方法二
查看>>
WC2008游览计划(BZOJ2595)
查看>>
消息队列
查看>>
《Android深度探索》第六章心得体会
查看>>
嵌入式服务器jetty,让你更快开发web
查看>>
【原创】基于ZYNQ7000的交叉编译工具链Qt+OpenCV+ffmpeg等库支持总结(二)
查看>>
【HDOJ】1493 QQpet exploratory park
查看>>
【HDOJ】3553 Just a String
查看>>
一共81个,开源大数据处理工具汇总(上)(转)
查看>>
并查集模版
查看>>
vue 使用a+ router.push的形式跳转时,地址栏不显示参数
查看>>