关于后端:Java分布式环境下并发编程实践

3次阅读

共计 10649 个字符,预计需要花费 27 分钟才能阅读完成。

在 Java 中,咱们能够应用线程来实现并发编程,然而在多线程编程中,咱们须要思考线程平安、锁、死锁等问题。本文将介绍 Java 中的并发编程,包含线程平安、锁、死锁等内容,同时提供理论的代码案例,让读者更容易了解和把握。

随着分布式系统越来越遍及,分布式系统中的并发编程成为了一个重要的话题。Java 作为一种高级编程语言,其并发编程能力失去了宽泛的认可。但在分布式系统中,Java 并发编程面临着一些新的挑战。本文将介绍在分布式系统下 Java 并发编程的一些技术和理论案例。

一、线程平安

在多线程编程中,线程平安是一个重要的问题。如果多个线程同时拜访同一个共享资源,就会呈现线程平安问题。例如,在银行账户转账时,如果多个线程同时对同一个账户进行操作,就会呈现线程平安问题。

解决线程平安问题的办法之一是应用 synchronized 关键字。synchronized 关键字能够将代码块或办法锁定,保障同一时间只有一个线程能够执行该代码块或办法。

上面是一个应用 synchronized 关键字的示例:

public class Counter {
    private int count;
    
    public synchronized void increment() {count++;}
    
    public synchronized void decrement() {count--;}
    
    public int getCount() {return count;}
}

在这个示例中,Counter 类有两个办法 increment()和 decrement(),它们都是应用 synchronized 关键字来保障线程平安。这样,同一时间只有一个线程能够执行 increment()和 decrement()办法。

二、锁

在 Java 中,锁是一种同步机制,能够用于管制多个线程对共享资源的拜访。Java 中的锁有两种类型:内置锁和显式锁。

内置锁是 Java 中的一个非凡对象,每个对象都有一个内置锁。能够应用 synchronized 关键字来获取内置锁。例如:

public synchronized void increment() {count++;}

在这个示例中,synchronized 关键字获取了 Counter 对象的内置锁。这样,在同一时间只有一个线程能够拜访 increment()办法。

显式锁是 Java 中的另一种锁类型,能够应用 java.util.concurrent.locks 包中的 Lock 接口来实现。与内置锁不同,显式锁提供了更多的灵活性和管制。例如:

public class Counter {
    private int count;
    private Lock lock = new ReentrantLock();
    
    public void increment() {lock.lock();
        try {count++;} finally {lock.unlock();
        }
    }
    
    public void decrement() {lock.lock();
        try {count--;} finally {lock.unlock();
        }
    }
    
    public int getCount() {return count;}
}

在这个示例中,Counter 类应用 ReentrantLock 类来创立一个显式锁。increment()和 decrement()办法获取锁并开释锁。这样,在同一时间只有一个线程能够拜访 increment()和 decrement()办法。

三、死锁

<img src=”https://image.xiaoxiaofeng.site/blog/image/image-20230409205211771.png?xiaoxiaofeng” alt=”image-20230409205211771″ style=”zoom:50%;” />

死锁是多线程编程中的一种问题,它产生在两个或多个线程相互期待对方开释锁的状况下。例如:

public class DeadlockExample {private final Object lock1 = new Object();
    private final Object lock2 = new Object();
    
    public void method1() {synchronized (lock1) {
            // do something
            synchronized (lock2) {// do something}
        }
    }
    
    public void method2() {synchronized (lock2) {
            // do something
            synchronized (lock1) {// do something}
        }
    }
}

在这个示例中,DeadlockExample 类有两个办法 method1()和 method2(),它们都应用两个锁 lock1 和 lock2。如果一个线程调用 method1()办法并获取了 lock1 锁,另一个线程调用 method2()办法并获取了 lock2 锁,那么两个线程都无奈继续执行,因为它们都在期待对方开释锁。这就是死锁。

防止死锁的办法之一是应用定时锁。定时锁能够在肯定工夫内主动开释锁,防止死锁。例如:

public class DeadlockExample {private final Object lock1 = new Object();
    private final Object lock2 = new Object();
    private final Lock timedLock1 = new ReentrantLock();
    private final Lock timedLock2 = new ReentrantLock();
    
    public void method1() {timedLock1.lock();
        try {
            // do something
            if (timedLock2.tryLock(500, TimeUnit.MILLISECONDS)) {
                try {// do something} finally {timedLock2.unlock();
                }
            }
        } catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {timedLock1.unlock();
        }
    }
    
    public void method2() {timedLock2.lock();
        try {
            // do something
            if (timedLock1.tryLock(500, TimeUnit.MILLISECONDS)) {
                try {// do something} finally {timedLock1.unlock();
                }
            }
        } catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {timedLock2.unlock();
        }
    }
}

在这个示例中,DeadlockExample 类应用定时锁 timedLock1 和 timedLock2 来防止死锁。如果一个线程调用 method1()办法并获取了 timedLock1 锁,另一个线程调用 method2()办法并获取了 timedLock2 锁,那么它们会期待一段时间,如果在这段时间内无奈获取到对方的锁,就会主动开释本人的锁,防止死锁。

四、分布式系统下的并发编程挑战

<img src=”https://image.xiaoxiaofeng.site/blog/image/image-20230409205403525.png?xiaoxiaofeng” alt=”image-20230409205403525″ style=”zoom:50%;” />

在分布式系统中,因为不同的节点之间通过网络进行通信,因而会带来以下一些挑战:

  1. 网络提早

在分布式系统中,因为节点之间通过网络进行通信,因而会存在网络提早。这会导致节点之间的通信变慢,从而影响并发编程的效率。为了解决这个问题,能够采纳异步编程模型,即通过回调函数的形式来解决网络通信。

  1. 数据一致性

在分布式系统中,因为数据分布在不同的节点上,因而会存在数据一致性的问题。如果不同节点上的数据不统一,就会导致系统出现异常。为了解决这个问题,能够采纳分布式锁或者分布式事务来保证数据一致性。

  1. 容错性

在分布式系统中,因为节点之间存在网络通信,因而会存在节点宕机的状况。为了保证系统的容错性,须要采纳一些容错机制,例如备份节点、主动故障转移等。

五、分布式锁的实现

在分布式系统中,为了保证数据一致性,须要采纳分布式锁来管制对共享资源的拜访。上面介绍一种基于 Redis 实现的分布式锁。

<img src=”https://image.xiaoxiaofeng.site/blog/image/image-20230409205625391.png?xiaoxiaofeng” alt=”image-20230409205625391″ style=”zoom:50%;” />

1. Redis 实现分布式锁的原理

Redis 是一个高性能的键值存储系统,反对多种数据结构,例如字符串、哈希表、列表等。Redis 提供了一种原子性的操作,能够实现分布式锁。

实现分布式锁的原理如下:

1)客户端向 Redis 发送一个 SETNX 命令,尝试去设置一个 key 的值为 1,如果这个 key 不存在,则设置胜利,否则设置失败。

2)客户端设置了这个 key 的值为 1 之后,就领有了这个锁。

3)其余客户端也能够向 Redis 发送 SETNX 命令,尝试去设置这个 key 的值为 1,然而因为这个 key 曾经存在了,因而设置失败。

4)当客户端实现了对共享资源的拜访之后,须要将这个 key 删除,以便其余客户端能够取得这个锁。

2. Redis 实现分布式锁的代码实现

上面是基于 Redis 实现分布式锁的代码实现:

public class RedisDistributedLock {
    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";
    private static final Long RELEASE_SUCCESS = 1L;
    private static final String LOCK_PREFIX = "lock:";
    private JedisPool jedisPool;

    public RedisDistributedLock(JedisPool jedisPool) {this.jedisPool = jedisPool;}

    public boolean tryLock(String key, String requestId, int expireTime) {
        Jedis jedis = null;
        try {jedis = jedisPool.getResource();
            String lockKey = LOCK_PREFIX + key;
            String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
            if (LOCK_SUCCESS.equals(result)) {return true;}
        } finally {if (jedis != null) {jedis.close();
            }
        }
        return false;
    }

    public boolean releaseLock(String key, String requestId) {
        Jedis jedis = null;
        try {jedis = jedisPool.getResource();
            String lockKey = LOCK_PREFIX + key;
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
            if (RELEASE_SUCCESS.equals(result)) {return true;}
        } finally {if (jedis != null) {jedis.close();
            }
        }
        return false;
    }
}

3. Redisson 实现分布式锁的代码实现

当然,这里介绍的是比拟原生的形式,咱们也能够间接应用 Redisson 框架封装的分布式锁。

Redisson 是一个基于 Redis 的 Java 客户端,提供了丰盛的分布式数据结构和服务。其中就包含分布式锁的实现,上面介绍一下如何应用 Redisson 实现分布式锁。

  • 引入 Redisson 依赖
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.15.5</version>
</dependency>
  • 应用分布式锁

好的,上面提供一个更具体的代码示例:

import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

import java.util.concurrent.TimeUnit;

public class DistributedLockDemo {public static void main(String[] args) {
        // 创立 Redisson 客户端
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redisson = Redisson.create(config);

        // 获取分布式锁
        RLock lock = redisson.getLock("mylock");
        try {
            // 尝试获取锁,等待时间为 10 秒,锁的有效期为 5 秒
            boolean isLocked = lock.tryLock(10, 5, TimeUnit.SECONDS);
            if (isLocked) {
                // 获取锁胜利,执行业务逻辑
                System.out.println("获取锁胜利,执行业务逻辑...");
                Thread.sleep(3000); // 模仿业务逻辑耗时
            } else {
                // 获取锁失败,解决异常情况
                System.out.println("获取锁失败,解决异常情况...");
            }
        } catch (Exception e) {
            // 解决异常情况
            System.out.println("解决异常情况...");
        } finally {
            // 开释锁
            lock.unlock();
            System.out.println("开释锁...");
        }

        // 敞开 Redisson 客户端
        redisson.shutdown();}
}

在下面的代码中,咱们首先创立了一个 RedissonClient 对象,而后通过该对象获取一个 RLock 对象。在 try...catch...finally 代码块中,咱们调用 tryLock 办法尝试获取锁,如果获取胜利就执行业务逻辑;否则就解决异常情况。最初,在 finally 代码块中开释锁,并敞开 RedissonClient 对象。

须要留神的是,在理论利用中,咱们须要将下面的代码封装成一个可重入的分布式锁工具类,不便各个业务模块应用。

六、分布式事务的实现

在分布式系统中,为了保证数据一致性,须要采纳分布式事务来管制对共享资源的拜访。上面介绍一种基于 XA 协定实现的分布式事务。

<img src=”https://image.xiaoxiaofeng.site/blog/image/image-20230409205723358.png?xiaoxiaofeng” alt=”image-20230409205723358″ style=”zoom:33%;” />

1. XA 协定的原理

XA 协定是一种分布式事务协定,能够用于协调多个数据库的事务。XA 协定的原理如下:

1)事务管理器向数据库发送 XA START 命令,开始一个分布式事务。

2)事务管理器向数据库发送 XA END 命令,完结一个分布式事务。

3)事务管理器向数据库发送 XA PREPARE 命令,筹备提交一个分布式事务。

4)如果所有数据库都筹备好提交事务,则事务管理器向数据库发送 XA COMMIT 命令,提交分布式事务。

5)如果有任何一个数据库无奈提交事务,则事务管理器向所有数据库发送 XA ROLLBACK 命令,回滚分布式事务。

2. XA 协定的代码实现

上面是基于 XA 协定实现分布式事务的代码实现:

public class XADistributedTransaction {
    private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
    private static final String DB_URL = "jdbc:mysql://localhost:3306/test";
    private static final String USER = "root";
    private static final String PASSWORD = "root";
    private static final String XA_DATASOURCE_CLASSNAME = "com.mysql.jdbc.jdbc2.optional.MysqlXADataSource";
    private static final String XA_DATASOURCE_URL = "jdbc:mysql://localhost:3306/test";
    private static final String XA_DATASOURCE_USER = "root";
    private static final String XA_DATASOURCE_PASSWORD = "root";
    private static final String XID_PREFIX = "xa_";
    private static final String TABLE_NAME = "account";
    private static final String TABLE_SCHEMA = "CREATE TABLE account (id INT PRIMARY KEY, balance INT)";
    private static final String INSERT_SQL = "INSERT INTO account (id, balance) VALUES (?, ?)";
    private static final String UPDATE_SQL = "UPDATE account SET balance = ? WHERE id = ?";

    public void transferMoney(int fromId, int toId, int amount) throws SQLException {XADataSource xaDataSource = getXADataSource();
        Connection connection = xaDataSource.getXAConnection().getConnection();
        Xid xid = createXid();
        try {connection.setAutoCommit(false);
            XAResource xaResource = getXAResource(connection);
            xaResource.start(xid, XAResource.TMNOFLAGS);
            try (PreparedStatement preparedStatement = connection.prepareStatement(UPDATE_SQL)) {preparedStatement.setInt(1, getBalance(connection, fromId) - amount);
                preparedStatement.setInt(2, fromId);
                preparedStatement.executeUpdate();}
            xaResource.end(xid, XAResource.TMSUCCESS);

            xaResource.start(xid, XAResource.TMNOFLAGS);
            try (PreparedStatement preparedStatement = connection.prepareStatement(UPDATE_SQL)) {preparedStatement.setInt(1, getBalance(connection, toId) + amount);
                preparedStatement.setInt(2, toId);
                preparedStatement.executeUpdate();}
            xaResource.end(xid, XAResource.TMSUCCESS);

            int prepare = xaResource.prepare(xid);
            if (prepare == XAResource.XA_OK) {xaResource.commit(xid, false);
            } else {xaResource.rollback(xid);
            }

            connection.commit();} catch (SQLException | XAException e) {connection.rollback();
            throw e;
        } finally {connection.close();
        }
    }

    private XADataSource getXADataSource() throws SQLException {MysqlXADataSource xaDataSource = new MysqlXADataSource();
        xaDataSource.setUrl(XA_DATASOURCE_URL);
        xaDataSource.setUser(XA_DATASOURCE_USER);
        xaDataSource.setPassword(XA_DATASOURCE_PASSWORD);
        return xaDataSource;
    }

    private Xid createXid() throws XAException {byte[] gtrid = new byte[10];
        byte[] bqual = new byte[10];
        Arrays.fill(gtrid, (byte) 9);
        Arrays.fill(bqual, (byte) 9);
        return new XidImpl(0x1234, gtrid, bqual);
    }

    private XAResource getXAResource(Connection connection) throws SQLException {return connection.unwrap(XAResource.class);
    }

    private int getBalance(Connection connection, int id) throws SQLException {try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT balance FROM account WHERE id = ?")) {preparedStatement.setInt(1, id);
            try (ResultSet resultSet = preparedStatement.executeQuery()) {if (resultSet.next()) {return resultSet.getInt("balance");
                }
            }
        }
        throw new RuntimeException("Account not found:" + id);
    }

    public void init() throws SQLException {try (Connection connection = DriverManager.getConnection(DB_URL, USER, PASSWORD)) {try (Statement statement = connection.createStatement()) {statement.executeUpdate("DROP TABLE IF EXISTS account");
                statement.executeUpdate(TABLE_SCHEMA);
                try (PreparedStatement preparedStatement = connection.prepareStatement(INSERT_SQL)) {preparedStatement.setInt(1, 1);
                    preparedStatement.setInt(2, 1000);
                    preparedStatement.executeUpdate();}
                try (PreparedStatement preparedStatement = connection.prepareStatement(INSERT_SQL)) {preparedStatement.setInt(1, 2);
                    preparedStatement.setInt(2, 1000);
                    preparedStatement.executeUpdate();}
            }
        }
    }
}

七、总结

本文介绍了 Java 中的并发编程,包含线程平安、锁、死锁等内容。在多线程编程中,线程平安是一个重要的问题,能够应用 synchronized 关键字或显式锁来实现。死锁是一个常见的问题,能够应用定时锁来防止。多线程编程须要认真思考线程平安和锁的问题,能力保障程序的正确性和性能。

并且解说了在分布式系统下 Java 并发编程的一些技术和理论案例。在分布式系统中,Java 并发编程须要面对网络提早、数据一致性和容错性等挑战,须要采纳一些技术和机制来解决这些问题。例如,能够采纳基于 Redis 实现的分布式锁来管制对共享资源的拜访,也能够采纳基于 XA 协定实现的分布式事务来保证数据一致性。

正文完
 0