我的博客
想必大家都晓得,并发编程是一项十分重要的技术,无论在面试,还是工作中呈现的频率十分高。
当然,如果是集体级别的小玩意,必定不是特地在意这个。
并发编程 ==
多线程编程。
然而多线程肯定比单线程效率更高?
不肯定,得看具体的业务环境,多个线程之间,会一直的抢占
CPU
的资源,所以说有时候多线程也是绝对耗费工夫的。
并发编程场景
1. 定时工作
你可能纳闷?Thread
类真的能做定时工作吗?
答案是必定的。一些
定时工作框架
的源码,它们的底层也会应用 Thread 类。
public static void init() {new Thread(() -> {while (true) {
try {System.out.println("下载文件");
Thread.sleep(1000 * 60 * 5);
} catch (Exception e) {log.error(e);
}
}
}).start();}
应用 Thread
类能够做最简略的定时工作,在 run
办法中有个 while
的死循环(当然还有其余形式),执行咱们本人的工作。有个须要特地留神的中央是,须要用 try...catch
捕捉异样,否则如果出现异常,就间接退出循环,下次将无奈继续执行了。
但这种形式做的定时工作,只能周期性执行,不能反对定时在某个工夫点执行。
特地揭示一下,该线程倡议定义成 守护线程
,能够通过setDaemon
办法设置,让它在后盾默默执行就好。
应用场景:比方我的项目中有时须要每隔 5 分钟去 下载某个文件
,或者每隔 10 分钟去读取模板文件 生成动态 html 页面
等等,一些简略的周期性工作场景。
应用 Thread
类做定时工作的优缺点:
- 长处:这种定时工作非常简单,学习成本低,容易动手,对于那些简略的周期性工作,是个不错的抉择。
- 毛病:不反对指定某个工夫点执行工作,不反对提早执行等操作,性能过于繁多,无奈应答一些较为简单的场景。
2. 监听器
有时候,咱们须要写个监听器,去监听某些数据的变动。
比方:咱们在应用 canal
的时候,须要监听 binlog
的变动,可能及时把数据库中的数据,同步到另外一个业务数据库中。
如果间接写一个监听器去监听数据就太没意思了,咱们想实现这样一个性能:在配置核心有个开关,配置监听器是否开启,如果开启了应用单线程异步执行。
次要代码如下:
@Service
public CanalService {
private volatile boolean running = false;
private Thread thread;
@Autowired
private CanalConnector canalConnector;
public void handle() {
// 连贯 canal
while(running) {// 业务解决}
}
public void start() {thread = new Thread(this::handle, "name");
running = true;
thread.start();}
public void stop() {if(!running) {return;}
running = false;
}
}
在 start
办法中开启了一个线程,在该线程中异步执行 handle
办法的具体任务。而后通过调用 stop 办法,能够进行该线程。
其中,应用 volatile
关键字管制的 running 变量作为开关,它能够控制线程中的状态。
接下来,有个比拟要害的点是:如何通过配置核心的配置,管制这个开关呢?
以 apollo
配置为例,咱们在配置核心的后盾,批改配置之后,主动获取最新配置的外围代码如下:
public class CanalConfig {
@Autowired
private CanalService canalService;
@ApolloConfigChangeListener
public void change(ConfigChangeEvent event) {String value = event.getChange("test.canal.enable").getNewValue();
if(BooleanUtils.toBoolean(value)) {canalService.start();
} else {canalService.stop();
}
}
}
通过 apollo
的ApolloConfigChangeListener
注解,能够监听配置参数的变动。
如果 test.canal.enable
开关配置的 true
,则调用canalService
类的 start 办法开启 canal
数据同步性能。如果开关配置的 false
,则调用canalService
类的 stop
办法,主动进行 canal
数据同步性能。
3. 收集日志
在某些高并发的场景中,咱们须要收集局部用户的日志(比方:用户登录的日志),写到数据库中,以便于做剖析。
但因为我的项目中,还没有引入消息中间件,比方:kafka
、rocketmq
等。
如果间接将日志同步写入数据库,可能会影响接口性能。
所以,大家很天然想到了异步解决。
实现这个需要最简略的做法是,开启一个线程,异步写入数据到数据库即可。
这样做,能够是能够。
但如果用户登录操作的耗时,比异步写入数据库的工夫要少得多。这样导致的后果是:生产日志的速度,比生产日志的速度要快得多,最终的性能瓶颈在生产端。
其实,还有更优雅的解决形式,虽说没有应用消息中间件,但借用了它的思维。
这套记录登录日志的性能,分为:日志生产端、日志存储端和日志生产端。
先定义了一个阻塞队列。
@Component
public class LoginLogQueue {
private static final int QUEUE_MAX_SIZE = 1000;
private BlockingQueueblockingQueue queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
// 生成音讯
public boolean push(LoginLog loginLog) {return this.queue.add(loginLog);
}
// 生产音讯
public LoginLog poll() {
LoginLog loginLog = null;
try {loginLog = this.queue.take();
} catch (InterruptedException e) {e.printStackTrace();
}
return result;
}
}
而后定义了一个日志的生产者。
@Service
public class LoginSerivce {
@Autowired
private LoginLogQueue loginLogQueue;
public int login(UserInfo userInfo) {
// 业务解决
LoginLog loginLog = convert(userInfo);
loginLogQueue.push(loginLog);
}
}
接下来,定义了日志的消费者。
@Service
public class LoginInfoConsumer {
@Autowired
private LoginLogQueue queue;
@PostConstruct
public voit init {new Thread(() -> {while (true) {LoginLog loginLog = queue.take();
// 写入数据库
}
}).start();}
}
当然,这个例子中应用单线程接管登录日志,为了晋升性能,也能够应用线程池来解决业务逻辑(比方:写入数据库)等。
4.excel 导入
咱们可能会常常收到经营同学提过来的 excel 数据导入需要,比方:将某一大类下的所有子类一次性导入零碎,或者导入一批新的供应商数据等等。
咱们以导入供应商数据为例,它所波及的业务流程很长,比方:
- 调用天眼查接口校验企业名称和对立社会信用代码。
- 写入供应商根本表
- 写入组织表
- 给供应商主动创立一个用户
- 给该用户调配权限
- 自定义域名
- 发站内告诉
等等。
如果在程序中,解析完excel
,读取了所有数据之后。用单线程一条条解决业务逻辑,可能耗时会十分长。
为了晋升 excel
数据导入效率,十分有必要应用多线程来解决。
当然在 java
中实现多线程的伎俩有很多种,上面重点聊聊 java8
中最简略的实现形式:parallelStream
。
伪代码如下:
supplierList.parallelStream().forEach(x -> importSupplier(x));
parallelStream
是一个并行执行的流,它默认通过 ForkJoinPool
实现的,能进步你的多线程工作的速度。
ForkJoinPool
解决的过程会分而治之,它的核心思想是:将一个大工作切分成多个小工作
。每个小工作都能独自执行,最初它会把所用工作的执行后果进行汇总。
上面用一张图简略介绍一下 ForkJoinPool
的原理:
当然除了 excel 导入之外,还有相似的读取文本文件,也能够用相似的办法解决。
舒适的揭示一下,如果一次性导入的数据十分多,用多线程解决,可能会使零碎的
CPU
使用率飙升,须要特地关注。
5. 查问接口
很多时候,咱们须要在某个查问接口中,调用其余服务的接口,组合数据之后,一起返回。
比方有这样的业务场景:
在用户信息查问接口中须要返回:用户名称、性别、等级、头像、积分、成长值等信息。
而用户名称、性别、等级、头像在用户服务中,积分在积分服务中,成长值在成长值服务中。为了汇总这些数据对立返回,须要另外提供一个对外接口服务。
于是,用户信息查问接口须要调用用户查问接口、积分查问接口 和 成长值查问接口,而后汇总数据对立返回。
调用过程如下图所示:
调用近程接口总耗时 530ms = 200ms + 150ms + 180ms
显然这种串行调用近程接口性能是十分不好的,调用近程接口总的耗时为所有的近程接口耗时之和。
那么如何优化近程接口性能呢?
既然串行调用多个近程接口性能很差,为什么不改成并行呢?
如下图所示:
调用近程接口总耗时 200ms = 200ms
(即耗时最长的那次近程接口调用)
在 java8 之前能够通过实现 Callable
接口,获取线程返回后果。
java8
当前通过 CompleteFuture
类实现该性能。咱们这里以 CompleteFuture
为例:
public UserInfo getUserInfo(Long id) throws InterruptedException, ExecutionException {final UserInfo userInfo = new UserInfo();
CompletableFuture userFuture = CompletableFuture.supplyAsync(() -> {getRemoteUserAndFill(id, userInfo);
return Boolean.TRUE;
}, executor);
CompletableFuture bonusFuture = CompletableFuture.supplyAsync(() -> {getRemoteBonusAndFill(id, userInfo);
return Boolean.TRUE;
}, executor);
CompletableFuture growthFuture = CompletableFuture.supplyAsync(() -> {getRemoteGrowthAndFill(id, userInfo);
return Boolean.TRUE;
}, executor);
CompletableFuture.allOf(userFuture, bonusFuture, growthFuture).join();
userFuture.get();
bonusFuture.get();
growthFuture.get();
return userInfo;
}
舒适揭示一下,这两种形式别忘了应用 线程池
。示例中我用到了executor
,示意自定义的线程池,为了避免高并发场景下,呈现线程过多的问题。
6. 获取用户上下文
不晓得你在我的项目开发时,有没有遇到过这样的需要:用户登录之后,在所有的申请接口中,通过某个公共办法,就能获取到以后登录用户的信息?
获取的用户上下文,咱们以 CurrentUser
为例。
CurrentUser
外部蕴含了一个 ThreadLocal
对象,它负责保留以后线程的用户上下文信息。当然为了保障在线程池中,也能从用户上下文中获取到正确的用户信息,这里用了阿里的TransmittableThreadLocal
。伪代码如下:
@Data
public class CurrentUser {private static final TransmittableThreadLocal<CurrentUser> THREA_LOCAL = new TransmittableThreadLocal<>();
private String id;
private String userName;
private String password;
private String phone;
...
public statis void set(CurrentUser user) {THREA_LOCAL.set(user);
}
public static void getCurrent() {return THREA_LOCAL.get();
}
}
这里为什么用了阿里的 TransmittableThreadLocal
,而不是一般的ThreadLocal
呢?在线程池中,因为线程会被屡次复用,导致从一般的 ThreadLocal
中无奈获取正确的用户信息。父线程中的参数,没法传递给子线程,而 TransmittableThreadLocal
很好解决了这个问题。
而后在我的项目中定义一个全局的 spring mvc
拦截器,专门设置用户上下文到 ThreadLocal
中。伪代码如下:
public class UserInterceptor extends HandlerInterceptorAdapter {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {CurrentUser user = getUser(request);
if(Objects.nonNull(user)) {CurrentUser.set(user);
}
}
}
用户在申请咱们接口时,会先触发该拦截器,它会依据用户 cookie
中的 token
,调用调用接口获取redis
中的用户信息。如果能获取到,阐明用户曾经登录,则把用户信息设置到 CurrentUser
类的 ThreadLocal
中。
接下来,在 api
服务的上层,即 business 层的办法中,就能轻松通过 CurrentUser.getCurrent();
办法获取到想要的用户上下文信息了。
这套用户体系的想法是很 good 的,但深刻应用后,发现了一个小插曲:
api
服务和 mq
消费者服务都援用了 business
层,business
层中的办法两个服务都能间接调用。
咱们都晓得在 api
服务中用户是须要登录的,而 mq
消费者服务则不须要登录。
如果 business
中的某个办法刚开始是给 api
开发的,在办法深处应用了 CurrentUser.getCurrent();
获取用户上下文。但起初,某位新来的帅哥在 mq
消费者中也调用了那个办法,并未察觉这个小机关,就会中招,呈现找不到用户上下文的问题。
所以我过后的第一个想法是:代码没做兼容解决,因为之前这类问题偶然会产生一次。
想要解决这个问题,其实也很简略。只需先判断一下是否从 CurrentUser
中获取用户信息,如果不能,则取配置的零碎用户信息。伪代码如下:
@Autowired
private BusinessConfig businessConfig;
CurrentUser user = CurrentUser.getCurrent();
if(Objects.nonNull(user)) {entity.setUserId(user.getUserId());
entity.setUserName(user.getUserName());
} else {entity.setUserId(businessConfig.getDefaultUserId());
entity.setUserName(businessConfig.getDefaultUserName());
}
这种简略无公害的代码,如果只是在一两个中央加还 OK。
此外,家喻户晓,SimpleDateFormat
在 java8
以前,是用来解决工夫的工具类,它是非线程平安的。也就是说,用该办法解析日期会有线程平安问题。
为了防止线程平安问题的呈现,咱们能够把 SimpleDateFormat
对象定义成 局部变量
。但如果你肯定要把它定义成动态变量,能够应用ThreadLocal
保留日期,也能解决线程平安问题。
8. 传递参数
之前见过有些共事写代码时,一个十分乏味的用法,即:应用 MDC
传递参数。
MDC 是什么?
MDC
是 org.slf4j
包下的一个类,它的全称是Mapped Diagnostic Context
,咱们能够认为它是一个线程平安的寄存诊断日志的容器。
MDC
的底层是用了 ThreadLocal
来保留数据的。
例如当初有这样一种场景:咱们应用 RestTemplate
调用近程接口时,有时须要在 header
中传递信息,比方:traceId
,source
等,便于在查问日志时可能串联一次残缺的申请链路,疾速定位问题。
这种业务场景就能通过 ClientHttpRequestInterceptor
接口实现,具体做法如下:
第一步,定义一个 LogFilter
拦挡所有接口申请,在 MDC
中设置traceId
:
public class LogFilter implements Filter {
@Override
public void init(FilterConfig filterConfig) throws ServletException { }
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {MdcUtil.add(UUID.randomUUID().toString());
System.out.println("记录申请日志");
chain.doFilter(request, response);
System.out.println("记录响应日志");
}
@Override
public void destroy() {}
}
第二步,实现 ClientHttpRequestInterceptor
接口,MDC
中获取以后申请的 traceId
,而后设置到header
中:
public class RestTemplateInterceptor implements ClientHttpRequestInterceptor {
@Override
public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {request.getHeaders().set("traceId", MdcUtil.get());
return execution.execute(request, body);
}
}
第三步,定义配置类,配置下面定义的 RestTemplateInterceptor
类:
@Configuration
public class RestTemplateConfiguration {
@Bean
public RestTemplate restTemplate() {RestTemplate restTemplate = new RestTemplate();
restTemplate.setInterceptors(Collections.singletonList(restTemplateInterceptor()));
return restTemplate;
}
@Bean
public RestTemplateInterceptor restTemplateInterceptor() {return new RestTemplateInterceptor();
}
}
其中 MdcUtil
其实是利用 MDC
工具在 ThreadLocal
中存储和获取traceId
public class MdcUtil {
private static final String TRACE_ID = "TRACE_ID";
public static String get() {return MDC.get(TRACE_ID);
}
public static void add(String value) {MDC.put(TRACE_ID, value);
}
}
当然,这个例子中没有演示 MdcUtil
类的 add
办法具体调的中央,咱们能够在 filter
中执行接口办法之前,生成 traceId
,调用MdcUtil
类的 add 办法增加到 MDC
中,而后在同一个申请的其余中央就能通过 MdcUtil
类的 get 办法获取到该traceId
。
能应用 MDC
保留 traceId
等参数的根本原因是,用户申请到应用服务器,Tomcat
会从线程池中调配一个线程去解决该申请。
那么该申请的整个过程中,保留到 MDC
的ThreadLocal
中的参数,也是该线程独享的,所以不会有线程平安问题。
9. 模仿高并发
有时候咱们写的接口,在低并发的场景下,一点问题都没有。
但如果一旦呈现高并发调用,该接口可能会呈现一些意想不到的问题。
为了避免相似的事件产生,个别在我的项目上线前,咱们十分有必要对接口做一下 压力测试
。
当然,当初曾经有比拟成熟的压力测试工具,比方:Jmeter
、LoadRunner
等。
如果你感觉下载压测工具比拟麻烦,也能够手写一个简略的模仿并发操作的工具,用 CountDownLatch
就能实现,例如:
public static void concurrenceTest() {
/**
* 模仿高并发状况代码
*/
final AtomicInteger atomicInteger = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(1000); // 相当于计数器,当所有都筹备好了,再一起执行,模拟多并发,保障并发量
final CountDownLatch countDownLatch2 = new CountDownLatch(1000); // 保障所有线程执行完了再打印 atomicInteger 的值
ExecutorService executorService = Executors.newFixedThreadPool(10);
try {for (int i = 0; i < 1000; i++) {executorService.submit(new Runnable() {
@Override
public void run() {
try {countDownLatch.await(); // 始终阻塞以后线程,直到计时器的值为 0, 保障同时并发
} catch (InterruptedException e) {log.error(e.getMessage(),e);
}
// 每个线程减少 1000 次,每次加 1
for (int j = 0; j < 1000; j++) {atomicInteger.incrementAndGet();
}
countDownLatch2.countDown();}
});
countDownLatch.countDown();}
countDownLatch2.await();// 保障所有线程执行完
executorService.shutdown();} catch (Exception e){log.error(e.getMessage(),e);
}
}
10. 解决 mq 音讯
在高并发的场景中,音讯积压问题,能够说如影随形,真的没方法从根本上解决。外表上看,曾经解决了,但前面不晓得什么时候,就会冒出一次,比方这次:
有天下午,产品过去说:有几个商户投诉过去了,他们说菜品有提早,快查一下起因。
这次问题呈现得有点奇怪。
为什么这么说?
首先这个工夫点就有点奇怪,平时出问题,不都是中午或者早晨用餐高峰期吗?怎么这次问题呈现在下午?
依据以往积攒的教训,我间接看了 kafka
的topic
的数据,果然下面音讯有积压,但这次每个 partition
都积压了十几万的音讯没有生产,比以往加压的音讯数量减少了几百倍。这次音讯积压得极不寻常。
我连忙查服务监控看看消费者挂了没,还好没挂。又查服务日志没有发现异常。这时我有点迷茫,碰运气问了问订单组下午产生了什么事件没?他们说下午有个促销流动,跑了一个 JOB
批量更新过有些商户的订单信息。
这时,我一下子如梦初醒,是他们在 JOB 中批量发消息导致的问题。怎么没有告诉咱们呢?切实太坑了。
虽说晓得问题的起因了,倒是眼前积压的这十几万的音讯该如何解决呢?
此时,如果间接调大 partition
数量是不行的,历史音讯曾经存储到 4
个固定的 partition,只有新增的音讯才会到新的 partition。咱们重点须要解决的是已有的 partition。
间接加服务节点也不行,因为 kafka
容许同组的多个 partition
被一个 consumer
生产,但不容许一个 partition 被同组的多个 consumer 生产,可能会造成资源节约。
看来只有用 多线程
解决了。
为了紧急解决问题,我改成了用线程池解决音讯,外围线程和最大线程数都配置成了50
。
大抵用法如下:
- 先定义一个线程池:
@Configuration
public class ThreadPoolConfig {
@Value("${thread.pool.corePoolSize:5}")
private int corePoolSize;
@Value("${thread.pool.maxPoolSize:10}")
private int maxPoolSize;
@Value("${thread.pool.queueCapacity:200}")
private int queueCapacity;
@Value("${thread.pool.keepAliveSeconds:30}")
private int keepAliveSeconds;
@Value("${thread.pool.threadNamePrefix:ASYNC_}")
private String threadNamePrefix;
@Bean("messageExecutor")
public Executor messageExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setThreadNamePrefix(threadNamePrefix);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
- 再定义一个音讯的 consumer:
@Service
public class MyConsumerService {
@Autowired
private Executor messageExecutor;
@KafkaListener(id="test",topics={"topic-test"})
public void listen(String message){System.out.println("收到音讯:" + message);
messageExecutor.submit(new MyWork(message);
}
}
- 在定义的
Runable
实现类中解决业务逻辑:
public class MyWork implements Runnable {
private String message;
public MyWork(String message) {this.message = message;}
@Override
public void run() {System.out.println(message);
}
}
果然,调整之后音讯积压数量的确降落的十分快,大概半小时后,积压的音讯就十分顺利的解决完了。
但此时有个更重大的问题呈现:我收到了报警邮件,有两个订单零碎的节点宕机了。。。
11. 统计数量
在多线程的场景中,有时候须要统计数量,比方:用多线程导入供应商数据时,统计导入胜利的供应商数有多少。
如果这时候用 count++ 统计次数,最终的后果可能会不准。因为 count++ 并非原子操作,如果多个线程同时执行该操作,则统计的次数,可能会出现异常。
为了解决这个问题,就须要应用 concurent
的atomic
包上面的类,比方:AtomicInteger
、AtomicLong
等。
@Servcie
public class ImportSupplierService {private static AtomicInteger count = new AtomicInteger(0);
public int importSupplier(List<SupplierInfo> supplierList) {if(CollectionUtils.isEmpty(supplierList)) {return 0;}
supplierList.parallelStream().forEach(x -> {
try {importSupplier(x);
count.addAndGet(1);
} catch(Exception e) {log.error(e.getMessage(),e);
}
);
return count.get();}
}
AtomicInteger
的底层说白了应用 自旋锁
+CAS
。
public final int incrementAndGet() {for (;;) {int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
自旋锁
说白了就是一个 死循环
。
而 CAS
是比拟
和替换
的意思。
它的实现逻辑是:将内存地位处的 旧值
与预期值
进行比拟,若相等,则将内存地位处的值替换为 新值
。若不相等,则不做任何操作。
12. 提早定时工作
咱们常常有提早解决数据的需要,比方:如果用户下单后,超过 30 分钟还未实现领取,则零碎主动将该订单勾销。
这里需要就能够应用 提早定时工作
实现。
ScheduledExecutorService
是 JDK1.5+
版本引进的定时工作,该类位于 java.util.concurrent
并发包下。
ScheduledExecutorService 是基于多线程的,设计的初衷是为了解决 Timer
单线程执行,多个工作之间会相互影响的问题。
它次要蕴含 4 个办法:
- schedule(Runnable command,long delay,TimeUnit unit),带延迟时间的调度,只执行一次,调度之后可通过 Future.get()阻塞直至工作执行结束。
- schedule(Callablecallable,long delay,TimeUnit unit),带延迟时间的调度,只执行一次,调度之后可通过 Future.get()阻塞直至工作执行结束,并且能够获取执行后果。
- scheduleAtFixedRate,示意以固定频率执行的工作,如果当前任务耗时较多,超过定时周期 period,则当前任务完结后会立刻执行。
- scheduleWithFixedDelay,示意以固定延时执行工作,延时是绝对当前任务完结为终点计算开始工夫。
实现这种定时工作的具体代码如下:
public class ScheduleExecutorTest {
public static void main(String[] args) {ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
scheduledExecutorService.scheduleAtFixedRate(() -> {System.out.println("doSomething");
},1000,1000, TimeUnit.MILLISECONDS);
}
}
调用 ScheduledExecutorService
类的 scheduleAtFixedRate
办法实现周期性工作,每隔 1 秒钟执行一次,每次提早 1 秒再执行。
这种定时工作是阿里巴巴开发者标准中用来代替 Timer
类的计划,对于多线程执行周期性工作,是个不错的抉择。
应用 ScheduledExecutorService
类做提早定时工作的优缺点:
- 长处:基于多线程的定时工作,多个工作之间不会相干影响,反对周期性的执行工作,并且带提早性能。
- 毛病:不反对一些较简单的定时规定。
当然,你也能够应用分布式定时工作,比方:xxl-job
或者 elastic-job
等等。
其实,在理论工作中我应用多线程的场景远远不只这 12 种,在这里只是抛砖引玉,介绍了一些我认为比拟常见的业务场景。
文章来自:苏三说技术