001/*
002 * Copyright (c) 2012-2021 Institut National des Sciences Appliquées de Lyon (INSA Lyon) and others
003 *
004 * This program and the accompanying materials are made available under the
005 * terms of the Eclipse Public License 2.0 which is available at
006 * http://www.eclipse.org/legal/epl-2.0.
007 *
008 * SPDX-License-Identifier: EPL-2.0
009 */
010
011package gololang.concurrent.workers;
012
013import java.util.concurrent.ConcurrentLinkedQueue;
014import java.util.concurrent.ExecutorService;
015import java.util.concurrent.atomic.AtomicBoolean;
016
017/**
018 * A port is the communication endpoint to a worker function.
019 * <p>
020 * A port is obtained from a worker environment when spawning a function. It can then be used to send messages that
021 * will be eventually processed by the target function. Messages are being put in a first-in, first-out queue.
022 */
023public final class Port {
024
025  private final ExecutorService executor;
026  private final WorkerFunction function;
027
028  private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
029  private final AtomicBoolean running = new AtomicBoolean(false);
030
031  /**
032   * Port constructor.
033   *
034   * @param executor the executor to dispatch the asynchronous message handling jobs to.
035   * @param function the target worker function.
036   */
037  public Port(ExecutorService executor, WorkerFunction function) {
038    this.executor = executor;
039    this.function = function;
040  }
041
042  private final Runnable runner = new Runnable() {
043    @Override
044    public void run() {
045      if (running.get()) {
046        try {
047          function.apply(queue.poll());
048        } finally {
049          running.set(false);
050          scheduleNext();
051        }
052      }
053    }
054  };
055
056  private void scheduleNext() {
057    if (!queue.isEmpty() && running.compareAndSet(false, true)) {
058      try {
059        executor.execute(runner);
060      } catch (Throwable t) {
061        running.set(false);
062        throw t;
063      }
064    }
065  }
066
067  /**
068   * Sends a message to the target worker function. This method returns immediately as message processing is
069   * asynchronous.
070   *
071   * @param message the message of any type.
072   * @return the same port object.
073   */
074  public Port send(Object message) {
075    queue.offer(message);
076    scheduleNext();
077    return this;
078  }
079}