Kafka 安装（单机 broker，kafka_2.11-2.1.0）
#!/usr/bin/env bash
# Install and start a single Kafka 2.1.0 broker (id 1) on 10.0.0.11.
# Prerequisite: kafka_2.11-2.1.0.tgz has been uploaded to /usr/local,
# and ZooKeeper is reachable at 10.0.0.11:2181.

cd /usr/local/ || exit 1
tar xf kafka_2.11-2.1.0.tgz
mv kafka_2.11-2.1.0 kafka

# Register KAFKA_HOME and put its bin/ on PATH.
# Fix: the original note read "export KAFKA_HOME=/usr/local/kafka :$KAFKA_HOME/bin" —
# the "export PATH=$PATH" part was missing, leaving a stray ":$KAFKA_HOME/bin".
cat >> /etc/profile <<'EOF'
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
EOF
source /etc/profile

# Broker configuration (written via heredoc instead of interactive vim).
cd kafka/config || exit 1
cat > server.properties <<'EOF'
broker.id=1
listeners=PLAINTEXT://10.0.0.11:9092
#advertised.listeners=PLAINTEXT://your.host.name:9092
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/kafka/log
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
zookeeper.connect=10.0.0.11:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
EOF

# Make sure the data directory exists and is writable before first start.
mkdir -p /var/kafka/log

# Start the broker (runs in the foreground; Ctrl-C to stop).
kafka-server-start.sh /usr/local/kafka/config/server.properties
# Smoke-test the broker with the bundled console tools.

# Create a topic named "test" (1 partition, replication factor 1).
kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic test

# List all topics known to this cluster.
kafka-topics.sh --list --zookeeper localhost:2181

# Produce messages interactively — type anything, e.g.:
#   DING XIANG HUA
kafka-console-producer.sh --broker-list 10.0.0.11:9092 --topic test

# In a NEW terminal: consume the topic from the beginning.
kafka-console-consumer.sh --bootstrap-server 10.0.0.11:9092 \
  --topic test --from-beginning
配置 Flume（Exec Source → Memory Channel → Kafka Sink），将日志文件内容发送到 Kafka
#!/usr/bin/env bash
# Flume -> Kafka pipeline: tail /tmp/logs/kafka.log and ship each line
# to the Kafka topic "kafkatest" on broker 10.0.0.11:9092.

cd /usr/local/flume/conf/ || exit 1

# Flume agent definition (written via heredoc instead of interactive vim).
cat > kafka.conf <<'EOF'
agent.sources=s1
agent.channels=c1
agent.sinks=k1
agent.sources.s1.type=exec
agent.sources.s1.command=tail -F /tmp/logs/kafka.log
agent.sources.s1.channels=c1
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100
# Kafka sink
agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
# Kafka broker address and port
agent.sinks.k1.brokerList=10.0.0.11:9092
# Target Kafka topic
agent.sinks.k1.topic=kafkatest
# Serialization
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
agent.sinks.k1.channel=c1
EOF

# Test-data generator: append one numbered line every 10 seconds.
# Fix: the original wrote   echo "kafka_test-"+$i   which emits a literal '+'
# (a Java-style concatenation habit); "kafka_test-$i" is the intended output.
cat > kafka.sh <<'EOF'
#!/bin/bash
for ((i = 0; i <= 100000; i++)); do
  echo "kafka_test-$i" >> /tmp/logs/kafka.log
  sleep 10
done
EOF

# Create the log directory/file before tail -F and the generator need them.
mkdir -p /tmp/logs
touch /tmp/logs/kafka.log

# Run the generator in the BACKGROUND.
# Fix: the original foreground "sh kafka.sh" would block this shell for
# ~11 days (100001 iterations x 10 s) and nothing after it would run.
sh kafka.sh &

# Start the Kafka broker in the background.
kafka-server-start.sh /usr/local/kafka/config/server.properties &

# In a NEW terminal: watch the topic to verify messages arrive.
kafka-console-consumer.sh --bootstrap-server 10.0.0.11:9092 \
  --topic kafkatest --from-beginning

# Start the Flume agent.
# Fixes: the JVM system property must be attached to -D with NO space
# ("-Dflume.root.logger=...", otherwise flume-ng sees two unrelated args),
# and --conf should point at the Flume conf directory rather than ".".
flume-ng agent --conf /usr/local/flume/conf \
  --conf-file /usr/local/flume/conf/kafka.conf \
  --name agent -Dflume.root.logger=DEBUG,console