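Kafka Cluster Configuration and Integration with Storm via a Spout
Introduction to Kafka
Kafka is a distributed publish-subscribe messaging system. It was originally developed at LinkedIn and later became an Apache project. Kafka is a distributed, partitioned, replicated, persistent log service, used mainly for processing active streaming data.
Key features of Kafka:
- High throughput for both publishing and subscribing. Reportedly, Kafka can produce about 250,000 messages per second (50 MB) and process 550,000 messages per second (110 MB).
- Persistence. Messages are persisted to disk, so they can serve batch consumers such as ETL as well as real-time applications. Persisting data to disk, together with replication, protects against data loss.
- A distributed system that scales out easily. There are multiple producers, brokers, and consumers, all of them distributed, and machines can be added without downtime.
- The state of which messages have been processed is maintained by the consumer, not by the server. Load is rebalanced automatically on failure.
- Supports both online and offline scenarios.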
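Kafka architecture:
- Topic: a category of message feeds handled by Kafka.
- Partition: a physical grouping within a topic. A topic can be split into multiple partitions, each of which is an ordered queue. Every message in a partition is assigned a sequential id (the offset).
- Message: the basic unit of communication. Each producer can publish messages to a topic.
- Producers: producers of messages and data. A process that publishes messages to a Kafka topic is called a producer.
- Consumers: consumers of messages and data. A process that subscribes to topics and processes the published messages is called a consumer.
- Broker: the caching proxy. The one or more servers in a Kafka cluster are collectively called brokers.
- Kafka manages messages by topic. Each topic contains multiple partitions, and each partition corresponds to one logical log made up of multiple segments.
- Each segment stores multiple messages. A message id is determined by the message's logical position, so the storage location can be found directly from the id, with no extra id-to-location mapping.
- Each partition has an index in memory that records the offset of the first message in each segment.
- Messages published to a topic are distributed evenly across its partitions (randomly, or by a user-specified callback function). When the broker receives a published message, it appends the message to the last segment of the target partition. When the number of messages in a segment reaches the configured value, or the time since publication exceeds a threshold, the messages in the segment are flushed to disk; only messages that have been flushed to disk are visible to subscribers. Once a segment reaches a certain size, no more data is written to it and the broker creates a new segment.
- In Kafka's file storage, one topic has multiple partitions. On a single machine each partition is a directory, named with the topic name plus a sequence number (for example kafkatest-0). The first partition is numbered 0, and the largest number is the partition count minus 1.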
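The partitioning and directory-naming rules above can be sketched in a few lines of Java. This is only an illustration under assumptions: the class name PartitionLayout and both method names are made up for this example, and the hash-modulo rule is one simple way to spread keys evenly, not necessarily the partitioner Kafka itself uses.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionLayout {
    /** Picks a partition for a key: non-negative hash modulo the partition count. */
    public static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the result is never negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    /** Directory names of a topic's partitions: <topic>-0 .. <topic>-(n-1). */
    public static List<String> partitionDirs(String topic, int numPartitions) {
        List<String> dirs = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            dirs.add(topic + "-" + i);
        }
        return dirs;
    }
}
```

For a topic kafkatest with 3 partitions, partitionDirs returns kafkatest-0, kafkatest-1, and kafkatest-2, matching the "count minus 1" rule above.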
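In a multi-machine, distributed Kafka deployment, partitions are assigned as shown in the figures below:
Take a Kafka cluster with 4 brokers as an example. Create 1 topic with 4 partitions and a replication factor of 2; the partitions are assigned to the brokers as shown in the figure below:
When 2 nodes are added to the cluster and the number of partitions grows to 6, the distribution is as follows: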
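To make the idea of spreading partitions and their replicas over brokers concrete, here is a deliberately simplified round-robin sketch. It is an assumption for illustration only: the class ReplicaAssignment and its assign method are hypothetical, and Kafka's real assignment logic is more involved (as far as I know, it does not necessarily start at broker 0), so the output will not necessarily match the figures.

```java
import java.util.ArrayList;
import java.util.List;

public class ReplicaAssignment {
    /**
     * Returns, for each partition, the list of broker ids that hold a replica.
     * Partition p puts its r-th replica on broker (p + r) mod brokers.
     */
    public static List<List<Integer>> assign(int brokers, int partitions, int replicationFactor) {
        if (replicationFactor > brokers) {
            throw new IllegalArgumentException("replication factor exceeds broker count");
        }
        List<List<Integer>> result = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add((p + r) % brokers);
            }
            result.add(replicas);
        }
        return result;
    }
}
```

With 4 brokers, 4 partitions, and 2 replicas, every broker ends up holding exactly two partition replicas, which is the even spread the example above is after.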
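For details, see Kafka集群partition replication自动分配分析 (an analysis of automatic partition replica assignment in a Kafka cluster).
Composition and physical layout of the segment files in a partition:
- A segment file consists of two parts, an index file and a data file. The two files correspond one to one and always appear as a pair; the suffixes ".index" and ".log" mark the segment index file and the segment data file respectively.
- Segment file naming rule: the first segment of a partition starts at 0, and each subsequent segment file is named after the offset of the last message in the previous segment file. The value can be as large as a 64-bit long (19 decimal digits), and the name is left-padded with 0 to a fixed width (20 characters in Kafka's on-disk file names, for example 00000000000000000000.log).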
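The naming rule also explains how a message can be located from its offset without any extra mapping. The sketch below assumes the number in a file name is the lowest offset that segment can contain. The class SegmentLookup and its methods are hypothetical names for this illustration, and a sorted map stands in for the in-memory index.

```java
import java.util.Map;
import java.util.TreeMap;

public class SegmentLookup {
    // Base offset of each segment -> data file name, kept sorted by offset.
    private final TreeMap<Long, String> segments = new TreeMap<>();

    /** Builds a data file name: the offset zero-padded to 20 digits plus ".log". */
    public static String dataFileName(long baseOffset) {
        return String.format("%020d.log", baseOffset);
    }

    public void addSegment(long baseOffset) {
        segments.put(baseOffset, dataFileName(baseOffset));
    }

    /** Finds the segment whose base offset is the largest one not above the given offset. */
    public String segmentFor(long offset) {
        Map.Entry<Long, String> entry = segments.floorEntry(offset);
        if (entry == null) {
            throw new IllegalArgumentException("offset " + offset + " precedes the first segment");
        }
        return entry.getValue();
    }
}
```

Because the segments are sorted by base offset, the lookup is a single floor search rather than a scan over all files.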
The Meituan team has a good analysis of how Kafka works at runtime; see Kafka文件存储机制那些事 (Meituan tech team, on Kafka's file storage mechanism).
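Kafka single-node configuration
1) Download Kafka from the official Kafka site. Be sure to get the binary package so that you do not have to compile it yourself:
wget http://www.webhostingjams.com/mirror/apache/kafka/0.8.2.1/kafka_2.9.1-0.8.2.1.tgz
2) Extract the archive:
tar -xvf kafka_2.9.1-0.8.2.1.tgz
3) Edit the ZooKeeper configuration file and the Kafka configuration file:
cd kafka_2.9.1-0.8.2.1
vim config/zookeeper.properties
The bundled ZooKeeper ships with the three default settings below; change them if you need a different port or data directory:
# the directory where the snapshot is stored.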
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
Then edit the Kafka server configuration file:
vim config/server.properties
For a single-node setup, one broker is enough. You may want to change host.name; almost nothing else needs to change (if you changed the ZooKeeper port, update the corresponding ZooKeeper setting here as well):
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
# (the number of this server within the Kafka cluster)
broker.id=0
############################# Socket Server Settings #############################
# The port the socket server listens on
port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
# (must be set in a distributed Kafka deployment)
host.name=localhost
# A comma separated list of directories under which to store log files
# (where the message log files are stored)
log.dirs=/tmp/kafka-logs
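One detail worth checking when annotating this file: in the Java properties format, a # starts a comment only at the beginning of a line, so a note written after a value on the same line becomes part of the value. The small sketch below demonstrates this with java.util.Properties. It assumes the file is read that way, as is usual for .properties files; the class name InlineCommentCheck is made up for the example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class InlineCommentCheck {
    /** Parses properties-format text and returns the value stored under key. */
    public static String read(String fileContent, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(fileContent));
        } catch (IOException e) {
            // Reading from a String should not fail; rethrow unchecked just in case.
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }
}
```

Reading "broker.id=0 #note" yields the value "0 #note", which is not a valid integer, so it is safer to keep comments on their own lines.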
4) Start the services:
# start the bundled ZooKeeper first
bin/zookeeper-server-start.sh config/zookeeper.properties &
# then start Kafka
bin/kafka-server-start.sh config/server.properties &
5) Create a topic first, for testing:
# --zookeeper gives the ZooKeeper address, --replication-factor the number of replicas, --partitions the number of partitions of the topic, and --topic the topic name
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkatest
Check that the topic was created:
bin/kafka-topics.sh --describe --zookeeper localhost:2181
6) Send messages:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatest
After the command starts it prints a WARN line, and then you can begin typing messages:
[2015-07-26 08:58:42,027] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
ssdfff
sdf[2015-07-26 08:58:55,305] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
fsdfsff
dfsff
sdfdffdfd
sdffff
Press Ctrl-C to end the input.
7) Test receiving messages:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatest --from-beginning
The messages sent a moment ago are received:
[2015-07-26 09:03:20,138] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
ssdfff
sdffsdfsff
dfsff
sdfdffdfd
sdffff
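Kafka cluster configuration
The three servers have the IPs 192.168.61.150 (hostname cluster1), 192.168.61.151 (hostname cluster2), and 192.168.61.152 (hostname cluster3).
1) Configure ZooKeeper on each of the three servers; see the article ZooKeeper-3.3.4集群安装配置 (ZooKeeper 3.3.4 cluster installation and configuration).
2) Edit the Kafka server configuration file:
On 192.168.61.150, the configuration file is:
# unneeded comment lines removed to save space
############################# Server Basics
# number of this server
broker.id=0
port=9092
# change the hostname here
host.name=cluster1
# these defaults do not need to change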
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
#log.flush.interval.messages=10000
#log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# important: this must be changed
zookeeper.connect=cluster1:2181,cluster2:2181,cluster3:2181
zookeeper.connection.timeout.ms=6000
On 192.168.61.151, the configuration file is:
# unneeded comment lines removed to save space
############################# Server Basics
# number of this server
broker.id=1
port=9092
# change the hostname here
host.name=cluster2
# these defaults do not need to change
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
#log.flush.interval.messages=10000
#log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# important: this must be changed
zookeeper.connect=cluster1:2181,cluster2:2181,cluster3:2181
zookeeper.connection.timeout.ms=6000
On 192.168.61.152, the configuration file is:
# unneeded comment lines removed to save space
############################# Server Basics
# number of this server
broker.id=2
port=9092
# change the hostname here
host.name=cluster3
# these defaults do not need to change
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
#log.flush.interval.messages=10000
#log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# important: this must be changed
zookeeper.connect=cluster1:2181,cluster2:2181,cluster3:2181
zookeeper.connection.timeout.ms=6000
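The three files differ only in broker.id and host.name, while zookeeper.connect lists all three nodes everywhere. As a rough sketch of that pattern, the hypothetical helper below derives the per-node settings from a host list. BrokerConfigs and forBroker are illustrative names, and only the settings that vary or matter for clustering are included.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BrokerConfigs {
    /** Builds the node-specific settings for the broker at position index in hosts. */
    public static Map<String, String> forBroker(int index, String[] hosts) {
        Map<String, String> cfg = new LinkedHashMap<>();
        cfg.put("broker.id", Integer.toString(index)); // unique per broker
        cfg.put("port", "9092");
        cfg.put("host.name", hosts[index]);            // this node's own hostname

        // Every broker points at the same ZooKeeper ensemble.
        StringBuilder zk = new StringBuilder();
        for (String host : hosts) {
            if (zk.length() > 0) {
                zk.append(',');
            }
            zk.append(host).append(":2181");
        }
        cfg.put("zookeeper.connect", zk.toString());
        return cfg;
    }
}
```

Calling forBroker(1, new String[] {"cluster1", "cluster2", "cluster3"}) gives broker.id 1 and host.name cluster2, the values expected on 192.168.61.151.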
3) Start Kafka on each machine:
bin/kafka-server-start.sh config/server.properties &
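Kafka configuration options
See Kafka 配置说明 含0.8.1版server.properties (Kafka configuration reference, including the 0.8.1 server.properties).
Kafka Java API
The Kafka Java API is quite simple to use. First, add the Kafka jar to the pom file.
pom.xml file
<?xml version="1.0" encoding="UTF-8"?>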
producer API
The producer code is mainly based on an example on GitHub, gwenshap/kafka-examples.
DemoProducer is defined as follows:
package com.test.groups;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class DemoProducer {
    String topic;
    String sync;
    private Properties kafkaProps = new Properties();
    private Producer<String, String> producer;

    public DemoProducer(String topic) {
        this.topic = topic;
    }

    public void configure(String brokerList, String sync) {
        this.sync = sync;
        kafkaProps.put("bootstrap.servers", brokerList);
        // This is mandatory, even though we don't send keys
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("acks", "1");
        // how many times to retry when produce request fails?
        kafkaProps.put("retries", "3");
        kafkaProps.put("linger.ms", 5);
    }

    public void start() {
        producer = new KafkaProducer<String, String>(kafkaProps);
    }

    public void produce(String value) throws ExecutionException, InterruptedException {
        if (sync.equals("sync"))
            produceSync(value);
        else if (sync.equals("async"))
            produceAsync(value);
        else throw new IllegalArgumentException("Expected sync or async, got " + sync);
    }

    public void close() {
        producer.close();
    }

    /* Produce a record and wait for server to reply. Throw an exception if something goes wrong */
    private void produceSync(String value) throws ExecutionException, InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, value);
        producer.send(record).get();
    }

    /* Produce a record without waiting for server. This includes a callback that will print an error if something goes wrong */
    private void produceAsync(String value) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, value);
        producer.send(record, new DemoProducerCallback());
    }

    private class DemoProducerCallback implements Callback {
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                // recordMetadata can be null when the send failed, so report the producer's own topic
                System.out.println("Error producing to topic " + topic);
                e.printStackTrace();
            }
        }
    }
}
The test code for DemoProducer is as follows:
package com.test.groups;

import java.util.concurrent.ExecutionException;

public class SimpleCounter {
    private static DemoProducer producer;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // all five arguments are required
        if (args.length < 5) {
            System.out.println("SimpleCounter {broker-list} {topic} {type sync/async} {delay (ms)} {count}");
            return;
        }

        /* get arguments */
        String brokerList = args[0];
        String topic = args[1];
        String sync = args[2];
        int delay = Integer.parseInt(args[3]);
        int count = Integer.parseInt(args[4]);

        /* start a producer (it must be created before it can be configured) */
        producer = new DemoProducer(topic);
        producer.configure(brokerList, sync);
        producer.start();

        long startTime = System.currentTimeMillis();
        System.out.println("Starting...");
        producer.produce("Starting...");

        /* produce the numbers */
        for (int i = 0; i < count; i++) {
            producer.produce(Integer.toString(i));
            Thread.sleep(delay);
        }

        long endTime = System.currentTimeMillis();
        System.out.println("... and we are done. This took " + (endTime - startTime) + " ms.");
        producer.produce("... and we are done. This took " + (endTime - startTime) + " ms.");

        /* close shop and leave */
        producer.close();
        System.exit(0);
    }
}
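Then run it in Eclipse with the arguments: localhost:9092 kafkatest async 50 10
Argument [1] is the Kafka address, [2] is the topic name, [3] selects synchronous or asynchronous produce, [4] is the send interval in milliseconds, and [5] is the number of messages to send.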
consumer API
The consumer test Java code is as follows:
package com.test.groups;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        // Create the consumer object. a_zookeeper is the ZooKeeper address,
        // a_groupId is the group id the consumer reads with (any value will do),
        // a_topic is the name of the topic to read.
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    // Shut down the consumer connector and the worker threads
    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        // run() may never have been called, in which case there is no executor to stop
        if (executor == null) return;
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
    }

    // Start consuming. a_numThreads is the number of reader threads;
    // a topic can have several partitions, so it can be read by several threads.
    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, a_numThreads);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // now launch all the threads
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            // the messages are actually read inside ConsumerThread
            executor.submit(new ConsumerThread(stream, threadNumber));
            threadNumber++;
        }
    }

    // Initialize the consumer properties
    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);

        // The sleep gives the consumer time to write its offsets to ZooKeeper,
        // because offsets are not updated synchronously.
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
            // nothing to clean up; fall through
        }
        // example.shutdown();
    }
}
The ConsumerThread code is as follows:
package com.test.groups;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerThread implements Runnable {
    private KafkaStream<byte[], byte[]> m_stream;
    private int m_threadNumber;

    // the constructor name must match the class name
    public ConsumerThread(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        // This loop keeps reading messages from the topic; hasNext() blocks until the consumer is shut down manually.
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}
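The threading pattern used by the consumer (one worker per stream in a fixed pool, then shutdown followed by awaitTermination) can be tried without a running Kafka cluster. In the sketch below, in-memory BlockingQueue objects stand in for the Kafka streams; that substitution, the class name StreamWorkers, and the method drainAll are all assumptions made for illustration. Unlike a Kafka stream, these queues do not block when empty, so each worker simply stops once its queue is drained.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class StreamWorkers {
    /** Runs one worker per queue, waits for all of them, and returns how many messages were handled. */
    public static int drainAll(List<BlockingQueue<String>> streams) {
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, streams.size()));
        AtomicInteger handled = new AtomicInteger();
        for (BlockingQueue<String> stream : streams) {
            executor.submit(() -> {
                // poll() returns null once the queue is empty, which ends this worker
                while (stream.poll() != null) {
                    handled.incrementAndGet();
                }
            });
        }
        executor.shutdown(); // accept no new tasks, let the submitted ones finish
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }
}
```

The order of calls mirrors the shutdown method above: stop accepting work first, then wait a bounded time for the workers to finish.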
Integrating Kafka with Storm: kafka-spout
Integrating a Kafka cluster with a Storm cluster is simple and natural. Having Storm pull its data from Kafka decouples data collection from computation, and Kafka also serves as a data buffer. Storm supports this officially: the Apache Storm project ships a dedicated jar, and all you need is to add the following to the Maven file:
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.2-incubating</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.2-incubating</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.1.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
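Then set up the Kafka spout in the topology definition code:
// imports needed by this fragment (package names as in Storm 0.9.x)
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import java.util.UUID;

TopologyBuilder builder = new TopologyBuilder();
BrokerHosts hosts = new ZkHosts("192.168.61.150:2181,192.168.61.151:2181,192.168.61.152:2181");
String topic_name = "kafkatest";
String zkRoot = "/" + topic_name;
SpoutConfig spoutConfig = new SpoutConfig(hosts, topic_name, zkRoot, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
// set Spout.
builder.setSpout("spout", kafkaSpout, 3);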
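1) Note that if your Kafka messages are not Strings but a custom message type, you need to define your own spoutConfig.scheme. spoutConfig.scheme determines how kafka-spout converts a received Kafka message into a Storm tuple.
public class TestMessageScheme implements Scheme {
2) The code above is real-time: once Storm is started, it only receives Kafka messages sent after that point.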
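To illustrate what a custom scheme from note 1) has to do, here is a self-contained sketch that passes binary payloads (such as images) through unchanged. It is not Storm code: to stay runnable without Storm on the classpath it declares its own minimal BytesToTuple interface, a hypothetical stand-in modeled loosely on the idea of a scheme (bytes in, tuple values out, plus declared field names). In a real topology you would implement Storm's own Scheme interface instead.

```java
import java.util.Collections;
import java.util.List;

public class RawBytesSchemeSketch {
    /** Hypothetical stand-in for a scheme: turns a message payload into tuple values. */
    public interface BytesToTuple {
        List<Object> deserialize(byte[] payload);

        List<String> outputFields();
    }

    /** Emits the payload unchanged as a single field named "bytes". */
    public static class RawBytes implements BytesToTuple {
        @Override
        public List<Object> deserialize(byte[] payload) {
            // No decoding: downstream bolts receive the raw byte array.
            return Collections.<Object>singletonList(payload);
        }

        @Override
        public List<String> outputFields() {
            return Collections.singletonList("bytes");
        }
    }
}
```

The number of values returned by deserialize has to match the number of declared output fields, which is why both methods return exactly one element here.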
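- Problems encountered during configuration
I mainly use Kafka to transfer image messages. The main problem during testing was that the images were too large, which caused trouble on both the sending and the receiving side.
1) When sending, the image was so large that the message exceeded the size limit and could not be transmitted. The fix was:
2) After changing the sending side, images could be sent without problems, but receiving always stalled: after a few messages the consumer hung and stopped. For now the only workaround is to split each image into smaller pieces before sending.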
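The workaround of splitting an image into smaller pieces can be sketched as follows. This is a minimal illustration, not the code used in the tests above: ImageChunker, split, and join are hypothetical names, and a real pipeline would also need to send a chunk index and total count with each piece so that the receiver can reassemble them in order.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ImageChunker {
    /** Splits data into consecutive chunks of at most chunkSize bytes. */
    public static List<byte[]> split(byte[] data, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        for (int start = 0; start < data.length; start += chunkSize) {
            int end = Math.min(data.length, start + chunkSize);
            chunks.add(Arrays.copyOfRange(data, start, end));
        }
        return chunks;
    }

    /** Concatenates chunks, in list order, back into one array. */
    public static byte[] join(List<byte[]> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            out.write(chunk, 0, chunk.length);
        }
        return out.toByteArray();
    }
}
```

Each chunk would then be sent as its own Kafka message, with chunkSize chosen to stay below the message size limit configured for the cluster.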
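References
- 标点符 » 分布式消息系统:Kafka (Distributed messaging system: Kafka)
- Kafka文件存储机制那些事 (Meituan tech team, on Kafka's file storage mechanism)
- gwenshap/kafka-examples
- Kafka集群partition replication自动分配分析 (an analysis of automatic partition replica assignment in a Kafka cluster)
- Kafka 配置说明 含0.8.1版server.properties (Kafka configuration reference, including the 0.8.1 server.properties)
- ZooKeeper-3.3.4集群安装配置 (ZooKeeper 3.3.4 cluster installation and configuration)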
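Please credit the source when reposting. This article is an original write-up based on hands-on practice; if it infringes your copyright, please contact me and I will remove it. Email: yanhealth@163.com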