Java JDK安装
如果已经安装了 Jdk 的话,就略过。
官网下载地址
JDK下载: https://www.oracle.com/java/technologies/
配置环境变量
简单,略过。
Zookeeper安装
因为 kafka 是基于 Zookeeper 的,而 Zookeeper 一般都是一个分布式的集群,尽管 kafka 有自带 Zookeeper,但是一般不使用自带的,都是使用外部安装的,所以首先我们需要安装Zookeeper。
官网下载地址
Zookeeper 官网:https://zookeeper.apache.org/releases.html
本地安装
- 把下载好的压缩包解压,放到自己指定想要安装的目录下;
- 在bin文件夹同级下,创建data文件夹、log文件夹;
- 到config文件夹,将目录中zoo_sample.cfg文件复制一份,重命名为zoo.cfg;
- zoo.cfg配置文件,将默认的 dataDir=/tmp/zookeeper 修改成 zookeeper 安装目录所在的 data 文件夹,再增加数据日志的配置;
启动测试
完成上面所有配置后,进入到 Zookeeper 安装目录下的 bin 文件下,双击运行 zkServer.cmd 启动;
保持cmd窗口不要关掉!!!
zkServer.cmd
Kafka安装
官网下载
kafka官网: https://kafka.apache.org/downloads.html
修改文件夹名字
名字不宜过长
修改配置文件中
解压到相应文件夹,进入config目录下,找到server.properties文件
启动kafka
cmd到kafka解压文件的目录
.\bin\windows\kafka-server-start.bat .\config\server.properties
offset安装
offset是kafka的可视化工具,方便查看,下载地址: https://www.kafkatool.com/download2/offsetexplorer_64bit.exe
代码示例
package main
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
"os"
"os/signal"
"syscall"
"time"
)
var (
reader *kafka.Reader
topic = "user_click"
)
func writeKafka(ctx context.Context) {
writer := kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: topic, // topic第一次写不存在,一定会报错
Balancer: &kafka.Hash{},
WriteTimeout: 1 * time.Second, //超时不等待
RequiredAcks: kafka.RequireNone, // 不安全
AllowAutoTopicCreation: true, // 线上不要
}
defer writer.Close()
for i := 0; i < 3; i++ {
err := writer.WriteMessages(ctx,
// 5 条要么都成功,要么都失败,key主要用于hash
kafka.Message{Key: []byte("1"), Value: []byte("大")},
kafka.Message{Key: []byte("1"), Value: []byte("小")},
kafka.Message{Key: []byte("1"), Value: []byte("白")},
kafka.Message{Key: []byte("1"), Value: []byte("红")},
kafka.Message{Key: []byte("1"), Value: []byte("蓝")},
)
if err != nil {
if err == kafka.LeaderNotAvailable { // 第一次失败
// 重试3次
continue
} else {
fmt.Printf("写失败:%v\n", err.Error())
}
}
break
}
}
func readKafka(ctx context.Context) {
reader = kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topic,
CommitInterval: 1 * time.Second,
GroupID: "rec_team",
StartOffset: kafka.FirstOffset, //新的消费方,从头开始
})
// kill 的时候,这个不会执行
//defer reader.Close()
for {
msg, err := reader.ReadMessage(ctx)
if err != nil {
fmt.Printf("读失败:%v\n", err.Error())
break
} else {
fmt.Printf("topic=%s,Partition=%d,offset=%d,key=%s,value=%s \n", msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
}
}
}
// 监听2和15,收尾工作
func listenSignal() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
sig := <-c
fmt.Printf("接收信号 %s", sig.String())
if reader != nil {
reader.Close()
}
os.Exit(0)
}
func main() {
ctx := context.Background()
writeKafka(ctx)
go listenSignal()
readKafka(ctx)
}
问题
Kafka报错ERROR Shutdown broker because all log dirs
在使用Kafka时,删除了topic 后出现问题:
Kafka服务开始报错:
ERROR Shutdown broker because all log dirs in D:\Kafka\kafka_2.12-2.0.0\logs have failed (kafka.log.LogManager)。
解决办法:
- 去kafka日志目录下删除log里topic日志(重启kafka还是报错)。
- 还需要删除zookeeper里zoo.cfg里配置的dataDir目录下的日志。
编写bat脚本一键启动
启动kafka CMD报错:输入行太长,语法错误,在网上查到一个解决方法: kafka目录不要建太深,直接在放在D盘
@echo off
rd /s /q D:\program\kafka\log
rd /s /q D:\program\kafka\logs
rd /s /q D:\program\kafka\programkafkalog
rd /s /q D:\program\zookeeper\data
rd /s /q D:\program\zookeeper\log
REM 启动Zookeeper(根据实际安装路径调整,以下是示例路径)
start "Zookeeper" cmd /k "D:\program\zookeeper\bin\zkServer.cmd"
timeout /t 10
REM 启动kafka
echo Starting Kafka...
start "Kafka" cmd /k "D:\program\kafka\bin\windows\kafka-server-start.bat D:\program\kafka\config\server.properties"
echo All servers started.