123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- package com.rdlze.radializebase.mocks;
- import java.rmi.NotBoundException;
- import java.rmi.RemoteException;
- import java.rmi.registry.LocateRegistry;
- import java.rmi.registry.Registry;
- import java.rmi.server.UnicastRemoteObject;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Queue;
- import java.util.concurrent.ConcurrentLinkedQueue;
- import com.rdlze.radializebase.interfaces.InterfaceNotifiable;
- import com.rdlze.radializeutils.notification.Notification;
- public class MockBroker implements InterfaceNotifiable {
-
- private static final long serialVersionUID = 1L;
- private static InterfaceNotifiable notifiable;
- private List<InterfaceNotifiable> registeredNotifiables;
- public Queue<Notification> queConc;
- String name = "";
- MockNotifiable mn = null;
- public MockBroker(String name){
- this.name=name;
- queConc = new ConcurrentLinkedQueue<Notification>();
- registeredNotifiables = new ArrayList<InterfaceNotifiable>();
- NotifierThread not = new NotifierThread(this);
- not.start();
- try {
- LocateRegistry.createRegistry(15000);
- } catch (Exception e) {
- System.out.println("createRegistryException....");
- }
- try {
- InterfaceNotifiable stub = (InterfaceNotifiable) UnicastRemoteObject.exportObject(
- this, 0);
- Registry registry = LocateRegistry.getRegistry(15000);
- registry.rebind(name, stub);
- System.out.println("MockBroker - OnLine");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- @Override
- public void notify(Notification notification) throws RemoteException {
- System.out.println(this.getName()+" received notification "+notification.getRadioId());
- queConc.add(notification);
- }
- @Override
- public void notify(List list) throws RemoteException {
- System.out.println("received list");
- queConc.addAll(list);
- }
- @Override
- public void registerNotifiable(String IP, String serviceName, int port)
- throws RemoteException {
- try {
- System.out
- .println("COMPONENTE TENTANDO SE REGISTRAR NO BROKEN IP = "
- + IP + " NAME = " + serviceName);
- Registry registry = LocateRegistry.getRegistry(IP, port);
- notifiable = (InterfaceNotifiable) registry.lookup(serviceName);
- registeredNotifiables.add(notifiable);
- System.out.println("Conectou no " + notifiable.getName()
- + " com sucesso");
- System.out.println("COMPONENTE = " + notifiable.getName()
- + " SE CONECTOU NO BROKEN COM SUCESSO");
- } catch (RemoteException e) {
- e.printStackTrace();
- System.out.println((new StringBuilder("RemoteException")).append(e)
- .toString());
- } catch (NotBoundException e) {
- e.printStackTrace();
- System.out.println((new StringBuilder("NotBoun1dException"))
- .append(e).toString());
- } catch (Exception e) {
- e.printStackTrace();
- System.out.println((new StringBuilder("Exception")).append(e)
- .toString());
- }
- }
- @Override
- public String getName() throws RemoteException {
- return this.name;
- }
- public static void main(String[] args){
- new MockNotifiable("teste");
- }
- private Queue<Notification> getQuee() {
- return this.queConc;
- }
- public List<InterfaceNotifiable> getRegistersNotifiable() {
- return this.registeredNotifiables;
- }
- private final class NotifierThread extends Thread {
- MockBroker noticationBroken;
- public NotifierThread(MockBroker noticationBroken) {
- this.noticationBroken = noticationBroken;
- System.out.println("NotifierThread init...");
- }
- public void run() {
- int indexerControllerExceptionCount = 0;
- int streamCrawlerExceptionCount = 0;
- while (true) {
- try {
- if (this.noticationBroken.getQuee().size() > 0) {
- Notification value = null;
- while ((value = (Notification) this.noticationBroken
- .getQuee().poll()) != null) {
- for (int i = this.noticationBroken
- .getRegistersNotifiable().size(); --i >= 0;) {
- try {
- ((InterfaceNotifiable) this.noticationBroken
- .getRegistersNotifiable().get(i))
- .notify(value);
- } catch (RemoteException e) {
- InterfaceNotifiable topNot = this.noticationBroken
- .getRegistersNotifiable().get(i);
- if (topNot.getName().equals(
- "IndexerControler"))
- ;
- {
- indexerControllerExceptionCount++;
- if (indexerControllerExceptionCount == 10)
- this.noticationBroken
- .getRegistersNotifiable()
- .remove(i);
- }
- if (topNot.getName()
- .equals("StreamCrawler"))
- ;
- {
- streamCrawlerExceptionCount++;
- if (streamCrawlerExceptionCount == 10)
- this.noticationBroken
- .getRegistersNotifiable()
- .remove(i);
- }
- System.out.println("Error: "
- + e.getMessage());
- e.printStackTrace();
- } catch (Exception e) {
- System.out.println(e.getMessage());
- }
- }
- }
- } else
- Thread.sleep(100);
- } catch (Exception e) {
- System.out.println("Error: " + e.getMessage());
- e.printStackTrace();
- }
- }
- }
- }
- }
|