001/* 002 * Copyright (c) 2012-2017 Institut National des Sciences Appliquées de Lyon (INSA-Lyon) 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the Eclipse Public License v1.0 006 * which accompanies this distribution, and is available at 007 * http://www.eclipse.org/legal/epl-v10.html 008 */ 009 010package gololang.concurrent.workers; 011 012import gololang.FunctionReference; 013import gololang.Predefined; 014 015import java.util.concurrent.ExecutorService; 016import java.util.concurrent.Executors; 017import java.util.concurrent.TimeUnit; 018 019/** 020 * A worker environment is an abstraction over a set of spawned functions that can asynchronously process messages 021 * sent through ports. 022 * <p> 023 * Each port is internally associated to a worker function and a messages queue. The worker environment maintains 024 * an executor that dispatches message processing jobs over its thread pool. 025 */ 026public final class WorkerEnvironment { 027 028 private final ExecutorService executor; 029 030 /** 031 * Creates a new worker environment using an executor. 032 * 033 * @param executor the executor. 034 */ 035 public WorkerEnvironment(ExecutorService executor) { 036 this.executor = executor; 037 } 038 039 /** 040 * @return a new worker environment with a cached thread pool. 041 * @see java.util.concurrent.Executors#newCachedThreadPool() 042 */ 043 public static WorkerEnvironment newWorkerEnvironment() { 044 return new WorkerEnvironment(Executors.newCachedThreadPool()); 045 } 046 047 /** 048 * @return a worker environment builder object. 049 */ 050 public static Builder builder() { 051 return new Builder(); 052 } 053 054 /** 055 * Worker environment builder objects exist mostly to provide a good-looking API in Golo. 056 */ 057 public static class Builder { 058 059 /** 060 * @return a worker environment with a cached thread pool. 061 * @see java.util.concurrent.Executors#newCachedThreadPool() 062 */ 063 public WorkerEnvironment withCachedThreadPool() { 064 return newWorkerEnvironment(); 065 } 066 067 /** 068 * @param size the thread pool size. 069 * @return a worker environment with a fixed-size thread pool. 070 * @see Executors#newFixedThreadPool(int) 071 */ 072 public WorkerEnvironment withFixedThreadPool(int size) { 073 return new WorkerEnvironment(Executors.newFixedThreadPool(size)); 074 } 075 076 /** 077 * @return a worker environment with a fixed-size thread pool in the number of available processors. 078 * @see Executors#newFixedThreadPool(int) 079 * @see Runtime#availableProcessors() 080 */ 081 public WorkerEnvironment withFixedThreadPool() { 082 return withFixedThreadPool(Runtime.getRuntime().availableProcessors()); 083 } 084 085 /** 086 * @return a worker environment with a single executor thread. 087 */ 088 public WorkerEnvironment withSingleThreadExecutor() { 089 return new WorkerEnvironment(Executors.newSingleThreadExecutor()); 090 } 091 } 092 093 /** 094 * Spawns a worker function. 095 * 096 * @param func the worker target. 097 * @return a port to send messages to <code>handle</code>. 098 */ 099 public Port spawn(FunctionReference func) { 100 return spawnWorker((WorkerFunction) Predefined.asInterfaceInstance(WorkerFunction.class, func)); 101 } 102 103 /** 104 * Spawns a worker function. 105 * 106 * @param function the worker target. 107 * @return a port to send messages to <code>function</code>. 108 */ 109 public Port spawnWorker(WorkerFunction function) { 110 return new Port(executor, function); 111 } 112 113 /** 114 * Shutdown the worker environment. 115 * 116 * @return the same worker environment object. 117 * @see java.util.concurrent.ExecutorService#shutdown() 118 */ 119 public WorkerEnvironment shutdown() { 120 executor.shutdown(); 121 return this; 122 } 123 124 /** 125 * Waits until all remaining messages have been processed. 126 * 127 * @param millis the delay. 128 * @see ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit) 129 */ 130 public boolean awaitTermination(int millis) throws InterruptedException { 131 return awaitTermination((long) millis); 132 } 133 134 /** 135 * Waits until all remaining messages have been processed. 136 * 137 * @param millis the delay. 138 * @see ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit) 139 */ 140 public boolean awaitTermination(long millis) throws InterruptedException { 141 return awaitTermination(millis, TimeUnit.MILLISECONDS); 142 } 143 144 /** 145 * Waits until all remaining messages have been processed. 146 * 147 * @param timeout the delay. 148 * @param unit the delay time unit. 149 * @see ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit) 150 */ 151 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 152 return executor.awaitTermination(timeout, unit); 153 } 154 155 /** 156 * @see java.util.concurrent.ExecutorService#isShutdown() 157 */ 158 public boolean isShutdown() { 159 return executor.isShutdown(); 160 } 161 162 /** 163 * @see java.util.concurrent.ExecutorService#isTerminated() 164 */ 165 public boolean isTerminated() { 166 return executor.isTerminated(); 167 } 168}