这篇文章给大家聊聊关于c多线程访问网站源码分享,以及多线程编程c对应的知识点,希望对各位有所帮助,不要忘了收藏本站哦。
ConcurrentLinkedQueue源码浅析
队列是一种常见的数据结构,主要特点是FIFO,Java为其定义了接口类:Queue,并提供了丰富的实现,有底层基于数组的[有界]队列,也有基于节点链接的无界队列,有阻塞队列,有非阻塞队列,还有并发安全的队列。
常见的队列实现的两种方式:数组、节点链接。
Java对队列的基本实现在包:java.util中,对并发安全实现主要存在于Java的JUC包下。在使用Java的线程池工具:ThreadPoolExecutor,其使用阻塞队列来缓存任务,因为阻塞队列具备通知唤醒的功能,能够在任务添加或消耗时进行线程通知,同时保证了线程并发安全;而JUC中还有另外一种Queue,是并发安全的非阻塞队列,那就是:ConcurrentLinkedQueue。
适用场景
ConcurrentLinkedQueue是java提供的一个无界非阻塞的FIFO队列,具备并发安全特性。适用于多线程共享访问相同的集合,要求多线程主动获取而不是线程阻塞等待通知;并且于队列的大小要求无限制,这常常是使用节点链接的形式来实现。不过,无界的场景也可能会导致内存占用过大。
底层实现(本文基于jdk15源码)
从名称我们可以看出来ConcurrentLinedQueue是基于节点链接的形式。节点的定义如下:
staticfinalclassNode<E>{\nvolatileEitem;\nvolatileNode<E>next;\nNode(Eitem){\nITEM.set(this,item);\n}\n/**Constructsadeaddummynode.*/\nNode(){}\nvoidappendRelaxed(Node<E>next){\nNEXT.set(this,next);\n}\nbooleancasItem(Ecmp,Eval){\nreturnITEM.compareAndSet(this,cmp,val);\n}\n}
内部属性只有item、next,是一种常见的队列节点结构,使用链接形式。ConcurrentLinkedQueue的算法实现基于Simple,Fast,andPracticalNon-BlockingandBlockingConcurrentQueueAlgorithms(MagedM.Michael,MichaelL.Scott),是此论文中非阻塞算法的一种修改实现。
在并发过程中,不使用任何锁如:synchronized、Lock等,而是纯粹通过CAS完成。同时,修改的部分主要有:基于JVM的回收环境,让元素能够被自动回收,不会存在ABA的循环引用导致回收不了的问题;还有,提供了remove操作,支持内部元素删除。
添加元素add(E)
按照前述节点链接结构,节点通过next链接到下一节点,而新节点的next总是链接到null中。我们可以这样实现,每次添加到队列时,从head开始遍历到最后一个节点,并通过CAS(next,E),这样,我们一定程度上完成了并发的安全性(这里尚未考虑删除情况)。
booleanadd(Ee){\nEt=head;Enext;\nwhile(t!=null){\nif(t.next!=null){\nt=t.next;//推进到下一节点\n}elseif(CAS(t.next,null,e)){//CAS更新,失败则从头开始\nreturntrue;\n}else{\nt=head;\n}\n}\n
因为队列是无界的,所以此方法将会一直重试直到添加成功,并永远返回true。这种实现虽然无锁,不过明显效率低下,所以我们在实现节点链接时,常常会引入head、tail指针来辅助推进,避免遍历的情况。同样地,ConcurrentLinkedQueue使用了这种优化手段。在初始化时,队列就默认初始化了head指针和tail指针,为dummy节点。
publicConcurrentLinkedQueue(){\nhead=tail=newNode<E>();\n}
但是,next引用与tail指针是两个不同的属性,而CAS在同一个时刻只能更新一个变量,如果想要确保next引用与tail指针的更新具备原子性,又回到了需要用到锁的境地。ConcurrentLinkedQueue是如何解决这个问题的?
答案是:延缓更新。在ConcurrentLinkedQueue的注释文档中,DougLea提及LinkedTransferQueue也同样使用了这种做法,称为:slackthreshold。在每一次插入操作之后,tail指针没有与next节点一起做原子性的更新操作。具体我们看下代码,在这里,我们模拟四个并发插入的线程:
/**\n*队列为无界,所以方法永远不返回false<br/>\n*/\npublicbooleanoffer(Ee){\nfinalNode<E>newNode=newNode<E>(Objects.requireNonNull(e));\n//p、q前后节点\nfor(Node<E>t=tail,p=t;;){\nNode<E>q=p.next;\nif(q==null){\n//并发CAS控制入口\n//pislastnode;p总是最后的节点(即tail)\nif(NEXT.compareAndSet(p,null,newNode)){\n//一次跳俩或多节点,允许失败。如果在if此处阻塞,将会出现tail滞后于head的情况?\n//(不一定是只有最后更新的next节点的线程才可以更新成功,也就是说更新后的tail不一定是最新最准确的)\n//如果不准确,后续的线程依然可以通过next往下推进\nif(p!=t)//hoptwonodesatatime;failureisOK\nTAIL.weakCompareAndSet(this,t,newNode);\nreturntrue;\n}\n//LostCASracetoanotherthread;re-readnext\n}\nelse…\n}\n}
①当线程一添加元素时,head=tail=newNode(),将p(=>pointer:指针)赋值为tail,将q赋值为p的下一个节点,即p=tail.next=null,所以进入后线程一能直接CAS插入节点,此时p=t=tail,所以第一个更新的线程满足p!=t,不会更新tail指针。
②并发的另外三个线程因为CAS失败,重新进入循环。此时q=tail.next!=null,进入第二个分支判断p==q,这是插入时针对并发删除的分支处理。在没有删除操作时,p总是不等于q,因为p=tail;q=p.next=tail.next
elseif(p==q)\n//此处是与删除操作的多线程处理\n//前后节点相等,说明该节点的已经被执行过移除操作\n//节点移除后,如果此时tail未变更(可能阻塞了),也组织不了它被移除的命运。\n//此时,我们要做的是将指针跳到head,一边它能够从头遍历。否则更新tail更好。\n//Wehavefallenofflist.Iftailisunchanged,it\n//willalsobeoff-list,inwhichcaseweneedto\n//jumptohead,fromwhichalllivenodesarealways\n//reachable.Elsethenewtailisabetterbet.\np=(t!=(t=tail))?t:head;
目前在单纯的并发插入操作中,q总是p的下一个节点(next),可能为新节点,也可能为null。在等于null时表示p为尾结点可进行插入,否则,表示尾结点已被其他线程更新,我们需要进入重试,对当前状态进行调整;直接进入第三个分支;
③此时并发的另外三个线程发现尾结点被更新后,它们执行操作:
else\n//此处是与添加操作的多线程并发处理\n//Checkfortailupdatesaftertwohops.\n//①发生在添加时,q=tail.next!=null(进入发现下一级节点不为null,无法添加),通过判断进入此分支,\n//并在这里p!=t,即在未进行移除操作,将p赋值为q(即移动导下一节点),而t=tail没有被执行,所以tail前于p。\n//②上一步骤p=q之后,循环回去操作,如果没有被其他线程添加,则成功执行,并通过p!=t,更新tail节点更新为newNode(最后的更新节点)\n//③如果上一步骤p=q之后,循环回去操作,有其他多个线程操作了,则q==null返回false。然后进入p==q返回false(只有删除操作能导致),\n//而在此方法内t!=(t=tail)总是返回true,所以这里更新了t后,尝试从新的tail开始,即p=t=tail,如果回去操作成功,则是在最末的tail更新\n//否则,重新进入①\n//此处一直负责推进next节点\np=(p!=t&&t!=(t=tail))?t:q;
这里有两个迷惑点:迷惑点1:p!=t似乎不可能成立,因为一开始就赋值p=t。大家需要注意,当前是在一个for(;;)操作中,这个判断条件并非是在当前此次循环中使用,所以第一次的判断总为false;代码执行了:
p=q
也就是说,当线程第一次发现尾结点被更新了(tail.next!=null),那么它将把p(指针)推进到下一个节点(这是推进过程),重新进入循环。三个线程并发推进,重新循环,只有一个能够CAS(tail.next,null,e)。执行成功的线程二,执行判断:
//这里有可能发生阻塞,或者丢失时间片\nif(p!=t)//hoptwonodesatatime;failureisOK\nTAIL.weakCompareAndSet(this,t,newNode);
此时的p是推进的下一节点,而t保持不变,所以p!=t为true,执行tail指针更新。这就是ConcurrentLinkedQueue注释中提及的,tail指针的更新是跳跃式更新,总是大于等于2个节点。并且有可能在判断分支后丢失时间片,未能立即执行CAS操作。
④我们将目光转移到线程三、四,它们因为与线程二争夺CAS失败,所以又一次回到第三个分支判断(它判断了第②依旧失败,因为重新循环后q总是被赋值为p的next节点,q=p.next):
p=(p!=t&&t!=(t=tail))?t:q;
这里我们看到了第二个疑惑点疑惑点2:因为pointer在上一次循环被推进(p=q),所以p!=t。进入第二个条件判断:
t!=(t=tail)
乍一看t!=(t=tail),这个表达式让人很冲动地认为应该总是返回false。因为t是对象(非原生类型),同时先执行括号内表达式,t被赋值为tail后与其自身比较,难道会出现t!=t?网上有些博文直接描述为,由于此处可能发生并发导致t被赋值后又被修改,这是不正确的,因为t是一个局部变量,不可能发生并发修改情况。但结果是:只要tail被更新过,即t!=tail,那么此表达式总是返回true。
我们在idea工具中测试:
可以看到对原生类型,idea可以直接推断并提示永远返回true;对于Object类型,查看其字节码:
原因是:Java是基于堆栈的程序语言,对关系运算法使用的是后缀表达式法——又称逆波兰表达式法。表达式的表现形式分三种:
前缀表达式:计算机常用,从右向左解析,能够消除括号\n中缀表达式:人类适用\n后缀表达式:计算机常用,基于堆栈的程序语言常用,能够消除括号,从左向右解析
对应的上述表达式:
前缀表达式:!=t=tailt\n中缀表达式:t!=(t=tail)\n后缀表达式:ttailt=!=
查看后缀表达式,也就是执行了:
先入栈t\n再入栈tail\n再入栈t\n取出栈顶t、tail,执行赋值\n入栈结果,假定为x(实际值为tail)\n取出栈顶t、x,执行比较,最终返回true。
后缀法的使用导致,该表达式的实际执行效果如下:
t!=tail;\nt=tail;
效果相等,不过要记住的是:先执行了赋值,只不过赋值后被存入到了变量表中,而操作数栈中已经有了旧值t,比较总是不相等的。如果是对象,那它就是比较的引用(对象堆地址)。
好了,疑惑点解决了,回到我们判断分支处表达式:
p=(p!=t&&t!=(t=tail))?t:q;
在线程三、四因抢占失败,进入比较t!=(t=tail)时,如果此时线程二CAS成功并且更新了尾指针(tail指针),那线程三、四的表达式执行就是返回true!
此时true&&true,结果p=t,我们前面说过,疑惑点二这里,实际上已经将t进行了赋值:t=tail,也就是说:在程序发现t已经不是最新的尾结点时(被其他线程执行插入后更新了),它将更新局部变量t和p为最新的tail指针,即获取最新的尾结点,又开始了跟线程一一样的循环处境,并尝试CAS下一节点。
什么时候t!=(t=tail)返回false呢?
就是线程二成功执行了CAS,但尚未执行CAS(tail)时(可能是发生了其他情况,如:系统阻塞了,时间片丢失了等等),那么线程三、四无法获取到最新的尾结点,它们只会推进到下一个节点。
这里就有可能导致,线程三、四推进到下一个节点后,线程四成功CAS并更新了tail指针,然后线程二获取到时间片执行CAS(tail)失败。不过tail指针的CAS失败是允许的。
最后,剩下线程三又回到分支三,并发现tail指针被更新了,执行tail更新,连续跳过两个节点。(如果是n个线程,极端情况下,可连续跳跃n个节点。)
这两者的不同是:推进到下一个节点的单步的,而更新为tail指针则可能实现跨多步的优化。这些或许就是DougLea提及的trick(技巧)吧。
获取元素poll()
ConcurrentLinkedQueue是FIFO队列,所以支持从首部poll出节点。内部实现依旧是采用CAS方式,不过此方法支持返回元素e或者null。
从前面我们可以看到,添加操作完全针对于tail指针;而获取元素则将从头部节点head开始获取,并保证向后推进,从不关心tail指针,代码如下:
//此方法没有与添加操作产生并发冲突,所以只关注删除操作\npublicEpoll(){\nrestartFromHead:for(;;){\n//每次循环都推进到下一节点,注意for循环中的p=q,就是推进\nfor(Node<E>h=head,p=h,q;;p=q){\nfinalEitem;\nif((item=p.item)!=null&&p.casItem(item,null)){\n//并发CAS控制入口\n//SuccessfulCASisthelinearizationpoint\n//foritemtoberemovedfromthisqueue.\nif(p!=h)//hoptwonodesatatime\n//如果p.next==null,则说明到了尾节点,直接返回当前指针p,\n//否则赋值给q返回将head更新为p.next(非空节点)\n//如果p.next!=null,那么head就是一次跳两个节点,否则就是跳一个节点\nupdateHead(h,((q=p.next)!=null)?q:p);\nreturnitem;\n}\nelseif((q=p.next)==null){\n//并发删除操作的退出口,如果已到尾部,如果!=null,将推进指针到下一个节点(for循环中的p=q)\n//指向下一个节点,并判断下一节点是否为null\n//如果为null,则直接更新头节点并返回null(表示已经到尾部了),这是一个辅助操作,不管有没有成功(优化操作)\nupdateHead(h,p);\nreturnnull;\n}\nelseif(p==q)\n//与其他删除线程的并发处理,非退出口,\n//已知节点被其他线程更新(无法获取到下一节点),从头开始(在updateHead中被操作了)\ncontinuerestartFromHead;\n}\n}
我们这里假定4个线程;并且已知p=>pointer,q为p的下个节点。①头部节点是dummy节点,第一次分支判断为false,进入分支二,执行推进到下个节点,如果下个节点不为null,进入分支三判断p==q,重新进入分支一,只有线程一成功更新
②如果此时线程一成功执行updateHead,则head指针将被跳跃式更新到下个节点(下个节点不为空时)或当前指针节点(下个节点为空时):
updateHead(h,((q=p.next)!=null)?q:p);\nfinalvoidupdateHead(Node<E>h,Node<E>p){\n//更新head指针,并将原节点的next引用到其自身\nif(h!=p&&HEAD.compareAndSet(this,h,p))\nNEXT.setRelease(h,h);\n}
更新节点成功以便其他线程能获取到最新的位置进行元素获取,同时,将原head节点的next引用到其自身,这是对算法的改进——使其支持回收环境的一个优化操作;同时此操作能够在后续的迭代方法中被使用,避免过期的head节点的迭代。
比如,我们可以先看方法succ,它用于获取后继节点,如果后继节点是本身,则直接更新为head指针:
finalNode<E>succ(Node<E>p){\n//推进到p=p.next,表明p之前的已被移除并self-ref\n//所以直接更新p=head(也不用再关注原来p之前[也可能是head之前]的元素)\nif(p==(p=p.next))\np=head;\nreturnp;\n}
可见,只有在初始化及下个节点为null时,head节点为dummyNode,其item为null,next执行null。③节点更新成功head指针后,直接返回item。
此时,其他线程在CAS失败后,将执行推进,因为线程一已将原head节点的next引用到自身,所以他们将进入分支三:
elseif(p==q)\ncontinuerestartFromHead;
后续线程又重新进入循环,从新head节点开始,只不过线程headnext指向其自身;线程二继续推进,发现item!=null满足,则尝试跨节点重链,发现head指针已被更新,无法通过cas(head,c,q)来进行更新,cas失败。此时方法:skipDeadNodes返回的就是指针p,而不是pred,但并不影响它继续推进。跳出skipDeadNodes后,将指针p指向下个节点q,q#item=3,跳出回到第一个循环,成功删除item=3。
这里就是skipDeadNodes的返回处理:由于存在并发的跨节点重链,对于失败的线程,说明有其他线程完成跨链操作,就不再为此操作,将pred进行更新,同时与后续的p推进(p=q)依旧形成前后关系,这是一个新的开始,不再需要考虑之前的死亡节点问题。
privateNode<E>skipDeadNodes(Node<E>pred,Node<E>c,Node<E>p,Node<E>q){\nif(q==null){\n//Neverunlinktrailingnode.\nif(c==p)returnpred;\nq=p;\n}\n//如果前置节点为null,也就是删除的是第一个节点,则返回可以形成dummy节点,\n//tryCasSuccessor失败,则返回p,意味着不需要此线程负责跨节点了,更新前置节点为p(过程中的最后一个死亡节点item=null)\n//tryCasSuccessor成功,如果pred==null,直接返回,意味着后续删除操作,可以直接根据pred=null继续更新head节点\n//如果pred!=null且item!=null,返回最后一个死亡节点p,让其能够在后续remove后,通过next指针重链。\n//此方法是返回一个处理后的前置节点,要么是null要么是普通的node,不管哪一个,出了此方法之后,会执行p=q,\n//让方法外部的pred又与p形成前后置关系\nreturn(tryCasSuccessor(pred,c,q)\n&&(pred==null||ITEM.get(pred)!=null))\n?pred:p;
线程一、三都执行完后,线程二执行poll的cas(head)失败,因为head已经被线程三进行了更新
整个remove操作需要考虑极多的并发情况,DougLea在没有算法伪代码参考的情况下,凭实力给出的算法实现却让我掉了不少头发,真是让人想给他献下膝盖。remove操作除了上述的实例,实际还有许多考虑的情况,如:
前节点是否dummy节点删除的是否尾结点poll操作超过remove时等等
大概是下图的一些情况:
ConcurrentLinkedQueue的删除操作在JDK8中的算法实现较目前JDK15的易阅读,JDK15中的实现是DougLea对算法进行一些refactor,因此根据有抽象及分支复杂的情况。除此上述的add、poll、remove之外,ConcurrentLinkedQueue的实现还提供了其他的Iterator等并发实现(也是DougLea自行实现),也是具有学习意义。但本文不再赘述(已经过长)。
总结:
ConcurrentLinkedQueue是基于已有算法的改进版,使其支持回收与删除操作;支持回收的操作有:将原head节点的next关联到其自身;ConcurrentLinkedQueue基于节点链接的形式,但不要求tail指针和head指针为最新状态,支持跳跃式更新,属于优化操作;跨节点连接可跨多个节点;ConcurrentLinkedQueue的添加只关注于节点next的CAS更新,这种做法保证了并发安全与高效;而除了item的CAS更新要求成功,其他的如tail、head均不要求成功更新存在一些trick的代码,如:t!=(t=tail)、p==(p=q)等
注:在idea中debugConcurrentLinkedQueue代码时,会因为idea编辑器自动调用toString等方法,遇到不符合认知逻辑的情况,需要更改idea配置来避免。
好了,关于c多线程访问网站源码分享和多线程编程c的问题到这里结束啦,希望可以解决您的问题哈!
