关于java:hive-thrift-demo001

7次阅读

共计 5725 个字符,预计需要花费 15 分钟才能阅读完成。




import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
import org.apache.hive.service.auth.PlainSaslHelper;
import org.apache.hive.service.rpc.thrift.*;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.Subject;
import javax.security.sasl.SaslException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Minimal demo client that talks to HiveServer2 directly over its Thrift RPC
 * interface ({@code TCLIService}) instead of going through the JDBC driver.
 *
 * <p>Constructing an instance performs the whole demo: it opens a PLAIN-SASL
 * binary transport to a hard-coded host/port, opens a session, executes
 * {@link #DEMO_QUERY}, prints the fetched rows to standard output as
 * tab-separated values, and then closes the operation, the session and the
 * transport again.
 *
 * <p>Based on the sample at
 * https://github.com/ds112/hbase-on-windows/blob/77e5f31715f3b4a258f212b242cd698ad983af60/Samples/Java/Hive/ThriftAPI/src/main/java/Client.java
 */
public class HiveXConn {
    public static final Logger LOG = LoggerFactory.getLogger(HiveXConn.class.getName());

    /** HiveServer2 endpoint used by the demo. */
    private static final String DEFAULT_HOST = "192.168.11.9";
    private static final int DEFAULT_PORT = 10000;

    /** Demo credentials for the PLAIN SASL handshake; do not reuse outside a sandbox. */
    private static final String USER_NAME = "hive";
    private static final String PASSWORD = "hive";

    /** Statement executed once the session is open. */
    private static final String DEMO_QUERY = "SELECT * FROM test limit 10";

    /** Maximum number of rows requested from the server in the single fetch call. */
    private static final int MAX_FETCH_ROWS = 100;

    private String jdbcUriString;
    private String host;
    private int port;

    private TTransport transport;

    // TODO should be replaced by CliServiceClient
    private TCLIService.Iface client;
    private TSessionHandle sessHandle = null;
    private int loginTimeout = 0;
    private TProtocolVersion protocol;
    private int fetchSize = 1000;

    /**
     * Connects to the hard-coded HiveServer2 endpoint and runs the demo query.
     *
     * @param uri  currently ignored; the endpoint is fixed to
     *             {@link #DEFAULT_HOST}:{@link #DEFAULT_PORT}
     * @param info currently ignored
     * @throws SQLException if a Thrift call fails or the statement is rejected
     * @throws Exception    if the transport cannot be created or opened
     */
    public HiveXConn(String uri, Properties info) throws Exception {
        // JDBC URL: jdbc:hive2://<host>:<port>/dbName;sess_var_list?hive_conf_list#hive_var_list
        // each list: <key1>=<val1>;<key2>=<val2> and so on
        // sess_var_list -> sessConfMap
        // hive_conf_list -> hiveConfMap
        // hive_var_list -> hiveVarMap
        host = DEFAULT_HOST;
        port = DEFAULT_PORT;
        // Recorded so that error messages name the endpoint that was actually contacted.
        jdbcUriString = "jdbc:hive2://" + host + ":" + port;

        // open the client transport
        openTransport();
        // set up the client
        client = new TCLIService.Client(new TBinaryProtocol(transport));
        // open client session (also runs the demo query and closes everything again)
        openSession();

        client = newSynchronizedClient(client);
    }

    /**
     * Wraps a Thrift client in a dynamic proxy that serializes all calls
     * through a single fair lock.
     *
     * @param client the client to guard
     * @return a proxy implementing {@code TCLIService.Iface} that delegates to {@code client}
     */
    public static TCLIService.Iface newSynchronizedClient(TCLIService.Iface client) {
        return (TCLIService.Iface) Proxy.newProxyInstance(
                HiveXConn.class.getClassLoader(),
                new Class<?>[] {TCLIService.Iface.class},
                new SynchronizedHandler(client));
    }

    /**
     * Creates the transport and opens it if it is not open yet. If opening
     * fails, the transport is closed before the failure is rethrown.
     */
    private void openTransport() throws Exception {
        transport = createBinaryTransport();
        if (!transport.isOpen()) {
            try {
                transport.open();
            } catch (Exception e) {
                transport.close();
                throw e;
            }
        }
    }

    /**
     * Builds a PLAIN SASL transport (user/password) on top of a socket
     * transport to {@code host}:{@code port}.
     *
     * @return the transport as returned by {@code PlainSaslHelper.getPlainTransport}
     */
    private TTransport createBinaryTransport() throws SaslException {
        TTransport socketTransport = HiveAuthUtils.getSocketTransport(host, port, loginTimeout);
        return PlainSaslHelper.getPlainTransport(USER_NAME, PASSWORD, socketTransport);
    }

    /**
     * Opens a session on the {@code default} database, runs the demo query,
     * prints its result and tears the connection down again.
     *
     * <p>The session and the transport are closed whether or not the query
     * succeeds, so a failure does not leave server-side handles or the
     * connection open.
     *
     * @throws SQLException with SQL state {@code 08S01} when a Thrift call
     *                      fails, or when the server rejects the statement
     */
    private void openSession() throws SQLException {
        TOpenSessionReq openReq = new TOpenSessionReq();

        Map<String, String> openConf = new HashMap<String, String>();
        openConf.put("use:database", "default");
        // set the fetchSize
        openConf.put("set:hiveconf:hive.server2.thrift.resultset.default.fetch.size",
                Integer.toString(fetchSize));
        openReq.setConfiguration(openConf);

        try {
            TOpenSessionResp openResp = client.OpenSession(openReq);
            protocol = openResp.getServerProtocolVersion();
            sessHandle = openResp.getSessionHandle();
            try {
                runDemoQuery();
            } finally {
                closeSessionQuietly();
            }
        } catch (TException e) {
            LOG.error("Error opening session", e);
            throw new SQLException("Could not establish connection to "
                    + jdbcUriString + ": " + e.getMessage(), "08S01", e);
        } finally {
            transport.close();
        }
    }

    /**
     * Executes {@link #DEMO_QUERY} in the current session, fetches up to
     * {@link #MAX_FETCH_ROWS} rows and prints them. The operation handle is
     * closed afterwards even if fetching or printing fails.
     */
    private void runDemoQuery() throws TException, SQLException {
        TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, DEMO_QUERY);
        TExecuteStatementResp execResp = client.ExecuteStatement(execReq);

        TOperationHandle stmtHandle = execResp.getOperationHandle();
        if (stmtHandle == null) {
            // Without an operation handle there is nothing to fetch; report the
            // server's status instead of failing later on a null handle.
            throw new SQLException(
                    "Statement failed: " + execResp.getStatus().getErrorMessage(),
                    execResp.getStatus().getSqlState());
        }

        try {
            TFetchResultsReq fetchReq =
                    new TFetchResultsReq(stmtHandle, TFetchOrientation.FETCH_FIRST, MAX_FETCH_ROWS);
            TRowSet rowSet = client.FetchResults(fetchReq).getResults();
            printRows(toColumnValues(rowSet));
        } finally {
            closeOperationQuietly(stmtHandle);
        }
    }

    /**
     * Converts the column-oriented part of a row set into one value list per column.
     *
     * @param rowSet the fetched row set, may be {@code null}
     * @return one list per column, empty when the row set carries no columns
     */
    private static List<List<?>> toColumnValues(TRowSet rowSet) {
        List<List<?>> table = new ArrayList<List<?>>();
        if (rowSet == null || rowSet.getColumns() == null) {
            return table;
        }
        for (TColumn column : rowSet.getColumns()) {
            table.add(columnValues(column));
        }
        return table;
    }

    /** Returns the value list of whichever typed member of {@code column} is set. */
    private static List<?> columnValues(TColumn column) {
        if (column.isSetBinaryVal()) {
            return column.getBinaryVal().getValues();
        }
        if (column.isSetBoolVal()) {
            return column.getBoolVal().getValues();
        }
        if (column.isSetByteVal()) {
            return column.getByteVal().getValues();
        }
        if (column.isSetDoubleVal()) {
            return column.getDoubleVal().getValues();
        }
        if (column.isSetI16Val()) {
            return column.getI16Val().getValues();
        }
        if (column.isSetI32Val()) {
            return column.getI32Val().getValues();
        }
        if (column.isSetI64Val()) {
            return column.getI64Val().getValues();
        }
        if (column.isSetStringVal()) {
            return column.getStringVal().getValues();
        }
        return new ArrayList<Object>();
    }

    /**
     * Prints the table row by row to standard output; every value is followed
     * by a tab and every row by a line break. Prints nothing for an empty table.
     * The row count is taken from the first column.
     */
    private static void printRows(List<List<?>> columns) {
        if (columns.isEmpty()) {
            return;
        }
        int rowCount = columns.get(0).size();
        for (int row = 0; row < rowCount; row++) {
            StringBuilder line = new StringBuilder();
            for (List<?> column : columns) {
                line.append(String.valueOf(column.get(row))).append("\t");
            }
            System.out.println(line);
        }
    }

    /** Closes the operation; a failure is logged so it cannot mask an earlier error. */
    private void closeOperationQuietly(TOperationHandle stmtHandle) {
        try {
            TCloseOperationReq closeReq = new TCloseOperationReq();
            closeReq.setOperationHandle(stmtHandle);
            client.CloseOperation(closeReq);
        } catch (TException e) {
            LOG.warn("Failed to close operation", e);
        }
    }

    /** Closes the session if one was opened; a failure is logged so it cannot mask an earlier error. */
    private void closeSessionQuietly() {
        if (sessHandle == null) {
            return;
        }
        try {
            client.CloseSession(new TCloseSessionReq(sessHandle));
        } catch (TException e) {
            LOG.warn("Failed to close session", e);
        }
    }

    /**
     * Invocation handler that forwards every call to the wrapped client while
     * holding a fair {@link ReentrantLock}, and surfaces all failures as
     * {@link TException}.
     */
    private static class SynchronizedHandler implements InvocationHandler {
        private final TCLIService.Iface client;
        private final ReentrantLock lock = new ReentrantLock(true);

        SynchronizedHandler(TCLIService.Iface client) {
            this.client = client;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args)
                throws Throwable {
            // Acquire before entering try so that finally only ever unlocks a held lock.
            lock.lock();
            try {
                return method.invoke(client, args);
            } catch (InvocationTargetException e) {
                // all IFace APIs throw TException
                if (e.getTargetException() instanceof TException) {
                    throw (TException) e.getTargetException();
                } else {
                    // should not happen
                    throw new TException("Error in calling method " + method.getName(),
                            e.getTargetException());
                }
            } catch (Exception e) {
                throw new TException("Error in calling method " + method.getName(), e);
            } finally {
                lock.unlock();
            }
        }
    }

    /** Runs the demo once; the constructor arguments are placeholders and are ignored. */
    public static void main(String[] args) throws Exception {
        new HiveXConn("", null);
    }
}
正文完
 0