/*
 * Copyright (c) 2012-2021 Institut National des Sciences Appliquées de Lyon (INSA Lyon) and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * SPDX-License-Identifier: EPL-2.0
 */

package gololang.concurrent.workers;

import gololang.FunctionReference;
import gololang.Predefined;

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * A worker environment is an abstraction over a set of spawned functions that can asynchronously process messages
 * sent through ports.
 * <p>
 * Each port is internally associated to a worker function and a messages queue. The worker environment maintains
 * an executor that dispatches message processing jobs over its thread pool.
 */
public final class WorkerEnvironment {

  private final ExecutorService executor;

  /**
   * Creates a new worker environment using an executor.
   *
   * @param executor the executor, must not be {@code null}.
   * @throws NullPointerException if {@code executor} is {@code null}.
   */
  public WorkerEnvironment(ExecutorService executor) {
    // Fail fast: a null executor would otherwise only surface later, when a port is
    // spawned or when the environment is shut down.
    this.executor = Objects.requireNonNull(executor, "executor");
  }

  /**
   * @return a new worker environment with a cached thread pool.
   * @see java.util.concurrent.Executors#newCachedThreadPool()
   */
  public static WorkerEnvironment newWorkerEnvironment() {
    return new WorkerEnvironment(Executors.newCachedThreadPool());
  }

  /**
   * @return a worker environment builder object.
   */
  public static Builder builder() {
    return new Builder();
  }

  /**
   * Worker environment builder objects exist mostly to provide a good-looking API in Golo.
   */
  public static class Builder {

    /**
     * @return a worker environment with a cached thread pool.
     * @see java.util.concurrent.Executors#newCachedThreadPool()
     */
    public WorkerEnvironment withCachedThreadPool() {
      return newWorkerEnvironment();
    }

    /**
     * @param size the thread pool size.
     * @return a worker environment with a fixed-size thread pool.
     * @see Executors#newFixedThreadPool(int)
     */
    public WorkerEnvironment withFixedThreadPool(int size) {
      return new WorkerEnvironment(Executors.newFixedThreadPool(size));
    }

    /**
     * @return a worker environment with a fixed-size thread pool sized to the number of available processors.
     * @see Executors#newFixedThreadPool(int)
     * @see Runtime#availableProcessors()
     */
    public WorkerEnvironment withFixedThreadPool() {
      return withFixedThreadPool(Runtime.getRuntime().availableProcessors());
    }

    /**
     * @return a worker environment with a single executor thread.
     * @see Executors#newSingleThreadExecutor()
     */
    public WorkerEnvironment withSingleThreadExecutor() {
      return new WorkerEnvironment(Executors.newSingleThreadExecutor());
    }
  }

  /**
   * Spawns a worker function.
   * <p>
   * The function reference is first adapted to a {@link WorkerFunction}, then handed to
   * {@link #spawnWorker(WorkerFunction)}.
   *
   * @param func the worker target.
   * @return a port to send messages to <code>func</code>.
   */
  public Port spawn(FunctionReference func) {
    return spawnWorker((WorkerFunction) Predefined.asInterfaceInstance(WorkerFunction.class, func));
  }

  /**
   * Spawns a worker function.
   *
   * @param function the worker target.
   * @return a port to send messages to <code>function</code>.
   */
  public Port spawnWorker(WorkerFunction function) {
    return new Port(executor, function);
  }

  /**
   * Shutdown the worker environment by shutting down its executor.
   *
   * @return the same worker environment object.
   * @see java.util.concurrent.ExecutorService#shutdown()
   */
  public WorkerEnvironment shutdown() {
    executor.shutdown();
    return this;
  }

  /**
   * Waits for the underlying executor to terminate, for at most the given delay.
   *
   * @param millis the delay, in milliseconds.
   * @return the result of {@link ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)}:
   *         {@code true} if the executor terminated, {@code false} if the delay elapsed first.
   * @throws InterruptedException if the calling thread is interrupted while waiting.
   * @see ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)
   */
  public boolean awaitTermination(int millis) throws InterruptedException {
    // The explicit widening cast selects the long overload; without it this call would recurse.
    return awaitTermination((long) millis);
  }

  /**
   * Waits for the underlying executor to terminate, for at most the given delay.
   *
   * @param millis the delay, in milliseconds.
   * @return the result of {@link ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)}:
   *         {@code true} if the executor terminated, {@code false} if the delay elapsed first.
   * @throws InterruptedException if the calling thread is interrupted while waiting.
   * @see ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)
   */
  public boolean awaitTermination(long millis) throws InterruptedException {
    return awaitTermination(millis, TimeUnit.MILLISECONDS);
  }

  /**
   * Waits for the underlying executor to terminate, for at most the given delay.
   *
   * @param timeout the delay.
   * @param unit the delay time unit.
   * @return the result of {@link ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)}:
   *         {@code true} if the executor terminated, {@code false} if the delay elapsed first.
   * @throws InterruptedException if the calling thread is interrupted while waiting.
   * @see ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)
   */
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    return executor.awaitTermination(timeout, unit);
  }

  /**
   * @return whether the underlying executor has been shut down.
   * @see java.util.concurrent.ExecutorService#isShutdown()
   */
  public boolean isShutdown() {
    return executor.isShutdown();
  }

  /**
   * @return whether the underlying executor has terminated.
   * @see java.util.concurrent.ExecutorService#isTerminated()
   */
  public boolean isTerminated() {
    return executor.isTerminated();
  }
}