整合营销服务商

电脑端+手机端+微信端=数据同步管理

免费咨询热线:

Spring 源码-BeanFactoryPostProcessor怎么执行的(6)

bstractApplicationContext提供的postProcessBeanFactory空方法

postProcessBeanFactory这个方法没名字跟BeanFactoryPostProcessor接口中的方法一样,但是他的功能是提供给子类进行添加一些额外的功能,比如添加BeanPostProcessor接口的实现,或者定制一些其他的功能也是可以的,因为这个方法你可以拿到BeanFactory,自然是可以对他进行一些功能的定制的。

这里看下Spring 提供的子类GenericWebApplicationContext是如何实现的:

@Override
protected void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) {
  if (this.servletContext != null) {
    beanFactory.addBeanPostProcessor(new ServletContextAwareProcessor(this.servletContext));
    beanFactory.ignoreDependencyInterface(ServletContextAware.class);
  }
  WebApplicationContextUtils.registerWebApplicationScopes(beanFactory, this.servletContext);
  WebApplicationContextUtils.registerEnvironmentBeans(beanFactory, this.servletContext);
}

这里他注册了一个ServletContextAwreProcessor 到beanFactory中,ServletContexAwareProcessor是一个BeanPostProcessor接口的子类。

重头戏BeanFactoryPostProcessor

接下来分析AbstractApplicationContext#refresh中的invokeBeanFactoryPostProcessors方法,这个方法用来注册和执行BeanFactoryPostProcessor的。

直接上源码:

protected void invokeBeanFactoryPostProcessors(ConfigurableListableBeanFactory beanFactory) {
  // 执行所有的BeanFactoryPostProcessor
  PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(beanFactory, getBeanFactoryPostProcessors());

  // Detect a LoadTimeWeaver and prepare for weaving, if found in the meantime
  // (e.g. through an @Bean method registered by ConfigurationClassPostProcessor)
  // aop的处理
  if (beanFactory.getTempClassLoader() == null && beanFactory.containsBean(LOAD_TIME_WEAVER_BEAN_NAME)) {
    beanFactory.addBeanPostProcessor(new LoadTimeWeaverAwareProcessor(beanFactory));
    beanFactory.setTempClassLoader(new ContextTypeMatchClassLoader(beanFactory.getBeanClassLoader()));
  }
}

重点在这里:

PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(beanFactory, getBeanFactoryPostProcessors());

首先获取BeanFactoryPostProcessor的集合,这里获取到都是用户在定制BeanFactory时add加入进去的,进入这个方法:

public static void invokeBeanFactoryPostProcessors(
  ConfigurableListableBeanFactory beanFactory, List<BeanFactoryPostProcessor> beanFactoryPostProcessors) {

  // Invoke BeanDefinitionRegistryPostProcessors first, if any.
  // 已经处理的Bean
  Set<String> processedBeans = new HashSet<>();
  // 先进性外部BFPP的处理,并且判断当前Factory是否是BeanDefinitionRegistry
  if (beanFactory instanceof BeanDefinitionRegistry) {
    BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
    // 保存BFPP的Bean
    List<BeanFactoryPostProcessor> regularPostProcessors = new ArrayList<>();
    // 保存BDRPP的Bean
    List<BeanDefinitionRegistryPostProcessor> registryProcessors = new ArrayList<>();
    // 开始处理外部传入的BFPP
    for (BeanFactoryPostProcessor postProcessor : beanFactoryPostProcessors) {
      // 先处理BDRPP
      if (postProcessor instanceof BeanDefinitionRegistryPostProcessor) {
        BeanDefinitionRegistryPostProcessor registryProcessor =
          (BeanDefinitionRegistryPostProcessor) postProcessor;
        // 直接调用BDRPP的接口方法,后面的postProcessBeanFactory 方法后面统一处理
        registryProcessor.postProcessBeanDefinitionRegistry(registry);
        // 加入到BFPP的集合中
        registryProcessors.add(registryProcessor);
      }
      else {
        // 加入到BDRPP的集合中
        regularPostProcessors.add(postProcessor);
      }
    }

    // Do not initialize FactoryBeans here: We need to leave all regular beans
    // uninitialized to let the bean factory post-processors apply to them!
    // Separate between BeanDefinitionRegistryPostProcessors that implement
    // PriorityOrdered, Ordered, and the rest.
    // 保存当前的BDRPP
    List<BeanDefinitionRegistryPostProcessor> currentRegistryProcessors = new ArrayList<>();

    // First, invoke the BeanDefinitionRegistryPostProcessors that implement PriorityOrdered.
    // 按类型获取BeanName
    String[] postProcessorNames =
      beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
    for (String ppName : postProcessorNames) {
      // 判断当前的beanName是都是实现了PriorityOrdered
      if (beanFactory.isTypeMatch(ppName, PriorityOrdered.class)) {
        // 加入到当前注册的BDRPP集合中
        currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
        // 加入到已经处理的bean集合中
        processedBeans.add(ppName);
      }
    }
    // 对当前的BDRPP进行排序
    sortPostProcessors(currentRegistryProcessors, beanFactory);
    // 将当前的BDRPP全部加入到最前面定义的BDRPP的集合中
    registryProcessors.addAll(currentRegistryProcessors);
    // 执行当前的BDRPP的postProcessBeanDefinitionRegistry方法
    invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
    // 清空当前的BDRPP
    currentRegistryProcessors.clear();

    // Next, invoke the BeanDefinitionRegistryPostProcessors that implement Ordered.
    // 再次获取bdrpp,因为上面的执行可能还会加入新的bdrpp进来
    postProcessorNames = beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
    for (String ppName : postProcessorNames) {
      // 判断是否已经处理过,并且是否实现了Ordered接口
      if (!processedBeans.contains(ppName) && beanFactory.isTypeMatch(ppName, Ordered.class)) {
        // 加入到当前的BDRPP的集合中
        currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
        // 添加到已经处理的集合中
        processedBeans.add(ppName);
      }
    }
    // 排序
    sortPostProcessors(currentRegistryProcessors, beanFactory);
    // 加入到BDRPP集合中
    registryProcessors.addAll(currentRegistryProcessors);
    // 执行bdrpp的postProcessBeanDefinitionRegistry方法
    invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
    // 清空当前bdrpp集合
    currentRegistryProcessors.clear();

    // Finally, invoke all other BeanDefinitionRegistryPostProcessors until no further ones appear.
    boolean reiterate = true;
    // 循环去获取BDRPP,然后进行排序、执行操作,直到所有的BDRPP全部执行完
    while (reiterate) {
      reiterate = false;
      // 获取BDRPP
      postProcessorNames = beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
      for (String ppName : postProcessorNames) {
        // 如果已经处理过,就执行BDRPP,并且退出循环,否则继续循环
        if (!processedBeans.contains(ppName)) {
          currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
          processedBeans.add(ppName);
          reiterate = true;
        }
      }
      // 排序
      sortPostProcessors(currentRegistryProcessors, beanFactory);
      // 加入到BDRPP集合中
      registryProcessors.addAll(currentRegistryProcessors);
      // 执行bdrpp
      invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
      currentRegistryProcessors.clear();
    }

    // Now, invoke the postProcessBeanFactory callback of all processors handled so far.
    // 执行bdrpp 中的postProcessBeanFactory方法
    invokeBeanFactoryPostProcessors(registryProcessors, beanFactory);
    // 执行bfpp 中的postProcessBeanFactory方法
    invokeBeanFactoryPostProcessors(regularPostProcessors, beanFactory);
  }

  else {
    // 如果不是bdrpp,那么直接执行bfpp的postProcessBeanFactory
    // Invoke factory processors registered with the context instance.
    invokeBeanFactoryPostProcessors(beanFactoryPostProcessors, beanFactory);
  }

  // Do not initialize FactoryBeans here: We need to leave all regular beans
  // uninitialized to let the bean factory post-processors apply to them!
  // 获取BFPP的beanName集合
  String[] postProcessorNames =
    beanFactory.getBeanNamesForType(BeanFactoryPostProcessor.class, true, false);

  // Separate between BeanFactoryPostProcessors that implement PriorityOrdered,
  // Ordered, and the rest.
  // 定义实现了PriorityOrdered的BFPP
  List<BeanFactoryPostProcessor> priorityOrderedPostProcessors = new ArrayList<>();
  // 定义实现了Ordered接口的集合
  //		List<String> orderedPostProcessorNames = new ArrayList<>();
  List<BeanFactoryPostProcessor> orderedPostProcessors = new ArrayList<>();
  // 定义没有排序的集合
  //		List<String> nonOrderedPostProcessorNames = new ArrayList<>();
  List<BeanFactoryPostProcessor> nonOrderedPostProcessors = new ArrayList<>();
  for (String ppName : postProcessorNames) {
    // 如果已经处理过了就不做处理
    if (processedBeans.contains(ppName)) {
      // skip - already processed in first phase above
    }
    else if (beanFactory.isTypeMatch(ppName, PriorityOrdered.class)) {
      priorityOrderedPostProcessors.add(beanFactory.getBean(ppName, BeanFactoryPostProcessor.class));
    }
    else if (beanFactory.isTypeMatch(ppName, Ordered.class)) {
      //				orderedPostProcessorNames.add(ppName);
      orderedPostProcessors.add(beanFactory.getBean(ppName,BeanFactoryPostProcessor.class));
    }
    else {
      //				nonOrderedPostProcessorNames.add(ppName);
      nonOrderedPostProcessors.add(beanFactory.getBean(ppName,BeanFactoryPostProcessor.class));
    }
  }

  // First, invoke the BeanFactoryPostProcessors that implement PriorityOrdered.
  // 排序
  sortPostProcessors(priorityOrderedPostProcessors, beanFactory);
  // 先执行PriorityOrdered接口的bfpp
  invokeBeanFactoryPostProcessors(priorityOrderedPostProcessors, beanFactory);

  // Next, invoke the BeanFactoryPostProcessors that implement Ordered.
  // 这里将上面获取到Ordered接口的BFPP进行集合转换,然后排序,然后执行,这里其实可以直接合并,
  // 在上述进行获取时就放在这个orderedPostProcessors集合中
  //		List<BeanFactoryPostProcessor> orderedPostProcessors = new ArrayList<>(orderedPostProcessorNames.size());
  //		for (String postProcessorName : orderedPostProcessorNames) {
  //			orderedPostProcessors.add(beanFactory.getBean(postProcessorName, BeanFactoryPostProcessor.class));
  //		}
  sortPostProcessors(orderedPostProcessors, beanFactory);
  invokeBeanFactoryPostProcessors(orderedPostProcessors, beanFactory);

  // Finally, invoke all other BeanFactoryPostProcessors.
  // 处理没有排序的
  //		List<BeanFactoryPostProcessor> nonOrderedPostProcessors = new ArrayList<>(nonOrderedPostProcessorNames.size());
  //		for (String postProcessorName : nonOrderedPostProcessorNames) {
  //			nonOrderedPostProcessors.add(beanFactory.getBean(postProcessorName, BeanFactoryPostProcessor.class));
  //		}
  invokeBeanFactoryPostProcessors(nonOrderedPostProcessors, beanFactory);

  // Clear cached merged bean definitions since the post-processors might have
  // modified the original metadata, e.g. replacing placeholders in values...
  // 清除缓存的元数据,因为经过BFPP的执行,可能BeanDefinition的属性值已经个变化,比如使用占位符的属性值
  beanFactory.clearMetadataCache();
}

这个方法大概很长,实际上就做了一下这么几点事情:

  • 先执行外部传入的BeanFactoryPostProcessor的实现
  • 处理时先处理BeanFactoryPostProcessor的子接口BeanDefinitionRegistryPostProcessor的实现
  • 处理BeanDefinitionRegistryPostProcessor实现的时候先处理实现了PriorityOrdered接口的实现
  • 处理完PriorityOrdered接口实现的类之后再处理实现了Ordered接口的实现
  • 处理完Ordered接口的实现类之后处理没有排序的
  • 处理完BeanDefinitionRegistryPostProcessor的实现之后处理BeanFactoryPostProcessor的实现
  • 处理顺序也是PriorityOreded,Ordered,没有排序的

这里大概逻辑就是这个,看起来可能不是很懂,画个流程图:

通过流程图可以简化为:先遍历执行外部传入的BFPP,再执行BDRPP,再执行BFPP三部分,处理每一部分可能会进行排序操作,排序按照PriorityOrdered,Ordered,noSort进行排序再执行。

这里解释下BeanDefinitionRegistryPostProcessor,这个接口是BeanFactoryPostProcessor,它里面包含一个方法叫postProcessBeanDefinitionRegistry,这个方法非常重要,在实现类ConfigurationClassPostProcessor中就是使用这个方法进行注解的解析的,而且这个类也是实现SpringBoot自动装配的关键。

ConfigurationClassPostProcessor这个类是什么时候加入到Spring容器的呢?

在我们启动容器的时候,Spring会进行BeanDefinition的扫描,如果我们在xml配置文件中开启了注解扫描:

<context:component-scan base-package="com.redwinter.test"/>

那么这个时候就会自动添加多个BeanDefinition到Spring容器中,beanName为org.springframework.context.annotation.internalConfigurationAnnotationProcessor,其他还有几个:

前面的文章 https://www.cnblogs.com/redwinter/p/16165878.html 讲到自定义标签,在spring解析xml时分为默认的命名空间和自定义的命名空间的,而context就是自定义的命名空间,这个标签的解析器为ComponentScanBeanDefinitionParser,这个类中的parse方法就是解析逻辑处理:

@Override
@Nullable
public BeanDefinition parse(Element element, ParserContext parserContext) {
  String basePackage = element.getAttribute(BASE_PACKAGE_ATTRIBUTE);
  basePackage = parserContext.getReaderContext().getEnvironment().resolvePlaceholders(basePackage);
  String[] basePackages = StringUtils.tokenizeToStringArray(basePackage,
                                                            ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS);
  // Actually scan for bean definitions and register them.
  // 配置扫描器
  ClassPathBeanDefinitionScanner scanner = configureScanner(parserContext, element);
  // 扫描BeanDefinition,在指定的包下
  Set<BeanDefinitionHolder> beanDefinitions = scanner.doScan(basePackages);
  // 注册组件
  registerComponents(parserContext.getReaderContext(), beanDefinitions, element);

  return null;
}

这个方法执行流程:

  • 创建一个配置扫描器
  • 扫描指定包下标有注解的类并解析为BeanDefinition
  • 执行registerComponents方法,注册组件

registerComponents方法里面就是添加ConfigurationClassPostProcessor的地方,由于代码太多这里只贴部分代码:

// ...省略部分代码
Set<BeanDefinitionHolder> beanDefs = new LinkedHashSet<>(8);
		// 判断注册器中个是否包含org.springframework.context.annotation.internalConfigurationAnnotationProcessor
		// 不包含就加入一个ConfigurationClassPostProcessor的BeanDefinition
		// 用于解析注解
		if (!registry.containsBeanDefinition(CONFIGURATION_ANNOTATION_PROCESSOR_BEAN_NAME)) {
			// 创建一个BeanDefinition为ConfigurationClassPostProcessor
			RootBeanDefinition def = new RootBeanDefinition(ConfigurationClassPostProcessor.class);
			def.setSource(source);
			// 注册一个beanName为org.springframework.context.annotation.internalConfigurationAnnotationProcessor
			// 的BeanDefinition,class为ConfigurationClassPostProcessor
			beanDefs.add(registerPostProcessor(registry, def, CONFIGURATION_ANNOTATION_PROCESSOR_BEAN_NAME));
		}
		// 创建一个AutowiredAnnotationBeanPostProcessor的BeanDefinition
		// 用于自动装配
		if (!registry.containsBeanDefinition(AUTOWIRED_ANNOTATION_PROCESSOR_BEAN_NAME)) {
			RootBeanDefinition def = new RootBeanDefinition(AutowiredAnnotationBeanPostProcessor.class);
			def.setSource(source);
			beanDefs.add(registerPostProcessor(registry, def, AUTOWIRED_ANNOTATION_PROCESSOR_BEAN_NAME));
		}
// ...省略部分代码

源码中注册了一个beanName为CONFIGURATION_ANNOTATION_PROCESSOR_BEAN_NAME常量的名字,这个常量就是org.springframework.context.annotation.internalConfigurationAnnotationProcessor,class为ConfigurationClassPostProcessor

那注解的解析是如何进行解析的呢?由于篇幅过长,下一篇再来解析。


如果本文对你有帮助,别忘记给我个3连 ,点赞,转发,评论,,咱们下期见。

收藏 等于白嫖,点赞才是真情。





原文 https://www.cnblogs.com/redwinter/p/16196359.html

做过微信或支付宝支付的童鞋,可能遇到过这种问题,就是填写支付结果回调,就是在支付成功之后,支付宝要根据我们给的地址给我们进行通知,通知我们用户是否支付成功,如果成功我们就要去处理下面相应的业务逻辑,如果在测试服务,那么这个回调地址我们就需要填写测试服务的,如果发布到线上那么我们就需要改成线上的地址。

针对上面的场景,我们一般都会通过如下的方式,进行一个动态配置,不需要每次去改,防止出现问题。

public class PayTest {

    @Value("${spring.profiles.active}")
    private String environment;

    public Object notify(HttpServletRequest request) {

        if ("prod".equals(environment)) {
            // 正式环境
        } else if ("test".equals(environment)) {

            // 测试环境
        }
        return "SUCCESS";
    }
}
复制代码

上面的代码看起来没有一点问题,但是身为搬砖的我们咋可能这样搬,姿势不对呀!

问题:

扩展性太差,如果这个参数我们还需要在别的地方用到,那么我们是不是还要使用@Value的注解获取一遍,假如有天我们的leader突然说吗,test这个单词看着太low了,换个高端一点的,换成dev,那么我们是不是要把项目中所有的test都要改过来,如果少还好,要是很多,那我们怕不是凉了。

所以我们能不能将这些配置参数搞成一个全局的静态变量,这样的话我们直接饮用就好了,哪怕到时候真的要改,那我也只需要改动一处就好了。

注意大坑

有的朋友可能就比较自信了,那我直接加个static修饰下不就好了,如果你真是打算这样做,那你就准备卷好铺盖走人吧。直接加static获取到的值其实是一个null,至于原因,大家复习下类以及静态变量变量的加载顺序。

@PostConstruct注解

那么既然说出了问题,肯定就有解决方法,不然你以为我跟你玩呢。

首先这个注解是由Java提供的,它用来修饰一个非静态的void方法。它会在服务器加载Servlet的时候运行,并且只运行一次

改造:

@Component
public class SystemConstant {

    public static String surroundings;

    @Value("${spring.profiles.active}")
    public String environment;

    @PostConstruct
    public void initialize() {
        System.out.println("初始化环境...");
        surroundings = this.environment;
    }
}
复制代码

结果:

我们可以看到在项目启动的时候进行了初始化


到这里我们已经可以拿到当前运行的环境是测试还是正式,这样就可以做到动态配置


最后想说

其实这个注解远不止这点用处,像我之前写的Redis工具类,我使用的是RedisTemplate操作Redis,导致写出来的方法没办法用static修饰,每次使用Redis工具类只能先注入到容器然后再调用,使用了这个注解就可以完美的解决这种尴尬的问题。代码如下。

- 简介

这个系列是我学习Flink之后,想到加强一下我的FLink能力,所以开始一系列的自己拟定业务场景,进行开发。

这里更类似笔记,而不是教学,所以不会特别细致,敬请谅解。

这里是实战的,具体一些环境,代码基础知识不会讲解,例如docker,flink语法之类的,看情况做具体讲解,所以需要一些技术门槛。

2 - 准备

  • flink - 1.12.0
  • elasticsearch - 7.12
  • kafka - 2.12-2.5.0
  • kibana - 7.12
  • filebeat - 7.12

这里就不做下载地址的分享了,大家自行下载吧。

3 - 代码

Flink代码

maven pom依赖,别问为啥这么多依赖,问我就说不知道,你就复制吧。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.iolo</groupId>
    <artifactId>flink_study</artifactId>
    <version>1.0.0</version>
    <!-- 指定仓库位置,依次为aliyun、apache和cloudera仓库 -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <properties>
        <encoding>UTF-8</encoding>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
        <scala.version>2.12</scala.version>
        <flink.version>1.12.0</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- blink执行计划,1.11+默认的-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- flink连接器-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>flink-streaming-java_2.11</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>flink-runtime_2.11</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>flink-core</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>flink-java</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-10.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>

        <!-- 日志 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.44</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>4.9.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <plugins>
            <!-- 编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- 打包插件(会包含所有依赖) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!--
                                        zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- 设置jar包的入口类(可选) -->
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

下面是flink的,具体讲解都在代码里

package com.iolo.flink.cases;

import com.alibaba.fastjson.JSONObject;
import com.iolo.common.util.DingDingUtil;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.shaded.hadoop2.com.google.gson.Gson;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * @author iolo
 * @date 2021/3/17
 * 监控日志实时报警
 * <p>
 * 准备环境
 * 1 - flink 1.12
 * 2 - kafka
 * 3 - filebeat
 * 4 - Springboot服务(可以产生各类级别日志的接口)
 * 5 - es+kibana
 * <p>
 * filebeat 监控Springboot服务日志 提交给kafka(主题sever_log_to_flink_consumer)
 * flink消费kafka主题日志 ,整理收集,如果遇到error日志发送邮件,或者发钉钉(这里可以调用Springboot服务,或者直接flink发送)
 * 然后将所有日志存入es 进行 kibana分析
 **/
public class case_1_kafka_es_log {
    public static void main(String[] args) throws Exception {
        // TODO env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        Properties props = new Properties();
        //集群地址
        props.setProperty("bootstrap.servers", "127.0.0.1:9092");
        //消费者组id
        props.setProperty("group.id", "test-consumer-group");
        //latest有offset记录从记录位置开始消费,没有记录从最新的/最后的消息开始消费
        //earliest有offset记录从记录位置开始消费,没有记录从最早的/最开始的消息开始消费
        props.setProperty("auto.offset.reset", "latest");
        //会开启一个后台线程每隔5s检测一下Kafka的分区情况,实现动态分区检测
        props.setProperty("flink.partition-discovery.interval-millis", "5000");
        //自动提交(提交到默认主题,后续学习了Checkpoint后随着Checkpoint存储在Checkpoint和默认主题中)
        props.setProperty("enable.auto.commit", "true");
        //自动提交的时间间隔
        props.setProperty("auto.commit.interval.ms", "2000");

        // TODO source
        FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>("sever_log_to_flink_consumer", new SimpleStringSchema(), props);

        DataStreamSource<String> ds = env.addSource(source);

        // TODO transformation
        SingleOutputStreamOperator<Tuple3<String, String, String>> result = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public void flatMap(String s, Collector<Tuple3<String, String, String>> collector) throws Exception {
                JSONObject json = (JSONObject) JSONObject.parse(s);
                String timestamp = json.getString("@timestamp");
                String message = json.getString("message");
                String[] split = message.split(" ");
                String level = split[3];
                if ("[ERROR]".equalsIgnoreCase(level)) {
                    System.out.println("error!");
                    DingDingUtil.dingdingPost("error");
                }

                collector.collect(Tuple3.of(timestamp, level, message));
            }
        });

        // TODO sink
        result.print();

        /**
         * https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/elasticsearch.html
         */
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
        ElasticsearchSink.Builder<Tuple3<String, String, String>> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<Tuple3<String, String, String>>() {
                    @Override
                    public void process(Tuple3<String, String, String> stringStringStringTuple3, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                        Map<String, String> json = new HashMap<>();
                        json.put("@timestamp", stringStringStringTuple3.f0);
                        json.put("level", stringStringStringTuple3.f1);
                        json.put("message", stringStringStringTuple3.f2);
                        IndexRequest item = Requests.indexRequest()
                                .index("my-log")
                                .source(json);
                        requestIndexer.add(item);
                    }
                });
        esSinkBuilder.setBulkFlushMaxActions(1);
        result.addSink(esSinkBuilder.build());
        // TODO execute
        env.execute("case_1_kafka_es_log");
    }
}

其中为了告警通知,做了个钉钉自定义机器人通知,需要的可以去百度查看一下,很方便。

https://developers.dingtalk.com/document/app/custom-robot-access/title-jfe-yo9-jl2

package com.iolo.common.util;

import lombok.extern.slf4j.Slf4j;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

import java.io.IOException;

/**
 * @author iolo
 * @date 2021/3/30
 * https://developers.dingtalk.com/document/app/custom-robot-access/title-jfe-yo9-jl2
 **/
@Slf4j
public class DingDingUtil {
    private static final String url = "https://oapi.dingtalk.com/robot/send?access_token=你自己的token替换";

    /**
     * 秘钥token
     *
     * @param
     * @return java.lang.String
     * @author fengxinxin
     * @date 2021/3/30 下午5:03
     **/
    public static void dingdingPost(String text) throws Exception {
        MediaType JSON = MediaType.parse("application/json");
        OkHttpClient client = new OkHttpClient();
        String json = "{\"msgtype\": \"text\",\"text\": {\"content\": \"FlinkLog:" + text + "\"}}";
        RequestBody body = RequestBody.create(JSON, json);
        Request request = new Request.Builder()
                .url(url)
                .post(body)
                .build();
        try (Response response = client.newCall(request).execute()) {
            String responseBody = response.body().string();
            log.info(responseBody);

        } catch (IOException e) {
            log.error(e.getMessage());
        }
    }
}

然后可以直接在控制面板直接启动这个main方法


Springboot

gitee地址直接下载,不做详细讲解

接口地址 http://127.0.0.1:8080/test/log?level=error&count=10


Kafka

操作命令,这些命令都是在Kafka里的bin目录下,Zookeeper是kafka自带的那个

# Zookeeper 启动命令
./zookeeper-server-start.sh ../config/zookeeper.properties
# Kafka 启动命令
./kafka-server-start.sh ../config/server.properties
# 创建 topic sever_log_to_flink_consumer
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sever_log_to_flink_consumer
# 查看是否创建成功
./kafka-topics.sh --list --zookeeper localhost:2181
# 这是生产者
./kafka-console-producer.sh --broker-list localhost:9092 --topic sever_log_to_flink_consumer
# 这是消费者
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sever_log_to_flink_consumer --from-beginning

Elasticsearch

这里开始使用docker,具体环境可以自行搭建,并且以后docker的场景会越来越多,直接上命令。

docker run \
--name fxx-es \
-p 9200:9200 \
-p 9300:9300 \
-v /Users/iOLO/dev/docker/es/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-e "discovery.type=single-node" \
docker.elastic.co/elasticsearch/elasticsearch:7.12.0

验证


Kibana

docker run \
--name fxx-kibana \
--link fxx-es:elasticsearch \
-p 5601:5601 \
docker.elastic.co/kibana/kibana:7.12.0

我这里去容器内部设置中文,你可以不做

设置的方法,在配置文件kibana.yml增加i18n.locale: "zh-CN"

验证 地址 是 127.0.0.1:5601

具体操作的时候进行图文讲解

Filebeat

下载地址 https://www.elastic.co/cn/downloads/beats/filebeat

选择自己电脑环境进行下载,我是MAC

解压之后修改配置文件里,直接上配置文件

# ============================== Filebeat inputs ===============================

filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

- type: log

  # Change to true to enable this input configuration. 这里是需要修改
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths. 这里改成你本地下载那个Springboot的log文件地址
  paths:
    - /Users/iOLO/dev/Java/flinklog/logs/flink-log.*.log

# ------------------------------ Kafka Output -------------------------------
output.kafka:
  # initial brokers for reading cluster metadata kafka的连接地址,这是直接从官网粘贴过来的,
  # https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html
  hosts: ["127.0.01:9092"]

  # message topic selection + partitioning 然后就是消费topic,其他都是官网的默认值,我就没做改动
  topic: 'sever_log_to_flink_consumer'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000


4 - 实战

环境和程序都准备好了之后,别忘了启动Springboot服务

然后通过请求接口服务 127.0.0.1:8080/test/log?level=error&count=10 来产生日志

通过查看钉钉 看是否有报警信息


钉钉成功!!!

然后就是Kibana的操作

直接上结果页面

然后就是操作步骤

第一先去es选择index

第二步根据红框点击进去es查询index页面

最后在输入框里查询你刚才的index ,咱们的代码就是my-index,根据提示进行下一步,我这里已经创建过了,所以就不再演示。最后就可以会有之前页面的效果。

5 - 结束语

整体就是这样,很多人肯定会提出质疑,说直接filebeat+ELK 也能完成这类效果,好的,你别杠,我这是学习flink之后,然后自己出业务场景,进行flink的实战总结,如果你有更好的方案,就干你的。

然后如果大家有啥想要的,遇到的场景,都可以提出来,我会斟酌后进行采纳进行实战实施。

最后感谢阅读。