💡 作者:韩信子@ShowMeAI
📘 大数据技术 ◉ 技能晋升系列:https://www.showmeai.tech/tutorials/84
📘 行业名企利用系列:https://www.showmeai.tech/tutorials/63
📘 本文地址:https://www.showmeai.tech/article-detail/296
📢 申明:版权所有,转载请分割平台与作者并注明出处
📢 珍藏ShowMeAI查看更多精彩内容
💡 背景
Sparkify 是一个音乐流媒体平台,用户能够获取局部免费音乐资源,也有不少用户开启了会员订阅打算(参考QQ音乐),在Sparkify中享受优质音乐内容。
用户能够随时对本人的会员订阅打算降级甚至勾销,而当下极其内卷和竞争强烈的大环境下,获取新客的老本十分高,因而保护现有用户并确保他们长期会员订阅至关重要。同时因为咱们有很多用户在平台的历史应用记录,基于这些数据撑持去开掘客户偏向,定制正当的业务策略,也更加有保障和数据撑持。
但当初稍大一些的互联网公司,数据动辄成千盈百万,咱们要在这么微小的数据规模下实现开掘与建模,又要借助各种解决海量数据的大数据平台。在本文中ShowMeAI将联合 Sparkify 的业务场景和海量数据,解说基于 Spark 的客户散失建模预测案例。
本文波及到大数据处理剖析及机器学习建模相干内容,ShowMeAI为这些内容制作了具体的教程与工具速查手册,大家能够通过如下内容开展学习或者回顾相干常识。
- 图解数据分析:从入门到精通系列教程
- 图解大数据技术:从入门到精通系列教程
- 图解机器学习算法:从入门到精通系列教程
- 数据迷信工具库速查表 | Spark RDD 速查表
- 数据迷信工具库速查表 | Spark SQL 速查表
💡 数据
本文用到的 Sparkify 数据有3个大小的数据规格,大家能够依据本人的计算资源状况,抉择适合的大小,本文代码都兼容和匹配,对应的数据大家能够通过ShowMeAI的百度网盘地址获取。
🏆 实战数据集下载(百度网盘):公众号『ShowMeAI钻研核心』回复『实战』,或者点击 这里 获取本文 [[9] Spark 海量数据上的用户留存剖析开掘与建模](https://www.showmeai.tech/art…) 『sparkify 用户散失数据集』
⭐ ShowMeAI官网GitHub:https://github.com/ShowMeAI-Hub
- mini_sparkify_event_data.json: 最小的数据子集 (125 MB)
- medium-sparkify-event-data.json: 中型大小数据子集 (237 MB)
- sparkify_event_data.json: 全量数据 (12 GB)
💡 探索性数据分析(EDA)
在进行建模之前,咱们首先要深刻理解咱们的数据,这能够帮忙咱们更有针对性地构建特色和抉择模型。也就是ShowMeAI之前提到过的「探索性数据分析(EDA)」的过程。
① 导入工具库
# 根底数据处理与绘图
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import requests
from datetime import datetime
# spark相干
from pyspark.sql import SparkSession
from pyspark.sql import Window, Row
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, FloatType
② 初步数据摸索
Sparkify 数据集中,每一个用户的行为都被记录成了一条带有工夫戳的操作记录,包含用户登记、播放歌曲、点赞歌曲和降级订阅打算等。
# 初始化spark session
spark_session = SparkSession.builder \
.master("local") \
.appName("sparkify") \
.getOrCreate()
# 加载数据与长久化
src = "data/mini_sparkify_event_data.json"
df = spark_session.read.json(src)
# 构建视图(不便查问)
df.createOrReplaceTempView("sparkify_table")
df.persist()
# 查看前5行数据
df . limit(5) . toPandas()
用全量数据集(12GB)做EDA可能会耗费大量的资源且很慢,所以这个过程咱们抉择小子集(128MB)来实现,如果采样形式正当,小子集上的数据分布能很大水平体现全量数据上的散布个性。
对于中小数据集上的EDA大家能够参考ShowMeAI分享过的自动化数据分析工具,能够更快捷地获取一些数据信息与剖析论断。
- 自动化数据分析 (EDA) 工具库大全
📌 根底数据维度信息
# 查看数据维度信息
print(f'数据集有 {len(df.columns)} 列')
print(f'数据集有 {df.count()} 行')
结果显示有 18 列 和 286500 行。
理论这份小子集中只有 225 个惟一用户 ID,这意味着均匀每个客户与平台有 286500/225≈1200 多个交互操作。
📌 字段信息
# 查看字段信息
df . printSchema()
咱们通过上述命令查看数据字段信息,输入后果如下,蕴含字段名和类型等:
|-- artist: string (nullable = true)
|-- auth: string (nullable = true)
|-- firstName: string (nullable = true)
|-- gender: string (nullable = true)
|-- itemInSession: long (nullable = true)
|-- lastName: string (nullable = true)
|-- length: double (nullable = true)
|-- level: string (nullable = true)
|-- location: string (nullable = true)
|-- method: string (nullable = true)
|-- page: string (nullable = true)
|-- registration: long (nullable = true)
|-- sessionId: long (nullable = true)
|-- song: string (nullable = true)
|-- status: long (nullable = true)
|-- ts: long (nullable = true)
|-- userAgent: string (nullable = true)
|-- userId: string (nullable = true)
咱们获取的一些初步信息如下:
- 字符串类型的字段包含
song
,artist
,gender
和level
- 一些工夫和ID类的字段特色
ts
(工夫戳),registration
(工夫戳),page
和userId
。 - 可能作用不太大的一些字段
firstName
,lastName
,method
,status
,userAgent
和auth
等(期待进一步开掘)
📌 时间跨度信息
# 排序
df = df . sort('ts', ascending= False)
# 获取最大最小工夫戳
df . select(F . max(df . ts), F . min(df . ts)) . show()
# https://www.programiz.com/python-programming/datetime/timestamp-datetime
# 转换为日期
print("Min date =", datetime.fromtimestamp(1538352117000 / 1000))
print("Max date =", datetime.fromtimestamp(1543799476000 / 1000))
# 最早注册工夫
df.select(F.min(df.registration)).show()
print("Min register =", datetime.fromtimestamp(1521380675000 / 1000))
📌 字段散布
# 统计字段的不同取值数量
cols = df.columns
n_unique = []
for col in cols:
n_unique.append(df.select(col).distinct().count())
pd.DataFrame(data={'col':cols, 'n_unique':n_unique}).sort_values('n_unique', ascending=False)
后果如下,ID类的属性有最多的取值,其余的字段属性绝对集中。
📌 类别型取值散布
咱们来看看下面剖析的尾部,散布比拟集中的类别型字段的取值有哪些。
# method
df . select(['method']) . distinct() . show()
# level
df.select(['level']).distinct().show()
# status
df . select(['status']) . distinct() . show()
# gender
df.select(['gender']).distinct().show()
# auth
df . select(['auth']) . distinct() . show()
咱们再看看取值中等和较多的字段
# page
df . select(['page']) . distinct() . show()
# userAgent
df.select(['userAgent']).distinct().show()
# artist
df.select(['artist']).distinct().show()
# song
df.select(['song']).distinct().show()
③ 缺失值剖析
咱们首先剔除掉userId为空的数据记录,总共删除了 8,346 行。
no_userId = df . where(df . userId == "")
no_userId . count()
no_userId . limit(10) . toPandas()
# 构建无userId缺失数据的视图
df = df . where(df . userId != "")
df . createOrReplaceTempView("sparkify_table")
咱们再统计一下其余字段的缺失情况
# 类别型字段
general_string_type = ['auth', 'firstName', 'gender', 'lastName', 'level', 'location', 'method', 'page', 'userAgent', 'userId']
for col in general_string_type:
null_vals = df.select(col).where(df[col].isNull()).count()
print(f'{col}: {null_vals}')
# 数值型字段
numerical_cols = ['itemInSession', 'length', 'registration', 'sessionId', 'status', 'ts']
for col in numerical_cols:
null_vals = df.select(col).where(df[col] == np.nan).count()
print(f'{col}: {null_vals}')
# 间接统计缺失值并输入信息
# Reference
# https://sparkbyexamples.com/pyspark/pyspark-find-count-of-null-none-nan-values/
def make_missing_bool_index(c):
'''
Generates boolean index to check missing value/NULL values
@param c (string) - string of column of dataframe
returns boolean index created
'''
# removed checking these 2 since they would flag some incorrect rows, e.g. the song "None More Black" would be flagged
# col(c).contains('None') | \
# col(c).contains('NULL') | \
bool_index = (F.col(c) == "") | \
F.col(c).isNull() | \
F.isnan(c)
return bool_index
missing_count = [F.count(F.when(make_missing_bool_index(c), c)).alias(c)
for c in df.columns]
df.select(missing_count).toPandas()
④ EDA洞察&论断
因为咱们的数据是基于各种有工夫戳的交易来组织的,以事件为根底(基于 “页 “列),咱们须要执行额定的特色工程来定制咱们的数据以适应咱们的机器学习模型。
📌 指标&问题
- 用户散失是什么意思?是指勾销订阅吗?
📌 重要字段列
-
ts
– 工夫戳,在以下场景有用- 订阅与勾销之间的工夫点信息
- 构建「听歌的均匀工夫」特色
- 构建「听歌之间的工夫距离」特色
- 基于工夫戳构建数据样本,比方选定用户散失前的3个月或6个月
registration
– 工夫戳 – 用于辨认交易的范畴-
page
– 用户正在参加的事件- 自身并无用处
- 须要进一步特色工程,从页面类型中提取信息,或联合工夫戳等信息
-
userId
- 自身并无用处
- 基于用户分组实现统计特色
📌 配合特色工程有用的字段列
-
song
– 歌名,可用于构建相似下述的特色:- 用户听的不同歌曲数量
- 用户听同一首歌的次数
-
artist
– 歌手,可用于构建相似下述的特色:- 每个用户收听的歌手数量
-
因为是明文的歌名,咱们甚至能够通过内部API补充信息构建特色:
- 用户收听的音乐类型(并察看类型是否影响流失率)。
-
gender
– 性别- 不同性别的人可能有不同的音乐偏好。
-
level
– 等级- 辨别用户是收费的还是付费的
-
location
– 地区- 地区差异
📌 无用字段列(咱们会间接删除)
firstName
和lastName
– 名字个别在模型中很难间接给到信息。method
– 仅仅有PUT或GET取值,是网络申请类型,作用不大。status
– 仅仅是API响应,例如200/404,作用不大。-
userAgent
–指定用户应用的浏览器类型- 有可能不同浏览器代表的用户群体有差异,这个能够进一步调研
auth
– 登入登出等信息,作用不大
💡 数据处理
① 定义散失
咱们的 page
性能有 22 个独特的标签,代表用户点击或拜访的页面,联合下面的数据分析大家能够看到页面包含对于
、登录
、注册
等。
能够帮忙咱们定义散失的页面是 Cancellation Confirmation
,示意 收费 和 付费 用户均存在流媒体平台。
# 定义散失用户
is_churn = F.udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0, IntegerType())
df = df.withColumn("churn", is_churn(df.page))
df.createOrReplaceTempView("sparkify_table")
user_window = Window \
.partitionBy('userId') \
.orderBy(F.desc('ts')) \
.rangeBetween(Window.unboundedPreceding, 0)
# manually define schema
# https://stackoverflow.com/questions/40517553/pyspark-valueerror-some-of-types-cannot-be-determined-after-inferring
tmp_row = spark_local.sparkContext.parallelize(Row(second_row)).toDF(schema=df.schema)
df.where(df.userId == 100001).union(tmp_row).withColumn('pre_churn', F.sum('churn').over(user_window)).limit(5).toPandas()
df = df.withColumn('preChurn', F.sum('churn').over(user_window))
df.createOrReplaceTempView("sparkify_table")
对用户散失状况做简略剖析
spark_local.sql('''
SELECT SUM(churn)
FROM sparkify_table
GROUP BY userId
''').toPandas().value_counts()
在咱们采样进去的小数据集中:有225 个用户, 23%(52 个用户)散失 。
② 特色工程
对于特色工程能够参考ShowMeAI的以下文章详解
- 机器学习实战 | 机器学习特色工程最全解读
本文中所应用到的特色工程如下:
- ① 歌曲和歌手相干:
uniqueSongs
,uniqueArtists
,uniqueSongArtist
. - ② 用户服务时长:
dayServiceLen
(注册到上次与网站互动之间的天数) - ③ 用户行为统计:
countListen
(收听次数),countSession
(session数量),lengthListen
(听的总时长) - ④ 应用②和③的组合
lengthListenPerDay
,countListenPerDay
,sessionPerDay
等 - ⑤ 针对一些统计值(
countListen
,countSession
, 和lengthListen
等)计算的差别度。
📌 清理数据
# 清理数据
def clean_data(df):
'''
Cleans raw dataframe to:
i. sort values
ii. remove null userId rows
@param df: raw spark dataframe
returns updated spark dataframe
'''
# sort values
df = df.sort('ts', ascending=False)
# remove null userIds
df = df.where(df.userId != "")
return df
📌 定义用户散失标签
# 定义用户散失
def define_churn(df):
'''
Define churn
@param df - spark dataframe
returns updated spark dataframe
'''
# define churn as cancellation confirmation
is_churn = F.udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0, IntegerType())
df = df.withColumn("churn", is_churn(df.page))
return df
📌 清理脏数据
有一部分用户在散失之后,还有一些数据信息,这可能是工夫戳的问题,咱们把这部分数据清理掉
# 清理脏数据
def remove_post_churn_rows(df, spark, sql_table):
'''
Remove post-churn rows
@param df - spark dataframe
@param spark - SparkSession instance
@param sql_table - string representing name of sql table
returns updated spark dataframe
'''
# define window function to mark non-churn related rows
user_window = Window \
.partitionBy('userId') \
.orderBy(F.desc('ts')) \
.rangeBetween(Window.unboundedPreceding, 0)
df = df.withColumn('preChurn', F.sum('churn').over(user_window))
# remove rows for userIds which are marked as churn but have a timestamp after the 'Cancellation Confirmation' page
# define GROUP BY and merge against larger df
churn_df = spark.sql(f'''
SELECT
userId AS tmpId,
MAX(churn) AS tmpChurn
FROM {sql_table}
GROUP BY userId
''')
df = df.join(churn_df, df.userId == churn_df.tmpId, "left")
# remove instances where churned userIds have transctions post Cancellation Confirmation
df = df.where(~((df.preChurn == 0) & (df.tmpChurn == 1)))
# remove tmp rows
df = df.drop('tmpId', 'tmpChurn')
return df
📌 工夫特色
def prelim_feature_eng(df):
'''
Feature engineer columns:
i timeSinceRegister
ii. columns representing time scope of entry
@param df: raw spark dataframe
returns updated spark dataframe
'''
# create new column representing time since registration (ms)
time_since_register = F.col('ts') - F.col('registration')
df = df.withColumn("timeSinceRegister", time_since_register)
# create 3 new columns representing when row data relates to
mth_3 = 60 * 60 * 24 * 90
mth_6 = 60 * 60 * 24 * 180
mth_12 = 60 * 60 * 24 * 365
mth_3_f = F.udf(lambda x : 1 if x / 1000 <= mth_3 else 0, IntegerType())
mth_6_f = F.udf(lambda x : 1 if x / 1000 <= mth_6 else 0, IntegerType())
mth_12_f = F.udf(lambda x : 1 if x / 1000 <= mth_12 else 0, IntegerType())
df = df.withColumn("month3", mth_3_f(df.timeSinceRegister))\
.withColumn("month6", mth_6_f(df.timeSinceRegister))\
.withColumn("month12", mth_12_f(df.timeSinceRegister))
return df
📌 统计&组合特色
def melt_data(df, spark, sql_table):
'''
Melts data to show entries on a user basis for the following columns:
- userId
- gender
- level
- location
- uniqueSongs
- uniqueArtists
- dayServiceLen
- countListen1H,
- countSession1H,
- lengthListen1H,
- countListen2H,
- countSession2H,
- lengthListen2H
- churn
@param df - spark dataframe
@param spark - SparkSession instance
@param sql_table - string representing name of sql table
returns updated spark datafraem
'''
melt1 = spark.sql(f'''
SELECT userId,
MIN(gender) AS gender,
MIN(level) AS level,
MAX(location) AS location,
COUNT(DISTINCT(song)) AS uniqueSongs,
COUNT(DISTINCT(artist)) AS uniqueArtists,
COUNT(DISTINCT(song, artist)) AS uniqueSongArtist,
MAX(Churn) AS churn
FROM {sql_table}
GROUP BY userId
''')
melt2 = spark.sql(f'''
WITH sparkify_table_upt AS (
SELECT * FROM {sql_table}
WHERE page = "NextSong"
),
msServiceTable AS (
SELECT userId,
MAX(ts) - MIN(ts) AS msServiceLen,
MIN(ts) + (MAX(ts) - MIN(ts)) / 2 AS midTs
FROM sparkify_table_upt
GROUP BY userId
),
earlyHalfTable AS (
SELECT a.userId,
COUNT(1) AS countListen1H,
COUNT(DISTINCT(a.sessionId)) AS countSession1H,
SUM(a.length) AS lengthListen1H
FROM sparkify_table_upt AS a
LEFT JOIN msServiceTable AS b ON b.userId = a.userId
WHERE a.ts < b.midTs
GROUP BY a.userId
),
lateHalfTable AS (
SELECT a.userId,
COUNT(1) AS countListen2H,
COUNT(DISTINCT(a.sessionId)) AS countSession2H,
SUM(a.length) AS lengthListen2H
FROM sparkify_table_upt AS a
LEFT JOIN msServiceTable AS b ON b.userId = a.userId
WHERE a.ts >= b.midTs
GROUP BY a.userId
),
concatTable AS (
SELECT m.userId AS tmpUserId,
milisecToDay(msServiceLen) AS dayServiceLen,
countListen1H + countListen2H AS countListen,
countSession1H + countSession2H AS countSession,
lengthListen1H + lengthListen2H AS lengthListen,
countListen2H - countListen1H AS countListenDiff,
countSession2H - countSession1H AS countSessionDiff,
lengthListen2H - lengthListen1H AS lengthListenDiff
FROM msServiceTable as m
LEFT JOIN earlyHalfTable as e ON e.userId = m.userId
LEFT JOIN lateHalfTable AS l ON l.userId = m.userId
)
SELECT *,
lengthListen / dayServiceLen AS lengthListenPerDay,
countListen / dayServiceLen AS countListenPerDay,
countSession / dayServiceLen AS sessionPerDay,
lengthListen / countListen AS lengthPerListen,
lengthListen / countSession AS lengthPerSession
FROM concatTable
''')
melt_concat = melt1.join(melt2, melt1.userId == melt2.tmpUserId, "Left")
melt_concat = melt_concat.drop('tmpUserId')
return melt_concat
📌 地位信息
def location_feature_eng(df, census):
'''
Create 2 new columns from location -> Region and Division
@param df: raw spark dataframe
@param census: csv file containing location mapping based on state code
returns updated spark dataframe
'''
# some census data contains two states, for simplicity, selecting last location
map_region = F.udf(lambda x: census.loc[census['State Code'] == x[-2:], 'Region'].iloc[0], StringType())
map_division = F.udf(lambda x: census.loc[census['State Code'] == x[-2:], 'Division'].iloc[0], StringType())
df = df.withColumn("region", map_region(df.location))\
.withColumn("division", map_division(df.location))
return df
📌 组织数据&特色流水线
# 读数据
df_train = spark_session.read.json(src)
# 剔除无用字段
df_train = df_train.drop('firstName', 'lastName', 'method', 'status', 'userAgent', 'auth')
# 清理数据
df_train = clean_data(df_train)
df_train = define_churn(df_train)
df_train.createOrReplaceTempView("table")
# 革除脏数据
df_train = remove_post_churn_rows(df_train, spark_local, "table")
# 根底特色
df_train = prelim_feature_eng(df_train)
# 更新表
df_train.createOrReplaceTempView("table")
# 增加更多特色
df_melt = melt_data(df_train, spark_local, "table")
df_melt = location_feature_eng(df_melt, census)
📌 查看数据特色
pd_melt = df_melt . toPandas()
pd_melt . describe()
💡 进一步数据摸索
① 流失率
predictor = pd_melt['churn'].value_counts()
print(predictor)
plt.title('Churn distribution')
predictor.plot.pie(autopct='%.0f%%')
plt.show()
② 数值vs类别型特色
label = 'churn'
categorical = ['gender', 'level' , 'location', 'region', 'division']
numerical = ['uniqueSongs', 'uniqueArtists', 'uniqueSongArtist', 'dayServiceLen', \
'countListen', 'countSession', 'lengthListen', 'countListenDiff', 'countSessionDiff',\
'lengthListenDiff', 'lengthListenPerDay', 'countListenPerDay',\
'sessionPerDay', 'lengthPerListen', 'lengthPerSession']
plt.title('Distribution of numerical/categorical features')
plt.pie([len(categorical), len(numerical)], labels=['categorical', 'numerical'], autopct='%.0f%%')
plt.show()
在咱们所有的特色中,25% 是类别型的。
③ 数值型特色散布
📌 数值特色&散失散布
def plot_distribution(df, hue, filter_col=None, bins='auto'):
'''
Plots distribution of numerical columns
By default, exclude object, datetime, timedelta and bool types and only consider numerical columns
@param df (DataFrame) - dataset
@param hue (str) - column of dataset to apply hue (useful for classification)
@param filter_col (array) - optional argument, features to be included in plot
@param bins (int) - defaults to auto for seaborn, sets number of bins of histograms
'''
if filter_col == None:
filter_col = df.select_dtypes(exclude=['object', 'datetime', 'timedelta', 'bool']).columns
num_cols = len(list(filter_col))
width = 3
height = num_cols // width if num_cols % width == 0 else num_cols // width + 1
plt.figure(figsize=(18, height * 3))
for i, col in zip(range(num_cols), filter_col):
plt.subplot(height, width, i + 1)
plt.xlabel(col)
plt.ylabel('Count')
plt.title(f'Distribution of {col}')
sns.histplot(df, x=col, hue=hue, element="step", stat="count", common_norm=False, bins=bins)
plt.tight_layout()
plt.show()
# 绘制数值型特色分布图
plot_distribution(pd_melt, 'churn', filter_col=numerical)
咱们的数值型特色上能够看出:
- 散失与非散失用户都有右偏偏向的散布
dayServiceLen
字段有最显著的散失客户和非散失客户散布差别。
📌 数值型特色相关度
# 定义数值型特色
numerical_churn = numerical + ['churn']
# 计算相关性
corr_data = pd_melt[numerical_churn].corr()
# 绘制热力求显示相关性
plt.figure(figsize=(16,16))
plt.title('Heat map of correlation for all variables')
matrix = np.triu(corr_data)
sns.heatmap(corr_data, cmap='Blues', annot=True, mask=matrix)
plt.show()
- 咱们从热力求上没有看到有数值型特色与散失标签列有显著的高相关性。
- 有几组特色,uniqueArtists、uniqueSongArtist、countListen、countSession和lengthListen,它们之间有十分高的相关性。如果大家应用线性模型,能够思考做特征选择,咱们后续应用非线性模型的话,能够思考保留。
④ 类别型特色的散布
def plot_cat_distribution(data, colname):
'''
Plots barplot for categorical columns and piechart showing proportions of churned vs non-churned customers
@param - data (panas dataframe)
@param - colname (str) - column of dataframe referenced
'''
# https://www.statology.org/seaborn-stacked-bar-plot/
plt.figure(figsize=(15,5))
ax1 = plt.subplot(1, 3, 1)
tmp = data.copy()
tmp['count'] = 1
x = tmp.groupby([colname, 'churn']).count().reset_index()[[colname, 'churn','count']]
# churn index 0, 1 doesn't relate to No, Yes, relates to pivoted index only
x = x.pivot(index='churn', columns=colname).transpose().reset_index().drop('level_0', axis=1)
x = x.fillna(0)
plt.title(f'Distribution of {colname}')
plt.ylabel('Count')
x.plot.bar(x=colname, stacked=True, ax=ax1, color=['green', 'lightgreen'])
ax2 = plt.subplot(1, 3, 2)
plt.title(f'Proportion of {colname} for churned customers')
plt.pie(x['Yes'], labels=x[colname], autopct='%.0f%%')
plt.subplot(1, 3, 3)
plt.title(f'Proportion of {colname} for non-churned customers')
plt.pie(x['No'], labels=x[colname], autopct='%.0f%%')
plt.tight_layout()
plt.show()
x.index.rename('index', inplace=True)
print(x)
tmp_sum = x[['No','Yes']].sum(axis=1)
x['No'] = x['No'] / tmp_sum
x['Yes'] = x['Yes'] / tmp_sum
print(x)
print(tmp_sum / tmp_sum.sum())
tmp_pd_melt = pd_melt.copy()
tmp_pd_melt['churn'] = tmp_pd_melt['churn'].apply(lambda x: 'Yes' if x == 1 else 'No')
📌 性别&散失散布
plot_cat_distribution(tmp_pd_melt, 'gender')
散失客户的男性比例更高。
📌 等级&散失散布
plot_cat_distribution(tmp_pd_melt, 'level')
收费和付费客户的散失比例简直没有差别(差2%),尽管图上表明付费客户散失的可能性稍小一点,但这个特色在建模过程中可能作用不大。
📌 地区&散失散布
plot_cat_distribution(tmp_pd_melt, 'region')
图上能够看出地区有一些差别,南部地区的散失要重大一些,相比之下北部地区的散失用户少一些。
能够进一步对地区细化和绘图
plot_cat_distribution(tmp_pd_melt, 'division')
📌 类别型特色取值数量散布
def cardinality_plot(df, filter_col=None):
'''
Input list of categorical variables to filter
Default is None where it would only consider columns which have type 'Object'
@param df (DataFrame) - dataset
@param filter_col (array) - optional argument to specify columns we want to filter
'''
if filter_col == None:
filter_col = df.select_dtypes(include='object').columns
num_unique = []
for col in filter_col:
num_unique.append(len(df[col].unique()))
plt.bar(list(filter_col), num_unique)
plt.title('Number of unique categorical variables')
plt.xlabel('Column name')
plt.ylabel('Num unique')
plt.xticks(rotation=90)
plt.yticks([0, 1, 2, 3, 4])
plt.show()
return pd.Series(num_unique, index=filter_col).sort_values(ascending=False)
cardinality_plot(pd_melt, categorical)
间接看最喜爱的location,取值数量有点太多了,咱们能够思考用粗粒度的地理位置信息,可能辨别能力会强一些。
下述局部,咱们会应用spark进行特色工程&大数据建模与调优,相干内容能够浏览ShowMeAI的以下文章,咱们对它的用法做了具体的解说
- 📂 图解大数据 | 工作流与特色工程@Spark机器学习
<!—->
- 📂 图解大数据 | 建模与超参调优@Spark机器学习
💡 建模优化
咱们先对数值型特色做一点小小的数据变换(这里用到的是log变换),这样咱们的原始数值型特色散布能够失去肯定水平的校对。
def log_transform(df, columns):
'''
Log trasform columns in dataframe
@df - spark dataframe
@columns - array of string of column names to be log transformed
returns updated spark dataframe
'''
log_transform_func = F.udf(lambda x: np.log10(x + 1), FloatType())
for col in columns:
df = df.withColumn(col, log_transform_func(df[col]))
return df
# 数值型特色log变换
df_melt = log_transform(df_melt, numerical)
① 数据切分
接下来咱们把数据集拆分为 60:20:20 的3局部,别离用于训练、验证和测试。
df_melt_copy = df_melt . withColumn("label", df_melt . churn)
rest, test = df_melt_copy.randomSplit([0.8, 0.2], seed=42)
train, val = rest.randomSplit([0.75, 0.25], seed=42)
② 建模流水线
# 导入工具库
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, MinMaxScaler, OneHotEncoder, StringIndexer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score
from sklearn.metrics import roc_curve, precision_recall_curve, confusion_matrix, ConfusionMatrixDisplay
import re
# 数值型特色解决流水线
numerical_assembler = VectorAssembler(inputCols=numerical, outputCol="numericalFeatures")
standardise = StandardScaler(inputCol="numericalFeatures", outputCol="standardNumFeatures", withStd=True, withMean=True)
minmax = MinMaxScaler(inputCol="standardNumFeatures", outputCol="minmaxNumFeatures")
# 类别型特色解决流水线
inputCols = ['gender', 'level', 'region', 'division']
outputColsIndexer = [x + 'SI' for x in inputCols]
indexer = StringIndexer(inputCols = inputCols, outputCols=outputColsIndexer)
outputColsOH = [x + 'OH' for x in inputCols]
onehot = OneHotEncoder(inputCols=outputColsIndexer, outputCols=outputColsOH)
categorical_assembler = VectorAssembler(inputCols=outputColsOH, outputCol="categoricalFeatures")
# 组合两类特色
total_assembler = VectorAssembler(inputCols=['minmaxNumFeatures', 'categoricalFeatures'], outputCol='features')
pipeline = Pipeline(stages=[numerical_assembler, standardise, minmax, indexer, onehot, categorical_assembler, total_assembler])
# 运行流水线对数据进行解决
pipeline . fit(train) . transform(train) . head()
失去如下后果
Row(userId='10', gender='M', level='paid', location='Laurel, MS', uniqueSongs=629, uniqueArtists=565, uniqueSongArtist=633, churn=0, dayServiceLen=42.43672561645508, countListen=673, countSession=6, lengthListen=166866.37250999993, countListenDiff=-203, countSessionDiff=2, lengthListenDiff=-48180.54478999992, lengthListenPerDay=3932.121766842835, countListenPerDay=15.858904998528928, sessionPerDay=0.14138696878331883, lengthPerListen=247.94408991084686, lengthPerSession=27811.062084999987, region='South', division='East South Central', label=0, numericalFeatures=DenseVector([629.0, 565.0, 633.0, 42.4367, 673.0, 6.0, 166866.3725, -203.0, 2.0, -48180.5448, 3932.1218, 15.8589, 0.1414, 247.9441, 27811.0621]), standardNumFeatures=DenseVector([-0.3973, -0.331, -0.3968, -0.016, -0.3968, -0.6026, -0.3993, -0.6779, 0.6836, -0.6549, -0.3678, -0.3625, -0.1256, -0.1374, 1.1354]), minmaxNumFeatures=DenseVector([0.1053, 0.1587, 0.1034, 0.6957, 0.0838, 0.0392, 0.0835, 0.5701, 0.5, 0.5692, 0.0264, 0.0245, 0.0002, 0.5344, 0.56]), genderSI=0.0, levelSI=1.0, regionSI=0.0, divisionSI=4.0, genderOH=SparseVector(1, {0: 1.0}), levelOH=SparseVector(1, {}), regionOH=SparseVector(3, {0: 1.0}), divisionOH=SparseVector(8, {4: 1.0}), categoricalFeatures=SparseVector(13, {0: 1.0, 2: 1.0, 9: 1.0}), features=DenseVector([0.1053, 0.1587, 0.1034, 0.6957, 0.0838, 0.0392, 0.0835, 0.5701, 0.5, 0.5692, 0.0264, 0.0245, 0.0002, 0.5344, 0.56, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0]))
③ 初步建模&评估
咱们先定义一个模型评估函数,因为是类别非均衡场景,咱们这里笼罩比拟多的评估准则,包含罕用的precision、recall以及排序准则auc等。
# 模型评估函数
def evaluate_model(y_trueTrain, y_predTrain, y_trueTest, y_predTest, y_testProba):
'''
Wrapper function for evaluating classification results
'''
train_acc = accuracy_score(y_trueTrain, y_predTrain)
test_acc = accuracy_score(y_trueTest, y_predTest)
fscore = f1_score(y_trueTest, y_predTest, zero_division=0)
precision = precision_score(y_trueTest, y_predTest, zero_division=0)
recall = recall_score(y_trueTest, y_predTest, zero_division=0)
# linear models would not have .predict_proba method so, if fails, append 0 to roc_auc
try:
roc_auc = roc_auc_score(y_trueTest, y_testProba)
except:
roc_auc = 0
return {
'train_acc': train_acc,
'test_acc' : test_acc,
'fscore': fscore,
'precision': precision,
'recall': recall,
'roc_auc': roc_auc
}
📌 逻辑回归
# 定义模型
lr = LogisticRegression(maxIter=10, regParam=0.0, elasticNetParam=0)
pipeline_lr = Pipeline(stages=[numerical_assembler, standardise, minmax, indexer, onehot, categorical_assembler, total_assembler, lr])
# 拟合
lrModel = pipeline_lr.fit(train)
lr_res_test = lrModel.transform(val).select('label', 'prediction', 'probability').toPandas()
lr_res_train = lrModel.transform(train).select('label', 'prediction', 'probability').toPandas()
# 评估
lr_results = evaluate_model(lr_res_train['label'],lr_res_train['prediction'],lr_res_test['label'],lr_res_test['prediction'], lr_res_test['probability'].apply(lambda x: x[1]))
lr_results
后果如下
{'train_acc': 0.8456375838926175,
'test_acc': 0.8780487804878049,
'fscore': 0.7368421052631579,
'precision': 0.5833333333333334,
'recall': 1.0,
'roc_auc': 0.9579831932773109}
📌 梯度晋升树GBT
# 定义模型
gbt = GBTClassifier()
pipeline_gbt = Pipeline(stages=[numerical_assembler, standardise, minmax, indexer, onehot, categorical_assembler, total_assembler, gbt])
# 拟合
gbtModel = pipeline_gbt.fit(train)
gbt_res_test = gbtModel.transform(val).select('label', 'prediction', 'probability').toPandas()
gbt_res_train = gbtModel.transform(train).select('label', 'prediction', 'probability').toPandas()
# 评估
gbt_results = evaluate_model(gbt_res_train['label'],gbt_res_train['prediction'],gbt_res_test['label'],gbt_res_test['prediction'],\
gbt_res_test['probability'].apply(lambda x: x[1]))
gbt_results
后果如下
{'train_acc': 1.0,
'test_acc': 0.8048780487804879,
'fscore': 0.6,
'precision': 0.46153846153846156,
'recall': 0.8571428571428571,
'roc_auc': 0.8193277310924371}
📌 随机森林
# 定义模型
rf = RandomForestClassifier()
pipeline_rf = Pipeline(stages=[numerical_assembler, standardise, minmax, indexer, onehot, categorical_assembler, total_assembler, rf])
# 拟合
rfModel = pipeline_rf.fit(train)
rf_res_test = rfModel.transform(val).select('label', 'prediction', 'probability').toPandas()
rf_res_train = rfModel.transform(train).select('label', 'prediction', 'probability').toPandas()
# 评估
rf_results = evaluate_model(rf_res_train['label'],rf_res_train['prediction'],rf_res_test['label'],rf_res_test['prediction'], rf_res_test['probability'].apply(lambda x: x[1]))
rf_results
后果如下
{'train_acc': 0.959731543624161,
'test_acc': 0.8780487804878049,
'fscore': 0.6666666666666666,
'precision': 0.625,
'recall': 0.7142857142857143,
'roc_auc': 0.9243697478991597}
📌 综合比照
cv_results = pd.DataFrame(columns=['accuracy_train','accuracy_cv','fscore_cv','precision_cv','recall_cv', 'roc_auc_cv'])
cv_results.loc['LogisticRegression'] = lr_results.values()
cv_results.loc['GradientBoostingTree'] = gbt_results.values()
cv_results.loc['RandomForest'] = rf_results.values()
cv_results.style.apply(lambda x: ["background: lightgreen" if abs(v) == max(x) else "" for v in x], axis = 0)
综合比照后果如下:
咱们在上述建模与评估过程中,综合比照了训练集和验证集的后果。对于评估准则:
- accuracy通常不是掂量类别非均衡场景下的分类好指标。 极其的状况下,仅预测咱们所有的客户“不散失”就达到 77% 的accuracy。
- recall掂量咱们的正样本中有多少被模型预估为正样本,即
TP / (TP + FN)
,咱们上述建模过程中,LogisticRegression
正确辨认所有会散失的客户。 - recall还须要联合precision一起看,例如,上述
LogisticRegression
预估的散失客户中,只有 58% 真正散失了。 (这意味着如果咱们要发展营销流动来解决客户散失问题,有42% (1 – 0.58) 的老本会节约在未散失客户身上)。 - 能够应用 fscore 指标来综合思考recall和precision。
- ROC_AUC 掂量咱们的真阳性与假阳性率。 咱们的 AUC 越高,模型在辨别正类和负类方面的性能就越好。
上述指标中,咱们优先关注ROC_AUC,其次是 fscore,咱们上述指标中LogisticRegression
成果良好,上面咱们基于它进一步调优。
④ 超参数调优
📌 穿插验证
咱们下面的建模只是敲定了一组超参数,超参数会影响模型的最终成果,咱们能够应用spark的CrossValidator
进行超参数调优,选出最优的超参数。
paramGrid = ParamGridBuilder() \
.addGrid(lr.regParam,[0.0, 0.1]) \
.addGrid(lr.maxIter,[50, 100]) \
.build()
crossval = CrossValidator(estimator=pipeline_lr,
estimatorParamMaps=paramGrid,
evaluator=MulticlassClassificationEvaluator(),
numFolds=3)
# 穿插验证调参
cvModel = crossval . fit(rest)
cvModel . avgMetrics
输入后果如下
[0.8011084544393228,
0.8222872837788751,
0.7284659848286738,
0.7284659848286738]
咱们对测试集做评估
# 穿插验证评估
cv_res_test = cvModel.transform(test).select('label', 'prediction', 'probability').toPandas()
cv_res_train = cvModel.transform(rest).select('label', 'prediction', 'probability').toPandas()
cv_metrics = evaluate_model(cv_res_train['label'],cv_res_train['prediction'],cv_res_test['label'],cv_res_test['prediction'], cv_res_test['probability'].apply(lambda x: x[1]))
cv_metrics
{'train_acc': 0.8894736842105263,
'test_acc': 0.8571428571428571,
'fscore': 0.7368421052631577,
'precision': 0.7,
'recall': 0.7777777777777778,
'roc_auc': 0.858974358974359}
📌 最优超参数
cvModel . getEstimatorParamMaps()[np . argmax(cvModel . avgMetrics)]
# 输入后果
{Param(parent='LogisticRegression_e765de70ec6a', name='regParam', doc='regularization parameter (>= 0).'): 0.0,
Param(parent='LogisticRegression_e765de70ec6a', name='maxIter', doc='max number of iterations (>= 0).'): 100}
💡 后果评估
咱们的 ROC_AUC 从 95.7 降落到 85.9。 这并不奇怪,因为我狐疑 95.7 的后果是因为适度拟合造成的。
{'train_acc': 0.8894736842105263,
'test_acc': 0.8571428571428571,
'fscore': 0.7368421052631577,
'precision': 0.7,
'recall': 0.7777777777777778,
'roc_auc': 0.858974358974359}
最好的参数是 regParam
为 0 和 maxIter
100 个。
① 混同矩阵
咱们定一个函数来绘制一下混同矩阵(即对正负样本和预估后果划分4个象限进行评估)。
def plot_confusion_matrix(y_true, y_pred, title):
'''
Plots confusion matrix
@param y_true - array of actual labels
@param y_pred - array of predictions
@title title - string of title
'''
conf_matrix = confusion_matrix(y_true, y_pred)
matrix_display = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=["No Churn", "Churn"])
matrix_display.plot(cmap='Greens')
# adding title - https://github.com/scikit-learn/scikit-learn/discussions/20690
matrix_display.ax_.set_title(title)
plt.grid(False)
plt.show()
# Calculating recall (sensitivity), precision and specificity
tn = conf_matrix[0][0]
tp = conf_matrix[1][1]
fn = conf_matrix[1][0]
fp = conf_matrix[0][1]
print(f'True Positive Rate/Recall/Sensitivity: {round(tp/(tp+fn), 6)}')
# basically inverse of TPR
print(f'False Positive Rate/(1 - Specificity): {round(fp/(tn+fp), 6)}')
print(f'Precision : {round(tp/(tp+fp), 6)}')
# 绘制混同矩阵
plot_confusion_matrix(cv_res_test['label'], cv_res_test['prediction'], "Confusion matrix at 50% threshold (default)")
查看上面的混同矩阵,用0.5的默认概率阈值可能正确预测 77.78% 的散失客户 (7/(7+2))
,也具备 70% 的不错的precision (7/(7+3))
② ROC_AUC 曲线
# 预测概率
test_proba = cv_res_test['probability'] . apply(lambda x: x[1])
# fpr = false positive rate
# tpr = true positive rate
fpr, tpr, _ = roc_curve(cv_res_test['label'], test_proba)
# 绘图
plt.figure(figsize=(10,8))
plt.title('ROC AUC Curve for customer churn')
plt.xlabel('False Positive Rate (FPR)')
plt.ylabel('True Postive Rate (FPR) / Recall')
plt.plot(fpr, tpr, marker='.', label='LR')
plt.plot([0, 1], [0, 1])
plt.show()
上面的 ROC AUC 曲线分明地显示了召回率(真阳性率)和假阳性率之间的衡量。
③ PR 曲线
lr_precision, lr_recall, _ = precision_recall_curve(cv_res_test['label'], test_proba)
# 绘制PR曲线
plt.figure(figsize=(10,8))
plt.title('Recall/Precision curve')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.plot(lr_recall, lr_precision, marker='.', label='LR')
plt.axhline(y=cv_metrics['precision'], color='r')
plt.axvline(x=cv_metrics['recall'], color='r')
plt.show()
上面的召回/精度图中的交点代表了咱们调整后的LogisticRegression
模型的召回-精度。默认的50%的决策阈值得出了77.8%/70%的召回率-精确度的衡量。
通过调整咱们的决策阈值,咱们能够定制咱们想要的召回/准确率。
💡 总结&业务思考
咱们能够调整咱们的决策(概率)阈值,以取得一个最称心的召回率或精确度。比方在咱们的场景下,应用了0.72的阈值取代默认的0.5,后果是在召回率没有降落的根底上,晋升了精度。
事实中,召回率和精确度之间必定会有衡量,特地是当咱们在比拟大的数据集上建模利用时。
def classify_custom_threshold(y_true, y_pred_proba, threshold=0.5):
'''
Identifies custom threshold and plots confusion matrix
@y_true - array of actual labels
@y_pred_proba - array of probabilities of predictions
@threshold - decision threshold which is defaulted to 50%
'''
y_pred = y_pred_proba >= threshold
plot_confusion_matrix(y_true, y_pred, f'Confusion matrix at {round(threshold * 100, 1)}% decision threshold')
classify_custom_threshold(cv_res_test['label'], test_proba, 0.72)
咱们还须要与业务管理人员踊跃沟通,理解他们更有倾向性的指标(更看重precision还是recall):
- 优先思考recall意味着咱们能判断出大部分理论散失的客户,但这可能会升高精度,就像咱们之前提到的,这可能会导致成本增加。
- 咱们以后的后果曾经很不错了,如果业务负责人想谋求更高的召回率,并违心为此破费一些老本,咱们能够升高决策(概率)门槛。
举例来说,在咱们以后的例子中,如果咱们将决策断定概率从0.5升高到0.25,能够把召回率晋升到88.9%,但随之发生变化的是精度升高到47%。
lr_precision, lr_recall, _ = precision_recall_curve(cv_res_test['label'], test_proba)
plt.figure(figsize=(10,8))
plt.title('Recall/Precision curve')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.plot(lr_recall, lr_precision, marker='.', label='LR')
plt.axhline(y=cv_metrics['precision'], color='r', alpha=0.3)
plt.axvline(x=cv_metrics['recall'], color='r', alpha=0.3)
plt.axhline(y=0.470588, color='r')
plt.axvline(x=0.888889, color='r')
plt.show()
classify_custom_threshold(cv_res_test['label'], test_proba, 0.25)
参考资料
- 🏆 实战数据集下载(百度网盘):公众号『ShowMeAI钻研核心』回复『实战』,或者点击 这里 获取本文 [[9] Spark 海量数据上的用户留存剖析开掘与建模](https://www.showmeai.tech/art…) 『sparkify 用户散失数据集』
- ⭐ ShowMeAI官网GitHub:https://github.com/ShowMeAI-Hub
- 📘 图解数据分析:从入门到精通系列教程:https://www.showmeai.tech/tutorials/33
- 📘 图解大数据技术:从入门到精通系列教程:https://www.showmeai.tech/tutorials/84
- 📘 图解机器学习算法:从入门到精通系列教程:https://www.showmeai.tech/tutorials/34
- 📘 数据迷信工具库速查表 | Spark RDD 速查表:https://www.showmeai.tech/article-detail/106
- 📘 数据迷信工具库速查表 | Spark SQL 速查表:https://www.showmeai.tech/article-detail/107
- 📘 自动化数据分析 (EDA) 工具库大全:https://www.showmeai.tech/article-detail/284
- 📘 图解大数据 | 建模与超参调优 Spark机器学习:https://www.showmeai.tech/article-detail/181
- 📘 图解大数据 | 工作流与特色工程 Spark机器学习:https://www.showmeai.tech/article-detail/180
- 📘 机器学习实战 | 机器学习特色工程最全解读:https://www.showmeai.tech/article-detail/208
发表回复