001/*
002 * Copyright (c) 2012-2017 Institut National des Sciences Appliquées de Lyon (INSA-Lyon)
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 */
009
010package gololang.concurrent.workers;
011
012import java.util.concurrent.ConcurrentLinkedQueue;
013import java.util.concurrent.ExecutorService;
014import java.util.concurrent.atomic.AtomicBoolean;
015
016/**
017 * A port is the communication endpoint to a worker function.
018 * <p>
019 * A port is obtained from a worker environment when spawning a function. It can then be used to send messages that
020 * will be eventually processed by the target function. Messages are being put in a first-in, first-out queue.
021 */
022public final class Port {
023
024  private final ExecutorService executor;
025  private final WorkerFunction function;
026
027  private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
028  private final AtomicBoolean running = new AtomicBoolean(false);
029
030  /**
031   * Port constructor.
032   *
033   * @param executor the executor to dispatch the asynchronous message handling jobs to.
034   * @param function the target worker function.
035   */
036  public Port(ExecutorService executor, WorkerFunction function) {
037    this.executor = executor;
038    this.function = function;
039  }
040
041  private final Runnable runner = new Runnable() {
042    @Override
043    public void run() {
044      if (running.get()) {
045        try {
046          function.apply(queue.poll());
047        } finally {
048          running.set(false);
049          scheduleNext();
050        }
051      }
052    }
053  };
054
055  private void scheduleNext() {
056    if (!queue.isEmpty() && running.compareAndSet(false, true)) {
057      try {
058        executor.execute(runner);
059      } catch (Throwable t) {
060        running.set(false);
061        throw t;
062      }
063    }
064  }
065
066  /**
067   * Sends a message to the target worker function. This method returns immediately as message processing is
068   * asynchronous.
069   *
070   * @param message the message of any type.
071   * @return the same port object.
072   */
073  public Port send(Object message) {
074    queue.offer(message);
075    scheduleNext();
076    return this;
077  }
078}