Skip to content

批量上报去重原理

white_shiro_bai edited this page Jan 28, 2024 · 3 revisions

数据重复的现象和原因

在使用ghost_sa分析数据的时候,会发现存在一定的重复数据。数据特征是 同一个用户,在相邻的时间上报了 track_id 相同的数据,all_json字段大部分内容一致。 数据重复的量与使用的SDK选择和SDK配置有强相关关系。

大概规律是

SDK类型 批量上报 单条上报
js 10%的重复率,重复几条取决于用户操作。 0.0001%的重复率,且只重复1条。
MiniProgram 1~3%的重复率,平均重复3条左右。 0.0001%的重复率,且只重复1条。
UNIAPP 1~3%的重复率,平均重复3条左右。 0.0001%的重复率,且只重复1条。
iOS/Android 原生应用 0.001%的重复率,取决于APP的质量。 0.0001%的重复率,且只重复1条。
各种后端SDK 0.001%的重复率,且只重复1条。 0.0001%的重复率,且只重复1条。

总结规律,可以发现数据重复产生原因有两类:

数据上报重复

仅在批量上报功能开启时产生。 这部分主要特征是重复比例非常大,普遍超过了1%。是对分析结果产生影响的主要部分。他的产生原因是浏览器和基于浏览器WebView技术开发的小程序,uniapp等,无法跨页面实现同步锁。在使用批量上报功能时,产生的埋点数据不会立刻上报,而是会等到条数/定时达到阈值时,统一打包上报。由于浏览器可以任意多开页面,且不同页面间属于不同的进程。当不同页面共用一个缓存存储数据时上报时,如果达到阈值,所有的页面都会触发上报。如果是同步接口,此时可以跟上报端交互完成去重。但埋点特征是不干扰业务运行,是异步接口,无论传什么都立即返回正确,所以前端SDK也无法通过接口来协调同步。这时如果服务端不做额外的去重操作,就会写入所有重复的数据副本造成重复。

这部分数据的特点是,track_id和客户端time一致。但是flush_time有一定概率不一致,取决于使用的SDK和操作方式。

数据写入重复

这部分主要是重复比例非常小,且跟随数据库压力下降而减小,在正常压力情况下,重复可以忽略不记,几乎不对分析产生影响。产生这部分重复的原因是写入数据时,数据库同时进行了OLAP操作,数据库性能下降时,写入超时后数据库没有正确返回状态。服务端重试写入造成的,而数据库原本数据也落库成功造成。

去重解决方案

去重原理

数据写入重复,因为影响微乎其微,目前版本暂不做处理。后续的性能优化版本,会通过减少数据库update的次数,以及变更更安全的重试操作等方法,减少这部分重复。

数据上报重复,是目前去重的主要目标。这里分析一下神策上报数据的数据结构。

{
'type':'track',
'distinct_id':'xxx',
'time':1234567890123,
'_flush_time':1234567890123,
'_track_id':'xxx',
'$lib':{'$lib':'js'},
'properies':{'$lib':'js'}
    
}

注意,这里只列出了去重涉及到的部分。跟去重不相关的部分,避免干扰未列出。

SDK上报的所有数据无论类型都通过sa或sa.gif接口上报,上报格式一致且不考虑接口返回数值。其中type表示上报的数据的作用,type=track表示该数据是用户产生的行为,有去重的价值。其他的type类型,以功能性为主且不涉及重复上报会导致结果不一致的情况,如profile_set,这些功能性的数据,SDK虽然会也会重复上报,但是不会有重复识别标志,无需去重。

针对需要去重的type=track的数据,SDK每产生一条数据,在一定版本(现在所有的能下载到的版本)后,会产生time和_track_id两个数据,其中time含义为事件发生时的客户端时间戳(13位毫秒级),早期神策是默认使用服务端时间的,所以SDK不会默认带这个time,现在神策优先使用客户端时间了,所以所有的SDK都有这个time。_track_id是每条数据产生时,SDK赋予的一个唯一性ID,该ID由截取2次分别发生的随机数拼接产生,不涉及到其他有特征的变量引入,但不是所有的SDK都会产生_track_id。在默认使用单条上报的SDK里,开启批量上报之前,_track_id不会上报。在同一设备同毫秒时间内,用2次独立发生的随机数增加区分度,几乎可以表示信息条目的唯一性了。

但从逻辑上推算,使用time和_track_id合并进行去重,虽然同一设备/用户是安全的,但存在不用用户撞车的可能性。实际数据验证也可以证实,日均数据量达到1万时,即可出现多于1个设备,同一时间随机出相同的_track_id。这时,增加上项目和设备的维度,就可以杜绝撞车。确保每一条数据都具有唯一性识别标志。

去重这里有2点要额外注意。

Tip

1.非type=track的数据,没有_track_id,单靠time不能识别出唯一性,不需要也不应该做去重。

2.如果没有使用神策官方的SDK,而是自己手写的上报程序,time的精度可能没有那么高,默认情况下windows的时间精度时16ms,linux是1ms。所以如果windows机器,一秒产生超过62个埋点数据时(无论什么技术栈,都存在时间戳精度不够的问题,这时非实时系统特性,改成16位也还是会重复),会因为时间精度不足,产生2条一致的时间戳。

去重方式

数据清洗方式(高质量)

如果有条件做数据清洗,不仅可以去除上报重复,还可以去除写入重复。这种去重是更可靠和完美的,极端情况下甚至可以一条一条的筛选。挑选相同distinct_id,project,track_id,time只取一条即可。更严谨还可以选择_flush_time最小的一条。_flush_time是SDK上报数据时的客户端时间。重复上报时,根据用户操作的不同,_flush_time存在跟time一致或不一致的情况,取最小的一个为佳。 能用数据清洗方式的团队,通常有能力根据原理开发最适合自己数仓的清洗流程,这里就不班门弄斧了。

上报识别方式(更经济)

使用ghost_sa的用户,资源上相对比较敏感,数据团队都比较小。增加数据清洗环节降低了实时性且提高了使用成本。所以在上报时由服务端识别并去重是一种更快捷且经济的选择。去重方式也是遵循原理,使用distinct_id,project,track_id,time四个字段,在重复数据落库上支持两种方式,在重复数据识别时机上支持两种方式,在重复数据识别组件上,支持多种方式。

落库方式:

只保留一条不重复的数据和保留所有重复数据并对重复数据做标记。这个值可以在admin参数里配置,默认会保留所有重复数据。

保留重复数据并标记(推荐)

如果配置为重复数据落库并标记,在落库时,remark字段前会拼接 "du-" 作为识别标记。这样做的好处有两方面,一是依然保留所有数据落库,哪怕后续SDK升级改变,去重策略发生变化,仍然有历史数据支持调整,这是很重要的,因为埋点业务的特点就是要用户侧无感,没办法通过业务侧发现问题,发现问题和调试需要通过收到的数据进行,所以尽可能保留完整的数据,是埋点能维持迭代的关键。二是保留了原来remark的兼容性,所有之前写过的SQL不需要任何改动,即可自动滤除重复数据。而ghost_sa的维护人员,如果需要复盘去重情况,找"du-"开头的remark即可。

*remark是ghost_sa里用来标记环境版本的,神策使用project来区分并进行数据隔离,是基于列存储避免开发数据污染生产环境的需要。ghost_sa使用行存储json字段,没有这方面的顾虑,使用remark字段,可以在不增加DBA工作量的前提下,提供给每个不同的开发环境或版本一套完整的数据。

只保留一条不重复数据

只保留不重复数据的模式,重复数据不会入库。如果是生产端识别模式,重复数据连kafka都不进入,直接过滤掉。

重复数据识别时机:

典型部署的ghost_sa是通过Kafka实现的多生产者和每个模块单一消费者模式。这样做的好处是生产者可以完全无状态,动态缩扩容贴近用户就近部署,降低埋点对业务的干扰。每个模块都是独立的消费者,按需开启,贴近数据库部署,订阅数据。每个模块单例执行,避免产生复杂的锁,提高业务数据落库的可靠性的同时,减少对高性能中间件的依赖,降低系统的复杂度,降低成本。 在强调大并发场景下,无论识别模块采用何种技术方式,想要达到不错不漏,不多不少,都难免会轻微阻塞接口性能,增加客户端SDK延迟,虽然客户端并不依赖埋点返回数据,但会占用客户端的连接数。重复识别是批量上报的附加功能,而批量上报就是为了节约连接数,所以典型部署模式下,推荐使用消费端识别的方式。

消费端识别(推荐)

典型部署模式下,识别模块放在消费端的kafka_consumer.py。生产端生产段的数据跟以前一样,不会进行额外的加工和处理。消费端获取数据后,会使用batch_send模块里方法,在内存里维护去重列表,这是个单例消费并处理,多线写入的程序。本身处理过程不存在瓶颈,性能瓶颈主要在数据库,后续也只会优化插库内容以提高性能。所以不建议运行多个kafka_consumer.py。如需并发运行,需要自行改代码引入中间件同步去重数据。

生产端识别

如下场景,可能不是典型的部署,可以使用生产端识别: 开发或测试环境 并发量极小,不需要异步,不区分生产和消费,直接同步插库即可满足需求。 不使用ghost_sa的数据模型,仅使用ghost_sa生产者进行数据接收插入kafka。后续由企业指定的模型进行消费。如接hive。 使用ghost_sa的数据模型,但还要接其他自研模块的,比如接flink。

使用生产端识别,无论后续是否接kafka,都相当于在第一步就把识别过程走完,用SDK端每次上报增加2~30ms响应延迟的代价,换取后续所有处理的简化。 如果使用了中间件方案,中间件失效会造成数据丢失,同时会造成前端SDK占用连接数不释放,对业务有显著影响。

重复识别组件

重复识别组件在configs/admin.py 里可以配置。

内置组件因为维护在内存里速度较快,所以用project+distinct_id作为key,array里只存track_id和time,辅助一个最新的time。清理缓存时,会根据最新的time清理。清理是distinct_id级别的。 redis或数据库方式,project_distinct_id,track_id,time联合作为key,清理是数据行级别的。

内置组件(ram)

batch_send.py里自己维护一个缓存进行去重,定时检测字典容量,当超过条数或内存占用后,会触发清理,清理大于设定时间的缓存。这些参数都可以在config/admin.py中配置。 ==消费端识别方式下强制使用此组件==。所以如果消费端也分布式部署的场景(消费端不推荐分布式),需要自行改造。 生产端识别方式,如果要使用内置组件识别,因为缓存不共享,需要区分生产端部署方式来确保可靠性。

直接运行 flask_main.py 或者 通过gunicorn 只启动1个worker 都是安全的。适合并发量极小或开发、测试环境。

如果需要考虑大并发,需要部署多个单例,并在反向代理里配置均衡模式使用ip_hash。以确保同一个ip上报的数据,可以被同一个实例处理。这种部署方式比较适合不想用消费端识别,又需要就贴近用户就近部署的场景。

Important

注意,在此模式下,使用gunicorn启动多个worker提高并发,或使用其他wsgi提高并发,因为无法共享缓存,是不安全。如果之前使用了此方法部署,必须使用其他的识别组件或方法。

Redis组件(redis)

使用Redis的 nxset 特性进行去重,使用 ex 特性控制redis缓存容量。使用redis是生产端识别比较推荐的模式,ghost_sa原版的用户,远远够不到单点redis的性能极限,加钱升配就行。(能够到极限的,开发团队实力自己可以写套更好的了。)

Important

注意:使用Redis作为共享缓存的时候,必须考虑生产端和redis之间的延迟,这会直接影响到请求响应的速度。ping值最好不要超过1ms。延迟越大,需要增加worker数来提高并发数缓解线程被占用的问题。

tidb6.5+

使用数据库管理缓存,并利用tidb6.5引入的ttl特性缩减表容量。tidb在这种场景下的返回速度在4ms左右,并不是特别适合这个场景。使用这种方式,只是为了在小规模的情况下,无需增加维护redis的工作。

Important

注意:使用任何db作为共享缓存的时候,由于查询缓存需要耗时非常大,必须提高worker数才能达到效果。建议worker数在50以上,这时对内存的消耗比较大。是一种非常不经济的做法。仅用于不能使用kakfa或者redis的环境。

tidb6.4-

tidb6.4以下的用户,强烈建议升级到7.5或更新的版本。6.4以下没有ttl这个功能,需要像内置组件一样,定时清理过期数据。

Important

注意:使用任何db作为共享缓存的时候,由于查询缓存需要耗时非常大,必须提高worker数才能达到效果。建议worker数在50以上,这时对内存的消耗比较大。是一种非常不经济的做法。仅用于不能使用kakfa或者redis的环境。

mysql

与tidb6.4-实现思路一致,但不使用内置组件清理过期数据。而使用mysql自带定时器清理。使用这个模式,需要在安装时就使用mysql模式安装,否则需要手开启定时器。

Important

注意:使用任何db作为共享缓存的时候,由于查询缓存需要耗时非常大,必须提高worker数才能达到效果。建议worker数在50以上,这时对内存的消耗比较大。是一种非常不经济的做法。仅用于不能使用kakfa或者redis的环境。

mysql-memory

使用mysql的内存引擎做缓存,很多DBA不支持这个用法,能用这个的,完全可以自行开发。

去重效果校验

selftest/test_case.py 提供了一个标准的数据上报用例来测试ghost_sa数据接收的存储的质量。可以模拟单条,批量,压缩方式上报数据(目前不支持模拟beacon上报)。 在上报完成后,会提示应当落库的数据数量和实际落库的数据数量。两者比对结果一致,即可认为数据去重质量可靠。

Q&A

为什么不直接在event表建unique索引来实现去重?

因为只有在批量上报时,才会有track_id用于去重。不适用批量上报时,track_id为0。哪怕采用distinct_id,lib,event,track_id,time五个值作为联合索引。在track_id为0时,也存在因为sdk版本老,time值也为0的情况。剩下的三个值就不够区分了。而且就算是新版sdk,time值不为0,因为SDK提供的time是13位毫秒级,仍然存在两个不同的事件,在1毫秒内上报的情况。仍然会有错误过滤的风险。埋点异步的业务性质决定了,对于重复,宁肯重复,也不能漏掉。所以不能使用数据库层面去重这种方式。还有,纵使使用tidb作为数据库方案,如果使用数据库的unique索引,如果数据量很大,那么去重的时间会很长,影响上报的效率。

为什么不用哈希值?

既然去重方式神策缺少官方说明支持,为什么不根据上报内容做哈希,这不比用project,distinct_id,_track_id,time这样的方式,即好做,又节省存储空间么?确实一开始这么想过,但是考虑了如下几个情况,这么做不合适。

  1. 参考从神策迁移的场景,在迁移过程中,神策写入的数据不光是埋点上报的,还有服务端处理的数据,多次上报结果是不一致的。所以用哈希值,得不到唯一结果。
  2. 支持去重的埋点,上报时会包含两个时间。time是事件发生的时间,_flush_time是埋点具体上报的时间。重复上报的埋点,time一致的,因为事件只发生一次,但是_flush_time存在不一致的情况。这时用哈希值,仍然得不到唯一结果。
  3. 不同sdk上报的json,不都是有序的,会造成哈希值得不到唯一结果。

为什么之前不做?

ghost_sa项目的诞生,源于资源匮乏的小团队想要用大数据。面对的用户就是没钱雇人,业务模式可能还没跑稳定,模型需要支持灵活调整。这个阶段,性能往往不是最重要的。所以ghost_sa模型设计的每一个环节,都贯彻弹性部署+部后不管的思路。花点小钱能扩容解决的,就不花费人力开发高性能。在灵活,经济和性能之间,选择的并不是性能。因为任何高性能的稳定运行,都依赖相对稳定的模式,丰富的QA案例和强大的资源。所以ghost_sa一直追求程序的无状态,不轻易引用中间件的开发思路。选择tidb就是为了满足这方面的需求。

增加去重功能,一定绕不开的点就是需要有地方存这个状态。最开始考虑过使用数据库独立索引做。这样可以最大限度的保证ghost_sa无状态。但那时候tidb还在3.0.3版本,功能上不能跟今天的7.5版本相提并论。而且当时神策的sdk也处于发展阶段,去重依赖的字段质量还不是很可靠,如果用数据库特性来完成去重,风险太大了。毕竟批量上报在ghost_sa诞生的年代还不流行,只有ios和android默认开启,但这俩sdk处理的很好,几乎不会有重复的情况。

现在随着浏览器,特别是微信内置的小程序环境对并发数的限制。单条上报量大时,会影响到业务请求数据,大部分SDK开始把批量上报作为缺省设置。这时增加缓存处理重复的问题,变成了满足基本功能迫不得已的选择。

event表的时间选择倾向?

ghost_sa的数据里存储有4个时间,分别是:

接收时间

date,hour,created_at <-- 这三个时间都是服务端收到埋点上报的时间,date和hour带索引,可以加速筛选数据。created_at采用10位时间戳,精确到秒。这个时间是数据上报时,SDK请求服务器的时间。在不使用生产者消费者模式时,这个时间等同于入库时间。

入库时间

all_json.db_time <--这个时间是数据最后落库的时间。采用10位时间戳,精确到秒。

发生时间

all_json.time <--这个是SDK生成的时间发生时间。13位时间戳,精确到毫秒。

上报时间

all_json._flush_time <--这个是SDK生成的数据上报时间。如果未开启批量上报,这个值可能不存在,也可能与事件发生时间一致。13位时间戳,精确到毫秒

时间选择

2021年前,神策官方版也是数据接收时间。这个时间的优势是计时准确且统一,都采用了服务器时间。无论用户的时间是否准确,或是否时区统一,都可以按照统一时间分析。ghost_sa在这两个时间上,都选择了10位,而不是更精确的13位。是因为批量上报和单挑上报,都会受到网络的干扰,造成到达时间与事件发生时的顺序并不一致。考虑到有些用户真的使用mysql存储数据(虽然极度不推荐),13位时间戳还需要使用bigint,占更大的空间,就没选择13位。

2021年后,神策官方倾向使用事件发生时间。这个时间由SDK产生,时间精度可以描述1秒内事件发生的顺序。