最近和一位做内审的朋友交流,他们目前集团在做数字化审计,
想利用一些IT技术手段,在数据核查中发挥作用,能够从海量数据中缩小范围,查找目标异常。
其实我个人感觉做这事,基本上有三步:
梳理核查思路。 挖掘数据特征。 编写代码模型。
而最重要的我感觉就是梳理核查思路,基本上重要程度占到 80% 。
当有了核查思路后,很自然能去想出对应通过什么样的数据特征反应出异常,从而再去编写代码找出这样的数据。
下面我们以资金核查中查找“快进快出”资金来反映这个思路。
核查思路
事实上,我作为外部审计,很少对纯粹的资金流水数据分析。
而朋友作为内审,给我看了很多他们想做的分析维度,其中一个“快进快出”是一个他们用传统的 Excel 比较难完成的。
我也搜索了下“快进快出”分析主要应对的风险和场景是什么。
chatgpt 老师给我列举了一些场景。
而朋友主要想看被审计单位和交易对手之间有没有快进快出的资金流水。
数据特征
我们以下面示例数据为例:
所谓快进快出,就是在较短期间内,进入的资金又转出。
这里从数据上来说有两个要点:
时间上发生在较短期间。如1天,或者几天。 累计进入的资金和累计出去的资金相同。
其中,这里我们用累计的资金的概念,而不是一笔资金进对应一笔资金出去查看,因为可能会有多笔对应多笔的情况。
再进一步,我们可以思考下,是否金额非要完成相等呢?
其实,也未必吧,比如,我进来了 100 万,转出了 99 万,是不是也可以认为是相同呢?
所以,我们可以有一个 转出/转入资金比 的概念,例如,只要是差异在 5% 内就可以认为符合,
当然,这个比例完全可以自定义。
这样去设置后,又会出现新的问题,比如我转入 100 元,转出 99 元,
这样低金额的数据按照上面的筛选方法,也会被筛选出来。
因此,我们还需要引入一个 阈值 的概念,需要累计转入的金额大于这个阈值才会统计,
这样就可以避开小资金流水的干扰。
以上,基本就是根据我们风险场景想出的对应的数据特征。
编写代码模型
我们将资金交易数据按时间顺序排列,
用下图中1至9的点代码每个资金数据:
我们要找到满足条件的期间内的数据,那么我们就用起始点 i ,从第一个数据开始循环到倒数第二个数据。
这个点就是起始点。
我们从起始点的下一个点 j 开始循环,一直循环到最后一个点。
这个点就是截止点。
起始点和截止点就组成了一个期间。
我们只需要计算这个期间内的累计资金流入和累计资金流出,满足差异金额在一个较小比例,且金额大于我们设定的阈值,那么就找到了我们想要的一个期间数据。
这个时候,起始点,再从截止点开始继续往下循环一直到遍历完所有数据,就可以找出所有满足条件的数据。
import pandas as pd
from datetime import timedelta
def format_date(date: int) -> str:
"""
将形如 20241101 的整数日期转换为 2024-11-01 的字符串格式。
:param date: 整数形式的日期,例如 20241101。
:return: 字符串形式的日期,例如 '2024-11-01'。
"""
# 将整数转为字符串,方便切片处理
date_str = str(date)
if len(date_str) != 8:
raise ValueError("输入的日期格式不正确,应该是形如 20241101 的 8 位数字。")
# 按格式拼接
formatted_date = f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:]}"
return formatted_date
def match_funds(data, time_window=10, tolerance=0.05, threshold=100000):
data["日期"] = pd.to_datetime(data["日期"])
# 按天汇总
daily_summary = data.groupby("日期").apply(
lambda x: pd.Series({
"收入总额": x.loc[x["交易类型"] == "收", "金额"].sum(),
"支出总额": x.loc[x["交易类型"] == "支", "金额"].sum()
}),
include_groups=False
).reset_index()
# 结果存储
results = []
# 滑动窗口匹配逻辑
n = len(daily_summary)
i = 0
while i < n:
start_date = daily_summary.loc[i, "日期"]
income_sum = daily_summary.loc[i, "收入总额"]
expense_sum = daily_summary.loc[i, "支出总额"]
matched = False
# 检查当天的收支金额是否满足条件
net_amount = income_sum - expense_sum
percentage_difference = abs(net_amount) / income_sum if income_sum > 0 else float("inf")
if max(income_sum, expense_sum) >= threshold and percentage_difference <= tolerance:
# 记录匹配结果
results.append({
"收入开始日期": start_date,
"支出结束日期": start_date,
"收入累计": income_sum,
"支出累计": expense_sum,
"净额": net_amount,
"差异占比": percentage_difference
})
matched = True
i += 1 # 跳到下一天继续
continue # 跳过后续累加逻辑
# 在时间窗口内累加后续日期的收入和支出
for j in range(i + 1, n):
end_date = daily_summary.loc[j, "日期"]
if (end_date - start_date).days > time_window:
break
income_sum += daily_summary.loc[j, "收入总额"]
expense_sum += daily_summary.loc[j, "支出总额"]
# 检查是否满足金额阈值
if max(income_sum, expense_sum) < threshold:
continue # 如果不满足阈值,跳过当前窗口
# 检查是否满足差异条件
net_amount = income_sum - expense_sum
percentage_difference = abs(net_amount) / income_sum if income_sum > 0 else float("inf")
if percentage_difference <= tolerance:
# 记录匹配结果
results.append({
"收入开始日期": start_date,
"支出结束日期": end_date,
"收入累计": income_sum,
"支出累计": expense_sum,
"净额": net_amount,
"差异占比": percentage_difference
})
matched = True
i = j # 跳到匹配的结束日期之后
break
if not matched:
i += 1 # 如果未匹配,跳到下一天继续
# 转换结果为DataFrame
result_df = pd.DataFrame(results)
return result_df
if __name__ == "__main__":
# 定义数据列表
data = [
['A公司', '20200624', '支', 1964.00, 'B公司', '外购纸品费'],
['B公司', '20200807', '收', 23436.20, 'A公司', '汇款业务撤销'],
['B公司', '20200807', '收', 23629.20, 'A公司', '汇款业务撤销'],
['A公司', '20200807', '支', 23629.20, 'B公司', '品质改造'],
['A公司', '20200807', '支', 23436.20, 'B公司', '品质改造'],
['A公司', '20200811', '支', 23436.20, 'B公司', '物业费'],
['A公司', '20200811', '支', 23629.20, 'B公司', '物业费'],
['A公司', '20200824', '支', 3126.00, 'B公司', '品质提升'],
['A公司', '20200914', '支', 1692.00, 'B公司', '物业费'],
['A公司', '20201030', '支', 5808.00, 'B公司', '物业人工费'],
['A公司', '20201030', '支', 23541.80, 'B公司', '物业人工费'],
['A公司', '20201030', '支', 24008.00, 'B公司', '物业人工费']
]
# 定义列名
columns = ['付款户名', '交易日期', '交易方向', '交易金额', '收款户名', '备注']
# 创建DataFrame
df = pd.DataFrame(data, columns=columns)
check_company = 'A公司'
data = []
for index,row in df.iterrows():
row_data = []
date = row['交易日期']
date = format_date(date)
amount = row['交易金额']
note = row['备注']
if row['收款户名'] == check_company:
company = check_company
type = '收'
opponent = row['付款户名']
row_data = [date,company,opponent,type,amount,note]
data.append(row_data)
elif row['付款户名'] == check_company:
company = check_company
type = '支'
opponent = row['收款户名']
row_data = [date,company,opponent,type,amount,note]
data.append(row_data)
data = pd.DataFrame(data,columns=['日期','公司名称','对方单位','交易类型','金额','备注'])
company = check_company
opponents = data['对方单位'].unique()
output = []
for opponent in opponents:
df = data[data['对方单位']==opponent]
df = df.reset_index()
result = match_funds(df,10,0.05,10000)
if not result.empty:
result.insert(0,'对方单位',opponent)
result.insert(0,'公司名称',company)
output.append(result)
df_output = pd.concat(output)
df_output = df_output.sort_values(by='收入累计',ascending=False)
df_output.reset_index()
print(df_output)
执行结果:
可以看到,我们找到 2020-08-07 这天有快进快出的资金。
当然,如果你数据够多,也可以查找出跨日期内满足条件的。
( 注:因为数据中交易时间只精确到日期,因此我先将数据按天汇总了。如果数据是精确到秒,我们可以逐笔执行)
当然,这个代码是针对一个公司转入又转回的情形,
如果你想看一个公司转入又转出到另一个公司,其实也可以修改代码实现。
主要还是看你的核查思路。