副标题#e#
?
项目背景:
? ? (1) 已有监控系统采用的OpenTSDB方案
? ? (2) ?目前一些大数据应用,尤其是基于spark streaming的流式应用,会实时计算生成一些指标数据,借用监控系统的存储。
? ? (3) 需要前端展示实时分析结果,采用zeppelin展示方式,但是目前zeppelin不支持OpenTSDB后端引擎支持
? ?So,自己开发!
?
一 Interpreter插件流程
? ??? ? 插播: ? ?刚去访问官方发现0.6.0版本发布了!?http://zeppelin.apache.org/docs/0.6.0/
? ? (1) 下载Zeppelin源码
? ? (2) ?创建Zeppelin Maven工程的 Module
????
? ? (3) 添加对zeppelin-interpreter插件包的依赖
????????
? ? ? ? ?由于Zeppelin运行环境已经有了该依赖包,所以我们再创建自定义Interpreter插件的时候只需要在代码中对其依赖,打包过程中不需要打包该包。所以使用provided依赖方式。
? ? ?(4) 添加对OpenTSDB客户端操作API包的依赖
????
? ? ? ?注意:该包为内部开发依赖包
? ? ? (5) 创建实现类继承Zeppelin提供的抽象类org.apache.zeppelin.interpreter.Interpreter;
? ??????????public class TsdInterpreter extends Interpreter
?
? ? ? ?(6) 代码中注册当前插件
? ? ? ? ? 在实现类中添加以下代码实现当前插件的注册
????? ? ? ??static {
? ? ? ? ? ? ? Interpreter.register("tsd","tsd",TsdInterpreter.class.getName());
? ? ? ? ? }
? ? ? ? ? 以tsd名称注册,那么Zeppelin前端在调用OpenTSDB查询的时候,只需要指定后端引擎名称%tsd即可。
? ? ? ?(7) 实现核心抽象方法,即Zeppelin前端提交过来的命令
??????????public InterpreterResult interpret(String cmd,InterpreterContext context)
? ? ? ? ? cmd: 即在Zeppelin交互式界面编写的命令,不包含%tsd
? ? ? ? ? context: 当前插件的上下文,主要包含插件的配置信息,例如操作OpenTSDB的时候就需要从上下文中获取OpenTSDB的IP和端口参数。
????????????
????? ? ? 该方法实现的核心思想就是: ? 解析命令=>实例化OpenTSDB操作客户端=>操作OpenTSDB客户端进行数据查询=> 获取返回结果 封装成InterpreterResult对象。
?
????????? ? 贴核心代码吧:
????????????
? ? ? ? ? Properties intpProperty = getProperty();
? ? ? ? ? for (Object k : intpProperty.keySet()) {
?? ??? ??? ?String key = (String) k;
?? ??? ??? ?String value = (String) intpProperty.get(key);
?? ??? ??? ?if (key.equals("tsd.host") ) {
?? ??? ??? ??? ?host = value;
?? ??? ??? ?} else if (key.equals("tsd.port")) {
?? ??? ??? ??? ?port = value;
?? ??? ??? ?}
?? ??? ?}
? ? ? ? ?propertiesUtil.setOpentsdbIp(host);
? ? ? ? ? propertiesUtil.setPort(Integer.parseInt(port));
? ? ? ? ? ? Scanner scanner = new Scanner(items[1]);
?? ??? ??? ?String start,end,metric,tagsStr;
?? ??? ??? ?if (scanner.hasNext())
?? ??? ??? ??? ?start = scanner.next();
?? ??? ??? ?else {
?? ??? ??? ??? ?return processHelp(InterpreterResult.Code.ERROR,
?? ??? ??? ??? ??? ??? ?"1!Please enter the correct format!");
?? ??? ??? ?}
?? ??? ??? ?
?? ??? ??? ?if (scanner.hasNext())
?? ??? ??? ??? ?end = scanner.next();
?? ??? ??? ?else {
?? ??? ??? ??? ?return processHelp(InterpreterResult.Code.ERROR,
?? ??? ??? ??? ??? ??? ?"2!Please enter the correct format!");
?? ??? ??? ?}
?? ??? ??? ?
?? ??? ??? ?if (scanner.hasNext())
?? ??? ??? ??? ?metric = scanner.next();
?? ??? ??? ?else {
?? ??? ??? ??? ?return processHelp(InterpreterResult.Code.ERROR,
?? ??? ??? ??? ??? ??? ?"3!Please enter the correct format!");
?? ??? ??? ?}
?? ??? ??? ?
?? ??? ??? ?if (scanner.hasNext())
?? ??? ??? ??? ?tagsStr = scanner.next();
?? ??? ??? ?else {
?? ??? ??? ??? ?return processHelp(InterpreterResult.Code.ERROR,
?? ??? ??? ??? ??? ??? ?"4!Please enter the correct format!");
?? ??? ??? ?}
?? ??? ??? ?
?? ??? ??? ?// cpid=tudou,busiid=*,code=1
?? ??? ??? ?String[] tagsStrs = tagsStr.split(",");
?? ??? ??? ?Map<String,String> tags = new HashMap<String,String>();
?? ??? ??? ?for (String s : tagsStrs) {
?? ??? ??? ??? ?int index = s.indexOf('=');
?? ??? ??? ??? ?if (index == -1)
?? ??? ??? ??? ??? ?continue;
?? ??? ??? ??? ?String tagK = s.substring(0,index);
?? ??? ??? ??? ?String tagV = s.substring(index + 1);
?? ??? ??? ??? ?tags.put(tagK,tagV);
?? ??? ??? ?}
?? ??? ??? ?QueryService queryService = new QueryService();
?? ??? ??? ?try {
?? ??? ??? ??? ?List<QueryResponseEntity> responses = queryService
?? ??? ??? ??? ??? ??? ?.queryByMetric(start,tags,null,"sum");
?? ??? ??? ??? ?StringBuffer sb = new StringBuffer();
//?? ??? ??? ??? ?Map<String,String> alldps = new HashMap<String,String>();
?? ??? ??? ??? ?// build header
?? ??? ??? ??? ?Set<String> keys = new HashSet<String>();
?? ??? ??? ??? ?sb.append("time\t");
?? ??? ??? ??? ?for (QueryResponseEntity st : responses) {
?? ??? ??? ??? ??? ?sb.append(st.getTags().toString() + "\t");
?? ??? ??? ??? ??? ?keys.addAll(st.getDps().keySet());
?? ??? ??? ??? ?}
?? ??? ??? ??? ?sb.replace(sb.lastIndexOf("\t"),sb.lastIndexOf("\t") + 1,"\n");
?? ??? ??? ??? ?List<String> keys2 = new ArrayList<String>(keys);
?? ??? ??? ??? ?Collections.sort(keys2);
?? ??? ??? ??? ?// build lines
?? ??? ??? ??? ?Iterator<String> it = keys2.iterator();
?? ??? ??? ??? ?
?? ??? ??? ??? ?long t;
?? ??? ??? ??? ?while (it.hasNext()) {
?? ??? ??? ??? ??? ?String key = it.next(); // 每一行的时间戳
?? ??? ??? ??? ??? ?
?? ??? ??? ??? ??? ?t = Long.parseLong(key);
?? ??? ??? ??? ??? ?sb.append(sdf.format(new Date(t*1000)) + "\t");
?? ??? ??? ??? ??? ?for (QueryResponseEntity st : responses) {
?? ??? ??? ??? ??? ??? ?Map<String,String> dps = st.getDps();
?? ??? ??? ??? ??? ??? ?String value = dps.get(key);
?? ??? ??? ??? ??? ??? ?if (value != null) {
?? ??? ??? ??? ??? ??? ??? ?sb.append(value + "\t");
?? ??? ??? ??? ??? ??? ?} else {
?? ??? ??? ??? ??? ??? ??? ?sb.append(" \t");
?? ??? ??? ??? ??? ??? ?}
?? ??? ??? ??? ??? ?}
?? ??? ??? ??? ??? ?sb.replace(sb.lastIndexOf("\t"),
?? ??? ??? ??? ??? ??? ??? ?"\n");
?? ??? ??? ??? ?}
?? ??? ??? ??? ?// sb.toString()
?? ??? ??? ??? ?return new InterpreterResult(InterpreterResult.Code.SUCCESS,
?? ??? ??? ??? ??? ??? ?InterpreterResult.Type.TABLE,sb.toString());
????二 插件部署
????????? ? (1) ?实现类的配置 ?
????????????????? ? 在ZEPPELIN_HOME/conf/zeppelin-site.xml
? ? ? ? ? ? ? ? ??
????? ? ? (2) 拷贝OpenTSDB插件包
? ? ? ? ? ? ? ?在ZEPPELIN_HOME/interpreter
? ? ? ? ? ? ? ? 创建文件夹tsd,将所有依赖包拷贝到该文件夹下
? ? ? ? ? ? ? ? ? ??
? ? ? ? ? (3) 重启Zeppelin,在Zeppelin管理界面的 Interpreter中添加 TSD配置
#p#分页标题#e#
????????????????????
?
?
三 ?实现效果
#p#副标题#e#
? ?