关于flink:Zeppelin-SDK-Flink-平台建设的基石

6次阅读

共计 7684 个字符,预计需要花费 20 分钟才能阅读完成。

用过 Zeppelin 的人应该比拟相熟 Zeppelin 的 UI,因为 Zeppelin 的次要应用场景都是交互式,用户须要手动来操作。那除了这种手动的形式,还有其余的形式吗?如果你不想用 Zeppelin UI,但又想用 Zeppelin 提交和治理大数据作业(比方 Flink Job)的能力该怎么办?或者是你在 Zeppelin 里写好了代码,想定时调度起来,或者集成到其余零碎里,该怎么办?

如果你有这样的诉求,那么 Zeppelin Client API(SDK)就是你所须要的货色。

Zeppelin 简介

对于不相熟 Zeppelin 的人,能够用一句话来解释 Zeppelin:大数据引擎的入口,交互式大数据分析平台底座。Zeppelin 最大的特点是连贯多种引擎,具备可插拔式,上面这张图例举了一些罕用的引擎,当然 Zeppelin 还反对其余很多引擎,这里就不一一例举。

尽管 Zeppelin 有 Rest API,然而 Zeppelin 的 Rest API 太多,对于很多不相熟 Zeppelin 的人来说应用 Rest API 门槛太高,所以 Zeppelin 专门开发了一个 Client API(SDK),不便大家做集成。Zeppelin Client API(SDK)分为 2 个层面的的货色(接下来会一一具体介绍):

  • Zeppelin Client API(Low Level API)
  • Session API(High Level API)

Zeppelin Client API(Low Level API)

Zeppelin Client API 能够在 Note 和 Paragraph 的粒度进行操作。你能够先在 notebook 里写好代码 (比方开发阶段在 notebook 里写代码,做测试),而后用 Low Level API 用编程的形式把 Job 跑起来(比方生产阶段把作业定时调度起来)。Zeppelin Client API 最重要的 class 是 ZeppelinClient,也是 Zeppelin Client API 的入口。上面例举几个重要的接口(这些 API 都比拟直观,我就不多做解释了)。

public String createNote(String notePath) throws Exception 

public void deleteNote(String noteId) throws Exception 

public NoteResult executeNote(String noteId) throws Exception 

public NoteResult executeNote(String noteId, 
                              Map<String, String> parameters) throws Exception
                              
public NoteResult queryNoteResult(String noteId) throws Exception 

public NoteResult submitNote(String noteId) throws Exception

public NoteResult submitNote(String noteId, 
                             Map<String, String> parameters) throws Exception 
                             
public NoteResult waitUntilNoteFinished(String noteId) throws Exception

public String addParagraph(String noteId, 
                           String title, 
                           String text) throws Exception
                           
public void updateParagraph(String noteId, 
                            String paragraphId, 
                            String title, 
                            String text) throws Exception
                            
public ParagraphResult executeParagraph(String noteId,
                                        String paragraphId,
                                        String sessionId,
                                        Map<String, String> parameters) throws Exception
                                        
public ParagraphResult submitParagraph(String noteId,
                                       String paragraphId,
                                       String sessionId,
                                       Map<String, String> parameters) throws Exception
                                       
public void cancelParagraph(String noteId, String paragraphId)
    
public ParagraphResult queryParagraphResult(String noteId, String paragraphId) 
    
public ParagraphResult waitUtilParagraphFinish(String noteId, String paragraphId)

那这些 API 能用来做什么呢?

一个典型的用处是咱们在 Zeppelin 里写好代码,做好测试,而后在第三方零碎里集成进来。比方上面的代码就是把 Zeppelin 自带的 Spark Basic Features 用编程的形式跑起来,你不仅能够跑 Zeppelin Note,还能够拿到运行后果(ParagraphResult)。怎么解决运行后果,就留给你施展设想的空间吧(能够在你的零碎里展现进去,或者可视化进去,或者传给其余零碎做生产等等)。

此外,对于 Dynamic forms(动静控件,比方文本框,下拉框等等),你还能够动静的提供参数,如上面例子里的 maxAge 和 marital。

ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
ZeppelinClient zClient = new ZeppelinClient(clientConfig);

String zeppelinVersion = zClient.getVersion();
System.out.println("Zeppelin version:" + zeppelinVersion);

ParagraphResult paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015259_1403135953");
System.out.println("Execute the 1st spark tutorial paragraph, paragraph result:" + paragraphResult);

paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015302_1492795503");
System.out.println("Execute the 2nd spark tutorial paragraph, paragraph result:" + paragraphResult);

Map<String, String> parameters = new HashMap<>();
parameters.put("maxAge", "40");
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150212-145404_867439529", parameters);
System.out.println("Execute the 3rd spark tutorial paragraph, paragraph result:" + paragraphResult);

parameters = new HashMap<>();
parameters.put("marital", "married");
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150213-230422_1600658137", parameters);
System.out.println("Execute the 4th spark tutorial paragraph, paragraph result:" + paragraphResult);

这上面这张图就是下面咱们要 Zeppelin Client API 跑的 Zeppelin 自带的 Spark Basic Features。

Session API(High Level API)

Session API 是 Zeppelin 的 high level api,Session API 里没有 Note,Paragraph 的概念,粒度是你提交的代码。Session API 里最重要的 class 就是 ZSession,这也是 Session API 的入口,一个 ZSession 代表一个独立的 Zeppelin Interpreter 过程,对于 Flink 来说就是一个独立的 Flink Session Cluster。上面例举一些典型的接口(这些 API 都比拟直观,我就不多做解释了)。

public void start() throws Exception

public void start(MessageHandler messageHandler) throws Exception

public void stop() throws Exception

public ExecuteResult execute(String code) throws Exception

public ExecuteResult execute(String subInterpreter,
                             Map<< span="">String, String> localProperties,
                             String code,
                             StatementMessageHandler messageHandler) throws Exception

public ExecuteResult submit(String code) throws Exception

public ExecuteResult submit(String subInterpreter,
                            Map<< span="">String, String> localProperties,
                            String code,
                            StatementMessageHandler messageHandler) throws Exception
                           
public void cancel(String statementId) throws Exception
 
public ExecuteResult queryStatement(String statementId) throws Exception

public ExecuteResult waitUntilFinished(String statementId) throws Exception

那这个 API 能用来做什么呢?一个典型的用处是就是咱们动态创建 Session(Zeppelin Interpreter 过程),动静的提交运行代码,并拿到运行后果。比方你不想用 Zeppelin 的 UI,要本人做一个 Flink 的开发治理平台,那么你就能够本人做 UI,让用户在 UI 上配置 Flink Job,输出 SQL,而后把所有的这些信息发送到后端,后端调用 ZSession 来运行 Flink Job。

上面的 Java 代码就是用编程的形式调用了 2 条 Flink SQL 语句,并且在 MyStatementMessageHandler1 和 MyStatementMessageHandler2 中读取源源不断发送过去更新的 SQL 运行后果(怎么来应用这个后果就靠你的想象力了)。

须要阐明的是像 Flink Interpreter 这种流式后果数据更新是通过 WebSocket 实现的,所以上面的代码里有会有 CompositeMessageHandler,MyStatementMessageHandler1 以及 MyStatementMessageHandler2,这些 MessageHandler 就是用来解决通过 WebSocket 发送过去的流式数据后果。上面是 2 条咱们在 Zeppelin 里运行的 Flink SQL。

接下来咱们会用 Zeppelin Session API 来跑着这 2 条 Flink SQL,而后咱们会在 MyStatementMessageHandler1,MyStatementMessageHandler2 里拿到后果展现进去。

ZSession session = null;
try {ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
    Map<< span="">String, String> intpProperties = new HashMap<>();

    session = ZSession.builder()
        .setClientConfig(clientConfig)
        .setInterpreter("flink")
        .setIntpProperties(intpProperties)
        .build();

    // CompositeMessageHandler allow you to add StatementMessageHandler for each statement.
    // otherwise you have to use a global MessageHandler.
    session.start(new CompositeMessageHandler());
    System.out.println("Flink Web UI:" + session.getWeburl());

    System.out.println("-----------------------------------------------------------------------------");
    String initCode = IOUtils.toString(FlinkAdvancedExample.class.getResource("/init_stream.scala"));
    ExecuteResult result = session.execute(initCode);
    System.out.println("Job status:" + result.getStatus() + ", data:" + result.getResults().get(0).getData());

    // run flink ssql
    Map<< span="">String, String> localProperties = new HashMap<>();
    localProperties.put("type", "update");
    result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url",
                            new MyStatementMessageHandler1());
    session.waitUntilFinished(result.getStatementId());

    result = session.submit("ssql", localProperties, "select upper(url), count(1) as pv from log group by url",
                            new MyStatementMessageHandler2());
    session.waitUntilFinished(result.getStatementId());

} catch (Exception e) {e.printStackTrace();
} finally {if (session != null) {
        try {session.stop();
        } catch (Exception e) {e.printStackTrace();
        }
    }
}

public static class MyStatementMessageHandler1 implements StatementMessageHandler {

    @Override
    public void onStatementAppendOutput(String statementId, int index, String output) {System.out.println("MyStatementMessageHandler1, append output:" + output);
    }

    @Override
    public void onStatementUpdateOutput(String statementId, int index, String type, String output) {System.out.println("MyStatementMessageHandler1, update output:" + output);
    }
}

public static class MyStatementMessageHandler2 implements StatementMessageHandler {

    @Override
    public void onStatementAppendOutput(String statementId, int index, String output) {System.out.println("MyStatementMessageHandler2, append output:" + output);
    }

    @Override
    public void onStatementUpdateOutput(String statementId, int index, String type, String output) {System.out.println("MyStatementMessageHandler2, update output:" + output);
    }
}

除了编程形式跑 Flink Job,这个 Session API 还能给咱们带来什么呢?

在 Zeppelin 里如果你能够通过 %flink.conf 来对你的 Flink Cluster 进行十分丰盛的配置,然而 %flink.conf 是纯文本的配置,不相熟 Flink 的人很容易配错(如下图)。如果你是本人做 Flink 开发平台的话就能够做一个更残缺的 UI,用一些下拉框等等把一些配置选项固定下来,用户只有抉择就行了,不须要本人输出文原本配置。

还有上面这类 paragraph 的 local properties 配置,比方 type,template, resumeFromLatestCheckpoint 也是比拟容易写错的,同理你能够在本人 UI 里用一些控件把这些选项提前固定下来,而不是让用户输出文本的形式。

我置信 Zeppelin Client API 还有很多能够施展和设想的空间,大家脑洞起来吧。

更多 Zeppelin 技术干货及应用交换可退出 Flink on Zeppelin 钉钉群。

(钉钉扫码加群)

正文完
 0