关于java:thrift-hive-笔记

100次阅读

共计 4194 个字符,预计需要花费 11 分钟才能阅读完成。


import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
import org.apache.hive.service.rpc.thrift.TCLIService;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.*;
import java.util.concurrent.TimeUnit;

public class MyHiveServer {public static void main(String[] args) throws Exception{

//         PlainSaslHelper.getPlainTransportFactory

//        TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
//
//        saslFactory.addServerDefinition("PLAIN", "CUSTOM", null, new HashMap<String, String>(),new PlainSaslHelper.PlainServerCallbackHandler("CUSTOM"))

//    源码    HiveAuthFactory 第 160 行

        try {System.out.println("服务端开启");
//            transportFactory = new TTransportFactory();
            TCLIService.Processor<ServiceFaceImpl> processor = new TCLIService.Processor<>(new ServiceFaceImpl());
            TServerSocket serverSocket = HiveAuthUtils.getServerSocket("127.0.0.1", 10000);
            TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket).processor(processor)
                    .transportFactory(new TTransportFactory()).protocolFactory(new TBinaryProtocol.Factory())
                    .inputProtocolFactory(new TBinaryProtocol.Factory(true, true, 100*1024*1024, 100*1024*1024))
                    .requestTimeout(100000).requestTimeoutUnit(TimeUnit.SECONDS);

            TThreadPoolServer server = new TThreadPoolServer(sargs);

            // TCP Server
            server.serve();} catch (TTransportException e) {e.printStackTrace();
        }


        System.out.println("服务端已启动:10000");



    }

}

public class ServiceFaceImpl implements TCLIService.Iface {private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);

    @Override
    public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException {TOpenSessionResp resp = new TOpenSessionResp();

        TSessionHandle tSessionHandle = new TSessionHandle(new HandleIdentifier().toTHandleIdentifier());

        resp.setSessionHandle(tSessionHandle);

        System.out.println("OpenSession");

        HashMap<String, String> configurationMap = new HashMap<>();

        configurationMap.put("","");

        resp.setConfiguration(configurationMap);
//        resp.setServerProtocolVersion(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9);
        resp.setStatus(OK_STATUS);

        return resp;
    }


    @Override
    public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException {System.out.println("CloseSession");
        return null;
    }

    @Override
    public TGetInfoResp GetInfo(TGetInfoReq req) throws TException {System.out.println("GetInfo");
        return null;
    }

    @Override
    public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException {String statement = req.getStatement();
        System.out.println(statement);
        System.out.println("ExecuteStatement");
        return null;
    }

    @Override
    public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException {return null;}

    @Override
    public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException {return null;}

    @Override
    public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException {return null;}

    @Override
    public TGetTablesResp GetTables(TGetTablesReq req) throws TException {return null;}

    @Override
    public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws TException {return null;}

    @Override
    public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException {return null;}

    @Override
    public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException {return null;}

    @Override
    public TGetPrimaryKeysResp GetPrimaryKeys(TGetPrimaryKeysReq req) throws TException {return null;}

    @Override
    public TGetCrossReferenceResp GetCrossReference(TGetCrossReferenceReq req) throws TException {return null;}

    @Override
    public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) throws TException {return null;}

    @Override
    public TCancelOperationResp CancelOperation(TCancelOperationReq req) throws TException {return null;}

    @Override
    public TCloseOperationResp CloseOperation(TCloseOperationReq req) throws TException {return null;}

    @Override
    public TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq req) throws TException {return null;}

    @Override
    public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {return null;}

    @Override
    public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req) throws TException {return null;}

    @Override
    public TCancelDelegationTokenResp CancelDelegationToken(TCancelDelegationTokenReq req) throws TException {return null;}

    @Override
    public TRenewDelegationTokenResp RenewDelegationToken(TRenewDelegationTokenReq req) throws TException {return null;}
}

正文完
 0