5 min read

SynchroTrap: 基于聚类的同步行为检测算法

引言

在网络安全和风控领域,同步行为检测是一种重要的技术手段,用于识别可能的协同欺诈活动。本文将介绍SynchroTrap算法,这是一种专门用于检测在线社交网络和金融系统中协调恶意活动的算法,通过识别账户群同步行动模式来发现潜在的风险。

算法原理

SynchroTrap算法的核心思想是利用聚类技术,通过分析用户行为的相似性来发现协同作案的账户群体。其主要步骤包括:

1. 行为集合构建

对于每个用户,我们构建一个行为集合,包含该用户在特定时间窗口内的所有行为。每个行为可以表示为一个元组:

(行为类型, 时间桶, 目标对象, 金额桶)

例如,一个转账行为可能表示为:("transfer", 12345, "account123", 2),表示在时间桶12345内向account123转账,金额在第2个桶范围内。

2. Jaccard相似度计算

Jaccard相似系数是衡量两个集合相似度的经典指标,定义为:

Sim(Ui,Uj)=|AiAj||AiAj|

其中,AiAj 分别是用户 UiUj 的行为集合。Jaccard相似度的值在0到1之间,值越大表示两个用户的行为越相似。

3. 相似度图构建

基于计算得到的Jaccard相似度,我们构建一个无向加权图: - 节点:用户 - 边:当两个用户的相似度超过阈值时,在它们之间添加一条边 - 边权重:用户间的Jaccard相似度

4. 社区发现算法

社区发现是图论中的一个重要问题,旨在识别网络中高度互连的节点群组。在SynchroTrap中,我们主要使用以下社区发现算法:

Louvain算法

Louvain算法是一种基于模块度优化的社区发现算法,其核心思想是最大化网络的模块度(Modularity)。模块度是衡量网络划分质量的指标,定义为:

Q=12mi,j[Aijkikj2m]δ(ci,cj)

其中: - Aij 是节点i和j之间的边权重 - kikj 分别是节点i和j的度 - m 是网络中所有边的权重总和 - δ(ci,cj) 是一个指示函数,当节点i和j在同一社区时为1,否则为0

Louvain算法通过两个阶段交替进行: 1. 局部移动:将节点移动到能够最大化模块度增益的邻居社区 2. 社区聚合:将同一社区的节点聚合为一个”超级节点”,构建新的网络

这个过程会一直重复,直到模块度不再增加。

连通分量分析

当Louvain算法不可用时,我们退而求其次使用连通分量分析。连通分量是图中的一个子图,其中任意两个节点之间都存在路径。在SynchroTrap中,我们将每个连通分量视为一个潜在的协同作案群体。

代码架构

SynchroTrap算法的实现采用了面向对象的设计模式,主要包含以下组件:

1. SynchrotrapDetector类

核心检测器类,负责整个检测流程的协调和执行。

2. 数据预处理模块

  • _build_action_sets:构建用户行为集合
  • _bucket_amount:金额分桶,减少噪声影响

3. 相似度计算模块

  • _calculate_jaccard_similarity:计算用户间的Jaccard相似度

4. 图分析模块

  • _build_similarity_graph:构建相似度图
  • _detect_communities:使用社区发现算法检测用户群体

5. 结果输出模块

  • analyze:整合分析结果,返回检测到的同步行为群体

示例数据与检测结果

示例交易数据

以下是一组模拟的交易数据示例:

[
  {
    "user_id": "user_42",
    "amount": 723,
    "timestamp": "2025-09-14T14:59:12",
    "action_type": "withdraw",
    "target": "account_5678"
  },
  {
    "user_id": "user_0",
    "amount": 489,
    "timestamp": "2025-09-14T15:00:00",
    "action_type": "transfer",
    "target": "account_1234"
  },
  {
    "user_id": "user_1",
    "amount": 502,
    "timestamp": "2025-09-14T15:00:03",
    "action_type": "transfer",
    "target": "account_1234"
  },
  {
    "user_id": "user_2",
    "amount": 495,
    "timestamp": "2025-09-14T15:00:01",
    "action_type": "transfer",
    "target": "account_1234"
  },
  {
    "user_id": "user_3",
    "amount": 510,
    "timestamp": "2025-09-14T15:00:02",
    "action_type": "transfer",
    "target": "account_1234"
  },
  {
    "user_id": "user_4",
    "amount": 485,
    "timestamp": "2025-09-14T15:00:04",
    "action_type": "transfer",
    "target": "account_1234"
  },
  {
    "user_id": "user_15",
    "amount": 312,
    "timestamp": "2025-09-14T14:00:05",
    "action_type": "withdraw",
    "target": "account_5678"
  },
  {
    "user_id": "user_16",
    "amount": 298,
    "timestamp": "2025-09-14T14:00:08",
    "action_type": "withdraw",
    "target": "account_5678"
  },
  {
    "user_id": "user_20",
    "amount": 1005,
    "timestamp": "2025-09-14T14:30:00",
    "action_type": "deposit",
    "target": "account_9876"
  },
  {
    "user_id": "user_21",
    "amount": 1005,
    "timestamp": "2025-09-14T14:30:00",
    "action_type": "deposit",
    "target": "account_9876"
  }
]

检测结果

使用SynchroTrap算法分析上述交易数据,我们得到以下检测结果:

检测到的同步行为群体:
群体 1: ['user_0', 'user_1', 'user_2', 'user_3', 'user_4']
群体 2: ['user_15', 'user_16', 'user_17']
群体 3: ['user_20', 'user_21', 'user_22']

可视化结果

SynchroTrap可视化
SynchroTrap可视化

在可视化图中,我们可以清晰地看到三个不同的社区(用不同颜色表示):

  1. 群体1(红色):由5个用户组成(user_0到user_4),这些用户之间的连接非常密集,表明它们的行为高度相似。这个群体在短时间内(15:00左右)向同一目标账户进行了金额相近的转账操作。

  2. 群体2(绿色):由3个用户组成(user_15到user_17),这些用户之间的连接相对较松散,表明它们的行为相似度中等。这个群体在14:00左右向同一目标账户进行了金额相近的取款操作。

  3. 群体3(蓝色):由3个用户组成(user_20到user_22),这些用户之间的连接非常紧密,表明它们的行为几乎完全一致。这个群体在14:30进行了完全相同的存款操作,包括相同的金额、时间和目标账户。

图中的边表示用户之间的相似度,边的粗细表示相似度的大小。可以看到,群体3中的边最粗,表明这个群体的行为最为同步;群体1次之;群体2的边最细,表明这个群体的行为同步性相对较低。

应用场景

SynchroTrap算法在以下场景中有广泛应用:

  1. 社交媒体反作弊:检测批量注册的虚假账号和协同点赞/评论行为
  2. 金融风控:识别协同洗钱、套现等欺诈行为
  3. 电商平台:发现刷单、虚假评价等行为
  4. 网络安全:检测DDoS攻击中的僵尸网络

参数调优

SynchroTrap算法的效果很大程度上依赖于参数的选择:

  1. 时间窗口大小:根据业务场景选择合适的时间粒度,通常从几秒到几分钟不等
  2. 相似度阈值:通常设置在0.3-0.7之间,阈值越低,检出率越高但误报也可能增加
  3. 最小聚类大小:根据预期的恶意群体规模设置,通常为3-5

结论

SynchroTrap算法通过分析用户行为的同步性,有效地检测出协同作案的账户群体。该算法计算简单、易于实现,且在实际应用中展现出良好的效果。随着欺诈手段的不断演进,我们也需要不断优化算法,增加更多维度的特征,以提高检测的准确性和鲁棒性。

附录:完整代码实现

synchrotrap.py - 核心算法实现

import numpy as np
from typing import List, Dict, Set, Tuple
from collections import defaultdict
import networkx as nx
from datetime import datetime

class SynchrotrapDetector:
    def __init__(self, time_window: int = 60, similarity_threshold: float = 0.5, min_cluster_size: int = 3):
        """
        初始化SynchroTrap检测器
        
        参数:
            time_window: 检测时间窗口(秒)
            similarity_threshold: Jaccard相似度阈值(0-1)
            min_cluster_size: 最小聚类大小
        """
        self.time_window = time_window
        self.similarity_threshold = similarity_threshold
        self.min_cluster_size = min_cluster_size
        
    def _build_action_sets(self, transactions: List[Dict]) -> Dict[str, Set[Tuple]]:
        """
        为每个用户构建行为集合
        
        返回:
            用户行为集合 {user_id: {(action_type, time_bucket, target), ...}}
        """
        action_sets = defaultdict(set)
        
        # 将时间分桶,粒度为time_window秒
        for tx in transactions:
            user_id = tx['user_id']
            timestamp = tx['timestamp']
            
            # 时间分桶
            if isinstance(timestamp, str):
                timestamp = datetime.fromisoformat(timestamp)
            time_bucket = int(timestamp.timestamp() // self.time_window)
            
            # 构建行为特征元组: (行为类型, 时间桶, 目标对象)
            # 例如: ("transfer", 12345, "account123")
            action_type = tx.get('action_type', 'transaction')
            target = tx.get('target', '')
            amount_bucket = self._bucket_amount(tx['amount'])
            
            # 添加到用户行为集合
            action = (action_type, time_bucket, target, amount_bucket)
            action_sets[user_id].add(action)
            
        return action_sets
        
    def _bucket_amount(self, amount: float) -> int:
        """将金额分桶,减少噪声影响"""
        if amount < 100:
            return 0
        elif amount < 500:
            return 1
        elif amount < 1000:
            return 2
        elif amount < 5000:
            return 3
        else:
            return 4
        
    def _calculate_jaccard_similarity(self, action_sets: Dict[str, Set[Tuple]]) -> Dict[Tuple[str, str], float]:
        """
        计算用户间的Jaccard相似度
        
        Jaccard相似度 = |A ∩ B| / |A ∪ B|
        
        返回:
            相似度字典 {(user_i, user_j): similarity}
        """
        users = list(action_sets.keys())
        n = len(users)
        similarities = {}
        
        for i in range(n):
            for j in range(i+1, n):
                user_i = users[i]
                user_j = users[j]
                
                set_i = action_sets[user_i]
                set_j = action_sets[user_j]
                
                # 计算Jaccard相似度
                intersection = len(set_i.intersection(set_j))
                union = len(set_i.union(set_j))
                
                # 避免除零错误
                if union > 0:
                    similarity = intersection / union
                else:
                    similarity = 0.0
                    
                if similarity >= self.similarity_threshold:
                    similarities[(user_i, user_j)] = similarity
                    
        return similarities
        
    def _build_similarity_graph(self, similarities: Dict[Tuple[str, str], float]) -> nx.Graph:
        """
        构建相似度图,用于社区发现
        """
        G = nx.Graph()
        
        # 添加边和权重
        for (user_i, user_j), similarity in similarities.items():
            G.add_edge(user_i, user_j, weight=similarity)
            
        return G
        
    def _detect_communities(self, G: nx.Graph) -> List[List[str]]:
        """
        使用社区发现算法检测用户群体
        """
        # 使用Louvain算法进行社区发现
        try:
            import community as community_louvain
            partition = community_louvain.best_partition(G)
            
            # 将相同社区的节点分组
            communities = defaultdict(list)
            for node, community_id in partition.items():
                communities[community_id].append(node)
                
            # 过滤掉小于min_cluster_size的社区
            return [members for members in communities.values() if len(members) >= self.min_cluster_size]
        except ImportError:
            # 如果没有安装community库,使用连通分量作为替代
            return [list(c) for c in nx.connected_components(G) if len(c) >= self.min_cluster_size]
        
    def analyze(self, transactions: List[Dict]) -> Dict:
        """
        分析交易数据,检测同步行为群体
        
        返回:
            检测结果 {
                'synchronized_groups': [[user_id1, user_id2, ...], ...],
                'similarity_graph': networkx图对象,
                'action_counts': {user_id: action_count}
            }
        """
        # 1. 构建用户行为集合
        action_sets = self._build_action_sets(transactions)
        
        # 2. 计算用户间的Jaccard相似度
        similarities = self._calculate_jaccard_similarity(action_sets)
        
        # 3. 构建相似度图
        similarity_graph = self._build_similarity_graph(similarities)
        
        # 4. 使用社区发现算法检测用户群体
        communities = self._detect_communities(similarity_graph)
        
        # 5. 返回结果
        return {
            'synchronized_groups': communities,
            'similarity_graph': similarity_graph,
            'action_counts': {user: len(actions) for user, actions in action_sets.items()}
        }

main.py - 示例程序和可视化

from synchrotrap import SynchrotrapDetector
import random
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import networkx as nx

# 生成模拟交易数据
def generate_mock_data(num_users=100, num_tx=1000):
    users = [f"user_{i}" for i in range(num_users)]
    transactions = []
    
    # 生成正常交易
    for _ in range(num_tx - 200):
        tx_time = datetime.now() - timedelta(seconds=random.randint(0, 3600))
        transactions.append({
            'user_id': random.choice(users),
            'amount': random.randint(10, 1000),
            'timestamp': tx_time,
            'action_type': random.choice(['transfer', 'deposit', 'withdraw']),
            'target': f"account_{random.randint(1000, 9999)}"
        })
    
    # 生成同步行为交易(3个恶意群体)
    # 群体1: 5个用户,高度同步
    sync_users_1 = users[:5]
    base_time = datetime.now()
    for i in range(40):
        tx_time = base_time - timedelta(seconds=i*30)
        target = f"account_{random.randint(1000, 9999)}"
        for user in sync_users_1:
            transactions.append({
                'user_id': user,
                'amount': 500 + random.randint(-20, 20),
                'timestamp': tx_time + timedelta(seconds=random.randint(0, 5)),
                'action_type': 'transfer',
                'target': target
            })
    
    # 群体2: 8个用户,中度同步
    sync_users_2 = users[10:18]
    base_time = datetime.now() - timedelta(hours=1)
    for i in range(30):
        tx_time = base_time - timedelta(seconds=i*45)
        target = f"account_{random.randint(1000, 9999)}"
        for user in sync_users_2:
            if random.random() < 0.9:  # 90%的概率执行同步行为
                transactions.append({
                    'user_id': user,
                    'amount': 300 + random.randint(-50, 50),
                    'timestamp': tx_time + timedelta(seconds=random.randint(0, 10)),
                    'action_type': 'withdraw',
                    'target': target
                })
    
    # 群体3: 3个用户,完全同步
    sync_users_3 = users[20:23]
    base_time = datetime.now() - timedelta(minutes=30)
    for i in range(20):
        tx_time = base_time - timedelta(seconds=i*60)
        target = f"account_{random.randint(1000, 9999)}"
        amount = 1000 + random.randint(-10, 10)
        action_type = 'deposit'
        for user in sync_users_3:
            transactions.append({
                'user_id': user,
                'amount': amount,  # 完全相同的金额
                'timestamp': tx_time,  # 完全相同的时间
                'action_type': action_type,  # 完全相同的行为
                'target': target  # 完全相同的目标
            })
    
    return transactions

def visualize_similarity_graph(graph, communities=None):
    """可视化相似度图和检测到的社区"""
    plt.figure(figsize=(12, 8))
    
    # 为不同社区设置不同颜色
    if communities:
        colors = []
        for node in graph.nodes():
            for i, community in enumerate(communities):
                if node in community:
                    colors.append(i)
                    break
            else:
                colors.append(len(communities))
    else:
        colors = [0] * len(graph.nodes())
    
    # 使用spring_layout布局
    pos = nx.spring_layout(graph)
    
    # 绘制节点和边
    nx.draw_networkx_nodes(graph, pos, node_size=100, node_color=colors, cmap=plt.get_cmap('rainbow'))
    nx.draw_networkx_edges(graph, pos, width=1.0, edge_color='gray', alpha=0.5)
    nx.draw_networkx_labels(graph, pos, font_size=8)
    
    plt.title("SynchroTrap: 用户行为相似度图与检测到的社区")
    plt.axis('off')
    plt.tight_layout()
    plt.savefig("synchrotrap_visualization.png", dpi=300)
    plt.show()

if __name__ == "__main__":
    # 生成测试数据
    print("生成模拟交易数据...")
    transactions = generate_mock_data()
    
    # 检测同步行为
    print("使用SynchroTrap算法分析数据...")
    detector = SynchrotrapDetector(time_window=30, similarity_threshold=0.3, min_cluster_size=3)
    results = detector.analyze(transactions)
    
    # 输出检测结果
    print("\n检测到的同步行为群体:")
    for i, group in enumerate(results['synchronized_groups']):
        print(f"群体 {i+1}: {group}")
        
    # 可视化结果
    try:
        print("\n生成可视化图表...")
        visualize_similarity_graph(results['similarity_graph'], results['synchronized_groups'])
        print("可视化图表已保存为 'synchrotrap_visualization.png'")
    except Exception as e:
        print(f"可视化失败: {e}")
        print("请确保已安装matplotlib和networkx库")