Installation

cd /usr/local/
#Upload kafka_2.11-2.1.0.tgz to this directory
tar xf kafka_2.11-2.1.0.tgz
mv kafka_2.11-2.1.0 kafka
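Kafka 2.1.0 needs a Java 8 or newer JDK on the machine; a quick check before continuing:
java -version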
vim /etc/profile
#Append the following two lines
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
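A quick check that the profile change took effect (minimal sketch):
echo $KAFKA_HOME
which kafka-server-start.sh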
cd kafka/config
Edit the configuration file
vim server.properties
broker.id=1
listeners=PLAINTEXT://10.0.0.11:9092
#advertised.listeners=PLAINTEXT://your.host.name:9092
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/kafka/log
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
zookeeper.connect=10.0.0.11:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
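Before starting the broker it helps to confirm that the ZooKeeper instance from zookeeper.connect is reachable. A minimal sketch, assuming nc is installed and ZooKeeper's ruok four-letter-word command is enabled:
echo ruok | nc 10.0.0.11 2181
#Should print imok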

#Start Kafka (runs in the foreground)
kafka-server-start.sh /usr/local/kafka/config/server.properties
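To run the broker as a background daemon instead and confirm it is up (a sketch; assumes jps and ss are available):
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
jps | grep Kafka
ss -lntp | grep 9092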

#Create a topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
#List topics
kafka-topics.sh --list --zookeeper localhost:2181
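Partition and replica details for a topic can be inspected with --describe (optional check):
kafka-topics.sh --describe --zookeeper localhost:2181 --topic test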
#Send a message (start a console producer)
kafka-console-producer.sh --broker-list 10.0.0.11:9092 --topic test
#Type any content, for example:
DING XIANG HUA
#Start a consumer in a new window
kafka-console-consumer.sh --bootstrap-server 10.0.0.11:9092 --topic test --from-beginning
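Besides --from-beginning, the console consumer can also join a named group so its offsets are committed; a sketch (the group name testgroup is arbitrary):
kafka-console-consumer.sh --bootstrap-server 10.0.0.11:9092 --topic test --group testgroup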

Edit the Flume configuration file

cd /usr/local/flume/conf/
vim kafka.conf
agent.sources=s1
agent.channels=c1
agent.sinks=k1

agent.sources.s1.type=exec
agent.sources.s1.command=tail -F /tmp/logs/kafka.log
agent.sources.s1.channels=c1
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100

#Configure the Kafka sink
agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
#Kafka broker address and port
agent.sinks.k1.brokerList=10.0.0.11:9092
#Kafka topic to write to
agent.sinks.k1.topic=kafkatest
#Serialization class
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder

agent.sinks.k1.channel=c1
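brokerList, topic and serializer.class are the legacy KafkaSink property names. On Flume 1.6 and newer the documented names are kafka.bootstrap.servers and kafka.topic; under that assumption an equivalent sink section would be:
agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.kafka.bootstrap.servers=10.0.0.11:9092
agent.sinks.k1.kafka.topic=kafkatest
agent.sinks.k1.channel=c1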


#Create a shell script that appends test lines to the log
vim kafka.sh
#!/bin/bash
for ((i=0;i<=100000;i++));
do
echo "kafka_test-"+$i >> /tmp/logs/kafka.log;
sleep 10
done

#Create the log directory and file
mkdir -p /tmp/logs
touch /tmp/logs/kafka.log
#Run the script
sh kafka.sh
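The loop writes one line every 10 seconds, so a foreground run blocks the terminal for a long time; running it in the background and tailing the file is a handy alternative (sketch):
nohup sh kafka.sh > /dev/null 2>&1 &
tail -f /tmp/logs/kafka.log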
#Start Kafka (if it is not already running)
kafka-server-start.sh /usr/local/kafka/config/server.properties &
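The kafkatest topic is not created explicitly anywhere above; with the default auto.create.topics.enable=true the broker creates it on first write, but it can also be created up front, mirroring the earlier create command:
kafka-topics.sh --create --zookeeper 10.0.0.11:2181 --replication-factor 1 --partitions 1 --topic kafkatest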
#Open a new terminal and run the following command to consume the kafkatest topic
kafka-console-consumer.sh --bootstrap-server 10.0.0.11:9092 --topic kafkatest --from-beginning
#Start Flume
flume-ng agent --conf /usr/local/flume/conf --conf-file /usr/local/flume/conf/kafka.conf --name agent -Dflume.root.logger=DEBUG,console