001/* 002 * Copyright (c) 2012-2021 Institut National des Sciences Appliquées de Lyon (INSA Lyon) and others 003 * 004 * This program and the accompanying materials are made available under the 005 * terms of the Eclipse Public License 2.0 which is available at 006 * http://www.eclipse.org/legal/epl-2.0. 007 * 008 * SPDX-License-Identifier: EPL-2.0 009 */ 010 011package gololang.concurrent.workers; 012 013import java.util.concurrent.ConcurrentLinkedQueue; 014import java.util.concurrent.ExecutorService; 015import java.util.concurrent.atomic.AtomicBoolean; 016 017/** 018 * A port is the communication endpoint to a worker function. 019 * <p> 020 * A port is obtained from a worker environment when spawning a function. It can then be used to send messages that 021 * will be eventually processed by the target function. Messages are being put in a first-in, first-out queue. 022 */ 023public final class Port { 024 025 private final ExecutorService executor; 026 private final WorkerFunction function; 027 028 private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>(); 029 private final AtomicBoolean running = new AtomicBoolean(false); 030 031 /** 032 * Port constructor. 033 * 034 * @param executor the executor to dispatch the asynchronous message handling jobs to. 035 * @param function the target worker function. 036 */ 037 public Port(ExecutorService executor, WorkerFunction function) { 038 this.executor = executor; 039 this.function = function; 040 } 041 042 private final Runnable runner = new Runnable() { 043 @Override 044 public void run() { 045 if (running.get()) { 046 try { 047 function.apply(queue.poll()); 048 } finally { 049 running.set(false); 050 scheduleNext(); 051 } 052 } 053 } 054 }; 055 056 private void scheduleNext() { 057 if (!queue.isEmpty() && running.compareAndSet(false, true)) { 058 try { 059 executor.execute(runner); 060 } catch (Throwable t) { 061 running.set(false); 062 throw t; 063 } 064 } 065 } 066 067 /** 068 * Sends a message to the target worker function. This method returns immediately as message processing is 069 * asynchronous. 070 * 071 * @param message the message of any type. 072 * @return the same port object. 073 */ 074 public Port send(Object message) { 075 queue.offer(message); 076 scheduleNext(); 077 return this; 078 } 079}