Açıklama Yok

1a0ab60de0253ca6e3a1112669127ae163ab4bb5.svn-base 5.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package com.rdlze.radializebase.mocks;
  2. import java.rmi.NotBoundException;
  3. import java.rmi.RemoteException;
  4. import java.rmi.registry.LocateRegistry;
  5. import java.rmi.registry.Registry;
  6. import java.rmi.server.UnicastRemoteObject;
  7. import java.util.ArrayList;
  8. import java.util.List;
  9. import java.util.Queue;
  10. import java.util.concurrent.ConcurrentLinkedQueue;
  11. import com.rdlze.radializebase.interfaces.InterfaceNotifiable;
  12. import com.rdlze.radializeutils.notification.Notification;
  13. public class MockBroker implements InterfaceNotifiable {
  14. /**
  15. *
  16. */
  17. private static final long serialVersionUID = 1L;
  18. private static InterfaceNotifiable notifiable;
  19. private List<InterfaceNotifiable> registeredNotifiables;
  20. public Queue<Notification> queConc;
  21. String name = "";
  22. MockNotifiable mn = null;
  23. public MockBroker(String name){
  24. this.name=name;
  25. queConc = new ConcurrentLinkedQueue<Notification>();
  26. registeredNotifiables = new ArrayList<InterfaceNotifiable>();
  27. NotifierThread not = new NotifierThread(this); // Thread "consumidor"
  28. not.start();
  29. try {
  30. LocateRegistry.createRegistry(15000);
  31. } catch (Exception e) {
  32. System.out.println("createRegistryException....");
  33. }
  34. try {
  35. InterfaceNotifiable stub = (InterfaceNotifiable) UnicastRemoteObject.exportObject(
  36. this, 0);
  37. Registry registry = LocateRegistry.getRegistry(15000);
  38. registry.rebind(name, stub);
  39. System.out.println("MockBroker - OnLine");
  40. } catch (Exception e) {
  41. e.printStackTrace();
  42. }
  43. }
  44. @Override
  45. public void notify(Notification notification) throws RemoteException {
  46. System.out.println(this.getName()+" received notification "+notification.getRadioId());
  47. queConc.add(notification);
  48. }
  49. @Override
  50. public void notify(List list) throws RemoteException {
  51. System.out.println("received list");
  52. queConc.addAll(list);
  53. }
  54. @Override
  55. public void registerNotifiable(String IP, String serviceName, int port)
  56. throws RemoteException {
  57. try {
  58. System.out
  59. .println("COMPONENTE TENTANDO SE REGISTRAR NO BROKEN IP = "
  60. + IP + " NAME = " + serviceName);
  61. Registry registry = LocateRegistry.getRegistry(IP, port);
  62. notifiable = (InterfaceNotifiable) registry.lookup(serviceName);
  63. registeredNotifiables.add(notifiable);
  64. System.out.println("Conectou no " + notifiable.getName()
  65. + " com sucesso");
  66. System.out.println("COMPONENTE = " + notifiable.getName()
  67. + " SE CONECTOU NO BROKEN COM SUCESSO");
  68. } catch (RemoteException e) {
  69. e.printStackTrace();
  70. System.out.println((new StringBuilder("RemoteException")).append(e)
  71. .toString());
  72. } catch (NotBoundException e) {
  73. e.printStackTrace();
  74. System.out.println((new StringBuilder("NotBoun1dException"))
  75. .append(e).toString());
  76. } catch (Exception e) {
  77. e.printStackTrace();
  78. System.out.println((new StringBuilder("Exception")).append(e)
  79. .toString());
  80. }
  81. }
  82. @Override
  83. public String getName() throws RemoteException {
  84. return this.name;
  85. }
  86. public static void main(String[] args){
  87. new MockNotifiable("teste");
  88. }
  89. private Queue<Notification> getQuee() {
  90. return this.queConc;
  91. }
  92. public List<InterfaceNotifiable> getRegistersNotifiable() {
  93. return this.registeredNotifiables;
  94. }
  95. private final class NotifierThread extends Thread {
  96. MockBroker noticationBroken;
  97. public NotifierThread(MockBroker noticationBroken) {
  98. this.noticationBroken = noticationBroken;
  99. System.out.println("NotifierThread init...");
  100. }
  101. public void run() {
  102. int indexerControllerExceptionCount = 0;
  103. int streamCrawlerExceptionCount = 0;
  104. while (true) {
  105. try {
  106. if (this.noticationBroken.getQuee().size() > 0) {
  107. Notification value = null;
  108. while ((value = (Notification) this.noticationBroken
  109. .getQuee().poll()) != null) {
  110. for (int i = this.noticationBroken
  111. .getRegistersNotifiable().size(); --i >= 0;) {
  112. try {
  113. ((InterfaceNotifiable) this.noticationBroken
  114. .getRegistersNotifiable().get(i))
  115. .notify(value);
  116. } catch (RemoteException e) {
  117. InterfaceNotifiable topNot = this.noticationBroken
  118. .getRegistersNotifiable().get(i);
  119. if (topNot.getName().equals(
  120. "IndexerControler"))
  121. ;
  122. {
  123. indexerControllerExceptionCount++;
  124. if (indexerControllerExceptionCount == 10)
  125. this.noticationBroken
  126. .getRegistersNotifiable()
  127. .remove(i);
  128. }
  129. if (topNot.getName()
  130. .equals("StreamCrawler"))
  131. ;
  132. {
  133. streamCrawlerExceptionCount++;
  134. if (streamCrawlerExceptionCount == 10)
  135. this.noticationBroken
  136. .getRegistersNotifiable()
  137. .remove(i);
  138. }
  139. System.out.println("Error: "
  140. + e.getMessage());
  141. e.printStackTrace();
  142. } catch (Exception e) {
  143. System.out.println(e.getMessage());
  144. }
  145. }
  146. }
  147. } else
  148. Thread.sleep(100);
  149. } catch (Exception e) {
  150. System.out.println("Error: " + e.getMessage());
  151. e.printStackTrace();
  152. }
  153. }
  154. }
  155. }
  156. }