As enterprise systems grow, so do their logs, and without a reliable log collection and analysis pipeline it becomes difficult to monitor and maintain the system effectively. This article shows how to build an efficient log collection and analysis system on top of Spring Boot and Apache Flume.
- Prerequisites
Before starting, install and configure the following software:
- JDK 8 or later
- Maven 3.3 or later
- Apache Flume 1.9.0 or later
- Elasticsearch 7.6.2 or later
- Kibana 7.6.2 or later
- Spring Boot Application Setup
First, create a Spring Boot application and add the required dependencies:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <!-- Exclude the default Logback starter so the log4j2 starter can take over -->
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
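The custom appender shown below also needs Flume's RPC client classes, which live in the flume-ng-sdk artifact. A sketch of the dependency, assuming Flume 1.9.0 (match the version to your Flume installation):

```xml
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-sdk</artifactId>
    <version>1.9.0</version>
</dependency>
```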
Then add the following to the application.properties file:
# Application port
server.port=8080
# log4j2 configuration file
logging.config=classpath:log4j2.xml
# Flume settings (custom keys used by this article, not consumed by Spring Boot itself)
flume.agentName=myflume
flume.sourceType=avro
flume.clientType=load-balancing
flume.hosts=localhost:41414
# Elasticsearch
spring.elasticsearch.rest.uris=http://localhost:9200
The configuration above sets the application port, the log4j2 configuration file, the Flume connection settings, and the Elasticsearch access URI. Note that the flume.* keys are custom properties for documentation purposes; the appender below receives its hosts from the log4j2 configuration, not from Spring.
- Log Collector
To ship application logs to Flume, we implement a custom log4j2 Appender.
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;

@Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
public class FlumeAppender extends AbstractAppender {
    private final RpcClient client;
    private final String sourceType;

    protected FlumeAppender(String name, Filter filter, Layout<? extends Serializable> layout,
                            RpcClient client, String sourceType) {
        super(name, filter, layout, true);
        this.client = client;
        this.sourceType = sourceType;
    }

    @PluginFactory
    public static FlumeAppender createAppender(@PluginAttribute("name") String name,
                                               @PluginElement("Filters") Filter filter,
                                               @PluginElement("Layout") Layout<? extends Serializable> layout,
                                               @PluginAttribute("sourceType") String sourceType,
                                               @PluginAttribute("hosts") String hosts) {
        if (name == null) {
            LOGGER.error("FlumeAppender missing name");
            return null;
        }
        if (hosts == null) {
            LOGGER.error("FlumeAppender missing hosts");
            return null;
        }
        return new FlumeAppender(name, filter, layout, createClient(hosts), sourceType);
    }

    // Build a load-balancing Avro RPC client from a comma-separated
    // "host:port,host:port" list using Flume's client factory.
    private static RpcClient createClient(String hosts) {
        Properties props = new Properties();
        props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "default_loadbalance");
        String[] hostArray = hosts.split(",");
        StringBuilder aliases = new StringBuilder();
        for (int i = 0; i < hostArray.length; i++) {
            String alias = "h" + i;
            aliases.append(alias).append(' ');
            props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + alias,
                    hostArray[i].trim());
        }
        props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, aliases.toString().trim());
        props.setProperty(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF, "10000");
        return RpcClientFactory.getInstance(props);
    }

    @Override
    public void append(LogEvent event) {
        try {
            byte[] body = getLayout().toByteArray(event);
            Map<String, String> headers = new HashMap<>();
            headers.put("timestamp", Long.toString(event.getTimeMillis()));
            headers.put("source", "log4j");
            headers.put("sourceType", sourceType);
            Event flumeEvent = EventBuilder.withBody(body, headers);
            client.append(flumeEvent);
        } catch (Exception e) {
            LOGGER.error("Failed to send event to Flume", e);
        }
    }
}
The code above implements a log4j2 Appender that packages each log event as a Flume Event and ships it to the Flume agent over Avro RPC.
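The handling of the comma-separated hosts attribute can be exercised in isolation. A minimal sketch using only the JDK (the class name HostListParser is ours for illustration, not part of Flume or log4j2):

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

public class HostListParser {

    /**
     * Parse a comma-separated "host:port,host:port" list into socket
     * addresses, mirroring what the appender does with its hosts attribute.
     * Addresses are left unresolved so no DNS lookup happens here.
     */
    public static List<InetSocketAddress> parse(String hosts) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String host : hosts.split(",")) {
            String[] parts = host.trim().split(":");
            result.add(InetSocketAddress.createUnresolved(parts[0], Integer.parseInt(parts[1])));
        }
        return result;
    }

    public static void main(String[] args) {
        List<InetSocketAddress> addrs = parse("localhost:41414,backup:41415");
        System.out.println(addrs.size());               // 2
        System.out.println(addrs.get(0).getHostName()); // localhost
    }
}
```

Keeping the addresses unresolved means a misconfigured hostname fails at send time rather than at appender startup.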
Next, create a log4j2 configuration file (log4j2.xml, matching the logging.config property above) that registers the FlumeAppender:
<?xml version="1.0" encoding="UTF-8"?>
<!-- "packages" must point at the package containing FlumeAppender (com.example.logging here is a placeholder) so log4j2 can discover the plugin -->
<Configuration packages="com.example.logging">
<Appenders>
<!-- hosts must be a literal value or a log4j2 lookup; Spring's application.properties is not visible to log4j2 -->
<Flume name="flume" sourceType="spring-boot" hosts="localhost:41414">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Flume>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="flume"/>
</Root>
</Loggers>
</Configuration>
This log4j2 configuration defines a FlumeAppender and references it from the root logger, so every event at INFO level or above is forwarded to Flume.
- Flume Configuration
Next, configure a Flume agent that receives the log events sent by the application and forwards them to Elasticsearch.
Create a Flume configuration file (e.g. conf/myflume.conf) as follows.
# Name the agent's sources, sinks, and channels
# (the "myflume" prefix must match the agent name used at startup)
myflume.sources = mysource
myflume.sinks = mysink
myflume.channels = channel1
# Avro source receiving events from the log4j2 appender
myflume.sources.mysource.type = avro
myflume.sources.mysource.bind = 0.0.0.0
myflume.sources.mysource.port = 41414
# In-memory channel buffering events between source and sink
myflume.channels.channel1.type = memory
myflume.channels.channel1.capacity = 10000
myflume.channels.channel1.transactionCapacity = 1000
# Elasticsearch sink; the stock sink appends the current date to indexName itself
# and talks to the Elasticsearch transport port (9300), not the REST port
myflume.sinks.mysink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
myflume.sinks.mysink.hostNames = localhost:9300
myflume.sinks.mysink.indexName = spring-boot-logs
myflume.sinks.mysink.indexType = logs
myflume.sinks.mysink.batchSize = 1000
# Wire the source and sink to the channel
myflume.sources.mysource.channels = channel1
myflume.sinks.mysink.channel = channel1
In this Flume configuration we define one agent with an Avro source, a memory channel, and an Elasticsearch sink. The source is of type avro and binds to 0.0.0.0:41414, matching the hosts setting in the appender configuration. One caveat: the stock ElasticSearchSink was built against the older Elasticsearch transport client, so against Elasticsearch 7.x it may require a compatible third-party or custom sink.
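Assuming the configuration is saved as conf/myflume.conf, the agent can be started with Flume's standard launcher; the --name argument must match the myflume prefix used in the configuration file:

```shell
bin/flume-ng agent \
  --conf conf \
  --conf-file conf/myflume.conf \
  --name myflume \
  -Dflume.root.logger=INFO,console
```

With the agent running, starting the Spring Boot application should produce events on port 41414 that Flume relays onward.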