Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
import type Database from 'better-sqlite3';
import { BaseRepository } from './base.repository.js';
import { buildRuleName } from '../../shared/utils/rule-name.js';

export interface TrafficUpdate {
domain: string;
Expand Down Expand Up @@ -147,8 +148,7 @@ export class TrafficWriterRepository extends BaseRepository {

if (update.upload === 0 && update.download === 0) return;

const ruleName = update.chains.length > 1 ? update.chains[update.chains.length - 1] :
update.rulePayload ? `${update.rule}(${update.rulePayload})` : update.rule;
const ruleName = buildRuleName(update);
const finalProxy = update.chains.length > 0 ? update.chains[0] : 'DIRECT';
const fullChain = update.chains.join(' > ') || update.chain || 'DIRECT';
const s = this.singleStmts;
Expand All @@ -171,7 +171,7 @@ export class TrafficWriterRepository extends BaseRepository {
s.ruleIpUpsert.run({ backendId, rule: ruleName, ip: update.ip, upload: update.upload, download: update.download, timestamp });

if (update.chains.length > 1) {
s.ruleProxyInsert.run({ backendId, rule: update.chains[update.chains.length - 1], proxy: update.chains[0] });
s.ruleProxyInsert.run({ backendId, rule: ruleName, proxy: update.chains[0] });
}

s.hourlyUpsert.run({ backendId, hour, upload: update.upload, download: update.download });
Expand Down Expand Up @@ -238,8 +238,7 @@ export class TrafficWriterRepository extends BaseRepository {
if (update.upload === 0 && update.download === 0) continue;
const connections = this.normalizeConnections(update.connections);

const ruleName = update.chains.length > 1 ? update.chains[update.chains.length - 1] :
update.rulePayload ? `${update.rule}(${update.rulePayload})` : update.rule;
const ruleName = buildRuleName(update);
const finalProxy = update.chains.length > 0 ? update.chains[0] : 'DIRECT';
const fullChain = update.chains.join(' > ') || update.chain || 'DIRECT';
const { hourKey, minuteKey } = getTimeKeys(update.timestampMs ?? now.getTime());
Expand Down Expand Up @@ -532,7 +531,7 @@ export class TrafficWriterRepository extends BaseRepository {
chains = CASE WHEN domain_stats.chains IS NULL THEN @chain WHEN LENGTH(domain_stats.chains) > 4000 THEN domain_stats.chains WHEN INSTR(domain_stats.chains, @chain) > 0 THEN domain_stats.chains ELSE domain_stats.chains || ',' || @chain END
`);
for (const [, data] of domainMap) {
const ruleName = data.chains.length > 1 ? data.chains[data.chains.length - 1] : data.rulePayload ? `${data.rule}(${data.rulePayload})` : data.rule;
const ruleName = buildRuleName(data);
const fullChain = data.chains.join(' > ');
domainStmt.run({ backendId, domain: data.domain, ip: data.ip, upload: data.upload, download: data.download, count: data.count, timestamp, rule: ruleName, chain: fullChain });
}
Expand Down
51 changes: 51 additions & 0 deletions apps/collector/src/database/repositories/traffic-writer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,5 +247,56 @@ describe('TrafficWriterRepository', () => {
const summary = db.getSummary(backendId);
expect(summary.totalConnections).toBe(1);
});

it('should use rule and rulePayload instead of treating the last chain hop as the rule', () => {
db.batchUpdateTrafficStats(backendId, [
{
domain: 'media.example',
ip: '8.8.8.8',
chain: '香港 01',
chains: ['香港 01', '🔯 大流量节点'],
rule: 'RULE-SET',
rulePayload: 'OpenAI',
upload: 120,
download: 340,
sourceIP: '192.168.1.8',
timestampMs: Date.now(),
},
]);

const rules = db.getRuleStats(backendId);
expect(rules).toHaveLength(1);
expect(rules[0].rule).toBe('RULE-SET(OpenAI)');
expect(rules[0].finalProxy).toBe('香港 01');

const ruleProxyMap = db.getRuleProxyMap(backendId);
expect(ruleProxyMap).toHaveLength(1);
expect(ruleProxyMap[0]).toEqual({
rule: 'RULE-SET(OpenAI)',
proxies: ['香港 01'],
});
});

it('should preserve Surge rule names when the last chain hop already matches the resolved rule', () => {
db.batchUpdateTrafficStats(backendId, [
{
domain: 'stream.media',
ip: '203.0.113.8',
chain: 'JP-Sakura',
chains: ['JP-Sakura', 'Manual|Select', 'YouTube|Media'],
rule: 'YouTube|Media',
rulePayload: 'RULE-SET',
upload: 88,
download: 188,
sourceIP: '192.168.1.18',
timestampMs: Date.now(),
},
]);

const rules = db.getRuleStats(backendId);
expect(rules).toHaveLength(1);
expect(rules[0].rule).toBe('YouTube|Media');
expect(rules[0].finalProxy).toBe('JP-Sakura');
});
});
});
8 changes: 2 additions & 6 deletions apps/collector/src/modules/clickhouse/clickhouse.writer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { TrafficUpdate } from '../collector/batch-buffer.js';
import { loadClickHouseConfig } from './clickhouse.config.js';
import { buildRuleName } from '../../shared/utils/rule-name.js';

interface CountryMinuteUpdate {
country: string;
Expand Down Expand Up @@ -89,12 +90,7 @@ export class ClickHouseWriter {
ip: item.ip || '',
source_ip: item.sourceIP || '',
chain: item.chains.join(' > ') || item.chain || 'DIRECT',
rule:
item.chains.length > 1
? item.chains[item.chains.length - 1]
: item.rulePayload
? `${item.rule}(${item.rulePayload})`
: item.rule,
rule: buildRuleName(item),
upload: Math.max(0, Math.floor(item.upload)),
download: Math.max(0, Math.floor(item.download)),
connections: Math.max(0, Math.floor(item.connections ?? 1)),
Expand Down
7 changes: 2 additions & 5 deletions apps/collector/src/modules/collector/batch-buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
getClickHouseWriter,
type TrafficWriteOutcome,
} from "../clickhouse/clickhouse.writer.js";
import { buildRuleName } from "../../shared/utils/rule-name.js";
import { shouldSkipSqliteStatsWrites } from "../stats/stats-write-mode.js";

export interface TrafficUpdate {
Expand Down Expand Up @@ -136,11 +137,7 @@ export class BatchBuffer {

for (const update of updates) {
if (update.domain) domains.add(update.domain);
const initialRule =
update.chains.length > 0
? update.chains[update.chains.length - 1]
: "DIRECT";
rules.add(initialRule);
rules.add(buildRuleName(update) || "DIRECT");
}

let trafficOk = true;
Expand Down
8 changes: 2 additions & 6 deletions apps/collector/src/modules/realtime/realtime.store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type {
TrafficTrendPoint,
} from '@neko-master/shared';
import type { StatsDatabase } from '../db/db.js';
import { buildRuleName } from '../../shared/utils/rule-name.js';

type SummaryDelta = {
upload: number;
Expand Down Expand Up @@ -364,12 +365,7 @@ export class RealtimeStore {

this.pruneOldBuckets(minuteMap, timestamp);

const ruleName =
meta.chains.length > 1
? meta.chains[meta.chains.length - 1]
: meta.rulePayload
? `${meta.rule}(${meta.rulePayload})`
: meta.rule;
const ruleName = buildRuleName(meta);
const fullChain = meta.chains.join(' > ');
const lastSeen = new Date(timestamp).toISOString();

Expand Down
4 changes: 2 additions & 2 deletions apps/collector/src/modules/stats/stats.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ describe('StatsService', () => {
ip: '203.0.113.10',
chain: 'JP-Sakura|IEPL',
chains: ['JP-Sakura|IEPL', 'Manual|Select', 'YouTube|Media'],
rule: 'RULE-SET',
rulePayload: 'YouTube',
rule: 'YouTube|Media',
rulePayload: 'RULE-SET',
upload: 123,
download: 456,
sourceIP: '192.168.1.88',
Expand Down
23 changes: 23 additions & 0 deletions apps/collector/src/shared/utils/rule-name.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
export type RuleNameInput = {
rule: string;
rulePayload: string;
chains: string[];
};

export function buildRuleName(input: RuleNameInput): string {
const rule = input.rule.trim();
const rulePayload = input.rulePayload.trim();
const lastChain = input.chains[input.chains.length - 1]?.trim() || '';

// Surge collector already resolves the real rule/group name into `rule`
// and mirrors it as the last chain hop. Preserve that behavior.
if (rule && lastChain === rule) {
return rule;
}

if (rule) {
return rulePayload ? `${rule}(${rulePayload})` : rule;
}

return lastChain || 'DIRECT';
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ export function InteractiveRuleStats({
}));

// Append zero-traffic rules from Gateway API, using the target proxy group name
// (rule.proxy) which matches how traffic data stores rule names.
// (rule.proxy) which matches how traffic data stores rule names for Surge.
// Multiple low-level rules (RuleSet, ProcessName, etc.) can target the same
// proxy group, so we deduplicate by proxy group name.
if (gatewayRules?.rules) {
Expand Down
Loading