Let's take a simple example of streaming data from a Jini server to a Jini client. Data streaming means sending a large, possibly unbounded, amount of data to a consumer piece by piece rather than in a single response. In this example, the server reads rows from a data store and sends each one directly to the client.
We have the following simple API:
    import java.rmi.Remote;
    import java.rmi.RemoteException;

    public interface BankDetailStreamingService extends Remote {
        void streamAllBankDetails(RemoteDataListener<BankDetail> bankDetailRemoteListener) throws RemoteException;
    }
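The RemoteDataListener is a remote call-back to which the Jini server publishes data as it becomes available.
    import java.rmi.Remote;
    import java.rmi.RemoteException;

    public interface RemoteDataListener<T> extends Remote {
        // Called once for every item the server publishes
        void newData(T data) throws RemoteException;
        // Called once, after the last item
        void endOfData() throws RemoteException;
    }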
Note that this is a Remote interface as well.
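To make the call-back contract concrete, here is a minimal in-process sketch: zero or more newData() calls followed by exactly one endOfData(). It uses only the JDK; ListenerContractSketch, CollectingListener and publish() are hypothetical names made up for this illustration, and no Jini export or network hop is involved.

```java
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.List;

class ListenerContractSketch {

    /** Local copy of the call-back contract described above. */
    public interface RemoteDataListener<T> extends Remote {
        void newData(T data) throws RemoteException;
        void endOfData() throws RemoteException;
    }

    /** Hypothetical listener that simply remembers what it was told. */
    public static class CollectingListener<T> implements RemoteDataListener<T> {
        public final List<T> received = new ArrayList<>();
        public boolean finished;

        @Override
        public void newData(T data) {
            received.add(data);
        }

        @Override
        public void endOfData() {
            finished = true;
        }
    }

    /** Pushes every item to the listener, then signals completion once. */
    public static <T> void publish(Iterable<T> source, RemoteDataListener<T> listener)
            throws RemoteException {
        for (T item : source) {
            listener.newData(item);
        }
        listener.endOfData();
    }

    public static void main(String[] args) throws RemoteException {
        CollectingListener<String> listener = new CollectingListener<>();
        publish(List.of("first", "second"), listener);
        System.out.println(listener.received + " finished=" + listener.finished);
    }
}
```

The producer never holds more than one item at a time, which is what lets the same contract carry an arbitrarily long stream.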
The server implementation is very simple: it's a straight delegation to the Dao.
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    @Service
    public class BankDetailStreamingServiceImpl implements BankDetailStreamingService {

        private final BankDetailStreamingDao bankDetailStreamingDao;

        @Autowired
        public BankDetailStreamingServiceImpl(BankDetailStreamingDao bankDetailStreamingDao) {
            this.bankDetailStreamingDao = bankDetailStreamingDao;
        }

        @Override
        public void streamAllBankDetails(RemoteDataListener<BankDetail> bankDetailRemoteListener) {
            bankDetailStreamingDao.streamAllBankDetails(bankDetailRemoteListener);
        }
    }
The Dao just pushes data to the RemoteDataListener as shown below:
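    import java.rmi.RemoteException;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.dao.DataAccessException;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.jdbc.core.ResultSetExtractor;
    import org.springframework.jdbc.core.RowMapper;
    import org.springframework.stereotype.Repository;

    @Repository
    public class BankDetailStreamingDaoImpl implements BankDetailStreamingDao {

        private final JdbcTemplate jdbcTemplate;

        @Autowired
        public BankDetailStreamingDaoImpl(JdbcTemplate jdbcTemplate) {
            this.jdbcTemplate = jdbcTemplate;
        }

        @Override
        public void streamAllBankDetails(RemoteDataListener<BankDetail> bankDetailRemoteListener) {
            RowMapper<BankDetail> rowMapper = new BankDetailMapper();
            jdbcTemplate.query("select * from bank_details", new ResultSetExtractor<Void>() {
                @Override
                public Void extractData(ResultSet resultSet) throws SQLException, DataAccessException {
                    // Push each row to the client as soon as it is read
                    while (resultSet.next()) {
                        try {
                            bankDetailRemoteListener.newData(rowMapper.mapRow(resultSet, -1));
                        } catch (RemoteException e) {
                            throw new RuntimeException(e);
                        }
                    }
                    // Tell the client that there is nothing more to come
                    try {
                        bankDetailRemoteListener.endOfData();
                    } catch (RemoteException e) {
                        throw new RuntimeException(e);
                    }
                    return null;
                }
            });
        }
    }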
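The loop in the Dao has one behaviour worth spelling out: a failed call-back is rethrown as an unchecked exception, so streaming stops at the first row the client cannot accept. The sketch below reproduces just that loop without Spring or JDBC. RowPushSketch, Listener, pushRows() and failingAfter() are hypothetical names, the listener is a plain (non-remote) stand-in, and IllegalStateException is used here where the Dao above throws a bare RuntimeException.

```java
import java.rmi.RemoteException;
import java.util.Iterator;
import java.util.List;

class RowPushSketch {

    /** Simplified, non-remote stand-in for the call-back used in this post. */
    public interface Listener<T> {
        void newData(T data) throws RemoteException;
        void endOfData() throws RemoteException;
    }

    /**
     * Pushes rows one at a time and converts a failed call-back into an
     * unchecked exception, so the loop stops at once.
     * Returns the number of rows delivered.
     */
    public static <T> int pushRows(Iterator<T> rows, Listener<T> listener) {
        int delivered = 0;
        try {
            while (rows.hasNext()) {
                listener.newData(rows.next());
                delivered++;
            }
            listener.endOfData();
        } catch (RemoteException e) {
            throw new IllegalStateException("Listener unreachable after " + delivered + " rows", e);
        }
        return delivered;
    }

    /** Hypothetical listener that fails once it has accepted `limit` rows. */
    public static <T> Listener<T> failingAfter(int limit) {
        return new Listener<T>() {
            private int seen;

            @Override
            public void newData(T data) throws RemoteException {
                if (seen++ >= limit) {
                    throw new RemoteException("client went away");
                }
            }

            @Override
            public void endOfData() {
                // nothing to do in this sketch
            }
        };
    }

    public static void main(String[] args) {
        // The listener accepts everything: all three rows are delivered
        System.out.println(pushRows(List.of("a", "b", "c").iterator(), failingAfter(5)));
        // The listener fails on the third row: streaming is aborted
        try {
            pushRows(List.of("a", "b", "c").iterator(), failingAfter(2));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Aborting early is a reasonable default for a demo like this one: once the client is gone, there is little point in reading the rest of the result set.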
The trick really is in the client implementation. Since the RemoteDataListener is a Remote interface, its implementation is exported as a Jini service that lives on the client. Here the Jini client and the Jini server swap roles: for the duration of the call-back, the client acts as the server. To illustrate the point, we have created a very simple listener that just writes the data it receives to the console.
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class RemoteDataListenerConsolePrinterImpl<T> implements RemoteDataListener<T> {

        private static final Logger LOGGER = LoggerFactory.getLogger(RemoteDataListenerConsolePrinterImpl.class);

        @Override
        public void newData(T data) {
            LOGGER.info("New data recieved: {}", data);
        }

        @Override
        public void endOfData() {
            LOGGER.info("End of data, exiting...");
            // The stream is complete, so shut the client down
            System.exit(0);
        }
    }
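Calling System.exit(0) from the call-back is fine for a demo. Where the client needs to carry on after the stream ends, one possible variant is a listener that signals completion through a latch, so that the calling thread can wait for it. This is only a sketch under assumptions: LatchListenerSketch, Listener and AwaitableListener are hypothetical names, the interface is a plain (non-remote) stand-in, and a background thread plays the part of the server. In a real Jini client, the exported listener would presumably also need to be unexported (Exporter.unexport) once the stream is done.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchListenerSketch {

    /** Simplified, non-remote stand-in for RemoteDataListener. */
    public interface Listener<T> {
        void newData(T data);
        void endOfData();
    }

    /** Collects the data and lets another thread wait for the end of the stream. */
    public static class AwaitableListener<T> implements Listener<T> {
        private final List<T> received = new CopyOnWriteArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void newData(T data) {
            received.add(data);
        }

        @Override
        public void endOfData() {
            done.countDown();
        }

        /** Returns true if endOfData() arrived before the timeout elapsed. */
        public boolean awaitEnd(long timeout, TimeUnit unit) throws InterruptedException {
            return done.await(timeout, unit);
        }

        public List<T> received() {
            return received;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AwaitableListener<String> listener = new AwaitableListener<>();
        // A background thread stands in for the server's call-back thread
        Thread server = new Thread(() -> {
            listener.newData("row-1");
            listener.newData("row-2");
            listener.endOfData();
        });
        server.start();
        boolean finished = listener.awaitEnd(5, TimeUnit.SECONDS);
        System.out.println(finished + " " + listener.received());
    }
}
```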
Then, we have a simple class with the main method to invoke the streaming service:
    import java.rmi.RemoteException;

    import net.jini.export.Exporter;
    import net.jini.jeri.BasicILFactory;
    import net.jini.jeri.BasicJeriExporter;
    import net.jini.jeri.tcp.TcpServerEndpoint;

    public class SimpleStreamingClient {

        public static void main(String[] args) throws RemoteException {
            SpringContextHelper springContextHelper = new SpringContextHelper();
            RemoteDataListener<BankDetail> remoteDataListener = new RemoteDataListenerConsolePrinterImpl<>();

            // Export the listener so that the server can call back into this JVM
            Exporter exporter = new BasicJeriExporter(TcpServerEndpoint.getInstance(0), new BasicILFactory());
            @SuppressWarnings("unchecked")
            RemoteDataListener<BankDetail> exportedRemoteDataListener =
                    (RemoteDataListener<BankDetail>) exporter.export(remoteDataListener);

            BankDetailStreamingService bankDetailStreamingService = springContextHelper.getBankDetailStreamingService();
            bankDetailStreamingService.streamAllBankDetails(exportedRemoteDataListener);
        }
    }
Note that, just before invoking the streaming service, the RemoteDataListener is exported as a Jini service, and it is the exported proxy, not the original object, that is passed to the streaming service. That is the trick, really.
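The export step can be tried in isolation with nothing but the JDK. The sketch below is an analogue only: it swaps BasicJeriExporter for the JDK's built-in UnicastRemoteObject.exportObject(), which likewise returns a stub that forwards calls to the original object. ExportSketch, DataListener and RecordingListener are hypothetical names, and pinning java.rmi.server.hostname to 127.0.0.1 is an assumption made so that the call stays on the local machine.

```java
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ExportSketch {

    /** Hypothetical remote call-back, shaped like the RemoteDataListener in this post. */
    public interface DataListener<T> extends Remote {
        void newData(T data) throws RemoteException;
        void endOfData() throws RemoteException;
    }

    /** Records every call-back it receives. */
    public static class RecordingListener implements DataListener<String> {
        public final List<String> events = new CopyOnWriteArrayList<>();

        @Override
        public void newData(String data) {
            events.add(data);
        }

        @Override
        public void endOfData() {
            events.add("<end>");
        }
    }

    /** Exports a listener, calls it through the stub, and returns what it recorded. */
    public static List<String> exportAndCall() throws RemoteException {
        // Assumption: keep the stub's endpoint on the loopback interface
        System.setProperty("java.rmi.server.hostname", "127.0.0.1");

        RecordingListener listener = new RecordingListener();
        // Port 0 lets the runtime pick a free port, as TcpServerEndpoint.getInstance(0) does
        @SuppressWarnings("unchecked")
        DataListener<String> stub =
                (DataListener<String>) UnicastRemoteObject.exportObject(listener, 0);
        try {
            // These calls travel through the stub, the way the server's calls would
            stub.newData("row-1");
            stub.endOfData();
        } finally {
            // Unexport so that the JVM can shut down normally
            UnicastRemoteObject.unexportObject(listener, true);
        }
        return listener.events;
    }

    public static void main(String[] args) throws RemoteException {
        System.out.println(exportAndCall());
    }
}
```

Whichever exporter is used, the object handed to the server is the stub, so every call the server makes on it ends up in the listener that stayed behind in the client JVM.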
The SpringContextHelper is a simple class that loads the Spring context and looks up the remote services:
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;

    public class SpringContextHelper {

        private static final Logger LOG = LoggerFactory.getLogger(SpringContextHelper.class);

        private final BankDetailService bankDetailService;
        private final BankDetailStreamingService bankDetailStreamingService;

        public SpringContextHelper() {
            // Install the security policy before any remote code is loaded
            String policyFilePath = SpringContextHelper.class.getResource("/policy.all").getFile();
            LOG.info("Starting with the policy file {}", policyFilePath);
            System.setProperty("java.security.policy", policyFilePath);
            if (System.getSecurityManager() == null) {
                System.setSecurityManager(new SecurityManager());
            }

            // Look up the remote service proxies from the Spring context
            try (ConfigurableApplicationContext context = new AnnotationConfigApplicationContext(RmiClientConfig.class)) {
                bankDetailService = context.getBean("bankDetailService", BankDetailService.class);
                bankDetailStreamingService = context.getBean("bankDetailStreamingService", BankDetailStreamingService.class);
            }
        }

        public BankDetailService getBankDetailService() {
            return bankDetailService;
        }

        public BankDetailStreamingService getBankDetailStreamingService() {
            return bankDetailStreamingService;
        }
    }
Reggie, the Jini lookup service, is embedded in the server, so simply run SpringNonSecureRmiServer to start both Reggie and the Jini server. Once the Jini server is up, run SimpleStreamingClient. You should see the following on the console:
    INFO [(JSK) mux request dispatch] RemoteDataListenerConsolePrinterImpl.newData(14) | New data recieved: BankDetail [id=4519, age=57, job=technician, marital=married, education=secondary, defaulted=no, balance=295, housing=no, loan=no, contact=cellular, day=19, month=aug, duration=151, campaign=11, pdays=-1, previous=0, poutcome=unknown, y=no]
    INFO [(JSK) mux request dispatch] RemoteDataListenerConsolePrinterImpl.newData(14) | New data recieved: BankDetail [id=4520, age=28, job=blue-collar, marital=married, education=secondary, defaulted=no, balance=1137, housing=no, loan=no, contact=cellular, day=6, month=feb, duration=129, campaign=4, pdays=211, previous=3, poutcome=other, y=no]
    INFO [(JSK) mux request dispatch] RemoteDataListenerConsolePrinterImpl.newData(14) | New data recieved: BankDetail [id=4521, age=44, job=entrepreneur, marital=single, education=tertiary, defaulted=no, balance=1136, housing=yes, loan=yes, contact=cellular, day=3, month=apr, duration=345, campaign=2, pdays=249, previous=7, poutcome=other, y=no]
    INFO [(JSK) mux request dispatch] RemoteDataListenerConsolePrinterImpl.endOfData(19) | End of data, exiting...
The sources can be found here: https://github.com/paawak/blog/tree/master/code/jini/unsecure/streaming-with-jini
There are 3 Maven projects under that: