xcye

嘿嘿

原理与实践:在网关中实现灵活的埋点上报

2024-07-09

埋点是指在应用程序的特定位置上插入代码,用于收集和记录用户行为数据。埋点上报则是将这些收集到的数据通过网络传输到服务器进行存储和分析。

埋点上报可以简单分为客户端和服务器端上报。他们两个各有各的优点和缺点,客户端埋点的话,能够获取到更真实的用户代理信息(设备Id,操作系统,浏览器版本等)。服务端埋点的话,服务器能够获取到用户非公开数据,但是如果埋点发生了更改,也势必需要修改服务端的代码。

网关埋点是在网关服务中进行的,其基本处理逻辑为,对每一个请求进行拦截判断,如果满足要求,则解析请求体中的参数数据,然后手动的组装埋点上报参数,调用上报接口进行埋点上报。

由上面的定义可以知道网关埋点上报的整体流程,将埋点上报从客户端转移到了网关层。我目前使用到的场景为,如果在新版本的客户端中加了埋点功能,然后存在部分用户或者很多用户都没升级,那么便会导致收集到的埋点数据不全。对于这种情况,就可以使用网关埋点上报解决。

在网关处从低版本客户端发起的请求中获取埋点事件所需要的参数(请求由哪个客户端发起是可以判断的),然后由网关层进行埋点上报。等到后续,所有或大部分用户都完成升级时,再从网关中移除这个埋点事件配置就可以了。

但是网关埋点上报也存在弊端,其工作原理是从请求中获取所需要的数据,那么在某些接口中不可用。如果埋点中需要传递用户代理信息,那么网关埋点就无法实现(请求中只能获取到大致的代理信息)。

实现原理

在业务中会存在,埋点事件需要的参数值并不能直接从请求体中直接获取到,需要多一步转换的过程。比如请求体中的参数只包含身份证号,埋点事件A需要接收的参数却是年龄,性别,出生日期。像这种,如果不使用代码方式,直接使用配置方式,是无法做到直接从身份证号中获取年龄,性别,出生日期的。如果直接在网关中将代码写死,那么就缺少了扩展性。

既要有扩展性,又要能够简单的增加或移除,最好的办法就是使用配置+自定义代码的形式。将拦截每一个请求并从请求参数中解析埋点事件参数的代码通过配置的方式,在配置中进行配置。这样我们就能够做到,在不重启服务的情况下,也能新增或移除对某个请求的拦截,而且自定义代码,可以对请求参数值进行任意构造。上面说的在配置中配置解析代码,是指配置解析代码的文件。

得益于JVM的动态加载功能,对于每一次新增加的参数解析代码,我们可以利用JVM机制,从.java文件中获取源代码,然后在运行期间使用Groovy动态加载该类。这样我们就可以在不停止服务的情况下,向Java程序中加入额外的代码。

具体的流程为,定义一个参数解析接口类ParamValueParser,解析方法为parse(String originParamValue),该方法的返回值将作为埋点参数解析最终值。如果需要对某个参数进行解析,只需要创建一个新的xxx.java,让该类实现ParamValueParser接口,并且重写其中的parse()方法。

// ParamValueParser.java
public interface ParamValueParser {
    String parse(String originParamValue);
}

public class ParseAgeFromIdCard implements ParamValueParser {

    @Override
    public String parse(String originParamValue) {
        return String.valueOf(cn.hutool.core.util.IdcardUtil.getAgeByIdCard(originParamValue));
    }
}

基于这样的设计,可以解决上述中的扩展性问题,但是因为需要网关动态类并执行,所以在安全性上会存在问题。

我目前使用的网关和配置组件分别是Spring Cloud Gateway、Nacos,在Gateway中,提供了过滤器机制,可以获取到一个请求的任意信息。将接口拦截配置放置在Nacos中。但是在编码的时候,需要保证这部分代码对网关的影响很低。无论埋点参数解析或者上报成功与否,都不能有影响。

功能实现

这部分会包含埋点事件,埋点上报,网关接口拦截配置类的Pojo定义,以及Spring Cloud Gateway的Filter中的代码实现。

埋点上报类

public class TrackDataSubmitWebPojo {
    
    /**
     * 事件参数
     */
    private Map<String, Object> eventParams;

    /**
     * 项目名称
     */
    private String projectName;

    /**
     * 事件值
     */
    private String eventId;
}

在埋点上报Pojo中,只有三个参数,他们都是必须的。其中eventId是固定的,但是projectName这个值可能是直接在配置文件中配置的,也可以是从请求参数中获取,eventParams参数值一定是从请求体中获取并组装的。

配置类

配置类是用于配置在网关处,哪些请求需要进行拦截并解析请求参数。会配置多个拦截请求地址,并且每个请求地址和其对应的埋点上报类的值都不一样,所以综合下来,该配置类的Pojo为。

@Data
@ConfigurationProperties(prefix = "xyz.xcye.gateway")
public class GatewayProperties {

    /**
     * 拦截请求信息进行埋点事件上报配置
     */
    private InterceptEventTrackProperties interceptEventTrack;
}

@Data
public class InterceptEventTrackProperties {
    
    /**
     * 是否启用
     */
    private Boolean enable = false;
    
    /**
     * 拦截配置文件根路径
     */
    private String paramParseSourceCodePath;
    
    /**
     * 接口拦截配置
     */
    private List<InterfaceTrackInterceptConfig> interfaceInterceptConfigList;
}

@Data
public class InterfaceTrackInterceptConfig {

    /**
     * 需要拦截的uri
     */
    private String url;

    /**
     * 请求方法
     */
    private String requestMethod = HttpMethod.POST.name();

    /**
     * 项目名
     */
    private String projectName;

    /**
     * 项目名称是否来自于请求参数
     */
    private Boolean projectNameFromRequestParam = false;

    /**
     * 项目名称来自于哪个请求参数
     */
    private String projectNameFromRequestParamName;

    /**
     * 上报的埋点事件id
     */
    private String eventId;

    /**
     * 拦截的参数集合
     */
    private List<InterceptParam> interceptParamList;

    @Data
    public static class InterceptParam {

        /**
         * 请求参数名
         */
        private String requestParamName;

        /**
         * 埋点上报参数名
         */
        private String eventParamName;

        /**
         * 是否为空
         */
        private Boolean allowEmpty;
        
        /**
         * 为空是否的默认值,必须empty为true时,才生效
         */
        private String defaultValue;
        
        /**
         * 参数值限制长度
         */
        private Integer lengthLimit;
        
        /**
         * 解析参数值的方法文件名
         */
        private String paramParseFileName;
    }
}

假设/aa/verifyIdCard这个请求的请求参数为

{
 	"projectName": "projectA",
    "idCard": "110101201010081210[假的]",
    "timestamp": "1717336303",
    "submitTime": "2024-06-02 22:00:00"
}

在埋点系统中,假设埋点事件antiAddiction最终需要用于分别项目的年龄分布以及性别比例,那么该事件的定义可以为:

{
    "事件Id": "antiAddiction",
    "参数": {
        "age": "年龄",
        "birthday": "生日",
        "gender": "性别"
    }
}

如果我们需要在网关中对/aa/verifyIdCard这个接口进行拦截,那么配置为:

xyz:
  xcye:
    gateway:
      intercept-event-track:
        enable: true
        paramParseSourceCodePath: /usr/service/fileData/sourcecode
        interfaceInterceptConfigList:
          - url: "/aa/verifyIdCard"
            requestMethod: 'POST'
            projectNameFromRequestParam: true
            projectNameFromRequestParamName: projectName
            eventId: antiAddiction
            interceptParamList:
              - requestParamName: idCard
                eventParamName: age
                paramParseFileName: ParseAgeFromIdCard.java
              - requestParamName: idCard
                eventParamName: birthday
                paramParseFileName: ParseBirthFromIdCard.java
              - requestParamName: idCard
                eventParamName: gender
                paramParseFileName: ParseGenderFromIdCard.java

上面yaml中的配置意思为,对/aa/verifyIdCard请求进行拦截,并且从请求中解析到请求参数idCard和projectName。在设置埋点事件antiAddiction的age, birthday, gender三个参数时,这三个参数的值分别需要使用/usr/service/fileData/sourcecode目录下的`ParseAgeFromIdCard.java,ParseBirthFromIdCard.java,ParseGenderFromIdCard.java这三个Java文件进行解析。最终组装成的埋点上报信息为

{
    "projectName": "projectA",
    "eventId": "antiAddiction",
    "eventParams": {
        "age": "13",
        "gender": "男",
        "birthday": "2010-10-08"
    }
}

代码

主要是Spring Gateway的Filter的编写,因为代码有点多,很多地方我就使用伪代码的方式,但是整体的逻辑应该不影响。最终代码的执行逻辑为:

  1. 在Filter中获取当前请求的URI和请求方法,并和配置文件中所配置的拦截接口进行比较,如果判断不通过,则直接放形。
  2. 从请求体中获取请求参数,这里需要对不同请求方法的请求体进行区分。对于GET方式的请求,其参数是直接从URI中获取的。
  3. 请求体数据获取到之后,下面便开始埋点数据组装。因为需要保证网关埋点的成功与否是不能影响到网关的正常使用,所以这里需要使用异步方式进行。
    1. 获取埋点拦截配置中的每一个参数配置,并从paramParseSourceCodePath+paramParseFileName文件中获取到.java文件
    2. 对源代码进行md5,判断当前本地缓存中是否已经有已创建好的事例对象
    3. 如果没有,则使用Groovy来加载源代码,获取源代码所对应的Class数据,利用反射机制,实例化该Class,并将实例化后的对象放入本地缓存中
// GatewayEventTrackFilter.java
public class GatewayEventTrackFilter implements GlobalFilter {
    private final AntPathMatcher pathMatcher = new AntPathMatcher();
    private GatewayProperties gatewayProperties;
    private ManualEventTrackSubmit manualEventTrackSubmit;
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

        // 1. 伪代码 判断当前请求是否满足gatewayProperties中的配置要求

        // 伪代码 从请求中获取请求体并且异步执行埋点上报部分
        AtomicReference<Map<String, Object>> atomicRequestParamMap = new AtomicReference<>();
        List<InterfaceTrackInterceptConfig> finalTrackInterceptConfigList = gatewayProperties.getInterceptEventTrack().getInterfaceInterceptConfigList();
        Mono<String> modifiedBody = serverRequest.bodyToMono(String.class).flatMap(body -> {
            if (!isGetRequest) {
                // 2. 伪代码 获取请求参数
                atomicRequestParamMap.set(generateRequestQueryOrBodyMap(body, false));

                // 3. 异步进行埋点上报// 异步进行埋点上报
                executeEventTrack(exchange, atomicRequestParamMap.get(), finalTrackInterceptConfigList);
            }
            return Mono.just(body);
        });

        if (isGetRequest) {
            // 2. 伪代码 获取请求参数
            atomicRequestParamMap.set(generateRequestQueryOrBodyMap(uri.getRawQuery(), true));
            // 3. 异步进行埋点上报// 异步进行埋点上报
            executeEventTrack(exchange, atomicRequestParamMap.get(), finalTrackInterceptConfigList);
        }

        BodyInserter<Mono<String>, ReactiveHttpOutputMessage> bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);
        CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
        return bodyInserter.insert(outputMessage, new BodyInserterContext())
                .then(Mono.defer(() -> {
                    ServerHttpRequestDecorator decorator = decorate(exchange, headers, outputMessage);
                    return chain.filter(exchange.mutate().request(decorator).build());
                })).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> {
                    if (!outputMessage.isCommitted()) {
                        return outputMessage.getBody().map(DataBufferUtils::release).then(Mono.error(throwable));
                    }
                    return Mono.error(throwable);
                });
    }
    
    ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers,
                                        CachedBodyOutputMessage outputMessage) {
        return new ServerHttpRequestDecorator(exchange.getRequest()) {
            @Override
            public HttpHeaders getHeaders() {
                long contentLength = headers.getContentLength();
                HttpHeaders httpHeaders = new HttpHeaders();
                httpHeaders.putAll(headers);
                if (contentLength > 0) {
                    httpHeaders.setContentLength(contentLength);
                } else {
                    httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
                }
                return httpHeaders;
            }

            @Override
            public Flux<DataBuffer> getBody() {
                return outputMessage.getBody();
            }
        };
    }

    private void executeEventTrack(ServerWebExchange exchange, Map<String, Object> requestParamMap, List<InterfaceTrackInterceptConfig> trackInterceptConfigList) {
        for (InterfaceTrackInterceptConfig trackInterceptConfig : trackInterceptConfigList) {
            manualEventTrackSubmit.submit(() -> {
                Map<String, Object> parsedRequestParamMap = new HashMap<>();
                Map<String, Object> eventParam = generateEventParam(requestParamMap, trackInterceptConfig, parsedRequestParamMap);
                EventTrackSubmit eventTrackSubmit = new EventTrackSubmit();
                eventTrackSubmit.setEventId(trackInterceptConfig.getEventId());
                eventTrackSubmit.setEventParams(eventParam);
                eventTrackSubmit.setProjectName(trackInterceptConfig.getProjectName());

                if (Boolean.TRUE.equals(trackInterceptConfig.getProjectNameFromRequestParam())) {
                    Object parsedProjectName = parsedRequestParamMap.get(trackInterceptConfig.getProjectNameFromRequestParamName());
                    eventTrackSubmit.setProjectName(parsedProjectName.toString());
                }
                return eventTrackSubmit;
            });
        }
    }

    private Map<String, Object> generateEventParam(Map<String, Object> requestParamMap,
                                                   InterfaceTrackInterceptConfig trackInterceptConfig,
                                                   Map<String, Object> parsedRequestParamMap) {
        Map<String, Object> eventParam = new HashMap<>();
        for (InterfaceTrackInterceptConfig.InterceptParam interceptParam : trackInterceptConfig.getInterceptParamList()) {
            String requestParamName = interceptParam.getRequestParamName();
            String eventParamName = interceptParam.getEventParamName();
            String finallyParamValue = getRequestParamValue(requestParamMap, requestParamName);
            finallyParamValue = new EventTrackParamValueCodeParse().parse(finallyParamValue, interceptParam);
            eventParam.put(eventParamName, finallyParamValue);
            if (parsedRequestParamMap.get(requestParamName) != null) {
                parsedRequestParamMap.put(requestParamName, finallyParamValue);
            }
        }
        return eventParam;
    }
}
// EventTrackParamValueCodeParse.java
public class EventTrackParamValueCodeParse implements EventTrackParamValueParse {
    private static final GroovyClassLoader classLoader = new GroovyClassLoader();
    private static final ConcurrentHashMap<String, ParamValueParser> GROOVY_CLASS_CACHE = new ConcurrentHashMap<>();
    
    @Override
    public String parse(String originParamValue, InterfaceTrackInterceptConfig.InterceptParam interceptParam) {
        String sourceCode = getSourceCodeFromFile(interceptParam);
        return executeGroovyScript(originParamValue, sourceCode);
    }
    
    private String executeGroovyScript(String originParamValue, String sourceCode) {
        ParamValueParser paramValueParser = getParamValueParser(sourceCode);
        return paramValueParser.parse(originParamValue);
    }

    private ParamValueParser getParamValueParser(String sourceCode) {
        try {
            byte[] md5 = MessageDigest.getInstance("MD5").digest(sourceCode.getBytes(StandardCharsets.UTF_8));
            String cacheKey = new BigInteger(1, md5).toString(16);
            ParamValueParser parser = GROOVY_CLASS_CACHE.get(cacheKey);
            if (parser == null) {
                Class<ParamValueParser> aClass = classLoader.parseClass(sourceCode);
                parser = aClass.newInstance();
                GROOVY_CLASS_CACHE.put(cacheKey, parser);
            }
            return parser;
        } catch (Exception e) {
            //
        }
    }
}
// ParamValueParser.java
public interface ParamValueParser {
    String parse(String originParamValue);
}

加上各种参数校验,整体的代码还是有1000多行的,上面是我只保留主要逻辑后的代码。

最终需要注意的是,配置项paramParseFileName中的代码,如果引入了java.lang外的包,我们在使用时,需要使用全限定名。

总结

最后,对网关埋点上报进行一个总结。在网关中增加埋点上报有利有弊,我个人认为,埋点尽量让客户端或者是真正处理请求的那个服务端去实现,在网关中引入埋点上报,会让网关更加复杂。而且对于请求体数据是被加密的情况,我们也不可能再让网关充当一个解密和加密的功能。对于需要对很多个请求进行埋点上报的情况,因为每一个埋点参数解析都需要单独的类,如果埋点参数很多,那么势必会导致在网关中动态创建很多实例,而且因为接口使用频率的不同,可能某些实例,基本上不会被使用到,如果分配的堆空间很小,那么可能会导致OOM。

还有,在上面的代码中,是没有使用MQ的,如果埋点上报服务挂了,那么会导致上报数据丢失。可以将直接调用埋点上报接口换成向MQ发送消息,埋点上报服务也能够以平缓的速度进行消费。