001/*
002 * Copyright (c) 2012-2017 Institut National des Sciences Appliquées de Lyon (INSA-Lyon)
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 */
009
010package gololang.concurrent.workers;
011
import gololang.FunctionReference;
import gololang.Predefined;

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
018
019/**
020 * A worker environment is an abstraction over a set of spawned functions that can asynchronously process messages
021 * sent through ports.
022 * <p>
023 * Each port is internally associated to a worker function and a messages queue. The worker environment maintains
024 * an executor that dispatches message processing jobs over its thread pool.
025 */
026public final class WorkerEnvironment {
027
028  private final ExecutorService executor;
029
030  /**
031   * Creates a new worker environment using an executor.
032   *
033   * @param executor the executor.
034   */
035  public WorkerEnvironment(ExecutorService executor) {
036    this.executor = executor;
037  }
038
039  /**
040   * @return a new worker environment with a cached thread pool.
041   * @see java.util.concurrent.Executors#newCachedThreadPool()
042   */
043  public static WorkerEnvironment newWorkerEnvironment() {
044    return new WorkerEnvironment(Executors.newCachedThreadPool());
045  }
046
047  /**
048   * @return a worker environment builder object.
049   */
050  public static Builder builder() {
051    return new Builder();
052  }
053
054  /**
055   * Worker environment builder objects exist mostly to provide a good-looking API in Golo.
056   */
057  public static class Builder {
058
059    /**
060     * @return a worker environment with a cached thread pool.
061     * @see java.util.concurrent.Executors#newCachedThreadPool()
062     */
063    public WorkerEnvironment withCachedThreadPool() {
064      return newWorkerEnvironment();
065    }
066
067    /**
068     * @param size the thread pool size.
069     * @return a worker environment with a fixed-size thread pool.
070     * @see Executors#newFixedThreadPool(int)
071     */
072    public WorkerEnvironment withFixedThreadPool(int size) {
073      return new WorkerEnvironment(Executors.newFixedThreadPool(size));
074    }
075
076    /**
077     * @return a worker environment with a fixed-size thread pool in the number of available processors.
078     * @see Executors#newFixedThreadPool(int)
079     * @see Runtime#availableProcessors()
080     */
081    public WorkerEnvironment withFixedThreadPool() {
082      return withFixedThreadPool(Runtime.getRuntime().availableProcessors());
083    }
084
085    /**
086     * @return a worker environment with a single executor thread.
087     */
088    public WorkerEnvironment withSingleThreadExecutor() {
089      return new WorkerEnvironment(Executors.newSingleThreadExecutor());
090    }
091  }
092
093  /**
094   * Spawns a worker function.
095   *
096   * @param func the worker target.
097   * @return a port to send messages to <code>handle</code>.
098   */
099  public Port spawn(FunctionReference func) {
100    return spawnWorker((WorkerFunction) Predefined.asInterfaceInstance(WorkerFunction.class, func));
101  }
102
103  /**
104   * Spawns a worker function.
105   *
106   * @param function the worker target.
107   * @return a port to send messages to <code>function</code>.
108   */
109  public Port spawnWorker(WorkerFunction function) {
110    return new Port(executor, function);
111  }
112
113  /**
114   * Shutdown the worker environment.
115   *
116   * @return the same worker environment object.
117   * @see java.util.concurrent.ExecutorService#shutdown()
118   */
119  public WorkerEnvironment shutdown() {
120    executor.shutdown();
121    return this;
122  }
123
124  /**
125   * Waits until all remaining messages have been processed.
126   *
127   * @param millis the delay.
128   * @see ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)
129   */
130  public boolean awaitTermination(int millis) throws InterruptedException {
131    return awaitTermination((long) millis);
132  }
133
134  /**
135   * Waits until all remaining messages have been processed.
136   *
137   * @param millis the delay.
138   * @see ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)
139   */
140  public boolean awaitTermination(long millis) throws InterruptedException {
141    return awaitTermination(millis, TimeUnit.MILLISECONDS);
142  }
143
144  /**
145   * Waits until all remaining messages have been processed.
146   *
147   * @param timeout the delay.
148   * @param unit    the delay time unit.
149   * @see ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)
150   */
151  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
152    return executor.awaitTermination(timeout, unit);
153  }
154
155  /**
156   * @see java.util.concurrent.ExecutorService#isShutdown()
157   */
158  public boolean isShutdown() {
159    return executor.isShutdown();
160  }
161
162  /**
163   * @see java.util.concurrent.ExecutorService#isTerminated()
164   */
165  public boolean isTerminated() {
166    return executor.isTerminated();
167  }
168}