/*
 * Copyright (c) 2012-2017 Institut National des Sciences Appliquées de Lyon (INSA-Lyon)
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 */

package gololang.concurrent.workers;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Communication endpoint of a worker function.
 * <p>
 * A port is obtained from a worker environment when spawning a function. Messages sent through it are appended to a
 * first-in, first-out queue and are eventually handed, one per job, to the target function on the executor given at
 * construction time.
 * <p>
 * The {@code running} flag is taken with a compare-and-set before a job is submitted and released once the function
 * has returned or thrown, so a new job is only submitted after the previous one has finished its message.
 */
public final class Port {

  private final ExecutorService executor;
  private final WorkerFunction function;

  private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
  private final AtomicBoolean running = new AtomicBoolean(false);

  /**
   * Job submitted to the executor: takes the head of the queue, applies the worker function to it, then releases
   * the {@code running} flag and tries to schedule the following message.
   */
  private final Runnable runner = new Runnable() {
    @Override
    public void run() {
      if (!running.get()) {
        return;
      }
      try {
        final Object message = queue.poll();
        function.apply(message);
      } finally {
        // Release the flag before re-checking the queue, so that a message offered while the function
        // was executing gets picked up either here or by the sender's own scheduleNext() call.
        running.set(false);
        scheduleNext();
      }
    }
  };

  /**
   * Creates a port bound to a worker function.
   *
   * @param executor the executor that runs the asynchronous message handling jobs.
   * @param function the worker function that receives the messages.
   */
  public Port(ExecutorService executor, WorkerFunction function) {
    this.executor = executor;
    this.function = function;
  }

  /**
   * Submits the runner to the executor when there is at least one pending message and no job currently owns the
   * {@code running} flag. If the submission itself fails, the flag is released and the failure is rethrown.
   */
  private void scheduleNext() {
    if (queue.isEmpty()) {
      return;
    }
    if (!running.compareAndSet(false, true)) {
      // Another job already owns the flag; it re-checks the queue when it completes.
      return;
    }
    try {
      executor.execute(runner);
    } catch (Throwable failure) {
      // The job was never accepted: give the flag back so that a later send() can retry.
      running.set(false);
      throw failure;
    }
  }

  /**
   * Enqueues a message for the target worker function and returns without waiting for it to be processed.
   * <p>
   * A failure raised by the executor while submitting the handling job propagates to the caller; the message
   * has already been queued at that point.
   *
   * @param message the message, of any type.
   * @return this port, so that calls can be chained.
   * @throws NullPointerException if {@code message} is {@code null}, since the backing
   *                              {@link ConcurrentLinkedQueue} does not accept {@code null} elements.
   */
  public Port send(Object message) {
    queue.offer(message);
    scheduleNext();
    return this;
  }
}