001package com.mrivanplays.process;
002
003import java.util.Set;
004import java.util.concurrent.ConcurrentHashMap;
005import java.util.concurrent.CountDownLatch;
006import java.util.concurrent.Executor;
007import java.util.function.Consumer;
008
009/**
010 * Represents a completion of all the processes.
011 *
012 * @since 0.0.1
013 * @author <a href="mailto:[email protected]">Ivan Pekov</a>
014 */
015public final class ProcessesCompletion {
016
017  /**
018   * Runs the specified {@code callback} when all the specified {@code completions} complete.
019   *
020   * @param async whether this method to be called asynchronously or not
021   * @param callback a {@link Consumer} of the errors (if any). if there are no errors, the value
022   *     will be an empty set.
023   * @param completions completions to wait for
024   */
025  public static void whenAllDone(
026      boolean async, Consumer<Set<ProcessException>> callback, ProcessesCompletion... completions) {
027    CountDownLatch latch = new CountDownLatch(completions.length);
028    Set<ProcessException> errors = ConcurrentHashMap.newKeySet();
029    for (ProcessesCompletion completion : completions) {
030      completion.whenDoneAsync(
031          (err) -> {
032            errors.addAll(err);
033            latch.countDown();
034          });
035    }
036    if (async) {
037      // grab an async executor from somewhere
038      completions[0].asyncExecutor.execute(
039          () -> {
040            try {
041              latch.await();
042              callback.accept(errors);
043            } catch (InterruptedException e) {
044              Thread.currentThread().interrupt();
045            }
046          });
047    } else {
048      try {
049        latch.await();
050        callback.accept(errors);
051      } catch (InterruptedException e) {
052        Thread.currentThread().interrupt();
053      }
054    }
055  }
056
057  /**
058   * Called when all {@link Process Proccesses} are done.
059   *
060   * <p><b>WARNING: THREAD BLOCKING METHOD.</b>
061   *
062   * @param callback a {@link Consumer} of the errors (if any). if there are no errors, the value
063   *     will be an empty set.
064   */
065  public void whenDone(Consumer<Set<ProcessException>> callback) {
066    if (this.latch.getCount() == 0) {
067      callback.accept(this.errors);
068      return;
069    }
070    try {
071      this.latch.await();
072      callback.accept(this.errors);
073    } catch (InterruptedException e) {
074      Thread.currentThread().interrupt();
075    }
076  }
077
078  /**
079   * Called when all {@link Process Processes} are done. The difference from {@link
080   * #whenDone(Consumer)} is that the waiting and the {@code callback} call is done asynchronously.
081   *
082   * @param callback see {@link #whenDone(Consumer)}
083   * @see #whenDone(Consumer)
084   */
085  public void whenDoneAsync(final Consumer<Set<ProcessException>> callback) {
086    this.asyncExecutor.execute(() -> this.whenDone(callback));
087  }
088
089  // ======================================
090
091  private final CountDownLatch latch;
092  private final Executor asyncExecutor;
093  private Set<ProcessException> errors;
094
095  ProcessesCompletion(int processCount, Executor asyncExecutor) {
096    this.latch = new CountDownLatch(processCount);
097    this.asyncExecutor = asyncExecutor;
098    this.errors = ConcurrentHashMap.newKeySet();
099  }
100
101  void countDown() {
102    this.latch.countDown();
103  }
104
105  void countDownWithError(ProcessException error) {
106    this.errors.add(error);
107    this.latch.countDown();
108  }
109}