共计 12037 个字符,预计需要花费 31 分钟才能阅读完成。
概述
Thrift 是一个可互操作和可伸缩服务的框架,用来进行可扩大且跨语言的服务的开发。它联合了功能强大的软件堆栈和代码生成引擎,以构建在 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 等等编程语言间无缝联合的、高效的服务。
Thrift 最后由 facebook 开发,07 年四月开放源码,08 年 5 月进入 apache 孵化器。thrift 容许你定义一个简略的定义文件中的数据类型和服务接口(IDL)。以作为输出文件,编译器生成代码用来不便地生成 RPC 客户端和服务器通信的无缝跨编程语言。
其传输数据采纳二进制格局,绝对于 XML 和 JSON 等序列化形式体积更小,对于高并发、大数据量和多语言的环境更有劣势。Thrift 它含有三个次要的组件:protocol,transport 和 server,其中,protocol 定义了音讯是怎么序列化的,transport 定义了音讯是怎么在客户端和服务器端之间通信的,server 用于从 transport 接管序列化的音讯,依据 protocol 反序列化之,调用用户定义的音讯处理器,并序列化音讯处理器的响应,而后再将它们写回 transport。
官网地址:thrift.apache.org
基本概念
架构图
堆栈的顶部是从 Thrift 定义文件生成的代码。Thrift 服务生成的客户端和处理器代码。这些由图中的棕色框示意。红色框为发送的数据结构(内置类型除外)也会生成代码。协定和传输是 Thrift 运行时库的一部分。因而应用 Thrift 能够定义服务,并且能够自在更改协定和传输,而无需从新生成代码。Thrift 还包含一个服务器根底构造,用于将协定和传输绑定在一起。有可用的阻塞,非阻塞,单线程和多线程服务器。堆栈的“底层 I / O”局部依据所开发语言而有所不同。对于 Java 和 Python 网络 I / O,Thrift 库利用内置库,而 C ++ 实现应用本人的自定义实现。
数据类型:
根本类型:
bool:布尔值,true 或 false,对应 Java 的 boolean
byte:8 位有符号整数,对应 Java 的 byte
i16:16 位有符号整数,对应 Java 的 short
i32:32 位有符号整数,对应 Java 的 int
i64:64 位有符号整数,对应 Java 的 long
double:64 位浮点数,对应 Java 的 double
string:未知编码文本或二进制字符串,对应 Java 的 String
构造体类型:
struct:定义公共的对象,相似于 C 语言中的构造体定义,在 Java 中是一个 JavaBean
汇合类型:
list:对应 Java 的 ArrayList
set:对应 Java 的 HashSet
map:对应 Java 的 HashMap
异样类型:
exception:对应 Java 的 Exception
服务类型:
service:对应服务的类
数据传输层 Transport
TSocket —— 应用阻塞式 I/O 进行传输,是最常见的模式
TFramedTransport —— 应用非阻塞形式,按块的大小进行传输,相似于 Java 中的 NIO,若应用 TFramedTransport 传输层,其服务器必须批改为非阻塞的服务类型
TNonblockingTransport —— 应用非阻塞形式,用于构建异步客户端
数据传输协定 Protocol
Thrift 能够让用户抉择客户端与服务端之间传输通信协议的类别,在传输协定上总体划分为文本 (text) 和二进制 (binary) 传输协定,为节约带宽,进步传输效率,个别状况下应用二进制类型的传输协定为少数,有时还会应用基于文本类型的协定,这须要依据我的项目 / 产品中的理论需要。
罕用协定有以下几种:
TBinaryProtocol : 二进制格局.
TCompactProtocol : 高效率的、密集的二进制压缩格局
TJSONProtocol : JSON 格局
TSimpleJSONProtocol : 提供 JSON 只写协定, 生成的文件很容易通过脚本语言解析
留神:客户端和服务端的协定要统一。
服务器类型 Server
TSimpleServer ——单线程服务器端应用规范的阻塞式 I/O,个别用于测试。
TThreadPoolServer —— 多线程服务器端应用规范的阻塞式 I/O,事后创立一组线程解决申请。
TNonblockingServer —— 多线程服务器端应用非阻塞式 I/O,服务端和客户端须要指定 TFramedTransport 数据传输的形式。
THsHaServer —— 半同步半异步的服务端模型,须要指定为:TFramedTransport 数据传输的形式。它应用一个独自的线程来解决网络 I /O,一个独立的 worker 线程池来解决音讯。这样,只有有闲暇的 worker 线程,音讯就会被立刻解决,因而多条音讯能被并行处理。
TThreadedSelectorServer —— TThreadedSelectorServer 容许你用多个线程来解决网络 I /O。它保护了两个线程池,一个用来解决网络 I /O,另一个用来进行申请的解决。当网络 I / O 是瓶颈的时候,TThreadedSelectorServer 比 THsHaServer 的体现要好。
实现逻辑
服务端
实现服务解决接口 impl
创立 TProcessor 创立 TServerTransport 创立 TProtocol 创立 TServer 启动 Server
客户端
创立 Transport 创立 TProtocol 基于 TTransport 和 TProtocol 创立 Client 调用 Client 的相应办法
ThriftServerDemo 实例
新建 Maven 我的项目,并且增加 thrift 依赖包
<dependencies>
<dependency>
<groupId>
org.apache.thrift
</groupId>
<artifactId>
libthrift
</artifactId>
<version>
0.9.3
</version>
</dependency>
<dependency>
<groupId>
org.slf4j
</groupId>
<artifactId>
slf4j-log4j12
</artifactId>
<version>
1.7.12
</version>
</dependency>
<dependency>
<groupId>
org.apache.logging.log4j
</groupId>
<artifactId>
log4j-api
</artifactId>
<version>
2.7
</version>
</dependency>
<dependency>
<groupId>
org.apache.logging.log4j
</groupId>
<artifactId>
log4j-core
</artifactId>
<version>
2.7
</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-compiler-plugin
</artifactId>
<version>
3.3
</version>
<configuration>
<source>
1.8
</source>
<target>
1.8
</target>
<encoding>
utf-8
</encoding>
</configuration>
</plugin>
</plugins>
</build>
编写 IDL 接口并生成接口文件
namespace java thrift
.
service
// 计算类型 – 仅限整数四则运算
enum
ComputeType
{
ADD
=
0
;
SUB
=
1
;
MUL
=
2
;
DIV
=
3
;
}
// 服务申请
struct
ComputeRequest
{
1
:
required i64 x
;
2
:
required i64 y
;
3
:
required
ComputeType
computeType
;
}
// 服务响应
struct
ComputeResponse
{
1
:
required i32 errorNo
;
2
:
optional string errorMsg
;
3
:
required i64 computeRet
;
}
service
ComputeServer
{
ComputeResponse
getComputeResult
(
1
:
ComputeRequest
request
);
}
执行编译命令:
thrift
–
0.11
.
0.exe
–
r
–
gen java computeServer
.
thrift
拷贝生成的 Service 类文件到 IDEA
服务端接口实现
public
class
ThriftTestImpl
implements
ComputeServer
.
Iface
{
private
static
final
Logger
logger
=
LogManager
.
getLogger
(
ThriftTestImpl
.
class
);
public
ComputeResponse
getComputeResult
(
ComputeRequest
request
)
{
ComputeType
computeType
=
request
.
getComputeType
();
long
x
=
request
.
getX
();
long
y
=
request
.
getY
();
logger
.
info
(
“get compute result begin. [x:{}] [y:{}] [type:{}]”
,
x
,
y
,
computeType
.
toString
());
long
begin
=
System
.
currentTimeMillis
();
ComputeResponse
response
=
new
ComputeResponse
();
response
.
setErrorNo
(
0
);
try
{
long
ret
;
if
(
computeType
ComputeType
.
ADD
)
{
ret
=
add
(
x
,
y
);
response
.
setComputeRet
(
ret
);
}
else
if
(
computeType
ComputeType
.
SUB
)
{
ret
=
sub
(
x
,
y
);
response
.
setComputeRet
(
ret
);
}
else
if
(
computeType
ComputeType
.
MUL
)
{
ret
=
mul
(
x
,
y
);
response
.
setComputeRet
(
ret
);
}
else
{
ret
=
div
(
x
,
y
);
response
.
setComputeRet
(
ret
);
}
}
catch
(
Exception
e
)
{
response
.
setErrorNo
(
1001
);
response
.
setErrorMsg
(
e
.
getMessage
());
logger
.
error
(
“exception:”
,
e
);
}
long
end
=
System
.
currentTimeMillis
();
logger
.
info
(
“get compute result end. [errno:{}] cost:[{}ms]”
,
response
.
getErrorNo
(),
(
end
–
begin
));
return
response
;
}
private
long
add
(
long
x
,
long
y
)
{
return
x
+
y
;
}
private
long
sub
(
long
x
,
long
y
)
{
return
x
–
y
;
}
private
long
mul
(
long
x
,
long
y
)
{
return
x
*
y
;
}
private
long
div
(
long
x
,
long
y
)
{
return
x
/
y
;
}
}
服务端实现
public
class
ServerMain
{
private
static
final
Logger
logger
=
LogManager
.
getLogger
(
ServerMain
.
class
);
public
static
void
main
(
String
[]
args
)
{
try
{
// 实现服务解决接口 impl
ThriftTestImpl
workImpl
=
new
ThriftTestImpl
();
// 创立 TProcessor
TProcessor
tProcessor
=
new
ComputeServer
.
Processor
<
ComputeServer
.
Iface
(
workImpl
);
// 创立 TServerTransport, 非阻塞式 I/O,服务端和客户端须要指定 TFramedTransport 数据传输的形式
final
TNonblockingServerTransport
transport
=
new
TNonblockingServerSocket
(
9999
);
// 创立 TProtocol
TThreadedSelectorServer
.
Args
ttpsArgs
=
new
TThreadedSelectorServer
.
Args
(
transport
);
ttpsArgs
.
transportFactory
(
new
TFramedTransport
.
Factory
());
// 二进制格局反序列化
ttpsArgs
.
protocolFactory
(
new
TBinaryProtocol
.
Factory
());
ttpsArgs
.
processor
(
tProcessor
);
ttpsArgs
.
selectorThreads
(
16
);
ttpsArgs
.
workerThreads
(
32
);
logger
.
info
(
“compute service server on port :”
+
9999
);
// 创立 TServer
TServer
server
=
new
TThreadedSelectorServer
(
ttpsArgs
);
// 启动 Server
server
.
serve
();
}
catch
(
Exception
e
)
{
logger
.
error
(
e
);
}
}
}
服务端整体代码构造
log4j2.xml 配置文件
<?
xml version
=
“1.0”
encoding
=
“UTF-8”
?>
<!– 日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL –>
<!–Configuration 前面的 status,这个用于设置 log4j2 本身外部的信息输入,能够不设置,当设置成 trace 时,你会看到 log4j2 外部各种具体输入 –>
<!–monitorInterval:Log4j 可能自动检测批改配置 文件和重新配置自身,设置距离秒数 –>
<configuration
status
=
“INFO”
monitorInterval
=
“30”
<!– 先定义所有的 appender–>
<appenders>
<!– 这个输入控制台的配置 –>
<console
name
=
“Console”
target
=
“SYSTEM_OUT”
<!– 输入日志的格局 –>
<PatternLayout
pattern
=
“%highlight{[%p] [%-d{yyyy-MM-dd HH:mm:ss}] [%l] %m%n}”
/>
</console>
<RollingFile
name
=
“RollingFileInfo”
fileName
=
“log/log.log”
filePattern
=
“log/log.log.%d{yyyy-MM-dd}”
<!– 只承受 level=INFO 以上的日志 –>
<ThresholdFilter
level
=
“info”
onMatch
=
“
ACCEPT
“
onMismatch
=
“
DENY
“
/>
<PatternLayout
pattern
=
“[%p] [%-d{yyyy-MM-dd HH:mm:ss}] [LOGID:%X{logid} ] [%l] %m%n”
/>
<Policies>
<TimeBasedTriggeringPolicy
modulate
=
“true”
interval
=
“1”
/>
<SizeBasedTriggeringPolicy/>
</Policies>
</RollingFile>
<RollingFile
name
=
“RollingFileError”
fileName
=
“log/error.log”
filePattern
=
“log/error.log.%d{yyyy-MM-dd}”
<!– 只承受 level=WARN 以上的日志 –>
<Filters>
<ThresholdFilter
level
=
“warn”
onMatch
=
“
ACCEPT
“
onMismatch
=
“
DENY
“
/>
</Filters>
<PatternLayout
pattern
=
“[%p] %-d{yyyy-MM-dd HH:mm:ss} [%t:%r] [%l] %m%n”
/>
<Policies>
<TimeBasedTriggeringPolicy
modulate
=
“true”
interval
=
“1”
/>
<SizeBasedTriggeringPolicy/>
</Policies>
</RollingFile>
</appenders>
<!– 而后定义 logger,只有定义了 logger 并引入的 appender,appender 才会失效 –>
<loggers>
<!– 过滤掉 spring 和 mybatis 的一些无用的 DEBUG 信息 –>
<logger
name
=
“org.springframework”
level
=
“INFO”
</logger>
<logger
name
=
“org.mybatis”
level
=
“INFO”
</logger>
<root
level
=
“all”
<appender-ref
ref
=
“Console”
/>
<appender-ref
ref
=
“RollingFileInfo”
/>
<appender-ref
ref
=
“RollingFileError”
/>
</root>
</loggers>
</configuration>
Jmeter 测试类编写
利用 JMeter 调用 Java 测试类去调用对应的后盾服务,并记住每次调用并获取反馈值的 RT,ERR%,只须要依照单线程的形式去实现测试业务,也无需增加各种埋点收集数据
新建一个 JavaMaven 工程, 增加 JMeter 及 thrift 依赖包
<dependencies>
<dependency>
<groupId>
org.apache.jmeter
</groupId>
<artifactId>
ApacheJMeter_core
</artifactId>
<version>
4.0
</version>
</dependency>
<dependency>
<groupId>
org.apache.jmeter
</groupId>
<artifactId>
ApacheJMeter_java
</artifactId>
<version>
4.0
</version>
</dependency>
<dependency>
<groupId>
org.apache.thrift
</groupId>
<artifactId>
libthrift
</artifactId>
<version>
0.11.0
</version>
</dependency>
<dependency>
<groupId>
org.apache.logging.log4j
</groupId>
<artifactId>
log4j-api
</artifactId>
<version>
2.11.1
</version>
</dependency>
<dependency>
<groupId>
org.apache.logging.log4j
</groupId>
<artifactId>
log4j-core
</artifactId>
<version>
2.11.1
</version>
</dependency>
<dependency>
<groupId>
org.slf4j
</groupId>
<artifactId>
slf4j-log4j12
</artifactId>
<version>
1.7.25
</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-compiler-plugin
</artifactId>
<version>
3.7.0
</version>
<configuration>
<source>
1.8
</source>
<target>
1.8
</target>
<encoding>
utf-8
</encoding>
</configuration>
</plugin>
</plugins>
</build>
ThriftClient 测试类编写
public
class
ThriftClient
{
private
ComputeServer
.
Client
client
=
null
;
private
TTransport
tTransport
=
null
;
public
ThriftClient
(
String
ip
,
int
port
){
try
{
TTransport
tTransport
=
new
TFramedTransport
(
new
TSocket
(
ip
,
port
));
tTransport
.
open
();
TProtocol
tProtocol
=
new
TBinaryProtocol
(
tTransport
);
client
=
new
ComputeServer
.
Client
(
tProtocol
);
}
catch
(
TTransportException
e
)
{
e
.
printStackTrace
();
}
}
public
ComputeResponse
getResponse
(
ComputeRequest
request
){
try
{
ComputeResponse
response
=
client
.
getComputeResult
(
request
);
return
response
;
}
catch
(
TException
e
)
{
e
.
printStackTrace
();
return
null
;
}
}
public
void
close
(){
if
(
tTransport
!=
null
&&
tTransport
.
isOpen
()){
tTransport
.
close
();
}
}
}
留神:须要把编写 IDL 接口文件拷贝到工程里
新建一个 JavaClass,如下例中的 TestThriftByJmeter,并继承 AbstractJavaSamplerClient。AbstractJavaSamplerClient 中默认实现了四个能够笼罩的办法,别离是 getDefaultParameters(),setupTest(),runTest()和 teardownTest()办法。
getDefaultParameters 办法次要用于设置传入界面的参数;
setupTest 办法为初始化办法,用于初始化性能测试时的每个线程;
runTest 办法为性能测试时的线程运行体;
teardownTest 办法为测试完结办法,用于完结性能测试中的每个线程。
编写 TestThriftByJmeter 测试类
public
class
TestThriftByJmeter
extends
AbstractJavaSamplerClient
{
private
ThriftClient
client
;
private
ComputeRequest
request
;
private
ComputeResponse
response
;
// 设置传入界面的参数
@Override
public
Arguments
getDefaultParameters
(){
Arguments
arguments
=
new
Arguments
();
arguments
.
addArgument
(
“ip”
,
“172.16.14.251”
);
arguments
.
addArgument
(
“port”
,
“9999”
);
arguments
.
addArgument
(
“X”
,
“0”
);
arguments
.
addArgument
(
“Y”
,
“0”
);
arguments
.
addArgument
(
“type”
,
“0”
);
return
arguments
;
}
// 初始化办法
@Override
public
void
setupTest
(
JavaSamplerContext
context
){
// 获取 Jmeter 中设置的参数
String
ip
=
context
.
getParameter
(
“ip”
);
int
port
=
context
.
getIntParameter
(
“port”
);
int
x
=
context
.
getIntParameter
(
“X”
);
int
y
=
context
.
getIntParameter
(
“Y”
);
ComputeType
type
=
ComputeType
.
findByValue
(
context
.
getIntParameter
(
“type”
));
// 创立客户端
client
=
new
ThriftClient
(
ip
,
port
);
// 设置 request 申请
request
=
new
ComputeRequest
(
x
,
y
,
type
);
super
.
setupTest
(
context
);
}
// 性能测试线程运行体
@Override
public
SampleResult
runTest
(
JavaSamplerContext
context
)
{
SampleResult
result
=
new
SampleResult
();
// 开始统计响应工夫标记
result
.
sampleStart
();
try
{
long
begin
=
System
.
currentTimeMillis
();
response
=
client
.
getResponse
(
request
);
long
cost
=
(
System
.
currentTimeMillis
()
–
begin
);
// 打印工夫戳差值。Java 申请响应工夫
System
.
out
.
println
(
response
.
toString
()+
” 总计破费:[“
+
cost
+
“ms]”
);
if
(
response
null
){
// 设置测试后果为 fasle
result
.
setSuccessful
(
false
);
return
result
;
}
if
(
response
.
getErrorNo
()
==
0
){
// 设置测试后果为 true
result
.
setSuccessful
(
true
);
}
else
{
result
.
setSuccessful
(
false
);
result
.
setResponseMessage
(
“ERROR”
);
}
}
catch
(
Exception
e
){
result
.
setSuccessful
(
false
);
result
.
setResponseMessage
(
“ERROR”
);
e
.
printStackTrace
();
}
finally
{
// 完结统计响应工夫标记
result
.
sampleEnd
();
}
return
result
;
}
// 测试完结办法
public
void
tearDownTest
(
JavaSamplerContext
context
)
{
if
(
client
!=
null
)
{
client
.
close
();
}
super
.
teardownTest
(
context
);
}
}
特地阐明:
result
.
setSamplerLabel
(
“7D”
);
// 设置 java Sampler 的题目
result
.
setResponseOK
();
// 设置响应胜利
result
.
setResponseData
();
// 设置响应内容
编写测试 Run Main 办法
public
class
RunMain
{
public
static
void
main
(
String
[]
args
)
{
Arguments
arguments
=
new
Arguments
();
arguments
.
addArgument
(
“ip”
,
“172.16.14.251”
);
arguments
.
addArgument
(
“port”
,
“9999”
);
arguments
.
addArgument
(
“X”
,
“1”
);
arguments
.
addArgument
(
“Y”
,
“3”
);
arguments
.
addArgument
(
“type”
,
“0”
);
JavaSamplerContext
context
=
new
JavaSamplerContext
(
arguments
);
TestThriftByJmeter
jmeter
=
new
TestThriftByJmeter
();
jmeter
.
setupTest
(
context
);
jmeter
.
runTest
(
context
);
jmeter
.
tearDownTest
(
context
);
}
}
测试后果通过
应用 mvn cleanpackage 打包测试代码
应用 mvn dependency:copy-dependencies-DoutputDirectory=lib 复制所依赖的 jar 包都会到我的项目下的 lib 目录下
复制测试代码 jar 包到 jmeter\lib\ext 目录下,复制依赖包到 jmeter\lib 目录下
这里有两点须要留神:
如果你的 jar 依赖了其余第三方 jar,须要将其一起放到 lib/ext 下,否则会呈现 ClassNotFound 谬误
如果在将 jar 放入 lib/ext 后,你还是无奈找到你编写的类,且此时你是开着 JMeter 的,则须要重启一下 JMeter
关上 Jmeter,在增加 Java 申请时,留神要抉择 Jmeter 测试类,上面的列表中能够看到参数和默认值。
上面咱们将进行性能压测,设置线程组,设置 10 个并发线程。
服务端日志: