欢迎访问宙启技术站
智能推送

Java函数:如何实现分布式锁

发布时间:2023-06-12 19:22:29

分布式锁是一种在分布式系统中用户协调进程或线程实现互斥的技术。对于多个进程或线程访问同一个资源的情况,通过使用分布式锁可以防止并发出现问题,保障数据的一致性和正确性。

在Java中,实现分布式锁的方式有很多种,本文将介绍其中比较常见的几种实现方式。

1. 基于Redis实现分布式锁

Redis是一个开源的内存数据存储系统,支持多种数据结构,特别适合于高并发、分布式应用环境中的数据处理。使用Redis的SETNX命令可以实现分布式锁。

示例代码如下:

public class DistributedLock {

    private static JedisPool jedisPool = new JedisPool("localhost", 6379);

    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";
    private static final Long RELEASE_SUCCESS = 1L;

    /**
     * 尝试获取分布式锁
     *
     * @param jedis      Redis客户端
     * @param lockKey    锁
     * @param requestId  请求标识
     * @param expireTime 超期时间
     * @return 是否获取成功
     */
    public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
        return LOCK_SUCCESS.equals(result);
    }

    /**
     * 释放分布式锁
     *
     * @param jedis     Redis客户端
     * @param lockKey   锁
     * @param requestId 请求标识
     * @return 是否释放成功
     */
    public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
        return RELEASE_SUCCESS.equals(result);
    }

}

该代码使用了Jedis连接池来创建Redis连接,使用了SETNX命令来获取锁,使用LUA脚本来释放锁。其中,SETIFNOTEXISTS参数保证了只有一个进程可以获取锁,SETWITHEXPITETIME参数保证了获取锁之后如果进程异常宕机导致锁没有释放,过期时间到达后可以自动释放锁。

2. 基于Zookeeper实现分布式锁

Zookeeper是一个开源的分布式协调服务框架,可以用于实现分布式锁。例如,可以在Zookeeper中创建一个顺序节点,获取锁的进程是顺序节点序号最小的进程,在释放锁的时候删除该顺序节点即可。

示例代码如下:

public class DistributedLock {
    private static String hosts = "localhost:2181";
    private static String lockPath = "/lock";
    private static int sessionTimeout = 3000;
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    private static ZooKeeper zkClient;
    private String lockId;

    public DistributedLock() {
        try {
            zkClient = new ZooKeeper(hosts, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                        countDownLatch.countDown();
                    }
                }
            });
            countDownLatch.await();
            // 创建锁节点
            Stat exists = zkClient.exists(lockPath, false);
            if (exists == null) {
                zkClient.create(lockPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 获取分布式锁
     *
     * @return 是否获得锁
     */
    public boolean acquire() {
        try {
            // 创建锁节点
            String path = zkClient.create(lockPath + "/", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            // 如果创建成功
            if (path != null) {
                lockId = path.substring(lockPath.length() + 1);
                // 判断是否是顺序号最小的节点,如果是,表示获得锁
                List<String> children = zkClient.getChildren(lockPath, false);
                Collections.sort(children);
                if (lockId.equals(children.get(0))) {
                    return true;
                } else {
                    // 不是顺序号最小的节点,删除自己创建的节点
                    zkClient.delete(path, -1);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 释放分布式锁
     *
     * @return 是否释放成功
     */
    public boolean release() {
        try {
            zkClient.delete(lockPath + "/" + lockId, -1);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
}

该代码使用了ZooKeeper客户端来连接ZooKeeper服务器,使用了创建序列节点的方法来获取锁,使用了删除节点的方法来释放锁。

3. 基于数据库实现分布式锁

在关系型数据库中,可以使用数据库中的一个表来存储锁的信息。例如,可以在该表中创建一个 索引来实现对某一资源的加锁。

示例代码如下:

public class DistributedLock {
    private static String driver = "com.mysql.jdbc.Driver";
    private static String url = "jdbc:mysql://localhost:3306/test";
    private static String username = "root";
    private static String password = "123456";
    private static Connection connection;

    static {
        try {
            Class.forName(driver);
            connection = DriverManager.getConnection(url, username, password);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 获取分布式锁
     *
     * @param lockName    锁名称
     * @param expireTime  锁超时时间
     * @param retryTimes  获取锁的最大重试次数
     * @param sleepMillis 获取锁失败后的等待时间
     * @return 是否获得锁
     */
    public static boolean acquire(String lockName, long expireTime, int retryTimes, long sleepMillis) {
        int tried = 0;
        while (tried < retryTimes) {
            try {
                PreparedStatement preparedStatement = connection.prepareStatement("INSERT INTO lock_table VALUES (?)");
                preparedStatement.setString(1, lockName);
                preparedStatement.executeUpdate();
                return true;
            } catch (SQLException e) {
                // 数据库异常或者锁已存在
                try {
                    ResultSet resultSet = connection.prepareStatement("SELECT GET_LOCK(?, ?)").executeQuery();
                    if (resultSet.next() && resultSet.getInt(1) == 1) {
                        return true;
                    }
                } catch (SQLException e1) {
                    // 数据库异常
                }
            }
            tried++;
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // 线程被打断
            }
        }
        return false;
    }

    /**
     * 释放分布式锁
     *
     * @param lockName 锁名称
     * @return 是否释放成功
     */
    public static boolean release(String lockName) {
        try {
            PreparedStatement preparedStatement = connection.prepareStatement("DELETE FROM lock_table WHERE lock_name = ?");
            preparedStatement.setString(1, lockName);
            int count = preparedStatement.executeUpdate();
            return count > 0;
        } catch (SQLException e) {
            // 数据库异常
        }
        return false;
    }
}

该代码使用了JDBC连接池来创建数据库连接,使用了MySQL中的GETLOCK函数来获取锁。如果获取锁失败,则等待一段时间后重试,最多重试retryTimes次。释放锁时,只需在锁表中删除该锁即可。

以上是基于Redis、ZooKeeper和数据库实现分布式锁的示例代码,根据实际需求可以选择不同的实现方式。使用