本文于r format(Sys.Date(), "%Y-%m-%d")更新。 如发现问题或者有建议,欢迎提交 Issue
<!-- 感觉自己一点都不强,还弱弱的,我一定要搞定接口函数,有例子啊! -->
#df_register = source_register(userids)
df_register = pd.read_pickle('C:/Users/Python_bc/baichuan_model/df_register')
df_register = df_register[df_register['userid'] == userid]
翻译sql的心得
- left join 其实不麻烦,全部join起来,再处理 select 的变量。
- 详单表翻译完一张后,热情和习惯来了,继续搞。
- 表的逻辑需要一点点update问清楚。
source_register函数定义
# 所有的数据源
@data_source_logs()
def source_register(userids):
""
数据源: register, 获取ac渠道信息
""
df = user_ac_register_channel(userids)
df = _default_columns(df, ['device', 'insert_time', 'reg_chanel', 'userid'])
df.columns = ['device', 'inserttime', 'regchanel', 'userid']
df = _to_datetime(df, ['inserttime'])
return df
reg_chanel看到这种真的想打人。
装饰器解释
这里,@data_source_logs()是啥意思?
<!-- * [python - How does the @property decorator work? - Stack Overflow](https://stackoverflow.com/questions/17330160/how-does-the-property-decorator-work) -->
<!-- 是装饰器,decorator。 -->
<!-- ``` -->
<!-- @property -->
<!-- def x(self): -->
<!-- return self._x -->
<!-- ``` -->
<!-- 等价于 -->
<!-- ``` -->
<!-- def getx(self): -->
<!-- return self._x -->
<!-- x = property(getx) -->
<!-- ``` -->
<!-- 所以这里等价于 -->
<!-- ``` -->
<!-- # 所有的数据源 -->
<!-- def source_register(userids): -->
<!-- """ -->
<!-- 数据源: register, 获取ac渠道信息 -->
<!-- """ -->
<!-- df = user_ac_register_channel(userids) -->
<!-- df = _default_columns(df, ['device', 'insert_time', 'reg_chanel', 'userid']) -->
<!-- df.columns = ['device', 'inserttime', 'regchanel', 'userid'] -->
<!-- df = _to_datetime(df, ['inserttime']) -->
<!-- return df -->
<!-- x = data_source_logs(df) -->
<!-- ``` -->
所以@是装饰器, 他的作用是记录日志。
from utils import data_source_logs找到你了!
def data_source_logs():
""
数据源日志的装饰器
""
def real_decorator(raw_fun):
source_name = raw_fun.__name__
def wrapper(*args, **kwargs):
logger.info('开始获取数据源: %s ...' % source_name)
try:
df = raw_fun(*args, **kwargs)
logger.info('数据源: %s 获取完成.' % source_name)
except Exception as e:
logger.error('数据源: %s 获取失败. 失败原因: %s' % (source_name, traceback.format_exc()))
raise e
return df
return wrapper
return real_decorator
总之一句话, 就是为了跑日志,告诉哪段代码跑完了。
找接口信息
<!-- # `class`是啥东西? -->
<!-- 相当于`for` loop,暂时不管。 -->
user_ac_register_channel先搞清这个函数。
在这里from model.baichuan.itf.user_ac_register_channel import user_ac_register_channel 。
# encoding: utf-8
# 从接口获取ac 渠道注册信息
# 接口文档: http://userac.ppdapi.com:8083/swagger-ui.html
# 董长龙
# 测试环境: 172.17.3.24 userac.ppdapi.com
# 陈建
################# 输入示例 #############
# 输入: df_userid,类型为 dataframe, 主要包含两个字段: userid, listingtime
出现了接口信息了! 如果你看完了文档,你就觉得这个信息写的很详细了,真的,这是一个好的程序员写的。
<!-- ``` -->
<!-- import requests -->
<!-- import json -->
<!-- import pandas as pd -->
<!-- from model.baichuan.func.utils import config -->
<!-- #default_url = 'http://userac.ppdapi.com/v2/useracregisterchannel/%(userid)s?appid=%(appid)s' -->
<!-- def user_ac_register_channel(userids, url=config['url_ac_register_channel']): -->
<!-- headers = {"Content-type": "application/json", "Accept": "application/json"} -->
<!-- timeout = 10 -->
<!-- appid = '11050001' -->
<!-- data = { -->
<!-- 'userid': ','.join([str(i) for i in userids]), -->
<!-- 'appid': appid -->
<!-- } -->
<!-- url %= data -->
<!-- r = requests.get(url, headers=headers, timeout=timeout).text -->
<!-- if isinstance(r, unicode): -->
<!-- r = r.encode('utf-8') -->
<!-- res = json.loads(r) -->
<!-- if res['ret'] == 0: -->
<!-- res = res.get('data', []) -->
<!-- else: -->
<!-- raise Exception(res['msg']) -->
<!-- df = pd.DataFrame.from_dict(res) -->
<!-- if not df.empty: -->
<!-- df.columns = map(lambda x: x.lower(), df.columns) -->
<!-- else: -->
<!-- df = pd.DataFrame() -->
<!-- return df -->
<!-- if __name__ == '__main__': -->
<!-- d = user_ac_register_channel([3571276,3571275]) -->
<!-- print(d) -->
<!-- ``` -->
这是我唯一的底气。
问题来了,这里的config['url_ac_register_channel']是啥意思?
import yaml
import os
import logging.config
import traceback
from model.baichuan.conf.logging_conf import logging_config
# 项目路径
bash_path = os.path.abspath(os.path.join(__file__, '../../'))
# 配置文件路径
config_path = os.path.join(bash_path, 'conf')
# 配置文件内容
config = yaml.load(open(os.path.join(config_path, 'config.yaml')))
这里是查询__file__, '../../'向上第二个路径。 反正就是说,来看看config.yaml这个文件。 这里面包含了陈建写的所以的接口信息,找起来方便。 如果需要切换成测试环境的接口,config.yaml $\to$ config-dev.yaml 在C:\Users\lijiaxiang\Desktop\秉慧python代码\baichuan\conf\config.yaml中, url_ac_register_channel: http://userac.ppdapi.com/v2/useracregisterchannel/%(userid)s?appid=%(appid)s 因此就找到了这个接口,发现两个标签信息。
搞host
要搞host,这个时候一个用来配置接口的,一个用来读接口的。
switchhost找一下,这是个软件,作用如下。
需要切换host来访问到一个特定IP地址,那这个软件将帮你节省很多时间,高效率开发。
什么是hosts文件? 简单的说,hosts文件是用于本地dns服务(相关主题:什么是DNS缓存,如何清除DNS缓存?)的,采用ip 域名的格式写在一个文本文件当中,Hosts是一个没有扩展名的系统文件,可以用记事本等工具打开,其作用就是将一些常用的网址域名与其对应的IP地址建立一个关联"数据库",当用户在浏览器中输入一个需要登录的网址时,系统会首先自动从Hosts文件中寻找对应的IP地址,一旦找到,系统会立即打开对应网页,如果没有找到,则系统再会将网址提交DNS域名解析服务器进行IP地址的解析。
简单来说,有些网址登陆不上去,你需要配置一下,但是配置一下要来回切换,很麻烦,这个时候,就需要一个自动切换的软件,这就是SwitchHosts的功能之处,而且还可以自由切换,应该加个前缀Auto。
<!-- 这是官方介绍,看的东西真特么多。 -->
<!-- * [oldj/SwitchHosts: Switch hosts quickly!](https://github.com/oldj/SwitchHosts) -->
安装好后,把接口信息粘贴上去。 ( SwtichHosts快速切换Host - CSDN博客 ) follow这里的思路, 然后拨动下左侧的按钮,变绿了,就是开始切换了。 这个时候就可以用浏览器访问网址了。
http://userac.ppdapi.com:8083/swagger-ui.html 这里的8083是测试环境,一般默认80,这也可以找例子。 config.yaml和 config-dev.yaml 里面,相似的,这个地方就是不一样,可以比较比较找找感觉,搞这些,内行看逻辑,我们外行看感觉吧。
因此总结一下,就是把 测试环境: 172.17.3.24 userac.ppdapi.com粘贴在SwitchHosts, 然后浏览器访问 http://userac.ppdapi.com:8083/swagger-ui.html 搞定!
然后阅读开发文档 首次接入,请先阅读 http://confluence.ppdai.com/pages/viewpage.action?pageId=10071880 这个开发文档很简单,主要包含三个信息
- 访问链接
- 接口host
- 例子
<!-- 没有看到啊,卧槽,自己搞!!!起来。 -->
就是说,如果搞不懂,下个postman,快递小哥在不动Python的情况下,也可以实现Python的能力。 这种能力就是input标签,最后返回字段。
<!-- 来搞一下。 -->
postman
config.yaml 打开这个文档,这个文档记录了所有的接口。
# desc: 获取 ppdai_audit..preAudit 表的数据
# doc: http://confluence.ppdai.com/pages/viewpage.action?pageId=9382058
url_pre_audit: http://queryapi.listings.ppdaicorp.com/listLoanService/getPreAuditList
对preaudit开始搞起来。
对这个文件查询,
因此打开postman,选择POST,粘贴网址, http://queryapi.listings.ppdaicorp.com/listLoanService/getPreAuditList , 打开Body, 选择raw 选择JSON格式, 输入我们想要的测试的用户。 <!-- ``` --> <!-- ``` --> applicationId这个就是权限,没有就撸不到数据,是陈建部门的代码。
<!-- 那我为什么知道就需要这些标签呢? -->
<!-- 这个时候要机智啊,看开发文档啊。 -->
请求参数
- applicationId: 不能都为空
- borrowerId: 不能都为空
- listingId:不能都为空
然后在hostman中找到了结果。
{
"result": 1,
"resultMessage": "没有查询到满足条件的数据",
"itemList": null
}
<!-- 机智的一笔。 -->
Python中执行
<!-- 这个时候,需要在Python的环境下也搞一下,为了大同,不要歧视。 -->
迅速找到preAudit的构建函数。
C:\Users\lijiaxiang\Desktop\秉慧python代码\baichuan\itf\preaudit.py
headers = {"Content-type": "application/json", "Accept": "application/json"}
timeout = 10
data = {
'applicationId': ,
}
<!-- `'applicationId': 10000001` -->
这些都不需要改,header不想解释,永远不会知道。 timeout延迟时间,容忍度。 applicationId陈建的番号。
data = json.dumps(data)
response = requests.post(url, data, headers=headers, timeout=timeout).text
response = json.loads(response)
骚操作在这,等价于postman。 因为这是POST的格式,因此打法是json.dumps(data),data有啥呢?
data = {
'applicationId': ,
}
if borrowerid is not None:
data['borrowerId'] = borrowerid
if listingid is not None:
data['listingId'] = listingid
<!-- `'applicationId': 10000001,` -->
<!-- 其实就是输入标签。 -->
<!-- 开始搞起来,测试Python了。 -->
<!-- 好乱,流水账。 -->
别忘了,import json
import json
headers = {"Content-type": "application/json", "Accept": "application/json"}
timeout = 10
url = 'http://queryapi.listings.ppdaicorp.com/listLoanService/getPreAuditList'
data = {
'applicationId': ,
'borrowerId': 1,
'listingId': 1111
}
data = json.dumps(data)
response = requests.post(url, data, headers=headers, timeout=timeout).text
response = json.loads(response)
print(response)
<!-- `'applicationId': 10000001,` -->
模拟的用户数据输入好了。
结果出来了,
{'result': 1, 'resultMessage': '没有查询到满足条件的数据', 'itemList': None}
<!-- 我是天才,结果一模一样! -->
<!-- 真特么乱, -->
<!-- 流水账,我是猪吗? -->
<!-- 这点构建思路都么有? -->
<!-- 那么现在开始结束陈建的其他代码,然后开始构建自己的接口函数,感觉有坑,但是我有翅膀,随便飞。 -->
<!-- `PyCharm`好卡啊,卡到哭了。 -->
<!-- * [Download Postman](https://www.getpostman.com/app/download/win32) -->
写接口函数
这里是最重要的了,前面的重要只有5%,这里是95%,因此是在创新。
打开笔记,看看要写哪些接口。
ods.ppdai_new_invest_extend_paimoneyrecords:ppdai_new_invest_extend.paimoneyrecordsods.zmxy_score:ppdai_user_third_data.zmxy_scoreods.mobileapps宋昉ods.ppdai_user_log_userupdateinfologs:ppdai_user_log.userupdateinfologs
先搞最烦的,宋昉和何岳的。
- 网址: http://confluence/login.action?os_destination=%2Fpages%2Fviewpage.action%3FpageId%3D5079294
通过userid和入库时间查询用户mobileApps信息的接口是: /rest/getAppsByUserIdAndTime
卧槽,这个怎么拿app数据啊,我到底线下表怎么搞来着? appcompany like '%tencent%' and appcompany like '%weixin%' 因此也是抓取的。 没关系,我们试一试。 5786718用一个userid来做例子。
172.17.4.82, readapp.ppdapi.com找好了。
所以还差一个调用的链接。
就是readapp.ppdapi.com + /rest/getAppsByUserIdAndTime, 看了下写的例子:
{
"userId":346,
"guId":"hgahahah",
"startDate":"2015-06-01 00:00:00",
"endDate":"2017-06-01 00:00:00
}
注意啊,/rest/getAppsByUserIdAndTime前面的POST不要乱加。
现在尝试在Python里面实现。
"guId":"hgahahah"这个可以没有啊! 何岳跟我说,guID用工程名称。
import json
import pandas as pd
def mobileapps(userId=None):
headers = {"Content-type": "application/json", "Accept": "application/json"}
timeout = 10
url = "http://readapp.ppdapi.com/rest/getAppsByUserIdAndTime
data = {
"userId":userId,
# "guId":guId, 不需要
"startDate":"2017-01-01 00:00:00",
"endDate":"2017-06-01 00:00:00
}
data = json.dumps(data)
response = requests.post(url, data, headers=headers, timeout=timeout).text
response = json.loads(response)
df = pd.DataFrame(response)
df = df[['AppCompany','InsertTime','UserId']]
if not df.empty:
df.columns = map(lambda x: x.lower(), df.columns)
else:
all_columns=['appcompany', 'inserttime', 'userid']
df = pd.DataFrame(columns=all_columns)
return df
if __name__ == '__main__':
a = mobileapps(userId=346)#, listingids=[100076, 105709, 105890, 106452])
#a = preaudit(16981)
print(a)
第一个终于写好了!!!
之后就撸吧!
ListingAutoFlowSysProcess
<!-- `http://confluence.ppdai.com/pages/viewpage.action?pageId=10881017`接入。 -->
<!-- -->
import json
import pandas as pd
headers = {"Content-type": "application/json", "Accept": "application/json"}
timeout = 100
url = "http://loansd.ppdapi.com/Lightning/QueryService/QueryListingAutoFlowSysProcess
data = {
"UserIds":[24739192,24739203],
"Amounts":[8000]
}
data = json.dumps(data)
response = requests.post(url, data, headers=headers, timeout=timeout).text
response = json.loads(response)
# df = pd.DataFrame(response)
response.items()
# pd.DataFrame(response.items())
# df = pd.DataFrame.from_dict(response)
# df.head()
# 文档: http://confluence.ppdai.com/pages/viewpage.action?pageId=10881075
# 测试环境: 172.17.3.9 loansd.ppdapi.com
#default_url = 'http://loansd.ppdapi.com/Lightning/QueryService/QueryListingAutoFlowUserProcess'
首先配置好接口,然后写Python。
import json
import pandas as pd
headers = {"Content-type": "application/json", "Accept": "application/json"}
timeout = 100
url = "http://loansd.ppdapi.com/Lightning/QueryService/QueryListingAutoFlowUserProcess
起手定义好要用的包、headers、timeout和接入api。
定义好要取用的userid向量。
data = {
"UserIds":[24739192,24739203]
}
因为response的结果是一个字典,字典的第一个value是字典,其他我们都不需要,因此提取第一个value,然后转pd.DataFrame,就friendly read了。
data = json.dumps(data)
response = requests.post(url, data, headers=headers, timeout=timeout).text
response = json.loads(response)['Result']
对了json.loads(response)['Result']是list格式,机智的一笔。
全部代码如下:
import json
import pandas as pd
headers = {"Content-type": "application/json", "Accept": "application/json"}
timeout = 100
url = "http://loansd.ppdapi.com/Lightning/QueryService/QueryListingAutoFlowUserProcess
data = {
"UserIds":[24739192,24739203]
}
data = json.dumps(data)
response = requests.post(url, data, headers=headers, timeout=timeout).text
response = json.loads(response)['Result']
# type(response)
df = pd.DataFrame(response)
# response.items()
# pd.DataFrame(response.items())
# df = pd.DataFrame.from_dict(response)
df.head()
ods.zmxy_score
<!-- ``` -->
<!-- drop table if exists opd.test_25; -->
<!-- create table opd.test_25 as -->
<!-- select userid, listingid, director_dist_time, zhima_credit from ( -->
<!-- select a.userid, a.listingid, director_dist_time, zhima_credit, row_number() over(partition by a.listingid order by inserttime desc) as match_seq -->
<!-- from opd.test_10 a -->
<!-- left join [shuffle] ( -->
<!-- select userid, inserttime, score as zhima_credit -->
<!-- from ods.zmxy_score -->
<!-- ) b -->
<!-- on a.userid = b.userid and b.inserttime < a.director_dist_time -->
<!-- ) c -->
<!-- where match_seq = 1; -->
<!-- compute stats opd.test_25; -->
<!-- ``` -->
<!-- `ods.zmxy_score`: `ppdai_user_third_data.zmxy_score` -->
<!-- 找不到啊!哥!找开发团队! -->
172.17.4.98和3rdreadapi.ppdapi.com 找到了!
dict通过json.dumps转化 就成了json。
<!-- `10000001` 这个我也试了一下。 -->
这个appid要是appid有误,只能找业务提供方了,陈建也没办法。
import json
import pandas as pd
import requests
headers = {"Content-type": "application/json", "Accept": "application/json"}
timeout = 10
url = "http://3rdreadapi.ppdapi.com/queryData
data = {
"appid": "",
"userid": "50070214",
"options": "zhimaquery",
"version": "1.0.0",
"async": "1",
"atom": 0,
"role": 0
}
data = json.dumps(data)
response = requests.post(url, data, headers=headers, timeout=timeout).text
response = json.loads(response)
response
<!-- 11050001 -->
{'code': 0,
'data': {'zhimaquery': {'bizNo': 'ZM201702153000000217600651746412',
'id': 7667,
'inserttime': '2018-01-02 11:21:17',
'isactive': 1,
'score': ,
'updatetime': '2018-01-02 11:21:17',
'userid': 50070214}},
'message': 'success',
'version': None}
<!-- `'score': 666,` -->
<!-- 例子。 -->
<!-- { -->
<!-- "appid": "11050001", -->
<!-- "userid": "50070214", -->
<!-- "options": "zhimaquery", -->
<!-- "version": "1.0.0", -->
<!-- "async": "1", -->
<!-- "atom": 0, -->
<!-- "role": 0 -->
<!-- } -->
sql问题
def get_commondaily(userid, engine=engine_cbd):
mod = userid % 10
sql = '''
select
user_id, cmstr_reg_channl, cmstr_sec_lev, cmstr_idnm_gen, cmstr_idnm_age,
cmstr_idnm_dst, cmstr_idnm_pro, cmstr_idnm_city, cmstr_pho_pro,
cmstr_pho_cit, cmstr_pho_cit_lev, cmstr_pho_opr, cmstr_edu_sef,
cmstr_edu_cer, cmstr_edu_typ, cmstr_edu_sch, cmstr_enr_cer, cmstr_enr_typ,
cmstr_enr_sch, cmstr_mar, cmstr_occ, cmstr_occ_lif,
cmstr_pho_mod_lt, cmstr_pho_os_lt, cmstr_pho_os_ver_lt, cmstr_pho_net_lt,
cmstr_app_reg_channl
from ppdai_app_score_biz.common_user_daily000{}
where user_id = {}
'''.format(mod, userid)
df = pd.read_sql(sql, engine)
return df
这里% 10 表示连续扫描10个模。
其他
- 另外,敏感信息都过滤了,code可以重新拿。
- Appid,芝麻分。
dict $\to$ pd.DataFrame
<!-- -->
In [11]: pd.DataFrame(d.items()) # or list(d.items()) in python 3
Out[11]:
0 1
0 2012-07-02 392
1 2012-07-06 392
2 2012-06-29 391
3 2012-06-28 391
...
In [12]: pd.DataFrame(d.items(), columns=['Date', 'DateValue'])
Out[12]:
Date DateValue
0 2012-07-02 392
1 2012-07-06 392
2 2012-06-29 391
我需要再搞几个接口函数,因为不是统一的,比如中台和其他的不一样。 然后修改下文档,太乱。 这个只是解释陈建的代码文档,并非要复盘,因此看完后,就可以copy陈建代码,自己写函数了。
ValueError: If using all scalar values, you must pass an index
一个key只有一个value的字典如果直接转化成数据框会报错。
pd.DataFrame.from_dict(response['data']['zhimaquery'], orient='index').T 加入orient='index'且转置.T。
for loop
172.17.4.8和loansd.ppdapi.com 是host。
df_2=pd.concat(a)为了merge两个pd.DataFrame合并。 type(a)的格式是list,所以可以merge。
%s的使用
str = 'I love %s and %s, he loves %s and %s.'
str % ('ha','haha','hahaha','hahahaha')
%就是这么用,'I love ha and haha, he loves hahaha and hahahaha.'看句子。 哈哈真机智。 ( python about multiple %s in a string - Stack Overflow )
The d in %d stands for decimal. %d is for formatting numbers. %s is for formatting strings.
df.to_dict要具体情况具体分析
In [13]: df.to_dict('Records')[0]
Out[13]: {'a': 1, 'b': 2}