How can I create a concurrent List instance, where I can access elements by index? Does the JDK have any classes or factory methods I can use?
CopyOnWriteArrayList is a thread-safe variant of ArrayList in which all mutative operations (add, set, and so on) are implemented by making a fresh copy of the underlying array.
CopyOnWriteArrayList is a concurrent alternative to a synchronized List. It implements the List interface, is part of the java.util.concurrent package, and is a thread-safe collection.
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable
CopyOnWriteArrayList is fail-safe: its iterators do not throw ConcurrentModificationException when the list is modified during iteration, because each iterator works on a separate snapshot of the underlying array.
Copying is ordinarily too costly, because every update operation creates a fresh copy of the array. CopyOnWriteArrayList is therefore the best choice only when reads vastly outnumber writes.
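The snapshot behaviour of the iterator can be seen in a small sketch. The class and method names here (`SnapshotIterationDemo`, `iterateWhileModifying`) are made up for illustration, and `List.of` assumes Java 9 or later.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SnapshotIterationDemo {

    // Returns the elements seen by an iterator that was created
    // before the list was modified.
    static List<String> iterateWhileModifying() {
        List<String> list = new CopyOnWriteArrayList<>(List.of("a", "b", "c"));
        Iterator<String> it = list.iterator(); // snapshot of [a, b, c]

        list.add("d");    // copies the backing array; the iterator is unaffected
        list.remove("a"); // copies the backing array again

        List<String> seen = new ArrayList<>();
        while (it.hasNext()) {
            seen.add(it.next()); // no ConcurrentModificationException
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(iterateWhileModifying()); // prints [a, b, c]
    }
}
```

The iterator still reports the three original elements, while the list itself now holds b, c and d. Each of the two updates paid for a full array copy, which is why this class suits read-mostly data.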
/**
 * Returns a shallow copy of this list.  (The elements themselves
 * are not copied.)
 *
 * @return a clone of this list
 */
public Object clone() {
    try {
        @SuppressWarnings("unchecked")
        CopyOnWriteArrayList<E> clone =
            (CopyOnWriteArrayList<E>) super.clone();
        clone.resetLock();
        return clone;
    } catch (CloneNotSupportedException e) {
        // this shouldn't happen, since we are Cloneable
        throw new InternalError();
    }
}
You have these options:
Collections.synchronizedList(): you can wrap any List implementation (ArrayList, LinkedList or a 3rd-party list). Access to every method (reading and writing) will be protected using synchronized. When using iterator() or an enhanced for loop, you must manually synchronize the whole iteration. While iterating, other threads are fully blocked even from reading. You can also synchronize separately for each hasNext and next call, but then ConcurrentModificationException is possible.
CopyOnWriteArrayList: it's expensive to modify, but wait-free to read. Iterators never throw ConcurrentModificationException; they return a snapshot of the list at the moment of iterator creation, even if the list is modified by another thread while iterating. Useful for infrequently updated lists. Bulk operations like addAll are preferred for updates, because the internal array is copied fewer times.
Vector: very much like synchronizedList(new ArrayList<>()), but iteration is synchronized too. However, iterators can throw ConcurrentModificationException if the vector is modified by another thread while iterating.
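The manual synchronization needed when iterating over a Collections.synchronizedList() wrapper might look like the following sketch. The class name `SynchronizedIterationDemo` and its helper methods are hypothetical. The sketch relies on the documented requirement that callers synchronize on the returned list object while iterating.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SynchronizedIterationDemo {

    // Sums the list while holding the list's own monitor, so no other
    // thread can read or write the list until the loop has finished.
    static int sum(List<Integer> syncList) {
        int total = 0;
        synchronized (syncList) {
            for (int value : syncList) {
                total += value;
            }
        }
        return total;
    }

    // One writer thread appends 1..1000 while the caller iterates.
    static int demo() {
        List<Integer> syncList = Collections.synchronizedList(new ArrayList<>());
        Thread writer = new Thread(() -> {
            for (int i = 1; i <= 1000; i++) {
                syncList.add(i);
            }
        });
        writer.start();
        sum(syncList); // safe even though the writer may still be running
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum(syncList);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 500500
    }
}
```

Without the synchronized block around the loop, the first call to sum could fail with ConcurrentModificationException while the writer is still appending.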
Other options:
Collections.unmodifiableList(): lock-free and non-modifiable; thread-safe as long as the wrapped list is not modified through another reference.
List.of & List.copyOf: other non-modifiable lists, available in Java 9 and later (List.copyOf since Java 10).
Queue or Deque might be an alternative if you only add/remove at the ends of the list and iterate the list. There's no access by index and no adding/removing at arbitrary places. They have multiple concurrent implementations with better performance and better concurrent access, but that is beyond the scope of this question. You can also have a look at JCTools, which contains more performant queue implementations specialized for a single consumer or a single producer.

You can very well use Collections.synchronizedList(List) if all you need is simple invocation synchronization:
List<Object> objList = Collections.synchronizedList(new ArrayList<Object>());
ConcurrentLinkedQueue
If you don't care about having index-based access and just want the insertion-order-preserving characteristics of a List, you could consider a java.util.concurrent.ConcurrentLinkedQueue. Since it implements Iterable, once you've finished adding all the items, you can loop over the contents using the enhanced for syntax:
Queue<String> globalQueue = new ConcurrentLinkedQueue<String>();
// Multiple threads can safely call globalQueue.add()...
for (String href : globalQueue) {
    // do something with href
}
If you never plan to delete elements from the list (since this requires changing the index of all elements after the deleted element), then you can use ConcurrentSkipListMap<Integer, T> in place of ArrayList<T>, e.g.
NavigableMap<Integer, T> map = new ConcurrentSkipListMap<>();
This will allow you to add items to the end of the "list" as follows, as long as there is only one writer thread (otherwise there is a race condition between map.size() and map.put()):
// Add item to end of the "list":
map.put(map.size(), item);
You can also modify the value of any item in the "list" (i.e. the map) by simply calling map.put(index, item).
The average cost for putting items into the map or retrieving them by index is O(log(n)), and ConcurrentSkipListMap is lock-free, which lets it scale better under contention than, say, Vector (the old synchronized version of ArrayList).
You can iterate back and forth through the "list" by using the methods of the NavigableMap interface.
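As a rough illustration of the points above, the following sketch appends, replaces and then walks the "list" in both directions. The class name `SkipListAsListDemo` and its helpers are invented for this example, and it assumes a single writer thread, as discussed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class SkipListAsListDemo {

    static NavigableMap<Integer, String> build() {
        NavigableMap<Integer, String> map = new ConcurrentSkipListMap<>();
        // Single writer assumed, so size() followed by put() cannot race here.
        for (String item : new String[] {"a", "b", "c"}) {
            map.put(map.size(), item);
        }
        map.put(1, "B"); // replace the element at index 1
        return map;
    }

    // Ascending key order corresponds to front-to-back list order.
    static List<String> forward(NavigableMap<Integer, String> map) {
        return new ArrayList<>(map.values());
    }

    // descendingMap() is a reversed view, giving back-to-front order.
    static List<String> backward(NavigableMap<Integer, String> map) {
        return new ArrayList<>(map.descendingMap().values());
    }

    public static void main(String[] args) {
        NavigableMap<Integer, String> map = build();
        System.out.println(map.get(2));    // prints c
        System.out.println(forward(map));  // prints [a, B, c]
        System.out.println(backward(map)); // prints [c, B, a]
    }
}
```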
You could wrap all the above into a class that implements the List interface, as long as you understand the race condition caveats (or you could synchronize just the writer methods), and you would need to throw an unsupported operation exception for the remove methods. There's quite a bit of boilerplate needed to implement all the required methods, but here's a quick attempt at an implementation.
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
public class ConcurrentAddOnlyList<V> implements List<V> {
private NavigableMap<Integer, V> map = new ConcurrentSkipListMap<>();
@Override
public int size() {
return map.size();
}
@Override
public boolean isEmpty() {
return map.isEmpty();
}
@Override
public boolean contains(Object o) {
return map.values().contains(o);
}
@Override
public Iterator<V> iterator() {
return map.values().iterator();
}
@Override
public Object[] toArray() {
return map.values().toArray();
}
@Override
public <T> T[] toArray(T[] a) {
return map.values().toArray(a);
}
@Override
public V get(int index) {
return map.get(index);
}
@Override
public boolean containsAll(Collection<?> c) {
return map.values().containsAll(c);
}
@Override
public int indexOf(Object o) {
for (Entry<Integer, V> ent : map.entrySet()) {
if (Objects.equals(ent.getValue(), o)) {
return ent.getKey();
}
}
return -1;
}
@Override
public int lastIndexOf(Object o) {
for (Entry<Integer, V> ent : map.descendingMap().entrySet()) {
if (Objects.equals(ent.getValue(), o)) {
return ent.getKey();
}
}
return -1;
}
@Override
public ListIterator<V> listIterator(int index) {
return new ListIterator<V>() {
// Start at the requested position rather than always at the beginning
private int currIdx = index;
@Override
public boolean hasNext() {
return currIdx < map.size();
}
@Override
public V next() {
if (currIdx >= map.size()) {
// The ListIterator contract specifies NoSuchElementException here
throw new java.util.NoSuchElementException(
"next() called at end of list");
}
return map.get(currIdx++);
}
@Override
public boolean hasPrevious() {
return currIdx > 0;
}
@Override
public V previous() {
if (currIdx <= 0) {
// The ListIterator contract specifies NoSuchElementException here
throw new java.util.NoSuchElementException(
"previous() called at beginning of list");
}
return map.get(--currIdx);
}
@Override
public int nextIndex() {
// The index of the element that next() would return
return currIdx;
}
@Override
public int previousIndex() {
return currIdx - 1;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
@Override
public void set(V e) {
// Might change size of map if currIdx == map.size(),
// so need to synchronize
synchronized (map) {
map.put(currIdx, e);
}
}
@Override
public void add(V e) {
synchronized (map) {
// Insertion is not supported except at end of list
if (currIdx < map.size()) {
throw new UnsupportedOperationException();
}
map.put(currIdx++, e);
}
}
};
}
@Override
public ListIterator<V> listIterator() {
return listIterator(0);
}
@Override
public List<V> subList(int fromIndex, int toIndex) {
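// Sublist views are not implemented in this quick attempt;
// failing loudly is safer than returning null
throw new UnsupportedOperationException();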
}
@Override
public boolean add(V e) {
synchronized (map) {
map.put(map.size(), e);
return true;
}
}
@Override
public boolean addAll(Collection<? extends V> c) {
synchronized (map) {
for (V val : c) {
add(val);
}
return true;
}
}
@Override
public V set(int index, V element) {
synchronized (map) {
if (index < 0 || index > map.size()) {
throw new IllegalArgumentException("Index out of range");
}
return map.put(index, element);
}
}
@Override
public void clear() {
synchronized (map) {
map.clear();
}
}
@Override
public void add(int index, V element) {
synchronized (map) {
// Check the range first, so that a negative index is reported as such
if (index < 0 || index > map.size()) {
throw new IndexOutOfBoundsException("Index out of range: " + index);
} else if (index < map.size()) {
// Insertion is not supported except at end of list
throw new UnsupportedOperationException();
}
// index == map.size()
add(element);
}
}
@Override
public boolean addAll(
int index, Collection<? extends V> c) {
synchronized (map) {
// Check the range first, so that a negative index is reported as such
if (index < 0 || index > map.size()) {
throw new IndexOutOfBoundsException("Index out of range: " + index);
} else if (index < map.size()) {
// Insertion is not supported except at end of list
throw new UnsupportedOperationException();
}
// index == map.size()
for (V val : c) {
add(val);
}
return true;
}
}
@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException();
}
@Override
public V remove(int index) {
throw new UnsupportedOperationException();
}
@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException();
}
@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException();
}
}
Don't forget that even with the writer thread synchronization as shown above, you need to be careful not to run into race conditions that might cause you to drop items, if for example you try to iterate through a list in a reader thread while a writer thread is adding to the end of the list.
You can even use ConcurrentSkipListMap as a double-ended list, as long as you don't need the key of each item to represent the actual position within the list (i.e. adding to the beginning of the list will assign items negative keys). (The same race condition caveat applies here, i.e. there should be only one writer thread.)
// Add item after last item in the "list":
map.put(map.isEmpty() ? 0 : map.lastKey() + 1, item);
// Add item before first item in the "list":
map.put(map.isEmpty() ? 0 : map.firstKey() - 1, item);
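A runnable version of the two lines above, wrapped in hypothetical helper methods (`addFirst`, `addLast`) of an invented class `DoubleEndedSkipListDemo`, could look like this. It again assumes a single writer thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class DoubleEndedSkipListDemo {

    // Appends after the current last key (single writer assumed).
    static <T> void addLast(NavigableMap<Integer, T> map, T item) {
        map.put(map.isEmpty() ? 0 : map.lastKey() + 1, item);
    }

    // Prepends before the current first key; keys may become negative.
    static <T> void addFirst(NavigableMap<Integer, T> map, T item) {
        map.put(map.isEmpty() ? 0 : map.firstKey() - 1, item);
    }

    static NavigableMap<Integer, String> build() {
        NavigableMap<Integer, String> map = new ConcurrentSkipListMap<>();
        addLast(map, "b");  // key 0
        addLast(map, "c");  // key 1
        addFirst(map, "a"); // key -1
        return map;
    }

    public static void main(String[] args) {
        NavigableMap<Integer, String> map = build();
        List<String> inOrder = new ArrayList<>(map.values());
        System.out.println(inOrder);        // prints [a, b, c]
        System.out.println(map.firstKey()); // prints -1
    }
}
```

The keys only define the ordering here. They no longer tell you the position of an element, so get(index) style access is lost.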
Mostly, if you need a concurrent list, it is either inside a model object (as you should not use abstract data types like a list to represent a node in an application model graph) or part of a particular service. In both cases you can synchronize the access yourself.
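class MyClass {
    // Never hand this reference out; all access goes through this class
    private final List<MyType> myConcurrentList = new ArrayList<>();

    void myMethod() {
        synchronized (myConcurrentList) {
            // doSomethingWithList: read or modify the list only inside this block
        }
    }
}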
Often this is enough to get you going. If you need to iterate, iterate over a copy of the list, not the list itself, and synchronize only the part where you copy the list, not the iteration.
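The copy-then-iterate advice might be sketched as follows. The class `SnapshotUnderLock` and its methods are hypothetical names for this example.

```java
import java.util.ArrayList;
import java.util.List;

class SnapshotUnderLock {
    private final List<String> items = new ArrayList<>();

    void add(String item) {
        synchronized (items) {
            items.add(item);
        }
    }

    // Only the copy is made under the lock, so writers are
    // blocked for the duration of the copy and no longer.
    List<String> snapshot() {
        synchronized (items) {
            return new ArrayList<>(items);
        }
    }

    // Iterates over the private copy without holding any lock.
    int totalLength() {
        int total = 0;
        for (String s : snapshot()) {
            total += s.length();
        }
        return total;
    }

    public static void main(String[] args) {
        SnapshotUnderLock holder = new SnapshotUnderLock();
        holder.add("ab");
        holder.add("cde");
        System.out.println(holder.totalLength()); // prints 5
    }
}
```

The loop may work on slightly stale data if another thread adds an item after the copy was taken. That is the price for keeping the lock short.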
Also, when working concurrently on a list you usually do something more than just adding, removing or copying, meaning that the operation becomes meaningful enough to warrant its own method, and the list becomes a member of a special class representing just this particular list with thread-safe behavior.
I agree that a concurrent list implementation is needed, and that Vector / Collections.synchronizedList(list) do not do the trick, since you need something like compareAndAdd, compareAndRemove or get(..., ifAbsentDo). But even with a ConcurrentList implementation, developers often introduce bugs by not considering what the true transaction is when working with concurrent lists (and maps).
In these scenarios, where the transactions offered by a concurrent ADT (abstract data type) are too small for the intended purpose of the interaction, I always end up hiding the list in a special class and synchronizing access to that class's methods using synchronized at the method level. It's the only way to be sure that the transactions are correct.
I have seen too many bugs to do it any other way, at least if the code is important and handles something like money or security, or guarantees some quality-of-service measure (e.g. sending a message at least once and only once).
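A minimal sketch of such a special class, assuming a made-up `PendingMessages` type whose method names are purely illustrative: each public method is one complete transaction, guarded by method-level synchronized.

```java
import java.util.ArrayList;
import java.util.List;

class PendingMessages {
    // The list never escapes this class.
    private final List<String> messages = new ArrayList<>();

    // "Check, then add" as one transaction.
    synchronized boolean addIfAbsent(String message) {
        if (messages.contains(message)) {
            return false;
        }
        messages.add(message);
        return true;
    }

    // "Check, then remove" as one transaction; returns null when empty.
    synchronized String takeFirst() {
        return messages.isEmpty() ? null : messages.remove(0);
    }

    synchronized int size() {
        return messages.size();
    }

    // Four threads race to add the same 100 messages.
    static int demo() {
        PendingMessages pending = new PendingMessages();
        Thread[] workers = new Thread[4];
        for (int t = 0; t < workers.length; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < 100; i++) {
                    pending.addIfAbsent("msg-" + i);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return pending.size();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 100
    }
}
```

With a plain synchronized list, the contains check and the add would be two separate locked calls, and two threads could both pass the check before either adds.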
The act of acquiring a position and then getting the element at that position naturally requires some locking (the list must not change structurally between those two operations).
The very idea of a concurrent collection is that each operation on its own is atomic and can be done without explicit locking/synchronization.
Therefore getting the element at position n from a given List as an atomic operation doesn't make too much sense in a situation where concurrent access is anticipated.
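To make the point concrete: checking that an index is valid and then reading it are two operations, so they need one lock around both. The sketch below uses a hypothetical helper `getOrNull` in an invented class `IndexedAccess`, and assumes the list was created by Collections.synchronizedList(), so that locking on the list object excludes all other callers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class IndexedAccess {

    // Assumption: `list` is a Collections.synchronizedList wrapper, whose
    // documented convention is that callers lock on the list itself.
    static <T> T getOrNull(List<T> list, int index) {
        synchronized (list) {
            // No other thread can shrink the list between size() and get().
            return (index >= 0 && index < list.size()) ? list.get(index) : null;
        }
    }

    public static void main(String[] args) {
        List<String> list = Collections.synchronizedList(new ArrayList<>());
        list.add("x");
        list.add("y");
        System.out.println(getOrNull(list, 1)); // prints y
        System.out.println(getOrNull(list, 2)); // prints null
    }
}
```

Without the enclosing synchronized block, another thread could remove an element after the size check, and get(index) would throw IndexOutOfBoundsException.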
Source: Stackoverflow.com