代码语言
.
CSharp
.
JS
Java
Asp.Net
C
MSSQL
PHP
Css
PLSQL
Python
Shell
EBS
ASP
Perl
ObjC
VB.Net
VBS
MYSQL
GO
Delphi
AS
DB2
Domino
Rails
ActionScript
Scala
代码分类
文件
系统
字符串
数据库
网络相关
图形/GUI
多媒体
算法
游戏
Jquery
Extjs
Android
HTML5
菜单
网页交互
WinForm
控件
企业应用
安全与加密
脚本/批处理
开放平台
其它
【
Python
】
操作 SparkSQL (spark版本2.3) JOIN使用临时表版本
作者:
黎昕
/ 发布于
2018/4/19
/
1849
# coding: utf-8 """ 使用SPAKR 对资源的操作类 """ import time from pyspark.sql import SparkSession from pyspark.sql.functions import broadcast from ccnu_resource.settings import MYSQL_CONF class ResourceError(Exception): pass class SparkBase(object): """ SPARK 基类 """ def init_jdbc(self): raise NotImplementedError class SparkSql(SparkBase): SPARK_MASTER = 'local[4]' APP_NAME = 'spark-t1' def __init__(self): self.spark = None self.jdbc = None self.init_spark() self.init_jdbc() def init_spark(self): """ 初始化spark :return: """ self.spark = SparkSession.builder \ .master(self.SPARK_MASTER) \ .appName(self.APP_NAME) \ .getOrCreate() def init_jdbc(self): """ 初始化jdbc :return: """ jdbc_url = 'jdbc:mysql://{0}:{1}/{2}'.format( MYSQL_CONF['host'], MYSQL_CONF['port'], MYSQL_CONF['db'] ) self.jdbc = self.spark.read \ .format("jdbc") \ .option("url", jdbc_url) \ .option("user", MYSQL_CONF['user']) \ .option("password", MYSQL_CONF['password']) def load_table_df(self, table_name): """ 读取数据库表的DATAFRAME :param table_name: :return: DATAFRAME """ jdbc_df = self.jdbc.option('dbtable', table_name).load() return jdbc_df def __del__(self): """ 对象任务完成后关闭链接 :return: """ print('关闭spark链接') self.spark.stop() class SparkResource(object): def __init__(self, spark_sql=None): if not isinstance(spark_sql, SparkSql): raise ResourceError('缺少SparkSql实例') self.spark_sql = spark_sql def get_question_diff_distri(self, faculty=None, subject=None): """ 得到某学科和学段下面试题的困难度分布 :param faculty: :param subject: :return: """ if not faculty or not subject: raise ResourceError('缺少faculty或者subject') filter_str = "faculty = {0} and subject = {1}".format( faculty, subject ) # 读取表的dataframe question_df = self.spark_sql.load_table_df('question') # 统计排序 res_df = question_df.filter(filter_str).groupBy( "diff" ).count() return res_df def get_question_freq_top_n(self, n=20, faculty=None, subject=None): """ 得到某学科和学段下面的试卷里面的试题使用频繁度 :param n: :param faculty: :param subject: :return: """ if not faculty or not subject: raise ResourceError('缺少faculty或者subject') filter_str = "faculty = {0} and subject = {1}".format( faculty, subject ) # 读取表的dataframe sub_q_df = self.spark_sql.load_table_df('paper_subtype_question') question_df = self.spark_sql.load_table_df('question') # 生成临时表 sub_q_df.createOrReplaceTempView('tmp_paper_subtype_question') question_df.createOrReplaceTempView('tmp_question') # 检索临时表 df = self.spark_sql.spark.sql( 'SELECT question_id, faculty, subject ' 'from tmp_paper_subtype_question tpsq ' 'LEFT JOIN tmp_question tq ' 'ON tpsq.question_id = tq.qid ' 'WHERE tq.structure_string IS NOT NULL ' ) # 统计排序 res_df = df.filter(filter_str).groupBy( "question_id" ).count().sort('count', ascending=False).limit(n) return res_df def get_knowledge_top_n(self, n=20, faculty=None, subject=None): """ 获得某学科和学段下面的试题获得所绑定知识点的TOP数 :param n: :param faculty: :param subject: :return: """ if not faculty or not subject: raise ResourceError('缺少faculty或者subject') filter_str = "faculty = {0} and subject = {1}".format( faculty, subject ) # 读取表的dataframe cog_map_df = self.spark_sql.load_table_df('question_cognition_map') question_df = self.spark_sql.load_table_df('question') # 生成临时表 cog_map_df.createOrReplaceTempView('tmp_question_cognition_map') question_df.createOrReplaceTempView('tmp_question') # 检索临时表 df = self.spark_sql.spark.sql( 'SELECT cognition_map_num, faculty, subject ' 'from tmp_question_cognition_map tqcm ' 'LEFT JOIN tmp_question tq ' 'ON tqcm.question_id = tq.qid' ) # 统计排序 res_df = df.filter(filter_str).groupBy( "cognition_map_num" ).count().sort('count', ascending=False).limit(n) return res_df def get_paper_info(self): df = self.spark_sql.load_table('paper') df.createOrReplaceTempView("tmp_paper") sqlDF = self.spark_sql.spark.sql("SELECT id FROM tmp_paper") sqlDF.show() start_time = time.time() sr = SparkResource(spark_sql=SparkSql()) knowledge_df = sr.get_knowledge_top_n(faculty=3, subject=1) print(knowledge_df.toJSON().collect()) diff_df = sr.get_question_diff_distri(faculty=3, subject=1) print(diff_df.toJSON().collect()) question_freq = sr.get_question_freq_top_n(faculty=3, subject=1) print(question_freq.toJSON().collect()) print(time.time() - start_time)
试试其它关键字
同语言下
.
比较两个图片的相似度
.
过urllib2获取带有中文参数的url内容
.
不下载获取远程图片的宽度和高度及文件大小
.
通过qrcode库生成二维码
.
通过httplib发送GET和POST请求
.
Django下解决小文件下载
.
遍历windows的所有窗口并输出窗口标题
.
根据窗口标题调用窗口
.
python 抓取搜狗指定公众号
.
pandas读取指定列
可能有用的
.
比较两个图片的相似度
.
过urllib2获取带有中文参数的url内容
.
不下载获取远程图片的宽度和高度及文件大小
.
通过qrcode库生成二维码
.
通过httplib发送GET和POST请求
.
Django下解决小文件下载
.
遍历windows的所有窗口并输出窗口标题
.
根据窗口标题调用窗口
.
python 抓取搜狗指定公众号
.
pandas读取指定列
黎昕
贡献的其它代码
(
12
)
.
操作 SparkSQL (spark版本2.3) JOIN使用临时表版本
.
从HIVE中到出道mysql中
.
请求 Zip 压缩的 HTTP 页面
.
Properties资源文件工具类
.
在UpdatePanel中弹出对话框
.
.pre标签封装代码
.
根据身份证号或营业执照编号取省市区信息
.
原生H5页面模拟APP侧滑删除效果
.
shadow实现的各种漂亮阴影效果
.
使用触发器操作表1(添加,更新,删除) 同步实现表2的
Copyright © 2004 - 2024 dezai.cn. All Rights Reserved
站长博客
粤ICP备13059550号-3