Table of Contents
  1. Kafka introduction
    1.1. Key features of Kafka
    1.2. Kafka architecture
  2. Kafka single-node setup
  3. Kafka cluster setup
  4. Kafka configuration options
  5. Kafka Java API
    5.1. pom.xml
    5.2. Producer API
    5.3. Consumer API
  6. Integrating Kafka with Storm: kafka-spout
  7. References

Kafka introduction

Kafka is a distributed publish-subscribe messaging system. Originally developed at LinkedIn, it later became part of the Apache project. Kafka is a distributed, partitioned, replicated, persistent log service, used mainly for processing streams of activity data.

Key features of Kafka

  • High throughput for both publishing and subscribing. Reportedly, Kafka can produce about 250,000 messages per second (50 MB/s) and process about 550,000 messages per second (110 MB/s).
  • Persistence. Messages are persisted to disk, so they can be consumed in batch (e.g. for ETL) as well as by real-time applications; persisting to disk together with replication protects against data loss.
  • A distributed system that scales out easily. Producers, brokers and consumers all run as multiple distributed instances, and machines can be added without downtime.
  • The state of message consumption is maintained on the consumer side rather than by the server, and consumers rebalance automatically on failure.
  • Supports both online and offline scenarios.

Kafka architecture

(figure: Kafka architecture diagram)

  • Topic: a category of the message feeds handled by Kafka.
  • Partition: a physical grouping within a topic. A topic can be split into multiple partitions; each partition is an ordered queue, and every message in a partition is assigned a sequential id called its offset.
  • Message: the basic unit of communication; each producer publishes messages to a topic.
  • Producers: the processes that publish messages to a Kafka topic.
  • Consumers: the processes that subscribe to topics and process the published messages.
  • Broker: one or more servers in a Kafka cluster, collectively referred to as brokers.
  • Kafka manages messages by topic. Each topic contains multiple partitions, each partition corresponds to one logical log, and each log is made up of multiple segments.
  • Each segment stores many messages. A message's id is determined by its logical position, so the storage location can be derived directly from the id without an extra id-to-position mapping.
  • For each partition, an in-memory index records the offset of the first message of every segment.
  • Messages published to a topic are distributed evenly across its partitions (randomly, or via a user-supplied callback). When the broker receives a message it appends it to the last segment of the corresponding partition. When a segment's message count reaches a configured limit, or its messages have been held longer than a configured time, the segment is flushed to disk; only messages flushed to disk are visible to subscribers. Once a segment reaches a certain size the broker stops writing to it and creates a new segment.
  • In Kafka's on-disk layout, a topic has multiple partitions. In single-node mode each partition is a directory whose name is the topic name plus an ordinal: the first partition is numbered 0 and the largest number is the partition count minus 1.
  • In a multi-machine, distributed Kafka deployment, partitions are assigned to brokers as illustrated below:
    For example, in a cluster with 4 brokers, creating a topic with 4 partitions and 2 replicas distributes the partitions across the brokers as shown:
    (figure: distributed partition assignment, example 1)
    When 2 more nodes are added to the cluster and the partition count is increased to 6, the distribution becomes:
    (figure: distributed partition assignment, example 2)
    For details see "Kafka集群partition replication自动分配分析".

  • Segment file layout within a partition: each segment consists of two files that always appear in pairs, an index file and a data file, with suffixes ".index" and ".log" respectively. Segment files are named as follows: the first segment of a partition starts from 0, and each subsequent segment file is named after the offset of the last message in the previous segment (e.g. 00000000000000000000.index/.log for the first segment). The value is a 64-bit long, written as 19 decimal digits padded with leading zeros.

    The Meituan tech team has a good analysis of how Kafka's storage and data flow work; see "Kafka文件存储机制那些事–美团技术团队".
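To make the offset-based addressing described above concrete, here is a minimal sketch that fetches a few messages starting from a given offset using the 0.8 low-level SimpleConsumer. The broker address, topic, partition and starting offset are illustrative assumptions, and leader discovery and error handling are omitted:

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;

public class OffsetReadDemo {
    public static void main(String[] args) throws Exception {
        // assumptions: broker on localhost:9092, topic "kafkatest", partition 0, start at offset 0
        SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "offset-demo");
        FetchRequest req = new FetchRequestBuilder()
                .clientId("offset-demo")
                .addFetch("kafkatest", 0, 0L, 100000) // topic, partition, start offset, max bytes
                .build();
        FetchResponse resp = consumer.fetch(req);
        for (MessageAndOffset mao : resp.messageSet("kafkatest", 0)) {
            ByteBuffer payload = mao.message().payload();
            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            System.out.println(mao.offset() + ": " + new String(bytes, "UTF-8"));
        }
        consumer.close();
    }
}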

Kafka single-node setup

1) Download Kafka from the official site; grab the binary package so you don't have to compile it yourself:

wget http://www.webhostingjams.com/mirror/apache/kafka/0.8.2.1/kafka_2.9.1-0.8.2.1.tgz

2) Unpack it

tar -xvf kafka_2.9.1-0.8.2.1.tgz

3) Configure the ZooKeeper and Kafka configuration files

cd kafka_2.9.1-0.8.2.1
vim config/zookeeper.properties

The bundled ZooKeeper has the following three default settings; change them if you want a different port or data directory:

# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

Then edit the Kafka server configuration file:

vim config/server.properties

For single-node operation one broker is enough. You may want to adjust host.name; everything else can basically stay unchanged (if you changed the ZooKeeper port, update the ZooKeeper settings to match):

############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0 # broker id within the Kafka cluster
############################# Socket Server Settings #############################
# The port the socket server listens on
port=9092 # listening port
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=localhost # hostname; must be set in a distributed Kafka deployment

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs # directory for the message log files

############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181 # change this to your ZooKeeper host:port if needed
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

4) Start the servers

# start the bundled ZooKeeper first
bin/zookeeper-server-start.sh config/zookeeper.properties &
# then start Kafka
bin/kafka-server-start.sh config/server.properties &

5) Create a topic for testing

# --zookeeper is the ZooKeeper address, --replication-factor the number of replicas, --partitions the number of partitions, --topic the topic name
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkatest

Check whether the topic was created successfully:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

6) Send messages

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatest

After running the command you will see one WARN line, and then you can start typing messages:

[2015-07-26 08:58:42,027] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
ssdfff
sdf[2015-07-26 08:58:55,305] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
fsdfsff
dfsff
sdfdffdfd
sdffff

Press Ctrl-C to stop entering messages.
7) Receive the messages

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatest --from-beginning

The messages sent just now are received:

[2015-07-26 09:03:20,138] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
ssdfff
sdffsdfsff
dfsff
sdfdffdfd
sdffff

Kafka cluster setup

The three servers' IPs are 192.168.61.150 (hostname cluster1), 192.168.61.151 (hostname cluster2) and 192.168.61.152 (hostname cluster3).
1) Set up ZooKeeper on each of the three servers; see for example the article "ZooKeeper-3.3.4集群安装配置".
2) Edit the Kafka server configuration files.
For 192.168.61.150 the configuration is:

# irrelevant comment lines removed to keep this short
############################# Server Basics
broker.id=0 # broker id; unique per server
port=9092
host.name=cluster1 # set to this machine's hostname
# the defaults below can stay unchanged
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
#log.flush.interval.messages=10000
#log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=cluster1:2181,cluster2:2181,cluster3:2181 # important: list all ZooKeeper nodes here
zookeeper.connection.timeout.ms=6000

For 192.168.61.151 only broker.id and host.name differ:

broker.id=1 # broker id; unique per server
host.name=cluster2 # this machine's hostname
# everything else is identical to the cluster1 configuration above

Likewise for 192.168.61.152:

broker.id=2 # broker id; unique per server
host.name=cluster3 # this machine's hostname
# everything else is identical to the cluster1 configuration above

3) Start Kafka on each of the three machines:

bin/kafka-server-start.sh  config/server.properties  &

Kafka configuration options

See "Kafka 配置说明 含0.8.1版server.properties" (a Chinese configuration reference that includes the 0.8.1 server.properties).

Kafka Java API

The Kafka Java API is quite easy to use. First, add the Kafka jar as a dependency in your pom file.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>ProducerExample</groupId>
<artifactId>SimpleCounter</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>

</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<configuration>
<finalName>test-${artifactId}-${version}</finalName>
</configuration>
</plugin>
</plugins>
</build>
</project>

Producer API

The producer code is based mainly on a GitHub example, gwenshap/kafka-examples.
The DemoProducer class is defined as follows:

package com.test.groups;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class DemoProducer {

String topic;
String sync;
private Properties kafkaProps = new Properties();
private Producer<String, String> producer;

public DemoProducer(String topic) {
this.topic = topic;
}

public void configure(String brokerList, String sync) {
this.sync = sync;
kafkaProps.put("bootstrap.servers", brokerList);

// This is mandatory, even though we don't send keys
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("acks", "1");

// how many times to retry when produce request fails?
kafkaProps.put("retries", "3");
kafkaProps.put("linger.ms", 5);
}

public void start() {
producer = new KafkaProducer<String, String>(kafkaProps);
}

public void produce(String value) throws ExecutionException, InterruptedException {
if (sync.equals("sync"))
produceSync(value);
else if (sync.equals("async"))
produceAsync(value);
else throw new IllegalArgumentException("Expected sync or async, got " + sync);

}

public void close() {
producer.close();
}

/* Produce a record and wait for server to reply. Throw an exception if something goes wrong */
private void produceSync(String value) throws ExecutionException, InterruptedException {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, value);
producer.send(record).get();

}

/* Produce a record without waiting for server. This includes a callback that will print an error if something goes wrong */
private void produceAsync(String value) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, value);
producer.send(record, new DemoProducerCallback());
}

private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
System.out.println("Error producing to topic " + recordMetadata.topic());
e.printStackTrace();
}
}
}
}

The test driver for DemoProducer is as follows:

package com.test.groups;

import java.util.concurrent.ExecutionException;

public class SimpleCounter {
private static DemoProducer producer;
public static void main(String[] args) throws InterruptedException, ExecutionException {

if (args.length != 5) {
System.out.println("SimpleCounter {broker-list} {topic} {sync/async} {delay (ms)} {count}");
return;
}
/* get arguments */
String brokerList = args[0];
String topic = args[1];
String sync = args[2];
int delay = Integer.parseInt(args[3]);
int count = Integer.parseInt(args[4]);

/* start a producer */
producer = new DemoProducer(topic);
producer.configure(brokerList, sync);
producer.start();
long startTime = System.currentTimeMillis();
System.out.println("Starting...");
producer.produce("Starting...");

/* produce the numbers */
for (int i=0; i < count; i++ ) {
producer.produce(Integer.toString(i));
Thread.sleep(delay);
}
long endTime = System.currentTimeMillis();
System.out.println("... and we are done. This took " + (endTime - startTime) + " ms.");
producer.produce("... and we are done. This took " + (endTime - startTime) + " ms.");

/* close shop and leave */
producer.close();
System.exit(0);
}
}

Then run it (for example from Eclipse) with the arguments: localhost:9092 kafkatest async 50 10
Argument 1 is the Kafka broker address, 2 the topic name, 3 whether to produce synchronously or asynchronously, 4 the interval between sends in ms, and 5 the number of messages to send.

Consumer API

The consumer test code is as follows; its main method takes four arguments: the ZooKeeper address, a consumer group id (any string will do), the topic name, and the number of reader threads:

package com.test.groups;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConsumerGroupExample {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;

public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
// Create the consumer connector. a_zookeeper is the ZooKeeper address,
// a_groupId is the consumer group id used when reading (any string will do),
// a_topic is the topic to read from.
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;
}

// Shut down the consumer and its thread pool
public void shutdown() {
if (consumer != null) consumer.shutdown();
if (executor != null) executor.shutdown();
try {
if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
}
} catch (InterruptedException e) {
System.out.println("Interrupted during shutdown, exiting uncleanly");
}
}

// Start consuming. a_numThreads is the number of reader threads; a topic with multiple partitions can be read by multiple threads.
public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

// now launch all the threads
executor = Executors.newFixedThreadPool(a_numThreads);

// now create an object to consume the messages
int threadNumber = 0;
for (final KafkaStream stream : streams) {
// the actual message reading happens inside ConsumerThread
executor.submit(new ConsumerThread(stream, threadNumber));
threadNumber++;
}
}

// Initialize the consumer properties
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");

return new ConsumerConfig(props);
}

public static void main(String[] args) {
String zooKeeper = args[0];
String groupId = args[1];
String topic = args[2];
int threads = Integer.parseInt(args[3]);

ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
example.run(threads);

// Sleep to give the consumer time to commit its offsets to ZooKeeper; offsets are not committed synchronously.
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {

}
// example.shutdown();
}
}

The ConsumerThread class is as follows:

package com.test.groups;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerThread implements Runnable {
private KafkaStream m_stream;
private int m_threadNumber;

public ConsumerThread(KafkaStream a_stream, int a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
}

public void run() {
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
// This loop keeps reading messages from the topic; it blocks until the consumer is shut down.
while (it.hasNext())
System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
System.out.println("Shutting down Thread: " + m_threadNumber);
}
}

Integrating Kafka with Storm: kafka-spout

Integrating a Kafka cluster with a Storm cluster is simple and natural: Storm reads its input data from Kafka, which decouples data collection from computation and lets Kafka act as a buffer. Apache Storm supports this out of the box with a dedicated artifact; just add the following to your maven file:

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.2-incubating</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.2-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>

Then set up the Kafka spout in the topology definition code:

TopologyBuilder builder = new TopologyBuilder();
BrokerHosts hosts = new ZkHosts("192.168.61.150:2181,192.168.61.151:2181,192.168.61.152:2181");
String topic_name = "kafkatest";
String zkRoot = "/" + topic_name;
SpoutConfig spoutConfig = new SpoutConfig(hosts, topic_name, zkRoot, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
// set Spout.
builder.setSpout("spout", kafkaSpout, 3);
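To round the example out, a minimal sketch of a trivial printing bolt wired behind the spout, with a local submission, might look like the following (the bolt class, topology name and parallelism here are illustrative, not from the original post):

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

// A trivial bolt that prints every tuple coming out of the kafka-spout.
public class PrintBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        System.out.println("received: " + input.getString(0));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing is emitted downstream
    }
}

// wiring and local submission, placed after builder.setSpout("spout", kafkaSpout, 3):
builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("spout");
Config conf = new Config();
new LocalCluster().submitTopology("kafka-storm-test", conf, builder.createTopology());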

1) Note that if your Kafka messages are not plain strings but a custom type, you need to supply your own spoutConfig.scheme. The scheme defines how the kafka-spout converts a received Kafka message into a Storm tuple.

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class TestMessageScheme implements Scheme {

private static final Logger LOGGER = LoggerFactory.getLogger(TestMessageScheme.class);

@Override
public List<Object> deserialize(byte[] bytes) {
try {
String msg = new String(bytes, "UTF-8");
return new Values(msg);
} catch (UnsupportedEncodingException e) {
LOGGER.error("Cannot parse the provided message!");
}

//TODO: what happens if this returns null?
return null;
}

@Override
public Fields getOutputFields() {
return new Fields("msg");
}

}
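With a custom scheme in place, it is plugged into the spout configuration the same way the StringScheme was above:

spoutConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());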

2) The code above is real-time: once the Storm topology is started, the spout only receives Kafka messages sent after that point.
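If you instead want the spout to start from the earliest available offset (for example to replay messages produced before the topology started), storm-kafka's spout configuration exposes fields for this. A hedged sketch for the storm-kafka 0.9.x line; the field names changed in later releases, so verify against your version:

// read the topic from the beginning instead of only new messages
spoutConfig.forceFromStart = true;
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();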

  • Problems encountered during setup
    I was using Kafka to transport image messages, and the problems I ran into during testing were all caused by the images being too large, both when sending and when receiving.
    1) When sending, the images exceeded the maximum message size and could not be transmitted; the workaround was to raise the size-related settings on the sending side.

2) With the sending side adjusted, images could be sent without problems, but the consumer kept getting stuck after receiving a few messages; for now the only workaround is to split the images into smaller pieces before sending.
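For reference, a common way to raise the message size limits (not necessarily what was done here) is to increase message.max.bytes and replica.fetch.max.bytes on the brokers, fetch.message.max.bytes on the consumer, and, for the 0.8.2 Java producer used above, max.request.size. A sketch of the producer side, assuming a roughly 10 MB cap:

// allow requests up to ~10 MB; the broker and consumer limits mentioned above must be raised to at least the same value
kafkaProps.put("max.request.size", "10485760");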

References






Please credit the original source when reposting. This article is my own write-up based on hands-on practice; if it infringes your copyright, contact me at yanhealth@163.com and I will remove it.
