001package com.mrivanplays.process; 002 003import java.util.Set; 004import java.util.concurrent.ConcurrentHashMap; 005import java.util.concurrent.CountDownLatch; 006import java.util.concurrent.Executor; 007import java.util.function.Consumer; 008 009/** 010 * Represents a completion of all the processes. 011 * 012 * @since 0.0.1 013 * @author <a href="mailto:[email protected]">Ivan Pekov</a> 014 */ 015public final class ProcessesCompletion { 016 017 /** 018 * Runs the specified {@code callback} when all the specified {@code completions} complete. 019 * 020 * @param async whether this method to be called asynchronously or not 021 * @param callback a {@link Consumer} of the errors (if any). if there are no errors, the value 022 * will be an empty set. 023 * @param completions completions to wait for 024 */ 025 public static void whenAllDone( 026 boolean async, Consumer<Set<ProcessException>> callback, ProcessesCompletion... completions) { 027 CountDownLatch latch = new CountDownLatch(completions.length); 028 Set<ProcessException> errors = ConcurrentHashMap.newKeySet(); 029 for (ProcessesCompletion completion : completions) { 030 completion.whenDoneAsync( 031 (err) -> { 032 errors.addAll(err); 033 latch.countDown(); 034 }); 035 } 036 if (async) { 037 // grab an async executor from somewhere 038 completions[0].asyncExecutor.execute( 039 () -> { 040 try { 041 latch.await(); 042 callback.accept(errors); 043 } catch (InterruptedException e) { 044 Thread.currentThread().interrupt(); 045 } 046 }); 047 } else { 048 try { 049 latch.await(); 050 callback.accept(errors); 051 } catch (InterruptedException e) { 052 Thread.currentThread().interrupt(); 053 } 054 } 055 } 056 057 /** 058 * Called when all {@link Process Proccesses} are done. 059 * 060 * <p><b>WARNING: THREAD BLOCKING METHOD.</b> 061 * 062 * @param callback a {@link Consumer} of the errors (if any). if there are no errors, the value 063 * will be an empty set. 064 */ 065 public void whenDone(Consumer<Set<ProcessException>> callback) { 066 if (this.latch.getCount() == 0) { 067 callback.accept(this.errors); 068 return; 069 } 070 try { 071 this.latch.await(); 072 callback.accept(this.errors); 073 } catch (InterruptedException e) { 074 Thread.currentThread().interrupt(); 075 } 076 } 077 078 /** 079 * Called when all {@link Process Processes} are done. The difference from {@link 080 * #whenDone(Consumer)} is that the waiting and the {@code callback} call is done asynchronously. 081 * 082 * @param callback see {@link #whenDone(Consumer)} 083 * @see #whenDone(Consumer) 084 */ 085 public void whenDoneAsync(final Consumer<Set<ProcessException>> callback) { 086 this.asyncExecutor.execute(() -> this.whenDone(callback)); 087 } 088 089 // ====================================== 090 091 private final CountDownLatch latch; 092 private final Executor asyncExecutor; 093 private Set<ProcessException> errors; 094 095 ProcessesCompletion(int processCount, Executor asyncExecutor) { 096 this.latch = new CountDownLatch(processCount); 097 this.asyncExecutor = asyncExecutor; 098 this.errors = ConcurrentHashMap.newKeySet(); 099 } 100 101 void countDown() { 102 this.latch.countDown(); 103 } 104 105 void countDownWithError(ProcessException error) { 106 this.errors.add(error); 107 this.latch.countDown(); 108 } 109}