Lettuce连接池

       Lettuce 连接被设计为线程安全,所以一个连接可以被多个线程共享,同时lettuce连接默认是自动重连.虽然连接池在大多数情况下是不必要的,但在某些用例中可能是有用的.lettuce提供通用的连接池支持. 如有疏漏后续会更新 https://www.cnblogs.com/wei-zw/p/9163687.html

连接池是否有必要?

     Lettuce被线程安全的,它满足了多数场景需求. 所有Redis用户的操作是单线程执行的.使用多连接并不能改善一个应用的性能. 阻塞操作的使用通常与获得专用连接的工作线程结合在一起.
        使用Redis事务是使用动态连接池的典型场景,因为需要专用连接的线程数趋于动态.也就是说,动态连接池的需求是有限的.连接池总是伴随着复杂性和维护成本提升.

同步连接池

  使用命令式编程,同步连接池是正确的选择,因为它在用于执行执行Redis命令的线程上执行所有操作.

   前提条件
       Lettuce需要依赖 Apache的 common-pool2至少是2.2)提供连接池. 确认在你的classpath下包含这个依赖.否则你就不能使用连接池.
如果使用Maven,向你的pom.xml添加如下依赖

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.3</version>
</dependency>

 连接池支持

Lettuce提供通用连接池支持,它需要一个用于创建任何支持类型连接单个,发布订阅,哨兵,主从,集群)的提供者. ConnectionPoolSupport 将根据你的需求创建一个 GenericObjectPool或SoftReferenceObjectPool. 连接池可以分配包装类型或直接连接

包装实例在调用StatefulConnection.close)时,会将连接归还到连接池
直接连接需要调用GenericObjectPool.returnObject…)归还到连接池

基本用法:

  包装连接

  GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig);
        poolConfig.setMaxIdle2);

        GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport.createGenericObjectPool
                ) -> client.connect), poolConfig);
        for int i = 0; i < 10; i++) {
            StatefulRedisConnection<String, String> connection = pool.borrowObject);
            RedisCommands<String, String> sync = connection.sync);
            sync.ping);
            connection.close);
        }

直接连接

     GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport.createGenericObjectPool
                ) -> client.connect), new GenericObjectPoolConfig), false);

        for int i = 0; i < 10; i++) {
            StatefulRedisConnection<String, String> connection = pool.borrowObject);
            RedisCommands<String, String> sync = connection.sync);
            sync.ping);
       //主动将连接归还到连接池  pool.returnObjectconnection); }

  

相关源码分析

 public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool
            Supplier<T> connectionSupplier, GenericObjectPoolConfig config, boolean wrapConnections) {

        LettuceAssert.notNullconnectionSupplier, "Connection supplier must not be null");
        LettuceAssert.notNullconfig, "GenericObjectPoolConfig must not be null");

        AtomicReference<ObjectPool<T>> poolRef = new AtomicReference<>);

        GenericObjectPool<T> pool = new GenericObjectPool<T>new RedisPooledObjectFactory<T>connectionSupplier), config) {

            @Override
            public T borrowObject) throws Exception {
                //如果wrapConnection 设置为true,则对连接创建动态代理
                return wrapConnections ? wrapConnectionsuper.borrowObject), this) : super.borrowObject);
            }

            @Override
            public void returnObjectT obj) {

                if wrapConnections && obj instanceof HasTargetConnection) {
                    super.returnObjectT) HasTargetConnection) obj).getTargetConnection));
                    return;
                }
                super.returnObjectobj);
            }
        };

        poolRef.setpool);

        return pool;
    }

  创建一个包装类型到连接

 private static <T> T wrapConnectionT connection, ObjectPool<T> pool) {
        
        //创建调用处理器
        ReturnObjectOnCloseInvocationHandler<T> handler = new ReturnObjectOnCloseInvocationHandler<T>connection, pool);

        Class<?>[] implementedInterfaces = connection.getClass).getInterfaces);
        Class[] interfaces = new Class[implementedInterfaces.length + 1];
        interfaces[0] = HasTargetConnection.class;
        System.arraycopyimplementedInterfaces, 0, interfaces, 1, implementedInterfaces.length);
        //创建代理连接
        T proxiedConnection = T) Proxy.newProxyInstanceconnection.getClass).getClassLoader), interfaces, handler);
        //向连接调用处理器设置代理连接
        handler.setProxiedConnectionproxiedConnection);
        //返回代理连接
        return proxiedConnection;
    }

  包装类型连接的动态调用处理器

  private static class ReturnObjectOnCloseInvocationHandler<T> extends AbstractInvocationHandler {
        //被代理对连接
        private T connection;
        private T proxiedConnection;
        private Map<Method, Object> connectionProxies = new ConcurrentHashMap<>5, 1);
        //连接池
        private final ObjectPool<T> pool;

        ReturnObjectOnCloseInvocationHandlerT connection, ObjectPool<T> pool) {
            this.connection = connection;
            this.pool = pool;
        }
        
        //设置代理连接
        void setProxiedConnectionT proxiedConnection) {
            this.proxiedConnection = proxiedConnection;
        }

        @Override
        protected Object handleInvocationObject proxy, Method method, Object[] args) throws Throwable {
             //如果调用方法是  getStatefulConnection则返回代理连接
            if method.getName).equals"getStatefulConnection")) {
                return proxiedConnection;
            }
            //如果调用的方法是getTargetConnection 则返回真实连接
            if method.getName).equals"getTargetConnection")) {
                return connection;
            }
            //如果真实连接为null则抛出异常
            if connection == null) {
                throw new RedisException"Connection is deallocated and cannot be used anymore.");
            }
            //如果调用的方法是close则将代理连接归还到连接池,并将真实连接设置和代理连接设置为null
            if method.getName).equals"close")) {
                pool.returnObjectproxiedConnection);
                connection = null;
                proxiedConnection = null;
                connectionProxies.clear);
                return null;
            }

            try {
                //如果调用方法是获取连接则从代理连接池中获取,如果没有则创建代理连接并放入缓存
                if method.getName).equals"sync") || method.getName).equals"async") || method.getName).equals"reactive")) {
                    return connectionProxies.computeIfAbsent
                            method, m -> getInnerProxymethod, args));
                }
                //其它方法不在多任何拦截
                return method.invokeconnection, args);

            } catch InvocationTargetException e) {
                throw e.getTargetException);
            }
        }

        @SuppressWarnings"unchecked")
        private Object getInnerProxyMethod method, Object[] args) {

            try {
                Object result = method.invokeconnection, args);

                result = Proxy.newProxyInstancegetClass).getClassLoader), result.getClass).getInterfaces),
                        new DelegateCloseToConnectionInvocationHandler<>AutoCloseable) proxiedConnection, result));

                return result;
            } catch IllegalAccessException e) {
                throw new RedisExceptione);
            } catch InvocationTargetException e) {
                throw new RedisExceptione.getTargetException));

            }
        }

        public T getConnection) {
            return connection;
        }
    }

  

Published by

风君子

独自遨游何稽首 揭天掀地慰生平