Java JDK安装

如果已经安装了 Jdk 的话,就略过。

官网下载地址

JDK下载: https://www.oracle.com/java/technologies/

配置环境变量

简单,略过。

Zookeeper安装

因为 kafka 是基于 Zookeeper 的,而 Zookeeper 一般都是一个分布式的集群,尽管 kafka 有自带 Zookeeper,但是一般不使用自带的,都是使用外部安装的,所以首先我们需要安装Zookeeper。

官网下载地址

Zookeeper 官网:https://zookeeper.apache.org/releases.html

本地安装

  1. 把下载好的压缩包解压,放到自己指定想要安装的目录下;
  2. 在bin文件夹同级下,创建data文件夹、log文件夹;
  3. 到config文件夹,将目录中zoo_sample.cfg文件复制一份,重命名为zoo.cfg;
  4. zoo.cfg配置文件,将默认的 dataDir=/tmp/zookeeper 修改成 zookeeper 安装目录所在的 data 文件夹,再增加数据日志的配置;

lzveb4m5.png

启动测试

完成上面所有配置后,进入到 Zookeeper 安装目录下的 bin 文件下,双击运行 zkServer.cmd 启动;
保持cmd窗口不要关掉!!!

 zkServer.cmd 

Kafka安装

官网下载

kafka官网: https://kafka.apache.org/downloads.html

修改文件夹名字

名字不宜过长

修改配置文件中

解压到相应文件夹,进入config目录下,找到server.properties文件

lzvedx1d.png

lzveefmg.png

启动kafka

cmd到kafka解压文件的目录

.\bin\windows\kafka-server-start.bat .\config\server.properties

lzvefey6.png

lzveflw4.png

offset安装

offset是kafka的可视化工具,方便查看,下载地址: https://www.kafkatool.com/download2/offsetexplorer_64bit.exe

lzvegjm0.png

代码示例

package main

import (
    "context"
    "fmt"
    "github.com/segmentio/kafka-go"
    "os"
    "os/signal"
    "syscall"
    "time"
)

var (
    reader *kafka.Reader
    topic  = "user_click"
)

func writeKafka(ctx context.Context) {
    writer := kafka.Writer{
        Addr:                   kafka.TCP("localhost:9092"),
        Topic:                  topic, // topic第一次写不存在,一定会报错
        Balancer:               &kafka.Hash{},
        WriteTimeout:           1 * time.Second,   //超时不等待
        RequiredAcks:           kafka.RequireNone, // 不安全
        AllowAutoTopicCreation: true,              // 线上不要
    }
    defer writer.Close()

    for i := 0; i < 3; i++ {
        err := writer.WriteMessages(ctx,
            // 5 条要么都成功,要么都失败,key主要用于hash
            kafka.Message{Key: []byte("1"), Value: []byte("大")},
            kafka.Message{Key: []byte("1"), Value: []byte("小")},
            kafka.Message{Key: []byte("1"), Value: []byte("白")},
            kafka.Message{Key: []byte("1"), Value: []byte("红")},
            kafka.Message{Key: []byte("1"), Value: []byte("蓝")},
        )
        if err != nil {
            if err == kafka.LeaderNotAvailable { // 第一次失败
                // 重试3次
                continue
            } else {
                fmt.Printf("写失败:%v\n", err.Error())
            }
        }
        break
    }

}

func readKafka(ctx context.Context) {
    reader = kafka.NewReader(kafka.ReaderConfig{
        Brokers:        []string{"localhost:9092"},
        Topic:          topic,
        CommitInterval: 1 * time.Second,
        GroupID:        "rec_team",
        StartOffset:    kafka.FirstOffset, //新的消费方,从头开始
    })

    // kill 的时候,这个不会执行
    //defer reader.Close()

    for {
        msg, err := reader.ReadMessage(ctx)
        if err != nil {
            fmt.Printf("读失败:%v\n", err.Error())
            break
        } else {
            fmt.Printf("topic=%s,Partition=%d,offset=%d,key=%s,value=%s \n", msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
        }
    }
}

// 监听2和15,收尾工作
func listenSignal() {
    c := make(chan os.Signal, 1)
    signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
    sig := <-c
    fmt.Printf("接收信号 %s", sig.String())
    if reader != nil {
        reader.Close()
    }
    os.Exit(0)
}

func main() {
    ctx := context.Background()
    writeKafka(ctx)
    go listenSignal()
    readKafka(ctx)
}

问题

Kafka报错ERROR Shutdown broker because all log dirs

在使用Kafka时,删除了topic 后出现问题:

Kafka服务开始报错:

ERROR Shutdown broker because all log dirs in D:\Kafka\kafka_2.12-2.0.0\logs have failed (kafka.log.LogManager)。

解决办法:

  1. 去kafka日志目录下删除log里topic日志(重启kafka还是报错)。
  2. 还需要删除zookeeper里zoo.cfg里配置的dataDir目录下的日志。

编写bat脚本一键启动

启动kafka CMD报错:输入行太长,语法错误,在网上查到一个解决方法: kafka目录不要建太深,直接在放在D盘

@echo off
rd /s /q D:\program\kafka\log
rd /s /q D:\program\kafka\logs
rd /s /q D:\program\kafka\programkafkalog
rd /s /q D:\program\zookeeper\data
rd /s /q D:\program\zookeeper\log

REM 启动Zookeeper(根据实际安装路径调整,以下是示例路径)
start "Zookeeper" cmd /k "D:\program\zookeeper\bin\zkServer.cmd"

timeout /t 10

REM 启动kafka
echo Starting Kafka...
start "Kafka" cmd /k "D:\program\kafka\bin\windows\kafka-server-start.bat D:\program\kafka\config\server.properties"

echo All servers started.
最后修改:2024 年 12 月 26 日
如果觉得我的文章对你有用,请随意赞赏