代码语言
.
CSharp
.
JS
Java
Asp.Net
C
MSSQL
PHP
Css
PLSQL
Python
Shell
EBS
ASP
Perl
ObjC
VB.Net
VBS
MYSQL
GO
Delphi
AS
DB2
Domino
Rails
ActionScript
Scala
代码分类
文件
系统
字符串
数据库
网络相关
图形/GUI
多媒体
算法
游戏
Jquery
Extjs
Android
HTML5
菜单
网页交互
WinForm
控件
企业应用
安全与加密
脚本/批处理
开放平台
其它
【
Java
】
hive udf 批量写入redis
作者:
/ 发布于
2017/9/19
/
559
import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.serde2.objectinspector.*; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.io.IntWritable; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; import redis.clients.jedis.Pipeline; import java.io.IOException; import java.util.HashMap; import java.util.Map; @Description(name = "redis_batch_hset", value = "_FUNC_(host_and_port,keyField, array<map>) - Return ret " ) public class RedisBatchHSetUDF extends GenericUDF { private HostAndPort hostAndPort; private String keyField; private Object writableKeyField; //实际上是org.apache.hadoop.io.Text类型 private StandardListObjectInspector paramsListInspector; private StandardMapObjectInspector paramsElementInspector; @Override public Object evaluate(DeferredObject[] arg0) throws HiveException { try (Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), 10000, 60000); Pipeline pipeline = jedis.pipelined() ) { for (int i = 0; i < paramsListInspector.getListLength(arg0[2].get()); i++) { Object row = paramsListInspector.getListElement(arg0[2].get(), i); Map<?, ?> map = paramsElementInspector.getMap(row); // Object obj = ObjectInspectorUtils.copyToStandardJavaObject(row,paramsElementInspector); //转成标准的java map,否则里面的key value字段为hadoop writable对象 if (map.containsKey(writableKeyField)) { String did = map.get(writableKeyField).toString(); Map<String, String> data = new HashMap<>(); for (Map.Entry<?, ?> entry : map.entrySet()) { if (!writableKeyField.equals(entry.getKey()) && entry.getValue() != null && !"".equals(entry.getValue().toString())) { data.put(entry.getKey().toString(), entry.getValue().toString()); } } pipeline.hmset(did,data); } } pipeline.sync(); return new IntWritable(1); } catch (IOException e) { e.printStackTrace(); throw new HiveException(e); } } @Override public String getDisplayString(String[] arg0) { return "redis_batch_hset(redishost_and_port,keyField, array<map<string,string>>)"; } @Override public ObjectInspector initialize(ObjectInspector[] arg0) throws UDFArgumentException { if (arg0.length != 3) { throw new UDFArgumentException(" Expecting two arguments:<redishost:port> <keyField> array<map<string,string>> "); } //第一个参数校验 if (arg0[0].getCategory() == Category.PRIMITIVE && ((PrimitiveObjectInspector) arg0[0]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) { if (!(arg0[0] instanceof ConstantObjectInspector)) { throw new UDFArgumentException("redis host:port must be constant"); } ConstantObjectInspector redishost_and_port = (ConstantObjectInspector) arg0[0]; String[] host_and_port = redishost_and_port.getWritableConstantValue().toString().split(":"); hostAndPort = new HostAndPort(host_and_port[0], Integer.parseInt(host_and_port[1])); } //第2个参数校验 if (arg0[1].getCategory() == Category.PRIMITIVE && ((PrimitiveObjectInspector) arg0[1]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) { if (!(arg0[1] instanceof ConstantObjectInspector)) { throw new UDFArgumentException("redis hset key must be constant"); } ConstantObjectInspector keyFieldOI = (ConstantObjectInspector) arg0[1]; keyField = keyFieldOI.getWritableConstantValue().toString(); writableKeyField = keyFieldOI.getWritableConstantValue(); } //第3个参数校验 if (arg0[2].getCategory() != Category.LIST) { throw new UDFArgumentException(" Expecting an array<map<string,string>> field as third argument "); } ListObjectInspector third = (ListObjectInspector) arg0[2]; if (third.getListElementObjectInspector().getCategory() != Category.MAP) { throw new UDFArgumentException(" Expecting an array<map<string,string>> field as third argument "); } paramsListInspector = ObjectInspectorFactory.getStandardListObjectInspector(third.getListElementObjectInspector()); paramsElementInspector = (StandardMapObjectInspector) third.getListElementObjectInspector(); System.out.println(paramsElementInspector.getMapKeyObjectInspector().getCategory()); System.out.println(paramsElementInspector.getMapValueObjectInspector().getCategory()); return PrimitiveObjectInspectorFactory.writableIntObjectInspector; } }
试试其它关键字
同语言下
.
List 切割成几份 工具类
.
一行一行读取txt的内容
.
Java PDF转换成图片并输出给前台展示
.
java 多线程框架
.
double类型如果小数点后为零则显示整数否则保留两位小
.
将图片转换为Base64字符串公共类抽取
.
sqlParser 处理SQL(增删改查) 替换schema 用于多租户
.
JAVA 月份中的第几周处理 1-7属于第一周 依次类推 29-
.
java计算两个经纬度之间的距离
.
输入时间参数计算年龄
可能有用的
.
List 切割成几份 工具类
.
一行一行读取txt的内容
.
Java PDF转换成图片并输出给前台展示
.
java 多线程框架
.
double类型如果小数点后为零则显示整数否则保留两位小
.
将图片转换为Base64字符串公共类抽取
.
sqlParser 处理SQL(增删改查) 替换schema 用于多租户
.
JAVA 月份中的第几周处理 1-7属于第一周 依次类推 29-
.
java计算两个经纬度之间的距离
.
输入时间参数计算年龄
贡献的其它代码
Label
Copyright © 2004 - 2024 dezai.cn. All Rights Reserved
站长博客
粤ICP备13059550号-3