001/*
002 * Copyright (c) 2012-2021 Institut National des Sciences Appliquées de Lyon (INSA Lyon) and others
003 *
004 * This program and the accompanying materials are made available under the
005 * terms of the Eclipse Public License 2.0 which is available at
006 * http://www.eclipse.org/legal/epl-2.0.
007 *
008 * SPDX-License-Identifier: EPL-2.0
009 */
010
011package gololang.concurrent.workers;
012
013import gololang.FunctionReference;
014import gololang.Predefined;
015
016import java.util.concurrent.ExecutorService;
017import java.util.concurrent.Executors;
018import java.util.concurrent.TimeUnit;
019
020/**
021 * A worker environment is an abstraction over a set of spawned functions that can asynchronously process messages
022 * sent through ports.
023 * <p>
024 * Each port is internally associated to a worker function and a messages queue. The worker environment maintains
025 * an executor that dispatches message processing jobs over its thread pool.
026 */
027public final class WorkerEnvironment {
028
029  private final ExecutorService executor;
030
031  /**
032   * Creates a new worker environment using an executor.
033   *
034   * @param executor the executor.
035   */
036  public WorkerEnvironment(ExecutorService executor) {
037    this.executor = executor;
038  }
039
040  /**
041   * @return a new worker environment with a cached thread pool.
042   * @see java.util.concurrent.Executors#newCachedThreadPool()
043   */
044  public static WorkerEnvironment newWorkerEnvironment() {
045    return new WorkerEnvironment(Executors.newCachedThreadPool());
046  }
047
048  /**
049   * @return a worker environment builder object.
050   */
051  public static Builder builder() {
052    return new Builder();
053  }
054
055  /**
056   * Worker environment builder objects exist mostly to provide a good-looking API in Golo.
057   */
058  public static class Builder {
059
060    /**
061     * @return a worker environment with a cached thread pool.
062     * @see java.util.concurrent.Executors#newCachedThreadPool()
063     */
064    public WorkerEnvironment withCachedThreadPool() {
065      return newWorkerEnvironment();
066    }
067
068    /**
069     * @param size the thread pool size.
070     * @return a worker environment with a fixed-size thread pool.
071     * @see Executors#newFixedThreadPool(int)
072     */
073    public WorkerEnvironment withFixedThreadPool(int size) {
074      return new WorkerEnvironment(Executors.newFixedThreadPool(size));
075    }
076
077    /**
078     * @return a worker environment with a fixed-size thread pool in the number of available processors.
079     * @see Executors#newFixedThreadPool(int)
080     * @see Runtime#availableProcessors()
081     */
082    public WorkerEnvironment withFixedThreadPool() {
083      return withFixedThreadPool(Runtime.getRuntime().availableProcessors());
084    }
085
086    /**
087     * @return a worker environment with a single executor thread.
088     */
089    public WorkerEnvironment withSingleThreadExecutor() {
090      return new WorkerEnvironment(Executors.newSingleThreadExecutor());
091    }
092  }
093
094  /**
095   * Spawns a worker function.
096   *
097   * @param func the worker target.
098   * @return a port to send messages to <code>handle</code>.
099   */
100  public Port spawn(FunctionReference func) {
101    return spawnWorker((WorkerFunction) Predefined.asInterfaceInstance(WorkerFunction.class, func));
102  }
103
104  /**
105   * Spawns a worker function.
106   *
107   * @param function the worker target.
108   * @return a port to send messages to <code>function</code>.
109   */
110  public Port spawnWorker(WorkerFunction function) {
111    return new Port(executor, function);
112  }
113
114  /**
115   * Shutdown the worker environment.
116   *
117   * @return the same worker environment object.
118   * @see java.util.concurrent.ExecutorService#shutdown()
119   */
120  public WorkerEnvironment shutdown() {
121    executor.shutdown();
122    return this;
123  }
124
125  /**
126   * Waits until all remaining messages have been processed.
127   *
128   * @param millis the delay.
129   * @see ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)
130   */
131  public boolean awaitTermination(int millis) throws InterruptedException {
132    return awaitTermination((long) millis);
133  }
134
135  /**
136   * Waits until all remaining messages have been processed.
137   *
138   * @param millis the delay.
139   * @see ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)
140   */
141  public boolean awaitTermination(long millis) throws InterruptedException {
142    return awaitTermination(millis, TimeUnit.MILLISECONDS);
143  }
144
145  /**
146   * Waits until all remaining messages have been processed.
147   *
148   * @param timeout the delay.
149   * @param unit    the delay time unit.
150   * @see ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)
151   */
152  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
153    return executor.awaitTermination(timeout, unit);
154  }
155
156  /**
157   * @see java.util.concurrent.ExecutorService#isShutdown()
158   */
159  public boolean isShutdown() {
160    return executor.isShutdown();
161  }
162
163  /**
164   * @see java.util.concurrent.ExecutorService#isTerminated()
165   */
166  public boolean isTerminated() {
167    return executor.isTerminated();
168  }
169}