2016-08-03 3 views
4

Ich habe ein Problem, wenn meine Verbindung eingerichtet, mqtt veröffentlichen nicht beim Reconnect senden, wie es zu lösen? Im folgenden this answer aber nicht funktioniertWie zu speichern Daten mqtt wenn offline und senden sie, wenn online

Was ich gemacht habe:

  • Im bereits einen Dienst MQTT implementieren GPS-Position und arbeiten wie gewohnt, wenn Sie online zu senden.
  • Setzen Sie Qos auf 1.
  • Legen Sie ClientId fest.
  • Set veröffentlichen Qos auf 1.
  • Set sauber Sitzung auf false

Aber das Ergebnis, wenn ich noch Daten veröffentlichen wieder verbinden, wenn ich online & nicht gespeichert Persistenz Daten zu veröffentlichen.

Hier ist meine Quellcode:

package id.trustudio.android.mdm.service; 

import android.app.Service; 
import android.content.Context; 
import android.content.Intent; 
import android.content.SharedPreferences; 
import android.content.pm.ApplicationInfo; 
import android.content.pm.PackageManager; 
import android.net.TrafficStats; 
import android.os.Handler; 
import android.os.IBinder; 
import android.support.annotation.Nullable; 
import android.util.Log; 

import org.eclipse.paho.android.service.MqttAndroidClient; 
import org.eclipse.paho.client.mqttv3.IMqttActionListener; 
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 
import org.eclipse.paho.client.mqttv3.IMqttToken; 
import org.eclipse.paho.client.mqttv3.MqttCallback; 
import org.eclipse.paho.client.mqttv3.MqttClient; 
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 
import org.eclipse.paho.client.mqttv3.MqttException; 
import org.eclipse.paho.client.mqttv3.MqttMessage; 
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 

import java.io.UnsupportedEncodingException; 

import id.trustudio.android.mdm.http.DetectConnection; 
import id.trustudio.android.mdm.util.Cons; 
import id.trustudio.android.mdm.util.Debug; 
import id.trustudio.android.mdm.util.GPSTracker; 
import id.trustudio.android.mdm.util.GPSTracker2; 

public class MqttService extends Service implements MqttCallback { 

    public static boolean isStarted = false; 

    private double latitude = 0; 
    private double longitude = 0; 
    private GPSTracker mGPSTracker; 
    private GPSTracker2 mGPSTracker2; 

    boolean isInternetPresent = false; 

    private SharedPreferences mPrivatePref; 
    private SharedPreferences.Editor editor; 

    private DetectConnection mDetectConnection; 
    String deviceID,Name; 
    int totalbyte; 
    String packages; 
    MemoryPersistence persistence; 
    String clientId; 
    MqttAndroidClient client; 

    @Nullable 
    @Override 
    public IBinder onBind(Intent intent) { 
     return null; 
    } 

    @Override 
    public void onCreate() { 
     super.onCreate(); 

     mPrivatePref = this.getSharedPreferences(Cons.PRIVATE_PREF, Context.MODE_PRIVATE); 
     editor = mPrivatePref.edit(); 

     deviceID = mPrivatePref.getString(Cons.APP_PACKAGE + "deviceid", ""); 
     Name = mPrivatePref.getString(Cons.APP_PACKAGE + "user", ""); 

     clientId = MqttClient.generateClientId(); 
     persistence = new MemoryPersistence(); 

     client = 
       new MqttAndroidClient(getApplicationContext(), "tcp://broker.administrasi.id:1883", 
         clientId, persistence); 

     client.setCallback(this); 

     try{ 
      MqttConnectOptions connOpts = new MqttConnectOptions(); 
      connOpts.setCleanSession(false); 
      client.connect(connOpts,null, new IMqttActionListener() { 

         @Override 
         public void onSuccess(IMqttToken asyncActionToken) { 

         } 

         @Override 
         public void onFailure(IMqttToken asyncActionToken, Throwable exception) { 

         } 
        }); 
     }catch (Exception e){ 
      e.printStackTrace(); 
     } 

     mHandler.postDelayed(mUpdateTask, 1000); 
    } 


    public int onStartCommand(Intent intent, int flags, int startId) { 

     int res = super.onStartCommand(intent, flags, startId); 

     //check if your service is already started 
     if (isStarted){  //yes - do nothing 
      return Service.START_STICKY; 
     } else {    //no 
      isStarted = true; 
     } 

     return Service.START_STICKY; 

    } 

    private Handler mHandler = new Handler(); 
    private Runnable mUpdateTask = new Runnable() { 
     public void run() { 

      getLatLng(); 
      if (latitude == 0.0 || longitude == 0.0) getLatLngWifi(); 

         Debug.e("MQTT","Connect"); 
         String topic = "gps/kodeupi/kodeap/kodeup/" + deviceID; 
         Debug.e("MQTT CLIENT", clientId); 
         int qos = 1; 
         try { 
          IMqttToken subToken = client.subscribe(topic, qos); 
          subToken.setActionCallback(new IMqttActionListener() { 
           @Override 
           public void onSuccess(IMqttToken asyncActionToken) { 
            // The message was published 

            String topic = "gps/kodeupi/kodeap/kodeup/" + deviceID; 
            long CurrentTime = System.currentTimeMillis(); 

            String payload = deviceID + "|" + latitude + "|" + longitude + "|" + CurrentTime; 

            byte[] encodedPayload = new byte[0]; 
            try { 
             encodedPayload = payload.getBytes("UTF-8"); 
             MqttMessage message = new MqttMessage(encodedPayload); 
             client.publish(topic, message); 
             message.setRetained(true); 
             // set quality of service 
             message.setQos(1); 
             Log.d("TAG", "onSuccess"); 
            } catch (UnsupportedEncodingException | MqttException e) { 
             e.printStackTrace(); 
            } 
           } 

           @Override 
           public void onFailure(IMqttToken asyncActionToken, 
                 Throwable exception) { 
            // The subscription could not be performed, maybe the user was not 
            // authorized to subscribe on the specified topic e.g. using wildcards 

           } 
          }); 
         } catch (MqttException e) { 
          e.printStackTrace(); 
         } 

      mHandler.postDelayed(this, 20000); 
     } 
    }; 

    private void getLatLng() { 
     mGPSTracker2  = new GPSTracker2(this); 
     isInternetPresent = mDetectConnection.isConnectingToInternet(); 
     if (isInternetPresent == true) { 
      if (mGPSTracker2.canGetLocation()) { 
       latitude = mGPSTracker2.getLatitude(); 
       longitude = mGPSTracker2.getLongitude(); 

       if(latitude != 0.0 && longitude != 0.0) { 
        editor.putString(Cons.APP_LATITUDE, latitude+""); 
        editor.putString(Cons.APP_LONGITUDE, longitude+""); 
        editor.commit(); 
       } 
      } else { 
//    getLatLngWifi(); 
       Debug.i(Cons.TAG, "on gps failed, please check"); 

      } 
     } else { 
      Debug.i(Cons.TAG, "no connection"); 

      if(mGPSTracker2 != null) 
       mGPSTracker2.stopUsingGPS(); 
     } 
    } 

    private void getLatLngWifi() { 
     mGPSTracker   = new GPSTracker(this); 
     isInternetPresent = mDetectConnection.isConnectingToInternet(); 
     if (isInternetPresent == true) { 
      if (mGPSTracker.canGetLocation()) { 
       latitude = mGPSTracker.getLatitude(); 
       longitude = mGPSTracker.getLongitude(); 

       if(latitude != 0.0 && longitude != 0.0) { 
        editor.putString(Cons.APP_LATITUDE, latitude+""); 
        editor.putString(Cons.APP_LONGITUDE, longitude+""); 
        editor.commit(); 
       } 

      } else { 
       Debug.i(Cons.TAG, "wifi " + "on gps failed, please check"); 

      } 
     } else { 
      Debug.i(Cons.TAG, "wifi " + "no connection"); 

      if(mGPSTracker != null) 
       mGPSTracker.stopUsingGPS(); 
     } 
    } 

    @Override 
    public void connectionLost(Throwable cause) { 

    } 

    @Override 
    public void messageArrived(String topic, MqttMessage message) throws Exception { 

    } 

    @Override 
    public void deliveryComplete(IMqttDeliveryToken token) { 

    } 
} 

Sorry für mein schlechtes Englisch

+0

Die Antwort in der zu Frage verknüpft, wie Nachrichten an einen Kunden geliefert zu bekommen, dass wurde getrennt, nicht wie Nachrichten von einem getrennten Client zu senden, wenn es erneut verbindet. Wenn Sie versuchen, eine Nachricht zu veröffentlichen, während der Client nicht verbunden ist, wird eine Ausnahme ausgelöst. Sie müssen diese Ausnahme abfangen und die Nachricht speichern, damit sie erneut gesendet wird. – hardillb

+0

Auch Sie setzen die QOS und beibehaltenen Flags, nachdem die Nachricht veröffentlicht wurde, dies wird nicht funktionieren, da die Nachricht bereits weg ist. – hardillb

+0

Gibt es irgendwelche Callbacks für die Bibliothek, oder muss ich dafür mein eigenes Skript implementieren? –

Antwort

1

Wie in den Kommentaren gelegt.

Dies ist etwas, das Sie selbst codieren müssen. Es gibt keine Unterstützung für das Speichern von Nachrichten, die nicht gesendet wurden, da der Client im Framework nicht verbunden ist. Die MQTT-Persistenz wird nur verwendet, um sicherzustellen, dass Nachrichten mit QOS 1/2 nicht verloren gehen, wenn die Verbindung zum Broker unterbrochen wird, bevor der QOS-Handshake abgeschlossen ist.

Wenn Sie versuchen, eine Nachricht zu veröffentlichen, während die Verbindung unterbrochen wird, löst der Aufruf client.publish(topic, message) eine Ausnahme aus. Sie müssen diese Ausnahme abfangen und dann den Inhalt der Nachricht speichern, wenn die Verbindung wiederhergestellt wird. Sobald eine Verbindung wieder hergestellt ist und läuft, müssen Sie die gespeicherten Details durchlaufen und das Senden erneut versuchen.

1

Also hier das Beispiel, als hardillb Antwort Im machen mein eigenes implementieren, um Daten in der lokalen Datenbank zu speichern und alle bei der Verbindung neu zu veröffentlichen.

hier meine Quellcode

private Handler mHandler = new Handler(); 
     private Runnable mUpdateTask = new Runnable() { 
     public void run() { 

      getLatLng(); 
      if (latitude == 0.0 || longitude == 0.0) getLatLngWifi(); 

         Debug.e("MQTT","Connect"); 
         String topic = "gps/kodeupi/kodeap/kodeup/" + deviceID; 
         Debug.e("MQTT CLIENT", clientId); 
         int qos = 1; 
         try { 
          IMqttToken subToken = client.subscribe(topic, qos); 
          subToken.setActionCallback(new IMqttActionListener() { 
           @Override 
           public void onSuccess(IMqttToken asyncActionToken) { 
            // The message was published 
            mList = getLocationAll();//call all data stored on sqlite 
         Debug.e("MQTT","Connected. Size list = "+mList.size()); 

         if(mList.size() > 0){//if data found then send in looping 
            for (int i = 0; i < mList.size() ; i++) { 
            final String Latitude = mList.get(i).latitude; 
            final String Longitude = mList.get(i).longitude; 
            final String timestamps = mList.get(i).CurrentTimes; 

            String payload = deviceID + "|" + timestamps + "|" + Name + "|" + Latitude + "|" + Longitude; 

           byte[] encodedPayload = new byte[0]; 
           try { 
            encodedPayload = payload.getBytes("UTF-8"); 
            MqttMessage message = new MqttMessage(encodedPayload); 
            // set quality of service 
            client.publish(topic, message); 
           } catch (UnsupportedEncodingException | MqttException e) { 
            e.printStackTrace(); 
           } 
           } 

           DeleteAllLocation(); 

          } 
            String topic = "gps/kodeupi/kodeap/kodeup/" + deviceID; 
            long CurrentTime = System.currentTimeMillis(); 

            String payload = deviceID + "|" + latitude + "|" + longitude + "|" + CurrentTime; 

            byte[] encodedPayload = new byte[0]; 
            try { 
             encodedPayload = payload.getBytes("UTF-8"); 
             MqttMessage message = new MqttMessage(encodedPayload); 
             client.publish(topic, message); 
             message.setRetained(true); 
             // set quality of service 
             message.setQos(1); 
             Log.d("TAG", "onSuccess"); 
            } catch (UnsupportedEncodingException | MqttException e) { 
             e.printStackTrace(); 
            } 
           } 

           @Override 
           public void onFailure(IMqttToken asyncActionToken, 
                 Throwable exception) { 
            // The subscription could not be performed, maybe the user was not 
            // authorized to subscribe on the specified topic e.g. using wildcards 
            long CurrentTime = System.currentTimeMillis(); 
            addLocation(deviceID, CurrentTime+"", Name,  latitude+"" , longitude+""); //add data to sqlite when offline 
            Debug.e("MQTT","Failure"); 

           } 
          }); 
         } catch (MqttException e) { 
          e.printStackTrace(); 
         } 

      mHandler.postDelayed(this, 20000); 
     } 
    }; 

hier meine SQLite speichern & Daten löschen

public void addLocation(String device_id, String timestamp, String user_id, String latitude, String longitude) { 
    if (sqLite == null) { 
     Debug.i(Cons.TAG, "null database"); 
     return; 
    } 

    ContentValues values = new ContentValues(); 

    values.put("device_id", device_id); 
    values.put("timestamp", timestamp); 
    values.put("user_id", user_id); 
    values.put("latitude", latitude); 
    values.put("longitude", longitude); 

    Debug.i(Cons.TAG, "Insert location : title = " + device_id); 

    sqLite.insert("tbl_location", null, values); 
} 

public ArrayList<LocationModel> getLocationAll() { 
    ArrayList<LocationModel> result = new ArrayList<LocationModel>(); 
    if (sqLite == null || result == null) { 
     return result; 
    } 

    String sql = "SELECT * FROM tbl_location ORDER BY timestamp ASC"; 

    Cursor c = sqLite.rawQuery(sql, null); 

    int device_id  = c.getColumnIndex("device_id"); 
    int timestamp  = c.getColumnIndex("timestamp"); 
    int userid   = c.getColumnIndex("user_id"); 
    int latitude  = c.getColumnIndex("latitude"); 
    int longitude   = c.getColumnIndex("longitude"); 

    if (c != null) { 
     if (c.moveToFirst()) { 

      while (c.isAfterLast() == false) { 
       LocationModel mApps = new LocationModel(); 

       mApps.DeviceId  = c.getInt(device_id); 
       mApps.CurrentTimes = c.getString(timestamp); 
       mApps.UserId  = c.getString(userid); 
       mApps.latitude  = c.getString(latitude); 
       mApps.longitude  = c.getString(longitude); 

       result.add(mApps); 

       c.moveToNext(); 
      } 

      c.close(); 
     } 
    } 

    return result; 
} 

public void DeleteAllLocation() { 
    if (sqLite == null) 
     return; 

    sqLite.delete("tbl_location", null, null); 
} 
Verwandte Themen